Hello,
I spent most of the last 1.5 weeks working on RxJavaProcessor (ReactiveX,
http://reactivex.io/). Three of those days were spent in a nasty code hell
trying to figure out how to do cyclic stream topologies for repeat(). I’ve
never read so much of someone else’s code in my life, and I’ve come to know
the inner workings of RxJava quite well.
Without further ado, here is what the tp4/ branch is looking like these days:
Machine machine = LocalMachine.open();
TraversalSource g =
Gremlin.traversal(machine).withProcessor(RxJavaProcessor.class);
From g, you can spawn single-threaded Rx Flowables. Here is the SerialRxJava
processor code:
https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java
- So simple. 130 lines of code.
If you do:
TraversalSource g =
Gremlin.traversal(machine).withProcessor(RxJavaProcessor.class,
Map.of("rx.threadPool.size", 10));
You can spawn multi-threaded Rx ParallelFlowables. Here is the ParallelRxJava
processor code:
https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/ParallelRxJava.java
- So simple. 150 lines of code.
I now am completely confident that the TP4 CFunction intermediate
representation (Compilation.class) is sufficient to support all types of
execution engines.
- Pipes: single-threaded, pull-based.
- Beam: distributed, push-based.
- RxJava: multi-threaded, push-based.
Implementing BeamProcessor was important for confirming that multi-machine
execution would come naturally, and RxJava was important for confirming that
multi-threading would come naturally. I know that Akka will work just fine as
it is both multi-threaded and distributed. Therefore, I believe we have
converged on the chain of representational mappings that we will use in the
TP4 VM:
Language ==> Bytecode ==> CFunction Intermediate Representation ==>
Processor-Specific Execution Plan
In TP3, we do:
Language ==> Bytecode ==> Pipes ==> Processor-Specific Execution Plan
In other words, we foolishly embedded one execution engine within another, and
this has been a cause of various pains that are now rectified in TP4.
——
Ted Wilmes did some preliminary benchmarking of Pipes vs. SerialRxJava vs.
ParallelRxJava. Here are his results for two traversals:
Benchmark                     Traversal                            p0.50 sample (ms/op)
RxSerialTraversalBenchmark    g_inject_unfold_incr_incr_incr_incr   6.988
RxParallelTraversalBenchmark  g_inject_unfold_incr_incr_incr_incr  11.633
PipesTraversalBenchmark       g_inject_unfold_incr_incr_incr_incr   6.627
RxSerialTraversalBenchmark    g_inject_unfold_repeat_times          3.592
RxParallelTraversalBenchmark  g_inject_unfold_repeat_times          7.897
PipesTraversalBenchmark       g_inject_unfold_repeat_times          3.887
We should expect ParallelRxJava to shine when interacting with a data source
where lots of time is wasted on I/O. I’m hoping that ParallelRxJava will be
able to transform borderline real-time queries in TP3 into genuine real-time
queries in TP4.
——
One of the outstanding problems I’m having (and have given up on for now) is
that I can’t figure out how to do cyclic stream topologies in ReactiveX.
Instead, I have to do the repetition-based implementation of repeat() as
defined in Stream Ring Theory.
https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/SerialRxJava.java#L108-L127
What I’m doing is creating MAX_REPETITIONS “unrolled loops”, each with exit
streams for emit() and until() breaks. I then merge all those exit streams into
the main outgoing stream. What I would like to do instead is send a traverser
back to a previous Operator. I’ve gone through about 5 different
implementations, but each has its problems. If anyone is versed in ReactiveX
and can tell me the best way to do looping, I would appreciate it. And yes,
I’ve read many StackOverflow answers and docs: always close, but never exactly
my problem. Also, the solution needs to work for both serial and parallel
streams (I had a serial implementation that worked, but I ‘git stashed’ it
because it wasn’t thread-safe).
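To make the unrolling idea concrete, here is a minimal plain-Java sketch. It is
not the SerialRxJava code: lists stand in for Rx streams, and the class name,
the method name, and the MAX_REPETITIONS value of 8 are assumptions made for
illustration. It only models repeat(...).until(...); dropping traversers that
are still looping after the last copy is a simplification of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

// Plain-Java model of a repetition-based repeat(): lists stand in for streams.
final class UnrolledRepeat {
    // Hypothetical cap on the number of unrolled loop copies.
    static final int MAX_REPETITIONS = 8;

    static <T> List<T> repeatUntil(List<T> input, UnaryOperator<T> body, Predicate<T> until) {
        List<T> outgoing = new ArrayList<>(); // all exit streams merged together
        List<T> current = input;              // traversers entering this loop copy
        for (int i = 0; i < MAX_REPETITIONS; i++) {
            List<T> next = new ArrayList<>();
            for (T traverser : current) {
                T result = body.apply(traverser);
                if (until.test(result)) {
                    outgoing.add(result);     // leaves via this copy's exit stream
                } else {
                    next.add(result);         // flows on to the next unrolled copy
                }
            }
            current = next;
        }
        // Assumption of this sketch: traversers still looping here are dropped.
        return outgoing;
    }

    public static void main(String[] args) {
        // Add 3 until the value reaches at least 10.
        List<Integer> out = repeatUntil(List.of(1, 5, 9), x -> x + 3, x -> x >= 10);
        System.out.println(out); // prints [12, 11, 10]
    }
}
```

Note that results come out grouped by the loop copy they exited from, not in
input order, which is what merging per-iteration exit streams implies.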
——
Finally, I’ve started doing more work with Strategies. You can see our most
complex TP4 strategy to date here:
https://github.com/apache/tinkerpop/blob/tp4/java/machine/processor/rxjava/src/main/java/org/apache/tinkerpop/machine/processor/rxjava/strategy/RxJavaStrategy.java
If a traversal is multi-threaded, it will use a single-threaded execution for
nested traversals if they are “simple” (no branching, no flatmaps, and not too
long). Given that nested traversals are both the simplest and most executed
traversals in TP, it is important to make sure they don’t waste resources. Thus:
g.V().has('age', gt(32))
// the gt(32) traversal is single-threaded
g.V().has('age', gt(out('knows').value('age').max()))
// the gt(…) traversal is multi-threaded
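The “simple” test could be sketched roughly as follows. This is not the
RxJavaStrategy code: it models a nested traversal as a plain list of
instruction names, and the class name, the op-name sets, and the length limit
of 4 are all assumptions made for illustration.

```java
import java.util.List;
import java.util.Set;

// Hypothetical model: a nested traversal is just a list of instruction names.
final class SimpleNestedCheck {
    // Assumed op names and length limit, for illustration only.
    static final Set<String> BRANCHING_OPS = Set.of("branch", "choose", "union", "repeat");
    static final Set<String> FLATMAP_OPS = Set.of("flatMap", "unfold", "out", "in", "both");
    static final int MAX_SIMPLE_LENGTH = 4;

    // "Simple" means: no branching, no flatmaps, and not too long.
    static boolean isSimple(List<String> ops) {
        if (ops.size() > MAX_SIMPLE_LENGTH) {
            return false;
        }
        for (String op : ops) {
            if (BRANCHING_OPS.contains(op) || FLATMAP_OPS.contains(op)) {
                return false;
            }
        }
        return true;
    }

    // Simple nested traversals run serially; everything else stays parallel.
    static String threadingFor(List<String> nestedOps) {
        return isSimple(nestedOps) ? "serial" : "parallel";
    }

    public static void main(String[] args) {
        System.out.println(threadingFor(List.of("constant")));            // prints serial
        System.out.println(threadingFor(List.of("out", "value", "max"))); // prints parallel
    }
}
```

In this model the second case is parallel because out() is a flatmap, which
matches the two examples above.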
Pretty neat, eh? My rules for determining “simple” are hardcoded, as I’m
primarily trying to figure out how the strategy interface will look and feel.
Strategies are different in TP4 as they operate on bytecode and not on Pipe
steps. I’m still not completely happy with the strategy model in TP4; it’s a
bit awkward. Still more to learn. As we develop more strategies, I’m sure a
pattern will emerge.
——
For the next push, I plan to turn my attention to data structures. I’m happy
with our processing infrastructure. I think it’s theoretically sound and easy
to use. With respect to data structures, the question remains: does TP4 go
“beyond graph”? Unfortunately, lately, I’ve been thinking “no”. However, I
need to focus some thought on the subject so I can make a confident argument
one way or the other.
Thanks for reading. Enjoy your weekend!
Marko.
http://rredux.com