# Source code for sysvar.templates

from __future__ import annotations

from functools import cached_property

from abc import ABC, abstractmethod

import numpy as np
from pandas import DataFrame, concat
import matplotlib.pyplot as plt

from sysvar.corrections import *
from sysvar.variations import Variator
from sysvar.visualize import TemplateVisualizer
from sysvar.utils import SavableAttributesObject


class NotADictError(Exception):
    """Raised when the binning argument is not a dictionary."""
class NotCorrectVariableError(Exception):
    """Raised when a requested variable is not a column of the dataframe."""
class NotIncreasingBinning(Exception):
    """Raised when a binning's edges are not strictly increasing."""
class Template(ABC, SavableAttributesObject):
    """Abstract base class for a histogram template with systematic variations.

    Keeps a trimmed deep copy of the user's dataframe, the histogram binning,
    the nominal weight column and (optionally) a correction/variator pair used
    to build up/down and per-variation weight columns.  Concrete subclasses
    implement :meth:`make_hist`.
    """

    def __init__(
        self,
        df: DataFrame,
        binning: dict,
        total_weight: str,
        syst_weight: None | str = None,
        prefices: None | str | list = None,
        correction: None | BaseCorrection = None,
        variator: None | Variator = None,
        verbose: bool = True,
    ):
        """Validate the inputs and keep a deep copy of the needed columns.

        Args:
            df: Input dataframe; only the required columns are deep-copied.
            binning: Mapping of variable name -> strictly increasing bin edges.
            total_weight: Name of the nominal total-weight column in ``df``.
            syst_weight: Base name of the systematic weight column, if any.
            prefices: Prefix (or list of prefixes) used to build weight
                column names.
            correction: Correction object describing the relevant columns.
            variator: Source of the number of variations (``Nvar``) and their
                values.
            verbose: Verbosity flag.

        Raises:
            NotADictError, NotCorrectVariableError, NotIncreasingBinning:
                propagated from the binning/column validation helpers.
        """
        super().__init__()
        # The validators raise on failure, so the assignments below only run
        # for inputs that passed the corresponding check.
        if self._is_correct_binning(df.columns, binning):
            self.binning = binning
        if self._is_existing_variable(total_weight, df.columns):
            self.total_weight = total_weight
        # TODO have a method that build the column name e.g. from prefix and syst_weight and
        # add a check of is_existing_variable
        self._syst_weight = syst_weight
        self._prefices = prefices
        self._correction = correction
        self._variator = variator
        # Nvar stays None when no variator was supplied.
        self._Nvar = variator.Nvar if variator is not None else variator
        self.verbose = verbose
        # Make a deep copy of only the columns that are needed: for large
        # dataframes we don't want to copy all these GB — only a handful of
        # columns are necessary.  The deep copy also ensures we never mutate
        # the original dataframe passed by the user.
        self.df = df
        self.df = self.df[self._collect_columns_names()].copy(deep=True)

    @property
    def syst_weight(self) -> str:
        # Base name of the systematic weight column.
        return self._syst_weight

    @syst_weight.setter
    def syst_weight(self, value: str):
        if not isinstance(value, str):
            raise ValueError("syst_weight must be a string")
        self._syst_weight = value

    @property
    def prefices(self) -> str | list:
        # Prefix (or list of prefixes) for building weight column names.
        return self._prefices

    @prefices.setter
    def prefices(self, value: str | list):
        if not isinstance(value, (str, list)):
            raise ValueError("prefices must be a string or a list")
        self._prefices = value

    @property
    def correction(self) -> BaseCorrection:
        return self._correction

    @correction.setter
    def correction(self, value: BaseCorrection):
        if not isinstance(value, BaseCorrection):
            raise ValueError("correction must be a BaseCorrection object")
        self._correction = value

    @property
    def variator(self) -> Variator:
        return self._variator

    @variator.setter
    def variator(self, value: Variator):
        if not isinstance(value, Variator):
            raise ValueError("variator must be a Variator object")
        self._variator = value

    @property
    def Nvar(self) -> int:
        # Number of systematic variations (taken from the variator; may be
        # None when no variator was supplied).
        return self._Nvar

    @Nvar.setter
    def Nvar(self, value: int):
        if not isinstance(value, int):
            raise ValueError("Nvar must be an integer")
        self._Nvar = value
