Source code for ACID_code.utils

"""
All of the utility functions for the ACID package. Some functions may not be useful to the user.
"""
from __future__ import annotations
from beartype import beartype
from beartype.vale import IsAttr, IsEqual
import numpy as np
import glob, emcee, psutil, os
from emcee import EnsembleSampler
import emcee.backends.backend as emceebackend
import scipy.constants as const
from typing import TypeAlias, Annotated
c_kms = float(const.c/1e3)
FloatLike: TypeAlias = float | np.floating
IntLike: TypeAlias = int | np.integer
Scalar: TypeAlias = FloatLike | IntLike | Annotated[np.ndarray, IsAttr["size", IsEqual[1]]]
Array1D: TypeAlias = Annotated[np.ndarray, IsAttr["ndim", IsEqual[1]]] | list[Scalar]
Array2D: TypeAlias = Annotated[np.ndarray, IsAttr["ndim", IsEqual[2]]] | list[list[Scalar]] | list[Array1D]
Array3D: TypeAlias = Annotated[np.ndarray, IsAttr["ndim", IsEqual[3]]] | list[list[list[Scalar]]] | list[list[Array1D]] | list[Array2D]

[docs] def convert_moves_to_emcee(moves:list[tuple]): """Converts a list of move specifications to emcee moves. Parameters ---------- moves : list[tuple], optional A list of tuples specifying the moves for the MCMC sampler. The format tries to follow the emcee documentation as closely as possible. However, the config cannot store classes directly, so move names are used instead and converted when building the sampler. Each tuple should have the form:: (move_name: str, fraction: float, move_kwargs: dict | None) where: - "move_name" is the name of the emcee move. Supported variants currently include "RedBlueMove", "StretchMove", "WalkMove", "KDEMove", "DEMove", "DESnookerMove", "MHMove", and "GaussianMove". Refer to the emcee documentation for more details on each move type. Input move names are checked against the "emcee.moves" module, so other moves from that module may also work, although not all have been tested with ACID. - "fraction" is the fraction of walkers to which this move should be applied. - "move_kwargs" is an optional dictionary of keyword arguments passed to the move class initialisation. Returns ------- list A list of emcee move objects corresponding to the input specifications. """ emcee_moves = [] for move in moves: if len(move) == 2: move_name, fraction = move move_kwargs = {} elif len(move) == 3: move_name, fraction, move_kwargs = move if not isinstance(move_kwargs, dict): raise ValueError( "Move kwargs must be a dictionary of keyword arguments to pass to " \ "the move class initialisation (if passed)." ) else: raise ValueError( "Each move tuple must have length 2 or 3: " \ "(move_name, fraction) or (move_name, fraction, kwargs).") if not hasattr(emcee.moves, move_name): raise ValueError(f"Move '{move_name}' is not a valid emcee move.") move_class = getattr(emcee.moves, move_name) emcee_moves.append((move_class(**move_kwargs), fraction)) return emcee_moves
[docs] def mask_invalid(wavelengths, flux, errors=None, return_mask=False, verbose=2): """Masks any pixels where the wavelength, flux, or error is infinite or <= 0. Replaces bad pixels with NaN, which ACID can handle. Parameters ---------- wavelengths : array_like The wavelength values of the spectrum. flux : array_like The flux values of the spectrum. errors : array_like The error values associated with the spectrum. Returns ------- tuple A tuple containing the cleaned wavelength, flux, and error arrays. """ mask = ( np.isfinite(wavelengths) & np.isfinite(flux) & (np.isfinite(errors) if errors is not None else True) & (flux > 0) & (errors > 0 if errors is not None else True) ) fill_value = np.nan w = np.where(mask, wavelengths, fill_value) f = np.where(mask, flux, fill_value) e = np.where(mask, errors, fill_value) if errors is not None else None if verbose > 1: num_invalid = np.size(wavelengths) - np.count_nonzero(mask) perc_invalid = num_invalid / np.size(wavelengths) * 100 if perc_invalid > 10: print(f"Your spectrum includes {num_invalid} out of {np.size(wavelengths)} non-positive/non-finite/nan values ({perc_invalid:.2f}%), \n" f"which will be dropped when necessary, but it is still recommended to check your wavelength, \n" f"spectrum and error arrays for bad pixels and make sure this is intentional. \n" f"This warning is only printed if more than 10% of pixels are invalid.") output = (w, f, e) if errors is not None else (w, f) output = output + (mask,) if return_mask else output return output
[docs] def drop_invalid(wavelengths, flux, errors=None, return_mask=False, verbose=2): """Drops any pixels where the wavelength, flux, or error is infinite or <= 0. Parameters ---------- wavelengths : array_like The wavelength values of the spectrum. flux : array_like The flux values of the spectrum. errors : array_like The error values associated with the spectrum. Returns ------- tuple A tuple containing the cleaned wavelength, flux, and error arrays. """ mask = ( np.isfinite(wavelengths) & np.isfinite(flux) & (np.isfinite(errors) if errors is not None else True) & (flux > 0) & ((errors > 0) if errors is not None else True) ) w = wavelengths[mask] f = flux[mask] e = errors[mask] if errors is not None else None if verbose > 1: num_invalid = np.size(wavelengths) - np.count_nonzero(mask) perc_invalid = num_invalid / np.size(wavelengths) * 100 if perc_invalid > 10: print(f"Dropped {num_invalid} invalid pixels out of {np.size(wavelengths)} (non-finite or <= 0), which is {perc_invalid:.2f}% of the total.") output = (w, f, e) if errors is not None else (w, f) output = output + (mask,) if return_mask else output return output
[docs] def clip_wavelengths(wavelengths, wavelengths_linelist, depths_linelist, pad=5): """ Clips the linelist to only include lines within the wavelength range of the observed spectrum. Includes a pad either side of the wavelength range so that the wings of lines outside the range can also contribute to the fit. Parameters ---------- wavelengths : np.ndarray Wavelengths of the observed spectrum wavelengths_linelist : np.ndarray Wavelengths from the linelist depths_linelist : np.ndarray Depths from the linelist pad : float, optional Number of angstroms to pad on either side of the wavelength range. By default, 5. Returns ------- wavelengths_linelist : np.ndarray Clipped wavelengths from the linelist depths_linelist : np.ndarray Clipped depths from the linelist """ lower, upper = np.nanmin(wavelengths)-pad, np.nanmax(wavelengths)+pad idx = (wavelengths_linelist >= lower) & (wavelengths_linelist <= upper) return wavelengths_linelist[idx], depths_linelist[idx]
[docs] def drop_edges(array, n_pix=2): """ Drops the edges of an array by a specified number of pixels. Parameters ---------- array : np.ndarray The input array. n_pix : int, optional Number of pixels to drop from each edge. Default is 2. Returns ------- np.ndarray The array with edges dropped. """ return array[n_pix:-n_pix]
[docs] @beartype def calc_deltav(wavelengths:Array1D)->Scalar: """Calculates velocity pixel size Calculates the velocity pixel size for the LSD velocity grid based off the spectral wavelengths. Parameters ---------- wavelengths : :py:type:`Array1D` Wavelengths for Acid input spectrum (in Angstroms), must be a 1D array of positive values. Returns ------- :py:type:`float` Velocity pixel size in km/s """ if wavelengths.ndim != 1: raise ValueError("Input wavelengths must be a 1D array.") if np.any(wavelengths <= 0): raise ValueError("Wavelengths must be positive.") wavelengths = np.sort(wavelengths) return c_kms * np.nanmean(np.diff(np.log(wavelengths)))
[docs] @beartype def guess_SNR( frame_wavelengths : Array1D | Array2D, frame_flux : Array1D | Array2D, frame_errors : Array1D | Array2D ) -> Array1D | Scalar: """Estimates S/N for each frame. Takes the median S/N in the central two-thirds of the wavelength range. Fully vectorized so that all the frames can be passed at once. Parameters ---------- frame_wavelengths : :py:type:`Array1D | Array2D` Array/list of wavelengths for each frame. frame_flux : :py:type:`Array1D | Array2D` Array/list of flux values for each frame. frame_errors : :py:type:`Array1D | Array2D` Array/list of error values for each frame. Returns ------- :py:type:`Array1D | Scalar` Array of estimated signal-to-noise ratios for each frame. """ if np.any(frame_flux) <= 0 or np.any(frame_errors) <= 0 or np.any(frame_wavelengths <= 0): raise ValueError("Flux, errors, and wavelengths must all be positive non-zero to estimate S/N.") frame_wavelengths = np.atleast_2d(frame_wavelengths) frame_flux = np.atleast_2d(frame_flux) frame_errors = np.atleast_2d(frame_errors) lo, hi = np.nanpercentile(frame_wavelengths, [100/6, 500/6], axis=-1) mask = (frame_wavelengths > lo[:, None]) & (frame_wavelengths < hi[:, None]) frame_flux = np.where(mask, frame_flux, np.nan) frame_errors = np.where(mask, frame_errors, np.nan) sn_per_pixel = frame_flux / frame_errors return np.nanpercentile(sn_per_pixel, 99, axis=-1).squeeze()
[docs] @beartype def guess_errors( frame_wavelengths : Array1D | Array2D, frame_flux : Array1D | Array2D, frame_sn : Scalar | Array1D ) -> Array1D | Array2D: """ Estimates errors for each frame based on the flux and S/N. Fully vectorized so that all the frames can be passed at once. Parameters ---------- frame_wavelengths : :py:type:`Array1D | Array2D` Array/list of wavelengths for each frame. frame_flux : :py:type:`Array1D | Array2D` Array/list of flux values for each frame. frame_sn : :py:type:`Scalar | Array1D` Estimated signal-to-noise ratio for each frame. Can be a single value or an array of values for each frame. Returns ------- :py:type:`Array1D | Array2D` Array of estimated error values for each frame. The dimensions will match those of frame_flux, with the same number of frames and pixels. """ if np.any(frame_flux) <= 0 or np.any(frame_wavelengths <= 0) or np.any(frame_sn <= 0): raise ValueError("Flux, wavelengths and sn must all be positive non-zero to estimate errors.") frame_wavelengths = np.atleast_2d(frame_wavelengths) frame_flux = np.atleast_2d(frame_flux) frame_sn = np.atleast_1d(frame_sn) if frame_sn.ndim > 1 or (frame_sn.ndim == 1 and frame_sn.size != frame_flux.shape[0]): raise ValueError("S/N must be a single value or an array of values with the same length as the number of frames in frame_flux.") errors = frame_flux / frame_sn[:, None] return errors.squeeze()
[docs] @beartype def collapse_SNR(sn:Array1D | Array2D, wavelengths:Array1D | Array2D) -> Array1D|Scalar: """ Collapses the SN of a 1D or 2D wavelength and sn array to the median of the SNs on the central 2/3 wavelengths. Parameters ---------- sn : :py:type:`Array1D | Array2D` Signal-to-noise ratio array. wavelengths : :py:type:`Array1D | Array2D` Wavelength array. Returns ------- :py:type:`Array1D | Scalar` Collapsed signal-to-noise ratio. """ sn = np.atleast_2d(sn) wavelengths = np.atleast_2d(wavelengths) lo, hi = np.nanpercentile(wavelengths, [100/6, 500/6], axis=-1) mask = (wavelengths > lo[:, None]) & (wavelengths < hi[:, None]) return np.nanmedian(np.where(mask, sn, np.nan), axis=-1).squeeze()
[docs] @beartype def get_normalisation_coeffs(wl:Array1D)->tuple[Scalar, Scalar]: """Calculates normalization coefficients for wavelength array Parameters ---------- wl : array_like Wavelength array for which normalization coefficients are calculated. Returns ------- tuple A tuple containing the normalization coefficients (a, b). """ a = 2 / (np.nanmax(wl)-np.nanmin(wl)) b = 1 - a * np.nanmax(wl) return a, b
[docs] def get_available_memory(): """ Returns the available memory in bytes. Checks if in a SLURM environment and uses its memory allocation if available. Returns ------- int Available memory in bytes. """ if "SLURM_JOB_ID" in os.environ: available_memory = int(os.environ.get('SLURM_MEM_PER_NODE')) # in MB available_memory *= 1024**2 # Convert to bytes as in the else statement below else: available_memory = psutil.virtual_memory().available return available_memory
[docs] def save_backend_to_hdf5(backend, filename): nwalkers, ndim = backend.shape niter = backend.iteration hdf = emcee.backends.HDFBackend( filename, dtype=getattr(backend, "dtype", None), ) # Overwrite existing file and reset if already exists hdf.reset(nwalkers, ndim) blobs = backend.get_blobs() # Allocate space in the HDF5 datasets. if niter > 0: hdf.grow(niter, None if blobs is None else blobs[0]) with hdf.open("a") as f: g = f["mcmc"] # mcmc is always the emcee default if niter > 0: g["chain"][:] = backend.get_chain() g["log_prob"][:] = backend.get_log_prob() g["accepted"][:] = backend.accepted if blobs is not None: g["blobs"][:] = blobs g.attrs["has_blobs"] = True # Required so HDFBackend.iteration works correctly. g.attrs["iteration"] = niter # Reload the random state of the walkers if they exist random_state = getattr(backend, "random_state", None) if random_state is not None: for i, v in enumerate(random_state): g.attrs[f"random_state_{i}"] = v return hdf
[docs] def backend_to_sampler(backend, log_prob_fn): nwalkers, ndim = backend.shape sampler = emcee.EnsembleSampler(nwalkers, ndim, log_prob_fn) sampler.backend = backend return sampler
[docs] def set_dict_defaults(input_dict: dict | None, default_dict: dict) -> dict: """Sets default values in a dictionary if they are not already present. Parameters ---------- input_dict : dict | None The dictionary to set defaults in (or none if not provided). default_dict : dict The dictionary containing default key-value pairs. Returns ------- dict The updated dictionary with defaults set. """ input_dict = dict(input_dict or {}) for key, value in default_dict.items(): input_dict.setdefault(key, value) return input_dict
[docs] def findfiles(directory:str, file_type:str) -> list[str]: """Finds files in a directory matching a specific file type, excluding corrected spectra. Parameters ---------- directory : str The directory to search for files. file_type : str The file type to search for. Returns ------- list[str] A list of files matching the specified file type, excluding corrected spectra. """ filelist_corrected = glob.glob('%s/*/*%s**A_corrected*.fits'%(directory, file_type)) #finding corrected spectra filelist = glob.glob('%s/*/*%s**A*.fits'%(directory, file_type)) #finding all A band spectra filelist_final = [] filelist_corrected_set = set(filelist_corrected) for file in filelist: #filtering out corrected spectra if file not in filelist_corrected_set: filelist_final.append(file) return filelist_final
[docs] def robust_mean(data:np.ndarray, nsig:int|float=3, axis:int=0) -> np.ndarray|float: """Calculates the robust mean of the input data by excluding outliers beyond a specified number of standard deviations from the median. Parameters ---------- data : np.ndarray Input data array. nsig : int | float, optional Number of standard deviations to use for outlier rejection. Defaults to 3. axis : int, optional Axis along which to compute the robust mean. Defaults to 0. Returns ------- float Robust mean of the input data. """ median = np.median(data, axis=axis) mad = np.median(np.abs(data - median), axis=axis, keepdims=True) sigma_nmad = 1.4826 * mad mask = np.abs(data - median) < nsig * sigma_nmad robust_data = np.where(mask, data, np.nan) return np.nanmean(robust_data, axis=axis)
[docs] @beartype def combine_profiles( spectra : Array2D, errors : Array2D | None = None, covariances : Array3D | None = None, ) -> tuple: """ Combine multiple profiles into one. Parameters ---------- spectra : :py:class:`Array2D` Input profiles, should have dimensions (n_profiles, n_pix). errors : :py:class:`Array2D`, optional 1-sigma errors for each profile. Used only if covariances is None. Should have dimensions (n_profiles, n_pix). covariances : :py:class:`Array3D` or list of :py:class:`Array2D`, optional Full covariance matrix for each profile. Should have dimensions (n_profiles, n_pix, n_pix). If provided, this is used instead of errors for the combination, and the combined covariance matrix is also returned. Returns ------- combined_profile : :py:class:`Array1D` combined_errors : :py:class:`Array1D` sqrt(diag(combined_covariance)) combined_covariance : :py:class:`Array2D` Only returned if covariances is provided. """ from scipy.linalg import cho_factor, cho_solve spectra = np.asarray(spectra) if covariances is not None: errors = np.asarray(errors) covariances = np.asarray(covariances) # We are solving the equations: # z_combined ​= (∑​C_i^{−1}*​z_i​) / (∑C_i^−1) # And the denominator is the inverse of the combined covariance matrix, so we can get that as well. n_profiles, n_pix = spectra.shape cho_sum = np.zeros((n_pix, n_pix), dtype=float) rhs = np.zeros(n_pix, dtype=float) I = np.eye(n_pix) for z, C in zip(spectra, covariances): cf = cho_factor(C, check_finite=False) cho_sum += cho_solve(cf, I, check_finite=False) rhs += cho_solve(cf, z, check_finite=False) cf_comb = cho_factor(cho_sum, check_finite=False) combined_profile = cho_solve(cf_comb, rhs, check_finite=False) combined_covariance = cho_solve(cf_comb, I, check_finite=False) combined_errors = np.sqrt(np.diag(combined_covariance)) return combined_profile, combined_errors, combined_covariance elif errors is not None: errors = np.asarray(errors) # Otherwise we do a simple weighted average using the errors, which is equivalent # to the above if the covariance matrices are diagonal. weights = 1.0 / errors**2 combined_profile = np.sum(weights * spectra, axis=0) / np.sum(weights, axis=0) combined_errors = np.sqrt(1.0 / np.sum(weights, axis=0)) return combined_profile, combined_errors else: combined_profile = np.nanmean(spectra, axis=0) return combined_profile
[docs] @beartype def flux_to_od( flux : np.ndarray = None, errors : np.ndarray = None, linelist : np.ndarray = None, cov_matrix : np.ndarray = None ) -> tuple | np.ndarray: """Converts flux, errors, linelist, and covariance matrix to optical depth. The formula used for the conversion is: - Optical depth (od) is calculated as -log(flux). - Errors in optical depth are calculated as errors / flux. - Linelist in optical depth is calculated as -log(1 - linelist). - Covariance matrix in optical depth is calculated as cov_matrix / (flux[:, np.newaxis] * flux[np.newaxis, :]). Parameters ---------- flux : np.ndarray Input flux array. errors : np.ndarray, optional Input errors array. Defaults to None. linelist : np.ndarray, optional Input linelist array. Defaults to None. cov_matrix : np.ndarray, optional Input covariance matrix, corresponding to errors. Defaults to None. Returns ------- tuple A tuple containing any of the following, depending on which inputs were provided: - the flux in optical depth - errors in optical depth - linelist in optical depth - covariance matrix in optical depth The function appends the above inputs in the respective order to the output tuple if they were input. """ out = [] if flux is not None: flux_od = -np.log(flux) out.append(flux_od) else: flux_od = None if errors is not None: if flux_od is None: raise ValueError("'flux' must be provided if 'errors' is provided.") out.append(errors / flux) if linelist is not None: out.append(-np.log(1 - linelist)) if cov_matrix is not None: if flux_od is None: raise ValueError("'flux' must be provided if 'cov_matrix' is provided.") out.append(cov_matrix / (flux[:, np.newaxis] * flux[np.newaxis, :])) return tuple(out) if len(out) > 1 else out[0]
[docs] @beartype def od_to_flux( od : np.ndarray = None, errors : np.ndarray = None, linelist : np.ndarray = None, cov_matrix : np.ndarray = None ) -> tuple | np.ndarray: """ Converts optical depth to flux, errors, linelist, and covariance matrix. The formula used for the conversion is: - Flux is calculated as exp(-od). - Errors in flux are calculated as errors * flux. - Linelist in flux is calculated as 1 - exp(-linelist). - Covariance matrix in flux is calculated as cov_matrix * (flux[:, np.newaxis] * flux[np.newaxis, :]). Parameters ---------- od : np.ndarray Input optical depth array. errors : np.ndarray, optional Input errors array. Defaults to None. linelist : np.ndarray, optional Input linelist array. Defaults to None. cov_matrix : np.ndarray, optional Input covariance matrix. Defaults to None. Returns ------- tuple A tuple containing any of the following, depending on which inputs were provided: - the flux in flux units - errors in flux units - linelist in flux units - covariance matrix in flux units The function appends the above inputs in the respective order to the output tuple if they were input. """ out = [] if od is not None: flux = np.exp(-od) out.append(flux) else: flux = None if errors is not None: if flux is None: raise ValueError("'od' must be provided if 'errors' is provided.") out.append(errors * flux) if linelist is not None: out.append(1-np.exp(-linelist)) if cov_matrix is not None: if flux is None: raise ValueError("'od' must be provided if 'cov_matrix' is provided.") out.append(cov_matrix * (flux[:, np.newaxis] * flux[np.newaxis, :])) return tuple(out) if len(out) > 1 else out[0]
[docs] def configure_mp_environ(os): """ Configures the multiprocessing environment variables to check for SLURM and set appropriate thread limits. See :ref:`multiprocessing` section of the documentation for more details on why this is done. Parameters ---------- os : module The os module to use for setting environment variables. """ slurm = "SLURM_JOB_ID" in os.environ if slurm: if os.getenv("OMP_NUM_THREADS") != "1" or os.getenv("MKL_NUM_THREADS") != "1": raise ValueError(f"In a SLURM environment, OMP_NUM_THREADS and MKL_NUM_THREADS must be set to 1 before any imports for parallel MCMC. \n" \ "Please set this in your SLURM job script or at the top of your python script before any other imports.\n" \ "See https://acid-code.readthedocs.io/en/latest/using_ACID.html#multiprocessing for more information.") else: # Emcee recommendation, after testing this is absolutely a requirement os.environ["OMP_NUM_THREADS"] = "1" os.environ["MKL_NUM_THREADS"] = "1"
[docs] def next_pow_2(n): """ Calculates the next power of 2 greater than or equal to n. Parameters ---------- n : int Input number. Must be real and non-negative. Returns ------- int The next power of 2 greater than or equal to n. """ n = int(n) if n < 0: raise ValueError("Input must be a non-negative integer.") return 1 if n == 0 else 2**(n - 1).bit_length()
[docs] def auto_window(taus: np.ndarray, c: float = 5.0): """ Automated windowing procedure following Sokal (1989) in emcee documentation. Returns the window index to use in taus[window]. """ m = np.arange(len(taus)) < c * taus if np.any(m): return np.argmin(m) return len(taus) - 1
[docs] def autocorr_func_1d(x, norm=True): """ Autocorrelation estimate using FFT from the emcee tutorial. """ x = np.atleast_1d(x) if len(x.shape) != 1: raise ValueError("invalid dimensions for 1D autocorrelation function") n = next_pow_2(len(x)) # Compute the FFT and then (from that) the auto-correlation function f = np.fft.fft(x - np.mean(x), n=2 * n) acf = np.fft.ifft(f * np.conjugate(f))[: len(x)].real acf /= 4 * n # Optionally normalize if norm: acf /= acf[0] return acf
[docs] def autocorr_gw2010(y, c=5.0): """Goodman & Weare (2010) autocorrelation estimate from emcee documentation""" f = autocorr_func_1d(np.mean(y, axis=0)) taus = 2.0 * np.cumsum(f) - 1.0 window = auto_window(taus, c) return taus[window]
[docs] def autocorr_new(y, c=5.0): """New integrated autocorrelation time estimate from emcee documentation""" # Average ACF across walkers assert y.ndim == 2, "Expects y with shape (nwalkers, nsteps)" nwalkers, nsteps = y.shape f = np.zeros(nsteps) for walker in range(nwalkers): f += autocorr_func_1d(y[walker], norm=True) f /= nwalkers # Apply sokal windowing on cumulative sum taus = 2.0 * np.cumsum(f) - 1.0 window = auto_window(taus, c) return float(taus[window])
[docs] def sampler_nbytes(sampler) -> IntLike: if hasattr(sampler, "nbytes"): return sampler.nbytes elif hasattr(sampler.backend, "nbytes"): return sampler.backend.nbytes else: nbytes = 0 for name in ("chain", "log_prob", "accepted", "blobs"): arr = getattr(sampler.backend, name, None) if arr is not None and hasattr(arr, "nbytes"): nbytes += arr.nbytes return nbytes