Hello,
Today I’m working on the TP4 BeamProcessor package. When you create an Apache
Beam pipeline, the final Fn (stream operation) is used to write the output
“somewhere.” Beam provides writers for common databases like Cassandra, HBase,
and RDBMSs, and for common file formats/systems like HDFS and S3. For testing
purposes (and to move forward quickly), I just created a "public static
TraverserSet" (which is thread-safe) and had the final Fn reference that
TraverserSet and write traversers to it. Of course, this doesn’t work outside
unit tests… So I needed to up my game: I need a way for all the final Fns to
send their traversers to a server.
I decided to build my own multi-threaded TraverserServer (ignore the rough
edges for now — error handling, ObjectInputStream, etc.).
https://github.com/apache/tinkerpop/blob/da887c23c1f18617ff77ede74d69d93a4895fb83/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/TraverserServer.java
I originally had this class in Beam only.
1. Start the server when the Beam pipeline is started.
https://github.com/apache/tinkerpop/blob/da887c23c1f18617ff77ede74d69d93a4895fb83/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java#L92-L94
2. Have the OutputFn connect to the server and send resultant
traversers to it.
https://github.com/apache/tinkerpop/blob/da887c23c1f18617ff77ede74d69d93a4895fb83/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/OutputFn.java#L46
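For anyone who doesn't want to click through, here is a rough, self-contained sketch of the same pattern (a multi-threaded server that many output steps push results into). It is not the linked TraverserServer code: MiniTraverserServer, sendAll() and awaitProducers() are hypothetical names, and the sketch assumes traversers are plain Serializable objects (Strings here) written with Java serialization, one socket connection per producer, collected into a thread-safe queue standing in for a TraverserSet. Error handling is deliberately thin, just like the rough edges mentioned above.

```java
import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;

/** Hypothetical sketch: a multi-threaded server that collects objects sent by many producers. */
public class MiniTraverserServer implements AutoCloseable {
    private final ServerSocket serverSocket;
    private final ExecutorService workers = Executors.newCachedThreadPool();
    // Thread-safe sink shared by all connection handlers (stand-in for a TraverserSet).
    private final Queue<Object> results = new ConcurrentLinkedQueue<>();
    // One permit is released each time a producer connection has been fully read.
    private final Semaphore finished = new Semaphore(0);

    public MiniTraverserServer(int port) throws IOException {
        this.serverSocket = new ServerSocket(port); // port 0 = pick any free port
        workers.submit(this::acceptLoop);
    }

    public int port() { return serverSocket.getLocalPort(); }

    // Accept connections until the server socket is closed; one handler task per connection.
    private void acceptLoop() {
        while (!serverSocket.isClosed()) {
            try {
                Socket socket = serverSocket.accept();
                workers.submit(() -> handle(socket));
            } catch (IOException e) {
                return; // server socket closed: stop accepting
            }
        }
    }

    // Read objects until the producer closes its stream (signalled by EOFException).
    private void handle(Socket socket) {
        try (socket; ObjectInputStream in = new ObjectInputStream(socket.getInputStream())) {
            while (true) {
                results.add(in.readObject());
            }
        } catch (EOFException e) {
            // normal end of a producer's stream
        } catch (IOException | ClassNotFoundException e) {
            // real error handling omitted in this sketch
        } finally {
            finished.release();
        }
    }

    /** Blocks until n producer connections have finished sending. */
    public void awaitProducers(int n) throws InterruptedException { finished.acquire(n); }

    public List<Object> results() { return new ArrayList<>(results); }

    /** Producer side (e.g. a final output step): connect, write every item, close. */
    public static void sendAll(String host, int port, Iterable<?> items) throws IOException {
        try (Socket socket = new Socket(host, port);
             ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream())) {
            for (Object item : items) out.writeObject(item);
        }
    }

    @Override
    public void close() throws IOException {
        serverSocket.close();
        workers.shutdownNow();
    }

    public static void main(String[] args) throws Exception {
        try (MiniTraverserServer server = new MiniTraverserServer(0)) {
            // Four concurrent producers, standing in for parallel output steps.
            List<Thread> producers = new ArrayList<>();
            for (int p = 0; p < 4; p++) {
                final int id = p;
                Thread t = new Thread(() -> {
                    List<String> batch = new ArrayList<>();
                    for (int i = 0; i < 25; i++) batch.add("p" + id + "-" + i);
                    try { sendAll("localhost", server.port(), batch); }
                    catch (IOException e) { throw new UncheckedIOException(e); }
                });
                producers.add(t);
                t.start();
            }
            for (Thread t : producers) t.join();
            server.awaitProducers(4);
            System.out.println(server.results().size()); // 100
        }
    }
}
```

The cached thread pool gives every producer connection its own handler, so many output steps can write at once; the concurrent queue is what makes that aggregation safe.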
Makes sense. Works. Great. Done. … or was I?!?!
Think about this in practice. If TP4 had a “GremlinServer”, we would have to
stream the resultant traversers in the TraverserServer over the wire to the
client. That is an extra serialization/deserialization step. Why do that? Why
not move the TraverserServer to the client? That is, the OutputFn steps send
their traversers to a TraverserServer (and thus its TraverserSet), regardless
of whether that server is maintained by Beam.
If you read my post yesterday, you know that we now have a Machine interface.
You do this:
Machine machine = LocalMachine.open();
TraversalSource g =
    Gremlin.traversal(machine).withProcessor(BeamProcessor.class);
This morning, I thought to myself — “maybe I can just rip out a
RemoteMachine??” 4 hours later, done. Here is how you use it:
// assume this code is on 111.111.111.111
MachineServer machineServer = new MachineServer(7777);
// assume this code is on 222.222.222.222
// 6666: port of the client-side TraverserServer; 111.111.111.111:7777: the MachineServer
Machine machine = RemoteMachine.open(6666, "111.111.111.111", 7777);
TraversalSource g = Gremlin.traversal(machine).withProcessor(
    BeamProcessor.class,
    Map.of("beam.traverserServer.location", "222.222.222.222",
           "beam.traverserServer.port", 6666));
MachineServer is like GremlinServer. It just sits (on the cluster) and waits
for connections. It registers bytecode sources, receives bytecode submissions,
and returns traversers. Note how it is backed by a LocalMachine! Genius.
https://github.com/apache/tinkerpop/blob/da887c23c1f18617ff77ede74d69d93a4895fb83/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/MachineServer.java
- It’s basic right now, and Stephen will know how to make it
better.
RemoteMachine is like RemoteConnection. It communicates with a MachineServer.
However, guess what else it can do! It can spawn a TraverserServer to
aggregate results in a threaded manner (from multiple traverser producers)!
https://github.com/apache/tinkerpop/blob/da887c23c1f18617ff77ede74d69d93a4895fb83/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/RemoteMachine.java
Now here is the kicker: if you have a processor that processes in parallel
(Akka, Spark, Flink, etc.), that processor can be given the coordinates of a
TraverserServer (see the withProcessor() Map in the above code fragment). This
way, its output steps send their resultant traversers directly to the
RemoteMachine (i.e. the client), avoiding double serialization at the
MachineServer. However, if you don’t want that (i.e. you want the results to go
through the MachineServer), just don’t specify the location of a
TraverserServer and bam, these processors will send their results to the
MachineServer, as it has its own TraverserServer. The MachineServer then sends
the traversers to the RemoteMachine in the classic serial Iterator<Traverser>
style.
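The routing rule in the previous paragraph boils down to a small decision, sketched here. Only the two config keys come from the withProcessor() Map fragment; Target, resolveTarget() and the fallback port 8888 are hypothetical, made up purely for illustration, and this is not how any TP4 processor actually reads its configuration.

```java
import java.util.Map;

/** Hypothetical sketch: where should a parallel processor's output steps send traversers? */
public class OutputRouting {
    // Configuration keys as used in the withProcessor(...) Map fragment.
    static final String LOCATION_KEY = "beam.traverserServer.location";
    static final String PORT_KEY = "beam.traverserServer.port";

    /** Host/port of some TraverserServer (hypothetical type). */
    static final class Target {
        final String host;
        final int port;
        Target(String host, int port) { this.host = host; this.port = port; }
        @Override public String toString() { return host + ":" + port; }
    }

    /**
     * If the client supplied TraverserServer coordinates, send straight to the client
     * (bypassing the MachineServer); otherwise use the MachineServer's own TraverserServer.
     */
    static Target resolveTarget(Map<String, Object> config, Target machineServerTraverserServer) {
        Object host = config.get(LOCATION_KEY);
        Object port = config.get(PORT_KEY);
        if (host instanceof String && port instanceof Integer) {
            return new Target((String) host, (Integer) port);
        }
        return machineServerTraverserServer;
    }

    public static void main(String[] args) {
        Target machineSide = new Target("111.111.111.111", 8888); // hypothetical port
        Map<String, Object> direct = Map.of(LOCATION_KEY, "222.222.222.222", PORT_KEY, 6666);
        System.out.println(resolveTarget(direct, machineSide));   // 222.222.222.222:6666
        System.out.println(resolveTarget(Map.of(), machineSide)); // 111.111.111.111:8888
    }
}
```

The point of the sketch is that the fallback needs no extra machinery: omitting the keys simply routes everything through the MachineServer.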
This means that Pipes just works! Pipes has no notion of
networks/machines/serialization, so MachineServer just iterates out the
traversers and sends them to RemoteMachine.
Look: MachineServer always tries to iterate out processor results. When the
processor is parallelized and its output steps do their own network I/O, that
iterator is empty. Makes sense, logical and consistent. With Pipes, however,
the iterator won’t be empty. Thus, there is no “special logic” to handle
distributed vs. local processors!
https://github.com/apache/tinkerpop/blob/da887c23c1f18617ff77ede74d69d93a4895fb83/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/MachineServer.java#L95-L105
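To make the “no special logic” argument concrete, here is a toy sketch of a single drain loop serving both kinds of processors. The Processor interface, drain() and the two lambdas are hypothetical stand-ins, not TP4 types: a Pipes-like processor hands its results back through the iterator, while a distributed-like one has already shipped its results elsewhere and returns an empty iterator.

```java
import java.util.*;
import java.util.function.Consumer;

/** Hypothetical sketch of a uniform "iterate out whatever is left" loop. */
public class DrainLoop {
    /** Stand-in for a processor: anything it didn't ship itself comes back via the iterator. */
    interface Processor { Iterator<String> results(); }

    /** Same code path for every processor: forward whatever the iterator yields. */
    static int drain(Processor processor, Consumer<String> sendToClient) {
        int sent = 0;
        Iterator<String> it = processor.results();
        while (it.hasNext()) {
            sendToClient.accept(it.next());
            sent++;
        }
        return sent;
    }

    public static void main(String[] args) {
        List<String> direct = new ArrayList<>();    // pushed straight to the client's TraverserServer
        List<String> viaServer = new ArrayList<>(); // forwarded by the (simulated) MachineServer

        // Pipes-like: knows nothing about networks, so it simply returns its results.
        Processor pipesLike = () -> List.of("a", "b", "c").iterator();

        // Distributed-like: output steps already sent results elsewhere, so nothing is left.
        Processor distributedLike = () -> {
            direct.addAll(List.of("x", "y"));
            return Collections.emptyIterator();
        };

        System.out.println(drain(pipesLike, viaServer::add));       // 3
        System.out.println(drain(distributedLike, viaServer::add)); // 0
        System.out.println(viaServer + " " + direct);               // [a, b, c] [x, y]
    }
}
```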
Thus, this just works out of the box!
Machine machine = RemoteMachine.open(6666, "111.111.111.111", 7777);
TraversalSource g =
    Gremlin.traversal(machine).withProcessor(PipesProcessor.class);
And finally coming full circle — this just works (no special logic required):
Machine machine = LocalMachine.open();
TraversalSource g =
    Gremlin.traversal(machine).withProcessor(BeamProcessor.class);
Boo yea!
Thoughts?
Marko.
http://rredux.com