Created OrderedTraverser, which allows us to move beyond the star graph for 
OrderGlobalStep by()-projections. Moreover, it reduces the complexity of 
ordering, as the objects of comparison are already determined. Going to 
generalize fully to a ProjectedTraverser, which will allow us to do the same for 
SampleGlobalStep and DedupGlobalStep and, down the road, maintain order even as 
the computation is re-distributed to workers.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/501e97a1
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/501e97a1
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/501e97a1

Branch: refs/heads/TINKERPOP-1602
Commit: 501e97a1ecb23f76b2fddba8eaed1dba4a5a839e
Parents: e80a4cd
Author: Marko A. Rodriguez <okramma...@gmail.com>
Authored: Wed Jan 18 09:08:24 2017 -0700
Committer: Marko A. Rodriguez <okramma...@gmail.com>
Committed: Wed Jan 18 09:08:24 2017 -0700

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |   1 +
 .../traversal/step/map/OrderGlobalStep.java     |  61 ++---
 .../step/util/CollectingBarrierStep.java        |   4 +-
 .../ComputerVerificationStrategy.java           |   5 +-
 .../traversal/traverser/OrderedTraverser.java   | 235 +++++++++++++++++++
 .../gremlin/structure/io/gryo/GryoVersion.java  |   4 +-
 6 files changed, 277 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/501e97a1/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 4f3f9ce..86c6b4f 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -26,6 +26,7 @@ 
image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
 TinkerPop 3.2.4 (Release Date: NOT OFFICIALLY RELEASED YET)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
+* `OrderGlobalStep` now emits traversers with their `by()`-projections and 
thus, can move beyond the local star graph.
 * SASL negotiation supports both a byte array and Base64 encoded bytes as a 
string for authentication to Gremlin Server.
 * Deprecated `TinkerIoRegistry` replacing it with the more consistently named 
`TinkerIoRegistryV1d0`.
 * Made error messaging more consistent during result iteration timeouts in 
Gremlin Server.

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/501e97a1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java
index a7d21b2..ac5df90 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java
@@ -27,10 +27,10 @@ import 
org.apache.tinkerpop.gremlin.process.traversal.step.ByModulating;
 import org.apache.tinkerpop.gremlin.process.traversal.step.ComparatorHolder;
 import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
 import 
org.apache.tinkerpop.gremlin.process.traversal.step.util.CollectingBarrierStep;
+import 
org.apache.tinkerpop.gremlin.process.traversal.traverser.OrderedTraverser;
 import 
org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
 import 
org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
-import org.apache.tinkerpop.gremlin.util.function.ChainedComparator;
 import org.javatuples.Pair;
 
 import java.io.Serializable;
@@ -49,8 +49,8 @@ import java.util.stream.Collectors;
 public final class OrderGlobalStep<S, C extends Comparable> extends 
