Source code for dddm.plotting.confidence_figures

"""Utility for opening and displaying results from multinest optimization"""

import matplotlib.pyplot as plt
import seaborn as sns
import os
import pandas as pd
import numpy as np
import dddm
from glob import glob
from tqdm import tqdm
import traceback

export, __all__ = dddm.exporter()


[docs]@export class DDDMResult: """Parse results from fitting from nested sampling""" result: dict = None def __init__(self, path, sampler='multinest'): """ Open a class for organizing the results from running an optimization :param path: Path to the base dir of the results to open """ assert os.path.exists(path) self.path = path self.log = dddm.utils.get_logger(self.__class__.__name__) self.sampler = sampler self.setup()
[docs] def setup(self): if self.sampler == 'multinest': self.result = dddm.samplers.pymultinest.load_multinest_samples_from_file(self.path) elif self.sampler == 'nestle': self.result = dddm.samplers.nestle.load_nestle_samples_from_file(self.path) else: raise RuntimeError(f'{self.sampler} is invalid')
def __repr__(self): # Can we avoid duplication with config summary or make a property factory? get_props = ('detector', 'mass', 'sigma', 'nlive', 'halo_model', 'notes', 'n_parameters', ) to_print = [f'{p}: {getattr(self, p)}' for p in get_props] return ', '.join(to_print)
[docs] def config_summary(self, get_props=( 'detector', 'mass', 'sigma', 'nlive', 'halo_model', 'notes', 'n_parameters', ) ) -> pd.DataFrame: df = {k: [getattr(self, k)] for k in get_props} return pd.DataFrame(df)
[docs] def result_summary(self) -> pd.DataFrame: df = {k: [v] for k, v in self.result.get('res_dict', {}).items()} return pd.DataFrame(df)
[docs] def summary(self) -> pd.DataFrame: return pd.concat([self.config_summary(), self.result_summary()], axis=1)
[docs] def get_from_config(self, to_get: str, if_not_available=None): return self.result.get('config', {}).get(to_get, if_not_available)
[docs] def get_samples(self): return self.result.get('weighted_samples').T[:2]
@property def detector(self): return str(self.get_from_config('detector')) @property def nlive(self): return int(self.get_from_config('nlive', -1)) @property def sigma(self): sigma = float(self.get_from_config('sigma', 0)) if sigma == 0: sigma = float(self.get_from_config('log_cross_section', 0)) return sigma @property def mass(self): log_mass = self.get_from_config('log_mass', None) return -1 if log_mass is None else round(np.power(10, log_mass), 3) @property def halo_model(self): return str(self.get_from_config('halo_model')) @property def notes(self): return str(self.get_from_config('notes')) @property def n_parameters(self): param = self.get_from_config('fit_parameters') return None if param is None else len(param)
[docs]@export class SeabornPlot: def __init__(self, result: DDDMResult): self.result = result self.log = dddm.utils.get_logger(self.__class__.__name__) def __repr__(self): return f'{self.__class__.__name__}:: {self.result.__repr__()}'
[docs] def plot_samples(self, **kwargs) -> None: kwargs.setdefault('s', 1) kwargs.setdefault('facecolor', 'gray') kwargs.setdefault('alpha', 0.2) self.log.debug(f'setting kwargs to {kwargs}') plt.scatter(*self.samples, **kwargs)
[docs] def plot_bench(self, c='cyan', **kwargs): plt.scatter(self.result.get_from_config('log_mass'), self.result.sigma, s=10 ** 2, edgecolors='black', c=c, marker='X', zorder=1000, **kwargs)
@property def samples(self) -> np.ndarray: return self.result.get_samples()
[docs] def best_fit(self) -> tuple: best = np.mean(self.samples, axis=1) std = np.std(self.samples, axis=1) return best, std
[docs] def plot_best_fit(self, **kwargs) -> None: kwargs.setdefault('capsize', 5) kwargs.setdefault('marker', 'o') kwargs.setdefault('linewidth', 2) kwargs.setdefault('zorder', 300) (x, y), (x_err, y_err) = self.best_fit() plt.errorbar(x, y, xerr=x_err, yerr=y_err, **kwargs)
[docs] def samples_to_df(self) -> pd.DataFrame: return pd.DataFrame( {'mass': self.samples[0], 'cross_section': self.samples[1]} )
[docs] def plot_sigma_contours(self, nsigma=2, **kwargs): kwargs.setdefault('bw_adjust', 0.25) levels = sorted((1 - np.array([0.6827, 0.9545, 0.9973][:nsigma]))) kwargs.setdefault('levels', levels) df = self.samples_to_df() sns.kdeplot(data=df, x='mass', y='cross_section', **kwargs)
[docs] def plot_kde(self, **kwargs): kwargs.setdefault('levels', 100) kwargs.setdefault('fill', True) self.plot_sigma_contours(**kwargs)
[docs]@export class ResultsManager: result_cache: list = None result_df: pd.DataFrame = None def __init__(self, pattern=None, sampler='multinest'): self.sampler = sampler self.log = dddm.utils.get_logger(self.__class__.__name__) if pattern is not None: self.register_pattern(pattern) def __repr__(self): return f'info for {len(self.result_cache)}' def _add_result(self, path: str, tolerant=False): if self.result_cache is None: self.result_cache = [] try: result = DDDMResult(path, sampler=self.sampler) except KeyboardInterrupt as interrupt: raise interrupt except Exception as e: if not tolerant: raise e self.log.debug(e) self.log.warning(f'loading {path} lead to {e}, {traceback.format_exc()}') return self.result_cache.append(result)
[docs] def add_result(self, path: str): self._add_result(path) self.build_df()
[docs] def register_pattern(self, pattern, show_tqdm=True): matches = glob(pattern) if len(matches) == 0: raise ValueError(f'No matches for {pattern}') self.log.info(f'Opening {len(matches)} matches') for path in tqdm(matches, disable=not show_tqdm): self.log.debug(f'open {path}') self._add_result(path, tolerant=True) self.log.info('Opening done, build df') self.build_df()
[docs] def apply_mask(self, mask): assert len(mask) == len(self.result_cache) self.log.debug(f'Removing {np.sum(mask)}/' f'{len(self.result_cache)}') new_list = [self.result_cache[m_i] for m_i, m in enumerate(mask) if m] del self.result_cache self.result_cache = new_list del new_list assert len(self.result_cache) == np.sum(mask) self.build_df()
[docs] def build_df(self): dfs = [r.summary() for r in self.result_cache] if not len(dfs): raise ValueError('No files!') self.result_df = pd.concat(dfs)
@property def df(self): """Lazy alias""" return self.result_df
def _pow10(x): return 10 ** x def str_fmt(x): """Guess the disired precision of floats based on the values provided""" if isinstance(x, (list, tuple, np.ndarray)): return [str_fmt(x) for x in x] if x <= 0.1: return f'${x:.2f}$' return f'${x:.1f}$' if x <= 1 else f'${int(x)}$' def set_xticks_top(show_lines=False, rotation=0, x_ticks=(0.001, 0.01, 0.1, 0.5, 1, 5, 10, 100, 1000, 10_000), x_label=r"$M_{\chi}$ $[\mathrm{GeV}/\mathrm{c}^{2}]$"): ax = plt.gca() bin_range = ax.get_xlim() secax = ax.secondary_xaxis('top', functions=(_pow10, np.log10)) x_ticks = [t for t in x_ticks if t > 10 ** bin_range[0] and t < 10 ** bin_range[1]] if show_lines: for x_tick in x_ticks: ax.axvline(np.log10(x_tick), alpha=0.1) secax.set_ticks(x_ticks, labels=str_fmt(x_ticks)) secax.xaxis.set_tick_params(rotation=rotation) secax.set_xlabel(x_label) return secax def set_xticks_bottom( show_lines=False, rotation=0, x_ticks=("0.001", "0.01", "0.1", "0.5", "1", "5", "10", "100", "1000", "10000"), x_label=r"$\log_{10}(M_{\chi}/[\mathrm{GeV}/\mathrm{c}^{2}]$)", top=True, ): ax = plt.gca() bin_range = ax.get_xlim() if top: secax = ax.secondary_xaxis('top') secax.set_xlabel(x_label) secax.xaxis.labelpad = 12 x_ticks_pos = [float(t) for t in x_ticks] x_ticks_pos = [t for t in x_ticks_pos if t > 10 ** bin_range[0] and t < 10 ** bin_range[1]] x_ticks = [f'${t}$' for t in x_ticks if float(t) in x_ticks_pos] if show_lines: for x_tick in x_ticks: ax.axvline(np.log10(x_tick), alpha=0.1) labels = x_ticks if all(isinstance(s, str) for s in x_ticks) else str_fmt(x_ticks) ax.set_xticks(np.log10(x_ticks_pos), labels=labels) ax.xaxis.set_tick_params(rotation=rotation) if top: return secax def x_label(): plt.xlabel(r"$\log_{10}(M_{\chi}$/$[\mathrm{GeV}/\mathrm{c}^{2}]$)") def x_label_lin(): plt.xlabel(r"$M_{\chi}$ $[\mathrm{GeV}/\mathrm{c}^{2}]$") def y_label(): plt.ylabel(r"$\log_{10}(\sigma_{S.I.}$/$[\mathrm{cm}^{2}]$)")