Fundametal reactive operators

Intro

I think it would be benefictial to have a common set of fundamental operators, such that all other operators can be expressed as their combination and composition etc.

Motivation

Example of where that analysis can turn to be useful are:

  • creating a new Rx library, e.g. for non-supported-yet language. It would be a sufficient to limit implementations for fundamental operators only, and provide all others as derived (e.g. as a first supported version).
  • unit testing a library. Unit tests could be reduced to the fundamental operators. Derived operators would be then warrantied to work. In case of creating dedicated optimized implementations of derived operators, unit tests could be as simple as comparing results produced with those with reference derived variant of it.
  • lacking one of listed derived operators in a particular Rx library of choice. Derived implementation can act as a reference here and a quick guide how-to come with a quick and correct version of it.

Operators

Here is my proposal of operator classification, based on the alphabetical list from here plus some few added from my side.

I’ve marked the ones that I couldn’t come with a way to express by any simpler means

  • as fundamental. Others - have alternative formula.
operator is fund. formula
amb
all(cond) scan(true, (a,x)-> a && cond).take-last(1)
average with-index().scan(pair{0,0}, (pair,x)->pair{pair.first+1, x+pair.second}).take-last(1).map(pair->pair.second/pair.first)
buffer
catch
combine-latest(op) mscan(pair(null,null), {a, (o,x)->pair{x,o.second}}, {b, (o,y)->pair{o.first,second}}).map(op).skip(1)
concat
contains(X) filter(x->x==X).take(1).map(x->true).default-if-empty(false)
count with-index().take-last(1).map(pair->pair.first+1)
debounce
cdefault-if-empty
delay
distinct scan(tuple(None,false,set()), (a,x)->{is_new=!a.second.contains(x); tuple{x,is_new,a.append(x)}).filter(tuple->tuple[1]).map(tuple->tuple[0])
distinct-until-changed with-index().scan(tuple{0,None,None}, (pair,x)-> tuple{x.first,x.second,x[1]}).filter(tuple-> tuple[0]==0 or tuple[1]!=tuple[2]).map(tuple->tuple[1])
do
element-at(N) skip(N).take(1)
filter
first take(1)
flatmap
group-by
ignore-elements filter(x->false)
join
last take-last(1)
map
max reduce(MIN, (a,x)-> max(a,x))
merge(a,b) mscan(null, {a, (o,x)->x}, {b, (o,y)->y})
in reduce(MAX, (a,x)-> min(a,x))
scan
bserve-on
educe(op) scan(op).take-last(1)
etry
ample
can(a0, (a,x)->a) mscan(a0,{source,(a,x)->a})
equence-equal(a,b)
erialize
kip(N) with-index().filter(pair-> pair.second >= N).map(pair->pair.first)
kip-last(N)
kip-until(a,b) mscan(true, {a, (o,x)->pair{o.first,x}}, {b,(o,y)->pair{false,pair.second}}).skip-while(pair->pair.first).map(pair->pair.second)
kip-while
tart-with
subscribe-on
sum reduce((a,x)-> a+x)
switch
take(N) with-index().filter(pair-> pair.second < N).map(pair->pair.first)
take-last(N) reduce(pipe(N), (a,x)->{a.push(x); pipe.items}).flatmap(items->observable::from(items)
take-until(a,b) mscan(true, {a, (o,x)->pair{o.first,x}}, {b,(o,y)->pair{false,pair.second}}).take-while(pair->pair.first).map(pair->pair.second)
take-while
time-interval
timeout
timestamp map(x->pair{gettime(), x})
to...
window
with-index scan(pair{-1,None}, (p,x)-> pair{p.first+1,x})
zip(ab,b)

Observable sources

source type formula
timer(T) just(x).delay(T)
comments powered by Disqus