This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
     new e0d74e3  Path is now an interface. BasicPath an implementation. Have started down the Traverser species route with COTraverser and COPTraverser.
e0d74e3 is described below

commit e0d74e3ec09379cdce425fc749a4d70e6e0d2195
Author: Marko A. Rodriguez <okramma...@gmail.com>
AuthorDate: Sun Mar 17 07:49:55 2019 -0600

    Path is now an interface. BasicPath an implementation. Have started down the Traverser species route with COTraverser and COPTraverser.
---
 .../tinkerpop/machine/bytecode/BytecodeUtil.java   |  8 +++-
 .../tinkerpop/machine/function/map/PathMap.java    |  3 +-
 .../traverser/{Path.java => BasicPath.java}        |  7 ++--
 .../{CompleteTraverser.java => COPTraverser.java}  | 26 ++++++-------
 ...verserFactory.java => COPTraverserFactory.java} | 14 ++++++-
 .../{CompleteTraverser.java => COTraverser.java}   | 45 ++++++----------------
 ...averserFactory.java => COTraverserFactory.java} | 14 ++++++-
 .../tinkerpop/machine/traverser/EmptyPath.java}    | 38 +++++++++++-------
 .../apache/tinkerpop/machine/traverser/Path.java   | 44 +++------------------
 .../tinkerpop/machine/traverser/Traverser.java     |  4 +-
 .../tinkerpop/machine/traverser/TraverserSet.java  |  6 +--
 .../machine/traverser/TraverserSetTest.java        |  6 +--
 .../tinkerpop/machine/beam/util/TopologyUtil.java  |  8 ++--
 .../apache/tinkerpop/machine/pipes/EmptyStep.java  |  4 +-
 .../apache/tinkerpop/machine/pipes/SourceStep.java | 15 +++++---
 15 files changed, 113 insertions(+), 129 deletions(-)

diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
index f62cd4a..581fc9d 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/bytecode/BytecodeUtil.java
@@ -40,7 +40,7 @@ import 
org.apache.tinkerpop.machine.function.reduce.GroupCountReduce;
 import org.apache.tinkerpop.machine.function.reduce.SumReduce;
 import org.apache.tinkerpop.machine.processor.ProcessorFactory;
 import org.apache.tinkerpop.machine.strategy.Strategy;
-import org.apache.tinkerpop.machine.traverser.CompleteTraverserFactory;
+import org.apache.tinkerpop.machine.traverser.COPTraverserFactory;
 import org.apache.tinkerpop.machine.traverser.TraverserFactory;
 
 import java.lang.reflect.InvocationTargetException;
