"""
Distributional regression.
"""
from __future__ import annotations
import jax.numpy as jnp
import jax.random
import numpy as np
from liesel.goose import EngineBuilder, GibbsKernel, IWLSKernel
from liesel.option import Option
from .goose import GooseModel
from .model import Model, ModelBuilder
from .nodes import (
PIT,
ColumnStack,
DesignMatrix,
Hyperparameter,
InverseLink,
Node,
NodeDistribution,
NodeGroup,
Predictor,
RegressionCoef,
Response,
Smooth,
SmoothingParam,
)
from .types import Array, TFPBijectorClass, TFPDistributionClass
matrix_rank = np.linalg.matrix_rank
[docs]class SmoothGroup(NodeGroup):
"""A node group representing a smooth."""
[docs]class PSmoothGroup(SmoothGroup):
"""A node group representing a parametric smooth."""
[docs]class NPSmoothGroup(SmoothGroup):
"""A node group representing a non-parametric smooth."""
[docs]class DistRegBuilder(ModelBuilder):
"""A model builder for distributional regression models."""
_howto = "Smooths need to be added first, then the predictors, then the response"
def __init__(self) -> None:
super().__init__()
self._smooths: dict[str, list[Node]] = {}
self._distributional_parameters: dict[str, Node] = {}
self._response: Option[Node] = Option(None)
@property
def response(self) -> Node:
"""The response node."""
return self._response.expect(f"No response in {repr(self)}")
def _smooth_name(self, name: str | None, predictor: str, prefix: str) -> str:
"""Generates a name for a smooth if the ``name`` argument is ``None``."""
other_smooths = self._smooths[predictor]
other_names = [node.name for node in other_smooths if node.has_name]
prefix = predictor + "_" + prefix
counter = 0
while prefix + str(counter) in other_names:
counter += 1
if not name:
name = prefix + str(counter)
if name in other_names:
raise RuntimeError(
f"Smooth {repr(name)} already exists in {repr(self)} "
f"for predictor {repr(predictor)}"
)
return name
[docs] def add_p_smooth(
self,
X: Array,
m: float,
s: float,
predictor: str,
name: str | None = None,
) -> PSmoothGroup:
"""
Adds a parametric smooth to the model builder.
Parameters
----------
X
The design matrix.
m
The mean of the Gaussian prior.
s
The standard deviation of the Gaussian prior.
predictor
The name of the predictor to add the smooth to.
name
The name of the smooth.
"""
try:
self._smooths[predictor]
except KeyError:
self._smooths[predictor] = []
name = self._smooth_name(name, predictor, "p")
X_node = DesignMatrix(X, name=name + "_X")
m_node = Hyperparameter(m, name=name + "_m")
s_node = Hyperparameter(s, name=name + "_s")
beta = np.zeros(np.shape(X)[-1], np.float32)
beta_distribution = NodeDistribution("Normal", loc=m_node, scale=s_node)
beta_node = RegressionCoef(beta, beta_distribution, name + "_beta")
smooth_node = Smooth(X_node, beta_node, name=name)
self._smooths[predictor].append(smooth_node)
group = PSmoothGroup(
smooth=smooth_node,
beta=beta_node,
X=X_node,
m=m_node,
s=s_node,
)
group.name = name
self.add_groups(group)
return group
[docs] def add_np_smooth(
self,
X: Array,
K: Array,
a: float,
b: float,
predictor: str,
name: str | None = None,
) -> NPSmoothGroup:
"""
Adds a non-parametric smooth to the model builder.
Parameters
----------
X
The design matrix.
K
The penalty matrix.
a
The a, :math:`\\alpha` or concentration parameter of the inverse gamma
prior.
b
The b, :math:`\\beta` or scale parameter of the inverse gamma prior.
predictor
The name of the predictor to add the smooth to.
name
The name of the smooth.
"""
try:
self._smooths[predictor]
except KeyError:
self._smooths[predictor] = []
name = self._smooth_name(name, predictor, "np")
X_node = DesignMatrix(X, name=name + "_X")
K_node = Hyperparameter(K, name=name + "_K")
a_node = Hyperparameter(a, name=name + "_a")
b_node = Hyperparameter(b, name=name + "_b")
rank_node = Hyperparameter(matrix_rank(K), name=name + "_rank")
tau2_parameters = {"concentration": a_node, "scale": b_node}
tau2_distribution = NodeDistribution("InverseGamma", **tau2_parameters)
tau2_node = SmoothingParam(10000.0, tau2_distribution, name + "_tau2")
beta = np.zeros(np.shape(X)[-1], np.float32)
beta_parameters = {"tau2": tau2_node, "K": K_node, "rank": rank_node}
beta_distribution = NodeDistribution("SmoothPrior", **beta_parameters)
beta_node = RegressionCoef(beta, beta_distribution, name + "_beta")
smooth_node = Smooth(X_node, beta_node, name=name)
self._smooths[predictor].append(smooth_node)
group = NPSmoothGroup(
smooth=smooth_node,
beta=beta_node,
tau2=tau2_node,
rank=rank_node,
X=X_node,
K=K_node,
a=a_node,
b=b_node,
)
group.name = name
self.add_groups(group)
return group
[docs] def add_predictor(
self, name: str, inverse_link: str | TFPBijectorClass
) -> DistRegBuilder:
"""
Adds a predictor to the model builder.
Parameters
----------
name
The name of the parameter of the response distribution. Must match the name
of the parameter of the TFP distribution.
inverse_link
The inverse link mapping the regression predictor to the parameter of the
response distribution. Either a string identifying a TFP bijector, or
alternatively, a TFP-compatible bijector class. If a class is provided
instead of a string, the user needs to make sure it uses the right NumPy
implementation.
"""
if name not in self._smooths or not self._smooths[name]:
msg = f"No smooths in {repr(self)} for predictor {repr(name)}. "
raise RuntimeError(msg + self._howto)
smooth_nodes = self._smooths[name]
predictor_node = Predictor(name=name + "_pdt", *smooth_nodes)
parameter_node = InverseLink(inverse_link, predictor_node, name=name)
self._distributional_parameters[name] = parameter_node
self.add_nodes(predictor_node, parameter_node)
return self
[docs] def add_response(
self, response: Array, distribution: str | TFPDistributionClass
) -> DistRegBuilder:
"""
Adds the response to the model builder.
Parameters
----------
response
The response vector or matrix.
distribution
The conditional distribution of the response variable. Either a string
identifying a TFP distribution, or alternatively, a TFP-compatible
distribution class. If a class is provided instead of a string, the user
needs to make sure it uses the right NumPy implementation.
"""
if not self._distributional_parameters:
raise RuntimeError(f"No predictors in {repr(self)}. {self._howto}")
response_distribution = NodeDistribution(
distribution, **self._distributional_parameters
)
response_node = Response(response, response_distribution, "response")
self._response = Option(response_node)
self.add_nodes(response_node)
return self
[docs]class CopRegBuilder(DistRegBuilder):
"""
A model builder for copula regression models.
Remember to add a predictor for the dependence parameter before adding the copula
and building the model.
Parameters
----------
model0
The first marginal distributional regression model **builder**.
model1
The second marginal distributional regression model **builder**.
"""
def __init__(self, model0: DistRegBuilder, model1: DistRegBuilder) -> None:
super().__init__()
arg0_is_mb = isinstance(model0, DistRegBuilder)
arg1_is_mb = isinstance(model1, DistRegBuilder)
if not arg0_is_mb or not arg1_is_mb:
raise RuntimeError(f"Arguments of {repr(self)} must be model builders")
self.groups = model0.groups + model1.groups
self.nodes = model0.nodes + model1.nodes
self._model0 = self._update_names(model0, "m0_")
self._model1 = self._update_names(model1, "m1_")
@staticmethod
def _update_names(model: DistRegBuilder, prefix: str) -> DistRegBuilder:
for node in model.all_nodes():
node.name = prefix + node.name
for group in model.groups:
group.name = prefix + group.name
return model
[docs] def add_copula(self, copula: str | TFPDistributionClass) -> CopRegBuilder:
"""
Adds a copula.
Parameters
----------
copula
The copula of the response variables. Either a string identifying a TFP
distribution, or alternatively, a TFP-compatible distribution class. If a
class is provided instead of a string, the user needs to make sure it uses
the right NumPy implementation.
"""
pit0_node = PIT(self._model0.response, name="m0_pit")
pit1_node = PIT(self._model1.response, name="m1_pit")
copula_node = ColumnStack(
pit0_node,
pit1_node,
distribution=NodeDistribution(copula, **self._distributional_parameters),
name="copula",
)
self._response = Option(copula_node)
self.add_nodes(pit0_node, pit1_node, copula_node)
return self
[docs]def tau2_gibbs_kernel(group: NPSmoothGroup) -> GibbsKernel:
"""Builds a Gibbs kernel for a smoothing parameter with an inverse gamma prior."""
position_key = group["tau2"].name
def transition(prng_key, model_state):
a_prior = model_state[group["a"].name].value
rank = model_state[group["rank"].name].value
a_gibbs = jnp.squeeze(a_prior + 0.5 * rank)
b_prior = model_state[group["b"].name].value
beta = model_state[group["beta"].name].value
K = model_state[group["K"].name].value
b_gibbs = jnp.squeeze(b_prior + 0.5 * (beta @ K @ beta))
draw = b_gibbs / jax.random.gamma(prng_key, a_gibbs)
return {position_key: draw}
return GibbsKernel([position_key], transition)
[docs]def dist_reg_mcmc(model: Model, seed: int, num_chains: int) -> EngineBuilder:
"""
Configures an :class:`.EngineBuilder` for a distributional regression model.
The EngineBuilder uses a Metropolis-in-Gibbs MCMC algorithm with an
:class:`.IWLSKernel` for the regression coefficients and a :class:`.GibbsKernel` for
the smoothing parameters for a distributional regression model.
Parameters
----------
model
A model built with a :class:`.DistRegBuilder`.
seed
The PRNG seed for the engine builder.
num_chains
The number of chains to be sampled.
"""
builder = EngineBuilder(seed, num_chains)
builder.set_model(GooseModel(model))
builder.set_initial_values(model.state)
for group in model.groups.values():
if not isinstance(group, SmoothGroup):
next
if isinstance(group, NPSmoothGroup):
tau2_kernel = tau2_gibbs_kernel(group)
builder.add_kernel(tau2_kernel)
position_key = group["beta"].name
beta_kernel = IWLSKernel([position_key])
builder.add_kernel(beta_kernel)
return builder