Hello,

> I hadn't put together that each compilation could have its own processor.
> Very cool.

Yea. This is an important aspect of TP4. We do something similar in TP3; it is 
just not as overt, and it's not configurable.

In TP3, for example, SparkGraphComputer uses Spark to do “global traversals” 
and uses Pipes to do “local traversals.”

        global: the root bytecode, branch bytecode, repeat bytecode.
        local: single input nested bytecode.

Where this TP3-distinction falls apart is in local traversals that have complex 
global patterns inside them. For example:

        g.V().where(…)

In TP3, ‘…’ is always considered a local traversal. This makes sense for 

        g.V().where(out('knows'))

However, imagine this situation:

        g.V().where(match(repeat(union)))

This is where SparkGraphComputer will throw the typical VerificationStrategy 
exception of “can’t process beyond the adjacent vertex.” Why does it do that? 
Because it relies on Pipes to do the processing and Pipes can only see the data 
within SparkGraphComputer’s StarVertex. Bummer.

*** Many unfortunate complications were introduced by this seemingly innocuous 
interface:
        
https://github.com/apache/tinkerpop/blob/master/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/TraversalParent.java

——

In TP4, we don’t make the global/local-traversal distinction; it is 
completely up to the processor provider to decide how to execute each 
Bytecode chunk. For example, a batch analytical processor strategy can look at 
where(…) and decide “eh, I’ll just use Pipes,” or it can see that it’s 
“match(repeat(union))” and decide to continue executing with its batch self. 
That explanation should leave you wondering how Spark could do that given the 
non-random-access limitation of most batch processors. And if not, then you 
might be wondering: why don’t we just do that for TP3? I’ll leave the answer 
to a future post on ‘scoped traversers’ (which is a big-deal concept to come).
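To make that per-chunk decision concrete, here is a minimal sketch in plain 
Java. Every name in it (Processor, Bytecode, the depth heuristic) is 
hypothetical, not TP4 API: a provider strategy inspects a bytecode chunk and 
routes shallow nests to a Pipes-like processor while keeping complex patterns 
like match(repeat(union)) on its batch self.

```java
// Hypothetical sketch -- Processor, Bytecode, and the nesting heuristic are
// illustrative stand-ins, not actual TP4 classes.
import java.util.List;

interface Processor { String name(); }

final class PipesLike implements Processor { public String name() { return "pipes"; } }
final class BatchLike implements Processor { public String name() { return "batch"; } }

final class Bytecode {
    final String op;
    final List<Bytecode> children;
    Bytecode(String op, List<Bytecode> children) { this.op = op; this.children = children; }
    // crude complexity measure: maximum nesting depth of the chunk
    int depth() {
        int max = 0;
        for (Bytecode c : children) max = Math.max(max, c.depth());
        return 1 + max;
    }
}

public class ProcessorChoice {
    // the provider's per-chunk decision: shallow chunks run on a Pipes-like
    // processor, deeply nested chunks stay on the batch processor
    static Processor choose(Bytecode chunk) {
        return chunk.depth() <= 2 ? new PipesLike() : new BatchLike();
    }

    public static void main(String[] args) {
        Bytecode simple = new Bytecode("out", List.of());               // where(out('knows'))
        Bytecode complex = new Bytecode("match",
            List.of(new Bytecode("repeat", List.of(new Bytecode("union", List.of())))));
        System.out.println(choose(simple).name());   // pipes
        System.out.println(choose(complex).name());  // batch
    }
}
```

The point is only that the decision lives in the provider's strategy, not in 
the VM; a real strategy would of course use richer signals than nesting depth.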

> Thanks for the benchmarking numbers. I had a tp3 inspired
> JMH-based module in progress when I saw your results so I added the two
> test traversals in. It doesn't do any parameterization of input sizes at
> this point but if you're interested in checking it out I pushed it to the
> tp4-jmh branch:
> https://github.com/apache/tinkerpop/blob/tp4-jmh/java/machine/machine-perf-test/src/main/java/org/apache/tinkerpop/benchmark/util/AbstractTraversalBenchmarkBase.java
> .

Awesome. Stephen is thinking through how we will do language-agnostic testing 
in TP4, where our JVM-based VM will be one of many. I think when he gets that 
sorted out, we should figure out how to attach your benchmarking work to that 
same package so we can:

        1. Verify the operational semantics of any TP4 VM (and underlying 
database + processor)
        2. Benchmark the execution efficiency of any TP4 VM (and underlying 
database + processor)
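For illustration only, here is how those two checks could share one harness. 
Everything named below (ToyProcessor, Serial, Parallel) is invented for the 
sketch, not the real test package: first verify that two processors agree on 
a traversal's output, then time them under identical conditions.

```java
// Hypothetical sketch of both goals in one harness. ToyProcessor, Serial,
// and Parallel are invented stand-ins for a VM's processor implementations.
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

interface ToyProcessor {
    List<Long> execute(List<Long> input, Function<Long, Long> step);
}

// single-threaded stand-in (think Pipes/SerialRxJava)
class Serial implements ToyProcessor {
    public List<Long> execute(List<Long> in, Function<Long, Long> step) {
        return in.stream().map(step).collect(Collectors.toList());
    }
}

// multi-threaded stand-in (think ParallelRxJava)
class Parallel implements ToyProcessor {
    public List<Long> execute(List<Long> in, Function<Long, Long> step) {
        return in.parallelStream().map(step).collect(Collectors.toList());
    }
}

public class ComplianceAndBench {
    public static void main(String[] args) {
        List<Long> input = LongStream.range(0, 100_000).boxed().collect(Collectors.toList());
        Function<Long, Long> incr = x -> x + 1;
        ToyProcessor serial = new Serial(), parallel = new Parallel();

        // 1. operational semantics: both processors must emit the same stream
        System.out.println("compliant: "
                + serial.execute(input, incr).equals(parallel.execute(input, incr)));

        // 2. execution efficiency: time each processor identically,
        //    discarding warm-up runs
        for (ToyProcessor p : List.of(serial, parallel)) {
            for (int i = 0; i < 2; i++) p.execute(input, incr); // warm-up
            long start = System.nanoTime();
            for (int i = 0; i < 10; i++) p.execute(input, incr);
            System.out.printf("%s: %.2f ms/op%n",
                    p.getClass().getSimpleName(), (System.nanoTime() - start) / 10 / 1e6);
        }
    }
}
```

A real harness would use JMH rather than hand-rolled timing, but the shape 
(same inputs, same warm-up policy, per-processor results) is the same.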

It would be great to provide providers with information such as:

        TP4 .NET VM w/ LINQ+CosmosDB is 100% TP4 compliant.
                The LINQ processor is in the 90th percentile of single-machine, 
multi-threaded processors.
                The CosmosDB structure is in the 80th percentile of sharded 
transactional structures.


> RxSerialTraversalBenchmark.g_inject_unfold_incr_incr_incr_incr:g_inject_unfold_incr_incr_incr_incr·p0.50
>  sample          6.988          ms/op
> RxParallelTraversalBenchmark.g_inject_unfold_incr_incr_incr_incr:g_inject_unfold_incr_incr_incr_incr·p0.50
>  sample         11.633          ms/op
> PipesTraversalBenchmark.g_inject_unfold_incr_incr_incr_incr:g_inject_unfold_incr_incr_incr_incr·p0.50
>  sample          6.627          ms/op
> 
> RxSerialTraversalBenchmark.g_inject_unfold_repeat_times:g_inject_unfold_repeat_times·p0.50
>  sample          3.592          ms/op
> RxParallelTraversalBenchmark.g_inject_unfold_repeat_times:g_inject_unfold_repeat_times·p0.50
>  sample          7.897          ms/op
> PipesTraversalBenchmark.g_inject_unfold_repeat_times:g_inject_unfold_repeat_times·p0.50
>  sample          3.887          ms/op

Pretty crazy how fast SerialRxJava is compared to Pipes, especially given that 
my branch/repeat implementation is still pretty crude. I banged my head 
against the wall all yesterday morning trying to figure out how to do a loop in 
Rx. :| … I have some new ideas this morning.
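For what it’s worth, one trick for a bounded repeat() that sidesteps runtime 
looping is to unroll the loop at assembly time: compose the step operator with 
itself times(n). The sketch below is stdlib-only Java (no RxJava); in Rx the 
analogue would be composing the operator chain n times before subscribing.

```java
// Assembly-time unrolling of repeat(step).times(n): compose the step
// function n times into a single operator, then map once over the stream.
// Plain java.util.function sketch -- no TP4 or RxJava API is used here.
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

public class UnrolledRepeat {
    // unroll repeat(step).times(n) into one composed operator
    static <T> UnaryOperator<T> times(UnaryOperator<T> step, int n) {
        UnaryOperator<T> composed = UnaryOperator.identity();
        for (int i = 0; i < n; i++) {
            UnaryOperator<T> prev = composed;
            composed = x -> step.apply(prev.apply(x));
        }
        return composed;
    }

    public static void main(String[] args) {
        UnaryOperator<Long> incr = x -> x + 1;
        // analogous to g.inject(1,2,3).unfold().repeat(incr()).times(4)
        List<Long> out = List.of(1L, 2L, 3L).stream()
                .map(times(incr, 4))
                .collect(Collectors.toList());
        System.out.println(out); // [5, 6, 7]
    }
}
```

This only covers times(n) with a fixed n; an until()-style predicate still 
needs a real runtime loop (e.g. recursive resubscription).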

Thanks for the effort.

Take care,
Marko.

http://rredux.com


> On Mon, Apr 8, 2019 at 12:16 PM Marko Rodriguez <[email protected]>
> wrote:
> 
>> Hi,
>> 
>> I implemented multi-threaded RxJava this morning; it’s called
>> ParallelRxJava. The single-threaded version is called SerialRxJava.
>> 
>> The RxJavaProcessor factory will generate either depending on the Map.of()
>> configuration:
>> 
>> 
>> https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaProcessor.java#L49-L53
>> 
>> You can see the source code for each RxJava implementation here:
>> 
>> 
>> https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
>> 
>> https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
>> 
>> Given Ted’s comments last week, I decided to create a micro-benchmark
>> @Test to compare SerialRxJava, ParallelRxJava, and Pipes.
>> 
>> 
>> https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaBenchmark.java
>> 
>> The results are below. My notes on the results are as follows:
>> 
>>        * ParallelRxJava used 7 threads.
>>        * Times averaged over 30 runs (minus the first 2 runs — JVM
>> warmup).
>>        * SerialRxJava and Pipes are very close on non-branching
>> traversals with a small input set.
>>        * ParallelRxJava never beats Pipes, but does beat SerialRxJava on
>> large input sets.
>>        * My implementation of repeat() in RxJava is not good. I need to
>> think of a better way to implement recursion (and branching in general).
>>        * ParallelRxJava will probably shine when the query has a lot of
>> database operations (e.g. out(), inE(), addV(), etc.).
>>        * There is a lot of intelligence to add to ParallelRxJava —
>> e.g.,
>>                ** If the nested traversal is simple (only a few steps),
>> don’t thread. For example, there is no need to thread the is() of
>> choose(is(gt(3)),….).
>>                ** One of the beautiful things about TP4 is that each
>> Compilation (nest) can have a different processor.
>>                        *** Thus, parallel for long sequences and serial
>> for short sequences…or, Pipes for short sequences! (Beam uses Pipes for
>> short sequences).
>> 
>> ———
>> 
>> g.inject(input).unfold().incr().incr().incr().incr().iterate()
>> 
>> Input size: 10
>> Average time [seri]: 0.4
>> Average time [para]: 2.4
>> Average time [pipe]: 0.5
>> 
>> Input size: 100
>> Average time [seri]: 0.96666664
>> Average time [para]: 4.3333335
>> Average time [pipe]: 0.8
>> 
>> Input size: 1000
>> Average time [seri]: 2.5333333
>> Average time [para]: 4.1666665
>> Average time [pipe]: 1.7
>> 
>> Input size: 10000
>> Average time [seri]: 12.1
>> Average time [para]: 10.633333
>> Average time [pipe]: 8.1
>> 
>> Input size: 100000
>> Average time [seri]: 103.96667
>> Average time [para]: 95.066666
>> Average time [pipe]: 59.933334
>> 
>> ——————
>> ——————
>> 
>> g.inject(input).unfold().repeat(incr()).times(4).iterate()
>> 
>> Input size: 10
>> Average time [seri]: 1.3333334
>> Average time [para]: 4.8
>> Average time [pipe]: 0.8333333
>> 
>> Input size: 100
>> Average time [seri]: 2.9
>> Average time [para]: 8.866667
>> Average time [pipe]: 1.0333333
>> 
>> Input size: 1000
>> Average time [seri]: 15.7
>> Average time [para]: 22.066668
>> Average time [pipe]: 3.4
>> 
>> Input size: 10000
>> Average time [seri]: 50.4
>> Average time [para]: 35.8
>> Average time [pipe]: 8.566667
>> 
>> Input size: 100000
>> Average time [seri]: 387.06668
>> Average time [para]: 271.2
>> Average time [pipe]: 60.566666
>> 
>> ——————
>> ——————
>> 
>> One of the reasons for implementing a multi-threaded single machine
>> processor was to see how threading would work with the intermediate
>> CFunction representation. At first, I thought I was going to have to make
>> CFunctions thread safe (as they can nest and can contain Compilations), but
>> then I realized we provide a clone() method. Thus, it’s up to the processor
>> to clone CFunctions accordingly across threads (“rails” in RxJava). For
>> ParallelRxJava, it’s as simple as using ThreadLocal. Here is the MapFlow
>> ReactiveX Function:
>> 
>> 
>> https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/MapFlow.java
>> 
>> After using ThreadLocal in the map, flatmap, filter, and branch flows,
>> everything just worked! I don’t know if this is the best idea, but it’s
>> simple and it pushes multi-threaded query concerns to the processors as
>> opposed to the TP4 VM. Given the numerous ways in which threading could be
>> implemented, it seems that this shouldn’t be a TP4 VM concern. Thoughts?
>> 
>> Thanks for reading,
>> Marko.
>> 
>> http://rredux.com
