Made it so MemoryComputeKey implements Cloneable. This is actually really 
important: we have NOT been cloning the BiOperators of OrderGlobalStep and 
GroupStep. We have just been 'getting lucky' in that Spark and Giraph use 
Serialization and thus we get a clone for free. However, for parallelization 
within a JVM, we would have had issues, except we never realized it because we 
had a single global Memory for TinkerGraph. Now we don't, and clone()ing the 
BiOperators works.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/8deca706
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/8deca706
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/8deca706

Branch: refs/heads/TINKERPOP-1585
Commit: 8deca70680e8c89b31fea1cc99300740f45eec56
Parents: 1ac003d
Author: Marko A. Rodriguez <okramma...@gmail.com>
Authored: Wed Jan 4 04:55:12 2017 -0700
Committer: Marko A. Rodriguez <okramma...@gmail.com>
Committed: Thu Jan 5 16:59:45 2017 -0700

----------------------------------------------------------------------
 .../process/computer/MemoryComputeKey.java      | 24 ++++++++++++++++++--
 .../process/traversal/step/map/GroupStep.java   | 18 +++++++++++++--
 .../traversal/step/map/OrderGlobalStep.java     | 13 ++++++++++-
 .../util/function/ChainedComparator.java        | 18 +++++++++++++--
 .../process/computer/TinkerWorkerMemory.java    |  9 +-------
 5 files changed, 67 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8deca706/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MemoryComputeKey.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MemoryComputeKey.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MemoryComputeKey.java