CollectingBarrierStep<S> implements ComparatorHolder<S, C>, TraversalParent, 
ByModulating {
 
     private List<Pair<Traversal.Admin<S, C>, Comparator<C>>> comparators = new 
ArrayList<>();
-    private ChainedComparator<S, C> chainedComparator = null;
     private long limit = Long.MAX_VALUE;
+    private boolean isShuffle = false;
 
     public OrderGlobalStep(final Traversal.Admin traversal) {
         super(traversal);
@@ -58,12 +58,30 @@ public final class OrderGlobalStep<S, C extends Comparable> 
extends CollectingBa
 
     @Override
     public void barrierConsumer(final TraverserSet<S> traverserSet) {
-        if (null == this.chainedComparator)
-            this.chainedComparator = new ChainedComparator<>(true, 
this.comparators);
-        if (this.chainedComparator.isShuffle())
+        if (this.isShuffle)
             traverserSet.shuffle();
         else
-            traverserSet.sort((Comparator) this.chainedComparator);
+            traverserSet.sort(Comparator.naturalOrder());
+    }
+
+    @Override
+    public void processAllStarts() {
+        if (this.starts.hasNext()) {
+            while (this.starts.hasNext()) {
+                this.traverserSet.add(new 
OrderedTraverser<S>(this.starts.next(), (List) this.comparators));
+            }
+            this.barrierConsumer(this.traverserSet);
+        }
+    }
+
+    @Override
+    public Traverser.Admin<S> processNextStart() {
+        if (!this.traverserSet.isEmpty()) {
+            return this.traverserSet.remove();
+        } else if (this.starts.hasNext()) {
+            this.processAllStarts();
+        }
+        return ((OrderedTraverser) this.traverserSet.remove()).getInternal();
     }
 
     public void setLimit(final long limit) {
@@ -76,6 +94,7 @@ public final class OrderGlobalStep<S, C extends Comparable> 
extends CollectingBa
 
     @Override
     public void addComparator(final Traversal.Admin<S, C> traversal, final 
Comparator<C> comparator) {
+        this.isShuffle = Order.shuffle == (Comparator) comparator;
         this.comparators.add(new Pair<>(this.integrateChild(traversal), 
comparator));
     }
 
@@ -125,7 +144,6 @@ public final class OrderGlobalStep<S, C extends Comparable> 
extends CollectingBa
         for (final Pair<Traversal.Admin<S, C>, Comparator<C>> comparator : 
this.comparators) {
             clone.comparators.add(new Pair<>(comparator.getValue0().clone(), 
comparator.getValue1()));
         }
-        clone.chainedComparator = null;
         return clone;
     }
 
@@ -137,47 +155,34 @@ public final class OrderGlobalStep<S, C extends 
Comparable> extends CollectingBa
 
     @Override
     public MemoryComputeKey<TraverserSet<S>> getMemoryComputeKey() {
-        if (null == this.chainedComparator)
-            this.chainedComparator = new ChainedComparator<>(true, 
this.comparators);
-        return MemoryComputeKey.of(this.getId(), new 
OrderBiOperator<>(this.chainedComparator, this.limit), false, true);
+        return MemoryComputeKey.of(this.getId(), new 
OrderBiOperator<>(this.limit, this.isShuffle), false, true);
     }
 
     ////////////////
 
-    public static final class OrderBiOperator<S> implements 
BinaryOperator<TraverserSet<S>>, Serializable, Cloneable {
+    public static final class OrderBiOperator<S> implements 
BinaryOperator<TraverserSet<S>>, Serializable {
 
-        private ChainedComparator chainedComparator;
         private long limit;
+        private boolean isShuffle;
 
         private OrderBiOperator() {
             // for serializers that need a no-arg constructor
         }
 
-        public OrderBiOperator(final ChainedComparator<S, ?> 
chainedComparator, final long limit) {
-            this.chainedComparator = chainedComparator;
+        public OrderBiOperator(final long limit, final boolean isShuffle) {
             this.limit = limit;
-        }
-
-        @Override
-        public OrderBiOperator<S> clone() {
-            try {
-                final OrderBiOperator<S> clone = (OrderBiOperator<S>) 
super.clone();
-                clone.chainedComparator = this.chainedComparator.clone();
-                return clone;
-            } catch (final CloneNotSupportedException e) {
-                throw new IllegalStateException(e.getMessage(), e);
-            }
+            this.isShuffle = isShuffle;
         }
 
         @Override
         public TraverserSet<S> apply(final TraverserSet<S> setA, final 
TraverserSet<S> setB) {
             setA.addAll(setB);
             if (Long.MAX_VALUE != this.limit && setA.bulkSize() > this.limit) {
-                if (this.chainedComparator.isShuffle())
+                if (this.isShuffle)
                     setA.shuffle();
                 else
-                    setA.sort(this.chainedComparator);
-                long counter = 0l;
+                    setA.sort(Comparator.naturalOrder());
+                long counter = 0L;
                 final Iterator<Traverser.Admin<S>> traversers = 
setA.iterator();
                 while (traversers.hasNext()) {
                     final Traverser.Admin<S> traverser = traversers.next();

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/501e97a1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/CollectingBarrierStep.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/CollectingBarrierStep.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/CollectingBarrierStep.java
index f9c85a2..b0cce80 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/CollectingBarrierStep.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/CollectingBarrierStep.java
@@ -39,8 +39,8 @@ import java.util.function.BinaryOperator;
  */
 public abstract class CollectingBarrierStep<S> extends AbstractStep<S, S> 
implements Barrier<TraverserSet<S>> {
 
-    private TraverserSet<S> traverserSet = new TraverserSet<>();
-    private int maxBarrierSize;
+    protected TraverserSet<S> traverserSet = new TraverserSet<>();
+    protected int maxBarrierSize;
 
     public CollectingBarrierStep(final Traversal.Admin traversal) {
         this(traversal, Integer.MAX_VALUE);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/501e97a1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ComputerVerificationStrategy.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ComputerVerificationStrategy.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ComputerVerificationStrategy.java
index fc73fc3..5777adb 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ComputerVerificationStrategy.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ComputerVerificationStrategy.java
@@ -28,6 +28,7 @@ import 
org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
 import org.apache.tinkerpop.gremlin.process.traversal.step.Mutating;
 import org.apache.tinkerpop.gremlin.process.traversal.step.PathProcessor;
 import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
+import 
org.apache.tinkerpop.gremlin.process.traversal.step.filter.SampleGlobalStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
 import 
org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.InjectStep;
 import 
org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SubgraphStep;
@@ -86,8 +87,8 @@ public final class ComputerVerificationStrategy extends 
AbstractTraversalStrateg
                     throw new VerificationException("Local traversals may not 
traverse past the local star-graph on GraphComputer: " + 
traversalOptional.get(), traversal);
             }
 
-            // collecting barriers and dedup global use can only operate on 
the element and its properties (no incidences)
-            if (step instanceof CollectingBarrierStep && step instanceof 
TraversalParent) {
+            // sample step use can only operate on the element and its 
properties (no incidences)
+            if (step instanceof SampleGlobalStep) {
                 if (((TraversalParent) 
step).getLocalChildren().stream().filter(t -> 
!TraversalHelper.isLocalProperties(t)).findAny().isPresent())
                     throw new VerificationException("The following barrier 
step can not process the incident edges of a vertex on GraphComputer: " + step, 
traversal);
             }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/501e97a1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/OrderedTraverser.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/OrderedTraverser.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/OrderedTraverser.java
new file mode 100644
index 0000000..4dddaa3
--- /dev/null
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/OrderedTraverser.java
@@ -0,0 +1,235 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.traversal.traverser;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Order;
+import org.apache.tinkerpop.gremlin.process.traversal.Path;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
+import org.apache.tinkerpop.gremlin.structure.util.Attachable;
+import org.javatuples.Pair;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class OrderedTraverser<T> implements Traverser.Admin<T> {
+
+    private Traverser.Admin<T> internal;
+    private int order;
+    private final List<Pair<Object, Comparator<?>>> orderChecks = new 
ArrayList<>();
+
+    private OrderedTraverser() {
+        // for serialization
+    }
+
+    public OrderedTraverser(final Traverser.Admin<T> internal, final 
List<Pair<Traversal.Admin<T, ?>, Comparator>> checks) {
+        this(internal, 0);
+        for (final Pair<Traversal.Admin<T, ?>, Comparator> pairs : checks) {
+            this.orderChecks.add(Pair.with(TraversalUtil.apply(this.internal, 
pairs.getValue0()), pairs.getValue1()));
+        }
+    }
+
+    public OrderedTraverser(final Traverser.Admin<T> internal, final int 
order) {
+        this.internal = internal instanceof OrderedTraverser ? 
((OrderedTraverser) internal).internal : internal;
+        this.order = order;
+    }
+
+    public Traverser.Admin<T> getInternal() {
+        return this.internal;
+    }
+
+    public int order() {
+        return this.order;
+    }
+
+    @Override
+    public void merge(final Admin<?> other) {
+        this.internal.merge(other);
+    }
+
+    @Override
+    public <R> Admin<R> split(R r, Step<T, R> step) {
+        return new OrderedTraverser<>(this.internal.split(r, step), 
this.order);
+    }
+
+    @Override
+    public Admin<T> split() {
+        return new OrderedTraverser<>(this.internal.split(), this.order);
+    }
+
+    @Override
+    public void addLabels(final Set<String> labels) {
+        this.internal.addLabels(labels);
+    }
+
+    @Override
+    public void keepLabels(final Set<String> labels) {
+        this.internal.keepLabels(labels);
+    }
+
+    @Override
+    public void dropLabels(final Set<String> labels) {
+        this.internal.dropLabels(labels);
+    }
+
+    @Override
+    public void dropPath() {
+        this.internal.dropPath();
+    }
+
+    @Override
+    public void set(final T t) {
+        this.internal.set(t);
+    }
+
+    @Override
+    public void incrLoops(final String stepLabel) {
+        this.internal.incrLoops(stepLabel);
+    }
+
+    @Override
+    public void resetLoops() {
+        this.internal.resetLoops();
+    }
+
+    @Override
+    public String getStepId() {
+        return this.internal.getStepId();
+    }
+
+    @Override
+    public void setStepId(final String stepId) {
+        this.internal.setStepId(stepId);
+    }
+
+    @Override
+    public void setBulk(final long count) {
+        this.internal.setBulk(count);
+    }
+
+    @Override
+    public Admin<T> detach() {
+        this.internal = this.internal.detach();
+        return this;
+    }
+
+    @Override
+    public T attach(final Function<Attachable<T>, T> method) {
+        return this.internal.attach(method);
+    }
+
+    @Override
+    public void setSideEffects(final TraversalSideEffects sideEffects) {
+        this.internal.setSideEffects(sideEffects);
+    }
+
+    @Override
+    public TraversalSideEffects getSideEffects() {
+        return this.internal.getSideEffects();
+    }
+
+    @Override
+    public Set<String> getTags() {
+        return this.internal.getTags();
+    }
+
+    @Override
+    public T get() {
+        return this.internal.get();
+    }
+
+    @Override
+    public <S> S sack() {
+        return this.internal.sack();
+    }
+
+    @Override
+    public <S> void sack(final S object) {
+        this.internal.sack(object);
+    }
+
+    @Override
+    public Path path() {
+        return this.internal.path();
+    }
+
+    @Override
+    public int loops() {
+        return this.internal.loops();
+    }
+
+    @Override
+    public long bulk() {
+        return this.internal.bulk();
+    }
+
+    @Override
+    public int hashCode() {
+        return this.internal.hashCode();
+    }
+
+    @Override
+    public boolean equals(final Object object) {
+        return object instanceof OrderedTraverser && ((OrderedTraverser) 
object).internal.equals(this.internal);
+    }
+
+    @Override
+    public OrderedTraverser<T> clone() {
+        try {
+            final OrderedTraverser<T> clone = (OrderedTraverser<T>) 
super.clone();
+            clone.internal = (Traverser.Admin<T>) this.internal.clone();
+            return clone;
+        } catch (final CloneNotSupportedException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public int compareTo(final Traverser<T> o) {
+        if (!(o instanceof OrderedTraverser))
+            return 0;
+        else {
+            if (this.orderChecks.isEmpty()) {
+                return Order.incr.compare(this.get(), o.get());
+            } else {
+                final OrderedTraverser<T> other = (OrderedTraverser<T>) o;
+                for (int i = 0; i < this.orderChecks.size(); i++) {
+                    final Comparator comparator = 
this.orderChecks.get(i).getValue1();
+                    final Object thisObject = 
this.orderChecks.get(i).getValue0();
+                    final Object otherObject = 
other.orderChecks.get(i).getValue0();
+                    final int comparison = comparator.compare(thisObject, 
otherObject);
+                    if (comparison != 0)
+                        return comparison;
+
+                }
+                return 0;
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/501e97a1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoVersion.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoVersion.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoVersion.java
index 0bd9e87..a04f2d9 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoVersion.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoVersion.java
@@ -51,6 +51,7 @@ import 
org.apache.tinkerpop.gremlin.process.traversal.traverser.LP_O_OB_P_S_SE_S
 import 
org.apache.tinkerpop.gremlin.process.traversal.traverser.LP_O_OB_S_SE_SL_Traverser;
 import 
org.apache.tinkerpop.gremlin.process.traversal.traverser.O_OB_S_SE_SL_Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.O_Traverser;
+import 
org.apache.tinkerpop.gremlin.process.traversal.traverser.OrderedTraverser;
 import 
org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.process.traversal.util.AndP;
 import 
org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversalMetrics;
@@ -221,7 +222,7 @@ public enum GryoVersion {
             add(GryoTypeReg.of(AbstractMap.SimpleImmutableEntry.class, 121));
             add(GryoTypeReg.of(java.sql.Timestamp.class, 161));
             add(GryoTypeReg.of(InetAddress.class, 162, new 
UtilSerializers.InetAddressSerializer()));
-            add(GryoTypeReg.of(ByteBuffer.class, 163, new 
UtilSerializers.ByteBufferSerializer()));  // ***LAST ID***
+            add(GryoTypeReg.of(ByteBuffer.class, 163, new 
UtilSerializers.ByteBufferSerializer()));
 
             add(GryoTypeReg.of(ReferenceEdge.class, 81));
             add(GryoTypeReg.of(ReferenceVertexProperty.class, 82));
@@ -245,6 +246,7 @@ public enum GryoVersion {
             add(GryoTypeReg.of(O_OB_S_SE_SL_Traverser.class, 89));
             add(GryoTypeReg.of(LP_O_OB_S_SE_SL_Traverser.class, 90));
             add(GryoTypeReg.of(LP_O_OB_P_S_SE_SL_Traverser.class, 91));
+            add(GryoTypeReg.of(OrderedTraverser.class, 164));   // ***LAST 
ID***
             add(GryoTypeReg.of(DefaultRemoteTraverser.class, 123, new 
GryoSerializers.DefaultRemoteTraverserSerializer()));
 
             add(GryoTypeReg.of(Bytecode.class, 122, new 
GryoSerializers.BytecodeSerializer()));

Reply via email to