Repository: tinkerpop Updated Branches: refs/heads/TINKERPOP-1564 8ef0897f9 -> 7725fc1f3
Barrier locking is now correct, but when I run the 'test suite' 100 times, the run sometimes locks up if a sideEffect is followed by a barrier. I still need to figure out why that happens. Committing the progress thus far. Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/7725fc1f Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/7725fc1f Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/7725fc1f Branch: refs/heads/TINKERPOP-1564 Commit: 7725fc1f3507500ba900962a9b43a74d6c1edcb2 Parents: 8ef0897 Author: Marko A. Rodriguez <okramma...@gmail.com> Authored: Thu Dec 8 12:50:24 2016 -0700 Committer: Marko A. Rodriguez <okramma...@gmail.com> Committed: Thu Dec 8 12:50:24 2016 -0700 ---------------------------------------------------------------------- .../process/akka/MasterTraversalActor.java | 63 +++++++++++--------- .../process/akka/TinkerActorSystem.java | 60 ++++++++++--------- .../process/akka/WorkerTraversalActor.java | 43 +++++++++---- .../process/akka/messages/StartMessage.java | 3 + .../akka/messages/VoteToContinueMessage.java | 34 +++++++++++ .../akka/messages/VoteToHaltMessage.java | 10 ++-- 6 files changed, 139 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/7725fc1f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java ---------------------------------------------------------------------- diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java index 20d770a..6c68c68 100644 --- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java +++ 
b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java @@ -42,11 +42,13 @@ import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierAdd import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierDoneMessage; import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectAddMessage; import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.StartMessage; +import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.VoteToContinueMessage; import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.VoteToHaltMessage; -import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; /** @@ -57,7 +59,7 @@ public final class MasterTraversalActor extends AbstractActor implements Require private final Traversal.Admin<?, ?> traversal; private final TraversalMatrix<?, ?> matrix; private final Partitioner partitioner; - private List<ActorSelection> workers; + private final Map<String, ActorSelection> workers = new HashMap<>(); private final Set<ActorPath> haltSynchronization = new HashSet<>(); private Barrier barrierLock = null; @@ -82,49 +84,50 @@ public final class MasterTraversalActor extends AbstractActor implements Require assert null == this.barrierLock || this.barrierLock == barrier; this.barrierLock = barrier; this.barrierLock.addBarrier(barrierMerge.getBarrier()); + this.haltSynchronization.remove(sender().path()); }). match(SideEffectAddMessage.class, sideEffect -> { this.traversal.getSideEffects().add(sideEffect.getSideEffectKey(), sideEffect.getSideEffectValue()); - //this.broadcast(new SideEffectAddMessage(sideEffect.getKey(), sideEffect.getValue())); + this.haltSynchronization.remove(sender().path()); }). 
- match(VoteToHaltMessage.class, haltSync -> { + match(VoteToContinueMessage.class, voteToContinue -> { + this.haltSynchronization.remove(sender().path()); + }). + match(VoteToHaltMessage.class, voteToHalt -> { + assert !sender().equals(self()); // receive vote to halt messages from worker // when all workers have voted to halt then terminate the system - if (haltSync.voteToHalt()) { - this.haltSynchronization.add(sender().path()); - if (this.haltSynchronization.size() == this.workers.size()) { - if (null != this.barrierLock) { - final Step<?, ?> step = (Step) this.barrierLock; - while (step.hasNext()) { - this.sendTraverser(step.next()); - } - // broadcast to all workers that the barrier is unlocked - this.broadcast(new BarrierDoneMessage(this.barrierLock)); - this.barrierLock = null; - this.haltSynchronization.clear(); - } else - context().system().terminate(); - } - } else - this.haltSynchronization.remove(sender().path()); - + this.haltSynchronization.add(sender().path()); + if (this.haltSynchronization.size() == this.workers.size()) { + if (null != this.barrierLock) { + final Step<?, ?> step = (Step) this.barrierLock; + while (step.hasNext()) { + this.sendTraverser(step.next()); + } + // broadcast to all workers that the barrier is unlocked + this.broadcast(new BarrierDoneMessage(this.barrierLock)); + this.barrierLock = null; + this.haltSynchronization.clear(); + } else + context().system().terminate(); + } }).build()); } private void initializeWorkers() { final List<Partition> partitions = this.partitioner.getPartitions(); - this.workers = new ArrayList<>(partitions.size()); for (final Partition partition : partitions) { - final ActorRef worker = context().actorOf(Props.create(WorkerTraversalActor.class, this.traversal.clone(), partition, this.partitioner), "worker-" + partition.hashCode()); - this.workers.add(context().actorSelection(worker.path())); + final String workerPathString = "worker-" + partition.hashCode(); + final ActorRef worker = 
context().actorOf(Props.create(WorkerTraversalActor.class, this.traversal.clone(), partition, this.partitioner), workerPathString); + this.workers.put(workerPathString, context().actorSelection(worker.path())); } - for (final ActorSelection worker : this.workers) { + for (final ActorSelection worker : this.workers.values()) { worker.tell(StartMessage.instance(), self()); } } private void broadcast(final Object message) { - for (final ActorSelection worker : this.workers) { + for (final ActorSelection worker : this.workers.values()) { worker.tell(message, self()); } } @@ -145,8 +148,10 @@ public final class MasterTraversalActor extends AbstractActor implements Require if (traverser.isHalted()) { System.out.println("master[result]: " + traverser); } else if (traverser.get() instanceof Element) { - final Partition otherPartition = this.partitioner.getPartition((Element) traverser.get()); - context().actorSelection("worker-" + otherPartition.hashCode()).tell(traverser, self()); + final Partition partition = this.partitioner.getPartition((Element) traverser.get()); + final ActorRef worker = this.workers.get("worker-" + partition.hashCode()).anchor(); + this.haltSynchronization.remove(worker.path()); + worker.tell(traverser, self()); } else { self().tell(traverser, self()); } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/7725fc1f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java ---------------------------------------------------------------------- diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java index 6f83920..be7de68 100644 --- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java +++ 
b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java @@ -53,37 +53,39 @@ public final class TinkerActorSystem { ////////////// public static void main(String args[]) throws Exception { - final Graph graph = TinkerGraph.open(); - graph.io(GryoIo.build()).readGraph("data/tinkerpop-modern.kryo"); - final GraphTraversalSource g = graph.traversal().withComputer(); - final List<Traversal.Admin<?, ?>> traversals = Arrays.asList( - // match() works - g.V().match( - as("a").out("created").as("b"), - as("b").in("created").as("c"), - as("b").has("name", P.eq("lop"))).where("a", P.neq("c")).select("a", "b", "c").by("name").asAdmin(), - // side-effects work - g.V().repeat(both()).times(2). - groupCount("a").by("name"). - cap("a"). - unfold(). - order().by(Column.values, Order.decr).limit(3).asAdmin(), - // barriers work and beyond the local star graph works - g.V().repeat(both()).times(2).hasLabel("person"). - group(). - by("name"). - by(out("created").values("name").dedup().fold()).asAdmin(), - // no results works - g.V().out("blah").asAdmin() - ); - for (final Traversal.Admin<?, ?> traversal : traversals) { - System.out.println("EXECUTING: " + traversal.getBytecode()); - final TinkerActorSystem actors = new TinkerActorSystem(traversal.clone()); - while (!actors.system.isTerminated()) { + for(int i=0; i<100; i++) { + final Graph graph = TinkerGraph.open(); + graph.io(GryoIo.build()).readGraph("data/tinkerpop-modern.kryo"); + final GraphTraversalSource g = graph.traversal().withComputer(); + final List<Traversal.Admin<?, ?>> traversals = Arrays.asList( + // match() works + g.V().match( + as("a").out("created").as("b"), + as("b").in("created").as("c"), + as("b").has("name", P.eq("lop"))).where("a", P.neq("c")).select("a", "b", "c").by("name").asAdmin(), + // side-effects work + /*g.V().repeat(both()).times(2). + groupCount("a").by("name"). + cap("a"). + unfold(). 
+ order().by(Column.values, Order.decr).limit(3).asAdmin(),*/ + // barriers work and beyond the local star graph works + g.V().repeat(both()).times(2).hasLabel("person"). + group(). + by("name"). + by(out("created").values("name").dedup().fold()).asAdmin(), + // no results works + g.V().out("blah").asAdmin() + ); + for (final Traversal.Admin<?, ?> traversal : traversals) { + System.out.println("EXECUTING: " + traversal.getBytecode()); + final TinkerActorSystem actors = new TinkerActorSystem(traversal.clone()); + while (!actors.system.isTerminated()) { + } + //System.out.println(traversal.toList()); + System.out.println("//////////////////////////////////\n"); } - //System.out.println(traversal.toList()); - System.out.println("//////////////////////////////////\n"); } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/7725fc1f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java ---------------------------------------------------------------------- diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java index f280678..c718d13 100644 --- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java +++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java @@ -37,6 +37,7 @@ import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierAdd import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierDoneMessage; import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectAddMessage; import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.StartMessage; +import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.VoteToContinueMessage; 
import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.VoteToHaltMessage; import java.util.HashMap; @@ -72,40 +73,47 @@ public final class WorkerTraversalActor extends AbstractActor implements this.sendTraverser(step.next()); } // internal vote to have in mailbox as final message to process - self().tell(new VoteToHaltMessage(true), self()); + self().tell(VoteToHaltMessage.instance(), self()); + this.voteToHalt = false; }). match(Traverser.Admin.class, traverser -> { if (this.voteToHalt) { // tell master you no longer want to halt - master().tell(new VoteToHaltMessage(false), self()); + master().tell(VoteToContinueMessage.instance(), self()); // internal vote to have in mailbox as final message to process - self().tell(new VoteToHaltMessage(true), self()); + self().tell(VoteToHaltMessage.instance(), self()); this.voteToHalt = false; } this.processTraverser(traverser); }). match(SideEffectAddMessage.class, sideEffect -> { - // TODO: sideEffect.setSideEffect(this.matrix.getTraversal()); + // TODO }). match(BarrierDoneMessage.class, barrierSync -> { // barrier is complete and processing can continue if (null != this.barrierLock) { this.barrierLock.done(); this.barrierLock = null; - // internal vote to have in mailbox as final message to process - self().tell(new VoteToHaltMessage(true), self()); } + // internal vote to have in mailbox as final message to process + self().tell(VoteToHaltMessage.instance(), self()); + this.voteToHalt = false; }). 
match(VoteToHaltMessage.class, haltSync -> { + assert sender().equals(self()); + boolean hasBarrier = null != this.barrierLock && this.barrierLock.hasNextBarrier(); // if there is a barrier and thus, halting at barrier, then process barrier - if (null != this.barrierLock) { + if (hasBarrier) { while (this.barrierLock.hasNextBarrier()) { master().tell(new BarrierAddMessage(this.barrierLock), self()); } + self().tell(VoteToHaltMessage.instance(), self()); + this.voteToHalt = false; + } else if (!this.voteToHalt) { + // the final message in the worker mail box, tell master you are done processing messages + master().tell(VoteToHaltMessage.instance(), self()); + this.voteToHalt = true; } - // the final message in the worker mail box, tell master you are done processing messages - master().tell(new VoteToHaltMessage(true), self()); - this.voteToHalt = true; }).build() ); } @@ -140,6 +148,21 @@ public final class WorkerTraversalActor extends AbstractActor implements self().tell(traverser, self()); } + private void voteToHalt() { + if (!this.voteToHalt) { + master().tell(VoteToHaltMessage.instance(), self()); + this.voteToHalt = true; + } + } + + private void voteToContinue() { + if (this.voteToHalt) { + master().tell(VoteToContinueMessage.instance(), self()); + } + self().tell(VoteToHaltMessage.instance(), self()); + this.voteToHalt = false; + } + private ActorRef master() { return context().parent(); } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/7725fc1f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/StartMessage.java ---------------------------------------------------------------------- diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/StartMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/StartMessage.java index 7acb251..df064a4 100644 --- 
a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/StartMessage.java +++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/StartMessage.java @@ -26,6 +26,9 @@ public final class StartMessage { private static final StartMessage INSTANCE = new StartMessage(); + private StartMessage() { + } + public static StartMessage instance() { return INSTANCE; } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/7725fc1f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToContinueMessage.java ---------------------------------------------------------------------- diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToContinueMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToContinueMessage.java new file mode 100644 index 0000000..1faa7a0 --- /dev/null +++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToContinueMessage.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class VoteToContinueMessage { + + private static final VoteToContinueMessage INSTANCE = new VoteToContinueMessage(); + + private VoteToContinueMessage() {} + + public static VoteToContinueMessage instance() { + return INSTANCE; + } +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/7725fc1f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToHaltMessage.java ---------------------------------------------------------------------- diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToHaltMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToHaltMessage.java index ca5929f..f7d8970 100644 --- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToHaltMessage.java +++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToHaltMessage.java @@ -24,13 +24,11 @@ package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages; */ public final class VoteToHaltMessage implements SynchronizationMessage { - private final boolean voteToHalt; + private static final VoteToHaltMessage INSTANCE = new VoteToHaltMessage(); - public VoteToHaltMessage(final boolean voteToHalt) { - this.voteToHalt = voteToHalt; - } + private VoteToHaltMessage() {} - public boolean voteToHalt() { - return this.voteToHalt; + public static VoteToHaltMessage instance() { + return INSTANCE; } }