Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1564 8ef0897f9 -> 7725fc1f3


okay, so I have barrier locking correct, but if I run the 'test suite' 100 
times, I sometimes get locking if I do a sideEffect then a barrier. Need to 
figure out why that is happening. Committing thus far.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/7725fc1f
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/7725fc1f
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/7725fc1f

Branch: refs/heads/TINKERPOP-1564
Commit: 7725fc1f3507500ba900962a9b43a74d6c1edcb2
Parents: 8ef0897
Author: Marko A. Rodriguez <okramma...@gmail.com>
Authored: Thu Dec 8 12:50:24 2016 -0700
Committer: Marko A. Rodriguez <okramma...@gmail.com>
Committed: Thu Dec 8 12:50:24 2016 -0700

----------------------------------------------------------------------
 .../process/akka/MasterTraversalActor.java      | 63 +++++++++++---------
 .../process/akka/TinkerActorSystem.java         | 60 ++++++++++---------
 .../process/akka/WorkerTraversalActor.java      | 43 +++++++++----
 .../process/akka/messages/StartMessage.java     |  3 +
 .../akka/messages/VoteToContinueMessage.java    | 34 +++++++++++
 .../akka/messages/VoteToHaltMessage.java        | 10 ++--
 6 files changed, 139 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/7725fc1f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
----------------------------------------------------------------------
diff --git 
a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
 
b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
index 20d770a..6c68c68 100644
--- 
a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
+++ 
b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
@@ -42,11 +42,13 @@ import 
org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierAdd
 import 
org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierDoneMessage;
 import 
org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectAddMessage;
 import 
org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.StartMessage;
+import 
org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.VoteToContinueMessage;
 import 
org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.VoteToHaltMessage;
 
-import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -57,7 +59,7 @@ public final class MasterTraversalActor extends AbstractActor 
implements Require
     private final Traversal.Admin<?, ?> traversal;
     private final TraversalMatrix<?, ?> matrix;
     private final Partitioner partitioner;
-    private List<ActorSelection> workers;
+    private final Map<String, ActorSelection> workers = new HashMap<>();
     private final Set<ActorPath> haltSynchronization = new HashSet<>();
     private Barrier barrierLock = null;
 
@@ -82,49 +84,50 @@ public final class MasterTraversalActor extends 
AbstractActor implements Require
                     assert null == this.barrierLock || this.barrierLock == 
barrier;
                     this.barrierLock = barrier;
                     this.barrierLock.addBarrier(barrierMerge.getBarrier());
+                    this.haltSynchronization.remove(sender().path());
                 }).
                 match(SideEffectAddMessage.class, sideEffect -> {
                     
this.traversal.getSideEffects().add(sideEffect.getSideEffectKey(), 
sideEffect.getSideEffectValue());
-                    //this.broadcast(new 
SideEffectAddMessage(sideEffect.getKey(), sideEffect.getValue()));
+                    this.haltSynchronization.remove(sender().path());
                 }).
