Hi,

I implemented a multi-threaded RxJava processor this morning; it's called ParallelRxJava.
The single-threaded version is called SerialRxJava.

The RxJavaProcessor factory will generate either depending on the Map.of() 
configuration:

        https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaProcessor.java#L49-L53
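
To make the selection concrete, here is a minimal sketch of that logic. The configuration key name ("rxjava.threadPoolSize") and the stand-in types below are assumptions for illustration only; the real code is in RxJavaProcessor.java linked above.

import java.util.Map;

// Minimal sketch only: the key name and these stand-in classes are assumptions,
// not the actual TP4 API (see RxJavaProcessor.java above for the real factory).
interface ProcessorSketch { }
final class SerialSketch implements ProcessorSketch { }
final class ParallelSketch implements ProcessorSketch {
    final int threads;
    ParallelSketch(final int threads) { this.threads = threads; }
}

final class RxJavaFactorySketch {
    static ProcessorSketch create(final Map<String, ?> configuration) {
        // a thread-pool size greater than zero selects the parallel implementation
        final Object size = configuration.get("rxjava.threadPoolSize");
        final int threads = (size instanceof Integer) ? (Integer) size : 0;
        return threads > 0 ? new ParallelSketch(threads) : new SerialSketch();
    }

    public static void main(final String[] args) {
        System.out.println(create(Map.of()).getClass().getSimpleName());                           // SerialSketch
        System.out.println(create(Map.of("rxjava.threadPoolSize", 7)).getClass().getSimpleName()); // ParallelSketch
    }
}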

You can see the source code for each RxJava implementation here:

        https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
        https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java

Given Ted’s comments last week, I decided to create a micro-benchmark @Test to 
compare SerialRxJava, ParallelRxJava, and Pipes.

        https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaBenchmark.java
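
For context on how the numbers below were gathered, here is a rough sketch of the timing approach (repeated runs, the first few discarded as JVM warmup, the remainder averaged). It is illustrative only and not the actual RxJavaBenchmark code linked above.

// Illustrative harness only (not the actual RxJavaBenchmark linked above):
// run the workload repeatedly, skip the warmup runs, and average the rest.
final class BenchmarkSketch {
    static double averageMillis(final Runnable traversal, final int runs, final int warmup) {
        double totalMs = 0;
        for (int i = 0; i < runs; i++) {
            final long start = System.nanoTime();
            traversal.run();                        // stands in for submitting and iterating one traversal
            final long elapsed = System.nanoTime() - start;
            if (i >= warmup)                        // discard JVM warmup runs
                totalMs += elapsed / 1_000_000.0;
        }
        return totalMs / (runs - warmup);
    }

    public static void main(final String[] args) {
        System.out.println(averageMillis(() -> { }, 32, 2));
    }
}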

The results are below. My notes on the results are as follows:

        * ParallelRxJava used 7 threads.
        * Times averaged over 30 runs (minus the first 2 runs — JVM warmup).
        * SerialRxJava and Pipes are very close on non-branching traversals 
with a small input set.
        * ParallelRxJava never beats Pipes, but does beat SerialRxJava on large 
input sets.
        * My implementation of repeat() in RxJava is not good. I need to think 
of a better way to implement recursion (and branching in general).
        * ParallelRxJava will probably shine when the query has a lot of 
database operations (e.g. out(), inE(), addV(), etc.).
        * There is a lot of intelligence to add to ParallelRxJava, e.g.:
                ** If the nested traversal is simple (only a few steps), don't 
thread. For example, there is no need to thread the is() of 
choose(is(gt(3)),….).
                ** One of the beautiful things about TP4 is that each 
Compilation (nest) can have a different processor.
                        *** Thus, parallel for long sequences and serial for 
short sequences…or Pipes for short sequences! (Beam uses Pipes for short 
sequences.) A rough sketch of this idea follows after this list.
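
Here is that sketch: choosing a strategy per nested Compilation based on its size. The enum, the functionCount parameter, and the threshold are assumptions for illustration, not the actual TP4 Compilation API.

// Illustrative only: pick a processing strategy per nested Compilation based on how
// many functions it contains. Names and threshold are assumptions for this sketch.
final class NestedProcessorSelectionSketch {
    enum ProcessorChoice { PIPES, SERIAL_RXJAVA, PARALLEL_RXJAVA }

    static ProcessorChoice choose(final int functionCount, final int parallelThreshold) {
        if (functionCount < parallelThreshold)
            return ProcessorChoice.PIPES;           // short nests (e.g. the is() of choose()): no threading
        return ProcessorChoice.PARALLEL_RXJAVA;     // long sequences: worth the threading overhead
    }

    public static void main(final String[] args) {
        System.out.println(choose(1, 4));    // PIPES
        System.out.println(choose(12, 4));   // PARALLEL_RXJAVA
    }
}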

———

g.inject(input).unfold().incr().incr().incr().incr().iterate()

Input size    Avg time [seri]    Avg time [para]    Avg time [pipe]
10            0.4                2.4                0.5
100           0.96666664         4.3333335          0.8
1000          2.5333333          4.1666665          1.7
10000         12.1               10.633333          8.1
100000        103.96667          95.066666          59.933334

——————
——————

g.inject(input).unfold().repeat(incr()).times(4).iterate()

Input size    Avg time [seri]    Avg time [para]    Avg time [pipe]
10            1.3333334          4.8                0.8333333
100           2.9                8.866667           1.0333333
1000          15.7               22.066668          3.4
10000         50.4               35.8               8.566667
100000        387.06668          271.2              60.566666

——————
——————

One of the reasons for implementing a multi-threaded single-machine processor 
was to see how threading would work with the intermediate CFunction 
representation. At first, I thought I was going to have to make CFunctions 
thread-safe (as they can nest and can contain Compilations), but then I 
realized we provide a clone() method. Thus, it's up to the processor to clone 
CFunctions accordingly across threads (“rails” in RxJava). For ParallelRxJava, 
it's as simple as using ThreadLocal. Here is the MapFlow ReactiveX Function:

        https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/MapFlow.java
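
The pattern is roughly the following. This is a simplified sketch, not the actual MapFlow source above: CloneableMapFunction stands in for TP4's MapFunction/CFunction, and java.util.function.Function stands in for the ReactiveX Function interface that MapFlow really implements.

import java.util.function.Function;

// Simplified sketch of the ThreadLocal pattern (not the actual MapFlow source).
// CloneableMapFunction is a stand-in for TP4's MapFunction/CFunction; the real
// MapFlow implements the ReactiveX Function interface instead.
interface CloneableMapFunction<S, E> {
    E apply(S traverser);
    CloneableMapFunction<S, E> copy();   // stands in for CFunction.clone()
}

final class MapFlowSketch<S, E> implements Function<S, E> {
    // each RxJava "rail" (thread) lazily gets its own clone of the shared function,
    // so no two threads ever touch the same CFunction instance
    private final ThreadLocal<CloneableMapFunction<S, E>> threadLocalFunction;

    MapFlowSketch(final CloneableMapFunction<S, E> function) {
        this.threadLocalFunction = ThreadLocal.withInitial(function::copy);
    }

    @Override
    public E apply(final S traverser) {
        return this.threadLocalFunction.get().apply(traverser);
    }
}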

After using ThreadLocal in the map, flatmap, filter, and branch flows, 
everything just worked! I don't know if this is the best idea, but it's simple 
and it pushes multi-threaded query concerns to the processors as opposed to the 
TP4 VM. Given the numerous ways in which threading could be implemented, it 
seems that this shouldn't be a TP4 VM concern. Thoughts?

Thanks for reading,
Marko.

http://rredux.com



