from __future__ import annotations
import math
import typing
import warnings
from typing import Any
import numpy as np
from scipy.stats import norm
from ...utils import array2dataset, eval_pdf, get_value, pll, set_values
from ...utils.fit.api_check import is_valid_fitresult, is_valid_loss
from ...utils.fit.diverse import get_ndims
from ..parameters import POI, POIarray
from .basecalculator import BaseCalculator
[docs]
def generate_asimov_hist(
model, params: dict[Any, dict[str, Any]], nbins: int | None = None
) -> tuple[np.ndarray, np.ndarray]:
"""Generate the Asimov histogram using a model and dictionary of parameters.
Args:
model: model used to generate the dataset.
params: values of the parameters of the models.
nbins: number of bins.
Returns:
Tuple of hist and bin_edges.
Example with **zfit**:
>>> obs = zfit.Space('x', limits=(0.1, 2.0))
>>> mean = zfit.Parameter("mu", 1.2)
>>> sigma = zfit.Parameter("sigma", 0.1)
>>> model = zfit.pdf.Gauss(obs=obs, mu=mean, sigma=sigma)
>>> hist, bin_edges = generate_asimov_hist(model, {"mean": 1.2, "sigma": 0.1})
"""
if nbins is None:
nbins = 100
space = model.space
bounds = space.limit1d
bin_edges = np.linspace(*bounds, nbins + 1)
bin_centers = bin_edges[:-1] + np.diff(bin_edges) / 2
hist = eval_pdf(model, bin_centers, params, allow_extended=True)
hist *= space.area() / nbins
return hist, bin_edges
[docs]
def generate_asimov_dataset(data, model, is_binned, nbins, values):
"""Generate the Asimov dataset using a model and dictionary of parameters.
Args:
data: Data, the same class should be used for the generated dataset.
model: Model to use for the generation. Can be binned or unbinned.
is_binned: If the model is binned.
nbins: Number of bins for the asimov dataset.
values: Dictionary of parameters values.
Returns:
Dataset with the asimov dataset.
"""
nsample = None
if not model.is_extended:
nsample = get_value(data.n_events)
if is_binned:
with set_values(list(values), [v["value"] for v in values.values()]):
dataset = model.to_binneddata()
if nsample is not None:
dataset = type(dataset).from_hist(dataset.to_hist() * nsample)
else:
if len(nbins) > 1: # meaning we have multiple dimensions
msg = (
"Currently, only one dimension is supported for models that do not follow"
" the new binned loss convention. New losses can be registered with the"
" asymtpotic calculator."
)
raise ValueError(msg)
weights, bin_edges = generate_asimov_hist(model, values, nbins[0])
bin_centers = bin_edges[:-1] + np.diff(bin_edges) / 2
if nsample is not None: # It's not extended
weights *= nsample
dataset = array2dataset(type(data), data.space, bin_centers, weights)
return dataset
[docs]
class AsymptoticCalculator(BaseCalculator):
"""
Class for asymptotic calculators, using asymptotic formulae of the likelihood ratio described in
:cite:`Cowan:2010js`. Can be used only with one parameter of interest.
"""
UNBINNED_TO_BINNED_LOSS: typing.ClassVar = {}
try:
from zfit.loss import (
BinnedNLL,
ExtendedBinnedNLL,
ExtendedUnbinnedNLL,
UnbinnedNLL,
)
except ImportError:
pass
else:
UNBINNED_TO_BINNED_LOSS[UnbinnedNLL] = BinnedNLL
UNBINNED_TO_BINNED_LOSS[ExtendedUnbinnedNLL] = ExtendedBinnedNLL
def __init__(
self,
input,
minimizer,
asimov_bins: int | list[int] | None = None,
):
"""Asymptotic calculator class using Wilk's and Wald's asymptotic formulae.
The asympotic formula is significantly faster than the Frequentist calculator, as it does not
require the calculation of the frequentist p-value, which involves the calculation of toys (sample-and-fit).
Args:
input: loss or fit result.
minimizer: minimizer to use to find the minimum of the loss function.
asimov_bins: number of bins of the Asimov dataset.
Example with **zfit**:
>>> import zfit
>>> from zfit.loss import UnbinnedNLL
>>> from zfit.minimize import Minuit
>>>
>>> obs = zfit.Space('x', limits=(0.1, 2.0))
>>> data = zfit.data.Data.from_numpy(obs=obs, array=np.random.normal(1.2, 0.1, 10000))
>>> mean = zfit.Parameter("mu", 1.2)
>>> sigma = zfit.Parameter("sigma", 0.1)
>>> model = zfit.pdf.Gauss(obs=obs, mu=mean, sigma=sigma)
>>> loss = UnbinnedNLL(model=model, data=data)
>>>
>>> calc = AsymptoticCalculator(input=loss, minimizer=Minuit(), asimov_bins=100)
"""
if is_valid_fitresult(input):
loss = input.loss
elif is_valid_loss(input):
loss = input
else:
msg = "input must be a fitresult or a loss"
raise ValueError(msg)
asimov_bins_converted = self._check_convert_asimov_bins(asimov_bins, loss.data)
super().__init__(input, minimizer)
self._asimov_bins = asimov_bins_converted
self._asimov_dataset: dict = {}
self._asimov_loss: dict = {}
self._binned_loss = None
# cache of nll values computed with the asimov dataset
self._asimov_nll: dict[POI, np.ndarray] = {}
def _convert_to_binned(self, loss, asimov_bins):
"""Converts the loss to binned if necessary."""
for unbinned_loss, binned_loss in self.UNBINNED_TO_BINNED_LOSS.items():
if type(loss) is unbinned_loss:
datasets = []
models = []
for d, m, nbins in zip(loss.data, loss.model, asimov_bins, strict=False):
binnings = m.space.with_binning(nbins)
model_binned = m.to_binned(binnings)
data_binned = d.to_binned(binnings)
datasets.append(data_binned)
models.append(model_binned)
loss = binned_loss(model=models, data=datasets, constraints=loss.constraints)
break
if type(loss) is binned_loss:
break
else:
loss = False
return loss
def _get_binned_loss(self):
"""Returns the binned loss."""
binned_loss = self._binned_loss
if binned_loss is None:
binned_loss = self._convert_to_binned(self.loss, self._asimov_bins)
self._binned_loss = binned_loss
return binned_loss
@staticmethod
def _check_convert_asimov_bins(asimov_bins, datasets) -> list[list[int]]: # TODO: we want to allow axes from UHI
nsimultaneous = len(datasets)
ndims = [get_ndims(dataset) for dataset in datasets]
if asimov_bins is None:
asimov_bins = [[math.ceil(100 / ndim**0.5)] * ndim for ndim in ndims]
if isinstance(asimov_bins, int):
if nsimultaneous == 1:
asimov_bins = [[asimov_bins] * ndim for ndim in ndims]
else:
msg = (
"asimov_bins is an int but there are multiple datasets. "
"Please provide a list of int for each dataset."
)
raise ValueError(msg)
elif isinstance(asimov_bins, list):
if len(asimov_bins) != nsimultaneous:
msg = "asimov_bins is a list but the number of elements is different from the number of datasets."
raise ValueError(msg)
else:
msg = f"asimov_bins must be an int or a list of int (or list of list of int), not {type(asimov_bins)}"
raise TypeError(msg)
for i, (asimov_bin, ndim) in enumerate(zip(asimov_bins, ndims, strict=False)):
if isinstance(asimov_bin, int):
if ndim == 1:
asimov_bins[i] = [asimov_bin]
else:
msg = f"asimov_bins[{i}] is not a list but the dataset has {ndim} dimensions."
raise ValueError(msg)
elif isinstance(asimov_bin, list):
if len(asimov_bin) != ndim:
msg = (
f"asimov_bins[{i}] is a list with {len(asimov_bin)} elements but the"
f" dataset has {ndim} dimensions."
)
raise ValueError(msg)
if not all(isinstance(x, int) for x in asimov_bin):
msg = f"asimov_bins[{i}] is a list with non-int elements."
raise ValueError(msg)
else:
msg = f"asimov_bins[{i}] is not an int or a list but a {type(asimov_bin)}."
raise TypeError(msg)
assert isinstance(asimov_bins, list), "INTERNAL ERROR: Could not correctly convert asimov_bins"
assert all(
isinstance(asimov_bin, list) and len(asimov_bin) == ndim
for ndim, asimov_bin in zip(ndims, asimov_bins, strict=False)
), "INTERNAL ERROR: Could not correctly convert asimov_bins, dimensions wrong"
return asimov_bins
[docs]
@staticmethod
def check_pois(pois: POI | POIarray):
"""
Checks if the parameter of interest is a :class:`hepstats.parameters.POIarray` instance.
Args:
pois: the parameter of interest to check.
Raises:
TypeError: if pois is not an instance of :class:`hepstats.parameters.POIarray`.
"""
msg = "POI/POIarray is required."
if not isinstance(pois, POIarray):
raise TypeError(msg)
if pois.ndim > 1:
msg = "Tests using the asymptotic calculator can only be used with one parameter of interest."
raise NotImplementedError(msg)
[docs]
def asimov_dataset(self, poi: POI, ntrials_fit: int | None = None):
"""Gets the Asimov dataset for a given alternative hypothesis.
Args:
poi: parameter of interest of the alternative hypothesis.
ntrials_fit: (default: 5) maximum number of fits to perform
Returns:
The asymov dataset.
Example with **zfit**:
>>> poialt = POI(mean, 1.2)
>>> dataset = calc.asimov_dataset(poialt)
"""
if ntrials_fit is None:
ntrials_fit = 5
if poi not in self._asimov_dataset:
binned_loss = self._get_binned_loss()
if binned_loss is False: # LEGACY
model = self.model
data = self.data
loss = self.loss
else:
model = binned_loss.model
data = binned_loss.data
loss = binned_loss
minimizer = self.minimizer
oldverbose = minimizer.verbosity
minimizer.verbosity = 0
poiparam = poi.parameter
poivalue = poi.value
msg = "\nGet fitted values of the nuisance parameters for the"
msg += " alternative hypothesis!"
self.set_params_to_bestfit()
poiparam.floating = False
if not self.loss.get_params():
values = {poiparam: {"value": poivalue}}
else:
with poiparam.set_value(poivalue):
for _trial in range(ntrials_fit):
minimum = minimizer.minimize(loss=loss)
if minimum.valid:
break
# shift other parameter values to change starting point of minimization
for p in self.parameters:
if p != poiparam:
p.set_value(get_value(p) * np.random.normal(1, 0.02, 1)[0])
else:
msg = "No valid minimum was found when fitting the loss function for the alternative"
msg += f"hypothesis ({poi}), after {ntrials_fit} trials."
warnings.warn(msg, stacklevel=2)
values = dict(minimum.params)
values[poiparam] = {"value": poivalue}
poiparam.floating = True
minimizer.verbosity = oldverbose
asimov_data = []
asimov_bins = self._asimov_bins
assert len(asimov_bins) == len(data)
is_binned_loss = isinstance(loss, tuple(self.UNBINNED_TO_BINNED_LOSS.values()))
for _i, (m, d, nbins) in enumerate(zip(model, data, asimov_bins, strict=False)):
dataset = generate_asimov_dataset(d, m, is_binned_loss, nbins, values)
asimov_data.append(dataset)
self._asimov_dataset[poi] = asimov_data
return self._asimov_dataset[poi]
[docs]
def asimov_loss(self, poi: POI):
"""Constructs a loss function using the Asimov dataset for a given alternative hypothesis.
Args:
poi: parameter of interest of the alternative hypothesis.
Returns:
Loss function.
Example with **zfit**:
>>> poialt = POI(mean, 1.2)
>>> loss = calc.asimov_loss(poialt)
"""
oldloss = self._get_binned_loss()
if oldloss is False: # LEGACY
oldloss = self.loss
if oldloss is None:
msg = "No loss function was provided."
raise ValueError(msg)
if poi not in self._asimov_loss:
loss = self.lossbuilder(oldloss.model, self.asimov_dataset(poi), oldloss=oldloss)
self._asimov_loss[poi] = loss
return self._asimov_loss[poi]
[docs]
def asimov_nll(self, pois: POIarray, poialt: POI) -> np.ndarray:
"""Computes negative log-likelihood values for given parameters of interest using the Asimov dataset
generated with a given alternative hypothesis.
Args:
pois: parameters of interest.
poialt: parameter of interest of the alternative hypothesis.
Returns:
Array of nll values for the alternative hypothesis.
Example with **zfit**:
>>> mean = zfit.Parameter("mu", 1.2)
>>> poinull = POIarray(mean, [1.1, 1.2, 1.0])
>>> poialt = POI(mean, 1.2)
>>> nll = calc.asimov_nll(poinull, poialt)
"""
self.check_pois(pois)
self.check_pois(poialt)
minimizer = self.minimizer
ret = np.empty(pois.shape)
for i, p in enumerate(pois):
if p not in self._asimov_nll:
loss = self.asimov_loss(poialt)
nll = pll(minimizer, loss, p)
self._asimov_nll[p] = nll
ret[i] = self._asimov_nll[p]
return ret
[docs]
def pnull(
self,
qobs: np.ndarray,
qalt: np.ndarray | None = None,
onesided: bool = True,
onesideddiscovery: bool = False,
qtilde: bool = False,
nsigma: int = 0,
) -> np.ndarray:
"""Computes the pvalue for the null hypothesis.
Args:
qobs: observed values of the test-statistic q.
qalt: alternative values of the test-statistic q using the asimov dataset.
onesided: if `True` (default) computes onesided pvalues.
onesideddiscovery: if `True` (default) computes onesided pvalues for a discovery.
qtilde: if `True` use the :math:`\\widetilde{q}` test statistics else (default)
use the :math:`q` test statistic.
nsigma: significance shift.
Returns:
Array of the pvalues for the null hypothesis.
"""
sqrtqobs = np.sqrt(qobs)
# 1 - norm.cdf(x) == norm.cdf(-x)
if onesided or onesideddiscovery:
pnull = 1.0 - norm.cdf(sqrtqobs - nsigma)
else:
pnull = (1.0 - norm.cdf(sqrtqobs - nsigma)) * 2.0
if qalt is not None and qtilde:
cond = (qobs > qalt) & (qalt > 0)
sqrtqalt = np.sqrt(qalt)
pnull_2 = 1.0 - norm.cdf((qobs + qalt) / (2.0 * sqrtqalt) - nsigma)
if not (onesided or onesideddiscovery):
pnull_2 += 1.0 - norm.cdf(sqrtqobs - nsigma)
pnull = np.where(cond, pnull_2, pnull)
return pnull
[docs]
def qalt(
self,
poinull: POIarray,
poialt: POI,
onesided: bool,
onesideddiscovery: bool,
qtilde: bool = False,
) -> np.ndarray:
"""Computes alternative hypothesis values of the :math:`\\Delta` log-likelihood test statistic using the asimov
dataset.
Args:
poinull: parameters of interest for the null hypothesis.
poialt: parameters of interest for the alternative hypothesis.
onesided: if `True` (default) computes onesided pvalues.
onesideddiscovery: if `True` (default) computes onesided pvalues for a
discovery test.
qtilde: if `True` use the :math:`\\widetilde{q}` test statistics else (default)
use the :math:`q` test statistic.
Returns:
Q values for the alternative hypothesis.
Example with **zfit**:
>>> mean = zfit.Parameter("mu", 1.2)
>>> poinull = POI(mean, [1.1, 1.2, 1.0])
>>> poialt = POI(mean, [1.2])
>>> q = calc.qalt(poinull, poialt)
"""
param = poialt.parameter
poialt_bf = POI(param, 0) if qtilde and poialt.value < 0 else poialt
nll_poialt_asy = self.asimov_nll(poialt_bf, poialt)
nll_poinull_asy = self.asimov_nll(poinull, poialt)
return self.q(
nll1=nll_poinull_asy,
nll2=nll_poialt_asy,
poi1=poinull,
poi2=poialt,
onesided=onesided,
onesideddiscovery=onesideddiscovery,
)
[docs]
def palt(
self,
qobs: np.ndarray,
qalt: np.ndarray,
onesided: int = True,
onesideddiscovery: int = False,
qtilde: int = False,
) -> np.ndarray:
"""Computes the pvalue for the alternative hypothesis.
Args:
qobs: observed values of the test-statistic q.
qalt: alternative values of the test-statistic q using the Asimov dataset.
onesided: if `True` (default) computes onesided pvalues.
onesideddiscovery: if `True` (default) computes onesided pvalues for a discovery.
qtilde: if `True` use the :math:`\\widetilde{q}` test statistics else (default)
use the :math:`q` test statistic.
Returns:
Array of the pvalues for the alternative hypothesis.
"""
sqrtqobs = np.sqrt(qobs)
sqrtqalt = np.sqrt(qalt)
# 1 - norm.cdf(x) == norm.cdf(-x)
if onesided or onesideddiscovery:
palt = 1.0 - norm.cdf(sqrtqobs - sqrtqalt)
else:
palt = 1.0 - norm.cdf(sqrtqobs + sqrtqalt)
palt += 1.0 - norm.cdf(sqrtqobs - sqrtqalt)
if qtilde:
cond = qobs > qalt
palt_2 = 1.0 - norm.cdf((qobs - qalt) / (2.0 * sqrtqalt))
if not (onesided or onesideddiscovery):
palt_2 += 1.0 - norm.cdf(sqrtqobs + sqrtqalt)
palt = np.where(cond, palt_2, palt)
return palt
def _pvalue_(self, poinull, poialt, qtilde, onesided, onesideddiscovery):
qobs = self.qobs(
poinull,
onesided=onesided,
qtilde=qtilde,
onesideddiscovery=onesideddiscovery,
)
needpalt = poialt is not None
if needpalt:
qalt = self.qalt(
poinull=poinull,
poialt=poialt,
onesided=onesided,
onesideddiscovery=onesideddiscovery,
qtilde=qtilde,
)
palt = self.palt(
qobs=qobs,
qalt=qalt,
onesided=onesided,
qtilde=qtilde,
onesideddiscovery=onesideddiscovery,
)
else:
qalt = None
palt = None
pnull = self.pnull(
qobs=qobs,
qalt=qalt,
onesided=onesided,
qtilde=qtilde,
onesideddiscovery=onesideddiscovery,
)
return pnull, palt
def _expected_pvalue_(self, poinull, poialt, nsigma, CLs, onesided, onesideddiscovery, qtilde):
qalt = self.qalt(poinull, poialt, onesided=onesided, onesideddiscovery=onesideddiscovery)
qalt = np.where(qalt < 0, 0, qalt)
expected_pvalues = []
for ns in nsigma:
p_clsb = self.pnull(
qobs=qalt,
qalt=None,
onesided=onesided,
qtilde=qtilde,
onesideddiscovery=onesideddiscovery,
nsigma=ns,
)
if CLs:
p_clb = norm.cdf(ns)
p_cls = p_clsb / p_clb
expected_pvalues.append(np.where(p_cls < 0, 0, p_cls))
else:
expected_pvalues.append(np.where(p_clsb < 0, 0, p_clsb))
return expected_pvalues