# Source code for sysvar.eigendecomposer

# Future import: postponed evaluation of annotations (PEP 563), enabling the
# ``str | dict | None`` union syntax in signatures on older Python versions.
from __future__ import annotations

# --- standard library ---
from os import path, makedirs
import itertools  # NOTE(review): not referenced in the visible code — confirm before removing
from functools import cached_property
from typing import List  # NOTE(review): not referenced in the visible code

# --- third party ---
from tqdm import tqdm  # progress bar for the eigendirection scan

import numpy as np
from pandas import DataFrame

import uproot  # ROOT file I/O; used by the save_* methods via uproot.update()

# --- project local ---
from sysvar.corrections import create_correction_object
from sysvar.variations import Variator
from sysvar.templates import Template1D, TemplateND  # NOTE(review): not referenced in the visible code
from sysvar.visualize import EigenDecomposerVisualizer
from sysvar.channel_template_handler import ChannelTemplateHandler
from sysvar.utils import read_yaml  # NOTE(review): not referenced in the visible code


import logging

# Configure the root logger at import time with level + function/line prefix.
# NOTE(review): basicConfig here affects any application importing this module;
# a module-level ``logging.getLogger(__name__)`` would be less intrusive.
logging.basicConfig(
    format="%(levelname)s : %(funcName)s: %(lineno)d :  %(message)s",
    level=logging.INFO,
)


