import numpy as np
from scipy import signal
from copy import deepcopy
import glob
[docs]
def sigma_clip_1d(spectra,region_size=10,sigma=2,max_iter=5,replace_with='median',**kwargs):
error = kwargs.get("error",spectra)
return np.hstack([sigma_clip_replace(spectra[i:i+region_size], sigma=sigma, max_iter=max_iter, replace_with="median",error=error[i:i+region_size])[0] for i in range(0,spectra.shape[0],region_size)])
[docs]
def small_fun(p,rang,recon_s,Targ):
return np.sum((p*recon_s[rang[0]:rang[1]]-Targ[rang[0]:rang[1]])**2)*1e30
[docs]
def mad(data,axis=0):
median = np.median(data,axis=axis)
absolute_deviations = np.abs(data - median[...,np.newaxis])
return np.median(absolute_deviations,axis=axis)
[docs]
def get_images_paths(system_name,band,OB,data_path=""):
if not data_path:
data_path = f"../../spectra_lens_paper/spectroscopic_data/{system_name}/{OB}/{band}/"
images_paths = sorted(glob.glob(f'{data_path}/*SCI_SLIT_MERGE2D*'))
if len(images_paths)==0:
print("CHECK THE PATH")
if isinstance(images_paths,str):
images_paths = [images_paths]
#name_images.sort()
response_paths = sorted(glob.glob(f'{data_path}/RESPONSE*'))
if isinstance(response_paths,str):
response_paths = [response_paths]
if len(response_paths)==1:
response_paths = response_paths * len(images_paths)
#images_paths.sort(),response_paths.sort()
telluric_path = sorted(glob.glob(f'{data_path}/*/*_TELLURIC*'))
if len(telluric_path) != len(images_paths):
if len(telluric_path) == 1:
print('We will use the same telluric for all the images ')
telluric_path = telluric_path * len(images_paths)
else :
telluric_path = sorted(glob.glob(f'{data_path}/*/TELLURIC*'))
#len(telluric_path) != len(images_paths):
if len(telluric_path) == 1:
print('We will use the same telluric for all the images ')
telluric_path = telluric_path * len(images_paths)
else:
print("CHECK THE telluric_path")
telluric_path = [None]*len(images_paths)
stare_spectrums = sorted(glob.glob(f'{data_path}/*/*MERGE1D*'))
if len(stare_spectrums) != len(images_paths):
if len(stare_spectrums) == 1:
print('We will use the same stare_spectrums for all the images ')
stare_spectrums = stare_spectrums * len(images_paths)
else:
stare_spectrums = [None]*len(images_paths)
print("CHECK THE stare_spectrums")
return images_paths,response_paths,telluric_path,stare_spectrums
[docs]
def list_builder(list_,value):
try:
if len(list_)==len(value) and any(isinstance(value[i],list) for i in range(len(value))):
return value
elif len(value) != len(list_) and any(isinstance(value[i],list) for i in range(len(value))):
raise ValueError(
f"IT CANT BE DONE CHECK THE INPUT length value: {value} "
f"is different from number of images: {list_}"
)
elif isinstance(value,list):
return [value] *len(list_)
except:
if value is None:
return [None]*len(list_)
elif isinstance(value,bool) or isinstance(value,str) or isinstance(value,int):
return [value] *len(list_)
[docs]
def inpaint_nans(im, kernel_size=5):
# Taken from http://stackoverflow.com/a/21859317/6519723
ipn_kernel = np.ones((kernel_size, kernel_size)) # kernel for inpaint_nans
ipn_kernel[int(kernel_size/2), int(kernel_size/2)] = 0
nans = np.isnan(im)
while np.sum(nans)>0:
im[nans] = 0
vNeighbors = signal.convolve2d((nans == False), ipn_kernel, mode='same', boundary='symm')
im2 = signal.convolve2d(im, ipn_kernel, mode='same', boundary='symm')
im2[vNeighbors > 0] = im2[vNeighbors > 0]/vNeighbors[vNeighbors > 0]
im2[vNeighbors == 0] = np.nan
im2[(nans == False)] = im[(nans == False)]
im = im2
nans = np.isnan(im)
return im
[docs]
def interpolate_array_with_nans_np(array,mask=None):
if mask is None:
mask = np.isnan(array)
x = np.where(~mask)[0]
y = array[~mask]
if len(x)==0:
return np.zeros(array.shape)
y_new = np.interp(np.where(mask), x, y,right=np.nan,left=np.nan)
array[np.where(mask)] = y_new
return array
[docs]
def remove_nan(array,mask,verbose=False):
#we are agreement at 1e-7 for 0.9999 of the values with the method of the guys
if mask.shape != mask.shape:
return print("The mask and the array should have the same shape")
if len(array.shape) == 2:
array = array[np.newaxis]
mask = mask[np.newaxis]
A = deepcopy(array)
mask = mask.astype(bool)
for i in range(array.shape[0]):
a = A[i].copy()
#R code not implemented yet
# if(OLD=="TRUE"){
# Mc[1:2,]<-0
# Mc[,1:2]<-0
# Mc[dim(Mc)[1]-(0:1),]<-0
# Mc[,dim(Mc)[2]-(0:1)]<-0
# }
a[~mask[i]] = np.nan
UT = 1
UT2 = []
while UT != 0:
Na1 = np.sum(~mask[i])
#Mc = interpolate_array_3d_reshape(A[i],mask[i])
Mc = np.apply_along_axis(interpolate_array_with_nans_np, axis=1, arr=a)
Na2 = np.isnan(Mc).sum()
if verbose:
print("na",Na1)
print("na2",Na2)
if Na1 == Na2:
Mc = np.apply_along_axis(interpolate_array_with_nans_np, axis=1, arr=Mc.T).T
UT = np.isnan(Mc).sum()
UT2.append(UT)
if len(UT2)>6:
lu = len(UT2)
#this is a condition to avoid infinite loops
if (UT2[lu-1] == UT2[lu-2]) & (UT2[lu-2] == UT2[lu-3]) & (UT2[lu-3] == UT2[lu-4]): #for R the last element is len(l) for python is len(l)-1
UT = 0
A[i] = Mc
local_nan = np.isnan(Mc)
if local_nan.sum() != 0:
A[i][local_nan] = array[i][local_nan]
return np.squeeze(A)
[docs]
def sigma_clip_replace(data, sigma=3.0, max_iter=5, replace_with='mean',**kwargs):
"""
Perform iterative sigma-clipping on 'data' and replace outliers with
either the mean or median of the *valid* data in each iteration.
Parameters
----------
data : array_like
1D or 2D numpy array of your data (e.g. pixel values).
sigma : float, optional
Sigma-clipping limit (number of standard deviations).
max_iter : int, optional
Maximum number of iterations.
replace_with : {'mean', 'median'}, optional
Replacement strategy for outliers.
Returns
-------
clipped_data : numpy.ndarray
Copy of 'data' with outliers replaced.
mask : numpy.ndarray (bool)
Boolean mask array of the same shape as data. `True` indicates
pixels considered outliers in the final iteration.
"""
# Make a copy so we don't overwrite original data
clipped_data = np.array(data, copy=True, dtype=float)
clipped_error = kwargs.get("error",clipped_data)
for i in range(max_iter):
# Compute statistics on the current valid data
valid_mask = ~np.isnan(clipped_data)
current_valid_data = clipped_data[valid_mask]
if current_valid_data.size < 2:
# Not enough data to compute meaningful stats
break
mean_val = np.mean(current_valid_data)
median_val = np.median(current_valid_data)
std_val = np.std(current_valid_data)
# Define "low" and "high" cutoff
lower_bound = mean_val - sigma * std_val
upper_bound = mean_val + sigma * std_val
# Determine which pixels are outliers
old_outlier_mask = (clipped_data < lower_bound) | (clipped_data > upper_bound) | (clipped_error>clipped_data)
# Stop if there are no more outliers
if not np.any(old_outlier_mask):
break
# Decide on the replacement value: mean or median
if replace_with.lower() == 'mean':
replacement_value = mean_val
else:
replacement_value = median_val
# Replace outliers
clipped_data[old_outlier_mask] = replacement_value
# After replacing, check if the outlier mask remains the same
# in the next iteration, or continue until max_iter is reached
new_valid_mask = ~np.isnan(clipped_data)
new_valid_data = clipped_data[new_valid_mask]
new_mean_val = np.mean(new_valid_data)
new_std_val = np.std(new_valid_data)
# If the change in mean or std is negligible, we can consider stopping early
if np.isclose(mean_val, new_mean_val, rtol=1e-5) and np.isclose(std_val, new_std_val, rtol=1e-5):
break
# One final mask of outliers, using the final statistics
final_valid_data = clipped_data[~np.isnan(clipped_data)]
if final_valid_data.size < 2:
final_mask = np.zeros_like(data, dtype=bool)
else:
final_mean = np.mean(final_valid_data)
final_std = np.std(final_valid_data)
final_lower = final_mean - sigma * final_std
final_upper = final_mean + sigma * final_std
final_mask = (clipped_data < final_lower) | (clipped_data > final_upper)
return clipped_data, final_mask
[docs]
def clipping_region(image,pieces,slice_, sigma=2.0, max_iter=5, replace_with='mean',where_is_the_signal=[]):
image_clipped = np.zeros_like(image)
for j in range(image.shape[0]):
sigma_i = sigma
if j in where_is_the_signal:
sigma_i = 3
for i in range(0,pieces):
image_clipped[j,slice_*i:slice_*(i+1)] = sigma_clip_replace(image[j,slice_*i:slice_*(i+1)], sigma=sigma_i, max_iter=max_iter, replace_with=replace_with)[0]
return image_clipped