This is an automated email from the ASF dual-hosted git repository.

okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/tp4 by this push:
     new 9f7386c  simplified Pipes greatly. Now that Pipes is only for 
in-memory OLTP and no longer tied with GraphComputer and other execution 
models, the next()/hasNext() code of the various steps can be extremely simple. 
This should greatly speed up Pipes in TP4 releative to TP3.
9f7386c is described below

commit 9f7386c0ba1aa1b61b3cfa3dbf08001aac216b4e
Author: Marko A. Rodriguez <[email protected]>
AuthorDate: Sat Mar 16 10:02:03 2019 -0600

    simplified Pipes greatly. Now that Pipes is only for in-memory OLTP and no 
longer tied with GraphComputer and other execution models, the next()/hasNext() 
code of the various steps can be extremely simple. This should greatly speed up 
Pipes in TP4 releative to TP3.
---
 .../tinkerpop/machine/pipes/AbstractStep.java      | 28 +------------------
 .../tinkerpop/machine/pipes/BarrierStep.java       |  9 +++---
 .../apache/tinkerpop/machine/pipes/BranchStep.java |  8 ++++--
 .../apache/tinkerpop/machine/pipes/EmptyStep.java  |  5 ++++
 .../apache/tinkerpop/machine/pipes/FilterStep.java |  5 ++--
 .../tinkerpop/machine/pipes/FlatMapStep.java       | 27 +++++++++++-------
 .../tinkerpop/machine/pipes/InitialStep.java       |  7 +++--
 .../apache/tinkerpop/machine/pipes/MapStep.java    | 12 +++++++-
 .../org/apache/tinkerpop/machine/pipes/Pipes.java  | 15 ++++++----
 .../apache/tinkerpop/machine/pipes/ReduceStep.java |  7 ++---
 .../apache/tinkerpop/machine/pipes/RepeatStep.java | 11 ++++++--
 .../pipes/{AbstractStep.java => SourceStep.java}   | 32 +++++++---------------
 .../org/apache/tinkerpop/machine/pipes/Step.java   |  2 --
 .../apache/tinkerpop/machine/pipes/PipesTest.java  |  4 +--
 14 files changed, 82 insertions(+), 90 deletions(-)

diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/AbstractStep.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/AbstractStep.java
index 70c0f9c..3f8b0a3 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/AbstractStep.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/AbstractStep.java
@@ -19,8 +19,6 @@
 package org.apache.tinkerpop.machine.pipes;
 
 import org.apache.tinkerpop.machine.functions.CFunction;
