Source code for pydelfi.score

from scipy.stats import multivariate_normal
import numpy as np
import tqdm

[docs]def isnotebook(): try: shell = get_ipython().__class__.__name__ if shell == 'ZMQInteractiveShell': return True # Jupyter notebook or qtconsole elif shell == 'TerminalInteractiveShell': return False # Terminal running IPython else: return False # Other type (?) except NameError: return False
[docs]class Gaussian(): def __init__(self, ndata, theta_fiducial, mu = None, Cinv = None, dmudt = None, dCdt = None, F = None, prior_mean = None, prior_covariance = None, rank=0, n_procs=1, comm=None, red_op=None): # Load inputs self.theta_fiducial = theta_fiducial self.npar = len(theta_fiducial) self.ndata = ndata self.mu = mu self.Cinv = Cinv self.dmudt = dmudt self.dCdt = dCdt self.prior_mean = prior_mean self.prior_covariance = prior_covariance if F is None: self.F = None self.Finv = None else: self.F = F self.Finv = np.linalg.inv(F) # Holder to store any simulations and parameter values that get ran self.simulations = np.array([]).reshape((0,self.ndata)) self.parameters = np.array([]).reshape((0,self.npar)) # MPI-specific setup self.rank = rank self.n_procs = n_procs if n_procs > 1: self.use_mpi = True self.comm = comm self.red_op = red_op else: self.use_mpi = False # Are we in a jupyter notebook or not? self.nb = isnotebook() # Divide list of jobs between MPI processes
[docs] def allocate_jobs(self, n_jobs): n_j_allocated = 0 for i in range(self.n_procs): n_j_remain = n_jobs - n_j_allocated n_p_remain = self.n_procs - i n_j_to_allocate = int(n_j_remain / n_p_remain) if self.rank == i: return range(n_j_allocated, \ n_j_allocated + n_j_to_allocate) n_j_allocated += n_j_to_allocate
# Combine arrays from all processes assuming # 1) array was initially zero # 2) each process has edited a unique slice of the array
[docs] def complete_array(self, target_distrib): if self.use_mpi: target = np.zeros(target_distrib.shape, \ dtype=target_distrib.dtype) self.comm.Allreduce(target_distrib, target, \ op=self.red_op) else: target = target_distrib return target
# Compute the mean and covariance
[docs] def compute_mean_covariance(self, simulator, nsims, simulator_args = None, seed_generator = None, progress_bar=True, sub_batch=1): # Set the random seed generator if seed_generator is not None: seed_generator = seed_generator else: seed_generator = lambda: np.random.randint(2147483647) sims = np.zeros((nsims*sub_batch, self.ndata)) # Allocate jobs according to MPI inds = self.allocate_jobs(nsims) # Run the simulations with MPI if progress_bar: if self.nb: pbar = tqdm.tqdm_notebook(total = inds[-1]+1, desc = "Covariance simulations") else: pbar = tqdm.tqdm(total = inds[-1]+1, desc = "Covariance simulations") for i in range(inds[-1]+1): seed = seed_generator() sims[i*sub_batch:i*sub_batch+sub_batch,:] = simulator(self.theta_fiducial, seed, simulator_args, sub_batch) if progress_bar: pbar.update(1) # Collect all the sims together from all the processes sims = self.complete_array(sims) # Now compute the covariance and mean self.mu = np.zeros((self.ndata)) mu2 = np.zeros((self.ndata, self.ndata)) for i in range(0, nsims*sub_batch): self.mu += sims[i,:]/(nsims*sub_batch) mu2 += np.outer(sims[i,:], sims[i,:])/(nsims*sub_batch) self.C = mu2 - np.outer(self.mu,self.mu) self.Cinv = np.linalg.inv(self.C) # Save the simulations self.simulations = sims self.parameters = np.array([self.theta_fiducial for i in range(nsims*sub_batch)])
[docs] def compute_derivatives(self, simulator, nsims, h, simulator_args = None, seed_generator = None, progress_bar=True, sub_batch=1): # Set the random seed generator if seed_generator is not None: seed_generator = seed_generator else: seed_generator = lambda: np.random.randint(2147483647) sims_dash = np.zeros((nsims*sub_batch, self.ndata)) theta = np.zeros((nsims*sub_batch, self.npar)) # Allocate jobs according to MPI inds = self.allocate_jobs(nsims) # Initialize the derivatives dmudt = np.zeros((nsims, self.npar, self.ndata)) # Run seed matched simulations for derivatives if progress_bar: if self.nb: pbar = tqdm.tqdm_notebook(total = (inds[-1]+1)*self.npar, desc = "Derivative simulations") else: pbar = tqdm.tqdm(total = (inds[-1]+1)*self.npar, desc = "Derivative simulations") for k in range(inds[-1]+1): # Set random seed seed = seed_generator() # Fiducial simulation (mean over batch of outputs) d_fiducial = np.mean(np.atleast_2d(simulator(self.theta_fiducial, seed, simulator_args, sub_batch)), axis=0) # Loop over parameters for i in range(0, self.npar): # Step theta theta[k*sub_batch:(k+1)*sub_batch, :] = np.copy(self.theta_fiducial) theta[k*sub_batch:(k+1)*sub_batch, i] += h[i] # Shifted simulation with same seed sims_dash[k*sub_batch:(k+1)*sub_batch, :] = np.atleast_2d(simulator(theta[k], seed, simulator_args, sub_batch)) d_dash = np.mean(sims_dash[k*sub_batch:(k+1)*sub_batch, :], axis = 0) # Forward step derivative dmudt[k, i, :] = (d_dash - d_fiducial)/h[i] if progress_bar: pbar.update(1) sims_dash = self.complete_array(sims_dash) dmudt = self.complete_array(dmudt) self.dmudt = np.mean(dmudt, axis = 0) self.simulations = np.concatenate([self.simulations, sims_dash]) self.parameters = np.concatenate([self.parameters, theta])
# Fisher score maximum likelihood estimator
[docs] def scoreMLE(self, d): if self.F is None: print("Fisher matrix not computed yet: please make sure the neccesary bits (mean, covariance, derivatives) are provided and then call compute_fisher()") return None # Compute the score dLdt = np.zeros(self.npar) # Add terms from mean derivatives for a in range(self.npar): dLdt[a] += np.dot(self.dmudt[a,:], np.dot(self.Cinv, (d - self.mu))) # Add terms from covariance derivatives if self.dCdt is not None: for a in range(self.npar): dLdt[a] += -0.5*np.trace(np.dot(self.Cinv, self.dCdt[a,:,:])) + 0.5*np.dot((d - self.mu), np.dot( np.dot(self.Cinv, np.dot(self.dCdt[a,:,:], self.Cinv)), (d - self.mu))) # Cast to MLE t = self.theta_fiducial + np.dot(self.Finv, dLdt) # Correct for gaussian prior if one is provided if self.prior_mean is not None: t += np.dot(self.Finv, np.dot(np.linalg.inv(self.prior_covariance), self.prior_mean - self.theta_fiducial)) return t
# Fisher matrix
[docs] def compute_fisher(self): # Fisher matrix F = np.zeros((self.npar, self.npar)) # Mean derivatives part for a in range(0, self.npar): for b in range(0, self.npar): F[a, b] += 0.5*(np.dot(self.dmudt[a,:], np.dot(self.Cinv, self.dmudt[b,:])) + np.dot(self.dmudt[b,:], np.dot(self.Cinv, self.dmudt[a,:]))) # Covariance derivatives part if self.dCdt is not None: for a in range(0, self.npar): for b in range(0, self.npar): F[a, b] += 0.5*np.trace( np.dot( np.dot(self.Cinv, self.dCdt[a,:,:]), np.dot(self.Cinv, self.dCdt[b,:,:]) ) ) # Add the prior covariance if one is provided if self.prior_covariance is not None: F = F + np.linalg.inv(self.prior_covariance) self.F = F self.Finv = np.linalg.inv(F)
# Nuisance projected score
[docs] def projected_scoreMLE(self, d, nuisances): # indices for interesting parameters interesting = np.delete(np.arange(self.npar), nuisances) n_interesting = len(interesting) n_nuisance = len(nuisances) # Compute projection vectors P = np.zeros((n_interesting, n_nuisance)) Fnn_inv = np.linalg.inv(np.delete(np.delete(self.F, interesting, axis = 0), interesting, axis = 1)) Finv_tt = np.delete(np.delete(self.Finv, nuisances, axis=0), nuisances, axis=1) for i in range(n_interesting): P[i,:] = np.dot(Fnn_inv, self.F[i,nuisances]) # Compute the score dLdt = np.zeros(self.npar) # Add terms from mean derivatives for a in range(self.npar): dLdt[a] += np.dot(self.dmudt[a,:], np.dot(self.Cinv, (d - self.mu))) # Add terms from covariance derivatives if self.dCdt is not None: for a in range(self.npar): dLdt[a] += -0.5*np.trace(np.dot(self.Cinv, self.dCdt[a,:,:])) + 0.5*np.dot((d - self.mu), np.dot( np.dot(self.Cinv, np.dot(self.dCdt[a,:,:], self.Cinv)), (d - self.mu))) # Do the projection dLdt_projected = np.zeros(n_interesting) for a in range(n_interesting): dLdt_projected[a] = dLdt[a] - np.dot(P[a], dLdt[nuisances]) # Cast it back into an MLE t = np.dot(Finv_tt, dLdt_projected) + self.theta_fiducial[interesting] # Correct for the prior if one is provided if self.prior_mean is not None: Qinv_tt = np.delete(np.delete(np.linalg.inv(self.prior_covariance), nuisances, axis=0), nuisances, axis=1) t += np.dot(Finv_tt, np.dot(Qinv_tt, self.prior_mean[interesting] - self.theta_fiducial[interesting])) return t
[docs]class Wishart(): def __init__(self, theta_fiducial, nu, Cinv, dCdt, F = None, prior_mean = None, prior_covariance = None): # Load inputs self.theta_fiducial = theta_fiducial self.npar = len(theta_fiducial) self.ndata = len(Cinv) self.Cinv = Cinv self.dCdt = dCdt self.nu = nu self.prior_mean = prior_mean self.prior_covariance = prior_covariance # Compute the Fisher matrix or use pre-loaded if F is not None: self.F = F else: self.F = self.fisher() self.Finv = np.linalg.inv(self.F) # Fisher score maximum likelihood estimator
[docs] def scoreMLE(self, d): # Compute the score dLdt = np.zeros(self.npar) for a in range(self.npar): for l in range(self.ndata): dLdt[a] += self.nu[l]*(-0.5*np.trace(np.dot(self.Cinv[l,:,:], self.dCdt[a,l,:,:])) + 0.5*np.trace(np.dot( np.dot(self.Cinv[l,:,:], np.dot(self.dCdt[a,l,:,:], self.Cinv[l,:,:])), d[l,:,:]) ) ) # Make it an MLE t = np.dot(self.Finv, dLdt) + self.theta_fiducial # Correct for prior if there is one if self.prior_covariance is not None: t += np.dot(self.Finv, np.dot(np.linalg.inv(self.prior_covariance), self.prior_mean - self.theta_fiducial)) # Return summary statistics return t
# Fisher matrix
[docs] def fisher(self): # Fisher matrix F = np.zeros((self.npar, self.npar)) for a in range(self.npar): for b in range(self.npar): for l in range(self.ndata): F[a,b] += 0.5*self.nu[l]*np.trace( np.dot(self.Cinv[l,:,:], np.dot(self.dCdt[a,l,:,:], np.dot(self.Cinv[l,:,:], self.dCdt[b,l,:,:]) ) )) # Add prior covariance if there is one if self.prior_covariance is not None: F = F + np.linalg.inv(self.prior_covariance) return F
# Nuisance projected score
[docs] def projected_scoreMLE(self, d, nuisances): # indices for interesting parameters interesting = np.delete(np.arange(self.npar), nuisances) n_interesting = len(interesting) n_nuisance = len(nuisances) # Compute projection vectors P = np.zeros((n_interesting, n_nuisance)) Fnn_inv = np.linalg.inv(np.delete(np.delete(self.F, interesting, axis = 0), interesting, axis = 1)) Finv_tt = np.delete(np.delete(self.Finv, nuisances, axis=0), nuisances, axis=1) for i in range(n_interesting): P[i,:] = np.dot(Fnn_inv, self.F[i,nuisances]) # Compute the score dLdt = np.zeros(self.npar) for a in range(self.npar): for l in range(self.ndata): dLdt[a] += self.nu[l]*(-0.5*np.trace(np.dot(self.Cinv[l,:,:], self.dCdt[a,l,:,:])) + 0.5*np.trace(np.dot( np.dot(self.Cinv[l,:,:], np.dot(self.dCdt[a,l,:,:], self.Cinv[l,:,:])), d[l,:,:]) ) ) # Do the projection dLdt_projected = np.zeros(n_interesting) for a in range(n_interesting): dLdt_projected[a] = dLdt[a] - np.dot(P[a], dLdt[nuisances]) # Cast it back into an MLE t = np.dot(Finv_tt, dLdt_projected) + self.theta_fiducial[interesting] # Correct for the prior if one is provided if self.prior_mean is not None: Qinv_tt = np.delete(np.delete(np.linalg.inv(self.prior_covariance), nuisances, axis=0), nuisances, axis=1) t += np.dot(Finv_tt, np.dot(Qinv_tt, self.prior_mean[interesting] - self.theta_fiducial[interesting])) return t