Source code for physical_validation.util.gromacs_interface

###########################################################################
#                                                                         #
#    physical_validation,                                                 #
#    a python package to test the physical validity of MD results         #
#                                                                         #
#    Written by Pascal T. Merz <pascal.merz@me.com>                       #
#               Michael R. Shirts <michael.shirts@colorado.edu>           #
#                                                                         #
#    Copyright (c) 2017-2021 University of Colorado Boulder               #
#              (c) 2012      The University of Virginia                   #
#                                                                         #
###########################################################################
r"""
GROMACS python interface.

.. warning:: This is a mere place holder, as an official python API is
   currently being developed by the gromacs development team. It is
   probably neither especially elegant nor especially safe. Use of this
   module in any remotely critical application is strongly discouraged.
"""
import errno
import os
import re
import subprocess
import sys
import warnings
from typing import IO, Any, Dict, List, Optional, Tuple, Union

import numpy as np

from ..data.trajectory_data import RectangularBox
from ..util import error as pv_error

Pipe = Union[None, int, IO[Any]]


[docs]class GromacsInterface(object): def __init__( self, exe: str = None, dp: bool = False, includepath: Optional[Union[str, List[str]]] = None, ): self._exe = None self._dp = False self._includepath = None self.double = dp if exe is None: # check whether 'gmx' / 'gmx_d' is in the path if self._check_exe(quiet=True, exe="gmx"): self.exe = "gmx" elif self._check_exe(quiet=True, exe="gmx_d"): self.exe = "gmx_d" else: pv_error.InputError( "exe", '"gmx" and "gmx_d" not found in the path. ' "Set `exe` to point to a GROMACS executable.", ) else: self.exe = exe if includepath is not None: self.includepath = includepath @property def exe(self) -> str: """exe is a string pointing to the gmx executable.""" return self._exe @exe.setter def exe(self, exe: str) -> None: # expand '~/bin' to '/home/user/bin' exe = os.path.expanduser(exe) if self._check_exe(exe=exe): if os.path.dirname(exe): exe = os.path.abspath(exe) self._exe = exe @property def double(self) -> bool: """double is a bool defining whether the simulation was ran at double precision""" return self._dp @double.setter def double(self, dp: bool) -> None: assert isinstance(dp, bool) self._dp = dp @property def includepath(self) -> List[str]: """includepath defines a path the parser looks for system files""" return self._includepath @includepath.setter def includepath(self, path: Union[str, List[str]]) -> None: if isinstance(path, str): path = [path] self._includepath = path
[docs] def get_quantities( self, edr: str, quantities: List[str], cwd: Optional[str] = None, begin: Optional[float] = None, end: Optional[float] = None, args: Optional[List[str]] = None, ) -> Dict[str, np.ndarray]: if args is None: args = [] tmp_xvg = "gmxpy_" + os.path.basename(edr).replace(".edr", "") + ".xvg" if cwd is not None: tmp_xvg = os.path.join(cwd, tmp_xvg) q_dict = {} for q in quantities: not_found = self._create_xvg( edr, tmp_xvg, [q], cwd=cwd, begin=begin, end=end, args=args )[1] if q in not_found: q_dict[q] = None continue skip_line = re.compile("^[#,@]") values = [] times = [] with open(tmp_xvg, "r") as xvg: for line in xvg: if skip_line.match(line): continue times.append(float(line.split()[0])) values.append(float(line.split()[1])) if "time" in q_dict: if not np.array_equal(np.array(times), q_dict["time"]): warnings.warn("Time discrepancy in " + edr) else: q_dict["time"] = np.array(times) q_dict[q] = np.array(values) os.remove(tmp_xvg) return q_dict
[docs] def read_trr( self, trr: str ) -> Dict[str, Optional[Union[np.ndarray, RectangularBox]]]: tmp_dump = "gmxpy_" + os.path.basename(trr).replace(".trr", "") + ".dump" with open(tmp_dump, "w") as dump_file: proc = self._run( "dump", ["-f", trr], stdout=dump_file, stderr=subprocess.PIPE ) proc.wait() position = [] velocity = [] force = [] box = [] with open(tmp_dump) as dump: x = [] v = [] f = [] b = [] for line in dump: if "frame" in line: # new frame if len(x) > 0: # not the first frame - nothing to save there position.append(np.array(x)) velocity.append(np.array(v)) force.append(np.array(f)) box.append(np.array(b)) x = [] v = [] f = [] b = [] continue if "box[" in line: b.append( [ float(value.strip()) for value in line.split("{", 1)[1].split("}")[0].split(",") ] ) elif "x[" in line: x.append( [ float(value.strip()) for value in line.split("{", 1)[1].split("}")[0].split(",") ] ) elif "v[" in line: v.append( [ float(value.strip()) for value in line.split("{", 1)[1].split("}")[0].split(",") ] ) elif "f[" in line: f.append( [ float(value.strip()) for value in line.split("{", 1)[1].split("}")[0].split(",") ] ) # end loop over file - save last arrays position.append(np.array(x)) velocity.append(np.array(v)) force.append(np.array(f)) box.append(np.array(b)) os.remove(tmp_dump) result = {} for key, vector in zip( ["position", "velocity", "force", "box"], [position, velocity, force, box] ): vector = np.array(vector) if vector.size > 0: result[key] = vector else: result[key] = None return result
[docs] @staticmethod def read_gro(gro: str) -> Dict[str, Optional[Union[np.ndarray, RectangularBox]]]: with open(gro) as conf: x = [] v = [] b = [] title = conf.readline() while title: natoms = int(conf.readline().strip()) for _ in range(natoms): line = conf.readline()[20:] line = line.split() x.append([float(xx) for xx in line[0:3]]) v.append([float(vv) for vv in line[3:6]]) line = conf.readline() line = line.split() b.append([float(vv) for vv in line]) title = conf.readline() result = {} for key, vector in zip(["position", "velocity", "force", "box"], [x, v, [], b]): vector = np.array(vector) if vector.size > 0: result[key] = vector else: result[key] = None return result
[docs] @staticmethod def read_mdp(mdp: str) -> Dict[str, str]: result = {} with open(mdp) as f: for line in f: line = line.split(";")[0].strip() if not line: continue line = line.split("=") # unify mdp options - all lower case, only dashes option = line[0].strip().replace("_", "-").lower() if option not in ["include", "define"]: value = line[1].strip().replace("_", "-").lower() else: value = line[1].strip() result[option] = value return result
[docs] @staticmethod def write_mdp(options: Dict[str, str], mdp: str): with open(mdp, "w") as f: for key, value in options.items(): f.write("{:24s} = {:s}\n".format(key, value))
[docs] def read_system_from_top( self, top: str, define: Optional[str] = None, include: Optional[str] = None ) -> List[Dict[str, Any]]: if not define: define = [] else: define = [d.strip() for d in define.split("-D") if d.strip()] if not include: include = [os.getcwd(), os.path.dirname(top)] else: include = [os.getcwd(), os.path.dirname(top)] + [ i.strip() for i in include.split("-I") if i.strip() ] superblock = None block = None nmoleculetypes = 0 topology = {} with open(top) as f: content = self._read_top(f, include=include, define=define) for line in content: if line[0] == "[" and line[-1] == "]": block = line.strip("[").strip("]").strip() if block == "defaults" or block == "system": superblock = block topology[superblock] = {} if block == "moleculetype" or block == "molecule_type": nmoleculetypes += 1 superblock = block + "_" + str(nmoleculetypes) topology[superblock] = {} continue if superblock is None or block is None: raise IOError("Not a valid .top file.") if block in topology[superblock]: topology[superblock][block].append(line) else: topology[superblock][block] = [line] for n in range(1, nmoleculetypes + 1): superblock = "moleculetype_" + str(n) molecule = topology[superblock]["moleculetype"][0].split()[0] topology[molecule] = topology.pop(superblock) atomtype_list = topology["defaults"]["atomtypes"] topology["defaults"]["atomtypes"] = {} for atomtype in atomtype_list: code = atomtype.split()[0] topology["defaults"]["atomtypes"][code] = atomtype molecules = [] for line in topology["system"]["molecules"]: molecule = line.split()[0] nmolecs = int(line.split()[1]) natoms = len(topology[molecule]["atoms"]) masses = [] for atom in topology[molecule]["atoms"]: if len(atom.split()) >= 8: masses.append(float(atom.split()[7])) else: code = atom.split()[1] masses.append( float(topology["defaults"]["atomtypes"][code].split()[2]) ) nbonds = 0 nbondsh = 0 bonds = [] bondsh = [] if "bonds" in topology[molecule]: for bond in topology[molecule]["bonds"]: bond = bond.split() a1 = int(bond[0]) - 1 a2 = int(bond[1]) - 1 m1 = masses[a1] m2 = masses[a2] if m1 > 1.008 and m2 > 1.008: nbonds += 1 bonds.append([a1, a2]) else: nbondsh += 1 bondsh.append([a1, a2]) nangles = 0 nanglesh = 0 angles = [] anglesh = [] if "angles" in topology[molecule]: for angle in topology[molecule]["angles"]: angle = angle.split() a1 = int(angle[0]) - 1 a2 = int(angle[1]) - 1 a3 = int(angle[2]) - 1 m1 = masses[a1] m2 = masses[a2] m3 = masses[a3] if m1 > 1.008 and m2 > 1.008 and m3 > 1.008: nangles += 1 angles.append([a1, a2, a3]) else: nanglesh += 1 anglesh.append([a1, a2, a3]) settle = False if "settles" in topology[molecule]: settle = True bonds = [] bondsh = [[0, 1], [0, 2], [1, 2]] molecules.append( { "name": molecule, "nmolecs": nmolecs, "natoms": natoms, "mass": masses, "nbonds": [nbonds, nbondsh], "bonds": bonds, "bondsh": bondsh, "nangles": [nangles, nanglesh], "angles": angles, "anglesh": anglesh, "settles": settle, } ) return molecules
[docs] def grompp( self, mdp: str, top: str, gro: str, tpr: Optional[str] = None, cwd: str = ".", args: Optional[List[str]] = None, stdin: Optional[Pipe] = None, stdout: Optional[Pipe] = None, stderr: Optional[Pipe] = None, ) -> int: cwd = os.path.abspath(cwd) assert os.path.exists(os.path.join(cwd, mdp)) assert os.path.exists(os.path.join(cwd, top)) assert os.path.exists(os.path.join(cwd, gro)) if args is None: args = [] if tpr is None: tpr = os.path.basename(mdp).replace(".mdp", "") + ".tpr" else: assert os.path.exists(os.path.join(cwd, os.path.dirname(tpr))) args = ["-f", mdp, "-p", top, "-c", gro, "-o", tpr] + args proc = self._run( "grompp", args, cwd=cwd, stdin=stdin, stdout=stdout, stderr=stderr ) proc.wait() return proc.returncode
[docs] def mdrun( self, tpr: str, edr: Optional[str] = None, deffnm: Optional[str] = None, cwd: str = ".", args: Optional[List[str]] = None, stdin: Optional[Pipe] = None, stdout: Optional[Pipe] = None, stderr: Optional[Pipe] = None, mpicmd: Optional[str] = None, ) -> int: cwd = os.path.abspath(cwd) tpr = os.path.join(cwd, tpr) assert os.path.exists(cwd) assert os.path.exists(tpr) if args is None: args = [] if deffnm is None: deffnm = os.path.basename(tpr).replace(".tpr", "") args = ["-s", tpr, "-deffnm", deffnm] + args if edr is not None: args += ["-e", edr] proc = self._run( "mdrun", args, cwd=cwd, stdin=stdin, stdout=stdout, stderr=stderr, mpicmd=mpicmd, ) proc.wait() return proc.returncode
def _check_exe(self, quiet: bool = False, exe: Optional[str] = None) -> bool: if exe is None: exe = self._exe try: devnull = open(os.devnull) exe_out = subprocess.check_output([exe, "--version"], stderr=devnull) except OSError as e: if e.errno == errno.ENOENT: # file not found error. if not quiet: print("ERROR: gmx executable not found") print(exe) return False else: raise e # check that output is as expected return bool(re.search(rb":-\) GROMACS - gmx.* \(-:", exe_out)) def _run( self, cmd: str, args: Optional[List[str]] = None, cwd: Optional[str] = None, stdin: Optional[Pipe] = None, stdout: Optional[Pipe] = None, stderr: Optional[Pipe] = None, mpicmd: Optional[str] = None, ) -> subprocess.Popen: if self.exe is None: raise RuntimeError( "Tried to use GromacsParser before setting gmx executable." ) if mpicmd: command = [mpicmd, self.exe, cmd] else: command = [self.exe, cmd] command.extend(args) return subprocess.Popen( command, cwd=cwd, stdin=stdin, stdout=stdout, stderr=stderr ) def _create_xvg( self, edr: str, xvg: str, quantities: List[str], cwd: Optional[str] = None, begin: Optional[float] = None, end: Optional[float] = None, args: Optional[List[str]] = None, ) -> Tuple[int, List[str]]: assert os.path.exists(edr) assert os.path.exists(os.path.abspath(os.path.dirname(xvg))) if args is None: args = [] if self._dp: args.append("-dp") if begin is not None: args.extend(["-b", str(begin)]) if end is not None: args.extend(["-e", str(end)]) quants = "" for q in quantities: quants += str(q) + "\n" args = ["-f", edr, "-o", xvg] + args proc = self._run( "energy", args, cwd=cwd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) err = proc.communicate(quants.encode())[1] encoding = sys.stderr.encoding if encoding is None: encoding = "UTF-8" not_found = [] if "does not match anything" in err.decode(encoding): for q in quantities: if "String '" + q + "' does not match anything" in err.decode(encoding): not_found.append(q) return proc.wait(), not_found def _read_top( self, filehandler: IO, include: List[str], define: List[str] ) -> List[str]: read = [True] content = [] include_dirs = include if self.includepath: include_dirs += self.includepath if self.exe is not None: # Check for standard topology locations try: full_exe = ( subprocess.check_output( ["which", self.exe], stderr=open(os.devnull) ) .strip() .decode() ) except subprocess.CalledProcessError: full_exe = None if full_exe is not None: # By default, when installed, exe is bin/gmx[_d], # while topologies are in share/gromacs/top/ base_directory = os.path.dirname(os.path.dirname(full_exe)) standard_install_location = os.path.join( base_directory, "share", "gromacs", "top" ) # When running from the build directory, exe is usually in build_folder/bin/gmx[_d], # while topologies are in share/top/ root_directory = os.path.dirname(base_directory) standard_build_location = os.path.join( os.path.join(root_directory, "share", "top") ) for location in [standard_install_location, standard_build_location]: if os.path.exists(location): include_dirs.append(location) for idx, d in enumerate(include_dirs): # expand '~/bin' to '/home/user/bin' include_dirs[idx] = os.path.expanduser(d) for line in filehandler: line = line.split(";")[0].strip() if not line or line[0] == "*": continue if line[0] == "#": if line.startswith("#ifdef"): option = line.replace("#ifdef", "").strip() if option in define: read.append(True) else: read.append(False) elif line.startswith("#ifndef"): option = line.replace("#ifndef", "").strip() if option not in define: read.append(True) else: read.append(False) elif line.startswith("#else"): read[-1] = not read[-1] elif line.startswith("#endif"): read.pop() elif line.startswith("#define") and all(read): option = line.replace("#define", "").strip() define.append(option) elif line.startswith("#include") and all(read): filename = ( line.replace("#include", "") .strip() .replace('"', "") .replace("'", "") ) for idir in include_dirs: try: ifile = open(os.path.join(idir, filename)) break except FileNotFoundError: pass else: msg = ( "Include file in .top file not found: " + line + "\n" + "Include directories: " + str(include_dirs) ) raise IOError(msg) if ifile: subcontent = self._read_top( ifile, [os.path.dirname(ifile.name)] + include, define ) content.extend(subcontent) elif all(read): raise IOError( "Unknown preprocessor directive in .top file: " + line ) continue # end line starts with '#' if all(read): content.append(line) return content