Got DedupGlobalStep as optimized as I can get it. I think Bloom filters are the next thing to try. Also, improving the DetachedFactory (detachment) handling will help reduce barrier sizes. Found and fixed a bug in SparkInterceptorStrategyTest.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/e9258086 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/e9258086 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/e9258086 Branch: refs/heads/tp32 Commit: e9258086441d013e9961d81473b275054d41d8cc Parents: ba39074 Author: Marko A. Rodriguez <[email protected]> Authored: Thu Jan 5 13:29:32 2017 -0700 Committer: Marko A. Rodriguez <[email protected]> Committed: Thu Jan 5 17:00:17 2017 -0700 ---------------------------------------------------------------------- .../traversal/step/filter/DedupGlobalStep.java | 70 ++++++++++---------- .../SparkInterceptorStrategyTest.java | 2 +- 2 files changed, 36 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/e9258086/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java index a4c1b6a..d024456 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java @@ -40,6 +40,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; @@ -57,8 +58,8 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal private final Set<String> dedupLabels; private Set<String> keepLabels; private 
boolean executingAtMaster = false; - private boolean barrierAdded = false; private Map<Object, Traverser.Admin<S>> barrier; + private Iterator<Map.Entry<Object, Traverser.Admin<S>>> barrierIterator; public DedupGlobalStep(final Traversal.Admin traversal, final String... dedupLabels) { super(traversal); @@ -67,8 +68,8 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal @Override protected boolean filter(final Traverser.Admin<S> traverser) { - if (this.onGraphComputer && (!this.executingAtMaster || this.barrierAdded)) return true; - traverser.setBulk(1); + if (this.onGraphComputer && !this.executingAtMaster) return true; + traverser.setBulk(1L); if (null == this.dedupLabels) { return this.duplicateSet.add(TraversalUtil.applyNullable(traverser, this.dedupTraversal)); } else { @@ -92,13 +93,16 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal @Override protected Traverser.Admin<S> processNextStart() { if (null != this.barrier) { - for (final Map.Entry<Object, Traverser.Admin<S>> entry : this.barrier.entrySet()) { - if (this.duplicateSet.add(entry.getKey())) - this.starts.add(entry.getValue()); - } - this.barrierAdded = true; + this.barrierIterator = this.barrier.entrySet().iterator(); this.barrier = null; } + while (this.barrierIterator != null && this.barrierIterator.hasNext()) { + if (null == this.barrierIterator) + this.barrierIterator = this.barrier.entrySet().iterator(); + final Map.Entry<Object, Traverser.Admin<S>> entry = this.barrierIterator.next(); + if (this.duplicateSet.add(entry.getKey())) + return PathProcessor.processTraverserPathLabels(entry.getValue(), this.keepLabels); + } return PathProcessor.processTraverserPathLabels(super.processNextStart(), this.keepLabels); } @@ -141,8 +145,8 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal public void reset() { super.reset(); this.duplicateSet.clear(); - this.barrierAdded = false; this.barrier = null; + 
this.barrierIterator = null; } @Override @@ -179,35 +183,31 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal @Override public Map<Object, Traverser.Admin<S>> nextBarrier() throws NoSuchElementException { - if (null != this.barrier) { - final Map<Object, Traverser.Admin<S>> tempBarrier = this.barrier; - this.barrier = null; - this.barrierAdded = false; - return tempBarrier; - } else { - final Map<Object, Traverser.Admin<S>> map = new HashMap<>(); - while (this.starts.hasNext()) { - final Traverser.Admin<S> traverser = this.starts.next(); - final Object object; - if (null != this.dedupLabels) { - object = new ArrayList<>(this.dedupLabels.size()); - for (final String label : this.dedupLabels) { - ((List) object).add(TraversalUtil.applyNullable((S) this.getScopeValue(Pop.last, label, traverser), this.dedupTraversal)); - } - } else { - object = TraversalUtil.applyNullable(traverser, this.dedupTraversal); - } - if (!map.containsKey(object)) { - traverser.setBulk(1L); - traverser.set(DetachedFactory.detach(traverser.get(), true)); - map.put(object, traverser); + final Map<Object, Traverser.Admin<S>> map = null != this.barrier ? 
this.barrier : new HashMap<>(); + while (this.starts.hasNext()) { + final Traverser.Admin<S> traverser = this.starts.next(); + final Object object; + if (null != this.dedupLabels) { + object = new ArrayList<>(this.dedupLabels.size()); + for (final String label : this.dedupLabels) { + ((List) object).add(TraversalUtil.applyNullable((S) this.getScopeValue(Pop.last, label, traverser), this.dedupTraversal)); } + } else { + object = TraversalUtil.applyNullable(traverser, this.dedupTraversal); + } + if (!map.containsKey(object)) { + traverser.setBulk(1L); + // traverser.detach(); + traverser.set(DetachedFactory.detach(traverser.get(), true)); // TODO: detect required detachment accordingly + map.put(object, traverser); } - if (map.isEmpty()) - throw FastNoSuchElementException.instance(); - else - return map; } + this.barrier = null; + this.barrierIterator = null; + if (map.isEmpty()) + throw FastNoSuchElementException.instance(); + else + return map; } @Override http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/e9258086/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java index 24e3663..a53b3bd 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java @@ -142,7 +142,7 @@ public class SparkInterceptorStrategyTest extends AbstractSparkTest { test(6l, 
g.V().out().values("name").count()); test(2l, g.V().out("knows").values("name").count()); test(3l, g.V().in().has("name", "marko").count()); - test(6l, g.V().repeat(__.dedup()).times(2).count()); + test(0l, g.V().repeat(__.dedup()).times(2).count()); test(6l, g.V().dedup().count()); test(4l, g.V().hasLabel("person").order().by("age").count()); test(1l, g.V().count().count());
