westpa.work_managers package

westpa.work_managers module

A system for parallel, remote execution of multiple arbitrary tasks. Much of this, both in concept and execution, was inspired by (and in some cases based heavily on) the concurrent.futures package from Python 3.2, with some simplifications and adaptations (thanks to Brian Quinlan and his futures implementation).

class westpa.work_managers.SerialWorkManager

Bases: WorkManager

classmethod from_environ(wmenv=None)
submit(fn, args=None, kwargs=None)

Submit a task to the work manager, returning a WMFuture object representing the pending result. fn(*args,**kwargs) will be executed by a worker, and the return value assigned as the result of the returned future. The function fn and all arguments must be picklable; note particularly that off-path modules (like the system module and any active plugins) are not picklable unless pre-loaded in the worker process (i.e. prior to forking the master).
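The picklability requirement can be checked ahead of submission. The following is a minimal sketch that uses only the standard library's pickle module; the helper name is_picklable is hypothetical and is not part of WESTPA.

```python
import math
import pickle


def is_picklable(obj):
    """Return True if pickle can serialize obj (hypothetical helper)."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


# Functions that can be imported by name are pickled by reference.
importable_ok = is_picklable(math.sqrt)

# A lambda has no importable name, so pickling it fails.
lambda_ok = is_picklable(lambda x: x * 2)

# Plain data such as an args tuple and a kwargs dict pickles without trouble.
args_ok = is_picklable(((1, 2), {"scale": 3.0}))
```

A check like this only reports what pickle can serialize in the submitting process; whether the worker can import the same module on its side is a separate question.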

class westpa.work_managers.ThreadsWorkManager(n_workers=None)

Bases: WorkManager

A work manager using threads.

classmethod from_environ(wmenv=None)
runtask(task_queue)
submit(fn, args=None, kwargs=None)

Submit a task to the work manager, returning a WMFuture object representing the pending result. fn(*args,**kwargs) will be executed by a worker, and the return value assigned as the result of the returned future. The function fn and all arguments must be picklable; note particularly that off-path modules (like the system module and any active plugins) are not picklable unless pre-loaded in the worker process (i.e. prior to forking the master).

startup()

Perform any necessary startup work, such as spawning clients.

shutdown()

Cleanly shut down any active workers.
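As an illustration of the startup, submit and shutdown life cycle with thread workers, here is a small sketch built on queue.Queue and concurrent.futures.Future. It is not WESTPA's implementation: the class TinyThreadPool, the function run_tasks and the sentinel are all hypothetical names chosen for this example.

```python
import queue
import threading
from concurrent.futures import Future

_SHUTDOWN = object()  # sentinel telling a worker thread to exit


def run_tasks(task_queue):
    """Worker loop: execute queued tasks until the sentinel arrives."""
    while True:
        item = task_queue.get()
        if item is _SHUTDOWN:
            break
        fn, args, kwargs, future = item
        try:
            future.set_result(fn(*args, **kwargs))
        except Exception as exc:  # hand the failure back to the caller
            future.set_exception(exc)


class TinyThreadPool:
    """Hypothetical illustration; not WESTPA's ThreadsWorkManager."""

    def __init__(self, n_workers=2):
        self.task_queue = queue.Queue()
        self.workers = [
            threading.Thread(target=run_tasks, args=(self.task_queue,))
            for _ in range(n_workers)
        ]

    def startup(self):
        for worker in self.workers:
            worker.start()

    def submit(self, fn, args=None, kwargs=None):
        future = Future()
        self.task_queue.put((fn, args or (), kwargs or {}, future))
        return future

    def shutdown(self):
        # One sentinel per worker, then wait for every worker to exit.
        for _ in self.workers:
            self.task_queue.put(_SHUTDOWN)
        for worker in self.workers:
            worker.join()


pool = TinyThreadPool(n_workers=2)
pool.startup()
futures = [pool.submit(pow, args=(2, n)) for n in range(5)]
results = [future.result() for future in futures]
pool.shutdown()
```

Because the sentinels are queued behind any outstanding tasks, shutdown in this sketch drains the queue before the workers stop.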

class westpa.work_managers.ProcessWorkManager(n_workers=None, shutdown_timeout=1)

Bases: WorkManager

A work manager using the multiprocessing module.

Notes

On macOS, Python 3.8 changed the default multiprocessing start method from fork to spawn. In general, spawn is more robust and efficient, but it requires everything passed to the child process to be serializable. In contrast, fork is much less memory efficient, as it makes a full copy of everything in the parent process, but it does not require picklability.

So, on macOS, the start method is explicitly changed from the macOS-specific default of spawn back to fork. Other Unix systems should default to fork.

See https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods and https://docs.python.org/3/library/multiprocessing.html#the-spawn-and-forkserver-start-methods for more details.
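Selecting a start method explicitly can be sketched with the standard multiprocessing API. The helper pick_start_method is a hypothetical name, and this is only one way to make the choice, not necessarily how ProcessWorkManager does it.

```python
import multiprocessing


def pick_start_method(preferred="fork"):
    """Return preferred if the platform supports it, else the platform default."""
    available = multiprocessing.get_all_start_methods()
    if preferred in available:
        return preferred
    # The first entry of the list is the platform's default start method.
    return available[0]


method = pick_start_method("fork")

# A context object scopes the choice without changing the global default.
ctx = multiprocessing.get_context(method)
```

Processes and queues created through ctx (for example ctx.Process and ctx.Queue) then use the chosen start method, while code elsewhere in the application keeps the platform default.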

classmethod from_environ(wmenv=None)
task_loop()
results_loop()
submit(fn, args=None, kwargs=None)

Submit a task to the work manager, returning a WMFuture object representing the pending result. fn(*args,**kwargs) will be executed by a worker, and the return value assigned as the result of the returned future. The function fn and all arguments must be picklable; note particularly that off-path modules (like the system module and any active plugins) are not picklable unless pre-loaded in the worker process (i.e. prior to forking the master).

startup()

Perform any necessary startup work, such as spawning clients.

shutdown()

Cleanly shut down any active workers.

westpa.work_managers.make_work_manager()

Using cues from the environment, instantiate a pre-configured work manager.

westpa.work_managers.core module

class westpa.work_managers.core.islice

Bases: object

islice(iterable, stop) --> islice object
islice(iterable, start, stop[, step]) --> islice object

Return an iterator whose next() method returns selected values from an iterable. If start is specified, will skip all preceding elements; otherwise, start defaults to zero. Step defaults to one. If specified as another value, step determines how many values are skipped between successive calls. Works like a slice() on a list but returns an iterator.
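A short example of the two call forms, using itertools.islice from the standard library (the class documented here is that same object re-exported). The generator squares is made up for the example.

```python
from itertools import islice

numbers = range(10)

first_three = list(islice(numbers, 3))        # stop only
middle = list(islice(numbers, 2, 6))          # start and stop
every_other = list(islice(numbers, 1, 8, 2))  # start, stop and step


def squares():
    """Endless generator; islice makes it safe to take a finite prefix."""
    n = 0
    while True:
        yield n * n
        n += 1


first_squares = list(islice(squares(), 5))
```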

westpa.work_managers.core.contextmanager(func)

@contextmanager decorator.

Typical usage:

    @contextmanager
    def some_generator(<arguments>):
        <setup>
        try:
            yield <value>
        finally:
            <cleanup>

This makes this:

    with some_generator(<arguments>) as <variable>:
        <body>

equivalent to this:

    <setup>
    try:
        <variable> = <value>
        <body>
    finally:
        <cleanup>
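A concrete instance of the pattern, using contextlib.contextmanager from the standard library. The names tracked and events are invented for the example; the second with block shows that the cleanup step still runs when the body raises.

```python
from contextlib import contextmanager

events = []


@contextmanager
def tracked(name):
    events.append(f"setup {name}")       # <setup>
    try:
        yield name.upper()               # <value> bound by "as"
    finally:
        events.append(f"cleanup {name}")  # <cleanup>, runs even on error


with tracked("job") as value:
    events.append(f"body {value}")

try:
    with tracked("bad"):
        raise ValueError("boom")
except ValueError:
    events.append("caught")
```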

class westpa.work_managers.core.WorkManager

Bases: object

Base class for all work managers. At a minimum, work managers must provide a submit() function and a n_workers attribute (which may be a property), though most will also override startup() and shutdown().

classmethod from_environ(wmenv=None)
classmethod add_wm_args(parser, wmenv=None)
sigint_handler(signum, frame)
install_sigint_handler()
startup()

Perform any necessary startup work, such as spawning clients.

shutdown()

Cleanly shut down any active workers.

run()

Run the worker loop (in clients only).

submit(fn, args=None, kwargs=None)

Submit a task to the work manager, returning a WMFuture object representing the pending result. fn(*args,**kwargs) will be executed by a worker, and the return value assigned as the result of the returned future. The function fn and all arguments must be picklable; note particularly that off-path modules (like the system module and any active plugins) are not picklable unless pre-loaded in the worker process (i.e. prior to forking the master).

submit_many(tasks)

Submit a set of tasks to the work manager, returning a list of WMFuture objects representing pending results. Each entry in tasks should be a triple (fn, args, kwargs), which will result in fn(*args, **kwargs) being executed by a worker. The function fn and all arguments must be picklable; note particularly that off-path modules are not picklable unless pre-loaded in the worker process.
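The (fn, args, kwargs) triple format can be illustrated with a standard-library analogue. The sketch below uses concurrent.futures.ThreadPoolExecutor in place of a work manager; the free function submit_many and the task function scale are hypothetical stand-ins, not the WESTPA method itself.

```python
from concurrent.futures import ThreadPoolExecutor


def scale(x, factor=1):
    return x * factor


def submit_many(executor, tasks):
    """Submit (fn, args, kwargs) triples; return futures in submission order."""
    return [executor.submit(fn, *args, **kwargs) for fn, args, kwargs in tasks]


tasks = [
    (scale, (2,), {}),              # scale(2)
    (scale, (3,), {"factor": 10}),  # scale(3, factor=10)
    (max, (4, 9), {}),              # max(4, 9)
]

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = submit_many(executor, tasks)
    results = [future.result() for future in futures]
```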

as_completed(futures)

Return a generator which yields results from the given futures as they become available.

submit_as_completed(task_generator, queue_size=None)

Return a generator which yields results from a set of futures as they become available. Futures are generated by the task_generator, which must return a triple of the form expected by submit. The method also accepts an int queue_size that dictates the maximum number of Futures that should be pending at any given time. The default value of None submits all of the tasks at once.
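The effect of queue_size can be sketched as a generator that tops up a bounded set of pending futures. This is an analogue built on concurrent.futures, not WESTPA's code; the free function below takes an executor argument that the real method does not, and it yields the completed future objects.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def submit_as_completed(executor, task_generator, queue_size=None):
    """Yield futures as they finish, keeping at most queue_size pending."""
    tasks = iter(task_generator)
    pending = set()
    exhausted = False
    while True:
        # Top up the pending set; with queue_size=None everything is submitted.
        while not exhausted and (queue_size is None or len(pending) < queue_size):
            try:
                fn, args, kwargs = next(tasks)
            except StopIteration:
                exhausted = True
                break
            pending.add(executor.submit(fn, *args, **kwargs))
        if not pending:
            return
        # Block until at least one pending future has finished.
        done, pending = wait(pending, return_when=FIRST_COMPLETED)
        for future in done:
            yield future


def square(x):
    return x * x


task_gen = ((square, (n,), {}) for n in range(6))

with ThreadPoolExecutor(max_workers=3) as executor:
    completed = submit_as_completed(executor, task_gen, queue_size=2)
    results = sorted(future.result() for future in completed)
```

Bounding the number of pending futures keeps memory use predictable when the task generator is long or lazily produces large arguments.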

wait_any(futures)

Wait on any of the given futures and return the first one which has a result available. If more than one result is or becomes available simultaneously, any completed future may be returned.

wait_all(futures)

A convenience function which waits on all the given futures in order. This function returns the same futures as submitted to the function as a list, indicating the order in which waits occurred.
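The difference between waiting on any future and waiting on all of them can be shown with concurrent.futures.wait, which is a standard-library analogue rather than the WESTPA methods themselves. The functions slow and fast and the gate event are made up so that the completion order is deterministic.

```python
import threading
from concurrent.futures import (
    ALL_COMPLETED,
    FIRST_COMPLETED,
    ThreadPoolExecutor,
    wait,
)

gate = threading.Event()


def slow():
    gate.wait()  # blocks until the main thread opens the gate
    return "slow"


def fast():
    return "fast"


with ThreadPoolExecutor(max_workers=2) as executor:
    slow_future = executor.submit(slow)
    fast_future = executor.submit(fast)
    both = [slow_future, fast_future]

    # "Wait any": return as soon as at least one future has finished.
    done_first, _ = wait(both, return_when=FIRST_COMPLETED)
    first_results = {future.result() for future in done_first}

    gate.set()  # let the slow task finish

    # "Wait all": block until every future has finished.
    done_all, still_pending = wait(both, return_when=ALL_COMPLETED)
    all_results = sorted(future.result() for future in done_all)
```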

property is_master

True if this is the master process for task distribution. This is necessary, e.g., for MPI, where all processes start identically and then must branch depending on rank.

class westpa.work_managers.core.FutureWatcher(futures, threshold=1)

Bases: object

A device to wait on multiple results and/or exceptions with only one lock.

signal(future)

Signal this watcher that the given future has results available. If this brings the number of available futures above signal_threshold, this watcher’s event object will be signalled as well.

wait()

Wait on one or more futures.

reset()

Reset this watcher’s list of completed futures, returning the list of completed futures prior to resetting it.

add(futures)

Add watchers to all futures in the iterable of futures.
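The idea of waiting on several completions with a single lock and a single event can be sketched as follows. The class ThresholdWatcher is hypothetical and much simpler than FutureWatcher; in particular, treating "reaching the threshold" as the trigger is an assumption of this sketch.

```python
import threading


class ThresholdWatcher:
    """Hypothetical one-lock, one-event watcher; not WESTPA's class."""

    def __init__(self, threshold=1):
        self.threshold = threshold
        self._lock = threading.Lock()
        self._event = threading.Event()
        self._completed = []

    def signal(self, item):
        """Record a finished item; set the event once the threshold is met."""
        with self._lock:
            self._completed.append(item)
            # Assumption: reaching the threshold is enough to wake waiters.
            if len(self._completed) >= self.threshold:
                self._event.set()

    def wait(self):
        """Block until enough items were signalled; return a snapshot."""
        self._event.wait()
        with self._lock:
            return list(self._completed)

    def reset(self):
        """Clear the completed list and the event; return the old list."""
        with self._lock:
            completed, self._completed = self._completed, []
            self._event.clear()
            return completed


watcher = ThresholdWatcher(threshold=2)
threads = [
    threading.Thread(target=watcher.signal, args=(name,))
    for name in ("a", "b", "c")
]
for thread in threads:
    thread.start()

seen = watcher.wait()  # returns once at least two signals have arrived
for thread in threads:
    thread.join()
drained = watcher.reset()
```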

class westpa.work_managers.core.WMFuture(task_id=None)

Bases: object

A “future”, representing work which has been dispatched for completion asynchronously.

static all_acquired(futures)

Context manager to acquire all locks on the given futures. Primarily for internal use.

get_result(discard=True)

Get the result associated with this future, blocking until it is available. If discard is true, then removes the reference to the result contained in this instance, so that a collection of futures need not turn into a cache of all associated results.
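The purpose of discard can be illustrated with a toy future. TinyFuture is a hypothetical class, not WMFuture, and raising on a second read after discarding is a choice made for this sketch; the text above does not say what happens in that case.

```python
import threading

_MISSING = object()  # marks "no result stored"


class TinyFuture:
    """Hypothetical sketch of result discarding; not WESTPA's WMFuture."""

    def __init__(self):
        self._condition = threading.Condition()
        self._done = False
        self._result = _MISSING

    def set_result(self, value):
        with self._condition:
            self._result = value
            self._done = True
            self._condition.notify_all()

    def get_result(self, discard=True):
        with self._condition:
            while not self._done:
                self._condition.wait()  # block until a result is set
            if self._result is _MISSING:
                # Assumption of this sketch: reading a discarded result fails.
                raise RuntimeError("result was already discarded")
            result = self._result
            if discard:
                self._result = _MISSING  # drop our reference to the payload
            return result


kept = TinyFuture()
kept.set_result([1, 2, 3])
first_read = kept.get_result(discard=False)
second_read = kept.get_result(discard=False)

dropped = TinyFuture()
dropped.set_result("payload")
payload = dropped.get_result()  # discard=True by default
try:
    dropped.get_result()
    second_failed = False
except RuntimeError:
    second_failed = True
```

With discard enabled, a long list of finished futures holds only lightweight bookkeeping, and the payloads can be garbage collected as soon as the caller is done with them.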

property result
wait()

Wait until this future has a result or exception available.

get_exception()

Get the exception associated with this future, blocking until it is available.

property exception

Get the exception associated with this future, blocking until it is available.

get_traceback()

Get the traceback object associated with this future, if any.

property traceback

Get the traceback object associated with this future, if any.

is_done()

Indicates whether this future is done executing (may block if this future is being updated).

property done

Indicates whether this future is done executing (may block if this future is being updated).

westpa.work_managers.environment module

Routines for configuring the work manager environment

class westpa.work_managers.environment.WMEnvironment(use_arg_prefixes=False, valid_work_managers=None)

Bases: object

A class to encapsulate the environment in which work managers are instantiated; this controls how environment variables and command-line arguments are used to set up work managers. This could be used to cleanly instantiate two work managers within one application, but is really more about providing facilities to make it easier for individual work managers to configure themselves according to the following order of precedence for configuration information:

  1. command-line arguments

  2. environment variables

  3. defaults
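The precedence order above can be sketched with argparse and a mapping that stands in for os.environ. The function resolve, the option --n-workers and the variable WM_N_WORKERS are illustrative assumptions for this example, not a statement of the exact names WMEnvironment uses.

```python
import argparse


def resolve(name, cli_args, environ, default=None, type_=None, env_prefix="WM"):
    """Resolve a setting: command line first, then environment, then default."""
    cli_value = getattr(cli_args, name, None)
    if cli_value is not None:
        return cli_value
    env_key = f"{env_prefix}_{name.upper()}"
    if env_key in environ:
        raw = environ[env_key]
        return type_(raw) if type_ is not None else raw
    return default


parser = argparse.ArgumentParser()
# Hypothetical option name; default=None means "not given on the command line".
parser.add_argument("--n-workers", dest="n_workers", type=int, default=None)

environ = {"WM_N_WORKERS": "8"}  # stand-in for os.environ

from_cli = resolve(
    "n_workers", parser.parse_args(["--n-workers", "4"]), environ, default=1, type_=int
)
from_env = resolve("n_workers", parser.parse_args([]), environ, default=1, type_=int)
from_default = resolve("n_workers", parser.parse_args([]), {}, default=1, type_=int)
```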

group_title = 'parallelization options'
group_description = None
env_prefix = 'WM'
arg_prefix = 'wm'
default_work_manager = 'serial'
default_parallel_work_manager = 'processes'
valid_work_managers = ['serial', 'threads', 'processes', 'zmq', 'mpi']
env_name(name)
arg_name(name)
arg_flag(name)
get_val(name, default=None, type_=None)
add_wm_args(parser)
process_wm_args(args)
make_work_manager()

Using cues from the environment, instantiate a pre-configured work manager.

westpa.work_managers.environment.make_work_manager()

Using cues from the environment, instantiate a pre-configured work manager.

westpa.work_managers.environment.add_wm_args(parser)
westpa.work_managers.environment.process_wm_args(args)

westpa.work_managers.mpi module

A work manager which uses MPI to distribute tasks and collect results.

class westpa.work_managers.mpi.deque

Bases: object

deque([iterable[, maxlen]]) --> deque object

A list-like sequence optimized for data accesses near its endpoints.

append()

Add an element to the right side of the deque.

appendleft()

Add an element to the left side of the deque.

clear()

Remove all elements from the deque.

copy()

Return a shallow copy of a deque.

count()

D.count(value) – return number of occurrences of value

extend()

Extend the right side of the deque with elements from the iterable

extendleft()

Extend the left side of the deque with elements from the iterable

index()

D.index(value, [start, [stop]]) – return first index of value. Raises ValueError if the value is not present.

insert()

D.insert(index, object) – insert object before index

maxlen

maximum size of a deque or None if unbounded

pop()

Remove and return the rightmost element.

popleft()

Remove and return the leftmost element.

remove()

D.remove(value) – remove first occurrence of value.

reverse()

D.reverse() – reverse IN PLACE

rotate()

Rotate the deque n steps to the right (default n=1). If n is negative, rotates left.
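A brief example of the operations listed above, using collections.deque from the standard library (the class documented here is that same object re-exported).

```python
from collections import deque

d = deque([1, 2, 3])
d.append(4)              # add on the right
d.appendleft(0)          # add on the left
rightmost = d.pop()      # remove from the right
leftmost = d.popleft()   # remove from the left
d.rotate(1)              # shift every element one step to the right
rotated = list(d)

# With maxlen set, appending to a full deque drops items from the other end.
recent = deque(maxlen=3)
for i in range(5):
    recent.append(i)
last_three = list(recent)
```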

class westpa.work_managers.mpi.WorkManager

Bases: object

Base class for all work managers. At a minimum, work managers must provide a submit() function and a n_workers attribute (which may be a property), though most will also override startup() and shutdown().

classmethod from_environ(wmenv=None)
classmethod add_wm_args(parser, wmenv=None)
sigint_handler(signum, frame)
install_sigint_handler()
startup()

Perform any necessary startup work, such as spawning clients.

shutdown()

Cleanly shut down any active workers.

run()

Run the worker loop (in clients only).

submit(fn, args=None, kwargs=None)

Submit a task to the work manager, returning a WMFuture object representing the pending result. fn(*args,**kwargs) will be executed by a worker, and the return value assigned as the result of the returned future. The function fn and all arguments must be picklable; note particularly that off-path modules (like the system module and any active plugins) are not picklable unless pre-loaded in the worker process (i.e. prior to forking the master).

submit_many(tasks)

Submit a set of tasks to the work manager, returning a list of WMFuture objects representing pending results. Each entry in tasks should be a triple (fn, args, kwargs), which will result in fn(*args, **kwargs) being executed by a worker. The function fn and all arguments must be picklable; note particularly that off-path modules are not picklable unless pre-loaded in the worker process.

as_completed(futures)

Return a generator which yields results from the given futures as they become available.

submit_as_completed(task_generator, queue_size=None)

Return a generator which yields results from a set of futures as they become available. Futures are generated by the task_generator, which must return a triple of the form expected by submit. The method also accepts an int queue_size that dictates the maximum number of Futures that should be pending at any given time. The default value of None submits all of the tasks at once.

wait_any(futures)

Wait on any of the given futures and return the first one which has a result available. If more than one result is or becomes available simultaneously, any completed future may be returned.

wait_all(futures)

A convenience function which waits on all the given futures in order. This function returns the same futures as submitted to the function as a list, indicating the order in which waits occurred.

property is_master

True if this is the master process for task distribution. This is necessary, e.g., for MPI, where all processes start identically and then must branch depending on rank.

class westpa.work_managers.mpi.WMFuture(task_id=None)

Bases: object

A “future”, representing work which has been dispatched for completion asynchronously.

static all_acquired(futures)

Context manager to acquire all locks on the given futures. Primarily for internal use.

get_result(discard=True)

Get the result associated with this future, blocking until it is available. If discard is true, then removes the reference to the result contained in this instance, so that a collection of futures need not turn into a cache of all associated results.

property result
wait()

Wait until this future has a result or exception available.

get_exception()

Get the exception associated with this future, blocking until it is available.

property exception

Get the exception associated with this future, blocking until it is available.

get_traceback()

Get the traceback object associated with this future, if any.

property traceback

Get the traceback object associated with this future, if any.

is_done()

Indicates whether this future is done executing (may block if this future is being updated).

property done

Indicates whether this future is done executing (may block if this future is being updated).

class westpa.work_managers.mpi.Task(task_id, fn, args, kwargs)

Bases: object

Tasks are tuples of (task_id, function, args, keyword args)

class westpa.work_managers.mpi.MPIWorkManager

Bases: WorkManager

MPIWorkManager factory.

Initialize info shared by Manager and Worker classes.

classmethod from_environ(wmenv=None)
submit(fn, args=None, kwargs=None)

Adhere to WorkManager interface. This method should never be called.

class westpa.work_managers.mpi.Serial

Bases: MPIWorkManager

Replication of the serial work manager. This is a fallback for MPI runs that request only 1 (size=1) processor.

Initialize info shared by Manager and Worker classes.

submit(fn, args=None, kwargs=None)

Adhere to WorkManager interface. This method should never be called.

class westpa.work_managers.mpi.Manager

Bases: MPIWorkManager

Manager of the MPIWorkManager. Distributes tasks to Worker processes as they are received from the sim_manager. In addition to the main thread, this class spawns two threads, a receiver and a dispatcher.

Initialize different state variables used by Manager.

startup()

Spawns the dispatcher and receiver threads.

submit(fn, args=None, kwargs=None)

Receive task from simulation manager and add it to pending_futures.

shutdown()

Send shutdown tag to all worker processes, and set the shutdown sentinel to stop the receiver and dispatcher loops.
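The dispatcher and receiver arrangement can be sketched without MPI by letting in-process queues stand in for messages to and from a worker. Everything here (TinyManager, the queue names, the single worker thread and the SHUTDOWN sentinel) is hypothetical and only illustrates the threading pattern; it does not handle task exceptions.

```python
import queue
import threading
from concurrent.futures import Future

SHUTDOWN = object()  # sentinel that stops each loop in turn


class TinyManager:
    """Hypothetical sketch; queues stand in for MPI sends and receives."""

    def __init__(self):
        self.task_queue = queue.Queue()   # tasks awaiting dispatch
        self.to_worker = queue.Queue()    # stand-in for sends to a worker
        self.from_worker = queue.Queue()  # stand-in for results coming back
        self.pending_futures = {}
        self.lock = threading.Lock()
        self.next_id = 0
        self.threads = [
            threading.Thread(target=self._dispatcher),
            threading.Thread(target=self._worker),
            threading.Thread(target=self._receiver),
        ]

    def startup(self):
        for thread in self.threads:
            thread.start()

    def submit(self, fn, args=None, kwargs=None):
        future = Future()
        with self.lock:
            task_id = self.next_id
            self.next_id += 1
            self.pending_futures[task_id] = future
        self.task_queue.put((task_id, fn, args or (), kwargs or {}))
        return future

    def _dispatcher(self):
        while True:
            task = self.task_queue.get()
            self.to_worker.put(task)  # the sentinel is forwarded too
            if task is SHUTDOWN:
                break

    def _worker(self):
        while True:
            task = self.to_worker.get()
            if task is SHUTDOWN:
                self.from_worker.put(SHUTDOWN)
                break
            task_id, fn, args, kwargs = task
            self.from_worker.put((task_id, fn(*args, **kwargs)))

    def _receiver(self):
        while True:
            message = self.from_worker.get()
            if message is SHUTDOWN:
                break
            task_id, result = message
            with self.lock:
                future = self.pending_futures.pop(task_id)
            future.set_result(result)

    def shutdown(self):
        self.task_queue.put(SHUTDOWN)
        for thread in self.threads:
            thread.join()


manager = TinyManager()
manager.startup()
futures = [manager.submit(pow, args=(n, 2)) for n in range(4)]
results = [future.result() for future in futures]
manager.shutdown()
```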

class westpa.work_managers.mpi.Worker

Bases: MPIWorkManager

Client class for executing tasks as distributed by the Manager in the MPI work manager.

Initialize info shared by Manager and Worker classes.

startup()

Clock the worker in for work.

clockIn()

Do each task as it comes in. The completion of a task is notice to the manager that more work is welcome.

property is_master

Worker processes need to be marked as not manager. This ensures that the proper branching is followed in w_run.py.

westpa.work_managers.processes module

class westpa.work_managers.processes.WorkManager

Bases: object

Base class for all work managers. At a minimum, work managers must provide a submit() function and a n_workers attribute (which may be a property), though most will also override startup() and shutdown().

classmethod from_environ(wmenv=None)
classmethod add_wm_args(parser, wmenv=None)
sigint_handler(signum, frame)
install_sigint_handler()
startup()

Perform any necessary startup work, such as spawning clients.

shutdown()

Cleanly shut down any active workers.

run()

Run the worker loop (in clients only).

submit(fn, args=None, kwargs=None)

Submit a task to the work manager, returning a WMFuture object representing the pending result. fn(*args,**kwargs) will be executed by a worker, and the return value assigned as the result of the returned future. The function fn and all arguments must be picklable; note particularly that off-path modules (like the system module and any active plugins) are not picklable unless pre-loaded in the worker process (i.e. prior to forking the master).

submit_many(tasks)

Submit a set of tasks to the work manager, returning a list of WMFuture objects representing pending results. Each entry in tasks should be a triple (fn, args, kwargs), which will result in fn(*args, **kwargs) being executed by a worker. The function fn and all arguments must be picklable; note particularly that off-path modules are not picklable unless pre-loaded in the worker process.

as_completed(futures)

Return a generator which yields results from the given futures as they become available.

submit_as_completed(task_generator, queue_size=None)

Return a generator which yields results from a set of futures as they become available. Futures are generated by the task_generator, which must return a triple of the form expected by submit. The method also accepts an int queue_size that dictates the maximum number of Futures that should be pending at any given time. The default value of None submits all of the tasks at once.

wait_any(futures)

Wait on any of the given futures and return the first one which has a result available. If more than one result is or becomes available simultaneously, any completed future may be returned.

wait_all(futures)

A convenience function which waits on all the given futures in order. This function returns the same futures as submitted to the function as a list, indicating the order in which waits occurred.

property is_master

True if this is the master process for task distribution. This is necessary, e.g., for MPI, where all processes start identically and then must branch depending on rank.

class westpa.work_managers.processes.WMFuture(task_id=None)

Bases: object

A “future”, representing work which has been dispatched for completion asynchronously.

static all_acquired(futures)

Context manager to acquire all locks on the given futures. Primarily for internal use.

get_result(discard=True)

Get the result associated with this future, blocking until it is available. If discard is true, then removes the reference to the result contained in this instance, so that a collection of futures need not turn into a cache of all associated results.

property result
wait()

Wait until this future has a result or exception available.

get_exception()

Get the exception associated with this future, blocking until it is available.

property exception

Get the exception associated with this future, blocking until it is available.

get_traceback()

Get the traceback object associated with this future, if any.

property traceback

Get the traceback object associated with this future, if any.

is_done()

Indicates whether this future is done executing (may block if this future is being updated).

property done

Indicates whether this future is done executing (may block if this future is being updated).

class westpa.work_managers.processes.ProcessWorkManager(n_workers=None, shutdown_timeout=1)

Bases: WorkManager

A work manager using the multiprocessing module.

Notes

On macOS, Python 3.8 changed the default multiprocessing start method from fork to spawn. In general, spawn is more robust and efficient, but it requires everything passed to the child process to be serializable. In contrast, fork is much less memory efficient, as it makes a full copy of everything in the parent process, but it does not require picklability.

So, on macOS, the start method is explicitly changed from the macOS-specific default of spawn back to fork. Other Unix systems should default to fork.

See https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods and https://docs.python.org/3/library/multiprocessing.html#the-spawn-and-forkserver-start-methods for more details.

classmethod from_environ(wmenv=None)
task_loop()
results_loop()
submit(fn, args=None, kwargs=None)

Submit a task to the work manager, returning a WMFuture object representing the pending result. fn(*args,**kwargs) will be executed by a worker, and the return value assigned as the result of the returned future. The function fn and all arguments must be picklable; note particularly that off-path modules (like the system module and any active plugins) are not picklable unless pre-loaded in the worker process (i.e. prior to forking the master).

startup()

Perform any necessary startup work, such as spawning clients.

shutdown()

Cleanly shut down any active workers.

westpa.work_managers.serial module

class westpa.work_managers.serial.WorkManager

Bases: object

Base class for all work managers. At a minimum, work managers must provide a submit() function and a n_workers attribute (which may be a property), though most will also override startup() and shutdown().

classmethod from_environ(wmenv=None)
classmethod add_wm_args(parser, wmenv=None)
sigint_handler(signum, frame)
install_sigint_handler()
startup()

Perform any necessary startup work, such as spawning clients.

shutdown()

Cleanly shut down any active workers.

run()

Run the worker loop (in clients only).

submit(fn, args=None, kwargs=None)

Submit a task to the work manager, returning a WMFuture object representing the pending result. fn(*args,**kwargs) will be executed by a worker, and the return value assigned as the result of the returned future. The function fn and all arguments must be picklable; note particularly that off-path modules (like the system module and any active plugins) are not picklable unless pre-loaded in the worker process (i.e. prior to forking the master).

submit_many(tasks)

Submit a set of tasks to the work manager, returning a list of WMFuture objects representing pending results. Each entry in tasks should be a triple (fn, args, kwargs), which will result in fn(*args, **kwargs) being executed by a worker. The function fn and all arguments must be picklable; note particularly that off-path modules are not picklable unless pre-loaded in the worker process.

as_completed(futures)

Return a generator which yields results from the given futures as they become available.

submit_as_completed(task_generator, queue_size=None)

Return a generator which yields results from a set of futures as they become available. Futures are generated by the task_generator, which must return a triple of the form expected by submit. The method also accepts an int queue_size that dictates the maximum number of Futures that should be pending at any given time. The default value of None submits all of the tasks at once.

wait_any(futures)

Wait on any of the given futures and return the first one which has a result available. If more than one result is or becomes available simultaneously, any completed future may be returned.

wait_all(futures)

A convenience function which waits on all the given futures in order. This function returns the same futures as submitted to the function as a list, indicating the order in which waits occurred.

property is_master

True if this is the master process for task distribution. This is necessary, e.g., for MPI, where all processes start identically and then must branch depending on rank.

class westpa.work_managers.serial.WMFuture(task_id=None)

Bases: object

A “future”, representing work which has been dispatched for completion asynchronously.

static all_acquired(futures)

Context manager to acquire all locks on the given futures. Primarily for internal use.

get_result(discard=True)

Get the result associated with this future, blocking until it is available. If discard is true, then removes the reference to the result contained in this instance, so that a collection of futures need not turn into a cache of all associated results.

property result
wait()

Wait until this future has a result or exception available.

get_exception()

Get the exception associated with this future, blocking until it is available.

property exception

Get the exception associated with this future, blocking until it is available.

get_traceback()

Get the traceback object associated with this future, if any.

property traceback

Get the traceback object associated with this future, if any.

is_done()

Indicates whether this future is done executing (may block if this future is being updated).

property done

Indicates whether this future is done executing (may block if this future is being updated).

class westpa.work_managers.serial.SerialWorkManager

Bases: WorkManager

classmethod from_environ(wmenv=None)
submit(fn, args=None, kwargs=None)

Submit a task to the work manager, returning a WMFuture object representing the pending result. fn(*args,**kwargs) will be executed by a worker, and the return value assigned as the result of the returned future. The function fn and all arguments must be picklable; note particularly that off-path modules (like the system module and any active plugins) are not picklable unless pre-loaded in the worker process (i.e. prior to forking the master).

westpa.work_managers.threads module

class westpa.work_managers.threads.WorkManager

Bases: object

Base class for all work managers. At a minimum, work managers must provide a submit() function and a n_workers attribute (which may be a property), though most will also override startup() and shutdown().

classmethod from_environ(wmenv=None)
classmethod add_wm_args(parser, wmenv=None)
sigint_handler(signum, frame)
install_sigint_handler()
startup()

Perform any necessary startup work, such as spawning clients.

shutdown()

Cleanly shut down any active workers.

run()

Run the worker loop (in clients only).

submit(fn, args=None, kwargs=None)

Submit a task to the work manager, returning a WMFuture object representing the pending result. fn(*args,**kwargs) will be executed by a worker, and the return value assigned as the result of the returned future. The function fn and all arguments must be picklable; note particularly that off-path modules (like the system module and any active plugins) are not picklable unless pre-loaded in the worker process (i.e. prior to forking the master).

submit_many(tasks)

Submit a set of tasks to the work manager, returning a list of WMFuture objects representing pending results. Each entry in tasks should be a triple (fn, args, kwargs), which will result in fn(*args, **kwargs) being executed by a worker. The function fn and all arguments must be picklable; note particularly that off-path modules are not picklable unless pre-loaded in the worker process.

as_completed(futures)

Return a generator which yields results from the given futures as they become available.

submit_as_completed(task_generator, queue_size=None)

Return a generator which yields results from a set of futures as they become available. Futures are generated by the task_generator, which must return a triple of the form expected by submit. The method also accepts an int queue_size that dictates the maximum number of Futures that should be pending at any given time. The default value of None submits all of the tasks at once.

wait_any(futures)

Wait on any of the given futures and return the first one which has a result available. If more than one result is or becomes available simultaneously, any completed future may be returned.

wait_all(futures)

A convenience function which waits on all the given futures in order. This function returns the same futures as submitted to the function as a list, indicating the order in which waits occurred.

property is_master

True if this is the master process for task distribution. This is necessary, e.g., for MPI, where all processes start identically and then must branch depending on rank.

class westpa.work_managers.threads.WMFuture(task_id=None)

Bases: object

A “future”, representing work which has been dispatched for completion asynchronously.

static all_acquired(futures)

Context manager to acquire all locks on the given futures. Primarily for internal use.

get_result(discard=True)

Get the result associated with this future, blocking until it is available. If discard is true, then removes the reference to the result contained in this instance, so that a collection of futures need not turn into a cache of all associated results.

property result
wait()

Wait until this future has a result or exception available.

get_exception()

Get the exception associated with this future, blocking until it is available.

property exception

Get the exception associated with this future, blocking until it is available.

get_traceback()

Get the traceback object associated with this future, if any.

property traceback

Get the traceback object associated with this future, if any.

is_done()

Indicates whether this future is done executing (may block if this future is being updated).

property done

Indicates whether this future is done executing (may block if this future is being updated).

class westpa.work_managers.threads.Task(fn, args, kwargs, future)

Bases: object

run()

class westpa.work_managers.threads.ThreadsWorkManager(n_workers=None)

Bases: WorkManager

A work manager using threads.

classmethod from_environ(wmenv=None)
runtask(task_queue)
submit(fn, args=None, kwargs=None)

Submit a task to the work manager, returning a WMFuture object representing the pending result. fn(*args,**kwargs) will be executed by a worker, and the return value assigned as the result of the returned future. The function fn and all arguments must be picklable; note particularly that off-path modules (like the system module and any active plugins) are not picklable unless pre-loaded in the worker process (i.e. prior to forking the master).

startup()

Perform any necessary startup work, such as spawning clients.

shutdown()

Cleanly shut down any active workers.