Source code for pyproximal.optimization.primal

import time
import warnings
from math import sqrt
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Union

import numpy as np
from pylops.optimization.leastsquares import regularized_inversion
from pylops.utils.backend import get_array_module, to_numpy
from pylops.utils.typing import NDArray

from pyproximal.proximal import L2
from pyproximal.ProxOperator import ProxOperator
from pyproximal.utils.bilinear import BilinearOperator

if TYPE_CHECKING:
    from pylops.linearoperator import LinearOperator


def _backtracking(
    x: NDArray,
    tau: float,
    proxf: ProxOperator,
    proxg: ProxOperator,
    epsg: float,
    beta: float = 0.5,
    niterback: int = 10,
) -> Tuple[NDArray, float]:
    r"""Backtracking

    Line-search algorithm for finding step sizes in proximal algorithms when
    the Lipschitz constant of the operator is unknown (or expensive to
    estimate).

    """

    def ftilde(x: NDArray, y: NDArray, f: ProxOperator, tau: float) -> float:
        xy = x - y
        return float(
            f(y) + np.dot(f.grad(y), xy) + (1.0 / (2.0 * tau)) * np.linalg.norm(xy) ** 2
        )

    iiterback = 0
    while iiterback < niterback:
        z = proxg.prox(x - tau * proxf.grad(x), epsg * tau)
        ft = ftilde(z, x, proxf, tau)
        if proxf(z) <= ft:
            break
        tau *= beta
        iiterback += 1
    return z, tau


def _x0z0_init(
    x0: NDArray | None,
    z0: NDArray | None,
    Op: Optional["LinearOperator"] = None,
    z0name: Optional[str] = "z0",
    Opname: Optional[str] = "Op",
) -> Tuple[NDArray, NDArray]:
    r"""Initialize x0 and z0

    Initialize x0 and z0 using the following convention.

    For ``Op=None``:
    - if both are provided, they are simply returned;
    - if only one is provided (the other is ``None``), the one provided
      is copied to the other one.

    For ``Op!=None``, ``x0`` must be provided, and:
    - if both are provided, they are simply returned;
    - if ``z0`` is not provided, set to ``Op @ x0``.

    Parameters
    ----------
    x0 : :obj:`numpy.ndarray`
        Initial vector
    z0 : :obj:`numpy.ndarray`
        Initial auxiliary vector
    Op : :obj:`pylops.LinearOperator`, optional
        Linear Operator to apply to ``x0``
    z0name : :obj:`str`, optional
        Name to display in error message instead of ``z0``
    Opname : :obj:`str`, optional
        Name to display in error message instead of ``Op``

    """
    if x0 is None and z0 is None:
        raise ValueError(
            f"Both x0 or {z0name} are None, provide either of them or both"
        )

    if Op is None:
        if x0 is None:
            x0 = z0.copy()  # type: ignore[union-attr]
        elif z0 is None:
            z0 = x0.copy()
    else:
        if x0 is None:
            raise ValueError(f"x0 must be provided when {Opname} is also provided")
        elif z0 is None:
            z0 = Op @ x0
    return x0, z0