class EigenDecomposer(ChannelTemplateHandler):
    """Eigen-decompose the covariance matrix of systematic template variations.

    The combined variations of all templates are stacked, their covariance is
    diagonalized, and the eigendirections (scaled by the square root of their
    eigenvalues) become one-sigma systematic variations that can be truncated
    to the few directions that matter.
    """

    def __init__(
        self,
        df: DataFrame,
        settings: dict,
        syst_effect: str | dict | None,
        verbose: bool = True,
        seed: int = 8311311,
    ):
        """Set up the decomposer for one systematic effect.

        Parameters
        ----------
        df : DataFrame
            Input events handed to the base ChannelTemplateHandler.
        settings : dict
            Analysis settings; must provide "MC_prod", "Nvar" and the
            "systematics" sub-dictionary read by the properties below.
        syst_effect : str | dict | None
            Name of the systematic, or a dict with a "name" key.
        verbose : bool
            Forwarded to the base class.
        seed : int
            Seed for the Variator's random variations.

        Raises
        ------
        ValueError
            If ``syst_effect`` is neither a string nor a dict.
        """
        super().__init__(df, settings, verbose)
        if isinstance(syst_effect, dict):
            self._syst_effect = syst_effect["name"]
        elif isinstance(syst_effect, str):
            self._syst_effect = syst_effect
        elif syst_effect is None:
            # FIX: the original built the ValueError but never raised it,
            # silently continuing with an undefined _syst_effect.
            raise ValueError("syst_effect must be a string or a dict but you pass None")
        else:
            # FIX: same missing ``raise`` as above.
            raise ValueError(
                f"syst_effect must be a string or a dict but you pass {type(syst_effect)}"
            )
        # NOTE(review): the properties below read ``self.syst_effect`` while only
        # ``self._syst_effect`` is assigned here — presumably the accessor is
        # provided by ChannelTemplateHandler; confirm.
        self.correction = create_correction_object(syst_effect, settings["MC_prod"])
        self.seed = seed
        self.variator = Variator(self.correction, Nvar=settings["Nvar"], seed=seed)
        # Filled later by find_important_eigendimension_indices().
        self.N_important_dims = 0
        self.max_variations = None
        self.important_dims_indices = None

    @property
    def syst_weight(self) -> str:
        """Per-event weight column name configured for this systematic."""
        return self.settings["systematics"][self.syst_effect]["weight"]

    @property
    def prefices(self) -> str | list | None:
        """Column prefix(es) of this systematic's variation weights."""
        # NOTE(review): annotation widened from ``str`` to match the sibling
        # ExistingEigenVariationsSaver.prefices declaration.
        return self.settings["systematics"][self.syst_effect]["prefices"]

    @property
    def precision(self) -> float:
        """Precision threshold used when truncating the eigendirections."""
        return self._precision

    @precision.setter
    def precision(self, precision: float):
        self._precision = precision

    @property
    def max_variations(self) -> int | None:
        """Optional hard cap on the number of kept eigendirections."""
        return self._max_variations

    @max_variations.setter
    def max_variations(self, max_variations: int | None):
        self._max_variations = max_variations

    @cached_property
    def combined_variations(self) -> np.ndarray:
        """Absolute variations of all templates stacked row-wise."""
        return np.vstack(
            [x._get_absolute_variations() for x in self.templates.values()]
        )

    @property
    def cov(self) -> np.ndarray:
        """Covariance matrix of the combined variations."""
        return np.cov(self.combined_variations)

    # FIXME add a setter
    @property
    def corr(self) -> np.ndarray:
        """Correlation matrix of the combined variations."""
        return np.corrcoef(self.combined_variations)

    @cached_property
    def eigen_decomposition(self) -> tuple:
        """(eigenvalues, eigenvectors) of the covariance matrix.

        NOTE: ``np.linalg.eig`` does not sort its eigenvalues; for a symmetric
        covariance ``np.linalg.eigh`` would be numerically preferable, but
        switching would change the eigenvalue ordering relied on downstream.
        """
        return np.linalg.eig(self.cov)

    @property
    def eigen_values(self) -> np.ndarray:
        return self.eigen_decomposition[0]

    @property
    def eigen_vectors(self) -> np.ndarray:
        return self.eigen_decomposition[1]

    @property
    def eigen_variations(self) -> np.ndarray:
        """Eigenvectors scaled by sqrt(eigenvalue): columns are 1-sigma shifts."""
        return np.real(self.eigen_vectors * np.sqrt(self.eigen_values))

    @cached_property
    def max_differences(self) -> np.ndarray:
        """Max |cov - running partial cov| after adding each eigendirection.

        Returns an array whose i-th entry is the largest absolute difference
        between the full covariance and the sum of the first i+1 rank-1
        eigendirection covariances (capped at the first 100 directions).
        """
        total_N = len(self.eigen_vectors)
        max_n = min(total_N, 100)  # only need the first 100
        n_start = 0  # starting eigendirection for the partial-covariance sum
        running_cov = np.zeros_like(self.cov)
        # If the first eigenvalue is zero then the eigendirection is also zero.
        # Completely skip this eigendirection, but we still have to understand
        # why this happens.
        if self.eigen_values[0] == 0:
            max_n = max_n - 1  # consider one less eigendirection in total
            n_start = 1  # skip the eigendirection with 0 eigenvalue
        # FIX: np.empty left index 0 uninitialized (garbage) when n_start == 1;
        # np.zeros guarantees a defined value for every entry.
        max_diffs = np.zeros(max_n, dtype=float)
        logging.warning(
            "Only the first %d eigendirections will be considered to find "
            "the maximum number of eigenvariations.",
            max_n,
        )
        for i in tqdm(range(n_start, max_n), desc="Building partial covariances"):
            # grab the i-th eigen-variation (shape: [n_features,])
            vec = self.eigen_variations[:, i]
            # update the running rank-1 covariance sum
            running_cov += self.var2cov(vec)
            # largest remaining discrepancy to the full covariance
            diff = np.abs(np.real(self.cov - running_cov)).max()
            max_diffs[i] = diff
        return max_diffs
