Source code for tuskitoo.SpectralExtraction.fitting

from .function_maker import create_multigaussian_model, create_multimoffat_model
import warnings
warnings.filterwarnings("ignore")
from multiprocessing import cpu_count
from parallelbar import progress_imap, progress_map, progress_imapu
from parallelbar.tools import cpu_bench, fibonacci
import numpy as np
from copy import deepcopy

[docs] def make_fit(ydata:np.array,num_source=2,initial_center=None,initial_separation=None ,bound_sigma=None,fix_sep=None,fix_height=None,custom_expr=None ,weights=None,param_limit=None,param_fix=None,param_value=None,verbose=False,\ distribution="gaussian",**kwargs): """ Fits a model to the provided data using either a Gaussian or Moffat profile. Parameters: ----------- ydata : array-like The dependent data (observations or measurements) to be fitted by the model. num_source : int, optional, default=2 Number of sources (or components) to include in the fitting model. initial_center : float, int, or None, optional, default=None Initial estimate for the center (mean or peak position) of each source/component. initial_separation : list or None, optional, default=None Initial guess for the separation between the sources/components. bound_sigma : array-like or None, optional, default=None Bounds or constraints on the sigma (standard deviation) for components beyond the first. fix_sep : list, bool, or None, optional, default=None List of source indices for which the separation is fixed (i.e., not varied during fitting). fix_height : bool or None, optional, default=None Flag to fix the height (amplitude) of the sources/components. custom_expr : dict or None, optional, default=None Dictionary mapping parameter names to custom expressions for the fitting function. Refer to https://lmfit.github.io/lmfit-py/index.html for details. weights : array-like or None, optional, default=None Weights for each data point in ydata; useful for weighted fitting. param_limit : dict or None, optional, default=None Dictionary specifying limits (min and max) for parameters. Example: {'sigma_1': (0.1, 5.0), 'center_1': (100, 200)} param_fix : list or None, optional, default=None List of parameter names to be fixed at a specified value during fitting. param_value : dict or None, optional, default=None Dictionary of initial parameter values. Example: {'height_1': 10.0, 'sigma_1': 2.0} verbose : bool, optional, default=False If True, prints detailed information during the fitting process. distribution : str, optional, default="gaussian" Type of distribution to use for fitting. Supported options are "gaussian" and "moffat". Returns: -------- result : lmfit.model.ModelResult The result of the fitting process, including fitted parameters and fit statistics. Raises: ------- ValueError: If the provided distribution is not among the supported types. """ if distribution=="moffat": model, params,xdata = create_multimoffat_model(num_source,ydata, initial_separation=initial_separation,initial_center=initial_center) elif distribution=="gaussian": model, params,xdata = create_multigaussian_model(num_source,ydata, initial_separation=initial_separation,initial_center=initial_center) else: raise ValueError(f"the distribution {distribution} not available, only can be use [gaussian, moffat]") if bound_sigma: #se podria definir a cual sigma atar for i in bound_sigma: if i>num_source: continue params["sigma_"+str(i)].expr=f'sigma_{1}' if fix_sep and initial_separation: for i in fix_sep: if i>num_source: continue params["separation_"+str(i)].vary = False if custom_expr: for param, expr in custom_expr.items(): params[param].expr = expr if param_limit: for param, limit in param_limit.items(): params[param].min, params[param].max = limit if param_value: for param, value in param_value.items(): params[param].value = value if param_fix: for param in param_fix: params[param].vary = False result = model.fit(ydata, params, x=xdata, weights=weights)#,max_nfev=200 if verbose: print(f"Model parameters {params}") if kwargs.get("model_and_result",False): return result,model return result
[docs] def parallel_fit(image,error,num_source,pixel_limit=None,n_cpu=None,mask_list=[],**kwargs): ##Weights = 1 / (error^2) """ Perform parallel fitting on a 2D image using multiple sources. Parameters: ----------- image : 2D array-like The image data to be fitted. error : 2D array-like The error (sigma) associated with the image data. num_source : int Number of sources (or components) to include in the fitting model. pixel_limit : list or tuple of two ints, optional, default=None Column indices [start, end] that limit the processing region. If None, all columns are processed. n_cpu : int or None, optional, default=None Number of CPU cores to use for parallel processing. Defaults to the total available cores. wavelenght : list, optional, default=[] (Not used yet) Intended to hold wavelength information for each pixel. mask_list : list, optional, default=[] List of column ranges to mask (exclude) from fitting. Each element should be a two-element iterable (start, stop). **kwargs : dict Additional keyword arguments to be passed to the make_fit function. These can include: - initial_center - initial_separation - bound_sigma - fix_sep - fix_height - custom_expr - weights - param_limit - param_fix - param_value - verbose - distribution Returns: -------- spectral_extraction_results : dict A dictionary containing: - "value": Fitted parameter values. - "std": Parameter uncertainties. - "name_params": Names of the fitted parameters. - "extra_params": Extra fit statistics (e.g., chisqr, redchi, AIC, BIC, R-squared). - "normalized_image": The normalized image used for fitting. - "normalize_matrix": Normalization factors for each pixel column. - "num_source": Number of sources used. - "distribution": Distribution type used. - "mask": The mask applied to the image. - "parameter_number": Number of parameters per source. """ if not n_cpu: n_cpu = cpu_count() distribution = kwargs.get("distribution", "gaussian") #if "distribution" not in kwargs.keys(): # kwargs["distribution"]= "gaussian" proc_image = np.copy(image) proc_error = np.copy(error) # Determine pixel limits along the column axis. if isinstance(pixel_limit, (list, tuple)) and len(pixel_limit) == 2: col_start, col_end = pixel_limit else: col_start, col_end = 0, image.shape[1] pixel_limit = [0, image.shape[1]] if isinstance(mask_list, list): mask = np.ones_like(proc_image, dtype=bool) for mask_range in mask_list: mask[:, slice(*mask_range)] = False proc_image = proc_image * mask proc_error = proc_error * mask parameter_number = 3 if distribution == "gaussian" else 4 proc_image = proc_image[:, col_start:col_end] proc_error = proc_error[:, col_start:col_end] x_num = len(proc_image.T) with np.errstate(divide='ignore', invalid='ignore'): weight = 1.0 / np.square(proc_error) norm_factor = np.abs(proc_image).max(axis=0) norm_factor[norm_factor == 0] = 1 normalized_image = np.nan_to_num(proc_image / norm_factor) normalized_weight = np.nan_to_num(weight / norm_factor) init_trace = kwargs.get("init_trace") global process_pixel def process_pixel(args): n_pixel, pixel,pixel_weight = args if isinstance(init_trace, np.ndarray): kwargs["initial_center"] = init_trace[n_pixel] #if i want to add a parameter for the stats part of the matrix always should be and the left of it if np.all(pixel== 0) or np.all(pixel== np.nan): return list([0]*parameter_number*num_source)+[1e15]*parameter_number*num_source+ list([np.nan]*parameter_number*num_source)+[1e15,1e15,1e15,1e15,1e15,n_pixel,x_num] fiting = make_fit(pixel, num_source=num_source,weights=pixel_weight,**kwargs) if not np.all(fiting.covar): return list([0]*parameter_number*num_source)+[1e15]*parameter_number*num_source+ list(fiting.values.keys())+[1e15,1e15,1e15,1e15,1e15,n_pixel,x_num] return list(np.array([[value.value,value.stderr] for key,value in fiting.params.items()]).T.reshape(parameter_number*num_source*2,))+ list(fiting.values.keys()) + [fiting.chisqr,fiting.redchi,fiting.aic,fiting.bic,fiting.rsquared,n_pixel,x_num] print(f"The code will be executed in {n_cpu} core using {num_source} sources an a {distribution} distribution") args = [(n_pixel, pixel,pixel_weight) for n_pixel, pixel,pixel_weight in zip(np.arange(*pixel_limit) ,normalized_image.T,normalized_weight.T)] normalize_matrix = norm_factor[:,np.newaxis] full_fit = np.array(progress_map(process_pixel, args, process_timeout=20, n_cpu=n_cpu,need_serialize=False)) results = full_fit[:,0:parameter_number*num_source].astype(float) errors = full_fit[:,parameter_number*num_source:2*parameter_number*num_source].astype(float) name_params = full_fit[:,2*parameter_number*num_source:-7] mask_names=~np.all(name_params == 'nan', axis=1) name_params = name_params[mask_names][0] extra_params = full_fit[:,-7:].astype(float) #print(normalized_image *norm_factor == proc_image) spectral_extraction_results = {"value":results,"std":errors,"name_params":name_params,"extra_params":extra_params,"normalized_image":normalized_image,"normalize_matrix":normalize_matrix,"num_source":num_source,"distribution":distribution,"mask":mask,"parameter_number":parameter_number} #,"original_image":image return spectral_extraction_results