This is an automated email from the ASF dual-hosted git repository. okram pushed a commit to branch tp4 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/tp4 by this push: new e0d74e3 Path is now an interface. BasicPath an implementation. Have started down the Traverser species route with COTraverser and COPTraverser. e0d74e3 is described below commit e0d74e3ec09379cdce425fc749a4d70e6e0d2195 Author: Marko A. Rodriguez <okramma...@gmail.com> AuthorDate: Sun Mar 17 07:49:55 2019 -0600 Path is now an interface. BasicPath an implementation. Have started down the Traverser species route with COTraverser and COPTraverser. --- .../tinkerpop/machine/bytecode/BytecodeUtil.java | 8 +++- .../tinkerpop/machine/function/map/PathMap.java | 3 +- .../traverser/{Path.java => BasicPath.java} | 7 ++-- .../{CompleteTraverser.java => COPTraverser.java} | 26 ++++++------- ...verserFactory.java => COPTraverserFactory.java} | 14 ++++++- .../{CompleteTraverser.java => COTraverser.java} | 45 ++++++---------------- ...averserFactory.java => COTraverserFactory.java} | 14 ++++++- .../tinkerpop/machine/traverser/EmptyPath.java} | 38 +++++++++++------- .../apache/tinkerpop/machine/traverser/Path.java | 44 +++------------------ .../tinkerpop/machine/traverser/Traverser.java | 4 +- .../tinkerpop/machine/traverser/TraverserSet.java | 6 +-- .../machine/traverser/TraverserSetTest.java | 6 +-- .../tinkerpop/machine/beam/util/TopologyUtil.java | 8 ++-- .../apache/tinkerpop/machine/pipes/EmptyStep.java | 4 +- .../apache/tinkerpop/machine/pipes/SourceStep.java | 15 +++++--- 15 files changed, 113 insertions(+), 129 deletions(-) diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java index f62cd4a..581fc9d 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java @@ -40,7 +40,7 @@ import org.apache.tinkerpop.machine.function.reduce.GroupCountReduce; import org.apache.tinkerpop.machine.function.reduce.SumReduce; import org.apache.tinkerpop.machine.processor.ProcessorFactory; import org.apache.tinkerpop.machine.strategy.Strategy; -import org.apache.tinkerpop.machine.traverser.CompleteTraverserFactory; +import org.apache.tinkerpop.machine.traverser.COPTraverserFactory; import org.apache.tinkerpop.machine.traverser.TraverserFactory; import java.lang.reflect.InvocationTargetException; @@ -122,7 +122,11 @@ public final class BytecodeUtil { } public static <C> Optional<TraverserFactory<C>> getTraverserFactory(final Bytecode<C> bytecode) { - return Optional.of(new CompleteTraverserFactory<C>()); + for (final Instruction<C> instruction : bytecode.getInstructions()) { + if (instruction.op().equals(Symbols.PATH)) + return Optional.of(COPTraverserFactory.instance()); + } + return Optional.of(COPTraverserFactory.instance()); } public static <C> List<CFunction<C>> compile(final Bytecode<C> bytecode) { diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/function/map/PathMap.java b/java/core/src/main/java/org/apache/tinkerpop/machine/function/map/PathMap.java index 59f2a0e..39d7514 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/function/map/PathMap.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/function/map/PathMap.java @@ -23,6 +23,7 @@ import org.apache.tinkerpop.machine.bytecode.CompilationCircle; import org.apache.tinkerpop.machine.coefficient.Coefficient; import org.apache.tinkerpop.machine.function.AbstractFunction; import org.apache.tinkerpop.machine.function.MapFunction; +import org.apache.tinkerpop.machine.traverser.BasicPath; import org.apache.tinkerpop.machine.traverser.Path; import org.apache.tinkerpop.machine.traverser.Traverser; @@ -45,7 +46,7 @@ public class PathMap<C, S> extends AbstractFunction<C> implements MapFunction<C, public Path apply(final Traverser<C, S> traverser) { if (!this.compilationCircle.isEmpty()) { final Path oldPath = traverser.path(); - final Path newPath = new Path(); + final Path newPath = new BasicPath(); for (int i = 0; i < oldPath.size(); i++) { newPath.add(oldPath.labels(i), this.compilationCircle.next().mapObject(oldPath.object(i)).object()); } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/Path.java b/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/BasicPath.java similarity index 93% copy from java/core/src/main/java/org/apache/tinkerpop/machine/traverser/Path.java copy to java/core/src/main/java/org/apache/tinkerpop/machine/traverser/BasicPath.java index 263c63b..0413814 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/Path.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/BasicPath.java @@ -18,7 +18,6 @@ */ package org.apache.tinkerpop.machine.traverser; -import java.io.Serializable; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -27,15 +26,15 @@ import java.util.Set; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class Path implements Serializable { +public class BasicPath implements Path { private final List<Object> objects = new ArrayList<>(); private final List<Set<String>> labels = new ArrayList<>(); - public Path() { + public BasicPath() { } - public Path(final Path path) { + public BasicPath(final BasicPath path) { this.objects.addAll(path.objects); this.labels.addAll(path.labels); } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/CompleteTraverser.java b/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/COPTraverser.java similarity index 74% copy from java/core/src/main/java/org/apache/tinkerpop/machine/traverser/CompleteTraverser.java copy to java/core/src/main/java/org/apache/tinkerpop/machine/traverser/COPTraverser.java index 8de96be..4810f24 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/CompleteTraverser.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/COPTraverser.java @@ -25,13 +25,13 @@ import org.apache.tinkerpop.machine.function.ReduceFunction; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class CompleteTraverser<C, S> implements Traverser<C, S> { +public class COPTraverser<C, S> implements Traverser<C, S> { private Coefficient<C> coefficient; private S object; - private Path path = new Path(); + private BasicPath path = new BasicPath(); - public CompleteTraverser(final Coefficient<C> coefficient, final S object) { + COPTraverser(final Coefficient<C> coefficient, final S object) { this.coefficient = coefficient; this.object = object; } @@ -44,24 +44,24 @@ public class CompleteTraverser<C, S> implements Traverser<C, S> { return this.object; } - public Path path() { + public BasicPath path() { return this.path; } @Override - public <E> Traverser<C, E> split(final CFunction<C> function, final E eObject) { - final CompleteTraverser<C, E> clone = new CompleteTraverser<>( + public <E> Traverser<C, E> split(final CFunction<C> function, final E newObject) { + final COPTraverser<C, E> clone = new COPTraverser<>( function instanceof ReduceFunction ? function.coefficient().clone().unity() : - function.coefficient().clone().multiply(this.coefficient().value()), eObject); - clone.path = function instanceof ReduceFunction ? new Path() : new Path(this.path); - clone.path.add(function.labels(), eObject); + function.coefficient().clone().multiply(this.coefficient().value()), newObject); + clone.path = function instanceof ReduceFunction ? new BasicPath() : new BasicPath(this.path); + clone.path.add(function.labels(), newObject); return clone; } @Override public boolean equals(final Object other) { - return other instanceof CompleteTraverser && ((CompleteTraverser<C, S>) other).object.equals(this.object); + return other instanceof COPTraverser && ((COPTraverser<C, S>) other).object.equals(this.object); } @Override @@ -75,12 +75,12 @@ public class CompleteTraverser<C, S> implements Traverser<C, S> { } @Override - public CompleteTraverser<C, S> clone() { + public COPTraverser<C, S> clone() { try { - final CompleteTraverser<C, S> clone = (CompleteTraverser<C, S>) super.clone(); + final COPTraverser<C, S> clone = (COPTraverser<C, S>) super.clone(); clone.object = this.object; clone.coefficient = this.coefficient.clone(); - clone.path = new Path(this.path); + clone.path = new BasicPath(this.path); return clone; } catch (final CloneNotSupportedException e) { throw new RuntimeException(e.getMessage(), e); diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/CompleteTraverserFactory.java b/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/COPTraverserFactory.java similarity index 74% copy from java/core/src/main/java/org/apache/tinkerpop/machine/traverser/CompleteTraverserFactory.java copy to java/core/src/main/java/org/apache/tinkerpop/machine/traverser/COPTraverserFactory.java index 680a3e8..0f16025 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/CompleteTraverserFactory.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/COPTraverserFactory.java @@ -23,10 +23,20 @@ import org.apache.tinkerpop.machine.coefficient.Coefficient; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class CompleteTraverserFactory<C> implements TraverserFactory<C> { +public class COPTraverserFactory<C> implements TraverserFactory<C> { + + private static final COPTraverserFactory INSTANCE = new COPTraverserFactory(); + + private COPTraverserFactory() { + // static instance + } @Override public <S> Traverser<C, S> create(final Coefficient<C> coefficient, final S object) { - return new CompleteTraverser<>(coefficient.clone(), object); + return new COPTraverser<>(coefficient.clone(), object); + } + + public static <C> COPTraverserFactory<C> instance() { + return INSTANCE; } } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/CompleteTraverser.java b/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/COTraverser.java similarity index 56% rename from java/core/src/main/java/org/apache/tinkerpop/machine/traverser/CompleteTraverser.java rename to java/core/src/main/java/org/apache/tinkerpop/machine/traverser/COTraverser.java index 8de96be..2e56865 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/CompleteTraverser.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/COTraverser.java @@ -20,71 +20,48 @@ package org.apache.tinkerpop.machine.traverser; import org.apache.tinkerpop.machine.coefficient.Coefficient; import org.apache.tinkerpop.machine.function.CFunction; -import org.apache.tinkerpop.machine.function.ReduceFunction; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class CompleteTraverser<C, S> implements Traverser<C, S> { +public class COTraverser<C, S> implements Traverser<C, S> { private Coefficient<C> coefficient; - private S object; - private Path path = new Path(); + private final S object; - public CompleteTraverser(final Coefficient<C> coefficient, final S object) { + COTraverser(final Coefficient<C> coefficient, final S object) { this.coefficient = coefficient; this.object = object; } + @Override public Coefficient<C> coefficient() { return this.coefficient; } + @Override public S object() { return this.object; } - public Path path() { - return this.path; - } - @Override - public <E> Traverser<C, E> split(final CFunction<C> function, final E eObject) { - final CompleteTraverser<C, E> clone = new CompleteTraverser<>( - function instanceof ReduceFunction ? - function.coefficient().clone().unity() : - function.coefficient().clone().multiply(this.coefficient().value()), eObject); - clone.path = function instanceof ReduceFunction ? new Path() : new Path(this.path); - clone.path.add(function.labels(), eObject); - return clone; - } - - @Override - public boolean equals(final Object other) { - return other instanceof CompleteTraverser && ((CompleteTraverser<C, S>) other).object.equals(this.object); - } - - @Override - public int hashCode() { - return this.object.hashCode(); // TODO: include path + public Path path() { + return EmptyPath.instance(); } @Override - public String toString() { - return this.object.toString(); + public <E> Traverser<C, E> split(final CFunction<C> function, final E object) { + return new COTraverser<>(this.coefficient.clone().multiply(function.coefficient().value()), object); } @Override - public CompleteTraverser<C, S> clone() { + public Traverser<C, S> clone() { try { - final CompleteTraverser<C, S> clone = (CompleteTraverser<C, S>) super.clone(); - clone.object = this.object; + final COTraverser<C, S> clone = (COTraverser<C, S>) super.clone(); clone.coefficient = this.coefficient.clone(); - clone.path = new Path(this.path); return clone; } catch (final CloneNotSupportedException e) { throw new RuntimeException(e.getMessage(), e); } } - } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/CompleteTraverserFactory.java b/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/COTraverserFactory.java similarity index 74% rename from java/core/src/main/java/org/apache/tinkerpop/machine/traverser/CompleteTraverserFactory.java rename to java/core/src/main/java/org/apache/tinkerpop/machine/traverser/COTraverserFactory.java index 680a3e8..a5f50f2 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/CompleteTraverserFactory.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/COTraverserFactory.java @@ -23,10 +23,20 @@ import org.apache.tinkerpop.machine.coefficient.Coefficient; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class CompleteTraverserFactory<C> implements TraverserFactory<C> { +public class COTraverserFactory<C> implements TraverserFactory<C> { + + private static final COTraverserFactory INSTANCE = new COTraverserFactory(); + + private COTraverserFactory() { + // static instance + } @Override public <S> Traverser<C, S> create(final Coefficient<C> coefficient, final S object) { - return new CompleteTraverser<>(coefficient.clone(), object); + return new COTraverser<>(coefficient.clone(), object); + } + + public static <C> COTraverserFactory<C> instance() { + return INSTANCE; } } diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/EmptyStep.java b/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/EmptyPath.java similarity index 59% copy from java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/EmptyStep.java copy to java/core/src/main/java/org/apache/tinkerpop/machine/traverser/EmptyPath.java index bcdd789..09e677e 100644 --- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/EmptyStep.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/EmptyPath.java @@ -16,38 +16,48 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.tinkerpop.machine.pipes; +package org.apache.tinkerpop.machine.traverser; -import org.apache.tinkerpop.machine.traverser.CompleteTraverser; -import org.apache.tinkerpop.util.FastNoSuchElementException; +import java.util.Collections; +import java.util.Set; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -final class EmptyStep<C, S, E> extends AbstractStep<C, S, E> { +public class EmptyPath implements Path { - private static final EmptyStep INSTANCE = new EmptyStep<>(); + private static final EmptyPath INSTANCE = new EmptyPath(); + + private EmptyPath() { - private EmptyStep() { - super(null, null); } @Override - public boolean hasNext() { - return false; + public void add(final Set<String> labels, final Object object) { + } @Override - public CompleteTraverser<C, E> next() { - throw FastNoSuchElementException.instance(); + public void addLabels(final Set<String> labels) { + } - static <C, S, E> EmptyStep<C, S, E> instance() { - return INSTANCE; + @Override + public Object object(int index) { + throw new IllegalStateException("No objects in EmptyPath"); } @Override - public void reset() { + public Set<String> labels(int index) { + return Collections.emptySet(); + } + @Override + public int size() { + return 0; + } + + public static final EmptyPath instance() { + return INSTANCE; } } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/Path.java b/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/Path.java index 263c63b..5e487ff 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/Path.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/Path.java @@ -19,52 +19,20 @@ package org.apache.tinkerpop.machine.traverser; import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; import java.util.Set; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public class Path implements Serializable { +public interface Path extends Serializable { - private final List<Object> objects = new ArrayList<>(); - private final List<Set<String>> labels = new ArrayList<>(); + public void add(final Set<String> labels, final Object object); - public Path() { - } + public void addLabels(final Set<String> labels); - public Path(final Path path) { - this.objects.addAll(path.objects); - this.labels.addAll(path.labels); - } + public Object object(final int index); - public void add(final Set<String> labels, final Object object) { - this.labels.add(labels); - this.objects.add(object); - } + public Set<String> labels(final int index); - public void addLabels(final Set<String> labels) { - if (this.labels.isEmpty()) - this.labels.add(new HashSet<>()); - this.labels.get(this.labels.size() - 1).addAll(labels); - } - - public Object object(final int index) { - return this.objects.get(index); - } - - public Set<String> labels(final int index) { - return this.labels.get(index); - } - - public int size() { - return this.objects.size(); - } - - @Override - public String toString() { - return this.objects.toString(); - } + public int size(); } diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/Traverser.java b/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/Traverser.java index 0b5d5b1..40901ae 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/Traverser.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/Traverser.java @@ -58,7 +58,9 @@ public interface Traverser<C, S> extends Serializable, Cloneable { return IteratorUtils.map(function.apply(this), e -> this.split(function, e)); } - //public default void sideeffect(final SideEffectFunction<C,S> function); + /*public default <C,S> Traverser<C,S> repeatLoop(final RepeatBranch<C,S> repeatBranch) { + // set loops + }*/ public default <E> Traverser<C, E> reduce(final ReduceFunction<C, S, E> function, final E reducedValue) { return this.split(function, reducedValue); diff --git a/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/TraverserSet.java b/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/TraverserSet.java index 80cf339..e4dbe8a 100644 --- a/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/TraverserSet.java +++ b/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/TraverserSet.java @@ -132,8 +132,8 @@ public final class TraverserSet<C, S> extends AbstractSet<Traverser<C, S>> imple return this.map.values().toString(); } - /*public void sort(final Comparator<CompleteTraverser<S>> comparator) { - final List<CompleteTraverser<C, S>> list = new ArrayList<>(this.map.size()); + /*public void sort(final Comparator<COPTraverser<S>> comparator) { + final List<COPTraverser<C, S>> list = new ArrayList<>(this.map.size()); IteratorUtils.removeOnNext(this.map.values().iterator()).forEachRemaining(list::add); Collections.sort(list, comparator); this.map.reset(); @@ -141,7 +141,7 @@ public final class TraverserSet<C, S> extends AbstractSet<Traverser<C, S>> imple } public void shuffle() { - final List<CompleteTraverser<C, S>> list = new ArrayList<>(this.map.size()); + final List<COPTraverser<C, S>> list = new ArrayList<>(this.map.size()); IteratorUtils.removeOnNext(this.map.values().iterator()).forEachRemaining(list::add); Collections.shuffle(list); this.map.reset(); diff --git a/java/core/src/test/java/org/apache/tinkerpop/machine/traverser/TraverserSetTest.java b/java/core/src/test/java/org/apache/tinkerpop/machine/traverser/TraverserSetTest.java index 7edfc47..7d07120 100644 --- a/java/core/src/test/java/org/apache/tinkerpop/machine/traverser/TraverserSetTest.java +++ b/java/core/src/test/java/org/apache/tinkerpop/machine/traverser/TraverserSetTest.java @@ -33,9 +33,9 @@ public class TraverserSetTest { @Test public void shouldAddContainTraversers() { final TraverserSet<Long, String> traverserSet = new TraverserSet<>(); - Traverser<Long, String> a = new CompleteTraverser<>(LongCoefficient.create(10L), "hello"); - Traverser<Long, String> b = new CompleteTraverser<>(LongCoefficient.create(5L), "hello"); - Traverser<Long, String> c = new CompleteTraverser<>(LongCoefficient.create(3L), "world"); + Traverser<Long, String> a = new COPTraverser<>(LongCoefficient.create(10L), "hello"); + Traverser<Long, String> b = new COPTraverser<>(LongCoefficient.create(5L), "hello"); + Traverser<Long, String> c = new COPTraverser<>(LongCoefficient.create(3L), "world"); /// assertTrue(traverserSet.isEmpty()); assertEquals(0, traverserSet.size()); diff --git a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/util/TopologyUtil.java b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/util/TopologyUtil.java index d7cd0db..af7584f 100644 --- a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/util/TopologyUtil.java +++ b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/util/TopologyUtil.java @@ -82,7 +82,7 @@ public class TopologyUtil { sink = source.apply(Combine.globally(new ReduceFn<>((ReduceFunction<C, S, E>) function, traverserFactory))); } else if (function instanceof RepeatBranch) { final RepeatBranch<C, S> repeatFunction = (RepeatBranch<C, S>) function; - final List<PCollection<Traverser<C, S>>> repeatSinks = new ArrayList<>(); + final List<PCollection<Traverser<C, S>>> repeatOutputs = new ArrayList<>(); final TupleTag<Traverser<C, S>> repeatDone = new TupleTag<>(); final TupleTag<Traverser<C, S>> repeatLoop = new TupleTag<>(); sink = source; @@ -91,7 +91,7 @@ public class TopologyUtil { final RepeatStartFn<C, S> startFn = new RepeatStartFn<>(repeatFunction, repeatDone, repeatLoop); final PCollectionTuple outputs = (PCollectionTuple) sink.apply(ParDo.of(startFn).withOutputTags(repeatLoop, TupleTagList.of(repeatDone))); outputs.getAll().values().forEach(c -> c.setCoder(new TraverserCoder())); - repeatSinks.add(outputs.get(repeatDone)); + repeatOutputs.add(outputs.get(repeatDone)); sink = outputs.get(repeatLoop); } sink = TopologyUtil.compile(sink, repeatFunction.getRepeat()); @@ -99,13 +99,13 @@ public class TopologyUtil { final RepeatEndFn<C, S> endFn = new RepeatEndFn<>(repeatFunction, repeatDone, repeatLoop); final PCollectionTuple outputs = (PCollectionTuple) sink.apply(ParDo.of(endFn).withOutputTags(repeatLoop, TupleTagList.of(repeatDone))); outputs.getAll().values().forEach(c -> c.setCoder(new TraverserCoder())); - repeatSinks.add(outputs.get(repeatDone)); + repeatOutputs.add(outputs.get(repeatDone)); sink = outputs.get(repeatLoop); } } sink = (PCollection<Traverser<C, S>>) sink.apply(ParDo.of(new RepeatDeadEndFn<>())); sink.setCoder(new TraverserCoder<>()); - sink = PCollectionList.of(repeatSinks).apply(Flatten.pCollections()); + sink = PCollectionList.of(repeatOutputs).apply(Flatten.pCollections()); } else if (function instanceof BranchFunction) { final BranchFunction<C, S, E, M> branchFunction = (BranchFunction<C, S, E, M>) function; final Map<M, TupleTag<Traverser<C, S>>> selectors = new LinkedHashMap<>(); diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/EmptyStep.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/EmptyStep.java index bcdd789..48acade 100644 --- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/EmptyStep.java +++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/EmptyStep.java @@ -18,7 +18,7 @@ */ package org.apache.tinkerpop.machine.pipes; -import org.apache.tinkerpop.machine.traverser.CompleteTraverser; +import org.apache.tinkerpop.machine.traverser.COPTraverser; import org.apache.tinkerpop.util.FastNoSuchElementException; /** @@ -38,7 +38,7 @@ final class EmptyStep<C, S, E> extends AbstractStep<C, S, E> { } @Override - public CompleteTraverser<C, E> next() { + public COPTraverser<C, E> next() { throw FastNoSuchElementException.instance(); } diff --git a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/SourceStep.java b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/SourceStep.java index bdf10a5..272b6fa 100644 --- a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/SourceStep.java +++ b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/SourceStep.java @@ -19,32 +19,35 @@ package org.apache.tinkerpop.machine.pipes; import org.apache.tinkerpop.machine.traverser.Traverser; -import org.apache.tinkerpop.machine.traverser.TraverserSet; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ final class SourceStep<C, S> implements Step<C, S, S> { - private final TraverserSet<C, S> traverserSet = new TraverserSet<>(); + private Traverser<C, S> traverser = null; @Override public boolean hasNext() { - return !this.traverserSet.isEmpty(); + return null != this.traverser; } @Override public Traverser<C, S> next() { - return this.traverserSet.remove(); + final Traverser<C, S> temp = this.traverser; + this.traverser = null; + return temp; } @Override public void reset() { - this.traverserSet.clear(); + this.traverser = null; } public void addStart(final Traverser<C, S> traverser) { - this.traverserSet.add(traverser); + if (null != this.traverser) + throw new IllegalStateException("This shouldn't happen"); // TODO: verify fully and then remove + this.traverser = traverser; } @Override