Message Bus

The msgbus subpackage provides a universal message bus for connecting system components in a loosely coupled way.

class MessageBus ( TraderId trader_id , Clock clock , Logger logger , unicode name=None )

Bases: object

Provides a generic message bus to facilitate various messaging patterns.

The bus provides both a producer and consumer API for Pub/Sub, Req/Rep, as well as direct point-to-point messaging to registered endpoints.

Pub/Sub wildcard patterns for hierarchical topics are possible:
  • * asterisk represents zero or more characters in a pattern.

  • ? question mark represents a single character in a pattern.

A topic is matched against a pattern that may contain the wildcard characters * and ?, where ? matches any single character in the topic and * matches any number of characters, including zero.

The asterisk in a wildcard matches any character zero or more times. For example, comp* matches anything beginning with comp, so comp, complete, and computer are all matched.

A question mark matches exactly one character. For example, c?mp matches camp and comp. The question mark can also be used more than once: c??p matches both of those examples as well as coop.
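
The matching rules above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the library's implementation: the function name `matches` and the dotted topic string are hypothetical, chosen only for the example.

```python
def matches(topic: str, pattern: str) -> bool:
    """Return True if `topic` matches `pattern` (hypothetical helper).

    `*` matches zero or more characters; `?` matches exactly one character.
    """
    t = p = 0    # current positions in topic and pattern
    star = -1    # position of the most recent `*` in the pattern, if any
    mark = 0     # topic position at which that `*` started matching
    while t < len(topic):
        if p < len(pattern) and pattern[p] == "*":
            # Record the wildcard and first try matching zero characters.
            star, mark = p, t
            p += 1
        elif p < len(pattern) and pattern[p] in ("?", topic[t]):
            # A literal match, or `?` consuming exactly one character.
            t += 1
            p += 1
        elif star != -1:
            # Backtrack: let the last `*` absorb one more character.
            p = star + 1
            mark += 1
            t = mark
        else:
            return False
    # Any trailing `*` can match the empty string.
    while p < len(pattern) and pattern[p] == "*":
        p += 1
    return p == len(pattern)


print(matches("computer", "comp*"))  # True
print(matches("coop", "c?mp"))       # False
print(matches("data.quotes.XYZ", "data.*.XYZ"))  # True (hypothetical topic)
```

The sketch checks for `*` before comparing literal characters, so a pattern wildcard is never mistaken for a literal asterisk in the topic.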

Parameters :
  • trader_id ( TraderId ) – The trader ID associated with the message bus.

  • clock ( Clock ) – The clock for the message bus.

  • logger ( Logger ) – The logger for the message bus.

  • name ( str , optional ) – The custom name for the message bus.

Raises :

ValueError – If name is not None and not a valid string.

Warning

This message bus is not thread-safe and must be called from the same thread as the event loop.

deregister ( self, unicode endpoint, handler: Callable[[Any], None] ) void

Deregister the given handler from the endpoint address.

Parameters :
  • endpoint ( str ) – The endpoint address to deregister.

  • handler ( Callable [ [ Any ] , None ] ) – The handler to deregister.

Raises :
  • ValueError – If endpoint is not a valid string.

  • ValueError – If handler is not of type Callable .

  • KeyError – If endpoint is not registered.

  • ValueError – If handler is not registered at the endpoint.

endpoints ( self ) list

Return all endpoint addresses registered with the message bus.

Returns :

list[str]

has_subscribers ( self , unicode pattern=None ) bool

Return whether the message bus has subscribers for the given topic pattern.

Parameters :

pattern ( str , optional ) – The topic filter. May include wildcard characters * and ?. If None, the query covers all topics.

Returns :

bool

is_pending_request ( self , UUID4 request_id ) bool

Return whether the given request_id is still pending a response.

Parameters :

request_id ( UUID4 ) – The request ID to check (to match the correlation_id).

Returns :

bool

is_subscribed ( self, unicode topic, handler: Callable[[Any], None] ) bool

Return whether the given topic and handler are subscribed to the message bus.

Any priority previously assigned to the subscription is not considered.

Parameters :
  • topic ( str ) – The topic of the subscription.

  • handler ( Callable [ [ Any ] , None ] ) – The handler of the subscription.

Returns :

bool

pub_count

The count of messages published by the bus.

Returns :

int

publish ( self , unicode topic , msg: Any ) void

Publish the given message for the given topic.

Subscription handlers will receive the message in priority order (highest first).

Parameters :
  • topic ( str ) – The topic to publish on.

  • msg ( object ) – The message to publish.
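
The priority-ordered delivery described above can be illustrated with a toy bus. This is a sketch under stated assumptions, not the real MessageBus: the `ToyBus` class, the topic names, and the use of `fnmatch.fnmatchcase` to approximate wildcard matching are all choices made for the example.

```python
from fnmatch import fnmatchcase
from typing import Any, Callable


class ToyBus:
    """Hypothetical stand-in used for illustration; not the real MessageBus."""

    def __init__(self) -> None:
        self._subs = []  # entries are (pattern, handler, priority)

    def subscribe(
        self, topic: str, handler: Callable[[Any], None], priority: int = 0
    ) -> None:
        self._subs.append((topic, handler, priority))
        # Highest priority first; the sort is stable, so ties keep insertion order.
        self._subs.sort(key=lambda sub: sub[2], reverse=True)

    def publish(self, topic: str, msg: Any) -> None:
        # Assumption: fnmatchcase approximates the `*` and `?` wildcards (it also
        # accepts `[seq]` classes, which the documented patterns do not mention).
        for pattern, handler, _ in self._subs:
            if fnmatchcase(topic, pattern):
                handler(msg)


received = []
bus = ToyBus()
bus.subscribe("events.order.*", lambda m: received.append(("default", m)))
bus.subscribe("events.order.*", lambda m: received.append(("high", m)), priority=10)
bus.publish("events.order.filled", "msg-1")
print(received)  # [('high', 'msg-1'), ('default', 'msg-1')]
```

Although the default-priority handler subscribed first, the handler with priority 10 receives the message first.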

register ( self, unicode endpoint, handler: Callable[[Any], None] ) void

Register the given handler to receive messages at the endpoint address.

Parameters :
  • endpoint ( str ) – The endpoint address to register.

  • handler ( Callable [ [ Any ] , None ] ) – The handler for the registration.

Raises :
  • ValueError – If endpoint is not a valid string.

  • ValueError – If handler is not of type Callable .

  • KeyError – If endpoint is already registered.

req_count

The count of requests processed by the bus.

Returns :

int

request ( self , unicode endpoint , Request request ) void

Handle the given request.

Will log an error if the correlation ID already exists.

Parameters :
  • endpoint ( str ) – The endpoint address to send the request to.

  • request ( Request ) – The request to handle.

res_count

The count of responses processed by the bus.

Returns :

int

response ( self , Response response ) void

Handle the given response.

Will log an error if the correlation ID is not found.

Parameters :

response ( Response ) – The response to handle.
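
The request and response bookkeeping described above (a request stays pending until a response with the matching correlation ID arrives) can be sketched as follows. Everything here is an assumption made for illustration: `ToyRequest`, `ToyResponse`, `ToyReqRep`, the `callback` field, the `errors` list standing in for the logger, and the endpoint name are hypothetical and do not reflect the library's internals.

```python
from dataclasses import dataclass, field
from typing import Any, Callable
from uuid import UUID, uuid4


@dataclass
class ToyRequest:
    callback: Callable[[Any], None]          # invoked with the matching response
    id: UUID = field(default_factory=uuid4)  # becomes the correlation ID


@dataclass
class ToyResponse:
    correlation_id: UUID                     # the ID of the request being answered
    data: Any = None


class ToyReqRep:
    """Hypothetical stand-in used for illustration; not the real MessageBus."""

    def __init__(self) -> None:
        self._endpoints = {}  # endpoint address -> handler
        self._pending = {}    # request ID -> callback
        self.errors = []      # stands in for error logging

    def register(self, endpoint: str, handler: Callable[[Any], None]) -> None:
        self._endpoints[endpoint] = handler

    def is_pending_request(self, request_id: UUID) -> bool:
        return request_id in self._pending

    def request(self, endpoint: str, request: ToyRequest) -> None:
        if request.id in self._pending:
            self.errors.append(f"duplicate request {request.id}")
            return
        self._pending[request.id] = request.callback
        self._endpoints[endpoint](request)

    def response(self, response: ToyResponse) -> None:
        callback = self._pending.pop(response.correlation_id, None)
        if callback is None:
            self.errors.append(f"no pending request for {response.correlation_id}")
            return
        callback(response)


bus = ToyReqRep()
held = []      # the endpoint simply holds requests until they are answered
results = []   # responses delivered to the requester
bus.register("Data.lookup", held.append)

req = ToyRequest(callback=results.append)
bus.request("Data.lookup", req)
bus.response(ToyResponse(correlation_id=req.id, data="bars"))
```

Between the `request` and `response` calls, `bus.is_pending_request(req.id)` is true; once the response is handled, the entry is removed and the callback has received the response.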

send ( self , unicode endpoint , msg: Any ) void

Send the given message to the given endpoint address.

Parameters :
  • endpoint ( str ) – The endpoint address to send the message to.

  • msg ( object ) – The message to send.
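
The point-to-point methods (register, deregister, endpoints, send) can be pictured as a registry with one handler per address. The sketch below is a toy under stated assumptions, not the real MessageBus: the class name `ToyEndpoints`, the endpoint address, and the choice to silently ignore an unknown endpoint in `send` are all hypothetical.

```python
from typing import Any, Callable


class ToyEndpoints:
    """Hypothetical registry used for illustration; not the real MessageBus."""

    def __init__(self) -> None:
        self._endpoints = {}  # endpoint address -> handler
        self.sent_count = 0   # messages delivered through send()

    def register(self, endpoint: str, handler: Callable[[Any], None]) -> None:
        if endpoint in self._endpoints:
            raise KeyError(f"endpoint {endpoint!r} is already registered")
        self._endpoints[endpoint] = handler

    def deregister(self, endpoint: str, handler: Callable[[Any], None]) -> None:
        if endpoint not in self._endpoints:
            raise KeyError(f"endpoint {endpoint!r} is not registered")
        if self._endpoints[endpoint] != handler:
            raise ValueError("handler is not registered at the endpoint")
        del self._endpoints[endpoint]

    def endpoints(self) -> list:
        return list(self._endpoints)

    def send(self, endpoint: str, msg: Any) -> None:
        # Assumption: this sketch silently ignores an unknown endpoint.
        handler = self._endpoints.get(endpoint)
        if handler is not None:
            handler(msg)
            self.sent_count += 1


inbox = []


def on_message(msg: Any) -> None:
    inbox.append(msg)


registry = ToyEndpoints()
registry.register("Risk.check", on_message)
registry.send("Risk.check", "order-1")
print(inbox)                 # ['order-1']
print(registry.endpoints())  # ['Risk.check']
```

Unlike a published topic, an endpoint address has exactly one handler, which is why registering the same address twice raises KeyError in this sketch.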

sent_count

The count of messages sent through the bus.

Returns :

int

subscribe ( self, unicode topic, handler: Callable[[Any], None], int priority=0 ) void

Subscribe to the given message topic with the given callback handler.

Parameters :
  • topic ( str ) – The topic for the subscription. May include wildcard characters * and ?.

  • handler ( Callable [ [ Any ] , None ] ) – The handler for the subscription.

  • priority ( int , optional ) – The priority for the subscription. Determines the order in which handlers receive messages: higher-priority handlers receive a message before lower-priority handlers.

Raises :
  • ValueError – If topic is not a valid string.

  • ValueError – If handler is not of type Callable .

Warning

Assigning priority handling is an advanced feature which most users should not need. Only assign a higher priority to a subscription if you are certain of what you are doing. If an inappropriate priority is assigned, the handler may receive messages before core system components have processed them and performed the calculations and side effects required for logically sound behavior.

subscriptions ( self , unicode pattern=None ) list

Return all subscriptions matching the given topic pattern.

Parameters :

pattern ( str , optional ) – The topic pattern filter. May include wildcard characters * and ?. If None, the query covers all topics.

Returns :

list[Subscription]

topics ( self ) list

Return all topics with active subscribers.

Returns :

list[str]

trader_id

The trader ID associated with the bus.

Returns :

TraderId

unsubscribe ( self, unicode topic, handler: Callable[[Any], None] ) void

Unsubscribe the given callback handler from the given message topic.

Parameters :
  • topic ( str ) – The topic to unsubscribe from. May include wildcard characters * and ?.

  • handler ( Callable [ [ Any ] , None ] ) – The handler for the subscription.

Raises :
  • ValueError – If topic is not a valid string.

  • ValueError – If handler is not of type Callable .

is_matching_py ( unicode topic , unicode pattern ) bool

class Subscription ( unicode topic, handler: Callable[[Any], None], int priority=0 )

Bases: object

Represents a subscription to a particular topic.

This is an internal class intended to be used by the message bus to organize topics and their subscribers.

Parameters :
  • topic ( str ) – The topic for the subscription. May include wildcard characters * and ?.

  • handler ( Callable [ [ Message ] , None ] ) – The handler for the subscription.

  • priority ( int ) – The priority for the subscription.

Raises :
  • ValueError – If topic is not a valid string.

  • ValueError – If handler is not of type Callable .

  • ValueError – If priority is negative (< 0).

Notes

Subscription equality is determined by the topic and handler; priority is not considered (and could change).
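
The equality rule in this note can be sketched with a small class whose `__eq__` and `__hash__` use only the topic and handler. This is an illustrative toy, not the library's Subscription: the name `ToySubscription`, the handler `on_event`, and the topic string are hypothetical.

```python
from typing import Any, Callable


class ToySubscription:
    """Hypothetical stand-in used for illustration; not the real Subscription."""

    def __init__(
        self, topic: str, handler: Callable[[Any], None], priority: int = 0
    ) -> None:
        if priority < 0:
            raise ValueError("priority must not be negative")
        self.topic = topic
        self.handler = handler
        self.priority = priority

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, ToySubscription):
            return NotImplemented
        # Priority is deliberately left out of the comparison.
        return self.topic == other.topic and self.handler == other.handler

    def __hash__(self) -> int:
        # Hash on the same fields as __eq__ so equal objects hash equally.
        return hash((self.topic, self.handler))


def on_event(msg: Any) -> None:
    pass


a = ToySubscription("events.*", on_event, priority=0)
b = ToySubscription("events.*", on_event, priority=5)
print(a == b)       # True
print(len({a, b}))  # 1
```

Because priority is excluded, the same topic and handler pair cannot appear twice in a set or dictionary of subscriptions just by being given a different priority.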

handler

The handler for the subscription.

Returns :

Callable

priority

The priority for the subscription.

Returns :

int

topic

The topic for the subscription.

Returns :

str