# Source code for liesel.model.distreg

"""
Distributional regression.
"""

from __future__ import annotations

from collections.abc import Mapping

import jax.numpy as jnp
import jax.random
import numpy as np
import tensorflow_probability.substrates.jax.distributions as tfd

from liesel.distributions import MultivariateNormalDegenerate
from liesel.goose import EngineBuilder, GibbsKernel, IWLSKernel
from liesel.option import Option

from .goose import GooseModel
from .legacy import (
    DesignMatrix,
    Hyperparameter,
    InverseLink,
    Predictor,
    RegressionCoef,
    Response,
    Smooth,
    SmoothingParam,
)
from .model import GraphBuilder, Model
from .nodes import Array, Bijector, Dist, Distribution, Node, Var

matrix_rank = np.linalg.matrix_rank


class DistRegBuilder(GraphBuilder):
    """A model builder for distributional regression models."""

    # Hint appended to error messages when methods are called out of order.
    _howto = "Smooths need to be added first, then the predictors, then the response"

    def __init__(self) -> None:
        super().__init__()

        # Smooths collected per predictor name, before the predictors exist.
        self._smooths: dict[str, list[Var]] = {}
        # Parameters of the response distribution, keyed by TFP parameter name.
        self._distributional_parameters: dict[str, Var] = {}
        # Set by :meth:`add_response`; empty until then.
        self._response: Option[Var] = Option(None)

    @property
    def response(self) -> Var:
        """The response node."""
        return self._response.expect(f"No response in {repr(self)}")

    def _smooth_name(self, name: str | None, predictor: str, prefix: str) -> str:
        """Generates a name for a smooth if the ``name`` argument is ``None``."""
        taken = [node.name for node in self._smooths[predictor] if node.name]
        stem = f"{predictor}_{prefix}"

        # Find the first unused index for an auto-generated name.
        number = 0
        while stem + str(number) in taken:
            number += 1

        if not name:
            name = stem + str(number)

        if name in taken:
            raise RuntimeError(
                f"Smooth {repr(name)} already exists in {repr(self)} "
                f"for predictor {repr(predictor)}"
            )

        return name
