Module ambit_stochastics.helpers.forecasting_helpers
Expand source code
from .acf_functions import trawl_acf,corr_matrix_from_corr_vector
#import acf_functions
from scipy.stats import beta,gamma,norm
import numpy as np
def get_trawl_process_mean(levy_seed,levy_seed_params):
#levy_seed_params is a tuple (not a tuple of tuples)
if levy_seed == 'gamma':
alpha,theta = levy_seed_params
return gamma(a = alpha, loc = 0, scale = theta).mean()
elif levy_seed == 'gaussian':
return levy_seed_params[0]
else:
raise ValueError('not yet implemented')
def conditional_gaussian_distribution(values,tau,nr_steps_ahead,max_gaussian_lag,levy_seed_params,envelope,envelope_params,envelope_function=None):
acf_function_helper = trawl_acf(envelope, envelope_function)
acf_function = lambda t: acf_function_helper(t,envelope_params)
mu_,scale_ = levy_seed_params
joints = [np.array(values[i:i+max_gaussian_lag]) for i in range(0,len(values) - max_gaussian_lag +1 )]
mu_1 = mu_ #mean of X_{gaussian_lags + nr_steps_ahead}
mu_2 = mu_ * np.ones(max_gaussian_lag) #mean of (X_1,...,X_{gaussian_lags})
sigma_11 = scale_**2
sigma_22 = scale_**2 * corr_matrix_from_corr_vector(acf_function(np.array([i*tau for i in range(max_gaussian_lag)])))
sigma_21 = scale_**2 * (acf_function(np.array([i*tau for i in range(nr_steps_ahead, max_gaussian_lag+nr_steps_ahead)])))[::-1]
sigma_12 = sigma_21
sigma_22_inv = np.linalg.inv(sigma_22)
conditional_mean = [mu_1 + sigma_12 @ sigma_22_inv @ (joint -mu_2) for joint in joints]
conditional_var = sigma_11 - sigma_12 @ sigma_22_inv @ sigma_21
return np.array(conditional_mean), conditional_var**0.5
def deterministic_forecasting_sanity_checks(values,tau,nr_steps_ahead,levy_seed,levy_seed_params,envelope_params):
assert isinstance(values,np.ndarray)
assert isinstance(tau,float) and tau > 0
assert isinstance(nr_steps_ahead,int) and nr_steps_ahead > 0
assert levy_seed in ['gaussian','gamma','custom']
assert isinstance(levy_seed_params,np.ndarray)
#assert envelope and envelope_function checked in trawl_acf
assert isinstance(envelope_params,np.ndarray)
def probabilistic_forecasting_sanity_checks(values,tau,nr_steps_ahead,levy_seed,levy_seed_params,nr_samples,envelope_params):
assert isinstance(values,np.ndarray)
assert isinstance(tau,float) and tau > 0
assert isinstance(nr_steps_ahead,int) and nr_steps_ahead > 0
assert levy_seed in ['gaussian','gamma','custom']
assert isinstance(levy_seed_params,np.ndarray)
#assert envelope and envelope_function checked in trawl_acf
assert isinstance(envelope_params,np.ndarray)
def deterministic_forecasting(tau, nr_steps_ahead,values,levy_seed,levy_seed_params,envelope,
envelope_params, envelope_function = None, max_gaussian_lag = None):
deterministic_forecasting_sanity_checks(values, tau, nr_steps_ahead, levy_seed,levy_seed_params,envelope_params)
if envelope == 'gaussian':
assert isinstance(max_gaussian_lag,int) and max_gaussian_lag > 0
conditional_mean,_ = conditional_gaussian_distribution(values,tau,nr_steps_ahead,max_gaussian_lag,levy_seed_params,envelope,envelope_params,envelope_function=None)
return conditional_mean
else:
acf_function_helper = trawl_acf(envelope, envelope_function)
overlap_area = acf_function_helper(tau * nr_steps_ahead,envelope_params)
#print (overlap_area,type(values),get_trawl_process_mean(levy_seed,levy_seed_params) )
return overlap_area * values + (1-overlap_area) * get_trawl_process_mean(levy_seed,levy_seed_params)
def probabilistic_forecasting(tau,nr_steps_ahead,values,levy_seed,levy_seed_params,envelope,
envelope_params, nr_samples, envelope_function = None, max_gaussian_lag = None):
"""assumes the area of the lebesgue measure is 1
values is a 1 dimensional array """
probabilistic_forecasting_sanity_checks(values,tau,nr_steps_ahead,levy_seed,levy_seed_params,nr_samples,envelope_params)
acf_function_helper = trawl_acf(envelope, envelope_function)
acf_function = lambda x: acf_function_helper(x,envelope_params)
overlap_area = acf_function(tau*nr_steps_ahead)
if levy_seed == 'gaussian':
assert isinstance(max_gaussian_lag,int) and max_gaussian_lag > 0
conditional_mean,conditional_scale = conditional_gaussian_distribution(values,tau,nr_steps_ahead,max_gaussian_lag,
levy_seed_params,envelope,envelope_params,envelope_function)
return np.array([norm.rvs(loc = i, scale = conditional_scale, size=nr_samples) for i in conditional_mean])
elif levy_seed == 'gamma':
alpha,theta = levy_seed_params
alpha0 = alpha * overlap_area
alpha1 = alpha * (1-overlap_area)
print('before overlap')
overlap_samples = values[:,np.newaxis] * beta.rvs(a = alpha0, b = alpha1, size = [len(values),nr_samples])
print('before independent')
independent_samples = gamma.rvs(a = alpha1, loc = 0, scale = theta, size = nr_samples)[np.newaxis,:]
print('after independent')
elif levy_seed in ['invgauss','gig','cauchy','student']:
raise ValueError('not yet implemented')
return overlap_samples + independent_samples
Functions
def conditional_gaussian_distribution(values, tau, nr_steps_ahead, max_gaussian_lag, levy_seed_params, envelope, envelope_params, envelope_function=None)
-
Expand source code
def conditional_gaussian_distribution(values,tau,nr_steps_ahead,max_gaussian_lag,levy_seed_params,envelope,envelope_params,envelope_function=None): acf_function_helper = trawl_acf(envelope, envelope_function) acf_function = lambda t: acf_function_helper(t,envelope_params) mu_,scale_ = levy_seed_params joints = [np.array(values[i:i+max_gaussian_lag]) for i in range(0,len(values) - max_gaussian_lag +1 )] mu_1 = mu_ #mean of X_{gaussian_lags + nr_steps_ahead} mu_2 = mu_ * np.ones(max_gaussian_lag) #mean of (X_1,...,X_{gaussian_lags}) sigma_11 = scale_**2 sigma_22 = scale_**2 * corr_matrix_from_corr_vector(acf_function(np.array([i*tau for i in range(max_gaussian_lag)]))) sigma_21 = scale_**2 * (acf_function(np.array([i*tau for i in range(nr_steps_ahead, max_gaussian_lag+nr_steps_ahead)])))[::-1] sigma_12 = sigma_21 sigma_22_inv = np.linalg.inv(sigma_22) conditional_mean = [mu_1 + sigma_12 @ sigma_22_inv @ (joint -mu_2) for joint in joints] conditional_var = sigma_11 - sigma_12 @ sigma_22_inv @ sigma_21 return np.array(conditional_mean), conditional_var**0.5
def deterministic_forecasting(tau, nr_steps_ahead, values, levy_seed, levy_seed_params, envelope, envelope_params, envelope_function=None, max_gaussian_lag=None)
-
Expand source code
def deterministic_forecasting(tau, nr_steps_ahead,values,levy_seed,levy_seed_params,envelope, envelope_params, envelope_function = None, max_gaussian_lag = None): deterministic_forecasting_sanity_checks(values, tau, nr_steps_ahead, levy_seed,levy_seed_params,envelope_params) if envelope == 'gaussian': assert isinstance(max_gaussian_lag,int) and max_gaussian_lag > 0 conditional_mean,_ = conditional_gaussian_distribution(values,tau,nr_steps_ahead,max_gaussian_lag,levy_seed_params,envelope,envelope_params,envelope_function=None) return conditional_mean else: acf_function_helper = trawl_acf(envelope, envelope_function) overlap_area = acf_function_helper(tau * nr_steps_ahead,envelope_params) #print (overlap_area,type(values),get_trawl_process_mean(levy_seed,levy_seed_params) ) return overlap_area * values + (1-overlap_area) * get_trawl_process_mean(levy_seed,levy_seed_params)
def deterministic_forecasting_sanity_checks(values, tau, nr_steps_ahead, levy_seed, levy_seed_params, envelope_params)
-
Expand source code
def deterministic_forecasting_sanity_checks(values,tau,nr_steps_ahead,levy_seed,levy_seed_params,envelope_params): assert isinstance(values,np.ndarray) assert isinstance(tau,float) and tau > 0 assert isinstance(nr_steps_ahead,int) and nr_steps_ahead > 0 assert levy_seed in ['gaussian','gamma','custom'] assert isinstance(levy_seed_params,np.ndarray) #assert envelope and envelope_function checked in trawl_acf assert isinstance(envelope_params,np.ndarray)
def get_trawl_process_mean(levy_seed, levy_seed_params)
-
Expand source code
def get_trawl_process_mean(levy_seed,levy_seed_params): #levy_seed_params is a tuple (not a tuple of tuples) if levy_seed == 'gamma': alpha,theta = levy_seed_params return gamma(a = alpha, loc = 0, scale = theta).mean() elif levy_seed == 'gaussian': return levy_seed_params[0] else: raise ValueError('not yet implemented')
def probabilistic_forecasting(tau, nr_steps_ahead, values, levy_seed, levy_seed_params, envelope, envelope_params, nr_samples, envelope_function=None, max_gaussian_lag=None)
-
assumes the area of the lebesgue measure is 1 values is a 1 dimensional array
Expand source code
def probabilistic_forecasting(tau,nr_steps_ahead,values,levy_seed,levy_seed_params,envelope, envelope_params, nr_samples, envelope_function = None, max_gaussian_lag = None): """assumes the area of the lebesgue measure is 1 values is a 1 dimensional array """ probabilistic_forecasting_sanity_checks(values,tau,nr_steps_ahead,levy_seed,levy_seed_params,nr_samples,envelope_params) acf_function_helper = trawl_acf(envelope, envelope_function) acf_function = lambda x: acf_function_helper(x,envelope_params) overlap_area = acf_function(tau*nr_steps_ahead) if levy_seed == 'gaussian': assert isinstance(max_gaussian_lag,int) and max_gaussian_lag > 0 conditional_mean,conditional_scale = conditional_gaussian_distribution(values,tau,nr_steps_ahead,max_gaussian_lag, levy_seed_params,envelope,envelope_params,envelope_function) return np.array([norm.rvs(loc = i, scale = conditional_scale, size=nr_samples) for i in conditional_mean]) elif levy_seed == 'gamma': alpha,theta = levy_seed_params alpha0 = alpha * overlap_area alpha1 = alpha * (1-overlap_area) print('before overlap') overlap_samples = values[:,np.newaxis] * beta.rvs(a = alpha0, b = alpha1, size = [len(values),nr_samples]) print('before independent') independent_samples = gamma.rvs(a = alpha1, loc = 0, scale = theta, size = nr_samples)[np.newaxis,:] print('after independent') elif levy_seed in ['invgauss','gig','cauchy','student']: raise ValueError('not yet implemented') return overlap_samples + independent_samples
def probabilistic_forecasting_sanity_checks(values, tau, nr_steps_ahead, levy_seed, levy_seed_params, nr_samples, envelope_params)
-
Expand source code
def probabilistic_forecasting_sanity_checks(values,tau,nr_steps_ahead,levy_seed,levy_seed_params,nr_samples,envelope_params): assert isinstance(values,np.ndarray) assert isinstance(tau,float) and tau > 0 assert isinstance(nr_steps_ahead,int) and nr_steps_ahead > 0 assert levy_seed in ['gaussian','gamma','custom'] assert isinstance(levy_seed_params,np.ndarray) #assert envelope and envelope_function checked in trawl_acf assert isinstance(envelope_params,np.ndarray)