-                match(VoteToHaltMessage.class, haltSync -> {
+                match(VoteToContinueMessage.class, voteToContinue -> {
+                    this.haltSynchronization.remove(sender().path());
+                }).
+                match(VoteToHaltMessage.class, voteToHalt -> {
+                    assert !sender().equals(self());
                     // receive vote to halt messages from worker
                     // when all workers  have voted to halt then terminate the 
system
-                    if (haltSync.voteToHalt()) {
-                        this.haltSynchronization.add(sender().path());
-                        if (this.haltSynchronization.size() == 
this.workers.size()) {
-                            if (null != this.barrierLock) {
-                                final Step<?, ?> step = (Step) 
this.barrierLock;
-                                while (step.hasNext()) {
-                                    this.sendTraverser(step.next());
-                                }
-                                // broadcast to all workers that the barrier 
is unlocked
-                                this.broadcast(new 
BarrierDoneMessage(this.barrierLock));
-                                this.barrierLock = null;
-                                this.haltSynchronization.clear();
-                            } else
-                                context().system().terminate();
-                        }
-                    } else
-                        this.haltSynchronization.remove(sender().path());
-
+                    this.haltSynchronization.add(sender().path());
+                    if (this.haltSynchronization.size() == 
this.workers.size()) {
+                        if (null != this.barrierLock) {
+                            final Step<?, ?> step = (Step) this.barrierLock;
+                            while (step.hasNext()) {
+                                this.sendTraverser(step.next());
+                            }
+                            // broadcast to all workers that the barrier is 
unlocked
+                            this.broadcast(new 
BarrierDoneMessage(this.barrierLock));
+                            this.barrierLock = null;
+                            this.haltSynchronization.clear();
+                        } else
+                            context().system().terminate();
+                    }
                 }).build());
     }
 
     private void initializeWorkers() {
         final List<Partition> partitions = this.partitioner.getPartitions();
-        this.workers = new ArrayList<>(partitions.size());
         for (final Partition partition : partitions) {
-            final ActorRef worker = 
context().actorOf(Props.create(WorkerTraversalActor.class, 
this.traversal.clone(), partition, this.partitioner), "worker-" + 
partition.hashCode());
-            this.workers.add(context().actorSelection(worker.path()));
+            final String workerPathString = "worker-" + partition.hashCode();
+            final ActorRef worker = 
context().actorOf(Props.create(WorkerTraversalActor.class, 
this.traversal.clone(), partition, this.partitioner), workerPathString);
+            this.workers.put(workerPathString, 
context().actorSelection(worker.path()));
         }
-        for (final ActorSelection worker : this.workers) {
+        for (final ActorSelection worker : this.workers.values()) {
             worker.tell(StartMessage.instance(), self());
         }
     }
 
     private void broadcast(final Object message) {
-        for (final ActorSelection worker : this.workers) {
+        for (final ActorSelection worker : this.workers.values()) {
             worker.tell(message, self());
         }
     }
@@ -145,8 +148,10 @@ public final class MasterTraversalActor extends 
AbstractActor implements Require
         if (traverser.isHalted()) {
             System.out.println("master[result]: " + traverser);
         } else if (traverser.get() instanceof Element) {
-            final Partition otherPartition = 
this.partitioner.getPartition((Element) traverser.get());
-            context().actorSelection("worker-" + 
otherPartition.hashCode()).tell(traverser, self());
+            final Partition partition = 
this.partitioner.getPartition((Element) traverser.get());
+            final ActorRef worker = this.workers.get("worker-" + 
partition.hashCode()).anchor();
+            this.haltSynchronization.remove(worker.path());
+            worker.tell(traverser, self());
         } else {
             self().tell(traverser, self());
         }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/7725fc1f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
----------------------------------------------------------------------
diff --git 
a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
 
b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
index 6f83920..be7de68 100644
--- 
a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
+++ 
b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
@@ -53,37 +53,39 @@ public final class TinkerActorSystem {
     //////////////
 
     public static void main(String args[]) throws Exception {
-        final Graph graph = TinkerGraph.open();
-        graph.io(GryoIo.build()).readGraph("data/tinkerpop-modern.kryo");
-        final GraphTraversalSource g = graph.traversal().withComputer();
-        final List<Traversal.Admin<?, ?>> traversals = Arrays.asList(
-                // match() works
-                g.V().match(
-                        as("a").out("created").as("b"),
-                        as("b").in("created").as("c"),
-                        as("b").has("name", P.eq("lop"))).where("a", 
P.neq("c")).select("a", "b", "c").by("name").asAdmin(),
-                // side-effects work
-                g.V().repeat(both()).times(2).
-                        groupCount("a").by("name").
-                        cap("a").
-                        unfold().
-                        order().by(Column.values, 
Order.decr).limit(3).asAdmin(),
-                // barriers work and beyond the local star graph works
-                g.V().repeat(both()).times(2).hasLabel("person").
-                        group().
-                        by("name").
-                        
by(out("created").values("name").dedup().fold()).asAdmin(),
-                // no results works
-                g.V().out("blah").asAdmin()
-        );
-        for (final Traversal.Admin<?, ?> traversal : traversals) {
-            System.out.println("EXECUTING: " + traversal.getBytecode());
-            final TinkerActorSystem actors = new 
TinkerActorSystem(traversal.clone());
-            while (!actors.system.isTerminated()) {
+        for(int i=0; i<100; i++) {
+            final Graph graph = TinkerGraph.open();
+            graph.io(GryoIo.build()).readGraph("data/tinkerpop-modern.kryo");
+            final GraphTraversalSource g = graph.traversal().withComputer();
+            final List<Traversal.Admin<?, ?>> traversals = Arrays.asList(
+                    // match() works
+                    g.V().match(
+                            as("a").out("created").as("b"),
+                            as("b").in("created").as("c"),
+                            as("b").has("name", P.eq("lop"))).where("a", 
P.neq("c")).select("a", "b", "c").by("name").asAdmin(),
+                    // side-effects work
+                    /*g.V().repeat(both()).times(2).
+                            groupCount("a").by("name").
+                            cap("a").
+                            unfold().
+                            order().by(Column.values, 
Order.decr).limit(3).asAdmin(),*/
+                    // barriers work and beyond the local star graph works
+                    g.V().repeat(both()).times(2).hasLabel("person").
+                            group().
+                            by("name").
+                            
by(out("created").values("name").dedup().fold()).asAdmin(),
+                    // no results works
+                    g.V().out("blah").asAdmin()
+            );
+            for (final Traversal.Admin<?, ?> traversal : traversals) {
+                System.out.println("EXECUTING: " + traversal.getBytecode());
+                final TinkerActorSystem actors = new 
TinkerActorSystem(traversal.clone());
+                while (!actors.system.isTerminated()) {
 
+                }
+                //System.out.println(traversal.toList());
+                System.out.println("//////////////////////////////////\n");
             }
-            //System.out.println(traversal.toList());
-            System.out.println("//////////////////////////////////\n");
         }
     }
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/7725fc1f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
----------------------------------------------------------------------
diff --git 
a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
 
b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
index f280678..c718d13 100644
--- 
a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
+++ 
b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
@@ -37,6 +37,7 @@ import 
org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierAdd
 import 
org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierDoneMessage;
 import 
org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectAddMessage;
 import 
org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.StartMessage;
+import 
org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.VoteToContinueMessage;
 import 
org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.VoteToHaltMessage;
 
 import java.util.HashMap;
@@ -72,40 +73,47 @@ public final class WorkerTraversalActor extends 
AbstractActor implements
                         this.sendTraverser(step.next());
                     }
                     // internal vote to have in mailbox as final message to 
process
-                    self().tell(new VoteToHaltMessage(true), self());
+                    self().tell(VoteToHaltMessage.instance(), self());
+                    this.voteToHalt = false;
                 }).
                 match(Traverser.Admin.class, traverser -> {
                     if (this.voteToHalt) {
                         // tell master you no longer want to halt
-                        master().tell(new VoteToHaltMessage(false), self());
+                        master().tell(VoteToContinueMessage.instance(), 
self());
                         // internal vote to have in mailbox as final message 
to process
-                        self().tell(new VoteToHaltMessage(true), self());
+                        self().tell(VoteToHaltMessage.instance(), self());
                         this.voteToHalt = false;
                     }
                     this.processTraverser(traverser);
                 }).
                 match(SideEffectAddMessage.class, sideEffect -> {
-                    // TODO: 
sideEffect.setSideEffect(this.matrix.getTraversal());
+                  // TODO
                 }).
                 match(BarrierDoneMessage.class, barrierSync -> {
                     // barrier is complete and processing can continue
                     if (null != this.barrierLock) {
                         this.barrierLock.done();
                         this.barrierLock = null;
-                        // internal vote to have in mailbox as final message 
to process
-                        self().tell(new VoteToHaltMessage(true), self());
                     }
+                    // internal vote to have in mailbox as final message to 
process
+                    self().tell(VoteToHaltMessage.instance(), self());
+                    this.voteToHalt = false;
                 }).
                 match(VoteToHaltMessage.class, haltSync -> {
+                    assert sender().equals(self());
+                    boolean hasBarrier = null != this.barrierLock && 
this.barrierLock.hasNextBarrier();
                     // if there is a barrier and thus, halting at barrier, 
then process barrier
-                    if (null != this.barrierLock) {
+                    if (hasBarrier) {
                         while (this.barrierLock.hasNextBarrier()) {
                             master().tell(new 
BarrierAddMessage(this.barrierLock), self());
                         }
+                        self().tell(VoteToHaltMessage.instance(), self());
+                        this.voteToHalt = false;
+                    } else if (!this.voteToHalt) {
+                        // the final message in the worker mail box, tell 
master you are done processing messages
+                        master().tell(VoteToHaltMessage.instance(), self());
+                        this.voteToHalt = true;
                     }
-                    // the final message in the worker mail box, tell master 
you are done processing messages
-                    master().tell(new VoteToHaltMessage(true), self());
-                    this.voteToHalt = true;
                 }).build()
         );
     }
@@ -140,6 +148,21 @@ public final class WorkerTraversalActor extends 
AbstractActor implements
             self().tell(traverser, self());
     }
 
