Source code for qp.manager.create

"""Manager for submitting DFT single point calcultions."""

import os
import sys
import glob
import shutil
import pandas as pd
from itertools import groupby
from operator import itemgetter
from qp.manager import job_scripts
from qp.manager import charge_embedding

[docs]def compress_sequence(seq): """Condense a list of atom indices into range notation for TeraChem constraints. Converts a sorted list of integers into a compact string representation using ranges (e.g., ``1-10``) and comma-separated values for non-consecutive indices. Parameters ---------- seq : list of int Sorted list of atom indices to freeze. Returns ------- str TeraChem constraint block with ``xyz`` prefix for each range or group. Examples -------- >>> compress_sequence([1, 2, 3, 5, 7, 8, 9]) 'xyz 1-3\\nxyz 5\\nxyz 7-9' """ ranges = [] buffer = [] # Stores standalone numbers temporarily for k, g in groupby(enumerate(seq), lambda ix: ix[0] - ix[1]): group = list(map(itemgetter(1), g)) if len(group) > 1: if buffer: ranges.append(f"xyz {','.join(map(str, buffer))}") buffer.clear() ranges.append(f"xyz {group[0]}-{group[-1]}") else: buffer.append(group[0]) if buffer: # Append any remaining standalone numbers ranges.append(f"xyz {','.join(map(str, buffer))}") return '\n'.join(ranges)
[docs]def find_heavy(): """Find heavy (non-hydrogen) atom indices for geometry optimization constraints. Reads the XYZ file in the current directory and identifies all atoms that are not hydrogen. These atoms are typically frozen during partial geometry optimizations to relax only the hydrogen positions. Returns ------- str TeraChem constraint block specifying which atoms to freeze, formatted using :func:`compress_sequence`. Raises ------ StopIteration If no XYZ file is found in the current directory. """ xyz_file = next(f for f in os.listdir() if f.endswith('.xyz')) with open(xyz_file, 'r') as f: non_hydrogens = [i for i, line in enumerate(f.readlines()[2:], 1) if line.split()[0] != 'H'] return compress_sequence(non_hydrogens)
[docs]def residue_exists(sphere_path, res_name, chain, res_id): """Check whether a specific residue exists in a PDB file. Parameters ---------- sphere_path : str Path to the sphere PDB file (e.g., ``0.pdb``). res_name : str Three-letter residue name to search for. chain : str Single-character chain ID. res_id : int Residue sequence number. Returns ------- bool True if a matching residue is found, False otherwise. """ with open(sphere_path, 'r') as f: for line in f: if res_name == line[17:20].strip() \ and chain == line[21] and res_id == int(line[22:26]): return True return False
[docs]def ligand_in_spheres(ligand, structure_dir, num_sphere): """Check whether a ligand is present in any of the sphere PDB files. For oligomeric ligands (multi-residue), all component residues must be found. A warning is printed if only some residues of an oligomer are present, as this can cause charge errors. Parameters ---------- ligand : str Ligand key string (space-separated residue keys for oligomers). structure_dir : str Path to the cluster directory containing numbered sphere PDB files. num_sphere : int Number of spheres to search (0 through ``num_sphere``). Returns ------- bool True if the ligand (all residues for oligomers) is found. """ ligand_residues = ligand.split() partial_found = False for ligand_residue in ligand_residues: res_name, res_id_full = ligand_residue.split('_') chain = res_id_full[0] res_id = int(res_id_full[1:]) for i in range(num_sphere + 1): sphere_path = os.path.join(structure_dir, f"{i}.pdb") if residue_exists(sphere_path, res_name, chain, res_id): partial_found = True break else: # if ligand residue is not found in any sphere if len(ligand_residues) > 1 and partial_found: print(f"For oligomer ligand {ligand}, {ligand_residue} is partially found in spheres. This will cause unpredictable charge error!") pass return False return True
[docs]def get_electronic(pdb_id, pdb_list_path): """Retrieve oxidation state and spin multiplicity from the input CSV. Looks up the specified PDB ID in the user-provided CSV file and extracts the ``oxidation`` and ``multiplicity`` columns for calculating total charge and spin. Parameters ---------- pdb_id : str Four-character PDB accession code (lowercase). pdb_list_path : str Path to the CSV file containing ``pdb_id``, ``oxidation``, and ``multiplicity`` columns. Returns ------- tuple of (int, int) ``(oxidation_state, spin_multiplicity)``. Missing values default to 0 for oxidation and 1 for multiplicity. Raises ------ SystemExit If the CSV cannot be read or parsed. """ try: df = pd.read_csv(pdb_list_path) # Check if the row with the matching pdb_id exists row = df[df['pdb_id'] == pdb_id] if row.empty: print(f"> No data found for pdb_id: {pdb_id}") return None # Extract multiplicity and oxidation state directly multiplicity = int(row['multiplicity'].fillna(1).iloc[0]) oxidation = int(row['oxidation'].fillna(0).iloc[0]) return oxidation, multiplicity except Exception as e: print(f"> ERROR: {e}") sys.exit()
[docs]def get_charge(structure_dir=None): """Extract total charge and extra spin from charge.csv and spin.csv. Parses the charge CSV generated by the cluster extraction stage to sum up amino acid charges and ligand charges for residues present in the extracted spheres. Also reads spin.csv if present for radical ligands (e.g., NO, O2). Parameters ---------- structure_dir : str, optional Path to the chain directory (e.g., ``output/1os7/A200``). If None, inferred from the current working directory. Returns ------- tuple of (int, int) ``(total_charge, extra_spin)`` where ``total_charge`` is the sum of amino acid and ligand charges, and ``extra_spin`` is the additional spin contribution from radical species. """ if structure_dir is None: current_dir = os.getcwd() charge_dir = os.path.abspath(os.path.join(current_dir, "../../")) structure_dir = os.path.abspath(os.path.join(current_dir, "../")) else: charge_dir = os.path.abspath(os.path.join(structure_dir, "../")) charge_csv_path = os.path.join(charge_dir, "charge.csv") spin_csv_path = os.path.join(charge_dir, "spin.csv") charge = 0 section = 1 num_sphere = 0 chain_identifier = os.path.basename(structure_dir) chain = os.path.basename(structure_dir)[0] with open(charge_csv_path, 'r') as charge_csv_content: for line in charge_csv_content: line = line.strip() # Check if entered the next section if not line: section += 1 continue # Parse charge.csv section 1 if section == 1 and line.startswith(chain_identifier): parts = line.split(',') num_sphere = len(parts) - 2 current_chain_identifier = parts[0] if current_chain_identifier == chain_identifier: charge += sum([int(x) for x in parts[1:]]) # Parse charge.csv section 2 elif section == 2: ligand, value = line.split(',') if ligand_in_spheres(ligand, structure_dir, num_sphere): charge += int(value) spin = 0 if os.path.exists(spin_csv_path): with open(spin_csv_path, 'r') as spin_csv_content: for line in spin_csv_content: line = line.strip() if not line: continue ligand, value = line.split(',') if ligand_in_spheres(ligand, structure_dir, num_sphere): spin += int(value) return charge, spin
[docs]def create_jobs(pdb_list_path, output_dir, optimization, basis, method, guess, use_charge_embedding, charge_embedding_cutoff, charge_embedding_charges, gpus, memory, scheduler, pcm_radii_file, dielectric, use_implicit_solvent=True): """Generate QM job input files for all extracted clusters. Creates TeraChem input files (``qmscript.in``) and scheduler submission scripts (``jobscript.sh``) for each cluster directory. Automatically determines charge and spin multiplicity from the input CSV and charge.csv files. Parameters ---------- pdb_list_path : str Path to the input CSV with ``pdb_id``, ``oxidation``, ``multiplicity``. output_dir : str Base output directory containing PDB subdirectories. optimization : bool If True, run geometry optimization; otherwise single-point energy. basis : str Basis set name (e.g., ``'lacvps_ecp'``). method : str DFT method/functional (e.g., ``'wpbeh'``). guess : str Initial wavefunction guess method (e.g., ``'generate'``). use_charge_embedding : bool If True, generate point charge embedding file. charge_embedding_cutoff : float Distance cutoff in angstroms for MM point charges. charge_embedding_charges : str or None Path to a JSON file with custom partial charges. When ``None``, the built-in AMBER ff14SB charges are used. gpus : int Number of GPUs to request. memory : str Memory allocation string (e.g., ``'8G'``). scheduler : str Job scheduler type (``'slurm'`` or ``'sge'``). pcm_radii_file : str Path to PCM radii file for implicit solvent. dielectric : float Dielectric constant for PCM solvent model. use_implicit_solvent : bool, optional If True, include PCM implicit solvent in TeraChem input. Can be enabled alongside ``use_charge_embedding``. Default is True. """ orig_dir = os.getcwd() os.chdir(output_dir) base_dir = os.getcwd() pdb_dirs = sorted([d for d in os.listdir() if os.path.isdir(d) and not d == 'Protoss']) for pdb in pdb_dirs: pdb_dir_path = os.path.join(base_dir, pdb) structure_dirs = sorted(glob.glob(os.path.join(pdb_dir_path, '[A-Z][0-9]*'))) if len(structure_dirs) == 0: continue for structure in structure_dirs: qm_path = os.path.join(structure, method) os.makedirs(qm_path, exist_ok=True) xyz_files = glob.glob(os.path.join(structure, '*.xyz')) if len(xyz_files) != 1: print(f"ERROR: Expected 1 xyz file in {structure}, found {len(xyz_files)}") continue coord_file = os.path.join(qm_path, os.path.basename(xyz_files[0])) shutil.copyfile(xyz_files[0], coord_file) os.chdir(qm_path) oxidation, multiplicity = get_electronic(pdb.lower(), pdb_list_path) charge, extra_spin = get_charge() multiplicity += extra_spin total_charge = charge + oxidation # Get heavy atoms to fix if geometry optimization was requested heavy_list = find_heavy() if optimization else "" constraint_freeze = f"$constraint_freeze\n{heavy_list}\n$end" if optimization else "" coord_file = os.path.basename(coord_file) pdb_name = os.path.basename(pdb) structure_name = os.path.basename(structure) job_name = f"{pdb_name}{structure_name}" if scheduler == "slurm": qmscript = job_scripts.write_qm(optimization, coord_file, basis, method, total_charge, multiplicity, guess, pcm_radii_file, constraint_freeze, dielectric, use_charge_embedding, use_implicit_solvent) jobscript = job_scripts.write_slurm_job(job_name, gpus, memory) if scheduler == "sge": qmscript = job_scripts.write_qm(optimization, coord_file, basis, method, total_charge, multiplicity, guess, pcm_radii_file, constraint_freeze, dielectric, use_charge_embedding, use_implicit_solvent) jobscript = job_scripts.write_sge_job(job_name, gpus, memory) with open('qmscript.in', 'w') as f: f.write(qmscript) with open('jobscript.sh', 'w') as f: f.write(jobscript) if use_charge_embedding: charge_embedding.get_charges(charge_embedding_cutoff, charge_embedding_charges) print(f"> Created QM job files for {pdb}/{structure_name}/{method}/") os.chdir(base_dir) os.chdir(orig_dir) return