On 2019/04/23 13:07:09, Marko Rodriguez <[email protected]> wrote: 
> Hi,
> 
> Stephen and Bryn were looking over my RxJava implementation the other day and 
> Bryn, with his British accent, was like [I paraphrase]:
> 
>       “Whoa dawg! Bro should like totally not be blocking to fill an 
> iterator. Gnar gnar for surezies.”
> 
> Prior to now, Processor implemented Iterator<Traverser>, where for RxJava, 
> when you do next()/hasNext() if there were no results in the queue and the 
> flowable was still running, then the iterator while()-blocks waiting for a 
> result or for the flowable to terminate.
> 
> This morning I decided to redo the Processor interface (and respective 
> implementations) and it is much nicer now. We have two “execute” methods:
> 
> Iterator<Traverser>   Processor.iterator(Iterator<Traverser> starts)
> void Processor.subscribe(Iterator<Traverser> starts, Consumer<Traverser> 
> consumer)
> 
> A processor can only be executed using one of the methods above. Thus, 
> depending on context and the underlying processor, the VM determines whether 
> to use pull-based or push-based semantics. Pretty neat, eh?
> 
>       
> https://github.com/apache/tinkerpop/blob/tp4/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/Processor.java
>  
> <https://github.com/apache/tinkerpop/blob/tp4/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/Processor.java>
> 
> Check out how I do Pipes:
> 
>       
> https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java#L113-L126
>  
> <https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java#L113-L126>
> 
> Pipes is inherently pull-based. However, to simulate push-based semantics, I 
> Thread().start() the iterator.hasNext()/next() and just consume.accept() the 
> results. Thus, as desired, subscribe() returns immediately.
> 
> Next, here is my RxJava implementation.
> 
>       
> https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java#L59-L65
>  
> <https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java#L59-L65>
>       
> https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java#L66-L86
>  
> <https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java#L66-L86>
> 
> You can see how I turn a push-based subscription into a pull-based iteration 
> using the good ‘ol while()-block :).
> 
>       
> https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java#L98-L102
>  
> <https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java#L98-L102>
> 
> ——
> 
> What I need to do next is to redo the RxJava execution planner such that 
> nested traversals (e.g. map(out()))) are subscription-based with the parent 
> flowable. I don’t quite know how I will do it — but I believe I will have to 
> write custom Publisher/Subscriber objects for use with Flowable.compose() 
> such that onNext() and onComplete() will be called accordingly within the 
> consumer.accept(). It will be tricky as I’m not too good with low-level 
> RxJava, but thems the breaks.
> 
> Please note that my push-based conceptual skills are not the sharpest so if 
> anyone has any recommendations, please advise.
> 
> Take care,
> Marko.
> 
> http://rredux.com <http://rredux.com/>
> 
> 
> 
> 
> 

Hi Marko,

I think it would be better to either expose Flowable on the API (or Flow if you 
don't want to be tied in to RxJava)

void Processor.subscribe(Iterator<Traverser> starts, Consumer<Traverser> 
consumer)
Changes to:
Flowable Processor.flowable(Iterator<Traverser> starts)

There are a few of reasons to do this:
1. Using Consumer will break the Rx chain. This is undesirable as it will 
prevent backpressure and cancellation from working properly.
2. The Scheduler to run the traversal on can be set. For instance, in the case 
where only certain threads are allowed to perform IO once the user has the 
Flowable they can call subscribeOn before subscribe.
3. Backpressure strategy can be set, such as dropping results on buffer 
overflow.
4. Buffer size can be set.

Hope this helps,

Bryn

Reply via email to