Source code for space_phot.photometry

import os

import numpy as np
import astropy
from astropy.coordinates import SkyCoord
from astropy.wcs.utils import skycoord_to_pixel

from stsci.skypac import pamutils

import sncosmo
import scipy
import dynesty
from dynesty import NestedSampler
from dynesty import utils as dyfunc
from dynesty.pool import Pool


import photutils
from photutils.psf import EPSFModel

import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import make_axes_locatable

from collections import OrderedDict
from copy import copy, deepcopy

from .util import (
    generic_aperture_phot,
    jwst_apcorr,
    jwst_apcorr_interp,
    hst_apcorr,
    simple_aperture_sum,
)

from .cal import (
    calibrate_JWST_flux,
    calibrate_HST_flux,
    calc_jwst_psf_corr,
    calc_hst_psf_corr,
    JWST_mag_to_flux,
    HST_mag_to_flux,
)


#from .MIRIMBkgInterp import MIRIMBkgInterp

__all__ = ['observation3','observation2']


def loglike(parameters,psf_models,wcs_list,vparam_names,xs,ys,fit_bkg,fluxes,fluxerrs,xb,yb,multi_flux,fit_radec,fit_pixel):

    parameters = xb*parameters + yb
    print(parameters)
    total = 0
    for i in range(len(fluxes)):
        posx = xs[i]
        posy = ys[i]
        
        if multi_flux:
            psf_models[i].flux = parameters[vparam_names.index('flux%i'%i)]
        else:
            psf_models[i].flux = parameters[vparam_names.index('flux')]

        if fit_radec:
            sky_location = astropy.coordinates.SkyCoord(parameters[vparam_names.index('ra')],
                                                        parameters[vparam_names.index('dec')],
                                                        unit=astropy.units.deg)
            y,x = astropy.wcs.utils.skycoord_to_pixel(sky_location,wcs_list[i])
            psf_models[i].x_0 = x
            psf_models[i].y_0 = y
        elif fit_pixel:
            psf_models[i].x_0 = parameters[vparam_names.index('x%i'%(i))]
            psf_models[i].y_0 = parameters[vparam_names.index('y%i'%(i))]

        mflux = psf_models[i](posx,posy)
        #print(np.nanmax(mflux),np.nanmax(fluxes[i]))
        if fit_bkg:
            if multi_flux:
                mflux+=parameters[vparam_names.index('bkg%i'%i)]
            else:
                mflux+=parameters[vparam_names.index('bkg')]
        
        total+=np.nansum((fluxes[i]-mflux)**2/fluxerrs[i]**2)
    print(-.5*total)
    return -.5*total

def prior_transform(parameters):
    return parameters 
    

def do_nest(args):
    import dill
    dynesty.utils.pickle_module = dill
    psf_model_list,wcs_list,vparam_names,xs,ys,fit_bkg,fluxes,fluxerrs,bounds,multi_flux,fit_radec,fit_pixel = args
    xb = []
    yb = []
    for bound in bounds:
        x,y = np.linalg.solve(np.array([[0.5,1],[1,1]]),np.array([bound[0]+(bound[1]-bound[0])/2,bound[1]]))
        xb.append(x)
        yb.append(y)
    print(bounds)
    args = psf_model_list,wcs_list,vparam_names,xs,ys,fit_bkg,fluxes,fluxerrs,np.array(xb),np.array(yb),multi_flux,fit_radec,fit_pixel
    print(loglike(np.array([.5]*len(vparam_names)),psf_model_list,wcs_list,vparam_names,xs,ys,fit_bkg,fluxes,fluxerrs,np.array(xb),np.array(yb),multi_flux,fit_radec,fit_pixel))
    #ptfm = prior_transform([bounds[p] for p in vparam_names])
    with Pool(10, loglike, prior_transform,logl_args=args) as pool:
        sampler = NestedSampler(pool.loglike, pool.prior_transform,
                                len(vparam_names), pool = pool)
        sampler.run_nested()
    return sampler

