Message Bus ¶
The msgbus subpackage provides a universal message bus for connecting system components in a loosely coupled way.
- class MessageBus ( TraderId trader_id , Clock clock , Logger logger , unicode name=None ) ¶
-
Bases:
object
Provides a generic message bus to facilitate various messaging patterns.
The bus provides both a producer and consumer API for Pub/Sub, Req/Rep, as well as direct point-to-point messaging to registered endpoints.
- Pub/Sub wildcard patterns for hierarchical topics are possible:
-
-
* asterisk represents one or more characters in a pattern.
-
? question mark represents a single character in a pattern.
-
Given a topic and pattern potentially containing wildcard characters, i.e. * and ? , where ? can match any single character in the topic, and * can match any number of characters including zero characters.
The asterisk in a wildcard matches any character zero or more times. For example, comp* matches anything beginning with comp which means comp , complete , and computer are all matched.
A question mark matches a single character once. For example, c?mp matches camp and comp . The question mark can also be used more than once. For example, c??p would match both of the above examples and coop .
- Parameters :
- Raises :
-
ValueError – If name is not
None
and not a valid string.
Warning
This message bus is not thread-safe and must be called from the same thread as the event loop.
- deregister ( self, unicode endpoint, handler: Callable[[Any], None] ) void ¶
-
Deregister the given handler from the endpoint address.
- Parameters :
-
-
endpoint ( str ) – The endpoint address to deregister.
-
handler ( Callable [ [ Any ] , None ] ) – The handler to deregister.
-
- Raises :
-
-
ValueError – If endpoint is not a valid string.
-
ValueError – If handler is not of type Callable .
-
KeyError – If endpoint is not registered.
-
ValueError – If handler is not registered at the endpoint.
-
- endpoints ( self ) list ¶
-
Return all endpoint addresses registered with the message bus.
- Returns :
-
list[str]
- has_subscribers ( self , unicode pattern=None ) bool ¶
-
If the message bus has subscribers for the give topic pattern .
- Parameters :
-
pattern ( str , optional ) – The topic filter. May include wildcard characters * and ? . If
None
then query is for all topics. - Returns :
-
bool
- is_pending_request ( self , UUID4 request_id ) bool ¶
-
Return if the given request_id is still pending a response.
- Parameters :
-
request_id ( UUID4 ) – The request ID to check (to match the correlation_id).
- Returns :
-
bool
- is_subscribed ( self, unicode topic, handler: Callable[[Any], None] ) bool ¶
-
Return if topic and handler is subscribed to the message bus.
Does not consider any previous priority .
- Parameters :
-
-
topic ( str ) – The topic of the subscription.
-
handler ( Callable [ [ Any ] , None ] ) – The handler of the subscription.
-
- Returns :
-
bool
- pub_count ¶
-
The count of messages published by the bus.
- Returns :
-
int
- publish ( self , unicode topic , msg: Any ) void ¶
-
Publish the given message for the given topic .
Subscription handlers will receive the message in priority order (highest first).
- Parameters :
-
-
topic ( str ) – The topic to publish on.
-
msg ( object ) – The message to publish.
-
- register ( self, unicode endpoint, handler: Callable[[Any], None] ) void ¶
-
Register the given handler to receive messages at the endpoint address.
- Parameters :
-
-
endpoint ( str ) – The endpoint address to register.
-
handler ( Callable [ [ Any ] , None ] ) – The handler for the registration.
-
- Raises :
-
-
ValueError – If endpoint is not a valid string.
-
ValueError – If handler is not of type Callable .
-
KeyError – If endpoint already registered.
-
- req_count ¶
-
The count of requests processed by the bus.
- Returns :
-
int
- request ( self , unicode endpoint , Request request ) void ¶
-
Handle the given request .
Will log an error if the correlation ID already exists.
- Parameters :
-
-
endpoint ( str ) – The endpoint address to send the request to.
-
request ( Request ) – The request to handle.
-
- res_count ¶
-
The count of responses processed by the bus.
- Returns :
-
int
- response ( self , Response response ) void ¶
-
Handle the given response .
Will log an error if the correlation ID is not found.
- Parameters :
-
response ( Response ) – The response to handle
- send ( self , unicode endpoint , msg: Any ) void ¶
-
Send the given message to the given endpoint address.
- Parameters :
-
-
endpoint ( str ) – The endpoint address to send the message to.
-
msg ( object ) – The message to send.
-
- sent_count ¶
-
The count of messages sent through the bus.
- Returns :
-
int
- subscribe ( self, unicode topic, handler: Callable[[Any], None], int priority=0 ) void ¶
-
Subscribe to the given message topic with the given callback handler .
- Parameters :
-
-
topic ( str ) – The topic for the subscription. May include wildcard characters * and ? .
-
handler ( Callable [ [ Any ] , None ] ) – The handler for the subscription.
-
priority ( int , optional ) – The priority for the subscription. Determines the ordering of handlers receiving messages being processed, higher priority handlers will receive messages prior to lower priority handlers.
-
- Raises :
-
-
ValueError – If topic is not a valid string.
-
ValueError – If handler is not of type Callable .
-
Warning
Assigning priority handling is an advanced feature which shouldn’t normally be needed by most users . Only assign a higher priority to the subscription if you are certain of what you’re doing . If an inappropriate priority is assigned then the handler may receive messages before core system components have been able to process necessary calculations and produce potential side effects for logically sound behavior.
- subscriptions ( self , unicode pattern=None ) list ¶
-
Return all subscriptions matching the given topic pattern .
- Parameters :
-
pattern ( str , optional ) – The topic pattern filter. May include wildcard characters * and ? . If
None
then query is for all topics. - Returns :
-
list[Subscription]
- topics ( self ) list ¶
-
Return all topics with active subscribers.
- Returns :
-
list[str]
- trader_id ¶
-
The trader ID associated with the bus.
- Returns :
-
TraderId
- unsubscribe ( self, unicode topic, handler: Callable[[Any], None] ) void ¶
-
Unsubscribe the given callback handler from the given message topic .
- Parameters :
-
-
topic ( str , optional ) – The topic to unsubscribe from. May include wildcard characters * and ? .
-
handler ( Callable [ [ Any ] , None ] ) – The handler for the subscription.
-
- Raises :
-
-
ValueError – If topic is not a valid string.
-
ValueError – If handler is not of type Callable .
-
- is_matching_py ( unicode topic , unicode pattern ) bool ¶
- class Subscription ( unicode topic, handler: Callable[[Any], None], int priority=0 ) ¶
-
Bases:
object
Represents a subscription to a particular topic.
This is an internal class intended to be used by the message bus to organize topics and their subscribers.
- Parameters :
-
-
topic ( str ) – The topic for the subscription. May include wildcard characters * and ? .
-
handler ( Callable [ [ Message ] , None ] ) – The handler for the subscription.
-
priority ( int ) – The priority for the subscription.
-
- Raises :
-
-
ValueError – If topic is not a valid string.
-
ValueError – If handler is not of type Callable .
-
ValueError – If priority is negative (< 0).
-
Notes
The subscription equality is determined by the topic and handler, priority is not considered (and could change).
- handler ¶
-
The handler for the subscription.
- Returns :
-
Callable
- priority ¶
-
The priority for the subscription.
- Returns :
-
int
- topic ¶
-
The topic for the subscription.
- Returns :
-
str