import time
import warnings
from collections.abc import Callable
from math import sqrt
from typing import TYPE_CHECKING, Any, Optional
import numpy as np
from pylops.optimization.leastsquares import regularized_inversion
from pylops.utils.backend import get_array_module, to_numpy
from pylops.utils.typing import NDArray
from pyproximal.proximal import L2
from pyproximal.ProxOperator import ProxOperator
from pyproximal.utils.bilinear import BilinearOperator
if TYPE_CHECKING:
from pylops.linearoperator import LinearOperator
def _backtracking(
x: NDArray,
tau: float,
proxf: ProxOperator,
proxg: ProxOperator,
epsg: float,
beta: float = 0.5,
niterback: int = 10,
) -> tuple[NDArray, float]:
r"""Backtracking
Line-search algorithm for finding step sizes in proximal algorithms when
the Lipschitz constant of the operator is unknown (or expensive to
estimate).
"""
def ftilde(x: NDArray, y: NDArray, f: ProxOperator, tau: float) -> float:
xy = x - y
return float(
f(y) + np.dot(f.grad(y), xy) + (1.0 / (2.0 * tau)) * np.linalg.norm(xy) ** 2
)
iiterback = 0
while iiterback < niterback:
z = proxg.prox(x - tau * proxf.grad(x), epsg * tau)
ft = ftilde(z, x, proxf, tau)
if proxf(z) <= ft:
break
tau *= beta
iiterback += 1
return z, tau
def _x0z0_init(
x0: NDArray | None,
z0: NDArray | None,
Op: Optional["LinearOperator"] = None,
z0name: str | None = "z0",
Opname: str | None = "Op",
) -> tuple[NDArray, NDArray]:
r"""Initialize x0 and z0
Initialize x0 and z0 using the following convention.
For ``Op=None``:
- if both are provided, they are simply returned;
- if only one is provided (the other is ``None``), the one provided
is copied to the other one.
For ``Op!=None``, ``x0`` must be provided, and:
- if both are provided, they are simply returned;
- if ``z0`` is not provided, set to ``Op @ x0``.
Parameters
----------
x0 : :obj:`numpy.ndarray`
Initial vector
z0 : :obj:`numpy.ndarray`
Initial auxiliary vector
Op : :obj:`pylops.LinearOperator`, optional
Linear Operator to apply to ``x0``
z0name : :obj:`str`, optional
Name to display in error message instead of ``z0``
Opname : :obj:`str`, optional
Name to display in error message instead of ``Op``
"""
if x0 is None and z0 is None:
msg = f"Both x0 or {z0name} are None, provide either of them or both"
raise ValueError(msg)
if Op is None:
if x0 is None:
x0 = z0.copy() # type: ignore[union-attr]
elif z0 is None:
z0 = x0.copy()
else:
if x0 is None:
msg = f"x0 must be provided when {Opname} is also provided"
raise ValueError(msg)
elif z0 is None:
z0 = Op @ x0
return x0, z0
[docs]
def ProximalPoint(
prox: ProxOperator,
x0: NDArray,
tau: float,
niter: int = 10,
tol: float | None = None,
callback: Callable[[NDArray], None] | None = None,
show: bool = False,
) -> NDArray:
r"""Proximal point algorithm
Solves the following minimization problem using Proximal point algorithm:
.. math::
\mathbf{x} = \argmin_\mathbf{x} f(\mathbf{x})
where :math:`f(\mathbf{x})` is any convex function that has a known
proximal operator.
Parameters
----------
prox : :obj:`pyproximal.ProxOperator`
Proximal operator
x0 : :obj:`numpy.ndarray`
Initial vector
tau : :obj:`float`
Positive scalar weight
niter : :obj:`int`, optional
Number of iterations of iterative scheme
tol : :obj:`float`, optional
Tolerance on change of objective function (used as stopping criterion). If
``tol=None``, run until ``niter`` is reached
callback : :obj:`callable`, optional
Function with signature (``callback(x)``) to call after each iteration
where ``x`` is the current model vector
show : :obj:`bool`, optional
Display iterations log
Returns
-------
x : :obj:`numpy.ndarray`
Inverted model
Notes
-----
The Proximal point algorithm can be expressed by the following recursion:
.. math::
\mathbf{x}^{k+1} = \prox_{\tau f}(\mathbf{x}^k)
"""
if show:
tstart = time.time()
print(
"Proximal point algorithm\n"
"---------------------------------------------------------\n"
"Proximal operator: %s\n"
"tau = %10e\tniter = %d\ttol = %s\n" % (type(prox), tau, niter, str(tol))
)
head = " Itn x[0] f"
print(head)
# initialize model
x = x0.copy()
pf = np.inf
tolbreak = False
# iterate
for iiter in range(niter):
x = prox.prox(x, tau)
# run callback
if callback is not None:
callback(x)
# tolerance check: break iterations if overall
# objective does not decrease below tolerance
if tol is not None:
pfold = pf
pf = prox(x)
if np.abs(1.0 - pf / pfold) < tol:
tolbreak = True
# show iteration logger
if show:
if iiter < 10 or niter - iiter < 10 or iiter % (niter // 10) == 0:
if tol is None:
pf = prox(x)
msg = "%6g %12.5e %10.3e" % (iiter + 1, x[0], pf)
print(msg)
# break if tolerance condition is met
if tolbreak:
break
if show:
print("\nTotal time (s) = %.2f" % (time.time() - tstart))
print("---------------------------------------------------------\n")
return x
[docs]
def ProximalGradient(
proxf: ProxOperator,
proxg: ProxOperator,
x0: NDArray,
epsg: float | NDArray = 1.0,
tau: float | None = None,
backtracking: bool = False,
beta: float = 0.5,
eta: float = 1.0,
niter: int = 10,
niterback: int = 100,
acceleration: str | None = None,
tol: float | None = None,
callback: Callable[[NDArray], None] | None = None,
show: bool = False,
) -> NDArray:
r"""Proximal gradient (optionally accelerated)
Solves the following minimization problem using (Accelerated) Proximal
gradient algorithm:
.. math::
\mathbf{x} = \argmin_\mathbf{x} f(\mathbf{x}) + \epsilon g(\mathbf{x})
where :math:`f(\mathbf{x})` is a smooth convex function with a uniquely
defined gradient and :math:`g(\mathbf{x})` is any convex function that
has a known proximal operator.
Parameters
----------
proxf : :obj:`pyproximal.ProxOperator`
Proximal operator of f function (must have ``grad`` implemented)
proxg : :obj:`pyproximal.ProxOperator`
Proximal operator of g function
x0 : :obj:`numpy.ndarray`
Initial vector
epsg : :obj:`float` or :obj:`numpy.ndarray`, optional
Scaling factor of g function
tau : :obj:`float` or :obj:`numpy.ndarray`, optional
Positive scalar weight, which should satisfy the following condition
to guarantees convergence: :math:`\tau \in (0, 1/L]` where ``L`` is
the Lipschitz constant of :math:`\nabla f`. When ``tau=None``,
backtracking is used to adaptively estimate the best tau at each
iteration. Finally, note that :math:`\tau` can be chosen to be a vector
when dealing with problems with multiple right-hand-sides
backtracking : :obj:`bool`, optional
Force backtracking, even if ``tau`` is not equal to ``None``. In this case
the chosen ``tau`` will be used as the initial guess in the first
step of backtracking
beta : :obj:`float`, optional
Backtracking parameter (must be between 0 and 1)
eta : :obj:`float`, optional
Relaxation parameter (must be between 0 and 1, 0 excluded).
niter : :obj:`int`, optional
Number of iterations of iterative scheme
niterback : :obj:`int`, optional
Max number of iterations of backtracking
acceleration : :obj:`str`, optional
Acceleration (``None``, ``vandenberghe`` or ``fista``)
tol : :obj:`float`, optional
Tolerance on change of objective function (used as stopping criterion). If
``tol=None``, run until ``niter`` is reached
callback : :obj:`callable`, optional
Function with signature (``callback(x)``) to call after each iteration
where ``x`` is the current model vector
show : :obj:`bool`, optional
Display iterations log
Returns
-------
x : :obj:`numpy.ndarray`
Inverted model
Notes
-----
The Proximal gradient algorithm can be expressed by the following recursion:
.. math::
\mathbf{x}^{k+1} = \mathbf{y}^k + \eta (\prox_{\tau^k \epsilon g}(\mathbf{y}^k -
\tau^k \nabla f(\mathbf{y}^k)) - \mathbf{y}^k) \\
\mathbf{y}^{k+1} = \mathbf{x}^k + \omega^k
(\mathbf{x}^k - \mathbf{x}^{k-1})
where at each iteration :math:`\tau^k` can be estimated by back-tracking
as follows:
.. math::
\begin{aligned}
&\tau = \tau^{k-1} &\\
&repeat \; \mathbf{z} = \prox_{\tau \epsilon g}(\mathbf{x}^k -
\tau \nabla f(\mathbf{x}^k)), \tau = \beta \tau \quad if \;
f(\mathbf{z}) \leq \tilde{f}_\tau(\mathbf{z}, \mathbf{x}^k) \\
&\tau^k = \tau, \quad \mathbf{x}^{k+1} = \mathbf{z} &\\
\end{aligned}
where :math:`\tilde{f}_\tau(\mathbf{x}, \mathbf{y}) = f(\mathbf{y}) +
\nabla f(\mathbf{y})^T (\mathbf{x} - \mathbf{y}) +
1/(2\tau)||\mathbf{x} - \mathbf{y}||_2^2`.
Different accelerations are provided:
- ``acceleration=None``: :math:`\omega^k = 0`;
- ``acceleration=vandenberghe`` [1]_: :math:`\omega^k = k / (k + 3)` for `
- ``acceleration=fista``: :math:`\omega^k = (t_{k-1}-1)/t_k` where
:math:`t_k = (1 + \sqrt{1+4t_{k-1}^{2}}) / 2` [2]_
.. [1] Vandenberghe, L., "Fast proximal gradient methods", 2010.
.. [2] Beck, A., and Teboulle, M. "A Fast Iterative Shrinkage-Thresholding
Algorithm for Linear Inverse Problems", SIAM Journal on
Imaging Sciences, vol. 2, pp. 183-202. 2009.
"""
# check if epgs is a vector
epsg = np.asarray(epsg, dtype=float)
if epsg.size == 1:
epsg = epsg * np.ones(niter)
epsg_print = str(epsg[0])
else:
epsg_print = "Multi"
if acceleration not in [None, "None", "vandenberghe", "fista"]:
msg = "Acceleration should be None, vandenberghe or fista"
raise NotImplementedError(msg)
if show:
tstart = time.time()
print(
"Accelerated Proximal Gradient\n"
"---------------------------------------------------------\n"
"Proximal operator (f): %s\n"
"Proximal operator (g): %s\n"
"tau = %s\tbacktrack = %s\tbeta = %10e\n"
"epsg = %s\tniter = %d\ttol = %s\n"
""
"niterback = %d\tacceleration = %s\n"
% (
type(proxf),
type(proxg),
str(tau),
backtracking,
beta,
epsg_print,
niter,
str(tol),
niterback,
acceleration,
)
)
head = " Itn x[0] f g J=f+eps*g tau"
print(head)
if tau is None:
backtracking = True
tau = 1.0
# initialize model
t = 1.0
x = x0.copy()
y = x.copy()
pfg = np.inf
tolbreak = False
# iterate
for iiter in range(niter):
xold = x.copy()
# proximal step
if not backtracking:
if eta == 1.0:
x = proxg.prox(y - tau * proxf.grad(y), epsg[iiter] * tau)
else:
x = x + eta * (
proxg.prox(x - tau * proxf.grad(x), epsg[iiter] * tau) - x
)
else:
x, tau = _backtracking(
y, tau, proxf, proxg, epsg[iiter], beta=beta, niterback=niterback
)
if eta != 1.0:
x = x + eta * (
proxg.prox(x - tau * proxf.grad(x), epsg[iiter] * tau) - x
)
# update internal parameters for bilinear operator
if isinstance(proxf, BilinearOperator):
proxf.updatexy(x)
# update y
if acceleration == "vandenberghe":
omega = iiter / (iiter + 3)
elif acceleration == "fista":
told = t
t = (1.0 + np.sqrt(1.0 + 4.0 * t**2)) / 2.0
omega = (told - 1.0) / t
else:
omega = 0
y = x + omega * (x - xold)
# run callback
if callback is not None:
callback(x)
# tolerance check: break iterations if overall
# objective does not decrease below tolerance
if tol is not None:
pfgold = pfg
pf, pg = proxf(x), proxg(x)
pfg = pf + np.sum(epsg[iiter] * pg)
if np.abs(1.0 - pfg / pfgold) < tol:
tolbreak = True
# show iteration logger
if show:
if iiter < 10 or niter - iiter < 10 or iiter % (niter // 10) == 0:
if tol is None:
pf, pg = proxf(x), proxg(x)
pfg = pf + np.sum(epsg[iiter] * pg)
msg = "%6g %12.5e %10.3e %10.3e %10.3e %10.3e" % (
iiter + 1,
(
np.real(to_numpy(x[0]))
if x.ndim == 1
else np.real(to_numpy(x[0, 0]))
),
pf,
pg,
pfg,
tau,
)
print(msg)
# break if tolerance condition is met
if tolbreak:
break
if show:
print("\nTotal time (s) = %.2f" % (time.time() - tstart))
print("---------------------------------------------------------\n")
return x
[docs]
def AcceleratedProximalGradient(
proxf: ProxOperator,
proxg: ProxOperator,
x0: NDArray,
tau: float | None = None,
beta: float = 0.5,
epsg: float = 1.0,
niter: int = 10,
niterback: int = 100,
acceleration: str = "vandenberghe",
tol: float | None = None,
callback: Callable[[NDArray], None] | None = None,
show: bool = False,
) -> NDArray:
r"""Accelerated Proximal gradient
This is a thin wrapper around :func:`pyproximal.optimization.primal.ProximalGradient` with
``vandenberghe`` or ``fista`` acceleration. See :func:`pyproximal.optimization.primal.ProximalGradient`
for details.
"""
warnings.warn(
"AcceleratedProximalGradient has been integrated directly into ProximalGradient "
"from v0.5.0. It is recommended to start using ProximalGradient by selecting the "
"appropriate acceleration parameter as this behaviour will become default in "
"version v1.0.0 and AcceleratedProximalGradient will be removed.",
FutureWarning,
stacklevel=2,
)
return ProximalGradient(
proxf,
proxg,
x0,
tau=tau,
beta=beta,
epsg=epsg,
niter=niter,
niterback=niterback,
acceleration=acceleration,
tol=tol,
callback=callback,
show=show,
)
[docs]
def AndersonProximalGradient(
proxf: ProxOperator,
proxg: ProxOperator,
x0: NDArray,
epsg: float | NDArray = 1.0,
tau: float | NDArray = 1.0,
niter: int = 10,
nhistory: int = 10,
epsr: float = 1e-10,
safeguard: bool = False,
tol: float | None = None,
callback: Callable[[NDArray], None] | None = None,
show: bool = False,
) -> NDArray:
r"""Proximal gradient with Anderson acceleration
Solves the following minimization problem using the Proximal
gradient algorithm with Anderson acceleration:
.. math::
\mathbf{x} = \argmin_\mathbf{x} f(\mathbf{x}) + \epsilon g(\mathbf{x})
where :math:`f(\mathbf{x})` is a smooth convex function with a uniquely
defined gradient and :math:`g(\mathbf{x})` is any convex function that
has a known proximal operator.
Parameters
----------
proxf : :obj:`pyproximal.ProxOperator`
Proximal operator of f function (must have ``grad`` implemented)
proxg : :obj:`pyproximal.ProxOperator`
Proximal operator of g function
x0 : :obj:`numpy.ndarray`
Initial vector
epsg : :obj:`float` or :obj:`numpy.ndarray`, optional
Scaling factor of g function
tau : :obj:`float` or :obj:`numpy.ndarray`, optional
Positive scalar weight, which should satisfy the following condition
to guarantees convergence: :math:`\tau \in (0, 1/L]` where ``L`` is
the Lipschitz constant of :math:`\nabla f`. N ote that :math:`\tau`
can be chosen to be a vector when dealing with problems with
multiple right-hand-sides
niter : :obj:`int`, optional
Number of iterations of iterative scheme
nhistory : :obj:`int`, optional
Number of previous iterates to be kept in memory (to compute the scaling factors
epsr : :obj:`float`, optional
Scaling factor for regularization added to the inverse of :math:\mathbf{R}^T \mathbf{R}`
safeguard : :obj:`bool`, optional
Apply safeguarding strategy to the update (``True``) or not (``False``)
tol : :obj:`float`, optional
Tolerance on change of objective function (used as stopping criterion). If
``tol=None``, run until ``niter`` is reached
callback : :obj:`callable`, optional
Function with signature (``callback(x)``) to call after each iteration
where ``x`` is the current model vector
show : :obj:`bool`, optional
Display iterations log
Returns
-------
x : :obj:`numpy.ndarray`
Inverted model
Notes
-----
The Proximal gradient algorithm with Anderson acceleration can be expressed by the
following recursion [1]_:
.. math::
m_k = min(m, k)\\
\mathbf{g}^{k} = \mathbf{x}^{k} - \tau^k \nabla f(\mathbf{x}^k)\\
\mathbf{r}^{k} = \mathbf{g}^{k} - \mathbf{g}^{k}\\
\mathbf{G}^{k} = [\mathbf{g}^{k},..., \mathbf{g}^{k-m_k}]\\
\mathbf{R}^{k} = [\mathbf{r}^{k},..., \mathbf{r}^{k-m_k}]\\
\alpha_k = (\mathbf{R}^{kT} \mathbf{R}^{k})^{-1} \mathbf{1} / \mathbf{1}^T
(\mathbf{R}^{kT} \mathbf{R}^{k})^{-1} \mathbf{1}\\
\mathbf{y}^{k+1} = \mathbf{G}^{k} \alpha_k\\
\mathbf{x}^{k+1} = \prox_{\tau^{k+1} g}(\mathbf{y}^{k+1})
where :math:`m` equals ``nhistory``, :math:`k=1,2,...,n_{iter}`, :math:`\mathbf{y}^{0}=\mathbf{x}^{0}`,
:math:`\mathbf{y}^{1}=\mathbf{x}^{0} - \tau^0 \nabla f(\mathbf{x}^0)`,
:math:`\mathbf{x}^{1}=\prox_{\tau^k g}(\mathbf{y}^{1})`, and
:math:`\mathbf{g}^{0}=\mathbf{y}^{1}`.
Refer to [1]_ for the guarded version of the algorithm (when ``safeguard=True``).
.. [1] Mai, V., and Johansson, M. "Anderson Acceleration of Proximal Gradient
Methods", 2020.
"""
# check if epgs is a vector
epsg = np.asarray(epsg, dtype=float)
if epsg.size == 1:
epsg = epsg * np.ones(niter)
epsg_print = str(epsg[0])
else:
epsg_print = "Multi"
if show:
tstart = time.time()
print(
"Proximal Gradient with Anderson Acceleration \n"
"---------------------------------------------------------\n"
"Proximal operator (f): %s\n"
"Proximal operator (g): %s\n"
"tau = %s\t\tepsg = %s\tniter = %d\n"
"nhist = %d\tepsr = %s\n"
"guard = %s\ttol = %s\n"
% (
type(proxf),
type(proxg),
str(tau),
epsg_print,
niter,
nhistory,
str(epsr),
str(safeguard),
str(tol),
)
)
head = " Itn x[0] f g J=f+eps*g tau"
print(head)
# initialize model
y = x0 - tau * proxf.grad(x0)
x = proxg.prox(y, epsg[0] * tau)
g = y.copy()
r = g - x0
R, G = (
[
g,
],
[
r,
],
)
pf = proxf(x)
pfg = np.inf
tolbreak = False
# iterate
for iiter in range(niter):
# update fix point
g = x - tau * proxf.grad(x)
r = g - y
# update history vectors
R.insert(0, r)
G.insert(0, g)
if iiter >= nhistory - 1:
R.pop(-1)
G.pop(-1)
# solve for alpha coefficients
Rstack = np.vstack(R)
Rinv = np.linalg.pinv(Rstack @ Rstack.T + epsr * np.linalg.norm(Rstack) ** 2)
ones = np.ones(min(nhistory, iiter + 2))
Rinvones = Rinv @ ones
alpha = Rinvones / (ones[None] @ Rinvones)
if not safeguard:
# update auxiliary variable
y = np.vstack(G).T @ alpha
# update main variable
x = proxg.prox(y, epsg[iiter] * tau)
else:
# update auxiliary variable
ytest = np.vstack(G).T @ alpha
# update main variable
xtest = proxg.prox(ytest, epsg[iiter] * tau)
# check if function is decreased, otherwise do basic PG step
pfold, pf = pf, proxf(xtest)
if pf <= pfold - tau * np.linalg.norm(proxf.grad(x)) ** 2 / 2:
y = ytest
x = xtest
else:
x = proxg.prox(g, epsg[iiter] * tau)
y = g
# run callback
if callback is not None:
callback(x)
# tolerance check: break iterations if overall
# objective does not decrease below tolerance
if tol is not None:
pfgold = pfg
pf, pg = proxf(x), proxg(x)
pfg = pf + np.sum(epsg[iiter] * pg)
if np.abs(1.0 - pfg / pfgold) < tol:
tolbreak = True
# show iteration logger
if show:
if iiter < 10 or niter - iiter < 10 or iiter % (niter // 10) == 0:
if tol is None:
pf, pg = proxf(x), proxg(x)
pfg = pf + np.sum(epsg[iiter] * pg)
msg = "%6g %12.5e %10.3e %10.3e %10.3e %10.3e" % (
iiter + 1,
(
np.real(to_numpy(x[0]))
if x.ndim == 1
else np.real(to_numpy(x[0, 0]))
),
pf,
pg,
pfg,
tau,
)
print(msg)
# break if tolerance condition is met
if tolbreak:
break
if show:
print("\nTotal time (s) = %.2f" % (time.time() - tstart))
print("---------------------------------------------------------\n")
return x
[docs]
def GeneralizedProximalGradient(
proxfs: list[ProxOperator],
proxgs: list[ProxOperator],
x0: NDArray,
tau: float | None,
epsg: float | NDArray = 1.0,
weights: NDArray | None = None,
eta: float = 1.0,
niter: int = 10,
acceleration: str | None = None,
callback: Callable[[NDArray], None] | None = None,
show: bool = False,
) -> NDArray:
r"""Generalized Proximal gradient
Solves the following minimization problem using Generalized Proximal
gradient algorithm:
.. math::
\mathbf{x} = \argmin_\mathbf{x} \sum_{i=1}^n f_i(\mathbf{x})
+ \sum_{j=1}^m \epsilon_j g_j(\mathbf{x}),~~n,m \in \mathbb{N}^+
where the :math:`f_i(\mathbf{x})` are smooth convex functions with a uniquely
defined gradient and the :math:`g_j(\mathbf{x})` are any convex function that
have a known proximal operator.
Parameters
----------
proxfs : :obj:`list`
Proximal operators of the :math:`f_i` functions (must have ``grad`` implemented)
proxgs : :obj:`list`
Proximal operators of the :math:`g_j` functions
x0 : :obj:`numpy.ndarray`
Initial vector
tau : :obj:`float`
Positive scalar weight, which should satisfy the following condition
to guarantees convergence: :math:`\tau \in (0, 1/L]` where ``L`` is
the Lipschitz constant of :math:`\sum_{i=1}^n \nabla f_i`.
epsg : :obj:`float` or :obj:`numpy.ndarray`, optional
Scaling factor(s) of ``g`` function(s)
weights : :obj:`float`, optional
Weighting factors of ``g`` functions. Must sum to 1.
eta : :obj:`float`, optional
Relaxation parameter (must be between 0 and 1, 0 excluded). Note that
this will be only used when ``acceleration=None``.
niter : :obj:`int`, optional
Number of iterations of iterative scheme
acceleration: :obj:`str`, optional
Acceleration (``None``, ``vandenberghe`` or ``fista``)
callback : :obj:`callable`, optional
Function with signature (``callback(x)``) to call after each iteration
where ``x`` is the current model vector
show : :obj:`bool`, optional
Display iterations log
Returns
-------
x : :obj:`numpy.ndarray`
Inverted model
Notes
-----
The Generalized Proximal gradient algorithm can be expressed by the
following recursion [1]_:
.. math::
\text{for } j=1,\cdots,n, \\
~~~~\mathbf z_j^{k+1} = \mathbf z_j^{k} + \eta
\left[prox_{\frac{\tau^k \epsilon_j}{w_j} g_j}\left(2 \mathbf{x}^{k} - \mathbf{z}_j^{k}
- \tau^k \sum_{i=1}^n \nabla f_i(\mathbf{x}^{k})\right) - \mathbf{x}^{k} \right] \\
\mathbf{x}^{k+1} = \sum_{j=1}^n w_j \mathbf z_j^{k+1} \\
where :math:`\sum_{j=1}^n w_j=1`. In the current implementation, :math:`w_j=1/n` when
not provided.
.. [1] Raguet, H., Fadili, J. and PeyrΓ©, G. "Generalized Forward-Backward Splitting",
arXiv, 2012.
"""
# check if weights sum to 1
if weights is None:
weights = np.ones(len(proxgs)) / len(proxgs)
if len(weights) != len(proxgs) or np.sum(weights) != 1.0:
msg = f"omega={weights} must be an array of size {len(proxgs)} summing to 1"
raise ValueError(msg)
# check if epgs is a vector
epsg = np.asarray(epsg, dtype=float)
if epsg.size == 1:
epsg_print = str(epsg)
epsg = epsg * np.ones(len(proxgs))
else:
epsg_print = "Multi"
if acceleration not in [None, "None", "vandenberghe", "fista"]:
msg = "Acceleration should be None, vandenberghe or fista"
raise NotImplementedError(msg)
if show:
tstart = time.time()
print(
"Generalized Proximal Gradient\n"
"---------------------------------------------------------\n"
"Proximal operators (f): %s\n"
"Proximal operators (g): %s\n"
"tau = %10e\nepsg = %s\tniter = %d\n"
% (
[type(proxf) for proxf in proxfs],
[type(proxg) for proxg in proxgs],
0 if tau is None else tau,
epsg_print,
niter,
)
)
head = " Itn x[0] f g J=f+g"
print(head)
if tau is None:
tau = 1.0
# initialize model
t = 1.0
x = x0.copy()
y = x.copy()
zs = [x.copy() for _ in range(len(proxgs))]
# iterate
for iiter in range(niter):
xold = x.copy()
# gradient
grad = np.zeros_like(x)
for _, proxf in enumerate(proxfs):
grad += proxf.grad(x)
# proximal step
x = np.zeros_like(x)
for i, proxg in enumerate(proxgs):
ztmp = 2 * y - zs[i] - tau * grad
ztmp = proxg.prox(ztmp, tau * epsg[i] / weights[i])
zs[i] += eta * (ztmp - y)
x += weights[i] * zs[i]
# update y
if acceleration == "vandenberghe":
omega = iiter / (iiter + 3)
elif acceleration == "fista":
told = t
t = (1.0 + np.sqrt(1.0 + 4.0 * t**2)) / 2.0
omega = (told - 1.0) / t
else:
omega = 0
y = x + omega * (x - xold)
# run callback
if callback is not None:
callback(x)
if show:
if iiter < 10 or niter - iiter < 10 or iiter % (niter // 10) == 0:
pf: float = np.sum([proxf(x) for proxf in proxfs])
pg: float = np.sum(
[eg * proxg(x) for proxg, eg in zip(proxgs, epsg, strict=True)]
)
msg = "%6g %12.5e %10.3e %10.3e %10.3e" % (
iiter + 1,
np.real(to_numpy(x[0]))
if x.ndim == 1
else np.real(to_numpy(x[0, 0])),
pf,
pg,
pf + pg,
)
print(msg)
if show:
print("\nTotal time (s) = %.2f" % (time.time() - tstart))
print("---------------------------------------------------------\n")
return x
[docs]
def HQS(
proxf: ProxOperator,
proxg: ProxOperator,
x0: NDArray,
tau: float | NDArray,
niter: int = 10,
z0: NDArray | None = None,
gfirst: bool = True,
callback: Callable[..., None] | None = None,
callbackz: bool = False,
show: bool = False,
) -> tuple[NDArray, NDArray]:
r"""Half Quadratic splitting
Solves the following minimization problem using Half Quadratic splitting
algorithm:
.. math::
\mathbf{x},\mathbf{z} = \argmin_{\mathbf{x},\mathbf{z}}
f(\mathbf{x}) + g(\mathbf{z}) \\
s.t. \; \mathbf{x}=\mathbf{z}
where :math:`f(\mathbf{x})` and :math:`g(\mathbf{z})` are any convex
function that has a known proximal operator.
Parameters
----------
proxf : :obj:`pyproximal.ProxOperator`
Proximal operator of f function
proxg : :obj:`pyproximal.ProxOperator`
Proximal operator of g function
x0 : :obj:`numpy.ndarray`
Initial vector (not required when ``gfirst=False``, can pass ``None``)
tau : :obj:`float` or :obj:`numpy.ndarray`
Positive scalar weight, which should satisfy the following condition
to guarantees convergence: :math:`\tau \in (0, 1/L]` where ``L`` is
the Lipschitz constant of :math:`\nabla f`. Finally note that
:math:`\tau` can be chosen to be a vector of size ``niter`` such that
different :math:`\tau` is used at different iterations (i.e., continuation
strategy)
niter : :obj:`int`
Number of iterations of iterative scheme
z0 : :obj:`numpy.ndarray`, optional
Initial z vector (not required when ``gfirst=True``)
gfirst : :obj:`bool`, optional
Apply Proximal of operator ``g`` first (``True``) or Proximal of
operator ``f`` first (``False``)
callback : :obj:`callable`, optional
Function with signature (``callback(x)``) to call after each iteration
where ``x`` is the current model vector
callbackz : :obj:`bool`, optional
Modify callback signature to (``callback(x, z)``) when ``callbackz=True``
show : :obj:`bool`, optional
Display iterations log
Returns
-------
x : :obj:`numpy.ndarray`
Inverted model
z : :obj:`numpy.ndarray`
Inverted second model
Raises
------
ValueError
If both ``x0`` and ``z0`` are set to ``None``
Notes
-----
The HQS algorithm can be expressed by the following recursion [1]_:
.. math::
\mathbf{z}^{k+1} = \prox_{\tau g}(\mathbf{x}^{k}) \\
\mathbf{x}^{k+1} = \prox_{\tau f}(\mathbf{z}^{k+1})
for ``gfirst=False``, or
.. math::
\mathbf{x}^{k+1} = \prox_{\tau f}(\mathbf{z}^{k}) \\
\mathbf{z}^{k+1} = \prox_{\tau g}(\mathbf{x}^{k+1})
for ``gfirst=False``. Note that ``x`` and ``z`` converge to each other,
however if iterations are stopped too early ``x`` is guaranteed to belong to
the domain of ``f`` while ``z`` is guaranteed to belong to the domain of ``g``.
Depending on the problem either of the two may be the best solution.
.. [1] D., Geman, and C., Yang, "Nonlinear image recovery with halfquadratic
regularization", IEEE Transactions on Image Processing,
4, 7, pp. 932-946, 1995.
"""
# initialize variables
x, z = _x0z0_init(x0, z0)
ncp = get_array_module(x)
# check if tau is a vector
tau = ncp.asarray(tau, dtype=float)
if tau.size == 1:
tau_print = str(tau)
tau = tau * np.ones(niter)
else:
tau_print = "Variable"
if show:
tstart = time.time()
print(
"HQS\n"
"---------------------------------------------------------\n"
"Proximal operator (f): %s\n"
"Proximal operator (g): %s\n"
"tau = %s\tniter = %d\n" % (type(proxf), type(proxg), tau_print, niter)
)
head = " Itn x[0] f g J = f + g"
print(head)
# run iterations
for iiter in range(niter):
if gfirst:
z = proxg.prox(x, tau[iiter])
x = proxf.prox(z, tau[iiter])
else:
x = proxf.prox(z, tau[iiter])
z = proxg.prox(x, tau[iiter])
# run callback
if callback is not None:
if callbackz:
callback(x, z)
else:
callback(x)
if show:
if iiter < 10 or niter - iiter < 10 or iiter % (niter // 10) == 0:
pf, pg = proxf(x), proxg(x)
msg = "%6g %12.5e %10.3e %10.3e %10.3e" % (
iiter + 1,
np.real(to_numpy(x[0])),
pf,
pg,
pf + pg,
)
print(msg)
if show:
print("\nTotal time (s) = %.2f" % (time.time() - tstart))
print("---------------------------------------------------------\n")
return x, z
[docs]
def ADMM(
proxf: ProxOperator,
proxg: ProxOperator,
x0: NDArray,
tau: float,
niter: int = 10,
z0: NDArray | None = None,
gfirst: bool = False,
callback: Callable[..., None] | None = None,
callbackz: bool = False,
show: bool = False,
) -> tuple[NDArray, NDArray]:
r"""Alternating Direction Method of Multipliers
Solves the following minimization problem using Alternating Direction
Method of Multipliers:
.. math::
\mathbf{x},\mathbf{z} = \argmin_{\mathbf{x},\mathbf{z}}
f(\mathbf{x}) + g(\mathbf{z}) \\
s.t. \; \mathbf{x}=\mathbf{z}
where :math:`f(\mathbf{x})` and :math:`g(\mathbf{z})` are any convex
function that has a known proximal operator.
ADMM can also solve the problem of the form above with a more general
constraint: :math:`\mathbf{Ax}+\mathbf{Bz}=\mathbf{c}`. This routine implements
the special case where :math:`\mathbf{A}=\mathbf{I}`, :math:`\mathbf{B}=-\mathbf{I}`,
and :math:`\mathbf{c}=\mathbf{0}`, as a general algorithm can be obtained for any choice of
:math:`f` and :math:`g` provided they have a known proximal operator.
On the other hand, for more general choice of :math:`\mathbf{A}`, :math:`\mathbf{B}`,
and :math:`\mathbf{c}`, the iterations are not generalizable, i.e. they depend on the choice of
the :math:`f` and :math:`g` functions. For this reason, we currently only provide an additional
solver for the special case where :math:`f` is a :class:`pyproximal.proximal.L2`
operator with a linear operator :math:`\mathbf{G}` and data :math:`\mathbf{y}`,
:math:`\mathbf{B}=-\mathbf{I}` and :math:`\mathbf{c}=\mathbf{0}`,
called :func:`pyproximal.optimization.primal.ADMML2`. Note that for the very same choice
of :math:`\mathbf{B}` and :math:`\mathbf{c}`, the :func:`pyproximal.optimization.primal.LinearizedADMM`
can also be used (and this does not require a specific choice of :math:`f`).
Parameters
----------
proxf : :obj:`pyproximal.ProxOperator`
Proximal operator of f function
proxg : :obj:`pyproximal.ProxOperator`
Proximal operator of g function
x0 : :obj:`numpy.ndarray`
Initial vector (not required when ``gfirst=False``, can pass ``None``)
tau : :obj:`float`
Positive scalar weight, which should satisfy the following condition
to guarantees convergence: :math:`\tau \in (0, 1/L]` where ``L`` is
the Lipschitz constant of :math:`\nabla f`.
niter : :obj:`int`
Number of iterations of iterative scheme
z0 : :obj:`numpy.ndarray`, optional
Initial z vector (not required when ``gfirst=True``)
gfirst : :obj:`bool`, optional
Apply Proximal of operator ``g`` first (``True``) or Proximal of
operator ``f`` first (``False``)
callback : :obj:`callable`, optional
Function with signature (``callback(x)``) to call after each iteration
where ``x`` is the current model vector
callbackz : :obj:`bool`, optional
Modify callback signature to (``callback(x, z)``) when ``callbackz=True``
show : :obj:`bool`, optional
Display iterations log
Returns
-------
x : :obj:`numpy.ndarray`
Inverted model
z : :obj:`numpy.ndarray`
Inverted second model
See Also
--------
ADMML2: ADMM with L2 misfit function
LinearizedADMM: Linearized ADMM
Raises
------
ValueError
If both ``x0`` and ``z0`` are set to ``None``
Notes
-----
The ADMM algorithm can be expressed by the following recursion [1]_:
.. math::
\mathbf{x}^{k+1} = \prox_{\tau f}(\mathbf{z}^{k} - \mathbf{u}^{k})\\
\mathbf{z}^{k+1} = \prox_{\tau g}(\mathbf{x}^{k+1} + \mathbf{u}^{k})\\
\mathbf{u}^{k+1} = \mathbf{u}^{k} + \mathbf{x}^{k+1} - \mathbf{z}^{k+1}
Note that ``x`` and ``z`` converge to each other, however if iterations are
stopped too early ``x`` is guaranteed to belong to the domain of ``f``
while ``z`` is guaranteed to belong to the domain of ``g``. Depending on
the problem either of the two may be the best solution.
.. [1] S. Boyd, N. Parikh, E. Chu, B. Peleato, and J. Eckstein. 2011.
Distributed optimization and statistical learning via the alternating
direction method of multipliers. Foundations and Trends in Machine
Learning, 3 (1), 1-122. https://doi.org/10.1561/2200000016.
"""
# initialize variables
x, z = _x0z0_init(x0, z0)
ncp = get_array_module(x)
u = ncp.zeros_like(x)
if show:
tstart = time.time()
print(
"ADMM\n"
"---------------------------------------------------------\n"
"Proximal operator (f): %s\n"
"Proximal operator (g): %s\n"
"tau = %10e\tniter = %d\n" % (type(proxf), type(proxg), tau, niter)
)
head = " Itn x[0] f g J = f + g"
print(head)
# run iterations
for iiter in range(niter):
if gfirst:
z = proxg.prox(x + u, tau)
x = proxf.prox(z - u, tau)
else:
x = proxf.prox(z - u, tau)
z = proxg.prox(x + u, tau)
u = u + x - z
# run callback
if callback is not None:
if callbackz:
callback(x, z)
else:
callback(x)
if show:
if iiter < 10 or niter - iiter < 10 or iiter % (niter // 10) == 0:
pf, pg = proxf(x), proxg(x)
msg = "%6g %12.5e %10.3e %10.3e %10.3e" % (
iiter + 1,
np.real(to_numpy(x[0])),
pf,
pg,
pf + pg,
)
print(msg)
if show:
print("\nTotal time (s) = %.2f" % (time.time() - tstart))
print("---------------------------------------------------------\n")
return x, z
[docs]
def ADMML2(
proxg: ProxOperator,
Op: "LinearOperator",
b: NDArray,
A: "LinearOperator",
x0: NDArray,
tau: float,
niter: int = 10,
z0: NDArray | None = None,
gfirst: bool = False,
callback: Callable[[NDArray], None] | None = None,
show: bool = False,
**kwargs_solver: dict[str, Any],
) -> tuple[NDArray, NDArray]:
r"""Alternating Direction Method of Multipliers for L2 misfit term
Solves the following minimization problem using Alternating Direction
Method of Multipliers:
.. math::
\mathbf{x},\mathbf{z} = \argmin_{\mathbf{x},\mathbf{z}}
\frac{1}{2}||\mathbf{Op}\mathbf{x} - \mathbf{b}||_2^2 + g(\mathbf{z}) \\
s.t. \; \mathbf{Ax}=\mathbf{z}
where :math:`g(\mathbf{z})` is any convex function that has a known proximal operator.
Parameters
----------
proxg : :obj:`pyproximal.ProxOperator`
Proximal operator of g function
Op : :obj:`pylops.LinearOperator`
Linear operator of data misfit term
b : :obj:`numpy.ndarray`
Data
A : :obj:`pylops.LinearOperator`
Linear operator of regularization term
x0 : :obj:`numpy.ndarray`
Initial vector
tau : :obj:`float`
Positive scalar weight, which should satisfy the following condition
to guarantees convergence: :math:`\tau \in (0, 1/\lambda_{max}(\mathbf{A}^H\mathbf{A})]`.
niter : :obj:`int`, optional
Number of iterations of iterative scheme
z0 : :obj:`numpy.ndarray`
Initial auxiliary vector. If ``None``, initialized to ``A @ x0``.
gfirst : :obj:`bool`, optional
Apply Proximal of operator ``g`` first (``True``) or Proximal of
operator ``f`` first (``False``)
callback : :obj:`callable`, optional
Function with signature (``callback(x)``) to call after each iteration
where ``x`` is the current model vector
show : :obj:`bool`, optional
Display iterations log
**kwargs_solver
Arbitrary keyword arguments for :py:func:`scipy.sparse.linalg.lsqr` used
to solve the x-update
Returns
-------
x : :obj:`numpy.ndarray`
Inverted model
z : :obj:`numpy.ndarray`
Inverted second model
Raises
------
ValueError
If both ``x0`` and ``z0`` are set to ``None`` or ``x0`` is set to None
See Also
--------
ADMM: ADMM
LinearizedADMM: Linearized ADMM
Notes
-----
The ADMM algorithm can be expressed by the following recursion:
.. math::
\mathbf{x}^{k+1} = \argmin_{\mathbf{x}} \frac{1}{2}||\mathbf{Op}\mathbf{x}
- \mathbf{b}||_2^2 + \frac{1}{2\tau} ||\mathbf{Ax} - \mathbf{z}^k + \mathbf{u}^k||_2^2\\
\mathbf{z}^{k+1} = \prox_{\tau g}(\mathbf{Ax}^{k+1} + \mathbf{u}^{k})\\
\mathbf{u}^{k+1} = \mathbf{u}^{k} + \mathbf{Ax}^{k+1} - \mathbf{z}^{k+1}
"""
# initialize variables
x, z = _x0z0_init(x0, z0, A, Opname="A")
ncp = get_array_module(x)
u = ncp.zeros_like(z)
if show:
tstart = time.time()
print(
"ADMM\n"
"---------------------------------------------------------\n"
"Proximal operator (g): %s\n"
"tau = %10e\tniter = %d\n" % (type(proxg), tau, niter)
)
head = " Itn x[0] f g J = f + g"
print(head)
# run iterations
sqrttau = 1.0 / sqrt(tau)
for iiter in range(niter):
if gfirst:
Ax = A @ x
z = proxg.prox(Ax + u, tau)
# solve augumented system
x = regularized_inversion(
Op,
b,
[
A,
],
x0=x,
dataregs=[
z - u,
],
epsRs=[
sqrttau,
],
**kwargs_solver,
)[0]
else:
# solve augumented system
x = regularized_inversion(
Op,
b,
[
A,
],
x0=x,
dataregs=[
z - u,
],
epsRs=[
sqrttau,
],
**kwargs_solver,
)[0]
Ax = A @ x
z = proxg.prox(Ax + u, tau)
u = u + Ax - z
# run callback
if callback is not None:
callback(x)
if show:
if iiter < 10 or niter - iiter < 10 or iiter % (niter // 10) == 0:
pf, pg = 0.5 * np.linalg.norm(Op @ x - b) ** 2, proxg(Ax)
msg = "%6g %12.5e %10.3e %10.3e %10.3e" % (
iiter + 1,
np.real(to_numpy(x[0])),
pf,
pg,
pf + pg,
)
print(msg)
if show:
print("\nTotal time (s) = %.2f" % (time.time() - tstart))
print("---------------------------------------------------------\n")
return x, z
[docs]
def LinearizedADMM(
proxf: ProxOperator,
proxg: ProxOperator,
A: "LinearOperator",
x0: NDArray,
tau: float,
mu: float,
niter: int = 10,
z0: NDArray | None = None,
callback: Callable[[NDArray], None] | None = None,
show: bool = False,
) -> tuple[NDArray, NDArray]:
r"""Linearized Alternating Direction Method of Multipliers
Solves the following minimization problem using Linearized Alternating
Direction Method of Multipliers:
.. math::
\mathbf{x} = \argmin_\mathbf{x} f(\mathbf{x}) + g(\mathbf{A}\mathbf{x})
where :math:`f(\mathbf{x})` and :math:`g(\mathbf{x})` are any convex
function that has a known proximal operator and :math:`\mathbf{A}` is a
linear operator.
Parameters
----------
proxf : :obj:`pyproximal.ProxOperator`
Proximal operator of f function
proxg : :obj:`pyproximal.ProxOperator`
Proximal operator of g function
A : :obj:`pylops.LinearOperator`
Linear operator
x0 : :obj:`numpy.ndarray`
Initial vector
tau : :obj:`float`, optional
Positive scalar weight, which should satisfy the following
condition to guarantee convergence: :math:`\mu \in (0,
\tau/\lambda_{max}(\mathbf{A}^H\mathbf{A})]`.
mu : :obj:`float`, optional
Second positive scalar weight, which should satisfy the following
condition to guarantees convergence: :math:`\mu \in (0,
\tau/\lambda_{max}(\mathbf{A}^H\mathbf{A})]`.
niter : :obj:`int`, optional
Number of iterations of iterative scheme
z0 : :obj:`numpy.ndarray`
Initial auxiliary vector. If ``None``, initialized to ``A @ x0``.
callback : :obj:`callable`, optional
Function with signature (``callback(x)``) to call after each iteration
where ``x`` is the current model vector
show : :obj:`bool`, optional
Display iterations log
Returns
-------
x : :obj:`numpy.ndarray`
Inverted model
z : :obj:`numpy.ndarray`
Inverted second model
Raises
------
ValueError
If both ``x0`` and ``z0`` are set to ``None``
See Also
--------
ADMM: ADMM
ADMML2: ADMM with L2 misfit function
Notes
-----
The Linearized-ADMM algorithm can be expressed by the following recursion [1]_:
.. math::
\mathbf{x}^{k+1} = \prox_{\mu f}(\mathbf{x}^{k} - \frac{\mu}{\tau}
\mathbf{A}^H(\mathbf{A} \mathbf{x}^k - \mathbf{z}^k + \mathbf{u}^k))\\
\mathbf{z}^{k+1} = \prox_{\tau g}(\mathbf{A} \mathbf{x}^{k+1} +
\mathbf{u}^k)\\
\mathbf{u}^{k+1} = \mathbf{u}^{k} + \mathbf{A}\mathbf{x}^{k+1} -
\mathbf{z}^{k+1}
.. [1] N., Parikh, "Proximal Algorithms", Foundations and Trends
in Optimization. 2013.
"""
# initialize variables
x, z = _x0z0_init(x0, z0, A, Opname="A")
Ax = A.matvec(x) if z0 is None else z
ncp = get_array_module(x)
u = ncp.zeros_like(z)
if show:
tstart = time.time()
print(
"Linearized-ADMM\n"
"---------------------------------------------------------\n"
"Proximal operator (f): %s\n"
"Proximal operator (g): %s\n"
"Linear operator (A): %s\n"
"tau = %10e\tmu = %10e\tniter = %d\n"
% (type(proxf), type(proxg), type(A), tau, mu, niter)
)
head = " Itn x[0] f g J = f + g"
print(head)
# run iterations
for iiter in range(niter):
x = proxf.prox(x - mu / tau * A.rmatvec(Ax - z + u), mu)
Ax = A.matvec(x)
z = proxg.prox(Ax + u, tau)
u = u + Ax - z
# run callback
if callback is not None:
callback(x)
if show:
if iiter < 10 or niter - iiter < 10 or iiter % (niter // 10) == 0:
pf, pg = proxf(x), proxg(Ax)
msg = "%6g %12.5e %10.3e %10.3e %10.3e" % (
iiter + 1,
np.real(to_numpy(x[0])),
pf,
pg,
pf + pg,
)
print(msg)
if show:
print("\nTotal time (s) = %.2f" % (time.time() - tstart))
print("---------------------------------------------------------\n")
return x, z
[docs]
def TwIST(
proxg: ProxOperator,
A: "LinearOperator",
b: NDArray,
x0: NDArray,
alpha: float | None = None,
beta: float | None = None,
eigs: tuple[float, float] | None = None,
niter: int = 10,
callback: Callable[[NDArray], None] | None = None,
show: bool = False,
returncost: bool = False,
) -> NDArray | tuple[NDArray, NDArray]:
r"""Two-step Iterative Shrinkage/Threshold
Solves the following minimization problem using Two-step Iterative
Shrinkage/Threshold:
.. math::
\mathbf{x} = \argmin_\mathbf{x} \frac{1}{2}
||\mathbf{b} - \mathbf{Ax}||_2^2 + g(\mathbf{x})
where :math:`\mathbf{A}` is a linear operator and :math:`g(\mathbf{x})`
is any convex function that has a known proximal operator.
Parameters
----------
proxg : :obj:`pyproximal.ProxOperator`
Proximal operator of g function
A : :obj:`pylops.LinearOperator`
Linear operator
b : :obj:`numpy.ndarray`
Data
x0 : :obj:`numpy.ndarray`
Initial vector
alpha : :obj:`float`, optional
Positive scalar weight (if ``None``, estimated based on the
eigenvalues of :math:`\mathbf{A}`, see Notes for details)
beta : :obj:`float`, optional
Positive scalar weight (if ``None``, estimated based on the
eigenvalues of :math:`\mathbf{A}`, see Notes for details)
eigs : :obj:`tuple`, optional
Largest and smallest eigenvalues of :math:`\mathbf{A}^H \mathbf{A}`.
If passed, computes `alpha` and `beta` based on them.
niter : :obj:`int`, optional
Number of iterations of iterative scheme
callback : :obj:`callable`, optional
Function with signature (``callback(x)``) to call after each iteration
where ``x`` is the current model vector
show : :obj:`bool`, optional
Display iterations log
returncost : :obj:`bool`, optional
Return cost function
Returns
-------
x : :obj:`numpy.ndarray`
Inverted model
j : :obj:`numpy.ndarray`
Cost function
Notes
-----
The TwIST algorithm can be expressed by the following recursion:
.. math::
\mathbf{x}^{k+1} = (1-\alpha) \mathbf{x}^{k-1} +
(\alpha-\beta) \mathbf{x}^k +
\beta \prox_{g} (\mathbf{x}^k + \mathbf{A}^H
(\mathbf{b} - \mathbf{A}\mathbf{x}^k)).
where :math:`\mathbf{x}^{1} = \prox_{g} (\mathbf{x}^0 + \mathbf{A}^T
(\mathbf{b} - \mathbf{A}\mathbf{x}^0))`.
The optimal weighting parameters :math:`\alpha` and :math:`\beta` are
linked to the smallest and largest eigenvalues of
:math:`\mathbf{A}^H\mathbf{A}` as follows:
.. math::
\alpha = 1 + \rho^2 \\
\beta =\frac{2 \alpha}{\Lambda_{max} + \lambda_{min}}
where :math:`\rho=\frac{1-\sqrt{k}}{1+\sqrt{k}}` with
:math:`k=\frac{\lambda_{min}}{\Lambda_{max}}` and
:math:`\Lambda_{max}=max(1, \lambda_{max})`.
Experimentally, it has been observed that TwIST is robust to the
choice of such parameters. Finally, note that in the case of
:math:`\alpha=1` and :math:`\beta=1`, TwIST is identical to IST.
"""
# define proxf as L2 proximal
proxf = L2(Op=A, b=b)
# find alpha and beta
if alpha is None or beta is None:
if eigs is None:
emin = A.eigs(neigs=1, which="SM")
emax = max([1, A.eigs(neigs=1, which="LM")])
else:
emax, emin = eigs
k = emin / emax
rho = (1 - sqrt(k)) / (1 + sqrt(k))
alpha = 1 + rho**2
beta = 2 * alpha / (emax + emin)
# compute proximal of g on initial guess (x_1)
xold = x0.copy()
x = proxg.prox(xold - proxf.grad(xold), 1.0)
if show:
tstart = time.time()
print(
"TwIST\n"
"---------------------------------------------------------\n"
"Proximal operator (g): %s\n"
"Linear operator (A): %s\n"
"alpha = %10e\tbeta = %10e\tniter = %d\n"
% (type(proxg), type(A), alpha, beta, niter)
)
head = " Itn x[0] f g J = f + g"
print(head)
# iterate
if returncost:
j = np.zeros(niter)
for iiter in range(niter):
# compute new x
xnew = (
(1 - alpha) * xold
+ (alpha - beta) * x
+ beta * proxg.prox(x - proxf.grad(x), 1.0)
)
# save current x as old (x_i -> x_i-1)
xold = x.copy()
# save new x as current (x_i+1 -> x_i)
x = xnew.copy()
# compute cost function
if returncost:
j[iiter] = proxf(x) + proxg(x)
# run callback
if callback is not None:
callback(x)
if show:
if iiter < 10 or niter - iiter < 10 or iiter % (niter // 10) == 0:
pf, pg = proxf(x), proxg(x)
msg = "%6g %12.5e %10.3e %10.3e %10.3e" % (
iiter + 1,
np.real(to_numpy(x[0])),
pf,
pg,
pf + pg,
)
print(msg)
if show:
print("\nTotal time (s) = %.2f" % (time.time() - tstart))
print("---------------------------------------------------------\n")
if returncost:
return x, j
else:
return x
[docs]
def DouglasRachfordSplitting(
proxf: ProxOperator,
proxg: ProxOperator,
x0: NDArray,
tau: float,
eta: float = 1.0,
niter: int = 10,
gfirst: bool = True,
callback: Callable[..., None] | None = None,
callbacky: bool = False,
show: bool = False,
) -> tuple[NDArray, NDArray]:
r"""Douglas-Rachford Splitting
Solves the following minimization problem using Douglas-Rachford Splitting
algorithm:
.. math::
\mathbf{x} = \argmin_\mathbf{x} f(\mathbf{x}) + g(\mathbf{x})
where :math:`f(\mathbf{x})` and :math:`g(\mathbf{x})` are any convex
functions that has known proximal operators.
Parameters
----------
proxf : :obj:`pyproximal.ProxOperator`
Proximal operator of f function
proxg : :obj:`pyproximal.ProxOperator`
Proximal operator of g function
x0 : :obj:`numpy.ndarray`
Initial vector
tau : :obj:`float`
Positive scalar weight
eta : :obj:`float`, optional
Relaxation parameter (must be between 0 and 2, 0 excluded).
niter : :obj:`int`, optional
Number of iterations of iterative scheme
gfirst : :obj:`bool`, optional
Apply Proximal of operator ``g`` first (``True``) or Proximal of
operator ``f`` first (``False``)
callback : :obj:`callable`, optional
Function with signature (``callback(x)``) to call after each iteration
where ``x`` is the current model vector
callbacky : :obj:`bool`, optional
Modify callback signature to (``callback(x, y)``)
when ``callbacky=True``
show : :obj:`bool`, optional
Display iterations log
Returns
-------
x : :obj:`numpy.ndarray`
Inverted model
y : :obj:`numpy.ndarray`
Inverted second model
Notes
-----
The Douglas-Rachford Splitting algorithm can be expressed by the following
recursion [1]_, [2]_, [3]_, [4]_:
.. math::
\mathbf{x}^{k} &= \prox_{\tau g}(\mathbf{y}^k) \\
\mathbf{y}^{k+1} &= \mathbf{y}^{k} +
\eta (\prox_{\tau f}(2 \mathbf{x}^{k} - \mathbf{y}^{k})
- \mathbf{x}^{k})
.. [1] Patrick L. Combettes and Jean-Christophe Pesquet. 2011. Proximal
Splitting Methods in Signal Processing. In Fixed-Point Algorithms for
Inverse Problems in Science and Engineering, Springer, pp. 185-212.
Algorithm 10.15.
https://doi.org/10.1007/978-1-4419-9569-8_10
.. [2] Scott B. Lindstrom and Brailey Sims. 2021. Survey: Sixty Years of
Douglas-Rachford. Journal of the Australian Mathematical Society, 110,
3, 333-370. Eq.(15). https://doi.org/10.1017/S1446788719000570
https://arxiv.org/abs/1809.07181
.. [3] Ryu, E.K., Yin, W., 2022. Large-Scale Convex Optimization: Algorithms
& Analyses via Monotone Operators. Cambridge University Press,
Cambridge. Eq.(2.18). https://doi.org/10.1017/9781009160865
https://large-scale-book.mathopt.com/
.. [4] Combettes, P.L., Pesquet, J.-C., 2008. A proximal decomposition
method for solving convex variational inverse problems. Inverse Problems
24, 065014. Proposition 3.2. https://doi.org/10.1088/0266-5611/24/6/065014
https://arxiv.org/abs/0807.2617
"""
if show:
tstart = time.time()
print(
"Douglas-Rachford Splitting\n"
"---------------------------------------------------------\n"
f"Proximal operator (f): {type(proxf)}\n"
f"Proximal operator (g): {type(proxg)}\n"
f"tau = {tau:10e}\tniter = {niter:d}\n"
)
head = " Itn x[0] f g J = f + g"
print(head)
y = x0.copy()
for iiter in range(niter):
if gfirst:
x = proxg.prox(y, tau)
y = y + eta * (proxf.prox(2 * x - y, tau) - x)
else:
x = proxf.prox(y, tau)
y = y + eta * (proxg.prox(2 * x - y, tau) - x)
# run callback
if callback is not None:
if callbacky:
callback(x, y)
else:
callback(x)
if show:
if iiter < 10 or niter - iiter < 10 or iiter % (niter // 10) == 0:
pf, pg = proxf(x), proxg(x)
print(
f"{iiter + 1:6d} {np.real(to_numpy(x[0])):12.5e} "
f"{pf:10.3e} {pg:10.3e} {pf + pg:10.3e}"
)
if show:
print(f"\nTotal time (s) = {time.time() - tstart:.2f}")
print("---------------------------------------------------------\n")
return x, y
[docs]
def PPXA( # pylint: disable=invalid-name
proxfs: list[ProxOperator],
x0: NDArray | list[NDArray],
tau: float,
eta: float = 1.0,
weights: NDArray | list[float] | None = None,
niter: int = 1000,
tol: float | None = 1e-7,
callback: Callable[..., None] | None = None,
show: bool = False,
) -> NDArray:
r"""Parallel Proximal Algorithm (PPXA)
Solves the following minimization problem using
Parallel Proximal Algorithm (PPXA):
.. math::
\mathbf{x} = \argmin_\mathbf{x} \sum_{i=1}^m f_i(\mathbf{x})
where :math:`f_i(\mathbf{x})` are any convex
functions that has known proximal operators.
Parameters
----------
proxfs : :obj:`list`
A list of proximable functions :math:`f_1, \ldots, f_m`.
x0 : :obj:`numpy.ndarray` or :obj:`list`
Initial vector :math:`\mathbf{x}` for all :math:`f_i` if 1-dimensional array
is provided, or initial vectors :math:`\mathbf{x}_{i}` for each :math:`f_i`
for :math:`i=1,\ldots,m` if a :obj:`list` of 1-dimensional arrays or a 2-dimensional
array of size ``(m, d)`` is provided, where ``d`` is the dimension of :math:`\mathbf{x}_{i}`.
tau : :obj:`float`
Positive scalar weight
eta : :obj:`float`, optional
Relaxation parameter (must be between 0 and 2, 0 excluded).
weights : :obj:`numpy.ndarray` or :obj:`list` or :obj:`None`, optional
Weights :math:`\sum_{i=1}^m w_i = 1, \ 0 < w_i < 1`,
Defaults to None, which means :math:`w_1 = \cdots = w_m = \frac{1}{m}.`
niter : :obj:`int`, optional
Number of iterations of iterative scheme.
tol : :obj:`float`, optional
Tolerance on change of the solution (used as stopping criterion).
If ``tol=0``, run until ``niter`` is reached.
callback : :obj:`callable`, optional
Function with signature (``callback(x)``) to call after each iteration
where ``x`` is the current model vector
show : :obj:`bool`, optional
Display iterations log
Returns
-------
x : :obj:`numpy.ndarray`
Inverted model
See Also
--------
ConsensusADMM: Consensus ADMM
Notes
-----
The Parallel Proximal Algorithm (PPXA) can be expressed by the following
recursion [1]_, [2]_, [3]_, [4]_:
* :math:`\mathbf{y}_{i}^{0} = \mathbf{x}` or :math:`\mathbf{y}_{i}^{0} = \mathbf{x}_{i}` for :math:`i=1,\ldots,m`
* :math:`\mathbf{x}^{0} = \sum_{i=1}^m w_i \mathbf{y}_{i}^{0}`
* for :math:`k = 1, \ldots`
* for :math:`i = 1, \ldots, m`
* :math:`\mathbf{p}_{i}^{k} = \prox_{\frac{\tau}{w_i} f_i} (\mathbf{y}_{i}^{k})`
* :math:`\mathbf{p}^{k} = \sum_{i=1}^{m} w_i \mathbf{p}_{i}^{k}`
* for :math:`i = 1, \ldots, m`
* :math:`\mathbf{y}_{i}^{k+1} = \mathbf{y}_{i}^{k} + \eta (2 \mathbf{p}^{k} - \mathbf{x}^{k} - \mathbf{p}_i^{k})`
* :math:`\mathbf{x}^{k+1} = \mathbf{x}^{k} + \eta (\mathbf{p}^{k} - \mathbf{x}^{k})`
where :math:`0 < \eta < 2` and
:math:`\sum_{i=1}^m w_i = 1, \ 0 < w_i < 1`.
In the current implementation, :math:`w_i = 1 / m` when not provided.
References
----------
.. [1] Combettes, P.L., Pesquet, J.-C., 2008. A proximal decomposition
method for solving convex variational inverse problems. Inverse Problems
24, 065014. Algorithm 3.1. https://doi.org/10.1088/0266-5611/24/6/065014
https://arxiv.org/abs/0807.2617
.. [2] Combettes, P.L., Pesquet, J.-C., 2011. Proximal Splitting Methods in
Signal Processing, in Fixed-Point Algorithms for Inverse Problems in
Science and Engineering, Springer, pp. 185-212. Algorithm 10.27.
https://doi.org/10.1007/978-1-4419-9569-8_10
.. [3] Bauschke, H.H., Combettes, P.L., 2011. Convex Analysis and Monotone
Operator Theory in Hilbert Spaces, 1st ed, CMS Books in Mathematics.
Springer, New York, NY. Proposition 27.8.
https://doi.org/10.1007/978-1-4419-9467-7
.. [4] Ryu, E.K., Yin, W., 2022. Large-Scale Convex Optimization: Algorithms
& Analyses via Monotone Operators. Cambridge University Press,
Cambridge. Exercise 2.38 https://doi.org/10.1017/9781009160865
https://large-scale-book.mathopt.com/
"""
if show:
tstart = time.time()
print(
"Parallel Proximal Algorithm\n"
"---------------------------------------------------------"
)
for i, proxf in enumerate(proxfs):
print(f"Proximal operator (f{i}): {type(proxf)}")
print(f"tau = {tau:10e}\tniter = {niter:d}\n")
head = " Itn x[0] J=sum_i f_i"
print(head)
ncp = get_array_module(x0)
# initialize model
m = len(proxfs)
if weights is None:
w = ncp.full(m, 1.0 / m)
else:
w = ncp.asarray(weights)
if isinstance(x0, list) or x0.ndim == 2:
y = ncp.asarray(x0) # yi_0 = xi_0, for i = 1, ..., m
else:
y = ncp.full((m, x0.size), x0) # y1_0 = y2_0 = ... = ym_0 = x0
x = ncp.mean(y, axis=0)
x_old = x.copy()
# iterate
for iiter in range(niter):
p = ncp.stack([proxfs[i].prox(y[i], tau / w[i]) for i in range(m)])
pn = ncp.sum(w[:, None] * p, axis=0)
y = y + eta * (2 * pn - x - p)
x = x + eta * (pn - x)
# run callback
if callback is not None:
callback(x)
# show iteration logger
if show:
if iiter < 10 or niter - iiter < 10 or iiter % (niter // 10) == 0:
pf = ncp.sum([proxfs[i](x) for i in range(m)])
print(f"{iiter + 1:6d} {ncp.real(to_numpy(x[0])):12.5e} {pf:10.3e}")
# break if tolerance condition is met
if ncp.abs(x - x_old).max() < tol:
break
x_old = x
if show:
print(f"\nTotal time (s) = {time.time() - tstart:.2f}")
print("---------------------------------------------------------\n")
return x
[docs]
def ConsensusADMM( # pylint: disable=invalid-name
proxfs: list[ProxOperator],
x0: NDArray,
tau: float,
niter: int = 1000,
tol: float | None = 1e-7,
callback: Callable[..., None] | None = None,
show: bool = False,
) -> NDArray:
r"""Consensus ADMM
Solves the following global consensus problem using ADMM:
.. math::
\argmin_{\mathbf{x_1}, \mathbf{x_2}, \ldots, \mathbf{x_m}}
\sum_{i=1}^m f_i(\mathbf{x}_i) \quad \text{s.t.}
\quad \mathbf{x_1} = \mathbf{x_2} = \cdots = \mathbf{x_m}
where :math:`f_i(\mathbf{x})` are any convex
functions that has known proximal operators.
Parameters
----------
proxfs : :obj:`list`
A list of proximable functions :math:`f_1, \ldots, f_m`.
x0 : :obj:`numpy.ndarray`
Initial vector
tau : :obj:`float`
Positive scalar weight
niter : :obj:`int`, optional
Number of iterations of iterative scheme.
tol : :obj:`float`, optional
Tolerance on change of the solution (used as stopping criterion).
If ``tol=0``, run until ``niter`` is reached.
callback : :obj:`callable`, optional
Function with signature (``callback(x)``) to call after each iteration
where ``x`` is the current model vector
show : :obj:`bool`, optional
Display iterations log
Returns
-------
x : :obj:`numpy.ndarray`
Inverted model
See Also
--------
ADMM: Alternating Direction Method of Multipliers
PPXA: Parallel Proximal Algorithm
Notes
-----
The ADMM for the consensus problem can be expressed by the following
recursion [1]_, [2]_:
* :math:`\bar{\mathbf{x}}^{0} = \mathbf{x}`
* for :math:`k = 1, \ldots`
* for :math:`i = 1, \ldots, m`
* :math:`\mathbf{x}_i^{k+1} = \mathrm{prox}_{\tau f_i} \left(\bar{\mathbf{x}}^{k} - \mathbf{y}_i^{k}\right)`
* :math:`\bar{\mathbf{x}}^{k+1} = \frac{1}{m} \sum_{i=1}^m \mathbf{x}_i^{k}`
* for :math:`i = 1, \ldots, m`
* :math:`\mathbf{y}_i^{k+1} = \mathbf{y}_i^{k} + \mathbf{x}_i^{k+1} - \bar{\mathbf{x}}^{k+1}`
The current implementation returns :math:`\bar{\mathbf{x}}`.
References
----------
.. [1] Boyd, S., Parikh, N., Chu, E., Peleato, B., Eckstein, J., 2011.
Distributed Optimization and Statistical Learning via the Alternating
Direction Method of Multipliers. Foundations and Trends in Machine Learning,
Vol. 3, No. 1, pp 1-122. Section 7.1. https://doi.org/10.1561/2200000016
https://stanford.edu/~boyd/papers/pdf/admm_distr_stats.pdf
.. [2] Parikh, N., Boyd, S., 2014. Proximal Algorithms. Foundations and
Trends in Optimization, Vol. 1, No. 3, pp 127-239.
Section 5.2.1. https://doi.org/10.1561/2400000003
https://web.stanford.edu/~boyd/papers/pdf/prox_algs.pdf
"""
if show:
tstart = time.time()
print(
"Consensus ADMM\n---------------------------------------------------------"
)
for i, proxf in enumerate(proxfs):
print(f"Proximal operator (f{i}): {type(proxf)}")
print(f"tau = {tau:10e}\tniter = {niter:d}\n")
head = " Itn x[0] J=sum_i f_i"
print(head)
ncp = get_array_module(x0)
# initialize model
m = len(proxfs)
x_bar = x0.copy()
x_bar_old = x0.copy()
y = ncp.zeros((m, x0.size), dtype=x0.dtype)
# iterate
for iiter in range(niter):
x = ncp.stack([proxfs[i].prox(x_bar - y[i], tau) for i in range(m)])
x_bar = ncp.mean(x, axis=0)
y = y + x - x_bar
# run callback
if callback is not None:
callback(x_bar)
# show iteration logger
if show:
if iiter < 10 or niter - iiter < 10 or iiter % (niter // 10) == 0:
pf = ncp.sum([proxfs[i](x_bar) for i in range(m)])
print(
f"{iiter + 1:6d} {ncp.real(to_numpy(x_bar[0])):12.5e} {pf:10.3e}"
)
# break if tolerance condition is met
if ncp.abs(x_bar - x_bar_old).max() < tol:
break
x_bar_old = x_bar
if show:
print(f"\nTotal time (s) = {time.time() - tstart:.2f}")
print("---------------------------------------------------------\n")
return x_bar