class observation():
    def __init__(self):
        pass
    

    def fast_psf(self, psf_model, centers, psf_width=5, local_bkg=False, **kwargs):
        """
        Fast PSF photometry wrapper around photutils.PSFPhotometry.

        Parameters
        ----------
        psf_model : astropy.modeling.Model
            PSF/PRF model (e.g., IntegratedGaussianPRF).
        centers : array-like
            Initial centers, shape (N, 2), in (y, x) pixel coordinates.
        psf_width : int
            Fit stamp size (also used as aperture_radius).
        local_bkg : bool
            If True, estimate a local background with MMMBackground.
        **kwargs :
            Passed through to PSFPhotometry call (currently unused, kept for API stability).

        Returns
        -------
        phot : astropy.table.Table
            Photutils output table with added calibrated columns: flux, fluxerr, mag, magerr, zp.
        """
        import numpy as np
        import astropy
        import photutils
        from copy import deepcopy

        # Choose local background estimator
        if local_bkg:
            from photutils.background import LocalBackground, MMMBackground
            bkgstat = MMMBackground()
            localbkg_estimator = LocalBackground(psf_width, psf_width * 2, bkgstat)
        else:
            localbkg_estimator = None

        # Photutils expects init_params table with x_0, y_0
        centers = np.atleast_2d(centers)
        init_params = astropy.table.Table(
            {"y_0": centers[:, 0].astype(float), "x_0": centers[:, 1].astype(float)}
        )

        # Finder: you’re using DAOStarFinder in "forced coords" mode.
        # Keep this behavior to avoid usability changes.
        daofind = photutils.detection.DAOStarFinder(
            threshold=5,
            fwhm=2,
            xycoords=np.column_stack([init_params["x_0"], init_params["y_0"]]),
        )

        psfphot = photutils.psf.PSFPhotometry(
            psf_model,
            fit_shape=psf_width,
            finder=daofind,
            aperture_radius=psf_width,
            localbkg_estimator=localbkg_estimator,
        )

        # Select the correct frame WITHOUT mutating self
        if self.pipeline_level == 3:
            data = self.data
            err = self.err
            wcs = self.wcs
            prim_header = self.prim_header
            sci_header = self.sci_header
        else:
            # level2: first exposure (preserves your prior behavior)
            data = self.data_arr_pam[0]
            err = self.err_arr[0]
            wcs = self.wcs_list[0]
            prim_header = self.prim_headers[0]
            sci_header = self.sci_headers[0]

        # Run photutils PSF photometry
        phot = psfphot(data, error=err, init_params=init_params)

        # Build result container
        self.psf_result = sncosmo.utils.Result()
        self.psf_result.phot_table = phot

        # Store the key fit outputs in a stable way
        for col in ["x_fit", "y_fit", "flux_fit", "flux_err", "local_bkg"]:
            if col in phot.colnames:
                self.psf_result[col] = phot[col]

        # Generate model/data/residual cutouts for quick inspection
        all_mflux_arr, all_resid_arr, all_data_arr = [], [], []
        psf_arr, resid_arr, data_arr = [], [], []

        # Use a local model copy so we don't permanently mutate psf_model
        try:
            model_template = psf_model.copy()
        except Exception:
            model_template = deepcopy(psf_model)

        for row in phot:
            # Guard: if fit columns are missing, skip cutout generation gracefully
            if ("x_fit" not in phot.colnames) or ("y_fit" not in phot.colnames) or ("flux_fit" not in phot.colnames):
                break

            y0 = float(row["y_fit"])
            x0 = float(row["x_fit"])

            slc_lg, _ = astropy.nddata.overlap_slices(
                data.shape, (psf_width, psf_width), (y0, x0), mode="trim"
            )
            yy, xx = np.mgrid[slc_lg]

            if "local_bkg" in phot.colnames:
                bkg = np.ones_like(data[yy, xx], dtype=float) * float(row["local_bkg"])
            else:
                bkg = np.zeros_like(data[yy, xx], dtype=float)

            stamp = data[yy, xx] - bkg

            # Evaluate model on the same grid
            try:
                m = model_template.copy()
            except Exception:
                m = deepcopy(model_template)

            # Set parameters
            if hasattr(m, "x_0"):
                m.x_0 = x0
            if hasattr(m, "y_0"):
                m.y_0 = y0
            if hasattr(m, "flux"):
                m.flux = float(row["flux_fit"])

            mflux = m(xx, yy)

            psf_arr.append(mflux)
            data_arr.append(stamp)
            resid_arr.append(stamp - mflux)

        self.psf_result["psf_arr"] = psf_arr
        self.psf_result["data_arr"] = data_arr
        self.psf_result["resid_arr"] = resid_arr

        # Calibrate fluxes/mags
        if self.telescope.lower() == "jwst":
            flux, fluxerr, mag, magerr, zp = calibrate_JWST_flux(
                phot["flux_fit"], phot["flux_err"], wcs
            )
        else:
            flux, fluxerr, mag, magerr, zp = calibrate_HST_flux(
                phot["flux_fit"], phot["flux_err"], prim_header, sci_header
            )

        # Attach calibrated columns
        phot["flux"] = flux
        phot["fluxerr"] = fluxerr
        phot["mag"] = mag
        phot["magerr"] = magerr
        phot["zp"] = zp

        self.psf_result.phot_cal_table = phot
        return phot


    def nest_psf(self,vparam_names, bounds,fluxes, fluxerrs,xs,ys,cutout_big=None,psf_width=7,use_MLE=False,
                       minsnr=0., priors=None, ppfs=None, npoints=100, method='single',center_weight=20,
                       maxiter=None, maxcall=None, modelcov=False, rstate=None,
                       verbose=False, warn=True, **kwargs):

        # Taken from SNCosmo nest_lc
        # experimental parameters
        tied = kwargs.get("tied", None)

        

        vparam_names = list(vparam_names)
        if ppfs is None:
            ppfs = {}
        if tied is None:
            tied = {}
        
        # Convert bounds/priors combinations into ppfs
        if bounds is not None:
            for key, val in bounds.items():
                if key in ppfs:
                    continue  # ppfs take priority over bounds/priors
                a, b = val
                if priors is not None and key in priors:
                    # solve ppf at discrete points and return interpolating
                    # function
                    x_samples = np.linspace(0., 1., 101)
                    ppf_samples = sncosmo.utils.ppf(priors[key], x_samples, a, b)
                    f = sncosmo.utils.Interp1D(0., 1., ppf_samples)
                else:
                    f = sncosmo.utils.Interp1D(0., 1., np.array([a, b]))
                ppfs[key] = f

        # NOTE: It is important that iparam_names is in the same order
        # every time, otherwise results will not be reproducible, even
        # with same random seed.  This is because iparam_names[i] is
        # matched to u[i] below and u will be in a reproducible order,
        # so iparam_names must also be.

        iparam_names = [key for key in vparam_names if key in ppfs]

        ppflist = [ppfs[key] for key in iparam_names]
        npdim = len(iparam_names)  # length of u
        ndim = len(vparam_names)  # length of v

        # Check that all param_names either have a direct prior or are tied.
        for name in vparam_names:
            if name in iparam_names:
                continue
            if name in tied:
                continue
            raise ValueError("Must supply ppf or bounds or tied for parameter '{}'"
                             .format(name))

        def prior_transform(u):
            d = {}
            for i in range(npdim):
                d[iparam_names[i]] = ppflist[i](u[i])
            v = np.empty(ndim, dtype=float)
            for i in range(ndim):
                key = vparam_names[i]
                if key in d:
                    v[i] = d[key]
                else:
                    v[i] = tied[key](d)
            return v
        
        pos_start = [i for i in range(len(vparam_names))]
        
        if len([x for x in vparam_names if 'flux' in x])>1:
            multi_flux = True
        else:
            multi_flux = False

        if np.any(['dec' in x for x in vparam_names]):
            fit_radec = True
        else:
            fit_radec = False
        if np.any(['y' in x for x in vparam_names]):
            fit_pixel = True
        else:
            fit_pixel = False

        if np.any(['bkg' in x for x in vparam_names]):
            fit_bkg = True
        else:
            fit_bkg = False

        

        sums = [np.sum(f) for f in fluxes]
        all_weights = []
        for i in range(len(fluxes)):
            y, x = np.indices(fluxes[i].shape)
            yc, xc = np.array(fluxes[i].shape) // 2  # Central coordinates
            distance_squared = (x - xc) ** 2 + (y - yc) ** 2
            all_weights.append(np.exp(-center_weight * distance_squared / np.max(distance_squared)))
            #all_weights[-1]*=(fluxes[i].size/np.sum(all_weights[-1]))
            all_weights[-1]/=np.sum(all_weights[-1])

        def chisq_likelihood(parameters):
            total = 0
            for i in range(len(fluxes)):
                posx = xs[i]
                posy = ys[i]
                
                if multi_flux:
                    self.psf_model_list[i].flux = parameters[vparam_names.index('flux%i'%i)]
                else:
                    self.psf_model_list[i].flux = parameters[vparam_names.index('flux')]
                    


                if fit_radec:
                    sky_location = astropy.coordinates.SkyCoord(parameters[vparam_names.index('ra')],
                                                                parameters[vparam_names.index('dec')],
                                                                unit=astropy.units.deg)
                    y,x = astropy.wcs.utils.skycoord_to_pixel(sky_location,self.wcs_list[i])
                    self.psf_model_list[i].x_0 = x
                    self.psf_model_list[i].y_0 = y
                elif fit_pixel:
                    self.psf_model_list[i].x_0 = parameters[vparam_names.index('x%i'%(i))]
                    self.psf_model_list[i].y_0 = parameters[vparam_names.index('y%i'%(i))]

                
                mflux = self.psf_model_list[i](posx,posy)
                
                if fit_bkg:
                    if multi_flux:
                        mflux+=parameters[vparam_names.index('bkg%i'%i)]
                    else:
                        mflux+=parameters[vparam_names.index('bkg')]
                
                total+=np.nansum(all_weights[i]*((fluxes[i]-self.bkg_fluxes[i]-mflux)**2/fluxerrs[i]**2))
                
            return total
 
        

        
        def loglike(parameters):
            chisq = chisq_likelihood(parameters)
            return(-.5*chisq)
        
        
        
        sampler = NestedSampler(loglike, prior_transform, ndim, nlive = npoints)
        sampler.run_nested(maxiter=maxiter,maxcall=maxcall,print_progress=True)
        #res = nestle.sample(loglike, prior_transform, ndim, npdim=npdim,
        #                  npoints=npoints, method=method, maxiter=maxiter,
        #                  maxcall=maxcall, rstate=rstate,
        #                  callback=(nestle.print_progress if verbose else None))
        
        #res = sampler.results
        res = sampler.results
        samples = res.samples  # samples
        weights = np.exp(res.logwt - res.logz[-1])#res.importance_weights()

        # Compute weighted mean and covariance.
        vparameters, cov = dyfunc.mean_and_cov(samples, weights)
        final_chisq = chisq_likelihood(vparameters)
        total_eff_DOF = 0
        total_pixels = 0
        for i in range(len(all_weights)):
            total_eff_DOF += ((np.sum(all_weights[i]) ** 2) / np.sum(all_weights[i] ** 2))
            total_pixels += fluxes[i].size
        DOF_correction = np.sqrt(total_eff_DOF / (total_pixels - len(vparameters)))
        errs = np.sqrt(np.diagonal(cov))

        corrected_errors = errs*DOF_correction
        
        res = sncosmo.utils.Result(niter=res.niter,
                                   ncall=res.ncall,
                                   logz=res.logz,
                                   logzerr=res.logzerr,
                                   #h=res.h,
                                   red_chisq=final_chisq*DOF_correction,
                                   samples=res.samples,
                                   weights=weights,
                                   logvol=res.logvol,
                                   logl=res.logl,
                                   errors=OrderedDict(zip(vparam_names,
                                                          errs)),
                                   vparam_names=copy(vparam_names),
                                   bounds=bounds,
                                   best=vparameters,
                                   data_arr = fluxes,
                                   err_arr = fluxerrs,
                                   psf_arr = None,
                                   big_psf_arr = None,
                                   resid_arr = None,
                                   phot_cal_table = None,
                                   bkg_arr=None)

        if use_MLE:
            best_ind = res.logl.argmax()
            for i in range(len(vparam_names)):
                res.best[i] = res.samples[best_ind,i]
            params = [[res.samples[best_ind, i]-res.errors[vparam_names[i]], res.samples[best_ind, i], res.samples[best_ind, i]+res.errors[vparam_names[i]]]
                      for i in range(len(vparam_names))]

        all_mflux_arr = []
        all_resid_arr = []
        all_bkg_arr = []
        for i in range(len(fluxes)):
            posx = xs[i]
            posy = ys[i]
            
            if multi_flux:
                self.psf_model_list[i].flux = res.best[vparam_names.index('flux%i'%i)]
            else:
                self.psf_model_list[i].flux = res.best[vparam_names.index('flux')]

            if fit_radec:
                sky_location = astropy.coordinates.SkyCoord(res.best[vparam_names.index('ra')],
                                                            res.best[vparam_names.index('dec')],
                                                            unit=astropy.units.deg)
                y,x = astropy.wcs.utils.skycoord_to_pixel(sky_location,self.wcs_list[i])
                self.psf_model_list[i].x_0 = x
                self.psf_model_list[i].y_0 = y
            elif fit_pixel:
                self.psf_model_list[i].x_0 = res.best[vparam_names.index('x%i'%(i))]
                self.psf_model_list[i].y_0 = res.best[vparam_names.index('y%i'%(i))]

            
            mflux = self.psf_model_list[i](posx,posy)
            all_mflux_arr.append(mflux*self.pams[i][posx,posy])

            if fit_bkg:
                if multi_flux:
                    all_bkg_arr.append(np.zeros_like(res.data_arr[i])+res.best[vparam_names.index('bkg%i'%i)]+self.bkg_fluxes[i])
                else:
                    all_bkg_arr.append(np.zeros_like(res.data_arr[i])+res.best[vparam_names.index('bkg')]+self.bkg_fluxes[i])
            #elif True:
            #    all_bkg_arr.append(self.bkg_fluxes[i])
            else:
                all_bkg_arr.append(self.bkg_fluxes[i])
            #mflux*=self.pams[i][posx,posy]
            resid = res.data_arr[i]-mflux-all_bkg_arr[i]
            all_resid_arr.append(resid)
        res.psf_arr = all_mflux_arr
        res.resid_arr = all_resid_arr
        res.bkg_arr = all_bkg_arr
        self.psf_result = res

        return

    def plot_psf_fit(self,fast_n=0):
        """
        Plot the best-fit PSF model and residuals
        """

        try:
            temp = self.psf_result.data_arr[0]
        except:
            print('Must fit PSF before plotting.')
            return

        if self.n_exposures == 1:
            fig,axes = plt.subplots(self.n_exposures,5,figsize=(2*int(4*self.n_exposures),12))
        else:
            fig,axes = plt.subplots(self.n_exposures,5,figsize=(int(4*self.n_exposures),12))
        axes = np.atleast_2d(axes)
        for i in range(self.n_exposures):
            if fast_n!=0:
                d_i = fast_n
            else:
                d_i = i
            try:
                norm1 = astropy.visualization.simple_norm(self.psf_result.data_arr[d_i],stretch='linear',
                    invalid=0)
            except:
                norm1 = None
            axes[i][0].imshow(self.psf_result.data_arr[d_i],
                norm=norm1)
            axes[i][0].set_title('Data')
            axes[i][1].imshow(self.psf_result.bkg_arr[d_i],
                norm=norm1)
            axes[i][1].set_title('BKG')
            axes[i][2].imshow(self.psf_result.psf_arr[d_i],
                norm=norm1)
            axes[i][2].set_title('Model')
            #divider = make_axes_locatable(axes[i][1])
            im0 = axes[i][3].imshow(self.psf_result.psf_arr[d_i]+self.psf_result.bkg_arr[d_i],
                norm=norm1)
            axes[i][3].set_title('BKG+Model')
            divider = make_axes_locatable(axes[i][3])
            cax = divider.append_axes('right', size='5%', pad=0.05)
            fig.colorbar(im0, cax=cax, orientation='vertical')

            im1 = axes[i][4].imshow(self.psf_result.resid_arr[d_i])
            axes[i][4].set_title('Residual')
            divider = make_axes_locatable(axes[i][4])
            cax2 = divider.append_axes('right', size='5%', pad=0.05)
            fig.colorbar(im1, cax=cax2, orientation='vertical')
            for j in range(5):
                axes[i][j].tick_params(
                    axis='both',          # changes apply to the x-axis
                    which='both',      # both major and minor ticks are affected
                    bottom=False,      # ticks along the bottom edge are off
                    left=False,         # ticks along the top edge are off
                    labelbottom=False,
                    labelleft=False) # labels along the bottom edge are off
        plt.tight_layout()
        #plt.show()
        return fig

    def plot_psf_posterior(self,minweight=-np.inf):
        """
        Plot the posterior corner plot from nested sampling

        Parameters
        ----------
        minweight : float
            A minimum weight to show from the nested sampling
            (to zoom in on posterior)
        """
        import corner
        try:
            samples = self.psf_result.samples
        except:
            print('Must fit PSF before plotting.')
            return
        weights = self.psf_result.weights
        samples = samples[weights>minweight]
        weights = weights[weights>minweight]

        fig = corner.corner(
            samples,
            weights=weights,
            labels=self.psf_result.vparam_names,
            quantiles=(0.16, .5, 0.84),
            bins=20,
            color='k',
            show_titles=True,
            title_fmt='.2f',
            smooth1d=False,
            smooth=True,
            fill_contours=True,
            plot_contours=False,
            plot_density=True,
            use_mathtext=True,
            title_kwargs={"fontsize": 11},
            label_kwargs={'fontsize': 16})
        #plt.show()


