Source code for physical_validation.util.ensemble

###########################################################################
#                                                                         #
#    physical_validation,                                                 #
#    a python package to test the physical validity of MD results         #
#                                                                         #
#    Written by Pascal T. Merz <pascal.merz@me.com>                       #
#               Michael R. Shirts <michael.shirts@colorado.edu>           #
#                                                                         #
#    Copyright (c) 2017-2021 University of Colorado Boulder               #
#              (c) 2012      The University of Virginia                   #
#                                                                         #
###########################################################################
r"""
This file reimplements most functionality of the checkensemble.py code
originally published on https://github.com/shirtsgroup/checkensemble. It
serves as the low-level functionality of the high-level module
:mod:`physical_validation.ensemble`.
"""
from typing import Dict, List, Optional, Tuple, Union

import numpy as np
import pymbar

try:
    # pymbar >= 4
    from pymbar.timeseries import statistical_inefficiency
except ImportError:
    # pymbar < 4
    from pymbar.timeseries import statisticalInefficiency as statistical_inefficiency

import scipy.optimize

from . import error as pv_error
from . import plot, trajectory


def pymbar_bar(
    work_forward: np.ndarray,
    work_backward: np.ndarray,
) -> Dict[str, float]:
    r"""
    Wrapper around the pymbar BAR functionality, allowing the remaining
    code to be agnostic of the pymbar version used.

    Parameters
    ----------
    work_forward:
        Array of forward work values
    work_backward:
        Array of backward work values

    Returns
    -------
    A dictionary containing the free energy estimate and its error estimate
    """
    try:
        # pymbar >= 4
        return pymbar.other_estimators.bar(work_forward, work_backward)
    except AttributeError:
        # pymbar < 4
        return pymbar.BAR(work_forward, work_backward, return_dict=True)
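
# Illustrative sketch (not part of the original module): how pymbar_bar is
# typically called. The synthetic work values below are assumptions chosen
# only to make the example self-contained; the returned dictionary keys
# "Delta_f" and "dDelta_f" are the ones this module reads elsewhere.
def _example_pymbar_bar():
    rng = np.random.default_rng(42)
    # Forward and backward work values of a fictitious perturbation,
    # offset by a free energy difference of about 0.5 kT
    work_forward = rng.normal(loc=1.5, scale=1.0, size=1000)
    work_backward = rng.normal(loc=-0.5, scale=1.0, size=1000)
    result = pymbar_bar(work_forward, work_backward)
    # result["Delta_f"] holds the BAR free energy estimate,
    # result["dDelta_f"] its statistical error
    return result["Delta_f"], result["dDelta_f"]
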
def chemical_potential_energy(
    chemical_potential: np.ndarray,
    number_of_species: np.ndarray,
) -> np.ndarray:
    r"""
    Calculates the chemical potential energy of a trajectory by
    returning the sum of the current number of species multiplied by
    the chemical potential.
    """
    assert chemical_potential.ndim == 1
    assert number_of_species.ndim == 1 or number_of_species.ndim == 2
    if number_of_species.ndim == 1:
        assert chemical_potential.size == 1
        return chemical_potential[0] * number_of_species
    else:
        assert number_of_species.shape[1] == chemical_potential.size
        # chemical_potential: 1 x num_pot
        # number_of_species: num_frames x num_pot
        return np.sum(chemical_potential * number_of_species, axis=1)
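
# Illustrative sketch (not part of the original module): the per-frame
# chemical potential energy for two species over three frames. All numbers
# are arbitrary assumptions for illustration.
def _example_chemical_potential_energy():
    mu = np.array([-5.0, -2.0])  # one chemical potential per species
    n = np.array([[10, 4], [11, 4], [10, 5]])  # num_frames x num_species
    # Returns, per frame, sum_i mu_i * N_i
    return chemical_potential_energy(mu, n)  # -> array([-58., -63., -60.])
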
def generate_histograms(
    traj1: np.ndarray, traj2: np.ndarray, g1: float, g2: float, bins: np.ndarray
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    n1 = np.size(traj1)
    n2 = np.size(traj2)

    h1 = np.histogram(traj1, bins=bins)[0] / n1
    h2 = np.histogram(traj2, bins=bins)[0] / n2
    dh1 = np.sqrt(g1 * h1 * (1 - h1) / n1)
    dh2 = np.sqrt(g2 * h2 * (1 - h2) / n2)

    return h1, h2, dh1, dh2
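
# Illustrative sketch (not part of the original module): normalized histograms
# of two overlapping samples, with binomial error bars inflated by the
# statistical inefficiencies g1 and g2. The distributions and g = 1
# (i.e., samples assumed uncorrelated) are assumptions for illustration.
def _example_generate_histograms():
    rng = np.random.default_rng(0)
    traj1 = rng.normal(loc=0.0, scale=1.0, size=5000)
    traj2 = rng.normal(loc=0.5, scale=1.0, size=5000)
    bins = np.linspace(-2.0, 2.5, 20)
    h1, h2, dh1, dh2 = generate_histograms(traj1, traj2, g1=1.0, g2=1.0, bins=bins)
    return h1, h2, dh1, dh2
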
def do_linear_fit(
    traj1: np.ndarray,
    traj2: np.ndarray,
    g1: float,
    g2: float,
    bins: np.ndarray,
    screen: bool,
    filename: Optional[str],
    trueslope: float,
    trueoffset: float,
    units: Optional[str],
    xlabel: str,
    ylabel: str,
) -> Tuple[np.ndarray, np.ndarray]:

    h1, h2, dh1, dh2 = generate_histograms(traj1, traj2, g1, g2, bins)

    #  v  copied from checkensemble.py  v
    ratio = np.log(h2 / h1)
    dratio = np.sqrt((dh1 / h1) ** 2 + (dh2 / h2) ** 2)

    usedat = np.isfinite(ratio)
    y = ratio[usedat]
    nuse = len(y)
    weights = 1.0 / dratio[usedat]

    xaxis = (bins[:-1] + bins[1:]) / 2
    x = xaxis[usedat]

    x_mat = np.ones([nuse, 2])
    x_mat[:, 1] = x

    w = np.diag(weights)
    wx = np.dot(w, x_mat)
    wy = np.dot(w, y)
    wx_t = np.transpose(wx)
    z = np.dot(wx_t, wx)
    wxy = np.dot(wx_t, wy)

    a = np.linalg.solve(z, wxy)
    da_matrix = np.transpose(np.linalg.inv(z))
    da = np.zeros(2)
    da[0] = np.sqrt(da_matrix[0, 0])
    da[1] = np.sqrt(da_matrix[1, 1])

    # the true line is y = df + dp*x, where y is ln P_1(X)/P_2(X)
    #  ^  end copied from checkensemble.py  ^

    do_plot = screen or filename is not None
    if do_plot:
        true = trueoffset + trueslope * xaxis
        fit = a[0] + a[1] * xaxis

        data = [
            {"x": xaxis, "y": ratio, "y_err": dratio, "name": "Simulation"},
            {"x": xaxis, "y": fit, "name": "Fit to simulation"},
            {"x": xaxis, "y": true, "name": "Analytical ratio"},
        ]

        if units is not None:
            units = " [" + units + "]"
        else:
            units = ""

        annot = (
            "{:.1f}".format(abs((a[1] - trueslope) / da[1]))
            + " quantiles (linear estimate)"
        )

        plot.plot(
            data,
            legend="upper left",
            title="Log probability ratio",
            xlabel=xlabel + units,
            ylabel=ylabel,
            filename=filename,
            screen=screen,
            axtext=annot,
        )

    return a, da
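
# The weighted least-squares step above solves the normal equations
# (X^T W^T W X) a = X^T W^T W y for intercept and slope a, with W the
# diagonal matrix of inverse error bars. A minimal sketch (not part of the
# original module) verifying that logic against numpy's lstsq; the data
# points are arbitrary assumptions, and unit weights reduce the problem
# to ordinary least squares.
def _example_weighted_least_squares():
    x = np.array([0.0, 1.0, 2.0, 3.0])
    y = np.array([0.1, 1.9, 4.2, 5.8])  # roughly y = 2x
    weights = np.ones_like(y)
    x_mat = np.ones([x.size, 2])
    x_mat[:, 1] = x
    w = np.diag(weights)
    wx = np.dot(w, x_mat)
    z = np.dot(wx.T, wx)
    a = np.linalg.solve(z, np.dot(wx.T, np.dot(w, y)))
    # With unit weights this reduces to ordinary least squares
    a_ref = np.linalg.lstsq(x_mat, y, rcond=None)[0]
    assert np.allclose(a, a_ref)
    return a  # a[0] ~ intercept, a[1] ~ slope
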
def do_max_likelihood_fit(
    traj1: np.ndarray,
    traj2: np.ndarray,
    g1: Union[float, np.ndarray],
    g2: Union[float, np.ndarray],
    init_params: np.ndarray,
    verbose: bool,
) -> Tuple[np.ndarray, np.ndarray]:

    # ============================================================= #
    # Define (negative) log-likelihood function and its derivatives #
    # ============================================================= #
    def log_likelihood(a, ene1, ene2):
        # Returns negative of eq (8) of check_ensemble paper
        #
        # Uses log (1/f(x)) == -log(f(x))
        # and log(1 + e^x) == log(e^x (e^-x + 1)) == x + log(1 + e^-x)
        #         ^(a)                               ^(b)
        # form (a) -> 0 for x->-inf, -> inf for x->inf
        # form (b) -> NaN for x->-inf, -> x for x->inf
        # combined: -> 0 for x->-inf, -> x for x->inf
        def log_1_plus_exp(y):
            def f(yy):
                with np.errstate(over="raise"):
                    try:
                        xx = np.log(1 + np.exp(yy))
                    except FloatingPointError:
                        xx = yy + np.log(1 + np.exp(-yy))
                    return xx

            return np.vectorize(f)(y)

        if a.size == 2:
            return np.sum(log_1_plus_exp(a[0] + a[1] * ene1)) + np.sum(
                log_1_plus_exp(-a[0] - a[1] * ene2)
            )
        else:
            return np.sum(
                log_1_plus_exp(a[0] + a[1] * ene1[0] + a[2] * ene1[1])
            ) + np.sum(log_1_plus_exp(-a[0] - a[1] * ene2[0] - a[2] * ene2[1]))

    def da_log_likelihood(a, ene1, ene2):
        # Returns the first derivative wrt the parameters a of log_likelihood
        #
        # d/da0 log(1 + exp(a0 + a1*E)) == exp(a0 + a1*E) / (1 + exp(a0 + a1*E))
        #                               == 1 / (1 + exp(-a0 - a1*E))
        # d/da1 log(1 + exp(a0 + a1*E)) == E * exp(a0 + a1*E) / (1 + exp(a0 + a1*E))
        #                               == E / (1 + exp(-a0 - a1*E))
        def inv_1_plus_exp(y):
            def f(yy):
                with np.errstate(over="raise"):
                    try:
                        xx = 1.0 / (1 + np.exp(yy))
                    except FloatingPointError:
                        xx = 0.0
                    return xx

            return np.vectorize(f)(y)

        if a.size == 2:
            d = np.zeros(2)
            d[0] = np.sum(inv_1_plus_exp(-a[0] - a[1] * ene1)) - np.sum(
                inv_1_plus_exp(a[0] + a[1] * ene2)
            )
            d[1] = np.sum(inv_1_plus_exp(-a[0] - a[1] * ene1) * ene1) - np.sum(
                inv_1_plus_exp(a[0] + a[1] * ene2) * ene2
            )
        else:
            d = np.zeros(3)
            d[0] = np.sum(
                inv_1_plus_exp(-a[0] - a[1] * ene1[0] - a[2] * ene1[1])
            ) - np.sum(inv_1_plus_exp(a[0] + a[1] * ene2[0] + a[2] * ene2[1]))
            d[1] = np.sum(
                inv_1_plus_exp(-a[0] - a[1] * ene1[0] - a[2] * ene1[1]) * ene1[0]
            ) - np.sum(inv_1_plus_exp(a[0] + a[1] * ene2[0] + a[2] * ene2[1]) * ene2[0])
            d[2] = np.sum(
                inv_1_plus_exp(-a[0] - a[1] * ene1[0] - a[2] * ene1[1]) * ene1[1]
            ) - np.sum(inv_1_plus_exp(a[0] + a[1] * ene2[0] + a[2] * ene2[1]) * ene2[1])

        return d

    def hess_log_likelihood(a, ene1, ene2):
        # Returns the hessian wrt the parameters a of log_likelihood
        # fac1 = 1 / (2 + 2*cosh(a0 + a1*ene1))
        # h1 = [[ fac1,      ene1*fac1    ],
        #       [ ene1*fac1, ene1**2*fac1 ]]
        # fac2 = 1 / (2 + 2*cosh(a0 + a1*ene2))
        # h2 = [[ fac2,      ene2*fac2    ],
        #       [ ene2*fac2, ene2**2*fac2 ]]
        # h = h1 + h2

        if a.size == 2:
            fac1 = 1 / (2 + 2 * np.cosh(a[0] + a[1] * ene1))
            fac2 = 1 / (2 + 2 * np.cosh(a[0] + a[1] * ene2))

            h = np.zeros((2, 2))

            h[0, 0] = np.sum(fac1) + np.sum(fac2)
            h[0, 1] = h[1, 0] = np.sum(ene1 * fac1) + np.sum(ene2 * fac2)
            h[1, 1] = np.sum(ene1 * ene1 * fac1) + np.sum(ene2 * ene2 * fac2)

        else:
            fac1 = 1 / (2 + 2 * np.cosh(a[0] + a[1] * ene1[0] + a[2] * ene1[1]))
            fac2 = 1 / (2 + 2 * np.cosh(a[0] + a[1] * ene2[0] + a[2] * ene2[1]))

            h = np.zeros((3, 3))

            h[0, 0] = np.sum(fac1) + np.sum(fac2)
            h[1, 1] = np.sum(ene1[0] * ene1[0] * fac1) + np.sum(
                ene2[0] * ene2[0] * fac2
            )
            h[2, 2] = np.sum(ene1[1] * ene1[1] * fac1) + np.sum(
                ene2[1] * ene2[1] * fac2
            )
            h[0, 1] = h[1, 0] = np.sum(ene1[0] * fac1) + np.sum(ene2[0] * fac2)
            h[0, 2] = h[2, 0] = np.sum(ene1[1] * fac1) + np.sum(ene2[1] * fac2)
            h[1, 2] = h[2, 1] = np.sum(ene1[0] * ene1[1] * fac1) + np.sum(
                ene2[0] * ene2[1] * fac2
            )

        return h

    # ==================================================== #
    # Minimize the negative of the log likelihood function #
    # ==================================================== #
    min_res = checkensemble_solver(
        fun=log_likelihood,
        x0=init_params,
        args=(traj1, traj2),
        jac=da_log_likelihood,
        hess=hess_log_likelihood,
    )

    # fallback options
    if not min_res.success:  # built-in was unsuccessful
        if verbose:
            print(
                "Note: Max-Likelihood minimization failed using built-in method. "
                "Trying scipy method 'dogleg'."
            )
        for variation in [1.0, 0.9, 1.1]:
            min_res = scipy.optimize.minimize(
                log_likelihood,
                x0=init_params * variation,
                args=(traj1, traj2),
                method="dogleg",
                jac=da_log_likelihood,
                hess=hess_log_likelihood,
                options=dict(initial_trust_radius=0.001),
            )
            if min_res.success:
                break

    if not min_res.success:  # dogleg was unsuccessful
        if verbose:
            print(
                "Note: Max-Likelihood minimization failed using scipy method 'dogleg'. "
                "Trying scipy method 'nelder-mead'."
            )
        for variation in [1.0, 0.9, 1.1]:
            min_res = scipy.optimize.minimize(
                log_likelihood,
                x0=init_params * variation,
                args=(traj1, traj2),
                method="nelder-mead",
            )
            if min_res.success:
                break

    if not min_res.success:
        raise RuntimeError("MaxLikelihood: Unable to minimize function.")

    final_params = min_res.x

    # ======================= #
    # Calculate uncertainties #
    # ======================= #
    cov = np.linalg.inv(hess_log_likelihood(final_params, traj1, traj2))
    final_error = np.sqrt(np.diag(cov)) * np.sqrt(np.average([g1, g2]))

    return final_params, final_error
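
# Illustrative sketch (not part of the original module): the 1D fit recovers
# the parameters of ln(P_2(E)/P_1(E)) = a0 + a1*E from two samples. For
# unit-variance Gaussians shifted by one, the analytical values are
# a0 = -0.5 and a1 = 1. The sample sizes, seed, and g = 1 (uncorrelated
# samples) are assumptions for illustration.
def _example_do_max_likelihood_fit():
    rng = np.random.default_rng(1)
    traj1 = rng.normal(loc=0.0, scale=1.0, size=2000)
    traj2 = rng.normal(loc=1.0, scale=1.0, size=2000)
    params, errors = do_max_likelihood_fit(
        traj1,
        traj2,
        g1=1.0,
        g2=1.0,
        init_params=np.array([-0.5, 1.0]),
        verbose=False,
    )
    return params, errors  # params[1] should be close to 1
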
def checkensemble_solver(
    fun, x0, args, jac, hess, tol=1e-10, maxiter=20
) -> scipy.optimize.OptimizeResult:
    # This is the solver used in checkensemble, unchanged except for some
    # modernization / adaptation to coding style
    success = False
    x = np.array(x0)
    itol = 1e-2
    rtol = 1e-2
    lasttol = float("inf")
    lastx = x

    it = 0
    gx = 0
    nh = 0
    for it in range(maxiter):
        gx = np.transpose(jac(x, *args))
        nh = hess(x, *args)
        dx = np.linalg.solve(nh, gx)
        x -= dx
        rx = dx / x
        checktol = np.sqrt(np.dot(dx, dx))
        checkrtol = np.sqrt(np.dot(rx, rx))
        if checkrtol < tol:
            success = True
            break
        if checkrtol > 1.0 and checktol > lasttol:
            x = scipy.optimize.fmin_cg(
                fun, lastx, fprime=jac, gtol=itol, args=args, disp=False
            )
            itol *= rtol
        lasttol = checktol
        lastx = x

    return scipy.optimize.OptimizeResult(
        x=x,
        success=success,
        nit=it,
        status=int(not success),
        fun=fun(x, *args),
        jac=gx,
        hess=nh,
        nfev=int(np.round(np.log(itol * 100) / np.log(rtol))),
        njev=it,
        nhev=it,
    )
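
# Illustrative sketch (not part of the original module): the Newton iteration
# above applied to a simple convex function with known minimum at (1, 2).
# The toy objective and starting point are assumptions for illustration.
def _example_checkensemble_solver():
    def fun(x):
        return (x[0] - 1.0) ** 2 + (x[1] - 2.0) ** 2

    def jac(x):
        return np.array([2.0 * (x[0] - 1.0), 2.0 * (x[1] - 2.0)])

    def hess(x):
        return 2.0 * np.eye(2)

    res = checkensemble_solver(
        fun=fun, x0=np.array([0.5, 0.5]), args=(), jac=jac, hess=hess
    )
    return res.x  # close to [1.0, 2.0], with res.success True
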
def check_bins(traj1: np.ndarray, traj2: np.ndarray, bins: np.ndarray) -> np.ndarray:
    # check for empty bins
    h1, _ = np.histogram(traj1, bins=bins)
    h2, _ = np.histogram(traj2, bins=bins)
    empty = np.where((h1 == 0) | (h2 == 0))[0]

    if np.size(empty) == 0:
        return bins
    elif np.size(empty) == 1:
        empty = empty[0]
        if empty > np.size(bins) / 2:
            return bins[:empty]
        else:
            return bins[empty + 1 :]
    else:
        # find longest non-empty interval
        empty = np.insert(np.append(empty, [40]), 0, [-1])
        max_interval = np.argmax(empty[1:] - empty[:-1])
        left = empty[max_interval] + 1
        right = empty[max_interval + 1]
        return bins[left:right]
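
# Illustrative sketch (not part of the original module): check_bins drops bin
# edges whose bins are empty in either trajectory. Here only the first bin
# [0.0, 0.1) is unsampled, so its left edge is removed. The sample values and
# bin grid are assumptions for illustration.
def _example_check_bins():
    traj1 = np.linspace(0.15, 0.95, 50)  # no samples below 0.1
    traj2 = np.linspace(0.12, 0.98, 50)
    bins = np.linspace(0.0, 1.0, 11)
    return check_bins(traj1, traj2, bins)  # equals bins[1:]
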
def estimate_interval(
    ens_string: str,
    ens_temp: float,
    energy: np.ndarray,
    kb: float,
    ens_press: Optional[float],
    volume: Optional[np.ndarray],
    pvconvert: Optional[float],
    ens_mu: Optional[np.ndarray],
    species_number: Optional[np.ndarray],
    verbosity: int,
    cutoff: float,
    tunit: str,
    punit: str,
    munit: str,
    data_is_uncorrelated: bool,
) -> Dict[str, Union[float, List[float]]]:
    result = {}
    if ens_string == "NVT":
        # Discard burn-in period and time-correlated frames
        energy = trajectory.prepare(
            energy,
            cut=cutoff,
            verbosity=verbosity - 1,
            name="Energy",
            skip_preparation=data_is_uncorrelated,
        )
        # dT
        sig = np.std(energy)
        result["dT"] = 2 * kb * ens_temp * ens_temp / sig
    elif ens_string == "NPT":
        enthalpy = energy + pvconvert * ens_press * volume
        traj_2d = np.array([energy, volume])
        # Discard burn-in period and time-correlated frames
        enthalpy = trajectory.prepare(
            enthalpy,
            cut=cutoff,
            verbosity=verbosity - 1,
            name="Enthalpy",
            skip_preparation=data_is_uncorrelated,
        )
        volume_1d = trajectory.prepare(
            volume,
            cut=cutoff,
            verbosity=verbosity - 1,
            name="Volume",
            skip_preparation=data_is_uncorrelated,
        )
        traj_2d = trajectory.prepare(
            traj_2d,
            cut=cutoff,
            verbosity=verbosity - 1,
            name="2D-Trajectory",
            skip_preparation=data_is_uncorrelated,
        )
        # dT
        sig = np.std(enthalpy)
        result["dT"] = 2 * kb * ens_temp * ens_temp / sig
        # dP
        sig = np.std(volume_1d) * pvconvert
        result["dP"] = 2 * kb * ens_temp / sig
        # dTdP
        cov = np.cov(traj_2d)
        sig = np.sqrt(np.diag(cov))
        sig[1] *= pvconvert
        result["dTdP"] = [
            2 * kb * ens_temp * ens_temp / sig[0],
            2 * kb * ens_temp / sig[1],
        ]
    elif ens_string == "muVT":
        if ens_mu.size > 1:
            print(
                "Note: Your muVT ensemble has more than one mu value. Ensemble check is "
                "implemented for\n"
                " * ensembles differing in T only, with any number of mu values\n"
                " * ensembles differing in one value of mu only\n"
                " * ensembles differing in T and one value of mu.\n"
            )
        else:
            species_number = species_number.flatten()
        muvt_energy = energy - chemical_potential_energy(ens_mu, species_number)
        # Discard burn-in period and time-correlated frames
        muvt_energy = trajectory.prepare(
            muvt_energy,
            cut=cutoff,
            verbosity=verbosity - 1,
            name="muVT Energy",
            skip_preparation=data_is_uncorrelated,
        )
        try:
            species_number_traj = trajectory.prepare(
                species_number,
                cut=cutoff,
                verbosity=verbosity - 1,
                name="N",
                skip_preparation=data_is_uncorrelated,
            )
        except NotImplementedError:
            raise NotImplementedError(
                "Preparing of the trajectory is not implemented for more than two dimensions. "
                "Your trajectory has 3 or more species. If you are sure that your trajectory "
                "is uncorrelated, you can retry this command using the argument "
                "`data_is_uncorrelated=True`."
            )
        traj_2d = np.array([energy, species_number]) if ens_mu.size == 1 else None
        if traj_2d is not None:
            traj_2d = trajectory.prepare(
                traj_2d,
                cut=cutoff,
                verbosity=verbosity - 1,
                name="2D-Trajectory",
                skip_preparation=data_is_uncorrelated,
            )
        # dT
        sig = np.std(muvt_energy)
        result["dT"] = 2 * kb * ens_temp * ens_temp / sig
        # dmu
        sig = np.array([np.std(species_number_traj, axis=0)]).flatten()
        if sig.size == 1:
            result["dmu"] = 2 * kb * ens_temp / sig[0]
        else:
            result["dmu"] = [2 * kb * ens_temp / s for s in sig]
        # dTdmu
        if traj_2d is not None:
            cov = np.cov(traj_2d)
            sig = np.sqrt(np.diag(cov))
            result["dTdmu"] = [
                2 * kb * ens_temp * ens_temp / sig[0],
                2 * kb * ens_temp / sig[1],
            ]
    else:
        raise pv_error.InputError("ens_str", "Unrecognized ensemble string.")

    if verbosity > 0:
        print(
            "A rule of thumb states that good error recognition can be expected when\n"
            "spacing the tip of the distributions by about two standard deviations.\n"
            "Based on this rule, and the assumption that the standard deviation of the\n"
            "distributions is largely independent of the state point, here's an estimate\n"
            "for the interval given the current simulation:"
        )
        if ens_string == "NVT":
            print("Current trajectory: NVT, T = {:.2f} {:s}".format(ens_temp, tunit))
            print("Suggested interval: dT = {:.1f} {:s}".format(result["dT"], tunit))
        if ens_string == "NPT":
            print(
                "Current trajectory: NPT, T = {:.2f} {:s}, P = {:.2f} {:s}".format(
                    ens_temp, tunit, ens_press, punit
                )
            )
            print("Suggested interval:")
            print(" Temperature-only: dT = {:.1f} {:s}".format(result["dT"], tunit))
            print(" Pressure-only: dP = {:.1f} {:s}".format(result["dP"], punit))
            print(
                " Combined: dT = {:.1f} {:s}, dP = {:.1f} {:s}".format(
                    result["dTdP"][0], tunit, result["dTdP"][1], punit
                )
            )
        if ens_string == "muVT":

            def print_mu(mu, format_string):
                mu = np.array([mu]).flatten()
                if mu.size > 1:
                    return np.array2string(
                        mu, formatter={"float_kind": lambda x: format_string.format(x)}
                    )
                else:
                    return format_string.format(mu[0])

            print(
                "Current trajectory: muVT, T = {:.2f} {:s}, mu = {:s} {:s}".format(
                    ens_temp, tunit, print_mu(ens_mu, "{:.2f}"), munit
                )
            )
            print("Suggested interval:")
            print(" Temperature-only: dT = {:.1f} {:s}".format(result["dT"], tunit))
            print(
                " Chemical potential-only: dmu = {:s} {:s}".format(
                    print_mu(result["dmu"], "{:.1f}"), munit
                )
            )
            if "dTdmu" in result:
                print(
                    " Combined: dT = {:.1f} {:s}, dmu = {:.1f} {:s}".format(
                        result["dTdmu"][0], tunit, result["dTdmu"][1], munit
                    )
                )

    return result
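
# Illustrative sketch (not part of the original module): estimating a suitable
# temperature spacing for a second NVT run. The synthetic energy trajectory,
# temperature, Boltzmann constant (kJ/mol/K), and cutoff value are assumptions
# for illustration; data_is_uncorrelated=True skips trajectory preparation.
def _example_estimate_interval():
    rng = np.random.default_rng(7)
    energy = rng.normal(loc=-5000.0, scale=50.0, size=20000)
    return estimate_interval(
        ens_string="NVT",
        ens_temp=300.0,
        energy=energy,
        kb=8.314e-3,
        ens_press=None,
        volume=None,
        pvconvert=None,
        ens_mu=None,
        species_number=None,
        verbosity=1,
        cutoff=0.001,
        tunit="K",
        punit="",
        munit="",
        data_is_uncorrelated=True,
    )  # {"dT": 2*kb*T*T/std(energy)}, here about 30 K
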
def check_1d(
    traj1: np.ndarray,
    traj2: np.ndarray,
    param1: float,
    param2: float,
    kb: float,
    quantity: str,
    dtemp: bool,
    dpress: bool,
    dmu: bool,
    temp: Optional[float],
    pvconvert: Optional[float],
    nbins: int,
    cutoff: float,
    bootstrap_seed: Optional[int],
    bootstrap_error: bool,
    bootstrap_repetitions: int,
    verbosity: int,
    screen: bool,
    filename: Optional[str],
    xlabel: str,
    xunit: Optional[str],
    data_is_uncorrelated: bool,
) -> List[float]:
    r"""
    Checks whether the energy trajectories of two simulations performed at
    different temperatures have sampled distributions at the analytically
    expected ratio.

    Parameters
    ----------
    traj1
        Trajectory of the first simulation
        If dtemp:

        * NVT: Potential energy U or total energy E = U + K
        * NPT: Enthalpy H = U + pV or total energy E = H + K

        If dpress:

        * NPT: Volume V

    traj2
        Trajectory of the second simulation
        If dtemp:

        * NVT: Potential energy U or total energy E = U + K
        * NPT: Enthalpy H = U + pV or total energy E = H + K

        If dpress:

        * NPT: Volume V

    param1
        Target temperature or pressure of the first simulation
    param2
        Target temperature or pressure of the second simulation
    kb
        Boltzmann constant in same units as the energy trajectories
    quantity
        Name of quantity analyzed (used for printing only)
    dtemp
        Set to True if trajectories were simulated at different temperature
    dpress
        Set to True if trajectories were simulated at different pressure
    temp
        The temperature in equal temperature, differing pressure NPT simulations.
        Needed to print optimal dP.
    pvconvert
        Conversion from pressure * volume to energy units.
        Needed to print optimal dP.
    dmu
        Set to True if trajectories were simulated at different chemical potential
    nbins
        Number of bins used to assess distributions of the trajectories
    cutoff
        Tail cutoff of distributions.
    bootstrap_seed
        Sets the random number seed for bootstrapping.
        If set, bootstrapping will be reproducible.
        If `None`, bootstrapping is non-reproducible.
    bootstrap_error
        Calculate the standard error via bootstrap resampling
    bootstrap_repetitions
        Number of bootstrap repetitions drawn
    verbosity
        Verbosity level.
    screen
        Plot distributions on screen.
    filename
        Plot distributions to `filename`. If `None`, no plotting.
    xlabel
        x-axis label used for plotting
    xunit
        x-axis label unit used for plotting
    data_is_uncorrelated
        Whether the provided data is uncorrelated. If this option
        is set, the equilibration, decorrelation and tail pruning
        of the trajectory is skipped. This can speed up the analysis,
        but note that if the provided data is correlated, the results
        of the physical validation checks might be invalid.

    Returns
    -------
    The number of quantiles the computed result is off the analytical one.
    """
    if (
        not (dtemp or dpress or dmu)
        or (dtemp and dpress)
        or (dtemp and dmu)
        or (dpress and dmu)
    ):
        raise pv_error.InputError(
            ["dtemp", "dpress", "dmu"],
            "Need to specify exactly one of `dtemp`, `dpress` and `dmu`.",
        )

    if dpress and (temp is None or pvconvert is None):
        raise pv_error.InputError(
            ["dpress", "temp", "pvconvert"],
            "`ensemble.check_1d` with `dpress=True` requires `temp` and `pvconvert`.",
        )

    if dmu and temp is None:
        raise pv_error.InputError(
            ["dmu", "temp"], "`ensemble.check_1d` with `dmu=True` requires `temp`."
        )

    # =============================== #
    # prepare constants, strings etc. #
    # =============================== #
    pstring = "ln(P_2(" + quantity + ")/P_1(" + quantity + "))"
    trueslope = 0
    if dtemp:
        trueslope = 1 / (kb * param1) - 1 / (kb * param2)
    elif dpress:
        trueslope = (param1 - param2) / (kb * temp) * pvconvert
    elif dmu:
        trueslope = -(param1 - param2) / (kb * temp)

    if verbosity > 1:
        print("Analytical slope of {:s}: {:.8f}".format(pstring, trueslope))

    quant = {}

    # ==================== #
    # prepare trajectories #
    # ==================== #
    # Discard burn-in period and time-correlated frames
    traj1 = trajectory.prepare(
        traj1,
        cut=cutoff,
        verbosity=verbosity,
        name="Trajectory 1",
        skip_preparation=data_is_uncorrelated,
    )
    traj2 = trajectory.prepare(
        traj2,
        cut=cutoff,
        verbosity=verbosity,
        name="Trajectory 2",
        skip_preparation=data_is_uncorrelated,
    )

    # calculate overlap
    traj1_full = traj1
    traj2_full = traj2
    traj1, traj2, min_ene, max_ene = trajectory.overlap(
        traj1=traj1_full, traj2=traj2_full
    )
    if verbosity > 0:
        print(
            "Overlap is {:.1%} of trajectory 1 and {:.1%} of trajectory 2.".format(
                traj1.shape[0] / traj1_full.shape[0],
                traj2.shape[0] / traj2_full.shape[0],
            )
        )
    if verbosity > 0 and dtemp:
        sig1 = np.std(traj1_full)
        sig2 = np.std(traj2_full)
        dt1 = 2 * kb * param1 * param1 / sig1
        dt2 = 2 * kb * param2 * param2 / sig2
        if verbosity > 1:
            print(
                "A rule of thumb states that a good overlap is found when dT/T = (2*kB*T)/(sig),\n"
                "where sig is the standard deviation of the energy distribution.\n"
                "For the current trajectories, dT = {:.1f}, sig1 = {:.1f} and sig2 = {:.1f}.\n"
                "According to the rule of thumb, given T1, a good dT is dT = {:.1f}, and\n"
                "                                given T2, a good dT is dT = {:.1f}.".format(
                    param2 - param1, sig1, sig2, dt1, dt2
                )
            )
        print(
            "Rule of thumb estimates that dT = {:.1f} would be optimal "
            "(currently, dT = {:.1f})".format(0.5 * (dt1 + dt2), param2 - param1)
        )
    if verbosity > 0 and dpress:
        sig1 = np.std(traj1_full) * pvconvert
        sig2 = np.std(traj2_full) * pvconvert
        dp1 = 2 * kb * temp / sig1
        dp2 = 2 * kb * temp / sig2
        if verbosity > 1:
            print(
                "A rule of thumb states that a good overlap is found when dP = (2*kB*T)/(sig),\n"
                "where sig is the standard deviation of the volume distribution.\n"
                "For the current trajectories, dP = {:.1f}, sig1 = {:.1g} and sig2 = {:.1g}.\n"
                "According to the rule of thumb, given P1, a good dP is dP = {:.1f}, and\n"
                "                                given P2, a good dP is dP = {:.1f}.".format(
                    param2 - param1, sig1, sig2, dp1, dp2
                )
            )
        print(
            "Rule of thumb estimates that dP = {:.1f} would be optimal "
            "(currently, dP = {:.1f})".format(0.5 * (dp1 + dp2), param2 - param1)
        )
    if verbosity > 0 and dmu:
        sig1 = np.std(traj1_full)
        sig2 = np.std(traj2_full)
        dmu1 = 2 * kb * temp / sig1
        dmu2 = 2 * kb * temp / sig2
        if verbosity > 1:
            print(
                "A rule of thumb states that a good overlap is found when dmu = (2*kB*T)/(sig),\n"
                "where sig is the standard deviation of the species number distribution.\n"
                "For the current trajectories, dmu = {:.1f}, sig1 = {:.1g} and sig2 = {:.1g}.\n"
                "According to the rule of thumb, given mu1, a good dmu is dmu = {:.1f}, and\n"
                "                                given mu2, a good dmu is dmu = {:.1f}.".format(
                    param2 - param1, sig1, sig2, dmu1, dmu2
                )
            )
        print(
            "Rule of thumb estimates that dmu = {:.1f} would be optimal "
            "(currently, dmu = {:.1f})".format(
                0.5 * (dmu1 + dmu2), abs(param2 - param1)
            )
        )

    if not min_ene:
        raise pv_error.InputError(
            ["traj1", "traj2"], "No overlap between trajectories."
        )

    # calculate bins
    bins = np.linspace(min_ene, max_ene, nbins + 1)
    bins = check_bins(traj1, traj2, bins)
    if np.size(bins) < 3:
        raise pv_error.InputError(
            ["traj1", "traj2", "nbins", "cutoff"],
            "Less than 3 bins were filled in the overlap region.\n"
            "Ensure sufficient overlap between the trajectories, and "
            "consider increasing `cutoff` or `nbins` if there is "
            "sufficient overlap but unusually long tails.",
        )

    # calculate inefficiency
    g1 = statistical_inefficiency(traj1)
    g2 = statistical_inefficiency(traj2)

    w_f = -trueslope * traj1
    w_r = trueslope * traj2

    if verbosity > 2:
        print("Computing log of partition functions using pymbar.BAR...")
    bar_results = pymbar_bar(w_f, w_r)
    df = bar_results["Delta_f"]
    ddf = bar_results["dDelta_f"]
    if verbosity > 2:
        print(
            "Using {:.5f} for log of partition functions as computed from BAR.".format(
                df
            )
        )
        print("Uncertainty in quantity is {:.5f}.".format(ddf))
        print(
            "Assuming this is negligible compared to sampling error at individual points."
        )

    # ========== #
    # linear fit #
    # ========== #
    if verbosity > 2:
        print("Computing linear fit parameters (for plotting / comparison)")

    fitvals, dfitvals = do_linear_fit(
        traj1=traj1,
        traj2=traj2,
        g1=g1,
        g2=g2,
        bins=bins,
        screen=screen,
        filename=filename,
        trueslope=trueslope,
        trueoffset=df,
        units=xunit,
        xlabel=xlabel,
        ylabel=r"$\log\frac{P_2(" + quantity + ")}{P_1(" + quantity + ")}$",
    )

    slope = fitvals[1]
    dslope = dfitvals[1]
    quant["linear"] = [abs((slope - trueslope) / dslope)]
    if verbosity > 1:
        print_stats(
            title="Linear Fit Analysis (analytical error)",
            fitvals=fitvals,
            dfitvals=dfitvals,
            kb=kb,
            param1=param1,
            param2=param2,
            trueslope=trueslope,
            temp=temp,
            pvconvert=pvconvert,
            dtemp=dtemp,
            dpress=dpress,
            dmu=dmu,
            dtempdmu=False,
            dtempdpress=False,
        )

    # ================== #
    # max-likelihood fit #
    # ================== #
    if verbosity > 2:
        print("Computing the maximum likelihood parameters")

    fitvals, dfitvals = do_max_likelihood_fit(
        traj1,
        traj2,
        g1,
        g2,
        init_params=np.array([df, trueslope]),
        verbose=(verbosity > 1),
    )

    slope = fitvals[1]
    dslope = dfitvals[1]
    quant["maxLikelihood"] = [abs((slope - trueslope) / dslope)]
    if (verbosity > 0 and not bootstrap_error) or verbosity > 1:
        print_stats(
            title="Maximum Likelihood Analysis (analytical error)",
            fitvals=fitvals,
            dfitvals=dfitvals,
            kb=kb,
            param1=param1,
            param2=param2,
            trueslope=trueslope,
            temp=temp,
            pvconvert=pvconvert,
            dtemp=dtemp,
            dpress=dpress,
            dmu=dmu,
            dtempdmu=False,
            dtempdpress=False,
        )

    if not bootstrap_error:
        return quant["maxLikelihood"]

    # =============================== #
    # bootstrapped max-likelihood fit #
    # =============================== #
    if verbosity > 0:
        print(
            "Computing bootstrapped maximum likelihood parameters... "
            "[0/{:d}]".format(bootstrap_repetitions),
            end="",
        )
    if bootstrap_seed is not None:
        np.random.seed(bootstrap_seed)
    bs_fitvals = []
    for n, (t1, t2) in enumerate(
        zip(
            trajectory.bootstrap(traj1, bootstrap_repetitions),
            trajectory.bootstrap(traj2, bootstrap_repetitions),
        )
    ):
        # use overlap region
        t1, t2, min_ene, max_ene = trajectory.overlap(traj1=t1, traj2=t2)
        # calculate inefficiency
        g1 = statistical_inefficiency(t1)
        g2 = statistical_inefficiency(t2)
        # calculate max_likelihood fit
        fv, _ = do_max_likelihood_fit(
            t1,
            t2,
            g1,
            g2,
            init_params=np.array([df, trueslope]),
            verbose=(verbosity > 2),
        )
        bs_fitvals.append(fv)
        # print progress
        if verbosity > 0:
            print(
                "\rComputing bootstrapped maximum likelihood parameters... "
                "[{:d}/{:d}]".format(n + 1, bootstrap_repetitions),
                end="",
            )
    print()

    bs_fitvals = np.array(bs_fitvals)
    # slope = np.average(fitvals[:, 1])
    dslope = np.std(bs_fitvals[:, 1], axis=0)
    quant["bootstrap"] = [abs((slope - trueslope) / dslope)]
    if verbosity > 0:
        print_stats(
            title="Maximum Likelihood Analysis (bootstrapped error)",
            fitvals=np.concatenate(([fitvals], bs_fitvals)),
            dfitvals=None,
            kb=kb,
            param1=param1,
            param2=param2,
            trueslope=trueslope,
            temp=temp,
            pvconvert=pvconvert,
            dtemp=dtemp,
            dpress=dpress,
            dmu=dmu,
            dtempdmu=False,
            dtempdpress=False,
        )

    return quant["bootstrap"]
def check_2d(
    traj1: np.ndarray,
    traj2: np.ndarray,
    param1: np.ndarray,
    param2: np.ndarray,
    kb: float,
    pvconvert: Optional[float],
    quantity: List[str],
    dtempdpress: bool,
    dtempdmu: bool,
    cutoff: float,
    bootstrap_seed: Optional[int],
    bootstrap_error: bool,
    bootstrap_repetitions: int,
    verbosity: int,
    screen: bool,
    filename: Optional[str],
    data_is_uncorrelated: bool,
) -> List[float]:
    r"""
    Checks whether the energy trajectories of two simulations performed at
    different temperatures have sampled distributions at the analytically
    expected ratio.

    Parameters
    ----------
    traj1
        Trajectory of the first simulation
        If dtempdpress:

        * traj[0,:]: Potential energy U or total energy E = U + K
        * traj[1,:]: Volume V

    traj2
        Trajectory of the second simulation
        If dtempdpress:

        * traj[0,:]: Potential energy U or total energy E = U + K
        * traj[1,:]: Volume V

    param1
        If dtempdpress:
        Target temperature and pressure of the first simulation
    param2
        If dtempdpress:
        Target temperature and pressure of the second simulation
    kb
        Boltzmann constant in same units as the energy trajectories
    pvconvert
        Conversion from pressure * volume to energy units
    quantity
        Names of quantities analyzed (used for printing only)
    dtempdpress
        Set to True if trajectories were simulated at different
        temperature and pressure
    dtempdmu
        Set to True if trajectories were simulated at different
        temperature and chemical potential
    cutoff
        Tail cutoff of distributions.
    bootstrap_seed
        Sets the random number seed for bootstrapping.
        If set, bootstrapping will be reproducible.
        If `None`, bootstrapping is non-reproducible.
    bootstrap_error
        Calculate the standard error via bootstrap resampling
    bootstrap_repetitions
        Number of bootstrap repetitions drawn
    verbosity
        Verbosity level.
    screen
        Plot distributions on screen.
    filename
        Plot distributions to `filename`. If `None`, no plotting.
    data_is_uncorrelated
        Whether the provided data is uncorrelated. If this option
        is set, the equilibration, decorrelation and tail pruning
        of the trajectory is skipped. This can speed up the analysis,
        but note that if the provided data is correlated, the results
        of the physical validation checks might be invalid.
        Default: False

    Returns
    -------
    The number of quantiles the computed result is off the analytical one.
    """
    if not (dtempdpress or dtempdmu) or (dtempdpress and dtempdmu):
        raise pv_error.InputError(
            ["dtempdpress", "dtempdmu"],
            "Need to specify exactly one of `dtempdpress` and `dtempdmu`.",
        )

    if screen or filename is not None:
        raise NotImplementedError("check_2d: Plotting not implemented.")

    if dtempdpress and pvconvert is None:
        raise pv_error.InputError(
            "pvconvert", "When using `dtempdpress`, `pvconvert` is required."
        )

    # =============================== #
    # prepare constants, strings etc. #
    # =============================== #
    pstring = (
        "ln(P_2("
        + quantity[0]
        + ", "
        + quantity[1]
        + ")/"
        + "P_1("
        + quantity[0]
        + ", "
        + quantity[1]
        + "))"
    )
    factor = -1
    if dtempdpress:
        factor = pvconvert
    # The true slope in the first dimension is always beta_1 - beta_2,
    # in the second dimension it's either pvconvert * (beta_1*P_1 - beta_2*P_2)
    # or -(beta_1*mu_1 - beta_2*mu_2)
    trueslope = np.array(
        [
            1 / (kb * param1[0]) - 1 / (kb * param2[0]),
            factor
            * (1 / (kb * param1[0]) * param1[1] - 1 / (kb * param2[0]) * param2[1]),
        ]
    )

    if verbosity > 1:
        print(
            "Analytical slope of {:s}: {:.8f}, {:.8f}".format(
                pstring, trueslope[0], trueslope[1]
            )
        )

    quant = {}

    # ==================== #
    # prepare trajectories #
    # ==================== #
    # Discard burn-in period and time-correlated frames
    traj1 = trajectory.prepare(
        traj1,
        cut=cutoff,
        verbosity=verbosity,
        name="Trajectory 1",
        skip_preparation=data_is_uncorrelated,
    )
    traj2 = trajectory.prepare(
        traj2,
        cut=cutoff,
        verbosity=verbosity,
        name="Trajectory 2",
        skip_preparation=data_is_uncorrelated,
    )

    # calculate overlap
    traj1_full = traj1
    traj2_full = traj2
    traj1, traj2, min_ene, max_ene = trajectory.overlap(
        traj1=traj1_full,
        traj2=traj2_full,
    )
    if verbosity > 0:
        print(
            "Overlap is {:.1%} of trajectory 1 and {:.1%} of trajectory 2.".format(
                traj1.shape[1] / traj1_full.shape[1],
                traj2.shape[1] / traj2_full.shape[1],
            )
        )
    if verbosity > 0:
        cov1 = np.cov(traj1_full)
        sig1 = np.sqrt(np.diag(cov1))
        cov2 = np.cov(traj2_full)
        sig2 = np.sqrt(np.diag(cov2))
        # First parameter is always T
        dt1 = 2 * kb * param1[0] * param1[0] / sig1[0]
        dt2 = 2 * kb * param2[0] * param2[0] / sig2[0]
        # Second parameter is either P or mu
        if dtempdpress:
            parameter_name = "dP"
            # If the second parameter is P, we need to adjust the units
            sig1[1] *= pvconvert
            sig2[1] *= pvconvert
        else:
            parameter_name = "dmu"
        dparam1 = 2 * kb * param1[0] / sig1[1]
        dparam2 = 2 * kb * param2[0] / sig2[1]
        if verbosity > 1:
            print(
                "A rule of thumb states that a good overlap can be expected when choosing state\n"
                "points separated by about 2 standard deviations.\n"
                "For the current trajectories, dT = {:.1f}, and {:s} = {:.1f},\n"
                "with standard deviations sig1 = [{:.1f}, {:.1g}], and sig2 = [{:.1f}, {:.1g}].\n"
                "According to the rule of thumb, given point 1, the estimate is dT = {:.1f}, {:s} = {:.1f}, and\n"
                "                                given point 2, the estimate is dT = {:.1f}, {:s} = {:.1f}.".format(
                    param2[0] - param1[0],
                    parameter_name,
                    abs(param2[1] - param1[1]),
                    sig1[0],
                    sig1[1],
                    sig2[0],
                    sig2[1],
                    dt1,
                    parameter_name,
                    dparam1,
                    dt2,
                    parameter_name,
                    dparam2,
                )
            )
        print(
            "Rule of thumb estimates that (dT,{:s}) = ({:.1f},{:.1f}) would be optimal "
            "(currently, (dT,{:s}) = ({:.1f},{:.1f}))".format(
                parameter_name,
                0.5 * (dt1 + dt2),
                0.5 * (dparam1 + dparam2),
                parameter_name,
                param2[0] - param1[0],
                abs(param2[1] - param1[1]),
            )
        )
    if min_ene is None:
        raise pv_error.InputError(
            ["traj1", "traj2"], "No overlap between trajectories."
        )

    # calculate inefficiency
    g1 = np.array(
        [
            statistical_inefficiency(traj1[0]),
            statistical_inefficiency(traj1[1]),
        ]
    )
    g2 = np.array(
        [
            statistical_inefficiency(traj2[0]),
            statistical_inefficiency(traj2[1]),
        ]
    )

    w_f = -trueslope[0] * traj1[0] - trueslope[1] * traj1[1]
    w_r = trueslope[0] * traj2[0] + trueslope[1] * traj2[1]

    if verbosity > 2:
        print("Computing log of partition functions using pymbar.BAR...")
    bar_results = pymbar_bar(w_f, w_r)
    df = bar_results["Delta_f"]
    ddf = bar_results["dDelta_f"]
    if verbosity > 2:
        print(
            "Using {:.5f} for log of partition functions as computed from BAR.".format(
                df
            )
        )
        print("Uncertainty in quantity is {:.5f}.".format(ddf))
        print(
            "Assuming this is negligible compared to sampling error at individual points."
        )

    # ================== #
    # max-likelihood fit #
    # ================== #
    if verbosity > 2:
        print("Computing the maximum likelihood parameters")

    fitvals, dfitvals = do_max_likelihood_fit(
        traj1,
        traj2,
        g1,
        g2,
        init_params=np.array([df, trueslope[0], trueslope[1]]),
        verbose=(verbosity > 1),
    )

    slope = fitvals[1:]
    dslope = dfitvals[1:]
    quant["maxLikelihood"] = np.abs((slope - trueslope) / dslope)
    if verbosity > 0:
        print_stats(
            title="Maximum Likelihood Analysis (analytical error)",
            fitvals=fitvals,
            dfitvals=dfitvals,
            kb=kb,
            param1=param1,
            param2=param2,
            trueslope=trueslope,
            temp=None,
            pvconvert=pvconvert,
            dtemp=False,
            dpress=False,
            dmu=False,
            dtempdpress=dtempdpress,
            dtempdmu=dtempdmu,
        )

    if not bootstrap_error:
        return quant["maxLikelihood"]

    # =============================== #
    # bootstrapped max-likelihood fit #
    # =============================== #
    if verbosity > 2:
        print("Computing bootstrapped maximum likelihood parameters")
    if bootstrap_seed is not None:
        np.random.seed(bootstrap_seed)
    bs_fitvals = []
    for t1, t2 in zip(
        trajectory.bootstrap(traj1, bootstrap_repetitions),
        trajectory.bootstrap(traj2, bootstrap_repetitions),
    ):
        # use overlap region
        t1, t2, min_ene, max_ene = trajectory.overlap(traj1=t1, traj2=t2)
        # calculate inefficiency
        g1 = np.array(
            [
                statistical_inefficiency(t1[0]),
                statistical_inefficiency(t1[1]),
            ]
        )
        g2 = np.array(
            [
                statistical_inefficiency(t2[0]),
                statistical_inefficiency(t2[1]),
            ]
        )
        # calculate max_likelihood fit
        fv, _ = do_max_likelihood_fit(
            t1,
            t2,
            g1,
            g2,
            init_params=np.array([df, trueslope[0], trueslope[1]]),
            verbose=(verbosity > 2),
        )
        bs_fitvals.append(fv)

    bs_fitvals = np.array(bs_fitvals)
    # slope = np.average(fitvals[:, 1:])
    dslope = np.std(bs_fitvals[:, 1:], axis=0)
    quant["bootstrap"] = np.abs((slope - trueslope) / dslope)
    if verbosity > 0:
        print_stats(
            title="Maximum Likelihood Analysis (bootstrapped error)",
            fitvals=np.concatenate(([fitvals], bs_fitvals)),
            dfitvals=None,
            kb=kb,
            param1=param1,
            param2=param2,
            trueslope=trueslope,
            temp=None,
            pvconvert=pvconvert,
            dtemp=False,
            dpress=False,
            dmu=False,
            dtempdpress=dtempdpress,
            dtempdmu=dtempdmu,
        )

    return quant["bootstrap"]
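
# Illustrative sketch (not part of the original module): calling the 2D check
# for simulations differing in both temperature and pressure. The 2 x N
# trajectories (energy in row 0, volume in row 1), state points, and the
# pvconvert value (~0.06022 for bar * nm^3 -> kJ/mol) are assumptions for
# illustration; as above, synthetic data only demonstrates the API.
def _example_check_2d():
    rng = np.random.default_rng(5)
    kb = 8.314e-3  # kJ/mol/K
    n_frames = 50000
    traj1 = np.array(
        [
            rng.normal(loc=1000.0, scale=40.0, size=n_frames),  # energy
            rng.normal(loc=30.0, scale=0.5, size=n_frames),  # volume
        ]
    )
    traj2 = np.array(
        [
            rng.normal(loc=1040.0, scale=40.0, size=n_frames),
            rng.normal(loc=30.5, scale=0.5, size=n_frames),
        ]
    )
    return check_2d(
        traj1=traj1,
        traj2=traj2,
        param1=np.array([290.0, 1.0]),  # T [K], P [bar]
        param2=np.array([310.0, 100.0]),
        kb=kb,
        pvconvert=0.06022,
        quantity=["E", "V"],
        dtempdpress=True,
        dtempdmu=False,
        cutoff=0.001,
        bootstrap_seed=None,
        bootstrap_error=False,
        bootstrap_repetitions=0,
        verbosity=0,
        screen=False,
        filename=None,
        data_is_uncorrelated=True,
    )
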