index 94ca675..70adf3d 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MemoryComputeKey.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MemoryComputeKey.java
@@ -22,6 +22,8 @@ package org.apache.tinkerpop.gremlin.process.computer;
 import org.apache.tinkerpop.gremlin.process.computer.util.MemoryHelper;
 
 import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.function.BinaryOperator;
 
 /**
@@ -32,10 +34,10 @@ import java.util.function.BinaryOperator;
  *
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class MemoryComputeKey<A> implements Serializable {
+public final class MemoryComputeKey<A> implements Serializable, Cloneable {
 
     private final String key;
-    private final BinaryOperator<A> reducer;
+    private BinaryOperator<A> reducer;
     private final boolean isTransient;
     private final boolean isBroadcast;
 
@@ -73,7 +75,25 @@ public final class MemoryComputeKey<A> implements 
Serializable {
         return object instanceof MemoryComputeKey && ((MemoryComputeKey) 
object).key.equals(this.key);
     }
 
+    @Override
+    public MemoryComputeKey<A> clone() {
+        try {
+            final MemoryComputeKey<A> clone = (MemoryComputeKey<A>) 
super.clone();
+            for (final Method method : this.reducer.getClass().getMethods()) {
+                if (method.getName().equals("clone") && 0 == 
method.getParameterCount()) {
+                    clone.reducer = (BinaryOperator<A>) 
method.invoke(this.reducer);
+                    break;
+                }
+            }
+            return clone;
+        } catch (final IllegalAccessException | InvocationTargetException | 
CloneNotSupportedException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
     public static <A> MemoryComputeKey<A> of(final String key, final 
BinaryOperator<A> reducer, final boolean isBroadcast, final boolean 
isTransient) {
         return new MemoryComputeKey<>(key, reducer, isBroadcast, isTransient);
     }
+
+
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8deca706/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
index de4e223..d6ce421 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
@@ -95,7 +95,7 @@ public final class GroupStep<S, K, V> extends 
ReducingBarrierStep<S, Map<K, V>>
             final TraverserSet traverserSet = new TraverserSet<>();
             this.preTraversal.reset();
             this.preTraversal.addStart(traverser);
-            while(this.preTraversal.hasNext()) {
+            while (this.preTraversal.hasNext()) {
                 traverserSet.add(this.preTraversal.nextTraverser());
             }
             map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), 
(V) traverserSet);
@@ -158,7 +158,7 @@ public final class GroupStep<S, K, V> extends 
ReducingBarrierStep<S, Map<K, V>>
 
     ///////////////////////
 
-    public static final class GroupBiOperator<K, V> implements 
BinaryOperator<Map<K, V>>, Serializable {
+    public static final class GroupBiOperator<K, V> implements 
BinaryOperator<Map<K, V>>, Serializable, Cloneable {
 
         // size limit before Barrier.processAllStarts() to lazy reduce
         private static final int SIZE_LIMIT = 1000;
@@ -182,6 +182,20 @@ public final class GroupStep<S, K, V> extends 
ReducingBarrierStep<S, Map<K, V>>
         }
 
         @Override
+        public GroupBiOperator<K, V> clone() {
+            try {
+                final GroupBiOperator<K, V> clone = (GroupBiOperator<K, V>) 
super.clone();
+                if (null != this.valueTraversal) {
+                    clone.valueTraversal = this.valueTraversal.clone();
+                    clone.barrierStep = 
TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, 
clone.valueTraversal).orElse(null);
+                }
+                return clone;
+            } catch (final CloneNotSupportedException e) {
+                throw new IllegalStateException(e.getMessage(), e);
+            }
+        }
+
+        @Override
         public Map<K, V> apply(final Map<K, V> mapA, final Map<K, V> mapB) {
             for (final K key : mapB.keySet()) {
                 Object objectA = mapA.get(key);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8deca706/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java
index 60be2d6..a7d21b2 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java
@@ -144,7 +144,7 @@ public final class OrderGlobalStep<S, C extends Comparable> 
extends CollectingBa
 
     ////////////////
 
-    public static final class OrderBiOperator<S> implements 
BinaryOperator<TraverserSet<S>>, Serializable {
+    public static final class OrderBiOperator<S> implements 
BinaryOperator<TraverserSet<S>>, Serializable, Cloneable {
 
         private ChainedComparator chainedComparator;
         private long limit;
@@ -159,6 +159,17 @@ public final class OrderGlobalStep<S, C extends 
Comparable> extends CollectingBa
         }
 
         @Override
+        public OrderBiOperator<S> clone() {
+            try {
+                final OrderBiOperator<S> clone = (OrderBiOperator<S>) 
super.clone();
+                clone.chainedComparator = this.chainedComparator.clone();
+                return clone;
+            } catch (final CloneNotSupportedException e) {
+                throw new IllegalStateException(e.getMessage(), e);
+            }
+        }
+
+        @Override
         public TraverserSet<S> apply(final TraverserSet<S> setA, final 
TraverserSet<S> setB) {
             setA.addAll(setB);
             if (Long.MAX_VALUE != this.limit && setA.bulkSize() > this.limit) {

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8deca706/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/ChainedComparator.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/ChainedComparator.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/ChainedComparator.java
index 44a994b..bdb2e6d 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/ChainedComparator.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/ChainedComparator.java
@@ -34,9 +34,9 @@ import java.util.stream.Collectors;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class ChainedComparator<S, C extends Comparable> implements 
Comparator<S>, Serializable {
+public final class ChainedComparator<S, C extends Comparable> implements 
Comparator<S>, Serializable, Cloneable {
 
-    private final List<Pair<Traversal.Admin<S, C>, Comparator<C>>> comparators 
= new ArrayList<>();
+    private List<Pair<Traversal.Admin<S, C>, Comparator<C>>> comparators = new 
ArrayList<>();
     private final boolean isShuffle;
     private final boolean traversers;
 
@@ -66,4 +66,18 @@ public final class ChainedComparator<S, C extends 
Comparable> implements Compara
         }
         return 0;
     }
+
+    @Override
+    public ChainedComparator<S, C> clone() {
+        try {
+            final ChainedComparator<S, C> clone = (ChainedComparator<S, C>) 
super.clone();
+            clone.comparators = new ArrayList<>();
+            for (final Pair<Traversal.Admin<S, C>, Comparator<C>> comparator : 
this.comparators) {
+                clone.comparators.add(new 
Pair<>(comparator.getValue0().clone(), comparator.getValue1()));
+            }
+            return clone;
+        } catch (final CloneNotSupportedException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8deca706/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java
----------------------------------------------------------------------
diff --git 
a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java
 
b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java
index 081e4fa..1afa27e 100644
--- 
a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java
+++ 
b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java
@@ -21,9 +21,7 @@ package 
org.apache.tinkerpop.gremlin.tinkergraph.process.computer;
 
 import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
-import org.apache.tinkerpop.gremlin.util.Serializer;
 
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -41,12 +39,7 @@ public final class TinkerWorkerMemory implements 
Memory.Admin {
     public TinkerWorkerMemory(final TinkerMemory mainMemory) {
         this.mainMemory = mainMemory;
         for (final MemoryComputeKey key : this.mainMemory.memoryKeys.values()) 
{
-            try {
-                final MemoryComputeKey clone = Serializer.cloneObject(key);
-                this.reducers.put(clone.getKey(), clone.getReducer());
-            } catch (final IOException | ClassNotFoundException e) {
-                throw new IllegalStateException(e.getMessage(), e);
-            }
+            this.reducers.put(key.getKey(), key.clone().getReducer());
         }
     }
 

Reply via email to