Source code for acm.hod.box

import os
from pathlib import Path
import yaml
import numpy as np
from abacusnbody.hod import abacus_hod
from cosmoprimo.fiducial import DESI, AbacusSummit
import mockfactory
from astropy.io import fits
from astropy.table import Table
import logging
import warnings
import sys
warnings.filterwarnings("ignore", category=np.VisibleDeprecationWarning)


from acm.utils.paths import get_Abacus_dirs
LRG_Abacus_DM = get_Abacus_dirs(tracer='LRG', simtype='box')

[docs] class BoxHOD: """ BoxHOD is a wrapper around AbacusHOD, a class for handling Halo Occupation Distribution (HOD) modeling using the AbacusSummit simulations. Note ---- Only the 'LRG' tracers are currently supported. Note that BGS is also supported, by using `tracer='LRG'` and BGS characteristics (mean density, redshift, etc.) """ logger = logging.getLogger('AbacusHOD') # Set up logger for the class as a class attribute def __init__( self, varied_params, config_file: str = None, cosmo_idx: int = 0, phase_idx: int = 0, sim_type: str = 'base', redshift: float = 0.5, DM_DICT: dict = LRG_Abacus_DM): """ Initialize the BoxHOD class. Parameters ---------- varied_params : dict Dictionary of parameters that vary. config_file : str, optional Path to the configuration file. If None, defaults to 'box.yaml' in `acm.hod`. See `setup()` for more details. Default is None. cosmo_idx : int, optional Index of the cosmology to use. Default is 0. phase_idx : int, optional Index of the phase to use. Default is 0. sim_type : str, optional Type of simulation. Must be either 'base' or 'small'. Default is 'base'. redshift : float, optional Redshift value. Default is 0.5. DM_DICT : dict, optional Dictionary containing dark matter information. Default is the LRG_Abacus_DM dictionary for boxes from `acm.data.paths`. Raises ------ ValueError If `sim_type` is not 'base' or 'small'. """ self.logger = logging.getLogger('BoxHOD') self.cosmo_idx = cosmo_idx self.phase_idx = phase_idx if sim_type not in ['base', 'small', 'png']: raise ValueError('Invalid sim_type. Must be either "base", "small", or "png".') self.sim_type = sim_type self.boxsize = 2000 if sim_type in ['base', 'png'] else 500 self.redshift = redshift if config_file is None: config_dir = os.path.dirname(os.path.abspath(__file__)) config_file = Path(config_dir) / 'box.yaml' config = yaml.safe_load(open(config_file)) self.setup(config, DM_DICT) self.check_params(varied_params)
[docs] def setup(self, config: dict, DM_DICT: dict): # Will override most of the config file ! """ Set up the simulation parameters and initialize the AbacusHOD object. This method overrides most of the configuration file settings with the provided `config` and `DM_DICT` parameters. Parameters ---------- config : dict Configuration dictionary containing simulation parameters and HOD parameters. DM_DICT : dict Dictionary containing dark matter simulation directories. """ sim_params = config['sim_params'] sim_dir, subsample_dir = self.abacus_simdirs(DM_DICT) sim_params['sim_dir'] = sim_dir sim_params['subsample_dir'] = subsample_dir sim_params['sim_name'] = self.abacus_simname() sim_params['z_mock'] = self.redshift HOD_params = config['HOD_params'] self.ball = abacus_hod.AbacusHOD(sim_params, HOD_params) self.ball.params['Lbox'] = self.boxsize self.cosmo_fid = DESI() if self.cosmo_idx in [300, 301, 302, 303]: self.cosmo = AbacusSummit(0) else: self.cosmo = AbacusSummit(self.cosmo_idx) self.az = 1 / (1 + self.redshift) self.hubble = 100 * self.cosmo.efunc(self.redshift) self.q_par = 100 * self.cosmo_fid.efunc(self.redshift) / self.hubble self.q_perp = self.cosmo.angular_diameter_distance(self.redshift) / self.cosmo_fid.angular_diameter_distance(self.redshift) self.logger.info(f'Processing {self.abacus_simname()} at z = {self.redshift}')
[docs] def abacus_simdirs(self, DM_DICT: dict): """ Get the simulation and subsample directories from the dark matter dictionary. Parameters ---------- DM_DICT : dict Dictionary containing dark matter simulation directories. Returns ------- tuple Tuple containing the simulation and subsample directories. """ sim_dir = DM_DICT[self.sim_type]['sim_dir'] subsample_dir = DM_DICT[self.sim_type]['subsample_dir'] return sim_dir, subsample_dir
[docs] def abacus_simname(self): """ Get the simulation name. Returns ------- str Simulation name, following the Abacus format. """ if self.sim_type == 'png': return f'Abacus_{self.sim_type}base_c{self.cosmo_idx:03}_ph{self.phase_idx:03}' return f'AbacusSummit_{self.sim_type}_c{self.cosmo_idx:03}_ph{self.phase_idx:03}'
[docs] def check_params(self, params): """ Check if the parameters are valid, i.e. if they are in the list of valid parameters. Parameters ---------- params : list List of parameters to check. Raises ------ ValueError If the parameters are invalid. """ params = list(params) params = self.param_mapping(params) # re-map custom keys to Abacus keys for param in params: if param not in self.ball.tracers['LRG'].keys(): raise ValueError(f'Invalid parameter: {param}. Valid list ' f'of parameters include: {list(self.ball.tracers["LRG"].keys())}') self.logger.info(f'Varied parameters: {params}.') self.varied_params = params default = {key: value for key, value in self.ball.tracers['LRG'].items() if key not in params} self.logger.info(f'Default parameters: {default}.')
[docs] def run( self, hod_params: dict, nthreads: int = 1, tracer: str = 'LRG', tracer_density: list = None, process_underdense: bool = True, seed = None, save_fn: str|Path = None, add_ap: bool = False, )-> dict: """ Run the HOD model with the given parameters. Parameters ---------- hod_params : dict Dictionary of HOD parameters. nthreads : int, optional Number of threads to use. Default is 1. tracer : str, optional Tracer type. Default is 'LRG'. tracer_density : list, optional List containing (min_nbar, max_nbar) for downsampling catalogue to desired density (nbar > max_nbar) or cutting from sample (nbar < min_nbar). If only one value provided, this is taken as the maximum threshold (no minimum threshold applied). Default is None (no thresholds applied). process_underdense: bool, optional If set to False, does not process (and save) catalogs that are not in tracer_density limits (only used if tracer_density is provided). Defaults to True. seed : int, optional Random seed. Default is None. save_fn : str|Path, optional Filename to save the catalog. Creates parent tree if it does not exist. Default is None. add_ap: bool, optional Whether to take Alcock-Paczynski distortions into account when computing the number density. To use if you plan to apply AP distortions to the catalog later on. Default is False. Returns ------- dict Dictionary containing the HOD catalog. Galaxy positions ('X','Y','Z') and velocities ('VX','VY','VZ') are in real-space. To get positions with RSD and/or AP distortions, use the `get_positions` class method. Raises ------ ValueError If the tracer is not 'LRG'. ValueError If the HOD parameters do not match the varied parameters. """ self.add_ap = add_ap # flag to indicate if AP distortions were applied to number density if seed == 0: seed = None if tracer not in ['LRG']: raise ValueError('Only LRGs are currently supported.') hod_params = self.param_mapping(hod_params) if set(hod_params.keys()) != set(self.varied_params): raise ValueError('Invalid HOD parameters. Must match the varied parameters.') for key in hod_params.keys(): if key == 'sigma' and tracer == 'LRG': self.ball.tracers[tracer][key] = 10**hod_params[key] else: self.ball.tracers[tracer][key] = hod_params[key] self.ball.tracers[tracer]['ic'] = 1 self.in_density = True # Flag if mock is within density threshold hod_dict = self.ball.run_hod(self.ball.tracers, want_rsd=False, Nthread=nthreads, reseed=seed) # workaround for compute_ngal issue with high sigma values n_gal = len(hod_dict[tracer]['x']) subsample = None if tracer_density is not None: n_target = np.array(tracer_density) * self.boxsize ** 3 if self.add_ap: n_target /= self.q_par * self.q_perp**2 if (n_target.size > 1) & (n_target.min() / n_gal > 1): self.logger.info('Catalogue below minimum density threshold') self.in_density = False # Flag that mock is below density threshold if not process_underdense: return hod_dict # Unprocessed catalog, should not be used elif (n_target.max() / n_gal) < 1: self.logger.info('Downsampling mock') subsample = np.random.choice(range(n_gal), size=int(n_target.max()), replace=False) else: self.logger.info('Mock within density thresholds') # Catalogue positions not distorted by AP to allow freedom of applying to any axis at a later stage hod_dict = self.postprocess_catalog(hod_dict, tracer, subsample) if save_fn is not None: self.save_catalog(save_fn, hod_dict, tracer) return hod_dict
[docs] def postprocess_catalog( self, hod_dict: dict, tracer: str = 'LRG', subsample: list = None, ): """ Add distortion effects and format the HOD catalog. Parameters ---------- hod_dict : dict Dictionary containing the HOD catalog. tracer : str, optional Tracer type. Default is 'LRG'. subsample: list, optional List of indices used to subsample the catalogue. Returns ------- dict Dictionary containing the HOD catalog. """ Ncent = hod_dict[tracer]['Ncent'] hod_dict[tracer].pop('Ncent', None) is_central = np.zeros(len(hod_dict[tracer]['x'])) is_central[:Ncent] += 1 hod_dict[tracer]['is_cent'] = is_central # workaround for compute_ngal issue if subsample is None: hod_dict[tracer] = {k.upper():v for k, v in hod_dict[tracer].items()} else: hod_dict[tracer] = {k.upper():v[subsample] for k, v in hod_dict[tracer].items()} return hod_dict
[docs] def save_catalog( self, save_fn: str|Path, hod_dict: dict, tracer: str = 'LRG', ): """ Save the HOD catalog to a FITS file. Parameters ---------- save_fn : str|Path Filename to save the catalog. If parent tree directories do not exist, they will be created. hod_dict : dict Dictionary containing the HOD catalog. tracer : str, optional Tracer type. Default is 'LRG'. """ # Ensure parent directories exist save_fn = Path(save_fn) save_fn.parent.mkdir(parents=True, exist_ok=True) table = Table(hod_dict[tracer]) header = fits.Header({ 'gal_type': tracer, 'hubble': self.hubble, 'az': self.az, 'boxsize': self.boxsize, 'q_par': self.q_par, 'q_perp': self.q_perp, **self.ball.tracers[tracer], }) myfits = fits.BinTableHDU(data=table, header=header) myfits.writeto(save_fn, overwrite=True) self.logger.info(f'Saving {save_fn}.')
[docs] def param_mapping(self, hod_params: dict | list): """ Map custom HOD parameters to Abacus HOD parameters. Parameters ---------- hod_params : dict or list Dictionary or list of HOD parameters. Returns ------- dict or list Dictionary or list of AbacusHOD parameters. Raises ------ ValueError If the type of hod_params is not dict or list. """ # Add custom keys here if needed. # Be careful to the one-to-one position mapping in the list !! abacus_keys = ['logM1', 'Acent', 'Asat', 'Bcent', 'Bsat'] custom_keys = ['logM_1', 'A_cen', 'A_sat', 'B_cen', 'B_sat'] # Check if custom keys are used if any(key in hod_params for key in custom_keys): # Same syntax for dict and list :) for abacus_key, custom_key in zip(abacus_keys, custom_keys): if custom_key in hod_params: # Just in case not all custom keys are used # Replace custom keys with Abacus keys if type(hod_params) is dict: hod_params[abacus_key] = hod_params.pop(custom_key) elif type(hod_params) is list: hod_params[hod_params.index(custom_key)] = abacus_key else: raise ValueError('Invalid type for hod_params. Must be either dict or list.') return hod_params
[docs] @classmethod def get_boxsize(cls, boxsize: float|list, add_ap: bool = False, los: str = None, q_par: float = None, q_perp: float = None) -> float|list: """ Get the box size, taking into account Alcock-Paczynski distortions if specified. Parameters ---------- boxsize : float|list Original box size (as a float or a list of three floats for each axis). add_ap : bool, optional Whether to add Alcock-Paczynski distortions to the box size or not. Default is False. los : str, optional Line-of-sight for AP distortions. If None, no distortions are applied. q_par : float, optional Parallel AP distortion factor. Required if `los` is not None. q_perp : float, optional Perpendicular AP distortion factor. Required if `los` is not None. Returns ------- float or np.ndarray Box size after applying AP distortions, or original box size if no distortions are applied. """ if not add_ap: return boxsize elif any(v is None for v in [los, q_par, q_perp]): raise ValueError('los, q_par and q_perp must be provided when add_ap is True.') if isinstance(boxsize, (float, int)): boxsizes = [boxsize] * 3 else: if len(boxsize) != 3: # Sanity check raise ValueError('boxsize must be a float or a list of three floats.') boxsizes = boxsize for i, ax in enumerate(('X', 'Y', 'Z')): if ax == los.upper(): boxsizes[i] = boxsizes[i] / q_par else: boxsizes[i] = boxsizes[i] / q_perp return np.array(boxsizes)
[docs] @classmethod def get_positions( cls, hod_dict: dict, tracer: str = None, los: str = None, add_rsd: bool = False, hubble: float = None, az: float = None, boxsize: float = None, add_ap: bool = False, q_par: float = None, q_perp: float = None, ) -> np.ndarray: """ Get the galaxy positions from the HOD catalog. Parameters ---------- hod_dict : dict Dictionary containing the tracer positions. tracer : str, optional Tracer type to read from `hod_dict`. If None, uses the top-level keys of `hod_dict`. Default is None. los: str, optional Line-of-sight for RSD and AP distortions. If None, no distortions are applied. add_rsd : bool, optional Whether to add redshift-space distortions to the catalog or not. Default is False. hubble : float, optional Hubble parameter at the redshift of the catalog. Required if `add_rsd` is True. az : float, optional Scale factor at the redshift of the catalog. Required if `add_rsd` is True. boxsize : float, optional Box size of the simulation. Required if `add_rsd` is True. add_ap: bool, optional Whether to add Alcock-Paczynski distortions to the number density or not. Default is False. q_par : float, optional Parallel AP distortion factor. Required if `add_ap` is True. q_perp : float, optional Perpendicular AP distortion factor. Required if `add_ap` is True. Returns ------- np.ndarray Array of galaxy positions with shape (N_gal, 3). """ hod_dict = hod_dict.copy() # Avoid modifying the original dictionary tracer_dict = hod_dict[tracer] if tracer is not None else hod_dict # Apply RSD before AP distortions if add_rsd: if any(v is None for v in [hubble, az, boxsize, los]): # Check we have everything we need to add RSD raise ValueError('hubble, az, boxsize and los must be provided to add RSD distortions.') cls.logger.debug('Applying RSD distortions to positions.') tracer_dict = cls._add_rsd(tracer_dict, hubble=hubble, az=az, boxsize=boxsize, los=los) if add_ap: if any(v is None for v in [q_par, q_perp, los]): # Check we have everything we need to add AP raise ValueError('q_par, q_perp and los must be provided to add AP distortions.') cls.logger.debug('Applying AP distortions to positions.') tracer_dict = cls._add_ap(tracer_dict, q_par=q_par, q_perp=q_perp, los=los) positions = np.column_stack([tracer_dict[key] for key in ['X', 'Y', 'Z']]) cls.logger.debug(f'Obtained positions array of shape {positions.shape}.') return positions
@staticmethod def _add_rsd( tracer_dict: dict, hubble: float, az: float, boxsize: float, los: str, )-> dict: """ Add redshift-space distortions to the catalog. Parameters ---------- tracer_dict : dict Dictionary containing the tracer positions in `X`, `Y`, `Z`. hubble : float Hubble parameter at the redshift of the catalog. az : float Scale factor at the redshift of the catalog. boxsize : float Box size of the simulation. los: str Line-of-sight for RSD distortion. Returns ------- dict Dictionary containing the HOD catalog with redshift-space distortions to the specified axis. Will overwrite the input `tracer_dict` in place, use a copy if needed ! """ ax = los.upper() offset = boxsize / 2 pos = tracer_dict[ax] + offset vel = tracer_dict[f'V{ax}'] pos_rsd = (pos + vel / (hubble * az)) % boxsize tracer_dict[ax] = pos_rsd - offset # Overwrite real-space positions with RSD positions return tracer_dict @staticmethod def _add_ap( tracer_dict: dict, q_par: float, q_perp: float, los: str, )-> dict: """ Add Alcock-Paczynski distortions to the catalog. Parameters ---------- tracer_dict : dict Dictionary containing the tracer positions in `X`, `Y`, `Z`. q_par : float Parallel AP distortion factor. q_perp : float Perpendicular AP distortion factor. los: str Line-of-sight for AP distortion. Returns ------- dict Dictionary containing the HOD catalog with Alcock-Paczynski distortions applied to the specified axis. Will overwrite the input `tracer_dict` in place, use a copy if needed ! """ for ax in ('X', 'Y', 'Z'): pos = tracer_dict[ax] if ax == los.upper(): tracer_dict[ax] = pos / q_par else: tracer_dict[ax] = pos / q_perp return tracer_dict