Repository: tinkerpop Updated Branches: refs/heads/TINKERPOP-1564 f8fc43d46 -> 45b03b20e
Fixed a bug where nested traversal side-effects were not using message passing. Lots of test cases now pass that didn't before. Getting close. Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/45b03b20 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/45b03b20 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/45b03b20 Branch: refs/heads/TINKERPOP-1564 Commit: 45b03b20e6835d78bac41fff27295b2ee9362df4 Parents: f8fc43d Author: Marko A. Rodriguez <okramma...@gmail.com> Authored: Mon Dec 12 11:26:10 2016 -0700 Committer: Marko A. Rodriguez <okramma...@gmail.com> Committed: Mon Dec 12 11:26:10 2016 -0700 ---------------------------------------------------------------------- .../process/akka/TinkerActorSystem.java | 15 ++------ .../process/akka/TraverserMailbox.java | 39 ++++++++++---------- .../process/akka/WorkerTraversalActor.java | 5 ++- 3 files changed, 27 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/45b03b20/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java ---------------------------------------------------------------------- diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java index 06183dc..f348293 100644 --- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java +++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java @@ -21,29 +21,20 @@ package org.apache.tinkerpop.gremlin.tinkergraph.process.akka; import akka.actor.ActorSystem; import akka.actor.Props; - -import 
org.apache.tinkerpop.gremlin.process.traversal.Order; -import org.apache.tinkerpop.gremlin.process.traversal.P; import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; -import org.apache.tinkerpop.gremlin.structure.Column; import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.Partitioner; import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo; import org.apache.tinkerpop.gremlin.structure.util.HashPartitioner; import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.traversal.PartitionerStrategy; import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph; -import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; -import org.javatuples.Pair; -import java.util.Arrays; -import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; -import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.as; -import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.both; +import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.count; import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.out; /** @@ -73,9 +64,9 @@ public final class TinkerActorSystem<S, E> { public static void main(String args[]) throws Exception { final Graph graph = TinkerGraph.open(); - graph.io(GryoIo.build()).readGraph("data/grateful-dead.kryo"); + graph.io(GryoIo.build()).readGraph("data/tinkerpop-modern.kryo"); GraphTraversalSource g = graph.traversal().withStrategies(new PartitionerStrategy(new HashPartitioner(graph.partitioner(), 3))); - System.out.println(g.V().both().both().count().toList()); + System.out.println(g.V().repeat(out().group("a").by("name").by(count())).times(2).cap("a").toList()); // 1406914 /*for (int i = 0; i < 10000; i++) { final 
Graph graph = TinkerGraph.open(); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/45b03b20/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java ---------------------------------------------------------------------- diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java index 65983de..392b691 100644 --- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java +++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java @@ -31,8 +31,8 @@ import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSe import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.VoteToHaltMessage; import scala.Option; +import java.util.LinkedList; import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; /** * @author Marko A. 
Rodriguez (http://markorodriguez.com) @@ -40,10 +40,10 @@ import java.util.concurrent.ConcurrentLinkedQueue; public final class TraverserMailbox implements MailboxType, ProducesMessageQueue<TraverserMailbox.TraverserMessageQueue> { public static class TraverserMessageQueue implements MessageQueue, TraverserSetSemantics { - private final Queue<Envelope> otherMessages = new ConcurrentLinkedQueue<>(); + private final Queue<Envelope> otherMessages = new LinkedList<>(); // TODO: we need a concurrent linked hash map private final TraverserSet<?> traverserMessages = new TraverserSet<>(); - private final Queue<Envelope> haltMessages = new ConcurrentLinkedQueue<>(); + private final Queue<Envelope> haltMessages = new LinkedList<>(); private Envelope terminateToken = null; private final ActorRef owner; private final Object MUTEX = new Object(); @@ -67,18 +67,17 @@ public final class TraverserMailbox implements MailboxType, ProducesMessageQueue public Envelope dequeue() { synchronized (MUTEX) { + if (!this.otherMessages.isEmpty()) + return this.otherMessages.poll(); if (!this.traverserMessages.isEmpty()) return new Envelope(this.traverserMessages.poll(), this.owner); else if (null != this.terminateToken) { final Envelope temp = this.terminateToken; this.terminateToken = null; return temp; - } else if (!this.haltMessages.isEmpty()) + } else return this.haltMessages.poll(); - else - return this.otherMessages.poll(); } - } public int numberOfMessages() { @@ -94,18 +93,20 @@ public final class TraverserMailbox implements MailboxType, ProducesMessageQueue } public void cleanUp(final ActorRef owner, final MessageQueue deadLetters) { - for (final Envelope handle : this.otherMessages) { - deadLetters.enqueue(owner, handle); - } - for (final Traverser.Admin<?> traverser : this.traverserMessages) { - deadLetters.enqueue(owner, new Envelope(traverser, this.owner)); - } - for (final Envelope handle : this.haltMessages) { - deadLetters.enqueue(owner, handle); - } - if (null != 
this.terminateToken) { - deadLetters.enqueue(owner, this.terminateToken); - this.terminateToken = null; + synchronized (MUTEX) { + for (final Envelope handle : this.otherMessages) { + deadLetters.enqueue(owner, handle); + } + for (final Traverser.Admin<?> traverser : this.traverserMessages) { + deadLetters.enqueue(owner, new Envelope(traverser, this.owner)); + } + for (final Envelope handle : this.haltMessages) { + deadLetters.enqueue(owner, handle); + } + if (null != this.terminateToken) { + deadLetters.enqueue(owner, this.terminateToken); + this.terminateToken = null; + } } } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/45b03b20/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java ---------------------------------------------------------------------- diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java index b21d168..880f78e 100644 --- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java +++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java @@ -29,6 +29,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.Traverser; import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier; import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep; +import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper; import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix; import org.apache.tinkerpop.gremlin.structure.Edge; import org.apache.tinkerpop.gremlin.structure.Element; @@ -70,8 +71,9 @@ public final class WorkerTraversalActor extends AbstractActor implements Require // 
set up partition and traversal information this.localPartition = localPartition; this.partitioner = partitioner; + final WorkerTraversalSideEffects sideEffects = new WorkerTraversalSideEffects(traversal.getSideEffects(), context()); + TraversalHelper.applyTraversalRecursively(t -> t.setSideEffects(sideEffects), traversal); this.matrix = new TraversalMatrix<>(traversal); - this.matrix.getTraversal().setSideEffects(new WorkerTraversalSideEffects(this.matrix.getTraversal().getSideEffects(), context())); final GraphStep graphStep = (GraphStep) traversal.getStartStep(); if (0 == graphStep.getIds().length) ((GraphStep) traversal.getStartStep()).setIteratorSupplier(graphStep.returnsVertex() ? this.localPartition::vertices : this.localPartition::edges); @@ -119,6 +121,7 @@ public final class WorkerTraversalActor extends AbstractActor implements Require while (this.barrierLock.hasNextBarrier()) { master().tell(new BarrierAddMessage(this.barrierLock), self()); } + this.voteToHalt = false; } // use termination token to determine termination condition if (null != this.terminate) {