Source code for coolest.api.composable_models

__author__ = 'aymgal'

import os
import numpy as np
import math
import logging
from scipy import signal
import pandas as pd
from functools import partial
from copy import deepcopy

from coolest.api import util


# logging settings
logging.getLogger().setLevel(logging.WARNING)


[docs] def auto_select_entities(coolest_object): sel_source = [] sel_lens_light = [] sel_lens_mass = [] for i, entity in enumerate(coolest_object.lensing_entities): if entity.lensed is True: sel_source.append(i) if entity.has_mass_profiles: print("A", i, entity.name, len(entity.mass_model)) sel_lens_light.append(i) # i.e. this entity acts as a lens AND is lensed else: sel_lens_mass.append(i) if entity.has_light_profiles: print("B", i, entity.name) if hasattr(entity, 'light_model'): print(entity.light_model) sel_lens_light.append(i) kwargs_sel_source = dict( entity_selection=sel_source, profile_selection='all', ) kwargs_sel_lens_mass = dict( entity_selection=sel_lens_mass, profile_selection='all', ) kwargs_sel_lens_light = dict( entity_selection=sel_lens_light, profile_selection='all', ) return ( kwargs_sel_source, kwargs_sel_lens_mass, kwargs_sel_lens_light, )
def _copy_dict(d): return deepcopy(d) if d is not None else None
[docs] class BaseComposableModel(object): """Given a COOLEST object, evaluates a selection of mass or light profiles. This class serves as parent for more specific classes and should not be instantiated by the user. Parameters ---------- model_type : str Either 'light_model' or 'mass_model' coolest_object : COOLEST COOLEST instance coolest_directory : str, optional Directory which contains the COOLEST template, by default None load_posterior_samples : bool, optional If True, and if the COOLEST metadata provides it, the constructor will attempt to load the chain file containing posterior samples, in addition to point estimates for each profile parameters. Default is False. entity_selection : list, optional List of indices of the lensing entities to consider; If None, selects the first entity which has a model of type model_type, by default None profile_selection : list, optional List of either lists of indices, or 'all', for selecting which (mass or light) profile of a given lensing entity to consider. If None, selects all the profiles of within the corresponding entity, by default None Raises ------ ValueError No valid entity found or no profiles found. """ _chain_key = "chain_file_name" _supported_eval_modes = ('point', 'posterior') def __init__( self, model_type, coolest_object, coolest_directory=None, load_posterior_samples=False, # The following are referred to as `kwargs_selection` in children classes entity_selection=None, profile_selection=None ): if entity_selection is None: # In this case this composable model is essentially empty entity_selection = [] # NOTE: this was the previous behavior: finds the first entity that has a 'model_type' profile # for i, entity in enumerate(coolest_object.lensing_entities): # if model_type == 'light_model' \ # and entity.type == 'galaxy' \ # and len(entity.light_model) > 0: # entity_selection = [i] # break # elif model_type == 'mass_model' \ # and len(entity.mass_model) > 0: # entity_selection = [i] # break # if entity_selection is None: # raise ValueError(f"No lensing entity found for model type '{model_type}'! " # "Please check the COOLEST template or provide an explicit `entity_selection`.") # else: # logging.warning(f"Considering profile for lensing entity (index {i})") if profile_selection is None: # if not specified, we simply consider all profiles from that entity profile_selection = 'all' entities = coolest_object.lensing_entities
[docs] self.directory = coolest_directory
self._posterior_bool, self._csv_path = False, None if load_posterior_samples: metadata = coolest_object.meta if self._chain_key not in metadata: logging.warning(f"Metadata key '{self._chain_key}' is missing " f"from COOLEST template, hence no posterior samples " f"will be loaded.") else: self._posterior_bool = True self._csv_path = os.path.join(self.directory, metadata[self._chain_key]) self.setup_profiles_and_params(model_type, entities, entity_selection, profile_selection)
[docs] self.num_profiles = len(self.profile_list)
[docs] def setup_profiles_and_params(self, model_type, entities, entity_selection, profile_selection): profile_list = [] param_list, post_param_list = [], [] info_list = [] for i, entity in enumerate(entities): if self._selected(i, entity_selection): if model_type == 'light_model' and entity.type == 'external_shear': raise ValueError(f"External shear (entity index {i}) has no light model") for j, profile in enumerate(getattr(entity, model_type)): if self._selected(j, profile_selection): if 'Grid' in profile.type: if self.directory is None: raise ValueError("The directory in which the COOLEST file is located " "must be provided for loading FITS files.") params, fixed_params = self._get_grid_params(profile, self.directory) profile_list.append(self._get_api_profile(model_type, profile, *fixed_params)) post_params = None # TODO: support samples for grid parameters else: params, post_params = self._get_regular_params( profile, samples_file_path=self._csv_path ) profile_list.append(self._get_api_profile(model_type, profile)) param_list.append(params) post_param_list.append(post_params) info_list.append((entity.name, entity.redshift)) self.profile_list = profile_list self.param_list = param_list self.info_list = info_list if self._posterior_bool is True: post_param_list, post_weights = self._finalize_post_samples(post_param_list, self._csv_path) self.post_param_list = post_param_list self.post_weights = np.array(post_weights) else: self.post_param_list = None self.post_weights = None
[docs] def estimate_center(self): # TODO: improve this (for now simply considers the first profile that has a center) for profile, params in zip(self.profile_list, self.param_list): if 'center_x' in params: center_x = params['center_x'] center_y = params['center_y'] logging.info(f"Picked center from profile '{profile.type}'") return center_x, center_y raise ValueError("Could not estimate a center from the composed model")
@staticmethod def _get_api_profile(model_type, profile_in, *extra_profile_args): """ Takes as input a light profile from the template submodule and instantites the corresponding profile from the API submodule """ if model_type == 'light_model': from coolest.api.profiles import light ProfileClass = getattr(light, profile_in.type) elif model_type == 'mass_model': from coolest.api.profiles import mass ProfileClass = getattr(mass, profile_in.type) return ProfileClass(*extra_profile_args) @staticmethod def _get_regular_params(profile_in, samples_file_path=None): parameters = {} # best-fit values samples = {} if samples_file_path else None # posterior samples for name, param in profile_in.parameters.items(): parameters[name] = param.point_estimate.value if samples is not None: # read just the column corresponding to the parameter ID column = pd.read_csv( samples_file_path, usecols=[param.id], delimiter=',', ) # TODO: take into account probability weights from nested sampling runs! samples[name] = list(column[param.id]) return parameters, samples @staticmethod def _get_grid_params(profile_in, fits_dir): param_in = profile_in.parameters['pixels'] if 'PixelatedRegularGrid' in profile_in.type: data = param_in.get_pixels(directory=fits_dir) parameters = {'pixels': data} fov_x = param_in.field_of_view_x fov_y = param_in.field_of_view_y npix_x = param_in.num_pix_x npix_y = param_in.num_pix_y fixed_parameters = (fov_x, fov_y, npix_x, npix_y) if profile_in.type == 'PixelatedRegularGridFullyDefined': # NOTE: so far there is only one such profile, so below is not so general code param_in = profile_in.parameters['pixels_derivative'] data = param_in.get_pixels(directory=fits_dir) parameters.update({'pixels_derivative': data}) param_in = profile_in.parameters['pixels_hessian'] data = param_in.get_pixels(directory=fits_dir) parameters.update({'pixels_hessian': data}) elif 'IrregularGrid' in profile_in.type: x, y, z = param_in.get_xyz(directory=fits_dir) parameters = {'x': x, 'y': y, 'z': z} fov_x = param_in.field_of_view_x fov_y = param_in.field_of_view_y npix = param_in.num_pix fixed_parameters = (fov_x, fov_y, npix) return parameters, fixed_parameters @staticmethod def _finalize_post_samples(param_list_of_samples, samples_file_path): """ Takes as input the samples grouped at the leaves of the nested container structure, and returns a list of items each organized as self.param_list """ num_profiles = len(param_list_of_samples) profile_0 = param_list_of_samples[0] num_samples = len(profile_0[list(profile_0.keys())[0]]) samples_of_param_list = [ [{} for _ in range(num_profiles)] for _ in range(num_samples) ] for i in range(num_samples): for k in range(num_profiles): for key in param_list_of_samples[k].keys(): samples_of_param_list[i][k][key] = param_list_of_samples[k][key][i] # also load and return the probability weights # read just the column corresponding to the parameter ID weights_key = 'probability_weights' column = pd.read_csv( samples_file_path, usecols=[weights_key], delimiter=',', ) weights_list = list(column[weights_key]) return samples_of_param_list, weights_list @staticmethod def _selected(index, selection): if isinstance(selection, str) and selection.lower() == 'all': return True elif isinstance(selection, (list, tuple, np.ndarray)) and index in selection: return True elif isinstance(selection, (int, float)) and int(selection) == index: return True return False def _check_eval_mode(self, mode): if mode not in self._supported_eval_modes: raise NotImplementedError( f"Only evaluation modes " f"{self._supported_eval_modes} are supported " f"(received '{mode}')." )
[docs] class ComposableLightModel(BaseComposableModel): """Given a COOLEST object, evaluates a selection of entity and their light profiles. Parameters ---------- coolest_object : COOLEST COOLEST instance coolest_directory : str, optional Directory which contains the COOLEST template, by default None kwargs_selection : dict, optional Keyword arguments for selecting which entities and profiles to consider, see BaseComposableModel for details. Raises ------ ValueError No valid entity found or no profiles found. """ def __init__( self, coolest_object, coolest_directory=None, **kwargs_selection, ): super().__init__('light_model', coolest_object, coolest_directory=coolest_directory, **kwargs_selection) pixel_size = coolest_object.instrument.pixel_size if pixel_size is None: self.pixel_area = 1. else: self.pixel_area = pixel_size**2
[docs] def surface_brightness(self, return_extra=False): """Returns the surface brightness as stored in the COOLEST file""" if self.num_profiles > 1: logging.warning("When more than a single light profile has been selected, " "the method `surface_brightness()` only considers the first profile") profile = self.profile_list[0] values = profile.surface_brightness(**self.param_list[0]) if return_extra: extent = profile.get_extent() coordinates = profile.get_coordinates() return values, extent, coordinates return values
[docs] def evaluate_surface_brightness(self, x, y): """Evaluates the surface brightness at given coordinates""" image = np.zeros_like(x) for k, (profile, params) in enumerate(zip(self.profile_list, self.param_list)): flux_k = profile.evaluate_surface_brightness(x, y, **params) if profile.units == 'per_ang': flux_k *= self.pixel_area image += flux_k return image
[docs] class ComposableMassModel(BaseComposableModel): """Given a COOLEST object, evaluates a selection of entity and their mass profiles. Parameters ---------- coolest_object : COOLEST COOLEST instance coolest_directory : str, optional Directory which contains the COOLEST template, by default None kwargs_selection : dict, optional Keyword arguments for selecting which entities and profiles to consider, see BaseComposableModel for details. Raises ------ ValueError No valid entity found or no profiles found. """ def __init__(self, coolest_object, coolest_directory=None, load_posterior_samples=False, **kwargs_selection): super().__init__('mass_model', coolest_object, coolest_directory=coolest_directory, load_posterior_samples=load_posterior_samples, **kwargs_selection)
[docs] def evaluate_potential(self, x, y, mode='point', last_n_samples=None): """Evaluates the lensing potential field at given coordinates""" self._check_eval_mode(mode) if mode == 'point' or self._posterior_bool is False: return self._eval_pot_point(x, y, self.param_list) elif mode == 'posterior': return self._eval_pot_posterior(x, y, self.post_param_list, last_n_samples)
def _eval_pot_point(self, x, y, param_list): psi = np.zeros_like(x) for k, profile in enumerate(self.profile_list): psi += profile.evaluate_potential(x, y, **param_list[k]) return psi def _eval_pot_posterior(self, x, y, param_list, last_n_samples): # map the point function at each sample use_all_samples = last_n_samples is None or last_n_samples <= 0 val_list = param_list if use_all_samples else param_list[-last_n_samples:] mapped = map(partial(self._eval_pot_point, x, y), val_list) return np.array(list(mapped))
[docs] def fermat_potential(self, x, y, x_src, y_src, mode='point', last_n_samples=None): """Computes the Fermat potential for image (x, y) and source position (x_src, y_src) """ # gravitational term psi = self.evaluate_potential(x, y, mode=mode, last_n_samples=last_n_samples) # geometric term geo = ((x - x_src)**2 + (y - y_src)**2) / 2. geo = np.broadcast_to(geo, psi.shape) # makes sure geo has same shape as psi return geo - psi
[docs] def evaluate_deflection(self, x, y): """Evaluates the lensing deflection field at given coordinates""" alpha_x, alpha_y = np.zeros_like(x), np.zeros_like(x) for k, (profile, params) in enumerate(zip(self.profile_list, self.param_list)): a_x, a_y = profile.evaluate_deflection(x, y, **params) alpha_x += a_x alpha_y += a_y return alpha_x, alpha_y
[docs] def evaluate_convergence(self, x, y): """Evaluates the lensing convergence (i.e., 2D mass density) at given coordinates""" kappa = np.zeros_like(x) for k, (profile, params) in enumerate(zip(self.profile_list, self.param_list)): kappa += profile.evaluate_convergence(x, y, **params) return kappa
[docs] def evaluate_hessian(self, x, y): """Evaluates the lensing Hessian components at given coordinates""" H_xx_sum = np.zeros_like(x) H_xy_sum = np.zeros_like(x) H_yx_sum = np.zeros_like(x) H_yy_sum = np.zeros_like(x) for k, (profile, params) in enumerate(zip(self.profile_list, self.param_list)): H_xx, H_xy, H_yx, H_yy = profile.evaluate_hessian(x, y, **params) H_xx_sum += H_xx H_xy_sum += H_xy H_yx_sum += H_yx H_yy_sum += H_yy return H_xx_sum, H_xy_sum, H_yx_sum, H_yy_sum
[docs] def evaluate_jacobian(self, x, y): """Evaluates the lensing Jacobian (d beta / d theta) at given coordinates""" H_xx, H_xy, H_yx, H_yy = self.evaluate_hessian(x, y) A = np.array([[1 - H_xx, -H_xy], [-H_yx, 1 - H_yy]]) return A
[docs] def evaluate_magnification(self, x, y): """Evaluates the lensing magnification at given coordinates""" H_xx, H_xy, H_yx, H_yy = self.evaluate_hessian(x, y) det_A = (1 - H_xx) * (1 - H_yy) - H_xy*H_yx mu = 1. / det_A return mu
[docs] def ray_shooting(self, x, y): """evaluates the lens equation beta = theta - alpha(theta)""" alpha_x, alpha_y = self.evaluate_deflection(x, y) x_rs, y_rs = x - alpha_x, y - alpha_y return x_rs, y_rs
[docs] class ComposableLensModel(object): """Given a COOLEST object, evaluates a selection of entity and their mass and light profiles, typically to construct an image of the lens. Parameters ---------- coolest_object : COOLEST COOLEST instance coolest_directory : str, optional Directory which contains the COOLEST template, by default None auto_selection : bool, True If True, will select entities based on their `lensed` attributes. In particular, all entities having a `lensed=True` will be associated to a ComposableLightModel for the source, and all entities having `lensed=False` will be associated to a ComposableLightModel and ComposableLightModel for the lens If False, the user should provide the appropriate keyword arguments in the kwargs_selection_* arguments. kwargs_selection_lens_mass: dict, optional Keyword arguments for ComposableMassModel to select which entities and their profiles to consider for the lens mass model. kwargs_selection_lens_light: dict, optional Keyword arguments for ComposableLightModel to select which entities and their profiles to consider for the lens light model. kwargs_selection_source: dict, optional Keyword arguments for ComposableLightModel to select which entities and their profiles to consider for the source light model. Raises ------ ValueError No valid entity found or no profiles found. """ def __init__(self, coolest_object, coolest_directory=None, auto_selection=True, kwargs_selection_source=None, kwargs_selection_lens_mass=None, kwargs_selection_lens_light=None):
[docs] self.coolest = coolest_object
[docs] self.coord_obs = util.get_coordinates(self.coolest)
[docs] self.directory = coolest_directory
kwargs_source = _copy_dict(kwargs_selection_source) kwargs_lens_mass = _copy_dict(kwargs_selection_lens_mass) kwargs_lens_light = _copy_dict(kwargs_selection_lens_light) if auto_selection is True: ( kwargs_source, kwargs_lens_mass, kwargs_lens_light ) = auto_select_entities(self.coolest) print("Auto-selected the following entities and profiles:") print(f"- source: {kwargs_source}") print(f"- lens mass: {kwargs_lens_mass}") print(f"- lens light: {kwargs_lens_light}") else: if kwargs_selection_source is None: kwargs_source = {} if kwargs_selection_lens_mass is None: kwargs_lens_mass = {} if kwargs_selection_lens_light is None: kwargs_lens_light = {}
[docs] self.lens_mass = ComposableMassModel(coolest_object, coolest_directory, **kwargs_lens_mass)
[docs] self.lens_light = ComposableLightModel(coolest_object, coolest_directory, **kwargs_lens_light)
[docs] self.source = ComposableLightModel(coolest_object, coolest_directory, **kwargs_source)
[docs] def model_image(self, supersampling=5, convolved=True, super_convolution=True): """generates an image of the lens based on the selected model components""" obs = self.coolest.observation psf = self.coolest.instrument.psf if convolved is True and psf.type == 'PixelatedPSF': scale_factor = obs.pixels.pixel_size / psf.pixels.pixel_size supersampling_conv = int(round(scale_factor)) if not math.isclose(scale_factor, supersampling_conv): raise ValueError(f"PSF supersampling ({scale_factor}) not close to an integer?") if supersampling_conv < 1: raise ValueError("PSF pixel size smaller than data pixel size") if supersampling < 1: raise ValueError("Supersampling must be >= 1") if convolved is True and supersampling_conv > supersampling: supersampling = supersampling_conv logging.warning(f"Supersampling adapted to the PSF pixel size ({supersampling})") coord_eval = self.coord_obs.create_new_coordinates(pixel_scale_factor=1./supersampling) x, y = coord_eval.pixel_coordinates image = self.evaluate_lensed_surface_brightness(x, y) image += self.lens_light.evaluate_surface_brightness(x, y) if convolved is True: if psf.type != 'PixelatedPSF': raise NotImplementedError kernel = psf.pixels.get_pixels(directory=self.directory) kernel_sum = kernel.sum() if not math.isclose(kernel_sum, 1., abs_tol=1e-3): kernel /= kernel_sum logging.warning(f"PSF kernel is not normalized (sum={kernel_sum}), " f"so it has been normalized before convolution") if np.isnan(image).any(): np.nan_to_num(image, copy=False, nan=0., posinf=None, neginf=None) logging.warning("Found NaN values in image prior to convolution; " "they have been replaced by zeros.") if super_convolution and supersampling_conv == supersampling: # first convolve then downscale image = signal.fftconvolve(image, kernel, mode='same') image = util.downsampling(image, factor=supersampling) else: # first downscale then convolve image = util.downsampling(image, factor=supersampling) image = signal.fftconvolve(image, kernel, mode='same') elif supersampling > 1: image = util.downsampling(image, factor=supersampling) return image, self.coord_obs
[docs] def model_residuals(self, mask=None, **model_image_kwargs): """computes the normalized residuals map as (data - model) / sigma""" model, _ = self.model_image(**model_image_kwargs) data = self.coolest.observation.pixels.get_pixels(directory=self.directory) noise = self.coolest.observation.noise if noise.type != 'NoiseMap': raise NotImplementedError sigma = noise.noise_map.get_pixels(directory=self.directory) if mask is None: mask = np.ones_like(model) return ((data - model) / sigma) * mask, self.coord_obs
[docs] def evaluate_lensed_surface_brightness(self, x, y): """Evaluates the surface brightness of a lensed source at given coordinates""" # ray-shooting x_rs, y_rs = self.ray_shooting(x, y) # evaluates at ray-shooted coordinates lensed_image = self.source.evaluate_surface_brightness(x_rs, y_rs) return lensed_image
[docs] def ray_shooting(self, x, y): """evaluates the lens equation beta = theta - alpha(theta)""" return self.lens_mass.ray_shooting(x, y)