import os
import json
import jsonpickle
import math
import warnings
from coolest.template.standard import COOLEST
from coolest.template.lazy import *
from coolest.template.classes.parameter import PointEstimate, PosteriorStatistics, Prior
from coolest.template.info import all_supported_choices as support
__all__ = ['JSONSerializer']
[docs]
class JSONSerializer(object):
"""Object that write a COOLEST object to a JSON template file,
or loads the content of a JSON template file in the COOLEST format.
For reading, either a plain JSON format or the one generated via `jsonpickle` can be provided.
For the latter, the JSON file should end with the suffix `'_pyAPI'` (before the .json extension).
NOTE: the support for reading the template with `jsonpickle` will probably
be suppressed in the future. Hence we advise users to work with the
pure JSON template files instead.
Parameters
----------
file_path_no_ext : str
Path to the JSON template, or the one to be created.
It should NOT include the .json extension nor the optional '_pyAPI' suffix
obj : object, optional
Instance of the COOLEST object (from the `standard` module)
to be encoded, by default None
indent : int, optional
Number of spaces used to indent lines in the JSON file, by default 2
check_external_files : bool, optional
If True, will check the existence of external (e.g., FITS files)
specified within the JSON template, by default True
Raises
------
ValueError
If the provided path to the JSON file is not an absolute path
ValueError
If the provided path contains the .json extension
"""
# suffix to filename to distinguish files that can be read using jsonpickle
_api_suffix = '_pyAPI'
def __init__(self,
file_path_no_ext: str,
obj: object = None,
indent: int = 2,
check_external_files: bool = True) -> None:
if not os.path.isabs(file_path_no_ext):
raise ValueError("Path to JSON file must be an absolute path")
if file_path_no_ext[-5:].lower() == '.json':
raise ValueError("The provided template name should not contain the JSON extension")
[docs]
self.path = file_path_no_ext
self._json_dir = os.path.dirname(file_path_no_ext)
self._check_files = check_external_files
[docs]
def dump_simple(self, exclude_keys=None):
"""Write to disk the template file, in plain JSON format.
Parameters
----------
exclude_keys : bool, optional
List of class attributes that should not be included
in the JSON file, by default None
"""
if exclude_keys is None:
exclude_keys = self.obj.exclude_keys
json_path = self.path + '.json'
with open(json_path, 'w') as f:
f.write(self.obj.to_JSON(indent=self.indent, exclude_keys=exclude_keys))
[docs]
def dump_jsonpickle(self):
"""Write to disk the template file, using the `jsonpickle` package
WARNING: this feature may be dropped in the future.
"""
json_path = self.path + self._api_suffix + '.json'
result = jsonpickle.encode(self.obj, indent=self.indent)
with open(json_path, 'w') as f:
f.write(result)
[docs]
def load(self, skip_jsonpickle=False, validate=True, verbose=True):
"""Read the JSON template file and build up the corresponding COOLEST object.
It will first try to load the '_pyAPI' template if it exists using `jsonpickle`,
otherwise it will fall back to reading the pure json template.
Parameters
----------
skip_jsonpickle : bool, optional
If True, will not try to read the _pyAPI template with jsonpickle first, by default False
verbose : bool, optional
If True, prints useful output for debugging, by default False
Returns
-------
COOLEST object
COOLEST object that corresponds to the JSON template
"""
json_path = self.path + '.json'
jsonpickle_path = self.path.replace(self._api_suffix, '') + self._api_suffix + '.json'
if os.path.exists(jsonpickle_path) and not skip_jsonpickle:
instance = self.load_jsonpickle(jsonpickle_path)
else:
if verbose:
print(f"Template file '{jsonpickle_path}' not found, now trying to read '{json_path}'.")
instance = self.load_simple(json_path, as_object=True, validate=validate)
assert isinstance(instance, COOLEST)
return instance
[docs]
def load_simple(self, json_path, as_object=True, validate=True):
"""Read the JSON template file and build up the corresponding COOLEST object.
Parameters
----------
json_path: str
Path to the json file to be read.
as_object : bool, optional
_description_, by default True
Returns
-------
COOLEST object
COOLEST object that corresponds to the JSON template
"""
with open(json_path, 'r') as f:
content = json.loads(f.read())
if not as_object:
return content # dictionary
return self._json_to_coolest(content, validate) # COOLEST object
[docs]
def load_jsonpickle(self, jsonpickle_path):
"""Read the JSON template file and build up the corresponding COOLEST object
using the `jsonpickle`.
Parameters
----------
jsonpickle_path: str
Path to the json file to be read with `jsonpickle`.
Returns
-------
COOLEST object
COOLEST object that corresponds to the JSON template
"""
with open(jsonpickle_path, 'r') as f:
c = jsonpickle.decode(f.read())
c = self._fill_missing_props(c)
return c # COOLEST object
def _json_to_coolest(self, json_content, validate):
"""Creates from scratch a COOLEST instance based on the content of a JSON
file, given as a nested dictionnary.
Parameters
----------
json_content : dict
Content of the JSON template file
Returns
-------
COOLEST object
COOLEST object that corresponds to the JSON template
"""
c = json_content # shorter
# MODE
mode = self._check_mode(c['mode'])
# Fill missing entries (e.g. due to previous versions of the template)
c = self._fill_missing_props(c)
# LENSING ENTITIES {GALAXY, MASSFIELDS}
lensing_entities = self._setup_lensing_entities(c['lensing_entities'])
# COSMOLOGY
cosmology = self._setup_cosmology(c.get('cosmology', None))
# COORDINATES
coordinates_origin = self._setup_coordinates(c['coordinates_origin'])
# OBSERVATION
observation = self._setup_observation(c['observation'])
# INSTRUMENT
instrument = self._setup_instrument(c['instrument'])
# LIKELIHOODS
likelihoods = self._setup_likelihoods(c.get('likelihoods', None))
# METADATA
metadata = self._check_metadata(c['meta'])
# instantiate the master class
coolest = COOLEST(mode,
coordinates_origin,
lensing_entities,
observation,
instrument,
cosmology=cosmology,
likelihoods=likelihoods,
metadata=metadata)
# check consistency across the whole coolest object
if validate:
self._validate_global(coolest)
return coolest
@staticmethod
def _validate_global(coolest):
"""Performs consistency checks regarding some key properties of the COOLEST object.
For instance, it checks that the pixel size of both the observation and
the instrument are consistent.
The checks performed here are those that cannot be handled by individual
class constructors called during instantiation of the COOLEST object.
Parameters
----------
coolest : COOLEST object
Instance of a COOLEST object
Raises
------
ValueError
In case observed instrumental pixel sizes are inconsistent
"""
# PIXEL SIZE
instru_pix_size = coolest.instrument.pixel_size
obs_pix_size = coolest.observation.pixels.pixel_size
isclose_bool = math.isclose(instru_pix_size, obs_pix_size,
rel_tol=1e-09, abs_tol=0.0)
if obs_pix_size not in (0, None) and not isclose_bool:
raise ValueError(f"Pixel size of observation ({obs_pix_size}) is inconsistent with "
f"the instrument pixel size ({instru_pix_size})")
# INSTANCE METHODS
coolest.observation.check_consistency_with_instrument(coolest.instrument)
if coolest.likelihoods is not None:
coolest.likelihoods.check_consistency_with_observation(coolest.observation)
# TODO: further standardize these checks (using class methods?)
def _setup_instrument(self, instru_in):
psf_settings = instru_in.pop('psf')
psf = self._setup_psf(psf_settings)
instru_out = Instrument(psf=psf, **instru_in)
return instru_out
def _setup_observation(self, obs_in):
pixels_settings = obs_in.pop('pixels')
pixels = self._setup_grid(pixels_settings, PixelatedRegularGrid)
noise_settings = obs_in.pop('noise')
noise = self._setup_noise(noise_settings)
obs_out = Observation(pixels=pixels, noise=noise, **obs_in)
return obs_out
def _setup_likelihoods(self, likelihoods_in):
if likelihoods_in is None:
return None
likelihoods_out = []
for likelihood_in in likelihoods_in:
likelihoods_out.append(self._setup_likelihood(likelihood_in))
return DataLikelihoodList(*likelihoods_out)
def _setup_likelihood(self, likelihood_in):
if likelihood_in['type'] == 'ImagingDataLikelihood':
likelihood_out = ImagingDataLikelihood(
mask=self._setup_img_ll_mask(likelihood_in['mask'])
)
else:
raise ValueError(f"Supported likelihoods entities are "
f"{support['likelihoods']}")
return likelihood_out
def _setup_img_ll_mask(self, mask_in):
mask_out = self._setup_grid(mask_in, PixelatedRegularGrid)
return mask_out
def _setup_coordinates(self, coord_orig_in):
return CoordinatesOrigin(**coord_orig_in)
def _setup_cosmology(self, cosmology_in):
if cosmology_in is None:
return None
return Cosmology(**cosmology_in)
def _setup_lensing_entities(self, entities_in):
entities_out = []
for entity_in in entities_in:
entities_out.append(self._setup_lensing_entity(entity_in))
return LensingEntityList(*entities_out)
def _setup_lensing_entity(self, entity_in):
if entity_in['type'] == 'Galaxy':
entity_out = Galaxy(entity_in['name'],
entity_in['redshift'],
entity_in['lensed'],
light_model=self._setup_model(entity_in, 'light_model'),
mass_model=self._setup_model(entity_in, 'mass_model'))
elif entity_in['type'] == 'MassField':
entity_out = MassField(entity_in['name'],
entity_in['redshift'],
mass_model=self._setup_model(entity_in, 'mass_model'))
else:
raise ValueError(f"Supported lensing entities are "
f"{support['lensing_entities']}")
self._update_parameters(entity_in, entity_out)
return entity_out
def _setup_model(self, entity_in, model_type):
profile_types = [profile['type'] for profile in entity_in[model_type]]
if model_type == 'light_model':
model_out = LightModel(*profile_types)
elif model_type == 'mass_model':
model_out = MassModel(*profile_types)
else:
raise ValueError("The `model_type` can only be 'light_model' or 'mass_model'")
return model_out
def _update_parameters(self, entity_in, entity_out):
self._update_parameters_values(entity_in, entity_out, 'mass_model')
self._update_parameters_values(entity_in, entity_out, 'light_model')
def _update_parameters_values(self, entity_in, entity_out, model_type):
if model_type not in entity_in:
return
for i, profile in enumerate(entity_in[model_type]):
# get the corresponding profile object in the model being updated
profile_out = getattr(entity_out, model_type)[i]
for name, values in profile['parameters'].items():
# pop the id as this was already set at instantiation
_ = values.pop('id', None)
# pixelated profiles, for now only one value given (point estimate)
if 'Grid' in profile['type']:
self._update_grid_parameter(profile_out, name, values)
# other (analytical) parameters
else:
self._update_std_parameter(profile_out, name, values)
def _update_grid_parameter(self, profile_out, name, values):
if profile_out.type in ('PixelatedRegularGrid', 'PixelatedRegularGridPotential'):
pixels = self._setup_grid(values, PixelatedRegularGrid)
profile_out.parameters[name] = pixels
elif profile_out.type == 'IrregularGrid':
pixels = self._setup_grid(values, IrregularGrid)
profile_out.parameters[name] = pixels
elif profile_out.type == 'PixelatedRegularGridFullyDefined':
pixels = self._setup_grid(values, PixelatedRegularGridStack)
profile_out.parameters[name] = pixels
else:
raise ValueError(f"Unknown grid profile ('{profile_out.type}') and/or parameter name ('{name}').")
def _update_std_parameter(self, profile_out, name, values):
pt_estim = PointEstimate(**values['point_estimate'])
profile_out.parameters[name].set_point_estimate(pt_estim)
if 'posterior_stats' in values:
post_stats = PosteriorStatistics(**values['posterior_stats'])
profile_out.parameters[name].set_posterior(post_stats)
if 'prior' in values:
prior = self._setup_prior(values['prior'])
profile_out.parameters[name].set_prior(prior)
def _setup_prior(self, prior_in):
from coolest.template.classes import probabilities as proba_module
prior_type = prior_in.pop('type')
if prior_type is None:
return Prior()
PriorClass = getattr(proba_module, prior_type)
return PriorClass(**prior_in)
def _setup_noise(self, noise_in):
from coolest.template.classes import noise as noise_module
noise_type = noise_in.pop('type')
if noise_type is None:
return Noise()
if noise_type not in support['noise_types']:
raise ValueError(f"Noise type must be in {support['noise_types']}")
NoiseClass = getattr(noise_module, noise_type)
if noise_type == 'NoiseMap':
pixels_settings = noise_in.pop('noise_map')
noise_map = self._setup_grid(pixels_settings, PixelatedRegularGrid)
noise_out = NoiseClass(noise_map=noise_map, **noise_in)
elif noise_type == 'NoiseRealization':
pixels_settings = noise_in.pop('noise_realization')
noise_realization = self._setup_grid(pixels_settings, PixelatedRegularGrid)
noise_out = NoiseClass(noise_realization=noise_realization, **noise_in)
elif noise_type == 'DrizzledNoise':
pixels_settings = noise_in.pop('wht_map')
wht_map = self._setup_grid(pixels_settings, PixelatedRegularGrid)
noise_out = NoiseClass(wht_map=wht_map, **noise_in)
else:
noise_out = NoiseClass(**noise_in)
return noise_out
def _setup_psf(self, psf_in):
from coolest.template.classes import psf as psf_module
psf_type = psf_in.pop('type')
if psf_type is None:
return PSF()
if psf_type not in support['psf_types']:
raise ValueError(f"PSF type must be in {support['psf_types']}")
PSFClass = getattr(psf_module, psf_type)
if psf_type == 'PixelatedPSF':
pixels_settings = psf_in.pop('pixels')
pixels = self._setup_grid(pixels_settings, PixelatedRegularGrid)
psf_out = PSFClass(pixels=pixels, **psf_in)
else:
psf_out = PSFClass(**psf_in)
return psf_out
def _setup_grid(self, grid_in, GridClass):
fits_path = grid_in.pop('fits_file')['path']
if fits_path is not None and os.path.isabs(fits_path):
raise ValueError(f"FITS file '{fits_path}' must be a relative path instead, "
f"and placed in the same directory as the JSON file")
return GridClass(fits_path,
check_fits_file=self._check_files,
fits_file_dir=self._json_dir,
**grid_in)
def _check_mode(self, mode_in):
mode_out = str(mode_in)
if mode_out not in support['modes']:
raise ValueError(f"The template mode can only be in '{SUPPORTED_MODES}'")
return mode_out
def _check_metadata(self, meta_in):
meta_out = meta_in # TODO: might do more checks here
return meta_out
def _fill_missing_props(self, c):
"""Fills missing properties in the JSON content, if any.
This is useful to ensure backward compatibility with previous versions of the template.
Parameters
----------
c : dict
Content of the JSON template file, as a nested dictionary
Returns
-------
dict
Updated content of the JSON template file, with missing properties filled in
"""
if isinstance(c, dict):
c = self._fill_missing_props_dict(c)
elif isinstance(c, COOLEST):
c = self._fill_missing_props_coolest(c)
else:
raise ValueError(f"Unsupported type '{type(c)}' for filling missing properties. Supported types are either a JSON dictionary or a COOLEST instance.")
return c
def _fill_missing_props_dict(self, c):
# Iterate through the lensing entities and add the `'lensed'` property if missing (added in v0.2.0)
# Since we have to guess when it is not in the template, we use two criterions:
# - if the entity is a source galaxy, then it is lensed; if it is a lens galaxy or a mass field, then it is not lensed
# - if the entity has a mass model, then it is likely a lens galaxy or a mass field, hence not lensed; if it does not have a mass model, then it is likely a source galaxy, hence lensed
for entity in c['lensing_entities']:
if 'lensed' not in entity:
if entity['type'] == 'Galaxy':
if 'mass_model' in entity and len(entity['mass_model']) > 0:
entity['lensed'] = False
warnings.warn(f"Warning: the galaxy '{entity['name']}' has a mass model, hence it is likely a lens galaxy. Setting 'lensed' to False.")
else:
entity['lensed'] = True
warnings.warn(f"Warning: the galaxy '{entity['name']}' does not have a mass model, hence it is likely a source galaxy. Setting 'lensed' to True.")
elif entity['type'] in ('MassField',):
entity['lensed'] = False
warnings.warn(f"Warning: the entity '{entity['name']}' is a mass field, hence it is likely a lensing mass component. Setting 'lensed' to False.")
else:
raise ValueError(f"Unknown lensing entity type '{entity['type']}' when trying to fill missing properties.")
return c
def _fill_missing_props_coolest(self, coolest):
for entity in coolest.lensing_entities:
if not hasattr(entity, 'lensed'):
if isinstance(entity, Galaxy):
if entity.mass_model is not None and len(entity.mass_model) > 0:
entity.lensed = False
warnings.warn(f"Warning: the galaxy '{entity.name}' has a mass model, hence it is likely a lens galaxy. Setting 'lensed' to False.")
else:
entity.lensed = True
warnings.warn(f"Warning: the galaxy '{entity.name}' does not have a mass model, hence it is likely a source galaxy. Setting 'lensed' to True.")
elif isinstance(entity, MassField):
entity.lensed = False
warnings.warn(f"Warning: the entity '{entity.name}' is a mass field, hence it is likely a lensing mass component. Setting 'lensed' to False.")
else:
raise ValueError(f"Unknown lensing entity type '{type(entity)}' when trying to fill missing properties.")
return coolest