"""Dispatchers for running parallel jobs."""
import subprocess
import multiprocessing
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
class Dispatcher(ABC):
    """
    Abstract base class for dispatchers.

    Subclasses should implement the ``initialize_session``, ``dispatch``, and
    ``wait_all`` methods.
    """

    @abstractmethod
    def initialize_session(self):
        """Must be called before dispatching jobs."""

    @abstractmethod
    def dispatch(self, command, wait):
        """
        Dispatch a command using the dispatcher.

        Parameters
        ----------
        command : str
            The command to dispatch.
        wait : bool
            Whether to wait for the process to finish before returning.
        """

    @abstractmethod
    def wait_all(self):
        """Wait for the running/scheduled processes to finish, then return."""
class SubprocessDispatcher(Dispatcher):
    """
    Dispatcher using subprocesses.

    Every command is run through the shell from a worker thread of a
    ``ThreadPoolExecutor`` created with ``max_workers=max_procs``, which is
    what bounds the number of commands running at the same time.

    Parameters
    ----------
    max_procs : int, optional
        The maximum number of processes to run simultaneously. By default, uses
        the number of processors on the machine (this is a good choice for
        CPU-bound work).
    """

    def __init__(self, max_procs=None):
        self.max_procs = (multiprocessing.cpu_count() if max_procs is None
                          else max_procs)
        # Both attributes are (re)created by initialize_session(). They are
        # defined here so that using the dispatcher too early fails with a
        # clear message instead of an AttributeError.
        self.processes = []
        self.pool = None

    def initialize_session(self):
        """Create the thread pool and reset the list of submitted jobs."""
        self.processes = []
        self.pool = ThreadPoolExecutor(max_workers=self.max_procs)

    def dispatch(self, command, wait):
        """
        Submit a shell command to the thread pool.

        Parameters
        ----------
        command : str
            The command to run; it is passed to the shell (``shell=True``).
        wait : bool
            Whether to block until this command has finished.

        Raises
        ------
        RuntimeError
            If ``initialize_session`` has not been called yet.
        """
        if self.pool is None:
            raise RuntimeError(
                "initialize_session() must be called before dispatch()")
        # subprocess.call runs the command, waits for it and returns its
        # exit code (the same value Popen(...).wait() gives), and also
        # cleans up the child process if the wait is interrupted.
        process = self.pool.submit(subprocess.call, command, shell=True)
        # The future is stored; its result is the command's exit code.
        self.processes.append(process)
        if wait:
            process.result()

    def wait_all(self):
        """Block until every command submitted in this session is done."""
        for process in self.processes:
            process.result()
class DRMAADispatcher(Dispatcher):
    """
    Dispatcher for DRMAA.

    Parameters
    ----------
    job_template : drmaa.JobTemplate instance, optional
        A job template containing settings for running the jobs with
        the job scheduler. Documentation for the different options is
        available in the Python drmaa package. Some options specific to each
        job scheduler, called the native specification, may have to be set
        using the ``job_template.nativeSpecification`` attribute, the options
        for which can be found in the job scheduler's DRMAA interface (e.g.,
        slurm-drmaa for Slurm and pbs-drmaa for PBS).

    Examples
    --------
    >>> import drmaa
    >>> jt = drmaa.JobTemplate(hardWallclockTimeLimit=60)
    >>> dispatcher = DRMAADispatcher(jt)
    """

    # Session shared by all DRMAADispatcher instances; set by the first
    # call to initialize_session().
    session = None

    def __init__(self, job_template=None):
        if job_template is None:
            # Imported lazily so the module can be loaded without drmaa.
            import drmaa
            self.job_template = drmaa.JobTemplate()
        else:
            self.job_template = job_template
        # Every job id submitted in the current session.
        self.jobids = []
        # Job ids that have not been waited on individually; these are the
        # only ones wait_all() still has to synchronize on.
        self._pending_jobids = []

    def initialize_session(self):
        """Attach to the shared DRMAA session, creating it if needed."""
        # Ensure there is only one active DRMAA session, otherwise it raises
        # an error
        if DRMAADispatcher.session is None:
            import drmaa
            session = drmaa.Session()
            session.initialize()
            # Publish the session only once initialize() has succeeded.
            DRMAADispatcher.session = session
        self.session = DRMAADispatcher.session
        self.jobids = []
        self._pending_jobids = []

    def dispatch(self, command, wait):
        """
        Submit a command as a job through the DRMAA session.

        Parameters
        ----------
        command : str
            Assigned to the job template's ``remoteCommand`` attribute.
        wait : bool
            Whether to block until this job has finished.

        Raises
        ------
        RuntimeError
            If no DRMAA session has been initialized yet.
        """
        if self.session is None:
            raise RuntimeError(
                "initialize_session() must be called before dispatch()")
        self.job_template.remoteCommand = command
        jobid = self.session.runJob(self.job_template)
        self.jobids.append(jobid)
        self._pending_jobids.append(jobid)
        if wait:
            self.session.wait(jobid)
            # A successful wait reaps the job, after which its id may be
            # rejected as invalid by synchronize(); stop tracking it.
            self._pending_jobids.remove(jobid)

    def wait_all(self):
        """Block until all jobs that were not already waited on are done."""
        if self._pending_jobids:
            self.session.synchronize(self._pending_jobids)