# Standard Python modules
from collections import OrderedDict
import copy
import os
# External modules
import numpy as np
from sqlitedict import SqliteDict
# Local modules
from .pyOpt_error import Error, pyOptSparseWarning
from .pyOpt_utils import EPS
class History:
    def __init__(self, fileName, optProb=None, temp=False, flag="r"):
        """
        This class is essentially a thin wrapper around a SqliteDict dictionary to facilitate
        operations with pyOptSparse

        Parameters
        ----------
        fileName : str
            File name for history file
        optProb : pyOpt_Optimization
            The optimization object
        temp : bool
            Flag to signify that the file should be deleted after it is
            closed
        flag : str
            String specifying the mode. Similar to what was used in shelve.
            ``n`` for a new database and ``r`` to read an existing one.

        Raises
        ------
        FileNotFoundError
            If ``flag == "r"`` and the requested history file does not exist.
        Error
            If ``flag`` is neither ``"r"`` nor ``"n"``.
        """
        self.flag = flag
        if self.flag == "n":
            # If writing, we explicitly remove the file to
            # prevent old keys from "polluting" the new history
            if os.path.exists(fileName):
                os.remove(fileName)
            self.db = SqliteDict(fileName)
            self.optProb = optProb
            # Initialize the cached key list here so that pointExists()
            # (and therefore the first write()) works before any call
            # counter has been stored.
            self.keys = list(self.db.keys())
        elif self.flag == "r":
            if os.path.exists(fileName):
                # we cast the db to OrderedDict so we do not have to
                # manually close the underlying db at the end
                self.db = OrderedDict(SqliteDict(fileName))
            else:
                raise FileNotFoundError(
                    f"The requested history file {fileName} to open in read-only mode does not exist."
                )
            # populate the various *Info/*Names attributes from the file
            self._processDB()
        else:
            raise Error("The flag argument to History must be 'r' or 'n'.")
        self.temp = temp
        self.fileName = fileName
[docs]
def close(self):
"""
Close the underlying database.
This should only be used in write mode. In read mode, we close the db
during initialization.
"""
if self.flag == "n":
self.db.close()
if self.temp:
os.remove(self.fileName)
[docs]
def write(self, callCounter, data):
"""
This is the main to write data. Basically, we just pass in
the callCounter, the integer forming the key, and a dictionary
which will be written to the key
Parameters
----------
callCounter : int
the callCounter to write to
data : dict
the dictionary corresponding to the callCounter
"""
# String key to database on disk
key = str(callCounter)
# if the point exists, we merely update with new data
if self.pointExists(callCounter):
oldData = self.read(callCounter)
oldData.update(data)
self.db[key] = oldData
else:
self.db[key] = data
self.db["last"] = key
self.db.sync()
self.keys = list(self.db.keys())
[docs]
def writeData(self, key, data):
"""
Write arbitrary `key:data` value to db.
Parameters
----------
key : str
The key to be added to the history file
data
The data corresponding to the key. It can be anything as long as it is serializable
in `sqlitedict`.
"""
self.db[key] = data
self.db.commit()
self.keys = list(self.db.keys())
[docs]
def pointExists(self, callCounter):
"""
Determine if callCounter is in the database
Parameters
----------
callCounter : int or str of int
Returns
-------
bool
True if the callCounter exists in the history file.
False otherwise.
"""
if isinstance(callCounter, int):
callCounter = str(callCounter)
return callCounter in self.keys
[docs]
def read(self, key):
"""
Read data for an arbitrary key. Returns None if key is not found.
If passing in a callCounter, it should be verified by calling pointExists() first.
Parameters
----------
key : str or int
generic key[str] or callCounter[int]
Returns
-------
dict
The value corresponding to `key` is returned.
If the key is not found, `None` is returned instead.
"""
if isinstance(key, int):
key = str(key)
try:
return self.db[key]
except KeyError:
return None
def _searchCallCounter(self, x):
"""
Searches through existing callCounters, and finds the one corresponding
to an evaluation at the design vector `x`.
returns `None` if the point did not match previous evaluations
Parameters
----------
x : ndarray
The unscaled DV as a single array.
Returns
-------
int
The callCounter corresponding to the DV `x`.
`None` is returned if no match was found.
Notes
-----
The tolerance used for this is the value `numpy.finfo(numpy.float64).eps`.
"""
last = int(self.db["last"])
callCounter = None
for i in range(last, 0, -1):
key = str(i)
xuser = self.optProb.processXtoVec(self.db[key]["xuser"])
if np.isclose(xuser, x, atol=EPS, rtol=EPS).all() and "funcs" in self.db[key].keys():
callCounter = i
break
return callCounter
    def _processDB(self):
        """
        Pre-processes the DB file and store various values into class attributes.

        These will be used later when calling self.getXX functions.
        Sets: ``keys``, ``DVInfo``, ``conInfo``, ``objInfo``, ``metadata``,
        ``optProb``, ``DVNames``, ``conNames``, ``objNames``, ``callCounters``,
        ``iterKeys`` and ``extraFuncsNames``.
        """
        # Load any keys it happens to have:
        self.keys = list(self.db.keys())
        # load info
        self.DVInfo = self.read("varInfo")
        self.conInfo = self.read("conInfo")
        self.objInfo = self.read("objInfo")
        # metadata
        self.metadata = self.read("metadata")
        self.optProb = self.read("optProb")
        # load names (these getters require the *Info attributes set above)
        self.DVNames = set(self.getDVNames())
        self.conNames = set(self.getConNames())
        self.objNames = set(self.getObjNames())
        # extract list of callCounters from self.keys
        # this just checks if each key contains only digits, then cast into int
        self.callCounters = sorted([x for x in self.keys if x.isdigit()], key=float)
        # extract all information stored in the call counters
        self.iterKeys = set()
        self.extraFuncsNames = set()
        for i in self.callCounters:
            val = self.read(i)
            self.iterKeys.update(val.keys())
            # "funcs" only exists for function-evaluation counters
            if "funcs" in val.keys():
                self.extraFuncsNames.update(val["funcs"].keys())
        # remove objective and constraint keys
        self.extraFuncsNames = self.extraFuncsNames.difference(self.conNames).difference(self.objNames)
        # NOTE(review): local import — presumably to avoid a circular import
        # at module load time; confirm before moving to the top of the file.
        from .__init__ import __version__  # isort: skip

        if self.metadata["version"] != __version__:
            pyOptSparseWarning(
                "The version of pyoptsparse used to generate the history file does not match the one being run right now. There may be compatibility issues."
            )
