New reactive operators and sources
Intro
For some time now, reactive programming has been one of my favourite fields of modern software engineering.
I’ve decided to share the idea of introducing a few new general-purpose reactive operators and observables. As far as I can tell, the candidates are not yet part of any major or popular Rx library (as documented on https://reactivex.io), but IMO they would be useful additions. Some of them offer functionality that cannot easily be built by simple composition of existing operators. Others are proposed for clarity and convenience, because they carry better semantics than the alternative, not-so-obvious compositions.
With index
Similar to timestamp, except that it attaches an ordinal number (an index) to each emitted item.
Example:
The operator is inspired by Python’s enumerate function, which iterates over a collection and yields pairs of an index and a value.
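The behaviour described above can be sketched in a few lines. This is only an illustration: the stream is modelled as a plain Python iterable rather than a real Rx observable, and the name `with_index` and its `start` parameter are assumptions, not an existing library API.

```python
from typing import Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")


def with_index(source: Iterable[T], start: int = 0) -> Iterator[Tuple[int, T]]:
    """Pair every item with its ordinal position in the stream."""
    index = start
    for item in source:
        yield index, item
        index += 1


indexed = list(with_index("abc"))
print(indexed)  # [(0, 'a'), (1, 'b'), (2, 'c')]
```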
Tuplewise
This operator groups the most recent N (and exactly N) items into a tuple (or buffer).
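One possible reading of this description is a sliding window that advances by one item and emits nothing until N items have arrived. The sketch below assumes exactly that; the stream is modelled as a plain iterable and the name `tuplewise` is hypothetical.

```python
from collections import deque
from typing import Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")


def tuplewise(source: Iterable[T], n: int) -> Iterator[Tuple[T, ...]]:
    """Emit a tuple of the n most recent items for every new item."""
    if n < 1:
        raise ValueError("n must be at least 1")
    window = deque(maxlen=n)  # the oldest item drops out automatically
    for item in source:
        window.append(item)
        if len(window) == n:  # stay silent until exactly n items are held
            yield tuple(window)


windows = list(tuplewise([1, 2, 3, 4], 2))
print(windows)  # [(1, 2), (2, 3), (3, 4)]
```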
Every N-th
This operator takes an integer value N as its argument and emits one item out of every N from the source, skipping the N-1 items in between.
It provides a simple way of throttling or sampling source items, based on a trivial underlying mechanism (a built-in counter will do), so it might be particularly useful for resource-constrained Rx implementations.
In terms of emitted values, this operator is equivalent to the chain:
source.buffer(N).map(b->b[0])
or it could be expressed using the with_index operator introduced above, in the form of:
source.with_index().filter(tuple -> tuple.first % N == 0).map(tuple -> tuple.second)
However, a dedicated implementation would be much simpler and smaller, with no temporary tuples or buffers to construct.
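A dedicated implementation really does need nothing more than a counter. The sketch below models the stream as a plain iterable; the name `every_nth` is hypothetical, and it assumes the first item of each group of N is the one that gets through (matching the `index % N == 0` formulation).

```python
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def every_nth(source: Iterable[T], n: int) -> Iterator[T]:
    """Let through the first item of every group of n, skipping the other n-1."""
    if n < 1:
        raise ValueError("n must be at least 1")
    counter = 0
    for item in source:
        if counter == 0:
            yield item
        counter = (counter + 1) % n  # the only state the operator keeps


sampled = list(every_nth(range(10), 3))
print(sampled)  # [0, 3, 6, 9]
```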
Sort
This operator collects all incoming items and, when the source completes, re-emits them in sorted order.
Example:
A variant that takes a user-supplied binary function (as the item comparer) could be introduced as well.
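Both variants can be sketched together. The stream is modelled as a plain iterable whose exhaustion plays the role of completion, and the optional `comparer` is assumed to be a three-way comparison (negative, zero, positive), which is only one possible shape for the "binary function" mentioned above.

```python
from functools import cmp_to_key
from typing import Callable, Iterable, Iterator, Optional, TypeVar

T = TypeVar("T")


def sort(source: Iterable[T],
         comparer: Optional[Callable[[T, T], int]] = None) -> Iterator[T]:
    """Buffer everything until the source completes, then emit in sorted order."""
    items = list(source)  # nothing can be emitted before completion
    key = cmp_to_key(comparer) if comparer is not None else None
    yield from sorted(items, key=key)


ascending = list(sort([3, 1, 2]))
descending = list(sort([3, 1, 2], comparer=lambda a, b: b - a))
print(ascending)   # [1, 2, 3]
print(descending)  # [3, 2, 1]
```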
Reverse
This operator re-emits all items in reverse order once the source completes.
Example:
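A minimal sketch, with the stream modelled as a plain iterable whose exhaustion stands in for completion (the name `reverse` is used here only for illustration):

```python
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def reverse(source: Iterable[T]) -> Iterator[T]:
    """Buffer everything until the source completes, then emit last-to-first."""
    items = list(source)
    yield from reversed(items)


reversed_items = list(reverse([1, 2, 3]))
print(reversed_items)  # [3, 2, 1]
```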
Tokenize
This operator takes an observable of characters as input and emits a string each time it detects one of the given string-termination markers.
Example:
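A minimal sketch, with the character stream modelled as a plain iterable. Several details are assumptions that the description leaves open: the terminators themselves are not part of the emitted strings, empty tokens are dropped, and a pending token is flushed when the source completes.

```python
from typing import Iterable, Iterator


def tokenize(chars: Iterable[str], terminators: str = " \n") -> Iterator[str]:
    """Collect characters and emit them as one string at each terminator."""
    token = []
    for ch in chars:
        if ch in terminators:
            if token:  # assumption: consecutive terminators emit nothing
                yield "".join(token)
                token = []
        else:
            token.append(ch)
    if token:  # assumption: flush the last token on completion
        yield "".join(token)


tokens = list(tokenize("ab cd\nef"))
print(tokens)  # ['ab', 'cd', 'ef']
```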
Cartesian product
This operator combines two sources. Whenever an item is received on either of them, it emits the pairs that are still missing from the complete Cartesian product of the two sources. By the time both sources complete, every item of the product has been emitted.
Example:
The example above shows a Cartesian product of two observables, but in general the operator can produce products of more than two sources (an N-dimensional product).
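The incremental behaviour for two sources can be sketched as follows. Since plain Python has no observables, the interleaving of the two sources is modelled as a single list of `(side, value)` events in arrival order; that encoding and the name `cartesian_product` are assumptions made for illustration.

```python
from typing import Any, Iterable, Iterator, Tuple


def cartesian_product(events: Iterable[Tuple[str, Any]]) -> Iterator[Tuple[Any, Any]]:
    """For each arriving item, emit the pairs it completes with items seen so far."""
    left, right = [], []
    for side, value in events:
        if side == "left":
            left.append(value)
            for r in right:  # pair the new left item with every known right item
                yield (value, r)
        else:
            right.append(value)
            for l in left:  # pair every known left item with the new right item
                yield (l, value)


events = [("left", 1), ("right", "a"), ("left", 2), ("right", "b")]
pairs = list(cartesian_product(events))
print(pairs)  # [(1, 'a'), (2, 'a'), (1, 'b'), (2, 'b')]
```

Every pair is emitted exactly once, at the moment its second component arrives, so the memory cost is one list per source.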
MScan
The name of this operator stands for multiple-scan. It is a generalisation of the scan operator.
mscan, in contrast to scan, takes any number of observable sources (rather than just one, as scan does) and, using a dedicated accumulation function for each source, emits an accumulated value following every item emitted by any of the sources.
The existing scan operator can be seen as a simple case of the proposed mscan, where only one source and its dedicated accumulation function are given.
mscan is somewhat similar in concept to a specially crafted merge followed by a specially prepared scan, with the exception that the observables it operates on can be of different value types.
I actually consider it quite a powerful operator. I’m not entirely sure this description presents it adequately, so I might give it a dedicated post with some examples and comparisons soon.
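One possible reading of the description, as a rough sketch only: all sources share a single accumulated state, and each source has its own function for folding its items into that state. The interleaving of sources is modelled as a list of `(source_index, value)` events, and the signature `mscan(seed, events, accumulators)` is hypothetical.

```python
from typing import Any, Callable, Iterable, Iterator, Sequence, Tuple


def mscan(seed: Any,
          events: Iterable[Tuple[int, Any]],
          accumulators: Sequence[Callable[[Any, Any], Any]]) -> Iterator[Any]:
    """Fold items from several sources into one state, emitting after each item."""
    state = seed
    for source_index, value in events:
        # Each source has its own accumulator, so value types may differ.
        state = accumulators[source_index](state, value)
        yield state


# Source 0 carries integers, source 1 carries strings.
accumulators = [
    lambda total, number: total + number,   # source 0: add the number
    lambda total, text: total - len(text),  # source 1: subtract the text length
]
events = [(0, 10), (1, "abc"), (0, 5)]
states = list(mscan(0, events, accumulators))
print(states)  # [10, 7, 12]
```

With a single source and a single accumulator this reduces to an ordinary scan.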
Play
This observable source takes (similarly to iterate) a collection of time-stamped items (pairs made of a time point and a value to emit) and emits them sequentially at the specified times.
play({2,A}, {4,B}, {5,C}, {8,D})
time   -1-2-3-4-5-6-7-8-9-...
result ---A---B-C-----D|
The name comes from an analogy to playing music (playing values, like notes, in sequence at specific points in time).
Reasons:
- Such a source turns out to be extremely useful in unit tests, for simulating events that happen over time.
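A rough sketch of such a source, written as a blocking Python generator rather than a real observable. The name `play` comes from the text, but the signature is an assumption: time points are taken to be non-negative seconds from the start, and the `sleep` parameter is injectable so the example can record the waits instead of actually pausing.

```python
import time
from typing import Any, Callable, Iterable, Iterator, Tuple


def play(timeline: Iterable[Tuple[float, Any]],
         sleep: Callable[[float], Any] = time.sleep) -> Iterator[Any]:
    """Emit each value once its time point (seconds from start) is reached."""
    elapsed = 0.0
    for at, value in sorted(timeline, key=lambda pair: pair[0]):
        sleep(at - elapsed)  # wait out the gap since the previous item
        elapsed = at
        yield value


# Record the requested waits instead of sleeping for real.
waits = []
played = list(play([(2, "A"), (4, "B"), (5, "C"), (8, "D")], sleep=waits.append))
print(played)  # ['A', 'B', 'C', 'D']
print(waits)   # [2.0, 2, 1, 3]
```

This simple version ignores the time the consumer spends handling each item; a real implementation would schedule against an absolute clock.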
RTC source
This observable source reports the current time of a running real-time clock. By default the source emits an item on every full second, but additional parameters can tune that to the user’s needs.
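The "every full second" behaviour can be sketched as a generator that sleeps until the next boundary of the period. The name `rtc` and the parameters `period`, `now` and `sleep` are all hypothetical; the clock functions are injectable so the example below can run against a fake clock instead of waiting in real time.

```python
import math
import time
from itertools import islice
from typing import Any, Callable, Iterator


def rtc(period: float = 1.0,
        now: Callable[[], float] = time.time,
        sleep: Callable[[float], Any] = time.sleep) -> Iterator[float]:
    """Emit the clock time at every whole multiple of `period` seconds."""
    while True:
        current = now()
        next_tick = (math.floor(current / period) + 1) * period
        sleep(next_tick - current)  # wait until the next round boundary
        yield next_tick


class FakeClock:
    """Stand-in clock for the example: sleeping just advances the time."""

    def __init__(self, start: float) -> None:
        self.t = start

    def now(self) -> float:
        return self.t

    def sleep(self, seconds: float) -> None:
        self.t += seconds


clock = FakeClock(10.25)
ticks = list(islice(rtc(now=clock.now, sleep=clock.sleep), 3))
print(ticks)  # [11.0, 12.0, 13.0]
```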