# Source code for pysip.statespace_estimator

from copy import deepcopy
import math
from typing import NamedTuple, Sequence
import warnings
from dataclasses import dataclass
import numpy as np
import pandas as pd
from .statespace.base import StateSpace

from numba.core.errors import NumbaPerformanceWarning
import xarray as xr


#### Models compatible with the Numba implementation of the Kalman filter ####


class States(NamedTuple):
    A: np.ndarray
    B0: np.ndarray
    B1: np.ndarray
    C: np.ndarray
    D: np.ndarray
    Q: np.ndarray
    R: np.ndarray


class KalmanResult(NamedTuple):
    x: np.ndarray
    P: np.ndarray
    k: np.ndarray
    S: np.ndarray

    def to_xarray(self, idx_name, time, states, outputs):
        coords = {
            idx_name: time,
            "states": states,
            "outputs": outputs,
        }
        ds = xr.Dataset(
            {
                "x": ((idx_name, "states"), self.x[:, :, 0]),
                "P": ((idx_name, "states", "states"), self.P),
                "k": ((idx_name, "outputs"), self.k[:, :, 0]),
                "S": ((idx_name, "outputs", "outputs"), self.S),
            },
            coords=coords,
        )
        return ds


def _allocate_kalman_res(n_timesteps, nx, ny):
    return KalmanResult(
        np.empty((n_timesteps, nx, 1)),
        np.empty((n_timesteps, nx, nx)),
        np.empty((n_timesteps, ny, 1)),
        np.empty((n_timesteps, ny, ny)),
    )


def _pack_kalman_res(res, i, value):
    (
        res.x[i],
        res.P[i],
        res.k[i],
        res.S[i],
    ) = value


class SimulationResults(NamedTuple):
    y: np.ndarray
    x: np.ndarray

    def to_xarray(self, idx_name, time, states, outputs):
        coords = {
            idx_name: time,
            "states": states,
            "outputs": outputs,
        }
        ds = xr.Dataset(
            {
                "x": ((idx_name, "states"), self.x[:, :, 0]),
                "y": ((idx_name, "outputs"), self.y[:, :, 0]),
            },
            coords=coords,
        )
        return ds


def _allocate_simulation_res(n_timesteps, nx, ny):
    return SimulationResults(
        np.empty((n_timesteps, ny, 1)),
        np.empty((n_timesteps, nx, 1)),
    )


def _pack_simulate(res, i, value):
    res.y[i], res.x[i] = value


class OutputEstimateResult(NamedTuple):
    y: np.ndarray
    y_std: np.ndarray

    def to_xarray(self, idx_name, time, states, outputs):
        coords = {
            idx_name: time,
            "states": states,
            "outputs": outputs,
        }
        ds = xr.Dataset(
            {
                "y": ((idx_name, "outputs"), self.y[:, :, 0]),
                "y_std": ((idx_name, "outputs"), self.y_std[:, :, 0]),
            },
            coords=coords,
        )
        return ds


def _pack_output_res(res, i, value):
    res.y[i], res.y_std[i] = value


def _allocate_output_res(n_timestep, ny):
    return OutputEstimateResult(
        np.empty((n_timestep, ny, 1)),
        np.empty((n_timestep, ny, 1)),
    )


#### Pure python / numpy implementation of the Kalman filter ####
# they are not efficient on their own, and are therefore jitted by numba


def _solve_triu_inplace(A, b):
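    # Back-substitution for the upper-triangular system A x = b, overwriting b in
    # place (allocation-free, so it stays cheap once jitted by numba).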
    for i in range(b.shape[0] - 1, -1, -1):
        b[i] = (b[i] - A[i, i + 1 :] @ b[i + 1 :]) / A[i, i]
    return b


def _update(
    C, D, R, x, P, u, y, _Arru
) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    ny, _ = C.shape
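    # Array square-root form of the measurement update: fill the pre-array
    #     [ R        0 ]
    #     [ P @ C.T  P ]
    # (where P and R are square-root factors of the state and measurement noise
    # covariances) and take its QR decomposition. The triangular factor then holds
    # the square-root innovation covariance S (top-left), a gain-related block
    # (top-right) and the updated square-root covariance (bottom-right).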
    _Arru[:ny, :ny] = R
    _Arru[ny:, :ny] = P @ C.T
    _Arru[ny:, ny:] = P
    _, r_fact = np.linalg.qr(_Arru)
    S = r_fact[:ny, :ny]
    if ny == 1:
        k = (y - C @ x - D @ u) / S[0, 0]
        x = x + r_fact[:1, 1:].T * k
    else:
        k = _solve_triu_inplace(S, y - C @ x - D @ u)
        x = x + r_fact[:ny, ny:].T @ k
    P = r_fact[ny:, ny:]
    return x, P, k, S


def _predict(A, B0, B1, Q, x, P, u, dtu) -> tuple[np.ndarray, np.ndarray]:
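    # Square-root time update: the QR of [P @ A.T; Q] yields an upper-triangular
    # factor r such that r.T @ r = A (P.T P) A.T + Q.T Q, i.e. the predicted
    # covariance in square-root form.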
    _, r = np.linalg.qr(np.vstack((P @ A.T, Q)))
    x = A @ x + B0 @ u + B1 @ dtu
    return x, r


def _kalman_step(
    x, P, u, dtu, y, states, _Arru
) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    if ~np.isnan(y).any():
        x_up, P_up, k, S = _update(states.C, states.D, states.R, x, P, u, y, _Arru)
    else:
        x_up, P_up = x, P
        k = np.full((states.C.shape[0], 1), np.nan)
        S = np.full((states.C.shape[0], states.C.shape[0]), np.nan)
    x_pred, P_pred = _predict(
        states.A, states.B0, states.B1, states.Q, x_up, P_up, u, dtu
    )
    return x_up, P_up, k, S, x_pred, P_pred


def _unpack_states(states, i) -> States:
    return States(
        states.A[:, :, i],
        states.B0[:, :, i],
        states.B1[:, :, i],
        states.C,
        states.D,
        states.Q[:, :, i],
        states.R,
    )


def _log_likelihood(x0, P0, u, dtu, y, states) -> float:
    x = x0
    P = P0
    n_timesteps = y.shape[0]
    ny, nx = states.C.shape
    dtype = states.A.dtype
    _Arru = np.zeros((nx + ny, nx + ny), dtype=dtype)
    log_likelihood = 0.5 * n_timesteps * math.log(2.0 * math.pi)
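    # Each available measurement contributes log|det(S_i)| + 0.5 * k_i.T @ k_i to the
    # accumulated value, where k_i = S_i^{-1} (y_i - C x_i - D u_i) is the
    # standardized innovation and S_i.T @ S_i the innovation covariance.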
    for i in range(n_timesteps):
        y_i = np.ascontiguousarray(y)[i].reshape(-1, 1)
        u_i = np.ascontiguousarray(u)[i].reshape(-1, 1)
        dtu_i = np.ascontiguousarray(dtu)[i].reshape(-1, 1)
        states_i = _unpack_states(states, i)
        if ~np.isnan(y_i).any():
            x, P, k, S = _update(
                states_i.C, states_i.D, states_i.R, x, P, u_i, y_i, _Arru
            )
            if ny == 1:
                Si = S[0, 0]
                log_likelihood += (
                    math.log(abs(Si.real) + abs(Si.imag)) + 0.5 * k[0, 0] ** 2
                )
            else:
                log_likelihood += np.linalg.slogdet(S)[1] + 0.5 * (k.T @ k)[0, 0]
        x, P = _predict(
            states_i.A, states_i.B0, states_i.B1, states_i.Q, x, P, u_i, dtu_i
        )
    return log_likelihood


def _filtering(x0, P0, u, dtu, y, states) -> KalmanResult:
    x = x0
    P = P0
    n_timesteps = y.shape[0]
    ny, nx = states.C.shape
    dtype = states.A.dtype
    _Arru = np.zeros((nx + ny, nx + ny), dtype=dtype)
    res = _allocate_kalman_res(n_timesteps, nx, ny)
    for i in range(n_timesteps):
        y_i = np.ascontiguousarray(y)[i].reshape(-1, 1)
        u_i = np.ascontiguousarray(u)[i].reshape(-1, 1)
        dtu_i = np.ascontiguousarray(dtu)[i].reshape(-1, 1)
        states_i = _unpack_states(states, i)
        x_up, P_up, k, S, x, P = _kalman_step(x, P, u_i, dtu_i, y_i, states_i, _Arru)
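        # P_up is an upper-triangular square-root factor; store the full covariance
        # P_up.T @ P_up in the results.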
        _pack_kalman_res(res, i, (x_up, P_up.T @ P_up, k, S))
    return res


def _smoothing(x0, P0, u, dtu, y, states) -> KalmanResult:
    x = x0
    P = P0
    n_timesteps = y.shape[0]
    ny, nx = states.C.shape
    dtype = states.A.dtype
    _Arru = np.zeros((nx + ny, nx + ny), dtype=dtype)
    xp = np.empty((n_timesteps, nx, 1), dtype=dtype)
    Pp = np.empty((n_timesteps, nx, nx), dtype=dtype)
    res = _allocate_kalman_res(n_timesteps, nx, ny)
    for i in range(n_timesteps):
        y_i = np.ascontiguousarray(y)[i].reshape(-1, 1)
        u_i = np.ascontiguousarray(u)[i].reshape(-1, 1)
        dtu_i = np.ascontiguousarray(dtu)[i].reshape(-1, 1)
        states_i = _unpack_states(states, i)
        xp[i] = x
        Pp[i] = P.T @ P
        x_up, P_up, k, S, x, P = _kalman_step(x, P, u_i, dtu_i, y_i, states_i, _Arru)
        _pack_kalman_res(res, i, (x_up, P_up.T @ P_up, k, S))

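    # Rauch-Tung-Striebel backward pass: the smoother gain
    # G_i = P_i^filt @ A_i.T @ inv(P_{i+1}^pred) propagates the smoothed corrections
    # backwards through the filtered estimates stored in ``res``.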
    for i in range(n_timesteps - 2, -1, -1):
        G = np.linalg.solve(Pp[i + 1], states.A[:, :, i] @ res.P[i]).T
        res.x[i, :, :] += G @ (res.x[i + 1, :, :] - xp[i + 1, :, :])
        res.P[i, :, :] += G @ (res.P[i + 1, :, :] - Pp[i + 1, :, :]) @ G.T
    return res


def _simulate(x0, u, dtu, states) -> SimulationResults:
    x = x0
    n_timesteps = u.shape[0]
    ny, nx = states.C.shape
    res = _allocate_simulation_res(n_timesteps, nx, ny)
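    # Q and R act here as square-root (Cholesky-like) factors: multiplying standard
    # normal draws by them yields process and measurement noise with the
    # corresponding covariances.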
    for i in range(n_timesteps):
        u_i = np.ascontiguousarray(u)[i].reshape(-1, 1)
        dtu_i = np.ascontiguousarray(dtu)[i].reshape(-1, 1)
        states_i = _unpack_states(states, i)
        y = states_i.C @ x + states_i.D @ u_i
        x = (
            states_i.A @ x
            + states_i.B0 @ u_i
            + states_i.B1 @ dtu_i
            + states_i.Q @ np.random.randn(nx, 1)
        )
        _pack_simulate(res, i, (y, x))
    # add noise to the output
    res.y += states.R @ np.random.randn(n_timesteps, ny, 1)
    return res


def _estimate_output(x0, P0, u, dtu, y, states) -> OutputEstimateResult:
    n_timesteps = y.shape[0]
    ny = states.C.shape[0]

    res_filter = _filtering(x0, P0, u, dtu, y, states)
    output_res = _allocate_output_res(n_timesteps, ny)
    for i in range(n_timesteps):
        x = res_filter.x[i]
        P = res_filter.P[i]
        states_i = _unpack_states(states, i)
        output_res.y[i] = states_i.C @ x + states_i.D @ u[i].reshape(-1, 1)
        output_res.y_std[i] = np.sqrt(states_i.C @ P @ states_i.C.T) + states_i.R

    return output_res


# All of the functions above will be jitted by numba, if available. Otherwise, the
# pure python / numpy implementation will be used.
try:
    from numba import jit_module

    jit_module(nopython=True, nogil=True, cache=True)
except ImportError:
    warnings.warn("Numba not installed, using pure python implementation")


@dataclass
class KalmanQR:
    """Bayesian filter (square-root Kalman filter).

    Parameters
    ----------
    ss : StateSpace
        State space model.

    Notes
    -----
    All the methods except _proxy_params are jitted by numba, if available.
    Otherwise, the pure (but slower) python / numpy implementation will be used.

    A minimal usage sketch is given at the end of this module.
    """

    ss: StateSpace

    def _proxy_params(
        self,
        dt: pd.Series,
        vars: Sequence[pd.DataFrame],
        x0: np.ndarray | None = None,
        P0: np.ndarray | None = None,
    ):
        ss = self.ss
        ss.update()
        dts, idx = np.unique(dt, return_inverse=True)
        A = np.zeros((ss.nx, ss.nx, dt.size))
        B0 = np.zeros((ss.nx, ss.nu, dt.size))
        B1 = np.zeros((ss.nx, ss.nu, dt.size))
        Q = np.zeros((ss.nx, ss.nx, dt.size))
        Ai, B0i, B1i, Qi = map(np.dstack, zip(*map(ss.discretization, dts)))
        A[:] = Ai[:, :, idx]
        B0[:] = B0i[:, :, idx]
        B1[:] = B1i[:, :, idx]
        Q[:] = Qi[:, :, idx]
        vars = [var.to_numpy() for var in vars]
        states = States(A, B0, B1, ss.C, ss.D, Q, ss.R)
        x0 = x0 if x0 is not None else ss.x0
        P0 = P0 if P0 is not None else ss.P0
        return tuple([x0, P0, *vars, states])

    def update(
        self,
        x: np.ndarray,
        P: np.ndarray,
        u: np.ndarray,
        y: np.ndarray,
    ) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """Update the state and covariance for the current time step.

        Note that this method does not update the state space model itself; to keep
        it consistent with the model parameters, call the `filter.ss.update` method
        first.

        Parameters
        ----------
        x : np.ndarray
            State (or endogenous) vector.
        P : np.ndarray
            Covariance matrix.
        u : np.ndarray
            Input (or exogenous) vector.
        y : np.ndarray
            Measurement (or observation) vector.

        Returns
        -------
        x : np.ndarray
            Updated state vector.
        P : np.ndarray
            Updated covariance matrix (square-root factor).
        k : np.ndarray
            Standardized innovation vector.
        S : np.ndarray
            Square-root of the innovation covariance matrix.
        """
        x, P = deepcopy(x), deepcopy(P)
        nx = self.ss.nx
        ny = self.ss.ny
        C = self.ss.C
        D = self.ss.D
        R = self.ss.R
        _Arru = np.zeros((nx + ny, nx + ny))
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", category=NumbaPerformanceWarning)
            return _update(C, D, R, x, P, u, y, _Arru)

    def predict(
        self,
        x: np.ndarray,
        P: np.ndarray,
        u: np.ndarray,
        dtu: np.ndarray,
        dt: float,
    ) -> tuple[np.ndarray, np.ndarray]:
        """Predict the state and covariance of the next time step.

        Parameters
        ----------
        x : np.ndarray
            State (or endogenous) vector.
        P : np.ndarray
            Covariance matrix.
        u : np.ndarray
            Input (or exogenous) vector.
        dtu : np.ndarray
            Time derivative of the input vector.
        dt : float
            Time step.

        Returns
        -------
        np.ndarray
            Predicted state vector.
        np.ndarray
            Predicted covariance matrix (square-root factor).
        """
        x, P = deepcopy(x), deepcopy(P)
        A, B0, B1, Q = self.ss.discretization(dt)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", category=NumbaPerformanceWarning)
            return _predict(A, B0, B1, Q, x, P, u, dtu)

    def log_likelihood(
        self,
        dt: pd.Series,
        u: pd.DataFrame,
        dtu: pd.DataFrame,
        y: pd.DataFrame,
    ) -> float:
        """Compute the log-likelihood of the model.

        Parameters
        ----------
        dt : pd.Series
            Time steps.
        u : pd.DataFrame
            Input (or exogenous) vector.
        dtu : pd.DataFrame
            Time derivative of the input vector.
        y : pd.DataFrame
            Measurement (or observation) vector.

        Returns
        -------
        float
            Log-likelihood of the model.
        """
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", category=NumbaPerformanceWarning)
            x0, P0, u, dtu, y, states = self._proxy_params(dt, (u, dtu, y))
            return _log_likelihood(x0, P0, u, dtu, y, states)

    def filtering(
        self,
        dt: pd.Series,
        u: pd.DataFrame,
        dtu: pd.DataFrame,
        y: pd.DataFrame,
        x0: np.ndarray | None = None,
        P0: np.ndarray | None = None,
    ) -> KalmanResult:
        """Filter the data using the state space model and the Kalman filter.

        Parameters
        ----------
        dt : pd.Series
            Time steps.
        u : pd.DataFrame
            Input (or exogenous) vector.
        dtu : pd.DataFrame
            Time derivative of the input vector.
        y : pd.DataFrame
            Measurement (or observation) vector.
        x0 : np.ndarray, optional
            Initial state vector, by default None. If None, the initial state vector
            provided by the statespace model is used.
        P0 : np.ndarray, optional
            Initial covariance matrix, by default None. If None, the initial
            covariance matrix provided by the statespace model is used.

        Returns
        -------
        KalmanResult
            Result of the filtering in the form of a namedtuple with the following
            fields:

            - x: np.ndarray[n_timesteps, nx, 1]
                Filtered (updated) state vector.
            - P: np.ndarray[n_timesteps, nx, nx]
                Filtered (updated) covariance matrix.
            - k: np.ndarray[n_timesteps, ny, 1]
                Standardized innovations.
            - S: np.ndarray[n_timesteps, ny, ny]
                Square-root of the innovation covariance matrix.
        """
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", category=NumbaPerformanceWarning)
            x0, P0, u, dtu, y, states = self._proxy_params(dt, (u, dtu, y), x0, P0)
            return _filtering(x0, P0, u, dtu, y, states)

    def smoothing(
        self,
        dt: pd.Series,
        u: pd.DataFrame,
        dtu: pd.DataFrame,
        y: pd.DataFrame,
        x0: np.ndarray | None = None,
        P0: np.ndarray | None = None,
    ) -> KalmanResult:
        """Smooth the data using the state space model and the Kalman smoother.

        Parameters
        ----------
        dt : pd.Series
            Time steps.
        u : pd.DataFrame
            Input (or exogenous) vector.
        dtu : pd.DataFrame
            Time derivative of the input vector.
        y : pd.DataFrame
            Measurement (or observation) vector.
        x0 : np.ndarray, optional
            Initial state vector, by default None. If None, the initial state vector
            provided by the statespace model is used.
        P0 : np.ndarray, optional
            Initial covariance matrix, by default None. If None, the initial
            covariance matrix provided by the statespace model is used.

        Returns
        -------
        KalmanResult
            Result of the smoothing in the form of a namedtuple with the following
            fields:

            - x: np.ndarray[n_timesteps, nx, 1]
                Smoothed state vector.
            - P: np.ndarray[n_timesteps, nx, nx]
                Smoothed covariance matrix.
            - k: np.ndarray[n_timesteps, ny, 1]
                Standardized innovations from the forward (filtering) pass.
            - S: np.ndarray[n_timesteps, ny, ny]
                Square-root of the innovation covariance matrix from the forward pass.
        """
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", category=NumbaPerformanceWarning)
            x0, P0, u, dtu, y, states = self._proxy_params(dt, (u, dtu, y), x0, P0)
            return _smoothing(x0, P0, u, dtu, y, states)

    def simulate(
        self,
        dt: pd.Series,
        u: pd.DataFrame,
        dtu: pd.DataFrame,
        x0: np.ndarray | None = None,
        P0: np.ndarray | None = None,
    ) -> SimulationResults:
        r"""Simulate the data using the state space model.

        The Kalman filter is not involved in this process. The formula reads:

        .. math::

            y = C x + D u + R \epsilon \\
            x = A x + B_0 u + B_1 \dot{u} + Q \eta

        where :math:`\epsilon` and :math:`\eta` are independent white noise processes
        with covariance matrices :math:`R` and :math:`Q` respectively.

        Parameters
        ----------
        dt : pd.Series
            Time steps.
        u : pd.DataFrame
            Input (or exogenous) vector.
        dtu : pd.DataFrame
            Time derivative of the input vector.
        x0 : np.ndarray, optional
            Initial state vector, by default None. If None, the initial state vector
            provided by the statespace model is used.
        P0 : np.ndarray, optional
            Initial covariance matrix, by default None. If None, the initial
            covariance matrix provided by the statespace model is used.

        Returns
        -------
        SimulationResults
            Simulated data in the form of a namedtuple with the following fields:

            - x: np.ndarray[n_timesteps, nx, 1]
                State vector.
            - y: np.ndarray[n_timesteps, ny, 1]
                Output vector.
        """
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", category=NumbaPerformanceWarning)
            x0, _, u, dtu, states = self._proxy_params(dt, (u, dtu), x0, P0)
            return _simulate(x0, u, dtu, states)

    def estimate_output(
        self,
        dt: pd.Series,
        u: pd.DataFrame,
        dtu: pd.DataFrame,
        y: pd.DataFrame,
        x0: np.ndarray | None = None,
        P0: np.ndarray | None = None,
    ) -> OutputEstimateResult:
        """Estimate the output using the state space model and the Kalman filter.

        Parameters
        ----------
        dt : pd.Series
            Time steps.
        u : pd.DataFrame
            Input (or exogenous) vector.
        dtu : pd.DataFrame
            Time derivative of the input vector.
        y : pd.DataFrame
            Measurement (or observation) vector.
        x0 : np.ndarray, optional
            Initial state vector, by default None. If None, the initial state vector
            provided by the statespace model is used.
        P0 : np.ndarray, optional
            Initial covariance matrix, by default None. If None, the initial
            covariance matrix provided by the statespace model is used.

        Returns
        -------
        OutputEstimateResult
            Estimated output in the form of a namedtuple with the following fields:

            - y: np.ndarray[n_timesteps, ny, 1]
                Estimated output vector.
            - y_std: np.ndarray[n_timesteps, ny, 1]
                Standard deviation of the estimated output vector.
        """
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", category=NumbaPerformanceWarning)
            x0, P0, u, dtu, y, states = self._proxy_params(dt, (u, dtu, y), x0, P0)
            return _estimate_output(x0, P0, u, dtu, y, states)
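

#### Example usage (illustrative sketch, not executed) ####
# ``my_model`` (an already parametrized pysip ``StateSpace`` subclass), the
# DataFrames ``u``, ``dtu``, ``y`` and the name lists below are hypothetical
# placeholders; any model and data with matching inputs and outputs could be used.
#
#     estimator = KalmanQR(my_model)
#     dt = pd.Series(np.full(len(y), 3600.0), index=y.index)  # sampling period [s]
#     nll = estimator.log_likelihood(dt, u, dtu, y)
#     filt = estimator.filtering(dt, u, dtu, y)
#     ds = filt.to_xarray("time", y.index, state_names, output_names)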