westpa.work_managers.zeromq package

westpa.work_managers.zeromq module

exception westpa.work_managers.zeromq.ZMQWMError

Bases: RuntimeError

Base class for errors related to the ZeroMQ work manager itself

exception westpa.work_managers.zeromq.ZMQWMTimeout

Bases: ZMQWMEnvironmentError

A timeout of a sort that indicates that a master or worker has failed or never started.

exception westpa.work_managers.zeromq.ZMQWMEnvironmentError

Bases: ZMQWMError

Class representing an error in the environment in which the ZeroMQ work manager is running. This includes such things as master/worker ID mismatches.

exception westpa.work_managers.zeromq.ZMQWorkerMissing

Bases: ZMQWMError

Exception representing that a worker processing a task died or disappeared
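The exception hierarchy documented above can be illustrated with a short sketch. The classes are re-declared locally only so that the example runs without WESTPA installed; in real code they would be imported from westpa.work_managers.zeromq. The classify helper is hypothetical and is not part of the package.

```python
# Local stand-ins mirroring the documented hierarchy; in real code these
# would be imported from westpa.work_managers.zeromq instead.
class ZMQWMError(RuntimeError):
    pass


class ZMQWMEnvironmentError(ZMQWMError):
    pass


class ZMQWMTimeout(ZMQWMEnvironmentError):
    pass


class ZMQWorkerMissing(ZMQWMError):
    pass


def classify(exc):
    """Hypothetical helper: map an exception to a coarse category."""
    # Order matters: test the most specific class first.
    if isinstance(exc, ZMQWMTimeout):
        return 'timeout'
    if isinstance(exc, ZMQWMEnvironmentError):
        return 'environment'
    if isinstance(exc, ZMQWorkerMissing):
        return 'worker_missing'
    if isinstance(exc, ZMQWMError):
        return 'other_zmq_error'
    return 'unrelated'


try:
    raise ZMQWMTimeout('no master beacon received')
except ZMQWMEnvironmentError as e:  # a handler for the parent also catches the timeout
    category = classify(e)
```

Because ZMQWMTimeout derives from ZMQWMEnvironmentError, a handler written for environment errors also receives timeouts, so a more specific handler has to come first if the two cases need different treatment.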

class westpa.work_managers.zeromq.ZMQCore

Bases: object

PROTOCOL_MAJOR = 3
PROTOCOL_MINOR = 0
PROTOCOL_UPDATE = 0
PROTOCOL_VERSION = (3, 0, 0)
internal_transport = 'ipc'
default_comm_mode = 'ipc'
default_master_heartbeat = 20.0
default_worker_heartbeat = 20.0
default_timeout_factor = 5.0
default_startup_timeout = 120.0
default_shutdown_timeout = 5.0
classmethod make_ipc_endpoint()
classmethod remove_ipc_endpoints()
classmethod make_tcp_endpoint(address='127.0.0.1')
classmethod make_internal_endpoint()
get_identification()
validate_message(message)

Validate incoming message. Raises an exception if the message is improperly formatted (TypeError) or does not correspond to the appropriate master (ZMQWMEnvironmentError).

message_validation(msg)

A context manager for message validation. The instance variable validation_fail_action controls the behavior of this context manager:

  • ‘raise’: re-raise the exception that indicated failed validation. Useful for development.

  • ‘exit’ (default): report the error and exit the program.

  • ‘warn’: report the error and continue.
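The three failure actions listed above could be implemented along the following lines. This is a minimal sketch under stated assumptions, not the ZMQCore code: the Validator class, its reported list, and the simplified validate_message check are all hypothetical, and only TypeError is handled here.

```python
import contextlib
import sys


class Validator:
    """Hypothetical stand-in for the validation behavior described above."""

    def __init__(self, validation_fail_action='exit'):
        self.validation_fail_action = validation_fail_action
        self.reported = []  # collected error reports (assumption: real code logs instead)

    def validate_message(self, message):
        # Simplified check; the documented method also verifies the master ID.
        if not isinstance(message, dict):
            raise TypeError('message is improperly formatted')

    @contextlib.contextmanager
    def message_validation(self, msg):
        try:
            yield
        except TypeError as e:
            if self.validation_fail_action == 'raise':
                raise  # propagate the original exception
            self.reported.append('invalid message {!r}: {}'.format(msg, e))
            if self.validation_fail_action == 'exit':
                sys.exit(1)
            # 'warn': the exception is swallowed and execution continues


v = Validator('warn')
with v.message_validation('bogus'):
    v.validate_message('bogus')
```

With 'warn', the body of the with block is abandoned at the point of failure but the caller keeps running; with 'raise', the original exception reaches the caller unchanged.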

recv_message(socket, flags=0, validate=True, timeout=None)

Receive a message object from the given socket, using the given flags. Message validation is performed if validate is true. If timeout is given, then it is the number of milliseconds to wait prior to raising a ZMQWMTimeout exception. timeout is ignored if flags includes zmq.NOBLOCK.

recv_all(socket, flags=0, validate=True)

Receive all messages currently available from the given socket.

recv_ack(socket, flags=0, validate=True, timeout=None)
send_message(socket, message, payload=None, flags=0)

Send a message object. Subclasses may override this to decorate the message with appropriate IDs, then delegate upward to actually send the message. message may either be a pre-constructed Message object or a message identifier, in which (latter) case payload will become the message payload. payload is ignored if message is a Message object.
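The two accepted forms of the message argument can be sketched as follows. The Message class here is a minimal local stand-in with the documented constructor signature, and as_message is a hypothetical helper that shows only the normalization step, not the actual sending.

```python
class Message:
    """Minimal stand-in with the documented constructor signature."""

    def __init__(self, message=None, payload=None, master_id=None, src_id=None):
        self.message = message
        self.payload = payload
        self.master_id = master_id
        self.src_id = src_id


def as_message(message, payload=None):
    """Hypothetical helper: normalize the two accepted argument forms."""
    if isinstance(message, Message):
        # Pre-constructed object: the payload argument is ignored.
        return message
    # Message identifier: wrap it, using payload as the message payload.
    return Message(message=message, payload=payload)


from_identifier = as_message('task_request', payload={'n': 1})
prebuilt = Message('result', payload='original')
from_object = as_message(prebuilt, payload='ignored')
```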

send_reply(socket, original_message, reply='ok', payload=None, flags=0)

Send a reply to original_message on socket. The reply message is a Message object or a message identifier. The reply master_id and worker_id are set from original_message, unless master_id is not set, in which case it is set from self.master_id.

send_ack(socket, original_message)

Send an acknowledgement message, which is mostly just to respect REQ/REP recv/send patterns.

send_nak(socket, original_message)

Send a negative acknowledgement message.

send_inproc_message(message, payload=None, flags=0)
signal_shutdown()
shutdown_handler(signal=None, frame=None)
install_signal_handlers(signals=None)
install_sigint_handler()
startup()
shutdown()
join()
class westpa.work_managers.zeromq.ZMQNode(upstream_rr_endpoint, upstream_ann_endpoint, n_local_workers=None)

Bases: ZMQCore, IsNode

run()
property is_master
comm_loop()
startup()
class westpa.work_managers.zeromq.ZMQWorker(rr_endpoint, ann_endpoint)

Bases: ZMQCore

This is the outward-facing worker component of the ZMQ work manager. This forms the interface to the master. This process must not hang or crash because of an error in a task it executes, so tasks are isolated in ZMQExecutor, which communicates with ZMQWorker via (what else?) ZeroMQ.

property is_master
update_master_info(msg)
identify(rr_socket)
request_task(rr_socket, task_socket)
handle_reconfigure_timeout(msg, timers)
handle_result(result_socket, rr_socket)
comm_loop()

Master communication loop for the worker process.

shutdown_executor()
install_signal_handlers(signals=None)
startup(process_index=None)
class westpa.work_managers.zeromq.ZMQWorkManager(n_local_workers=1)

Bases: ZMQCore, WorkManager, IsNode

classmethod add_wm_args(parser, wmenv=None)
classmethod from_environ(wmenv=None)
classmethod read_host_info(filename)
classmethod canonicalize_endpoint(endpoint, allow_wildcard_host=True)
property n_workers
submit(fn, args=None, kwargs=None)

Submit a task to the work manager, returning a WMFuture object representing the pending result. fn(*args,**kwargs) will be executed by a worker, and the return value assigned as the result of the returned future. The function fn and all arguments must be picklable; note particularly that off-path modules (like the system module and any active plugins) are not picklable unless pre-loaded in the worker process (i.e. prior to forking the master).
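The picklability requirement can be checked before submitting a task. The sketch below uses only the standard pickle module; is_picklable is a hypothetical helper, not part of WESTPA.

```python
import math
import pickle


def is_picklable(obj):
    """Hypothetical helper: report whether pickle can serialize obj."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


# A function that lives in an importable module is pickled by reference
# (module name plus qualified name), so the worker must be able to import it.
importable = is_picklable(math.sqrt)

# A lambda has no importable name, so it cannot be pickled by reference.
anonymous = is_picklable(lambda x: x + 1)
```

Since functions are pickled by reference rather than by value, the module that defines fn has to be importable in the worker process as well, which is the reason for the note about off-path modules.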

submit_many(tasks)

Submit a set of tasks to the work manager, returning a list of WMFuture objects representing pending results. Each entry in tasks should be a triple (fn, args, kwargs), which will result in fn(*args, **kwargs) being executed by a worker. The function fn and all arguments must be picklable; note particularly that off-path modules are not picklable unless pre-loaded in the worker process.
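The (fn, args, kwargs) triple format can be sketched with the standard library's ThreadPoolExecutor standing in for a work manager. This is an assumption made so that the example runs without WESTPA; the standalone submit_many function below is hypothetical and returns concurrent.futures futures rather than WMFuture objects.

```python
from concurrent.futures import ThreadPoolExecutor


def submit_many(executor, tasks):
    """Submit (fn, args, kwargs) triples; return the futures in task order."""
    return [executor.submit(fn, *args, **kwargs) for fn, args, kwargs in tasks]


tasks = [
    (pow, (2, 10), {}),
    (int, ('ff',), {'base': 16}),
    (sorted, ([3, 1, 2],), {'reverse': True}),
]

with ThreadPoolExecutor(max_workers=2) as pool:
    futures = submit_many(pool, tasks)
    results = [f.result() for f in futures]
```

Each triple carries positional arguments as a tuple and keyword arguments as a dict; an empty tuple or dict is used when there are none.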

send_message(socket, message, payload=None, flags=0)

Send a message object. Subclasses may override this to decorate the message with appropriate IDs, then delegate upward to actually send the message. message may either be a pre-constructed Message object or a message identifier, in which (latter) case payload will become the message payload. payload is ignored if message is a Message object.

handle_result(socket, msg)
handle_task_request(socket, msg)
update_worker_information(msg)
check_workers()
remove_worker(worker_id)
shutdown_clear_tasks()

Abort pending tasks with error on shutdown.

comm_loop()
startup()

Perform any necessary startup work, such as spawning clients.

shutdown()

Cleanly shut down any active workers.

westpa.work_managers.zeromq.core module

Created on May 29, 2015

@author: mzwier

westpa.work_managers.zeromq.core.randport(address='127.0.0.1')

Select a random unused TCP port number on the given address.
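One common way to obtain an unused port is to bind to port 0 and let the operating system choose. The sketch below shows that technique; it is not necessarily how randport is implemented, and pick_unused_port is a hypothetical name.

```python
import socket


def pick_unused_port(address='127.0.0.1'):
    """Ask the OS for a currently unused TCP port on the given address."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((address, 0))  # port 0 means "any free port"
        return s.getsockname()[1]


port = pick_unused_port()
```

The port is released when the probing socket closes, so another process could claim it before it is reused; callers that need a guarantee should bind the real socket directly.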

exception westpa.work_managers.zeromq.core.ZMQWMError

Bases: RuntimeError

Base class for errors related to the ZeroMQ work manager itself

exception westpa.work_managers.zeromq.core.ZMQWorkerMissing

Bases: ZMQWMError

Exception representing that a worker processing a task died or disappeared

exception westpa.work_managers.zeromq.core.ZMQWMEnvironmentError

Bases: ZMQWMError

Class representing an error in the environment in which the ZeroMQ work manager is running. This includes such things as master/worker ID mismatches.

exception westpa.work_managers.zeromq.core.ZMQWMTimeout

Bases: ZMQWMEnvironmentError

A timeout of a sort that indicates that a master or worker has failed or never started.

class westpa.work_managers.zeromq.core.Message(message=None, payload=None, master_id=None, src_id=None)

Bases: object

SHUTDOWN = 'shutdown'
ACK = 'ok'
NAK = 'no'
IDENTIFY = 'identify'
TASKS_AVAILABLE = 'tasks_available'
TASK_REQUEST = 'task_request'
MASTER_BEACON = 'master_alive'
RECONFIGURE_TIMEOUT = 'reconfigure_timeout'
TASK = 'task'
RESULT = 'result'
idempotent_announcement_messages = {'master_alive', 'shutdown', 'tasks_available'}
classmethod coalesce_announcements(messages)
class westpa.work_managers.zeromq.core.Task(fn, args, kwargs, task_id=None)

Bases: object

execute()

Run this task, returning a Result object.
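The relationship between Task and Result can be sketched as follows, using the documented constructor signatures. The method bodies are assumptions: in particular, capturing the exception and storing a formatted traceback string is one plausible design, not a statement of what the package does.

```python
import traceback as tb


class Result:
    """Stand-in with the documented constructor signature."""

    def __init__(self, task_id, result=None, exception=None, traceback=None):
        self.task_id = task_id
        self.result = result
        self.exception = exception
        self.traceback = traceback


class Task:
    """Stand-in with the documented constructor signature."""

    def __init__(self, fn, args, kwargs, task_id=None):
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.task_id = task_id

    def execute(self):
        """Run the task; always return a Result, even on failure."""
        try:
            value = self.fn(*self.args, **self.kwargs)
        except Exception as e:
            # Assumption: failures are reported in the Result, not raised.
            return Result(self.task_id, exception=e, traceback=tb.format_exc())
        return Result(self.task_id, result=value)


ok = Task(divmod, (17, 5), {}, task_id='t1').execute()
failed = Task(divmod, (1, 0), {}, task_id='t2').execute()
```

Returning a Result in both cases lets the executing process report a failure to its caller instead of dying with the task.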

class westpa.work_managers.zeromq.core.Result(task_id, result=None, exception=None, traceback=None)

Bases: object

class westpa.work_managers.zeromq.core.PassiveTimer(duration, started=None)

Bases: object

started
duration
property expired
property expires_in
reset(at=None)
start(at=None)
class westpa.work_managers.zeromq.core.PassiveMultiTimer

Bases: object

add_timer(identifier, duration)
remove_timer(identifier)
change_duration(identifier, duration)
reset(identifier=None, at=None)
expired(identifier, at=None)
next_expiration()
next_expiration_in()
which_expired(at=None)
class westpa.work_managers.zeromq.core.ZMQCore

Bases: object

PROTOCOL_MAJOR = 3
PROTOCOL_MINOR = 0
PROTOCOL_UPDATE = 0
PROTOCOL_VERSION = (3, 0, 0)
internal_transport = 'ipc'
default_comm_mode = 'ipc'
default_master_heartbeat = 20.0
default_worker_heartbeat = 20.0
default_timeout_factor = 5.0
default_startup_timeout = 120.0
default_shutdown_timeout = 5.0
classmethod make_ipc_endpoint()
classmethod remove_ipc_endpoints()
classmethod make_tcp_endpoint(address='127.0.0.1')
classmethod make_internal_endpoint()
get_identification()
validate_message(message)

Validate incoming message. Raises an exception if the message is improperly formatted (TypeError) or does not correspond to the appropriate master (ZMQWMEnvironmentError).

message_validation(msg)

A context manager for message validation. The instance variable validation_fail_action controls the behavior of this context manager:

  • ‘raise’: re-raise the exception that indicated failed validation. Useful for development.

  • ‘exit’ (default): report the error and exit the program.

  • ‘warn’: report the error and continue.

recv_message(socket, flags=0, validate=True, timeout=None)

Receive a message object from the given socket, using the given flags. Message validation is performed if validate is true. If timeout is given, then it is the number of milliseconds to wait prior to raising a ZMQWMTimeout exception. timeout is ignored if flags includes zmq.NOBLOCK.

recv_all(socket, flags=0, validate=True)

Receive all messages currently available from the given socket.

recv_ack(socket, flags=0, validate=True, timeout=None)
send_message(socket, message, payload=None, flags=0)

Send a message object. Subclasses may override this to decorate the message with appropriate IDs, then delegate upward to actually send the message. message may either be a pre-constructed Message object or a message identifier, in which (latter) case payload will become the message payload. payload is ignored if message is a Message object.

send_reply(socket, original_message, reply='ok', payload=None, flags=0)

Send a reply to original_message on socket. The reply message is a Message object or a message identifier. The reply master_id and worker_id are set from original_message, unless master_id is not set, in which case it is set from self.master_id.

send_ack(socket, original_message)

Send an acknowledgement message, which is mostly just to respect REQ/REP recv/send patterns.

send_nak(socket, original_message)

Send a negative acknowledgement message.

send_inproc_message(message, payload=None, flags=0)
signal_shutdown()
shutdown_handler(signal=None, frame=None)
install_signal_handlers(signals=None)
install_sigint_handler()
startup()
shutdown()
join()
westpa.work_managers.zeromq.core.shutdown_process(process, timeout=1.0)
class westpa.work_managers.zeromq.core.IsNode(n_local_workers=None)

Bases: object

write_host_info(filename=None)
startup()
shutdown()

westpa.work_managers.zeromq.node module

Created on Jun 11, 2015

@author: mzwier

class westpa.work_managers.zeromq.node.ZMQCore

Bases: object

PROTOCOL_MAJOR = 3
PROTOCOL_MINOR = 0
PROTOCOL_UPDATE = 0
PROTOCOL_VERSION = (3, 0, 0)
internal_transport = 'ipc'
default_comm_mode = 'ipc'
default_master_heartbeat = 20.0
default_worker_heartbeat = 20.0
default_timeout_factor = 5.0
default_startup_timeout = 120.0
default_shutdown_timeout = 5.0
classmethod make_ipc_endpoint()
classmethod remove_ipc_endpoints()
classmethod make_tcp_endpoint(address='127.0.0.1')
classmethod make_internal_endpoint()
get_identification()
validate_message(message)

Validate incoming message. Raises an exception if the message is improperly formatted (TypeError) or does not correspond to the appropriate master (ZMQWMEnvironmentError).

message_validation(msg)

A context manager for message validation. The instance variable validation_fail_action controls the behavior of this context manager:

  • ‘raise’: re-raise the exception that indicated failed validation. Useful for development.

  • ‘exit’ (default): report the error and exit the program.

  • ‘warn’: report the error and continue.

recv_message(socket, flags=0, validate=True, timeout=None)

Receive a message object from the given socket, using the given flags. Message validation is performed if validate is true. If timeout is given, then it is the number of milliseconds to wait prior to raising a ZMQWMTimeout exception. timeout is ignored if flags includes zmq.NOBLOCK.

recv_all(socket, flags=0, validate=True)

Receive all messages currently available from the given socket.

recv_ack(socket, flags=0, validate=True, timeout=None)
send_message(socket, message, payload=None, flags=0)

Send a message object. Subclasses may override this to decorate the message with appropriate IDs, then delegate upward to actually send the message. message may either be a pre-constructed Message object or a message identifier, in which (latter) case payload will become the message payload. payload is ignored if message is a Message object.

send_reply(socket, original_message, reply='ok', payload=None, flags=0)

Send a reply to original_message on socket. The reply message is a Message object or a message identifier. The reply master_id and worker_id are set from original_message, unless master_id is not set, in which case it is set from self.master_id.

send_ack(socket, original_message)

Send an acknowledgement message, which is mostly just to respect REQ/REP recv/send patterns.

send_nak(socket, original_message)

Send a negative acknowledgement message.

send_inproc_message(message, payload=None, flags=0)
signal_shutdown()
shutdown_handler(signal=None, frame=None)
install_signal_handlers(signals=None)
install_sigint_handler()
startup()
shutdown()
join()
class westpa.work_managers.zeromq.node.Message(message=None, payload=None, master_id=None, src_id=None)

Bases: object

SHUTDOWN = 'shutdown'
ACK = 'ok'
NAK = 'no'
IDENTIFY = 'identify'
TASKS_AVAILABLE = 'tasks_available'
TASK_REQUEST = 'task_request'
MASTER_BEACON = 'master_alive'
RECONFIGURE_TIMEOUT = 'reconfigure_timeout'
TASK = 'task'
RESULT = 'result'
idempotent_announcement_messages = {'master_alive', 'shutdown', 'tasks_available'}
classmethod coalesce_announcements(messages)
class westpa.work_managers.zeromq.node.PassiveMultiTimer

Bases: object

add_timer(identifier, duration)
remove_timer(identifier)
change_duration(identifier, duration)
reset(identifier=None, at=None)
expired(identifier, at=None)
next_expiration()
next_expiration_in()
which_expired(at=None)
class westpa.work_managers.zeromq.node.IsNode(n_local_workers=None)

Bases: object

write_host_info(filename=None)
startup()
shutdown()
class westpa.work_managers.zeromq.node.ThreadProxy(in_type, out_type, mon_type=SocketType.PUB)

Bases: ProxyBase, ThreadDevice

Proxy in a Thread. See Proxy for more.

class westpa.work_managers.zeromq.node.ZMQNode(upstream_rr_endpoint, upstream_ann_endpoint, n_local_workers=None)

Bases: ZMQCore, IsNode

run()
property is_master
comm_loop()
startup()

westpa.work_managers.zeromq.work_manager module

class westpa.work_managers.zeromq.work_manager.ZMQCore

Bases: object

PROTOCOL_MAJOR = 3
PROTOCOL_MINOR = 0
PROTOCOL_UPDATE = 0
PROTOCOL_VERSION = (3, 0, 0)
internal_transport = 'ipc'
default_comm_mode = 'ipc'
default_master_heartbeat = 20.0
default_worker_heartbeat = 20.0
default_timeout_factor = 5.0
default_startup_timeout = 120.0
default_shutdown_timeout = 5.0
classmethod make_ipc_endpoint()
classmethod remove_ipc_endpoints()
classmethod make_tcp_endpoint(address='127.0.0.1')
classmethod make_internal_endpoint()
get_identification()
validate_message(message)

Validate incoming message. Raises an exception if the message is improperly formatted (TypeError) or does not correspond to the appropriate master (ZMQWMEnvironmentError).

message_validation(msg)

A context manager for message validation. The instance variable validation_fail_action controls the behavior of this context manager:

  • ‘raise’: re-raise the exception that indicated failed validation. Useful for development.

  • ‘exit’ (default): report the error and exit the program.

  • ‘warn’: report the error and continue.

recv_message(socket, flags=0, validate=True, timeout=None)

Receive a message object from the given socket, using the given flags. Message validation is performed if validate is true. If timeout is given, then it is the number of milliseconds to wait prior to raising a ZMQWMTimeout exception. timeout is ignored if flags includes zmq.NOBLOCK.

recv_all(socket, flags=0, validate=True)

Receive all messages currently available from the given socket.

recv_ack(socket, flags=0, validate=True, timeout=None)
send_message(socket, message, payload=None, flags=0)

Send a message object. Subclasses may override this to decorate the message with appropriate IDs, then delegate upward to actually send the message. message may either be a pre-constructed Message object or a message identifier, in which (latter) case payload will become the message payload. payload is ignored if message is a Message object.

send_reply(socket, original_message, reply='ok', payload=None, flags=0)

Send a reply to original_message on socket. The reply message is a Message object or a message identifier. The reply master_id and worker_id are set from original_message, unless master_id is not set, in which case it is set from self.master_id.

send_ack(socket, original_message)

Send an acknowledgement message, which is mostly just to respect REQ/REP recv/send patterns.

send_nak(socket, original_message)

Send a negative acknowledgement message.

send_inproc_message(message, payload=None, flags=0)
signal_shutdown()
shutdown_handler(signal=None, frame=None)
install_signal_handlers(signals=None)
install_sigint_handler()
startup()
shutdown()
join()
class westpa.work_managers.zeromq.work_manager.Message(message=None, payload=None, master_id=None, src_id=None)

Bases: object

SHUTDOWN = 'shutdown'
ACK = 'ok'
NAK = 'no'
IDENTIFY = 'identify'
TASKS_AVAILABLE = 'tasks_available'
TASK_REQUEST = 'task_request'
MASTER_BEACON = 'master_alive'
RECONFIGURE_TIMEOUT = 'reconfigure_timeout'
TASK = 'task'
RESULT = 'result'
idempotent_announcement_messages = {'master_alive', 'shutdown', 'tasks_available'}
classmethod coalesce_announcements(messages)
class westpa.work_managers.zeromq.work_manager.Task(fn, args, kwargs, task_id=None)

Bases: object

execute()

Run this task, returning a Result object.

class westpa.work_managers.zeromq.work_manager.Result(task_id, result=None, exception=None, traceback=None)

Bases: object

exception westpa.work_managers.zeromq.work_manager.ZMQWorkerMissing

Bases: ZMQWMError

Exception representing that a worker processing a task died or disappeared

exception westpa.work_managers.zeromq.work_manager.ZMQWMEnvironmentError

Bases: ZMQWMError

Class representing an error in the environment in which the ZeroMQ work manager is running. This includes such things as master/worker ID mismatches.

class westpa.work_managers.zeromq.work_manager.IsNode(n_local_workers=None)

Bases: object

write_host_info(filename=None)
startup()
shutdown()
class westpa.work_managers.zeromq.work_manager.PassiveMultiTimer

Bases: object

add_timer(identifier, duration)
remove_timer(identifier)
change_duration(identifier, duration)
reset(identifier=None, at=None)
expired(identifier, at=None)
next_expiration()
next_expiration_in()
which_expired(at=None)
westpa.work_managers.zeromq.work_manager.randport(address='127.0.0.1')

Select a random unused TCP port number on the given address.

class westpa.work_managers.zeromq.work_manager.ZMQWorker(rr_endpoint, ann_endpoint)

Bases: ZMQCore

This is the outward-facing worker component of the ZMQ work manager. This forms the interface to the master. This process must not hang or crash because of an error in a task it executes, so tasks are isolated in ZMQExecutor, which communicates with ZMQWorker via (what else?) ZeroMQ.

property is_master
update_master_info(msg)
identify(rr_socket)
request_task(rr_socket, task_socket)
handle_reconfigure_timeout(msg, timers)
handle_result(result_socket, rr_socket)
comm_loop()

Master communication loop for the worker process.

shutdown_executor()
install_signal_handlers(signals=None)
startup(process_index=None)
class westpa.work_managers.zeromq.work_manager.ZMQNode(upstream_rr_endpoint, upstream_ann_endpoint, n_local_workers=None)

Bases: ZMQCore, IsNode

run()
property is_master
comm_loop()
startup()
class westpa.work_managers.zeromq.work_manager.WorkManager

Bases: object

Base class for all work managers. At a minimum, work managers must provide a submit() function and a n_workers attribute (which may be a property), though most will also override startup() and shutdown().

classmethod from_environ(wmenv=None)
classmethod add_wm_args(parser, wmenv=None)
sigint_handler(signum, frame)
install_sigint_handler()
startup()

Perform any necessary startup work, such as spawning clients.

shutdown()

Cleanly shut down any active workers.

run()

Run the worker loop (in clients only).

submit(fn, args=None, kwargs=None)

Submit a task to the work manager, returning a WMFuture object representing the pending result. fn(*args,**kwargs) will be executed by a worker, and the return value assigned as the result of the returned future. The function fn and all arguments must be picklable; note particularly that off-path modules (like the system module and any active plugins) are not picklable unless pre-loaded in the worker process (i.e. prior to forking the master).

submit_many(tasks)

Submit a set of tasks to the work manager, returning a list of WMFuture objects representing pending results. Each entry in tasks should be a triple (fn, args, kwargs), which will result in fn(*args, **kwargs) being executed by a worker. The function fn and all arguments must be picklable; note particularly that off-path modules are not picklable unless pre-loaded in the worker process.

as_completed(futures)

Return a generator which yields results from the given futures as they become available.

submit_as_completed(task_generator, queue_size=None)

Return a generator which yields results from a set of futures as they become available. Futures are generated by the task_generator, which must return a triple of the form expected by submit. The method also accepts an int queue_size that dictates the maximum number of Futures that should be pending at any given time. The default value of None submits all of the tasks at once.
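The effect of queue_size can be sketched with the standard library's concurrent.futures as a stand-in for the work manager. The standalone submit_as_completed function below is hypothetical; it yields completed future objects, which is a choice made for this sketch.

```python
import itertools
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def submit_as_completed(executor, task_generator, queue_size=None):
    """Yield futures as they finish, keeping at most queue_size pending."""
    tasks = iter(task_generator)
    pending = set()
    # Prime the queue; islice(tasks, None) consumes every task at once.
    for fn, args, kwargs in itertools.islice(tasks, queue_size):
        pending.add(executor.submit(fn, *args, **kwargs))
    while pending:
        done, pending = wait(pending, return_when=FIRST_COMPLETED)
        for future in done:
            yield future
            # Refill one slot for each completed future, if tasks remain.
            for fn, args, kwargs in itertools.islice(tasks, 1):
                pending.add(executor.submit(fn, *args, **kwargs))


def squares(n):
    # Each item is a (fn, args, kwargs) triple, as expected by submit.
    for i in range(n):
        yield (pow, (i, 2), {})


with ThreadPoolExecutor(max_workers=2) as pool:
    values = sorted(
        f.result() for f in submit_as_completed(pool, squares(6), queue_size=2)
    )
```

Bounding the number of pending futures keeps memory use flat when the task generator is long or produces large arguments, at the cost of some idle time if queue_size is smaller than the number of workers.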

wait_any(futures)

Wait on any of the given futures and return the first one which has a result available. If more than one result is or becomes available simultaneously, any completed future may be returned.

wait_all(futures)

A convenience function which waits on all the given futures in order. This function returns the same futures as submitted to the function as a list, indicating the order in which waits occurred.

property is_master

True if this is the master process for task distribution. This is necessary, e.g., for MPI, where all processes start identically and then must branch depending on rank.

class westpa.work_managers.zeromq.work_manager.WMFuture(task_id=None)

Bases: object

A “future”, representing work which has been dispatched for completion asynchronously.

static all_acquired(futures)

Context manager to acquire all locks on the given futures. Primarily for internal use.

get_result(discard=True)

Get the result associated with this future, blocking until it is available. If discard is true, then removes the reference to the result contained in this instance, so that a collection of futures need not turn into a cache of all associated results.
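The purpose of the discard flag can be shown with a minimal future-like class. SketchFuture and its _set_result method are hypothetical and much simpler than WMFuture (no exception handling, no locking beyond the event).

```python
import threading


class SketchFuture:
    """Hypothetical, simplified future illustrating the discard flag."""

    def __init__(self):
        self._done = threading.Event()
        self._result = None

    def _set_result(self, value):
        self._result = value
        self._done.set()

    def get_result(self, discard=True):
        self._done.wait()  # block until a result is available
        value = self._result
        if discard:
            # Drop our reference so the future does not keep the result alive.
            self._result = None
        return value


f = SketchFuture()
f._set_result([1, 2, 3])
kept = f.get_result(discard=False)   # the future still holds the result
taken = f.get_result()               # the future releases its reference
```

With discard left at its default, a long list of finished futures holds only the futures themselves, and each result can be garbage-collected as soon as the caller is done with it.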

property result
wait()

Wait until this future has a result or exception available.

get_exception()

Get the exception associated with this future, blocking until it is available.

property exception

Get the exception associated with this future, blocking until it is available.

get_traceback()

Get the traceback object associated with this future, if any.

property traceback

Get the traceback object associated with this future, if any.

is_done()

Indicates whether this future is done executing (may block if this future is being updated).

property done

Indicates whether this future is done executing (may block if this future is being updated).

class westpa.work_managers.zeromq.work_manager.deque

Bases: object

deque([iterable[, maxlen]]) --> deque object

A list-like sequence optimized for data accesses near its endpoints.

append()

Add an element to the right side of the deque.

appendleft()

Add an element to the left side of the deque.

clear()

Remove all elements from the deque.

copy()

Return a shallow copy of a deque.

count()

D.count(value) – return number of occurrences of value

extend()

Extend the right side of the deque with elements from the iterable

extendleft()

Extend the left side of the deque with elements from the iterable

index()

D.index(value, [start, [stop]]) – return first index of value. Raises ValueError if the value is not present.

insert()

D.insert(index, object) – insert object before index

maxlen

maximum size of a deque or None if unbounded

pop()

Remove and return the rightmost element.

popleft()

Remove and return the leftmost element.

remove()

D.remove(value) – remove first occurrence of value.

reverse()

D.reverse() – reverse IN PLACE

rotate()

Rotate the deque n steps to the right (default n=1). If n is negative, rotates left.
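A short example of the rotation and maxlen behavior described above, using the standard library's collections.deque directly:

```python
from collections import deque

d = deque([1, 2, 3, 4, 5])
d.rotate(2)            # two steps to the right
after_right = list(d)

d.rotate(-1)           # one step to the left
after_left = list(d)

bounded = deque(maxlen=3)
bounded.extend(range(5))   # once full, older items fall off the left end
```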

class westpa.work_managers.zeromq.work_manager.ZMQWorkManager(n_local_workers=1)

Bases: ZMQCore, WorkManager, IsNode

classmethod add_wm_args(parser, wmenv=None)
classmethod from_environ(wmenv=None)
classmethod read_host_info(filename)
classmethod canonicalize_endpoint(endpoint, allow_wildcard_host=True)
property n_workers
submit(fn, args=None, kwargs=None)

Submit a task to the work manager, returning a WMFuture object representing the pending result. fn(*args,**kwargs) will be executed by a worker, and the return value assigned as the result of the returned future. The function fn and all arguments must be picklable; note particularly that off-path modules (like the system module and any active plugins) are not picklable unless pre-loaded in the worker process (i.e. prior to forking the master).

submit_many(tasks)

Submit a set of tasks to the work manager, returning a list of WMFuture objects representing pending results. Each entry in tasks should be a triple (fn, args, kwargs), which will result in fn(*args, **kwargs) being executed by a worker. The function fn and all arguments must be picklable; note particularly that off-path modules are not picklable unless pre-loaded in the worker process.

send_message(socket, message, payload=None, flags=0)

Send a message object. Subclasses may override this to decorate the message with appropriate IDs, then delegate upward to actually send the message. message may either be a pre-constructed Message object or a message identifier, in which (latter) case payload will become the message payload. payload is ignored if message is a Message object.

handle_result(socket, msg)
handle_task_request(socket, msg)
update_worker_information(msg)
check_workers()
remove_worker(worker_id)
shutdown_clear_tasks()

Abort pending tasks with error on shutdown.

comm_loop()
startup()

Perform any necessary startup work, such as spawning clients.

shutdown()

Cleanly shut down any active workers.

westpa.work_managers.zeromq.worker module

Created on May 29, 2015

@author: mzwier

class westpa.work_managers.zeromq.worker.ZMQCore

Bases: object

PROTOCOL_MAJOR = 3
PROTOCOL_MINOR = 0
PROTOCOL_UPDATE = 0
PROTOCOL_VERSION = (3, 0, 0)
internal_transport = 'ipc'
default_comm_mode = 'ipc'
default_master_heartbeat = 20.0
default_worker_heartbeat = 20.0
default_timeout_factor = 5.0
default_startup_timeout = 120.0
default_shutdown_timeout = 5.0
classmethod make_ipc_endpoint()
classmethod remove_ipc_endpoints()
classmethod make_tcp_endpoint(address='127.0.0.1')
classmethod make_internal_endpoint()
get_identification()
validate_message(message)

Validate incoming message. Raises an exception if the message is improperly formatted (TypeError) or does not correspond to the appropriate master (ZMQWMEnvironmentError).

message_validation(msg)

A context manager for message validation. The instance variable validation_fail_action controls the behavior of this context manager:

  • ‘raise’: re-raise the exception that indicated failed validation. Useful for development.

  • ‘exit’ (default): report the error and exit the program.

  • ‘warn’: report the error and continue.

recv_message(socket, flags=0, validate=True, timeout=None)

Receive a message object from the given socket, using the given flags. Message validation is performed if validate is true. If timeout is given, then it is the number of milliseconds to wait prior to raising a ZMQWMTimeout exception. timeout is ignored if flags includes zmq.NOBLOCK.

recv_all(socket, flags=0, validate=True)

Receive all messages currently available from the given socket.

recv_ack(socket, flags=0, validate=True, timeout=None)
send_message(socket, message, payload=None, flags=0)

Send a message object. Subclasses may override this to decorate the message with appropriate IDs, then delegate upward to actually send the message. message may either be a pre-constructed Message object or a message identifier, in which (latter) case payload will become the message payload. payload is ignored if message is a Message object.

send_reply(socket, original_message, reply='ok', payload=None, flags=0)

Send a reply to original_message on socket. The reply message is a Message object or a message identifier. The reply master_id and worker_id are set from original_message, unless master_id is not set, in which case it is set from self.master_id.

send_ack(socket, original_message)

Send an acknowledgement message, which is mostly just to respect REQ/REP recv/send patterns.

send_nak(socket, original_message)

Send a negative acknowledgement message.

send_inproc_message(message, payload=None, flags=0)
signal_shutdown()
shutdown_handler(signal=None, frame=None)
install_signal_handlers(signals=None)
install_sigint_handler()
startup()
shutdown()
join()
class westpa.work_managers.zeromq.worker.Message(message=None, payload=None, master_id=None, src_id=None)

Bases: object

SHUTDOWN = 'shutdown'
ACK = 'ok'
NAK = 'no'
IDENTIFY = 'identify'
TASKS_AVAILABLE = 'tasks_available'
TASK_REQUEST = 'task_request'
MASTER_BEACON = 'master_alive'
RECONFIGURE_TIMEOUT = 'reconfigure_timeout'
TASK = 'task'
RESULT = 'result'
idempotent_announcement_messages = {'master_alive', 'shutdown', 'tasks_available'}
classmethod coalesce_announcements(messages)
exception westpa.work_managers.zeromq.worker.ZMQWMTimeout

Bases: ZMQWMEnvironmentError

A timeout of a sort that indicates that a master or worker has failed or never started.

class westpa.work_managers.zeromq.worker.PassiveMultiTimer

Bases: object

add_timer(identifier, duration)
remove_timer(identifier)
change_duration(identifier, duration)
reset(identifier=None, at=None)
expired(identifier, at=None)
next_expiration()
next_expiration_in()
which_expired(at=None)
class westpa.work_managers.zeromq.worker.Task(fn, args, kwargs, task_id=None)

Bases: object

execute()

Run this task, returning a Result object.

class westpa.work_managers.zeromq.worker.Result(task_id, result=None, exception=None, traceback=None)

Bases: object

class westpa.work_managers.zeromq.worker.ZMQWorker(rr_endpoint, ann_endpoint)

Bases: ZMQCore

This is the outward-facing worker component of the ZMQ work manager. This forms the interface to the master. This process must not hang or crash because of an error in a task it executes, so tasks are isolated in ZMQExecutor, which communicates with ZMQWorker via (what else?) ZeroMQ.

property is_master
update_master_info(msg)
identify(rr_socket)
request_task(rr_socket, task_socket)
handle_reconfigure_timeout(msg, timers)
handle_result(result_socket, rr_socket)
comm_loop()

Master communication loop for the worker process.

shutdown_executor()
install_signal_handlers(signals=None)
startup(process_index=None)
class westpa.work_managers.zeromq.worker.ZMQExecutor(task_endpoint, result_endpoint)

Bases: ZMQCore

This is the component of the ZMQ WM worker that actually executes tasks. It is isolated in a separate process and controlled via ZMQ from the ZMQWorker.

comm_loop()
startup(process_index=None)