Repository: tinkerpop Updated Branches: refs/heads/TINKERPOP-1564 d651d38f0 -> f8fc43d46
dah. had to add a mutex to the TraverserMailbox ... need to think this through so it doesn't lock. Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/f8fc43d4 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/f8fc43d4 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/f8fc43d4 Branch: refs/heads/TINKERPOP-1564 Commit: f8fc43d464ebf9ee279be37f244bede55b64927f Parents: d651d38 Author: Marko A. Rodriguez <okramma...@gmail.com> Authored: Mon Dec 12 11:05:52 2016 -0700 Committer: Marko A. Rodriguez <okramma...@gmail.com> Committed: Mon Dec 12 11:05:52 2016 -0700 ---------------------------------------------------------------------- .../process/akka/TinkerActorSystem.java | 13 ++--- .../process/akka/TraverserMailbox.java | 52 ++++++++++++++------ 2 files changed, 43 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f8fc43d4/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java ---------------------------------------------------------------------- diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java index 3eeae71..06183dc 100644 --- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java +++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java @@ -72,12 +72,12 @@ public final class TinkerActorSystem<S, E> { ////////////// public static void main(String args[]) throws Exception { - /*final Graph graph = TinkerGraph.open(); - graph.io(GryoIo.build()).readGraph("data/tinkerpop-modern.kryo"); + final Graph graph = TinkerGraph.open(); + graph.io(GryoIo.build()).readGraph("data/grateful-dead.kryo"); GraphTraversalSource g = graph.traversal().withStrategies(new PartitionerStrategy(new HashPartitioner(graph.partitioner(), 3))); - System.out.println(g.V().out().values("name").toList());*/ - - for (int i = 0; i < 10000; i++) { + System.out.println(g.V().both().both().count().toList()); + // 1406914 + /*for (int i = 0; i < 10000; i++) { final Graph graph = TinkerGraph.open(); graph.io(GryoIo.build()).readGraph("data/tinkerpop-modern.kryo"); final GraphTraversalSource g = graph.traversal().withComputer(); @@ -110,7 +110,8 @@ public final class TinkerActorSystem<S, E> { System.out.println("//////////////////////////////////\n"); } } - } + }*/ + } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f8fc43d4/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java ---------------------------------------------------------------------- diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java index 3d61c2c..65983de 100644 --- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java +++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java @@ -32,7 +32,6 @@ import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.VoteToHalt import scala.Option; import java.util.Queue; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; /** @@ -43,38 +42,55 @@ public final class TraverserMailbox implements MailboxType, ProducesMessageQueue public static class TraverserMessageQueue implements MessageQueue, TraverserSetSemantics { private final Queue<Envelope> otherMessages = new ConcurrentLinkedQueue<>(); // TODO: we need a concurrent linked hash map - private final TraverserSet<?> traverserMessages = new TraverserSet<>(new ConcurrentHashMap<>()); + private final TraverserSet<?> traverserMessages = new TraverserSet<>(); private final Queue<Envelope> haltMessages = new ConcurrentLinkedQueue<>(); + private Envelope terminateToken = null; private final ActorRef owner; + private final Object MUTEX = new Object(); public TraverserMessageQueue(final ActorRef owner) { this.owner = owner; } public void enqueue(final ActorRef receiver, final Envelope handle) { - if (handle.message() instanceof Traverser.Admin) - this.traverserMessages.offer((Traverser.Admin) handle.message()); - else if (handle.message() instanceof VoteToHaltMessage) - this.haltMessages.offer(handle); - else - this.otherMessages.offer(handle); + synchronized (MUTEX) { + if (handle.message() instanceof Traverser.Admin) + this.traverserMessages.offer((Traverser.Admin) handle.message()); + else if (handle.message() instanceof VoteToHaltMessage) + this.haltMessages.offer(handle); + else if (handle.message() instanceof WorkerTraversalActor.Terminate) + this.terminateToken = handle; + else + this.otherMessages.offer(handle); + } } public Envelope dequeue() { - if (!this.otherMessages.isEmpty()) - return this.otherMessages.poll(); - else if (!this.traverserMessages.isEmpty()) - return new Envelope(this.traverserMessages.poll(), this.owner); - else - return this.haltMessages.poll(); + synchronized (MUTEX) { + if (!this.traverserMessages.isEmpty()) + return new Envelope(this.traverserMessages.poll(), this.owner); + else if (null != this.terminateToken) { + final Envelope temp = this.terminateToken; + this.terminateToken = null; + return temp; + } else if (!this.haltMessages.isEmpty()) + return this.haltMessages.poll(); + else + return this.otherMessages.poll(); + } + } public int numberOfMessages() { - return this.otherMessages.size() + this.traverserMessages.size() + this.haltMessages.size(); + synchronized (MUTEX) { + return this.otherMessages.size() + this.traverserMessages.size() + this.haltMessages.size() + (null == this.terminateToken ? 0 : 1); + } } public boolean hasMessages() { - return !this.otherMessages.isEmpty() || !this.traverserMessages.isEmpty() || !this.haltMessages.isEmpty(); + synchronized (MUTEX) { + return !this.otherMessages.isEmpty() || !this.traverserMessages.isEmpty() || !this.haltMessages.isEmpty() || this.terminateToken != null; + } } public void cleanUp(final ActorRef owner, final MessageQueue deadLetters) { @@ -87,6 +103,10 @@ public final class TraverserMailbox implements MailboxType, ProducesMessageQueue for (final Envelope handle : this.haltMessages) { deadLetters.enqueue(owner, handle); } + if (null != this.terminateToken) { + deadLetters.enqueue(owner, this.terminateToken); + this.terminateToken = null; + } } }