[docs] def drop_unecessary_columns(self): self.df = self.df[self._collect_columns_names()]
@property def cov_matrix(self) -> np.ndarray: return np.cov(self._get_absolute_variations()) @property def corr_matrix(self) -> np.ndarray: return np.corrcoef(self._get_absolute_variations()) @cached_property def eigen_decomposition(self) -> tuple: return np.linalg.eig(self.cov_matrix) @property def eigen_values(self) -> np.ndarray: return self.eigen_decomposition[0] @property def eigen_vectors(self) -> np.ndarray: return self.eigen_decomposition[1] @property def eigen_variations(self) -> np.ndarray: return self.eigen_vectors * np.sqrt(self.eigen_values) @property def Nbins(self) -> int: return np.prod([len(bins) - 1 for bins in self.binning.values()]) def _is_correct_binning(self, columns: list, binning: dict) -> bool: if self._is_a_dict(binning): pass # Check if the variables exist in the dataframe for var_name in binning.keys(): if self._is_existing_variable(var_name, columns): continue for var_name, bins in binning.items(): if self._is_increasing_binning(var_name, bins): continue return True @staticmethod def _is_a_dict(binning: dict) -> bool: # Check of the binning is a dictionary if not isinstance(binning, dict): raise NotADictError( "The binning argument should be a dictionary with the names of the variables and their corresponding binning e.g. {var1: [0, 1, 2], var2: [0.1, 0.2, 0.3]}" ) else: return True @staticmethod def _is_existing_variable(var_name: str, columns: list) -> bool: if var_name not in columns: raise NotCorrectVariableError( f"{var_name} does not exist in the dataframe columns" ) return True @staticmethod def _is_increasing_binning(var_name, bins) -> bool: # Ensure that bins is a np array bins_array = np.array(bins) diff = np.diff(bins_array) if not np.all(diff > 0): raise NotIncreasingBinning( f"The binning for variable {var_name} is not strictly increasing" ) return True
    @abstractmethod
    def make_hist(self, index: None | int = None) -> np.ndarray:
        """Build the (flattened) histogram for variation *index*.

        Implemented by the concrete subclasses; ``index`` selects the weight
        set via :meth:`collect_weights`.
        """
        pass