-import org.apache.tinkerpop.machine.traversers.Traverser;
-import org.apache.tinkerpop.machine.traversers.TraverserSet;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -28,37 +26,13 @@ import org.apache.tinkerpop.machine.traversers.TraverserSet;
 public abstract class AbstractStep<C, S, E> implements Step<C, S, E> {
 
     final CFunction<C> function;
-    private final Step<C, ?, S> previousStep;
-    private TraverserSet<C, S> traverserSet = new TraverserSet<>();
+    protected final Step<C, ?, S> previousStep;
 
     public AbstractStep(final Step<C, ?, S> previousStep, final CFunction<C> 
function) {
         this.previousStep = previousStep;
         this.function = function;
     }
 
-    public void addStart(final Traverser<C, S> traverser) {
-        this.traverserSet.add(traverser);
-    }
-
-    @Override
-    public boolean hasNext() {
-        return !this.traverserSet.isEmpty() || this.previousStep.hasNext();
-    }
-
-    @Override
-    public abstract Traverser<C, E> next();
-
-    final Traverser<C, S> getPreviousTraverser() {
-        return this.traverserSet.isEmpty() ?
-                this.previousStep.next() :
-                this.traverserSet.remove();
-    }
-
-    @Override
-    public void reset() {
-        this.traverserSet.clear();
-    }
-
     @Override
     public String toString() {
         return this.function.toString();
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BarrierStep.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BarrierStep.java
index b0b7f02..063ac64 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BarrierStep.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BarrierStep.java
@@ -19,8 +19,8 @@
 package org.apache.tinkerpop.machine.pipes;
 
 import org.apache.tinkerpop.machine.functions.BarrierFunction;
-import org.apache.tinkerpop.machine.pipes.util.InMemoryBarrier;
 import org.apache.tinkerpop.machine.pipes.util.Barrier;
+import org.apache.tinkerpop.machine.pipes.util.InMemoryBarrier;
 import org.apache.tinkerpop.machine.traversers.Traverser;
 
 import java.util.Collections;
@@ -45,8 +45,8 @@ public class BarrierStep<C, S, E, B> extends AbstractStep<C, 
S, E> {
     @Override
     public Traverser<C, E> next() {
         if (!this.done) {
-            while (super.hasNext()) {
-                
this.barrier.update(this.barrierFunction.apply(super.getPreviousTraverser(), 
this.barrier.get()));
+            while (this.previousStep.hasNext()) {
+                
this.barrier.update(this.barrierFunction.apply(super.previousStep.next(), 
this.barrier.get()));
             }
             this.done = true;
             this.output = (Iterator<E>) 
this.barrierFunction.createIterator(this.barrier.get());
@@ -56,12 +56,11 @@ public class BarrierStep<C, S, E, B> extends 
AbstractStep<C, S, E> {
 
     @Override
     public boolean hasNext() {
-        return this.output.hasNext() || (!this.done && super.hasNext());
+        return this.output.hasNext() || (!this.done && 
this.previousStep.hasNext());
     }
 
     @Override
     public void reset() {
-        super.reset();
         this.barrier.reset();
         this.done = false;
     }
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BranchStep.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BranchStep.java
index a850f5c..8092929 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BranchStep.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BranchStep.java
@@ -58,8 +58,8 @@ public final class BranchStep<C, S, E, M> extends 
AbstractStep<C, S, E> {
     }
 
     private final void stageOutput() {
-        while (!this.nextTraversers.hasNext() && super.hasNext()) {
-            final Traverser<C, S> traverser = super.getPreviousTraverser();
+        while (!this.nextTraversers.hasNext() && this.previousStep.hasNext()) {
+            final Traverser<C, S> traverser = this.previousStep.next();
             final Optional<M> token = this.branchSelector.from(traverser);
             if (token.isPresent()) {
                 final List<Compilation<C, S, E>> matches = 
this.branches.get(token.get());
@@ -75,4 +75,8 @@ public final class BranchStep<C, S, E, M> extends 
AbstractStep<C, S, E> {
         }
     }
 
+    @Override
+    public void reset() {
+        this.nextTraversers = Collections.emptyIterator();
+    }
 }
\ No newline at end of file
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/EmptyStep.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/EmptyStep.java
index c676cd9..f584c16 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/EmptyStep.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/EmptyStep.java
@@ -45,4 +45,9 @@ public final class EmptyStep<C, S, E> extends AbstractStep<C, 
S, E> {
     static <C, S, E> EmptyStep<C, S, E> instance() {
         return INSTANCE;
     }
+
+    @Override
+    public void reset() {
+
+    }
 }
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FilterStep.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FilterStep.java
index 236d8fd..8f0fc70 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FilterStep.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FilterStep.java
@@ -55,8 +55,8 @@ public final class FilterStep<C, S> extends AbstractStep<C, 
S, S> {
 
     private void stageNextTraverser() {
         if (null == this.nextTraverser) {
-            while (super.hasNext()) {
-                this.nextTraverser = super.getPreviousTraverser();
+            while (this.previousStep.hasNext()) {
+                this.nextTraverser = this.previousStep.next();
                 if (this.nextTraverser.filter(this.filterFunction))
                     return;
             }
@@ -66,7 +66,6 @@ public final class FilterStep<C, S> extends AbstractStep<C, 
S, S> {
 
     @Override
     public void reset() {
-        super.reset();
         this.nextTraverser = null;
     }
 }
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FlatMapStep.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FlatMapStep.java
index b7ac7a8..31f8c05 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FlatMapStep.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FlatMapStep.java
@@ -39,20 +39,27 @@ public final class FlatMapStep<C, S, E> extends 
AbstractStep<C, S, E> {
 
     @Override
     public boolean hasNext() {
-        while (true) {
-            if (this.iterator.hasNext())
-                return true;
-            else if (super.hasNext())
-                this.iterator = 
super.getPreviousTraverser().flatMap(this.flatMapFunction);
-            else
-                return false;
-        }
+        this.stageNextTraversers();
+        return this.iterator.hasNext();
     }
 
     @Override
     public Traverser<C, E> next() {
-        while (!this.iterator.hasNext())
-            this.iterator = 
super.getPreviousTraverser().flatMap(this.flatMapFunction);
+        this.stageNextTraversers();
         return this.iterator.next();
     }
+
+    private void stageNextTraversers() {
+        while (!this.iterator.hasNext()) {
+            if (this.previousStep.hasNext())
+                this.iterator = 
this.previousStep.next().flatMap(this.flatMapFunction);
+            else
+                return;
+        }
+    }
+
+    @Override
+    public void reset() {
+        this.iterator = Collections.emptyIterator();
+    }
 }
\ No newline at end of file
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/InitialStep.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/InitialStep.java
index 94fe43e..98f8c1a 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/InitialStep.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/InitialStep.java
@@ -41,17 +41,18 @@ public final class InitialStep<C, S> extends 
AbstractStep<C, S, S> {
 
     @Override
     public boolean hasNext() {
-        return this.objects.hasNext();
+        return this.objects.hasNext() || this.previousStep.hasNext();
     }
 
     @Override
     public Traverser<C, S> next() {
-        return this.traverserFactory.create(this.function.coefficient(), 
this.objects.next());
+        return this.objects.hasNext() ?
+                this.traverserFactory.create(this.function.coefficient(), 
this.objects.next()) :
+                this.previousStep.next();
     }
 
     @Override
     public void reset() {
-        super.reset();
         this.objects = Collections.emptyIterator();
     }
 
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/MapStep.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/MapStep.java
index 6b27023..777cbb7 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/MapStep.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/MapStep.java
@@ -34,7 +34,17 @@ public final class MapStep<C, S, E> extends AbstractStep<C, 
S, E> {
     }
 
     @Override
+    public boolean hasNext() {
+        return this.previousStep.hasNext();
+    }
+
+    @Override
     public Traverser<C, E> next() {
-        return super.getPreviousTraverser().map(this.mapFunction);
+        return super.previousStep.next().map(this.mapFunction);
+    }
+
+    @Override
+    public void reset() {
+
     }
 }
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
index 8dc493c..2713a43 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
@@ -42,12 +42,18 @@ public final class Pipes<C, S, E> implements Processor<C, 
S, E> {
 
     private final List<Step<?, ?, ?>> steps = new ArrayList<>();
     private Step<C, ?, E> endStep;
-    private Step<C, S, ?> startStep = EmptyStep.instance();
+    private SourceStep<C, S> startStep;
 
     public Pipes(final Compilation<C, S, E> compilation) {
-        AbstractStep<C, ?, ?> previousStep = EmptyStep.instance();
+        Step<C, ?, ?> previousStep = EmptyStep.instance();
         for (final CFunction<?> function : compilation.getFunctions()) {
-            final AbstractStep nextStep;
+            final Step nextStep;
+            if (this.steps.isEmpty() && !(function instanceof 
InitialFunction)) {
+                this.startStep = new SourceStep<>();
+                this.steps.add(this.startStep);
+                previousStep = this.startStep;
+            }
+
             if (function instanceof RepeatBranch)
                 nextStep = new RepeatStep(previousStep, (RepeatBranch<C, ?>) 
function);
             else if (function instanceof BranchFunction)
@@ -68,9 +74,6 @@ public final class Pipes<C, S, E> implements Processor<C, S, 
E> {
             else
                 throw new RuntimeException("You need a new step type:" + 
function);
 
-            if (EmptyStep.instance() == this.startStep)
-                this.startStep = nextStep;
-
             this.steps.add(nextStep);
             previousStep = nextStep;
         }
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java
index 32756c7..aaba0d9 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java
@@ -45,8 +45,8 @@ public final class ReduceStep<C, S, E> extends 
AbstractStep<C, S, E> {
 
     @Override
     public Traverser<C, E> next() {
-        while (super.hasNext()) {
-            this.reducer.add(super.getPreviousTraverser());
+        while (this.previousStep.hasNext()) {
+            this.reducer.add(super.previousStep.next());
         }
         this.done = true;
         return this.traverserFactory.create(this.reduceFunction.coefficient(), 
this.reducer.get());
@@ -54,12 +54,11 @@ public final class ReduceStep<C, S, E> extends 
AbstractStep<C, S, E> {
 
     @Override
     public boolean hasNext() {
-        return !this.done && super.hasNext();
+        return !this.done && this.previousStep.hasNext();
     }
 
     @Override
     public void reset() {
-        super.reset();
         this.reducer.reset();
         this.done = false;
     }
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/RepeatStep.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/RepeatStep.java
index 8f9cd5e..60f128d 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/RepeatStep.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/RepeatStep.java
@@ -23,6 +23,7 @@ import 
org.apache.tinkerpop.machine.functions.branch.RepeatBranch;
 import org.apache.tinkerpop.machine.traversers.Traverser;
 import org.apache.tinkerpop.util.IteratorUtils;
 
+import java.util.Collections;
 import java.util.Iterator;
 
 /**
@@ -32,7 +33,7 @@ public final class RepeatStep<C, S> extends AbstractStep<C, 
S, S> {
 
     private final Compilation<C, S, ?> until;
     private final Compilation<C, S, S> repeat;
-    private final Iterator<Traverser<C, S>> nextTraversers;
+    private Iterator<Traverser<C, S>> nextTraversers;
 
     RepeatStep(final Step<C, ?, S> previousStep, final RepeatBranch<C, S> 
repeatFunction) {
         super(previousStep, repeatFunction);
@@ -60,11 +61,15 @@ public final class RepeatStep<C, S> extends AbstractStep<C, 
S, S> {
     }
 
     private final void stageOutput() {
-        while (!this.nextTraversers.hasNext() && super.hasNext()) {
-            this.repeat.addTraverser(super.getPreviousTraverser());
+        while (!this.nextTraversers.hasNext() && this.previousStep.hasNext()) {
+            this.repeat.addTraverser(super.previousStep.next());
         }
 
     }
 
+    @Override
+    public void reset() {
+        this.nextTraversers = Collections.emptyIterator();
+    }
 }
 
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/AbstractStep.java
 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/SourceStep.java
similarity index 63%
copy from 
java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/AbstractStep.java
copy to 
java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/SourceStep.java
index 70c0f9c..857ea69 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/AbstractStep.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/SourceStep.java
@@ -18,40 +18,24 @@
  */
 package org.apache.tinkerpop.machine.pipes;
 
-import org.apache.tinkerpop.machine.functions.CFunction;
 import org.apache.tinkerpop.machine.traversers.Traverser;
 import org.apache.tinkerpop.machine.traversers.TraverserSet;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public abstract class AbstractStep<C, S, E> implements Step<C, S, E> {
+public final class SourceStep<C, S> implements Step<C, S, S> {
 
-    final CFunction<C> function;
-    private final Step<C, ?, S> previousStep;
-    private TraverserSet<C, S> traverserSet = new TraverserSet<>();
-
-    public AbstractStep(final Step<C, ?, S> previousStep, final CFunction<C> 
function) {
-        this.previousStep = previousStep;
-        this.function = function;
-    }
-
-    public void addStart(final Traverser<C, S> traverser) {
-        this.traverserSet.add(traverser);
-    }
+    private final TraverserSet<C, S> traverserSet = new TraverserSet<>();
 
     @Override
     public boolean hasNext() {
-        return !this.traverserSet.isEmpty() || this.previousStep.hasNext();
+        return !this.traverserSet.isEmpty();
     }
 
     @Override
-    public abstract Traverser<C, E> next();
-
-    final Traverser<C, S> getPreviousTraverser() {
-        return this.traverserSet.isEmpty() ?
-                this.previousStep.next() :
-                this.traverserSet.remove();
+    public Traverser<C, S> next() {
+        return this.traverserSet.remove();
     }
 
     @Override
@@ -59,8 +43,12 @@ public abstract class AbstractStep<C, S, E> implements 
Step<C, S, E> {
         this.traverserSet.clear();
     }
 
+    public void addStart(final Traverser<C, S> traverser) {
+        this.traverserSet.add(traverser);
+    }
+
     @Override
     public String toString() {
-        return this.function.toString();
+        return "Source";
     }
 }
diff --git 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Step.java 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Step.java
index ce801a5..3af812c 100644
--- 
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Step.java
+++ 
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Step.java
@@ -27,7 +27,5 @@ import java.util.Iterator;
  */
 public interface Step<C, S, E> extends Iterator<Traverser<C, E>> {
 
-    public void addStart(final Traverser<C, S> traverser);
-
     public void reset();
 }
diff --git 
a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
 
b/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
index b07f183..5ec1f8b 100644
--- 
a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
+++ 
b/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
@@ -44,7 +44,7 @@ public class PipesTest {
                 .withProcessor(PipesProcessor.class)
                 .withStrategy(IdentityStrategy.class);
 
-        Traversal<Long, ?, ?> traversal = g.inject(Arrays.asList(1L, 
1L)).<Long>unfold().repeat(incr()).until(is(10L)).sum();
+        Traversal<Long, ?, ?> traversal = g.inject(Arrays.asList(1L, 
1L)).<Long>unfold().map(incr()).repeat(incr()).until(is(10L)).sum();
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal);
         System.out.println(traversal.toList());
@@ -59,7 +59,7 @@ public class PipesTest {
         System.out.println(traversal);
         System.out.println(traversal.toList());
         System.out.println("\n----------\n");
-        traversal = g.inject(7L,7L,7L,2L).incr().barrier();
+        traversal = g.inject(7L, 7L, 7L, 2L).incr().barrier();
         System.out.println(TraversalUtil.getBytecode(traversal));
         System.out.println(traversal);
         System.out.println(traversal.nextTraverser());

Reply via email to