Hello,

> I think it would be better to either expose Flowable on the API (or Flow if 
> you don't want to be tied in to RxJava)

We definitely don’t want to expose anything “provider specific,” especially at 
the Processor interface level. I note your reference to the Flow API in 
java.util.concurrent, and I have noticed that RxJava's interfaces (Subscriber, 
Subscription, etc.) mirror the ones nested in java.util.concurrent.Flow. I will 
dig deeper.

> 1. Using Consumer will break the Rx chain. This is undesirable as it will 
> prevent backpressure and cancellation from working properly.

Understood about breaking the chain.

> 2. The Scheduler to run the traversal on can be set. For instance, in the 
> case where only certain threads are allowed to perform IO once the user has 
> the Flowable they can call subscribeOn before subscribe.
> 3. Backpressure strategy can be set, such as dropping results on buffer 
> overflow.
> 4. Buffer size can be set.

Hm. Here are my thoughts on the matter.

RxJava is just one of many Processors that will interact with TP4. If we start 
exposing backpressure strategies, buffer sizes, etc. at the Processor API 
level, then we are expecting every other provider to support those concepts. 
Does Spark support backpressure? Does Hadoop? Does Pipes? ...

I believe such provider-specific parameterization should happen via 
language-agnostic configuration. For instance:

g = g.withProcessor(RxJavaProcessor.class, Map.of("rxjava.backpressure", "drop", "rxjava.bufferSize", 2000))
g.V().out().blah()
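To make the configuration idea concrete, here is a minimal sketch of how an RxJava-backed processor might read its provider-specific settings out of that language-agnostic map. Only the two keys come from the example above. The class name, the backpressure enum, and the default values are hypothetical, and none of this is existing TP4 API.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch, not TP4 API: the RxJava processor (and only it)
// interprets "rxjava.*" keys from the language-agnostic configuration map.
final class RxJavaProcessorConfig {

    // Assumed set of strategies; other providers never see this enum.
    enum Backpressure { BUFFER, DROP, LATEST, ERROR }

    final Backpressure backpressure;
    final int bufferSize;

    private RxJavaProcessorConfig(Backpressure backpressure, int bufferSize) {
        this.backpressure = backpressure;
        this.bufferSize = bufferSize;
    }

    // Reads only the "rxjava.*" keys and ignores everything else, so one map
    // can carry settings for several providers. The defaults are assumptions.
    static RxJavaProcessorConfig from(Map<String, Object> config) {
        String strategy = config.getOrDefault("rxjava.backpressure", "buffer").toString();
        Object size = config.getOrDefault("rxjava.bufferSize", 128);
        if (!(size instanceof Number)) {
            throw new IllegalArgumentException("rxjava.bufferSize must be a number: " + size);
        }
        return new RxJavaProcessorConfig(
                Backpressure.valueOf(strategy.toUpperCase(Locale.ROOT)),
                ((Number) size).intValue());
    }

    public static void main(String[] args) {
        RxJavaProcessorConfig c = from(Map.of("rxjava.backpressure", "drop", "rxjava.bufferSize", 2000));
        System.out.println(c.backpressure + " " + c.bufferSize); // prints "DROP 2000"
    }
}
```

The design point is that the Processor interface itself stays provider-neutral. Unknown keys are simply ignored, so Spark- or Pipes-specific entries can sit in the same map.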

Unlike in TP3, TP4 users will never interact with our Java API. They will never 
have a reference to a Processor instance; they only talk to the TP4 VM via 
Bytecode. That said, systems that integrate the TP4 VM (e.g. database vendors, 
data server systems, etc.) will have to handle Processor traverser results in 
some way (i.e. within Java). If such a system has a reactive architecture, it 
will want to consume those results as a Flow. However, we need to make sure 
that java.util.concurrent.Flow semantics don't demand “unreasonable” behaviors 
from other Processor implementations. (I need to study the 
java.util.concurrent.Flow API.)
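For reference while studying Flow, here is a minimal sketch of what java.util.concurrent.Flow demands of a non-reactive source. It assumes a plain Iterator and a single subscriber. The publisher only has to emit after the subscriber signals demand via request(n), and it must stop once cancel() is called. A pull-based processor can meet both requirements by calling hasNext()/next() lazily. The IteratorPublisher name is hypothetical. The sketch also does not cover every Reactive Streams rule; for example, an invalid request(n) arriving concurrently with delivery is not handled.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

// Minimal sketch: a pull-based Iterator exposed as a Flow.Publisher.
// Assumes one subscriber per iterator; not a complete Reactive Streams implementation.
final class IteratorPublisher<T> implements Flow.Publisher<T> {
    private final Iterator<T> source;

    IteratorPublisher(Iterator<T> source) { this.source = source; }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private final AtomicLong demand = new AtomicLong();
            private volatile boolean done; // set on cancel(), completion, or error

            @Override
            public void request(long n) {
                if (done) return;
                if (n <= 0) { // non-positive requests are a protocol error
                    done = true;
                    subscriber.onError(new IllegalArgumentException("request(n) requires n > 0"));
                    return;
                }
                // Add demand, saturating at Long.MAX_VALUE. Only the caller that raises
                // demand from zero drains; re-entrant calls from onNext just add demand.
                long previous = demand.getAndAccumulate(n,
                        (current, added) -> current + added < 0 ? Long.MAX_VALUE : current + added);
                if (previous == 0) drain();
            }

            @Override
            public void cancel() { done = true; }

            private void drain() {
                long requested = demand.get();
                for (;;) {
                    long emitted = 0;
                    // Pull from the iterator only as far as the subscriber asked.
                    while (emitted != requested) {
                        if (done) return;
                        if (!source.hasNext()) { complete(); return; }
                        subscriber.onNext(source.next());
                        emitted++;
                    }
                    if (done) return;
                    if (!source.hasNext()) { complete(); return; }
                    // Subtract what was delivered; stop if no new demand arrived meanwhile.
                    requested = demand.addAndGet(-emitted);
                    if (requested == 0) return;
                }
            }

            private void complete() {
                done = true;
                subscriber.onComplete();
            }
        });
    }

    public static void main(String[] args) {
        List<Integer> received = new ArrayList<>();
        new IteratorPublisher<>(List.of(1, 2, 3).iterator()).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }
            public void onNext(Integer item) { received.add(item); subscription.request(1); }
            public void onError(Throwable t) { t.printStackTrace(); }
            public void onComplete() { System.out.println("complete: " + received); }
        });
        // prints "complete: [1, 2, 3]"
    }
}
```

Nothing here requires the source itself to be reactive. Backpressure reduces to "don't call next() until asked," which seems like a reasonable expectation for Pipes-style processors.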

Thus, I see it like this:

        1. RxJava-specific configuration is not available at the Processor API level (only via configuration).
        2. Drop Consumer and expose java.util.concurrent.Flow in Processor so the chain isn’t broken for systems integrating the TP4 VM.
                - predicated on java.util.concurrent.Flow having reasonable expectations of non-reactive sources (i.e. processors).
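Point 2 might look something like the sketch below. It defines a hypothetical FlowProcessor interface, where T stands in for Traverser. Its default publisher() adapts a pull-based iterator() using the JDK's SubmissionPublisher. SubmissionPublisher.submit() blocks while a subscriber's buffer is full, so a slow subscriber throttles the producing thread. That gives a non-reactive processor backpressure without implementing it itself. The interface, method, and demo class names are illustrative only, not the current TP4 Processor API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical Processor shape (not the current TP4 API); T stands in for Traverser.
interface FlowProcessor<T> {

    // Pull-based execution, as today.
    Iterator<T> iterator(Iterator<T> starts);

    // Push-based execution via java.util.concurrent.Flow. This default suits
    // non-reactive processors: a producer thread pulls from iterator(), and
    // submit() blocks whenever the subscriber's buffer is full.
    default Flow.Publisher<T> publisher(Iterator<T> starts) {
        return subscriber -> {
            SubmissionPublisher<T> out = new SubmissionPublisher<>();
            out.subscribe(subscriber); // register before producing so no item is dropped
            new Thread(() -> {
                try {
                    Iterator<T> results = iterator(starts);
                    // Stop pulling once the subscriber has cancelled.
                    while (out.hasSubscribers() && results.hasNext()) {
                        out.submit(results.next());
                    }
                    out.close(); // signals onComplete after buffered items are delivered
                } catch (RuntimeException e) {
                    out.closeExceptionally(e); // signals onError
                }
            }).start();
        };
    }
}

final class FlowProcessorDemo {
    public static void main(String[] args) throws InterruptedException {
        FlowProcessor<Integer> identity = starts -> starts; // trivial pull-based processor
        List<Integer> received = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch finished = new CountDownLatch(1);
        identity.publisher(List.of(1, 2, 3).iterator()).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }
            public void onNext(Integer item) { received.add(item); subscription.request(1); }
            public void onError(Throwable t) { finished.countDown(); }
            public void onComplete() { finished.countDown(); }
        });
        finished.await(5, TimeUnit.SECONDS);
        System.out.println(received);
    }
}
```

A natively reactive processor such as RxJava would override publisher() with its own Flow-compatible implementation. It would then not pay for the extra thread per query.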

Does this make sense to you?

———

Stephen said you commented that ParallelRxJava is not necessary. If that is 
true, can you explain your thinking on ParallelRxJava? My assumptions regarding 
serial vs. parallel execution:

        1. For TP4 VM vendors in a highly concurrent, multi-user environment, multi-threading individual queries is bad.
        2. For TP4 VM vendors in a low-concurrency, limited-user environment, multi-threading a single query is good.
                - This also depends on the workload, e.g. ParallelRxJava for an AI system where one query at a time runs over lots of data.
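The two assumptions can be made concrete with a hypothetical sizing heuristic. This is not something TP4, SerialRxJava, or ParallelRxJava does today. It gives each running query an equal share of the available cores and falls back to serial execution when that share drops to a single thread. The core and query counts in the demo are arbitrary.

```java
// Hypothetical heuristic for choosing serial vs. parallel execution per query.
// Not part of TP4; it only encodes the two assumptions listed above.
final class ParallelismPolicy {

    enum Mode { SERIAL, PARALLEL }

    // Equal share of cores per concurrent query, never less than one thread.
    static int workersPerQuery(int availableCores, int concurrentQueries) {
        int queries = Math.max(1, concurrentQueries);
        return Math.max(1, availableCores / queries);
    }

    static Mode modeFor(int availableCores, int concurrentQueries) {
        return workersPerQuery(availableCores, concurrentQueries) > 1 ? Mode.PARALLEL : Mode.SERIAL;
    }

    public static void main(String[] args) {
        // In practice this could come from Runtime.getRuntime().availableProcessors().
        int cores = 16;
        // One large analytical query (e.g. the AI workload): use every core.
        System.out.println(modeFor(cores, 1) + " " + workersPerQuery(cores, 1));     // prints "PARALLEL 16"
        // Many concurrent users: each query runs on a single thread.
        System.out.println(modeFor(cores, 200) + " " + workersPerQuery(cores, 200)); // prints "SERIAL 1"
    }
}
```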

Thank you for your feedback,
Marko.

http://rredux.com




> On Apr 24, 2019, at 3:41 AM, [email protected] wrote:
> 
> 
> 
> On 2019/04/23 13:07:09, Marko Rodriguez <[email protected]> wrote:
>> Hi,
>> 
>> Stephen and Bryn were looking over my RxJava implementation the other day 
>> and Bryn, with his British accent, was like [I paraphrase]:
>> 
>>      “Whoa dawg! Bro should like totally not be blocking to fill an 
>> iterator. Gnar gnar for surezies.”
>> 
>> Prior to now, Processor implemented Iterator<Traverser>. For RxJava, when 
>> you called next()/hasNext() and there were no results in the queue while the 
>> flowable was still running, the iterator while()-blocked, waiting for a 
>> result or for the flowable to terminate.
>> 
>> This morning I decided to redo the Processor interface (and its respective 
>> implementations), and it is much nicer now. We have two “execute” methods:
>> 
>> Iterator<Traverser> Processor.iterator(Iterator<Traverser> starts)
>> void Processor.subscribe(Iterator<Traverser> starts, Consumer<Traverser> consumer)
>> 
>> A processor can only be executed using one of the methods above. Thus, 
>> depending on context and the underlying processor, the VM determines whether 
>> to use pull-based or push-based semantics. Pretty neat, eh?
>> 
>> https://github.com/apache/tinkerpop/blob/tp4/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/processor/Processor.java
>> 
>> Check out how I do Pipes:
>> 
>> https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/pipes/src/main/java/org/apache/tinkerpop/machine/processor/pipes/Pipes.java#L113-L126
>> 
>> Pipes is inherently pull-based. However, to simulate push-based semantics, I 
>> start a new Thread() that runs the iterator.hasNext()/next() loop and calls 
>> consumer.accept() on each result. Thus, as desired, subscribe() returns 
>> immediately.
>> 
>> Next, here is my RxJava implementation.
>> 
>> https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java#L59-L65
>> https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java#L66-L86
>> 
>> You can see how I turn a push-based subscription into a pull-based iteration 
>> using the good ol’ while()-block :).
>> 
>> https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/AbstractRxJava.java#L98-L102
>> 
>> ——
>> 
>> What I need to do next is to redo the RxJava execution planner such that 
>> nested traversals (e.g. map(out())) are subscription-based with the parent 
>> flowable. I don’t quite know how I will do it, but I believe I will have to 
>> write custom Publisher/Subscriber objects for use with Flowable.compose() 
>> such that onNext() and onComplete() are called accordingly within the 
>> consumer.accept(). It will be tricky as I’m not too good with low-level 
>> RxJava, but them’s the breaks.
>> 
>> Please note that my push-based conceptual skills are not the sharpest so if 
>> anyone has any recommendations, please advise.
>> 
>> Take care,
>> Marko.
>> 
>> http://rredux.com
>> 
>> 
>> 
>> 
>> 
> 