def _collect_columns_names(self): """ Collects and returns a list of column names based on the configuration of the object. The method generates column names by combining a fixed set of base columns (such as "channel" and "fit_ctgy") with dynamic elements like binning keys, system weights, and variables specific to the type of correction applied to the data. The correction could be of type `Correction1D`, `CorrectionBF`, or `CorrectionPID`, and each type defines a different set of variables. Column names are prefixed with one or more `prefices` defined for the object. Returns: list: A list of strings representing the collected column names. Raises: TypeError: If the type of `self.correction` is not recognized. Notes: - If `self.prefices` is not a list, it will be converted into one. - For each prefix in `self.prefices`, a column name is generated for each variable in the correction, using the method `_build_column_name`. - The method constructs column names by appending variables to a set of base columns such as channel, fit_ctgy, and binning keys. """ # Initialize the base columns. 
# TODO the channel and fit_ctgy should not be hardcoded here columns = ["channel", "template", self.total_weight, *list(self.binning.keys())] # Collect the prefices prefices = ( [x for x in self.prefices] if isinstance(self.prefices, list) else [self.prefices] ) # Collect the important variables based on the type of correction if isinstance(self.correction, Correction1D): variables = [ self.syst_weight, self.correction.dependant_variable, *list(self.correction.info["extra_cuts"].keys()), ] elif isinstance(self.correction, CorrectionBF): variables = [self.syst_weight, self.correction.dependant_variable] elif isinstance(self.correction, Correction2D): variables = [ self.syst_weight, self.correction.dependant_variable_1, self.correction.dependant_variable_2, *list(self.correction.info["extra_cuts"].keys()), ] elif isinstance(self.correction, CorrectionPID): variables = [ self.syst_weight, self.correction.p, self.correction.theta, self.correction.PDG, self.correction.mcPDG, ] elif isinstance(self.correction, CustomCorrection): variables = [ self.syst_weight, self.correction.dependant_variable, ] elif self.correction is None: # Return all the columns if no correction can be found return list(set(self.df.columns)) else: raise TypeError( f"Type {type(self.correction)} not recognized. Please use Correction1D, CorrectionBF, Correction2D or CorrectionPID, CustomCorrection" ) if len(prefices) > 0: # Construct the column names for prefix in prefices: for var in variables: columns.append(self.correction._build_column_name(prefix, var)) else: for var in variables: columns.append( self.correction._build_column_name(prefix=None, variable=var) ) # Remove duplicate columns before we return it return list(set(columns))
[docs] def add_variations(self): if isinstance(self.prefices, str) or self.prefices is None: # If we are dealing with a single column, just build the name and add the variations weightname = self.correction._build_column_name( self.prefices, self.syst_weight ) self._initialize_variations(weightname) self._add_variations_to_df(weightname, self.prefices) elif isinstance(self.prefices, list): # For multuple columns we need to loop over all the prefices. # Assumes that all the weights have the same name if len(self.prefices) > 0: for prefix in self.prefices: # Now build the weightname weightname = self.correction._build_column_name( prefix, self.syst_weight ) # And initialize and add all the variations self._initialize_variations(weightname) self._add_variations_to_df(weightname, prefix) else: weightname = self.correction._build_column_name( prefix=None, variable=self.syst_weight ) # And initialize and add all the variations self._initialize_variations(weightname) self._add_variations_to_df(weightname) else: raise ValueError( f"The prefices must be str or a list of str. You passed {type(self.prefices)}" )
    def _initialize_variations(self, weightname: str):
        """Add the up/down and per-variation weight columns, initialized to 1.

        Args:
            weightname: Base name of the systematic weight column.
        """
        # Initialize the nominal up and down variations
        self.df[f"{weightname}_up"] = 1.0
        self.df[f"{weightname}_down"] = 1.0
        # Initialize the variations: create a new dataframe and concatenate it
        # to avoid pandas' PerformanceWarning about fragmented frames
        variation_columns = [f"{weightname}_var_{i}" for i in range(self.Nvar)]
        variations = DataFrame(
            data=np.ones((len(self.df), len(variation_columns))),
            columns=variation_columns,
        )
        # The index needs to be reset, otherwise pandas will create extra rows.
        # Here we overwrite the Template's dataframe; this is okay since it is
        # only a copy of the original dataframe passed by the user.
        self.df = concat([self.df.reset_index(drop=True), variations], axis=1)

    def _add_variations_to_df(self, weightname: str, prefix: None | str = None):
        """Fill the up/down and variation columns for rows matched by the correction.

        Args:
            weightname: Base name of the systematic weight column.
            prefix: Optional prefix used by the correction to build its queries.
        """
        # Build the queries based on the prefix
        queries = self.correction.build_queries(prefix)
        # Create a list to store the mask results for each query
        masks = [self.df.query(q).index for q in queries]
        # Update DataFrame using precomputed masks
        for i, (mask, te) in enumerate(zip(masks, self.correction.total_error)):
            # Perform vectorized updates with the precomputed mask
            self.df.loc[mask, f"{weightname}_up"] = self.df.loc[mask, weightname] + te
            self.df.loc[mask, f"{weightname}_down"] = self.df.loc[mask, weightname] - te
            # Assign variations in a vectorized way using column names.
            # NOTE(review): assumes variator.variations is indexed as
            # (variation, query) so column i broadcasts over the masked rows —
            # confirm against the Variator implementation.
            variation_columns = [f"{weightname}_var_{j}" for j in range(self.Nvar)]
            self.df.loc[mask, variation_columns] = self.variator.variations[:, i]

    def _combine_variations(self):
        """Multiply per-particle variation columns into ``combination_var_*`` columns.

        NOTE(review): this method iterates ``self.syst_weight.items()`` — it is
        only meaningful when ``syst_weight`` is a dict mapping prefix to weight
        name, while the rest of the class mostly treats it as a string.
        Confirm before use.
        """
        # Commenting out, I think I don't need this at all
        # This could only be useful for the FF stuff but there we don't
        # calculate eigenvariations ourselves
        # self.df.loc[:, "combination_up"] = self.df[
        #     [f"{x}_up" for x in self.syst_weight]
        # ].prod(axis=1)
        # self.df.loc[:, "combination_down"] = self.df[
        #     [f"{x}_down" for x in self.syst_weight]
        # ].prod(axis=1)
        for j in range(self.Nvar):
            # Collect columns to be multiplied: all the corrected particles
            columns_to_multiply = [
                f"{x}_{y}_var_{j}" for x, y in self.syst_weight.items()
            ]
            # Multiply the selected columns, essentially combining the
            # corrections into a single product weight
            product_column = self.df[columns_to_multiply].prod(axis=1)
            # Concatenate the product column to the DataFrame
            self.df = concat(
                [self.df, product_column.rename(f"combination_var_{j}")], axis=1
            )

    def _get_absolute_variations(self):
        """Return an (Nbins, Nvar) array of varied histogram bin contents."""
        absolute_variations = np.empty((self.Nbins, self.Nvar))
        for i in range(self.Nvar):
            # make_hist returns (contents, edges); keep the flattened contents
            absolute_variations[:, i] = self.make_hist(i)[0].flatten()
        return absolute_variations
    def collect_weights(self, index):
        """Collects weights based on the provided index, handling different variations.

        Args:
            index: Specifies the type of weight to collect. Can be `None`, a
                string such as "MC", "up", "down", a numbered variation such as
                "up0", ..., "down0", ..., or an integer variation index.

        Returns:
            numpy.ndarray: An array of weights based on the specified index.

        Raises:
            NotImplementedError: If the provided index is not supported.

        Notes:
            - If `index` is `None`, returns the nominal total weight.
            - If `index` is "MC", returns the square of the total weight.
            - For "up"/"down" variations, divides out the nominal systematic
              weight and multiplies in the varied one.
            - Handles both string and list types for `self.prefices`.
        """
        if index is None:
            # Take the nominal total weight
            weights = np.array(self.df[self.total_weight])
        elif isinstance(index, str):
            if index == "MC":
                # Squared weights (e.g. for the MC statistical uncertainty)
                weights = np.square(self.df[self.total_weight])
            # FIXME This needs to be safely generalized
            # This is aligned for Felix's tuples now
            elif (
                index in ["up", "down"]
                or index in [f"up{x}" for x in range(self.Nvar)]
                or index in [f"down{x}" for x in range(self.Nvar)]
            ):
                # PATCH
                # Now I'm replacing 0s with 1s to avoid NANs in the histogram
                if isinstance(self.prefices, str):
                    # PATCH
                    weightname = "_".join([self.prefices, self.syst_weight])
                    weights = (
                        self.df[self.total_weight] / self.df[weightname].replace(0, 1)
                    ) * (self.df["_".join((weightname, index))])
                else:
                    weightnames = [
                        "_".join([prefix, self.syst_weight])
                        for prefix in self.prefices
                    ]
                    weights = np.array(
                        self.df[self.total_weight]
                        / self.df[weightnames].replace(0, 1).prod(axis=1)
                        * self.df[
                            [
                                "_".join((weightname, index))
                                for weightname in weightnames
                            ]
                        ].prod(axis=1)
                    )
            else:
                raise NotImplementedError("only MC, up and down variations implemented")
        else:
            # Integer index: divide with the nominal systematic weight and
            # multiply with the varied one
            if isinstance(self.prefices, str):
                weightname = "_".join([self.prefices, self.syst_weight])
                weights = np.array(
                    (self.df[self.total_weight] / self.df[weightname].replace(0, 1))
                    * self.df[f"{weightname}_var_{index}"]
                )
            elif isinstance(self.prefices, list) and len(self.prefices) > 0:
                # If we have multiple prefices we need to create all the
                # column names first
                weightnames = [
                    "_".join([prefix, self.syst_weight]) for prefix in self.prefices
                ]
                # We divide with the product of all the nominal weights
                # and multiply with the product of all of those which we added
                # the suffix to
                weights = np.array(
                    self.df[self.total_weight]
                    / self.df[weightnames].replace(0, 1).prod(axis=1)
                    * self.df[[f"{w}_var_{index}" for w in weightnames]].prod(axis=1)
                )
            elif self.prefices is None or len(self.prefices) == 0:
                weightname = self.syst_weight
                weights = np.array(
                    (self.df[self.total_weight] / self.df[weightname].replace(0, 1))
                    * self.df[f"{weightname}_var_{index}"]
                )
            else:
                raise NotImplementedError(
                    "Only one prefix at a time currently! Revisit this for HID and pi0s"
                )
        return weights
