"""
All of the utility functions for the ACID package. Some functions may not be useful to the user.
"""
from __future__ import annotations
from beartype import beartype
from beartype.vale import IsAttr, IsEqual
import numpy as np
import glob, emcee, psutil, os
from emcee import EnsembleSampler
import emcee.backends.backend as emceebackend
import scipy.constants as const
from typing import TypeAlias, Annotated
c_kms = float(const.c/1e3)
FloatLike: TypeAlias = float | np.floating
IntLike: TypeAlias = int | np.integer
Scalar: TypeAlias = FloatLike | IntLike | Annotated[np.ndarray, IsAttr["size", IsEqual[1]]]
Array1D: TypeAlias = Annotated[np.ndarray, IsAttr["ndim", IsEqual[1]]] | list[Scalar]
Array2D: TypeAlias = Annotated[np.ndarray, IsAttr["ndim", IsEqual[2]]] | list[list[Scalar]] | list[Array1D]
Array3D: TypeAlias = Annotated[np.ndarray, IsAttr["ndim", IsEqual[3]]] | list[list[list[Scalar]]] | list[list[Array1D]] | list[Array2D]
[docs]
def convert_moves_to_emcee(moves:list[tuple]):
"""Converts a list of move specifications to emcee moves.
Parameters
----------
moves : list[tuple], optional
A list of tuples specifying the moves for the MCMC sampler. The format
tries to follow the emcee documentation as closely as possible.
However, the config cannot store classes directly, so move names are
used instead and converted when building the sampler.
Each tuple should have the form::
(move_name: str, fraction: float, move_kwargs: dict | None)
where:
- "move_name" is the name of the emcee move. Supported variants currently
include "RedBlueMove", "StretchMove", "WalkMove",
"KDEMove", "DEMove", "DESnookerMove", "MHMove",
and "GaussianMove". Refer to the emcee documentation for more
details on each move type. Input move names are checked against the
"emcee.moves" module, so other moves from that module may also work,
although not all have been tested with ACID.
- "fraction" is the fraction of walkers to which this move should be applied.
- "move_kwargs" is an optional dictionary of keyword arguments passed to
the move class initialisation.
Returns
-------
list
A list of emcee move objects corresponding to the input specifications.
"""
emcee_moves = []
for move in moves:
if len(move) == 2:
move_name, fraction = move
move_kwargs = {}
elif len(move) == 3:
move_name, fraction, move_kwargs = move
if not isinstance(move_kwargs, dict):
raise ValueError(
"Move kwargs must be a dictionary of keyword arguments to pass to " \
"the move class initialisation (if passed)."
)
else:
raise ValueError(
"Each move tuple must have length 2 or 3: " \
"(move_name, fraction) or (move_name, fraction, kwargs).")
if not hasattr(emcee.moves, move_name):
raise ValueError(f"Move '{move_name}' is not a valid emcee move.")
move_class = getattr(emcee.moves, move_name)
emcee_moves.append((move_class(**move_kwargs), fraction))
return emcee_moves
[docs]
def mask_invalid(wavelengths, flux, errors=None, return_mask=False, verbose=2):
"""Masks any pixels where the wavelength, flux, or error is infinite or <= 0.
Replaces bad pixels with NaN, which ACID can handle.
Parameters
----------
wavelengths : array_like
The wavelength values of the spectrum.
flux : array_like
The flux values of the spectrum.
errors : array_like
The error values associated with the spectrum.
Returns
-------
tuple
A tuple containing the cleaned wavelength, flux, and error arrays.
"""
mask = (
np.isfinite(wavelengths) &
np.isfinite(flux) &
(np.isfinite(errors) if errors is not None else True) &
(flux > 0) &
(errors > 0 if errors is not None else True)
)
fill_value = np.nan
w = np.where(mask, wavelengths, fill_value)
f = np.where(mask, flux, fill_value)
e = np.where(mask, errors, fill_value) if errors is not None else None
if verbose > 1:
num_invalid = np.size(wavelengths) - np.count_nonzero(mask)
perc_invalid = num_invalid / np.size(wavelengths) * 100
if perc_invalid > 10:
print(f"Your spectrum includes {num_invalid} out of {np.size(wavelengths)} non-positive/non-finite/nan values ({perc_invalid:.2f}%), \n"
f"which will be dropped when necessary, but it is still recommended to check your wavelength, \n"
f"spectrum and error arrays for bad pixels and make sure this is intentional. \n"
f"This warning is only printed if more than 10% of pixels are invalid.")
output = (w, f, e) if errors is not None else (w, f)
output = output + (mask,) if return_mask else output
return output
[docs]
def drop_invalid(wavelengths, flux, errors=None, return_mask=False, verbose=2):
"""Drops any pixels where the wavelength, flux, or error is infinite or <= 0.
Parameters
----------
wavelengths : array_like
The wavelength values of the spectrum.
flux : array_like
The flux values of the spectrum.
errors : array_like
The error values associated with the spectrum.
Returns
-------
tuple
A tuple containing the cleaned wavelength, flux, and error arrays.
"""
mask = (
np.isfinite(wavelengths) &
np.isfinite(flux) &
(np.isfinite(errors) if errors is not None else True) &
(flux > 0) &
((errors > 0) if errors is not None else True)
)
w = wavelengths[mask]
f = flux[mask]
e = errors[mask] if errors is not None else None
if verbose > 1:
num_invalid = np.size(wavelengths) - np.count_nonzero(mask)
perc_invalid = num_invalid / np.size(wavelengths) * 100
if perc_invalid > 10:
print(f"Dropped {num_invalid} invalid pixels out of {np.size(wavelengths)} (non-finite or <= 0), which is {perc_invalid:.2f}% of the total.")
output = (w, f, e) if errors is not None else (w, f)
output = output + (mask,) if return_mask else output
return output
[docs]
def clip_wavelengths(wavelengths, wavelengths_linelist, depths_linelist, pad=5):
"""
Clips the linelist to only include lines within the wavelength range of the observed spectrum.
Includes a pad either side of the wavelength range so that the wings of lines outside
the range can also contribute to the fit.
Parameters
----------
wavelengths : np.ndarray
Wavelengths of the observed spectrum
wavelengths_linelist : np.ndarray
Wavelengths from the linelist
depths_linelist : np.ndarray
Depths from the linelist
pad : float, optional
Number of angstroms to pad on either side of the wavelength range. By default, 5.
Returns
-------
wavelengths_linelist : np.ndarray
Clipped wavelengths from the linelist
depths_linelist : np.ndarray
Clipped depths from the linelist
"""
lower, upper = np.nanmin(wavelengths)-pad, np.nanmax(wavelengths)+pad
idx = (wavelengths_linelist >= lower) & (wavelengths_linelist <= upper)
return wavelengths_linelist[idx], depths_linelist[idx]
[docs]
def drop_edges(array, n_pix=2):
"""
Drops the edges of an array by a specified number of pixels.
Parameters
----------
array : np.ndarray
The input array.
n_pix : int, optional
Number of pixels to drop from each edge. Default is 2.
Returns
-------
np.ndarray
The array with edges dropped.
"""
return array[n_pix:-n_pix]
[docs]
@beartype
def calc_deltav(wavelengths:Array1D)->Scalar:
"""Calculates velocity pixel size
Calculates the velocity pixel size for the LSD velocity grid based off the spectral wavelengths.
Parameters
----------
wavelengths : :py:type:`Array1D`
Wavelengths for Acid input spectrum (in Angstroms), must be a 1D array of positive values.
Returns
-------
:py:type:`float`
Velocity pixel size in km/s
"""
if wavelengths.ndim != 1:
raise ValueError("Input wavelengths must be a 1D array.")
if np.any(wavelengths <= 0):
raise ValueError("Wavelengths must be positive.")
wavelengths = np.sort(wavelengths)
return c_kms * np.nanmean(np.diff(np.log(wavelengths)))
[docs]
@beartype
def guess_SNR(
frame_wavelengths : Array1D | Array2D,
frame_flux : Array1D | Array2D,
frame_errors : Array1D | Array2D
) -> Array1D | Scalar:
"""Estimates S/N for each frame. Takes the median S/N in the central two-thirds of the
wavelength range. Fully vectorized so that all the frames can be passed at once.
Parameters
----------
frame_wavelengths : :py:type:`Array1D | Array2D`
Array/list of wavelengths for each frame.
frame_flux : :py:type:`Array1D | Array2D`
Array/list of flux values for each frame.
frame_errors : :py:type:`Array1D | Array2D`
Array/list of error values for each frame.
Returns
-------
:py:type:`Array1D | Scalar`
Array of estimated signal-to-noise ratios for each frame.
"""
if np.any(frame_flux) <= 0 or np.any(frame_errors) <= 0 or np.any(frame_wavelengths <= 0):
raise ValueError("Flux, errors, and wavelengths must all be positive non-zero to estimate S/N.")
frame_wavelengths = np.atleast_2d(frame_wavelengths)
frame_flux = np.atleast_2d(frame_flux)
frame_errors = np.atleast_2d(frame_errors)
lo, hi = np.nanpercentile(frame_wavelengths, [100/6, 500/6], axis=-1)
mask = (frame_wavelengths > lo[:, None]) & (frame_wavelengths < hi[:, None])
frame_flux = np.where(mask, frame_flux, np.nan)
frame_errors = np.where(mask, frame_errors, np.nan)
sn_per_pixel = frame_flux / frame_errors
return np.nanpercentile(sn_per_pixel, 99, axis=-1).squeeze()
[docs]
@beartype
def guess_errors(
frame_wavelengths : Array1D | Array2D,
frame_flux : Array1D | Array2D,
frame_sn : Scalar | Array1D
) -> Array1D | Array2D:
"""
Estimates errors for each frame based on the flux and S/N.
Fully vectorized so that all the frames can be passed at once.
Parameters
----------
frame_wavelengths : :py:type:`Array1D | Array2D`
Array/list of wavelengths for each frame.
frame_flux : :py:type:`Array1D | Array2D`
Array/list of flux values for each frame.
frame_sn : :py:type:`Scalar | Array1D`
Estimated signal-to-noise ratio for each frame. Can be a single value or an array of values for each frame.
Returns
-------
:py:type:`Array1D | Array2D`
Array of estimated error values for each frame. The dimensions will match those of frame_flux, with the same number of frames and pixels.
"""
if np.any(frame_flux) <= 0 or np.any(frame_wavelengths <= 0) or np.any(frame_sn <= 0):
raise ValueError("Flux, wavelengths and sn must all be positive non-zero to estimate errors.")
frame_wavelengths = np.atleast_2d(frame_wavelengths)
frame_flux = np.atleast_2d(frame_flux)
frame_sn = np.atleast_1d(frame_sn)
if frame_sn.ndim > 1 or (frame_sn.ndim == 1 and frame_sn.size != frame_flux.shape[0]):
raise ValueError("S/N must be a single value or an array of values with the same length as the number of frames in frame_flux.")
errors = frame_flux / frame_sn[:, None]
return errors.squeeze()
[docs]
@beartype
def collapse_SNR(sn:Array1D | Array2D, wavelengths:Array1D | Array2D) -> Array1D|Scalar:
"""
Collapses the SN of a 1D or 2D wavelength and sn array to the median of the SNs
on the central 2/3 wavelengths.
Parameters
----------
sn : :py:type:`Array1D | Array2D`
Signal-to-noise ratio array.
wavelengths : :py:type:`Array1D | Array2D`
Wavelength array.
Returns
-------
:py:type:`Array1D | Scalar`
Collapsed signal-to-noise ratio.
"""
sn = np.atleast_2d(sn)
wavelengths = np.atleast_2d(wavelengths)
lo, hi = np.nanpercentile(wavelengths, [100/6, 500/6], axis=-1)
mask = (wavelengths > lo[:, None]) & (wavelengths < hi[:, None])
return np.nanmedian(np.where(mask, sn, np.nan), axis=-1).squeeze()
[docs]
@beartype
def get_normalisation_coeffs(wl:Array1D)->tuple[Scalar, Scalar]:
"""Calculates normalization coefficients for wavelength array
Parameters
----------
wl : array_like
Wavelength array for which normalization coefficients are calculated.
Returns
-------
tuple
A tuple containing the normalization coefficients (a, b).
"""
a = 2 / (np.nanmax(wl)-np.nanmin(wl))
b = 1 - a * np.nanmax(wl)
return a, b
[docs]
def get_available_memory():
"""
Returns the available memory in bytes.
Checks if in a SLURM environment and uses its memory allocation if available.
Returns
-------
int
Available memory in bytes.
"""
if "SLURM_JOB_ID" in os.environ:
available_memory = int(os.environ.get('SLURM_MEM_PER_NODE')) # in MB
available_memory *= 1024**2 # Convert to bytes as in the else statement below
else:
available_memory = psutil.virtual_memory().available
return available_memory
[docs]
def save_backend_to_hdf5(backend, filename):
nwalkers, ndim = backend.shape
niter = backend.iteration
hdf = emcee.backends.HDFBackend(
filename,
dtype=getattr(backend, "dtype", None),
)
# Overwrite existing file and reset if already exists
hdf.reset(nwalkers, ndim)
blobs = backend.get_blobs()
# Allocate space in the HDF5 datasets.
if niter > 0:
hdf.grow(niter, None if blobs is None else blobs[0])
with hdf.open("a") as f:
g = f["mcmc"] # mcmc is always the emcee default
if niter > 0:
g["chain"][:] = backend.get_chain()
g["log_prob"][:] = backend.get_log_prob()
g["accepted"][:] = backend.accepted
if blobs is not None:
g["blobs"][:] = blobs
g.attrs["has_blobs"] = True
# Required so HDFBackend.iteration works correctly.
g.attrs["iteration"] = niter
# Reload the random state of the walkers if they exist
random_state = getattr(backend, "random_state", None)
if random_state is not None:
for i, v in enumerate(random_state):
g.attrs[f"random_state_{i}"] = v
return hdf
[docs]
def backend_to_sampler(backend, log_prob_fn):
nwalkers, ndim = backend.shape
sampler = emcee.EnsembleSampler(nwalkers, ndim, log_prob_fn)
sampler.backend = backend
return sampler
[docs]
def set_dict_defaults(input_dict: dict | None, default_dict: dict) -> dict:
"""Sets default values in a dictionary if they are not already present.
Parameters
----------
input_dict : dict | None
The dictionary to set defaults in (or none if not provided).
default_dict : dict
The dictionary containing default key-value pairs.
Returns
-------
dict
The updated dictionary with defaults set.
"""
input_dict = dict(input_dict or {})
for key, value in default_dict.items():
input_dict.setdefault(key, value)
return input_dict
[docs]
def findfiles(directory:str, file_type:str) -> list[str]:
"""Finds files in a directory matching a specific file type, excluding corrected spectra.
Parameters
----------
directory : str
The directory to search for files.
file_type : str
The file type to search for.
Returns
-------
list[str]
A list of files matching the specified file type, excluding corrected spectra.
"""
filelist_corrected = glob.glob('%s/*/*%s**A_corrected*.fits'%(directory, file_type)) #finding corrected spectra
filelist = glob.glob('%s/*/*%s**A*.fits'%(directory, file_type)) #finding all A band spectra
filelist_final = []
filelist_corrected_set = set(filelist_corrected)
for file in filelist: #filtering out corrected spectra
if file not in filelist_corrected_set:
filelist_final.append(file)
return filelist_final
[docs]
def robust_mean(data:np.ndarray, nsig:int|float=3, axis:int=0) -> np.ndarray|float:
"""Calculates the robust mean of the input data by excluding outliers beyond a
specified number of standard deviations from the median.
Parameters
----------
data : np.ndarray
Input data array.
nsig : int | float, optional
Number of standard deviations to use for outlier rejection. Defaults to 3.
axis : int, optional
Axis along which to compute the robust mean. Defaults to 0.
Returns
-------
float
Robust mean of the input data.
"""
median = np.median(data, axis=axis)
mad = np.median(np.abs(data - median), axis=axis, keepdims=True)
sigma_nmad = 1.4826 * mad
mask = np.abs(data - median) < nsig * sigma_nmad
robust_data = np.where(mask, data, np.nan)
return np.nanmean(robust_data, axis=axis)
[docs]
@beartype
def combine_profiles(
spectra : Array2D,
errors : Array2D | None = None,
covariances : Array3D | None = None,
) -> tuple:
"""
Combine multiple profiles into one.
Parameters
----------
spectra : :py:class:`Array2D`
Input profiles, should have dimensions (n_profiles, n_pix).
errors : :py:class:`Array2D`, optional
1-sigma errors for each profile. Used only if covariances is None. Should have dimensions (n_profiles, n_pix).
covariances : :py:class:`Array3D` or list of :py:class:`Array2D`, optional
Full covariance matrix for each profile. Should have dimensions (n_profiles, n_pix, n_pix).
If provided, this is used instead of errors for the combination, and the combined covariance matrix is also returned.
Returns
-------
combined_profile : :py:class:`Array1D`
combined_errors : :py:class:`Array1D`
sqrt(diag(combined_covariance))
combined_covariance : :py:class:`Array2D`
Only returned if covariances is provided.
"""
from scipy.linalg import cho_factor, cho_solve
spectra = np.asarray(spectra)
if covariances is not None:
errors = np.asarray(errors)
covariances = np.asarray(covariances)
# We are solving the equations:
# z_combined = (∑C_i^{−1}*z_i) / (∑C_i^−1)
# And the denominator is the inverse of the combined covariance matrix, so we can get that as well.
n_profiles, n_pix = spectra.shape
cho_sum = np.zeros((n_pix, n_pix), dtype=float)
rhs = np.zeros(n_pix, dtype=float)
I = np.eye(n_pix)
for z, C in zip(spectra, covariances):
cf = cho_factor(C, check_finite=False)
cho_sum += cho_solve(cf, I, check_finite=False)
rhs += cho_solve(cf, z, check_finite=False)
cf_comb = cho_factor(cho_sum, check_finite=False)
combined_profile = cho_solve(cf_comb, rhs, check_finite=False)
combined_covariance = cho_solve(cf_comb, I, check_finite=False)
combined_errors = np.sqrt(np.diag(combined_covariance))
return combined_profile, combined_errors, combined_covariance
elif errors is not None:
errors = np.asarray(errors)
# Otherwise we do a simple weighted average using the errors, which is equivalent
# to the above if the covariance matrices are diagonal.
weights = 1.0 / errors**2
combined_profile = np.sum(weights * spectra, axis=0) / np.sum(weights, axis=0)
combined_errors = np.sqrt(1.0 / np.sum(weights, axis=0))
return combined_profile, combined_errors
else:
combined_profile = np.nanmean(spectra, axis=0)
return combined_profile
[docs]
@beartype
def flux_to_od(
flux : np.ndarray = None,
errors : np.ndarray = None,
linelist : np.ndarray = None,
cov_matrix : np.ndarray = None
) -> tuple | np.ndarray:
"""Converts flux, errors, linelist, and covariance matrix to optical depth.
The formula used for the conversion is:
- Optical depth (od) is calculated as -log(flux).
- Errors in optical depth are calculated as errors / flux.
- Linelist in optical depth is calculated as -log(1 - linelist).
- Covariance matrix in optical depth is calculated as cov_matrix / (flux[:, np.newaxis] * flux[np.newaxis, :]).
Parameters
----------
flux : np.ndarray
Input flux array.
errors : np.ndarray, optional
Input errors array. Defaults to None.
linelist : np.ndarray, optional
Input linelist array. Defaults to None.
cov_matrix : np.ndarray, optional
Input covariance matrix, corresponding to errors. Defaults to None.
Returns
-------
tuple
A tuple containing any of the following, depending on which inputs were provided:
- the flux in optical depth
- errors in optical depth
- linelist in optical depth
- covariance matrix in optical depth
The function appends the above inputs in the respective order to the output tuple if they were input.
"""
out = []
if flux is not None:
flux_od = -np.log(flux)
out.append(flux_od)
else:
flux_od = None
if errors is not None:
if flux_od is None:
raise ValueError("'flux' must be provided if 'errors' is provided.")
out.append(errors / flux)
if linelist is not None:
out.append(-np.log(1 - linelist))
if cov_matrix is not None:
if flux_od is None:
raise ValueError("'flux' must be provided if 'cov_matrix' is provided.")
out.append(cov_matrix / (flux[:, np.newaxis] * flux[np.newaxis, :]))
return tuple(out) if len(out) > 1 else out[0]
[docs]
@beartype
def od_to_flux(
od : np.ndarray = None,
errors : np.ndarray = None,
linelist : np.ndarray = None,
cov_matrix : np.ndarray = None
) -> tuple | np.ndarray:
"""
Converts optical depth to flux, errors, linelist, and covariance matrix.
The formula used for the conversion is:
- Flux is calculated as exp(-od).
- Errors in flux are calculated as errors * flux.
- Linelist in flux is calculated as 1 - exp(-linelist).
- Covariance matrix in flux is calculated as cov_matrix * (flux[:, np.newaxis] * flux[np.newaxis, :]).
Parameters
----------
od : np.ndarray
Input optical depth array.
errors : np.ndarray, optional
Input errors array. Defaults to None.
linelist : np.ndarray, optional
Input linelist array. Defaults to None.
cov_matrix : np.ndarray, optional
Input covariance matrix. Defaults to None.
Returns
-------
tuple
A tuple containing any of the following, depending on which inputs were provided:
- the flux in flux units
- errors in flux units
- linelist in flux units
- covariance matrix in flux units
The function appends the above inputs in the respective order to the output tuple if they were input.
"""
out = []
if od is not None:
flux = np.exp(-od)
out.append(flux)
else:
flux = None
if errors is not None:
if flux is None:
raise ValueError("'od' must be provided if 'errors' is provided.")
out.append(errors * flux)
if linelist is not None:
out.append(1-np.exp(-linelist))
if cov_matrix is not None:
if flux is None:
raise ValueError("'od' must be provided if 'cov_matrix' is provided.")
out.append(cov_matrix * (flux[:, np.newaxis] * flux[np.newaxis, :]))
return tuple(out) if len(out) > 1 else out[0]
[docs]
def next_pow_2(n):
"""
Calculates the next power of 2 greater than or equal to n.
Parameters
----------
n : int
Input number. Must be real and non-negative.
Returns
-------
int
The next power of 2 greater than or equal to n.
"""
n = int(n)
if n < 0:
raise ValueError("Input must be a non-negative integer.")
return 1 if n == 0 else 2**(n - 1).bit_length()
[docs]
def auto_window(taus: np.ndarray, c: float = 5.0):
"""
Automated windowing procedure following Sokal (1989) in emcee documentation.
Returns the window index to use in taus[window].
"""
m = np.arange(len(taus)) < c * taus
if np.any(m):
return np.argmin(m)
return len(taus) - 1
[docs]
def autocorr_func_1d(x, norm=True):
"""
Autocorrelation estimate using FFT from the emcee tutorial.
"""
x = np.atleast_1d(x)
if len(x.shape) != 1:
raise ValueError("invalid dimensions for 1D autocorrelation function")
n = next_pow_2(len(x))
# Compute the FFT and then (from that) the auto-correlation function
f = np.fft.fft(x - np.mean(x), n=2 * n)
acf = np.fft.ifft(f * np.conjugate(f))[: len(x)].real
acf /= 4 * n
# Optionally normalize
if norm:
acf /= acf[0]
return acf
[docs]
def autocorr_gw2010(y, c=5.0):
"""Goodman & Weare (2010) autocorrelation estimate from emcee documentation"""
f = autocorr_func_1d(np.mean(y, axis=0))
taus = 2.0 * np.cumsum(f) - 1.0
window = auto_window(taus, c)
return taus[window]
[docs]
def autocorr_new(y, c=5.0):
"""New integrated autocorrelation time estimate from emcee documentation"""
# Average ACF across walkers
assert y.ndim == 2, "Expects y with shape (nwalkers, nsteps)"
nwalkers, nsteps = y.shape
f = np.zeros(nsteps)
for walker in range(nwalkers):
f += autocorr_func_1d(y[walker], norm=True)
f /= nwalkers
# Apply sokal windowing on cumulative sum
taus = 2.0 * np.cumsum(f) - 1.0
window = auto_window(taus, c)
return float(taus[window])
[docs]
def sampler_nbytes(sampler) -> IntLike:
if hasattr(sampler, "nbytes"):
return sampler.nbytes
elif hasattr(sampler.backend, "nbytes"):
return sampler.backend.nbytes
else:
nbytes = 0
for name in ("chain", "log_prob", "accepted", "blobs"):
arr = getattr(sampler.backend, name, None)
if arr is not None and hasattr(arr, "nbytes"):
nbytes += arr.nbytes
return nbytes