[docs]
def getIterKeys(self):
"""
Returns the keys available at each optimization iteration.
This function is useful for inspecting the history file, to determine
what information is saved at each iteration.
Returns
-------
list of str
A list containing the names of keys stored at each optimization iteration.
"""
return copy.deepcopy(list(self.iterKeys))
[docs]
def getDVNames(self):
"""
Returns the names of the DVs.
Returns
-------
list of str
A list containing the names of DVs.
"""
# only do this if we open the file with 'r' flag
if self.flag != "r":
return
return copy.deepcopy(list(self.DVInfo.keys()))
[docs]
def getConNames(self):
"""
Returns the names of constraints.
Returns
-------
list of str
A list containing the names of constraints.
"""
# only do this if we open the file with 'r' flag
if self.flag != "r":
return
# we remove linear constraints
conNames = [con for con in self.conInfo.keys() if not self.optProb.constraints[con].linear]
return copy.deepcopy(conNames)
[docs]
def getObjNames(self):
"""
Returns the names of the objectives.
Returns
-------
list of str
A list containing the names of objectives.
Notes
-----
Recall that for the sake of generality, pyOptSparse allows for multiple objectives to be
added. This feature is not used currently, but does make `ObjNames` follow the same structure
as `ConNames` and `DVNames`.
"""
# only do this if we open the file with 'r' flag
if self.flag != "r":
return
return copy.deepcopy(list(self.objInfo.keys()))
[docs]
def getObjInfo(self, key=None):
"""
Returns the `ObjInfo`, for all keys by default. A `key` parameter can also
be supplied, to retrieve `ObjInfo` corresponding to specific keys.
Parameters
----------
key : str or list of str, optional
Specifies for which obj to extract `ObjInfo`.
Returns
-------
dict
A dictionary containing ObjInfo. For a single key, the return is one level deeper.
Notes
-----
Recall that for the sake of generality, pyOptSparse allows for multiple objectives to be
added. This feature is not used currently, but does make `ObjInfo` follow the same structure
as `ConInfo` and `DVInfo`.
Because of this, it is recommended that this function be accessed using the optional `key`
argument.
"""
# only do this if we open the file with 'r' flag
if self.flag != "r":
return
if key is not None:
if isinstance(key, str):
return copy.deepcopy(self.objInfo[key])
elif isinstance(key, list):
d = OrderedDict()
for k in key:
d[k] = self.objInfo[k]
return d
else:
return copy.deepcopy(self.objInfo)
[docs]
def getConInfo(self, key=None):
"""
Returns the `ConInfo`, for all keys by default. A `key` parameter can also
be supplied, to retrieve `ConInfo` corresponding to specific constraints.
Parameters
----------
key : str or list of str, optional
Specifies for which constraint to extract `ConInfo`.
Returns
-------
dict
A dictionary containing ConInfo. For a single key, the return is one level deeper.
"""
# only do this if we open the file with 'r' flag
if self.flag != "r":
return
if key is not None:
if isinstance(key, str):
return copy.deepcopy(self.conInfo[key])
elif isinstance(key, list):
d = OrderedDict()
for k in key:
d[k] = self.conInfo[k]
return d
else:
return copy.deepcopy(self.conInfo)
[docs]
def getDVInfo(self, key=None):
"""
Returns the `DVInfo`, for all keys by default. A `key` parameter can also
be supplied, to retrieve `DVInfo` corresponding to specific DVs.
Parameters
----------
key : str or list of str, optional
Specifies for which DV to extract `DVInfo`.
Returns
-------
dict
A dictionary containing DVInfo. For a single key, the return is one level deeper.
"""
# only do this if we open the file with 'r' flag
if self.flag != "r":
return
if key is not None:
if isinstance(key, str):
return copy.deepcopy(self.DVInfo[key])
elif isinstance(key, list):
d = OrderedDict()
for k in key:
d[k] = self.DVInfo[k]
return d
else:
return copy.deepcopy(self.DVInfo)
[docs]
def getMetadata(self):
"""
Returns a copy of the metadata stored in the history file.
Returns
-------
dict
A dictionary containing the metadata.
"""
# only do this if we open the file with 'r' flag
if self.flag != "r":
return
return copy.deepcopy(self.metadata)
[docs]
def getOptProb(self):
"""
Returns a copy of the optProb associated with the optimization.
Returns
-------
optProb : pyOpt_optimization object
The optProb associated with the optimization. This is taken from the history file,
and therefore has the ``comm`` and ``objFun`` fields removed.
"""
# only do this if we open the file with 'r' flag
if self.flag != "r":
return
return copy.deepcopy(self.optProb)
    def _processIterDict(self, d, scale=False):
        """
        This function scales the value, where the factor is extracted from the
        `Info` dictionaries, according to "name"

        Parameters
        ----------
        d : dictionary
            The iteration dictionary, i.e. hist['0']
            This must be a function evaluation callCounter, and not a gradient callCounter.
        scale : bool
            Whether the returned values should be scaled.

        Returns
        -------
        conDict : dict
            A dictionary containing constraint values
        objDict : dict
            A dictionary containing objective values
        DVDict : dict
            A dictionary containing DV values
            These are all "flat" dictionaries, with simple key:value pairs.

        Notes
        -----
        NOTE(review): as a side effect, linear constraints are permanently
        popped from ``self.optProb.constraints`` the first time this runs,
        so subsequent calls no longer see them.
        """
        conDict = {}
        objDict = {}
        # these require funcs which may not always be there
        if "funcs" in d.keys():
            # iterate over a snapshot of the keys since we may pop entries below
            for con in list(self.optProb.constraints.keys()):
                # linear constraints are not stored in funcs
                if not self.optProb.constraints[con].linear:
                    conDict[con] = d["funcs"][con]
                else:
                    # the linear constraints are removed from optProb so that scaling works
                    # without needing the linear constraints to be present
                    self.optProb.constraints.pop(con)
            for obj in self.objNames:
                objDict[obj] = d["funcs"][obj]
        # the DVs will always be there
        DVDict = {}
        for DV in self.DVNames:
            DVDict[DV] = d["xuser"][DV]
        if scale:
            # map user-space values into optimizer (scaled) space
            conDict = self.optProb._mapContoOpt_Dict(conDict)
            objDict = self.optProb._mapObjtoOpt_Dict(objDict)
            DVDict = self.optProb._mapXtoOpt_Dict(DVDict)
        return conDict, objDict, DVDict
