Created OrderedTraverser which allows us to move beyond the star graph for OrderGlobalStep by()-projections. Moreover, it reduces the complexity of ordering as the objects of comparison are already determined. Going to generalize fully to a ProjectedTraverser which will allow us to do the same for SampleGlobalStep, DedupGlobalStep, and down the road maintain order even as the computation is re-distributed to workers.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/501e97a1 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/501e97a1 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/501e97a1 Branch: refs/heads/TINKERPOP-1602 Commit: 501e97a1ecb23f76b2fddba8eaed1dba4a5a839e Parents: e80a4cd Author: Marko A. Rodriguez <okramma...@gmail.com> Authored: Wed Jan 18 09:08:24 2017 -0700 Committer: Marko A. Rodriguez <okramma...@gmail.com> Committed: Wed Jan 18 09:08:24 2017 -0700 ---------------------------------------------------------------------- CHANGELOG.asciidoc | 1 + .../traversal/step/map/OrderGlobalStep.java | 61 ++--- .../step/util/CollectingBarrierStep.java | 4 +- .../ComputerVerificationStrategy.java | 5 +- .../traversal/traverser/OrderedTraverser.java | 235 +++++++++++++++++++ .../gremlin/structure/io/gryo/GryoVersion.java | 4 +- 6 files changed, 277 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/501e97a1/CHANGELOG.asciidoc ---------------------------------------------------------------------- diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 4f3f9ce..86c6b4f 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -26,6 +26,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima TinkerPop 3.2.4 (Release Date: NOT OFFICIALLY RELEASED YET) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +* `OrderGlobalStep` now emits traversers with their `by()`-projections and thus, can move beyond the local star graph. * SASL negotiation supports both a byte array and Base64 encoded bytes as a string for authentication to Gremlin Server. * Deprecated `TinkerIoRegistry` replacing it with the more consistently named `TinkerIoRegistryV1d0`. * Made error messaging more consistent during result iteration timeouts in Gremlin Server. 
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/501e97a1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java index a7d21b2..ac5df90 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java @@ -27,10 +27,10 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.ByModulating; import org.apache.tinkerpop.gremlin.process.traversal.step.ComparatorHolder; import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent; import org.apache.tinkerpop.gremlin.process.traversal.step.util.CollectingBarrierStep; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.OrderedTraverser; import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement; import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; import org.apache.tinkerpop.gremlin.structure.util.StringFactory; -import org.apache.tinkerpop.gremlin.util.function.ChainedComparator; import org.javatuples.Pair; import java.io.Serializable; @@ -49,8 +49,8 @@ import java.util.stream.Collectors; public final class OrderGlobalStep<S, C extends Comparable> extends CollectingBarrierStep<S> implements ComparatorHolder<S, C>, TraversalParent, ByModulating { private List<Pair<Traversal.Admin<S, C>, Comparator<C>>> comparators = new ArrayList<>(); - private ChainedComparator<S, C> chainedComparator = null; private long limit = Long.MAX_VALUE; + private boolean isShuffle = false; public OrderGlobalStep(final Traversal.Admin traversal) { 
super(traversal); @@ -58,12 +58,30 @@ public final class OrderGlobalStep<S, C extends Comparable> extends CollectingBa @Override public void barrierConsumer(final TraverserSet<S> traverserSet) { - if (null == this.chainedComparator) - this.chainedComparator = new ChainedComparator<>(true, this.comparators); - if (this.chainedComparator.isShuffle()) + if (this.isShuffle) traverserSet.shuffle(); else - traverserSet.sort((Comparator) this.chainedComparator); + traverserSet.sort(Comparator.naturalOrder()); + } + + @Override + public void processAllStarts() { + if (this.starts.hasNext()) { + while (this.starts.hasNext()) { + this.traverserSet.add(new OrderedTraverser<S>(this.starts.next(), (List) this.comparators)); + } + this.barrierConsumer(this.traverserSet); + } + } + + @Override + public Traverser.Admin<S> processNextStart() { + if (!this.traverserSet.isEmpty()) { + return this.traverserSet.remove(); + } else if (this.starts.hasNext()) { + this.processAllStarts(); + } + return ((OrderedTraverser) this.traverserSet.remove()).getInternal(); } public void setLimit(final long limit) { @@ -76,6 +94,7 @@ public final class OrderGlobalStep<S, C extends Comparable> extends CollectingBa @Override public void addComparator(final Traversal.Admin<S, C> traversal, final Comparator<C> comparator) { + this.isShuffle = Order.shuffle == (Comparator) comparator; this.comparators.add(new Pair<>(this.integrateChild(traversal), comparator)); } @@ -125,7 +144,6 @@ public final class OrderGlobalStep<S, C extends Comparable> extends CollectingBa for (final Pair<Traversal.Admin<S, C>, Comparator<C>> comparator : this.comparators) { clone.comparators.add(new Pair<>(comparator.getValue0().clone(), comparator.getValue1())); } - clone.chainedComparator = null; return clone; } @@ -137,47 +155,34 @@ public final class OrderGlobalStep<S, C extends Comparable> extends CollectingBa @Override public MemoryComputeKey<TraverserSet<S>> getMemoryComputeKey() { - if (null == this.chainedComparator) - 
this.chainedComparator = new ChainedComparator<>(true, this.comparators); - return MemoryComputeKey.of(this.getId(), new OrderBiOperator<>(this.chainedComparator, this.limit), false, true); + return MemoryComputeKey.of(this.getId(), new OrderBiOperator<>(this.limit, this.isShuffle), false, true); } //////////////// - public static final class OrderBiOperator<S> implements BinaryOperator<TraverserSet<S>>, Serializable, Cloneable { + public static final class OrderBiOperator<S> implements BinaryOperator<TraverserSet<S>>, Serializable { - private ChainedComparator chainedComparator; private long limit; + private boolean isShuffle; private OrderBiOperator() { // for serializers that need a no-arg constructor } - public OrderBiOperator(final ChainedComparator<S, ?> chainedComparator, final long limit) { - this.chainedComparator = chainedComparator; + public OrderBiOperator(final long limit, final boolean isShuffle) { this.limit = limit; - } - - @Override - public OrderBiOperator<S> clone() { - try { - final OrderBiOperator<S> clone = (OrderBiOperator<S>) super.clone(); - clone.chainedComparator = this.chainedComparator.clone(); - return clone; - } catch (final CloneNotSupportedException e) { - throw new IllegalStateException(e.getMessage(), e); - } + this.isShuffle = isShuffle; } @Override public TraverserSet<S> apply(final TraverserSet<S> setA, final TraverserSet<S> setB) { setA.addAll(setB); if (Long.MAX_VALUE != this.limit && setA.bulkSize() > this.limit) { - if (this.chainedComparator.isShuffle()) + if (this.isShuffle) setA.shuffle(); else - setA.sort(this.chainedComparator); - long counter = 0l; + setA.sort(Comparator.naturalOrder()); + long counter = 0L; final Iterator<Traverser.Admin<S>> traversers = setA.iterator(); while (traversers.hasNext()) { final Traverser.Admin<S> traverser = traversers.next(); 
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/501e97a1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/CollectingBarrierStep.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/CollectingBarrierStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/CollectingBarrierStep.java index f9c85a2..b0cce80 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/CollectingBarrierStep.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/CollectingBarrierStep.java @@ -39,8 +39,8 @@ import java.util.function.BinaryOperator; */ public abstract class CollectingBarrierStep<S> extends AbstractStep<S, S> implements Barrier<TraverserSet<S>> { - private TraverserSet<S> traverserSet = new TraverserSet<>(); - private int maxBarrierSize; + protected TraverserSet<S> traverserSet = new TraverserSet<>(); + protected int maxBarrierSize; public CollectingBarrierStep(final Traversal.Admin traversal) { this(traversal, Integer.MAX_VALUE); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/501e97a1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ComputerVerificationStrategy.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ComputerVerificationStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ComputerVerificationStrategy.java index fc73fc3..5777adb 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ComputerVerificationStrategy.java +++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ComputerVerificationStrategy.java @@ -28,6 +28,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing; import org.apache.tinkerpop.gremlin.process.traversal.step.Mutating; import org.apache.tinkerpop.gremlin.process.traversal.step.PathProcessor; import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent; +import org.apache.tinkerpop.gremlin.process.traversal.step.filter.SampleGlobalStep; import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep; import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.InjectStep; import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SubgraphStep; @@ -86,8 +87,8 @@ public final class ComputerVerificationStrategy extends AbstractTraversalStrateg throw new VerificationException("Local traversals may not traverse past the local star-graph on GraphComputer: " + traversalOptional.get(), traversal); } - // collecting barriers and dedup global use can only operate on the element and its properties (no incidences) - if (step instanceof CollectingBarrierStep && step instanceof TraversalParent) { + // sample step use can only operate on the element and its properties (no incidences) + if (step instanceof SampleGlobalStep) { if (((TraversalParent) step).getLocalChildren().stream().filter(t -> !TraversalHelper.isLocalProperties(t)).findAny().isPresent()) throw new VerificationException("The following barrier step can not process the incident edges of a vertex on GraphComputer: " + step, traversal); } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/501e97a1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/OrderedTraverser.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/OrderedTraverser.java 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/OrderedTraverser.java new file mode 100644 index 0000000..4dddaa3 --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/OrderedTraverser.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.process.traversal.traverser; + +import org.apache.tinkerpop.gremlin.process.traversal.Order; +import org.apache.tinkerpop.gremlin.process.traversal.Path; +import org.apache.tinkerpop.gremlin.process.traversal.Step; +import org.apache.tinkerpop.gremlin.process.traversal.Traversal; +import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects; +import org.apache.tinkerpop.gremlin.process.traversal.Traverser; +import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil; +import org.apache.tinkerpop.gremlin.structure.util.Attachable; +import org.javatuples.Pair; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Set; +import java.util.function.Function; + +/** + * @author Marko A. 
Rodriguez (http://markorodriguez.com) + */ +public final class OrderedTraverser<T> implements Traverser.Admin<T> { + + private Traverser.Admin<T> internal; + private int order; + private final List<Pair<Object, Comparator<?>>> orderChecks = new ArrayList<>(); + + private OrderedTraverser() { + // for serialization + } + + public OrderedTraverser(final Traverser.Admin<T> internal, final List<Pair<Traversal.Admin<T, ?>, Comparator>> checks) { + this(internal, 0); + for (final Pair<Traversal.Admin<T, ?>, Comparator> pairs : checks) { + this.orderChecks.add(Pair.with(TraversalUtil.apply(this.internal, pairs.getValue0()), pairs.getValue1())); + } + } + + public OrderedTraverser(final Traverser.Admin<T> internal, final int order) { + this.internal = internal instanceof OrderedTraverser ? ((OrderedTraverser) internal).internal : internal; + this.order = order; + } + + public Traverser.Admin<T> getInternal() { + return this.internal; + } + + public int order() { + return this.order; + } + + @Override + public void merge(final Admin<?> other) { + this.internal.merge(other); + } + + @Override + public <R> Admin<R> split(R r, Step<T, R> step) { + return new OrderedTraverser<>(this.internal.split(r, step), this.order); + } + + @Override + public Admin<T> split() { + return new OrderedTraverser<>(this.internal.split(), this.order); + } + + @Override + public void addLabels(final Set<String> labels) { + this.internal.addLabels(labels); + } + + @Override + public void keepLabels(final Set<String> labels) { + this.internal.keepLabels(labels); + } + + @Override + public void dropLabels(final Set<String> labels) { + this.internal.dropLabels(labels); + } + + @Override + public void dropPath() { + this.internal.dropPath(); + } + + @Override + public void set(final T t) { + this.internal.set(t); + } + + @Override + public void incrLoops(final String stepLabel) { + this.internal.incrLoops(stepLabel); + } + + @Override + public void resetLoops() { + this.internal.resetLoops(); + } + + 
@Override + public String getStepId() { + return this.internal.getStepId(); + } + + @Override + public void setStepId(final String stepId) { + this.internal.setStepId(stepId); + } + + @Override + public void setBulk(final long count) { + this.internal.setBulk(count); + } + + @Override + public Admin<T> detach() { + this.internal = this.internal.detach(); + return this; + } + + @Override + public T attach(final Function<Attachable<T>, T> method) { + return this.internal.attach(method); + } + + @Override + public void setSideEffects(final TraversalSideEffects sideEffects) { + this.internal.setSideEffects(sideEffects); + } + + @Override + public TraversalSideEffects getSideEffects() { + return this.internal.getSideEffects(); + } + + @Override + public Set<String> getTags() { + return this.internal.getTags(); + } + + @Override + public T get() { + return this.internal.get(); + } + + @Override + public <S> S sack() { + return this.internal.sack(); + } + + @Override + public <S> void sack(final S object) { + this.internal.sack(object); + } + + @Override + public Path path() { + return this.internal.path(); + } + + @Override + public int loops() { + return this.internal.loops(); + } + + @Override + public long bulk() { + return this.internal.bulk(); + } + + @Override + public int hashCode() { + return this.internal.hashCode(); + } + + @Override + public boolean equals(final Object object) { + return object instanceof OrderedTraverser && ((OrderedTraverser) object).internal.equals(this.internal); + } + + @Override + public OrderedTraverser<T> clone() { + try { + final OrderedTraverser<T> clone = (OrderedTraverser<T>) super.clone(); + clone.internal = (Traverser.Admin<T>) this.internal.clone(); + return clone; + } catch (final CloneNotSupportedException e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + + @Override + public int compareTo(final Traverser<T> o) { + if (!(o instanceof OrderedTraverser)) + return 0; + else { + if (this.orderChecks.isEmpty()) { 
+ return Order.incr.compare(this.get(), o.get()); + } else { + final OrderedTraverser<T> other = (OrderedTraverser<T>) o; + for (int i = 0; i < this.orderChecks.size(); i++) { + final Comparator comparator = this.orderChecks.get(i).getValue1(); + final Object thisObject = this.orderChecks.get(i).getValue0(); + final Object otherObject = other.orderChecks.get(i).getValue0(); + final int comparison = comparator.compare(thisObject, otherObject); + if (comparison != 0) + return comparison; + + } + return 0; + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/501e97a1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoVersion.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoVersion.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoVersion.java index 0bd9e87..a04f2d9 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoVersion.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoVersion.java @@ -51,6 +51,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.traverser.LP_O_OB_P_S_SE_S import org.apache.tinkerpop.gremlin.process.traversal.traverser.LP_O_OB_S_SE_SL_Traverser; import org.apache.tinkerpop.gremlin.process.traversal.traverser.O_OB_S_SE_SL_Traverser; import org.apache.tinkerpop.gremlin.process.traversal.traverser.O_Traverser; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.OrderedTraverser; import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; import org.apache.tinkerpop.gremlin.process.traversal.util.AndP; import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversalMetrics; @@ -221,7 +222,7 @@ public enum GryoVersion { add(GryoTypeReg.of(AbstractMap.SimpleImmutableEntry.class, 121)); 
add(GryoTypeReg.of(java.sql.Timestamp.class, 161)); add(GryoTypeReg.of(InetAddress.class, 162, new UtilSerializers.InetAddressSerializer())); - add(GryoTypeReg.of(ByteBuffer.class, 163, new UtilSerializers.ByteBufferSerializer())); // ***LAST ID*** + add(GryoTypeReg.of(ByteBuffer.class, 163, new UtilSerializers.ByteBufferSerializer())); add(GryoTypeReg.of(ReferenceEdge.class, 81)); add(GryoTypeReg.of(ReferenceVertexProperty.class, 82)); @@ -245,6 +246,7 @@ public enum GryoVersion { add(GryoTypeReg.of(O_OB_S_SE_SL_Traverser.class, 89)); add(GryoTypeReg.of(LP_O_OB_S_SE_SL_Traverser.class, 90)); add(GryoTypeReg.of(LP_O_OB_P_S_SE_SL_Traverser.class, 91)); + add(GryoTypeReg.of(OrderedTraverser.class, 164)); // ***LAST ID*** add(GryoTypeReg.of(DefaultRemoteTraverser.class, 123, new GryoSerializers.DefaultRemoteTraverserSerializer())); add(GryoTypeReg.of(Bytecode.class, 122, new GryoSerializers.BytecodeSerializer()));