Source code for pymc_marketing.mmm.events

#   Copyright 2022 - 2025 The PyMC Labs Developers
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
"""Event transformations.

This module provides event transformations for use in Marketing Mix Models.

.. plot::
    :context: close-figs

    import numpy as np
    import pandas as pd
    import pymc as pm

    import matplotlib.pyplot as plt

    from pymc_marketing.mmm.events import EventEffect, GaussianBasis
    from pymc_marketing.plot import plot_curve
    from pymc_extras.prior import Prior

    seed = sum(map(ord, "Events"))
    rng = np.random.default_rng(seed)

    df_events = pd.DataFrame(
        {
            "event": ["single day", "multi day"],
            "start_date": pd.to_datetime(["2025-01-01", "2025-01-20"]),
            "end_date": pd.to_datetime(["2025-01-02", "2025-01-25"]),
        }
    )

    def difference_in_days(model_dates, event_dates):
        if hasattr(model_dates, "to_numpy"):
            model_dates = model_dates.to_numpy()
        if hasattr(event_dates, "to_numpy"):
            event_dates = event_dates.to_numpy()

        one_day = np.timedelta64(1, "D")
        return (model_dates[:, None] - event_dates) / one_day


    def create_basis_matrix(df_events: pd.DataFrame, model_dates: np.ndarray):
        start_dates = df_events["start_date"]
        end_dates = df_events["end_date"]

        start_ref = difference_in_days(model_dates, start_dates)
        end_ref = difference_in_days(model_dates, end_dates)

        return np.where(
            (start_ref >= 0) & (end_ref <= 0),
            0,
            np.where(np.abs(start_ref) < np.abs(end_ref), start_ref, end_ref),
        )


    gaussian = GaussianBasis(
        priors={
            "sigma": Prior("Gamma", mu=7, sigma=1, dims="event"),
        }
    )
    effect_size = Prior("Normal", mu=1, sigma=1, dims="event")
    effect = EventEffect(basis=gaussian, effect_size=effect_size, dims=("event",))

    dates = pd.date_range("2024-12-01", periods=3 * 31, freq="D")

    X = create_basis_matrix(df_events, model_dates=dates)

    coords = {"date": dates, "event": df_events["event"].to_numpy()}
    with pm.Model(coords=coords) as model:
        pm.Deterministic("effect", effect.apply(X), dims=("date", "event"))

        idata = pm.sample_prior_predictive(random_seed=rng)

    fig, axes = idata.prior.effect.pipe(
        plot_curve,
        "date",
        random_seed=rng,
        subplot_kwargs={"ncols": 1},
    )
    fig.suptitle("Gaussian Event Effect")
    plt.show()

"""

from typing import Literal, cast

import numpy as np
import numpy.typing as npt
import pandas as pd
import pymc as pm
import pytensor.tensor as pt
import xarray as xr
from pydantic import (
    BaseModel,
    ConfigDict,
    Field,
    InstanceOf,
    model_validator,
    validate_call,
)
from pymc_extras.deserialize import deserialize, register_deserialization
from pymc_extras.prior import Prior, create_dim_handler
from pytensor.tensor.variable import TensorVariable

from pymc_marketing.mmm.components.base import Transformation, create_registration_meta

BASIS_TRANSFORMATIONS: dict = {}
BasisMeta = create_registration_meta(BASIS_TRANSFORMATIONS)