[docs] class observation3(observation): """ st_phot class for level 3 (drizzled) data """ def __init__(self,fname): self.pipeline_level = 3 self.fname = fname self.fits = astropy.io.fits.open(self.fname) self.data = self.fits['SCI',1].data try: self.err = self.fits['ERR',1].data except: try: self.err = 1./np.sqrt(self.fits['WHT',1].data) except: self.err = np.sqrt(self.data) try: self.dq = self.fits['DQ',1].data except: self.dq = np.zeros(self.data.shape) self.prim_header = self.fits[0].header self.sci_header = self.fits['SCI',1].header self.wcs = astropy.wcs.WCS(self.sci_header) self.pams = [np.ones(self.data.shape)] self.n_exposures = 1 self.pixel_scale = astropy.wcs.utils.proj_plane_pixel_scales(self.wcs)[0] * self.wcs.wcs.cunit[0].to('arcsec') try: self.telescope = self.prim_header['TELESCOP'] self.instrument = self.prim_header['INSTRUME'] try: self.filter = self.prim_header['FILTER'] except: if 'CLEAR' in self.prim_header['FILTER1']: self.filter = self.prim_header['FILTER2'] else: self.filter = self.prim_header['FILTER1'] except: self.telescope = 'JWST' self.instrument = 'NIRCam' try: self.flux_units = astropy.units.Unit(self.sci_header['BUNIT']) except: try: self.flux_units = astropy.units.Unit(self.prim_header['BUNIT']) except: if self.telescope=='JWST': print('Cannot create flux_units from header...assuming MJy/sr') self.flux_units = astropy.units.MJy/astropy.units.sr else: print('Cannot create flux_units from header...assuming electrons/s') self.flux_units = astropy.units.electron/astropy.units.s try: self.detector = self.prim_header['DETECTOR'] self.detector = self.detector.replace('LONG','5') self.detector = self.detector[:5] except: self.detector = None self.px_scale = astropy.wcs.utils.proj_plane_pixel_scales(self.wcs)[0] *\ self.wcs.wcs.cunit[0].to('arcsec') self.fits.close() #if self.telescope=='JWST': # self.epadu = self.sci_header['XPOSURE']*self.sci_header['PHOTMJSR'] #else: # raise RuntimeError('do this for HST')
[docs] def upper_limit(self,nsigma=3,method='psf'): if method.lower()=='psf': try: tab = self.psf_result.phot_cal_table except: print('Must run PSF fit for upper limit and method="psf"') return elif method.lower()=='aperture': try: tab = self.aperture_result.phot_cal_table except: print('Must run aperture fit for upper limit and method="aperture"') return else: print('Do not recognize phot method.') return if self.telescope.lower()=='jwst': _,_,mag,_,_ = calibrate_JWST_flux(tab['fluxerr']*nsigma,1,self.wcs,flux_units=astropy.units.MJy) else: _,_,mag,_,_ = calibrate_HST_flux(tab['fluxerr']*nsigma,1,self.prim_header,self.sci_header) return mag
[docs] def psf_photometry(self,psf_model,sky_location=None,xy_position=None,fit_width=None,background=None, fit_flux=True,fit_centroid=True,fit_bkg=False,bounds={},npoints=100,use_MLE=False, xshift=0,yshift=0,centroidx_shift=0,centroidy_shift=0, maxiter=None,find_centroid=False,minVal=-np.inf,psf_method='nest',center_weight=20): """ st_phot psf photometry class for level 2 data. Parameters ---------- psf_model : :class:`~photutils.psf.EPSFModel` In reality this does not need to be an EPSFModel, but just any photutils psf model class. sky_location : :class:`~astropy.coordinates.SkyCoord` Location of your source xy_positions : list xy position of your source in each exposure. Must supply this or sky_location but this takes precedent. fit_width : int PSF width to fit (recommend odd number) background : float or list float, list of float, array, or list of array defining the background of your data. If you define an array, it should be of the same shape as fit_width fit_flux : str One of 'single','multi','fixed'. Single is a single flux across all exposures, multi fits a flux for every exposure, and fixed only fits the position fit_centroid : str One of 'pixel','wcs','fixed'. Pixel fits a pixel location of the source in each exposure, wcs fits a single RA/DEC across all exposures, and fixed only fits the flux and not position. fit_bkg : bool Fit for a constant background simultaneously with the PSF fit. bounds : dict Bounds on each parameter. npoints : int Number of points in the nested sampling (higher is better posterior sampling, but slower) use_MLE : bool Use the maximum likelihood to define best fit parameters, otherwise use a weighted average of the posterior samples maxiter : None or int If None continue sampling until convergence, otherwise defines the max number of iterations find_centroid : bool If True, then tries to find the centroid around your chosen location. """ assert sky_location is not None or xy_position is not None,\ "Must supply sky_location or xy_positions for every exposure" assert len(bounds)>0,\ "Must supply bounds" if not fit_flux and not fit_centroid: print('Nothing to do, fit flux and/or position.') return if fit_width is None: try: fit_width = psf_model.data.shape[0] except: RuntimeError("If you do not supply fit_width, your psf needs to have a data attribute (i.e., be an ePSF") if fit_width%2==0: print('PSF fitting width is even, subtracting 1.') fit_width-=1 centers = [] all_xf = [] all_yf = [] cutouts = [] cutout_errs = [] fluxg = [] cutouts_big = [] self.psf_model_list = [psf_model] if xy_position is None: yi,xi = astropy.wcs.utils.skycoord_to_pixel(sky_location,self.wcs) else: xi,yi = xy_position xi+=xshift yi+=yshift yg, xg = np.mgrid[-1*(fit_width-1)/2:(fit_width+1)/2, -1*(fit_width-1)/2:(fit_width+1)/2].astype(int) yf, xf = yg+int(yi+.5), xg+int(xi+.5) yg_big, xg_big = np.mgrid[-1*(fit_width+2-1)/2:(fit_width+2+1)/2, -1*(fit_width+2-1)/2:(fit_width+2+1)/2].astype(int) yf_big, xf_big = yg_big+int(yi+.5), xg_big+int(xi+.5) cutout = self.data[xf, yf] cutout_big = self.data[xf_big, yf_big] if background is None: all_bg_est = np.zeros_like(cutout) #replace with bkg method if not fit_bkg: print('Warning: No background subtracting happening here.') elif isinstance(background,(float,int)): all_bg_est = np.zeros_like(cutout)+background else: all_bg_est = np.array(background) self.bkg_fluxes = [all_bg_est] if find_centroid: #xi2,yi2 = photutils.centroids.centroid_com(cutout) xi2,yi2 = np.argmax(cutout) xi += (xi2-(fit_width-1)/2) yi += (yi2-(fit_width-1)/2) yf, xf = yg+np.round(yi).astype(int), xg+np.round(xi).astype(int) cutout = self.data[xf, yf] all_xf.append(xf) all_yf.append(yf) cutout[cutout<minVal] = 0 center = [xi+centroidx_shift,yi+centroidy_shift] centers.append(center) err = self.err[xf, yf] err[np.isnan(err)] = np.nanmax(err) err[err<=0] = np.max(err) cutout_errs.append(err) #cutout -= all_bg_est cutouts.append(cutout) cutouts_big.append(cutout_big) if fit_flux: #if all_bg_est!=0: f_guess = [np.nansum(cutout)] #else: # f_guess = [np.nansum(cutout-np.nanmedian(self.data))] pnames = ['flux'] else: f_guess = [] pnames = [] if fit_centroid: pnames.append(f'x0') pnames.append(f'y0') p0s = np.append(f_guess,[center]).flatten() else: p0s = np.array(f_guess) self.psf_model_list[0].x_0 = center[0] self.psf_model_list[0].y_0 = center[1] pnames = np.array(pnames).ravel() if not np.all([x in bounds.keys() for x in pnames]): pbounds = {} for i in range(len(pnames)): if 'flux' in pnames[i]: pbounds[pnames[i]] = np.array(bounds['flux'])+p0s[i] else: pbounds[pnames[i]] = np.array(bounds['centroid'])+p0s[i] if pbounds[pnames[i]][0]<0: pbounds[pnames[i]][0] = 0 else: pbounds = bounds if fit_bkg: assert 'bkg' in bounds.keys(),"Must supply bounds for bkg" pnames = np.append(pnames,['bkg']) pbounds['bkg'] = bounds['bkg'] if psf_method =='nest': self.nest_psf(pnames,pbounds,cutouts,cutout_errs,all_xf,all_yf,cutouts_big, psf_width=fit_width,npoints=npoints,use_MLE=use_MLE,maxiter=maxiter,center_weight=center_weight) elif psf_method == 'fast': self.fast_psf(self.psf_model_list[0],centers,fit_width,True) #(self,psf_model,centers,psf_width=5,local_bkg=False,**kwargs) if fit_centroid: result_cal = {'ra':[],'ra_err':[],'dec':[],'dec_err':[],'x':[],'x_err':[], 'y':[],'y_err':[],'mjd':[], 'flux':[],'fluxerr':[],'filter':[], 'zp':[],'mag':[],'magerr':[],'zpsys':[],'exp':[]} else: result_cal = {'ra':[],'dec':[],'x':[], 'y':[],'mjd':[], 'flux':[],'fluxerr':[],'filter':[], 'zp':[],'mag':[],'magerr':[],'zpsys':[],'exp':[]} model_psf = None psf_pams = [] i = 0 flux_var = 'flux' if fit_centroid: x = self.psf_result.best[self.psf_result.vparam_names.index('x%i'%i)] y = self.psf_result.best[self.psf_result.vparam_names.index('y%i'%i)] xerr = self.psf_result.errors['x%i'%i] yerr = self.psf_result.errors['y%i'%i] sc = astropy.wcs.utils.pixel_to_skycoord(y,x,self.wcs) ra = sc.ra.value dec = sc.dec.value sc2 = astropy.wcs.utils.pixel_to_skycoord(y+yerr,x+xerr,self.wcs) raerr = np.abs(sc2.ra.value-ra) decerr = np.abs(sc2.dec.value-dec) else: x = float(self.psf_model_list[i].x_0.value) y = float(self.psf_model_list[i].y_0.value) sc = astropy.wcs.utils.pixel_to_skycoord(y,x,self.wcs) ra = sc.ra.value dec = sc.dec.value #if 'bkg' in self.psf_result.vparam_names: # bk_std = np.sqrt(self.psf_result.best[self.psf_result.vparam_names.index('bkg')]/self.epadu) #elif background is not None: # bk_std = np.sqrt(background/self.epadu) #else: bk_std = 0 flux_sum = self.psf_result.best[self.psf_result.vparam_names.index(flux_var)] #if self.telescope.lower()=='jwst': # psf_corr = 1 #yf, xf = np.mgrid[0:self.data.shape[0],0:self.data.shape[1]].astype(int) # slc_lg, _ = astropy.nddata.overlap_slices(self.data.shape, [201,201], # [self.psf_model_list[i].y_0.value,self.psf_model_list[i].x_0.value], mode='trim') # yy, xx = np.mgrid[slc_lg] # psf_arr = self.psf_model_list[i](xx,yy)#*self.pams[i]#self.psf_pams[i] #flux_sum = np.sum(psf_arr)#simple_aperture_sum(psf_arr,np.atleast_2d([y,x]),10) # flux_sum = self.psf_result.best[self.psf_result.vparam_names.index(flux_var)] #else: # yf, xf = np.mgrid[-10:11,-10:11].astype(int) # xf += int(self.psf_model_list[i].y_0+.5) # yf += int(self.psf_model_list[i].x_0+.5) # psf_arr = self.psf_model_list[i](yf,xf)#*self.pams[i]#self.psf_pams[i] # flux_sum = simple_aperture_sum(psf_arr,[[10,10]], # 5.5) # apcorr = hst_apcorr(5.5*self.px_scale,self.filter,self.instrument) # flux_sum/=apcorr if self.telescope == 'JWST': #psf_corr,model_psf = calc_jwst_psf_corr(self.psf_model_list[i].shape[0]/2,self.instrument,self.filter,self.wcs_list[i],psf=model_psf) #psf_corr = 1 flux,fluxerr,mag,magerr,zp = calibrate_JWST_flux(flux_sum, np.sqrt(((self.psf_result.errors[flux_var]/self.psf_result.best[self.psf_result.vparam_names.index(flux_var)])*\ flux_sum)**2+bk_std**2),self.wcs,flux_units=self.flux_units) else: flux,fluxerr,mag,magerr,zp = calibrate_HST_flux(flux_sum, np.sqrt(((self.psf_result.errors[flux_var]/self.psf_result.best[self.psf_result.vparam_names.index(flux_var)])*\ flux_sum)**2+bk_std**2),self.prim_header,self.sci_header) result_cal['x'].append(x) result_cal['y'].append(y) try: result_cal['mjd'].append(self.sci_header['MJD-AVG']) except: result_cal['mjd'].append(self.prim_header['EXPSTART']) result_cal['flux'].append(flux) result_cal['fluxerr'].append(fluxerr) result_cal['filter'].append(self.filter) result_cal['zp'].append(zp) result_cal['mag'].append(mag) result_cal['magerr'].append(magerr) result_cal['zpsys'].append('ab') result_cal['exp'].append(os.path.basename(self.fname)) if fit_centroid: result_cal['ra_err'].append(raerr) result_cal['dec_err'].append(decerr) result_cal['x_err'].append(xerr) result_cal['y_err'].append(yerr) result_cal['ra'].append(ra) result_cal['dec'].append(dec) self.psf_result.phot_cal_table = astropy.table.Table(result_cal) print('Finished PSF psf_photometry with median residuals of %.2f'%\ (100*np.nanmedian([np.nansum(self.psf_result.resid_arr[i])/\ np.nansum(self.psf_result.data_arr[i]) for i in range(self.n_exposures)]))+'%') return self.psf_result
[docs] def create_psf_subtracted(self,sci_ext=1,fname=None): """ Use the best fit PSF models to create a PSF-subtracted image Parameters ---------- sci_ext : int The SCI extension to use (e.g., if this is UVIS) fname : str Output filename """ try: temp = self.psf_result except: print('Must run PSF fit.') return x = float(self.psf_model_list[0].x_0.value) y = float(self.psf_model_list[0].y_0.value) psf_width = self.psf_model_list[0].data.shape[0] yf, xf = np.mgrid[0:self.data.shape[0],0:self.data.shape[1]].astype(int) if self.sci_header['BUNIT'] in ['ELECTRON','ELECTRONS']: psf_arr = self.psf_model_list[0](yf,xf)*self.prim_header['EXPTIME'] else: psf_arr = self.psf_model_list[0](yf,xf) if fname is None: temp = astropy.io.fits.open(self.fname) #temp['SCI',sci_ext].data = self.data_arr[i]-psf_arr else: temp = astropy.io.fits.open(fname) plt.imshow(psf_arr) plt.show() plt.imshow(temp['SCI',sci_ext].data) plt.show() temp['SCI',sci_ext].data-= psf_arr plt.imshow(temp['SCI',sci_ext].data) plt.show() temp.writeto(self.fname.replace('.fits','_resid.fits'),overwrite=True) temp.close() return self.fname.replace('.fits','_resid.fits')
[docs] def aperture_photometry( self, sky_location, xy_positions=None, radius=None, encircled_energy=None, skyan_in=None, skyan_out=None, alternate_ref=None, radius_in_arcsec=False, ): """ Perform aperture photometry on a single drizzled (level 3) image. This uses :func:`space_phot.util.generic_aperture_phot` together with the appropriate HST/JWST aperture corrections and flux calibration. Parameters ---------- sky_location : astropy.coordinates.SkyCoord Sky coordinate of the source. Ignored if ``xy_positions`` is given. xy_positions : array_like, optional One or more (x, y) pixel positions. If provided, these override ``sky_location``. radius : float, optional Aperture radius in pixels (or arcsec if ``radius_in_arcsec=True``). Required for HST. For JWST this may be omitted if ``encircled_energy`` is supplied. encircled_energy : int or str, optional For JWST only. Encircled-energy value (e.g. 70 for 70 percent) to use when querying the JWST aperture-correction reference files. skyan_in, skyan_out : float, optional Inner and outer radii of the background annulus in pixels. If not provided, sensible defaults are chosen based on the aperture size (or JWST reference file if available). alternate_ref : str, optional Optional alternate reference file to pass through to the JWST aperture-correction helper. radius_in_arcsec : bool, optional If True, ``radius``, ``skyan_in``, and ``skyan_out`` are interpreted as arcsec and converted to pixels using ``self.pixel_scale``. Returns ------- aperture_result : sncosmo.utils.Result Result object with the following attributes: * ``radius`` – aperture radius in pixels * ``ee`` – encircled energy (JWST only; None for HST) * ``apcorr`` – aperture correction factor * ``sky_an`` – dict with the sky annulus radii * ``phot_table`` – raw photometry table * ``phot_cal_table`` – calibrated photometry table """ # Basic parameter validation if radius is None and encircled_energy is None: raise ValueError("Must supply either 'radius' or 'encircled_energy'.") if self.telescope.lower() == "hst" and radius is None: raise ValueError("For HST data you must supply an aperture 'radius'.") # Normalise EE input for JWST ee = None if encircled_energy is not None: ee = int(encircled_energy) # Helper for arcsec → pixels def _to_pixels(val): if val is None: return None return float(val) / float(self.pixel_scale) # Convert radii from arcsec to pixels if requested if radius_in_arcsec: radius = _to_pixels(radius) skyan_in = _to_pixels(skyan_in) skyan_out = _to_pixels(skyan_out) # Build result containers result = { "pos_x": [], "pos_y": [], "aper_bkg": [], "aperture_sum": [], "aperture_sum_err": [], "aper_sum_corrected": [], "aper_sum_bkgsub": [], "annulus_median": [], "exp": [], } result_cal = { "flux": [], "fluxerr": [], "filter": [], "zp": [], "mag": [], "magerr": [], "zpsys": [], "exp": [], "mjd": [], } # Source positions if xy_positions is None: try: x, y = astropy.wcs.utils.skycoord_to_pixel(sky_location, self.wcs) positions = np.atleast_2d(np.column_stack([x, y])) except Exception: positions = np.atleast_2d( astropy.wcs.utils.skycoord_to_pixel(sky_location, self.wcs) ) else: positions = np.atleast_2d(xy_positions) # Instrument-specific aperture parameters apcorr = 1.0 sky_in_pix = skyan_in sky_out_pix = skyan_out if self.telescope.upper() == "JWST": # JWST: aperture correction lookup/interp if ee is not None: apres = jwst_apcorr(self.fname, ee, alternate_ref=alternate_ref) if apres is None: raise RuntimeError(f"Failed JWST aperture correction for EE={ee}.") # jwst_apcorr returns PIXELS (not arcsec) radius_pix, apcorr, ref_sky_in_pix, ref_sky_out_pix = apres radius = float(radius_pix) if sky_in_pix is None: sky_in_pix = float(ref_sky_in_pix) if sky_out_pix is None: sky_out_pix = float(ref_sky_out_pix) else: # jwst_apcorr_interp expects radius in PIXELS apres = jwst_apcorr_interp(self.fname, float(radius), alternate_ref=alternate_ref) if apres is None: raise RuntimeError( f"Failed JWST aperture correction interpolation for radius={radius} px." ) ee, apcorr, ref_sky_in_pix, ref_sky_out_pix = apres if sky_in_pix is None: sky_in_pix = float(ref_sky_in_pix) if sky_out_pix is None: sky_out_pix = float(ref_sky_out_pix) # For JWST: noise-scale factor for the aperture-phot error model # (keep your existing behavior here) epadu = self.sci_header.get("XPOSURE", 1.0) * self.sci_header.get("PHOTMJSR", 1.0) apcorr = 1/apcorr # jwst apcorr > 1, hst < 1 else: # HST if self.sci_header.get("BUNIT", "").upper() == "ELECTRON": epadu = 1.0 else: epadu = self.prim_header.get("EXPTIME", 1.0) px_scale = ( astropy.wcs.utils.proj_plane_pixel_scales(self.wcs)[0] * self.wcs.wcs.cunit[0].to("arcsec") ) # hst_apcorr expects radius in arcsec apcorr_arr = hst_apcorr(radius * px_scale, self.filter, self.instrument) try: apcorr = float(apcorr_arr[0]) except Exception: apcorr = float(apcorr_arr) if sky_in_pix is None: sky_in_pix = radius * 3.0 if sky_out_pix is None: sky_out_pix = radius * 4.0 sky = {"sky_in": sky_in_pix, "sky_out": sky_out_pix} # Run the generic aperture photometry phot = generic_aperture_phot( self.data, positions, radius=radius, sky=sky, epadu=epadu, error=self.err, ) # Fill result (raw aperture quantities) result["pos_x"] = np.array(positions[:, 0]) result["pos_y"] = np.array(positions[:, 1]) result["aper_bkg"] = np.array(phot["aper_bkg"]) result["aperture_sum"] = np.array(phot["aperture_sum"]) if "aperture_sum_err" in phot.colnames: result["aperture_sum_err"] = np.array(phot["aperture_sum_err"]) else: result["aperture_sum_err"] = np.zeros(len(phot)) result["aper_sum_bkgsub"] = np.array(phot["aper_sum_bkgsub"]) result["annulus_median"] = np.array(phot["annulus_median"]) result["exp"] = [os.path.basename(self.fname)] * len(phot) # Apply aperture correction result["aper_sum_corrected"] = result["aper_sum_bkgsub"] / apcorr result["aperture_sum_err"] = result["aperture_sum_err"] / apcorr # Flux calibration if self.telescope.upper() == "JWST": flux, fluxerr, mag, magerr, zp = calibrate_JWST_flux( np.array(result["aper_sum_corrected"]), np.array(result["aperture_sum_err"]), self.wcs, flux_units=self.flux_units, ) mjd = self.sci_header.get("MJD-AVG", np.nan) else: flux, fluxerr, mag, magerr, zp = calibrate_HST_flux( np.array(result["aper_sum_corrected"]), np.array(result["aperture_sum_err"]), self.prim_header, self.sci_header, ) mjd = self.sci_header.get("MJD-AVG", self.prim_header.get("MJD-AVG", np.nan)) # Fill calibrated result result_cal["flux"] = np.atleast_1d(flux).tolist() result_cal["fluxerr"] = np.atleast_1d(fluxerr).tolist() result_cal["mag"] = np.atleast_1d(mag).tolist() result_cal["magerr"] = np.atleast_1d(magerr).tolist() result_cal["filter"] = [self.filter] * len(result_cal["flux"]) result_cal["zpsys"] = ["ab"] * len(result_cal["flux"]) # zp may be scalar or array-like if np.ndim(zp) == 0: result_cal["zp"] = [float(zp)] * len(result_cal["flux"]) else: result_cal["zp"] = np.atleast_1d(zp).tolist() result_cal["exp"] = [os.path.basename(self.fname)] * len(result_cal["flux"]) result_cal["mjd"] = [mjd] * len(result_cal["flux"]) # Wrap in a sncosmo Result object for backwards compatibility res = sncosmo.utils.Result( radius=radius, ee=ee, apcorr=apcorr, sky_an=sky, phot_table=astropy.table.Table(result), phot_cal_table=astropy.table.Table(result_cal), ) self.aperture_result = res return res
[docs] def plant_psf(self,psf_model,plant_locations,magnitudes,out_fname=None): """ PSF planting class. Output files will be the same directory as the data files, but with _plant.fits added the end. Parameters ---------- psf_model : :class:`~photutils.psf.EPSFModel` In reality this does not need to be an EPSFModel, but just any photutils psf model class. plant_locations : list The location(s) to plant the psf magnitudes: The magnitudes to plant your psf (matching length of plant_locations) """ if not isinstance(plant_locations,(list,np.ndarray)): plant_locations = [plant_locations] if isinstance(magnitudes,(int,float)): magnitudes = [magnitudes]*len(plant_locations) if not isinstance(psf_model,list): psf_model = [psf_model]*len(psf_model) assert len(psf_model)==len(plant_locations)==len(magnitudes), "Must supply same number of psfs,plant_locations,mags" #psf_corr,mod_psf = calc_jwst_psf_corr(psf_model.data.shape[0]/2,self.instrument, # self.filter,self.wcs_list[0]) if out_fname is None: out_fname = self.fname.replace('.fits','_plant.fits') plant_info = {key:[] for key in ['x','y','ra','dec','mag','flux']} temp = astropy.io.fits.open(self.fname) for j in range(len(plant_locations)): if isinstance(plant_locations[j],astropy.coordinates.SkyCoord): y,x = astropy.wcs.utils.skycoord_to_pixel(plant_locations[j],self.wcs) ra = plant_locations[j].ra.value dec = plant_locations[j].dec.value else: x,y = plant_locations[j] sc = astropy.wcs.utils.pixel_to_skycoord(x,y,self.wcs) ra = sc.ra.value dec = sc.dec.value if self.telescope.upper()=='JWST': flux = JWST_mag_to_flux(magnitudes[j],self.wcs) else: flux = HST_mag_to_flux(magnitudes[j],self.prim_header,self.sci_header) psf_model[j].x_0 = x psf_model[j].y_0 = y psf_model[j].flux = flux#/np.sum(psf_model[j].data)#*self.exp#/psf_corr #psf_arr = flux*psf_model.data/astropy.nddata.extract_array(\ # self.pams[i],psf_model.data.shape,[x,y]) #psf_width = 101 #xf,yf = np.meshgrid(np.arange(-4*psf_width/2,psf_width/2*4+1,1).astype(int)+int(x+.5), # np.arange(-4*psf_width/2,psf_width/2*4+1,1).astype(int)+int(y+.5)) xf, yf = np.mgrid[0:temp['SCI',1].data.shape[0],0:temp['SCI',1].data.shape[1]].astype(int) psf_arr = psf_model[j](xf,yf) plant_info['x'].append(x) plant_info['y'].append(y) plant_info['ra'].append(ra) plant_info['dec'].append(dec) plant_info['mag'].append(magnitudes[j]) plant_info['flux'].append(np.sum(psf_arr)) temp['SCI',1].data[xf,yf]+=psf_arr# = astropy.nddata.add_array(temp['SCI',1].data, #psf_arr,[x,y]) astropy.table.Table(plant_info).write(out_fname.replace('.fits','.dat'),overwrite=True, format='ascii.ecsv') temp.writeto(out_fname,overwrite=True)
[docs] class observation2(observation): """ st_phot class for level 2 (individual exposures, cals, flts) data """ def __init__(self,exposure_fnames,sci_ext=1): self.pipeline_level = 2 self.sci_ext = int(sci_ext) self.exposure_fnames = exposure_fnames if not isinstance(exposure_fnames,str) else [exposure_fnames] self.exposures = [astropy.io.fits.open(f) for f in self.exposure_fnames] self.data_arr = [im['SCI',sci_ext].data for im in self.exposures] self.err_arr = [im['ERR',sci_ext].data for im in self.exposures] try: self.dq_arr = [im['DQ',sci_ext].data for im in self.exposures] except: self.dq = [np.zeros(im.shape) for im in self.data_arr] self.prim_headers = [im[0].header for im in self.exposures] self.sci_headers = [im['SCI',sci_ext].header for im in self.exposures] self.wcs_list = [astropy.wcs.WCS(hdr,dat) for hdr,dat in zip(self.sci_headers,self.exposures)] self.n_exposures = len(self.exposures) self.pixel_scale = np.array([astropy.wcs.utils.proj_plane_pixel_scales(self.wcs_list[i])[0] *\ self.wcs_list[i].wcs.cunit[0].to('arcsec') for i in range(self.n_exposures)]) self.telescope = self.prim_headers[0]['TELESCOP'] self.instrument = self.prim_headers[0]['INSTRUME'] try: self.detector = self.prim_headers[0]['DETECTOR'] self.detector = self.detector.replace('LONG','5') self.detector = self.detector[:5] except: self.detector = None if self.sci_headers[0]['BUNIT'] in ['ELECTRON','ELECTRONS']: self.data_arr = [self.data_arr[i]/self.prim_headers[i]['EXPTIME'] for i in range(self.n_exposures)] self.err_arr = [self.err_arr[i]/self.prim_headers[i]['EXPTIME'] for i in range(self.n_exposures)] if 'FILTER' in self.prim_headers[0].keys(): self.filter = np.unique([hdr['FILTER'] for hdr in self.prim_headers]) elif 'FILTER1' in self.prim_headers[0].keys(): if 'CLEAR' in self.prim_headers[0]['FILTER1']: self.filter = np.unique([hdr['FILTER2'] for hdr in self.prim_headers]) else: self.filter = np.unique([hdr['FILTER1'] for hdr in self.prim_headers]) else: self.filter = np.unique([hdr['FILTER'] for hdr in self.sci_headers]) if len(self.filter)>1: raise RuntimeError("Each observation should only have one filter.") self.filter = self.filter[0] if self.telescope.lower()=='jwst': self.pams = [im['AREA'].data for im in self.exposures] else: self.pams = [] for fname in self.exposure_fnames: pamutils.pam_from_file(fname, ('sci', sci_ext), 'temp_pam.fits') self.pams.append(astropy.io.fits.open('temp_pam.fits')[0].data) os.remove('temp_pam.fits') try: self.flux_units = astropy.units.Unit(self.sci_headers[0]['BUNIT']) except: try: self.flux_units = astropy.units.Unit(self.prim_headers[0]['BUNIT']) except: if self.telescope=='JWST': print('Cannot create flux_units from header...assuming MJy/sr') self.flux_units = astropy.units.MJy/astropy.units.sr else: print('Cannot create flux_units from header...assuming electrons') self.flux_units = astropy.units.electron self.data_arr_pam = [im*pam for im,pam in zip(self.data_arr,self.pams)] self.px_scale = astropy.wcs.utils.proj_plane_pixel_scales(self.wcs_list[0])[0] *\ self.wcs_list[0].wcs.cunit[0].to('arcsec') for i in range(self.n_exposures): self.exposures[i].close()
[docs] def upper_limit(self,nsigma=3,method='psf'): if method.lower()=='psf': try: tab = self.psf_result.phot_cal_table except: print('Must run PSF fit for upper limit and method="psf"') return elif method.lower()=='aperture': try: tab = self.aperture_result.phot_cal_table except: print('Must run aperture fit for upper limit and method="aperture"') return else: print('Do not recognize phot method.') return if self.telescope.lower()=='jwst': _,_,mag,_,_ = calibrate_JWST_flux(tab['fluxerr']*nsigma,1,self.wcs_list[0],flux_units=astropy.units.MJy) else: _,_,mag,_,_ = calibrate_HST_flux(tab['fluxerr']*nsigma,1,self.prim_headers[0],self.sci_headers[0]) return mag
[docs] def plant_psf(self,psf_model,plant_locations,magnitudes,multi_plant=False): """ PSF planting class. Output files will be the same directory as the data files, but with _plant.fits added the end. Parameters ---------- psf_model : :class:`~photutils.psf.EPSFModel` In reality this does not need to be an EPSFModel, but just any photutils psf model class. plant_locations : list The location(s) to plant the psf magnitudes: The magnitudes to plant your psf (matching length of plant_locations) """ if not isinstance(plant_locations,(list,np.ndarray)): plant_locations = [plant_locations] if isinstance(magnitudes,(int,float)): magnitudes = [magnitudes]*len(plant_locations) if not isinstance(psf_model,list): psf_model = [psf_model] assert isinstance(psf_model[0],list), "Must supply list of PSF models (for each exposure)" assert len(psf_model)==len(plant_locations) and len(psf_model[0])==self.n_exposures,\ "Must supply same number of psfs as plant locations." assert len(plant_locations)==len(magnitudes), "Must supply same number of plant_locations,mags" #psf_corr,mod_psf = calc_jwst_psf_corr(psf_model.data.shape[0]/2,self.instrument, # self.filter,self.wcs_list[0]) for i in range(self.n_exposures): plant_info = {key:[] for key in ['x','y','ra','dec','mag','flux']} if not multi_plant: temp = astropy.io.fits.open(self.exposure_fnames[i]) else: if os.path.exists(self.exposure_fnames[i].replace('.fits','_plant.fits')): temp = astropy.io.fits.open(self.exposure_fnames[i].replace('.fits','_plant.fits')) first_plant = False else: temp = astropy.io.fits.open(self.exposure_fnames[i]) first_plant = True #temp['SCI',1].data = np.zeros(temp['SCI',1].data.shape) #try: # temp['VAR_RNOISE',1].data = np.ones(temp['SCI',1].data.shape) #except: # temp['ERR',1].data = np.ones(temp['SCI',1].data.shape) for j in range(len(plant_locations)): if isinstance(plant_locations[j],astropy.coordinates.SkyCoord): y,x = astropy.wcs.utils.skycoord_to_pixel(plant_locations[j],self.wcs_list[i]) ra = plant_locations[j].ra.value dec = plant_locations[j].dec.value else: x,y = plant_locations[j] sc = astropy.wcs.utils.pixel_to_skycoord(x,y,self.wcs_list[i]) ra = sc.ra.value dec = sc.dec.value if self.telescope.upper()=='JWST': flux = JWST_mag_to_flux(magnitudes[j],self.wcs_list[i]) else: flux = HST_mag_to_flux(magnitudes[j],self.prim_headers[i],self.sci_headers[i]) psf_model[j][i].x_0 = x psf_model[j][i].y_0 = y #psf_arr = flux*psf_model.data/astropy.nddata.extract_array(\ # self.pams[i],psf_model.data.shape,[x,y]) psf_model[j][i].flux = flux # if self.telescope.lower=='jwst': # psf_model[j][i].flux = flux # else: # apcorr = hst_apcorr(5.6*self.px_scale,self.filter,self.instrument) # psf_model[j][i].flux = flux*apcorr if self.sci_headers[i]['BUNIT'] in ['ELECTRON','ELECTRONS']: psf_model[j][i].flux *= self.prim_headers[i]['EXPTIME'] # print(np.sum(psf_model[j][i](yf,xf)),flux) yf, xf = np.mgrid[0:temp['SCI',1].data.shape[0],0:temp['SCI',1].data.shape[1]].astype(int) psf_arr = psf_model[j][i](yf,xf)/self.pams[i] # plt.imshow(psf_arr) # plt.show() # print(np.sum(psf_arr),flux) plant_info['x'].append(x) plant_info['y'].append(y) plant_info['ra'].append(ra) plant_info['dec'].append(dec) plant_info['mag'].append(magnitudes[j]) plant_info['flux'].append(np.sum(psf_arr)) temp['SCI',1].data+=psf_arr# = astropy.nddata.add_array(temp['SCI',1].data, #psf_arr,[x,y]) if not multi_plant or first_plant: astropy.table.Table(plant_info).write(self.exposure_fnames[i].replace('.fits','_plant.dat'),overwrite=True, format='ascii') else: astropy.table.vstack([astropy.table.Table(plant_info), astropy.table.Table.read(self.exposure_fnames[i].replace('.fits','_plant.dat'), format='ascii')]).write(self.exposure_fnames[i].replace('.fits','_plant.dat'), overwrite=True, format='ascii') temp.writeto(self.exposure_fnames[i].replace('.fits','_plant.fits'),overwrite=True)
[docs] def aperture_photometry( self, sky_location, xy_positions=None, radius=None, encircled_energy=None, skyan_in=None, skyan_out=None, ): """ Aperture photometry for level 2 data (per exposure). Parameters ---------- sky_location : astropy.coordinates.SkyCoord Sky position of the source. Used if xy_positions is not given. xy_positions : (x, y) or array_like, optional Pixel positions of the source. If a single (x, y) pair is given, it is used for all exposures. If an array/list of shape (n_exposures, 2) is given, the ith row is used for the ith exposure. radius : float, optional Aperture radius in pixels. Required for HST. encircled_energy : int, optional For JWST, encircled-energy value (e.g. 70) to define the aperture. skyan_in : float, optional Inner radius of the sky annulus in pixels. If None, a default is used. skyan_out : float, optional Outer radius of the sky annulus in pixels. If None, a default is used. Returns ------- aperture_result : sncosmo.utils.Result Result object with raw and calibrated photometry tables. """ import warnings from astropy.wcs.utils import skycoord_to_pixel, proj_plane_pixel_scales from astropy.coordinates import SkyCoord # Basic sanity: must provide radius for HST or EE for JWST assert radius is not None or encircled_energy is not None, \ "Must supply radius or encircled_energy" assert ( (self.telescope.lower() == "hst" and radius is not None) or (self.telescope.lower() == "jwst" and encircled_energy is not None) ), "Must supply radius for HST or encircled_energy for JWST" # Containers for per-exposure raw photometry result = { "pos_x": [], "pos_y": [], "aper_bkg": [], "aperture_sum": [], "aperture_sum_err": [], "aper_sum_corrected": [], "aper_sum_bkgsub": [], "annulus_median": [], "exp": [], } # Containers for per-exposure calibrated photometry result_cal = { "flux": [], "fluxerr": [], "filter": [], "zp": [], "mag": [], "magerr": [], "zpsys": [], "exp": [], "mjd": [], } # Helper to get positions for each exposure def _get_positions_for_exposure(i): if xy_positions is None: sc = sky_location # Handle scalar vs sequence SkyCoord if isinstance(sc, SkyCoord) and not sc.isscalar: sc = sc[i] elif not isinstance(sc, SkyCoord) and hasattr(sc, "__len__"): sc = sc[i] x, y = skycoord_to_pixel(sc, self.wcs_list[i]) return np.atleast_2d([x, y]) xy = np.array(xy_positions, dtype=float) # Single (x, y) for all exposures if xy.ndim == 1 and xy.shape[0] == 2: return np.atleast_2d(xy) # One (x, y) per exposure return np.atleast_2d(xy[i]) # Loop over exposures for i in range(self.n_exposures): positions = _get_positions_for_exposure(i) # --- Instrument-specific setup --- if self.telescope.upper() == "JWST": if encircled_energy is None: raise AssertionError( "encircled_energy must be supplied for JWST level-2 data" ) try: # jwst_apcorr returns radii and apcorr appropriate for this data; # we do NOT try to re-interpret the units here. radius_i, apcorr, sky_in_i, sky_out_i = jwst_apcorr( self.exposure_fnames[i], encircled_energy ) except Exception as exc: warnings.warn( f"jwst_apcorr failed for {self.exposure_fnames[i]} " f"({type(exc).__name__}: {exc}). " "Falling back to apcorr=1 and simple sky annulus." ) radius_i = radius if radius is not None else 3.0 apcorr = 1.0 sky_in_i = skyan_in if skyan_in is not None else radius_i * 3.0 sky_out_i = skyan_out if skyan_out is not None else radius_i * 4.0 # Ensure scalar radii radius_i = float(np.atleast_1d(radius_i)[0]) sky_in_i = float(np.atleast_1d(sky_in_i)[0]) sky_out_i = float(np.atleast_1d(sky_out_i)[0]) if radius_i <= 0: raise ValueError("Aperture radius must be positive for JWST") # epadu-like scaling; matches existing JWST L2 logic xposure = self.sci_headers[i]["XPOSURE"] \ if hasattr(self.sci_headers[i], "__getitem__") and "XPOSURE" in self.sci_headers[i] \ else 1.0 photmjsr = self.sci_headers[i]["PHOTMJSR"] \ if hasattr(self.sci_headers[i], "__getitem__") and "PHOTMJSR" in self.sci_headers[i] \ else 1.0 epadu = xposure * photmjsr else: # HST branch radius_i = float(radius) if radius_i <= 0: raise ValueError("Aperture radius must be positive for HST") # BUNIT may not exist; treat missing as non-electron units try: bunit = self.sci_headers[i]["BUNIT"].upper() except Exception: bunit = "" if bunit in ["ELECTRON", "ELECTRONS"]: epadu = 1.0 else: try: epadu = float(self.prim_headers[i]["EXPTIME"]) except Exception: epadu = 1.0 px_scale = ( proj_plane_pixel_scales(self.wcs_list[i])[0] * self.wcs_list[i].wcs.cunit[0].to("arcsec") ) apcorr_arr = hst_apcorr(radius_i * px_scale, self.filter, self.instrument) try: apcorr = float(apcorr_arr[0]) except Exception: apcorr = float(apcorr_arr) if skyan_in is None: sky_in_i = radius_i * 3.0 else: sky_in_i = float(skyan_in) if skyan_out is None: sky_out_i = radius_i * 4.0 else: sky_out_i = float(skyan_out) sky = {"sky_in": sky_in_i, "sky_out": sky_out_i} # --- Raw aperture photometry --- phot = generic_aperture_phot( self.data_arr_pam[i], positions, radius_i, sky, error=self.err_arr[i], epadu=epadu, ) # Fill raw result result["pos_x"].append(float(positions[0][0])) result["pos_y"].append(float(positions[0][1])) result["aper_bkg"].append(float(phot["aper_bkg"])) result["aperture_sum"].append(float(phot["aperture_sum"])) result["aperture_sum_err"].append( float(phot["aperture_sum_err"]) if "aperture_sum_err" in phot.colnames else 0.0 ) result["aper_sum_bkgsub"].append(float(phot["aper_sum_bkgsub"])) result["annulus_median"].append(float(phot["annulus_median"])) expname = os.path.basename(self.exposure_fnames[i]) result["exp"].append(expname) # Apply aperture correction if self.telescope.lower() == "jwst": corr = float(phot["aper_sum_bkgsub"] * apcorr) err_corr = result["aperture_sum_err"][-1] * apcorr else: corr = float(phot["aper_sum_bkgsub"] / apcorr) err_corr = result["aperture_sum_err"][-1] / apcorr result["aper_sum_corrected"].append(corr) result["aperture_sum_err"][-1] = err_corr # --- Calibrate to physical flux / magnitudes --- if self.telescope.upper() == "JWST": flux_arr, fluxerr_arr, mag_arr, magerr_arr, zp = calibrate_JWST_flux( np.array([corr]), np.array([err_corr]), self.wcs_list[i], flux_units=self.flux_units, ) # MJD-AVG may or may not exist; if not, just set NaN try: mjd = float(self.sci_headers[i]["MJD-AVG"]) except Exception: mjd = np.nan else: flux_arr, fluxerr_arr, mag_arr, magerr_arr, zp = calibrate_HST_flux( np.array([corr]), np.array([err_corr]), self.prim_headers[i], self.sci_headers[i], ) # Prefer primary EXPSTART; fall back to NaN try: mjd = float(self.prim_headers[i]["EXPSTART"]) except Exception: mjd = np.nan flux = float(np.atleast_1d(flux_arr)[0]) fluxerr = float(np.atleast_1d(fluxerr_arr)[0]) mag = float(np.atleast_1d(mag_arr)[0]) magerr = float(np.atleast_1d(magerr_arr)[0]) result_cal["flux"].append(flux) result_cal["fluxerr"].append(fluxerr) result_cal["mag"].append(mag) result_cal["magerr"].append(magerr) result_cal["filter"].append(self.filter) result_cal["zp"].append(zp) result_cal["zpsys"].append("ab") result_cal["exp"].append(expname) result_cal["mjd"].append(mjd) res = sncosmo.utils.Result( radius=radius, apcorr=apcorr, sky_an={"sky_in": sky_in_i, "sky_out": sky_out_i}, phot_table=astropy.table.Table(result), phot_cal_table=astropy.table.Table(result_cal), ) self.aperture_result = res return res
[docs] def psf_photometry(self,psf_model,sky_location=None,xy_positions=[],fit_width=None,background=None, fit_flux='single',fit_centroid='pixel',fit_bkg=False,bounds={},npoints=100,use_MLE=False, maxiter=None,find_centroid=False,minVal=-np.inf,center_weight=20.0, xshift=0,yshift=0): """ st_phot psf photometry class for level 2 data. Parameters ---------- psf_model : :class:`~photutils.psf.EPSFModel` In reality this does not need to be an EPSFModel, but just any photutils psf model class. sky_location : :class:`~astropy.coordinates.SkyCoord` Location of your source xy_positions : list xy position of your source in each exposure. Must supply this or sky_location but this takes precedent. fit_width : int PSF width to fit (recommend odd number) background : float or list float, list of float, array, or list of array defining the background of your data. If you define an array, it should be of the same shape as fit_width fit_flux : str One of 'single','multi','fixed'. Single is a single flux across all exposures, multi fits a flux for every exposure, and fixed only fits the position fit_centroid : str One of 'pixel','wcs','fixed'. Pixel fits a pixel location of the source in each exposure, wcs fits a single RA/DEC across all exposures, and fixed only fits the flux and not position. fit_bkg : bool Fit for a constant background simultaneously with the PSF fit. bounds : dict Bounds on each parameter. npoints : int Number of points in the nested sampling (higher is better posterior sampling, but slower) use_MLE : bool Use the maximum likelihood to define best fit parameters, otherwise use a weighted average of the posterior samples maxiter : None or int If None continue sampling until convergence, otherwise defines the max number of iterations find_centroid : bool If True, then tries to find the centroid around your chosen location. """ assert sky_location is not None or len(xy_positions)==self.n_exposures,\ "Must supply sky_location or xy_positions for every exposure" assert fit_flux in ['single','multi','fixed'],\ "fit_flux must be one of: 'single','multi','fixed'" assert fit_centroid in ['pixel','wcs','fixed'],\ "fit_centroid must be one of: 'pixel','wcs','fixed'" assert len(bounds)>0,\ "Must supply bounds" if fit_flux=='fixed' and fit_centroid=='fixed': print('Nothing to do, fit flux and/or position.') return if fit_width is None: try: fit_width = psf_model.data.shape[0] except: RuntimeError("If you do not supply fit_width, your psf needs to have a data attribute (i.e., be an ePSF") if fit_width%2==0: print('PSF fitting width is even, subtracting 1.') fit_width-=1 centers = [] all_xf = [] all_yf = [] cutouts = [] cutout_errs = [] fluxg = [] all_bg_est = [] if not isinstance(psf_model,list): self.psf_model_list = [] for i in range(self.n_exposures): self.psf_model_list.append(deepcopy(psf_model)) else: self.psf_model_list = psf_model if isinstance(xshift,(int,float)): xshift = [xshift]*self.n_exposures if isinstance(yshift,(int,float)): yshift = [yshift]*self.n_exposures for im in range(self.n_exposures): if len(xy_positions)==self.n_exposures: xi,yi = xy_positions[im] else: yi,xi = astropy.wcs.utils.skycoord_to_pixel(sky_location,self.wcs_list[im]) xi+=xshift[im] yi+=yshift[im] yg, xg = np.mgrid[-1*(fit_width-1)/2:(fit_width+1)/2, -1*(fit_width-1)/2:(fit_width+1)/2].astype(int) yf, xf = yg+np.round(yi).astype(int), xg+np.round(xi).astype(int) cutout = self.data_arr_pam[im][xf, yf] if background is None: all_bg_est.append(np.zeros_like(cutout)) if not fit_bkg: print('Warning: No background subtracting happening here.') elif isinstance(background,(int,float)): all_bg_est.append(background+np.zeros_like(cutout)) elif len(background)==self.n_exposures: all_bg_est.append(background[im]+np.zeros_like(cutout)) else: all_bg_est.append(np.array(background)) if find_centroid: #xi2,yi2 = photutils.centroids.centroid_com(cutout) h, w = cutout.shape hx, hy = w // 2, h // 2 peak_idx = np.argmax(cutout) peak_y,peak_x = np.unravel_index(peak_idx, cutout.shape) # Offset from old center (can be fractional if desired) dy = peak_y - hy dx = peak_x - hx #import pdb #pdb.set_trace() # New center in full-image coordinates new_x = xi + dx new_y = yi + dy # # Extract new cutout # x1 = int(round(new_x)) - hx # x2 = int(round(new_x)) + hx + 1 # y1 = int(round(new_y)) - hy # y2 = int(round(new_y)) + hy + 1 y1 = int(np.round(new_y)) - hy y2 = y1 + h x1 = int(np.round(new_x)) - hx x2 = x1 + w #new_cutout = full_image[y1:y2, x1:x2] #print(xi,yi,xi2,yi2,(xi2-(fit_width-1)/2),(yi2-(fit_width-1)/2)) #plt.imshow(cutout) #plt.show() #xi += (xi2-(fit_width-1)/2) #yi += (yi2-(fit_width-1)/2) #yf, xf = yg+np.round(yi).astype(int), xg+np.round(xi).astype(int) #yf, xf = yg+np.round(yi).astype(int), xg+np.round(xi).astype(int) yf,xf = np.mgrid[y1:y2, x1:x2] cutout = self.data_arr_pam[im][xf,yf] centers.append([new_x,new_y]) #plt.imshow(cutout) #plt.show() else: centers.append([xi,yi]) all_xf.append(xf) all_yf.append(yf) err = self.err_arr[im][xf, yf] err[np.isnan(err)] = np.nanmax(err) err[err<=0] = np.max(err) err[cutout<minVal] = np.max(err) cutout[cutout<minVal] = np.nan cutouts.append(cutout)#-all_bg_est[im]) cutout_errs.append(err) if fit_flux!='fixed': #if all_bg_est[im]!=0 or True: f_guess = np.nansum(cutout) #else: # f_guess = np.nansum(cutout-np.nanmedian(self.data_arr_pam[im])) fluxg.append(f_guess) if fit_flux=='single': fluxg = [np.nanmedian(fluxg)] pnames = ['flux'] elif fit_flux=='multi': pnames = ['flux%i'%i for i in range(self.n_exposures)] else: pnames = [] if fit_centroid!='fixed': if fit_centroid=='pixel': for i in range(self.n_exposures): pnames.append(f'x{i}') pnames.append(f'y{i}') else: pnames.append(f'ra') pnames.append(f'dec') pnames = np.array(pnames).ravel() if fit_centroid=='wcs': new_centers = [] n = 0 for center in centers: sc = astropy.wcs.utils.pixel_to_skycoord(center[1],center[0],self.wcs_list[n]) new_centers.append([sc.ra.value,sc.dec.value]) n+=1 p0s = np.append(fluxg,[np.median(new_centers,axis=0)]).flatten() elif fit_centroid=='pixel': p0s = np.append(fluxg,centers).flatten() else: p0s = np.array(fluxg) for i in range(self.n_exposures): self.psf_model_list[i].x_0 = centers[i][0] self.psf_model_list[i].y_0 = centers[i][1] self.bkg_fluxes = all_bg_est if not np.all([x in bounds.keys() for x in pnames]): pbounds = {} for i in range(len(pnames)): if 'flux' in pnames[i]: pbounds[pnames[i]] = np.array(bounds['flux'])+p0s[i] #if pbounds[pnames[i]][0]<0: # pbounds[pnames[i]][0] = 0 #if pbounds[pnames[i]][1]<=0: # raise RuntimeError('Your flux bounds are both <=0.') else: if fit_centroid=='wcs': px_scale = astropy.wcs.utils.proj_plane_pixel_scales(self.wcs_list[0])[0] *\ self.wcs_list[0].wcs.cunit[0].to('deg') pbounds[pnames[i]] = np.array(bounds['centroid'])*px_scale+p0s[i] #pbounds[pnames[i]] = temp_bounds+p0s[i] # todo check inside wcs else: pbounds[pnames[i]] = np.array(bounds['centroid'])+p0s[i] if pbounds[pnames[i]][0]<0: pbounds[pnames[i]][0] = 0 else: pbounds = bounds if fit_bkg: assert 'bkg' in bounds.keys(),"Must supply bounds for bkg" if fit_flux=='multi': to_add = [] for i in range(len(pnames)): if 'flux' in pnames[i]: to_add.append('bkg%s'%pnames[i][-1]) pbounds['bkg%s'%pnames[i][-1]] = bounds['bkg'] pnames = np.append(pnames,to_add) else: pnames = np.append(pnames,['bkg']) pbounds['bkg'] = bounds['bkg'] self.nest_psf(pnames,pbounds,cutouts,cutout_errs,all_xf,all_yf, psf_width=fit_width,npoints=npoints,use_MLE=use_MLE,maxiter=maxiter,center_weight=center_weight) if fit_centroid!='fixed': result_cal = {'ra':[],'ra_err':[],'dec':[],'dec_err':[],'x':[],'x_err':[], 'y':[],'y_err':[],'mjd':[], 'flux':[],'fluxerr':[],'filter':[], 'zp':[],'mag':[],'magerr':[],'zpsys':[],'exp':[]} else: result_cal = {'ra':[],'dec':[],'x':[], 'y':[],'mjd':[], 'flux':[],'fluxerr':[],'filter':[], 'zp':[],'mag':[],'magerr':[],'zpsys':[],'exp':[]} model_psf = None psf_pams = [] for i in range(self.n_exposures): if fit_flux=='single': flux_var = 'flux' else: flux_var = 'flux%i'%i if fit_centroid=='wcs': ra = self.psf_result.best[self.psf_result.vparam_names.index('ra')] dec = self.psf_result.best[self.psf_result.vparam_names.index('dec')] raerr = self.psf_result.errors['ra'] decerr = self.psf_result.errors['dec'] sky_location = astropy.coordinates.SkyCoord(ra, dec, unit=astropy.units.deg) y,x = astropy.wcs.utils.skycoord_to_pixel(sky_location,self.wcs_list[i]) #raise RuntimeError('Need to implement xy errors from wcs') xerr = 0 yerr = 0 elif fit_centroid=='pixel': x = self.psf_result.best[self.psf_result.vparam_names.index('x%i'%i)] y = self.psf_result.best[self.psf_result.vparam_names.index('y%i'%i)] xerr = self.psf_result.errors['x%i'%i] yerr = self.psf_result.errors['y%i'%i] sc = astropy.wcs.utils.pixel_to_skycoord(y,x,self.wcs_list[i]) ra = sc.ra.value dec = sc.dec.value sc2 = astropy.wcs.utils.pixel_to_skycoord(y+yerr,x+xerr,self.wcs_list[i]) raerr = np.abs(sc2.ra.value-ra) decerr = np.abs(sc2.dec.value-dec) else: x = self.psf_model_list[i].x_0.value y = self.psf_model_list[i].y_0.value #xerr = self.psf_result.errors['x%i'%i] #yerr = self.psf_result.errors['y%i'%i] sc = astropy.wcs.utils.pixel_to_skycoord(y,x,self.wcs_list[i]) ra = sc.ra.value dec = sc.dec.value #sc2 = astropy.wcs.utils.pixel_to_skycoord(y+yerr,x+xerr,self.wcs_list[i]) #raerr = np.abs(sc2.ra.value-ra) #decerr = np.abs(sc2.dec.value-dec) if 'bkg' in self.psf_result.vparam_names: bk_std = self.psf_result.errors['bkg'] elif 'bkg%i'%i in self.psf_result.vparam_names: bk_std = self.psf_result.errors['bkg%i'%i] else: bk_std = 0 #psf_pam = astropy.nddata.utils.extract_array(self.pams[i],self.psf_model_list[i].data.shape,[x,y],mode='strict') #psf_pams.append(psf_pam) #yf, xf = np.mgrid[0:self.data_arr[i].shape[0],0:self.data_arr[i].shape[1]].astype(int) # apcorr = hst_apcorr(radius*px_scale,self.filter,self.instrument) # if skyan_in is None: # skyan_in = radius*3 # if skyan_out is None: # skyan_out = radius*4 # sky = {'sky_in':skyan_in,'sky_out':skyan_out} # phot = generic_aperture_phot(self.data_arr_pam[i],positions,radius,sky,error=self.err_arr[i], # epadu=epadu) flux_sum = self.psf_result.best[self.psf_result.vparam_names.index(flux_var)]#np.sum(psf_arr)#simple_aperture_sum(psf_arr,np.atleast_2d([y,x]),10) #if self.telescope.lower()=='hst': # yf, xf = np.mgrid[-10:11,-10:11].astype(int) # xf += int(self.psf_model_list[i].y_0+.5) # yf += int(self.psf_model_list[i].x_0+.5) # psf_arr = self.psf_model_list[i](yf,xf)#*self.pams[i]#self.psf_pams[i] # flux_sum = simple_aperture_sum(psf_arr,[[10,10]], # 5.5) #flux_sum = # psf_corr = hst_apcorr(5.5*self.px_scale,self.filter,self.instrument) #flux_sum/=apcorr #else: # psf_corr = 1 #yf, xf = np.mgrid[0:self.data_arr[i].shape[0],0:self.data_arr[i].shape[1]].astype(int) #psf_arr = self.psf_model_list[i](yf,xf) #import pdb #pdb.set_trace() if self.telescope == 'JWST': flux,fluxerr,mag,magerr,zp = calibrate_JWST_flux(flux_sum, self.psf_result.errors[flux_var],self.wcs_list[i],flux_units=self.flux_units) else: #import pdb #pdb.set_trace() flux,fluxerr,mag,magerr,zp = calibrate_HST_flux(flux_sum, self.psf_result.errors[flux_var],self.prim_headers[i],self.sci_headers[i]) result_cal['x'].append(x) result_cal['y'].append(y) try: result_cal['mjd'].append(self.sci_headers[i]['MJD-AVG']) except: result_cal['mjd'].append(self.prim_headers[i]['EXPSTART']) result_cal['flux'].append(flux) result_cal['fluxerr'].append(fluxerr) result_cal['filter'].append(self.filter) result_cal['zp'].append(zp) result_cal['mag'].append(mag) result_cal['magerr'].append(magerr) result_cal['zpsys'].append('ab') result_cal['exp'].append(os.path.basename(self.exposure_fnames[i])) if fit_centroid!='fixed': result_cal['ra_err'].append(raerr) result_cal['dec_err'].append(decerr) result_cal['x_err'].append(xerr) result_cal['y_err'].append(yerr) result_cal['ra'].append(ra) result_cal['dec'].append(dec) self.psf_result.phot_cal_table = astropy.table.Table(result_cal) self.psf_pams = psf_pams print('Finished PSF psf_photometry with median residuals of %.2f'%\ (100*np.nanmedian([np.nansum(self.psf_result.resid_arr[i])/\ np.nansum(self.psf_result.data_arr[i]) for i in range(self.n_exposures)]))+'%') return self.psf_result
#(100*np.median([np.median(self.psf_result.resid_arr[i]/\ # self.psf_result.data_arr[i]) for i in range(self.n_exposures)]))+'%')
[docs] def create_psf_subtracted(self,sci_ext=1,fname=None): """ Use the best fit PSF models to create a PSF-subtracted image Parameters ---------- sci_ext : int The SCI extension to use (e.g., if this is UVIS) fname : str Output filename """ try: temp = self.psf_result except: print('Must run PSF fit.') return for i in range(self.n_exposures): x = float(self.psf_model_list[i].x_0.value) y = float(self.psf_model_list[i].y_0.value) psf_width = self.psf_model_list[i].data.shape[0] yf, xf = np.mgrid[0:self.data_arr[i].shape[0],0:self.data_arr[i].shape[1]].astype(int) if self.sci_headers[0]['BUNIT'] in ['ELECTRON','ELECTRONS']: psf_arr = self.psf_model_list[i](yf,xf)*self.prim_headers[i]['EXPTIME'] else: psf_arr = self.psf_model_list[i](yf,xf) if fname is None: temp = astropy.io.fits.open(self.exposure_fnames[i]) #temp['SCI',sci_ext].data = self.data_arr[i]-psf_arr else: if isinstance(fname,str): temp = astropy.io.fits.open(fname) else: temp = astropy.io.fits.open(fname[i]) temp['SCI',sci_ext].data-= psf_arr print(np.max(temp['SCI',sci_ext].data)) #print() temp.writeto(self.exposure_fnames[i].replace('.fits','_resid.fits'),overwrite=True) temp.close() return [self.exposure_fnames[i].replace('.fits','_resid.fits') for i in range(self.n_exposures)]
[docs] def plot_phot(self,method='psf'): """ Plot the photometry Parameters ---------- method : str psf or aperture """ try: if method=='aperture': sncosmo.plot_lc(self.aperture_result.phot_cal_table) else: sncosmo.plot_lc(self.psf_result.phot_cal_table) except: print('Could not plot phot table for %s method'%method) return plt.show() return plt.gcf()