Source code for qp.manager.submit

"""Submit QM jobs to cluster schedulers (SLURM/SGE)."""

import os
import sys
import glob
import time
import getpass
import datetime
import subprocess


[docs]def create_marker(submission_marker_path, job_name, submission_command, submission_output): """Create a submission record file to track job status. Writes a ``.submit_record`` file containing job metadata including the job name, submitting user, queue time, and scheduler output. This file prevents duplicate submissions and enables job status tracking. Parameters ---------- submission_marker_path : str Path to the ``.submit_record`` file to create. job_name : str Name of the submitted job. submission_command : str The shell command used to submit the job. submission_output : str Output returned by the scheduler (e.g., job ID). """ with open(submission_marker_path, 'w') as marker: marker.write(f"Jobname: {job_name}\n") marker.write(f"Author: {getpass.getuser()}\n") marker.write(f"Queue Time: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") marker.write(f"Command: {submission_command}\n") marker.write(f"Output: {submission_output}\n")
[docs]def prepare_jobs(job_count, method): """Identify jobs ready for submission without submitting them. Scans the output directory for cluster directories that have QM input files but no ``.submit_record`` or ``qmscript.out``, indicating they haven't been submitted yet. Parameters ---------- job_count : int Maximum number of jobs to prepare. method : str DFT method name (subdirectory under each chain directory). Returns ------- list of tuple Each tuple contains ``(qm_path, pdb_dir, structure_dir)`` for jobs ready to submit. """ base_dir = os.getcwd() prepared_jobs = [] pdb_dirs = sorted([d for d in os.listdir() if os.path.isdir(d) and not d == 'Protoss']) for pdb in pdb_dirs: pdb_dir_path = os.path.join(base_dir, pdb) structure_dirs = sorted(glob.glob(os.path.join(pdb_dir_path, '[A-Z][0-9]*'))) if len(structure_dirs) == 0: continue for structure in structure_dirs: qm_path = os.path.join(structure, method) os.makedirs(qm_path, exist_ok=True) submission_marker_path = os.path.join(qm_path, '.submit_record') qm_output_file = os.path.join(qm_path, 'qmscript.out') log_files = os.path.join(qm_path, 'Z*') # Remove previous slurm log files for file in glob.glob(log_files): os.remove(file) # Placeholder for submitted jobs to prevent resubmission if os.path.exists(submission_marker_path) or os.path.exists(qm_output_file): continue xyz_files = glob.glob(os.path.join(structure, '*.xyz')) if len(xyz_files) != 1: print(f"ERROR: Expected 1 xyz file in {structure}, found {len(xyz_files)}") continue prepared_jobs.append((qm_path, pdb, structure)) # Break early if job_count is reached if len(prepared_jobs) == job_count: return prepared_jobs return prepared_jobs
[docs]def submit_jobs(prepared_jobs, scheduler): """Submit prepared jobs to the cluster scheduler. Iterates through the prepared job list and submits each using the appropriate scheduler command (``sbatch`` for SLURM, ``qsub`` for SGE). Creates a ``.submit_record`` file for each submitted job. Parameters ---------- prepared_jobs : list of tuple Jobs to submit, as returned by :func:`prepare_jobs`. scheduler : str Scheduler type (``'slurm'`` or ``'sge'``). Returns ------- int Number of jobs successfully submitted. """ base_dir = os.getcwd() submitted_jobs = 0 for qm_path, pdb, structure in prepared_jobs: os.chdir(qm_path) pdb_name = os.path.basename(pdb) structure_name = os.path.basename(structure) job_name = f"{pdb_name}{structure_name}" time.sleep(.2) # Gives user time to abort with ctrl + C # Execute the job submission command and capture its output if scheduler == "sge": submission_command = 'qsub jobscript.sh' elif scheduler == "slurm": submission_command = 'sbatch jobscript.sh' result = subprocess.run(submission_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) submission_output = result.stdout.strip() if result.returncode == 0 else result.stderr.strip() print(f" > {submission_output}") submission_marker_path = os.path.join(qm_path, '.submit_record') create_marker(submission_marker_path, job_name, submission_command, submission_output) os.chdir(base_dir) submitted_jobs += 1 return submitted_jobs
[docs]def manage_jobs(output, job_count, method, scheduler): """Orchestrate job preparation and submission. Main entry point for the job submission workflow. Prepares up to ``job_count`` unsubmitted jobs and submits them to the specified scheduler. Parameters ---------- output : str Path to the output directory containing cluster subdirectories. job_count : int Maximum number of jobs to submit in this batch. method : str DFT method name (subdirectory under each chain directory). scheduler : str Scheduler type (``'slurm'`` or ``'sge'``). Notes ----- This function calls ``sys.exit(0)`` upon completion. """ # Change into the directory of the generated cluster models os.chdir(output) print(f"> Attempting to submit {job_count} jobs.") # Prepare the jobs prepared_jobs = prepare_jobs(job_count, method) # Submit the jobs if prepared_jobs: submitted_jobs = submit_jobs(prepared_jobs, scheduler) print(f"> Successfully submitted {submitted_jobs} jobs.") else: print("> No jobs were prepared.") # Done, exit the script print("Done.\n") sys.exit(0)