[docs]def ProximalPoint( prox: ProxOperator, x0: NDArray, tau: float, niter: int = 10, tol: Optional[float] = None, callback: Optional[Callable[[NDArray], None]] = None, show: bool = False, ) -> NDArray: r"""Proximal point algorithm Solves the following minimization problem using Proximal point algorithm: .. math:: \mathbf{x} = \argmin_\mathbf{x} f(\mathbf{x}) where :math:`f(\mathbf{x})` is any convex function that has a known proximal operator. Parameters ---------- prox : :obj:`pyproximal.ProxOperator` Proximal operator x0 : :obj:`numpy.ndarray` Initial vector tau : :obj:`float` Positive scalar weight niter : :obj:`int`, optional Number of iterations of iterative scheme tol : :obj:`float`, optional Tolerance on change of objective function (used as stopping criterion). If ``tol=None``, run until ``niter`` is reached callback : :obj:`callable`, optional Function with signature (``callback(x)``) to call after each iteration where ``x`` is the current model vector show : :obj:`bool`, optional Display iterations log Returns ------- x : :obj:`numpy.ndarray` Inverted model Notes ----- The Proximal point algorithm can be expressed by the following recursion: .. math:: \mathbf{x}^{k+1} = \prox_{\tau f}(\mathbf{x}^k) """ if show: tstart = time.time() print( "Proximal point algorithm\n" "---------------------------------------------------------\n" "Proximal operator: %s\n" "tau = %10e\tniter = %d\ttol = %s\n" % (type(prox), tau, niter, str(tol)) ) head = " Itn x[0] f" print(head) # initialize model x = x0.copy() pf = np.inf tolbreak = False # iterate for iiter in range(niter): x = prox.prox(x, tau) # run callback if callback is not None: callback(x) # tolerance check: break iterations if overall # objective does not decrease below tolerance if tol is not None: pfold = pf pf = prox(x) if np.abs(1.0 - pf / pfold) < tol: tolbreak = True # show iteration logger if show: if iiter < 10 or niter - iiter < 10 or iiter % (niter // 10) == 0: if tol is None: pf = prox(x) msg = "%6g %12.5e %10.3e" % (iiter + 1, x[0], pf) print(msg) # break if tolerance condition is met if tolbreak: break if show: print("\nTotal time (s) = %.2f" % (time.time() - tstart)) print("---------------------------------------------------------\n") return x
[docs]def ProximalGradient( proxf: ProxOperator, proxg: ProxOperator, x0: NDArray, epsg: Union[float, NDArray] = 1.0, tau: Optional[float] = None, backtracking: bool = False, beta: float = 0.5, eta: float = 1.0, niter: int = 10, niterback: int = 100, acceleration: Optional[str] = None, tol: Optional[float] = None, callback: Optional[Callable[[NDArray], None]] = None, show: bool = False, ) -> NDArray: r"""Proximal gradient (optionally accelerated) Solves the following minimization problem using (Accelerated) Proximal gradient algorithm: .. math:: \mathbf{x} = \argmin_\mathbf{x} f(\mathbf{x}) + \epsilon g(\mathbf{x}) where :math:`f(\mathbf{x})` is a smooth convex function with a uniquely defined gradient and :math:`g(\mathbf{x})` is any convex function that has a known proximal operator. Parameters ---------- proxf : :obj:`pyproximal.ProxOperator` Proximal operator of f function (must have ``grad`` implemented) proxg : :obj:`pyproximal.ProxOperator` Proximal operator of g function x0 : :obj:`numpy.ndarray` Initial vector epsg : :obj:`float` or :obj:`numpy.ndarray`, optional Scaling factor of g function tau : :obj:`float` or :obj:`numpy.ndarray`, optional Positive scalar weight, which should satisfy the following condition to guarantees convergence: :math:`\tau \in (0, 1/L]` where ``L`` is the Lipschitz constant of :math:`\nabla f`. When ``tau=None``, backtracking is used to adaptively estimate the best tau at each iteration. Finally, note that :math:`\tau` can be chosen to be a vector when dealing with problems with multiple right-hand-sides backtracking : :obj:`bool`, optional Force backtracking, even if ``tau`` is not equal to ``None``. In this case the chosen ``tau`` will be used as the initial guess in the first step of backtracking beta : :obj:`float`, optional Backtracking parameter (must be between 0 and 1) eta : :obj:`float`, optional Relaxation parameter (must be between 0 and 1, 0 excluded). niter : :obj:`int`, optional Number of iterations of iterative scheme niterback : :obj:`int`, optional Max number of iterations of backtracking acceleration : :obj:`str`, optional Acceleration (``None``, ``vandenberghe`` or ``fista``) tol : :obj:`float`, optional Tolerance on change of objective function (used as stopping criterion). If ``tol=None``, run until ``niter`` is reached callback : :obj:`callable`, optional Function with signature (``callback(x)``) to call after each iteration where ``x`` is the current model vector show : :obj:`bool`, optional Display iterations log Returns ------- x : :obj:`numpy.ndarray` Inverted model Notes ----- The Proximal gradient algorithm can be expressed by the following recursion: .. math:: \mathbf{x}^{k+1} = \mathbf{y}^k + \eta (\prox_{\tau^k \epsilon g}(\mathbf{y}^k - \tau^k \nabla f(\mathbf{y}^k)) - \mathbf{y}^k) \\ \mathbf{y}^{k+1} = \mathbf{x}^k + \omega^k (\mathbf{x}^k - \mathbf{x}^{k-1}) where at each iteration :math:`\tau^k` can be estimated by back-tracking as follows: .. math:: \begin{aligned} &\tau = \tau^{k-1} &\\ &repeat \; \mathbf{z} = \prox_{\tau \epsilon g}(\mathbf{x}^k - \tau \nabla f(\mathbf{x}^k)), \tau = \beta \tau \quad if \; f(\mathbf{z}) \leq \tilde{f}_\tau(\mathbf{z}, \mathbf{x}^k) \\ &\tau^k = \tau, \quad \mathbf{x}^{k+1} = \mathbf{z} &\\ \end{aligned} where :math:`\tilde{f}_\tau(\mathbf{x}, \mathbf{y}) = f(\mathbf{y}) + \nabla f(\mathbf{y})^T (\mathbf{x} - \mathbf{y}) + 1/(2\tau)||\mathbf{x} - \mathbf{y}||_2^2`. Different accelerations are provided: - ``acceleration=None``: :math:`\omega^k = 0`; - ``acceleration=vandenberghe`` [1]_: :math:`\omega^k = k / (k + 3)` for ` - ``acceleration=fista``: :math:`\omega^k = (t_{k-1}-1)/t_k` where :math:`t_k = (1 + \sqrt{1+4t_{k-1}^{2}}) / 2` [2]_ .. [1] Vandenberghe, L., "Fast proximal gradient methods", 2010. .. [2] Beck, A., and Teboulle, M. "A Fast Iterative Shrinkage-Thresholding Algorithm for Linear Inverse Problems", SIAM Journal on Imaging Sciences, vol. 2, pp. 183-202. 2009. """ # check if epgs is a vector epsg = np.asarray(epsg, dtype=float) if epsg.size == 1: epsg = epsg * np.ones(niter) epsg_print = str(epsg[0]) else: epsg_print = "Multi" if acceleration not in [None, "None", "vandenberghe", "fista"]: raise NotImplementedError( "Acceleration should be None, vandenberghe " "or fista" ) if show: tstart = time.time() print( "Accelerated Proximal Gradient\n" "---------------------------------------------------------\n" "Proximal operator (f): %s\n" "Proximal operator (g): %s\n" "tau = %s\tbacktrack = %s\tbeta = %10e\n" "epsg = %s\tniter = %d\ttol = %s\n" "" "niterback = %d\tacceleration = %s\n" % ( type(proxf), type(proxg), str(tau), backtracking, beta, epsg_print, niter, str(tol), niterback, acceleration, ) ) head = " Itn x[0] f g J=f+eps*g tau" print(head) if tau is None: backtracking = True tau = 1.0 # initialize model t = 1.0 x = x0.copy() y = x.copy() pfg = np.inf tolbreak = False # iterate for iiter in range(niter): xold = x.copy() # proximal step if not backtracking: if eta == 1.0: x = proxg.prox(y - tau * proxf.grad(y), epsg[iiter] * tau) else: x = x + eta * ( proxg.prox(x - tau * proxf.grad(x), epsg[iiter] * tau) - x ) else: x, tau = _backtracking( y, tau, proxf, proxg, epsg[iiter], beta=beta, niterback=niterback ) if eta != 1.0: x = x + eta * ( proxg.prox(x - tau * proxf.grad(x), epsg[iiter] * tau) - x ) # update internal parameters for bilinear operator if isinstance(proxf, BilinearOperator): proxf.updatexy(x) # update y if acceleration == "vandenberghe": omega = iiter / (iiter + 3) elif acceleration == "fista": told = t t = (1.0 + np.sqrt(1.0 + 4.0 * t**2)) / 2.0 omega = (told - 1.0) / t else: omega = 0 y = x + omega * (x - xold) # run callback if callback is not None: callback(x) # tolerance check: break iterations if overall # objective does not decrease below tolerance if tol is not None: pfgold = pfg pf, pg = proxf(x), proxg(x) pfg = pf + np.sum(epsg[iiter] * pg) if np.abs(1.0 - pfg / pfgold) < tol: tolbreak = True # show iteration logger if show: if iiter < 10 or niter - iiter < 10 or iiter % (niter // 10) == 0: if tol is None: pf, pg = proxf(x), proxg(x) pfg = pf + np.sum(epsg[iiter] * pg) msg = "%6g %12.5e %10.3e %10.3e %10.3e %10.3e" % ( iiter + 1, ( np.real(to_numpy(x[0])) if x.ndim == 1 else np.real(to_numpy(x[0, 0])) ), pf, pg, pfg, tau, ) print(msg) # break if tolerance condition is met if tolbreak: break if show: print("\nTotal time (s) = %.2f" % (time.time() - tstart)) print("---------------------------------------------------------\n") return x
[docs]def AcceleratedProximalGradient( proxf: ProxOperator, proxg: ProxOperator, x0: NDArray, tau: Optional[float] = None, beta: float = 0.5, epsg: float = 1.0, niter: int = 10, niterback: int = 100, acceleration: str = "vandenberghe", tol: Optional[float] = None, callback: Optional[Callable[[NDArray], None]] = None, show: bool = False, ) -> NDArray: r"""Accelerated Proximal gradient This is a thin wrapper around :func:`pyproximal.optimization.primal.ProximalGradient` with ``vandenberghe`` or ``fista`` acceleration. See :func:`pyproximal.optimization.primal.ProximalGradient` for details. """ warnings.warn( "AcceleratedProximalGradient has been integrated directly into ProximalGradient " "from v0.5.0. It is recommended to start using ProximalGradient by selecting the " "appropriate acceleration parameter as this behaviour will become default in " "version v1.0.0 and AcceleratedProximalGradient will be removed.", FutureWarning, ) return ProximalGradient( proxf, proxg, x0, tau=tau, beta=beta, epsg=epsg, niter=niter, niterback=niterback, acceleration=acceleration, tol=tol, callback=callback, show=show, )
[docs]def AndersonProximalGradient( proxf: ProxOperator, proxg: ProxOperator, x0: NDArray, epsg: Union[float, NDArray] = 1.0, tau: Union[float, NDArray] = 1.0, niter: int = 10, nhistory: int = 10, epsr: float = 1e-10, safeguard: bool = False, tol: Optional[float] = None, callback: Optional[Callable[[NDArray], None]] = None, show: bool = False, ) -> NDArray: r"""Proximal gradient with Anderson acceleration Solves the following minimization problem using the Proximal gradient algorithm with Anderson acceleration: .. math:: \mathbf{x} = \argmin_\mathbf{x} f(\mathbf{x}) + \epsilon g(\mathbf{x}) where :math:`f(\mathbf{x})` is a smooth convex function with a uniquely defined gradient and :math:`g(\mathbf{x})` is any convex function that has a known proximal operator. Parameters ---------- proxf : :obj:`pyproximal.ProxOperator` Proximal operator of f function (must have ``grad`` implemented) proxg : :obj:`pyproximal.ProxOperator` Proximal operator of g function x0 : :obj:`numpy.ndarray` Initial vector epsg : :obj:`float` or :obj:`numpy.ndarray`, optional Scaling factor of g function tau : :obj:`float` or :obj:`numpy.ndarray`, optional Positive scalar weight, which should satisfy the following condition to guarantees convergence: :math:`\tau \in (0, 1/L]` where ``L`` is the Lipschitz constant of :math:`\nabla f`. N ote that :math:`\tau` can be chosen to be a vector when dealing with problems with multiple right-hand-sides niter : :obj:`int`, optional Number of iterations of iterative scheme nhistory : :obj:`int`, optional Number of previous iterates to be kept in memory (to compute the scaling factors epsr : :obj:`float`, optional Scaling factor for regularization added to the inverse of :math:\mathbf{R}^T \mathbf{R}` safeguard : :obj:`bool`, optional Apply safeguarding strategy to the update (``True``) or not (``False``) tol : :obj:`float`, optional Tolerance on change of objective function (used as stopping criterion). If ``tol=None``, run until ``niter`` is reached callback : :obj:`callable`, optional Function with signature (``callback(x)``) to call after each iteration where ``x`` is the current model vector show : :obj:`bool`, optional Display iterations log Returns ------- x : :obj:`numpy.ndarray` Inverted model Notes ----- The Proximal gradient algorithm with Anderson acceleration can be expressed by the following recursion [1]_: .. math:: m_k = min(m, k)\\ \mathbf{g}^{k} = \mathbf{x}^{k} - \tau^k \nabla f(\mathbf{x}^k)\\ \mathbf{r}^{k} = \mathbf{g}^{k} - \mathbf{g}^{k}\\ \mathbf{G}^{k} = [\mathbf{g}^{k},..., \mathbf{g}^{k-m_k}]\\ \mathbf{R}^{k} = [\mathbf{r}^{k},..., \mathbf{r}^{k-m_k}]\\ \alpha_k = (\mathbf{R}^{kT} \mathbf{R}^{k})^{-1} \mathbf{1} / \mathbf{1}^T (\mathbf{R}^{kT} \mathbf{R}^{k})^{-1} \mathbf{1}\\ \mathbf{y}^{k+1} = \mathbf{G}^{k} \alpha_k\\ \mathbf{x}^{k+1} = \prox_{\tau^{k+1} g}(\mathbf{y}^{k+1}) where :math:`m` equals ``nhistory``, :math:`k=1,2,...,n_{iter}`, :math:`\mathbf{y}^{0}=\mathbf{x}^{0}`, :math:`\mathbf{y}^{1}=\mathbf{x}^{0} - \tau^0 \nabla f(\mathbf{x}^0)`, :math:`\mathbf{x}^{1}=\prox_{\tau^k g}(\mathbf{y}^{1})`, and :math:`\mathbf{g}^{0}=\mathbf{y}^{1}`. Refer to [1]_ for the guarded version of the algorithm (when ``safeguard=True``). .. [1] Mai, V., and Johansson, M. "Anderson Acceleration of Proximal Gradient Methods", 2020. """ # check if epgs is a vector epsg = np.asarray(epsg, dtype=float) if epsg.size == 1: epsg = epsg * np.ones(niter) epsg_print = str(epsg[0]) else: epsg_print = "Multi" if show: tstart = time.time() print( "Proximal Gradient with Anderson Acceleration \n" "---------------------------------------------------------\n" "Proximal operator (f): %s\n" "Proximal operator (g): %s\n" "tau = %s\t\tepsg = %s\tniter = %d\n" "nhist = %d\tepsr = %s\n" "guard = %s\ttol = %s\n" % ( type(proxf), type(proxg), str(tau), epsg_print, niter, nhistory, str(epsr), str(safeguard), str(tol), ) ) head = " Itn x[0] f g J=f+eps*g tau" print(head) # initialize model y = x0 - tau * proxf.grad(x0) x = proxg.prox(y, epsg[0] * tau) g = y.copy() r = g - x0 R, G = [ g, ], [ r, ] pf = proxf(x) pfg = np.inf tolbreak = False # iterate for iiter in range(niter): # update fix point g = x - tau * proxf.grad(x) r = g - y # update history vectors R.insert(0, r) G.insert(0, g) if iiter >= nhistory - 1: R.pop(-1) G.pop(-1) # solve for alpha coefficients Rstack = np.vstack(R) Rinv = np.linalg.pinv(Rstack @ Rstack.T + epsr * np.linalg.norm(Rstack) ** 2) ones = np.ones(min(nhistory, iiter + 2)) Rinvones = Rinv @ ones alpha = Rinvones / (ones[None] @ Rinvones) if not safeguard: # update auxiliary variable y = np.vstack(G).T @ alpha # update main variable x = proxg.prox(y, epsg[iiter] * tau) else: # update auxiliary variable ytest = np.vstack(G).T @ alpha # update main variable xtest = proxg.prox(ytest, epsg[iiter] * tau) # check if function is decreased, otherwise do basic PG step pfold, pf = pf, proxf(xtest) if pf <= pfold - tau * np.linalg.norm(proxf.grad(x)) ** 2 / 2: y = ytest x = xtest else: x = proxg.prox(g, epsg[iiter] * tau) y = g # run callback if callback is not None: callback(x) # tolerance check: break iterations if overall # objective does not decrease below tolerance if tol is not None: pfgold = pfg pf, pg = proxf(x), proxg(x) pfg = pf + np.sum(epsg[iiter] * pg) if np.abs(1.0 - pfg / pfgold) < tol: tolbreak = True # show iteration logger if show: if iiter < 10 or niter - iiter < 10 or iiter % (niter // 10) == 0: if tol is None: pf, pg = proxf(x), proxg(x) pfg = pf + np.sum(epsg[iiter] * pg) msg = "%6g %12.5e %10.3e %10.3e %10.3e %10.3e" % ( iiter + 1, ( np.real(to_numpy(x[0])) if x.ndim == 1 else np.real(to_numpy(x[0, 0])) ), pf, pg, pfg, tau, ) print(msg) # break if tolerance condition is met if tolbreak: break if show: print("\nTotal time (s) = %.2f" % (time.time() - tstart)) print("---------------------------------------------------------\n") return x
[docs]def GeneralizedProximalGradient( proxfs: List[ProxOperator], proxgs: List[ProxOperator], x0: NDArray, tau: Optional[float], epsg: Union[float, NDArray] = 1.0, weights: Optional[NDArray] = None, eta: float = 1.0, niter: int = 10, acceleration: Optional[str] = None, callback: Optional[Callable[[NDArray], None]] = None, show: bool = False, ) -> NDArray: r"""Generalized Proximal gradient Solves the following minimization problem using Generalized Proximal gradient algorithm: .. math:: \mathbf{x} = \argmin_\mathbf{x} \sum_{i=1}^n f_i(\mathbf{x}) + \sum_{j=1}^m \epsilon_j g_j(\mathbf{x}),~~n,m \in \mathbb{N}^+ where the :math:`f_i(\mathbf{x})` are smooth convex functions with a uniquely defined gradient and the :math:`g_j(\mathbf{x})` are any convex function that have a known proximal operator. Parameters ---------- proxfs : :obj:`list` Proximal operators of the :math:`f_i` functions (must have ``grad`` implemented) proxgs : :obj:`list` Proximal operators of the :math:`g_j` functions x0 : :obj:`numpy.ndarray` Initial vector tau : :obj:`float` Positive scalar weight, which should satisfy the following condition to guarantees convergence: :math:`\tau \in (0, 1/L]` where ``L`` is the Lipschitz constant of :math:`\sum_{i=1}^n \nabla f_i`. epsg : :obj:`float` or :obj:`numpy.ndarray`, optional Scaling factor(s) of ``g`` function(s) weights : :obj:`float`, optional Weighting factors of ``g`` functions. Must sum to 1. eta : :obj:`float`, optional Relaxation parameter (must be between 0 and 1, 0 excluded). Note that this will be only used when ``acceleration=None``. niter : :obj:`int`, optional Number of iterations of iterative scheme acceleration: :obj:`str`, optional Acceleration (``None``, ``vandenberghe`` or ``fista``) callback : :obj:`callable`, optional Function with signature (``callback(x)``) to call after each iteration where ``x`` is the current model vector show : :obj:`bool`, optional Display iterations log Returns ------- x : :obj:`numpy.ndarray` Inverted model Notes ----- The Generalized Proximal gradient algorithm can be expressed by the following recursion: .. math:: \text{for } j=1,\cdots,n, \\ ~~~~\mathbf z_j^{k+1} = \mathbf z_j^{k} + \eta \left[prox_{\frac{\tau^k \epsilon_j}{w_j} g_j}\left(2 \mathbf{x}^{k} - \mathbf{z}_j^{k} - \tau^k \sum_{i=1}^n \nabla f_i(\mathbf{x}^{k})\right) - \mathbf{x}^{k} \right] \\ \mathbf{x}^{k+1} = \sum_{j=1}^n w_j \mathbf z_j^{k+1} \\ where :math:`\sum_{j=1}^n w_j=1`. In the current implementation, :math:`w_j=1/n` when not provided. """ # check if weights sum to 1 if weights is None: weights = np.ones(len(proxgs)) / len(proxgs) if len(weights) != len(proxgs) or np.sum(weights) != 1.0: raise ValueError( f"omega={weights} must be an array of size {len(proxgs)} " f"summing to 1" ) # check if epgs is a vector epsg = np.asarray(epsg, dtype=float) if epsg.size == 1: epsg_print = str(epsg) epsg = epsg * np.ones(len(proxgs)) else: epsg_print = "Multi" if acceleration not in [None, "None", "vandenberghe", "fista"]: raise NotImplementedError( "Acceleration should be None, vandenberghe " "or fista" ) if show: tstart = time.time() print( "Generalized Proximal Gradient\n" "---------------------------------------------------------\n" "Proximal operators (f): %s\n" "Proximal operators (g): %s\n" "tau = %10e\nepsg = %s\tniter = %d\n" % ( [type(proxf) for proxf in proxfs], [type(proxg) for proxg in proxgs], 0 if tau is None else tau, epsg_print, niter, ) ) head = " Itn x[0] f g J=f+g" print(head) if tau is None: tau = 1.0 # initialize model t = 1.0 x = x0.copy() y = x.copy() zs = [x.copy() for _ in range(len(proxgs))] # iterate for iiter in range(niter): xold = x.copy() # gradient grad = np.zeros_like(x) for i, proxf in enumerate(proxfs): grad += proxf.grad(x) # proximal step x = np.zeros_like(x) for i, proxg in enumerate(proxgs): ztmp = 2 * y - zs[i] - tau * grad ztmp = proxg.prox(ztmp, tau * epsg[i] / weights[i]) zs[i] += eta * (ztmp - y) x += weights[i] * zs[i] # update y if acceleration == "vandenberghe": omega = iiter / (iiter + 3) elif acceleration == "fista": told = t t = (1.0 + np.sqrt(1.0 + 4.0 * t**2)) / 2.0 omega = (told - 1.0) / t else: omega = 0 y = x + omega * (x - xold) # run callback if callback is not None: callback(x) if show: if iiter < 10 or niter - iiter < 10 or iiter % (niter // 10) == 0: pf: float = np.sum([proxf(x) for proxf in proxfs]) pg: float = np.sum([eg * proxg(x) for proxg, eg in zip(proxgs, epsg)]) msg = "%6g %12.5e %10.3e %10.3e %10.3e" % ( iiter + 1, x[0] if x.ndim == 1 else x[0, 0], pf, pg, pf + pg, ) print(msg) if show: print("\nTotal time (s) = %.2f" % (time.time() - tstart)) print("---------------------------------------------------------\n") return x
[docs]def HQS( proxf: ProxOperator, proxg: ProxOperator, x0: NDArray, tau: Union[float, NDArray], niter: int = 10, z0: Optional[NDArray] = None, gfirst: bool = True, callback: Optional[Callable[..., None]] = None, callbackz: bool = False, show: bool = False, ) -> Tuple[NDArray, NDArray]: r"""Half Quadratic splitting Solves the following minimization problem using Half Quadratic splitting algorithm: .. math:: \mathbf{x},\mathbf{z} = \argmin_{\mathbf{x},\mathbf{z}} f(\mathbf{x}) + g(\mathbf{z}) \\ s.t. \; \mathbf{x}=\mathbf{z} where :math:`f(\mathbf{x})` and :math:`g(\mathbf{z})` are any convex function that has a known proximal operator. Parameters ---------- proxf : :obj:`pyproximal.ProxOperator` Proximal operator of f function proxg : :obj:`pyproximal.ProxOperator` Proximal operator of g function x0 : :obj:`numpy.ndarray` Initial vector (not required when ``gfirst=False``, can pass ``None``) tau : :obj:`float` or :obj:`numpy.ndarray` Positive scalar weight, which should satisfy the following condition to guarantees convergence: :math:`\tau \in (0, 1/L]` where ``L`` is the Lipschitz constant of :math:`\nabla f`. Finally note that :math:`\tau` can be chosen to be a vector of size ``niter`` such that different :math:`\tau` is used at different iterations (i.e., continuation strategy) niter : :obj:`int` Number of iterations of iterative scheme z0 : :obj:`numpy.ndarray`, optional Initial z vector (not required when ``gfirst=True``) gfirst : :obj:`bool`, optional Apply Proximal of operator ``g`` first (``True``) or Proximal of operator ``f`` first (``False``) callback : :obj:`callable`, optional Function with signature (``callback(x)``) to call after each iteration where ``x`` is the current model vector callbackz : :obj:`bool`, optional Modify callback signature to (``callback(x, z)``) when ``callbackz=True`` show : :obj:`bool`, optional Display iterations log Returns ------- x : :obj:`numpy.ndarray` Inverted model z : :obj:`numpy.ndarray` Inverted second model Raises ------ ValueError If both ``x0`` and ``z0`` are set to ``None`` Notes ----- The HQS algorithm can be expressed by the following recursion [1]_: .. math:: \mathbf{z}^{k+1} = \prox_{\tau g}(\mathbf{x}^{k}) \\ \mathbf{x}^{k+1} = \prox_{\tau f}(\mathbf{z}^{k+1}) for ``gfirst=False``, or .. math:: \mathbf{x}^{k+1} = \prox_{\tau f}(\mathbf{z}^{k}) \\ \mathbf{z}^{k+1} = \prox_{\tau g}(\mathbf{x}^{k+1}) for ``gfirst=False``. Note that ``x`` and ``z`` converge to each other, however if iterations are stopped too early ``x`` is guaranteed to belong to the domain of ``f`` while ``z`` is guaranteed to belong to the domain of ``g``. Depending on the problem either of the two may be the best solution. .. [1] D., Geman, and C., Yang, "Nonlinear image recovery with halfquadratic regularization", IEEE Transactions on Image Processing, 4, 7, pp. 932-946, 1995. """ # initialize variables x, z = _x0z0_init(x0, z0) ncp = get_array_module(x) # check if tau is a vector tau = ncp.asarray(tau, dtype=float) if tau.size == 1: tau_print = str(tau) tau = tau * np.ones(niter) else: tau_print = "Variable" if show: tstart = time.time() print( "HQS\n" "---------------------------------------------------------\n" "Proximal operator (f): %s\n" "Proximal operator (g): %s\n" "tau = %s\tniter = %d\n" % (type(proxf), type(proxg), tau_print, niter) ) head = " Itn x[0] f g J = f + g" print(head) # run iterations for iiter in range(niter): if gfirst: z = proxg.prox(x, tau[iiter]) x = proxf.prox(z, tau[iiter]) else: x = proxf.prox(z, tau[iiter]) z = proxg.prox(x, tau[iiter]) # run callback if callback is not None: if callbackz: callback(x, z) else: callback(x) if show: if iiter < 10 or niter - iiter < 10 or iiter % (niter // 10) == 0: pf, pg = proxf(x), proxg(x) msg = "%6g %12.5e %10.3e %10.3e %10.3e" % ( iiter + 1, np.real(to_numpy(x[0])), pf, pg, pf + pg, ) print(msg) if show: print("\nTotal time (s) = %.2f" % (time.time() - tstart)) print("---------------------------------------------------------\n") return x, z
[docs]def ADMM( proxf: ProxOperator, proxg: ProxOperator, x0: NDArray, tau: float, niter: int = 10, z0: Optional[NDArray] = None, gfirst: bool = False, callback: Optional[Callable[..., None]] = None, callbackz: bool = False, show: bool = False, ) -> Tuple[NDArray, NDArray]: r"""Alternating Direction Method of Multipliers Solves the following minimization problem using Alternating Direction Method of Multipliers: .. math:: \mathbf{x},\mathbf{z} = \argmin_{\mathbf{x},\mathbf{z}} f(\mathbf{x}) + g(\mathbf{z}) \\ s.t. \; \mathbf{x}=\mathbf{z} where :math:`f(\mathbf{x})` and :math:`g(\mathbf{z})` are any convex function that has a known proximal operator. ADMM can also solve the problem of the form above with a more general constraint: :math:`\mathbf{Ax}+\mathbf{Bz}=\mathbf{c}`. This routine implements the special case where :math:`\mathbf{A}=\mathbf{I}`, :math:`\mathbf{B}=-\mathbf{I}`, and :math:`\mathbf{c}=\mathbf{0}`, as a general algorithm can be obtained for any choice of :math:`f` and :math:`g` provided they have a known proximal operator. On the other hand, for more general choice of :math:`\mathbf{A}`, :math:`\mathbf{B}`, and :math:`\mathbf{c}`, the iterations are not generalizable, i.e. they depend on the choice of the :math:`f` and :math:`g` functions. For this reason, we currently only provide an additional solver for the special case where :math:`f` is a :class:`pyproximal.proximal.L2` operator with a linear operator :math:`\mathbf{G}` and data :math:`\mathbf{y}`, :math:`\mathbf{B}=-\mathbf{I}` and :math:`\mathbf{c}=\mathbf{0}`, called :func:`pyproximal.optimization.primal.ADMML2`. Note that for the very same choice of :math:`\mathbf{B}` and :math:`\mathbf{c}`, the :func:`pyproximal.optimization.primal.LinearizedADMM` can also be used (and this does not require a specific choice of :math:`f`). Parameters ---------- proxf : :obj:`pyproximal.ProxOperator` Proximal operator of f function proxg : :obj:`pyproximal.ProxOperator` Proximal operator of g function x0 : :obj:`numpy.ndarray` Initial vector (not required when ``gfirst=False``, can pass ``None``) tau : :obj:`float` Positive scalar weight, which should satisfy the following condition to guarantees convergence: :math:`\tau \in (0, 1/L]` where ``L`` is the Lipschitz constant of :math:`\nabla f`. niter : :obj:`int` Number of iterations of iterative scheme z0 : :obj:`numpy.ndarray`, optional Initial z vector (not required when ``gfirst=True``) gfirst : :obj:`bool`, optional Apply Proximal of operator ``g`` first (``True``) or Proximal of operator ``f`` first (``False``) callback : :obj:`callable`, optional Function with signature (``callback(x)``) to call after each iteration where ``x`` is the current model vector callbackz : :obj:`bool`, optional Modify callback signature to (``callback(x, z)``) when ``callbackz=True`` show : :obj:`bool`, optional Display iterations log Returns ------- x : :obj:`numpy.ndarray` Inverted model z : :obj:`numpy.ndarray` Inverted second model See Also -------- ADMML2: ADMM with L2 misfit function LinearizedADMM: Linearized ADMM Raises ------ ValueError If both ``x0`` and ``z0`` are set to ``None`` Notes ----- The ADMM algorithm can be expressed by the following recursion [1]_: .. math:: \mathbf{x}^{k+1} = \prox_{\tau f}(\mathbf{z}^{k} - \mathbf{u}^{k})\\ \mathbf{z}^{k+1} = \prox_{\tau g}(\mathbf{x}^{k+1} + \mathbf{u}^{k})\\ \mathbf{u}^{k+1} = \mathbf{u}^{k} + \mathbf{x}^{k+1} - \mathbf{z}^{k+1} Note that ``x`` and ``z`` converge to each other, however if iterations are stopped too early ``x`` is guaranteed to belong to the domain of ``f`` while ``z`` is guaranteed to belong to the domain of ``g``. Depending on the problem either of the two may be the best solution. .. [1] S. Boyd, N. Parikh, E. Chu, B. Peleato, and J. Eckstein. 2011. Distributed optimization and statistical learning via the alternating direction method of multipliers. Foundations and Trends in Machine Learning, 3 (1), 1-122. https://doi.org/10.1561/2200000016. """ # initialize variables x, z = _x0z0_init(x0, z0) ncp = get_array_module(x) u = ncp.zeros_like(x) if show: tstart = time.time() print( "ADMM\n" "---------------------------------------------------------\n" "Proximal operator (f): %s\n" "Proximal operator (g): %s\n" "tau = %10e\tniter = %d\n" % (type(proxf), type(proxg), tau, niter) ) head = " Itn x[0] f g J = f + g" print(head) # run iterations for iiter in range(niter): if gfirst: z = proxg.prox(x + u, tau) x = proxf.prox(z - u, tau) else: x = proxf.prox(z - u, tau) z = proxg.prox(x + u, tau) u = u + x - z # run callback if callback is not None: if callbackz: callback(x, z) else: callback(x) if show: if iiter < 10 or niter - iiter < 10 or iiter % (niter // 10) == 0: pf, pg = proxf(x), proxg(x) msg = "%6g %12.5e %10.3e %10.3e %10.3e" % ( iiter + 1, np.real(to_numpy(x[0])), pf, pg, pf + pg, ) print(msg) if show: print("\nTotal time (s) = %.2f" % (time.time() - tstart)) print("---------------------------------------------------------\n") return x, z
[docs]def ADMML2( proxg: ProxOperator, Op: "LinearOperator", b: NDArray, A: "LinearOperator", x0: NDArray, tau: float, niter: int = 10, z0: Optional[NDArray] = None, gfirst: bool = False, callback: Optional[Callable[[NDArray], None]] = None, show: bool = False, **kwargs_solver: Dict[str, Any], ) -> Tuple[NDArray, NDArray]: r"""Alternating Direction Method of Multipliers for L2 misfit term Solves the following minimization problem using Alternating Direction Method of Multipliers: .. math:: \mathbf{x},\mathbf{z} = \argmin_{\mathbf{x},\mathbf{z}} \frac{1}{2}||\mathbf{Op}\mathbf{x} - \mathbf{b}||_2^2 + g(\mathbf{z}) \\ s.t. \; \mathbf{Ax}=\mathbf{z} where :math:`g(\mathbf{z})` is any convex function that has a known proximal operator. Parameters ---------- proxg : :obj:`pyproximal.ProxOperator` Proximal operator of g function Op : :obj:`pylops.LinearOperator` Linear operator of data misfit term b : :obj:`numpy.ndarray` Data A : :obj:`pylops.LinearOperator` Linear operator of regularization term x0 : :obj:`numpy.ndarray` Initial vector tau : :obj:`float` Positive scalar weight, which should satisfy the following condition to guarantees convergence: :math:`\tau \in (0, 1/\lambda_{max}(\mathbf{A}^H\mathbf{A})]`. niter : :obj:`int`, optional Number of iterations of iterative scheme z0 : :obj:`numpy.ndarray` Initial auxiliary vector. If ``None``, initialized to ``A @ x0``. gfirst : :obj:`bool`, optional Apply Proximal of operator ``g`` first (``True``) or Proximal of operator ``f`` first (``False``) callback : :obj:`callable`, optional Function with signature (``callback(x)``) to call after each iteration where ``x`` is the current model vector show : :obj:`bool`, optional Display iterations log **kwargs_solver Arbitrary keyword arguments for :py:func:`scipy.sparse.linalg.lsqr` used to solve the x-update Returns ------- x : :obj:`numpy.ndarray` Inverted model z : :obj:`numpy.ndarray` Inverted second model Raises ------ ValueError If both ``x0`` and ``z0`` are set to ``None`` or ``x0`` is set to None See Also -------- ADMM: ADMM LinearizedADMM: Linearized ADMM Notes ----- The ADMM algorithm can be expressed by the following recursion: .. math:: \mathbf{x}^{k+1} = \argmin_{\mathbf{x}} \frac{1}{2}||\mathbf{Op}\mathbf{x} - \mathbf{b}||_2^2 + \frac{1}{2\tau} ||\mathbf{Ax} - \mathbf{z}^k + \mathbf{u}^k||_2^2\\ \mathbf{z}^{k+1} = \prox_{\tau g}(\mathbf{Ax}^{k+1} + \mathbf{u}^{k})\\ \mathbf{u}^{k+1} = \mathbf{u}^{k} + \mathbf{Ax}^{k+1} - \mathbf{z}^{k+1} """ # initialize variables x, z = _x0z0_init(x0, z0, A, Opname="A") ncp = get_array_module(x) u = ncp.zeros_like(z) if show: tstart = time.time() print( "ADMM\n" "---------------------------------------------------------\n" "Proximal operator (g): %s\n" "tau = %10e\tniter = %d\n" % (type(proxg), tau, niter) ) head = " Itn x[0] f g J = f + g" print(head) # run iterations sqrttau = 1.0 / sqrt(tau) for iiter in range(niter): if gfirst: Ax = A @ x z = proxg.prox(Ax + u, tau) # solve augumented system x = regularized_inversion( Op, b, [ A, ], x0=x, dataregs=[ z - u, ], epsRs=[ sqrttau, ], **kwargs_solver, )[0] else: # solve augumented system x = regularized_inversion( Op, b, [ A, ], x0=x, dataregs=[ z - u, ], epsRs=[ sqrttau, ], **kwargs_solver, )[0] Ax = A @ x z = proxg.prox(Ax + u, tau) u = u + Ax - z # run callback if callback is not None: callback(x) if show: if iiter < 10 or niter - iiter < 10 or iiter % (niter // 10) == 0: pf, pg = 0.5 * np.linalg.norm(Op @ x - b) ** 2, proxg(Ax) msg = "%6g %12.5e %10.3e %10.3e %10.3e" % ( iiter + 1, np.real(to_numpy(x[0])), pf, pg, pf + pg, ) print(msg) if show: print("\nTotal time (s) = %.2f" % (time.time() - tstart)) print("---------------------------------------------------------\n") return x, z
[docs]def LinearizedADMM( proxf: ProxOperator, proxg: ProxOperator, A: "LinearOperator", x0: NDArray, tau: float, mu: float, niter: int = 10, z0: Optional[NDArray] = None, callback: Optional[Callable[[NDArray], None]] = None, show: bool = False, ) -> Tuple[NDArray, NDArray]: r"""Linearized Alternating Direction Method of Multipliers Solves the following minimization problem using Linearized Alternating Direction Method of Multipliers: .. math:: \mathbf{x} = \argmin_\mathbf{x} f(\mathbf{x}) + g(\mathbf{A}\mathbf{x}) where :math:`f(\mathbf{x})` and :math:`g(\mathbf{x})` are any convex function that has a known proximal operator and :math:`\mathbf{A}` is a linear operator. Parameters ---------- proxf : :obj:`pyproximal.ProxOperator` Proximal operator of f function proxg : :obj:`pyproximal.ProxOperator` Proximal operator of g function A : :obj:`pylops.LinearOperator` Linear operator x0 : :obj:`numpy.ndarray` Initial vector tau : :obj:`float`, optional Positive scalar weight, which should satisfy the following condition to guarantee convergence: :math:`\mu \in (0, \tau/\lambda_{max}(\mathbf{A}^H\mathbf{A})]`. mu : :obj:`float`, optional Second positive scalar weight, which should satisfy the following condition to guarantees convergence: :math:`\mu \in (0, \tau/\lambda_{max}(\mathbf{A}^H\mathbf{A})]`. niter : :obj:`int`, optional Number of iterations of iterative scheme z0 : :obj:`numpy.ndarray` Initial auxiliary vector. If ``None``, initialized to ``A @ x0``. callback : :obj:`callable`, optional Function with signature (``callback(x)``) to call after each iteration where ``x`` is the current model vector show : :obj:`bool`, optional Display iterations log Returns ------- x : :obj:`numpy.ndarray` Inverted model z : :obj:`numpy.ndarray` Inverted second model Raises ------ ValueError If both ``x0`` and ``z0`` are set to ``None`` See Also -------- ADMM: ADMM ADMML2: ADMM with L2 misfit function Notes ----- The Linearized-ADMM algorithm can be expressed by the following recursion [1]_: .. math:: \mathbf{x}^{k+1} = \prox_{\mu f}(\mathbf{x}^{k} - \frac{\mu}{\tau} \mathbf{A}^H(\mathbf{A} \mathbf{x}^k - \mathbf{z}^k + \mathbf{u}^k))\\ \mathbf{z}^{k+1} = \prox_{\tau g}(\mathbf{A} \mathbf{x}^{k+1} + \mathbf{u}^k)\\ \mathbf{u}^{k+1} = \mathbf{u}^{k} + \mathbf{A}\mathbf{x}^{k+1} - \mathbf{z}^{k+1} .. [1] N., Parikh, "Proximal Algorithms", Foundations and Trends in Optimization. 2013. """ # initialize variables x, z = _x0z0_init(x0, z0, A, Opname="A") Ax = A.matvec(x) if z0 is None else z ncp = get_array_module(x) u = ncp.zeros_like(z) if show: tstart = time.time() print( "Linearized-ADMM\n" "---------------------------------------------------------\n" "Proximal operator (f): %s\n" "Proximal operator (g): %s\n" "Linear operator (A): %s\n" "tau = %10e\tmu = %10e\tniter = %d\n" % (type(proxf), type(proxg), type(A), tau, mu, niter) ) head = " Itn x[0] f g J = f + g" print(head) # run iterations for iiter in range(niter): x = proxf.prox(x - mu / tau * A.rmatvec(Ax - z + u), mu) Ax = A.matvec(x) z = proxg.prox(Ax + u, tau) u = u + Ax - z # run callback if callback is not None: callback(x) if show: if iiter < 10 or niter - iiter < 10 or iiter % (niter // 10) == 0: pf, pg = proxf(x), proxg(Ax) msg = "%6g %12.5e %10.3e %10.3e %10.3e" % ( iiter + 1, np.real(to_numpy(x[0])), pf, pg, pf + pg, ) print(msg) if show: print("\nTotal time (s) = %.2f" % (time.time() - tstart)) print("---------------------------------------------------------\n") return x, z
[docs]def TwIST( proxg: ProxOperator, A: "LinearOperator", b: NDArray, x0: NDArray, alpha: Optional[float] = None, beta: Optional[float] = None, eigs: Optional[Tuple[float, float]] = None, niter: int = 10, callback: Optional[Callable[[NDArray], None]] = None, show: bool = False, returncost: bool = False, ) -> Union[NDArray, Tuple[NDArray, NDArray]]: r"""Two-step Iterative Shrinkage/Threshold Solves the following minimization problem using Two-step Iterative Shrinkage/Threshold: .. math:: \mathbf{x} = \argmin_\mathbf{x} \frac{1}{2} ||\mathbf{b} - \mathbf{Ax}||_2^2 + g(\mathbf{x}) where :math:`\mathbf{A}` is a linear operator and :math:`g(\mathbf{x})` is any convex function that has a known proximal operator. Parameters ---------- proxg : :obj:`pyproximal.ProxOperator` Proximal operator of g function A : :obj:`pylops.LinearOperator` Linear operator b : :obj:`numpy.ndarray` Data x0 : :obj:`numpy.ndarray` Initial vector alpha : :obj:`float`, optional Positive scalar weight (if ``None``, estimated based on the eigenvalues of :math:`\mathbf{A}`, see Notes for details) beta : :obj:`float`, optional Positive scalar weight (if ``None``, estimated based on the eigenvalues of :math:`\mathbf{A}`, see Notes for details) eigs : :obj:`tuple`, optional Largest and smallest eigenvalues of :math:`\mathbf{A}^H \mathbf{A}`. If passed, computes `alpha` and `beta` based on them. niter : :obj:`int`, optional Number of iterations of iterative scheme callback : :obj:`callable`, optional Function with signature (``callback(x)``) to call after each iteration where ``x`` is the current model vector show : :obj:`bool`, optional Display iterations log returncost : :obj:`bool`, optional Return cost function Returns ------- x : :obj:`numpy.ndarray` Inverted model j : :obj:`numpy.ndarray` Cost function Notes ----- The TwIST algorithm can be expressed by the following recursion: .. math:: \mathbf{x}^{k+1} = (1-\alpha) \mathbf{x}^{k-1} + (\alpha-\beta) \mathbf{x}^k + \beta \prox_{g} (\mathbf{x}^k + \mathbf{A}^H (\mathbf{b} - \mathbf{A}\mathbf{x}^k)). where :math:`\mathbf{x}^{1} = \prox_{g} (\mathbf{x}^0 + \mathbf{A}^T (\mathbf{b} - \mathbf{A}\mathbf{x}^0))`. The optimal weighting parameters :math:`\alpha` and :math:`\beta` are linked to the smallest and largest eigenvalues of :math:`\mathbf{A}^H\mathbf{A}` as follows: .. math:: \alpha = 1 + \rho^2 \\ \beta =\frac{2 \alpha}{\Lambda_{max} + \lambda_{min}} where :math:`\rho=\frac{1-\sqrt{k}}{1+\sqrt{k}}` with :math:`k=\frac{\lambda_{min}}{\Lambda_{max}}` and :math:`\Lambda_{max}=max(1, \lambda_{max})`. Experimentally, it has been observed that TwIST is robust to the choice of such parameters. Finally, note that in the case of :math:`\alpha=1` and :math:`\beta=1`, TwIST is identical to IST. """ # define proxf as L2 proximal proxf = L2(Op=A, b=b) # find alpha and beta if alpha is None or beta is None: if eigs is None: emin = A.eigs(neigs=1, which="SM") emax = max([1, A.eigs(neigs=1, which="LM")]) else: emax, emin = eigs k = emin / emax rho = (1 - sqrt(k)) / (1 + sqrt(k)) alpha = 1 + rho**2 beta = 2 * alpha / (emax + emin) # compute proximal of g on initial guess (x_1) xold = x0.copy() x = proxg.prox(xold - proxf.grad(xold), 1.0) if show: tstart = time.time() print( "TwIST\n" "---------------------------------------------------------\n" "Proximal operator (g): %s\n" "Linear operator (A): %s\n" "alpha = %10e\tbeta = %10e\tniter = %d\n" % (type(proxg), type(A), alpha, beta, niter) ) head = " Itn x[0] f g J = f + g" print(head) # iterate if returncost: j = np.zeros(niter) for iiter in range(niter): # compute new x xnew = ( (1 - alpha) * xold + (alpha - beta) * x + beta * proxg.prox(x - proxf.grad(x), 1.0) ) # save current x as old (x_i -> x_i-1) xold = x.copy() # save new x as current (x_i+1 -> x_i) x = xnew.copy() # compute cost function if returncost: j[iiter] = proxf(x) + proxg(x) # run callback if callback is not None: callback(x) if show: if iiter < 10 or niter - iiter < 10 or iiter % (niter // 10) == 0: pf, pg = proxf(x), proxg(x) msg = "%6g %12.5e %10.3e %10.3e %10.3e" % ( iiter + 1, np.real(to_numpy(x[0])), pf, pg, pf + pg, ) print(msg) if show: print("\nTotal time (s) = %.2f" % (time.time() - tstart)) print("---------------------------------------------------------\n") if returncost: return x, j else: return x
[docs]def DouglasRachfordSplitting( proxf: ProxOperator, proxg: ProxOperator, x0: NDArray, tau: float, eta: float = 1.0, niter: int = 10, gfirst: bool = True, callback: Optional[Callable[..., None]] = None, callbacky: bool = False, show: bool = False, ) -> Tuple[NDArray, NDArray]: r"""Douglas-Rachford Splitting Solves the following minimization problem using Douglas-Rachford Splitting algorithm: .. math:: \mathbf{x} = \argmin_\mathbf{x} f(\mathbf{x}) + g(\mathbf{x}) where :math:`f(\mathbf{x})` and :math:`g(\mathbf{x})` are any convex functions that has known proximal operators. Parameters ---------- proxf : :obj:`pyproximal.ProxOperator` Proximal operator of f function proxg : :obj:`pyproximal.ProxOperator` Proximal operator of g function x0 : :obj:`numpy.ndarray` Initial vector tau : :obj:`float` Positive scalar weight eta : :obj:`float`, optional Relaxation parameter (must be between 0 and 2, 0 excluded). niter : :obj:`int`, optional Number of iterations of iterative scheme gfirst : :obj:`bool`, optional Apply Proximal of operator ``g`` first (``True``) or Proximal of operator ``f`` first (``False``) callback : :obj:`callable`, optional Function with signature (``callback(x)``) to call after each iteration where ``x`` is the current model vector callbacky : :obj:`bool`, optional Modify callback signature to (``callback(x, y)``) when ``callbacky=True`` show : :obj:`bool`, optional Display iterations log Returns ------- x : :obj:`numpy.ndarray` Inverted model y : :obj:`numpy.ndarray` Inverted second model Notes ----- The Douglas-Rachford Splitting algorithm can be expressed by the following recursion [1]_, [2]_: .. math:: \mathbf{y}^{k} &= \prox_{\tau g}(\mathbf{x}^k) \\ \mathbf{x}^{k+1} &= \mathbf{x}^{k} + \eta (\prox_{\tau f}(2 \mathbf{y}^{k} - \mathbf{x}^{k}) - \mathbf{y}^{k}) .. [1] Patrick L. Combettes and Jean-Christophe Pesquet. 2011. Proximal Splitting Methods in Signal Processing. In Fixed-Point Algorithms for Inverse Problems in Science and Engineering, Springer, 185-212. https://doi.org/10.1007/978-1-4419-9569-8_10 .. [2] Scott B. Lindstrom and Brailey Sims. 2021. Survey: Sixty Years of Douglas-Rachford. Journal of the Australian Mathematical Society, 110, 3, 333-370. https://doi.org/10.1017/S1446788719000570 https://arxiv.org/abs/1809.07181 """ if show: tstart = time.time() print( "Douglas-Rachford Splitting\n" "---------------------------------------------------------\n" f"Proximal operator (f): {type(proxf)}\n" f"Proximal operator (g): {type(proxg)}\n" f"tau = {tau:10e}\tniter = {niter:d}\n" ) head = " Itn x[0] f g J = f + g" print(head) x = x0.copy() for iiter in range(niter): if gfirst: y = proxg.prox(x, tau) x = x + eta * (proxf.prox(2 * y - x, tau) - y) else: y = proxf.prox(x, tau) x = x + eta * (proxg.prox(2 * y - x, tau) - y) # run callback if callback is not None: if callbacky: callback(x, y) else: callback(x) if show: if iiter < 10 or niter - iiter < 10 or iiter % (niter // 10) == 0: pf, pg = proxf(x), proxg(x) print( f"{iiter + 1:6d} {np.real(to_numpy(x[0])):12.5e} " f"{pf:10.3e} {pg:10.3e} {pf + pg:10.3e}" ) if show: print(f"\nTotal time (s) = {time.time() - tstart:.2f}") print("---------------------------------------------------------\n") return x, y