Fundamental reactive operators
Intro
I think it would be beneficial to have a common set of fundamental operators, such that all other operators can be expressed as combinations and compositions of them.
Motivation
Examples of where that analysis can turn out to be useful:
- creating a new Rx library, e.g. for a language that is not supported yet. It would be sufficient to implement only the fundamental operators and provide all the others as derived ones (e.g. in a first supported version).
- unit testing a library. Unit tests could be reduced to the fundamental operators; derived operators would then be guaranteed to work. If a derived operator gets a dedicated optimized implementation, its unit test could be as simple as comparing its results with those of the reference derived variant.
- missing one of the listed derived operators in your Rx library of choice. The derived implementation can act as a reference here, and as a quick guide to coming up with a quick and correct version of it.
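The unit-testing idea can be sketched roughly as follows. This is only an illustration under strong assumptions: streams are modelled as plain Python iterables rather than asynchronous observables, `take_last_1` is treated as given, and `count_optimized` and `same_output` are hypothetical names that do not come from any Rx library.

```python
# Operators modelled on plain Python iterables (an assumption: a real Rx
# library would push values asynchronously).
def scan(source, seed, step):
    """Emit the running accumulator after each item."""
    acc = seed
    for x in source:
        acc = step(acc, x)
        yield acc

def map_(source, fn):
    """Apply fn to every item."""
    for x in source:
        yield fn(x)

def take_last_1(source):
    """Emit only the final item, if there is one."""
    sentinel = object()
    last = sentinel
    for x in source:
        last = x
    if last is not sentinel:
        yield last

# Reference (derived) variant:
#   with-index = scan(pair{-1,None}, (p,x) -> pair{p.first+1, x})
#   count      = with-index().take-last(1).map(p -> p.first + 1)
def with_index(source):
    return scan(source, (-1, None), lambda p, x: (p[0] + 1, x))

def count_reference(source):
    return map_(take_last_1(with_index(source)), lambda p: p[0] + 1)

# Hypothetical hand-optimized variant a library might ship instead.
def count_optimized(source):
    n = 0
    for _ in source:
        n += 1
    if n > 0:  # mirror the reference: an empty source emits nothing
        yield n

def same_output(op_a, op_b, inputs):
    """Check that two operator implementations agree on every sample input."""
    return all(list(op_a(iter(i))) == list(op_b(iter(i))) for i in inputs)

samples = [[], [7], [1, 2, 3], "abcd"]
print(same_output(count_reference, count_optimized, samples))  # True
```

The test for the optimized operator is then a single comparison against the derived reference, with no hand-written expected values.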
Operators
Here is my proposed classification of operators, based on the alphabetical list from here plus a few added by me.
I've marked as fundamental the ones that I couldn't find a way to express by any simpler means. The others have an alternative formula.
operator | fundamental | formula |
---|---|---|
amb | ✓ | |
all(cond) | | scan(true, (a,x)-> a && cond(x)).take-last(1) |
average | | scan(pair{0,0}, (p,x)-> pair{p.first+1, p.second+x}).take-last(1).map(p-> p.second/p.first) |
buffer | ✓ | |
catch | ✓ | |
combine-latest(op) | | mscan(pair{null,null}, {a, (o,x)->pair{x,o.second}}, {b, (o,y)->pair{o.first,y}}).filter(p-> p.first!=null && p.second!=null).map(op) |
concat | ✓ | |
contains(X) | | filter(x->x==X).take(1).map(x->true).default-if-empty(false) |
count | | with-index().take-last(1).map(pair->pair.first+1) |
debounce | ✓ | |
default-if-empty | ✓ | |
delay | ✓ | |
distinct | | scan(tuple{None,false,set()}, (a,x)->{is_new=!a[2].contains(x); tuple{x,is_new,a[2].add(x)}}).filter(t->t[1]).map(t->t[0]) |
distinct-until-changed | | with-index().scan(tuple{0,None,None}, (a,x)-> tuple{x.first,x.second,a[1]}).filter(t-> t[0]==0 or t[1]!=t[2]).map(t->t[1]) |
do | ✓ | |
element-at(N) | | skip(N).take(1) |
filter | ✓ | |
first | | take(1) |
flatmap | ✓ | |
group-by | ✓ | |
ignore-elements | | filter(x->false) |
join | ✓ | |
last | | take-last(1) |
map | ✓ | |
max | | reduce(MIN, (a,x)-> max(a,x)) |
merge(a,b) | | mscan(null, {a, (o,x)->x}, {b, (o,y)->y}) |
min | | reduce(MAX, (a,x)-> min(a,x)) |
mscan | ✓ | |
observe-on | ✓ | |
reduce(op) | | scan(op).take-last(1) |
retry | ✓ | |
sample | ✓ | |
scan(a0, f) | | mscan(a0, {source, f}) |
sequence-equal(a,b) | ✓ | |
serialize | ✓ | |
skip(N) | | with-index().filter(pair-> pair.first >= N).map(pair->pair.second) |
skip-last(N) | ✓ | |
skip-until(a,b) | | mscan(tuple{true,null,false}, {a, (o,x)->tuple{o[0],x,true}}, {b, (o,y)->tuple{false,o[1],false}}).filter(t-> !t[0] && t[2]).map(t->t[1]) |
skip-while | ✓ | |
start-with | ✓ | |
subscribe-on | ✓ | |
sum | | reduce((a,x)-> a+x) |
switch | ✓ | |
take(N) | | with-index().take-while(pair-> pair.first < N).map(pair->pair.second) |
take-last(N) | | reduce(pipe(N), (a,x)->{a.push(x); a}).flatmap(a->observable::from(a.items)) |
take-until(a,b) | | mscan(pair{true,null}, {a, (o,x)->pair{o.first,x}}, {b, (o,y)->pair{false,o.second}}).take-while(pair->pair.first).map(pair->pair.second) |
take-while | ✓ | |
time-interval | ✓ | |
timeout | ✓ | |
timestamp | | map(x->pair{gettime(), x}) |
to... | ✓ | |
window | ✓ | |
with-index | | scan(pair{-1,None}, (p,x)-> pair{p.first+1,x}) |
zip(a,b) | ✓ | |
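Several formulas above rely on mscan, which the text does not define. The sketch below assumes it is a multi-source scan: one shared accumulator, one update function per source, and the accumulator is emitted after every incoming event. Interleaved streams are modelled as a list of `(source_name, value)` events in arrival order; that representation and the Python function names are assumptions made for illustration only. In this sketch, combine-latest waits until both sources have produced a value by filtering out pairs that still contain a null.

```python
def mscan(seed, events, handlers):
    """Assumed mscan semantics: a shared accumulator updated by a per-source function.

    events   -- iterable of (source_name, value) pairs in arrival order
    handlers -- dict mapping source_name to a function (acc, value) -> new acc
    """
    acc = seed
    for name, value in events:
        acc = handlers[name](acc, value)
        yield acc

# merge(a,b) = mscan(null, {a, (o,x)->x}, {b, (o,y)->y})
def merge(events):
    return mscan(None, events, {"a": lambda o, x: x, "b": lambda o, y: y})

# combine-latest(op): keep the latest value of each source in a pair and
# apply op once both slots are filled.
def combine_latest(events, op):
    pairs = mscan((None, None), events, {
        "a": lambda o, x: (x, o[1]),
        "b": lambda o, y: (o[0], y),
    })
    ready = (p for p in pairs if p[0] is not None and p[1] is not None)
    return (op(*p) for p in ready)

# scan is mscan with a single source.
def scan(source, seed, step):
    tagged = (("source", x) for x in source)
    return mscan(seed, tagged, {"source": step})

events = [("a", 1), ("a", 2), ("b", 10), ("a", 3), ("b", 20)]
print(list(merge(events)))                               # [1, 2, 10, 3, 20]
print(list(combine_latest(events, lambda x, y: x + y)))  # [12, 13, 23]
print(list(scan([1, 2, 3, 4], 0, lambda a, x: a + x)))   # [1, 3, 6, 10]
```

Treating `None` as "no value yet" is a simplification; a stream that legitimately carries nulls would need a dedicated sentinel.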
Observable sources
source | type | formula |
---|---|---|
timer(T) | | just(x).delay(T) |
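The timer formula can be tried out with Python async generators standing in for observables. This is a minimal sketch, not an Rx implementation: `just`, `delay`, `timer` and `collect` are hypothetical helpers written for this example, and `delay` simply sleeps before forwarding each item, which matches a real delay operator only for single-item sources such as `just`.

```python
import asyncio
import time

async def just(x):
    """Emit a single value and complete."""
    yield x

async def delay(source, seconds):
    """Simplified delay: wait `seconds` before forwarding each item."""
    async for x in source:
        await asyncio.sleep(seconds)
        yield x

def timer(seconds, value=0):
    """Derived source: timer(T) = just(x).delay(T)."""
    return delay(just(value), seconds)

async def collect(source):
    """Gather all items of an async stream into a list."""
    return [x async for x in source]

start = time.monotonic()
items = asyncio.run(collect(timer(0.05)))
elapsed = time.monotonic() - start
print(items)  # [0]
print(f"emitted after about {elapsed:.3f} s")
```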