Persistence

The persistence subpackage handles data storage and retrieval, mainly to support backtesting.

External

Core

class RawFile(open_file: OpenFile, block_size: Optional[int] = None, progress: bool = False)

Bases: object

Provides a wrapper around fsspec.OpenFile that processes a raw file and writes to parquet.
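
For illustration, a RawFile can wrap any fsspec.OpenFile; the path and block size below are example values (and the import reflects this section's External / Core layout):

import fsspec

from nautilus_trader.persistence.external.core import RawFile

# A minimal sketch: wrap a local CSV file for ingestion, reading
# in 1 MiB blocks with a progress bar (the path is hypothetical).
open_file = fsspec.open("/path/to/data.csv", mode="rb")
raw_file = RawFile(open_file=open_file, block_size=2**20, progress=True)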

split_and_serialize(objs: list) → dict[type, dict[Optional[str], list]]

Given a list of Nautilus objs, serialize and split them into dictionaries per type / instrument ID.

dicts_to_dataframes(dicts) → dict[type, dict[str, pandas.core.frame.DataFrame]]

Convert dicts from split_and_serialize into sorted dataframes.

determine_partition_cols(cls: type, instrument_id: str = None) → Optional[list]

Determine partition columns (if any) for this type cls.

merge_existing_data(catalog: BaseDataCatalog, cls: type, df: DataFrame) → DataFrame

Handle existing data for instrument subclasses.

Instruments all live in a single file, so merge with existing data. For all other classes, simply return data unchanged.

write_tables(catalog: ParquetDataCatalog, tables: dict[type, dict[str, pandas.core.frame.DataFrame]], **kwargs)

Write tables to the catalog.

write_parquet(fs: AbstractFileSystem, path: Path, df: DataFrame, partition_cols: Optional[list[str]], schema: Schema, **kwargs)

Write a single dataframe to parquet.

read_progress(func, total)

Wrap a file handle and update the progress bar as bytes are read.
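
Taken together, these functions form the write path from parsed objects into a catalog. A minimal sketch, assuming parsed_objs is a list of parsed Nautilus objects and catalog is an existing ParquetDataCatalog:

# Split objects by type / instrument ID, convert each group into a
# sorted DataFrame, then write the resulting tables to the catalog.
split = split_and_serialize(objs=parsed_objs)
tables = dicts_to_dataframes(split)
write_tables(catalog=catalog, tables=tables)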

Readers

class LinePreprocessor

Bases: object

Provides pre-processing of lines before they are passed to a Reader class (currently only TextReader).

Used when the input data requires pre-processing, for example to extract values that should also be set as attributes on the resulting Nautilus objects.

Examples

For example, if you were logging data in Python with a prepended timestamp, as below:

2021-06-29T06:03:14.528000 - {"op":"mcm","pt":1624946594395,"mc":[{"id":"1.179082386","rc":[{"atb":[[1.93,0]]}]}]}

The raw JSON data follows the logging timestamp; additionally, we want to use this timestamp as the Nautilus ts_init value. In this instance, you could use something like:

>>> import pandas as pd
>>> from typing import Any
>>>
>>> class LoggingLinePreprocessor(LinePreprocessor):
...     @staticmethod
...     def pre_process(line):
...         timestamp, json_data = line.split(' - ')
...         yield json_data, {'ts_init': pd.Timestamp(timestamp)}
...
...     @staticmethod
...     def post_process(obj: Any, state: dict):
...         obj.ts_init = state['ts_init']
...         return obj

class Reader(instrument_provider: Optional[InstrumentProvider] = None, instrument_provider_update: Callable = None)

Bases: object

Provides parsing of raw byte blocks to Nautilus objects.

class ByteReader(block_parser: Callable, instrument_provider: Optional[InstrumentProvider] = None, instrument_provider_update: Callable = None)

Bases: Reader

A Reader subclass for reading blocks of raw bytes; block_parser will be passed each block of raw bytes (a sketch follows the parameters below).

Parameters:
  • block_parser (Callable) – The handler which takes blocks of bytes and yields Nautilus objects.

  • instrument_provider (InstrumentProvider, optional) – The instrument provider for the reader.

  • instrument_provider_update (Callable, optional) – An optional hook/callable to update the instrument provider before data is passed to block_parser (in many cases instruments need to be known ahead of parsing).
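
A minimal sketch of a block parser; MyData and its from_bytes constructor are hypothetical placeholders for a real decoder:

def parse_blocks(block: bytes):
    # Split the raw block into records and yield Nautilus objects
    # (MyData.from_bytes is a hypothetical decoder).
    for raw in block.split(b"\n"):
        if raw:
            yield MyData.from_bytes(raw)

reader = ByteReader(block_parser=parse_blocks)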

class TextReader(line_parser: Callable, line_preprocessor: LinePreprocessor = None, instrument_provider: Optional[InstrumentProvider] = None, instrument_provider_update: Optional[Callable] = None, newline: bytes = b'\n')

Bases: ByteReader

A Reader subclass for reading lines of a text-like file; line_parser will be passed a single line of bytes (an example follows the parameters below).

Parameters:
  • line_parser (Callable) – The handler which takes byte strings and yields Nautilus objects.

  • line_preprocessor (LinePreprocessor, optional) – The pre-processor (for cleaning log lines) applied to lines before json.loads is called. Nautilus objects are returned to the pre-processor for any post-processing as well (for example, setting the ts_init value).

  • instrument_provider (InstrumentProvider, optional) – The instrument provider for the reader.

  • instrument_provider_update (Callable, optional) – An optional hook/callable to update the instrument provider before data is passed to line_parser (in many cases instruments need to be known ahead of parsing).

  • newline (bytes, default b'\n') – The newline character.
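
A minimal sketch pairing a line parser with the LoggingLinePreprocessor from the example above; build_objects is a hypothetical helper that maps decoded JSON to Nautilus objects:

import json

def parse_line(line: bytes):
    # Decode one line of JSON and yield Nautilus objects
    # (build_objects is a hypothetical parsing helper).
    data = json.loads(line)
    yield from build_objects(data)

reader = TextReader(
    line_parser=parse_line,
    line_preprocessor=LoggingLinePreprocessor(),
)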

class CSVReader(block_parser: Callable, instrument_provider: Optional[InstrumentProvider] = None, instrument_provider_update: Optional[Callable] = None, header: Optional[list[str]] = None, chunked: bool = True, as_dataframe: bool = True, separator: str = ',', newline: bytes = b'\n', encoding: str = 'utf-8')

Bases: Reader

Provides parsing of CSV-formatted byte strings to Nautilus objects; an example follows the parameter list.

Parameters:
  • block_parser (Callable) – The handler which takes byte strings and yields Nautilus objects.

  • instrument_provider (InstrumentProvider, optional) – The reader's instrument provider.

  • instrument_provider_update (Callable, optional) – An optional hook called before the parser to load instruments into an InstrumentProvider.

  • header (list[str], default None) – If the first row contains the column names, leave header as None. If the data starts at the first row, provide header as the list of column names.

  • chunked (bool, default True) – If chunked=False, each CSV line will be passed to block_parser individually; if chunked=True, the data passed will potentially contain many lines (a block).

  • as_dataframe (bool, default True) – If as_dataframe=True, each block will be parsed into a DataFrame before being passed to block_parser.

  • separator (str, default ',') – The CSV field separator.

  • newline (bytes, default b'\n') – The newline character.

  • encoding (str, default 'utf-8') – The text encoding of the data.
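
A minimal sketch using as_dataframe=True, so each block arrives as a pandas DataFrame; parse_row is a hypothetical helper that maps one row to a Nautilus object:

def parse_csv_block(df):
    # With as_dataframe=True each block is a DataFrame;
    # parse_row (hypothetical) converts one row to a Nautilus object.
    for _, row in df.iterrows():
        yield parse_row(row)

reader = CSVReader(
    block_parser=parse_csv_block,
    header=["timestamp", "bid", "ask"],  # example columns; the file has no header row
    as_dataframe=True,
)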

class ParquetReader(parser: Callable = None, instrument_provider: Optional[InstrumentProvider] = None, instrument_provider_update: Optional[Callable] = None)

Bases: ByteReader

Provides parsing of parquet-format bytes to Nautilus objects; a sketch follows the parameter list.

Parameters:
  • parser (Callable) – The handler which takes the deserialized data and yields Nautilus objects.

  • instrument_provider (InstrumentProvider, optional) – The reader's instrument provider.

  • instrument_provider_update (Callable, optional) – An optional hook/callable to update the instrument provider before data is passed to parser (in many cases instruments need to be known ahead of parsing).
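
A minimal sketch, assuming the reader deserializes each parquet block into a pandas DataFrame before invoking the parser; parse_row is again a hypothetical helper:

def parse_parquet(df):
    # Assumed: each block arrives as a DataFrame
    # (parse_row is a hypothetical row converter).
    for _, row in df.iterrows():
        yield parse_row(row)

reader = ParquetReader(parser=parse_parquet)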

Batching

class FileMeta(filename, datatype, instrument_id, client_id, start, end)

Bases: tuple

client_id

Alias for field number 3

count(value, /)

Return number of occurrences of value.

datatype

Alias for field number 1

end

Alias for field number 5

filename

Alias for field number 0

index(value, start=0, stop=9223372036854775807, /)

Return first index of value.

Raises ValueError if the value is not present.

instrument_id

Alias for field number 2

start

Alias for field number 4

extract_generic_data_client_ids(data_configs: list[nautilus_trader.config.backtest.BacktestDataConfig]) → dict

Extract a mapping of data_type : client_id from the list of data_configs. In the process of merging the streaming data we lose the client_id for generic data, so we need to inject it back in so that the backtest engine can be loaded correctly.
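
A usage sketch, assuming data_configs is the list of BacktestDataConfig objects from a backtest run configuration; the mapping values shown are hypothetical:

# Recover client IDs for generic data so they can be re-attached
# after the streamed data has been merged.
client_ids = extract_generic_data_client_ids(data_configs=data_configs)
# e.g. {"NewsEventData": "NEWS-CLIENT"}  (hypothetical values)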

Catalog

class BaseDataCatalog(*args, **kw)

Bases: ABC

Provides an abstract base class for a queryable data catalog.

class ParquetDataCatalog(*args, **kw)

Bases: BaseDataCatalog

Provides a queryable data catalog persisted to file in parquet format; an example follows the parameters.

Parameters:
  • path (str) – The root path for this data catalog. Must exist and must be an absolute path.

  • fs_protocol (str, default 'file') – The fsspec filesystem protocol to use.

  • fs_storage_options (dict, optional) – The fs storage options.
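
For illustration, the path below is an example value, and the query method shown is assumed from the wider catalog API rather than documented in this section:

# A minimal sketch: open a catalog rooted at an absolute path.
catalog = ParquetDataCatalog(path="/data/catalog", fs_protocol="file")

# Assumed query method from the catalog API (not documented here).
instruments = catalog.instruments()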

Streaming

class StreamingFeatherWriter(path: str, logger: LoggerAdapter, fs_protocol: str = 'file', flush_interval_ms: Optional[int] = None, replace: bool = False, include_types: Optional[tuple[type]] = None)

Bases: object

Provides a stream writer of Nautilus objects into feather files; a usage sketch follows the method listing below.

Parameters:
  • path (str) – The path to persist the stream to.

  • logger (LoggerAdapter) – The logger for the writer.

  • fs_protocol (str, default 'file') – The fsspec file system protocol.

  • flush_interval_ms (int, optional) – The flush interval (milliseconds) for writing chunks.

  • replace (bool, default False) – If existing files at the given path should be replaced.

  • include_types (tuple[type], optional) – The types to include when writing (other types are filtered out).

write(obj: object) → None

Write the object to the stream.

Parameters:
  • obj (object) – The object to write.

Raises:
  • ValueError – If obj is None.

check_flush() → None

Flush all stream writers if the current time is greater than the next flush interval.

flush() → None

Flush all stream writers.

close() → None

Flush and close all stream writers.
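
A usage sketch; the path and flush interval are example values, and the logger and object source are assumed to already exist:

# A minimal sketch: stream objects to feather files, flushing
# roughly every second, then flush and close on completion.
writer = StreamingFeatherWriter(
    path="/data/streams/backtest-001",
    logger=logger,              # an existing LoggerAdapter
    flush_interval_ms=1_000,
)
for obj in objects:             # any iterable of Nautilus objects
    writer.write(obj)
writer.close()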

generate_signal_class(name: str, value_type: type)

Dynamically create a Data subclass for this signal.
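
For illustration (the signal name is an example value):

# Dynamically create a Data subclass for a float-valued signal.
SignalMomentum = generate_signal_class(name="momentum", value_type=float)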