Source code for pyproximal.proximal.TV

import numpy as np

from copy import deepcopy
from scipy.sparse.linalg import lsqr
from pylops import FirstDerivative, Gradient
from pyproximal.ProxOperator import _check_tau
from pyproximal import ProxOperator


[docs]class TV(ProxOperator): r"""TV Norm proximal operator. Proximal operator for the TV norm defined as: :math:`f(\mathbf{x}) = \sigma ||\mathbf{x}||_{\text{TV}}`. Parameters ---------- dims : :obj:`tuple` Number of samples for each dimension (``None`` if only one dimension is available) sigma : :obj:`int`, optional Multiplicative coefficient of TV norm niter : :obj:`int` or :obj:`func`, optional Number of iterations of iterative scheme used to compute the proximal. This can be a constant number or a function that is called passing a counter which keeps track of how many times the ``prox`` method has been invoked before and returns the ``niter`` to be used. rtol : :obj:`float`, optional Relative tolerance for stopping criterion. Notes ----- The proximal algorithm is implemented following [1]. .. [1] Beck, A. and Teboulle, M., "Fast gradient-based algorithms for constrained total variation image denoising and deblurring problems", 2009. """ def __init__(self, dims, sigma=1., niter=10, rtol=1e-4, **kwargs): super().__init__(None, True) self.dims = dims self.ndim = len(dims) self.sigma = sigma self.niter = niter self.count = 0 self.rtol = rtol self.kwargs = kwargs def __call__(self, x): x = x.reshape(self.dims) if self.ndim == 1: derivOp = FirstDerivative(dims=self.dims[0], axis=0, edge=False, dtype=x.dtype, kind="forward") dx = derivOp @ x y = np.sum(np.abs(dx), axis=0) elif self.ndim >= 2: y = 0 gradOp = Gradient(self.dims, edge=False, dtype=x.dtype, kind="forward") grads = gradOp.matvec(x.ravel()) grads = grads.reshape((self.ndim, ) + self.dims) for g in grads: y += np.power(abs(g), 2) y = np.sqrt(y) return self.sigma * np.sum(y) def _increment_count(func): """Increment counter """ def wrapped(self, *args, **kwargs): self.count += 1 return func(self, *args, **kwargs) return wrapped @_increment_count @_check_tau def prox(self, x, tau): # define current number of iterations if isinstance(self.niter, int): niter = self.niter else: niter = self.niter(self.count) gamma = self.sigma * tau rtol = self.rtol # TODO implement test_gamma # Initialization x = x.reshape(self.dims) sol = x if self.ndim == 1: derivOp = FirstDerivative(dims=self.dims[0], axis=0, edge=False, dtype=x.dtype, kind="forward") else: gradOp = Gradient(x.shape, edge=False, dtype=x.dtype, kind="forward") if self.ndim == 1: r = derivOp @ (x * 0) rr = deepcopy(r) elif self.ndim == 2: r, s = gradOp.matvec( (x * 0).ravel()).reshape((self.ndim, ) + x.shape) rr, ss = deepcopy(r), deepcopy(s) elif self.ndim == 3: r, s, k = gradOp.matvec( (x*0).ravel()).reshape((self.ndim, ) + x.shape) rr, ss, kk = deepcopy(r), deepcopy(s), deepcopy(k) elif self.ndim == 4: r, s, k, u = gradOp.matvec( (x*0).ravel()).reshape((self.ndim, ) + x.shape) rr, ss, kk, uu = deepcopy(r), deepcopy(s), deepcopy(k), deepcopy(u) if self.ndim >= 1: pold = r if self.ndim >= 2: qold = s if self.ndim >= 3: oold = k if self.ndim >= 4: mold = u told, prev_obj = 1., 0. # Initialization for weights if self.ndim >= 1: try: wx = self.kwargs["wx"] except (KeyError, TypeError): wx = 1. if self.ndim >= 2: try: wy = self.kwargs["wy"] except (KeyError, TypeError): wy = 1. if self.ndim >= 3: try: wz = self.kwargs["wz"] except (KeyError, TypeError): wz = 1. if self.ndim >= 4: try: wt = self.kwargs["wt"] except (KeyError, TypeError): wt = 1. if self.ndim == 1: mt = wx elif self.ndim == 2: mt = np.maximum(wx, wy) elif self.ndim == 3: mt = np.maximum(wx, np.maximum(wy, wz)) elif self.ndim == 4: mt = np.maximum(np.maximum(wx, wy), np.maximum(wz, wt)) if self.ndim >= 1: try: rr *= np.conjugate(wx) except KeyError: pass if self.ndim >= 2: try: ss *= np.conjugate(wy) except KeyError: pass if self.ndim >= 3: try: kk *= np.conjugate(wz) except KeyError: pass if self.ndim >= 4: try: uu *= np.conjugate(wt) except KeyError: pass iter = 0 while iter <= niter: # Current Solution if self.ndim == 0: raise ValueError("Need to input at least one value") if self.ndim >= 1: div = np.concatenate((np.expand_dims(rr[0, ], axis=0), rr[1:-1, ] - rr[:-2, ], -np.expand_dims(rr[-2, ], axis=0)), axis=0) if self.ndim >= 2: div += np.concatenate((np.expand_dims(ss[:, 0, ], axis=1), ss[:, 1:-1, ] - ss[:, :-2, ], -np.expand_dims(ss[:, -2, ], axis=1)), axis=1) if self.ndim >= 3: div += np.concatenate((np.expand_dims(kk[:, :, 0, ], axis=2), kk[:, :, 1:-1, ] - kk[:, :, :-2, ], -np.expand_dims(kk[:, :, -2, ], axis=2)), axis=2) if self.ndim >= 4: div += np.concatenate((np.expand_dims(uu[:, :, :, 0, ], axis=3), uu[:, :, :, 1:-1, ] - uu[:, :, :, :-2, ], -np.expand_dims(uu[:, :, :, -2, ], axis=3)), axis=3) sol = x - gamma * div # Objective function value obj = 0.5 * np.power(np.linalg.norm(x[:] - sol[:]), 2) + \ gamma * np.sum(self.__call__(sol), axis=0) if obj > 1e-10: rel_obj = np.abs(obj - prev_obj) / obj else: rel_obj = 2 * rtol prev_obj = obj # Stopping criterion if rel_obj < rtol: break # Update divergence vectors and project if self.ndim == 1: dx = derivOp @ sol r -= 1. / (4 * gamma * mt**2) * dx weights = np.maximum(1, np.abs(r)) elif self.ndim == 2: dx, dy = gradOp.matvec(sol.ravel()).reshape((self.ndim, ) + x.shape) r -= (1. / (8. * gamma * mt**2.)) * dx s -= (1. / (8. * gamma * mt**2.)) * dy weights = np.maximum(1, np.sqrt(np.power(np.abs(r), 2) + np.power(np.abs(s), 2))) elif self.ndim == 3: dx, dy, dz = gradOp.matvec(sol.ravel()).reshape((self.ndim, ) + x.shape) r -= 1. / (12. * gamma * mt**2) * dx s -= 1. / (12. * gamma * mt**2) * dy k -= 1. / (12. * gamma * mt**2) * dz weights = np.maximum(1, np.sqrt(np.power(np.abs(r), 2) + np.power(np.abs(s), 2) + np.power(np.abs(k), 2))) elif self.ndim == 4: dx, dy, dz, dt = gradOp.matvec(sol.ravel()).reshape((self.ndim, ) + x.shape) r -= 1. / (16 * gamma * mt**2) * dx s -= 1. / (16 * gamma * mt**2) * dy k -= 1. / (16 * gamma * mt**2) * dz u -= 1. / (16 * gamma * mt**2) * dt weights = np.maximum(1, np.sqrt(np.power(np.abs(r), 2) + np.power(np.abs(s), 2) + np.power(np.abs(k), 2) + np.power(np.abs(u), 2))) # FISTA update t = (1 + np.sqrt(4 * told**2)) / 2. if self.ndim >= 1: p = r / weights r = p + (told - 1) / t * (p - pold) pold = p rr = deepcopy(r) if self.ndim >= 2: q = s / weights s = q + (told - 1) / t * (q - qold) qold = q ss = deepcopy(s) if self.ndim >= 3: o = k / weights k = o + (told - 1) / t * (o - oold) oold = o kk = deepcopy(k) if self.ndim >= 4: m = u / weights u = m + (told - 1) / t * (m - mold) mold = m uu = deepcopy(u) told = t iter += 1 return sol.ravel()