[docs]
def getCallCounters(self):
"""
Returns a list of all call counters stored in the history file.
Returns
-------
list
a list of strings, each entry being a call counter.
"""
# only do this if we open the file with 'r' flag
if self.flag != "r":
return
return copy.deepcopy(self.callCounters)
[docs]
def getValues(self, names=None, callCounters=None, major=True, scale=False, stack=False, allowSens=False):
"""
Parses an existing history file and returns a data dictionary used to post-process optimization results, containing the requested optimization iteration history.
Parameters
----------
names : list or str
the values of interest, can be the name of any DV, objective or constraint,
or a list of them. If None, all values are returned. This includes the DVs,
funcs, and any values stored by the optimizer.
callCounters : list of ints, can also contain 'last'
a list of callCounters to extract information from.
If the callCounter is invalid, i.e. outside the range or is a funcsSens evaluation, then it is skipped.
'last' represents the last major iteration.
If None, values from all callCounters are returned.
major : bool
flag to specify whether to include only major iterations.
scale : bool
flag to specify whether to apply scaling for the values. True means
to return values that are scaled the same way as the actual optimization.
stack : bool
flag to specify whether the DV should be stacked into a single numpy array with
the key `xuser`, or retain their separate DVGroups.
allowSens: bool
flag to specify whether gradient evaluation iterations are allowed.
If true, it is up to the user to ensure that the callCounters specified contain the information requested.
Returns
-------
dict
a dictionary containing the information requested. The keys of the dictionary
correspond to the `names` requested. Each value is a numpy array with the first
dimension equal to the number of callCounters requested.
Notes
-----
Regardless of the major flag, failed function evaluations are not returned.
Examples
--------
First we can request DV history over all major iterations:
>>> hist.getValues(names='xvars', major=True)
{'xvars': array([[-2.00000000e+00, 1.00000000e+00],
[-1.00000000e+00, 9.00000000e-01],
[-5.00305827e-17, 4.21052632e-01],
[ 1.73666171e-06, 4.21049838e-01],
[ 9.08477459e-06, 4.21045261e-01],
[ 5.00000000e-01, 2.84786405e-01],
[ 5.00000000e-01, 5.57279939e-01],
[ 5.00000000e-01, 2.00000000e+00]])}
Next we can look at DV and optimality for the first and last iteration only:
>>> hist.getValues(names=['xvars','optimality'],callCounters=[0,'last'])
{'optimality': array([1.27895528, 0. ]),
'xvars': array([[-2. , 1. ],
[ 0.5, 2. ]])}
"""
# only do this if we open the file with 'r' flag
if self.flag != "r":
return
allNames = (
self.DVNames.union(self.conNames)
.union(self.objNames)
.union(self.iterKeys)
.union(self.extraFuncsNames)
.difference({"funcs", "funcsSens", "xuser"})
)
# cast string input into a single list
if isinstance(names, str):
names = {names}
elif names is None:
names = allNames
else:
names = set(names)
if stack:
allNames.add("xuser")
# error if names isn't either a DV, con or obj
if not names.issubset(allNames):
raise Error(
"The names provided are not one of DVNames, conNames or objNames.\n"
+ f"The names must be a subset of {allNames}"
)
DVsAsFuncs = self.DVNames.intersection(self.conNames)
if len(DVsAsFuncs) > 0:
ambiguousNames = names.intersection(DVsAsFuncs)
if len(ambiguousNames) > 0:
pyOptSparseWarning(
f"The names provided {ambiguousNames} is ambiguous, since it is both a DV as well as an objective/constraint. "
+ "It is being assumed to be a DV. If it was set up via addDVsAsFunctions, then there's nothing to worry. "
+ "Otherwise, consider renaming the variable or manually editing the history file."
)
if len(names.intersection(self.iterKeys)) > 0:
if not major:
pyOptSparseWarning(
"The major flag has been set to True, since some names specified only exist on major iterations."
)
major = True
if stack:
DVinNames = names.intersection(self.DVNames)
for DV in DVinNames:
names.remove(DV)
names.add("xuser")
pyOptSparseWarning(
"The stack flag was set to True. Therefore all DV names have been removed, and replaced with a single key 'xuser'."
)
# set up dictionary to return
data = {}
# pre-allocate list for each input
for name in names:
data[name] = []
# this flag is used for error printing only
user_specified_callCounter = False
if callCounters is not None:
user_specified_callCounter = True
if isinstance(callCounters, str):
callCounters = [callCounters]
else:
callCounters = self.callCounters
# parse the 'last' callCounter
if "last" in callCounters:
callCounters.append(self.read("last"))
callCounters.remove("last")
self._previousIterCounter = -1
# loop over call counters, check if each counter is valid, and parse
for i in callCounters:
val = self._readValidCallCounter(i, user_specified_callCounter, allowSens, major)
if val is not None: # if i is valid
conDict, objDict, DVDict = self._processIterDict(val, scale=scale)
for name in names:
if name == "xuser":
data[name].append(self.optProb.processXtoVec(DVDict))
elif name in self.DVNames:
data[name].append(DVDict[name])
elif name in self.conNames:
data[name].append(conDict[name])
elif name in self.objNames:
data[name].append(objDict[name])
elif name in self.extraFuncsNames:
data[name].append(val["funcs"][name])
else: # must be opt
data[name].append(val[name])
# reshape lists into numpy arrays
for name in names:
# we just stack along axis 0
if len(data[name]) > 0:
data[name] = np.stack(data[name], axis=0)
else:
data[name] = np.array(data[name])
# we cast 1D arrays to 2D, for scalar values
if data[name].ndim == 1:
data[name] = np.expand_dims(data[name], 1)
# Raise warning for IPOPT's duplicated history
if self.db["metadata"]["optimizer"] == "IPOPT" and "iter" not in self.db["0"].keys():
pyOptSparseWarning(
"The optimization history of IPOPT has duplicated entries at every iteration. "
+ "Fix the history manually, or re-run the optimization with a current version of pyOptSparse to generate a correct history file. "
)
return data
    def _readValidCallCounter(self, i, user_specified_callCounter, allowSens, major):
        """
        Checks whether a call counter is valid and read the data. The call counter is valid when it is

        1) inside the range of the history data,
        2) a function evaluation (i.e. not a sensitivity evaluation, except when `allowSens = True`),
        3) not a duplicated entry,
        4) not a failed function evaluation,
        5) a major iteration (only when `major = True`).

        Parameters
        ----------
        i : int
            call counter.
        user_specified_callCounter : bool
            flag to specify whether the call counter `i` is requested by a user or not.
        allowSens: bool
            flag to specify whether gradient evaluation iterations are allowed.
        major : bool
            flag to specify whether to include only major iterations.

        Returns
        -------
        val : dict or None
            information corresponding to the call counter `i`.
            If the call counter is not valid, `None` is returned instead.

        Notes
        -----
        Updates ``self._previousIterCounter`` as a side effect when the entry
        records an "iter" value; callers must reset it before iterating.
        """
        if not self.pointExists(i):
            if user_specified_callCounter:
                # user specified a non-existent call counter
                pyOptSparseWarning(f"callCounter {i} was not found and is skipped!")
            return None
        else:
            val = self.read(i)
            # check if the callCounter is of a function call
            if not ("funcs" in val.keys() or allowSens):
                if user_specified_callCounter:
                    # user unintentionally specified a call counter for sensitivity
                    pyOptSparseWarning(
                        f"callCounter {i} did not contain a function evaluation and is skipped! "
                        + "Was it a gradient evaluation step?"
                    )
                return None
            else:
                # exclude the duplicated history (only when we have "iter" recorded)
                if "iter" in val.keys():
                    duplicate_flag = val["iter"] == self._previousIterCounter
                    self._previousIterCounter = val["iter"]  # update iterCounter for next i
                    # duplicates are kept when the user explicitly asked for this counter
                    if duplicate_flag and not user_specified_callCounter:
                        # this is a duplicate
                        return None
                # end if "iter" in val.keys()
                # check major/minor iteration, and if the call failed
                if ((major and val["isMajor"]) or not major) and not val["fail"]:
                    return val
                else:
                    return None
                # end if - ("funcs" in val.keys()
        # end if - pointExists
def __del__(self):
try:
self.db.close()
if self.temp:
os.remove(self.fileName)
except Exception:
pass