Source code for ACID_code.lsd

from __future__ import annotations
import numpy as np
from astropy.io import  fits
import glob, psutil, os, traceback
import matplotlib.pyplot as plt
from scipy.signal import find_peaks
from tqdm import tqdm
from scipy.linalg import cho_factor, cho_solve
from beartype import beartype
from . import utils
from .errors import LineListRangeError, SNCutError
from .data import Config, Data, LineList
from .utils import c_kms, IntLike, Scalar, Array1D, Array2D

[docs] @beartype class LSD: """ Class containing all useful functions for performing least-squares deconvolution. This does not simultaneously fit the continuum and perform LSD (which ACID does). It is used for the initial parameters of the ACID mcmc run and for obtaining final profiles. It can also be used as a standalone LSD implementation and for trying to do LSD without OD. For more details and an example, see :ref:`LSD` in the documentation. """ def __init__( self, data : object|None = None, od : bool = None, verbose : IntLike|bool|str|None = None, ) -> None: """Initialises the LSD class, optionally with a Data instance to take parameters from. Parameters ---------- data : object | None, optional A data instance to draw parameters and configs from, by default None od : bool, optional Whether to perform LSD in optical depth space (True) or flux space (False), by default None. If None, takes from Data instance if provided, else defaults to True. We generally recommend always using optical depth as ACID was always intended, but you can set this to False if you wish to do your own testing. See :ref:`LSD` in the documentation for more details. verbose : :py:type:`IntLike | bool | str | None`, optional Verbosity level, if None, uses the :py:class:`Config` class existing value (in Data), or default of 2. Should follow the same format as :py:class:`Acid` verbosity. Will overwrite the verbosity level in the config if a Data instance is input, by default None. """ # Set class variables, taking from input data if it exists, else setting to defaults self.slurm = "SLURM_JOB_ID" in os.environ self.data = data if data is not None else Data() self.linelist = self.data.linelist if self.data is not None else None self.od = od if od is not None else self.data.config.od try: self.config = self.data.config except: self.config = Config() # uses defaults self.config.update_hipri(verbose=verbose) # Update config with new values, if not None
[docs] def run_LSD( self, wavelengths : Array1D, flux : Array1D, errors : Array1D, sn : Scalar, linelist : Array2D|str|LineList|dict|None = None, velocities : Array1D|None = None, alpha : Array2D|None = None, ) -> None: """Runs the LSD algorithm to extract the average line profile from the observed spectrum. Parameters ---------- wavelengths : :py:type:`Array1D` Array of wavelengths of the observed spectrum in Angstroms flux : :py:type:`Array1D` Array of flux values corresponding to the wavelengths (in linear space, and should be continuum normalized) errors : :py:type:`Array1D` Array of error values corresponding to the flux sn : :py:type:`Scalar` Signal-to-noise ratio of the observed spectrum linelist : :py:type:`Array2D | str | LineList | dict | None`, optional Linelist to use for LSD, should follow the same format as :py:class:`Acid`. If None, uses the linelist already stored in the class, if it exists, by default None. velocities : :py:type:`Array1D`, optional Array of velocities corresponding to the observed spectrum. If the class was not initialised with an Acid instance, this is required, by default None alpha : :py:type:`Array2D` | None, optional Precomputed alpha matrix, if already calculated and you want to skip directly to the Cholesky decomposition and solving for the profile, by default None """ # Ensure inputs are numpy arrays wavelengths = np.array(wavelengths) flux = np.array(flux) errors = np.array(errors) # Ensure dimensions match if not wavelengths.shape == flux.shape == errors.shape: raise ValueError("Input wavelengths, flux, and errors must have the same shape.") # Set linelist and velocities either from inputs or from Data class if initialised with Acid instance self.data.velocities = velocities if velocities is not None else self.data.velocities if self.data.velocities is None: raise ValueError("Velocities must be provided either as an argument to run_LSD or when initialising the class with an Acid instance.") self.data.linelist = linelist # Raises if no linelist available, overwrites if input # Unpack the linelist stored in data wavelengths_linelist, depths_linelist = self.data.linelist # Clip linelist to wavelength range of spectrum wavelengths_linelist, depths_linelist = utils.clip_wavelengths(wavelengths, wavelengths_linelist, depths_linelist) if len(wavelengths_linelist) == 0: error = LineListRangeError( "No lines in linelist are within the wavelength range of the observed spectrum.\n" "You may have mismatched wavelength units between linelist and spectrum or an empty linelist.\n" "Please check your linelist and input spectrum." ) self.data.exception = error self.data.traceback = traceback.format_stack() raise error # Apply S/N cut (of 1/(3*SN)) to linelist wavelengths_linelist, depths_linelist = self.sn_clip(wavelengths_linelist, depths_linelist, sn) # Convert to optical depth space for the linelist and the spectrum if needed, and convert errors accordingly if self.od: flux, errors, depths_linelist = utils.flux_to_od(flux, errors, depths_linelist) else: flux -= 1 # Calculates alpha in optical depth, selects lines greater than 1/(3*sn) if alpha is None: self.alpha = self.calc_alpha(wavelengths, wavelengths_linelist, depths_linelist) else: self.alpha = alpha # Now solve for profile using Cholesky decomposition self.c_factor = self.calc_cholesky(self.alpha, errors) # Solve for profile and profile errors using Cholesky factors self.profile, self.profile_errors, self.cov_z = self.solve_z(self.alpha, flux, errors, self.c_factor, return_cov=True) self.forward_model = self.alpha @ self.profile self.forward_model_errors = np.sqrt(np.sum((self.alpha * self.profile_errors)**2, axis=1)) # Convert profile back to flux if needed if self.od: self.profile_F, self.profile_errors_F, self.cov_z_F = utils.od_to_flux(self.profile, self.profile_errors, cov_matrix=self.cov_z) self.forward_model, self.forward_model_errors = utils.od_to_flux(self.forward_model, self.forward_model_errors) else: self.profile += 1 self.profile_F, self.profile_errors_F, self.cov_z_F = self.profile, self.profile_errors, self.cov_z self.forward_model += 1 return
[docs] def sn_clip( self, wavelengths_linelist : Array1D, depths_linelist : Array1D, sn : Scalar, ) -> tuple[Array1D, Array1D]: """ Applies a signal-to-noise cut to the linelist, removing lines shallower than 1/(3*sn) as per Dolan et al (2024). Parameters ---------- wavelengths_linelist : :py:type:`Array1D` Wavelengths from the linelist depths_linelist : :py:type:`Array1D` Depths from the linelist sn : :py:type:`Scalar` Signal-to-noise ratio threshold Returns ------- tuple[:py:type:`Array1D`, :py:type:`Array1D`] Clipped wavelengths and depths from the linelist """ # Selecting lines deeper than 1/(3*sn) idx = (depths_linelist >= 1/(3*sn)) wavelengths_linelist = wavelengths_linelist[idx] depths_linelist = depths_linelist[idx] # Analyse remaining lines ncut = np.sum(~idx) nrest = np.sum(idx) perc = 100 * nrest / (nrest + ncut) if nrest == 0: error = SNCutError(f"No lines remain in the linelist after S/N cut. Please check your linelist and S/N value.") self.data.exception = error self.data.traceback = traceback.format_stack() raise error if self.config.verbose > 0: if perc < 5: print("Warning: Less than 5% of lines remain after S/N cut. Check your linelist and S/N value.") if self.config.verbose > 2: print(f"{perc:.2f}% of lines used in LSD: {nrest} out of {nrest + ncut} remain from S/N cut.") return wavelengths_linelist, depths_linelist
[docs] def calc_alpha( self, wavelengths : Array1D, wavelengths_linelist : Array1D, depths_linelist : Array1D, velocities : Array1D|None = None, ) -> Array2D: """ Calculates the alpha matrix given flux and errors and a linelist. Note that if this function is called without using run_LSD, there is no selection of lines deeper than 1/(3*sn). If you still wish to do this, it needs to be done in linear space with the sn_clip function before converting to (if desired) OD space. The units of the alpha matrix will match the units of the input linelist. Parameters ---------- wavelengths : :py:type:`Array1D` Array of wavelengths of the observed spectrum in optical depth space wavelengths_linelist : :py:type:`Array1D` Array of wavelengths from the linelist in optical depth space depths_linelist : :py:type:`Array1D` Array of depths from the linelist in optical depth space velocities : :py:type:`Array1D`, optional Array of velocities, needs to either be initialised by class with Acid instance, or input here, by default None Returns ------- :py:type:`Array2D` The alpha matrix, to be used in the Cholesky decomposition and solving for the profile. The units will match the units of the input linelist (OD or flux). """ # Set velocities from input or from class variable if initialised with Acid instance, raise error if not provided in either way if velocities is not None: self.data.velocities = velocities elif self.data.velocities is None: raise ValueError("Velocities must be provided either as input or initialized in the Acid instance.") # Check velocity spacing is constant if not np.allclose(np.diff(self.data.velocities), self.data.velocities[1] - self.data.velocities[0]): raise ValueError("Velocity spacing must be constant for the alpha matrix calculation.") # Calculate velocity pixel size deltav = self.data.velocities[1] - self.data.velocities[0] # Clip linelist to wavelength range of spectrum (again just in case this is called without run_LSD, saves memory by reducing lines) wavelengths_linelist, depths_linelist = utils.clip_wavelengths(wavelengths, wavelengths_linelist, depths_linelist) # Find differences and velocities blankwaves = wavelengths diff = blankwaves[:, np.newaxis] - wavelengths_linelist vel = c_kms * (diff / wavelengths_linelist) # Get memory available depnding on whether were on slurm or not available_memory = utils.get_available_memory() # in bytes mat_size = len(wavelengths_linelist) * len(self.data.velocities) * len(blankwaves) * 8 # Matrix size in bytes m_available = available_memory / 2 # Available memory in bytes (divided by 2 to be safe) # Calculate alpha matrix in one go if it fits in memory if mat_size < m_available: # Calculating entire alpha matrix at once, broadcasts into a 3D array of shape (n_wl, n_lines, n_vel) x = (vel[:, :, np.newaxis] - self.data.velocities) / deltav delta = np.clip(1.0 - np.abs(x), 0.0, 1.0) alpha = (depths_linelist[:, None] * delta).sum(axis=1) # (n_wl, n_vel) # Else, calculate in blocks to save memory else: n_blank = len(blankwaves) n_vel = len(self.data.velocities) mem_size = available_memory // 2 bytes_per_row = n_blank * n_vel * 8 * 3 # *8 for float64, *3 for vel, x, delta in a row max_block = max(1, mem_size // bytes_per_row) block = int(min(max_block, len(wavelengths_linelist))) # Set initial alpha matrix to np.zeros alpha = np.zeros((len(blankwaves), len(self.data.velocities)), dtype=np.float64) # Use tqdm progress bar if verbose if self.config.verbose>1: iterable = tqdm(range(0, len(wavelengths_linelist), block), desc='Calculating alpha matrix') else: iterable = range(0, len(wavelengths_linelist), block) for start_pos in iterable: # Ensure we don't go out of bounds on last iteration end_pos = min(start_pos + block, len(wavelengths_linelist)) wl = wavelengths_linelist[start_pos:end_pos] dep = depths_linelist[start_pos:end_pos] # Perform calculations for this block vel_blk = c_kms * (blankwaves[:, None] - wl) / wl x_blk = (vel_blk[:, :, None] - self.data.velocities) / deltav delta = np.clip(1.0 - np.abs(x_blk), 0.0, 1.0) alpha += (dep[:, None] * delta).sum(axis=1) return alpha
[docs] @staticmethod def calc_cholesky( alpha : Array2D, error : Array1D, **kwargs, ) -> tuple: """ Calculates the LHS Cholesky factorisation matrix given the alpha matrix and flux errors. The units of alpha and error should match (ie OD or flux), the output will be in the same units. Parameters ---------- alpha : :py:type:`Array2D` The precomputed alpha matrix error : :py:type:`Array1D` Flux errors **kwargs : dict, optional Additional keyword arguments to pass to scipy.linalg.cho_factor. Overwrite_a=False must be set by us, do not pass this as a kwarg. Returns ------- c_factor : tuple Cholesky factorisation matrix and lower/upper flag, to be put straight into solve_z as `c_factor` parameter """ V = 1.0 / (error ** 2) # variance vector in log space, error already in log space # M = αT V α, b = αT V R AVA = alpha.T @ (V[:, None] * alpha) # Diangostics for common 1-th leading order linalg error # M = alpha.T @ (V[:, None] * alpha) # print("finite M:", np.all(np.isfinite(M))) # print("min diag:", np.min(np.diag(M))) # print("rank:", np.linalg.matrix_rank(M), " / ", M.shape[0]) # col_norm = np.linalg.norm(np.sqrt(V)[:, None] * alpha, axis=0) # print("zero columns:", np.sum(col_norm == 0)) # Cholesky factorisation of M c_factor = cho_factor(AVA, overwrite_a=False, **kwargs) return c_factor
[docs] @staticmethod def solve_z( alpha : Array2D, flux : Array1D, error : Array1D, c_factor : tuple, return_error : bool = True, return_cov : bool = False, **kwargs, ) -> tuple: """ Solves for the LSD profile and its errors using the Cholesky factors. All units should match between alpha, flux, and error (ie all in OD or all in flux). Returns the profile in the same units. Parameters ---------- alpha : :py:type:`Array2D` The precomputed alpha matrix flux : :py:type:`Array1D` The observed flux values in optical depth space error : :py:type:`Array1D` The flux errors in optical depth space c_factor : tuple Cholesky factorisation matrix and lower/upper flag, to be put straight into scipy.linalg.cho_solve as `c_factor` parameter return_error : bool, optional Whether to calculate and return the profile errors along with the profile, by default True return_cov : bool, optional Whether to return the full covariance matrix instead of just the errors, by default False **kwargs : dict, optional Additional keyword arguments to pass to both scipy.linalg.cho_solve calls (one for the profile, one for the covariance matrix) Overwrite_b=False is already set by default, do not pass this as a kwarg. Returns ------- profile, profile_errors, cov_z : tuple LSD profile and its errors (if return_error is True) and covariance matrix (if return_cov is True) """ V = 1.0 / (error ** 2) # variance vector in log space, error already in log space R = flux # R matrix in log space # M = αT V α, b = αT V R AVR = alpha.T @ (V * R) # z = M⁻¹ b profile = cho_solve(c_factor, AVR, overwrite_b=False, **kwargs) # Find error, cov(z) = M⁻¹, take diagonal if return_error or return_cov: AVA = alpha.T @ (V[:, None] * alpha) cov_z = cho_solve(c_factor, np.eye(AVA.shape[0]), overwrite_b=False, **kwargs) profile_errors = np.sqrt(np.diag(cov_z)) if return_cov: return profile, profile_errors, cov_z else: return profile, profile_errors else: return profile
[docs] @classmethod def convolve_profile( cls, profile : Array1D, profile_errors : Array1D, alpha : Array2D|None = None, velocities : Array1D|None = None, wavelengths : Array1D|None = None, linelist_wavelengths : Array1D|None = None, linelist_depths : Array1D|None = None, ) -> tuple[Array1D, Array1D]: """ Convolve your profile either using an inputted alpha matrix or by calculating one using :py:meth:`calc_alpha` with the inputted wavelengths and linelist. The units of the output convolved model spectrum will match the units of the input profile (ie OD or flux) and alpha matrix/linelist depths. If alpha is not input, the wavelengths and linelist inputs are required to calculate the alpha matrix. See :py:func:`utils.flux_to_od` and :py:func:`utils.od_to_flux` for conversions. Parameters ---------- profile : :py:type:`Array1D` The LSD profile to be convolved, in the same units as the alpha matrix (OD or flux) profile_errors : :py:type:`Array1D` The errors on the LSD profile, in the same units as the alpha matrix (OD or flux) alpha : :py:type:`Array2D` | None, optional Precomputed alpha matrix, if already calculated and you want to skip directly to the convolution, by default None velocities : :py:type:`Array1D` | None, optional Array of velocities corresponding to the observed spectrum, required if alpha is not input, by default None wavelengths : :py:type:`Array1D` | None, optional Array of wavelengths of the observed spectrum, required if alpha is not input, by default None linelist_wavelengths : :py:type:`Array1D` | None, optional Array of wavelengths from the linelist, required if alpha is not input, by default None linelist_depths : :py:type:`Array1D` | None, optional Array of depths from the linelist, required if alpha is not input. Must be in the same units as the alpha matrix (OD or flux), by default None """ if alpha is None: if velocities is None or wavelengths is None or linelist_wavelengths is None or linelist_depths is None: raise ValueError("If alpha matrix is not input, velocities, wavelengths, linelist_wavelengths, and " \ "linelist_depths must all be provided to calculate the alpha matrix.") cls.__init__(cls) alpha = cls.calc_alpha(cls, wavelengths, linelist_wavelengths, linelist_depths, velocities, verbose=2) model_spectrum = alpha @ profile model_errors = np.sqrt((alpha**2) @ (profile_errors**2)) return model_spectrum, model_errors