Module ambit_stochastics.helpers.acf_functions

Expand source code
from statsmodels.tsa.stattools import acf
from scipy.integrate import quad
from scipy.optimize import minimize
import numpy as np

def corr_matrix_from_corr_vector(corr_vector):
    """inputs a vector = [corr(0),corr(s),...,corr((k-1)*s)] and outputs the arrray
    Sigma_tilde_ij = overlap area at lag |i-j| = corr(|i-j|*s)
    """
    if isinstance(corr_vector,np.ndarray):
        assert len(corr_vector.shape) == 1
        corr_vector = tuple(corr_vector)
    assert isinstance(corr_vector,(tuple,list))
    
    k = len(corr_vector)
    corr_matrix = [corr_vector[1:i+1][::-1] + corr_vector[:k-i] for i in range(k)]
    return np.array(corr_matrix)

#in the following correlation functions corr_exponnetial,corr_gama,corr_ig      h > 0
def corr_exponential_envelope(h,params):
     u = params[0]
     return np.exp(-u * h)

def corr_gamma_envelope(h,params):
    H,delta = params
    return (1+h/delta)**(-H)

def corr_ig_envelope(h,params):
    gamma,delta =  params
    return np.exp(delta * gamma *(1-np.sqrt(2*h/gamma**2+1)))

def trawl_acf(envelope, envelope_function=None):
    assert envelope in ['exponential','gamma','ig','custom'],'please check the value of envelope'
    
    if envelope == "custom":
        """describe how to specify envelope_function"""
        assert callable(envelope_function)
        
        def corr_other(h,params):
            return quad(envelope_function(params), a=-np.inf, b=-h)[0] / quad(envelope_function(params), a=-np.inf, b=0)[0]

        return corr_other
    
    else:
        
        assert envelope_function == None
        
        if envelope == "exponential":
            return corr_exponential_envelope

        if envelope == "gamma":
            return corr_gamma_envelope

        if envelope == "ig":
            return corr_ig_envelope


def bounds_and_initial_guess_for_acf_params(envelope):
    assert envelope in ['exponential','gamma','ig']
    
    if envelope == 'exponential':
        bounds = ((0,np.inf),)
        initial_guess = (1,)
        
    elif envelope  == 'gamma' or envelope == 'ig':
        bounds = ((0.0001,np.inf),(0.0001,np.inf))
        initial_guess = (1,1)
    
    
    return bounds,initial_guess


def fit_trawl_envelope_gmm(s,simulations,lags,envelope,initial_guess = None,
                           bounds = None, envelope_function = None):

    
    #parameter checks
    assert isinstance(s,(float,int))
    assert isinstance(lags,tuple)
    assert envelope in ['exponential','gamma','ig','custom']
    assert len(simulations.shape) == 2
    #assert isinstance(envelope_params,tuple)
    
    assert (isinstance(initial_guess,tuple) and all(isinstance(i,tuple) for i in initial_guess)) or initial_guess     == None
    assert isinstance(bounds,tuple)        or bounds            == None
    assert callable(envelope_function)     or envelope_function == None 

    
    theoretical_acf_func = trawl_acf(envelope, envelope_function)
    empirical_acf   = np.apply_along_axis(lambda x: acf(x,nlags = max(lags)),arr = simulations,axis=1)
    empirical_acf   = empirical_acf[:,lags]
    
    #this will look s up in the `fit_trawl_envelope_gmm` scope
    def criterion(params,empirical_acf_row):
        theoretical = np.array([theoretical_acf_func(s*i,params) for i in lags])
        return np.sum((empirical_acf_row - theoretical)**2)
        
    if envelope == 'custom':
        #must pass the envelope function and the initial guess
        assert isinstance(initial_guess,tuple) 
        assert callable(envelope_function)
    
    if envelope != 'custom':
        bounds,_ = bounds_and_initial_guess_for_acf_params(envelope)
        if initial_guess  == None:
            initial_guess = tuple([_ for index_ in range(len(empirical_acf))])
    
    #if the custom function has no bounds
    #if bounds == None:
    #    result = [minimize(criterion,x0 = initial_guess, args= (empirical_acf_row,),
    #                                 method='BFGS').x for empirical_acf_row in empirical_acf]
    #    
    #in all the other cases, we have bounds    
    #else:
    result = [minimize(criterion,x0 = initial_guess[j], args= (empirical_acf[j],),
                       method='L-BFGS-B', bounds = bounds).x for j in range(len(empirical_acf))]
        
    return np.array(result)

Functions

