Source code for pahfit.base

import numpy as np
import matplotlib as mpl

from pahfit.instrument import within_segment, fwhm
from pahfit.errors import PAHFITModelError
from pahfit.component_models import (
    BlackBody1D,
    ModifiedBlackBody1D,
    S07_attenuation,
    att_Drude1D,
)
from astropy.modeling.physical_models import Drude1D
from astropy.modeling.functional_models import Gaussian1D

__all__ = ["PAHFITBase"]


def _ingest_limits(min_vals, max_vals):
    """
    Ingest the limits read from yaml file and generate the appropriate
    internal format (list of tuples).  Needed to handle the case
    where a limit is not desired as numpy arrays cannot have elements
    of None, instead a value of nan is used.

    Limits that are not set are designated as 'nan' in files and
    these are changed to the python None to be compatible with
    the astropy.modeling convention.

    Parameters
    ----------
    min_vals,
    max_vals : numpy.array (masked arrays)
        min/max values of the limits for a parameter
        nan designates no limit

    Returns
    -------
    plimits : list of tuples
        tuples give the min/max limits for a parameter
    """
    plimits = []
    mask_min = min_vals.mask
    data_min = min_vals.data
    mask_max = max_vals.mask
    data_max = max_vals.data

    mask_min_ind = np.where(np.logical_not(mask_min))[0]
    mask_max_ind = np.where(np.logical_not(mask_max))[0]

    min_vals = np.zeros(len(mask_min))
    min_vals[mask_min_ind] = data_min[mask_min_ind]

    max_vals = np.zeros(len(mask_max))
    max_vals[mask_max_ind] = data_max[mask_max_ind]

    plimits = []
    for cmin, cmax in zip(min_vals, max_vals):
        if np.isinf(cmin):
            cmin = None
        if np.isinf(cmax):
            cmax = None
        plimits.append((cmin, cmax))

    return plimits


def _ingest_fixed(fixed_vals):
    """
    Ingest the fixed value read from a file and generate the appropriate
    internal format (list of booleans). Since this information is indirectly
    hidden in the parameter of a feature, this function is needed to
    extract that.

    Parameters
    ----------
    min_vals : numpy.array (masked array)
        fixed designations

    Returns
    -------
    pfixed : list (boolean)
        True/False designation for parameters
    """
    mask_false_ind = np.where(np.logical_not(fixed_vals.mask))[0]
    fixed_vals = ["True"] * len(fixed_vals.mask)
    for i in range(0, len(mask_false_ind)):
        ind = mask_false_ind[i]
        fixed_vals[ind] = "False"

    pfixed = []
    for cfixed in fixed_vals:
        if cfixed == "True":
            cfixed = True
        if cfixed == "False":
            cfixed = False
        pfixed.append(cfixed)

    return pfixed


