Hi,
Stephen and Bryn were looking over my RxJava implementation the other day and
Bryn, with his British accent, was like [I paraphrase]:
“Whoa dawg! Bro should like totally not be blocking to fill an
iterator. Gnar gnar for surezies.”
Prior to now, Processor implemented Iterator<Traverser>, where for RxJava, when
you do next()/hasNext() if there were no results in the queue and the flowable
was still running, then the iterator while()-blocks waiting for a result or for
the flowable to terminate.
This morning I decided to redo the Processor interface (and respective
implementations) and it is much nicer now. We have two “execute” methods:
Iterator<Traverser> Processor.iterator(Iterator<Traverser> starts)
void Processor.subscribe(Iterator<Traverser> starts, Consumer<Traverser>
consumer)
A processor can only be executed using one of the methods above. Thus,
depending on context and the underlying processor, the VM determines whether to
use pull-based or push-based semantics. Pretty neat, eh?
https://github.com/apache/tinkerpop/blob/tp4/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/Processor.java
<https://github.com/apache/tinkerpop/blob/tp4/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/Processor.java>
Check out how I do Pipes:
https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java#L113-L126
<https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java#L113-L126>
Pipes is inherently pull-based. However, to simulate push-based semantics, I
Thread().start() the iterator.hasNext()/next() and just consume.accept() the
results. Thus, as desired, subscribe() returns immediately.
Next, here is my RxJava implementation.
https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java#L59-L65
<https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java#L59-L65>
https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java#L66-L86
<https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java#L66-L86>
You can see how I turn a push-based subscription into a pull-based iteration
using the good ‘ol while()-block :).
https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java#L98-L102
<https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java#L98-L102>
——
What I need to do next is to redo the RxJava execution planner such that nested
traversals (e.g. map(out()))) are subscription-based with the parent flowable.
I don’t quite know how I will do it — but I believe I will have to write custom
Publisher/Subscriber objects for use with Flowable.compose() such that onNext()
and onComplete() will be called accordingly within the consumer.accept(). It
will be tricky as I’m not too good with low-level RxJava, but thems the breaks.
Please note that my push-based conceptual skills are not the sharpest so if
anyone has any recommendations, please advise.
Take care,
Marko.
http://rredux.com <http://rredux.com/>