Repository: tinkerpop Updated Branches: refs/heads/TINKERPOP-1564 5ef9e2b20 -> 3035bbcfd
Realized that stepId is not used in hashCode(). Worked around this in the actor system by tracking barriers in a Map keyed by step id instead of a HashSet. Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/3035bbcf Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/3035bbcf Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/3035bbcf Branch: refs/heads/TINKERPOP-1564 Commit: 3035bbcfd4862648230b90f8591479db95c94f20 Parents: 5ef9e2b Author: Marko A. Rodriguez <okramma...@gmail.com> Authored: Mon Dec 12 15:12:24 2016 -0700 Committer: Marko A. Rodriguez <okramma...@gmail.com> Committed: Mon Dec 12 15:12:24 2016 -0700 ---------------------------------------------------------------------- .../tinkergraph/process/akka/MasterTraversalActor.java | 8 +++----- .../gremlin/tinkergraph/process/akka/TinkerActorSystem.java | 5 +++-- .../gremlin/tinkergraph/process/akka/TraverserMailbox.java | 8 +++++--- .../tinkergraph/process/akka/WorkerTraversalActor.java | 6 +++--- 4 files changed, 14 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3035bbcf/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java ---------------------------------------------------------------------- diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java index 834beab..2bf62eb 100644 --- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java +++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java @@ -47,10 +47,8 @@ import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.StartMessa import 
org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.VoteToHaltMessage; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; /** * @author Marko A. Rodriguez (http://markorodriguez.com) @@ -61,7 +59,7 @@ public final class MasterTraversalActor extends AbstractActor implements Require private final TraversalMatrix<?, ?> matrix; private final Partitioner partitioner; private final Map<String, ActorSelection> workers = new HashMap<>(); - private Set<Barrier> barriers = new HashSet<>(); + private Map<String, Barrier> barriers = new HashMap<>(); private final TraverserSet<?> results; private final String leaderWorker; @@ -91,7 +89,7 @@ public final class MasterTraversalActor extends AbstractActor implements Require final Step<?, ?> step = (Step) barrier; GraphComputing.atMaster(step, true); barrier.addBarrier(barrierMerge.getBarrier()); - this.barriers.add(barrier); + this.barriers.put(step.getId(), barrier); }). match(SideEffectAddMessage.class, sideEffect -> { // get the side-effect updates from the workers to generate the master side-effects @@ -100,7 +98,7 @@ public final class MasterTraversalActor extends AbstractActor implements Require match(VoteToHaltMessage.class, voteToHalt -> { assert !sender().equals(self()); if (!this.barriers.isEmpty()) { - for (final Barrier barrier : this.barriers) { + for (final Barrier barrier : this.barriers.values()) { final Step<?, ?> step = (Step) barrier; if (!(barrier instanceof LocalBarrier)) { while (step.hasNext()) { http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3035bbcf/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java ---------------------------------------------------------------------- diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java 
b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java index 53ff18f..461ebe3 100644 --- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java +++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java @@ -36,6 +36,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.count; +import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.inE; import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.out; import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.outE; @@ -68,8 +69,8 @@ public final class TinkerActorSystem<S, E> { final Graph graph = TinkerGraph.open(); graph.io(GryoIo.build()).readGraph("data/tinkerpop-modern.kryo"); GraphTraversalSource g = graph.traversal().withStrategies(new PartitionerStrategy(new HashPartitioner(graph.partitioner(), 3))); - System.out.println(g.V().order().by(outE().count(), Order.decr).toList()); - // 1406914 + System.out.println(g.V(1, 2).union(outE().count(), inE().count(), (Traversal) outE().values("weight").sum()).toList()); + //3, 1.9, 1 /*for (int i = 0; i < 10000; i++) { final Graph graph = TinkerGraph.open(); graph.io(GryoIo.build()).readGraph("data/tinkerpop-modern.kryo"); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3035bbcf/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java ---------------------------------------------------------------------- diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java index c76e71f..585b967 100644 --- 
a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java +++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TraverserMailbox.java @@ -55,11 +55,13 @@ public final class TraverserMailbox implements MailboxType, ProducesMessageQueue synchronized (MUTEX) { if (handle.message() instanceof Traverser.Admin) this.traverserMessages.offer((Traverser.Admin) handle.message()); - else if (handle.message() instanceof VoteToHaltMessage) + else if (handle.message() instanceof VoteToHaltMessage) { + assert null == this.haltMessages; this.haltMessages = handle; - else if (handle.message() instanceof WorkerTraversalActor.Terminate) + } else if (handle.message() instanceof WorkerTraversalActor.Terminate) { + assert null == this.terminateToken; this.terminateToken = handle; - else + } else this.otherMessages.offer(handle); } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3035bbcf/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java ---------------------------------------------------------------------- diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java index b6e90d5..da8a942 100644 --- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java +++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java @@ -69,7 +69,7 @@ public final class WorkerTraversalActor extends AbstractActor implements Require private boolean isLeader; private Terminate terminate = null; private boolean voteToHalt = false; - private Set<Barrier> barriers = new HashSet<>(); + private Map<String,Barrier> barriers = new HashMap<>(); public 
WorkerTraversalActor(final Traversal.Admin<?, ?> traversal, final Partition localPartition, final Partitioner partitioner) { System.out.println("worker[created]: " + self().path()); @@ -129,7 +129,7 @@ public final class WorkerTraversalActor extends AbstractActor implements Require match(VoteToHaltMessage.class, haltSync -> { // if there is a barrier and thus, halting at barrier, then process barrier if (!this.barriers.isEmpty()) { - for (final Barrier barrier : this.barriers) { + for (final Barrier barrier : this.barriers.values()) { while (barrier.hasNextBarrier()) { master().tell(new BarrierAddMessage(barrier), self()); } @@ -160,7 +160,7 @@ public final class WorkerTraversalActor extends AbstractActor implements Require GraphComputing.atMaster(step, false); step.addStart(traverser); if (step instanceof Barrier) { - this.barriers.add((Barrier) step); + this.barriers.put(step.getId(),(Barrier) step); } else { while (step.hasNext()) { this.sendTraverser(step.next());