I hadn't put together that each compilation could have its own processor.
Very cool. Thanks for the benchmarking numbers. I had a TP3-inspired
JMH-based module in progress when I saw your results, so I added the two
test traversals in. It doesn't parameterize input sizes at this
point, but if you're interested in checking it out, I pushed it to the
tp4-jmh branch:
https://github.com/apache/tinkerpop/blob/tp4-jmh/java/machine/machine-perf-test/src/main/java/org/apache/tinkerpop/benchmark/util/AbstractTraversalBenchmarkBase.java
.

The benchmarks can be run from the command line or from the IDE using the
individual process tests:
https://github.com/apache/tinkerpop/tree/tp4-jmh/java/machine/machine-perf-test/src/main/java/org/apache/tinkerpop/benchmark/machine
.

Here are the median times for input size = 1000 that I pulled from a run.
The full output includes the P90, P95, etc., and it can also be configured
to report an average or raw throughput:

mvn clean test -DskipBenchmarks=false -Dforks=1 -DmeasureIterations=5
-DwarmupIterations=5

RxSerialTraversalBenchmark.g_inject_unfold_incr_incr_incr_incr:g_inject_unfold_incr_incr_incr_incr·p0.50
  sample          6.988          ms/op
RxParallelTraversalBenchmark.g_inject_unfold_incr_incr_incr_incr:g_inject_unfold_incr_incr_incr_incr·p0.50
  sample         11.633          ms/op
PipesTraversalBenchmark.g_inject_unfold_incr_incr_incr_incr:g_inject_unfold_incr_incr_incr_incr·p0.50
  sample          6.627          ms/op

RxSerialTraversalBenchmark.g_inject_unfold_repeat_times:g_inject_unfold_repeat_times·p0.50
  sample          3.592          ms/op
RxParallelTraversalBenchmark.g_inject_unfold_repeat_times:g_inject_unfold_repeat_times·p0.50
  sample          7.897          ms/op
PipesTraversalBenchmark.g_inject_unfold_repeat_times:g_inject_unfold_repeat_times·p0.50
  sample          3.887          ms/op

JMH is great, but by default it does a large number of timing runs, which
is slow, so for quicker (but less accurate) runs measureIterations and
warmupIterations can be decreased.
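As an aside, the warmup/measurement split and the p0.50 (median) figure JMH reports can be sketched in plain Java for quick local sanity checks. This is a hypothetical mini-harness, not the module's actual benchmark base class; `MiniBench` and its method names are made up for illustration:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch (not JMH, not the tp4-jmh module): time a task,
// discard warmup iterations, and report the median of the measured samples.
public final class MiniBench {

    /** Runs task for warmup + measured iterations; returns the median ms of the measured runs. */
    public static double medianMillis(Runnable task, int warmup, int measured) {
        List<Double> samples = new ArrayList<>();
        for (int i = 0; i < warmup + measured; i++) {
            long start = System.nanoTime();
            task.run();
            double ms = (System.nanoTime() - start) / 1_000_000.0;
            if (i >= warmup) {          // discard warmup samples (JIT compilation, caches)
                samples.add(ms);
            }
        }
        return median(samples);
    }

    /** The p0.50 of a list of samples. */
    public static double median(List<Double> samples) {
        List<Double> sorted = new ArrayList<>(samples);
        Collections.sort(sorted);
        int n = sorted.size();
        return (n % 2 == 1)
                ? sorted.get(n / 2)
                : (sorted.get(n / 2 - 1) + sorted.get(n / 2)) / 2.0;
    }

    public static void main(String[] args) {
        double med = medianMillis(() -> {
            long sum = 0;
            for (int i = 0; i < 1_000_000; i++) sum += i;   // toy workload
            if (sum == 42) System.out.println();             // guard against dead-code elimination
        }, 5, 5);
        System.out.println("median ms: " + med);
    }
}
```

JMH of course does much more (forking, blackholes, statistically sound sampling), which is why its defaults are slow; the sketch just shows why the warmup samples get thrown away before the percentiles are computed.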

--Ted

On Mon, Apr 8, 2019 at 12:16 PM Marko Rodriguez <[email protected]>
wrote:

> Hi,
>
> I implemented multi-threaded RxJava this morning; it's called
> ParallelRxJava. The single-threaded version is called SerialRxJava.
>
> The RxJavaProcessor factory will generate either depending on the Map.of()
> configuration:
>
>
> https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaProcessor.java#L49-L53
>
>  You can see the source code for each RxJava implementation here:
>
>
> https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
>
> https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
>
> Given Ted’s comments last week, I decided to create a micro-benchmark
> @Test to compare SerialRxJava, ParallelRxJava, and Pipes.
>
>
> https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/test/java/org/apache/tinkerpop/machine/processor/rxjava/RxJavaBenchmark.java
>
> The results are below. My notes on the results are as follows:
>
>         * ParallelRxJava used 7 threads.
>         * Times averaged over 30 runs (minus the first 2 runs — JVM
> warmup).
>         * SerialRxJava and Pipes are very close on non-branching
> traversals with a small input set.
>         * ParallelRxJava never beats Pipes, but does beat SerialRxJava on
> large input sets.
>         * My implementation of repeat() in RxJava is not good. I need to
> think of a better way to implement recursion (and branching in general).
>         * ParallelRxJava will probably shine when the query has a lot of
> database operations (e.g. out(), inE(), addV(), etc.).
>         * There is a lot of intelligence to add to ParallelRxJava,
> e.g.,
>                 ** If the nested traversal is simple (only a few steps),
> don’t thread. For example, there is no need to thread the is() of
> choose(is(gt(3)),….).
>                 ** One of the beautiful things about TP4 is that each
> Compilation (nest) can have a different processor.
>                         *** Thus, parallel for long sequences and serial
> for short sequences…or, Pipes for short sequences! (Beam uses Pipes for
> short sequences).
>
> ———
>
> g.inject(input).unfold().incr().incr().incr().incr().iterate()
>
> Input size: 10
> Average time [seri]: 0.4
> Average time [para]: 2.4
> Average time [pipe]: 0.5
>
> Input size: 100
> Average time [seri]: 0.96666664
> Average time [para]: 4.3333335
> Average time [pipe]: 0.8
>
> Input size: 1000
> Average time [seri]: 2.5333333
> Average time [para]: 4.1666665
> Average time [pipe]: 1.7
>
> Input size: 10000
> Average time [seri]: 12.1
> Average time [para]: 10.633333
> Average time [pipe]: 8.1
>
> Input size: 100000
> Average time [seri]: 103.96667
> Average time [para]: 95.066666
> Average time [pipe]: 59.933334
>
> ——————
> ——————
>
> g.inject(input).unfold().repeat(incr()).times(4).iterate()
>
> Input size: 10
> Average time [seri]: 1.3333334
> Average time [para]: 4.8
> Average time [pipe]: 0.8333333
>
> Input size: 100
> Average time [seri]: 2.9
> Average time [para]: 8.866667
> Average time [pipe]: 1.0333333
>
> Input size: 1000
> Average time [seri]: 15.7
> Average time [para]: 22.066668
> Average time [pipe]: 3.4
>
> Input size: 10000
> Average time [seri]: 50.4
> Average time [para]: 35.8
> Average time [pipe]: 8.566667
>
> Input size: 100000
> Average time [seri]: 387.06668
> Average time [para]: 271.2
> Average time [pipe]: 60.566666
>
> ——————
> ——————
>
> One of the reasons for implementing a multi-threaded single machine
> processor was to see how threading would work with the intermediate
> CFunction representation. At first, I thought I was going to have to make
> CFunctions thread safe (as they can nest and can contain Compilations), but
> then I realized we provide a clone() method. Thus, it's up to the processor
> to clone CFunctions accordingly across threads (“rails” in RxJava). For
> ParallelRxJava, it's as simple as using a ThreadLocal. Here is the MapFlow
> ReactiveX Function:
>
>
> https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/MapFlow.java
>
> After using ThreadLocal in the map, flatmap, filter, and branch flows,
> everything just worked! I don’t know if this is the best idea, but it's
> simple and it pushes multi-threaded query concerns to the processors as
> opposed to the TP4 VM. Given the numerous ways in which threading could be
> implemented, it seems that this shouldn’t be a TP4 VM concern. Thoughts?
>
> Thanks for reading,
> Marko.
>
> http://rredux.com
>
