Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1564 d651d38f0 -> f8fc43d46


Dah. Had to add a mutex to the TraverserMailbox ... need to think this through 
so it doesn't lock.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/f8fc43d4
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/f8fc43d4
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/f8fc43d4

Branch: refs/heads/TINKERPOP-1564
Commit: f8fc43d464ebf9ee279be37f244bede55b64927f
Parents: d651d38
Author: Marko A. Rodriguez <okramma...@gmail.com>
Authored: Mon Dec 12 11:05:52 2016 -0700
Committer: Marko A. Rodriguez <okramma...@gmail.com>
Committed: Mon Dec 12 11:05:52 2016 -0700

----------------------------------------------------------------------
 .../process/akka/TinkerActorSystem.java         | 13 ++---
 .../process/akka/TraverserMailbox.java          | 52 ++++++++++++++------
 2 files changed, 43 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f8fc43d4/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
----------------------------------------------------------------------
diff --git 
a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
 
b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
index 3eeae71..06183dc 100644
--- 
a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
+++ 
b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
@@ -72,12 +72,12 @@ public final class TinkerActorSystem<S, E> {
     //////////////
 
     public static void main(String args[]) throws Exception {
-        /*final Graph graph = TinkerGraph.open();
-        graph.io(GryoIo.build()).readGraph("data/tinkerpop-modern.kryo");
+        final Graph graph = TinkerGraph.open();
+        graph.io(GryoIo.build()).readGraph("data/grateful-dead.kryo");
         GraphTraversalSource g = graph.traversal().withStrategies(new 
PartitionerStrategy(new HashPartitioner(graph.partitioner(), 3)));
-        System.out.println(g.V().out().values("name").toList());*/
-
-        for (int i = 0; i < 10000; i++) {
+        System.out.println(g.V().both().both().count().toList());
+        // 1406914
+        /*for (int i = 0; i < 10000; i++) {
             final Graph graph = TinkerGraph.open();
             graph.io(GryoIo.build()).readGraph("data/tinkerpop-modern.kryo");
             final GraphTraversalSource g = graph.traversal().withComputer();
@@ -110,7 +110,8 @@ public final class TinkerActorSystem<S, E> {
                 System.out.println("//////////////////////////////////\n");
             }
         }
-    }
+    }*/
 
 
+    }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f8fc43d4/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
----------------------------------------------------------------------
diff --git 
a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
 
b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
index 3d61c2c..65983de 100644
--- 
a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
+++ 
b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java
@@ -32,7 +32,6 @@ import 
org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.VoteToHalt
 import scala.Option;
 
 import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
@@ -43,38 +42,55 @@ public final class TraverserMailbox implements MailboxType, 
ProducesMessageQueue
     public static class TraverserMessageQueue implements MessageQueue, 
TraverserSetSemantics {
         private final Queue<Envelope> otherMessages = new 
ConcurrentLinkedQueue<>();
         // TODO: we need a concurrent linked hash map
-        private final TraverserSet<?> traverserMessages = new 
TraverserSet<>(new ConcurrentHashMap<>());
+        private final TraverserSet<?> traverserMessages = new TraverserSet<>();
         private final Queue<Envelope> haltMessages = new 
ConcurrentLinkedQueue<>();
+        private Envelope terminateToken = null;
         private final ActorRef owner;
+        private final Object MUTEX = new Object();
 
         public TraverserMessageQueue(final ActorRef owner) {
             this.owner = owner;
         }
 
         public void enqueue(final ActorRef receiver, final Envelope handle) {
-            if (handle.message() instanceof Traverser.Admin)
-                this.traverserMessages.offer((Traverser.Admin) 
handle.message());
-            else if (handle.message() instanceof VoteToHaltMessage)
-                this.haltMessages.offer(handle);
-            else
-                this.otherMessages.offer(handle);
+            synchronized (MUTEX) {
+                if (handle.message() instanceof Traverser.Admin)
+                    this.traverserMessages.offer((Traverser.Admin) 
handle.message());
+                else if (handle.message() instanceof VoteToHaltMessage)
+                    this.haltMessages.offer(handle);
+                else if (handle.message() instanceof 
WorkerTraversalActor.Terminate)
+                    this.terminateToken = handle;
+                else
+                    this.otherMessages.offer(handle);
+            }
         }
 
         public Envelope dequeue() {
-            if (!this.otherMessages.isEmpty())
-                return this.otherMessages.poll();
-            else if (!this.traverserMessages.isEmpty())
-                return new Envelope(this.traverserMessages.poll(), this.owner);
-            else
-                return this.haltMessages.poll();
+            synchronized (MUTEX) {
+                if (!this.traverserMessages.isEmpty())
+                    return new Envelope(this.traverserMessages.poll(), 
this.owner);
+                else if (null != this.terminateToken) {
+                    final Envelope temp = this.terminateToken;
+                    this.terminateToken = null;
+                    return temp;
+                } else if (!this.haltMessages.isEmpty())
+                    return this.haltMessages.poll();
+                else
+                    return this.otherMessages.poll();
+            }
+
         }
 
         public int numberOfMessages() {
-            return this.otherMessages.size() + this.traverserMessages.size() + 
this.haltMessages.size();
+            synchronized (MUTEX) {
+                return this.otherMessages.size() + 
this.traverserMessages.size() + this.haltMessages.size() + (null == 
this.terminateToken ? 0 : 1);
+            }
         }
 
         public boolean hasMessages() {
-            return !this.otherMessages.isEmpty() || 
!this.traverserMessages.isEmpty() || !this.haltMessages.isEmpty();
+            synchronized (MUTEX) {
+                return !this.otherMessages.isEmpty() || 
!this.traverserMessages.isEmpty() || !this.haltMessages.isEmpty() || 
this.terminateToken != null;
+            }
         }
 
         public void cleanUp(final ActorRef owner, final MessageQueue 
deadLetters) {
@@ -87,6 +103,10 @@ public final class TraverserMailbox implements MailboxType, 
ProducesMessageQueue
             for (final Envelope handle : this.haltMessages) {
                 deadLetters.enqueue(owner, handle);
             }
+            if (null != this.terminateToken) {
+                deadLetters.enqueue(owner, this.terminateToken);
+                this.terminateToken = null;
+            }
         }
     }
 

Reply via email to