[docs] @staticmethod def var2cov(mat) -> np.ndarray: return np.outer(mat, mat)
[docs] def vary_templates(self): previous_reco_mode = None for (reco_mode, template_name), template in zip( self.iterator, self.templates.values() ): if reco_mode != previous_reco_mode: if self.verbose: logging.info( "########## Reco channel: %s ##########", str(reco_mode[1]) ) # Update the systematic info of the templates template.syst_weight = self.syst_weight template.prefices = self.prefices template.correction = self.correction template.variator = self.variator template.Nvar = self.variator.Nvar # Keep only the necessary columns to avoid memory issues template.drop_unecessary_columns() if self.verbose: logging.info("Adding variations to %s template", str(template_name)) template.add_variations() previous_reco_mode = reco_mode
[docs] def find_important_eigendimension_indices(self, method: str = "max_differences"): if method == "max_differences": important_dims = self._calculate_max_differences() elif method == "trace": important_dims = self._calculate_trace() else: raise NotImplementedError( f"Available methods are: max_differences and trace" ) # Increase number by one to get the last principal component self.N_important_dims = np.sum(important_dims) + 1 # Limit the number of important dimensions if max_variations is set if self.max_variations is not None: self.N_important_dims = min(self.N_important_dims, self.max_variations) # Set the index of the last principal component to True important_dims[self.N_important_dims] = True # Collect the indices of the important dimensions including the last one self.important_dims_indices = important_dims logging.info( f"Found that %s eigendirections matter for %s per cent precision", self.N_important_dims, self.precision, ) if self.max_variations is not None and self.N_important_dims == self.max_variations: logging.info( f"Keeping only the first %s eigendirections", self.max_variations, )
def _calculate_max_differences(self): """ Calculate the important dimensions based on maximum differences. This method identifies the important dimensions by comparing the maximum differences (scaled by the maximum value of the covariance matrix) to a specified precision threshold. Returns: list: A list of boolean values indicating the important dimensions. The length of the list corresponds to the number of maximum differences, with `True` indicating an important dimension and `False` indicating otherwise. Attributes: max_differences (array-like): The maximum differences for each dimension. cov (array-like): The covariance matrix. precision (float): The precision threshold for determining the important dimensions. Example: >>> self.max_differences = [0.2, 0.5, 0.8] >>> self.cov = np.array([[1, 0.1], [0.1, 1]]) >>> self.precision = 0.1 >>> self._calculate_max_differences() [True, True, True] """ important_dims = ( np.asarray(self.max_differences) / self.cov.max() > self.precision ) return important_dims def _calculate_trace(self): """ Calculate the trace and determine the important dimensions based on eigenvalues. This method computes the total trace by summing the square roots of the eigenvalues. It then iteratively computes the truncated trace by adding the square root of each eigenvalue and normalizes it by the total trace. When the normalized trace exceeds the specified precision, it identifies the number of important dimensions. Returns: list: A list of boolean values indicating the important dimensions. The length of the list corresponds to the number of eigenvalues, with `True` indicating an important dimension and `False` indicating otherwise. Attributes: eigen_values (array-like): The eigenvalues of the matrix. precision (float): The precision threshold for determining the important dimensions. 
Example: >>> self.eigen_values = [4, 1, 0.5] >>> self.precision = 0.95 >>> self._calculate_trace() [True, True] """ total_trace = np.sum(np.sqrt(self.eigen_values)) truncated_trace = 0 for i in range(1, len(self.eigen_values) + 1): truncated_trace += np.sqrt(self.eigen_values[i - 1]) normalized_trace = truncated_trace / total_trace if normalized_trace / total_trace > self.precision: important_dims = [True] * i break return important_dims def _get_unrolled_variations(self): return self.eigen_variations[:, self.important_dims_indices].reshape( len(self.decay_modes), len(self.template_names), self.Nbins, self.N_important_dims, )
[docs] def save_template_variations(self, filepath=None): # Override the global filepath in case we want to run on a batch with b2luigi filepath = self.settings["output_filepath"] if filepath is None else filepath with uproot.update(filepath) as newfile: logging.info( "Updating file with uproot: %s", self.settings["output_filepath"] ) previous_tree = None index = 0 for ((tree_i, tree), (ctgy_i, ctgy)), t in zip( self.enumerated_iterator, self.templates.values() ): if tree != previous_tree: logging.info(50 * "#") logging.info("########## Reco channel: %s ##########", str(tree[1])) logging.info(50 * "#") nominal = t.make_hist() for n_var in range(self.N_important_dims): var = self.eigen_variations[index : index + t.Nbins, n_var] branch_name = self._get_TBranch_name( tree[1], ctgy, f"{self.syst_effect}_var{n_var+1}_up" ) logging.info( "Saving Up variation of MC template %s in TBranch: %s", str(ctgy), branch_name, ) newfile[branch_name] = nominal[0] + var, nominal[1] branch_name = self._get_TBranch_name( tree[1], ctgy, f"{self.syst_effect}_var{n_var+1}_down" ) logging.info( "Saving Down variation of %s in TBranch: %s", str(ctgy), branch_name, ) newfile[branch_name] = nominal[0] - var, nominal[1] index += t.Nbins previous_tree = tree
[docs] def save_toys(self, filepath=None): # Override the global filepath in case we want to run on a batch with b2luigi filepath = ( self.settings["output_toy_filepath"] if filepath is None else filepath ) with uproot.update(filepath) as newfile: logging.info( "Updating file with uproot: %s", self.settings["output_toy_filepath"] ) previous_tree = None index = 0 for ((tree_i, tree), (ctgy_i, ctgy)), t in zip( self.enumerated_iterator, self.templates.values() ): if tree != previous_tree: logging.info(50 * "#") logging.info("########## Reco channel: %s ##########", str(tree[1])) logging.info(50 * "#") for n_var in range(self.settings["Nvar"]): toy = t.make_hist(n_var) branch_name = self._get_TBranch_name( tree[1], ctgy, f"{self.syst_effect}_toy{n_var+1}" ) logging.info( "Saving toy %d of MC template %s in TBranch: %s", n_var, str(ctgy), branch_name, ) newfile[branch_name] = toy[0], toy[1] previous_tree = tree
[docs] def save_one_sigma_variations(self, filepath=None): # Override the global filepath in case we want to run on a batch with b2luigi filepath = ( self.settings["output_extreme_filepath"] if filepath is None else filepath ) with uproot.update(filepath) as newfile: logging.info( "Updating file with uproot: %s", self.settings["output_extreme_filepath"], ) previous_tree = None index = 0 for ((tree_i, tree), (ctgy_i, ctgy)), t in zip( self.enumerated_iterator, self.templates.values() ): if tree != previous_tree: logging.info(50 * "#") logging.info("########## Reco channel: %s ##########", str(tree[1])) logging.info(50 * "#") for var in ["up", "down"]: variation = t.make_hist(var) branch_name = self._get_TBranch_name( tree[1], ctgy, f"{self.syst_effect}_{var}" ) logging.info( "Saving one sigma variation %s of MC template %s in TBranch: %s", var, str(ctgy), branch_name, ) newfile[branch_name] = variation[0], variation[1] filedir = f"{tree[1]}/Data/{self.syst_effect}_{var}" logging.info( "Adding empty observed Data for region: %s in %s", tree[1], filedir, ) # Save empty data now as we work only on Asimov newfile[filedir] = np.array([0, 0, 0]), np.array([0, 1, 2, 3]) previous_tree = tree
    def save_channel_covariance_matrices(self, filepath=None):
        """Sum the per-template covariance matrices and save the result as .npy.

        NOTE(review): despite the plural name, the visible code sums the
        covariance matrices of ALL templates into a single matrix and saves it
        once — and the output filename references ``dm``, which is not defined
        in this method (NameError at runtime). Presumably a per-decay-mode
        loop was intended; confirm against the callers before use.
        """
        # PATCH so this is the general output file of the analysis
        filepath = self.settings["output_filepath"] if filepath is None else filepath
        # extract its topdir
        base_path = path.dirname(filepath)
        # Loop over all the templates
        for j, tmp_template in enumerate(self.templates.values()):
            # If this is the first template, initialize an empty cov matrix
            if j == 0:
                cov = np.zeros((tmp_template.Nbins, tmp_template.Nbins))
            # Now add the template cov matrix
            cov += tmp_template.cov_matrix
        # Once we have looped over all the templates then save the cov matrix for this channel
        cov_path_dir = path.join(base_path, f"cov_matrices")
        # check if the covariance directory exists
        if not path.exists(cov_path_dir):
            # if not create it
            makedirs(cov_path_dir)
        # NOTE(review): ``dm`` is undefined here — this line raises NameError.
        outpath = path.join(cov_path_dir, f"{dm[1]}_{self.syst_effect}_cov.npy")
        logging.info("Save covariance matrix at %s ##########", str(outpath))
        np.save(outpath, cov)
