# Standard Python modules
from typing import Tuple, Union
# External modules
import numpy as np
from numpy import ndarray
# Local modules
from .pyOpt_MPI import MPI
from .pyOpt_optimization import Optimization
from .pyOpt_types import Dict1DType, Dict2DType
[docs]
class Gradient:
    def __init__(self, optProb: Optimization, sensType: str, sensStep: float = None, sensMode: str = "", comm=None):
        """
        Gradient class for automatically computing gradients with finite
        difference or complex step.
        Parameters
        ----------
        optProb : Optimization instance
            This is the complete description of the optimization problem.
        sensType : str
            - ``FD`` for forward difference
            - ``CD`` for central difference
            - ``FDR`` for forward difference with relative step size
            - ``CDR`` for central difference with relative step size
            - ``CS`` for complex step
        sensStep : float
            Step size to use for differencing
        sensMode : str
            Flag to compute gradients in parallel.
        """
        self.optProb = optProb
        self.sensType = sensType
        self.sensStep: Union[float, complex]
        if sensStep is None:
            if self.sensType in ["fd", "fdr"]:
                self.sensStep = 1e-6
            elif self.sensType in ["cd", "cdr"]:
                self.sensStep = 1e-4
            else:
                self.sensStep = 1e-40j
        else:
            self.sensStep = sensStep
        self.sensMode = sensMode
        self.comm = comm
        # Now we can compute which dvs each process will need to
        # compute:
        ndvs = self.optProb.ndvs
        if self.sensMode == "pgc" and self.comm:
            self.mydvs = list(range(self.comm.rank, ndvs, self.comm.size))
        else:
            self.mydvs = list(range(ndvs))
    def _eval_func(self, x: ndarray) -> Tuple[ndarray, ndarray, bool]:
        """Internal method to call function and extract obj, con"""
        xCall = self.optProb.processXtoDict(x)
        # Call objective
        [funcs, fail] = self.optProb.objFun(xCall)
        # Process constraint in case they are in dict form
        self.optProb.evaluateLinearConstraints(x, funcs)
        fobj = self.optProb.processObjtoVec(funcs, scaled=False)
        if self.sensType == "cs":
            fcon = self.optProb.processContoVec(funcs, scaled=False, dtype="D", natural=True)
        else:
            fcon = self.optProb.processContoVec(funcs, scaled=False, natural=True)
        return fobj, fcon, fail
    def __call__(self, x: Dict1DType, funcs: Dict1DType) -> Tuple[Dict2DType, bool]:
        """
        We need to make this object "look" the same as a user supplied
        function handle. That way, the optimizers need not care how
        the gradients are **actually** calculated.
        Parameters
        ----------
        x : dict
            Optimization variables from optimizer
        funcs : dict
            Dictionary of all the function values
        Returns
        -------
        funcsSens : dict
            Dictionary of sensitivities
        fail : bool
            Flag for failure. It currently always returns False
        """
        # Since this is *very* dumb loop over all the design
        # variables, it is easier to just loop over the x values as an
        # array. Furthermore, since the user **should** have
        # reasonably well scaled variables, the fixed step size should
        # have more meaning.
        # Generate final array sizes for the objective and constraint
        # gradients
        ndvs = self.optProb.ndvs
        ncon = self.optProb.nCon
        gobj = np.zeros(ndvs, "d")
        gcon = np.zeros((ncon, ndvs), "d")
        if self.sensMode == "pgc":
            funcsBase = self.comm.bcast(funcs)
        else:
            funcsBase = funcs
        # We DO NOT want the constraints scaled here....the constraint
        # scaling will be taken into account when the derivatives are
        # processed as per normal.
        xBase = self.optProb.processXtoVec(x)
        self.optProb.evaluateLinearConstraints(xBase, funcsBase)
        fconBase = self.optProb.processContoVec(funcsBase, scaled=False, natural=True)
        fobjBase = self.optProb.processObjtoVec(funcsBase, scaled=False)
        # Convert to complex if necessary:
        if self.sensType == "cs":
            xBase = xBase.astype("D")
        masterFail = False
        for i in self.mydvs:
            xph = xBase.copy()
            if self.sensType in ["fdr", "cdr"]:
                sensStep = max(abs(self.sensStep * xph[i]), self.sensStep)
            else:
                sensStep = self.sensStep
            xph[i] += sensStep
            fobj_ph, fcon_ph, fail = self._eval_func(xph)
            if fail:
                masterFail = True
            # forward difference
            if self.sensType in ["fd", "fdr"]:
                gobj[i] = (fobj_ph - fobjBase) / sensStep
                gcon[:, i] = (fcon_ph - fconBase) / sensStep
            # central difference
            elif self.sensType in ["cd", "cdr"]:
                xmh = xph  # xph isn't used anymore so  just point to same location
                xmh[i] -= 2 * sensStep
                fobj_mh, fcon_mh, fail = self._eval_func(xmh)
                if fail:
                    masterFail = True
                gobj[i] = (fobj_ph - fobj_mh) / (2 * sensStep)
                gcon[:, i] = (fcon_ph - fcon_mh) / (2 * sensStep)
            # complex step
            else:
                gobj[i] = np.imag(fobj_ph) / np.imag(sensStep)
                gcon[:, i] = np.imag(fcon_ph) / np.imag(sensStep)
        if self.sensMode == "pgc":
            # We just mpi_reduce to the root with sum. This uses the
            # efficient numpy versions
            self.comm.Reduce(gobj.copy(), gobj, op=MPI.SUM, root=0)
            self.comm.Reduce(gcon.copy(), gcon, op=MPI.SUM, root=0)
        # Logically reduce (over the comm) if the fail if *ANY*
        # gradient calc failed:
        if self.comm is not None:
            masterFail = self.comm.allreduce(masterFail, op=MPI.LOR)
        # Finally, we have to convert everything **back** to a
        # dictionary so the rest of the code works:
        funcsSens: Dict2DType = {}
        for objKey in self.optProb.objectives:
            funcsSens[objKey] = {}
            for dvGroup in self.optProb.variables:
                ss = self.optProb.dvOffset[dvGroup]
                funcsSens[objKey][dvGroup] = gobj[ss[0] : ss[1]]
        for conKey in self.optProb.constraints:
            con = self.optProb.constraints[conKey]
            funcsSens[conKey] = {}
            for dvGroup in self.optProb.variables:
                ss = self.optProb.dvOffset[dvGroup]
                funcsSens[conKey][dvGroup] = gcon[con.rs : con.re, ss[0] : ss[1]]
        return funcsSens, masterFail