Made it so MemoryComputeKey implements Cloneable. This is actually really important: we have NOT been cloning the BiOperators of OrderGlobalStep and GroupStep. We have just been 'getting lucky' in that Spark and Giraph use serialization and thus we get a clone for free. However, for parallelization within a JVM, we would have had issues, except we never realized it because we had a single global Memory for TinkerGraph. Now we don't, and clone()ing bi operators works.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/8deca706 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/8deca706 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/8deca706 Branch: refs/heads/TINKERPOP-1585 Commit: 8deca70680e8c89b31fea1cc99300740f45eec56 Parents: 1ac003d Author: Marko A. Rodriguez <okramma...@gmail.com> Authored: Wed Jan 4 04:55:12 2017 -0700 Committer: Marko A. Rodriguez <okramma...@gmail.com> Committed: Thu Jan 5 16:59:45 2017 -0700 ---------------------------------------------------------------------- .../process/computer/MemoryComputeKey.java | 24 ++++++++++++++++++-- .../process/traversal/step/map/GroupStep.java | 18 +++++++++++++-- .../traversal/step/map/OrderGlobalStep.java | 13 ++++++++++- .../util/function/ChainedComparator.java | 18 +++++++++++++-- .../process/computer/TinkerWorkerMemory.java | 9 +------- 5 files changed, 67 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8deca706/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MemoryComputeKey.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MemoryComputeKey.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MemoryComputeKey.java index 94ca675..70adf3d 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MemoryComputeKey.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MemoryComputeKey.java @@ -22,6 +22,8 @@ package org.apache.tinkerpop.gremlin.process.computer; import org.apache.tinkerpop.gremlin.process.computer.util.MemoryHelper; import java.io.Serializable; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; 
import java.util.function.BinaryOperator; /** @@ -32,10 +34,10 @@ import java.util.function.BinaryOperator; * * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public final class MemoryComputeKey<A> implements Serializable { +public final class MemoryComputeKey<A> implements Serializable, Cloneable { private final String key; - private final BinaryOperator<A> reducer; + private BinaryOperator<A> reducer; private final boolean isTransient; private final boolean isBroadcast; @@ -73,7 +75,25 @@ public final class MemoryComputeKey<A> implements Serializable { return object instanceof MemoryComputeKey && ((MemoryComputeKey) object).key.equals(this.key); } + @Override + public MemoryComputeKey<A> clone() { + try { + final MemoryComputeKey<A> clone = (MemoryComputeKey<A>) super.clone(); + for (final Method method : this.reducer.getClass().getMethods()) { + if (method.getName().equals("clone") && 0 == method.getParameterCount()) { + clone.reducer = (BinaryOperator<A>) method.invoke(this.reducer); + break; + } + } + return clone; + } catch (final IllegalAccessException | InvocationTargetException | CloneNotSupportedException e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + public static <A> MemoryComputeKey<A> of(final String key, final BinaryOperator<A> reducer, final boolean isBroadcast, final boolean isTransient) { return new MemoryComputeKey<>(key, reducer, isBroadcast, isTransient); } + + } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8deca706/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java index de4e223..d6ce421 100644 --- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java @@ -95,7 +95,7 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>> final TraverserSet traverserSet = new TraverserSet<>(); this.preTraversal.reset(); this.preTraversal.addStart(traverser); - while(this.preTraversal.hasNext()) { + while (this.preTraversal.hasNext()) { traverserSet.add(this.preTraversal.nextTraverser()); } map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) traverserSet); @@ -158,7 +158,7 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>> /////////////////////// - public static final class GroupBiOperator<K, V> implements BinaryOperator<Map<K, V>>, Serializable { + public static final class GroupBiOperator<K, V> implements BinaryOperator<Map<K, V>>, Serializable, Cloneable { // size limit before Barrier.processAllStarts() to lazy reduce private static final int SIZE_LIMIT = 1000; @@ -182,6 +182,20 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>> } @Override + public GroupBiOperator<K, V> clone() { + try { + final GroupBiOperator<K, V> clone = (GroupBiOperator<K, V>) super.clone(); + if (null != this.valueTraversal) { + clone.valueTraversal = this.valueTraversal.clone(); + clone.barrierStep = TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, clone.valueTraversal).orElse(null); + } + return clone; + } catch (final CloneNotSupportedException e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + + @Override public Map<K, V> apply(final Map<K, V> mapA, final Map<K, V> mapB) { for (final K key : mapB.keySet()) { Object objectA = mapA.get(key); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8deca706/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java 
---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java index 60be2d6..a7d21b2 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java @@ -144,7 +144,7 @@ public final class OrderGlobalStep<S, C extends Comparable> extends CollectingBa //////////////// - public static final class OrderBiOperator<S> implements BinaryOperator<TraverserSet<S>>, Serializable { + public static final class OrderBiOperator<S> implements BinaryOperator<TraverserSet<S>>, Serializable, Cloneable { private ChainedComparator chainedComparator; private long limit; @@ -159,6 +159,17 @@ public final class OrderGlobalStep<S, C extends Comparable> extends CollectingBa } @Override + public OrderBiOperator<S> clone() { + try { + final OrderBiOperator<S> clone = (OrderBiOperator<S>) super.clone(); + clone.chainedComparator = this.chainedComparator.clone(); + return clone; + } catch (final CloneNotSupportedException e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + + @Override public TraverserSet<S> apply(final TraverserSet<S> setA, final TraverserSet<S> setB) { setA.addAll(setB); if (Long.MAX_VALUE != this.limit && setA.bulkSize() > this.limit) { http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8deca706/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/ChainedComparator.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/ChainedComparator.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/ChainedComparator.java index 
44a994b..bdb2e6d 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/ChainedComparator.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/ChainedComparator.java @@ -34,9 +34,9 @@ import java.util.stream.Collectors; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public final class ChainedComparator<S, C extends Comparable> implements Comparator<S>, Serializable { +public final class ChainedComparator<S, C extends Comparable> implements Comparator<S>, Serializable, Cloneable { - private final List<Pair<Traversal.Admin<S, C>, Comparator<C>>> comparators = new ArrayList<>(); + private List<Pair<Traversal.Admin<S, C>, Comparator<C>>> comparators = new ArrayList<>(); private final boolean isShuffle; private final boolean traversers; @@ -66,4 +66,18 @@ public final class ChainedComparator<S, C extends Comparable> implements Compara } return 0; } + + @Override + public ChainedComparator<S, C> clone() { + try { + final ChainedComparator<S, C> clone = (ChainedComparator<S, C>) super.clone(); + clone.comparators = new ArrayList<>(); + for (final Pair<Traversal.Admin<S, C>, Comparator<C>> comparator : this.comparators) { + clone.comparators.add(new Pair<>(comparator.getValue0().clone(), comparator.getValue1())); + } + return clone; + } catch (final CloneNotSupportedException e) { + throw new IllegalStateException(e.getMessage(), e); + } + } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8deca706/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java ---------------------------------------------------------------------- diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java index 081e4fa..1afa27e 100644 --- 
a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java +++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java @@ -21,9 +21,7 @@ package org.apache.tinkerpop.gremlin.tinkergraph.process.computer; import org.apache.tinkerpop.gremlin.process.computer.Memory; import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey; -import org.apache.tinkerpop.gremlin.util.Serializer; -import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -41,12 +39,7 @@ public final class TinkerWorkerMemory implements Memory.Admin { public TinkerWorkerMemory(final TinkerMemory mainMemory) { this.mainMemory = mainMemory; for (final MemoryComputeKey key : this.mainMemory.memoryKeys.values()) { - try { - final MemoryComputeKey clone = Serializer.cloneObject(key); - this.reducers.put(clone.getKey(), clone.getReducer()); - } catch (final IOException | ClassNotFoundException e) { - throw new IllegalStateException(e.getMessage(), e); - } + this.reducers.put(key.getKey(), key.clone().getReducer()); } }