[docs] def add_p_smooth( self, X: Array, m: float, s: float, predictor: str, name: str | None = None, ) -> dict[str, Var]: """ Adds a parametric smooth to the model builder. Parameters ---------- X The design matrix. m The mean of the Gaussian prior. s The standard deviation of the Gaussian prior. predictor The name of the predictor to add the smooth to. name The name of the smooth. """ try: self._smooths[predictor] except KeyError: self._smooths[predictor] = [] name = self._smooth_name(name, predictor, "p") X_var = DesignMatrix(X, name=name + "_X") m_var = Hyperparameter(m, name=name + "_m") s_var = Hyperparameter(s, name=name + "_s") beta = np.zeros(np.shape(X)[-1], np.float32) beta_distribution = Dist(tfd.Normal, loc=m_var, scale=s_var) beta_var = RegressionCoef(beta, beta_distribution, name + "_beta") smooth_var = Smooth(X_var, beta_var, name=name) self._smooths[predictor].append(smooth_var) group = { "smooth": smooth_var, "beta": beta_var, "X": X_var, "m": m_var, "s": s_var, } self.add_group(name, **group) return group
[docs] def add_np_smooth( self, X: Array, K: Array, a: float, b: float, predictor: str, name: str | None = None, ) -> dict[str, Var]: """ Adds a non-parametric smooth to the model builder. Parameters ---------- X The design matrix. K The penalty matrix. a The a, :math:`\\alpha` or concentration parameter of the inverse gamma prior. b The b, :math:`\\beta` or scale parameter of the inverse gamma prior. predictor The name of the predictor to add the smooth to. name The name of the smooth. """ try: self._smooths[predictor] except KeyError: self._smooths[predictor] = [] name = self._smooth_name(name, predictor, "np") X_var = DesignMatrix(X, name=name + "_X") K_var = Hyperparameter(K, name=name + "_K") a_var = Hyperparameter(a, name=name + "_a") b_var = Hyperparameter(b, name=name + "_b") rank_var = Hyperparameter(matrix_rank(K), name=name + "_rank") tau2_distribution = Dist(tfd.InverseGamma, concentration=a_var, scale=b_var) tau2_var = SmoothingParam(10000.0, tau2_distribution, name + "_tau2") beta = np.zeros(np.shape(X)[-1], np.float32) beta_distribution = Dist( MultivariateNormalDegenerate.from_penalty, loc=0.0, var=tau2_var, pen=K_var, rank=rank_var, ) beta_var = RegressionCoef(beta, beta_distribution, name + "_beta") smooth_var = Smooth(X_var, beta_var, name=name) self._smooths[predictor].append(smooth_var) group = { "smooth": smooth_var, "beta": beta_var, "tau2": tau2_var, "rank": rank_var, "X": X_var, "K": K_var, "a": a_var, "b": b_var, } self.add_group(name, **group) return group
[docs] def add_predictor(self, name: str, inverse_link: type[Bijector]) -> DistRegBuilder: """ Adds a predictor to the model builder. Parameters ---------- name The name of the parameter of the response distribution. Must match the name of the parameter of the TFP distribution. inverse_link The inverse link mapping the regression predictor to the parameter of the response distribution. Either a string identifying a TFP bijector, or alternatively, a TFP-compatible bijector class. If a class is provided instead of a string, the user needs to make sure it uses the right NumPy implementation. """ if name not in self._smooths or not self._smooths[name]: msg = f"No smooths in {repr(self)} for predictor {repr(name)}. " raise RuntimeError(msg + self._howto) smooth_vars = self._smooths[name] predictor_var = Predictor(name=name + "_pdt", *smooth_vars) parameter_var = InverseLink(predictor_var, inverse_link, name=name) self._distributional_parameters[name] = parameter_var self.add(predictor_var, parameter_var) return self
[docs] def add_response( self, response: Array, distribution: type[Distribution] ) -> DistRegBuilder: """ Adds the response to the model builder. Parameters ---------- response The response vector or matrix. distribution The conditional distribution of the response variable. Either a string identifying a TFP distribution, or alternatively, a TFP-compatible distribution class. If a class is provided instead of a string, the user needs to make sure it uses the right NumPy implementation. """ if not self._distributional_parameters: raise RuntimeError(f"No predictors in {repr(self)}. {self._howto}") response_distribution = Dist( distribution, **self._distributional_parameters # type: ignore ) response_var = Response(response, response_distribution, "response") self._response = Option(response_var) self.add(response_var) return self
def tau2_gibbs_kernel(group: Mapping[str, Node | Var]) -> GibbsKernel:
    """Builds a Gibbs kernel for a smoothing parameter with an inverse gamma prior."""

    # Resolve every group member to its underlying node, then cache the
    # model-state key of each one.
    nodes = {
        label: obj if isinstance(obj, Node) else obj.value_node
        for label, obj in group.items()
    }
    keys = {label: node.name for label, node in nodes.items()}
    position_key = keys["tau2"]

    def transition(prng_key, model_state):
        def state_value(label):
            return model_state[keys[label]].value

        # Full-conditional concentration: a + rank(K) / 2.
        rank = state_value("rank")
        a_gibbs = jnp.squeeze(state_value("a") + 0.5 * rank)

        # Full-conditional scale: b + beta' K beta / 2.
        beta = state_value("beta")
        K = state_value("K")
        b_gibbs = jnp.squeeze(state_value("b") + 0.5 * (beta @ K @ beta))

        # Inverse gamma draw via the reciprocal of a standard gamma draw.
        draw = b_gibbs / jax.random.gamma(prng_key, a_gibbs)
        return {position_key: draw}

    return GibbsKernel([position_key], transition)
def dist_reg_mcmc(model: Model, seed: int, num_chains: int) -> EngineBuilder:
    """
    Configures an :class:`.EngineBuilder` for a distributional regression model.

    The EngineBuilder uses a Metropolis-in-Gibbs MCMC algorithm with an
    :class:`.IWLSKernel` for the regression coefficients and a
    :class:`.GibbsKernel` for the smoothing parameters for a distributional
    regression model.

    Parameters
    ----------
    model
        A model built with a :class:`.DistRegBuilder`.
    seed
        The PRNG seed for the engine builder.
    num_chains
        The number of chains to be sampled.
    """
    builder = EngineBuilder(seed, num_chains)

    builder.set_model(GooseModel(model))
    builder.set_initial_values(model.state)

    # One kernel per smooth group: Gibbs for tau2, IWLS for beta.
    for group in model.groups().values():
        if "tau2" in group:
            builder.add_kernel(tau2_gibbs_kernel(group))  # type: ignore # only vars

        if "beta" in group:
            beta_key = group["beta"].value_node.name  # type: ignore # var
            builder.add_kernel(IWLSKernel([beta_key]))

    return builder