westpa.work_managers.zeromq package
westpa.work_managers.zeromq module
- exception westpa.work_managers.zeromq.ZMQWMError
Bases:
RuntimeError
Base class for errors related to the ZeroMQ work manager itself
- exception westpa.work_managers.zeromq.ZMQWMTimeout
Bases:
ZMQWMEnvironmentError
A timeout of a sort that indicatess that a master or worker has failed or never started.
- exception westpa.work_managers.zeromq.ZMQWMEnvironmentError
Bases:
ZMQWMError
Class representing an error in the environment in which the ZeroMQ work manager is running. This includes such things as master/worker ID mismatches.
- exception westpa.work_managers.zeromq.ZMQWorkerMissing
Bases:
ZMQWMError
Exception representing that a worker processing a task died or disappeared
- class westpa.work_managers.zeromq.ZMQCore
Bases:
object
- PROTOCOL_MAJOR = 3
- PROTOCOL_MINOR = 0
- PROTOCOL_UPDATE = 0
- PROTOCOL_VERSION = (3, 0, 0)
- internal_transport = 'ipc'
- default_comm_mode = 'ipc'
- default_master_heartbeat = 20.0
- default_worker_heartbeat = 20.0
- default_timeout_factor = 5.0
- default_startup_timeout = 120.0
- default_shutdown_timeout = 5.0
- classmethod make_ipc_endpoint()
- classmethod remove_ipc_endpoints()
- classmethod make_tcp_endpoint(address='127.0.0.1')
- classmethod make_internal_endpoint()
- get_identification()
- validate_message(message)
Validate incoming message. Raises an exception if the message is improperly formatted (TypeError) or does not correspond to the appropriate master (ZMQWMEnvironmentError).
- message_validation(msg)
A context manager for message validation. The instance variable
validation_fail_action
controls the behavior of this context manager:‘raise’: re-raise the exception that indicated failed validation. Useful for development.
‘exit’ (default): report the error and exit the program.
‘warn’: report the error and continue.
- recv_message(socket, flags=0, validate=True, timeout=None)
Receive a message object from the given socket, using the given flags. Message validation is performed if
validate
is true. Iftimeout
is given, then it is the number of milliseconds to wait prior to raising a ZMQWMTimeout exception.timeout
is ignored ifflags
includeszmq.NOBLOCK
.
- recv_all(socket, flags=0, validate=True)
Receive all messages currently available from the given socket.
- recv_ack(socket, flags=0, validate=True, timeout=None)
- send_message(socket, message, payload=None, flags=0)
Send a message object. Subclasses may override this to decorate the message with appropriate IDs, then delegate upward to actually send the message.
message
may either be a pre-constructedMessage
object or a message identifier, in which (latter) casepayload
will become the message payload.payload
is ignored ifmessage
is aMessage
object.
- send_reply(socket, original_message, reply='ok', payload=None, flags=0)
Send a reply to
original_message
onsocket
. The reply message is a Message object or a message identifier. The reply master_id and worker_id are set fromoriginal_message
, unless master_id is not set, in which case it is set from self.master_id.
- send_ack(socket, original_message)
Send an acknowledgement message, which is mostly just to respect REQ/REP recv/send patterns.
- send_nak(socket, original_message)
Send a negative acknowledgement message.
- send_inproc_message(message, payload=None, flags=0)
- signal_shutdown()
- shutdown_handler(signal=None, frame=None)
- install_signal_handlers(signals=None)
- install_sigint_handler()
- startup()
- shutdown()
- join()
- class westpa.work_managers.zeromq.ZMQNode(upstream_rr_endpoint, upstream_ann_endpoint, n_local_workers=None)
-
- run()
- property is_master
- comm_loop()
- startup()
- class westpa.work_managers.zeromq.ZMQWorker(rr_endpoint, ann_endpoint)
Bases:
ZMQCore
This is the outward facing worker component of the ZMQ work manager. This forms the interface to the master. This process cannot hang or crash due to an error in tasks it executes, so tasks are isolated in ZMQExecutor, which communicates with ZMQWorker via (what else?) ZeroMQ.
- property is_master
- update_master_info(msg)
- identify(rr_socket)
- request_task(rr_socket, task_socket)
- handle_reconfigure_timeout(msg, timers)
- handle_result(result_socket, rr_socket)
- comm_loop()
Master communication loop for the worker process.
- shutdown_executor()
- install_signal_handlers(signals=None)
- startup(process_index=None)
- class westpa.work_managers.zeromq.ZMQWorkManager(n_local_workers=1)
Bases:
ZMQCore
,WorkManager
,IsNode
- classmethod add_wm_args(parser, wmenv=None)
- classmethod from_environ(wmenv=None)
- classmethod read_host_info(filename)
- classmethod canonicalize_endpoint(endpoint, allow_wildcard_host=True)
- property n_workers
- submit(fn, args=None, kwargs=None)
Submit a task to the work manager, returning a WMFuture object representing the pending result.
fn(*args,**kwargs)
will be executed by a worker, and the return value assigned as the result of the returned future. The functionfn
and all arguments must be picklable; note particularly that off-path modules (like the system module and any active plugins) are not picklable unless pre-loaded in the worker process (i.e. prior to forking the master).
- submit_many(tasks)
Submit a set of tasks to the work manager, returning a list of WMFuture objects representing pending results. Each entry in
tasks
should be a triple (fn, args, kwargs), which will result in fn(*args, **kwargs) being executed by a worker. The functionfn
and all arguments must be picklable; note particularly that off-path modules are not picklable unless pre-loaded in the worker process.
- send_message(socket, message, payload=None, flags=0)
Send a message object. Subclasses may override this to decorate the message with appropriate IDs, then delegate upward to actually send the message.
message
may either be a pre-constructedMessage
object or a message identifier, in which (latter) casepayload
will become the message payload.payload
is ignored ifmessage
is aMessage
object.
- handle_result(socket, msg)
- handle_task_request(socket, msg)
- update_worker_information(msg)
- check_workers()
- remove_worker(worker_id)
- shutdown_clear_tasks()
Abort pending tasks with error on shutdown.
- comm_loop()
- startup()
Perform any necessary startup work, such as spawning clients.
- shutdown()
Cleanly shut down any active workers.
westpa.work_managers.zeromq.core module
Created on May 29, 2015
@author: mzwier
- westpa.work_managers.zeromq.core.randport(address='127.0.0.1')
Select a random unused TCP port number on the given address.
- exception westpa.work_managers.zeromq.core.ZMQWMError
Bases:
RuntimeError
Base class for errors related to the ZeroMQ work manager itself
- exception westpa.work_managers.zeromq.core.ZMQWorkerMissing
Bases:
ZMQWMError
Exception representing that a worker processing a task died or disappeared
- exception westpa.work_managers.zeromq.core.ZMQWMEnvironmentError
Bases:
ZMQWMError
Class representing an error in the environment in which the ZeroMQ work manager is running. This includes such things as master/worker ID mismatches.
- exception westpa.work_managers.zeromq.core.ZMQWMTimeout
Bases:
ZMQWMEnvironmentError
A timeout of a sort that indicatess that a master or worker has failed or never started.
- class westpa.work_managers.zeromq.core.Message(message=None, payload=None, master_id=None, src_id=None)
Bases:
object
- SHUTDOWN = 'shutdown'
- ACK = 'ok'
- NAK = 'no'
- IDENTIFY = 'identify'
- TASKS_AVAILABLE = 'tasks_available'
- TASK_REQUEST = 'task_request'
- MASTER_BEACON = 'master_alive'
- RECONFIGURE_TIMEOUT = 'reconfigure_timeout'
- TASK = 'task'
- RESULT = 'result'
- idempotent_announcement_messages = {'master_alive', 'shutdown', 'tasks_available'}
- classmethod coalesce_announcements(messages)
- class westpa.work_managers.zeromq.core.Task(fn, args, kwargs, task_id=None)
Bases:
object
- execute()
Run this task, returning a Result object.
- class westpa.work_managers.zeromq.core.Result(task_id, result=None, exception=None, traceback=None)
Bases:
object
- class westpa.work_managers.zeromq.core.PassiveTimer(duration, started=None)
Bases:
object
- started
- duration
- property expired
- property expires_in
- reset(at=None)
- start(at=None)
- class westpa.work_managers.zeromq.core.PassiveMultiTimer
Bases:
object
- add_timer(identifier, duration)
- remove_timer(identifier)
- change_duration(identifier, duration)
- reset(identifier=None, at=None)
- expired(identifier, at=None)
- next_expiration()
- next_expiration_in()
- which_expired(at=None)
- class westpa.work_managers.zeromq.core.ZMQCore
Bases:
object
- PROTOCOL_MAJOR = 3
- PROTOCOL_MINOR = 0
- PROTOCOL_UPDATE = 0
- PROTOCOL_VERSION = (3, 0, 0)
- internal_transport = 'ipc'
- default_comm_mode = 'ipc'
- default_master_heartbeat = 20.0
- default_worker_heartbeat = 20.0
- default_timeout_factor = 5.0
- default_startup_timeout = 120.0
- default_shutdown_timeout = 5.0
- classmethod make_ipc_endpoint()
- classmethod remove_ipc_endpoints()
- classmethod make_tcp_endpoint(address='127.0.0.1')
- classmethod make_internal_endpoint()
- get_identification()
- validate_message(message)
Validate incoming message. Raises an exception if the message is improperly formatted (TypeError) or does not correspond to the appropriate master (ZMQWMEnvironmentError).
- message_validation(msg)
A context manager for message validation. The instance variable
validation_fail_action
controls the behavior of this context manager:‘raise’: re-raise the exception that indicated failed validation. Useful for development.
‘exit’ (default): report the error and exit the program.
‘warn’: report the error and continue.
- recv_message(socket, flags=0, validate=True, timeout=None)
Receive a message object from the given socket, using the given flags. Message validation is performed if
validate
is true. Iftimeout
is given, then it is the number of milliseconds to wait prior to raising a ZMQWMTimeout exception.timeout
is ignored ifflags
includeszmq.NOBLOCK
.
- recv_all(socket, flags=0, validate=True)
Receive all messages currently available from the given socket.
- recv_ack(socket, flags=0, validate=True, timeout=None)
- send_message(socket, message, payload=None, flags=0)
Send a message object. Subclasses may override this to decorate the message with appropriate IDs, then delegate upward to actually send the message.
message
may either be a pre-constructedMessage
object or a message identifier, in which (latter) casepayload
will become the message payload.payload
is ignored ifmessage
is aMessage
object.
- send_reply(socket, original_message, reply='ok', payload=None, flags=0)
Send a reply to
original_message
onsocket
. The reply message is a Message object or a message identifier. The reply master_id and worker_id are set fromoriginal_message
, unless master_id is not set, in which case it is set from self.master_id.
- send_ack(socket, original_message)
Send an acknowledgement message, which is mostly just to respect REQ/REP recv/send patterns.
- send_nak(socket, original_message)
Send a negative acknowledgement message.
- send_inproc_message(message, payload=None, flags=0)
- signal_shutdown()
- shutdown_handler(signal=None, frame=None)
- install_signal_handlers(signals=None)
- install_sigint_handler()
- startup()
- shutdown()
- join()
- westpa.work_managers.zeromq.core.shutdown_process(process, timeout=1.0)
westpa.work_managers.zeromq.node module
Created on Jun 11, 2015
@author: mzwier
- class westpa.work_managers.zeromq.node.ZMQCore
Bases:
object
- PROTOCOL_MAJOR = 3
- PROTOCOL_MINOR = 0
- PROTOCOL_UPDATE = 0
- PROTOCOL_VERSION = (3, 0, 0)
- internal_transport = 'ipc'
- default_comm_mode = 'ipc'
- default_master_heartbeat = 20.0
- default_worker_heartbeat = 20.0
- default_timeout_factor = 5.0
- default_startup_timeout = 120.0
- default_shutdown_timeout = 5.0
- classmethod make_ipc_endpoint()
- classmethod remove_ipc_endpoints()
- classmethod make_tcp_endpoint(address='127.0.0.1')
- classmethod make_internal_endpoint()
- get_identification()
- validate_message(message)
Validate incoming message. Raises an exception if the message is improperly formatted (TypeError) or does not correspond to the appropriate master (ZMQWMEnvironmentError).
- message_validation(msg)
A context manager for message validation. The instance variable
validation_fail_action
controls the behavior of this context manager:‘raise’: re-raise the exception that indicated failed validation. Useful for development.
‘exit’ (default): report the error and exit the program.
‘warn’: report the error and continue.
- recv_message(socket, flags=0, validate=True, timeout=None)
Receive a message object from the given socket, using the given flags. Message validation is performed if
validate
is true. Iftimeout
is given, then it is the number of milliseconds to wait prior to raising a ZMQWMTimeout exception.timeout
is ignored ifflags
includeszmq.NOBLOCK
.
- recv_all(socket, flags=0, validate=True)
Receive all messages currently available from the given socket.
- recv_ack(socket, flags=0, validate=True, timeout=None)
- send_message(socket, message, payload=None, flags=0)
Send a message object. Subclasses may override this to decorate the message with appropriate IDs, then delegate upward to actually send the message.
message
may either be a pre-constructedMessage
object or a message identifier, in which (latter) casepayload
will become the message payload.payload
is ignored ifmessage
is aMessage
object.
- send_reply(socket, original_message, reply='ok', payload=None, flags=0)
Send a reply to
original_message
onsocket
. The reply message is a Message object or a message identifier. The reply master_id and worker_id are set fromoriginal_message
, unless master_id is not set, in which case it is set from self.master_id.
- send_ack(socket, original_message)
Send an acknowledgement message, which is mostly just to respect REQ/REP recv/send patterns.
- send_nak(socket, original_message)
Send a negative acknowledgement message.
- send_inproc_message(message, payload=None, flags=0)
- signal_shutdown()
- shutdown_handler(signal=None, frame=None)
- install_signal_handlers(signals=None)
- install_sigint_handler()
- startup()
- shutdown()
- join()
- class westpa.work_managers.zeromq.node.Message(message=None, payload=None, master_id=None, src_id=None)
Bases:
object
- SHUTDOWN = 'shutdown'
- ACK = 'ok'
- NAK = 'no'
- IDENTIFY = 'identify'
- TASKS_AVAILABLE = 'tasks_available'
- TASK_REQUEST = 'task_request'
- MASTER_BEACON = 'master_alive'
- RECONFIGURE_TIMEOUT = 'reconfigure_timeout'
- TASK = 'task'
- RESULT = 'result'
- idempotent_announcement_messages = {'master_alive', 'shutdown', 'tasks_available'}
- classmethod coalesce_announcements(messages)
- class westpa.work_managers.zeromq.node.PassiveMultiTimer
Bases:
object
- add_timer(identifier, duration)
- remove_timer(identifier)
- change_duration(identifier, duration)
- reset(identifier=None, at=None)
- expired(identifier, at=None)
- next_expiration()
- next_expiration_in()
- which_expired(at=None)
- class westpa.work_managers.zeromq.node.IsNode(n_local_workers=None)
Bases:
object
- write_host_info(filename=None)
- startup()
- shutdown()
- class westpa.work_managers.zeromq.node.ThreadProxy(in_type, out_type, mon_type=SocketType.PUB)
Bases:
ProxyBase
,ThreadDevice
Proxy in a Thread. See Proxy for more.
westpa.work_managers.zeromq.work_manager module
- class westpa.work_managers.zeromq.work_manager.ZMQCore
Bases:
object
- PROTOCOL_MAJOR = 3
- PROTOCOL_MINOR = 0
- PROTOCOL_UPDATE = 0
- PROTOCOL_VERSION = (3, 0, 0)
- internal_transport = 'ipc'
- default_comm_mode = 'ipc'
- default_master_heartbeat = 20.0
- default_worker_heartbeat = 20.0
- default_timeout_factor = 5.0
- default_startup_timeout = 120.0
- default_shutdown_timeout = 5.0
- classmethod make_ipc_endpoint()
- classmethod remove_ipc_endpoints()
- classmethod make_tcp_endpoint(address='127.0.0.1')
- classmethod make_internal_endpoint()
- get_identification()
- validate_message(message)
Validate incoming message. Raises an exception if the message is improperly formatted (TypeError) or does not correspond to the appropriate master (ZMQWMEnvironmentError).
- message_validation(msg)
A context manager for message validation. The instance variable
validation_fail_action
controls the behavior of this context manager:‘raise’: re-raise the exception that indicated failed validation. Useful for development.
‘exit’ (default): report the error and exit the program.
‘warn’: report the error and continue.
- recv_message(socket, flags=0, validate=True, timeout=None)
Receive a message object from the given socket, using the given flags. Message validation is performed if
validate
is true. Iftimeout
is given, then it is the number of milliseconds to wait prior to raising a ZMQWMTimeout exception.timeout
is ignored ifflags
includeszmq.NOBLOCK
.
- recv_all(socket, flags=0, validate=True)
Receive all messages currently available from the given socket.
- recv_ack(socket, flags=0, validate=True, timeout=None)
- send_message(socket, message, payload=None, flags=0)
Send a message object. Subclasses may override this to decorate the message with appropriate IDs, then delegate upward to actually send the message.
message
may either be a pre-constructedMessage
object or a message identifier, in which (latter) casepayload
will become the message payload.payload
is ignored ifmessage
is aMessage
object.
- send_reply(socket, original_message, reply='ok', payload=None, flags=0)
Send a reply to
original_message
onsocket
. The reply message is a Message object or a message identifier. The reply master_id and worker_id are set fromoriginal_message
, unless master_id is not set, in which case it is set from self.master_id.
- send_ack(socket, original_message)
Send an acknowledgement message, which is mostly just to respect REQ/REP recv/send patterns.
- send_nak(socket, original_message)
Send a negative acknowledgement message.
- send_inproc_message(message, payload=None, flags=0)
- signal_shutdown()
- shutdown_handler(signal=None, frame=None)
- install_signal_handlers(signals=None)
- install_sigint_handler()
- startup()
- shutdown()
- join()
- class westpa.work_managers.zeromq.work_manager.Message(message=None, payload=None, master_id=None, src_id=None)
Bases:
object
- SHUTDOWN = 'shutdown'
- ACK = 'ok'
- NAK = 'no'
- IDENTIFY = 'identify'
- TASKS_AVAILABLE = 'tasks_available'
- TASK_REQUEST = 'task_request'
- MASTER_BEACON = 'master_alive'
- RECONFIGURE_TIMEOUT = 'reconfigure_timeout'
- TASK = 'task'
- RESULT = 'result'
- idempotent_announcement_messages = {'master_alive', 'shutdown', 'tasks_available'}
- classmethod coalesce_announcements(messages)
- class westpa.work_managers.zeromq.work_manager.Task(fn, args, kwargs, task_id=None)
Bases:
object
- execute()
Run this task, returning a Result object.
- class westpa.work_managers.zeromq.work_manager.Result(task_id, result=None, exception=None, traceback=None)
Bases:
object
- exception westpa.work_managers.zeromq.work_manager.ZMQWorkerMissing
Bases:
ZMQWMError
Exception representing that a worker processing a task died or disappeared
- exception westpa.work_managers.zeromq.work_manager.ZMQWMEnvironmentError
Bases:
ZMQWMError
Class representing an error in the environment in which the ZeroMQ work manager is running. This includes such things as master/worker ID mismatches.
- class westpa.work_managers.zeromq.work_manager.IsNode(n_local_workers=None)
Bases:
object
- write_host_info(filename=None)
- startup()
- shutdown()
- class westpa.work_managers.zeromq.work_manager.PassiveMultiTimer
Bases:
object
- add_timer(identifier, duration)
- remove_timer(identifier)
- change_duration(identifier, duration)
- reset(identifier=None, at=None)
- expired(identifier, at=None)
- next_expiration()
- next_expiration_in()
- which_expired(at=None)
- westpa.work_managers.zeromq.work_manager.randport(address='127.0.0.1')
Select a random unused TCP port number on the given address.
- class westpa.work_managers.zeromq.work_manager.ZMQWorker(rr_endpoint, ann_endpoint)
Bases:
ZMQCore
This is the outward facing worker component of the ZMQ work manager. This forms the interface to the master. This process cannot hang or crash due to an error in tasks it executes, so tasks are isolated in ZMQExecutor, which communicates with ZMQWorker via (what else?) ZeroMQ.
- property is_master
- update_master_info(msg)
- identify(rr_socket)
- request_task(rr_socket, task_socket)
- handle_reconfigure_timeout(msg, timers)
- handle_result(result_socket, rr_socket)
- comm_loop()
Master communication loop for the worker process.
- shutdown_executor()
- install_signal_handlers(signals=None)
- startup(process_index=None)
- class westpa.work_managers.zeromq.work_manager.ZMQNode(upstream_rr_endpoint, upstream_ann_endpoint, n_local_workers=None)
-
- run()
- property is_master
- comm_loop()
- startup()
- class westpa.work_managers.zeromq.work_manager.WorkManager
Bases:
object
Base class for all work managers. At a minimum, work managers must provide a
submit()
function and an_workers
attribute (which may be a property), though most will also overridestartup()
andshutdown()
.- classmethod from_environ(wmenv=None)
- classmethod add_wm_args(parser, wmenv=None)
- sigint_handler(signum, frame)
- install_sigint_handler()
- startup()
Perform any necessary startup work, such as spawning clients.
- shutdown()
Cleanly shut down any active workers.
- run()
Run the worker loop (in clients only).
- submit(fn, args=None, kwargs=None)
Submit a task to the work manager, returning a WMFuture object representing the pending result.
fn(*args,**kwargs)
will be executed by a worker, and the return value assigned as the result of the returned future. The functionfn
and all arguments must be picklable; note particularly that off-path modules (like the system module and any active plugins) are not picklable unless pre-loaded in the worker process (i.e. prior to forking the master).
- submit_many(tasks)
Submit a set of tasks to the work manager, returning a list of WMFuture objects representing pending results. Each entry in
tasks
should be a triple (fn, args, kwargs), which will result in fn(*args, **kwargs) being executed by a worker. The functionfn
and all arguments must be picklable; note particularly that off-path modules are not picklable unless pre-loaded in the worker process.
- as_completed(futures)
Return a generator which yields results from the given
futures
as they become available.
- submit_as_completed(task_generator, queue_size=None)
Return a generator which yields results from a set of
futures
as they become available. Futures are generated by thetask_generator
, which must return a triple of the form expected bysubmit
. The method also accepts an intqueue_size
that dictates the maximum number of Futures that should be pending at any given time. The default value ofNone
submits all of the tasks at once.
- wait_any(futures)
Wait on any of the given
futures
and return the first one which has a result available. If more than one result is or becomes available simultaneously, any completed future may be returned.
- wait_all(futures)
A convenience function which waits on all the given
futures
in order. This function returns the samefutures
as submitted to the function as a list, indicating the order in which waits occurred.
- property is_master
True if this is the master process for task distribution. This is necessary, e.g., for MPI, where all processes start identically and then must branch depending on rank.
- class westpa.work_managers.zeromq.work_manager.WMFuture(task_id=None)
Bases:
object
A “future”, representing work which has been dispatched for completion asynchronously.
- static all_acquired(futures)
Context manager to acquire all locks on the given
futures
. Primarily for internal use.
- get_result(discard=True)
Get the result associated with this future, blocking until it is available. If
discard
is true, then removes the reference to the result contained in this instance, so that a collection of futures need not turn into a cache of all associated results.
- property result
- wait()
Wait until this future has a result or exception available.
- get_exception()
Get the exception associated with this future, blocking until it is available.
- property exception
Get the exception associated with this future, blocking until it is available.
- get_traceback()
Get the traceback object associated with this future, if any.
- property traceback
Get the traceback object associated with this future, if any.
- is_done()
Indicates whether this future is done executing (may block if this future is being updated).
- property done
Indicates whether this future is done executing (may block if this future is being updated).
- class westpa.work_managers.zeromq.work_manager.deque
Bases:
object
deque([iterable[, maxlen]]) –> deque object
A list-like sequence optimized for data accesses near its endpoints.
- append()
Add an element to the right side of the deque.
- appendleft()
Add an element to the left side of the deque.
- clear()
Remove all elements from the deque.
- copy()
Return a shallow copy of a deque.
- count()
D.count(value) – return number of occurrences of value
- extend()
Extend the right side of the deque with elements from the iterable
- extendleft()
Extend the left side of the deque with elements from the iterable
- index()
D.index(value, [start, [stop]]) – return first index of value. Raises ValueError if the value is not present.
- insert()
D.insert(index, object) – insert object before index
- maxlen
maximum size of a deque or None if unbounded
- pop()
Remove and return the rightmost element.
- popleft()
Remove and return the leftmost element.
- remove()
D.remove(value) – remove first occurrence of value.
- reverse()
D.reverse() – reverse IN PLACE
- rotate()
Rotate the deque n steps to the right (default n=1). If n is negative, rotates left.
- class westpa.work_managers.zeromq.work_manager.ZMQWorkManager(n_local_workers=1)
Bases:
ZMQCore
,WorkManager
,IsNode
- classmethod add_wm_args(parser, wmenv=None)
- classmethod from_environ(wmenv=None)
- classmethod read_host_info(filename)
- classmethod canonicalize_endpoint(endpoint, allow_wildcard_host=True)
- property n_workers
- submit(fn, args=None, kwargs=None)
Submit a task to the work manager, returning a WMFuture object representing the pending result.
fn(*args,**kwargs)
will be executed by a worker, and the return value assigned as the result of the returned future. The functionfn
and all arguments must be picklable; note particularly that off-path modules (like the system module and any active plugins) are not picklable unless pre-loaded in the worker process (i.e. prior to forking the master).
- submit_many(tasks)
Submit a set of tasks to the work manager, returning a list of WMFuture objects representing pending results. Each entry in
tasks
should be a triple (fn, args, kwargs), which will result in fn(*args, **kwargs) being executed by a worker. The functionfn
and all arguments must be picklable; note particularly that off-path modules are not picklable unless pre-loaded in the worker process.
- send_message(socket, message, payload=None, flags=0)
Send a message object. Subclasses may override this to decorate the message with appropriate IDs, then delegate upward to actually send the message.
message
may either be a pre-constructedMessage
object or a message identifier, in which (latter) casepayload
will become the message payload.payload
is ignored ifmessage
is aMessage
object.
- handle_result(socket, msg)
- handle_task_request(socket, msg)
- update_worker_information(msg)
- check_workers()
- remove_worker(worker_id)
- shutdown_clear_tasks()
Abort pending tasks with error on shutdown.
- comm_loop()
- startup()
Perform any necessary startup work, such as spawning clients.
- shutdown()
Cleanly shut down any active workers.
westpa.work_managers.zeromq.worker module
Created on May 29, 2015
@author: mzwier
- class westpa.work_managers.zeromq.worker.ZMQCore
Bases:
object
- PROTOCOL_MAJOR = 3
- PROTOCOL_MINOR = 0
- PROTOCOL_UPDATE = 0
- PROTOCOL_VERSION = (3, 0, 0)
- internal_transport = 'ipc'
- default_comm_mode = 'ipc'
- default_master_heartbeat = 20.0
- default_worker_heartbeat = 20.0
- default_timeout_factor = 5.0
- default_startup_timeout = 120.0
- default_shutdown_timeout = 5.0
- classmethod make_ipc_endpoint()
- classmethod remove_ipc_endpoints()
- classmethod make_tcp_endpoint(address='127.0.0.1')
- classmethod make_internal_endpoint()
- get_identification()
- validate_message(message)
Validate incoming message. Raises an exception if the message is improperly formatted (TypeError) or does not correspond to the appropriate master (ZMQWMEnvironmentError).
- message_validation(msg)
A context manager for message validation. The instance variable
validation_fail_action
controls the behavior of this context manager:‘raise’: re-raise the exception that indicated failed validation. Useful for development.
‘exit’ (default): report the error and exit the program.
‘warn’: report the error and continue.
- recv_message(socket, flags=0, validate=True, timeout=None)
Receive a message object from the given socket, using the given flags. Message validation is performed if
validate
is true. Iftimeout
is given, then it is the number of milliseconds to wait prior to raising a ZMQWMTimeout exception.timeout
is ignored ifflags
includeszmq.NOBLOCK
.
- recv_all(socket, flags=0, validate=True)
Receive all messages currently available from the given socket.
- recv_ack(socket, flags=0, validate=True, timeout=None)
- send_message(socket, message, payload=None, flags=0)
Send a message object. Subclasses may override this to decorate the message with appropriate IDs, then delegate upward to actually send the message.
message
may either be a pre-constructedMessage
object or a message identifier, in which (latter) casepayload
will become the message payload.payload
is ignored ifmessage
is aMessage
object.
- send_reply(socket, original_message, reply='ok', payload=None, flags=0)
Send a reply to
original_message
onsocket
. The reply message is a Message object or a message identifier. The reply master_id and worker_id are set fromoriginal_message
, unless master_id is not set, in which case it is set from self.master_id.
- send_ack(socket, original_message)
Send an acknowledgement message, which is mostly just to respect REQ/REP recv/send patterns.
- send_nak(socket, original_message)
Send a negative acknowledgement message.
- send_inproc_message(message, payload=None, flags=0)
- signal_shutdown()
- shutdown_handler(signal=None, frame=None)
- install_signal_handlers(signals=None)
- install_sigint_handler()
- startup()
- shutdown()
- join()
- class westpa.work_managers.zeromq.worker.Message(message=None, payload=None, master_id=None, src_id=None)
Bases:
object
- SHUTDOWN = 'shutdown'
- ACK = 'ok'
- NAK = 'no'
- IDENTIFY = 'identify'
- TASKS_AVAILABLE = 'tasks_available'
- TASK_REQUEST = 'task_request'
- MASTER_BEACON = 'master_alive'
- RECONFIGURE_TIMEOUT = 'reconfigure_timeout'
- TASK = 'task'
- RESULT = 'result'
- idempotent_announcement_messages = {'master_alive', 'shutdown', 'tasks_available'}
- classmethod coalesce_announcements(messages)
- exception westpa.work_managers.zeromq.worker.ZMQWMTimeout
Bases:
ZMQWMEnvironmentError
A timeout of a sort that indicatess that a master or worker has failed or never started.
- class westpa.work_managers.zeromq.worker.PassiveMultiTimer
Bases:
object
- add_timer(identifier, duration)
- remove_timer(identifier)
- change_duration(identifier, duration)
- reset(identifier=None, at=None)
- expired(identifier, at=None)
- next_expiration()
- next_expiration_in()
- which_expired(at=None)
- class westpa.work_managers.zeromq.worker.Task(fn, args, kwargs, task_id=None)
Bases:
object
- execute()
Run this task, returning a Result object.
- class westpa.work_managers.zeromq.worker.Result(task_id, result=None, exception=None, traceback=None)
Bases:
object
- class westpa.work_managers.zeromq.worker.ZMQWorker(rr_endpoint, ann_endpoint)
Bases:
ZMQCore
This is the outward facing worker component of the ZMQ work manager. This forms the interface to the master. This process cannot hang or crash due to an error in tasks it executes, so tasks are isolated in ZMQExecutor, which communicates with ZMQWorker via (what else?) ZeroMQ.
- property is_master
- update_master_info(msg)
- identify(rr_socket)
- request_task(rr_socket, task_socket)
- handle_reconfigure_timeout(msg, timers)
- handle_result(result_socket, rr_socket)
- comm_loop()
Master communication loop for the worker process.
- shutdown_executor()
- install_signal_handlers(signals=None)
- startup(process_index=None)