Added TinkerWorkerMemory, which aggregates memory updates locally in the 
current thread before propagating them to TinkerMemory. This reduces 
synchronization issues caused by all threads contending to mutate the master memory.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/0db0991c
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/0db0991c
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/0db0991c

Branch: refs/heads/tp32
Commit: 0db0991cc092e33091068f59f81c27feb9712379
Parents: f352b01
Author: Marko A. Rodriguez <[email protected]>
Authored: Tue Jan 3 17:59:39 2017 -0700
Committer: Marko A. Rodriguez <[email protected]>
Committed: Thu Jan 5 16:59:45 2017 -0700

----------------------------------------------------------------------
 .../gremlin/process/traversal/step/map/GroupStep.java        | 3 ++-
 .../tinkergraph/process/computer/TinkerGraphComputer.java    | 8 +++++---
 .../gremlin/tinkergraph/process/computer/TinkerMemory.java   | 2 +-
 3 files changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0db0991c/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
index 7d80d69..de4e223 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
@@ -131,6 +131,7 @@ public final class GroupStep<S, K, V> extends 
ReducingBarrierStep<S, Map<K, V>>
             clone.keyTraversal = this.keyTraversal.clone();
         clone.valueTraversal = this.valueTraversal.clone();
         clone.preTraversal = (Traversal.Admin<S, ?>) 
GroupStep.generatePreTraversal(clone.valueTraversal);
+        clone.setReducingBiOperator(new 
GroupBiOperator<>(clone.valueTraversal));
         return clone;
     }
 
@@ -171,7 +172,7 @@ public final class GroupStep<S, K, V> extends 
ReducingBarrierStep<S, Map<K, V>>
                 this.valueTraversal = null;
                 this.barrierStep = null;
             } else {
-                this.valueTraversal = valueTraversal;
+                this.valueTraversal = valueTraversal.clone();
                 this.barrierStep = 
TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, 
this.valueTraversal).orElse(null);
             }
         }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0db0991c/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git 
a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
 
b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index c333130..fef2e1a 100644
--- 
a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ 
b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -169,7 +169,8 @@ public final class TinkerGraphComputer implements 
GraphComputer {
                         workers.setVertexProgram(this.vertexProgram);
                         final SynchronizedIterator<Vertex> vertices = new 
SynchronizedIterator<>(this.graph.vertices());
                         workers.executeVertexProgram(vertexProgram -> {
-                            
vertexProgram.workerIterationStart(this.memory.asImmutable());
+                            final TinkerWorkerMemory workerMemory = new 
TinkerWorkerMemory(this.memory);
+                            
vertexProgram.workerIterationStart(workerMemory.asImmutable());
                             while (true) {
                                 final Vertex vertex = vertices.next();
                                 if (Thread.interrupted()) throw new 
TraversalInterruptedException();
@@ -177,10 +178,11 @@ public final class TinkerGraphComputer implements 
GraphComputer {
                                 vertexProgram.execute(
                                         ComputerGraph.vertexProgram(vertex, 
vertexProgram),
                                         new TinkerMessenger<>(vertex, 
this.messageBoard, vertexProgram.getMessageCombiner()),
-                                        this.memory
+                                        workerMemory
                                 );
                             }
-                            
vertexProgram.workerIterationEnd(this.memory.asImmutable());
+                            
vertexProgram.workerIterationEnd(workerMemory.asImmutable());
+                            workerMemory.complete();
                         });
                         this.messageBoard.completeIteration();
                         this.memory.completeSubRound();

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0db0991c/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMemory.java
----------------------------------------------------------------------
diff --git 
a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMemory.java
 
b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMemory.java
index 34144e3..1502d84 100644
--- 
a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMemory.java
+++ 
b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMemory.java
@@ -138,7 +138,7 @@ public final class TinkerMemory implements Memory.Admin {
         return StringFactory.memoryString(this);
     }
 
-    private void checkKeyValue(final String key, final Object value) {
+    protected void checkKeyValue(final String key, final Object value) {
         if (!this.memoryKeys.containsKey(key))
             throw 
GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey(key);
         MemoryHelper.validateValue(value);

Reply via email to