Source code for pyfresco.SpectraAnalysis

"""
    SpectraAnalysis module
"""

import matplotlib
matplotlib.use('TkAgg')
import matplotlib.pyplot as plt
from matplotlib.path import Path
from matplotlib.ticker import (MultipleLocator, AutoMinorLocator)
from matplotlib.widgets import PolygonSelector
import matplotlib.animation as animation
from matplotlib.backend_bases import MouseButton
from matplotlib.ticker import FuncFormatter

import numpy as np
import pandas as pd

from scipy.stats import median_abs_deviation as MAD
from scipy.spatial import ConvexHull
from scipy.interpolate import interp1d
from scipy.special import erf
from scipy.signal import savgol_filter, argrelextrema, find_peaks

from kneed import KneeLocator

import torch
import torch.nn.functional as F
from torch import nn
[docs] class SpectraAnalysis(): """ Class to perform the required spectral analyses. Parameters ---------- final : 1-dim array Normalized spectrum as given by SpectraNorm.norm_spectra(). m_spec : 1-dim array Mean target spectrum. err_spec : 1-dim array Standard deviation of the target spectrum. n_spectra : 1-dim array Median neutral spectrum. n_err : 1-dim array MAD of the neutral spectrum. wavelength : list or array CRISM observation wavelengths. MIN : float Minimum wavelength value to be taken into consideration. MAX : float Maximum wavelength value to be taken into consideration. folder : string Folder in which the data spectra_MICA_LAB_info.csv is stored. Default is pyFRESCO/data. """ def __init__(self , final , final_error , m_spec , err_spec , n_spectra , n_err , wavelength , MIN , MAX , folder = 'pyfresco/data'): self.final = final # normalized spectrum self.final_error = final_error # normalized spectrum propagated error self.err_spec = err_spec # error of the target self.m_spec = m_spec # target mean spectrum self.n_spectra = n_spectra # neutral median spectrum self.n_err = err_spec # neutral error self.w = wavelength # wavelength self.MIN = MIN # minimum wavelength value self.MAX = MAX # maximum wavelength value self.folder = folder
[docs] def limits(self, other_w = None): """ Function to compute the limits of the x-axis and the corresponding indexes to then compute the y-axis limits of an array given the list/array of the x axis. Parameters ---------- other_w : list, array or None If list or array, this will be taken as the wavelength array to use, if None then the CRISM observation wavelengths are used. Default is None. Returns ------- extremas : list of float list of index correspondant do xmin , index correpsondant to xmax, xmin and xmax """ if type(other_w) == list or isinstance(other_w , np.ndarray) == True: k = other_w else: k = self.w a , b = np.zeros(len(k)) , np.zeros(len(k)) for i in range(len(k)): a[i] = np.abs(k[i]-self.MIN) b[i] = np.abs(k[i]-self.MAX) xmin_ind , xmax_ind = np.argmin(a) , np.argmin(b) xmin , xmax = k[xmin_ind] , k[xmax_ind] return [xmin_ind , xmax_ind , xmin , xmax]
[docs] def find_nearest(self, array, value): """ Function used to find the index of element nearest to a given arbitrary value. Parameters ---------- array : array Array in which to search value : float Value to search the nearest element inside array. Returns ------- idx : int Index of the nearest value. """ array = np.asarray(array) idx = (np.abs(array - value)).argmin() return idx
[docs] def upload_norm(self , name , folder): """ Function to upload a pre-made median/mad spectrum. Parameters ---------- name : string Name of the file. folder : string or None If None it will be saved into the home folder, if a folder path is given, the path must end with the /. Returns ------- norm : 1d-array The uploaded median spectrum. w : 1d-array The cut wavelength range. """ i, j = self.find_nearest(self.w, self.MIN), self.find_nearest(self.w, self.MAX) if folder == None: self.w = np.genfromtxt('Wavelength_from_'+str(self.w[i])+'_to_'+str(self.w[i:j])+'.txt') self.final = np.genfromtxt(name + '.txt') else: self.w = np.genfromtxt(folder + 'Wavelength_from_'+str(self.w[i])+'_to_'+str(self.w[i:j])+'.txt') self.final = np.genfromtxt(folder + name + '.txt') return self.final , self.w
[docs] def moving_average(self , window_size , limiti = True): """ Function to perform a moving average smoothing on the normalized spectrum. Parameters ---------- window_size : int Size of the step taken for the moving mean. Returns ------- result : 1-dim array Moving-mean smoothed normalized spectrum. """ if window_size < 1: raise ValueError("Window_size must be at least 1.") data = np.asarray(self.final) result = np.empty(len(self.final)) half_window = window_size // 2 for i in range(len(self.final)): start = max(0, i - half_window) end = min(len(self.final), i + half_window + 1) result[i] = np.mean(data[start:end]) self.final_smooth = result if limiti == True: lims = self.limits() a , b = lims[0] , lims[1] plt.plot(self.w[a:b] , self.final , 'b') plt.plot(self.w[a:b] , self.final_smooth , 'r') else: plt.plot(self.w , self.final , 'b') plt.plot(self.w , self.final_smooth , 'r') plt.xlabel('$\lambda$[nm]') plt.show() return result
[docs] def savgol(self, window, order , limiti = True): """ Function to perform a Savitzky-Golay smoothing on the normalized spectrum as given in https://pubs.acs.org/doi/10.1021/ac60214a047. Mutuated from scipy.signal (https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.savgol_filter.html). Parameters ---------- window : int Size of the convolution window. order : int Order of the convolutional fit. Returns ------- result : 1-dim array Moving-mean smoothed normalized spectrum. """ self.final_smooth = savgol_filter(self.final , window , order) self.final_smooth_error = np.zeros(len(self.final_smooth)) if limiti == True: lims = self.limits() a , b = lims[0] , lims[1] plt.plot(self.w[a:b] , self.final , 'b') plt.plot(self.w[a:b] , self.final_smooth , 'r') else: plt.plot(self.w , self.final , 'b') plt.plot(self.w , self.final_smooth , 'r') plt.xlabel('$\lambda$[nm]') plt.show() return self.final_smooth
[docs] def dataset_interaction(self, mineralname, use): """ Utils function to interact with the .csv file named "spectra_MICA_LAB_info.csv" to extract the needed spectral informations from the comparison with the references given by the MICA files, 2019 (http://crism.jhuapl.edu/data/mica/). IMPORTANT: THE MICA FILES REFERENCE LABORATORY SPECTRA ARE IN https://crismtypespectra.rsl.wustl.edu/ AND FOR SOME REASONS SOME SPECTRA RESULTS UNAVALIABLE FROM THERE, BEING: CO2 ICE, HYDRATED SILICA, HYDROXYLATED FE SULFATE, GYPSUM AND CHLORIDE. THUS, REGARDING THESE MINERALS ONLY THE CRISM REFERENCE IS USED AT THE MOMENT. Parameters ---------- mineralname : string Name of the mineral from the MICA files. use : string If to use only the CRISM certified spectra, the laboratory spectra or both of them. Accepted values are either MICA, LAB or Both. Returns ------- index : int Index of the given mineral name in the spectra_MICA_LAB_info.csv . """ MICA_spectra_names = ['Hematite' , 'Mg Olivine' , 'Fe Olivine' , 'Plagioclase' , 'Low Ca Pyroxene' , 'High Ca Pyroxene' , 'H20 Ice' , 'CO2 Ice' , 'Monohydrated Sulfate' , 'Alunite' , 'Hydroxylated Fe-Sulfate' , 'Jarosite' , 'Polyhydrated Sulfate' , 'Gypsum' , 'Bassanite' , 'Kaolinite' , 'Al Smectite' , 'Margarite' , 'Illite Muscovite' , 'Fe Smectite' , 'Mg Smectite' , 'Talc' , 'Serpentine' , 'Chlorite' , 'Mg Carbonate' , 'Ca Fe Carbonate' , 'Prehnite' , 'Hydrated Silica' , 'Epidote' , 'Analcime' , 'Chloride'] if mineralname not in MICA_spectra_names: print('The name of the mineral must be in the list named MICA_spectra_names and the name must be present also in the spectra_MICA_LAB_info.csv file along with all other required inputs!') return # This is here just because at the moment I do not have any laboratory spectra of these minerals if mineralname == 'CO2 Ice' or mineralname == 'Hydroxylated Fe-Sulfate': use = 'MICA' # Check if the use parameter is one of these if use not in ['MICA' , 'LAB' , 'Both']: raise ValueError("Parameter must be either 'MICA' , 'LAB' or 'Both'.") # Upload the .csv files with all the indicized products if self.folder == None: spectra_info = pd.read_csv('spectra_MICA_LAB_info.csv' , sep = None , names = ["Mineral Name","Mineral Type","txt name CRISM","txt name LAB","CRISM spectra cube","Lab. Mineral", "Type of Lab. From MICA","MICA Lab.Code","Type of Lab. From website", "Notable Absorptions CRISM [nm]","Notable Absorptions LAB [nm]", "Sample Grain Size [mum]"]) else: spectra_info = pd.read_csv(self.folder + '/' + 'spectra_MICA_LAB_info.csv' , sep = None , names = ["Mineral Name","Mineral Type","txt name CRISM","txt name LAB","CRISM spectra cube","Lab. Mineral", "Type of Lab. From MICA","MICA Lab.Code","Type of Lab. From website", "Notable Absorptions CRISM [nm]","Notable Absorptions LAB [nm]", "Sample Grain Size [mum]"]) self.index_db = spectra_info[spectra_info['Mineral Name'] == mineralname] # Constraint to one index return self.index_db
[docs] def simple_compare(self, mineralname, smooth = False, use = 'LAB', alpha = 0.15, folder = None, errorplot = False): """ Function to do a simple spectra compariSON using both the MICA spectra and the laboratory spectra. To do so it simply plot the smoothed or non-smoothed normalized spectrum and it superimposes the absorption lines of the mineral guess given in the input. IMPORTANT: to use this function one must have the spectra_MICA_LAB_info.csv file and all the files it points to in the same folder!. Parameters ---------- mineralname : string, case sensitive Name of the mineral as given in the Mineral Name column in the spectra_MICA_LAB_info.csv file. smooth : bool If True it uses the last smoothed spectrum. use : string {'MICA' , 'LAB' , 'Both'} Which reference to use for the comparison. For the first it uses the MICA spectra fro comaprison, for 'LAB' it uses the laboratory spectra and for 'Both' it uses both. alpha : float Shade strength of the error plot if errorplot is True. Default is 0.15. folder : string Folder in which the 'spectra_MICA_LAB_info.csv' is located. If None the folder is defaulted as the home fodler. Default is None. errorplot : bool If to plot or not the errorbar fo the normalized spectrum. Default is False. Returns ------- None """ I = self.index_db#self.dataset_interaction(mineralname , use) # Compute the spectra limits for the plot spectra_lim = self.limits() # Upload MICA spectra if use == 'MICA' or use == 'Both': CRISM_spectra = np.genfromtxt(I['txt name CRISM'].iloc[0]) CRISM_spectra_x = CRISM_spectra[:,0]*1000 CRISM_lim = self.limits(other_w = CRISM_spectra_x) abs_CRISM = eval(I['Notable Absorptions CRISM [nm]'].iloc[0]) # Upload laboratory spectra if use == 'LAB' or use == 'Both': LAB_spectra = np.genfromtxt(I['txt name LAB'].iloc[0]) LAB_spectra_x = LAB_spectra[:,0]*1000 LAB_spectra_y = LAB_spectra[:,1] if I['Type of Lab. From website'].iloc[0] == 'USGS': LAB_spectra_x = LAB_spectra[:,0]/10000 LAB_spectra_y = LAB_spectra[:,1]/1e7 LAB_lim = self.limits(other_w = LAB_spectra_x) abs_LAB = eval(I['Notable Absorptions LAB [nm]'].iloc[0]) w = self.w[spectra_lim[0] : spectra_lim[1]] if smooth == True: norm , normerr = self.final_smooth , np.zeros(len(self.final_smooth)) else: norm , normerr = self.final , self.final_error if errorplot != False: plt.plot(w , norm+normerr , 'k--' , linewidth = 1) plt.plot(w , norm-normerr , 'k--' , linewidth = 1) plt.fill_between(w , norm-normerr , norm+normerr , color = 'k' , alpha = alpha) plt.plot(w , self.final , color = 'k' , linewidth = 2) if use == 'Both' or use == 'MICA': for i in range(len(abs_CRISM)): if abs_CRISM[i] != None: plt.axvline(abs_CRISM[i] , color = 'r' , linestyle = '--' , linewidth = 1) if use == 'Both' or use == 'LAB': for i in range(len(abs_LAB)): if abs_LAB[i] != None: plt.axvline(abs_LAB[i] , color = 'b' , linestyle = '--' , linewidth = 1) plt.xlim(self.MIN , self.MAX) plt.xlabel('$\lambda$ [nm]' , fontsize = 15) plt.ylabel('Relative Reflectance') plt.title(mineralname + '. Using ' + use + ' to compare.' , fontsize = 15) plt.show()
[docs] def compare_spectra(self, mineralname, smooth = False, errorplot = False, use = 'LAB', alpha = 0.15 , folder = None , size = [3,15] , save = False , namesave = None , folder_save = None , ext = '.png' , show = True): """ Function to do the spectra comparing using both the MICA spectra and the laboratory spectra. \\ IMPORTANT: to use this function one must have the spectra_MICA_LAB_info.csv file and all the files it points to in the same folder!. Parameters ---------- mineralname : string Name of the mineral as given in the Mineral Name column in the 'spectra_MICA_LAB_info.csv' file. smooth : bool If True it uses the last smoothed spectrum. errorplot : bool If to plot or not the errorbar fo the normalized spectrum. Default is False. use : string {'MICA' , 'LAB' , 'Both'} Which reference to use for the comparison. For the first it uses the MICA spectra fro comaprison, for 'LAB' it uses the laboratory spectra and for 'Both' it uses both. alpha : float Shade strength of the error plot if errorplot is True. Default is 0.15. folder : string Folder in which the 'spectra_MICA_LAB_info.csv' is located. If None the folder is defaulted as the home fodler. Default is None. size : list with two float values Size of the plot. save : bool To save or not the plot. namesave : string The name with thich you want to save the plot, if save is True a name must be given. folder_save : string The folder path in which you want to save the plot, if None it will save the plot in the home folder. ext : string {'.png' , '.jpg' , '.pdf'} Extension of the image. show : bool To show or not the plot. Returns ------- None """ if smooth == True: norm , normerr = self.final_smooth , np.zeros(len(self.final_smooth)) else: norm , normerr = self.final , self.final_error I = self.index_db # Compute the spectra limits for the plot spectra_lim = self.limits() # Upload MICA spectra if use == 'MICA' or use == 'Both': if folder == None: CRISM_spectra = np.genfromtxt(I['txt name CRISM'].iloc[0]) else: CRISM_spectra = np.genfromtxt(folder + '/' + I['txt name CRISM'].iloc[0]) CRISM_spectra_x = CRISM_spectra[:,0]*1000 CRISM_spectra_y = CRISM_spectra[:,1] CRISM_lim = self.limits(other_w = CRISM_spectra_x) abs_CRISM = eval(I['Notable Absorptions CRISM [nm]'].iloc[0]) # Upload laboratory spectra if use == 'LAB' or use == 'Both': if folder == None: LAB_spectra = np.genfromtxt(I['txt name LAB'].iloc[0]) else: LAB_spectra = np.genfromtxt(folder + '/' + I['txt name LAB'].iloc[0]) LAB_spectra_x = LAB_spectra[:,0]*1000 LAB_spectra_y = LAB_spectra[:,1] if I['Type of Lab. From website'].iloc[0] == 'USGS': LAB_spectra_x = LAB_spectra[:,0]/10000 LAB_spectra_y = LAB_spectra[:,1]/1e7 LAB_lim = self.limits(other_w = LAB_spectra_x) abs_LAB = eval(I['Notable Absorptions LAB [nm]'].iloc[0]) # Initialize figure fig = plt.figure(figsize = size) # Add subplots if use == 'Both': gs = fig.add_gridspec( 3 , hspace = 0 ) else: gs = fig.add_gridspec( 2 , hspace = 0 ) w = self.w[spectra_lim[0] : spectra_lim[1]] # Make plots share the x axis and set title axs = gs.subplots( sharex = True ) fig.suptitle(mineralname) # Setting given x axis limits for ax in axs: ax.set_xlim(self.MIN , self.MAX) # Plot spectra axs[0].plot( w , norm , 'k-' , label = 'Target Spectra' ) # If chosen, plot the error as a shade with dotted borders if errorplot == True: axs[0].plot( w , norm+normerr , 'k--' , linewidth = 0.5 ) axs[0].plot( w , norm-normerr , 'k--' , linewidth = 0.5 ) axs[0].fill_between(w , norm-normerr , norm+normerr , color = 'k' , alpha = alpha) axs[0].set_ylim( np.min(norm-normerr)-0.01 , np.max(norm+normerr) + 0.01 ) else: axs[0].set_ylim( np.min(norm)-0.01 , np.max(norm) + 0.01 ) # Plot the MICA and laboratory spectra given the 'use' parameter set to 'Both' if use == 'Both': axs[1].plot( CRISM_spectra_x , CRISM_spectra_y , 'r-' , label = 'MICA spectra' ) axs[2].plot( LAB_spectra_x , LAB_spectra_y , 'b-' , label = I['Type of Lab. From website'].iloc[0] + ' spectra' ) # Set y axis limits for the MICA and laboratory plot and the labels axs[1].set_ylim( np.min(CRISM_spectra_y[CRISM_lim[0] : CRISM_lim[1]])-0.001 , np.max(CRISM_spectra_y[CRISM_lim[0] : CRISM_lim[1]]) + 0.001 ) axs[2].set_ylim( np.min(LAB_spectra_y[LAB_lim[0] : LAB_lim[1]])-0.001 , np.max(LAB_spectra_y[LAB_lim[0] : LAB_lim[1]]) + 0.001 ) axs[0].set_ylabel('Relative Reflectance') axs[1].set_ylabel('Relative Reflectance') axs[2].set_ylabel('Reflectance') # Plot the vertical lines in correspondance with the uploaded absorptions for i in range(len(abs_CRISM)): for ax in axs: if abs_CRISM[i] != None: ax.axvline(abs_CRISM[i] , color = 'r' , linestyle = '--' , linewidth = 0.5) if abs_LAB[i] != None: ax.axvline(abs_LAB[i] , color = 'b' , linestyle = '--' , linewidth = 0.5) # Plot just the MICA spectra given the 'use' parameter set to 'MICA' elif use == 'MICA': axs[1].plot( CRISM_spectra_x , CRISM_spectra_y , 'r-' , label = 'MICA spectra' ) axs[1].set_ylabel('Relative Reflectance') # Plot the vertical lines in correspondance with the uploaded absorptions for i in range(len(abs_CRISM)): for ax in axs: if abs_CRISM[i] != None: ax.axvline(abs_CRISM[i] , color = 'r' , linestyle = '--' , linewidth = 0.5) axs[1].set_ylim( np.min(CRISM_spectra_y[CRISM_lim[0] : CRISM_lim[1]]) - 0.001 , np.max(CRISM_spectra_y[CRISM_lim[0] : CRISM_lim[1]]) + 0.001 ) # Plot just the laboratory spectra given the 'use' parameter set to 'LAB' else: axs[1].plot( LAB_spectra_x , LAB_spectra_y , 'b-' , label = I['Type of Lab. From website'].iloc[0] + ' spectra' ) axs[1].set_ylabel('Relative Reflectance') for i in range(len(abs_LAB)): for ax in axs: if abs_LAB[i] != None: ax.axvline(abs_LAB[i] , color = 'b' , linestyle = '--' , linewidth = 0.5) axs[1].set_ylim( np.min(LAB_spectra_y[LAB_lim[0] : LAB_lim[1]]) - 0.001 , np.max(LAB_spectra_y[LAB_lim[0] : LAB_lim[1]]) + 0.001 ) # Hide x labels and tick labels for all but bottom plot for ax in axs: ax.label_outer() ax.legend() # Set label of x axis axs[-1].set_xlabel('$\lambda$ [nm]' , fontsize = 15) # If chosen, save the figure in given folder with given name if save == True and namesave != None: if folder_save != None: plt.savefig(folder_save + '/' + namesave + ext) else: plt.savefig(savename + ext) if show == True: plt.show()
[docs] def compare_lines(self, name, smooth = False, ran = 10 , s = 5 , t = 10): """ Function that automatically infers the nearest absorption lines that are present within a set range from the tabulated absorption line position of the given mineral guess from the MICA siles CRISM reference spectra. It works by firstly search for local minima in the set interval, if it does not find any sufficiently good local minima it searches for inflection points using the KneeLocator function of the kneed module (https://pypi.org/project/kneed/). Parameters ---------- name : string Name of the mineral as given in the Mineral Name column in the 'spectra_MICA_LAB_info.csv' file. smooth : bool If True it uses the last smoothed spectrum. ran : float Interval semi-width for the check of elbow points. Deafult is 10. s : float Sensibility of the elbow finding function. Default is 5. t : float Threshold limit value for proximity with tabulated absorption. Default is 10. Returns ------- absoC : list Position of the inferred lines from the comparison with MICA CRISM reference spectra. absoL : list Position of the inferred lines from the comparison with the Laboratory reference spectra. diffC : list Difference between the inferred lines and the MICA CRISM reference absorption lines. diffL : list Difference between the inferred lines and the Laboratory reference absorption lines. CRISM_abs : list MICA CRISM reference absorption lines. LAB_abs : list LAboratory reference absorption lines. """ if smooth == True: norm , normerr = self.final_smooth , np.zeros(len(self.final_smooth)) else: norm , normerr = self.final , self.final_error spectra_lim = self.limits() W = self.w W2 = W[spectra_lim[0]:spectra_lim[1]] I = self.index_db LAB_spectra = np.genfromtxt(I['txt name LAB'].iloc[0]) CRISM_spectra = np.genfromtxt(I['txt name CRISM'].iloc[0]) if I['Type of Lab. From website'].iloc[0] == 'USGS': LAB_spectra_x = LAB_spectra[:,0]/1000 LAB_spectra_y = LAB_spectra[:,1]/1e7 else: LAB_spectra_x = LAB_spectra[:,0]*1000 LAB_spectra_y = LAB_spectra[:,1] CRISM_spectra_x = CRISM_spectra[spectra_lim[0]:spectra_lim[1],0]*1000 CRISM_spectra_y = CRISM_spectra[spectra_lim[0]:spectra_lim[1],1] CRISM_abs = abs_CRISM = eval(I['Notable Absorptions CRISM [nm]'].iloc[0])#eval(spectra_info[spectra_info['Mineral Name']==name]['Notable Absorptions CRISM [nm]'].iloc[0]) LAB_abs = abs_CRISM = eval(I['Notable Absorptions LAB [nm]'].iloc[0])#eval(spectra_info[spectra_info['Mineral Name']==name]['Notable Absorptions LAB [nm]'].iloc[0]) ab = argrelextrema(norm, np.less) absoC , absoL , diffC , diffL = [] , [] , [] , [] G = 0 for i in range( len(CRISM_abs) ): C = CRISM_abs[i] L = LAB_abs[i] if C != None and C >= self.MIN and C <= self.MAX: A , Ai = np.zeros( len(ab[0]) ) , np.zeros( len(ab[0]) ) for j in range(len(ab[0])): ww , h = W2[ab[0][j]] , norm[ab[0][j]] A[j] = np.abs( ww - C ) Ai[j] = ab[0][j] k , ki = np.min(A) , np.argmin(A) F = int(Ai[ki]) Gold = G G = W2[F] if np.abs(G-Gold) > t: absoC.append(W2[F]) absoL.append(W2[F]) diffC.append(np.abs(W2[F]-C)) diffL.append(np.abs(W2[F]-L)) else: E = self.find_nearest(W2 , C) Elbow = KneeLocator(W2[E-ran:E+ran], norm[E-ran:E+ran], S=s, curve="convex", direction="decreasing") absoC.append(Elbow.elbow) absoL.append(Elbow.elbow) diffC.append(np.abs(Elbow.elbow - C)) diffL.append(np.abs(Elbow.elbow - L)) elif C == None and L != None and L >= self.MIN and L <= self.MAX: A , Ai = np.zeros( len(ab[0]) ) , np.zeros( len(ab[0]) ) for j in range(len(ab[0])): ww , h = W2[ab[0][j]] , self.final[ab[0][j]] A[j] = np.abs( ww - L ) Ai[j] = ab[0][j] k , ki = np.min(A) , np.argmin(A) F = int(Ai[ki]) Gold = G G = W2[F] if np.abs(G-Gold) > t: absoL.append(W2[F]) diffL.append(np.abs(W2[F]-L)) else: E = self.find_nearest(W2 , L) Elbow = KneeLocator(W2[E-ran:E+ran], hyd1[E-ran:E+ran], S=s, curve="convex", direction="increasing") absoL.append(Elbow.knee) diffL.append(np.abs(Elbow.knee-L)) self.absoC = absoC self.absoL = absoL self.diffC = diffC self.diffL = diffL return self.absoC , self.absoL , self.diffC , self.diffL , CRISM_abs , LAB_abs
[docs] def compare2(self , p , n_spectra , n_err , spectra , mineralname , smooth = False , save = False , fold = None , name = None): """ Function to show the comparison using the inferred absorption lines found using SpectraAnalysis.compare_lines(). This comparison will show three plots, one showing the target and neutral spectrum with the respective errors, one showing the (non)smooth ratioed spectrum and the MICA files CRISM spectrum with the inferred absorption lines and the last one being the laboratory spectrum with the inferred absorption lines. Parameters --------- p : float Separator for the double plot of the target and MICA spectra. mineralname : string Name of the mineral as given in the Mineral Name column in the 'spectra_MICA_LAB_info.csv' file. smooth : bool If True it uses the last smoothed spectrum. save : string If to save the plot or not. Default is False. fold : string If save is True this is the path in which to save the plot. If None then the home directory is selected. Default is None. name : string Name of the plot if required to save it. Default is None. Returns ------- None """ if smooth == True: norm , normerr = self.final_smooth , np.zeros(len(self.final_smooth)) else: norm , normerr = self.final , self.final_error spectra_lim = self.limits() W2 = self.w[spectra_lim[0]:spectra_lim[1]] spectra_info = pd.read_csv('spectra_MICA_LAB_info.csv' , sep = None , names = ["Mineral Name","Mineral Type","txt name CRISM","txt name LAB","CRISM spectra cube","Lab. Mineral", "Type of Lab. From MICA","MICA Lab.Code","Type of Lab. From website", "Notable Absorptions CRISM [nm]","Notable Absorptions LAB [nm]", "Sample Grain Size [mum]"]) I = self.index_db LAB_spectra = np.genfromtxt(I['txt name LAB'].iloc[0]) if I['Type of Lab. From website'].iloc[0] == 'USGS': LAB_spectra_x = LAB_spectra[:,0]/1e4 LAB_spectra_y = LAB_spectra[:,1]/1e7 else: LAB_spectra_x = LAB_spectra[:,0]*1000 LAB_spectra_y = LAB_spectra[:,1] CRISM_spectra = np.genfromtxt(I['txt name CRISM'].iloc[0]) CRISM_spectra_x = CRISM_spectra[:,0]*1000 CRISM_spectra_y = CRISM_spectra[:,1] mean_spec = self.m_spec[spectra_lim[0]:spectra_lim[1]] std_spec = self.err_spec[spectra_lim[0]:spectra_lim[1]] n_spectra = self.n_spectra[spectra_lim[0]:spectra_lim[1]] n_err = self.n_err[spectra_lim[0]:spectra_lim[1]] ############################################################################################################# ############################################################################################################# ############################################################################################################# fig , ax = plt.subplots(1 , 3 , figsize = [15,5]) ax[0].plot(W2 , mean_spec , color = 'blue' , linestyle = '-' , linewidth = 1 , label = 'Mean Target Spectrum') ax[0].plot(W2 , mean_spec+std_spec , color = 'blue' , linestyle = '--' , linewidth = 0.5) ax[0].plot(W2 , mean_spec-std_spec , color = 'blue' , linestyle = '--' , linewidth = 0.5) ax[0].fill_between(W2 , mean_spec-std_spec , mean_spec+std_spec , color = 'blue' , alpha = 0.2) ax[0].plot(W2 , n_spectra , 'k-' , linewidth = 1 , label = 'Median Neutral Spectrum') ax[0].plot(W2 , n_spectra+n_err , 'k--' , linewidth = 0.5) ax[0].plot(W2 , n_spectra-n_err , 'k--' , linewidth = 0.5) ax[0].fill_between(W2 , n_spectra-n_err , n_spectra+n_err , color = 'k' , alpha = 0.2) ax[0].set_xlabel( '$\lambda$ [nm]' , fontsize = 10 ) ax[0].set_ylabel( 'Reflectance' , fontsize = 10 ) ax[0].set_xlim(self.MIN , self.MAX) if np.min(n_spectra-n_err) <= np.min(mean_spec-std_spec): m = np.min(n_spectra-n_err) else: m = np.min(mean_spec-std_spec) if np.max(n_spectra+n_err) <= np.max(mean_spec+std_spec): M = np.max(mean_spec+std_spec) else: M = np.max(n_spectra+n_err) ax[0].set_ylim(m,M) ax[0].xaxis.set_tick_params(labelsize=10 , rotation = 90) ax[0].set_xticks(np.arange(self.MIN , self.MAX+200 , 200)) ax[0].xaxis.set_minor_locator(MultipleLocator(100)) ax[0].legend() ############################################################################################################# ############################################################################################################# ############################################################################################################# CRISM_spectra_x = CRISM_spectra_x[spectra_lim[0]:spectra_lim[1]] CRISM_spectra_y = CRISM_spectra_y[spectra_lim[0]:spectra_lim[1]] CRISM_abs = eval(spectra_info[spectra_info['Mineral Name']==mineralname]['Notable Absorptions CRISM [nm]'].iloc[0]) ax1=ax[1].twinx() ax1.plot(W2 , norm+np.ones(len(norm))*p , color = 'blue' , linestyle = '-' , linewidth = 1 , label = '$\frac{Target}{Neutral}$') ax[1].plot(CRISM_spectra_x , CRISM_spectra_y , color = 'k' , linestyle = '-' , linewidth = 1 , label = 'MICA ' + mineralname) ax[1].set_xlabel( '$\lambda$ [nm]' , fontsize = 10 ) ax[1].set_ylabel( 'Relative Reflectance' , fontsize = 10 ) ax1.set_ylabel( 'Relative Reflectance' , fontsize = 10 , color = 'blue' ) ax1.yaxis.set_tick_params(colors = 'blue') ax[1].set_xlim( self.MIN , self.MAX ) if np.min(norm+np.ones(len(norm))*p) <= np.min(CRISM_spectra_y): m = np.min(norm+np.ones(len(norm))*p) else: m = np.min(CRISM_spectra_y) if np.max(norm+np.ones(len(norm))*p) <= np.max(CRISM_spectra_y): M = np.max(CRISM_spectra_y) else: M = np.max(norm+np.ones(len(norm))*p) ax[1].set_ylim(m,M) ax[1].xaxis.set_tick_params(labelsize=10 , rotation = 90) ax[1].set_xticks(np.arange(self.MIN , self.MAX+200 , 200)) ax[1].xaxis.set_minor_locator(MultipleLocator(100)) ax[1].legend() ############################################################################################################# ############################################################################################################# ############################################################################################################# LAB_abs = eval(spectra_info[spectra_info['Mineral Name']==mineralname]['Notable Absorptions LAB [nm]'].iloc[0]) #ax2=ax[2].twinx() #ax2.plot(W2 , self.final , color = 'b' , linestyle = '-' , linewidth = 1)# , label = 'Target Spectrum') ax[2].plot(LAB_spectra_x , LAB_spectra_y , color = 'k' , linestyle = '-' , linewidth = 1) ax[2].set_xlabel( '$\lambda$ [nm]' , fontsize = 10 ) ax[2].set_ylabel( 'Reflectance' , fontsize = 10 ) ax[2].set_xlim(self.MIN , self.MAX) ax[2].xaxis.set_tick_params(labelsize=10 , rotation = 90) ax[2].set_xticks(np.arange(self.MIN , self.MAX+200 , 200)) ax[2].xaxis.set_minor_locator(MultipleLocator(100)) ############################################################################################################# ############################################################################################################# ############################################################################################################# for i in range(len(self.absoC)): ax[1].axvline(self.absoC[i] , color = 'red' , linestyle = '--' , linewidth = 1) for i in range(len(self.absoL)): ax[2].axvline(self.absoL[i] , color = 'red' , linestyle = '--' , linewidth = 1) fig.subplots_adjust(wspace=0.5) if save == True: plt.savefig(fold + '/' + name + '.png') else: plt.show()
[docs] def convex_hull(self , interp = 'linear' , plot = False): """ Function to apply the convex hull caseline correction before feeding a spectrum to the mgm. Parameters ---------- interp : string plot : bool To plot or not the result. Default is False. Returns ------- final : numpy 1-d array baseline-corrected spectrum. WARNING: THIS FUNCTION RE-INITIALIZE THE self.final SPECTRUM! """ I , J = self.find_nearest(self.w , self.MIN) , self.find_nearest(self.w , self.MAX) x , y = self.w[I:J] , self.final[I:J] points = np.c_[x , y] augmented = np.concatenate([points, [(x[0], np.min(y)-1), (x[-1], np.min(y)-1)]], axis=0) hull = ConvexHull(augmented , incremental = True) continuum_points = points[np.sort([v for v in hull.vertices if v < len(points)])] continuum_indexs = np.array(range(0,len(continuum_points) , 1) , dtype = int) continuum_function = interp1d(*continuum_points.T , kind = interp) hull = continuum_function(x) self.final = y/hull if plot == True: fig , ax = plt.subplots(1,2) ax[0].plot(x , y , 'k') ax[0].plot(x , hull , 'b') ax[0].set_xlabel('$\lambda$ [nm]') ax[0].set_ylabel('Reflectance') ax[1].set_xlabel('$\lambda$ [nm]') ax[1].plot(x , self.final , 'k') ax[1].set_ylabel('Relative Reflectance') plt.show() return self.final
[docs] def mgm(self , n, iterations, lr, means=None, mean_ranges = [] , asym=False , smooth = False , hull = False , interp = 'linear' , t = 'savgol' , w = 0 , o = 0): r""" Machine learning driven Modified Gaussian Model (MGM, https://agupubs.onlinelibrary.wiley.com/doi/epdf/10.1029/JB095iB05p06955?src=getftr). This version of the algorithm implements gradient-descent, thus it efficiently fits the reference spectrum with the required number of skew normal distributions (https://en.wikipedia.org/wiki/Skew_normal_distribution) formulated as follows: .. math:: G(\lambda;\alpha , \mu , A , f) = A\cdot e^{-4ln(2)\left(\frac{\lambda - \mu}{f}\right)^{2}}\cdot\left[1 + erf\left(2\sqrt{ln(2)}\alpha\frac{\lambda - \mu}{f}\right)\right] Where λ is the array of wavelengths, α is the skewing parameter, μ is the mean (i.e. the line position), A is the amplitude and f is the full width at half maximum. The used loss function is Pytorch.F Mean Square Loss (MSE, https://pytorch.org/docs/stable/generated/torch.nn.functional.mse_loss.html). Parameters ---------- n : int Number of skew normal distributions to be fitted. max_iterations : int Number of iterations. lr : float Learning rate for the Adam orptimizer. means : list of float or None If None, then the means (μ) are inserted in the fitting parameters. If a list is given, then the values in the list will be used as mean values for the distributions. Default is None. mean_ranges : list of tuples of float or None If None, no ranges is given to the distributions for the fit. If the list of tuples is given, then the search of the mean will be constrained inside an interval (min,max). asym : bool If True, then the distributions fitted are Skew Normal Distributions, if False, then the alpha parameters will be set to zero, thus efficiently modeling the absorptions as simple Gaussian distributions. Default is False. smooth : bool Whether or not to smooth the spectrum before the run. Default is False. hull : bool Whether or not to correct by baseline the spectrum before the run. This can be useful when working with spectra not obtained thourgh the convex hull normalization method, sine their values can be higher than one, exceeding so the maximum value fot this function. Default is False interp : string Type of interpolation, can be ‘linear’, ‘nearest’, ‘nearest-up’, ‘zero’, ‘slinear’, ‘quadratic’, ‘cubic’, ‘previous’, or ‘next’. Default is ‘linear’. t : string Type of smoothing algorithm to be used. Must be either ‘savgol’ (Savitzky-Golay filter) or ‘movmean’ (Moving Average filter). Default is ‘savgol’. w : int Smoothing window size. Default is 0. o : int Polynomial order for the Savitzky-Golay smoothing filter, can be between 1 and 5. Default is 0. Returns ------- losses : array Array of the loss value for each epoch. gaussian_sums : array Resulting fit per epoch. n_wav : array Normalized wavelength array. gaussian_sum : array Final fit. a : list Amplitude (A) parameters of the distributions. m : list Mean (μ) parameters of the distributions. fwhm : list Full width at half maximum parameters (f) of the distributions. alpha : list Skew parameters (α) of the distributions. """ device = torch.device("cuda" if torch.cuda.is_available() else "cpu") lims = self.limits() wavelength = self.w[lims[0]:lims[1]] self.w = wavelength if hull == True: self.final = self.convex_hull(interp = interp , plot = True) self.w = self.w[0:len(self.w)-1] if smooth == True: if t == 'savgol': self.final = self.savgol(w , o , limiti = False) elif t == 'movmean': self.final = self.moving_average(w , limiti = False) data = self.final n_wav = np.zeros_like(self.w) for i in range(len(self.w)): n_wav[i] = (self.w[i] - min(self.w)) / (max(self.w) - min(self.w)) data = torch.tensor(self.final, dtype=torch.float32, device=device) wavelength = torch.tensor(self.w, dtype=torch.float32, device=device) A , B = max(self.w) , min(self.w) n_wav = torch.tensor(n_wav, dtype=torch.float32, device=device) #d = min(data) #L , l = np.log((1-d)/(d)) , np.log(1/( (1/lr)-1)) #print(L , l) #LL = np.random.uniform(low=l, high=L, size=n) #a = torch.tensor(LL).requires_grad_() a = torch.randn(n, device=device, requires_grad=True) fwhm = torch.randn(n, device=device, requires_grad=True) #if means is None: # m = torch.randn(n, device=device, requires_grad=True) # params = [a, m, fwhm] #else: # params = [a, fwhm] # for i in range(len(means)): # means[i] = (means[i] - B) / (A-B) # m = torch.tensor(np.array(means), dtype=torch.float32, device=device) if means is None: if mean_ranges is not None: # Normalize and create tensor bounds mean_bounds = [( (min_val - B) / (A - B), (max_val - B) / (A - B) ) for (min_val, max_val) in mean_ranges] m_raw = torch.rand(n, device=device, requires_grad=True) # between 0 and 1 params = [a, m_raw, fwhm] else: m_raw = torch.randn(n, device=device, requires_grad=True) params = [a, m_raw, fwhm] else: params = [a, fwhm] for i in range(len(means)): means[i] = (means[i] - B) / (A-B) m = torch.tensor(np.array(means), dtype=torch.float32, device=device) if asym: alpha = torch.randn(n, device=device, requires_grad=True) params.append(alpha) else: alpha = torch.zeros(n, device=device) optimizer = torch.optim.Adam(params, lr=lr) losses = [] gaussian_sums = [] #print('Initial Conditions are: \n', # 'A = ', torch.sigmoid(a).cpu().detach().numpy(), '\n', # 'FWHM = ', (torch.sigmoid(fwhm) * (A-B)).cpu().detach().numpy(), '\n', # 'Alpha = ', alpha.cpu().detach().numpy(), '\n', # 'X = ', (m * (A-B) + B).cpu().detach().numpy(), '\n') # Compute m2 once to show in print if means is None: if mean_ranges is not None: m2 = torch.zeros_like(m_raw) for idx in range(n): min_bound, max_bound = mean_bounds[idx] m2[idx] = min_bound + (max_bound - min_bound) * torch.sigmoid(m_raw[idx]) else: m2 = torch.sigmoid(m_raw) else: m2 = m print('Initial Conditions are: \n', 'A = ', torch.sigmoid(a).cpu().detach().numpy(), '\n', 'FWHM = ', (torch.sigmoid(fwhm) * (A-B)).cpu().detach().numpy(), '\n', 'Alpha = ', alpha.cpu().detach().numpy(), '\n', 'M = ', (m2 * (A-B) + B).cpu().detach().numpy(), '\n') K , k = 2 * np.sqrt(2 * np.log(2)) , 2*np.sqrt(np.log(2)) for i in range(iterations): optimizer.zero_grad() a2 = torch.sigmoid(a) fwhm2 = torch.sigmoid(fwhm) #if means is None: # m2 = torch.sigmoid(m) #else: # m2 = m if means is None: if mean_ranges is not None: m2 = torch.zeros_like(m_raw) for idx in range(n): min_bound, max_bound = mean_bounds[idx] m2[idx] = min_bound + (max_bound - min_bound) * torch.sigmoid(m_raw[idx]) else: m2 = torch.sigmoid(m_raw) else: m2 = m gaussian_sum = torch.zeros_like(data) for j in range(n): wavo = (n_wav - m2[j]) * K / fwhm2[j] er = torch.erf(alpha[j] * (k * (n_wav - m2[j]) / (fwhm2[j]))) gaussian_sum += a2[j] * torch.exp( -0.5 * wavo * wavo ) * ( torch.ones_like(er) + er ) gaussian_sum = torch.ones_like(data) - gaussian_sum loss = F.mse_loss(gaussian_sum, data) losses.append(loss.cpu().detach().numpy()) gaussian_sums.append(gaussian_sum.cpu().detach().numpy()) loss.backward() optimizer.step() gaussian_sum = gaussian_sum.cpu().detach().numpy() a = torch.sigmoid(a).cpu().detach().numpy() fwhm = (torch.sigmoid(fwhm)*(A-B)).cpu().detach().numpy() #if means is None: # m = (torch.sigmoid(m)*(A-B) + B).cpu().detach().numpy() #else: # m = (m*(A-B)+B).cpu().detach().numpy() alpha = alpha.cpu().detach().numpy() if means is None: if mean_ranges is not None: m = [] for idx in range(n): min_bound, max_bound = mean_ranges[idx] min_bound , max_bound = (min_bound-B)/(A-B) , (max_bound-B)/(A-B) #min_bound , max_bound = 1/(1+np.exp(-min_bound)) , 1/(1+np.exp(-max_bound))#torch.sigmoid(min_bound) , torch.sigmoid(max_bound) m_val = min_bound + (max_bound - min_bound) * torch.sigmoid(m_raw[idx]) m.append(m_val * (A - B) + B) m = np.array([v.cpu().detach().numpy() for v in m]) else: m = (torch.sigmoid(m_raw)*(A-B) + B).cpu().detach().numpy() else: m = (m*(A-B)+B).cpu().detach().numpy() self.losses = losses self.gaussian_sums = gaussian_sums self.n_wav = n_wav self.gaussian_sum = gaussian_sum self.a = a self.m = m self.fwhm = fwhm self.alpha = alpha return self.losses, self.gaussian_sums , self.n_wav , self.gaussian_sum, self.a, self.m, self.fwhm, self.alpha
[docs] def create_animation(self , losses, gaussian_sums, data, n_wav, frame_step=10 , FPS = 15 , save = False , path = None , name = 'mgm_gradient_descent'): """ Function to create animation of the fitting process for the SpectraAnalysis.mgm() function. Parameters ---------- losses : list or array Losses per epoch. gaussian_sums : 2-dim array MGM fit per epoch. data : list or array Normlized/smoothed spectrum. n_wav : list or array Normalized wavelengths. frame_step : int Frame step for the animation. Default is 20. FPS : int Photograms frequency for the animation. Default is 15. save : bool If to save or not the animation. Default is False. path : string Path to which to save the animation. If None then the home directory is used. Default is None. name : string Name of the animation if wanted to be saved. Default is mgm_gradient_descent. Returns ------- None """ fig, ax = plt.subplots(1, 2, gridspec_kw={'width_ratios': [3, 1], 'height_ratios': [1]} , figsize = [20,10]) ax[1].set_ylabel('log(loss)') ax[1].set_xlabel('Epoch') ax[0].set_xlabel('Normalized Wavelength') line_gaussian, = ax[0].plot([], [], 'b') line_data, = ax[0].plot(self.n_wav, self.final, 'r', linewidth=2) line_loss, = ax[1].plot([], [], 'k') def init(): ax[0].set_xlim(0, 1.1) ax[0].set_ylim(np.min(self.final), np.max(self.final)) ax[1].set_xlim(0, len(self.losses)) ax[1].set_ylim(min(self.losses), max(self.losses)) ax[1].set_yscale('log') return line_gaussian, line_data, line_loss def update_plot(frame): i = frame * frame_step if i >= len(self.losses): return line_gaussian.set_data(self.n_wav, self.gaussian_sums[i]) ax[0].set_ylim(np.min(self.gaussian_sums[i]), np.max(self.final)+0.01)#np.max(gaussian_sums[i])) line_loss.set_data(np.arange(0, i+1, frame_step), losses[:i+1:frame_step]) return line_gaussian, line_data, line_loss num_frames = len(self.losses) // frame_step ani = animation.FuncAnimation(fig, update_plot, frames=num_frames, init_func=init, interval=50, blit=True, repeat=False) if save == True: writervideo = animation.PillowWriter(fps=FPS, metadata=dict(artist='Me'),bitrate=1800) ani.save(path + name + '.gif', writer=writervideo) plt.show()
[docs] def mse(self): r""" Calculates the mean square error: .. math:: mse = (y_{fit} - y)^{2} Parameters ---------- None Returns ------- residuals : array Mean square error of the fit per wavelength. """ residuals = np.zeros(len(self.gaussian_sum)) for i in range(len(self.gaussian_sum)): residuals[i] = ( self.gaussian_sum[i] - self.final[i] )**2 self.residuals = residuals return self.residuals
[docs] def global_fit(self): """ Function to plot the fit of the machine learning MGM with the resulting mean square error per wavelength on top. Parameters ---------- None Returns ------- None """ def scientific_format(y, pos): return r'${0:.1f} \times 10^{{-6}}$'.format(y * 1e6) #lims = self.limits() wav = self.w#[lims[0]:lims[1]] fig , ax = plt.subplots(2,1 , sharex = True , squeeze = True ,gridspec_kw={'width_ratios':[1], 'height_ratios':[1,4]}) mr , Mr = min(self.residuals) , max(self.residuals) #ax[0].set_yscale('log') ax[1].plot(wav , self.final , 'k' , label = 'Data') ax[1].plot(wav , self.gaussian_sum , 'b' , label = 'Approximation') ax[0].plot(wav , self.residuals , 'r' , label = 'Residuals') ax[0].yaxis.set_major_formatter(FuncFormatter(scientific_format)) ax[1].set_xlabel('$\lambda$ [nm]') ax[1].set_ylabel('Relative Reflectance') fig.subplots_adjust(hspace=0) ax[1].legend() ax[0].set_ylabel('MSE')#log(MSE)') plt.show()
[docs] def local_fit(self): """ Function to plot the fit of each single distribution of the machine learning MGM. Parameters ---------- None Returns ------- None """ #lims = self.limits() wav = self.w#[lims[0]:lims[1]] ones = np.ones(len(wav)) GAUSS = np.zeros((len(self.a) , len(wav))) for i in range(len(self.a)): W = np.exp( -4*np.log(2)*( (wav - self.m[i]*ones)/self.fwhm[i] )**2 ) skew = erf( 2*np.sqrt(np.log(2))*self.alpha[i]*( wav - self.m[i]*ones )/self.fwhm[i] ) GAUSS[i] = self.a[i]*W*(ones+skew) fig , ax = plt.subplots() ax.plot(wav , self.final , 'r' , linewidth = 1.5) for i in range(len(self.a)): ax.plot(wav , ones - GAUSS[i] , 'b--' , linewidth = 1) ax.set_xlabel('$\lambda$ [nm]') ax.set_ylabel('Relative Reflectance') plt.show() return GAUSS
[docs] def both_fit(self): """ Function to plot the fit of each single distribution, the global fit and the MSE per wavelength of the machine learning MGM. Parameters ---------- None Returns ------- None """ a = self.a m = self.m fwhm = self.fwhm alpha = self.alpha #lims = self.limits() wav = self.w#[lims[0]:lims[1]] ones = np.ones(len(wav)) GAUSS = np.zeros((len(self.a) , len(wav))) for i in range(len(self.a)): W = np.exp( -4*np.log(2)*( (wav - self.m[i]*ones)/self.fwhm[i] )**2 ) skew = erf( 2*np.sqrt(np.log(2))*self.alpha[i]*( wav - self.m[i]*ones )/self.fwhm[i] ) GAUSS[i] = self.a[i]*W*(ones+skew) def scientific_format(y, pos): return r'${0:.1f} \times 10^{{-6}}$'.format(y * 1e6) #lims = self.limits() wav = self.w#[lims[0]:lims[1]] fig , ax = plt.subplots(2,1 , sharex = True , squeeze = True ,gridspec_kw={'width_ratios':[1], 'height_ratios':[1,4]}) mr , Mr = min(self.residuals) , max(self.residuals) #ax[0].set_yscale('log') for i in range(len(self.a)): if i == len(self.a)-1: ax[1].plot(wav , ones - GAUSS[i] , 'b--' , linewidth = 1 , label = 'Single Distribution') else: ax[1].plot(wav , ones - GAUSS[i] , 'b--' , linewidth = 1) ax[1].plot(wav , self.final , 'k' , label = 'Target') ax[1].plot(wav , self.gaussian_sum , 'b' , label = 'Approximation') ax[0].plot(wav , self.residuals , 'r' , label = 'Residuals') ax[0].yaxis.set_major_formatter(FuncFormatter(scientific_format)) ax[1].set_xlabel('$\lambda$ [nm]') fig.subplots_adjust(hspace=0) ax[1].legend() ax[0].set_ylabel('MSE') ax[1].set_ylabel('Relative Reflectance') plt.show() return GAUSS #lims = self.limits() wav = self.w#[lims[0]:lims[1]] ones = np.ones(len(wav)) GAUSS = np.zeros((len(self.a) , len(wav))) for i in range(len(self.a)): W = np.exp( -4*np.log(2)*( (wav - self.m[i]*ones)/self.fwhm[i] )**2 ) skew = erf( 2*np.sqrt(np.log(2))*self.alpha[i]*( wav - self.m[i]*ones )/self.fwhm[i] ) GAUSS[i] = self.a[i]*W*(ones+skew) fig , ax = plt.subplots() ax.plot(wav , self.final , 'r' , linewidth = 1.5) for i in range(len(self.a)): ax.plot(wav , ones - GAUSS[i] , 'b--' , linewidth = 1) ax.set_xlabel('$\lambda$ [nm]') plt.show() return GAUSS
[docs] class MaficAnalysis(): """ Adpated from the paper from Horgan et al., 2014 ( https://doi.org/10.1016/j.icarus.2014.02.031 ). This class is taylored for the analysis of mafic materials and mixes. Parameters --------- data : numpy array Data (i.e. mean spectrum) to be analyzed. wav : numpy array Wavelengths used as x-coordinate. MIN : float Minimum wavelength value MAX : float Maximum wavelength value fs : float Font size of text of axis and legends in the plots. """ def __init__(self, spectrum, wavelength, MIN, MAX, fsize = 15, pre_normed = True): self.data = spectrum self.wav = wavelength self.MIN = MIN self.MAX = MAX self.fs = fsize self.pre_normed = pre_normed
[docs] def find_nearest(self, array, value): array = np.asarray(array) idx = (np.abs(array - value)).argmin() return idx
[docs] def plot(self): plt.plot(self.wav , self.data , 'k') plt.axvline(self.MIN , color = 'red' , linestyle = '--') plt.axvline(self.MAX , color = 'blue' , linestyle = '--') plt.xlabel('$\lambda$[nm]' , fontsize = self.fs) plt.ylabel('Reflectance' , fontsize = self.fs) plt.show()
[docs] def continuum_removal(self, interptype='linear', plot=False , smooth = False): """ Applies continuum removal using convex hull or flat model for out-of-range spectra. The hull will be constructed from the beginning to the maximum point of the spectrum. After that the hull will be a flat line. Parameters ---------- interptype : string Interpolation type for continuum modeling plot : bool Whether to plot the result. Default is False. smooth : bool Whether the input is the smoothed spectrum or not. Default is False. Returns ------- removed_data : array Continuum-removed reflectance. """ if self.pre_normed == True: self.removed_data = self.data #return self.data I , J = self.find_nearest(self.wav , self.MIN) , self.find_nearest(self.wav , self.MAX) if smooth == True: x , y = self.wav[I:J] , self.data else: x , y = self.wav[I:J] , self.data[I:J] points = np.c_[x, y] augmented = np.concatenate([points, [(x[0], np.min(y) - 1), (x[-1], np.min(y) - 1)]], axis=0) hull = ConvexHull(augmented, incremental=True) continuum_points = points[np.sort([v for v in hull.vertices if v < len(points)])] continuum_function = interp1d(*continuum_points.T, kind=interptype) continuum = continuum_function(x) if y[-1] < np.max(y): max_idx = np.argmax(y) x_max, y_max = x[max_idx], y[max_idx] # First part: from start to max points1 = np.c_[x[:max_idx + 1], y[:max_idx + 1]] augmented1 = np.concatenate([ points1, [(x[0], np.min(y) - 1), (x[max_idx], np.min(y) - 1)] ]) hull = ConvexHull(augmented1, incremental=True) left_points = points1[np.sort([v for v in hull.vertices if v < len(points1)])] # Second part: flat line from max to end x_flat = x[max_idx:] y_flat = np.full_like(x_flat, y_max) # Combine both x_cont = np.concatenate((left_points[:, 0], x_flat)) y_cont = np.concatenate((left_points[:, 1], y_flat)) continuum = interp1d(x_cont, y_cont, kind=interptype)(x) else: points = np.c_[x, y] augmented = np.concatenate([points, [(x[0], np.min(y) - 1), (x[-1], np.min(y) - 1)]], axis=0) hull = ConvexHull(augmented, incremental=True) continuum_points = points[np.sort([v for v in hull.vertices if v < len(points)])] continuum_function = interp1d(*continuum_points.T, kind=interptype) continuum = continuum_function(x) self.wav_cut = x self.data_cut = y self.continuum = continuum self.removed_data = y / continuum if plot: fig, ax = plt.subplots(1, 2, figsize=(12, 5)) ax[0].plot(x, y, 'k', label='Original Spectrum') ax[0].plot(x, continuum, 'r' , label='Continuum') ax[1].plot(x, self.removed_data, 'b', label='Continuum-Removed') for a in ax: a.legend(fontsize = self.fs) a.set_xlabel('Wavelength (nm)' , fontsize = self.fs) a.set_ylabel('Reflectance' , fontsize = self.fs) plt.tight_layout() plt.show() return self.removed_data
[docs] def bands_removal(self , windows , mins , maxs , plot = False): """ Bands removal technique involving a moving average over a given subset(s) of the spectrum. Parameters ---------- windows : list List of the smoothing window to apply to the subset(s). mins : list List of the minimum values at which the smoothing process needs to be applied. maxs : list List of the maximum values at which the smoothing process needs to be applied. plot : bool Whether to plot the result. Default is False. Returns ------- spectrum : array Cleaned spectrum. IMPORTANT, THIS OVERWRITES THE INPUT DATA. """ for i in range(len(windows)): win = windows[i] I , J = self.find_nearest(self.wav , mins[i]) , self.find_nearest(self.wav , maxs[i]) for i in range(I,J+1): start = max(0, i - win) end = min(len(self.wav), i + win + 1) self.data[i] = np.mean(self.data[start:end]) if plot == True: if self.pre_normed == True: I , J = self.find_nearest(self.wav , self.MIN) , self.find_nearest(self.wav , self.MAX) plt.plot(self.wav[I:J] , self.data , 'k') else: plt.plot(self.wav , self.data , 'k') plt.ylabel('Reflectance', fontsize = self.fs) plt.xlabel('$\lambda$[nm]', fontsize = self.fs) plt.show() return self.data
[docs] def moving_average(self , window_size, plot = False, overwrite = False, removed = True): """ Function to perform a moving average smoothing on the normalized spectrum. Parameters ---------- window_size : int Size of the step taken for the moving mean. plot : bool Whether to plot the result. Default is False. overwrite : bool Whether or not to overwrite the previsou self.data. Default is False. removed : bool Whether or not the data underwent continuum removal process or not. Default is True. Returns ------- result : array Moving-mean smoothed normalized spectrum. """ if window_size < 1: raise ValueError("Window_size must be at least 1.") I , J = self.find_nearest(self.wav , self.MIN) , self.find_nearest(self.wav , self.MAX) x = self.wav[I:J] if self.pre_normed == False: if removed == True: y = self.removed_data else: y = self.data[I:J] else: y = self.data half_window = window_size // 2 result = np.zeros_like(y) for i in range(len(y)): start = max(0, i - half_window) end = min(len(x), i + half_window + 1) result[i] = np.mean(y[start:end]) self.final_smooth = result if plot == True: plt.plot(x , y , 'b') plt.plot(x , self.final_smooth , 'r') plt.xlabel('$\lambda$[nm]', fontsize = self.fs) if removed == True or self.pre_normed == True: plt.ylabel('Relative Reflectance', fontsize = self.fs) else: plt.ylabel('Reflectance', fontsize = self.fs) plt.show() if overwrite == True: self.data = self.final_smooth return self.final_smooth
[docs] def band_parameters(self, windows_nm = 75 , resolution_nm = 5 , plot = False , tol = 10 , smoothed_after_removed = False): """ Core function of the class. Adapted from Horgan et al., 2014 (https://doi.org/10.1016/j.icarus.2014.02.031). The band parameters are the band minimum, band center, band depth, band area and band asymmetry. For more infor refer to the paper. The function can sometime not catch up and returns some error. Try with other parameters configuration in case. The issue refers to the way band centers are computed: due to the interpolation, sometimes that interpolation does not have a minium in the give region around the band minimum. Parameters ---------- windows_nm : float Nanometric window around the band minimum where to search the band center. resolution_nm : float Nanometric resolution of the 4-th order polynomial approximation used for the band center determination. plot : bool Whether to plot the result. Default is False. tol : float Minimum band wavelength span in nanometers. Bands with less wavelength span will not be analyzed. Returns ------- parameters : list List of the parameters for each band. """ I , J = self.find_nearest(self.wav , self.MIN) , self.find_nearest(self.wav , self.MAX) x = self.wav[I:J] if smoothed_after_removed == True: y = self.final_smooth else: if self.pre_normed == True: y = self.data else: y = self.removed_data ones_indexes = np.argwhere(y == 1)#self.removed_data == 1) ones_idx = [] for i in range(len(ones_indexes)): ones_idx.append(ones_indexes[i][0]) print(ones_idx , ones_indexes) parameters = [] if plot == True: fig , ax = plt.subplots( ) k = 0 for i in range(0, len(ones_idx) - 1): band_parameters = [] a = ones_idx[i+1] if ones_idx[i] == a: continue if ones_idx[i+1] - ones_idx[i] >= tol: k += 1 S = y[ones_idx[i]:ones_idx[i+1]] X = x[ones_idx[i]:ones_idx[i+1]] band_parameters.append( (min(X) , max(X)) ) # minimum computation minimum , minimum_index = np.min(S) , np.argmin(S) band_parameters.append(X[minimum_index]) # center computation shift_right = self.find_nearest(X , X[minimum_index]+windows_nm) shift_left = self.find_nearest(X , X[minimum_index]-windows_nm) Xfit_range = np.arange(X[shift_left] , X[shift_right]+resolution_nm , resolution_nm) interp_S = interp1d(X, S, kind='cubic')(Xfit_range) coeffs = np.polyfit(Xfit_range, interp_S, 4) poly = np.poly1d(coeffs) y_poly = poly(Xfit_range) idx_center = np.argmin(y_poly)#[shift_left:shift_right]) band_center_wav = Xfit_range[idx_center] band_center_val = y_poly[idx_center] band_parameters.append(band_center_wav) # depth computation band_depth = 1 - S[idx_center] band_parameters.append(band_depth) # area and asymmetry computation (open spectrum compatible) if X[-1] >= x[-1] - resolution_nm: # Open band: skip area and asymmetry band_parameters.append(None) # total_area band_parameters.append(None) # asymmetry else: total_area = np.trapz(np.ones_like(S) - S, X) band_parameters.append(total_area) left_area = np.trapz(S[:idx_center], X[:idx_center]) right_area = np.trapz(S[idx_center:], X[idx_center:]) asymmetry = (right_area - left_area) / (100 * total_area) band_parameters.append(asymmetry) band_parameters.append(asymmetry) if plot == True: ax.axvline(band_center_wav+5 , color = 'blue' , linestyle = '--' , label = 'Band Center' , linewidth = 1) ax.axvline(X[minimum_index]+5 , color = 'red' , linestyle = '--' , label = 'Band Minimum' , linewidth = 1) ax.fill_between(X, np.ones(len(S)) , S , color = 'grey' , alpha = 0.5) ax.plot(Xfit_range , y_poly , color = 'black' , marker = 'o') parameters.append(band_parameters) print('Band number ' , k) print('Band extremes = ' , band_parameters[0] , ' nm') print('Band minimum = ' , np.round(band_parameters[1] , 2) , ' nm') print('Band center = ' , np.round(band_parameters[2] , 2) , ' nm') print('Band depth = ' , np.round(band_parameters[3] , 2)) print('Band area = ' , np.round(band_parameters[4] , 2) , ' nm') print('Band asymmetry = ' , np.round(band_parameters[5] , 2) , ' %') print('--------------------------------') if plot == True: ax.plot(x , y , 'k' , label = 'Continuum Removed Smoothed Spectrum') ax.set_xlabel('$\lambda$[nm]' , fontsize = self.fs) ax.set_ylabel('Relative Reflectance' , fontsize = self.fs) plt.tick_params(axis='both', labelsize=self.fs) minimum = plt.Line2D([], [], color='red', linestyle='--', linewidth = 1 , label = 'Band Minimum') center = plt.Line2D([], [], color='blue', linestyle='--', linewidth = 1 , label = 'Band Center') spectrum = plt.Line2D([], [], color='black', linestyle='-', linewidth = 1 , label = 'Target Spectrum') interpolation = plt.Line2D([], [], color='black', marker = 'o', linestyle=None , markersize = 10 , label = '4-th Degree Polynomial') ax.legend(handles=[minimum , center , spectrum , interpolation][::-1] , fontsize = self.fs) #plt.legend(fontsize = self.fs) plt.show() return parameters