[docs] def plot_cov_diff(self, save: bool = False, filename: str = ""): self.visualizer = EigenDecomposerVisualizer(self) fig, ax = self.visualizer.plot_cov_diff(save=save, filename=filename) return fig, ax
[docs] def plot_corr_matrix(self, save: bool = False, filename: str = ""): self.visualizer = EigenDecomposerVisualizer(self) fig, ax = self.visualizer.plot_corr_matrix(save=save, filename=filename) return fig, ax
class ExistingEigenVariationsSaver(ChannelTemplateHandler):
    """Save pre-existing eigen-variations (already weighted) of templates.

    Unlike EigenDecomposer, no decomposition is computed here; the variation
    weights are assumed to already exist in the input DataFrame and are only
    histogrammed and written out.

    NOTE(review): the properties below read ``self.syst_effect``, which is not
    assigned anywhere in this class — presumably provided by
    ChannelTemplateHandler; confirm.
    """

    def __init__(self, df: DataFrame, settings: dict, verbose: bool = True):
        # Delegates entirely to the base class (template/channel bookkeeping).
        super().__init__(df, settings, verbose)

    @property
    def weight(self) -> str:
        # Per-event weight column configured for this systematic.
        return self.settings["systematics"][self.syst_effect]["weight"]

    @property
    def prefices(self) -> str | list | None:
        # Column prefix(es) of the stored variation weights.
        return self.settings["systematics"][self.syst_effect]["prefices"]

    @property
    def Nvar(self) -> int:
        # Number of stored variations for this systematic.
        return self.settings["systematics"][self.syst_effect]["Nvar"]
