Hello,

Today I’m working on the TP4 BeamProcessor package. When you create an Apache 
Beam pipeline, the final Fn (stream operation) writes the output “somewhere.” 
Beam provides writers for common databases like Cassandra, HBase, and RDBMSs, 
and for common file formats/systems like HDFS and S3. For testing purposes 
(and to move forward quickly), I just created a "public static TraverserSet" 
(which is thread safe) and had the final Fn reference the TraverserSet and 
write traversers to it. Of course, this doesn’t work outside unit tests… So I 
needed to up my game. I need a way for all the final Fns to send their 
traversers to a server.

I decided to build my own multi-threaded TraverserServer (ignore the rough 
edges for now — error handling, ObjectInputStream, etc.).

        
        https://github.com/apache/tinkerpop/blob/da887c23c1f18617ff77ede74d69d93a4895fb83/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/TraverserServer.java
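
To make the idea concrete, here is a minimal sketch of a multi-threaded result server. It is not the linked TraverserServer: the class name ResultServerSketch and its methods are hypothetical, plain Objects stand in for traversers, and a ConcurrentHashMap-backed set stands in for TraverserSet. Each producer connection gets its own thread, which reads serialized objects until the producer closes its stream.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

// Hypothetical stand-in for a TraverserServer: collects objects from many producers.
public class ResultServerSketch implements AutoCloseable {

    private final ServerSocket serverSocket;
    // Thread-safe set (assumption: stands in for the thread-safe TraverserSet).
    private final Set<Object> results = ConcurrentHashMap.newKeySet();
    // Released once per producer connection that has been fully drained.
    private final Semaphore drainedConnections = new Semaphore(0);

    public ResultServerSketch(int port) throws IOException {
        this.serverSocket = new ServerSocket(port); // port 0 picks any free port
        Thread acceptor = new Thread(this::acceptLoop, "acceptor");
        acceptor.setDaemon(true);
        acceptor.start();
    }

    public int port() {
        return serverSocket.getLocalPort();
    }

    public Set<Object> results() {
        return results;
    }

    // Blocks until the given number of producer connections have been drained.
    public void awaitConnections(int count) throws InterruptedException {
        drainedConnections.acquire(count);
    }

    // Accepts connections and hands each one to its own worker thread.
    private void acceptLoop() {
        while (!serverSocket.isClosed()) {
            try {
                Socket socket = serverSocket.accept();
                Thread worker = new Thread(() -> drain(socket), "worker");
                worker.setDaemon(true);
                worker.start();
            } catch (IOException e) {
                return; // the server socket was closed
            }
        }
    }

    // Reads serialized objects from one producer until it closes the stream.
    private void drain(Socket socket) {
        try (Socket s = socket;
             ObjectInputStream in = new ObjectInputStream(s.getInputStream())) {
            while (true) {
                results.add(in.readObject());
            }
        } catch (EOFException e) {
            // normal end: the producer closed its side
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace(); // rough edge: real error handling is out of scope
        } finally {
            drainedConnections.release();
        }
    }

    // What a final output function might do: connect, write results, disconnect.
    public static void send(String host, int port, Object... items) throws IOException {
        try (Socket socket = new Socket(host, port);
             ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream())) {
            for (Object item : items) {
                out.writeObject(item);
            }
        }
    }

    @Override
    public void close() throws IOException {
        serverSocket.close();
    }

    public static void main(String[] args) throws Exception {
        try (ResultServerSketch server = new ResultServerSketch(0)) {
            send("localhost", server.port(), "a", "b"); // producer 1
            send("localhost", server.port(), "c");      // producer 2
            server.awaitConnections(2);
            System.out.println(server.results());
        }
    }
}
```

The semaphore is only there so a caller can tell when all producers are done; how the real class signals completion is not something this sketch tries to reproduce.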

I originally had this class in Beam only.

        1. Start the server when the Beam pipeline is started.
                https://github.com/apache/tinkerpop/blob/da887c23c1f18617ff77ede74d69d93a4895fb83/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/Beam.java#L92-L94
        2. Have the OutputFn connect to the server and send resultant 
           traversers to it.
                https://github.com/apache/tinkerpop/blob/da887c23c1f18617ff77ede74d69d93a4895fb83/java/machine/processor/beam/src/main/java/org/apache/tinkerpop/machine/processor/beam/OutputFn.java#L46

Makes sense. Works. Great. Done. … or was I?!?!

Think about this in practice. If TP4 had a “GremlinServer”, we would have to 
stream the resultant traversers in TraverserServer over the wire to the client. 
This means an extra serialization/deserialization step. Why do that? Why not 
move the TraverserServer to the client! That is, the OutputFn steps send their 
traversers to a TraverserSet regardless of whether that set is maintained by 
Beam.

If you read my post yesterday, you will know that we now have a Machine 
interface. You use it like this:

Machine machine = LocalMachine.open();
TraversalSource g = Gremlin.traversal(machine).withProcessor(
        BeamProcessor.class);

This morning, I thought to myself — “maybe I can just rip out a 
RemoteMachine??” 4 hours later, done. Here is how you use it:

// assume this code is on 111.111.111.111
MachineServer machineServer = new MachineServer(7777); 

// assume this code is on 222.222.222.222
Machine machine = RemoteMachine.open(6666, "111.111.111.111", 7777);
TraversalSource g = Gremlin.traversal(machine).withProcessor(
        BeamProcessor.class, 
        Map.of("beam.traverserServer.location", "222.222.222.222", 
               "beam.traverserServer.port", 6666));
      
MachineServer is like GremlinServer. It just sits (at the cluster) and waits 
for connections. It registers bytecode source, gets bytecode submissions, and 
returns traversers. Note how it is backed by a LocalMachine! Genius.
        
        https://github.com/apache/tinkerpop/blob/da887c23c1f18617ff77ede74d69d93a4895fb83/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/MachineServer.java
                - it's basic right now and Stephen will know how to make stuff 
                  better.
        
RemoteMachine is like RemoteConnection. It communicates with a MachineServer. 
However, guess what else it can do! It can spawn a TraverserServer to 
aggregate results in a threaded manner (from multiple traverser producers)!
        
        https://github.com/apache/tinkerpop/blob/da887c23c1f18617ff77ede74d69d93a4895fb83/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/RemoteMachine.java

Now here is the kicker: if you have a processor that processes in parallel 
(Akka, Spark, Flink, etc.), it can be given the coordinates of a 
TraverserServer (see the withProcessor() Map in the above code fragment). This 
way, its workers send their resultant traversers directly to the RemoteMachine 
(i.e. the client), bypassing the double serialization at MachineServer. 
However, if you don’t want that to happen, that is, if you want the results to 
go through the MachineServer, just don’t specify the location of a 
TraverserServer and bam, these processors will send their results to the 
MachineServer, as it has its own TraverserServer. The MachineServer will then 
send the traversers to the RemoteMachine in classical serial 
Iterator<Traverser> style. This means that Pipes just works! Pipes has no 
notion of networks/machines/serialization, so MachineServer just iterates out 
the traversers and sends them to RemoteMachine.
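
The routing choice described above can be sketched as a small lookup. This is an illustration only, not code from the repository: the class ResultTargetSketch and its resolve() method are hypothetical, and so is port 7778 for the MachineServer's own TraverserServer. Only the two configuration keys come from the example earlier in this mail.

```java
import java.net.InetSocketAddress;
import java.util.Map;

// Hypothetical helper: decides where output functions should send their results.
public class ResultTargetSketch {

    // Configuration keys as used in the withProcessor() example.
    static final String LOCATION = "beam.traverserServer.location";
    static final String PORT = "beam.traverserServer.port";

    // If the client supplied TraverserServer coordinates, results go straight
    // to the client. Otherwise they fall back to the server-side default.
    public static InetSocketAddress resolve(Map<String, Object> config,
                                            InetSocketAddress serverDefault) {
        Object location = config.get(LOCATION);
        Object port = config.get(PORT);
        if (location == null || port == null) {
            return serverDefault;
        }
        return InetSocketAddress.createUnresolved(
                location.toString(), ((Number) port).intValue());
    }

    public static void main(String[] args) {
        // Assumption: the MachineServer's own TraverserServer listens on 7778.
        InetSocketAddress machineServer =
                InetSocketAddress.createUnresolved("111.111.111.111", 7778);

        // Client-side coordinates given: results bypass the MachineServer.
        System.out.println(resolve(
                Map.of(LOCATION, "222.222.222.222", PORT, 6666), machineServer));

        // No coordinates given: results go through the MachineServer.
        System.out.println(resolve(Map.of(), machineServer));
    }
}
```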

Look — MachineServer always tries to iterate out processor results. However, 
when the processor is parallelized with outputs doing their own network I/O, 
the iterator is empty. Makes sense, logical and consistent. However, Pipes 
won’t be empty. Thus, there is no “special logic” to handle distributed vs. 
local processors!
        
        https://github.com/apache/tinkerpop/blob/da887c23c1f18617ff77ede74d69d93a4895fb83/java/machine/machine-core/src/main/java/org/apache/tinkerpop/machine/species/remote/MachineServer.java#L95-L105
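
A toy model of why no special logic is needed. It does not mirror the linked lines: the Processor interface, the two factory methods and serve() are hypothetical, and Strings stand in for traversers. The server loop is identical in both cases. A local processor hands back its results through the iterator, while a distributed one ships them elsewhere and hands back an empty iterator.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

public class IterateOutSketch {

    // Hypothetical processor abstraction: run, then expose results as an iterator.
    interface Processor {
        Iterator<String> run();
    }

    // Pipes-like processor: knows nothing about networks, just returns results.
    static Processor local(List<String> results) {
        return results::iterator;
    }

    // Distributed processor: its outputs do their own I/O (modeled here by
    // appending to a client-side list), so nothing is left to iterate.
    static Processor distributed(List<String> results, List<String> clientSide) {
        return () -> {
            clientSide.addAll(results);
            return Collections.emptyIterator();
        };
    }

    // The server always iterates out whatever the processor returns and
    // "sends" it (modeled here by collecting into a list).
    static List<String> serve(Processor processor) {
        List<String> sentByServer = new ArrayList<>();
        Iterator<String> iterator = processor.run();
        while (iterator.hasNext()) {
            sentByServer.add(iterator.next());
        }
        return sentByServer;
    }

    public static void main(String[] args) {
        List<String> clientSide = new ArrayList<>();
        System.out.println(serve(local(List.of("a", "b"))));
        System.out.println(serve(distributed(List.of("a", "b"), clientSide)));
        System.out.println(clientSide);
    }
}
```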

Thus, this just works out of the box!

Machine machine = RemoteMachine.open(6666, "111.111.111.111", 7777);
TraversalSource g = Gremlin.traversal(machine).withProcessor(
        PipesProcessor.class);

And finally coming full circle — this just works (no special logic required):

Machine machine = LocalMachine.open();
TraversalSource g = Gremlin.traversal(machine).withProcessor(
        BeamProcessor.class);

Boo yea!

Thoughts?
Marko.

http://rredux.com