+    private void voteToHalt() {
+        if (!this.voteToHalt) {
+            master().tell(VoteToHaltMessage.instance(), self());
+            this.voteToHalt = true;
+        }
+    }
+
+    private void voteToContinue() {
+        if (this.voteToHalt) {
+            master().tell(VoteToContinueMessage.instance(), self());
+        }
+        self().tell(VoteToHaltMessage.instance(), self());
+        this.voteToHalt = false;
+    }
+
     private ActorRef master() {
         return context().parent();
     }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/7725fc1f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/StartMessage.java
----------------------------------------------------------------------
diff --git 
a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/StartMessage.java
 
b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/StartMessage.java
index 7acb251..df064a4 100644
--- 
a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/StartMessage.java
+++ 
b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/StartMessage.java
@@ -26,6 +26,9 @@ public final class StartMessage {
 
     private static final StartMessage INSTANCE = new StartMessage();
 
+    private StartMessage() {
+    }
+
     public static StartMessage instance() {
         return INSTANCE;
     }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/7725fc1f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToContinueMessage.java
----------------------------------------------------------------------
diff --git 
a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToContinueMessage.java
 
b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToContinueMessage.java
new file mode 100644
index 0000000..1faa7a0
--- /dev/null
+++ 
b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToContinueMessage.java
@@ -0,0 +1,34 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class VoteToContinueMessage {
+
+    private static final VoteToContinueMessage INSTANCE = new 
VoteToContinueMessage();
+
+    private VoteToContinueMessage() {}
+
+    public static VoteToContinueMessage instance() {
+        return INSTANCE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/7725fc1f/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToHaltMessage.java
----------------------------------------------------------------------
diff --git 
a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToHaltMessage.java
 
b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToHaltMessage.java
index ca5929f..f7d8970 100644
--- 
a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToHaltMessage.java
+++ 
b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/VoteToHaltMessage.java
@@ -24,13 +24,11 @@ package 
org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
  */
 public final class VoteToHaltMessage implements SynchronizationMessage {
 
-    private final boolean voteToHalt;
+    private static final VoteToHaltMessage INSTANCE = new VoteToHaltMessage();
 
-    public VoteToHaltMessage(final boolean voteToHalt) {
-        this.voteToHalt = voteToHalt;
-    }
+    private VoteToHaltMessage() {}
 
-    public boolean voteToHalt() {
-        return this.voteToHalt;
+    public static VoteToHaltMessage instance() {
+        return INSTANCE;
     }
 }

Reply via email to