Hello,

I have been studying Akka lately as a way to implement distributed OLTP/dynamic 
query routing. I have a working implementation.

This is currently a “toy implementation” that lives in the test/ directory of 
tinkergraph-gremlin (TINKERPOP-1564).

There are 3 classes: 

TinkerActorSystem
        
https://github.com/apache/tinkerpop/blob/0462411f1d1ad6fab9d3e6e58e5e158c9bb3f207/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
 
This class creates an Akka ActorSystem for a submitted traversal. It then 
spawns a MasterTraversalActor given the traversal and a partitioner. The 
partitioner is currently hardcoded.

MasterTraversalActor
        
https://github.com/apache/tinkerpop/blob/0462411f1d1ad6fab9d3e6e58e5e158c9bb3f207/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
 
This class spawns a WorkerTraversalActor for each partition in the partitioner. 
It then “starts” each worker and awaits halted traversers (results).

WorkerTraversalActor
        
https://github.com/apache/tinkerpop/blob/0462411f1d1ad6fab9d3e6e58e5e158c9bb3f207/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
 
This class streams out its partition's start step traversers. If a traverser 
references elements local to its partition, the worker processes it. Otherwise, 
it sends the traverser as a message to the worker that owns the respective 
partition. It also receives traversers from remote workers and processes them 
accordingly. If processing yields a halted traverser, the worker sends that 
halted traverser to the master traversal actor.
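
To make the routing concrete, here is a minimal sketch in plain Java, without 
Akka. It is not the code in WorkerTraversalActor. The Traverser class, the 
modulo partitioner, the per-worker mailboxes, and the fixed number of out() 
hops are all assumptions made for illustration. Each worker starts with the 
traversers of its own partition, moves every traverser one hop, and hands the 
result to whichever worker owns the next vertex. Halted traversers go to the 
master's result list.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Map;

public class WorkerRoutingSketch {

    // Hypothetical stand-in for a traverser: current vertex plus remaining out() hops.
    static final class Traverser {
        final int vertex;
        final int hopsLeft;
        Traverser(int vertex, int hopsLeft) { this.vertex = vertex; this.hopsLeft = hopsLeft; }
    }

    // Hypothetical partitioner: vertex id modulo the number of workers.
    static int partitionOf(int vertex, int workers) {
        return Math.floorMod(vertex, workers);
    }

    // Simulates g.V() followed by `hops` out() steps and returns the sorted halted vertex ids.
    public static List<Integer> run(Map<Integer, List<Integer>> outEdges, int workers, int hops) {
        // One mailbox per worker; in Akka this would be the actor's own mailbox.
        List<Deque<Traverser>> mailboxes = new ArrayList<>();
        for (int w = 0; w < workers; w++) mailboxes.add(new ArrayDeque<>());
        List<Integer> masterResults = new ArrayList<>();

        // "Start": every worker receives the start traversers of its own partition.
        for (int v : outEdges.keySet()) {
            mailboxes.get(partitionOf(v, workers)).add(new Traverser(v, hops));
        }

        boolean active = true;
        while (active) {
            active = false;
            for (int w = 0; w < workers; w++) {
                Deque<Traverser> mailbox = mailboxes.get(w);
                while (!mailbox.isEmpty()) {
                    active = true;
                    Traverser t = mailbox.poll();
                    if (t.hopsLeft == 0) {
                        // Halted traverser: send it to the master.
                        masterResults.add(t.vertex);
                        continue;
                    }
                    for (int next : outEdges.getOrDefault(t.vertex, Collections.emptyList())) {
                        // The owner may be this worker (stay local) or another one (message it).
                        int owner = partitionOf(next, workers);
                        mailboxes.get(owner).add(new Traverser(next, t.hopsLeft - 1));
                    }
                }
            }
        }
        Collections.sort(masterResults);
        return masterResults;
    }
}
```

The result does not depend on the number of workers: for the edges 1->2, 1->3, 
2->3 and one hop, run(...) returns [2, 3, 3] whether there are 1, 2, or 3 
partitions. Only the number of cross-partition messages changes.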

Here is the System.out output for the following traversal:

g.V().match(
  as("a").out("created").as("b"),
  as("b").in("created").as("c"),
  as("b").has("name", eq("lop"))).
    where("a", neq("c")).
    select("a", "b", "c").by("name")

master[created]: akka://traversal-1552478766/user/master
worker[created]: akka://traversal-1552478766/user/master/worker-1211806485
worker[created]: akka://traversal-1552478766/user/master/worker-1261612621
worker[created]: akka://traversal-1552478766/user/master/worker-1864420351
worker[created]: akka://traversal-1552478766/user/master/worker-1054674616
worker[created]: akka://traversal-1552478766/user/master/worker-593666157
master[result]: {a=marko, b=lop, c=peter}
master[result]: {a=josh, b=lop, c=marko}
master[result]: {a=marko, b=lop, c=josh}
master[result]: {a=peter, b=lop, c=josh}
master[result]: {a=peter, b=lop, c=marko}
master[result]: {a=josh, b=lop, c=peter}


This implementation currently does not support traversals with barriers or 
side-effects. Moreover, it does not “stall the workers” to build up traverser 
bulks. In other words, it currently messages one traverser at a time.
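
For reference, building up bulks could look roughly like the sketch below. 
This is an assumption about a possible future design, not something the 
current code does. The BulkingSketch class and its String keys are 
hypothetical. A real implementation would have to decide traverser equivalence 
on more than location (for example, path and sack). The idea is that a worker 
buffers outgoing traversers, merges equal ones by summing their bulk, and 
sends one message per distinct traverser on flush.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BulkingSketch {

    // Outgoing buffer: traverser key (hypothetically, just its location) -> accumulated bulk.
    private final Map<String, Long> buffer = new LinkedHashMap<>();

    // Buffer a traverser instead of messaging it immediately; equal traversers merge.
    public void add(String location, long bulk) {
        buffer.merge(location, bulk, Long::sum);
    }

    // Return the merged traversers to send and reset the buffer.
    public Map<String, Long> flush() {
        Map<String, Long> out = new LinkedHashMap<>(buffer);
        buffer.clear();
        return out;
    }
}
```

With this, three traversers at "lop" and one at "ripple" would go out as two 
messages (lop with bulk 3, ripple with bulk 1) instead of four.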

I hope you can appreciate the simplicity of the implementation. With 
akka-remote, the code stays the same, except that the URIs of the Akka actors 
change. It's all pretty basic, in fact.

Enjoy,
Marko.

http://markorodriguez.com