[docs] class Basis(Transformation, metaclass=BasisMeta): # type: ignore[misc] """Basis transformation associated with an event model.""" prefix: str = "basis" lookup_name: str
[docs] @validate_call def sample_curve( self, parameters: InstanceOf[xr.Dataset] = Field( ..., description="Parameters of the saturation transformation." ), days: int = Field( 14, gt=0, description="Number of days before and after the basis.", ), ) -> xr.DataArray: """Sample the curve of the saturation transformation given parameters. Parameters ---------- parameters : xr.Dataset Dataset with the parameters of the saturation transformation. days : int Number of days around basis. Default is 14 days or two weeks before and after the basis for a total of 28 days. Returns ------- xr.DataArray Curve of the saturation transformation. """ x = np.linspace(-days, days, 100) coords = {"x": x} return self._sample_curve( var_name="saturation", parameters=parameters, x=x, coords=coords, )
[docs] def basis_from_dict(data: dict) -> Basis: """Create a basis transformation from a dictionary.""" data = data.copy() lookup_name = data.pop("lookup_name") cls = BASIS_TRANSFORMATIONS[lookup_name] if "priors" in data: data["priors"] = {k: deserialize(v) for k, v in data["priors"].items()} return cls(**data)
def _is_basis(data): return "lookup_name" in data and data["lookup_name"] in BASIS_TRANSFORMATIONS register_deserialization( is_type=_is_basis, deserialize=basis_from_dict, )
[docs] class EventEffect(BaseModel): """Event effect associated with an event model.""" basis: InstanceOf[Basis] effect_size: InstanceOf[Prior] dims: str | tuple[str, ...] model_config = ConfigDict(extra="forbid") @model_validator(mode="before") def _dims_to_tuple(self): if isinstance(self["dims"], str): self["dims"] = (self["dims"],) return self @model_validator(mode="after") def _validate_dims(self): if not self.dims: raise ValueError("The dims must not be empty.") if not set(self.basis.combined_dims).issubset(set(self.dims)): raise ValueError("The dims must contain all dimensions of the basis.") if not set(self.effect_size.dims).issubset(set(self.dims)): raise ValueError("The dims must contain all dimensions of the effect size.") return self
[docs] def apply(self, X: pt.TensorLike, name: str = "event") -> TensorVariable: """Apply the event effect to the data.""" dim_handler = create_dim_handler(("x", *self.dims)) return self.basis.apply(X, dims=self.dims) * dim_handler( self.effect_size.create_variable(f"{name}_effect_size"), self.effect_size.dims, )
[docs] def to_dict(self) -> dict: """Convert the event effect to a dictionary.""" return { "class": "EventEffect", "data": { "basis": self.basis.to_dict(), "effect_size": self.effect_size.to_dict(), "dims": self.dims, }, }
[docs] @classmethod def from_dict(cls, data: dict) -> "EventEffect": """Create an event effect from a dictionary.""" return cls( basis=deserialize(data["basis"]), effect_size=deserialize(data["effect_size"]), dims=data["dims"], )
def _is_event_effect(data: dict) -> bool: """Check if the data is an event effect.""" return data["class"] == "EventEffect" register_deserialization( is_type=_is_event_effect, deserialize=lambda data: EventEffect.from_dict(data["data"]), )
[docs] class GaussianBasis(Basis): """Gaussian basis transformation.""" lookup_name = "gaussian"
[docs] def function(self, x: pt.TensorLike, sigma: pt.TensorLike) -> TensorVariable: """Gaussian bump function.""" rv = pm.Normal.dist(mu=0.0, sigma=sigma) out = pm.math.exp(pm.logp(rv, x)) return out
default_priors = { "sigma": Prior("Gamma", mu=7, sigma=1), }
[docs] class HalfGaussianBasis(Basis): R"""One-sided Gaussian basis transformation. .. plot:: :context: close-figs import matplotlib.pyplot as plt from pymc_marketing.mmm.events import HalfGaussianBasis from pymc_extras.prior import Prior half_gaussian = HalfGaussianBasis( priors={ "sigma": Prior("Gamma", mu=[3, 4], sigma=1, dims="event"), } ) coords = {"event": ["PyData-Berlin", "PyCon-Finland"]} prior = half_gaussian.sample_prior(coords=coords) curve = half_gaussian.sample_curve(prior) fig, axes = half_gaussian.plot_curve( curve, subplot_kwargs={"figsize": (6, 3), "sharey": True} ) for ax in axes: ax.set_xlabel("") plt.show() Parameters ---------- mode : Literal["after", "before"] Whether the basis is located before or after the event. include_event : bool Whether to include the event days in the basis. priors : dict[str, Prior] Prior for the sigma parameter. prefix : str Prefix for the parameter names. """ lookup_name = "half_gaussian"
[docs] def __init__( self, mode: Literal["after", "before"] = "after", include_event: bool = True, **kwargs, ): super().__init__(**kwargs) self.mode = mode self.include_event = include_event
[docs] def function(self, x: pt.TensorLike, sigma: pt.TensorLike) -> TensorVariable: """One-sided Gaussian bump function.""" rv = pm.Normal.dist(mu=0.0, sigma=sigma) out = pm.math.exp(pm.logp(rv, x)) # Sign determines if the zeroing happens after or before the event. sign = 1 if self.mode == "after" else -1 # Build boolean mask(s) in x's shape and broadcast to out's shape. pre_mask = sign * x < 0 if not self.include_event: pre_mask = pm.math.or_(pre_mask, sign * x == 0) # Ensure mask matches output shape for elementwise switch pre_mask = pt.broadcast_to(pre_mask, out.shape) return pt.switch(pre_mask, 0, out)
[docs] def to_dict(self) -> dict: """Convert the half Gaussian basis to a dictionary.""" return { **super().to_dict(), "mode": self.mode, "include_event": self.include_event, }
default_priors = { "sigma": Prior("Gamma", mu=7, sigma=1), }
[docs] class AsymmetricGaussianBasis(Basis): R"""Asymmetric Gaussian bump basis transformation. Allows different widths (sigma_before, sigma_after) and amplitudes (a_after) after the event. .. plot:: :context: close-figs import matplotlib.pyplot as plt from pymc_marketing.mmm.events import AsymmetricGaussianBasis from pymc_extras.prior import Prior asy_gaussian = AsymmetricGaussianBasis( priors={ "sigma_before": Prior("Gamma", mu=[3, 4], sigma=1, dims="event"), "a_after": Prior("Normal", mu=[-.75, .5], sigma=.2, dims="event"), } ) coords = {"event": ["PyData-Berlin", "PyCon-Finland"]} prior = asy_gaussian.sample_prior(coords=coords) curve = asy_gaussian.sample_curve(prior) fig, axes = asy_gaussian.plot_curve( curve, subplot_kwargs={"figsize": (6, 3), "sharey": True} ) for ax in axes: ax.set_xlabel("") plt.show() Parameters ---------- event_in : Literal["before", "after", "exclude"] Whether to include the event in the before or after part of the basis, or leave it out entirely. Default is "after". priors : dict[str, Prior] Prior for the sigma_before, sigma_after, a_before, and a_after parameters. prefix : str Prefix for the parameters. """ lookup_name = "asymmetric_gaussian"
[docs] def __init__( self, event_in: Literal["before", "after", "exclude"] = "after", **kwargs, ): super().__init__(**kwargs) self.event_in = event_in
[docs] def function( self, x: pt.TensorLike, sigma_before: pt.TensorLike, sigma_after: pt.TensorLike, a_after: pt.TensorLike, ) -> pt.TensorVariable: """Asymmetric Gaussian bump function.""" match self.event_in: case "before": indicator_before = pt.cast(x <= 0, "float32") indicator_after = pt.cast(x > 0, "float32") case "after": indicator_before = pt.cast(x < 0, "float32") indicator_after = pt.cast(x >= 0, "float32") case "exclude": indicator_before = pt.cast(x < 0, "float32") indicator_after = pt.cast(x > 0, "float32") case _: raise ValueError(f"Invalid event_in: {self.event_in}") rv_before = pm.Normal.dist(mu=0.0, sigma=sigma_before) rv_after = pm.Normal.dist(mu=0.0, sigma=sigma_after) y_before = pt.switch( indicator_before, pm.math.exp(pm.logp(rv_before, x)), 0, ) y_after = pt.switch( indicator_after, pm.math.exp(pm.logp(rv_after, x)) * a_after, 0, ) return y_before + y_after
[docs] def to_dict(self) -> dict: """Convert the asymmetric Gaussian basis to a dictionary.""" return { **super().to_dict(), "event_in": self.event_in, }
default_priors = { "sigma_before": Prior("Gamma", mu=3, sigma=1), "sigma_after": Prior("Gamma", mu=7, sigma=2), "a_after": Prior("Normal", mu=1, sigma=0.5), }
[docs] def days_from_reference( dates: pd.Series | pd.DatetimeIndex, reference_date: str | pd.Timestamp, ) -> npt.NDArray[np.int64]: """Calculate the difference in days between dates and a reference date. Parameters ---------- dates : pd.Series | pd.DatetimeIndex Dates to calculate the difference from the reference date. reference_date : str | pd.Timestamp Reference date. Returns ------- np.ndarray Difference in days between dates and the reference date. """ reference_date = cast(pd.Timestamp, pd.to_datetime(reference_date)) dates = pd.to_datetime(dates) diff = dates - reference_date if isinstance(diff, pd.Series): diff = diff.dt # type: ignore return diff.days.to_numpy() # type: ignore