Source code for hepstats.hypotests.calculators.basecalculator

from __future__ import annotations

from collections.abc import Callable
from typing import Any

import numpy as np

from ...utils import base_sample, base_sampler, pll
from ..hypotests_object import HypotestsObject
from ..parameters import POI, POIarray, asarray
from ..toyutils import ToyResult, ToysManager


[docs] class BaseCalculator(HypotestsObject): """Base class for calculator.""" def __init__(self, input, minimizer, **kwargs): """ Args: input: loss or fit result minimizer: minimizer to use to find the minimum of the loss function Example with **zfit**: >>> import zfit >>> from zfit.core.loss import UnbinnedNLL >>> from zfit.minimize import Minuit >>> >>> obs = zfit.Space('x', limits=(0.1, 2.0)) >>> data = zfit.data.Data.from_numpy(obs=obs, array=np.random.normal(1.2, 0.1, 10000)) >>> mean = zfit.Parameter("mu", 1.2) >>> sigma = zfit.Parameter("sigma", 0.1) >>> model = zfit.pdf.Gauss(obs=obs, mu=mean, sigma=sigma) >>> loss = UnbinnedNLL(model=model, data=data) >>> >>> calc = BaseCalculator(input=loss, minimizer=Minuit()) """ super().__init__(input, minimizer, **kwargs) self._obs_nll = {} self._parameters = {} for m in self.model: for d in m.get_params(): self._parameters[d.name] = d
[docs] def obs_nll(self, pois: POIarray) -> np.ndarray: """Compute observed negative log-likelihood values for given parameters of interest. Args: pois: parameters of interest. Returns: Observed nll values. Example with **zfit**: >>> mean = zfit.Parameter("mu", 1.2) >>> poi = POI(mean, [1.1, 1.2, 1.0]) >>> nll = calc.obs_nll(poi) """ ret = np.empty(pois.shape) for i, p in enumerate(pois): if p not in self._obs_nll: nll = pll(minimizer=self.minimizer, loss=self.loss, pois=p) self._obs_nll[p] = nll ret[i] = self._obs_nll[p] return ret
[docs] def qobs( self, poinull: POI, onesided: bool = True, onesideddiscovery: bool = True, qtilde: bool = False, ): """Computes observed values of the :math:`\\Delta` log-likelihood test statistic. Args: poinull: parameters of interest for the null hypothesis. qtilde: if `True` use the :math:`\\tilde{q}` test statistics else (default) use the :math:`q` test statistic. onesided: if `True` (default) computes onesided pvalues. onesideddiscovery: if `True` (default) computes onesided pvalues for a discovery test. Returns: Observed values of q. Example with **zfit**: >>> mean = zfit.Parameter("mu", 1.2) >>> poi = POI(mean, [1.1, 1.2, 1.0]) >>> q = calc.qobs(poi) """ self.check_pois(poinull) if poinull.ndim == 1: param = poinull.parameter bestfit = self.bestfit.params[param]["value"] if qtilde and bestfit < 0: bestfitpoi = POI(param, 0) else: bestfitpoi = POI(param, bestfit) self._obs_nll[bestfitpoi] = self.bestfit.fmin nll_bestfitpoi_obs = self.obs_nll(bestfitpoi) nll_poinull_obs = self.obs_nll(poinull) return self.q( nll1=nll_poinull_obs, nll2=nll_bestfitpoi_obs, poi1=poinull, poi2=bestfitpoi, onesided=onesided, onesideddiscovery=onesideddiscovery, )
[docs] def pvalue( self, poinull: POI | POIarray, poialt: POI | None = None, qtilde: bool = False, onesided: bool = True, onesideddiscovery: bool = False, ) -> tuple[np.ndarray, np.ndarray]: """Computes pvalues for the null and alternative hypothesis. Args: poinull: parameters of interest for the null hypothesis. poialt: parameters of interest for the alternative hypothesis. qtilde: if `True` use the :math:`\\widetilde{q}` test statistics else (default) use the :math:`q` test statistic. onesided: if `True` (default) computes onesided pvalues. onesideddiscovery: if `True` (default) computes onesided pvalues for a discovery test. Returns: Tuple of arrays for pnull, palt Example with **zfit**: >>> mean = zfit.Parameter("mu", 1.2) >>> poinull = POI(mean, [1.1, 1.2, 1.0]) >>> poialt = POI(mean, 1.2) >>> pvalues = calc.pavalue(poinull, poialt) """ self.check_pois(poinull) if poialt: self.check_pois(poialt) self.check_pois_compatibility(poinull, poialt) return self._pvalue_( poinull=poinull, poialt=poialt, qtilde=qtilde, onesided=onesided, onesideddiscovery=onesideddiscovery, )
def _pvalue_(self, poinull, poialt, qtilde, onesided, onesideddiscovery): """ To be overwritten in `BaseCalculator` subclasses. """ raise NotImplementedError
[docs] def expected_pvalue( self, poinull: POI | POIarray, poialt: POI | POIarray, nsigma: list[int], CLs: bool = False, qtilde: bool = False, onesided: bool = True, onesideddiscovery: bool = False, ) -> list[np.array]: """Computes the expected pvalues and error bands for different values of :math:`\\sigma` (0=expected/median) Args: poinull: parameters of interest for the null hypothesis. poialt: parameters of interest for the alternative hypothesis. nsigma: list of values of :math:`\\sigma` to compute the expected pvalue. CLs: if `True` computes pvalues as :math:`p_{cls}=p_{null}/p_{alt}=p_{clsb}/p_{clb}` else as :math:`p_{clsb} = p_{null}`. qtilde: if `True` use the :math:`\\widetilde{q}` test statistics else (default) use the :math:`q` test statistic. onesided: if `True` (default) computes onesided pvalues. onesideddiscovery: if `True` (default) computes onesided pvalues for a discovery. Returns: Array of expected pvalues for each :math:`\\sigma` value Example with **zfit**: >>> mean = zfit.Parameter("mu", 1.2) >>> poinull = POI(mean, [1.1, 1.2, 1.0]) >>> poialt = POI(mean, 1.2) >>> nll = calc.expected_pvalue(poinull, poialt) """ self.check_pois(poinull) if poialt: self.check_pois(poialt) self.check_pois_compatibility(poinull, poialt) if qtilde and (poialt.values < 0).any(): poialt = POIarray( parameter=poialt.parameter, values=np.where(poialt.values < 0, 0, poialt.values), ) return self._expected_pvalue_( poinull=poinull, poialt=poialt, nsigma=nsigma, CLs=CLs, qtilde=qtilde, onesided=onesided, onesideddiscovery=onesideddiscovery, )
def _expected_pvalue_(self, poinull, poialt, nsigma, CLs, qtilde, onesided, onesideddiscovery): """ To be overwritten in `BaseCalculator` subclasses. """ raise NotImplementedError
[docs] @staticmethod def check_pois(pois: POI | POIarray): """ Checks if the parameter of interest is a :class:`hepstats.parameters.POIarray` instance. Args: pois: the parameter of interest to check. Raises: TypeError: if pois is not an instance of :class:`hepstats.parameters.POIarray`. """ msg = "POI/POIarray is required." if not isinstance(pois, POIarray): raise TypeError(msg) if pois.ndim > 1: msg = "Tests with more that one parameter of interest are not yet implemented." raise NotImplementedError(msg)
[docs] @staticmethod def check_pois_compatibility(poi1: POI | POIarray, poi2: POI | POIarray): """ Checks compatibility between two lists of :func:`hepstats.parameters.POIarray` instances. Args: poi1: the first parameter of interest. poi2: the second parameter of interest. Raises: ValueError: if the two parameters of interests don't have the same parameters, check by their names. """ if poi1.ndim != poi2.ndim: msg = f"POIs should have the same dimensions, poi1={poi1.ndim}, poi2={poi2.ndim}" raise ValueError(msg) if poi1.ndim == 1 and poi1.name != poi2.name: msg = "The variables used in the parameters of interest should have the same names," msg += f" poi1={poi1.name}, poi2={poi2.name}" raise ValueError(msg)
[docs] def q( self, nll1: np.array, nll2: np.array, poi1: POIarray, poi2: POIarray, onesided: bool = True, onesideddiscovery: bool = False, ) -> np.ndarray: """Compute values of the test statistic q defined as the difference between negative log-likelihood values :math:`q = nll1 - nll2`. Args: nll1: array of nll values #1, evaluated with poi1. nll2: array of nll values #2, evaluated with poi2. poi1: POI's #1. poi2: POI's #2. onesided: if `True` (default) computes onesided pvalues. onesideddiscovery: if `True` (default) computes onesided pvalues for a discovery. Returns: Array of :math:`q` values. """ self.check_pois(poi1) self.check_pois(poi2) self.check_pois_compatibility(poi1, poi2) assert len(nll1) == len(poi1) assert len(nll2) == len(poi2) poi1_values = poi1.values poi2_values = poi2.values q = 2 * (nll1 - nll2) zeros = np.zeros(q.shape) if onesideddiscovery: condition = (poi2_values < poi1_values) | (q < 0) elif onesided: condition = (poi2_values > poi1_values) | (q < 0) else: condition = q < 0 return np.where(condition, zeros, q)
[docs] class BaseToysCalculator(BaseCalculator): def __init__(self, input, minimizer, **kwargs): """Basis for toys calculator class. Args: input: loss or fit result. minimizer: minimizer to use to find the minimum of the loss function. sampler: function used to create sampler with models, number of events and floating parameters in the sample. sample: function used to get samples from the sampler. """ super().__init__(input, minimizer, **kwargs)
[docs] class ToysCalculator(BaseToysCalculator, ToysManager): """ Class for calculators using toys. """ def __init__( self, input, minimizer, ntoysnull: int = 100, ntoysalt: int = 100, sampler: Callable = base_sampler, sample: Callable = base_sample, ): """Toys calculator class. Args: input: loss or fit result. minimizer: minimizer to use to find the minimum of the loss function. ntoysnull: minimum number of toys to generate for the null hypothesis. ntoysalt: minimum number of toys to generate for the alternative hypothesis. sampler: function used to create sampler with models, number of events and floating** parameters. in the sample Default is :func:`hepstats.utils.fit.sampling.base_sampler`. sample: function used to get samples from the sampler. Default is :func:`hepstats.utils.fit..sampling.base_sample`. """ super().__init__(input, minimizer, sampler=sampler, sample=sample) self._ntoysnull = ntoysnull self._ntoysalt = ntoysalt
[docs] @classmethod def from_yaml( cls, filename: str, input, minimizer, sampler: Callable = base_sampler, sample: Callable = base_sample, **kwargs: Any, ): """ ToysCalculator constructor with the toys loaded from a yaml file. Args: filename: the yaml file name. input: loss or fit result. minimizer: minimizer to use to find the minimum of the loss function. sampler: function used to create sampler with models, number of events and floating parameters in the sample Default is :func:`hepstats.utils.fit.sampling.base_sampler`. sample: function used to get samples from the sampler. Default is :func:`hepstats.fitutils.sampling.base_sample`. """ ntoysnull = kwargs.get("ntoysnull", 100) ntoysalt = kwargs.get("ntoysall", 100) calculator = cls( input=input, minimizer=minimizer, ntoysnull=ntoysnull, ntoysalt=ntoysalt, sampler=sampler, sample=sample, ) toysresults = calculator.toysresults_from_yaml(filename) for t in toysresults: calculator.add_toyresult(t) return calculator
@property def ntoysnull(self) -> int: """ Returns the number of toys generated for the null hypothesis. """ return self._ntoysnull @ntoysnull.setter def ntoysnull(self, n: int): """ Set the number of toys generated for the null hypothesis. Args: n: number of toys """ self._ntoysnull = n @property def ntoysalt(self) -> int: """ Returns the number of toys generated for the alternative hypothesis. """ return self._ntoysalt @ntoysalt.setter def ntoysalt(self, n: int): """ Set the number of toys generated for the alternative hypothesis. Args: n: number of toys """ self._ntoysalt = n def _get_toys( self, poigen: POI | POIarray, poieval: POI | POIarray | None = None, qtilde: bool = False, hypothesis: str = "null", ) -> dict[POI, ToyResult]: """ Return the generated toys for a given POI. Args: poigen: POI used to generate the toys. poieval: POI values to evaluate the loss function. qtilde: if `True` use the :math:`\tilde{q}` test statistics else (default) use the :math:`q` test statistic. hypothesis: `null` or `alternative`. """ if hypothesis not in {"null", "alternative"}: msg = "hypothesis must be 'null' or 'alternative'." raise ValueError(msg) ntoys = self.ntoysnull if hypothesis == "null" else self.ntoysalt ret = {} for p in poigen: if poieval is None: poieval_p = asarray(p) else: poieval_p = poieval if p not in poieval_p: poieval_p = poieval_p.append(p.value) if qtilde and 0.0 not in poieval_p.values: poieval_p = poieval_p.append(0.0) ngenerated = self.ntoys(p, poieval_p) ntogen = ntoys - ngenerated if ngenerated < ntoys else 0 if ntogen > 0: self.generate_and_fit_toys(ntoys=ntogen, poigen=p, poieval=poieval_p) ret[p] = self.get_toyresult(p, poieval_p) return ret
[docs] def get_toys_null( self, poigen: POI | POIarray, poieval: POI | POIarray | None = None, qtilde: bool = False, ) -> dict[POI, ToyResult]: """ Return the generated toys for the null hypothesis. Args: poigen: POI used to generate the toys. poieval: POI values to evaluate the loss function. qtilde: if `True` use the :math:`\tilde{q}` test statistics else (default) use the :math:`q` test statistic. Example with **zfit**: >>> mean = zfit.Parameter("mu", 1.2) >>> poinull = POIarray(mean, [1.1, 1.2, 1.0]) >>> poialt = POI(mean, 1.2) >>> for p in poinull: ... calc.get_toys_alt(p, poieval=poialt) """ return self._get_toys(poigen=poigen, poieval=poieval, qtilde=qtilde, hypothesis="null")
[docs] def get_toys_alt( self, poigen: POI | POIarray, poieval: POI | POIarray | None = None, qtilde: bool = False, ) -> dict[POI, ToyResult]: """ Return the generated toys for the alternative hypothesis. Args: poigen: POI used to generate the toys. poieval: POI values to evaluate the loss function. qtilde: if `True` use the :math:`\tilde{q}` test statistics else (default) use the :math:`q` test statistic. Example with **zfit**: >>> mean = zfit.Parameter("mu", 1.2) >>> poinull = POIarray(mean, [1.1, 1.2, 1.0]) >>> poialt = POI(mean, 1.2) >>> calc.get_toys_alt(poialt, poieval=poinull) """ return self._get_toys(poigen=poigen, poieval=poieval, qtilde=qtilde, hypothesis="alternative")