Rx and historical time replay

I often come across a scenario where I have a set of historical data that I want to replay and process using Rx (Reactive Extensions).

Here is a recipe for using HistoricalScheduler in Python 3.

from rx import Observable
from rx.concurrency import HistoricalScheduler
from datetime import timedelta

# tuples as our timestamped data, timestamps are in milliseconds
data = [(200, 'a'), (250, 'b'), (400, 'c'),
        (450, 'd'), (1300, 'e'), (3000, 'f')]

sched = HistoricalScheduler()

# Turn each item of `data` into a timer that fires at the item's timestamp
# in virtual time, then flat_map the timers back into one stream of tuples
def delayed(pair):
    return Observable.timer(
        timedelta(milliseconds=pair[0]), scheduler=sched).map(lambda _: pair)

Observable.from_list(data).flat_map(delayed).subscribe(on_next=print)

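# This interval runs on the real-time scheduler and ticks every 10 ms.
# The interval period determines the resolution of the replay,
# and the step passed to .advance_by determines the replay speed.
# Note: `i` is the tick counter (0, 1, 2, ...), so a step of 4*i grows
# with every tick; use a constant step for a constant replay speed.
Observable.interval(10).subscribe(
    on_next=lambda i: sched.advance_by(timedelta(milliseconds=4*i)))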

# Keep the main thread alive so the schedulers keep running;
# sleeping avoids burning a CPU core in a busy loop
import time
while True:
    time.sleep(1)

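To run this code you need to install RxPY. The recipe uses the RxPY 1.x API (rx.Observable and rx.concurrency), so pin the version if pip would otherwise install a newer major release:

pip3 install "Rx<2"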

There are two main elements in the code:

  • an observable that is running with HistoricalScheduler using virtual time
  • an interval that advances the virtual time clock of the historical scheduler
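To make the idea of a virtual time clock more concrete, here is a minimal sketch in plain Python. The VirtualClock class, its schedule method and the fired list are hypothetical names made up for this illustration; this is not how RxPY implements HistoricalScheduler, only a simplified model of the behaviour the recipe relies on: nothing fires until somebody advances the clock.

```python
import heapq
from itertools import count


class VirtualClock:
    """Hypothetical stand-in for a virtual-time scheduler (not RxPY's code)."""

    def __init__(self):
        self.now = 0          # current virtual time in milliseconds
        self._queue = []      # min-heap of (due_ms, sequence, action)
        self._seq = count()   # tie-breaker: keeps insertion order for equal due times

    def schedule(self, due_ms, action):
        """Register an action to run once virtual time reaches due_ms."""
        heapq.heappush(self._queue, (due_ms, next(self._seq), action))

    def advance_by(self, delta_ms):
        """Move virtual time forward and run every action that became due."""
        target = self.now + delta_ms
        while self._queue and self._queue[0][0] <= target:
            due, _, action = heapq.heappop(self._queue)
            self.now = due
            action()
        self.now = target


data = [(200, 'a'), (250, 'b'), (400, 'c'),
        (450, 'd'), (1300, 'e'), (3000, 'f')]

clock = VirtualClock()
fired = []
for pair in data:
    # bind `pair` as a default argument so each action keeps its own tuple
    clock.schedule(pair[0], lambda pair=pair: fired.append(pair))

clock.advance_by(300)       # virtual time 300: 'a' and 'b' are due
first_batch = list(fired)
clock.advance_by(1000)      # virtual time 1300: 'c', 'd' and 'e' are due
print(first_batch)
print(fired)
```

The item stamped 3000 stays in the queue until the clock is advanced far enough, no matter how much real time passes.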

Together, these two elements give the flexibility required to control the pace at which the historical data is replayed and the resolution of the virtual time at which events are triggered.
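The relation between interval, step and replay pace can be worked out with a little arithmetic. The sketch below is an idealized model, not a measurement: replay_schedule is a hypothetical helper, it assumes the first tick arrives one interval after start, and it ignores scheduling jitter. It compares a constant step of 4 ms with the growing step 4*i used in the recipe, both at a 10 ms interval.

```python
def replay_schedule(timestamps_ms, interval_ms, step_for_tick, max_ticks=100_000):
    """Map each virtual timestamp to the real time (ms) at which it would fire.

    step_for_tick(i) is the amount of virtual time added on tick i (i = 0, 1, ...).
    Assumption: tick i happens at real time (i + 1) * interval_ms.
    """
    pending = sorted(timestamps_ms)
    virtual = 0
    result = {}
    for tick in range(max_ticks):
        if not pending:
            break
        virtual += step_for_tick(tick)
        real = (tick + 1) * interval_ms
        # everything whose timestamp has been reached fires on this tick
        while pending and pending[0] <= virtual:
            result[pending.pop(0)] = real
    return result


timestamps = [200, 250, 400, 450, 1300, 3000]

# constant step: 4 ms of virtual time per 10 ms of real time, i.e. 0.4x speed
constant = replay_schedule(timestamps, 10, lambda i: 4)

# growing step, as in the recipe: the replay accelerates with every tick
growing = replay_schedule(timestamps, 10, lambda i: 4 * i)

print(constant)
print(growing)
```

In this model the constant step replays the item stamped 3000 after about 7.5 seconds of real time, while the growing step reaches it after about 0.4 seconds. A smaller interval gives a finer resolution, because due items can only fire on a tick.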