[docs] class PAHFITBase: """ Old implementation. Some functions are still used by the new Model class. The unused functionality has been removed. Construct that is still used for now param_info: tuple of dicts called (bb_info, df_info, h2_info, ion_info, abs_info, att_info) The dictionaries contain info for each type of component. Each component of the dictonaries is a vector. bb_info - dict with {name, temps, temps_limits, temps_fixed, amps, amps_limits, amps_fixed}, each a vector dust_features, h2_features, ion_features - dict with {name amps, amps_limits, amps_fixed, x_0, x_0_limits, x_0_fixed, fwhms, fwhms_limits, fwhm_fixed}. """
[docs] @staticmethod def model_from_param_info(param_info): # setup the model bb_info = param_info[0] dust_features = param_info[1] h2_features = param_info[2] ion_features = param_info[3] abs_info = param_info[4] att_info = param_info[5] model = None if bb_info is not None: bbs = [] for k in range(len(bb_info["names"])): BBClass = ModifiedBlackBody1D if bb_info["modified"][ k] else BlackBody1D bbs.append( BBClass( name=bb_info["names"][k], temperature=bb_info["temps"][k], amplitude=bb_info["amps"][k], bounds={ "temperature": bb_info["temps_limits"][k], "amplitude": bb_info["amps_limits"][k], }, fixed={ "temperature": bb_info["temps_fixed"][k], "amplitude": bb_info["amps_fixed"][k], }, ) ) model = sum(bbs[1:], bbs[0]) if dust_features is not None: df = [] for k in range(len(dust_features["names"])): df.append( Drude1D( name=dust_features["names"][k], amplitude=dust_features["amps"][k], x_0=dust_features["x_0"][k], fwhm=dust_features["fwhms"][k], bounds={ "amplitude": dust_features["amps_limits"][k], "x_0": dust_features["x_0_limits"][k], "fwhm": dust_features["fwhms_limits"][k], }, fixed={ "amplitude": dust_features["amps_fixed"][k], "x_0": dust_features["x_0_fixed"][k], "fwhm": dust_features["fwhms_fixed"][k], }, )) df = sum(df[1:], df[0]) if model: model += df else: model = df if h2_features is not None: h2 = [] for k in range(len(h2_features["names"])): h2.append( Gaussian1D( name=h2_features["names"][k], amplitude=h2_features["amps"][k], mean=h2_features["x_0"][k], stddev=h2_features["fwhms"][k] / 2.355, bounds={ "amplitude": h2_features["amps_limits"][k], "mean": h2_features["x_0_limits"][k], "stddev": ( h2_features["fwhms"][k] * 0.9 / 2.355, h2_features["fwhms"][k] * 1.1 / 2.355, ), }, fixed={ "amplitude": h2_features["amps_fixed"][k], "mean": h2_features["x_0_fixed"][k], "stddev": h2_features["fwhms_fixed"][k], }, )) h2 = sum(h2[1:], h2[0]) if model: model += h2 else: model = h2 if ion_features is not None: ions = [] for k in range(len(ion_features["names"])): ions.append( Gaussian1D( name=ion_features["names"][k], amplitude=ion_features["amps"][k], mean=ion_features["x_0"][k], stddev=ion_features["fwhms"][k] / 2.355, bounds={ "amplitude": ion_features["amps_limits"][k], "mean": ion_features["x_0_limits"][k], "stddev": ( ion_features["fwhms"][k] * 0.9 / 2.355, ion_features["fwhms"][k] * 1.1 / 2.355, ), }, fixed={ "amplitude": ion_features["amps_fixed"][k], "mean": ion_features["x_0_fixed"][k], "stddev": ion_features["fwhms_fixed"][k], }, )) ions = sum(ions[1:], ions[0]) if model: model += ions else: model = ions # add additional att components to the model if necessary if not model: raise PAHFITModelError("No model components found") if abs_info is not None: for k in range(len(abs_info["names"])): model *= att_Drude1D( name=abs_info["names"][k], tau=abs_info["amps"][k], x_0=abs_info["x_0"][k], fwhm=abs_info["fwhms"][k], bounds={ "tau": abs_info["amps_limits"][k], "fwhm": abs_info["fwhms_limits"][k], }, fixed={"x_0": abs_info["x_0_fixed"][k]}, ) if att_info is not None: model *= S07_attenuation( name=att_info["name"], tau_sil=att_info["tau_sil"], bounds={"tau_sil": att_info["tau_sil_limits"]}, fixed={"tau_sil": att_info["tau_sil_fixed"]}, ) return model
[docs] @staticmethod def plot(axs, x, y, yerr, model, model_samples=1000, scalefac_resid=2): """ Plot model using axis object. Parameters ---------- axs : matplotlib.axis objects where to put the plot x : floats wavelength points y : floats observed spectrum yerr: floats observed spectrum uncertainties model : PAHFITBase model (astropy modeling CompoundModel) model giving all the components and parameters model_samples : int Total number of wavelength points to allocate to the model display scalefac_resid : float Factor multiplying the standard deviation of the residuals to adjust plot limits """ # remove units if they are present if hasattr(x, "value"): x = x.value if hasattr(y, "value"): y = y.value if hasattr(yerr, "value"): yerr = yerr.value # Fine x samples for model fit x_mod = np.logspace(np.log10(min(x)), np.log10(max(x)), model_samples) # spectrum and best fit model ax = axs[0] ax.set_yscale("linear") ax.set_xscale("log") ax.minorticks_on() ax.tick_params(axis="both", which="major", top="on", right="on", direction="in", length=10) ax.tick_params(axis="both", which="minor", top="on", right="on", direction="in", length=5) ax_att = ax.twinx() # axis for plotting the extinction curve ax_att.tick_params(which="minor", direction="in", length=5) ax_att.tick_params(which="major", direction="in", length=10) ax_att.minorticks_on() # get the extinction model (probably a better way to do this) ext_model = None for cmodel in model: if isinstance(cmodel, S07_attenuation): ext_model = cmodel(x_mod) # get additional extinction components that can be # characterized by functional forms (Drude profile in this case) for cmodel in model: if isinstance(cmodel, att_Drude1D): if ext_model is not None: ext_model *= cmodel(x_mod) else: ext_model = cmodel(x_mod) ax_att.plot(x_mod, ext_model, "k--", alpha=0.5) ax_att.set_ylabel("Attenuation") ax_att.set_ylim(0, 1.1) # Define legend lines Leg_lines = [ mpl.lines.Line2D([0], [0], color="k", linestyle="--", lw=2), mpl.lines.Line2D([0], [0], color="#FE6100", lw=2), mpl.lines.Line2D([0], [0], color="#648FFF", lw=2, alpha=0.5), mpl.lines.Line2D([0], [0], color="#DC267F", lw=2, alpha=0.5), mpl.lines.Line2D([0], [0], color="#785EF0", lw=2, alpha=1), mpl.lines.Line2D([0], [0], color="#FFB000", lw=2, alpha=0.5), ] # create the continum compound model (base for plotting lines) cont_components = [] for cmodel in model: if isinstance(cmodel, BlackBody1D): cont_components.append(cmodel) # plot as we go ax.plot(x_mod, cmodel(x_mod) * ext_model / x_mod, "#FFB000", alpha=0.5) cont_model = cont_components[0] for cmodel in cont_components[1:]: cont_model += cmodel cont_y = cont_model(x_mod) # now plot the dust bands and lines for cmodel in model: if isinstance(cmodel, Gaussian1D): ax.plot( x_mod, (cont_y + cmodel(x_mod)) * ext_model / x_mod, "#DC267F", alpha=0.5, ) if isinstance(cmodel, Drude1D): ax.plot( x_mod, (cont_y + cmodel(x_mod)) * ext_model / x_mod, "#648FFF", alpha=0.5, ) ax.plot(x_mod, cont_y * ext_model / x_mod, "#785EF0", alpha=1) ax.plot(x_mod, model(x_mod) / x_mod, "#FE6100", alpha=1) ax.errorbar( x, y / x, yerr=yerr / x, fmt="o", markeredgecolor="k", markerfacecolor="none", ecolor="k", elinewidth=0.2, capsize=0.5, markersize=6, ) ax.set_ylim(0) ax.set_ylabel(r"$\nu F_{\nu}$") ax.legend( Leg_lines, [ "S07_attenuation", "Spectrum Fit", "Dust Features", r"Atomic and $H_2$ Lines", "Total Continuum Emissions", "Continuum Components", ], prop={"size": 10}, loc="best", facecolor="white", framealpha=1, ncol=3, ) # residuals, lower sub-figure res = (y - model(x)) / x std = np.std(res) ax = axs[1] ax.set_yscale("linear") ax.set_xscale("log") ax.tick_params(axis="both", which="major", top="on", right="on", direction="in", length=10) ax.tick_params(axis="both", which="minor", top="on", right="on", direction="in", length=5) ax.minorticks_on() # Custom X axis ticks ax.xaxis.set_ticks( [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 14, 16, 20, 25, 30, 40]) ax.axhline(0, linestyle="--", color="gray", zorder=0) ax.plot(x, res, "ko-", fillstyle="none", zorder=1) ax.set_ylim(-scalefac_resid * std, scalefac_resid * std) ax.set_xlim(np.floor(np.amin(x)), np.ceil(np.amax(x))) ax.set_xlabel(r"$\lambda$ [$\mu m$]") ax.set_ylabel("Residuals [%]") # scalar x-axis marks ax.xaxis.set_minor_formatter(mpl.ticker.ScalarFormatter()) ax.xaxis.set_major_formatter(mpl.ticker.ScalarFormatter())
[docs] @staticmethod def update_dictionary(feature_dict, instrumentname, update_fwhms=False, redshift=0): """ Update parameter dictionary based on the instrument name. Based on the instrument name, this function removes the features outside of the wavelength range and updates the FWHMs of the lines. Parameters ---------- feature_dict : dictionary Dictionary created by reading in a science pack. instrumentname : string Name of the instrument with which the input spectrum is observed. update_fwhms = Boolean True for h2_info and ion_info False for df_info Returns ------- updated feature_dict """ if feature_dict is None: return None # convert from physical feature, to observed wavelength def redshifted_waves(): return feature_dict["x_0"] * (1 + redshift) ind = np.nonzero(within_segment(redshifted_waves(), instrumentname))[0] # select the valid entries in these arrays array_keys = ("x_0", "amps", "fwhms", "names") new_values_1 = {key: feature_dict[key][ind] for key in array_keys} # these are lists instead list_keys = ( "amps_fixed", "fwhms_fixed", "x_0_fixed", "x_0_limits", "amps_limits", "fwhms_limits", ) new_values_2 = { key: [feature_dict[key][i] for i in ind] for key in list_keys } feature_dict.update(new_values_1) feature_dict.update(new_values_2) if len(feature_dict['names']) == 0: # if we removed all the things, be careful here. Setting to # None should make the model construction function behave # normally. feature_dict = None return feature_dict if update_fwhms: # observe the lines at the redshifted wavelength fwhm_min_max = fwhm(instrumentname, redshifted_waves(), as_bounded=True) # shift the observed fwhm back to the rest frame (where the # observed data will be moved, and its features will become # narrower) fwhm_min_max /= (1 + redshift) # For astropy a numpy.bool does not work for the 'fixed' # parameter. It needs to be a regular bool. Doing tolist() # instead of using the array mask directly solves this. feature_dict.update( { "fwhms": fwhm_min_max[:, 0], # masked means there is no min/max, i.e. they need to be fixed "fwhms_fixed": fwhm_min_max[:, 1].mask.tolist(), "fwhms_limits": fwhm_min_max[:, 1:].tolist(), } ) return feature_dict
[docs] @staticmethod def parse_table(pack_table): """ Load the model parameters from a Table Parameters ---------- pack_table : Table Table created by reading in a science pack. Returns ------- readout : tuple Tuple containing dictionaries of all components from the input Table. Can be used to create PAHFITBase instance using param_info argument. Dictionary in tuple is None if no components of that type were specified. """ # Getting indices for the different components bb_ind = np.where((pack_table["kind"] == "starlight") | (pack_table["kind"] == "dust_continuum"))[0] df_ind = np.where(pack_table["kind"] == "dust_feature")[0] ga_ind = np.where(pack_table["kind"] == "line")[0] at_ind = np.where(pack_table["kind"] == "attenuation")[0] ab_ind = np.where(pack_table["kind"] == "absorption")[0] # now split the gas emission lines between H2 and ions names = [str(i) for i in pack_table["name"][ga_ind]] if len(names) > 0: # this has trouble with empty list h2_temp = np.char.find(names, "H2") >= 0 ion_temp = np.char.find(names, "H2") == -1 h2_ind = ga_ind[h2_temp] ion_ind = ga_ind[ion_temp] else: h2_ind = [] ion_ind = [] # the rest works fine with empty list # Creating the blackbody dict bb_info = None if len(bb_ind) > 0: bb_info = { "names": np.array(pack_table["name"][bb_ind].data), "temps": np.array(pack_table["temperature"][:, 0][bb_ind].data), "temps_limits": _ingest_limits( pack_table["temperature"][:, 1][bb_ind], pack_table["temperature"][:, 2][bb_ind], ), "temps_fixed": _ingest_fixed(pack_table["temperature"][:, 1][bb_ind]), "amps": np.array(pack_table["tau"][:, 0][bb_ind].data), "amps_limits": _ingest_limits( pack_table["tau"][:, 1][bb_ind], pack_table["tau"][:, 2][bb_ind], ), "amps_fixed": _ingest_fixed(pack_table["tau"][:, 1][bb_ind]), "modified": np.array(pack_table["kind"][bb_ind] == "dust_continuum"), } # Creating the dust_features dict df_info = None if len(df_ind) > 0: df_info = { "names": np.array(pack_table["name"][df_ind].data), "x_0": np.array(pack_table["wavelength"][:, 0][df_ind].data), "x_0_limits": _ingest_limits( pack_table["wavelength"][:, 1][df_ind], pack_table["wavelength"][:, 2][df_ind], ), "x_0_fixed": _ingest_fixed(pack_table["wavelength"][:, 1][df_ind]), "amps": np.array(pack_table["power"][:, 0][df_ind].data), "amps_limits": _ingest_limits( pack_table["power"][:, 1][df_ind], pack_table["power"][:, 2][df_ind], ), "amps_fixed": _ingest_fixed(pack_table["power"][:, 1][df_ind]), "fwhms": np.array(pack_table["fwhm"][:, 0][df_ind].data), "fwhms_limits": _ingest_limits( pack_table["fwhm"][:, 1][df_ind], pack_table["fwhm"][:, 2][df_ind], ), "fwhms_fixed": _ingest_fixed(pack_table["fwhm"][:, 1][df_ind]), } # Creating the H2 dict h2_info = None if len(h2_ind) > 0: h2_info = { "names": np.array(pack_table["name"][h2_ind].data), "x_0": np.array(pack_table["wavelength"][:, 0][h2_ind].data), "x_0_limits": _ingest_limits( pack_table["wavelength"][:, 1][h2_ind], pack_table["wavelength"][:, 2][h2_ind], ), "x_0_fixed": _ingest_fixed(pack_table["wavelength"][:, 1][h2_ind]), "amps": np.array(pack_table["power"][:, 0][h2_ind].data), "amps_limits": _ingest_limits( pack_table["power"][:, 1][h2_ind], pack_table["power"][:, 2][h2_ind], ), "amps_fixed": _ingest_fixed(pack_table["power"][:, 1][h2_ind]), "fwhms": np.array(pack_table["fwhm"][:, 0][h2_ind].data), "fwhms_limits": _ingest_limits( pack_table["fwhm"][:, 1][h2_ind], pack_table["fwhm"][:, 2][h2_ind], ), "fwhms_fixed": _ingest_fixed(pack_table["fwhm"][:, 1][h2_ind]), } # Creating the ion dict ion_info = None if len(ion_ind) > 0: ion_info = { "names": np.array(pack_table["name"][ion_ind].data), "x_0": np.array(pack_table["wavelength"][:, 0][ion_ind].data), "x_0_limits": _ingest_limits( pack_table["wavelength"][:, 1][ion_ind], pack_table["wavelength"][:, 2][ion_ind], ), "x_0_fixed": _ingest_fixed(pack_table["wavelength"][:, 1][ion_ind]), "amps": np.array(pack_table["power"][:, 0][ion_ind].data), "amps_limits": _ingest_limits( pack_table["power"][:, 1][ion_ind], pack_table["power"][:, 2][ion_ind], ), "amps_fixed": _ingest_fixed(pack_table["power"][:, 1][ion_ind]), "fwhms": np.array(pack_table["fwhm"][:, 0][ion_ind].data), "fwhms_limits": _ingest_limits( pack_table["fwhm"][:, 1][ion_ind].data, pack_table["fwhm"][:, 2][ion_ind].data, ), "fwhms_fixed": _ingest_fixed(pack_table["fwhm"][:, 1][ion_ind].data), } # Create the attenuation dict (could be absorption drudes # and S07 model) abs_info = None if len(ab_ind) > 0: abs_info = { "names": np.array(pack_table["name"][at_ind].data), "x_0": np.array(pack_table["wavelength"][:, 0][at_ind].data), "x_0_limits": _ingest_limits( pack_table["wavelength"][:, 1][at_ind], pack_table["wavelength"][:, 2][at_ind], ), "x_0_fixed": _ingest_fixed(pack_table["wavelength"][:, 1][at_ind]), "amps": np.array(pack_table["tau"][:, 0][at_ind].data), "amps_limits": _ingest_limits( pack_table["tau"][:, 0][at_ind], pack_table["tau"][:, 1][at_ind], ), "amps_fixed": _ingest_fixed(pack_table["tau"][:, 1][at_ind]), "fwhms": np.array(pack_table["fwhm"][:, 0][at_ind].data), "fwhms_limits": _ingest_limits( pack_table["fwhm"][:, 1][at_ind], pack_table["fwhm"][:, 2][at_ind], ), "fwhms_fixed": _ingest_fixed(pack_table["fwhm"][:, 1][at_ind]), } att_info = None if len(at_ind) > 1: raise NotImplementedError("More than one attenuation component not supported") elif len(at_ind) == 1: i = at_ind[0] att_info = {"name": pack_table["name"][i], "tau_sil": pack_table["tau"][i][0], "tau_sil_limits": pack_table["tau"][i][1:], "tau_sil_fixed": True if pack_table["tau"][i].mask[1] else False} return [bb_info, df_info, h2_info, ion_info, abs_info, att_info]