This is an automated email from the ASF dual-hosted git repository.
okram pushed a commit to branch tp4
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/tp4 by this push:
new 9f7386c simplified Pipes greatly. Now that Pipes is only for
in-memory OLTP and no longer tied to GraphComputer and other execution
models, the next()/hasNext() code of the various steps can be extremely simple.
This should greatly speed up Pipes in TP4 relative to TP3.
9f7386c is described below
commit 9f7386c0ba1aa1b61b3cfa3dbf08001aac216b4e
Author: Marko A. Rodriguez <[email protected]>
AuthorDate: Sat Mar 16 10:02:03 2019 -0600
simplified Pipes greatly. Now that Pipes is only for in-memory OLTP and no
longer tied to GraphComputer and other execution models, the next()/hasNext()
code of the various steps can be extremely simple. This should greatly speed up
Pipes in TP4 relative to TP3.
---
.../tinkerpop/machine/pipes/AbstractStep.java | 28 +------------------
.../tinkerpop/machine/pipes/BarrierStep.java | 9 +++---
.../apache/tinkerpop/machine/pipes/BranchStep.java | 8 ++++--
.../apache/tinkerpop/machine/pipes/EmptyStep.java | 5 ++++
.../apache/tinkerpop/machine/pipes/FilterStep.java | 5 ++--
.../tinkerpop/machine/pipes/FlatMapStep.java | 27 +++++++++++-------
.../tinkerpop/machine/pipes/InitialStep.java | 7 +++--
.../apache/tinkerpop/machine/pipes/MapStep.java | 12 +++++++-
.../org/apache/tinkerpop/machine/pipes/Pipes.java | 15 ++++++----
.../apache/tinkerpop/machine/pipes/ReduceStep.java | 7 ++---
.../apache/tinkerpop/machine/pipes/RepeatStep.java | 11 ++++++--
.../pipes/{AbstractStep.java => SourceStep.java} | 32 +++++++---------------
.../org/apache/tinkerpop/machine/pipes/Step.java | 2 --
.../apache/tinkerpop/machine/pipes/PipesTest.java | 4 +--
14 files changed, 82 insertions(+), 90 deletions(-)
diff --git
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/AbstractStep.java
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/AbstractStep.java
index 70c0f9c..3f8b0a3 100644
---
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/AbstractStep.java
+++
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/AbstractStep.java
@@ -19,8 +19,6 @@
package org.apache.tinkerpop.machine.pipes;
import org.apache.tinkerpop.machine.functions.CFunction;
-import org.apache.tinkerpop.machine.traversers.Traverser;
-import org.apache.tinkerpop.machine.traversers.TraverserSet;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -28,37 +26,13 @@ import org.apache.tinkerpop.machine.traversers.TraverserSet;
public abstract class AbstractStep<C, S, E> implements Step<C, S, E> {
final CFunction<C> function;
- private final Step<C, ?, S> previousStep;
- private TraverserSet<C, S> traverserSet = new TraverserSet<>();
+ protected final Step<C, ?, S> previousStep;
public AbstractStep(final Step<C, ?, S> previousStep, final CFunction<C>
function) {
this.previousStep = previousStep;
this.function = function;
}
- public void addStart(final Traverser<C, S> traverser) {
- this.traverserSet.add(traverser);
- }
-
- @Override
- public boolean hasNext() {
- return !this.traverserSet.isEmpty() || this.previousStep.hasNext();
- }
-
- @Override
- public abstract Traverser<C, E> next();
-
- final Traverser<C, S> getPreviousTraverser() {
- return this.traverserSet.isEmpty() ?
- this.previousStep.next() :
- this.traverserSet.remove();
- }
-
- @Override
- public void reset() {
- this.traverserSet.clear();
- }
-
@Override
public String toString() {
return this.function.toString();
diff --git
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BarrierStep.java
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BarrierStep.java
index b0b7f02..063ac64 100644
---
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BarrierStep.java
+++
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BarrierStep.java
@@ -19,8 +19,8 @@
package org.apache.tinkerpop.machine.pipes;
import org.apache.tinkerpop.machine.functions.BarrierFunction;
-import org.apache.tinkerpop.machine.pipes.util.InMemoryBarrier;
import org.apache.tinkerpop.machine.pipes.util.Barrier;
+import org.apache.tinkerpop.machine.pipes.util.InMemoryBarrier;
import org.apache.tinkerpop.machine.traversers.Traverser;
import java.util.Collections;
@@ -45,8 +45,8 @@ public class BarrierStep<C, S, E, B> extends AbstractStep<C,
S, E> {
@Override
public Traverser<C, E> next() {
if (!this.done) {
- while (super.hasNext()) {
-
this.barrier.update(this.barrierFunction.apply(super.getPreviousTraverser(),
this.barrier.get()));
+ while (this.previousStep.hasNext()) {
+
this.barrier.update(this.barrierFunction.apply(super.previousStep.next(),
this.barrier.get()));
}
this.done = true;
this.output = (Iterator<E>)
this.barrierFunction.createIterator(this.barrier.get());
@@ -56,12 +56,11 @@ public class BarrierStep<C, S, E, B> extends
AbstractStep<C, S, E> {
@Override
public boolean hasNext() {
- return this.output.hasNext() || (!this.done && super.hasNext());
+ return this.output.hasNext() || (!this.done &&
this.previousStep.hasNext());
}
@Override
public void reset() {
- super.reset();
this.barrier.reset();
this.done = false;
}
diff --git
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BranchStep.java
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BranchStep.java
index a850f5c..8092929 100644
---
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BranchStep.java
+++
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/BranchStep.java
@@ -58,8 +58,8 @@ public final class BranchStep<C, S, E, M> extends
AbstractStep<C, S, E> {
}
private final void stageOutput() {
- while (!this.nextTraversers.hasNext() && super.hasNext()) {
- final Traverser<C, S> traverser = super.getPreviousTraverser();
+ while (!this.nextTraversers.hasNext() && this.previousStep.hasNext()) {
+ final Traverser<C, S> traverser = this.previousStep.next();
final Optional<M> token = this.branchSelector.from(traverser);
if (token.isPresent()) {
final List<Compilation<C, S, E>> matches =
this.branches.get(token.get());
@@ -75,4 +75,8 @@ public final class BranchStep<C, S, E, M> extends
AbstractStep<C, S, E> {
}
}
+ @Override
+ public void reset() {
+ this.nextTraversers = Collections.emptyIterator();
+ }
}
\ No newline at end of file
diff --git
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/EmptyStep.java
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/EmptyStep.java
index c676cd9..f584c16 100644
---
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/EmptyStep.java
+++
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/EmptyStep.java
@@ -45,4 +45,9 @@ public final class EmptyStep<C, S, E> extends AbstractStep<C,
S, E> {
static <C, S, E> EmptyStep<C, S, E> instance() {
return INSTANCE;
}
+
+ @Override
+ public void reset() {
+
+ }
}
diff --git
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FilterStep.java
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FilterStep.java
index 236d8fd..8f0fc70 100644
---
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FilterStep.java
+++
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FilterStep.java
@@ -55,8 +55,8 @@ public final class FilterStep<C, S> extends AbstractStep<C,
S, S> {
private void stageNextTraverser() {
if (null == this.nextTraverser) {
- while (super.hasNext()) {
- this.nextTraverser = super.getPreviousTraverser();
+ while (this.previousStep.hasNext()) {
+ this.nextTraverser = this.previousStep.next();
if (this.nextTraverser.filter(this.filterFunction))
return;
}
@@ -66,7 +66,6 @@ public final class FilterStep<C, S> extends AbstractStep<C,
S, S> {
@Override
public void reset() {
- super.reset();
this.nextTraverser = null;
}
}
diff --git
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FlatMapStep.java
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FlatMapStep.java
index b7ac7a8..31f8c05 100644
---
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FlatMapStep.java
+++
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/FlatMapStep.java
@@ -39,20 +39,27 @@ public final class FlatMapStep<C, S, E> extends
AbstractStep<C, S, E> {
@Override
public boolean hasNext() {
- while (true) {
- if (this.iterator.hasNext())
- return true;
- else if (super.hasNext())
- this.iterator =
super.getPreviousTraverser().flatMap(this.flatMapFunction);
- else
- return false;
- }
+ this.stageNextTraversers();
+ return this.iterator.hasNext();
}
@Override
public Traverser<C, E> next() {
- while (!this.iterator.hasNext())
- this.iterator =
super.getPreviousTraverser().flatMap(this.flatMapFunction);
+ this.stageNextTraversers();
return this.iterator.next();
}
+
+ private void stageNextTraversers() {
+ while (!this.iterator.hasNext()) {
+ if (this.previousStep.hasNext())
+ this.iterator =
this.previousStep.next().flatMap(this.flatMapFunction);
+ else
+ return;
+ }
+ }
+
+ @Override
+ public void reset() {
+ this.iterator = Collections.emptyIterator();
+ }
}
\ No newline at end of file
diff --git
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/InitialStep.java
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/InitialStep.java
index 94fe43e..98f8c1a 100644
---
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/InitialStep.java
+++
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/InitialStep.java
@@ -41,17 +41,18 @@ public final class InitialStep<C, S> extends
AbstractStep<C, S, S> {
@Override
public boolean hasNext() {
- return this.objects.hasNext();
+ return this.objects.hasNext() || this.previousStep.hasNext();
}
@Override
public Traverser<C, S> next() {
- return this.traverserFactory.create(this.function.coefficient(),
this.objects.next());
+ return this.objects.hasNext() ?
+ this.traverserFactory.create(this.function.coefficient(),
this.objects.next()) :
+ this.previousStep.next();
}
@Override
public void reset() {
- super.reset();
this.objects = Collections.emptyIterator();
}
diff --git
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/MapStep.java
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/MapStep.java
index 6b27023..777cbb7 100644
---
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/MapStep.java
+++
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/MapStep.java
@@ -34,7 +34,17 @@ public final class MapStep<C, S, E> extends AbstractStep<C,
S, E> {
}
@Override
+ public boolean hasNext() {
+ return this.previousStep.hasNext();
+ }
+
+ @Override
public Traverser<C, E> next() {
- return super.getPreviousTraverser().map(this.mapFunction);
+ return super.previousStep.next().map(this.mapFunction);
+ }
+
+ @Override
+ public void reset() {
+
}
}
diff --git
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
index 8dc493c..2713a43 100644
---
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
+++
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Pipes.java
@@ -42,12 +42,18 @@ public final class Pipes<C, S, E> implements Processor<C,
S, E> {
private final List<Step<?, ?, ?>> steps = new ArrayList<>();
private Step<C, ?, E> endStep;
- private Step<C, S, ?> startStep = EmptyStep.instance();
+ private SourceStep<C, S> startStep;
public Pipes(final Compilation<C, S, E> compilation) {
- AbstractStep<C, ?, ?> previousStep = EmptyStep.instance();
+ Step<C, ?, ?> previousStep = EmptyStep.instance();
for (final CFunction<?> function : compilation.getFunctions()) {
- final AbstractStep nextStep;
+ final Step nextStep;
+ if (this.steps.isEmpty() && !(function instanceof
InitialFunction)) {
+ this.startStep = new SourceStep<>();
+ this.steps.add(this.startStep);
+ previousStep = this.startStep;
+ }
+
if (function instanceof RepeatBranch)
nextStep = new RepeatStep(previousStep, (RepeatBranch<C, ?>)
function);
else if (function instanceof BranchFunction)
@@ -68,9 +74,6 @@ public final class Pipes<C, S, E> implements Processor<C, S,
E> {
else
throw new RuntimeException("You need a new step type:" +
function);
- if (EmptyStep.instance() == this.startStep)
- this.startStep = nextStep;
-
this.steps.add(nextStep);
previousStep = nextStep;
}
diff --git
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java
index 32756c7..aaba0d9 100644
---
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java
+++
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/ReduceStep.java
@@ -45,8 +45,8 @@ public final class ReduceStep<C, S, E> extends
AbstractStep<C, S, E> {
@Override
public Traverser<C, E> next() {
- while (super.hasNext()) {
- this.reducer.add(super.getPreviousTraverser());
+ while (this.previousStep.hasNext()) {
+ this.reducer.add(super.previousStep.next());
}
this.done = true;
return this.traverserFactory.create(this.reduceFunction.coefficient(),
this.reducer.get());
@@ -54,12 +54,11 @@ public final class ReduceStep<C, S, E> extends
AbstractStep<C, S, E> {
@Override
public boolean hasNext() {
- return !this.done && super.hasNext();
+ return !this.done && this.previousStep.hasNext();
}
@Override
public void reset() {
- super.reset();
this.reducer.reset();
this.done = false;
}
diff --git
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/RepeatStep.java
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/RepeatStep.java
index 8f9cd5e..60f128d 100644
---
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/RepeatStep.java
+++
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/RepeatStep.java
@@ -23,6 +23,7 @@ import
org.apache.tinkerpop.machine.functions.branch.RepeatBranch;
import org.apache.tinkerpop.machine.traversers.Traverser;
import org.apache.tinkerpop.util.IteratorUtils;
+import java.util.Collections;
import java.util.Iterator;
/**
@@ -32,7 +33,7 @@ public final class RepeatStep<C, S> extends AbstractStep<C,
S, S> {
private final Compilation<C, S, ?> until;
private final Compilation<C, S, S> repeat;
- private final Iterator<Traverser<C, S>> nextTraversers;
+ private Iterator<Traverser<C, S>> nextTraversers;
RepeatStep(final Step<C, ?, S> previousStep, final RepeatBranch<C, S>
repeatFunction) {
super(previousStep, repeatFunction);
@@ -60,11 +61,15 @@ public final class RepeatStep<C, S> extends AbstractStep<C,
S, S> {
}
private final void stageOutput() {
- while (!this.nextTraversers.hasNext() && super.hasNext()) {
- this.repeat.addTraverser(super.getPreviousTraverser());
+ while (!this.nextTraversers.hasNext() && this.previousStep.hasNext()) {
+ this.repeat.addTraverser(super.previousStep.next());
}
}
+ @Override
+ public void reset() {
+ this.nextTraversers = Collections.emptyIterator();
+ }
}
diff --git
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/AbstractStep.java
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/SourceStep.java
similarity index 63%
copy from
java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/AbstractStep.java
copy to
java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/SourceStep.java
index 70c0f9c..857ea69 100644
---
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/AbstractStep.java
+++
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/SourceStep.java
@@ -18,40 +18,24 @@
*/
package org.apache.tinkerpop.machine.pipes;
-import org.apache.tinkerpop.machine.functions.CFunction;
import org.apache.tinkerpop.machine.traversers.Traverser;
import org.apache.tinkerpop.machine.traversers.TraverserSet;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public abstract class AbstractStep<C, S, E> implements Step<C, S, E> {
+public final class SourceStep<C, S> implements Step<C, S, S> {
- final CFunction<C> function;
- private final Step<C, ?, S> previousStep;
- private TraverserSet<C, S> traverserSet = new TraverserSet<>();
-
- public AbstractStep(final Step<C, ?, S> previousStep, final CFunction<C>
function) {
- this.previousStep = previousStep;
- this.function = function;
- }
-
- public void addStart(final Traverser<C, S> traverser) {
- this.traverserSet.add(traverser);
- }
+ private final TraverserSet<C, S> traverserSet = new TraverserSet<>();
@Override
public boolean hasNext() {
- return !this.traverserSet.isEmpty() || this.previousStep.hasNext();
+ return !this.traverserSet.isEmpty();
}
@Override
- public abstract Traverser<C, E> next();
-
- final Traverser<C, S> getPreviousTraverser() {
- return this.traverserSet.isEmpty() ?
- this.previousStep.next() :
- this.traverserSet.remove();
+ public Traverser<C, S> next() {
+ return this.traverserSet.remove();
}
@Override
@@ -59,8 +43,12 @@ public abstract class AbstractStep<C, S, E> implements
Step<C, S, E> {
this.traverserSet.clear();
}
+ public void addStart(final Traverser<C, S> traverser) {
+ this.traverserSet.add(traverser);
+ }
+
@Override
public String toString() {
- return this.function.toString();
+ return "Source";
}
}
diff --git
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Step.java
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Step.java
index ce801a5..3af812c 100644
---
a/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Step.java
+++
b/java/machine/pipes/src/main/java/org/apache/tinkerpop/machine/pipes/Step.java
@@ -27,7 +27,5 @@ import java.util.Iterator;
*/
public interface Step<C, S, E> extends Iterator<Traverser<C, E>> {
- public void addStart(final Traverser<C, S> traverser);
-
public void reset();
}
diff --git
a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
b/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
index b07f183..5ec1f8b 100644
---
a/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
+++
b/java/machine/pipes/src/test/java/org/apache/tinkerpop/machine/pipes/PipesTest.java
@@ -44,7 +44,7 @@ public class PipesTest {
.withProcessor(PipesProcessor.class)
.withStrategy(IdentityStrategy.class);
- Traversal<Long, ?, ?> traversal = g.inject(Arrays.asList(1L,
1L)).<Long>unfold().repeat(incr()).until(is(10L)).sum();
+ Traversal<Long, ?, ?> traversal = g.inject(Arrays.asList(1L,
1L)).<Long>unfold().map(incr()).repeat(incr()).until(is(10L)).sum();
System.out.println(TraversalUtil.getBytecode(traversal));
System.out.println(traversal);
System.out.println(traversal.toList());
@@ -59,7 +59,7 @@ public class PipesTest {
System.out.println(traversal);
System.out.println(traversal.toList());
System.out.println("\n----------\n");
- traversal = g.inject(7L,7L,7L,2L).incr().barrier();
+ traversal = g.inject(7L, 7L, 7L, 2L).incr().barrier();
System.out.println(TraversalUtil.getBytecode(traversal));
System.out.println(traversal);
System.out.println(traversal.nextTraverser());