Source code for pysip.regressors

from __future__ import annotations

import warnings
from concurrent.futures import ProcessPoolExecutor
from copy import deepcopy
from dataclasses import dataclass
from functools import partial
from multiprocessing import cpu_count
from typing import Literal, Optional, Sequence, Tuple, Union

import numpy as np
import pandas as pd
import pymc as pm
import pytensor.tensor as pt
import xarray as xr
from scipy.optimize import approx_fprime, minimize

from .params.parameters import Parameters
from .statespace.base import StateSpace
from .statespace_estimator import KalmanQR
from .utils.statistics import ttest


def _make_estimate(theta, data, estimator):
    # Evaluate the output estimate for a single parameter draw; used by the
    # ProcessPoolExecutor in the predictive methods of Regressor.
    estimator = deepcopy(estimator)
    estimator.ss.parameters.theta_free = theta
    res = estimator.estimate_output(*data)
    return res.y[None, ..., 0]


class _FdiffLoglikeGrad(pt.Op):
    """Finite-difference gradient of ``estimator.log_likelihood``, wrapped as a
    PyTensor Op so that PyMC can use gradient-based samplers."""

    itypes = [pt.dvector]
    otypes = [pt.dvector]

    def __init__(self, reg: Regressor, df, eps=None):
        self.estimator = reg.estimator
        self.data = reg.prepare_data(df)
        self.eps = eps

    def perform(self, _, inputs, outputs):
        def _target(eta) -> float:
            estimator = deepcopy(self.estimator)
            estimator.ss.parameters.theta_free = eta
            return estimator.log_likelihood(*self.data)

        (eta,) = inputs  # unconstrained parameter vector
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            outputs[0][0] = approx_fprime(eta, _target, self.eps)


class _Loglike(pt.Op):
    """``estimator.log_likelihood`` wrapped as a PyTensor Op, with the
    finite-difference gradient provided by ``_FdiffLoglikeGrad``."""

    itypes = [pt.dvector]
    otypes = [pt.dscalar]

    def __init__(self, reg: Regressor, df, eps=None):
        self.estimator = reg.estimator
        self.data = reg.prepare_data(df)
        self.eps = eps
        self.logpgrad = _FdiffLoglikeGrad(reg, df, eps)

    def perform(self, _, inputs, outputs):
        estimator = deepcopy(self.estimator)
        (eta,) = inputs
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            estimator.ss.parameters.theta_free = eta
            logl = estimator.log_likelihood(*self.data)
        outputs[0][0] = np.array(logl)

    def grad(self, inputs, g):
        (eta,) = inputs
        return [g[0] * self.logpgrad(eta)]


@dataclass
class Regressor:
    """A regressor uses a state-space model and a Bayesian filter to predict
    the states of the system given the exogenous (or boundary) inputs and the
    initial conditions.

    This base class cannot estimate the parameters of the state-space model:
    see the derived classes for that.

    Parameters
    ----------
    ss : StateSpace
        State-space model
    inputs : str or list of str, optional
        Input column names, by default None. If None, they are inferred from
        the state-space model.
    outputs : str or list of str, optional
        Output column names, by default None. If None, they are inferred from
        the state-space model.
    time_scale : str
        Time series frequency, e.g. 's': seconds, 'D': days, etc.
    """

    ss: StateSpace
    inputs: Optional[Union[str, Sequence[str]]] = None
    outputs: Optional[Union[str, Sequence[str]]] = None
    time_scale: str = "s"

    def __post_init__(self):
        self.estimator = KalmanQR(self.ss)
        if self.inputs is None:
            self.inputs = [node.name for node in self.ss.inputs]
        if self.outputs is None:
            self.outputs = [node.name for node in self.ss.outputs]
        self.states = [node.name for node in self.ss.states]
        if isinstance(self.inputs, str):
            self.inputs = [self.inputs]
        if isinstance(self.outputs, str):
            self.outputs = [self.outputs]

    @property
    def parameters(self) -> Parameters:
        return self.ss.parameters
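
    # Construction sketch (hypothetical model and column names; any pysip
    # StateSpace subclass would work the same way):
    #
    #   reg = Regressor(ss=my_statespace, inputs=["To", "Qh"], outputs="Ti")
    #   reg.parameters  # proxy for reg.ss.parameters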

    def prepare_data(self, df, with_outputs=True) -> Tuple[pd.DataFrame, ...]:
        """Prepare data for training

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe containing the data
        with_outputs : bool, optional
            Whether to return the outputs, by default True

        Returns
        -------
        DataFrame: time steps
        DataFrame: input data
        DataFrame: derivative of input data
        DataFrame: output data (filled with NaNs if with_outputs=False)
        """
        return self.ss.prepare_data(
            df, self.inputs, self.outputs if with_outputs else False, self.time_scale
        )

    def simulate(self, df: pd.DataFrame) -> xr.Dataset:
        """Stochastic simulation of the state-space model

        Parameters
        ----------
        df : pandas.DataFrame
            Dataframe containing the inputs

        Returns
        -------
        xarray.Dataset
            Dataset containing the simulated outputs and states
        """
        dt, u, dtu, _ = self.prepare_data(df, with_outputs=False)
        return self.estimator.simulate(dt, u, dtu).to_xarray(
            df.index.name or "time", df.index, self.states, self.outputs
        )
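
    # Usage sketch (hypothetical `reg` and `df` as above): draw one stochastic
    # trajectory of the states and outputs over the index of `df`.
    #
    #   sim = reg.simulate(df)
    #   sim.data_vars  # one variable per output and per state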

    def estimate_output(
        self,
        df: pd.DataFrame,
        x0: np.ndarray = None,
        P0: np.ndarray = None,
        use_outputs: bool = True,
    ) -> xr.Dataset:
        """Estimate the output filtered distribution

        Parameters
        ----------
        df : pandas.DataFrame
            Training data
        x0 : numpy.ndarray, optional
            Initial state. If not provided, the initial state is taken from the
            state-space model defaults.
        P0 : numpy.ndarray, optional
            Initial state covariance. If not provided, the initial state
            covariance is taken from the state-space model defaults.
        use_outputs : bool, optional
            Whether to use the data outputs to do the estimation, by default True

        Returns
        -------
        xr.Dataset
            Dataset containing the estimated outputs and their covariance
        """
        dt, u, dtu, y = self.prepare_data(df, with_outputs=use_outputs)
        return self.estimator.estimate_output(dt, u, dtu, y, x0, P0).to_xarray(
            df.index.name or "time", df.index, self.states, self.outputs
        )

    def estimate_states(
        self,
        df: pd.DataFrame,
        x0: np.ndarray = None,
        P0: np.ndarray = None,
        smooth: bool = False,
        use_outputs: bool = True,
    ) -> xr.Dataset:
        """Estimate the state filtered/smoothed distribution

        Parameters
        ----------
        df : pandas.DataFrame
            Training data
        x0 : numpy.ndarray, optional
            Initial state mean different from `ss.x0`
        P0 : numpy.ndarray, optional
            Initial state covariance different from `ss.P0`
        smooth : bool, optional
            If True, use the RTS smoother instead of the Kalman filter
        use_outputs : bool, optional
            Whether to use the data outputs to do the estimation, by default True

        Returns
        -------
        xr.Dataset
            Dataset containing the estimated states and their covariance
        """
        dt, u, dtu, y = self.prepare_data(df, with_outputs=use_outputs)
        return (
            self.estimator.smoothing(dt, u, dtu, y, x0, P0)
            if smooth
            else self.estimator.filtering(dt, u, dtu, y, x0, P0)
        ).to_xarray(df.index.name or "time", df.index, self.states, self.outputs)
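
    # Filtering vs. smoothing sketch: the Kalman filter conditions each state
    # estimate on past observations only, the RTS smoother on the full record.
    #
    #   filtered = reg.estimate_states(df)
    #   smoothed = reg.estimate_states(df, smooth=True)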

    def _target(
        self,
        eta: np.ndarray,
        dt: np.ndarray,
        u: np.ndarray,
        dtu: np.ndarray,
        y: np.ndarray,
    ) -> float:
        """Evaluate the negative log-posterior

        Parameters
        ----------
        eta : array_like, shape (n_eta, )
            Unconstrained parameters
        dt : float
            Sampling time
        u : array_like, shape (n_u, n_steps)
            Input data
        dtu : array_like, shape (n_u, n_steps)
            Forward finite difference of the input data
        y : array_like, shape (n_y, n_steps)
            Output data

        Returns
        -------
        log_posterior : float
            The negative log-posterior
        """
        estimator = deepcopy(self.estimator)
        estimator.ss.parameters.eta_free = eta
        log_likelihood = estimator.log_likelihood(dt, u, dtu, y)
        log_posterior = (
            log_likelihood
            - estimator.ss.parameters.prior
            + estimator.ss.parameters.penalty
        )
        return log_posterior

    def fit(
        self,
        df: pd.DataFrame,
        options: dict = None,
        *,
        init: Literal["unconstrained", "prior", "zero", "fixed", "value"] = "fixed",
        hpd: float = 0.95,
        jac="3-point",
        method="BFGS",
        **minimize_options,
    ) -> Tuple[pd.DataFrame, pd.DataFrame, dict]:
        """Estimate the parameters of the state-space model.

        Parameters
        ----------
        df : pandas.DataFrame
            Training data
        options : dict, optional, deprecated
            Options for the minimization method. Use named arguments instead.
            Passing this argument raises a DeprecationWarning and it will be
            removed in the future.
        init : str, optional
            Method used to initialize the parameters. Options are:

            - 'unconstrained': initialize the parameters in the unconstrained space
            - 'prior': initialize the parameters using the prior distribution
            - 'zero': initialize the parameters to zero
            - 'fixed': initialize the parameters to the fixed values
            - 'value': initialize the parameters to the given values
        hpd : float, optional
            Highest posterior density interval. Used only when `init='prior'`.
        jac : str, optional
            Jacobian approximation passed to `scipy.optimize.minimize`,
            by default '3-point'.
        method : str, optional
            Minimization method, by default 'BFGS'.
        minimize_options : dict, optional
            Options for the minimization method. See `scipy.optimize.minimize`
            for details. `disp=True` and `gtol=1e-4` are set by default.

        Returns
        -------
        pandas.DataFrame
            Dataframe with the estimated parameters, their standard deviation,
            the p-value of the t-test and penalty values.
        pandas.DataFrame
            Dataframe with the correlation matrix of the estimated parameters.
        dict
            Results object from the minimization method. See
            `scipy.optimize.minimize` for details.
        """
        if options is not None:
            warnings.warn(
                "Use of the options argument is deprecated and will raise an error in "
                "the future. Prefer using named arguments (kwargs) instead.",
                DeprecationWarning,
            )
            minimize_options.update(options)
        minimize_options = {"disp": True, "gtol": 1e-4} | minimize_options

        self.parameters.eta_free = self.parameters.init_parameters(1, init, hpd)
        data = self.prepare_data(df)
        results = minimize(
            fun=self._target,
            x0=self.parameters.eta_free,
            args=data,
            method=method,
            jac=jac,
            options=minimize_options,
        )
        self.parameters.eta_free = results.x

        # inverse jacobian of the transform eta = f(theta)
        inv_jac = np.diag(1.0 / np.array(self.parameters.eta_jacobian))
        # covariance matrix in the constrained space (e.g. theta)
        cov_theta = inv_jac @ results.hess_inv @ inv_jac
        # standard deviation of the constrained parameters
        sig_theta = np.sqrt(np.diag(cov_theta)) * self.parameters.scale
        inv_sig_theta = np.diag(1.0 / np.sqrt(np.diag(cov_theta)))
        # correlation matrix of the constrained parameters
        corr_matrix = inv_sig_theta @ cov_theta @ inv_sig_theta

        df = pd.DataFrame(
            data=np.vstack(
                [
                    self.parameters.theta_free,
                    sig_theta,
                    ttest(self.parameters.theta_free, sig_theta, len(data[0])),
                    np.abs(results.jac),
                    np.abs(self.parameters.d_penalty),
                ]
            ).T,
            columns=["θ", "σ(θ)", "pvalue", "|g(η)|", "|dpen(θ)|"],
            index=self.parameters.names_free,
        )
        df_corr = pd.DataFrame(
            data=corr_matrix,
            index=self.parameters.names_free,
            columns=self.parameters.names_free,
        )
        self.summary_ = df
        self.corr_ = df_corr
        self.results_ = results
        return df, df_corr, results
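
    # Fit sketch (hypothetical data): extra keyword arguments are forwarded to
    # scipy.optimize.minimize as options.
    #
    #   summary, corr, results = reg.fit(df, init="fixed", gtol=1e-6)
    #   summary["pvalue"]  # t-test on each free parameter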

    def eval_residuals(
        self,
        df: pd.DataFrame,
        x0: np.ndarray = None,
        P0: np.ndarray = None,
    ) -> xr.Dataset:
        """Compute the standardized residuals

        Parameters
        ----------
        df : pandas.DataFrame
            Training data
        x0 : numpy.ndarray, optional
            Initial state. If not provided, the initial state is taken from the
            state-space model defaults.
        P0 : numpy.ndarray, optional
            Initial state covariance. If not provided, the initial state
            covariance is taken from the state-space model defaults.

        Returns
        -------
        xr.Dataset
            Dataset containing the residuals and their standard deviation
        """
        dt, u, dtu, y = self.prepare_data(df)
        res = self.estimator.filtering(dt, u, dtu, y, x0, P0).to_xarray(
            df.index.name or "time", df.index, self.states, self.outputs
        )
        return res[["k", "S"]].rename({"k": "residual", "S": "residual_std"})

    def log_likelihood(self, df: pd.DataFrame) -> float:
        """Evaluate the log-likelihood of the model.

        Parameters
        ----------
        df : pandas.DataFrame
            Data.

        Returns
        -------
        float
            Negative log-likelihood.
        """
        dt, u, dtu, y = self.prepare_data(df)
        return self.estimator.log_likelihood(dt, u, dtu, y)

    def log_posterior(self, df: pd.DataFrame) -> float:
        """Evaluate the negative log-posterior, defined as

        .. math::

            -\\log p(\\theta | y) = -\\log p(y | \\theta) - \\log p(\\theta)
            + \\log p(y)

        with :math:`y` the data, :math:`\\theta` the parameters,
        :math:`p(y | \\theta)` the likelihood, :math:`p(\\theta)` the prior and
        :math:`p(\\theta | y)` the posterior. The evidence :math:`\\log p(y)`
        is constant in :math:`\\theta`; in practice this method returns the
        negative log-likelihood minus the log-prior plus the parameter penalty.

        Parameters
        ----------
        df : pandas.DataFrame
            Training data

        Returns
        -------
        log_posterior : float
            The negative log-posterior
        """
        return (
            self.log_likelihood(df)
            - self.ss.parameters.prior
            + self.ss.parameters.penalty
        )
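
    # Model-checking sketch: both quantities are evaluated at the current
    # parameter values, so they can be compared before and after `fit`.
    #
    #   nll = reg.log_likelihood(df)  # negative log-likelihood
    #   nlp = reg.log_posterior(df)   # nll - log prior + penalty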

    def predict(
        self,
        df: pd.DataFrame,
        tnew: Union[np.ndarray, pd.Series] = None,
        x0: np.ndarray = None,
        P0: np.ndarray = None,
        smooth: bool = False,
        use_outputs: bool = True,
    ) -> xr.Dataset:
        """State-space model output prediction

        Parameters
        ----------
        df : pandas.DataFrame
            Training data
        tnew : numpy.ndarray or pandas.Series, optional
            New time instants
        x0 : numpy.ndarray, optional
            Initial state. If not provided, the initial state is taken from the
            state-space model.
        P0 : numpy.ndarray, optional
            Initial state covariance. If not provided, the initial state
            covariance is taken from the state-space model.
        smooth : bool, optional
            If True, the Kalman smoother is used instead of the Kalman filter
        use_outputs : bool, optional
            If True, the outputs are used to do the estimation, by default True

        Returns
        -------
        xr.Dataset
            Dataset containing the predicted outputs and their covariance
        """
        if self.ss.ny > 1:
            raise NotImplementedError
        if tnew is not None:
            itp_df = pd.concat(
                [df, df.reindex(tnew).drop(df.index, errors="ignore")]
            ).sort_index()
            itp_df[self.inputs] = itp_df[self.inputs].interpolate(method="linear")
        else:
            itp_df = df.copy()
        ds = self.estimate_states(
            itp_df, smooth=smooth, x0=x0, P0=P0, use_outputs=use_outputs
        )
        idx_name = df.index.name or "time"
        if tnew is not None:
            ds = ds.sel(**{idx_name: tnew})
        ds["y_mean"] = (
            (idx_name, "outputs"),
            (self.ss.C @ ds.x.values.reshape(-1, self.ss.nx, 1))[..., 0],
        )
        ds["y_std"] = (
            (idx_name, "outputs"),
            np.sqrt(
                np.einsum(
                    "...yx,...xx,...xy->...y",
                    self.ss.C,
                    ds.P.values,
                    self.ss.C.T,
                )
            )
            + self.ss.R,
        )
        return ds
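
    # Prediction sketch on new time instants (hypothetical `tnew`): the inputs
    # are linearly interpolated onto the union of both time grids before the
    # states are estimated, then the result is selected back onto `tnew`.
    #
    #   tnew = pd.date_range(df.index[0], df.index[-1], freq="10min")
    #   pred = reg.predict(df, tnew=tnew, smooth=True)
    #   pred["y_mean"], pred["y_std"]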

    @property
    def pymc_model(reg):
        # The property is bound to `reg` (rather than `self`) because the
        # generated model class closes over the regressor instance.
        class PyMCModel(pm.Model):
            def __init__(self, df):
                super().__init__()
                theta = []
                for name, par in zip(
                    reg.parameters.names_free, reg.parameters.parameters_free
                ):
                    theta.append(par.prior.pymc_dist(name))
                theta = pt.as_tensor_variable(theta)
                pm.Potential("likelihood", -_Loglike(reg, df)(theta))

        return PyMCModel

    def sample(self, df, draws=1000, tune=500, chains=4, **kwargs):
        """Sample from the posterior distribution

        Parameters
        ----------
        df : pandas.DataFrame
            Dataframe containing the inputs and outputs
        draws : int, optional
            Number of samples, by default 1000
        tune : int, optional
            Number of tuning samples, by default 500
        chains : int, optional
            Number of chains, by default 4
        cores : int, optional
            Number of cores, by default cpu_count()

        Returns
        -------
        arviz.InferenceData
            Inference data containing the posterior samples

        Notes
        -----
        The number of cores used is the minimum between the number of available
        CPU cores and the number of chains. This method calls `pymc.sample`
        directly; see the PyMC documentation for more details.
        """
        cores = min(kwargs.pop("cores", cpu_count()), chains)
        with self.pymc_model(df):
            self._trace = pm.sample(
                draws=draws, tune=tune, chains=chains, cores=cores, **kwargs
            )
        return self.trace
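
    # MCMC sketch: the PyMC model is built on the fly and sampled with the
    # finite-difference gradient Op defined at the top of this module.
    #
    #   idata = reg.sample(df, draws=500, tune=500, chains=4)
    #   idata.posterior  # one variable per free parameter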

    def prior_predictive(self, df, samples=1000, **kwargs):
        """Sample from the prior predictive distribution

        Parameters
        ----------
        df : pandas.DataFrame
            Dataframe containing the inputs and outputs
        samples : int, optional
            Number of samples, by default 1000

        Returns
        -------
        xarray.Dataset
            Dataset containing the simulated outputs
        """
        with self.pymc_model(df), warnings.catch_warnings():
            warnings.simplefilter("ignore", UserWarning)
            self._prior_trace = pm.sample_prior_predictive(samples=samples, **kwargs)

        parameters = self._prior_trace.prior.to_dataframe().to_numpy()
        data = self.prepare_data(df)
        with ProcessPoolExecutor() as executor:
            results = np.vstack(
                list(
                    executor.map(
                        partial(_make_estimate, data=data, estimator=self.estimator),
                        parameters,
                    )
                )
            )
        idx_name = df.index.name or "time"
        return xr.DataArray(
            results,
            dims=("draw", idx_name, "outputs"),
            coords={
                "draw": np.arange(samples),
                "outputs": self.outputs,
                idx_name: df.index,
            },
        ).to_dataset("outputs")

    def posterior_predictive(self, df, **kwargs):
        """Sample from the posterior predictive distribution

        Parameters
        ----------
        df : pandas.DataFrame
            Dataframe containing the inputs and outputs

        Returns
        -------
        xarray.Dataset
            Dataset containing the estimated outputs for each posterior draw

        Raises
        ------
        RuntimeError
            If the model has not been sampled yet
        """
        try:
            trace = self.trace
        except AttributeError:
            raise RuntimeError("No trace available: run `sample` first")

        parameters = trace.posterior.to_dataframe().to_numpy()
        data = self.prepare_data(df)
        with ProcessPoolExecutor() as executor:
            results = np.vstack(
                list(
                    executor.map(
                        partial(_make_estimate, data=data, estimator=self.estimator),
                        parameters,
                    )
                )
            )
        results = results.reshape(
            trace.posterior.chain.shape[0],
            trace.posterior.draw.shape[0],
            *results.shape[1:],
        )
        idx_name = df.index.name or "time"
        return xr.DataArray(
            results,
            dims=("chain", "draw", idx_name, "outputs"),
            coords={
                "chain": trace.posterior.chain,
                "draw": trace.posterior.draw,
                "outputs": self.outputs,
                idx_name: df.index,
            },
        ).to_dataset("outputs")
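
    # Predictive-check sketch: both predictive methods push each parameter draw
    # through `estimate_output` in a process pool.
    #
    #   prior_ds = reg.prior_predictive(df, samples=500)
    #   post_ds = reg.posterior_predictive(df)  # requires `sample` to have run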

    @property
    def trace(self):
        main_trace = getattr(self, "_trace", None)
        prior_trace = getattr(self, "_prior_trace", None)
        if main_trace is not None:
            if prior_trace is not None:
                main_trace.extend(prior_trace)
            return main_trace
        if prior_trace is not None:
            return prior_trace
        raise AttributeError("No trace available")