Message Bus

The msgbus subpackage provides a universal message bus for connecting system components in a loosely coupled way.

Message Bus

class MessageBus ( TraderId trader_id , Clock clock , Logger logger , unicode name=None )

Bases: object

Provides a generic message bus to facilitate various messaging patterns.

The bus provides both a producer and consumer API for Pub/Sub, Req/Rep, as well as direct point-to-point messaging to registered endpoints.

Pub/Sub wildcard patterns for hierarchical topics are possible:
  • * asterisk represents zero or more characters in a pattern.

  • ? question mark represents a single character in a pattern.

The asterisk in a wildcard matches any character zero or more times. For example, comp* matches anything beginning with comp, which means comp, complete, and computer are all matched.

A question mark matches exactly one character. For example, c?mp matches camp and comp. The question mark can be used more than once: c??p matches both of those examples as well as coop.
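The wildcard rules above can be sketched with a small matcher. This is an illustration only, not the library's matching code: the helper name is_matching is hypothetical, and the sketch translates a pattern into a regular expression in which * becomes "any run of characters" and ? becomes "exactly one character".

```python
import re


def is_matching(topic: str, pattern: str) -> bool:
    """Return True if `topic` matches `pattern` under the * and ? rules.

    Hypothetical helper for illustration; not part of the documented API.
    """
    parts = []
    for ch in pattern:
        if ch == "*":
            parts.append(".*")  # zero or more of any character
        elif ch == "?":
            parts.append(".")   # exactly one character
        else:
            parts.append(re.escape(ch))  # everything else is literal
    regex = re.compile("".join(parts), re.DOTALL)
    return regex.fullmatch(topic) is not None


if __name__ == "__main__":
    for word in ("comp", "complete", "computer"):
        print(word, is_matching(word, "comp*"))
    for word in ("camp", "comp", "coop"):
        print(word, is_matching(word, "c??p"))
```

Because * may match an empty run, comp* matches comp itself, which is why the asterisk is described as "zero or more" rather than "one or more".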

Parameters
  • trader_id ( TraderId ) – The trader ID associated with the message bus.

  • clock ( Clock ) – The clock for the message bus.

  • logger ( Logger ) – The logger for the message bus.

  • name ( str , optional ) – The custom name for the message bus.

Raises

ValueError – If name is not None and not a valid string.

Warning

This message bus is not thread-safe and must be called from the same thread as the event loop.
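One common way to respect this kind of constraint, when work originates on another thread, is to hand the call over to the event loop with asyncio's loop.call_soon_threadsafe. The sketch below uses only the standard library; the publish function is a hypothetical stand-in for a bus method, and nothing here is taken from the library itself.

```python
import asyncio
import threading


def publish_from_worker(loop, publish, topic, msg):
    # Do not call `publish` directly from this thread; schedule it on the
    # event-loop thread instead.
    loop.call_soon_threadsafe(publish, topic, msg)


async def main():
    loop = asyncio.get_running_loop()
    received = []
    done = asyncio.Event()

    def publish(topic, msg):
        # Hypothetical stand-in for a bus method. It records the thread it
        # actually ran on so the hand-over can be checked.
        received.append((topic, msg, threading.get_ident()))
        done.set()

    worker = threading.Thread(
        target=publish_from_worker,
        args=(loop, publish, "data.quotes", "tick"),
    )
    worker.start()
    await asyncio.wait_for(done.wait(), timeout=5.0)
    worker.join()
    # Return the loop thread's identifier alongside what was received.
    return received, threading.get_ident()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The recorded thread identifier equals the event-loop thread's identifier, showing that the call ran on the loop thread even though it was requested from the worker.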

deregister ( self, unicode endpoint, handler: Callable[[Any], None] ) void

Deregister the given handler from the endpoint address.

Parameters
  • endpoint ( str ) – The endpoint address to deregister.

  • handler ( Callable [ [ Any ] , None ] ) – The handler to deregister.

Raises
  • ValueError – If endpoint is not a valid string.

  • ValueError – If handler is not of type Callable.

  • KeyError – If endpoint is not registered.

  • ValueError – If handler is not registered at the endpoint.

endpoints ( self ) list

Return all endpoint addresses registered with the message bus.

Returns

list[str]

has_subscribers ( self , unicode pattern=None ) bool

Return whether the message bus has subscribers for the given topic pattern.

Parameters

pattern ( str , optional ) – The topic filter. May include wildcard characters * and ?. If None, the query covers all topics.

Returns

bool

pub_count

The count of messages published by the bus.

Returns

int

publish ( self , unicode topic , msg: Any ) void

Publish the given message for the given topic.

Subscription handlers will receive the message in priority order (highest first).

Parameters
  • topic ( str ) – The topic to publish on.

  • msg ( object ) – The message to publish.
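The priority ordering described for publish can be illustrated with a toy publisher. ToyPubSub is a hypothetical class written for this sketch, not the real MessageBus; it matches topics exactly (no wildcards) and assumes that handlers with equal priority are called in subscription order, which the documentation above does not specify.

```python
from typing import Any, Callable

Handler = Callable[[Any], None]


class ToyPubSub:
    """Hypothetical stand-in for illustration; not the real MessageBus."""

    def __init__(self):
        self._subs = []  # list of (topic, handler, priority)
        self.pub_count = 0

    def subscribe(self, topic: str, handler: Handler, priority: int = 0) -> None:
        self._subs.append((topic, handler, priority))
        # Highest priority first. Python's sort is stable, so subscriptions
        # with equal priority keep their original order (an assumption of
        # this sketch, not documented behaviour).
        self._subs.sort(key=lambda sub: sub[2], reverse=True)

    def publish(self, topic: str, msg: Any) -> None:
        for sub_topic, handler, _priority in self._subs:
            if sub_topic == topic:  # exact match only in this sketch
                handler(msg)
        self.pub_count += 1


if __name__ == "__main__":
    bus = ToyPubSub()
    bus.subscribe("events.fills", lambda m: print("default priority:", m))
    bus.subscribe("events.fills", lambda m: print("high priority:", m), priority=10)
    bus.publish("events.fills", "fill-1")
```

In the usage above, the handler subscribed with priority=10 runs before the one subscribed with the default priority, even though it was subscribed second.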

register ( self, unicode endpoint, handler: Callable[[Any], None] ) void

Register the given handler to receive messages at the endpoint address.

Parameters
  • endpoint ( str ) – The endpoint address to register.

  • handler ( Callable [ [ Any ] , None ] ) – The handler for the registration.

Raises
  • ValueError – If endpoint is not a valid string.

  • ValueError – If handler is not of type Callable.

  • KeyError – If endpoint is already registered.
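The register, deregister, and send contract for point-to-point endpoints can be sketched as a dictionary keyed by endpoint address. ToyEndpoints is a hypothetical class for illustration only. It assumes a "valid string" means a non-empty str, and it lets send fail with the dictionary's own KeyError for an unknown endpoint, since the documentation does not say what the real bus does in that case.

```python
from typing import Any, Callable

Handler = Callable[[Any], None]


class ToyEndpoints:
    """Hypothetical stand-in for illustration; not the real MessageBus."""

    def __init__(self):
        self._endpoints = {}  # endpoint address -> handler
        self.sent_count = 0

    @staticmethod
    def _check(endpoint, handler):
        # Assumption: "valid string" is taken to mean a non-empty str.
        if not isinstance(endpoint, str) or not endpoint:
            raise ValueError("endpoint must be a non-empty string")
        if not callable(handler):
            raise ValueError("handler must be callable")

    def register(self, endpoint: str, handler: Handler) -> None:
        self._check(endpoint, handler)
        if endpoint in self._endpoints:
            raise KeyError(f"endpoint {endpoint!r} is already registered")
        self._endpoints[endpoint] = handler

    def deregister(self, endpoint: str, handler: Handler) -> None:
        self._check(endpoint, handler)
        if endpoint not in self._endpoints:
            raise KeyError(f"endpoint {endpoint!r} is not registered")
        if self._endpoints[endpoint] != handler:
            raise ValueError("handler is not registered at the endpoint")
        del self._endpoints[endpoint]

    def endpoints(self):
        return list(self._endpoints)

    def send(self, endpoint: str, msg: Any) -> None:
        # Unknown endpoints raise KeyError here; that is a choice made for
        # this sketch, not documented behaviour.
        self._endpoints[endpoint](msg)
        self.sent_count += 1
```

Each endpoint address maps to exactly one handler in this sketch, which is why a second register call on the same address is rejected rather than appended.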

req_count

The count of requests processed by the bus.

Returns

int

request ( self , unicode endpoint , Request request ) void

Handle the given request.

Will log an error if the correlation ID already exists.

Parameters
  • endpoint ( str ) – The endpoint address to send the request to.

  • request ( Request ) – The request to handle.

res_count

The count of responses processed by the bus.

Returns

int

response ( self , Response response ) void

Handle the given response.

Will log an error if the correlation ID is not found.

Parameters

response ( Response ) – The response to handle.
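The correlation-ID checks described for request and response can be sketched as a table of pending callbacks. Everything in this block is hypothetical: ToyRequest, ToyResponse, and ToyReqRep are stand-ins written for illustration, their fields (callback, id, correlation_id, data) are assumptions rather than the real Request and Response attributes, and whether the real counters advance on an error is not stated in the documentation.

```python
import logging
import uuid
from dataclasses import dataclass, field
from typing import Any, Callable

log = logging.getLogger("toy_bus")


@dataclass
class ToyRequest:
    callback: Callable[[Any], None]  # called with the matching response
    id: str = field(default_factory=lambda: str(uuid.uuid4()))


@dataclass
class ToyResponse:
    correlation_id: str  # the id of the request being answered
    data: Any = None


class ToyReqRep:
    """Hypothetical stand-in for illustration; not the real MessageBus."""

    def __init__(self):
        self._endpoints = {}
        self._pending = {}  # correlation id -> callback awaiting a response
        self.req_count = 0
        self.res_count = 0

    def register(self, endpoint, handler):
        self._endpoints[endpoint] = handler

    def request(self, endpoint, request):
        if request.id in self._pending:
            log.error("correlation ID %s already exists", request.id)
            return
        self._pending[request.id] = request.callback
        self.req_count += 1
        self._endpoints[endpoint](request)

    def response(self, response):
        callback = self._pending.pop(response.correlation_id, None)
        if callback is None:
            log.error("correlation ID %s not found", response.correlation_id)
            return
        self.res_count += 1
        callback(response)
```

Removing the pending entry when the response arrives means a second response carrying the same correlation ID is treated as "not found" and logged instead of being delivered twice.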

send ( self , unicode endpoint , msg: Any ) void

Send the given message to the given endpoint address.

Parameters
  • endpoint ( str ) – The endpoint address to send the message to.

  • msg ( object ) – The message to send.

sent_count

The count of messages sent through the bus.

Returns

int

subscribe ( self, unicode topic, handler: Callable[[Any], None], int priority=0 ) void

Subscribe to the given message topic with the given callback handler.

Parameters
  • topic ( str ) – The topic for the subscription. May include wildcard characters * and ?.

  • handler ( Callable [ [ Any ] , None ] ) – The handler for the subscription.

  • priority ( int , optional ) – The priority for the subscription. Determines the order in which handlers receive a message: higher-priority handlers receive it before lower-priority handlers.

Raises
  • ValueError – If topic is not a valid string.

  • ValueError – If handler is not of type Callable.

Warning

Assigning priority handling is an advanced feature which most users should not need. Only assign a higher priority to a subscription if you are certain of what you are doing. If an inappropriate priority is assigned, the handler may receive messages before core system components have processed them, so the calculations and side effects that logically sound behaviour depends on may not yet have happened.

subscriptions ( self , unicode pattern=None ) list

Return all subscriptions matching the given topic pattern.

Parameters

pattern ( str , optional ) – The topic pattern filter. May include wildcard characters * and ?. If None, the query covers all topics.

Returns

list[Subscription]

topics ( self ) list

Return all topics with active subscribers.

Returns

list[str]

trader_id

The trader ID associated with the bus.

Returns

TraderId

unsubscribe ( self, unicode topic, handler: Callable[[Any], None] ) void

Unsubscribe the given callback handler from the given message topic.

Parameters
  • topic ( str ) – The topic to unsubscribe from. May include wildcard characters * and ?.

  • handler ( Callable [ [ Any ] , None ] ) – The handler for the subscription.

Raises
  • ValueError – If topic is not a valid string.

  • ValueError – If handler is not of type Callable.

Subscription

class Subscription ( unicode topic, handler: Callable[[Any], None], int priority=0 )

Bases: object

Represents a subscription to a particular topic.

This is an internal class intended to be used by the message bus to organize topics and their subscribers.

Parameters
  • topic ( str ) – The topic for the subscription. May include wildcard characters * and ?.

  • handler ( Callable [ [ Any ] , None ] ) – The handler for the subscription.

  • priority ( int ) – The priority for the subscription.

Raises
  • ValueError – If topic is not a valid string.

  • ValueError – If handler is not of type Callable.

  • ValueError – If priority is negative (< 0).

Notes

The subscription equality is determined by the topic and handler, priority is not considered (and could change).
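The equality rule in the note above can be sketched by defining equality and hashing over topic and handler only. ToySubscription is a hypothetical class for illustration, not the library's Subscription, and it assumes a "valid string" means a non-empty str.

```python
from typing import Any, Callable


class ToySubscription:
    """Hypothetical stand-in for illustration; not the real Subscription."""

    def __init__(self, topic: str, handler: Callable[[Any], None], priority: int = 0):
        # Assumption: "valid string" is taken to mean a non-empty str.
        if not isinstance(topic, str) or not topic:
            raise ValueError("topic must be a non-empty string")
        if not callable(handler):
            raise ValueError("handler must be callable")
        if priority < 0:
            raise ValueError("priority must not be negative")
        self.topic = topic
        self.handler = handler
        self.priority = priority

    def __eq__(self, other):
        if not isinstance(other, ToySubscription):
            return NotImplemented
        # Priority is deliberately left out of the comparison.
        return self.topic == other.topic and self.handler == other.handler

    def __hash__(self):
        # Hash over the same fields as __eq__ so equal objects hash equally.
        return hash((self.topic, self.handler))
```

With this definition, two subscriptions for the same topic and handler collapse to one entry in a set or dictionary regardless of priority, so re-subscribing with a different priority does not create a duplicate.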

handler

The handler for the subscription.

Returns

Callable

priority

The priority for the subscription.

Returns

int

topic

The topic for the subscription.

Returns

str