def bounds_and_initial_guess_for_acf_params(envelope)
Expand source code
def bounds_and_initial_guess_for_acf_params(envelope):
    assert envelope in ['exponential','gamma','ig']
    
    if envelope == 'exponential':
        bounds = ((0,np.inf),)
        initial_guess = (1,)
        
    elif envelope  == 'gamma' or envelope == 'ig':
        bounds = ((0.0001,np.inf),(0.0001,np.inf))
        initial_guess = (1,1)
    
    
    return bounds,initial_guess
def corr_exponential_envelope(h, params)
Expand source code
def corr_exponential_envelope(h,params):
     u = params[0]
     return np.exp(-u * h)
def corr_gamma_envelope(h, params)
Expand source code
def corr_gamma_envelope(h,params):
    H,delta = params
    return (1+h/delta)**(-H)
def corr_ig_envelope(h, params)
Expand source code
def corr_ig_envelope(h,params):
    gamma,delta =  params
    return np.exp(delta * gamma *(1-np.sqrt(2*h/gamma**2+1)))
def corr_matrix_from_corr_vector(corr_vector)

inputs a vector = [corr(0),corr(s),…,corr((k-1)s)] and outputs the arrray Sigma_tilde_ij = overlap area at lag |i-j| = corr(|i-j|s)

Expand source code
def corr_matrix_from_corr_vector(corr_vector):
    """inputs a vector = [corr(0),corr(s),...,corr((k-1)*s)] and outputs the arrray
    Sigma_tilde_ij = overlap area at lag |i-j| = corr(|i-j|*s)
    """
    if isinstance(corr_vector,np.ndarray):
        assert len(corr_vector.shape) == 1
        corr_vector = tuple(corr_vector)
    assert isinstance(corr_vector,(tuple,list))
    
    k = len(corr_vector)
    corr_matrix = [corr_vector[1:i+1][::-1] + corr_vector[:k-i] for i in range(k)]
    return np.array(corr_matrix)
def fit_trawl_envelope_gmm(s, simulations, lags, envelope, initial_guess=None, bounds=None, envelope_function=None)
Expand source code
def fit_trawl_envelope_gmm(s,simulations,lags,envelope,initial_guess = None,
                           bounds = None, envelope_function = None):

    
    #parameter checks
    assert isinstance(s,(float,int))
    assert isinstance(lags,tuple)
    assert envelope in ['exponential','gamma','ig','custom']
    assert len(simulations.shape) == 2
    #assert isinstance(envelope_params,tuple)
    
    assert (isinstance(initial_guess,tuple) and all(isinstance(i,tuple) for i in initial_guess)) or initial_guess     == None
    assert isinstance(bounds,tuple)        or bounds            == None
    assert callable(envelope_function)     or envelope_function == None 

    
    theoretical_acf_func = trawl_acf(envelope, envelope_function)
    empirical_acf   = np.apply_along_axis(lambda x: acf(x,nlags = max(lags)),arr = simulations,axis=1)
    empirical_acf   = empirical_acf[:,lags]
    
    #this will look s up in the `fit_trawl_envelope_gmm` scope
    def criterion(params,empirical_acf_row):
        theoretical = np.array([theoretical_acf_func(s*i,params) for i in lags])
        return np.sum((empirical_acf_row - theoretical)**2)
        
    if envelope == 'custom':
        #must pass the envelope function and the initial guess
        assert isinstance(initial_guess,tuple) 
        assert callable(envelope_function)
    
    if envelope != 'custom':
        bounds,_ = bounds_and_initial_guess_for_acf_params(envelope)
        if initial_guess  == None:
            initial_guess = tuple([_ for index_ in range(len(empirical_acf))])
    
    #if the custom function has no bounds
    #if bounds == None:
    #    result = [minimize(criterion,x0 = initial_guess, args= (empirical_acf_row,),
    #                                 method='BFGS').x for empirical_acf_row in empirical_acf]
    #    
    #in all the other cases, we have bounds    
    #else:
    result = [minimize(criterion,x0 = initial_guess[j], args= (empirical_acf[j],),
                       method='L-BFGS-B', bounds = bounds).x for j in range(len(empirical_acf))]
        
    return np.array(result)
def trawl_acf(envelope, envelope_function=None)
Expand source code
def trawl_acf(envelope, envelope_function=None):
    assert envelope in ['exponential','gamma','ig','custom'],'please check the value of envelope'
    
    if envelope == "custom":
        """describe how to specify envelope_function"""
        assert callable(envelope_function)
        
        def corr_other(h,params):
            return quad(envelope_function(params), a=-np.inf, b=-h)[0] / quad(envelope_function(params), a=-np.inf, b=0)[0]

        return corr_other
    
    else:
        
        assert envelope_function == None
        
        if envelope == "exponential":
            return corr_exponential_envelope

        if envelope == "gamma":
            return corr_gamma_envelope

        if envelope == "ig":
            return corr_ig_envelope