@@ -122,7 +122,11 @@ public final class BytecodeUtil {
     }
 
     public static <C> Optional<TraverserFactory<C>> getTraverserFactory(final 
Bytecode<C> bytecode) {
-        return Optional.of(new CompleteTraverserFactory<C>());
+        for (final Instruction<C> instruction : bytecode.getInstructions()) {
+            if (instruction.op().equals(Symbols.PATH))
+                return Optional.of(COPTraverserFactory.instance());
+        }
+        return Optional.of(COPTraverserFactory.instance());
     }
 
     public static <C> List<CFunction<C>> compile(final Bytecode<C> bytecode) {
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/function/map/PathMap.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/function/map/PathMap.java
index 59f2a0e..39d7514 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/function/map/PathMap.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/function/map/PathMap.java
@@ -23,6 +23,7 @@ import 
org.apache.tinkerpop.machine.bytecode.CompilationCircle;
 import org.apache.tinkerpop.machine.coefficient.Coefficient;
 import org.apache.tinkerpop.machine.function.AbstractFunction;
 import org.apache.tinkerpop.machine.function.MapFunction;
+import org.apache.tinkerpop.machine.traverser.BasicPath;
 import org.apache.tinkerpop.machine.traverser.Path;
 import org.apache.tinkerpop.machine.traverser.Traverser;
 
@@ -45,7 +46,7 @@ public class PathMap<C, S> extends AbstractFunction<C> 
implements MapFunction<C,
     public Path apply(final Traverser<C, S> traverser) {
         if (!this.compilationCircle.isEmpty()) {
             final Path oldPath = traverser.path();
-            final Path newPath = new Path();
+            final Path newPath = new BasicPath();
             for (int i = 0; i < oldPath.size(); i++) {
                 newPath.add(oldPath.labels(i), 
this.compilationCircle.next().mapObject(oldPath.object(i)).object());
             }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/Path.java 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/BasicPath.java
similarity index 93%
copy from 
java/core/src/main/java/org/apache/tinkerpop/machine/traverser/Path.java
copy to 
java/core/src/main/java/org/apache/tinkerpop/machine/traverser/BasicPath.java
index 263c63b..0413814 100644
--- a/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/Path.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/BasicPath.java
@@ -18,7 +18,6 @@
  */
 package org.apache.tinkerpop.machine.traverser;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -27,15 +26,15 @@ import java.util.Set;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class Path implements Serializable {
+public class BasicPath implements Path {
 
     private final List<Object> objects = new ArrayList<>();
     private final List<Set<String>> labels = new ArrayList<>();
 
-    public Path() {
+    public BasicPath() {
     }
 
-    public Path(final Path path) {
+    public BasicPath(final BasicPath path) {
         this.objects.addAll(path.objects);
         this.labels.addAll(path.labels);
     }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/CompleteTraverser.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/COPTraverser.java
similarity index 74%
copy from 
java/core/src/main/java/org/apache/tinkerpop/machine/traverser/CompleteTraverser.java
copy to 
java/core/src/main/java/org/apache/tinkerpop/machine/traverser/COPTraverser.java
index 8de96be..4810f24 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/CompleteTraverser.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/COPTraverser.java
@@ -25,13 +25,13 @@ import org.apache.tinkerpop.machine.function.ReduceFunction;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class CompleteTraverser<C, S> implements Traverser<C, S> {
+public class COPTraverser<C, S> implements Traverser<C, S> {
 
     private Coefficient<C> coefficient;
     private S object;
-    private Path path = new Path();
+    private BasicPath path = new BasicPath();
 
-    public CompleteTraverser(final Coefficient<C> coefficient, final S object) 
{
+    COPTraverser(final Coefficient<C> coefficient, final S object) {
         this.coefficient = coefficient;
         this.object = object;
     }
@@ -44,24 +44,24 @@ public class CompleteTraverser<C, S> implements 
Traverser<C, S> {
         return this.object;
     }
 
-    public Path path() {
+    public BasicPath path() {
         return this.path;
     }
 
     @Override
-    public <E> Traverser<C, E> split(final CFunction<C> function, final E 
eObject) {
-        final CompleteTraverser<C, E> clone = new CompleteTraverser<>(
+    public <E> Traverser<C, E> split(final CFunction<C> function, final E 
newObject) {
+        final COPTraverser<C, E> clone = new COPTraverser<>(
                 function instanceof ReduceFunction ?
                         function.coefficient().clone().unity() :
-                        
function.coefficient().clone().multiply(this.coefficient().value()), eObject);
-        clone.path = function instanceof ReduceFunction ? new Path() : new 
Path(this.path);
-        clone.path.add(function.labels(), eObject);
+                        
function.coefficient().clone().multiply(this.coefficient().value()), newObject);
+        clone.path = function instanceof ReduceFunction ? new BasicPath() : 
new BasicPath(this.path);
+        clone.path.add(function.labels(), newObject);
         return clone;
     }
 
     @Override
     public boolean equals(final Object other) {
-        return other instanceof CompleteTraverser && ((CompleteTraverser<C, 
S>) other).object.equals(this.object);
+        return other instanceof COPTraverser && ((COPTraverser<C, S>) 
other).object.equals(this.object);
     }
 
     @Override
@@ -75,12 +75,12 @@ public class CompleteTraverser<C, S> implements 
Traverser<C, S> {
     }
 
     @Override
-    public CompleteTraverser<C, S> clone() {
+    public COPTraverser<C, S> clone() {
         try {
-            final CompleteTraverser<C, S> clone = (CompleteTraverser<C, S>) 
super.clone();
+            final COPTraverser<C, S> clone = (COPTraverser<C, S>) 
super.clone();
             clone.object = this.object;
             clone.coefficient = this.coefficient.clone();
-            clone.path = new Path(this.path);
+            clone.path = new BasicPath(this.path);
             return clone;
         } catch (final CloneNotSupportedException e) {
             throw new RuntimeException(e.getMessage(), e);
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/CompleteTraverserFactory.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/COPTraverserFactory.java
similarity index 74%
copy from 
java/core/src/main/java/org/apache/tinkerpop/machine/traverser/CompleteTraverserFactory.java
copy to 
java/core/src/main/java/org/apache/tinkerpop/machine/traverser/COPTraverserFactory.java
index 680a3e8..0f16025 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/CompleteTraverserFactory.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/COPTraverserFactory.java
@@ -23,10 +23,20 @@ import org.apache.tinkerpop.machine.coefficient.Coefficient;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class CompleteTraverserFactory<C> implements TraverserFactory<C> {
+public class COPTraverserFactory<C> implements TraverserFactory<C> {
+
+    private static final COPTraverserFactory INSTANCE = new 
COPTraverserFactory();
+
+    private COPTraverserFactory() {
+        // static instance
+    }
 
     @Override
     public <S> Traverser<C, S> create(final Coefficient<C> coefficient, final 
S object) {
-        return new CompleteTraverser<>(coefficient.clone(), object);
+        return new COPTraverser<>(coefficient.clone(), object);
+    }
+
+    public static <C> COPTraverserFactory<C> instance() {
+        return INSTANCE;
     }
 }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/CompleteTraverser.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/COTraverser.java
similarity index 56%
rename from 
java/core/src/main/java/org/apache/tinkerpop/machine/traverser/CompleteTraverser.java
rename to 
java/core/src/main/java/org/apache/tinkerpop/machine/traverser/COTraverser.java
index 8de96be..2e56865 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/CompleteTraverser.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/COTraverser.java
@@ -20,71 +20,48 @@ package org.apache.tinkerpop.machine.traverser;
 
 import org.apache.tinkerpop.machine.coefficient.Coefficient;
 import org.apache.tinkerpop.machine.function.CFunction;
-import org.apache.tinkerpop.machine.function.ReduceFunction;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class CompleteTraverser<C, S> implements Traverser<C, S> {
+public class COTraverser<C, S> implements Traverser<C, S> {
 
     private Coefficient<C> coefficient;
-    private S object;
-    private Path path = new Path();
+    private final S object;
 
-    public CompleteTraverser(final Coefficient<C> coefficient, final S object) 
{
+    COTraverser(final Coefficient<C> coefficient, final S object) {
         this.coefficient = coefficient;
         this.object = object;
     }
 
+    @Override
     public Coefficient<C> coefficient() {
         return this.coefficient;
     }
 
+    @Override
     public S object() {
         return this.object;
     }
 
-    public Path path() {
-        return this.path;
-    }
-
     @Override
-    public <E> Traverser<C, E> split(final CFunction<C> function, final E 
eObject) {
-        final CompleteTraverser<C, E> clone = new CompleteTraverser<>(
-                function instanceof ReduceFunction ?
-                        function.coefficient().clone().unity() :
-                        
function.coefficient().clone().multiply(this.coefficient().value()), eObject);
-        clone.path = function instanceof ReduceFunction ? new Path() : new 
Path(this.path);
-        clone.path.add(function.labels(), eObject);
-        return clone;
-    }
-
-    @Override
-    public boolean equals(final Object other) {
-        return other instanceof CompleteTraverser && ((CompleteTraverser<C, 
S>) other).object.equals(this.object);
-    }
-
-    @Override
-    public int hashCode() {
-        return this.object.hashCode(); // TODO: include path
+    public Path path() {
+        return EmptyPath.instance();
     }
 
     @Override
-    public String toString() {
-        return this.object.toString();
+    public <E> Traverser<C, E> split(final CFunction<C> function, final E 
object) {
+        return new 
COTraverser<>(this.coefficient.clone().multiply(function.coefficient().value()),
 object);
     }
 
     @Override
-    public CompleteTraverser<C, S> clone() {
+    public Traverser<C, S> clone() {
         try {
-            final CompleteTraverser<C, S> clone = (CompleteTraverser<C, S>) 
super.clone();
-            clone.object = this.object;
+            final COTraverser<C, S> clone = (COTraverser<C, S>) super.clone();
             clone.coefficient = this.coefficient.clone();
-            clone.path = new Path(this.path);
             return clone;
         } catch (final CloneNotSupportedException e) {
             throw new RuntimeException(e.getMessage(), e);
         }
     }
-
 }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/CompleteTraverserFactory.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/COTraverserFactory.java
similarity index 74%
rename from 
java/core/src/main/java/org/apache/tinkerpop/machine/traverser/CompleteTraverserFactory.java
rename to 
java/core/src/main/java/org/apache/tinkerpop/machine/traverser/COTraverserFactory.java
index 680a3e8..a5f50f2 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/CompleteTraverserFactory.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/COTraverserFactory.java
@@ -23,10 +23,20 @@ import org.apache.tinkerpop.machine.coefficient.Coefficient;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class CompleteTraverserFactory<C> implements TraverserFactory<C> {
+public class COTraverserFactory<C> implements TraverserFactory<C> {
+
+    private static final COTraverserFactory INSTANCE = new 
COTraverserFactory();
+
+    private COTraverserFactory() {
+        // static instance
+    }
 
     @Override
     public <S> Traverser<C, S> create(final Coefficient<C> coefficient, final 
S object) {
-        return new CompleteTraverser<>(coefficient.clone(), object);
+        return new COTraverser<>(coefficient.clone(), object);
+    }
+
+    public static <C> COTraverserFactory<C> instance() {
+        return INSTANCE;
     }
 }
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/EmptyStep.java
 b/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/EmptyPath.java
similarity index 59%
copy from 
java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/EmptyStep.java
copy to 
java/core/src/main/java/org/apache/tinkerpop/machine/traverser/EmptyPath.java
index bcdd789..09e677e 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/EmptyStep.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/EmptyPath.java
@@ -16,38 +16,48 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.tinkerpop.machine.pipes;
+package org.apache.tinkerpop.machine.traverser;
 
-import org.apache.tinkerpop.machine.traverser.CompleteTraverser;
-import org.apache.tinkerpop.util.FastNoSuchElementException;
+import java.util.Collections;
+import java.util.Set;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-final class EmptyStep<C, S, E> extends AbstractStep<C, S, E> {
+public class EmptyPath implements Path {
 
-    private static final EmptyStep INSTANCE = new EmptyStep<>();
+    private static final EmptyPath INSTANCE = new EmptyPath();
+
+    private EmptyPath() {
 
-    private EmptyStep() {
-        super(null, null);
     }
 
     @Override
-    public boolean hasNext() {
-        return false;
+    public void add(final Set<String> labels, final Object object) {
+
     }
 
     @Override
-    public CompleteTraverser<C, E> next() {
-        throw FastNoSuchElementException.instance();
+    public void addLabels(final Set<String> labels) {
+
     }
 
-    static <C, S, E> EmptyStep<C, S, E> instance() {
-        return INSTANCE;
+    @Override
+    public Object object(int index) {
+        throw new IllegalStateException("No objects in EmptyPath");
     }
 
     @Override
-    public void reset() {
+    public Set<String> labels(int index) {
+        return Collections.emptySet();
+    }
 
+    @Override
+    public int size() {
+        return 0;
+    }
+
+    public static final EmptyPath instance() {
+        return INSTANCE;
     }
 }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/Path.java 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/Path.java
index 263c63b..5e487ff 100644
--- a/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/Path.java
+++ b/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/Path.java
@@ -19,52 +19,20 @@
 package org.apache.tinkerpop.machine.traverser;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class Path implements Serializable {
+public interface Path extends Serializable {
 
-    private final List<Object> objects = new ArrayList<>();
-    private final List<Set<String>> labels = new ArrayList<>();
+    public void add(final Set<String> labels, final Object object);
 
-    public Path() {
-    }
+    public void addLabels(final Set<String> labels);
 
-    public Path(final Path path) {
-        this.objects.addAll(path.objects);
-        this.labels.addAll(path.labels);
-    }
+    public Object object(final int index);
 
-    public void add(final Set<String> labels, final Object object) {
-        this.labels.add(labels);
-        this.objects.add(object);
-    }
+    public Set<String> labels(final int index);
 
-    public void addLabels(final Set<String> labels) {
-        if (this.labels.isEmpty())
-            this.labels.add(new HashSet<>());
-        this.labels.get(this.labels.size() - 1).addAll(labels);
-    }
-
-    public Object object(final int index) {
-        return this.objects.get(index);
-    }
-
-    public Set<String> labels(final int index) {
-        return this.labels.get(index);
-    }
-
-    public int size() {
-        return this.objects.size();
-    }
-
-    @Override
-    public String toString() {
-        return this.objects.toString();
-    }
+    public int size();
 }
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/Traverser.java 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/Traverser.java
index 0b5d5b1..40901ae 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/Traverser.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/Traverser.java
@@ -58,7 +58,9 @@ public interface Traverser<C, S> extends Serializable, 
Cloneable {
         return IteratorUtils.map(function.apply(this), e -> 
this.split(function, e));
     }
 
-    //public default void sideeffect(final SideEffectFunction<C,S> function);
+    /*public default <C,S> Traverser<C,S> repeatLoop(final RepeatBranch<C,S> 
repeatBranch) {
+        // set loops
+    }*/
 
     public default <E> Traverser<C, E> reduce(final ReduceFunction<C, S, E> 
function, final E reducedValue) {
         return this.split(function, reducedValue);
diff --git 
a/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/TraverserSet.java
 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/TraverserSet.java
index 80cf339..e4dbe8a 100644
--- 
a/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/TraverserSet.java
+++ 
b/java/core/src/main/java/org/apache/tinkerpop/machine/traverser/TraverserSet.java
@@ -132,8 +132,8 @@ public final class TraverserSet<C, S> extends 
AbstractSet<Traverser<C, S>> imple
         return this.map.values().toString();
     }
 
-    /*public void sort(final Comparator<CompleteTraverser<S>> comparator) {
-        final List<CompleteTraverser<C, S>> list = new 
ArrayList<>(this.map.size());
+    /*public void sort(final Comparator<COPTraverser<S>> comparator) {
+        final List<COPTraverser<C, S>> list = new ArrayList<>(this.map.size());
         
IteratorUtils.removeOnNext(this.map.values().iterator()).forEachRemaining(list::add);
         Collections.sort(list, comparator);
         this.map.reset();
@@ -141,7 +141,7 @@ public final class TraverserSet<C, S> extends 
AbstractSet<Traverser<C, S>> imple
     }
 
     public void shuffle() {
-        final List<CompleteTraverser<C, S>> list = new 
ArrayList<>(this.map.size());
+        final List<COPTraverser<C, S>> list = new ArrayList<>(this.map.size());
         
IteratorUtils.removeOnNext(this.map.values().iterator()).forEachRemaining(list::add);
         Collections.shuffle(list);
         this.map.reset();
diff --git 
a/java/core/src/test/java/org/apache/tinkerpop/machine/traverser/TraverserSetTest.java
 
b/java/core/src/test/java/org/apache/tinkerpop/machine/traverser/TraverserSetTest.java
index 7edfc47..7d07120 100644
--- 
a/java/core/src/test/java/org/apache/tinkerpop/machine/traverser/TraverserSetTest.java
+++ 
b/java/core/src/test/java/org/apache/tinkerpop/machine/traverser/TraverserSetTest.java
@@ -33,9 +33,9 @@ public class TraverserSetTest {
     @Test
     public void shouldAddContainTraversers() {
         final TraverserSet<Long, String> traverserSet = new TraverserSet<>();
-        Traverser<Long, String> a = new 
CompleteTraverser<>(LongCoefficient.create(10L), "hello");
-        Traverser<Long, String> b = new 
CompleteTraverser<>(LongCoefficient.create(5L), "hello");
-        Traverser<Long, String> c = new 
CompleteTraverser<>(LongCoefficient.create(3L), "world");
+        Traverser<Long, String> a = new 
COPTraverser<>(LongCoefficient.create(10L), "hello");
+        Traverser<Long, String> b = new 
COPTraverser<>(LongCoefficient.create(5L), "hello");
+        Traverser<Long, String> c = new 
COPTraverser<>(LongCoefficient.create(3L), "world");
         ///
         assertTrue(traverserSet.isEmpty());
         assertEquals(0, traverserSet.size());
diff --git 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/util/TopologyUtil.java
 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/util/TopologyUtil.java
index d7cd0db..af7584f 100644
--- 
a/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/util/TopologyUtil.java
+++ 
b/java/machine/beam/src/main/java/org/apache/tinkerpop/machine/beam/util/TopologyUtil.java
@@ -82,7 +82,7 @@ public class TopologyUtil {
             sink = source.apply(Combine.globally(new 
ReduceFn<>((ReduceFunction<C, S, E>) function, traverserFactory)));
         } else if (function instanceof RepeatBranch) {
             final RepeatBranch<C, S> repeatFunction = (RepeatBranch<C, S>) 
function;
-            final List<PCollection<Traverser<C, S>>> repeatSinks = new 
ArrayList<>();
+            final List<PCollection<Traverser<C, S>>> repeatOutputs = new 
ArrayList<>();
             final TupleTag<Traverser<C, S>> repeatDone = new TupleTag<>();
             final TupleTag<Traverser<C, S>> repeatLoop = new TupleTag<>();
             sink = source;
@@ -91,7 +91,7 @@ public class TopologyUtil {
                     final RepeatStartFn<C, S> startFn = new 
RepeatStartFn<>(repeatFunction, repeatDone, repeatLoop);
                     final PCollectionTuple outputs = (PCollectionTuple) 
sink.apply(ParDo.of(startFn).withOutputTags(repeatLoop, 
TupleTagList.of(repeatDone)));
                     outputs.getAll().values().forEach(c -> c.setCoder(new 
TraverserCoder()));
-                    repeatSinks.add(outputs.get(repeatDone));
+                    repeatOutputs.add(outputs.get(repeatDone));
                     sink = outputs.get(repeatLoop);
                 }
                 sink = TopologyUtil.compile(sink, repeatFunction.getRepeat());
@@ -99,13 +99,13 @@ public class TopologyUtil {
                     final RepeatEndFn<C, S> endFn = new 
RepeatEndFn<>(repeatFunction, repeatDone, repeatLoop);
                     final PCollectionTuple outputs = (PCollectionTuple) 
sink.apply(ParDo.of(endFn).withOutputTags(repeatLoop, 
TupleTagList.of(repeatDone)));
                     outputs.getAll().values().forEach(c -> c.setCoder(new 
TraverserCoder()));
-                    repeatSinks.add(outputs.get(repeatDone));
+                    repeatOutputs.add(outputs.get(repeatDone));
                     sink = outputs.get(repeatLoop);
                 }
             }
             sink = (PCollection<Traverser<C, S>>) sink.apply(ParDo.of(new 
RepeatDeadEndFn<>()));
             sink.setCoder(new TraverserCoder<>());
-            sink = 
PCollectionList.of(repeatSinks).apply(Flatten.pCollections());
+            sink = 
PCollectionList.of(repeatOutputs).apply(Flatten.pCollections());
         } else if (function instanceof BranchFunction) {
             final BranchFunction<C, S, E, M> branchFunction = 
(BranchFunction<C, S, E, M>) function;
             final Map<M, TupleTag<Traverser<C, S>>> selectors = new 
LinkedHashMap<>();
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/EmptyStep.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/EmptyStep.java
index bcdd789..48acade 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/EmptyStep.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/EmptyStep.java
@@ -18,7 +18,7 @@
  */
 package org.apache.tinkerpop.machine.pipes;
 
-import org.apache.tinkerpop.machine.traverser.CompleteTraverser;
+import org.apache.tinkerpop.machine.traverser.COPTraverser;
 import org.apache.tinkerpop.util.FastNoSuchElementException;
 
 /**
@@ -38,7 +38,7 @@ final class EmptyStep<C, S, E> extends AbstractStep<C, S, E> {
     }
 
     @Override
-    public CompleteTraverser<C, E> next() {
+    public COPTraverser<C, E> next() {
         throw FastNoSuchElementException.instance();
     }
 
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/SourceStep.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/SourceStep.java
index bdf10a5..272b6fa 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/SourceStep.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/SourceStep.java
@@ -19,32 +19,35 @@
 package org.apache.tinkerpop.machine.pipes;
 
 import org.apache.tinkerpop.machine.traverser.Traverser;
-import org.apache.tinkerpop.machine.traverser.TraverserSet;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 final class SourceStep<C, S> implements Step<C, S, S> {
 
-    private final TraverserSet<C, S> traverserSet = new TraverserSet<>();
+    private Traverser<C, S> traverser = null;
 
     @Override
     public boolean hasNext() {
-        return !this.traverserSet.isEmpty();
+        return null != this.traverser;
     }
 
     @Override
     public Traverser<C, S> next() {
-        return this.traverserSet.remove();
+        final Traverser<C, S> temp = this.traverser;
+        this.traverser = null;
+        return temp;
     }
 
     @Override
     public void reset() {
-        this.traverserSet.clear();
+        this.traverser = null;
     }
 
     public void addStart(final Traverser<C, S> traverser) {
-        this.traverserSet.add(traverser);
+        if (null != this.traverser)
+            throw new IllegalStateException("This shouldn't happen"); // TODO: 
verify fully and then remove
+        this.traverser = traverser;
     }
 
     @Override

Reply via email to