[docs] def plot_systematic_overview(self, save: bool = False, filename: str = ""): self.visualizer = TemplateVisualizer(self) self.visualizer.plot_systematic_overview(save=save, filename=filename)
[docs] def plot_relative_variations_in_grid( self, title: str = "", save: bool = False, filename: str = "" ) -> tuple[plt.Figure, plt.Axes]: self.visualizer = TemplateVisualizer(self) fig, ax = self.visualizer.plot_relative_variations_in_grid( title=title, save=save, filename=filename ) return fig, ax
[docs] def plot_up_and_down_variations( self, title: str = "", save: bool = False, filename: str = "" ) -> tuple[plt.Figure, plt.Axes]: self.visualizer = TemplateVisualizer(self) fig, ax = self.visualizer.plot_up_and_down_variations( title=title, save=save, filename=filename ) return fig, ax
class Template1D(Template):
    """Template over a single binning variable (1D histograms)."""

    def __init__(
        self,
        df: DataFrame,
        binning: dict,
        total_weight: str,
        syst_weight: None | str = None,
        prefices: None | str | list = None,
        correction: None | BaseCorrection = None,
        variator: None | Variator = None,
        verbose: bool = True,
    ):
        # See Template.__init__ for the argument documentation.
        super().__init__(
            df,
            binning,
            total_weight,
            syst_weight,
            prefices,
            correction,
            variator,
            verbose,
        )
        # Nominal histogram (no variation selected)
        self.nom_hist = self.make_hist()

    def make_hist(
        self, index: None | int | str = None
    ) -> tuple[np.ndarray, np.ndarray]:
        """Histogram the single binning variable with the weights for *index*.

        Returns:
            tuple: (flattened bin contents, bin edges).
        """
        weights = self.collect_weights(index)
        # Expects exactly one entry in self.binning: list(*values) unpacks the
        # single edge sequence (and raises TypeError for more than one).
        hist = np.histogram(
            np.array(self.df[list(self.binning.keys())[0]]),
            bins=list(*self.binning.values()),
            weights=weights,
        )
        return hist[0].flatten(), hist[1].flatten()
class TemplateND(Template):
    """Template over several binning variables (N-dimensional histograms)."""

    def __init__(
        self,
        df: DataFrame,
        binning: dict,
        total_weight: str,
        syst_weight: None | str = None,
        prefices: None | str | list = None,
        correction: None | BaseCorrection = None,
        variator: None | Variator = None,
        verbose: bool = True,
    ):
        # See Template.__init__ for the argument documentation.
        super().__init__(
            df,
            binning,
            total_weight,
            syst_weight,
            prefices,
            correction,
            variator,
            verbose,
        )
        # Nominal histogram (no variation selected)
        self.nom_hist = self.make_hist()

    def make_hist(
        self, index: None | int | str = None
    ) -> tuple[np.ndarray, np.ndarray]:
        """Histogram all binning variables and flatten the result.

        Returns:
            tuple: (flattened bin contents, synthetic uniform edges on [0, 1]
            for the flattened axis — the real per-axis edges are not returned).
        """
        weights = self.collect_weights(index)
        hist = np.histogramdd(
            np.array(self.df[[*self.binning.keys()]]),
            bins=[bins for bins in self.binning.values()],
            weights=weights,
        )
        return (hist[0].flatten(), np.linspace(0, 1, hist[0].flatten().shape[0] + 1))