Source code for dddm.statistics

"""
Statistical model giving likelihoods for detecting a spectrum given a
benchmark to compare it with.
"""

import os
from sys import platform
import numericalunits as nu
import numpy as np
from immutabledict import immutabledict
from dddm import utils
from dddm.priors import get_priors
from dddm.recoil_rates import halo, halo_shielded, spectrum, detector_spectrum
from scipy.special import loggamma
import typing as ty
import dddm

export, __all__ = dddm.exporter()

# Set a lower bound to the log-likelihood (this becomes a problem due to
# machine precision). Set to same number as multinest.
LL_LOW_BOUND = 1e-90


def get_prior_list():
    return ['log_mass', 'log_cross_section', 'v_0', 'v_esc', 'density']


def get_param_list():
    return ['log_mass', 'log_cross_section', 'v_0', 'v_esc', 'density']


[docs]@export class StatModel: # Keep these fit parameters in this order _parameter_order = ('log_mass', 'log_cross_section', 'v_0', 'v_esc', 'density', 'k') allow_multiple_detectors = False known_parameters = tuple(get_param_list()) benchmark_values = None def __init__( self, wimp_mass: ty.Union[float, int], cross_section: ty.Union[float, int], spectrum_class: ty.Union[detector_spectrum.DetectorSpectrum, spectrum.GenSpectrum], prior: dict, tmp_folder: str, fit_parameters=('log_mass', 'log_cross_section', 'v_0', 'v_esc', 'density', 'k'), detector_name=None, verbose=False, notes='default', ): """ Statistical model used for Bayesian interference of detection in multiple experiments. :param detector_name: name of the detector (e.g. Xe) """ if detector_name is None: detector_name = spectrum_class.detector.detector_name if not issubclass( spectrum_class.__class__, detector_spectrum.GenSpectrum ) and ( not isinstance(spectrum_class, (list, tuple)) or not issubclass( spectrum_class[0].__class__, detector_spectrum.GenSpectrum ) ): raise ValueError(f'{spectrum_class}, {spectrum_class.__class__} is not the right class') self.spectrum_class = spectrum_class if isinstance(prior, str): prior = get_priors(prior) self.config = dict( detector=detector_name, notes=notes, start=utils.now(), prior=prior, _wimp_mass=wimp_mass, _cross_section=cross_section, # _spectrum_class=spectrum_class, ) self.log = self.get_logger(tmp_folder, verbose) self.log.info(f"initialized for {detector_name} detector.") self.set_fit_parameters(fit_parameters) def __str__(self): return (f"StatModel::for {self.config['detector']} detector. " f"For info see the config file:\n{self.config}")
[docs] def get_logger(self, tmp_folder, verbosity): if verbosity > 1: level = 'DEBUG' elif verbosity: level = 'INFO' else: level = 'WARNING' if 'win' not in platform: log_path = os.path.join(tmp_folder, f"log_{utils.now()}.log") self.config['logging'] = log_path else: log_path = None return utils.get_logger(self.__class__.__name__, level, path=log_path)
[docs] def set_benchmark(self): """ Set up the benchmark used in this statistical model. Likelihood of other models can be evaluated for this 'truth' """ self.config['log_mass'] = np.log10(self.config['_wimp_mass']) self.config['log_cross_section'] = np.log10(self.config['_cross_section'])
[docs] def set_fit_parameters(self, params): """Write the fit parameters to the config""" self.log.info(f'NestedSamplersetting fit parameters to {params}') if not isinstance(params, (list, tuple)): raise TypeError("Set the parameter names in a list of strings") known_params = self.known_parameters[:len(params)] if tuple(params) != tuple(known_params): err_message = f"The parameters are not input in the correct order. Please" \ f"insert {known_params} rather than {params}." raise NameError(err_message) self.config['fit_parameters'] = params
[docs] def set_models(self): """ Update the dm model with with the required settings from the prior """ dm_model = self.spectrum_class.dm_model if self._earth_shielding: dm_model.log_mass = self.log_mass dm_model.log_cross_section = self.log_cross_section dm_model.location = self.spectrum_class.location dm_model.v_0 = self.v_0 * nu.km / nu.s dm_model.rho_dm = self.density * nu.GeV / nu.c0 ** 2 / nu.cm ** 3 dm_model.v_esc = self.v_esc * nu.km / nu.s assert self.spectrum_class.dm_model.v_0 == self.v_0 * nu.km / nu.s
def _fix_parameters(self, _do_evaluate_benchmark=True): """ This is a very important function as it makes sure all the classes are setup in the right order :param _do_evaluate_benchmark: Evaluate the benchmark :return: None """ if no_prior_has_been_set := self.config['prior'] is None: raise ValueError if no_wimp_mass_set := self.config.get('log_mass') is None: self.set_benchmark() elif self.config['log_cross_section'] is None: raise ValueError('Someone forgot to set sigma?!') # Very important that this comes AFTER the prior setting as we depend on it self.set_models() # Finally, set the benchmark if not _do_evaluate_benchmark: # Only do this for the combined experiments! self.log.info('Skipping evaluating the benchmark!') else: self.log.info('evaluate benchmark\tall ready to go!') self.eval_benchmark() # No more changes allowed self.config = immutabledict(self.config)
[docs] def check_spectrum(self): """Lazy alias for eval_spectrum""" parameter_names = self._parameter_order[:2] parameter_values = [self.config['log_mass'], self.config['log_cross_section']] return self.eval_spectrum(parameter_values, parameter_names)
[docs] def eval_benchmark(self): self.log.info('preparing for running, setting the benchmark') if self.bench_is_set: raise RuntimeError(self.bench_is_set) self.benchmark_values = self.check_spectrum() # Save a copy of the benchmark in the config file self.config['benchmark_values'] = list(self.benchmark_values)
[docs] def total_log_prior(self, parameter_vals, parameter_names): """ For each of the parameter names, read the prior :param parameter_vals: the values of the model/benchmark considered as the truth :param parameter_names: the names of the parameter_values :return: """ # single parameter to fit if isinstance(parameter_names, str): lp = self.log_prior(parameter_vals, parameter_names) # check the input and compute the prior elif len(parameter_names) > 1: if len(parameter_vals) != len(parameter_names): raise ValueError( f"provide enough names {parameter_names}) " f"for parameters (len{len(parameter_vals)})") lp = np.sum([self.log_prior(*_x) for _x in zip(parameter_vals, parameter_names)]) else: raise TypeError( f"incorrect format provided. Theta should be array-like for " f"single value of parameter_names or Theta should be " f"matrix-like for array-like parameter_names. Theta, " f"parameter_names (provided) = " f"{parameter_vals, parameter_names}") return lp
[docs] def log_probability(self, parameter_vals, parameter_names): """ :param parameter_vals: the values of the model/benchmark considered as the truth :param parameter_names: the names of the parameter_values :return: """ self.log.debug('evaluate log probability') lp = self.total_log_prior(parameter_vals, parameter_names) if not np.isfinite(lp): return -LL_LOW_BOUND self.log.info('loading rate for given parameters') evaluated_rate = self.eval_spectrum(parameter_vals, parameter_names) # Compute the likelihood ll = log_likelihood(self.benchmark_values, evaluated_rate) if np.isnan(lp + ll): raise ValueError( f"Returned NaN from likelihood. lp = {lp}, ll = {ll}") self.log.info('likelihood evaluated') return lp + ll
[docs] def log_prior(self, value, variable_name): """ Compute the prior of variable_name for a given value :param value: value of variable name :param variable_name: name of the 'value'. This name should be in the config of the class under the priors with a similar content as the priors as specified in the get_prior function. :return: prior of value """ # For each of the priors read from the config file how the prior looks # like. Get the boundaries (and mean (m) and width (s) for gaussian # distributions). self.log.info(f'evaluating priors for {variable_name}') if self.config['prior'][variable_name]['prior_type'] == 'flat': a, b = self.config['prior'][variable_name]['param'] return log_flat(a, b, value) elif self.config['prior'][variable_name]['prior_type'] == 'gauss': a, b = self.config['prior'][variable_name]['range'] m, s = self.config['prior'][variable_name]['param'] return log_gauss(a, b, m, s, value) else: raise TypeError( f"unknown prior type '{self.config['prior'][variable_name]['prior_type']}'," f" choose either gauss or flat")
@property def _earth_shielding(self): return str(self.spectrum_class.dm_model) == 'shielded_shm'
[docs] def eval_spectrum(self, values: ty.Union[list, tuple, np.ndarray], parameter_names: ty.Union[ty.List[str], ty.Tuple[str]] ): """ For given values and parameter names, return the spectrum one would have with these parameters. The values and parameter names should be array like objects of the same length. Usually, one fits either two ('log_mass', 'log_cross_section') or five parameters ('log_mass', 'log_cross_section', 'v_0', 'v_esc', 'density'). :param values: array like object of :param parameter_names: names of parameters :return: a spectrum as specified by the parameter_names """ self.log.debug(f'evaluate spectrum for {len(values)} parameters') if len(values) != len(parameter_names): raise ValueError(f'{len(values)} != len({parameter_names})') if isinstance(parameter_names, str): raise NotImplementedError(f"Got single param {parameter_names}?") if len(parameter_names) not in [2, 5]: raise NotImplementedError('Use either 2 or 5 parameters to fit') checked_values = check_shape(values) log_mass = checked_values[0] log_cross_section = checked_values[1] # Update dark matter parameters in place dm_class = self.spectrum_class.dm_model if self._earth_shielding and len(parameter_names) >= 2: dm_class.log_cross_section = log_cross_section dm_class.log_mass = log_mass assert self.spectrum_class.dm_model.log_mass == log_mass elif len(parameter_names) == 5: if tuple(parameter_names) != tuple(self._parameter_order[:len(parameter_names)]): raise NameError( f"The parameters are not in correct order. Please insert" f"{self._parameter_order[:len(parameter_names)]} rather than " f"{parameter_names}.") v_0 = checked_values[2] * nu.km / nu.s v_esc = checked_values[3] * nu.km / nu.s rho_dm = checked_values[4] * nu.GeV / nu.c0 ** 2 / nu.cm ** 3 dm_class.v_0 = v_0 dm_class.v_esc = v_esc dm_class.rho_dm = rho_dm assert self.spectrum_class.dm_model.rho_dm == rho_dm spectrum = self.spectrum_class.get_counts( wimp_mass=10. ** log_mass, cross_section=10. ** log_cross_section, poisson=False) self.log.debug('returning results') return spectrum
[docs] def read_priors_mean(self, prior_name) -> ty.Union[int, float]: self.log.debug(f'reading {prior_name}') if self.config['prior'] is None: raise ValueError('Prior not set!') return self.config['prior'][prior_name]['mean']
@property def v_0(self) -> ty.Union[int, float]: return self.read_priors_mean('v_0') @property def v_esc(self) -> ty.Union[int, float]: return self.read_priors_mean('v_esc') @property def density(self) -> ty.Union[int, float]: return self.read_priors_mean('density') @property def log_mass(self): return self.config['log_mass'] @property def log_cross_section(self): return self.config['log_cross_section'] @property def bench_is_set(self): return self.benchmark_values is not None
def log_likelihood_function(nb, nr): """ return the ln(likelihood) for Nb expected events and Nr observed events # :param nb: expected events # :param nr: observed events # :return: ln(likelihood) """ if nr == 0: # For i~0, machine precision sets nr to 0. However, this becomes a # little problematic since the Poisson log likelihood for 0 is not # defined. Hence we cap it off by setting nr to 10^-100. nr = LL_LOW_BOUND return np.log(nr) * nb - loggamma(nb + 1) - nr def log_likelihood(model, y): """ :param model: pandas dataframe containing the number of counts in bin i :param y: the number of counts in bin i :return: sum of the log-likelihoods of the bins """ if len(y) != len(model): raise ValueError(f"Data and model should be of same dimensions (now " f"{len(y)} and {len(model)})") res = 0 # pylint: disable=consider-using-enumerate for i in range(len(y)): Nr = y[i] Nb = model[i] res_bin = log_likelihood_function(Nb, Nr) if np.isnan(res_bin): raise ValueError( f"Returned NaN in bin {i}. Below follows data dump.\n" f"log_likelihood: {log_likelihood_function(Nb, Nr)}\n" f"i = {i}, Nb, Nr =" + " %.2g %.2g \n" % (Nb, Nr) + "") if not np.isfinite(res_bin): return -np.inf res += res_bin return res def check_shape(xs): """ :param xs: values :return: flat array of values """ if len(xs) <= 0: raise TypeError( f"Provided incorrect type of {xs}. Takes either np.array or list") if not isinstance(xs, np.ndarray): xs = np.array(xs) for i, x in enumerate(xs): if np.shape(x) == (1,): xs[i] = x[0] return xs def log_flat(a, b, x): """ Return a flat prior as function of x in log space :param a: lower bound :param b: upper bound :param x: value :return: 0 for x in bound, -np.inf otherwise """ try: return 0 if a < x < b else -np.inf except ValueError: result = np.zeros(len(x)) mask = (x > a) & (x < b) result[~mask] = -np.inf return result def log_gauss(a, b, mu, sigma, x): """ Return a gaussian prior as function of x in log space :param a: lower bound :param b: upper bound :param mu: mean of gauss :param sigma: std of gauss :param x: value to evaluate :return: log prior of x evaluated for gaussian (given by mu and sigma) if in between the bounds """ try: # for single values of x if a < x < b: return -0.5 * np.sum( (x - mu) ** 2 / (sigma ** 2) + np.log(sigma ** 2)) return -np.inf except ValueError: # for array like objects return as follows result = np.zeros(len(x)) mask = (x > a) & (x < b) result[~mask] = -np.inf result[mask] = -0.5 * np.sum( (x - mu) ** 2 / (sigma ** 2) + np.log(sigma ** 2)) return result