#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# File : med.py
# License: GNU v3.0
# Author : Andrei Leonard Nicusan <a.l.nicusan@bham.ac.uk>
# Date : 29.03.2022
import os
import re
import sys
import contextlib
import pickle
import inspect
import textwrap
import warnings
import subprocess
import time
from datetime import datetime
import numpy as np
import pandas as pd
import cma
from fvgp.gp import GP
from scipy.stats.qmc import discrepancy, LatinHypercube
import toml
from tqdm import tqdm
from coexist import schedulers
from coexist.code_trees import code_contains_variable
from coexist.code_trees import code_substitute_variable
from coexist.plots import format_fig
from coexist.utilities import autorepr, SignalHandlerKI
import plotly.express as px
import plotly.graph_objs as go
from plotly.subplots import make_subplots
import medeq
from .__version__ import __version__
from .utilities import str_summary
# Optional imports
try:
from pysr import PySRRegressor
except ImportError:
class PySRNotInstalled:
pass
PySRRegressor = PySRNotInstalled
signal_handler = SignalHandlerKI()
def validate_parameters(parameters):
'''Validate the free parameters provided or extracted from a user script (a
``pandas.DataFrame``).
'''
if not isinstance(parameters, pd.DataFrame):
raise ValueError(textwrap.fill((
"The `parameters` given is not a `pandas.DataFrame` instance (or "
f"subclass thereof). Received `{type(parameters)}`."
)))
columns_needed = ["min", "max", "value"]
if not all(c in parameters.columns for c in columns_needed):
raise ValueError(textwrap.fill((
"The `parameters` DataFrame provided must have at least three "
"columns defined: ['min', 'max', 'value']. Found these: "
f"`{parameters.columns}`. You can use the "
"`medeq.create_parameters` function as a helper."
)))
if not np.all(parameters["min"] < parameters["max"]):
raise ValueError(textwrap.fill((
"The `parameters` DataFrame must have all values in the column "
"'min' strictly smaller than in the column 'max'."
)))
def upscale(x, lo, hi):
'''Scale the input values `x` from [0, 1) up to [lo, hi).
'''
return lo + (hi - lo) * x
def downscale(x, lo, hi):
'''Scale the input values `x` from [lo, hi) down to [0, 1).
'''
return (x - lo) / (hi - lo)
[docs]def sampler(f):
'''Decorator making a user-defined function a MED sampler.
A MED sampler is simply an object defining the method ``.sample(n, med)``,
where ``med`` is a complete MED instance (for e.g. using historical
sampling information) and ``n`` is the number of samples between [0, 1) to
return.
This decorator simply attaches a method ``.sample`` to the given function
that forwards the function call to the function itself.
Examples
--------
.. code-block:: python
import medeq
import numpy as np
@medeq.sampler
def user_sampler(n, med = None):
num_parameters = len(med.parameters)
return np.random.random((n, num_parameters))
'''
f.sample = f.__call__
return f
[docs]class DVASampler:
'''Parameter sampler that targets the most uncertain regions while
maximising the area covered.
MED samplers return values between [0, 1), which can then be upscaled to
individual parameter ranges.
Parameters
----------
d : int
Sampling dimensionality - i.e. number of parameters.
seed : int, optional
Seed for deterministic random sampling.
Examples
--------
Simple, seeded sample generation:
.. code-block::
import medeq
sampler = medeq.DVASampler(3, seed=123)
sampler.sample(5, None)
If you have a ``MED`` object, you can pass it as a second parameter to the
`sample` method to specifically target high-uncertainty regions:
.. code-block::
import medeq
parameters = medeq.create_parameters(...)
med = medeq.MED(parameters, ...)
sampler = medeq.DVASsampler(3)
sampler.sample(5, med)
'''
[docs] def __init__(self, d, seed = None):
self.d = int(d)
self.seed = seed
[docs] def sample(self, n, med):
# Save the MED instance as an attribute to be accessed in `.cost`
if med is None:
class MEDNotGiven:
gp = None
med = MEDNotGiven()
elif not isinstance(med, MED):
raise TypeError(textwrap.fill((
"The input `med` argument may be either `None` or an instance "
f"of `medeq.MED`. Received `{type(med)}`."
)))
# Draw starting guesses from well-spread out Latin Hypercube
sampler0 = LatinHypercube(self.d, seed = self.seed)
x0 = sampler0.random(n).flatten()
bounds = [np.zeros(n * self.d), np.ones(n * self.d)]
es = cma.CMAEvolutionStrategy(x0, 0.4, cma.CMAOptions(
bounds = bounds,
seed = self.seed,
# maxfevals = 100_000,
verbose = -9,
))
# es.logger = cma.CMADataLogger(os.path.join(folder, "cache", ""))
try:
with tqdm() as pbar:
iterations = 0
nit = 100
while not es.stop():
es.optimize(self.cost, iterations = nit, args = (med,))
iterations += nit
pbar.update(nit)
pbar.set_description((
"DVASampler | "
f"Fitness: {es.result.fbest:4.4e} | "
f"Convergence: {es.sigma:4.4e} : "
))
except KeyboardInterrupt:
line = "*" * 80 + "\n"
print((
f"{line}Caught Ctrl-C; stopping optimisation and collecting "
f"current best results.\n{line}"
), file = sys.stderr)
# es.optimize(self.cost)
return es.result.xbest.reshape(-1, self.d)
[docs] def cost(self, x, med):
x = x.reshape(-1, self.d)
if med.gp is None:
uncertainty = [1]
else:
# Evaluate mean uncertainty for each GP
uncertainty = np.ones((len(med.gp)))
for i in range(len(med.gp)):
uncertainty[i] = med.gp[i].posterior_covariance(
x, variance_only = True
)["v(x)"].mean()
# Take previous samples into consideration for discrepancy
prev = downscale(
med.evaluated,
med.parameters["min"].to_numpy(),
med.parameters["max"].to_numpy(),
)
x = np.vstack((prev, x))
# Remove samples outside given parameters range
x = x[(x < 1).all(axis = 1) & (x >= 0).all(axis = 1)]
return discrepancy(x) / np.prod(uncertainty)
def __repr__(self):
name = self.__class__.__name__
return f"{name}(d={self.d}, seed={self.seed})"
[docs]class RandomSampler:
'''Parameter sampler using uniform random distributions.
MED samplers return values between [0, 1), which can then be upscaled to
individual parameter ranges.
Parameters
----------
d : int
Sampling dimensionality - i.e. number of parameters.
seed : int, optional
Seed for deterministic random sampling.
Examples
--------
Simple, seeded sample generation:
.. code-block::
import medeq
sampler = medeq.RandomSampler(3, seed=123)
sampler.sample(5, None)
'''
[docs] def __init__(self, d, seed = None):
self.d = int(d)
self.seed = seed
self.rng = np.random.default_rng(self.seed)
[docs] def sample(self, n, med = None):
return self.rng.random((n, self.d))
def __repr__(self):
name = self.__class__.__name__
return f"{name}(d={self.d}, seed={self.seed})"
[docs]class LatticeSampler:
'''Parameter sampler following a regular lattice - this also corresponds
to a full factorial design of experiments.
MED samplers return values between [0, 1), which can then be upscaled to
individual parameter ranges.
Parameters
----------
d : int
Sampling dimensionality - i.e. number of parameters.
seed : int, optional
Seed for deterministic random sampling.
Examples
--------
Simple, seeded sample generation:
.. code-block::
import medeq
sampler = medeq.RandomSampler(3, seed=123)
sampler.sample(5, None)
'''
[docs] def __init__(self, d, seed = None):
self.d = int(d)
self.seed = seed
self.rng = np.random.default_rng(self.seed)
[docs] def sample(self, n, med = None):
# Generate regular lattice with the closest number of points to `n`
nd = int(np.ceil(n ** (1 / self.d)))
lattice = np.meshgrid(*([np.linspace(0, 0.999999, nd)] * self.d))
lattice = np.vstack([lat.flatten() for lat in lattice]).T
# Remove some random points to get down to exactly `n` samples
remove = self.rng.integers(0, len(lattice), size = len(lattice) - n)
mask = np.full(len(lattice), True)
mask[remove] = False
return lattice[mask]
def __repr__(self):
name = self.__class__.__name__
return f"{name}(d={self.d}, seed={self.seed})"
[docs]@autorepr
class MEDPaths:
'''Structure handling IO and storing all paths relevant for MED.
Attributes
----------
directory : str
Path to the MED directory, e.g. ``med_seed123``.
'''
[docs] def __init__(self, directory):
# Root directory / prefix
self.directory = directory
# All samples requested and responses found / given, then queued ones
self.results = os.path.join(self.directory, "results.csv")
self.queue = os.path.join(self.directory, "queue.csv")
self.setup = os.path.join(self.directory, "setup.toml")
self.sampler = os.path.join(self.directory, "sampler.pickle")
self.script = os.path.join(self.directory, "med_script.py")
self.results_dir = os.path.join(self.directory, "results")
self.outputs = os.path.join(self.directory, "outputs")
[docs] def update_paths(self, prefix):
'''Translate all paths saved in this class relative to a new `prefix`
(which will replace the `directory` attribute).
Please ensure that the `prefix` directory contains MED-created files.
'''
self.directory = prefix
for attr in ["results", "queue", "setup", "sampler", "script",
"results_dir", "outputs"]:
prev = getattr(self, attr)
if prev is not None:
new_path = os.path.join(prefix, os.path.split(prev)[1])
setattr(self, attr, new_path)
[docs]class MED:
'''Autonomously explore system responses and discover underlying physical
laws or correlations.
Exploring systems responses can be done in one of two ways:
1. Locally / manually: running experiments / simulations, then feeding
results back to MED.
2. Massively parallel: for complex simulations that can be launched in
Python, MED can automatically change simulation parameters and run them
in parallel on OS processes (locally) or SLURM jobs (distributed
clusters).
A typical local workflow is:
1. Define free parameters to explore as a ``pd.DataFrame`` - you can use
the ``medeq.create_parameters`` function for this.
>>> import medeq
>>> parameters = medeq.create_parameters(
>>> ["A", "B"],
>>> minimums = [-5., -5.],
>>> maximums = [10., 10.],
>>> )
>>> print(parameters)
value min max
A 2.5 -5.0 10.0
B 2.5 -5.0 10.0
2. Create a ``medeq.MED`` object and generate samples (i.e. parameter
combinations) to evaluate - the default sampler covers the parameter
space as efficiently as possible, taking previous results into account;
use the ``MED.sample(n)`` method to get ``n`` samples to try.
>>> med = medeq.MED(parameters)
>>> print(med)
MED(seed=42)
---------------------------------------
parameters =
value min max
A 2.5 -5.0 10.0
B 2.5 -5.0 10.0
response_names =
None
---------------------------------------
sampler = DVASampler(d=2, seed=42)
samples = np.ndarray[(0, 2), float64]
responses = NoneType
epochs = list[0, tuple[int, int]]
>>> med.sample(5)
array([[-3.33602115, -0.45639296],
[ 5.55496225, 5.554965 ],
[ 2.72771903, -3.48852585],
[-0.45639308, 8.33602069],
[ 8.48852568, 2.27228172]])
3. For a local / offline workflow, these samples can be evaluated in one of
two ways:
- Evaluate samples manually, offline - i.e. run experiments,
simulations, etc. and feed them back to MED.
- Let MED evaluate a simple Python function / model.
>>> # Evaluate samples manually - run experiments, simulations, etc.
>>> to_evaluate = med.queue
>>> responses = [1, 2, 3, 4, 5]
>>> med.evaluate(responses)
>>>
>>> # Or evaluate simple Python function / model
>>> def instrument(sample):
>>> return sample[0] + sample[1]
>>>
>>> med.evaluate(instrument)
>>> med.results
A B variance response
0 -3.336021 -0.456393 0.037924 -3.792414
1 5.554962 5.554965 0.111099 11.109927
2 2.727719 -3.488526 0.007608 -0.760807
3 -0.456393 8.336021 0.078796 7.879628
4 8.488526 2.272282 0.107608 10.760807
For a massively parallel workflow, e.g. using a complex simulation, all
you need is a standalone Python script that:
- Defines its free parameters between two "# MED PARAMETERS START / END"
directives.
- Runs the simulation in _any_ way - define simulation inline, launch it
on a supercomputer and collect results, etc.
- Defines a variable "response" for the simulated output of interest -
either as a single number or a list of numbers (multi-response).
Here is a simple example of a MED script:
::
# In file `simulation_script.py`
# MED PARAMETERS START
import medeq
parameters = medeq.create_parameters(
["A", "B", "C"],
[-5., -5., -5.],
[10., 10., 10.],
)
# MED PARAMETERS END
# Run simulation in any way, locally, on a supercomputer and collect
# results - then define the variable `response` (float or list[float])
values = parameters["value"]
response = values["A"]**2 + values["B"]**2
If you have previous, separate experimental data, you can ``MED.augment``
the dataset of responses:
>>> # Augment dataset of responses with historical data
>>> samples = [
>>> [1, 1],
>>> [2, 2],
>>> [1, 2],
>>> ]
>>>
>>> responses = [1, 2, 3]
>>> med.augment(samples, responses)
To discover the underlying equation, you need to install Julia (a
beautiful, high-performance programming language) on your system and the
PySR library:
1. Install Julia manually (see https://julialang.org/downloads/).
2. ``pip install pysr``
3. ``python -c 'import pysr; pysr.install()'``
And now discover underlying equations!
>>> med.discover(binary_operators = ["+", "*"])
Hall of Fame:
-----------------------------------------
Complexity Loss Score Equation
1 2.412e+01 5.296e-01 B
3 0.000e+00 1.151e+01 (A + B)
Attributes
----------
parameters : pd.DataFrame
A
response_names : list[str] or None
A
sampler : object
Any Python object defining a method ``.sample(n, med)`` returning ``n``
samples to evaluate.
scheduler : coexist.schedulers.Scheduler subclass or None
An object implementing the ``coexist.schedulers.Scheduler`` interface,
defining a method for scheduling function evaluations in a massively
parallel context. Only relevant if ``parameters`` is given as a user
script.
samples : (M, P) np.ndarray
The parameter samples generated.
responses : (N, K) np.ndarray or None
The responses evaluated.
response_names : list[str] or None
The response names, if given.
epochs : list[tuple[int, int]]
The sample generation-evaluation batches, as indices ranges within
`samples` and `responses`.
seed : int
The random seed used for deterministic results.
verbose : int
Verbosity level, between 0 and 5.
queue : np.ndarray
[Generated] The queue of unevaluated samples.
evaluated : (N, P) np.ndarray
[Generated] The evaluated samples.
results : pd.DataFrame
[Generated] Neat DataFrame of samples tried and corresponding results.
variances : (N, K) np.ndarray
[Generated] The uncertainties in the results found.
gp : list[fvgp.gp.GP] or None
[Internal] List of Gaussian Process objects for each response.
sr : pysr.PySRRegressor or None
[Internal] PySR symbolic regressor for equation discovery.
paths : medeq.med.MEDPaths or None
[Internal] Structure containing paths for saving the MED object.
'''
[docs] def __init__(
self,
parameters,
response_names = None,
sampler = DVASampler,
scheduler = schedulers.LocalScheduler(),
seed = None,
verbose = 3,
):
# Type-checking
if isinstance(parameters, pd.DataFrame):
# DataFrame given directly; experiments will be run offline
validate_parameters(parameters)
self.parameters = parameters
self.scheduler = None
self.script = None
else:
# User-script was given; extract parameters and set scheduler
self._generate_script(parameters, scheduler)
if response_names is not None:
if isinstance(response_names, str):
response_names = [response_names]
else:
response_names = [str(resp) for resp in response_names]
self.response_names = response_names
if seed is None:
# Generate random 3-digit seed
self.seed = np.random.randint(100, 1000)
else:
self.seed = int(seed)
if inspect.isclass(sampler):
# Construct given sampler class with the number of dimensions
self.sampler = sampler(len(self.parameters), seed = self.seed)
else:
# The given sampler is already constructed
self.sampler = sampler
if not hasattr(sampler, "sample"):
raise TypeError(textwrap.fill((
"The input `sampler` must define a method `.sample(n)` to "
"draw `n` samples. If you use a custom object or function "
"(and not a class) you can use the `@sampler` decorator to "
"automatically add it; check the `medeq` docs for more."
)))
self.verbose = int(verbose)
# Setting inner attributes
self.samples = np.empty((0, len(self.parameters)))
self.responses = None
self.epochs = []
self.gp = None
self.sr = None
self.paths = None
@property
def queue(self):
# Samples that were not evaluated yet
ran = 0 if self.responses is None else len(self.responses)
return self.samples[ran:]
@property
def evaluated(self):
# Samples that already have computed responses
ran = 0 if self.responses is None else len(self.responses)
return self.samples[:ran]
@property
def variances(self):
if self.responses is None:
# If no responses were evaluated and no response names were given,
# assume a single response
if self.response_names is None:
nresp = 1
else:
nresp = len(self.response_names)
else:
nresp = self.responses.shape[1]
if self.gp is None:
return np.empty((0, nresp))
return np.vstack([gp.variances for gp in self.gp]).T
@property
def results(self):
# Extract current responses if any were found
if self.responses is None:
# If no responses were evaluated and no response names were given,
# assume a single response
if self.response_names is None:
nresp = 1
else:
nresp = len(self.response_names)
responses = np.empty((0, nresp))
else:
responses = self.responses
nresp = responses.shape[1]
# Extract variances
variances = self.variances
# Extract column names and ensure correct number thereof
if nresp == 1:
variance_names = ["variance"]
else:
variance_names = [f"variance{i}" for i in range(nresp)]
if self.response_names is None:
if nresp == 1:
response_names = ["response"]
else:
response_names = [f"response{i}" for i in range(nresp)]
else:
response_names = self.response_names
# Results DataFrame column names
columns = self.parameters.index.to_list()
columns += variance_names + response_names
# Results DataFrame data
data = np.c_[self.evaluated, variances, responses]
return pd.DataFrame(data, columns = columns)
[docs] def save(self, directory = None):
'''Save all data about the MED object to disk in a given `directory`.
'''
# Instantiate MEDPaths object handling directory hierarchy
if directory is None:
# Include the random seed used in the `med_seed<seed>` dirpath
directory = f"med_seed{self.seed}"
else:
directory = str(directory)
self.paths = MEDPaths(directory)
# Create directories
if not os.path.isdir(self.paths.directory):
os.mkdir(self.paths.directory)
# Save MED setup
if self.gp is None:
hyperparameters = None
else:
hyperparameters = [g.hyperparameters.tolist() for g in self.gp]
setup = {
"seed": self.seed,
"verbose": self.verbose,
"epochs": self.epochs,
"parameter_names": self.parameters.index.to_list(),
"response_names": self.response_names,
"hyperparameters": hyperparameters,
"parameters": self.parameters.to_dict(),
}
now = datetime.now().strftime("%H:%M:%S on %d/%m/%Y")
with open(self.paths.setup, "w") as f:
f.write(f"# Generated by MED-{__version__} at {now}\n")
toml.dump(setup, f)
# Save sampler as a binary pickle file to completely reconstruct the
# arbitrary user-provided object
with open(self.paths.sampler, "wb") as f:
pickle.dump(self.sampler, f)
# Results and outputs directory are only relevant if we have a MED
# script rather than simple parameters and "offline" evaluation
if self.script is not None:
with open(self.paths.script, "w") as f:
f.writelines(self.script)
if not os.path.isdir(self.paths.results_dir):
os.mkdir(self.paths.results_dir)
if not os.path.isdir(self.paths.outputs):
os.mkdir(self.paths.outputs)
# Save information about the run
readmefile = os.path.join(self.paths.directory, "readme.rst")
with open(readmefile, "w", encoding = "utf-8") as f:
f.write(textwrap.dedent(f'''
MED System Response Exploration Directory
-----------------------------------------
This directory was generated by MED-{__version__} at {now}.
MED setup:
::
'''))
f.write(textwrap.indent(str(self), ' '))
# Save evaluated & queued samples and responses found
self.results.to_csv(self.paths.results, index = None)
pd.DataFrame(self.queue, columns = self.parameters.index).to_csv(
self.paths.queue,
index = None,
)
[docs] @staticmethod
def load(dirpath):
'''Load MED instance from a directory (eg "med_seed123").
'''
med = MED.__new__(MED)
paths = MEDPaths(dirpath)
with open(paths.setup) as f:
setup = toml.load(f)
with open(paths.sampler, "rb") as f:
med.sampler = pickle.load(f)
med.seed = setup["seed"]
med.verbose = setup["verbose"]
med.parameters = pd.DataFrame.from_dict(setup["parameters"])
med.response_names = setup.get("response_names", None)
med.epochs = [tuple(e) for e in setup["epochs"]]
# Schedulers are platform-specific; when loading MED instances in
# platform-agnostic manners, schedulers must be left out
med.scheduler = None
# Set derived attributes
med.samples = np.empty((0, len(med.parameters)))
med.responses = None
med.gp = None
med.sr = None
# Load evaluated and queued samples and responses
if os.path.isfile(paths.results):
data = pd.read_csv(paths.results).to_numpy(dtype = float)
samples = data[:, :len(med.parameters)]
med.samples = samples
if len(med.samples):
nresp = (data.shape[1] - len(med.parameters)) // 2
med.responses = data[:, -1] if nresp == 1 else data[:, -nresp:]
med._train()
if os.path.isfile(paths.queue):
queue = pd.read_csv(paths.queue).to_numpy()
if len(queue):
med.samples = np.vstack((med.samples, queue))
med.paths = paths
return med
[docs] def sample(self, n):
'''Generate and return ``n`` new parameter samples; they will be
added to the ``.samples`` and ``.queue`` attribute.
'''
new = self.sampler.sample(n, self)
new = np.asarray(new, dtype = float)
# If it's a simple (row) vector, transform it into a 2D column
if new.ndim == 1:
new = new[:, np.newaxis]
# Ensure correct shape
if new.ndim != 2 or new.shape[0] != n or \
new.shape[1] != len(self.parameters):
if hasattr(self.sampler, "__qualname__"):
sampler_name = self.sampler.__qualname__
else:
sampler_name = self.sampler.__class__.__name__
raise ValueError(textwrap.fill((
f"The samples returned by the `{sampler_name}` sampler must "
"be a 2D matrix with shape (N, D), where N is the number of "
"samples requested and D is the number of dimensions. "
f"Received an array with shape `{new.shape}`."
)))
# Ensure correct values
if np.any(new < 0) or np.any(new > 1):
if hasattr(self.sampler, "__qualname__"):
sampler_name = self.sampler.__qualname__
else:
sampler_name = self.sampler.__class__.__name__
num_outside = np.sum(new < 0) + np.sum(new > 1)
raise ValueError(textwrap.fill((
f"The samples returned by the `{sampler_name}` sampler must "
f"have values between [0, 1). Received {num_outside} values "
f"outside the range.\n< 0:{new[new < 0]}\n> 1:{new[new > 1]}."
)))
# Scale samples from [0, 1) up to real parameter bounds
new = upscale(
new,
self.parameters["min"].to_numpy(),
self.parameters["max"].to_numpy(),
)
self.samples = np.vstack((self.samples, new))
return new
[docs] def subset(self, select):
'''Select a subset of the current samples, returning a new ``MED``
object that can e.g. discover equations for only the selected subset.
'''
newmed = MED(
self.parameters,
response_names = self.response_names,
sampler = self.sampler,
seed = self.seed,
verbose = self.verbose,
)
# Keep track of generated paths, e.g. MED script
newmed.paths = self.paths
newsamples = self.samples[select]
newresponses = self.responses[select]
newmed.augment(newsamples, newresponses)
return newmed
[docs] def evaluate(self, f = None):
'''Evaluate the current samples online or offline.
There are 3 possible workflows:
1. The user evaluated the `MED.queue` values separately (e.g. ran
experiments) - then simply supply a NumPy array of responses.
>>> med.evaluate([1, 2, 3])
2. A simple Python function is supplied that will be evaluated for each
sample; the function must accept a single NumPy vector.
::
def instrument(params):
# `params` is a NumPy array of parameter combinations to try
return params[0] + params[1]
med.evaluate(instrument)
3. If a separate Python script was provided when the class was created,
nothing else is needed; this function will launch jobs and collect
responses from the user script.
'''
# Check samples have been generated
if len(self.queue) == 0:
raise ValueError(textwrap.fill((
"No samples were generated for evaluation. Run the "
"`MED.sample(n)` method to generate samples first."
)))
if f is None:
# If evaluating a user script, generate directory hierarchy
if self.paths is None or not os.path.isdir(self.paths.directory):
self.save()
responses = self._evaluate_script()
elif callable(f):
responses = self._evaluate_function(f)
else:
responses = np.asarray(f, dtype = float)
# Ensure 2D shape
if responses.ndim == 1:
responses = responses[:, np.newaxis]
# Type-check responses found
if len(responses) != len(self.queue):
raise ValueError(textwrap.fill((
"Incorrect number of responses given; expected "
f"{len(self.queue)} responses for each sample generated. "
f"Received {len(responses)} responses."
)))
# Set / extend the inner responses attribute
if self.responses is None:
start = 0
self.responses = responses
else:
start = len(self.responses)
self.responses = np.concatenate((self.responses, responses))
# Keep track of sampling epochs
end = len(self.responses)
self.epochs.append((start, end))
self._check_response_names()
self._train()
# If a user script was evaluated, immediately save results, as they are
# assumed to be long simulations running offline / asynchronously
if f is None:
self.save(self.paths.directory)
def _evaluate_script(self):
# Evaluate the current samples in `queue` using the user-script.
# Aliases
param_names = self.parameters.index
start_index = 0 if self.responses is None else len(self.responses)
queue = self.queue
# For every sample to try, start a separate OS process that runs the
# `med_seed<seed>/med_script.py` script, which computes and saves
# responses
processes = []
# Schedule processes using given scheduler
if not hasattr(self, "scheduler_cmd"):
self.scheduler_cmd = self.scheduler.generate()
# These are this epoch's paths to save the simulation outputs to; they
# will be given to `self.paths.script` as command-line arguments
parameters_paths = [
os.path.join(
self.paths.results_dir,
f"parameters.{start_index + i}.pickle",
) for i in range(len(queue))
]
response_paths = [
os.path.join(
self.paths.results_dir,
f"response.{start_index + i}.pickle",
) for i in range(len(queue))
]
# Catch the KeyboardInterrupt (Ctrl-C) signal to shut down the spawned
# processes before aborting.
try:
signal_handler.set()
# Spawn a separate process for every sample to try / sim to run
for i, sample in enumerate(queue):
# Create new set of parameters and save them to disk
parameters = self.parameters.copy()
for j, sval in enumerate(sample):
parameters.at[param_names[j], "value"] = sval
with open(parameters_paths[i], "wb") as f:
pickle.dump(parameters, f)
processes.append(
subprocess.Popen(
self.scheduler_cmd + [
self.paths.script,
parameters_paths[i],
response_paths[i],
],
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
universal_newlines = True, # outputs in text mode
)
)
# Gather results
responses, stdout_rec, stderr_rec, crashed = \
self._gather_results(processes, response_paths)
except KeyboardInterrupt:
for proc in processes:
proc.kill()
raise
finally:
signal_handler.unset()
self._print_status_eval(stdout_rec, stderr_rec, crashed)
# Find number of error values returned for one successful simulation
nresp = 1
for i, resp in enumerate(responses):
if resp is not None:
if nresp == 1:
nresp = len(resp)
continue
if len(resp) != nresp:
raise ValueError(textwrap.fill((
f"The simulation at index {start_index + i} returned "
f"{len(resp)} responses, while previous simulations "
f"had {nresp} responses."
)))
# Substitute results that are None (i.e. crashed) with rows of NaNs
for i in range(len(responses)):
if responses[i] is None:
responses[i] = np.full(nresp, np.nan)
return np.array(responses)
def _evaluate_function(self, f):
samples = self.queue
responses = [f(s) for s in samples]
return np.array(responses, dtype = float)
[docs] def augment(self, samples, responses):
'''Augment set of samples with manually-evaluated responses.
'''
samples = np.asarray(samples, dtype = float)
responses = np.asarray(responses, dtype = float)
# If samples is a simple (row) vector, transform it into a 2D column
if samples.ndim == 1:
samples = samples[:, np.newaxis]
if responses.ndim == 1:
responses = responses[:, np.newaxis]
# Type-check samples and responses shapes
if samples.ndim != 2 or samples.shape[1] != len(self.parameters):
raise ValueError(textwrap.fill((
"The input `samples` must have the same number of columns "
"as the number of dimensions, i.e. number of parameters (= "
f"{len(self.parameters)}). Received array with shape "
f"`{samples.shape}`."
)))
if len(responses) != len(samples):
raise ValueError(textwrap.fill((
"The input `responses` must be a vector / matrix with the "
"same number of values / rows as `samples`. Received "
f"{len(responses)} responses for {len(samples)} samples."
)))
# Append new samples and responses to current ones. Also move the
# previously unevaluated samples to the end
nev = len(self.responses) if self.responses is not None else 0
self.samples = np.vstack((
self.samples[:nev],
samples,
self.samples[nev:]
))
if self.responses is None:
self.responses = responses
else:
self.responses = np.concatenate((self.responses, responses))
self._check_response_names()
self._train()
[docs] def discover(
self,
response = None,
binary_operators = ["+", "-", "*", "/", "^"],
unary_operators = [],
maxsize = 50,
maxdepth = None,
niterations = 100,
populations = 32,
parsimony = 0.0032,
constraints = None,
nested_constraints = None,
denoise = False,
select_k_features = None,
turbo = True,
equation_file = "equations.csv",
progress = False,
**kwargs,
):
'''Discover analytical models of varying complexity for the evaluated
samples and responses.
The most important equation discovery parameters are given to this
method; a full reference is here: https://astroautomata.com/PySR/api/
Parameters
----------
response : str or int, optional
The response name or index to find an equation for; only needs to
be specified for more than 1 reponse.
binary_operators : list[str], default ["+", "-", "*", "/", "^"]
Operators taking two real numbers as input, using Julia syntax,
e.g. ``+(x, y) === x + y``. Can define custom operators like
``binaryfunc(x, y) = x^2 * y``.
unary_operators : list[str], default []
Operators taking a single real number as input, using Julia syntax,
e.g. ``sin(x)`` or ``log(x)``. Can define custom operators like
``unaryfunc(x) = x^2``.
maxsize : int, default 50
Maximum size of equation; smaller values correpond to shorter
equations and faster searching.
maxdepth : int, optional
If defined, limit depth of equation tree - i.e. stacked operations.
niterations : int, default 100
Number of equation finding iterations to run.
populations : int, default 32
Number of equation populations to use; a larger value corresponds
to more equations being sampled and slower search.
parsimony : float, default 0.0032
How much to punish complexity; larger value prefers smaller
expressions.
constraints : dict[str], optional
Optional dictionary of complexity constraints on operators, e.g.
don't allow the exponent of a power operator to have complexity
larger than 2 with `constraints = {"pow": (-1, 2)}`; -1 means
any complexity.
nested_constraints : dict[str], optional
Number of times a combination of operators can be nested, e.g.
{"sin": {"cos": 0}} specifies that `cos` cannot be found inside a
`sin`.
denoise : bool, default False
Symbolic regression should be robust with noise, but can further
denoise data with a Gaussian Process.
select_k_features : int, optional
If defined, use at most `k` parameters from the given samples.
turbo : bool, default True
Whether to use extra, experimental optimisations internally.
equation_file : str, default "equations.csv"
Where to save equations.
progress : bool, default False
Whether to show interactive equation discovery progress.
**kwargs : other keyword arguments
Other keyword arguments to be passed to PySR.
Returns
-------
list[Equation]
List of equations found.
'''
# Check we have data to discover equations for
if not len(self.responses):
raise ValueError(textwrap.fill((
"No responses were saved in MED. Before discovering equations "
"please sample the parameter space and collect some values, "
"e.g. with `MED.sample` and `MED.evaluate`."
)))
if response is not None:
if isinstance(response, str):
response_index = self.response_names.index(response)
responses = self.responses[:, response_index]
else:
response_index = int(response)
responses = self.responses[:, response_index]
elif self.responses.shape[1] > 1:
raise ValueError(textwrap.fill((
f"There are {self.responses.shape[1]} different responses "
"saved (i.e. number of columns in `self.responses`). Please "
"specify which should be used to discover equations as "
"a response name (str) or index (int)."
)))
else:
responses = self.responses[:, 0]
# Flush stdout, as the Julia backend will overwrite it
print(flush = True)
self.sr = PySRRegressor(
binary_operators = binary_operators,
unary_operators = unary_operators,
maxsize = maxsize,
maxdepth = maxdepth,
niterations = niterations,
populations = populations,
parsimony = parsimony,
constraints = constraints,
nested_constraints = nested_constraints,
denoise = denoise,
select_k_features = select_k_features,
equation_file = equation_file,
temp_equation_file = True,
progress = progress,
**kwargs,
)
names = [c.replace(" ", "_") for c in self.parameters.index]
self.sr.fit(self.evaluated, responses, variable_names = names)
return self.sr.equations_
def _generate_script(self, script_path, scheduler):
# Given a path to a user-defined simulation script, extract the free
# parameters and generate the MED script.
# Type-check and generate scheduler commands
if not isinstance(scheduler, schedulers.Scheduler):
raise TypeError(textwrap.fill((
"The input `scheduler` must be a subclass of `coexist."
f"schedulers.Scheduler`. Received {type(scheduler)}."
)))
self.scheduler = scheduler
# Extract parameters and generate ACCES script
with open(script_path, "r") as f:
user_code = f.readlines()
# Find the two parameter definition directives
params_start_line = None
params_end_line = None
regex_prefix = r"#+\s*MED\s+PARAMETERS"
params_start_finder = re.compile(regex_prefix + r"\s+START")
params_end_finder = re.compile(regex_prefix + r"\s+END")
for i, line in enumerate(user_code):
if params_start_finder.match(line):
params_start_line = i
if params_end_finder.match(line):
params_end_line = i
if params_start_line is None or params_end_line is None:
raise NameError(textwrap.fill((
f"The user script found in file `{script_path}` did not "
"contain the blocks `# MED PARAMETERS START` and "
"`# MED PARAMETERS END`. Please define your simulation "
"free parameters between these two comments / directives."
)))
# Execute the code between the two directives to get the initial
# `parameters`. `exec` saves all the code's variables in the
# `parameters_exec` dictionary
user_params_code = "".join(
user_code[params_start_line:params_end_line]
)
user_params_exec = dict()
exec(user_params_code, user_params_exec)
if "parameters" not in user_params_exec:
raise NameError(textwrap.fill((
"The code between the user script's directives "
"`# MED PARAMETERS START` and "
"`# MED PARAMETERS END` does not define a variable "
"named exactly `parameters`."
)))
validate_parameters(user_params_exec["parameters"])
self.parameters = user_params_exec["parameters"]
if not code_contains_variable(user_code, "response"):
raise NameError(textwrap.fill((
f"The user script found in file `{script_path}` does not "
"define the required variable `response`."
)))
# Substitute the `parameters` creation in the user code with loading
# them from a MED-defined location
parameters_code = [
"\n# Unpickle `parameters` from this script's first " +
"command-line argument and set\n",
'# `med_id` to a unique simulation ID\n',
code_substitute_variable(
user_code[params_start_line:params_end_line],
"parameters",
('with open(sys.argv[1], "rb") as f:\n'
' parameters = pickle.load(f)\n')
)
]
# Also define a unique ACCESS ID for each simulation
parameters_code += (
'med_id = int(sys.argv[1].split(".")[-2])\n'
)
# Read in the `async_access_template.py` code template and find the
# code injection directives
template_code_path = os.path.join(
os.path.split(medeq.__file__)[0],
"template_med_script.py"
)
with open(template_code_path, "r") as f:
template_code = f.readlines()
for i, line in enumerate(template_code):
if line.startswith("# MED INJECT USER CODE START"):
inject_start_line = i
if line.startswith("# MED INJECT USER CODE END"):
inject_end_line = i
generated_code = "".join((
template_code[:inject_start_line + 1] +
user_code[:params_start_line + 1] +
parameters_code +
user_code[params_end_line:] +
template_code[inject_end_line:]
))
self.script = generated_code
def _gather_results(self, processes, response_paths):
# Load parallel-executed results from user script
if not hasattr(self, "_stdout"):
self._stdout = None
if not hasattr(self, "_stderr"):
self._stderr = None
responses = []
stdout_rec = []
stderr_rec = []
crashed = []
# Occasionally check if jobs finished
wait = 0.1 # Time between checking results
waited = 0. # Total time waited
logged = 0 # Number of times logged remaining simulations
tlog = 30 * 60 # Time until logging remaining simulations again
while wait != 0:
done = sum((p.poll() is not None for p in processes))
if done == len(processes):
wait = 0
for i, proc in enumerate(processes):
proc_index = int(proc.args[-1].split(".")[-2])
stdout, stderr = proc.communicate()
# If a *new* output message was recorded in stdout, log it
if len(stdout) and stdout != self._stdout:
stdout_rec.append(proc_index)
self._stdout = stdout
stdout_log = os.path.join(
self.paths.outputs,
f"stdout.{proc_index}.log",
)
with open(stdout_log, "w") as f:
f.write(self._stdout)
# If a *new* error message was recorded in stderr, log it
if len(stderr) and stderr != self._stderr:
stderr_rec.append(proc_index)
self._stderr = stderr
stderr_log = os.path.join(
self.paths.outputs,
f"stderr.{proc_index}.log",
)
with open(stderr_log, "w") as f:
f.write(self._stderr)
# Load result if the file exists, otherwise set it to NaN
if os.path.isfile(response_paths[i]):
with open(response_paths[i], "rb") as f:
response = pickle.load(f)
# If it's a dictionary[resp_name -> resp_val]
if hasattr(response, "items") and \
callable(response.items):
resp = []
resp_names = []
for k, v in response.items():
resp_names.append(k)
resp.append(v)
resp = np.array(resp, dtype = float)
self.response_names = resp_names
# If it's a list-like of responses
elif hasattr(response, "__iter__"):
resp = np.array(response, dtype = float)
# If it's a single response
else:
resp = np.array([response], dtype = float)
responses.append(resp)
else:
responses.append(None)
crashed.append(proc_index)
# Every `remaining` seconds print remaining jobs
if (
self.verbose >= 4 and
wait != 0 and
waited > (logged + 1) * tlog
):
logged += 1
tlog *= 1.5
remaining = " ".join([
p.args[-1].split(".")[-2]
for p in processes if p.poll() is None
])
minutes = int(waited / 60)
if minutes > 60:
timer = f"{minutes // 60} h {minutes % 60} min"
else:
timer = f"{minutes} min"
print((
f" * Remaining jobs after {timer}:\n" +
textwrap.indent(textwrap.fill(remaining), " * ")
), flush = True)
# Wait for increasing numbers of seconds until checking for results
# again - at most 1 minute
time.sleep(wait)
waited += wait
wait = min(wait * 1.5, 60)
return responses, stdout_rec, stderr_rec, crashed
def _print_status_eval(self, stdout_rec, stderr_rec, crashed):
# Print logged stdout and stderr messages and crashed simulations
# after evaluating an epoch.
line = "-" * 80
if len(stderr_rec):
stderr_rec_str = textwrap.fill(" ".join(
str(s) for s in stderr_rec
))
print(
line + "\n" +
"New stderr messages were recorded while running jobs:\n" +
textwrap.indent(stderr_rec_str, " ") + "\n" +
f"The error messages were logged in:\n {self.paths.outputs}",
flush = True,
)
if len(stdout_rec):
stdout_rec_str = textwrap.fill(" ".join(
str(s) for s in stdout_rec
))
print(
line + "\n" +
"New stdout messages were recorded while running jobs:\n" +
textwrap.indent(stdout_rec_str, " ") + "\n" +
f"The output messages were logged in:\n {self.paths.outputs}",
flush = True,
)
if len(crashed):
crashed_str = textwrap.fill(" ".join(
str(c) for c in crashed
))
print(
line + "\n" +
"No results were found for these jobs:\n" +
textwrap.indent(crashed_str, " ") + "\n" +
"They crashed or terminated early; for details, check the "
f"output logs in:\n {self.paths.outputs}\n"
"The error values for these simulations were set to NaN.",
flush = True,
)
if len(stderr_rec) or len(stdout_rec) or len(crashed):
print(line + "\n")
def _train(self):
# Train fvgp.GP Gaussian Processes surrogates for each response
if self.responses.ndim == 1:
nresp = 1
responses = self.responses[:, np.newaxis]
else:
nresp = self.responses.shape[1]
responses = self.responses
# First time training, create new GP (single response) / GPs (multi)
if self.gp is None:
self.gp = [None] * nresp
for i in range(nresp):
self.gp[i] = GP(
len(self.parameters),
points = self.evaluated,
values = responses[:, i],
init_hyperparameters = np.ones(1 + len(self.parameters)),
variances = np.abs(0.01 * responses[:, i]),
use_inv = True,
)
# New training data, update GPs
else:
for i in range(len(self.gp)):
self.gp[i].update_gp_data(
self.evaluated,
responses[:, i],
np.abs(0.01 * responses[:, i]),
)
# Train GPs' hyperparameters using our internal CMA-ES method
with open(os.devnull, "w") as f, contextlib.redirect_stdout(f):
for gp in self.gp:
gp.train(None, method = self._train_gp_method)
def _train_gp_method(self, gp):
nparams = 1 + len(self.parameters)
x0 = np.ones(nparams)
bounds = [[0.] * nparams, [None] * nparams]
es = cma.CMAEvolutionStrategy(x0, 1, cma.CMAOptions(
bounds = bounds,
seed = self.seed,
verbose = -9,
))
# es.logger = cma.CMADataLogger(os.path.join(folder, "cache", ""))
es.optimize(gp.log_likelihood)
return es.result.xbest
def _check_response_names(self):
nresp = 1 if self.responses.ndim == 1 else self.responses.shape[1]
if self.response_names is None:
# Generate response names for each response found
if nresp == 1:
self.response_names = ["response"]
else:
self.response_names = [f"response{i}" for i in range(nresp)]
else:
# Check the correct number of response names is saved and append /
# remove the necessary number of responses
nnames = len(self.response_names)
if nresp > nnames:
msg = "\n" + textwrap.fill((
f"The number of responses found (={nresp}) is larger "
f"than the number of response names given (={nnames}). "
f"Appending {nresp - nnames} additional response names."
))
warnings.warn(msg, RuntimeWarning, stacklevel = 3)
self.response_names += [
f"response{i}"
for i in range(nnames, nresp)
]
elif nresp < nnames:
msg = "\n" + textwrap.fill((
f"The number of responses found (={nresp}) is smaller "
f"than the number of response names given (={nnames}). "
f"Removing last {nnames - nresp} response names."
))
warnings.warn(msg, RuntimeWarning, stacklevel = 3)
del self.response_names[nresp:]
[docs] def plot_gp(
self,
response = 0,
resolution = (32, 32),
verbose = True,
):
'''Plot interactive 2D slices of the `response` and uncertainty.
'''
# Compute index of response to plot
try:
rid = int(response)
except ValueError:
rid = self.response_names.index(response)
# Type-check responses
resolution = tuple([int(r) for r in resolution])
if len(resolution) != 2:
raise ValueError(textwrap.fill((
"The input `resolution` must be a tuple with two integers, "
"e.g. `resolution = (32, 32)`."
)))
# Save results
if self.paths is None:
self.save()
else:
self.save(self.paths.directory)
medpath = self.paths.directory
# Path to Julia plotting code
script = os.path.join(
os.path.split(medeq.__file__)[0],
"plot_gp.jl",
)
# Run subprocess OS command using the Julia executable
julia = medeq.base.get_julia_executable()
subprocess.run([
julia,
script,
medpath,
str(rid + 1),
str(resolution[0]),
str(resolution[1]),
])
[docs] def plot_response(
self,
f = None,
colors = px.colors.qualitative.Set1[1:],
):
# Plot samples and uncertainty (columns) for each response (rows)
nrows = len(self.gp)
ncols = 2
subplot_titles = ["MED Sampling", "GP Uncertainty"]
if f is not None:
ncols = 3
subplot_titles += ["Real Response"]
nx = 100
ny = 100
mins = self.parameters["min"]
maxs = self.parameters["max"]
x = np.linspace(mins[0], maxs[0], nx)
y = np.linspace(mins[1], maxs[1], ny)
xx, yy = np.meshgrid(x, y)
# Create Figure
fig = make_subplots(
rows = nrows,
cols = ncols,
subplot_titles = subplot_titles,
)
for igp, gp in enumerate(self.gp):
prediction = gp.posterior_mean(
np.c_[xx.flatten(), yy.flatten()]
)["f(x)"].reshape(nx, ny)
uncertainty = gp.posterior_covariance(
np.c_[xx.flatten(), yy.flatten()],
variance_only = True,
)["v(x)"].reshape(nx, ny)
if f is not None:
real = f(
np.vstack((xx.flatten(), yy.flatten()))
).reshape(nx, ny)
fig.add_trace(
go.Heatmap(
x = x,
y = y,
z = prediction,
colorscale = "magma",
showscale = False,
),
row = 1 + igp,
col = 1,
)
fig.add_trace(
go.Heatmap(
x = x,
y = y,
z = uncertainty,
colorscale = "magma",
colorbar_title = "Variance",
),
row = 1 + igp,
col = 2,
)
if f is not None:
fig.add_trace(
go.Heatmap(
x = x,
y = y,
z = real,
colorscale = "magma",
showscale = False,
),
row = 1 + igp,
col = 3,
)
for i in range(2 if f is None else 3):
separate = np.full(len(self.evaluated), True)
for j, ep in enumerate(self.epochs):
separate[ep[0]:ep[1]] = False
ic = j
while ic >= len(colors):
ic -= len(colors)
fig.add_trace(
go.Scatter(
x = self.evaluated[ep[0]:ep[1], 0],
y = self.evaluated[ep[0]:ep[1], 1],
mode = "markers",
marker_color = colors[ic],
marker_symbol = "x",
hovertext = f"Epoch {j}",
showlegend = False,
),
row = igp + 1,
col = i + 1,
)
separate = self.evaluated[separate]
if len(separate):
fig.add_trace(
go.Scatter(
x = separate[:, 0],
y = separate[:, 1],
mode = "markers",
marker_color = "white",
marker_symbol = "x",
hovertext = "Manual",
showlegend = False,
),
row = igp + 1,
col = i + 1,
)
format_fig(fig)
return fig
[docs] def plot_samples(
self,
colors = px.colors.qualitative.Set1[1:],
marker_size = 10,
**kwargs,
):
# Check we have samples generated
if len(self.samples) == 0:
raise ValueError(textwrap.fill((
"No samples were generated (`.sample(N)`) or introduced "
"(`.augment(...)`). Please add samples before plotting."
)))
# Extract samples and number of dimensions
samples = self.samples
ndim = samples.shape[1]
# Create only 1D, 2D, 3D plots
if ndim > 3:
raise ValueError(textwrap.fill((
f"The samples introduced have `samples.shape[1] = {ndim}` "
"dimensions, which cannot be visualised using static Plotly "
"plots. Use the `plot_gp()` method for an interactive Julia "
"plot with interactive slicing."
)))
fig = go.Figure()
# Plot individual epochs + manually-added samples
if ndim <= 2:
separate = np.full(len(samples), True)
for i, ep in enumerate(self.epochs):
separate[ep[0]:ep[1]] = False
ic = i
while ic >= len(colors):
ic -= len(colors)
x = samples[ep[0]:ep[1], 0]
if ndim == 1:
y = np.zeros(ep[1] - ep[0])
else:
y = samples[ep[0]:ep[1], 1]
fig.add_trace(
go.Scatter(
x = x,
y = y,
mode = "markers",
marker_color = colors[ic],
marker_symbol = "x",
marker_size = marker_size,
hovertext = f"Epoch {i}",
showlegend = False,
**kwargs,
),
)
if len(separate):
x = samples[separate, 0]
if ndim == 1:
y = np.zeros(separate.sum())
else:
y = samples[separate, 1]
fig.add_trace(
go.Scatter(
x = x,
y = y,
mode = "markers",
marker_color = "black",
marker_symbol = "x",
marker_size = marker_size,
hovertext = "Manual",
showlegend = False,
**kwargs,
),
)
if ndim == 1:
fig.update_layout(
xaxis_title = self.parameters.index[0],
yaxis_title = "",
yaxis_showticklabels = False,
)
else:
fig.update_layout(
xaxis_title = self.parameters.index[0],
yaxis_title = self.parameters.index[1],
)
if ndim == 3:
separate = np.full(len(samples), True)
for i, ep in enumerate(self.epochs):
separate[ep[0]:ep[1]] = False
ic = i
while ic >= len(colors):
ic -= len(colors)
fig.add_trace(
go.Scatter3d(
x = samples[ep[0]:ep[1], 0],
y = samples[ep[0]:ep[1], 1],
z = samples[ep[0]:ep[1], 2],
mode = "markers",
marker_color = colors[ic],
marker_symbol = "x",
marker_size = marker_size,
hovertext = f"Epoch {i}",
showlegend = False,
**kwargs,
),
)
if len(separate):
fig.add_trace(
go.Scatter3d(
x = samples[separate, 0],
y = samples[separate, 1],
z = samples[separate, 2],
mode = "markers",
marker_color = "black",
marker_symbol = "x",
marker_size = marker_size,
hovertext = "Manual",
showlegend = False,
**kwargs,
),
)
fig.update_scenes(
xaxis_title = self.parameters.index[0],
yaxis_title = self.parameters.index[1],
zaxis_title = self.parameters.index[2],
)
format_fig(fig)
return fig
def __repr__(self):
parameters = textwrap.indent(str(self.parameters), ' ')
if self.response_names is None:
response_names = " None"
else:
response_names = textwrap.indent(
textwrap.fill(", ".join(self.response_names)),
' ',
)
samples = str_summary(self.samples)
responses = str_summary(self.responses)
epochs = f"list[{len(self.epochs)}, tuple[int, int]]"
if self.scheduler is not None:
scheduler = f"scheduler = {self.scheduler.__class__.__name__}\n"
else:
scheduler = ""
docstr = (
f"MED(seed={self.seed})\n"
"--\n"
f"parameters = \n{parameters}\n"
f"response_names = \n{response_names}\n"
"--\n"
f"sampler = {self.sampler}\n"
f"{scheduler}"
f"samples = {samples}\n"
f"responses = {responses}\n"
f"epochs = {epochs}\n"
).split("\n")
maxline = max(len(d) for d in docstr)
for i in range(len(docstr)):
if len(docstr[i]) > 1 and docstr[i][1] == "-":
docstr[i] = "-" * maxline
return "\n".join(docstr)