[docs] def save_existing_eigenvariations(self, filepath=None, verbose: bool = True): # Override the global filepath in case we want to run on a batch with b2luigi filepath = self.settings["output_filepath"] if filepath is None else filepath with uproot.update(filepath) as newfile: logging.info( "Updating file with uproot: %s", self.settings["output_filepath"] ) previous_tree = None for ((tree_i, tree), (ctgy_i, ctgy)), t in zip( self.enumerated_iterator, self.templates.values() ): if tree != previous_tree: logging.info(50 * "#") logging.info("########## Reco channel: %s ##########", str(tree[1])) logging.info(50 * "#") for n in range(self.Nvar): branch_name = self._get_TBranch_name( tree[1], ctgy, f"{self.syst_effect}_var{n+1}_up" ) if verbose: logging.info( "Saving Up variation of MC template %s in TBranch: %s", str(ctgy), branch_name, ) t.syst_weight = self.weight t.prefices = self.prefices t.Nvar = self.Nvar varied_histo = t.make_hist(f"up{n}") newfile[branch_name] = varied_histo[0], varied_histo[1] branch_name = self._get_TBranch_name( tree[1], ctgy, f"{self.syst_effect}_var{n+1}_down" ) if verbose: logging.info( "Saving Down variation of %s in TBranch: %s", str(ctgy), branch_name, ) varied_histo = t.make_hist(f"down{n}") newfile[branch_name] = varied_histo[0], varied_histo[1] previous_tree = tree