Source code for aspros.lightcurve

# Licensed under the MIT License - see LICENSE.rst
"""
Methods for taking the raw light curves from MAST and producing cleaned light
curves.
"""
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

from astropy.io import fits
from astropy.time import Time
from astropy.timeseries import BoxLeastSquares
import astropy.units as u
import os
import numpy as np
import matplotlib.pyplot as plt
import shutil

__all__ = ['LightCurve', 'TransitLightCurve',
           'concatenate_transit_light_curves', 'concatenate_light_curves']


[docs]class LightCurve(object): """ Container object for light curves. """ def __init__(self, times=None, fluxes=None, errors=None, quarters=None, name=None): """ Parameters ---------- times : `~numpy.ndarray` Times in JD fluxes : `~numpy.ndarray` Fluxes (normalized or not) errors : `~numpy.ndarray` Uncertainties on the fluxes quarters : `~numpy.ndarray` (optional) Kepler Quarter for each flux name : str Name this light curve (optional) """ # if len(times) < 1: # raise ValueError("Input `times` have no length.") if isinstance(times[0], Time) and isinstance(times, np.ndarray): times = Time(times) elif not isinstance(times, Time): times = Time(times, format='jd') self.times = times self.fluxes = fluxes if self.times is not None and errors is None: errors = np.zeros_like(self.fluxes) - 1 self.errors = errors if self.times is not None and quarters is None: quarters = np.zeros_like(self.fluxes) - 1 self.quarters = quarters self.name = name
[docs] def phases(self, params): phase = ((self.times.jd - params.t0) % params.per) / params.per phase[phase > 0.5] -= 1.0 return phase
[docs] def plot(self, transit_params=None, ax=None, quarter=None, show=False, phase=False, **kwargs): """ Plot light curve. Parameters ---------- transit_params : `~batman.TransitParams` (optional) Transit light curve parameters. Required if `phase` is `True`. ax : `~matplotlib.axes.Axes` (optional) Axis to make plot on top of quarter : float (optional) Plot only this Kepler quarter show : bool If `True`, call `matplotlib.pyplot.show` after plot is made phase : bool If `True`, map times in JD to orbital phases, which requires that `transit_params` be input also. plot_kwargs : dict Keyword arguments to pass to `~matplotlib` calls. """ if quarter is not None: if hasattr(quarter, '__len__'): mask = np.zeros_like(self.fluxes).astype(bool) for q in quarter: mask |= self.quarters == q else: mask = self.quarters == quarter else: mask = np.ones_like(self.fluxes).astype(bool) if ax is None: ax = plt.gca() if phase: x = (self.times.jd - transit_params.t0) / transit_params.per % 1 x[x > 0.5] -= 1 else: x = self.times.jd if len(kwargs) == 0: kwargs = {'color': 'k', 'marker': '.', 'ls': 'none', 'ecolor': 'silver'} ax.errorbar(x[mask], self.fluxes[mask], self.errors[mask], **kwargs) ax.set(xlabel='Time' if not phase else 'Phase', ylabel='Flux', title=self.name) if show: plt.show()
[docs] def save_to(self, path, overwrite=False, for_stsp=False): """ Save times, fluxes, errors to new directory ``dirname`` in ``path`` """ dirname = self.name output_path = os.path.join(path, dirname) self.times = Time(self.times) if not for_stsp: if os.path.exists(output_path) and overwrite: shutil.rmtree(output_path) if not os.path.exists(output_path): os.mkdir(output_path) for attr in ['times_jd', 'fluxes', 'errors', 'quarters']: np.savetxt(os.path.join(path, dirname, '{0}.txt'.format(attr)), getattr(self, attr)) else: if not os.path.exists(output_path) or overwrite: attrs = ['times_jd', 'fluxes', 'errors'] output_array = np.zeros((len(self.fluxes), len(attrs)), dtype=float) for i, attr in enumerate(attrs): output_array[:, i] = getattr(self, attr) np.savetxt(os.path.join(path, dirname + '.txt'), output_array)
[docs] @classmethod def from_raw_fits(cls, fits_paths, name=None): """ Load FITS files downloaded from MAST into the `LightCurve` object. Parameters ---------- fits_paths : list List of paths to FITS files to read in name : str (optional) Name of light curve Returns ------- lc : `LightCurve` The light curve for the data in the fits files. """ fluxes = [] errors = [] times = [] quarter = [] for path in fits_paths: data = fits.getdata(path) header = fits.getheader(path) times.append(data['TIME'] + 2454833.0) errors.append(data['SAP_FLUX_ERR']) fluxes.append(data['SAP_FLUX']) quarter.append(len(data['TIME']) * [header['QUARTER']]) times, fluxes, errors, quarter = [np.concatenate(i) for i in [times, fluxes, errors, quarter]] mask_nans = np.zeros_like(fluxes).astype(bool) for attr in [times, fluxes, errors]: mask_nans |= np.isnan(attr) times, fluxes, errors, quarter = [attr[~mask_nans] for attr in [times, fluxes, errors, quarter]] return LightCurve(times, fluxes, errors, quarters=quarter, name=name)
[docs] @classmethod def from_dir(cls, path, for_stsp=False): """Load light curve from numpy save files in ``dir``""" if not for_stsp: times, fluxes, errors, quarters = [ np.loadtxt(os.path.join(path, '{0}.txt'.format(attr))) for attr in ['times_jd', 'fluxes', 'errors', 'quarters']] else: quarters = None times, fluxes, errors = np.loadtxt(path, unpack=True) if os.sep in path: name = path.split(os.sep)[-1] else: name = path if name.endswith('.txt'): name = name[:-4] return cls(times, fluxes, errors, quarters=quarters, name=name)
[docs] def normalize_each_quarter(self, rename=None, polynomial_order=2, plots=False): """ Use polynomial fit to each quarter to normalize the data. Parameters ---------- rename : str (optional) New name of the light curve after normalization polynomial_order : int (optional) Order of polynomial to fit to the out-of-transit fluxes. Default is 2. plots : bool (optional) Show diagnostic plots after normalization. """ quarter_inds = list(set(self.quarters)) quarter_masks = [quarter == self.quarters for quarter in quarter_inds] for quarter_mask in quarter_masks: polynomial = np.polyfit(self.times[quarter_mask].jd, self.fluxes[quarter_mask], polynomial_order) scaling_term = np.polyval(polynomial, self.times[quarter_mask].jd) self.fluxes[quarter_mask] /= scaling_term self.errors[quarter_mask] /= scaling_term if plots: plt.plot(self.times[quarter_mask], self.fluxes[quarter_mask]) plt.show() if rename is not None: self.name = rename
[docs] def delete_outliers(self): d = np.diff(self.fluxes) spikey = np.abs(d - np.median(d)) > 2.5 * np.std(d) neighboring_spikes = spikey[1:] & spikey[:-1] opposite_signs = np.sign(d[1:]) != np.sign(d[:-1]) outliers = np.argwhere(neighboring_spikes & opposite_signs) + 1 # print('number bad fluxes: {0}'.format(len(outliers))) self.times = Time(np.delete(self.times.jd, outliers), format='jd') self.fluxes = np.delete(self.fluxes, outliers) self.errors = np.delete(self.errors, outliers) self.quarters = np.delete(self.quarters, outliers)
[docs] def mask_out_of_transit(self, params, oot_duration_fraction=0.25, flip=False): """ Mask out the out-of-transit light curve based on transit parameters Parameters ---------- params : `~batman.TransitParams` Transit light curve parameters. Requires that `params.duration` is defined. oot_duration_fraction : float (optional) Fluxes from what fraction of a transit duration of the out-of-transit light curve should be included in the mask? flip : bool (optional) If `True`, mask in-transit rather than out-of-transit. Returns ------- d : dict Inputs for a new `LightCurve` object with the mask applied. """ # Fraction of one duration to capture out of transit phased = (self.times.jd - params.t0) % params.per near_transit = ((phased < params.duration * ( 0.5 + oot_duration_fraction)) | (phased > params.per - params.duration * ( 0.5 + oot_duration_fraction))) if flip: near_transit = ~near_transit sort_by_time = np.argsort(self.times[near_transit].jd) return dict(times=self.times[near_transit][sort_by_time], fluxes=self.fluxes[near_transit][sort_by_time], errors=self.errors[near_transit][sort_by_time], quarters=self.quarters[near_transit][sort_by_time])
[docs] def mask_in_transit(self, params, oot_duration_fraction=0.25): """ Mask out the in-transit light curve based on transit parameters Parameters ---------- params : `~batman.TransitParams` Transit light curve parameters. Requires that `params.duration` is defined. oot_duration_fraction : float (optional) Fluxes from what fraction of a transit duration of the out-of-transit light curve should be included in the mask? Returns ------- d : dict Inputs for a new `LightCurve` object with the mask applied. """ return self.mask_out_of_transit(params, flip=True, oot_duration_fraction=oot_duration_fraction)
[docs] def get_transit_light_curves(self, params, plots=False): """ For a light curve with transits only (i.e. like one returned by `LightCurve.mask_out_of_transit`), split up the transits into their own light curves, return a list of `TransitLightCurve` objects. Parameters ---------- params : `~batman.TransitParams` Transit light curve parameters plots : bool Make diagnostic plots. Returns ------- transit_light_curves : list List of `TransitLightCurve` objects """ time_diffs = np.diff(sorted(self.times.jd)) diff_between_transits = params.per / 2. split_inds = np.argwhere(time_diffs > diff_between_transits) + 1 if len(split_inds) > 0: split_ind_pairs = [[0, split_inds[0][0]]] split_ind_pairs.extend([[split_inds[i][0], split_inds[i + 1][0]] for i in range(len(split_inds) - 1)]) split_ind_pairs.extend([[split_inds[-1], len(self.times)]]) transit_light_curves = [] counter = -1 for start_ind, end_ind in split_ind_pairs: counter += 1 if plots: plt.plot(self.times.jd[start_ind:end_ind], self.fluxes[start_ind:end_ind], '.-') # print(start_ind, end_ind) if type(start_ind) is list or type(start_ind) is np.ndarray: start_ind = start_ind[0] parameters = dict(times=self.times[start_ind:end_ind], fluxes=self.fluxes[start_ind:end_ind], errors=self.errors[start_ind:end_ind], quarters=self.quarters[start_ind:end_ind], name=counter) transit_light_curves.append(TransitLightCurve(**parameters)) if plots: plt.show() else: transit_light_curves = [] return transit_light_curves
[docs] def get_available_quarters(self): """ Get which quarters are available in this `LightCurve` Returns ------- qs : list List of unique quarters available. """ return list(set(self.quarters))
[docs] def get_quarter(self, quarter): """ Get a copy of the data from within `LightCurve` during one Kepler quarter. Parameters ---------- quarter : int Kepler Quarter Returns ------- lc : `LightCurve` Light curve from one Kepler Quarter """ this_quarter = self.quarters == quarter return LightCurve(times=self.times[this_quarter], fluxes=self.fluxes[this_quarter], errors=self.errors[this_quarter], quarters=self.quarters[this_quarter], name=self.name + '_quarter_{0}'.format(quarter))
@property def times_jd(self): """ Get the times in this light curve in JD. Returns ------- t_jd : `~numpy.ndarray` Julian dates. """ return self.times.jd
[docs] def split_at_index(self, index): """ Split the light curve into two light curves, at ``index`` """ return (LightCurve(times=self.times[:index], fluxes=self.fluxes[:index], errors=self.errors[:index], quarters=self.quarters[:index], name=self.name), LightCurve(times=self.times[index:], fluxes=self.fluxes[index:], errors=self.errors[index:], quarters=self.quarters[index:], name=self.name))
[docs] def bls(self, periods, duration): """ Compute Box Least Squares periodogram Parameters ---------- periods : `~numpy.ndarray` duration: `~astropy.units.Quantity` Returns ------- results : `~astropy.timeseries.BoxLeastSquaresResults` bests : list stats : dict """ bls = BoxLeastSquares(self.times, self.fluxes, self.errors) results = bls.power(periods, duration) period = results.period[results.power.argmax()] transit_time = self.times[self.fluxes.argmin()] bests = [period, duration, transit_time] stats = bls.compute_stats(period, duration, transit_time) return results, bests, stats
[docs]class TransitLightCurve(LightCurve): """ Container for a single transit light curve. Subclass of `LightCurve`. """ def __init__(self, times=None, fluxes=None, errors=None, quarters=None, name=None): """ Parameters ---------- times : `~numpy.ndarray` Times in JD fluxes : `~numpy.ndarray` Fluxes (normalized or not) errors : `~numpy.ndarray` Uncertainties on the fluxes quarters : `~numpy.ndarray` (optional) Kepler Quarter for each flux name : str Name this light curve (optional) """ if isinstance(times[0], Time) and isinstance(times, np.ndarray): times = Time(times) elif not isinstance(times, Time): times = Time(times, format='jd') self.times = times self.fluxes = fluxes self.errors = errors if self.times is not None and quarters is None: quarters = np.zeros_like(self.fluxes) - 1 self.quarters = quarters self.name = name self.rescaled = False
[docs] def fit_linear_baseline(self, params, cadence=1 * u.min, return_near_transit=False, plots=False): """ Find OOT portions of transit light curve using similar method to `LightCurve.mask_out_of_transit`, fit linear baseline to OOT. Parameters ---------- params : `~batman.TransitParams` Transit light curve parameters. Requires that `params.duration` is defined. cadence : `~astropy.units.Quantity` (optional) Length of the exposure time for each flux. Default is 1 min. return_near_transit : bool (optional) Return the mask for times in-transit. Returns ------- linear_baseline : `numpy.ndarray` Baseline trend of out-of-transit fluxes near_transit : `numpy.ndarray` (optional) The mask for times in-transit. """ cadence_buffer = cadence.to(u.day).value get_oot_duration_fraction = 0 phased = (self.times.jd - params.t0) % params.per near_transit = ((phased < params.duration * (0.5 + get_oot_duration_fraction) + cadence_buffer) | (phased > params.per - params.duration * (0.5 + get_oot_duration_fraction) - cadence_buffer)) # Remove linear baseline trend order = 1 linear_baseline = np.polyfit(self.times.jd[~near_transit], self.fluxes[~near_transit], order) linear_baseline_fit = np.polyval(linear_baseline, self.times.jd) if plots: fig, ax = plt.subplots(1, 2, figsize=(15, 6)) ax[0].axhline(1, ls='--', color='k') ax[0].plot(self.times.jd, linear_baseline_fit, 'r') ax[0].plot(self.times.jd, self.fluxes, 'bo') plt.show() if return_near_transit: return linear_baseline, near_transit else: return linear_baseline
[docs] def remove_linear_baseline(self, params, plots=False, cadence=1 * u.min): """ Find OOT portions of transit light curve using similar method to `LightCurve.mask_out_of_transit`, fit linear baseline to OOT, divide whole light curve by that fit. Parameters ---------- params : `~batman.TransitParams` Transit light curve parameters. Requires that `params.duration` is defined. cadence : `~astropy.units.Quantity` (optional) Length of the exposure time for each flux. Default is 1 min. plots : bool (optional) Show diagnostic plots. """ linear_baseline, near_transit = self.fit_linear_baseline(params, cadence=cadence, return_near_transit=True) linear_baseline_fit = np.polyval(linear_baseline, self.times.jd) self.fluxes = self.fluxes / linear_baseline_fit self.errors = self.errors / linear_baseline_fit if plots: fig, ax = plt.subplots(1, 2, figsize=(15, 6)) ax[0].axhline(1, ls='--', color='k') ax[0].plot(self.times.jd, self.fluxes, 'o') ax[0].set_title('before trend removal') ax[1].set_title('after trend removal') ax[1].axhline(1, ls='--', color='k') ax[1].plot(self.times.jd, self.fluxes, 'o') plt.show()
[docs] def scale_by_baseline(self, linear_baseline_params): if not self.rescaled: scaling_vector = np.polyval(linear_baseline_params, self.times.jd) self.fluxes *= scaling_vector self.errors *= scaling_vector self.rescaled = True
[docs] def fit_polynomial_baseline(self, params, order=2, cadence=1 * u.min, plots=False, mask=None): """ Find OOT portions of transit light curve using similar method to `LightCurve.mask_out_of_transit`, fit linear baseline to OOT """ if mask is None: mask = np.ones(len(self.fluxes)).astype(bool) cadence_buffer = cadence.to(u.day).value get_oot_duration_fraction = 0 phased = (self.times.jd[mask] - params.t0) % params.per near_transit = ((phased < params.duration * ( 0.5 + get_oot_duration_fraction) + cadence_buffer) | (phased > params.per - params.duration * ( 0.5 + get_oot_duration_fraction) - cadence_buffer)) # Remove polynomial baseline trend after subtracting the times by its # mean -- this improves numerical stability for polyfit downscaled_times = self.times.jd - self.times.jd.mean() polynomial_baseline = np.polyfit(downscaled_times[mask][~near_transit], self.fluxes[mask][~near_transit], order) polynomial_baseline_fit = np.polyval(polynomial_baseline, downscaled_times) if plots: fig, ax = plt.subplots(1, 2, figsize=(15, 6)) ax[0].axhline(1, ls='--', color='k') ax[0].plot(self.times.jd, polynomial_baseline_fit, 'r') ax[0].plot(self.times.jd, self.fluxes, 'bo') if mask is not None: ax[0].plot(self.times.jd[~mask], self.fluxes[~mask], 'ro') plt.show() return polynomial_baseline_fit
[docs] def subtract_polynomial_baseline(self, params, plots=False, order=2, cadence=1 * u.min): """ Find OOT portions of transit light curve using similar method to `LightCurve.mask_out_of_transit`, fit polynomial baseline to OOT, subtract whole light curve by that fit. """ polynomial_baseline_fit = self.fit_polynomial_baseline(cadence=cadence, order=order, params=params) self.fluxes = self.fluxes - polynomial_baseline_fit self.errors = self.errors if plots: fig, ax = plt.subplots(1, 2, figsize=(15, 6)) ax[0].axhline(1, ls='--', color='k') ax[0].plot(self.times.jd, self.fluxes, 'o') ax[0].set_title('before trend removal') ax[1].set_title('after trend removal') ax[1].axhline(1, ls='--', color='k') ax[1].plot(self.times.jd, self.fluxes, 'o') plt.show()
[docs] def remove_polynomial_baseline(self, params, plots=False, order=2, cadence=1 * u.min): """ Find OOT portions of transit light curve using similar method to `LightCurve.mask_out_of_transit`, fit polynomial baseline to OOT, divide whole light curve by that fit. """ polynomial_baseline_fit = self.fit_polynomial_baseline(cadence=cadence, order=order, params=params) self.fluxes = self.fluxes / polynomial_baseline_fit self.errors = self.errors if plots: fig, ax = plt.subplots(1, 2, figsize=(15, 6)) ax[0].axhline(1, ls='--', color='k') ax[0].plot(self.times.jd, self.fluxes, 'o') ax[0].set_title('before trend removal') ax[1].set_title('after trend removal') ax[1].axhline(1, ls='--', color='k') ax[1].plot(self.times.jd, self.fluxes, 'o') plt.show()
[docs] @classmethod def from_dir(cls, path): """Load light curve from numpy save files in ``path``""" times, fluxes, errors, quarters = [ np.loadtxt(os.path.join(path, '{0}.txt'.format(attr))) for attr in ['times_jd', 'fluxes', 'errors', 'quarters']] if os.sep in path: name = path.split(os.sep)[-1] else: name = path return cls(times, fluxes, errors, quarters=quarters, name=name)
[docs]def concatenate_transit_light_curves(light_curve_list, name=None): """ Combine multiple transit light curves into one `TransitLightCurve` object. Parameters ---------- light_curve_list : list List of `TransitLightCurve` objects name : str Name of new light curve Returns ------- tlc : `TransitLightCurve` Concatenated transit light curves """ times = [] fluxes = [] errors = [] quarters = [] for light_curve in light_curve_list: times.append(light_curve.times.jd) fluxes.append(light_curve.fluxes) errors.append(light_curve.errors) quarters.append(light_curve.quarters) times, fluxes, errors, quarters = [np.concatenate(i) for i in [times, fluxes, errors, quarters]] times = Time(times, format='jd') return TransitLightCurve(times=times, fluxes=fluxes, errors=errors, quarters=quarters, name=name)
[docs]def concatenate_light_curves(light_curve_list, name=None): """ Combine multiple transit light curves into one `TransitLightCurve` object. Parameters ---------- light_curve_list : list List of `TransitLightCurve` objects name : str Name of new light curve Returns ------- tlc : `TransitLightCurve` Concatenated transit light curves """ times = [] fluxes = [] errors = [] quarters = [] for light_curve in light_curve_list: times.append(light_curve.times.jd) fluxes.append(light_curve.fluxes) errors.append(light_curve.errors) quarters.append(light_curve.quarters) times, fluxes, errors, quarters = [np.concatenate(i) for i in [times, fluxes, errors, quarters]] times = Time(times, format='jd') return LightCurve(times=times, fluxes=fluxes, errors=errors, quarters=quarters, name=name)