Source code for emgfit.sample

################################################################################
##### Python module for creating simulated time-of-flight mass spectra with
##### Gaussian and hyper-exponentially-modified Gaussian line shapes
##### Author: Stefan Paul

import numpy as np
import pandas as pd
import warnings
from scipy.stats import exponnorm, uniform, norm

################################################################################
##### Define functions for drawing random variates from Gaussian and hyper-EMG
##### PDFs
norm_precision = 1e-06 # required precision for normalization of eta parameters


def Gaussian_rvs(mu, sigma, N_samples=None):
    """Draw random samples from a Gaussian probability density function

    Parameters
    ----------
    mu : float
        Nominal position of simulated peak (mean of Gaussian).
    sigma : float
        Nominal standard deviation of the simulated Gaussian peak.
    N_samples : int, optional, default: 1
        Number of random events to sample.

    Returns
    -------
    :class:`numpy.ndarray` of floats
        Array with simulated events.

    """
    rvs = norm.rvs(loc=mu, scale=sigma, size=N_samples)
    return rvs
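
# Illustrative usage sketch (not part of the module API): all numerical values
# below are arbitrary example choices, not emgfit defaults.
def _example_gaussian_sampling():
    # Draw 1000 events from a Gaussian peak at 100 u with sigma = 40 micro-u
    return Gaussian_rvs(100.0, 40e-06, N_samples=1000)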
def _h_m_i_rvs(mu, sigma, tau_m, N_i):
    """Helper function for definition of h_m_emg_rvs """
    rvs = mu - exponnorm.rvs(loc=0, scale=sigma, K=tau_m/sigma, size=N_i)
    return rvs
def h_m_emg_rvs(mu, sigma, *t_args, N_samples=None):
    """Draw random samples from negative skewed hyper-EMG probability density

    Parameters
    ----------
    mu : float
        Nominal position of simulated peak (mean of underlying Gaussian).
    sigma : float
        Nominal standard deviation (of the underlying Gaussian) of the
        simulated hyper-EMG peak.
    t_args : list of lists of float
        List containing lists of the EMG tail parameters with the signature:
        [[eta_m1, eta_m2, ...], [tau_m1, tau_m2, ...]]
    N_samples : int, optional, default: 1
        Number of random events to sample.

    Returns
    -------
    :class:`numpy.ndarray` of floats
        Array with simulated events.

    """
    if not isinstance(N_samples, int):
        raise TypeError("N_samples must be of type int")
    li_eta_m = t_args[0]
    li_tau_m = t_args[1]
    t_order_m = len(li_eta_m) # order of negative tail exponentials
    if abs(sum(li_eta_m) - 1) > norm_precision:
        raise Exception("eta_m's don't add up to 1.")
    if len(li_tau_m) != t_order_m: # check if all arguments match tail order
        raise Exception("orders of eta_m and tau_m do not match!")
    # randomly distribute ions between tails according to eta_m weights
    tail_nos = np.random.choice(range(t_order_m), size=N_samples, p=li_eta_m)
    rvs = np.array([])
    for i in range(t_order_m):
        N_i = np.count_nonzero(tail_nos == i)
        tau_m = li_tau_m[i]
        rvs_i = _h_m_i_rvs(mu, sigma, tau_m, N_i)
        rvs = np.append(rvs, rvs_i)
    return rvs
def _h_p_i_rvs(mu, sigma, tau_p, N_i):
    """Helper function for definition of h_p_emg_rvs """
    rvs = exponnorm.rvs(loc=mu, scale=sigma, K=tau_p/sigma, size=N_i)
    return rvs
def h_p_emg_rvs(mu, sigma, *t_args, N_samples=None):
    """Draw random samples from pos. skewed hyper-EMG probability density

    Parameters
    ----------
    mu : float
        Nominal position of simulated peak (mean of underlying Gaussian).
    sigma : float
        Nominal standard deviation (of the underlying Gaussian) of the
        simulated hyper-EMG peak.
    t_args : list of lists of float
        List containing lists of the EMG tail parameters with the signature:
        [[eta_p1, eta_p2, ...], [tau_p1, tau_p2, ...]]
    N_samples : int, optional, default: 1
        Number of random events to sample.

    Returns
    -------
    :class:`numpy.ndarray` of floats
        Array with simulated events.

    """
    if not isinstance(N_samples, int):
        raise TypeError("N_samples must be of type int")
    li_eta_p = t_args[0]
    li_tau_p = t_args[1]
    t_order_p = len(li_eta_p) # order of positive tail exponentials
    if abs(sum(li_eta_p) - 1) > norm_precision:
        raise Exception("eta_p's don't add up to 1.")
    if len(li_tau_p) != t_order_p: # check if all arguments match tail order
        raise Exception("orders of eta_p and tau_p do not match!")
    # randomly distribute ions between tails according to eta_p weights
    tail_nos = np.random.choice(range(t_order_p), size=N_samples, p=li_eta_p)
    rvs = np.array([])
    for i in range(t_order_p):
        N_i = np.count_nonzero(tail_nos == i)
        tau_p = li_tau_p[i]
        rvs_i = _h_p_i_rvs(mu, sigma, tau_p, N_i)
        rvs = np.append(rvs, rvs_i)
    return rvs
def h_emg_rvs(mu, sigma, theta, *t_args, N_samples=None):
    """Draw random samples from a hyper-EMG probability density function

    Parameters
    ----------
    mu : float
        Nominal position of simulated peak (mean of underlying Gaussian).
    sigma : float
        Nominal standard deviation (of the underlying Gaussian) of the
        simulated hyper-EMG peak.
    theta : float
        Mixing weight of pos. & neg. skewed EMG distributions.
    t_args : list of lists of float
        List containing lists of the EMG tail parameters with the signature:
        [[eta_m1, eta_m2, ...], [tau_m1, tau_m2, ...], [eta_p1, eta_p2, ...],
        [tau_p1, tau_p2, ...]]
    N_samples : int, optional, default: 1
        Number of random events to sample.

    Returns
    -------
    :class:`numpy.ndarray` of floats
        Array with simulated events.

    """
    if not isinstance(N_samples, int):
        raise TypeError("N_samples must be of type int")
    li_eta_m = t_args[0]
    li_tau_m = t_args[1]
    li_eta_p = t_args[2]
    li_tau_p = t_args[3]
    if theta == 1:
        rvs = h_m_emg_rvs(mu, sigma, li_eta_m, li_tau_m, N_samples=N_samples)
    elif theta == 0:
        rvs = h_p_emg_rvs(mu, sigma, li_eta_p, li_tau_p, N_samples=N_samples)
    else:
        # randomly distribute ions between h_m_emg and h_p_emg according to
        # left-right-weight theta:
        neg = np.random.choice([1, 0], size=N_samples, p=[theta, 1-theta])
        N_m = int(np.sum(neg))
        N_p = N_samples - N_m
        rvs_m = h_m_emg_rvs(mu, sigma, li_eta_m, li_tau_m, N_samples=N_m)
        rvs_p = h_p_emg_rvs(mu, sigma, li_eta_p, li_tau_p, N_samples=N_p)
        rvs = np.append(rvs_m, rvs_p)
    return rvs
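
# Illustrative usage sketch (not part of the module API): draws events from a
# hyper-EMG PDF with two negative tails and one positive tail. All numerical
# values below are arbitrary example choices, not emgfit defaults.
def _example_h_emg_sampling():
    li_eta_m = [0.8, 0.2]            # negative-tail weights (must sum to 1)
    li_tau_m = [50e-06, 300e-06]     # negative-tail decay constants [u]
    li_eta_p = [1.0]                 # single positive tail
    li_tau_p = [100e-06]             # positive-tail decay constant [u]
    # theta = 0.7 puts 70% of the events into the negative-skewed component
    return h_emg_rvs(100.0, 40e-06, 0.7, li_eta_m, li_tau_m, li_eta_p,
                     li_tau_p, N_samples=10000)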
################################################################################
##### Define functions for simulating events or spectra through random sampling
##### from a reference distribution
def simulate_events(shape_pars, mus, amps, bkg_c, N_events, x_min, x_max,
                    out='hist', scl_facs=None, N_bins=None, bin_cens=None):
    """Create simulated detector events drawn from a user-defined probability
    density function (PDF)

    Events can either be output as a list of single events (mass stamps) or as
    a histogram. In histogram output mode, uniform binning is easily realized
    by specifying the `N_bins` argument. More control over the binning can be
    achieved by parsing the desired bin centers to the `bin_cens` argument
    (e.g. for non-uniform binning).

    Parameters
    ----------
    shape_pars : dict
        Peak-shape parameters to use for sampling. The dictionary must follow
        the structure of the :attr:`~spectrum.shape_cal_pars` attribute of the
        :class:`~emgfit.spectrum.spectrum` class.
    mus : float or list of float
        Nominal peak positions of peaks in simulated spectrum.
    amps : float or list of float [(counts in peak)*(bin width in u)]
        Nominal amplitudes of peaks in simulated spectrum.
    bkg_c : float [counts per bin]
        Nominal amplitude of uniform background in simulated spectrum.
    N_events : int
        Total number of events to simulate (signal and background events).
    x_min : float
        Beginning of sampling x-range.
    x_max : float
        End of sampling x-range.
    out : str, optional
        Output format of sampled data. Options:

        - ``'hist'`` for binned mass spectrum (default). The binning is
          defined with the `N_bins` or `bin_cens` argument.
        - ``'array'`` for unbinned array of single ion and background events.

    scl_facs : float or list of float, optional
        Scale factors to use for scaling the scale-dependent shape parameters
        in `shape_pars` to a given peak before sampling events. If `None`, no
        shape-parameter scaling is applied.
    N_bins : int, optional
        Number of uniform bins to use in ``'hist'`` output mode. The **outer**
        edges of the first and last bin are fixed to the start and end of the
        sampling range, respectively (i.e. `x_min` and `x_max`). In between,
        bins are distributed with a fixed spacing of
        (`x_max` - `x_min`)/`N_bins`.
    bin_cens : :class:`numpy.ndarray`, optional
        Centres of bins to use in ``'hist'`` output mode. This argument allows
        the realization of non-uniform binning. Bin edges are centred between
        neighboring bins. Note: Bins outside the sampling range defined with
        `x_min` and `x_max` will be empty.

    Returns
    -------
    :class:`pandas.DataFrame` or :class:`numpy.ndarray`
        If out='hist', a dataframe with a histogram of the format
        [bin centre, counts in bin] is returned. If out='array', an unbinned
        array with the x-values of single ion or background events is
        returned.

    Notes
    -----
    Random events are created via custom hyper-EMG extensions of Scipy's
    :meth:`scipy.stats.exponnorm.rvs` method.

    Unless scale factors are provided via the `scl_facs` argument, all
    simulated peaks have identical width and shape (no re-scaling of
    scale-dependent shape parameters to a peak's mass centroid).

    **Mind the different units for peak amplitudes `amps`
    (<counts in peak> * <bin width in x-axis units>) and the background level
    `bkg_c` (counts per bin).** When spectrum data is simulated, counts are
    distributed between the different peaks and the background with
    probability weights `amps` / <bin width in u> and
    `bkg_c` * <number of bins>, respectively. As a consequence, simply
    changing `N_events` (while keeping all other arguments constant) will
    cause `amps` and `bkg_c` to deviate from their nominal units.

""" mus = np.atleast_1d(mus) amps = np.atleast_1d(amps) if len(mus) != len(amps): raise Exception("Lengths of `mus` and `amps` arrays must match.") if type(N_events) != int: raise Exception("`N_events` must be of type int.") if (mus < x_min).any() or (mus > x_max).any(): msg = str("At least one peak centroid in `mus` is outside the sampling range.") warnings.warn(msg, UserWarning) if scl_facs is None: scl_facs = np.ones_like(mus) else: scl_facs = np.atleast_1d(scl_facs) if len(mus) != len(scl_facs): raise Exception("Lengths of `mus` and `scl_facs` must match.") sample_range = x_max - x_min # Get bin parameters if N_bins is not None and bin_cens is not None: msg = "Either specify the `N_bins` OR the `bin_cens` argument." raise Exception(msg) elif bin_cens is not None: # user-defined bins N_bins = len(bin_cens) bin_edges = np.empty((N_bins+1,)) spacings = bin_cens[1:] - bin_cens[:-1] inner_edges = bin_cens[:-1] + spacings/2 bin_edges[1:-1] = inner_edges # set inner bin edges # Get outer edges width_start = bin_cens[1] - bin_cens[0] bin_edges[0] = bin_cens[0] - width_start/2 # set first bin edge width_end = bin_cens[-1] - bin_cens[-2] bin_edges[-1] = bin_cens[-1] + width_end/2 # set last bin edge bin_width = (bin_edges[-1] - bin_edges[0])/N_bins # AVERAGE bin width elif N_bins is not None: # automatic uniform binning bin_edges = np.linspace(x_min, x_max, num=N_bins+1, endpoint=True) bin_width = sample_range/N_bins bin_cens = bin_edges[:-1] + bin_width/2 elif out == "array": bin_width = 1. N_bins = 1. pass else: raise Exception("`N_bins` or `bin_cens` argument must be specified!") # Prepare shape parameters sigma = shape_pars['sigma'] li_eta_m = [] li_tau_m = [] li_eta_p = [] li_tau_p = [] for key, val in shape_pars.items(): if key.startswith('eta_m'): li_eta_m.append(val) if key.startswith('tau_m'): li_tau_m.append(val) if key.startswith('eta_p'): li_eta_p.append(val) if key.startswith('tau_p'): li_tau_p.append(val) if len(li_tau_m) == 0 and len(li_tau_p) == 0: # Gaussian theta = -1 # flag for below else: try: theta = shape_pars['theta'] except KeyError: pass if len(li_eta_m) == 0 and len(li_tau_m) == 1: # emg1X li_eta_m = [1] elif len(li_eta_m) == 0 and len(li_tau_m) == 0: # emg0X theta = 0 if len(li_eta_p) == 0 and len(li_tau_p) == 1: # emgX1 li_eta_p = [1] elif len(li_eta_p) == 0 and len(li_tau_p) == 0: # emgX0 theta = 1 if len(li_eta_m) > 0 and abs(sum(li_eta_m) - 1) > norm_precision: msg = "Sum of elements in li_eta_m is not normalized to within {}.".format(norm_precision) raise Exception(msg) if len(li_eta_m) != len(li_tau_m): raise Exception("Lengths of li_eta_m and li_tau_m do not match.") if len(li_eta_p) > 0 and abs(sum(li_eta_p) - 1) > norm_precision: msg = "Sum of elements in li_eta_p is not normalized to within {}.".format(norm_precision) raise Exception(msg) if len(li_eta_p) != len(li_tau_p): raise Exception("Lengths of li_eta_p and li_tau_p do not match.") # Distribute counts over different peaks and background (bkgd) # randomly distribute ions using amps and c_bkg as prob. weights N_peaks = len(amps) counts = np.append(amps/bin_width,bkg_c*N_bins) # cts in each peak & bkgd weights = counts/np.sum(counts) # normalized probability weights peak_dist = np.random.choice(range(N_peaks+1), size=N_events, p=weights) N_bkg = np.count_nonzero(peak_dist == N_peaks) # calc. number of bkgd counts events = np.array([]) # Create & add random samples from each individual peak for i in range(N_peaks): N_i = np.count_nonzero(peak_dist == i) # get no. 
of ions in peak if theta == -1: # Gaussian events_i = Gaussian_rvs(mus[i], sigma*scl_facs[i], N_samples=N_i) else: # hyper-EMG events_i = h_emg_rvs(mus[i], sigma*scl_facs[i], theta, li_eta_m, np.array(li_tau_m)*scl_facs[i], li_eta_p, np.array(li_tau_p)*scl_facs[i], N_samples=N_i) events = np.append(events, events_i) # Create & add background events bkg = uniform.rvs(size=N_bkg, loc=x_min, scale=sample_range) events = np.append(events, bkg) # Discard events outside of specified sampling range events = events[np.logical_and(events >= x_min, events <= x_max)] N_discarded = N_events - events.size if N_discarded > 0: msg = str("{:.0f} simulated events fell outside the specified sampling " "range and were discarded. Peak areas and area ratios might " "deviate from expectation.").format(N_discarded) warnings.warn(msg, UserWarning) # Return unbinned array of events or dataframe with histogram if out == 'array': np.random.shuffle(events) # randomize event ordering return events elif out == 'hist': y = np.histogram(events, bins=bin_edges)[0] df = pd.DataFrame(data=y, index=bin_cens, columns = ['Counts']) df.index.rename('m/z [u]', inplace=True) return df
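
# Illustrative usage sketch (not part of the module API): simulates a binned
# two-peak spectrum with a uniform background. The shape-parameter dictionary
# and all numerical values are arbitrary example choices; in practice the
# dictionary would typically be taken from a spectrum's shape_cal_pars.
def _example_simulate_events():
    shape_pars = {'sigma': 40e-06, 'theta': 0.7,
                  'eta_m1': 0.8, 'eta_m2': 0.2,
                  'tau_m1': 50e-06, 'tau_m2': 300e-06,
                  'eta_p1': 1.0, 'tau_p1': 100e-06}
    # 400 uniform bins over a 0.02 u window; amps are given in
    # (counts in peak)*(bin width in u), bkg_c in counts per bin
    return simulate_events(shape_pars, mus=[99.995, 100.005],
                           amps=[0.10, 0.05], bkg_c=2, N_events=20000,
                           x_min=99.99, x_max=100.01, out='hist', N_bins=400)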
def simulate_spectrum(spec, x_cen=None, x_range=None, mus=None, amps=None,
                      scl_facs=None, bkg_c=None, N_events=None,
                      copy_spec=False):
    """Create a simulated spectrum using the attributes of a reference spectrum

    The peak shape of the sampling probability density function (PDF) follows
    the shape calibration of the reference spectrum (`spec`). By default, all
    other parameters of the sampling PDF are identical to the best-fit
    parameters of the reference spectrum. If desired, the positions,
    amplitudes and number of peaks in the sampling PDF as well as the
    background level can be changed with the `mus`, `amps` and `bkg_c`
    arguments.

    Parameters
    ----------
    spec : :class:`~emgfit.spectrum.spectrum`
        Reference spectrum object whose best-fit parameters will be used to
        sample from.
    x_cen : float, optional
        Center of simulated x-range. Defaults to `x_cen` of `spec`.
    x_range : float, optional
        Covered x-range of simulated spectrum. Defaults to `x_range` of
        `spec`.
    mus : float or list of float, optional
        Nominal peak centres of peaks in simulated spectrum. Defaults to the
        mus of the reference spectrum fit.
    amps : float or list of float [(counts in peak)*(bin width in u)], optional
        Nominal amplitudes of peaks in simulated spectrum. Defaults to the
        amplitudes of the reference spectrum fit.
    scl_facs : float or list of float, optional
        Scale factors to use for scaling the scale-dependent shape parameters
        to a given peak before sampling events. Defaults to the scale factors
        associated with the fit results stored in the reference spectrum's
        :attr:`spectrum.fit_results` attribute.
    bkg_c : float [counts per bin], optional
        Nominal amplitude of uniform background in simulated spectrum.
        Defaults to the `bkg_c` obtained in the fit of the first peak in the
        reference spectrum.
    N_events : int, optional
        Number of ion events to simulate (including background events).
        Defaults to the total number of events in `spec`.
    copy_spec : bool, optional, default: False
        If `False` (default), this function returns a fresh
        :class:`~emgfit.spectrum.spectrum` object created from the simulated
        mass data. If `True`, this function returns an exact copy of `spec`
        with only the :attr:`data` attribute replaced by the new simulated
        mass data.

    Returns
    -------
    :class:`~emgfit.spectrum.spectrum`
        If `copy_spec = False` (default) a fresh spectrum object holding the
        simulated mass data is returned. If `copy_spec = True`, a copy of the
        reference spectrum `spec` is returned with only the :attr:`data`
        attribute replaced by the new simulated mass data.

    Notes
    -----
    Random events are created via custom hyper-EMG extensions of Scipy's
    :meth:`scipy.stats.exponnorm.rvs` method.

    The returned spectrum follows the binning of the reference spectrum.

    Mind the different units for peak amplitudes `amps`
    (<counts in peak> * <bin width in x-axis units>) and the background level
    `bkg_c` (counts per bin). When spectrum data is simulated, counts are
    distributed between the different peaks and the background with
    probability weights `amps` / <bin width in x-axis units> and
    `bkg_c` * <number of bins>, respectively. As a consequence, simply
    changing `N_events` (while keeping all other arguments constant) will
    cause `amps` and `bkg_c` to deviate from their nominal units.

""" if spec.fit_results is [] or None: raise Exception("No fit results found in reference spectrum `spec`.") if x_cen is None and x_range is None: x = spec.data.index.values bin_width_start = x[1] - x[0] bin_width_end = x[-1] - x[-2] x_min = x[0] - bin_width_start/2 x_max = x[-1] + bin_width_end/2 indeces = range(len(spec.peaks)) # get peak indeces in sampling range else: x_min = x_cen - x_range x_max = x_cen + x_range # Get peak indeces in sampling range: peaks = spec.peaks indeces = [i for i in range(len(peaks)) if x_min <= peaks[i].x_pos <= x_max] if mus is None: if len(indeces) == 0: msg = str("No peaks in sampling range.") warnings.warn(msg, UserWarning) mus = [] for i in indeces: result = spec.fit_results[i] pref = 'p{0}_'.format(i) mus.append(result.best_values[pref+'mu']) if amps is None: amps = [] for i in indeces: result = spec.fit_results[i] pref = 'p{0}_'.format(i) amps.append(result.best_values[pref+'amp']) if scl_facs is None: scl_facs = [] for i in indeces: result = spec.fit_results[i] pref = 'p{0}_'.format(i) try: scl_facs.append(result.params[pref+'scl_fac']) except KeyError: scl_facs.append(1.0) if not (len(mus)==len(amps)==len(scl_facs)): msg = "`mus`, `amps` and `scl_facs` argments must have the same length!" raise Exception(msg) if bkg_c is None: if len(indeces) == 0: raise Exception("Zero background and no peaks in sampling range.") result = spec.fit_results[indeces[0]] bkg_c = result.best_values['bkg_c'] if N_events is None: # Get total number of counts in simulated region of original spectrum: N_events = int(spec.data[x_min:x_max]["Counts"].sum()) # Create histogram with Monte Carlo events x = spec.data[x_min:x_max].index.values df = simulate_events(spec.shape_cal_pars, mus, amps, bkg_c, N_events, x_min, x_max, out='hist', scl_facs=scl_facs, N_bins=None, bin_cens=x) # Copy original spectrum and overwrite data # This copies all results such as peak assignments, PS calibration, # fit_results etc. if copy_spec: from copy import deepcopy new_spec = deepcopy(spec) new_spec.data = df else: # Define a fresh spectrum with sampled data from emgfit.spectrum import spectrum new_spec = spectrum(df=df, show_plot=False) return new_spec
def fit_simulated_spectra(spec, fit_result, alt_result=None, N_spectra=1000,
                          seed=None, randomize_ref_mus_and_amps=False,
                          MC_shape_par_samples=None, n_cores=-1):
    """Fit spectra simulated via sampling from a reference distribution

    This function performs fits of many simulated spectra. The simulated
    spectra are created by sampling events from the best-fit PDF associated
    with `fit_result` (as e.g. needed for a parametric bootstrap). The
    `alt_result` argument can be used to perform the fits with a different
    distribution than the reference PDF used for the event sampling.

    Parameters
    ----------
    spec : :class:`emgfit.spectrum.spectrum`
        Spectrum object to perform the bootstrap on.
    fit_result : :class:`lmfit.model.ModelResult`
        Fit result object holding the best-fit distribution to sample from.
    alt_result : :class:`lmfit.model.ModelResult`, optional
        Fit result object holding a prepared fit model to be used for the
        fitting. Defaults to the fit model stored in `fit_result`.
    N_spectra : int, optional
        Number of simulated spectra to fit. Defaults to 1000, which typically
        yields statistical uncertainty estimates with a Monte Carlo
        uncertainty of a few percent.
    seed : int, optional
        Random seed to use for reproducible sampling.
    randomize_ref_mus_and_amps : bool, default: False
        If `True`, the peak and background amplitudes and the peak centroids
        of the reference spectrum to sample from will be varied assuming
        normal distributions around the best-fit values with standard
        deviations given by the respective standard errors stored in
        `fit_result`.
    MC_shape_par_samples : :class:`pandas.DataFrame`, optional
        Monte Carlo shape parameter samples to use in the fitting.
    n_cores : int, optional
        Number of CPU cores to use for parallelized fitting of simulated
        spectra. When set to `-1` (default) all available cores are used.

    Returns
    -------
    :class:`numpy.ndarray` of :class:`lmfit.minimizer.MinimizerResult`
        MinimizerResults obtained in the fits of the simulated spectra.

    Note
    ----
    The `randomize_ref_mus_and_amps` option allows one to propagate systematic
    uncertainties in the determination of the reference parameters into the
    Monte Carlo results. If varying the centroid and amplitude parameters of
    the reference spectrum, the standard deviations of the parameter
    distributions will be taken as the respective standard errors determined
    by lmfit (see lmfit fit report table) and might not be consistent with the
    area and mass uncertainty estimates shown in the peak properties table.

    See also
    --------
    :meth:`emgfit.spectrum.spectrum.get_errors_from_resampling`
    :func:`emgfit.sample.simulate_events` for details on the event sampling.

""" bkg_c = fit_result.best_values['bkg_c'] bkg_c_err = fit_result.params['bkg_c'].stderr method = fit_result.method shape_pars = spec.shape_cal_pars x_cen = fit_result.x_fit_cen x_range = fit_result.x_fit_range x = fit_result.x y = fit_result.y x_min = x_cen - 0.5*x_range x_max = x_cen + 0.5*x_range if alt_result is None: fit_model = fit_result.fit_model model = fit_result.model init_pars = fit_result.init_params else: fit_model = alt_result.fit_model model = alt_result.model init_pars = alt_result.init_params # Determine tail order of fit model for normalization of initial etas if fit_result.fit_model.startswith('emg'): n_ltails = int(fit_result.fit_model.lstrip('emg')[0]) n_rtails = int(fit_result.fit_model.lstrip('emg')[1]) else: n_ltails = 0 n_rtails = 0 # Collect aLL peaks, peak centroids and amplitudes of fit_result fitted_peaks = [idx for idx, p in enumerate(spec.peaks) if x_min < p.x_pos < x_max] # indeces of all fitted peaks mus, mu_errs = [], [] amps, amp_errs = [], [] scl_facs = [] for idx in fitted_peaks: pref = 'p{0}_'.format(idx) mus.append(fit_result.best_values[pref+'mu']) mu_errs.append(fit_result.params[pref+'mu'].stderr) amps.append(fit_result.best_values[pref+'amp']) amp_errs.append(fit_result.params[pref+'amp'].stderr) try: scl_facs.append(fit_result.params[pref+'scl_fac']) except KeyError: scl_facs.append(1) import emgfit.fit_models as fit_models from .model import save_model, load_model from lmfit.minimizer import minimize import lmfit import time datetime = time.localtime() # get current date and time datetime_str = time.strftime("%Y-%m-%d_%H-%M-%S", datetime) if spec.input_filename is not None: data_fname = spec.input_filename.rsplit('.', 1)[0] # del. extension else: data_fname = '' modelfname = data_fname+datetime_str+"_resampl_model.sav" save_model(model, modelfname) N_events = int(np.sum(y)) funcdefs = {'constant': lmfit.models.ConstantModel, str(fit_model): getattr(fit_models,fit_model)} def refit(seed, shape_pars_i): # create simulated spectrum data by sampling from fit-result PDF np.random.seed(seed) if randomize_ref_mus_and_amps: bkg_c_i = np.random.normal(loc=bkg_c, scale=bkg_c_err, size=1) amps_i = np.random.normal(loc=amps, scale=amp_errs) mus_i = np.random.normal(loc=mus, scale=mu_errs) else: bkg_c_i = bkg_c amps_i = amps mus_i = mus if MC_shape_par_samples is not None: # Calculate missing parameters from normalization if n_ltails == 2: shape_pars_i['eta_m2'] = 1 - shape_pars_i['eta_m1'] elif n_ltails == 3: eta_m2 = shape_pars_i['delta_m'] - shape_pars_i['eta_m1'] shape_pars_i['eta_m3'] = 1 - shape_pars_i['eta_m1'] - eta_m2 if n_rtails == 2: shape_pars_i['eta_p2'] = 1 - shape_pars_i['eta_p1'] elif n_rtails == 3: eta_p2 = shape_pars_i['delta_p'] - shape_pars_i['eta_p1'] shape_pars_i['eta_p3'] = 1 - shape_pars_i['eta_p1'] - eta_p2 df = simulate_events(shape_pars_i, mus_i, amps_i, bkg_c_i, N_events, x_min, x_max, out='hist', scl_facs=scl_facs, bin_cens=x) new_x = df.index.values new_y = df['Counts'].values new_y_err = np.maximum(1, np.sqrt(new_y)) # Poisson (counting) stats # Weights for residuals: residual = (fit_model - y) * weights new_weights = 1./new_y_err model = load_model(modelfname, funcdefs=funcdefs) # re-perform fit on simulated spectrum - for performance, use only the # underlying Minimizer object instead of full lmfit model interface try: min_res = minimize(model._residual, init_pars, method=method, args=(new_y, new_weights), kws={'x': new_x}, scale_covar=False, nan_policy='propagate', reduce_fcn=None, calc_covar=False) return min_res except 
ValueError: with warnings.catch_warnings(): warnings.simplefilter('always') msg = str("Fit failed with ValueError (likely NaNs in " "y-model array) and will be excluded.") warnings.warn(msg, UserWarning) return None # For reproducible sampling with joblib parallel, generate `N_spectra` # random seeds for refit() - only works with the default Loky backend np.random.seed(seed=seed) # for reproducible sampling joblib_seeds = np.random.randint(2**31, size=N_spectra) from tqdm.auto import tqdm # add progress bar with tqdm from joblib import Parallel, delayed try: if MC_shape_par_samples is None: min_results = np.array(Parallel(n_jobs=n_cores) (delayed(refit)(s, shape_pars) for s in tqdm(joblib_seeds))) else: from .spectrum import _strip_prefs min_results = np.array(Parallel(n_jobs=n_cores) (delayed(refit)(s, _strip_prefs(dict(MC_shape_par_samples.iloc[i]))) for i, s in tqdm(enumerate(joblib_seeds)))) finally: # Force workers to shut down and clean up temp SAV file from joblib.externals.loky import get_reusable_executor import os get_reusable_executor().shutdown(wait=True) os.remove(modelfname) return min_results
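
# Illustrative usage sketch (not part of the module API): assumes `spec` holds
# a fit result for its peaks; the peak index, number of spectra, seed and core
# count below are arbitrary example choices.
def _example_fit_simulated_spectra(spec):
    # Refit 100 spectra sampled from the best-fit PDF of the fit result of the
    # first peak, using reproducible seeding and 4 parallel worker processes.
    ref_result = spec.fit_results[0]
    return fit_simulated_spectra(spec, ref_result, N_spectra=100, seed=42,
                                 n_cores=4)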
################################################################################
##### Define functions for non-parametric resampling
def resample_events(df, N_events=None, x_cen=None, x_range=0.02, out='hist'):
    """Create simulated spectrum via non-parametric resampling from `df`.

    The simulated data is obtained through resampling from the specified
    dataset with replacement.

    Parameters
    ----------
    df : :class:`pandas.DataFrame`
        Original histogrammed spectrum data to re-sample from.
    N_events : int, optional
        Number of events to create via non-parametric re-sampling, defaults to
        the number of events in the original DataFrame `df`.
    x_cen : float [u/z], optional
        Center of mass range to re-sample from. If ``None``, re-sample from
        full mass range of input data `df`.
    x_range : float [u/z], optional
        Width of mass range to re-sample from. Defaults to 0.02 u. `x_range`
        is only relevant if a `x_cen` argument is specified.
    out : str, optional
        Output format of sampled data. Options:

        - ``'hist'`` for binned mass spectrum (default), using the binning of
          the input data `df`.
        - ``'array'`` for unbinned array of single ion and background events.

    Returns
    -------
    :class:`pandas.DataFrame` or :class:`numpy.ndarray`
        If out='hist', a dataframe with a histogram of the format
        [bin centre, counts in bin] is returned. If out='array', an unbinned
        array with the x-values of single ion or background events is
        returned.

    """
    if x_cen:
        x_min = x_cen - 0.5*x_range
        x_max = x_cen + 0.5*x_range
        df = df[x_min:x_max]
    mass_bins = df.index.values
    counts = df['Counts'].values.astype(int)

    # Convert histogrammed spectrum (equivalent to MAc HIST export mode) to
    # list of events (equivalent to MAc LIST export mode)
    orig_events = np.repeat(mass_bins, counts, axis=0)

    # Create new DataFrame of events by bootstrapping from `orig_events`
    if N_events is None:
        N_events = len(orig_events)
    random_indeces = np.random.randint(0, len(orig_events), N_events)
    events = pd.DataFrame(orig_events[random_indeces])

    # Convert list of events back to a DataFrame with histogram data
    bin_cens = df.index.values
    bin_width = df.index.values[1] - df.index.values[0]
    bin_edges = np.append(bin_cens - 0.5*bin_width,
                          bin_cens[-1] + 0.5*bin_width)

    # Return unbinned array of events or dataframe with histogram
    if out == 'array':
        np.random.shuffle(events) # randomize event ordering
        return events
    elif out == 'hist':
        hist = np.histogram(events, bins=bin_edges)
        df_new = pd.DataFrame(data=hist[0], index=bin_cens, dtype=float,
                              columns=["Counts"])
        df_new.index.name = "m/z [u]"
        return df_new
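
# Illustrative usage sketch (not part of the module API): assumes `df` is a
# histogrammed spectrum DataFrame (e.g. the data attribute of a spectrum
# object); the mass window below is an arbitrary example choice.
def _example_resample_events(df):
    # Draw a bootstrap replicate of the counts within a 0.01 u window centred
    # on 100.0 u and return it with the binning of the input data.
    return resample_events(df, x_cen=100.0, x_range=0.01, out='hist')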