Avoid starting VP worker iterations that never end

SparkExecutor.executeVertexProgramIteration was written in such a way
that an empty RDD partition would cause it to invoke
VertexProgram.workerIterationStart without ever invoking
VertexProgram.workerIterationEnd.  This seems like a contract
violation.  I have at least one VP that relies on
workerIterationStart|End to allocate and release resources.  Failing
to invoke End like this causes a leak in that VP, as it would for any
VP that uses that resource management pattern.
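The pairing rule can be sketched in a self-contained way. This is a minimal illustration, not TinkerPop code: `Worker`, `processPartition`, `drain`, and the counters are hypothetical stand-ins for the VertexProgram lifecycle. The sketch assumes, as the description above implies, that End is triggered lazily while partition elements are consumed. In that case an empty partition never yields an element, so End can never fire, and the only way to keep the calls paired is to return before Start.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Function;

class WorkerLifecycleSketch {

    // Hypothetical stand-in for VertexProgram.workerIterationStart|End;
    // the counters make an unbalanced Start/End visible.
    static final class Worker {
        int started = 0;
        int ended = 0;
        void workerIterationStart() { started++; }
        void workerIterationEnd() { ended++; }
    }

    // Lazily maps one partition. Start runs when the partition is opened;
    // End runs right after the last element has been mapped.
    static <T, R> Iterator<R> processPartition(Iterator<T> partition, Worker worker, Function<T, R> fn) {
        // Guard: an empty partition would never reach the End call below,
        // so return before Start instead of leaving the iteration open.
        if (!partition.hasNext())
            return Collections.emptyIterator();

        worker.workerIterationStart();
        return new Iterator<R>() {
            @Override
            public boolean hasNext() {
                return partition.hasNext();
            }

            @Override
            public R next() {
                if (!partition.hasNext())
                    throw new NoSuchElementException();
                R result = fn.apply(partition.next());
                // Last element consumed: close this worker iteration.
                if (!partition.hasNext())
                    worker.workerIterationEnd();
                return result;
            }
        };
    }

    // Consumes an iterator fully and returns how many elements it produced.
    static <R> int drain(Iterator<R> it) {
        int n = 0;
        while (it.hasNext()) {
            it.next();
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        Worker worker = new Worker();
        drain(processPartition(Arrays.asList(1, 2, 3).iterator(), worker, x -> x * 2));
        drain(processPartition(Collections.<Integer>emptyIterator(), worker, x -> x * 2));
        System.out.println(worker.started + " " + worker.ended); // prints "1 1"
    }
}
```

Without the guard, the empty partition would bump `started` to 2 while `ended` stayed at 1, which is the leak described above.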

(cherry picked from commit 36e1159a80f539b8bd4a884e5c1cf304ec52c4f9;
 this is the same change, except that it tracks the switch between
 Spark 1.6 and 2.0 from partition functions that return Iterables to
 partition functions that return Iterators)
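The type change behind the one-line diff can be shown without a Spark dependency. In Spark 2.0 the Java flat-map style partition functions return an `Iterator` instead of an `Iterable`, so `Collections.emptyList()` no longer type-checks as the early return and `Collections.emptyIterator()` is needed. `PartitionFunction` and `LENGTHS` below are hypothetical names that only imitate that shape; they are not Spark's `PairFlatMapFunction`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

class IteratorReturnSketch {

    // Hypothetical interface imitating the Spark 2.0 shape: the partition
    // comes in as an Iterator and the results go out as an Iterator
    // (under Spark 1.6 the results were returned as an Iterable).
    @FunctionalInterface
    interface PartitionFunction<T, R> {
        Iterator<R> call(Iterator<T> partition);
    }

    // Maps each string in a partition to its length.
    static final PartitionFunction<String, Integer> LENGTHS = partition -> {
        if (!partition.hasNext()) {
            // return Collections.emptyList();  // an Iterable: does not compile here
            return Collections.emptyIterator();  // an Iterator: matches the return type
        }
        List<Integer> out = new ArrayList<>();
        while (partition.hasNext())
            out.add(partition.next().length());
        return out.iterator();
    };

    public static void main(String[] args) {
        Iterator<Integer> empty = LENGTHS.call(Collections.<String>emptyIterator());
        System.out.println(empty.hasNext()); // prints "false"

        List<Integer> collected = new ArrayList<>();
        LENGTHS.call(Arrays.asList("a", "bcd").iterator()).forEachRemaining(collected::add);
        System.out.println(collected); // prints "[1, 3]"
    }
}
```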


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/f2606364
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/f2606364
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/f2606364

Branch: refs/heads/master
Commit: f2606364af3a1bf3b9815b2499a1181082ea414f
Parents: 3919875
Author: Dan LaRocque <[email protected]>
Authored: Tue Oct 25 19:37:17 2016 -0500
Committer: Marko A. Rodriguez <[email protected]>
Committed: Tue Nov 29 04:57:14 2016 -0700

----------------------------------------------------------------------
 .../tinkerpop/gremlin/spark/process/computer/SparkExecutor.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f2606364/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index b8dfdd3..e9372d0 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -94,7 +94,7 @@ public final class SparkExecutor {
 
                     // if the partition is empty, return without starting a new VP iteration
                     if (!partitionIterator.hasNext())
-                        return Collections.emptyList();
+                        return Collections.emptyIterator();
 
                     final VertexProgram<M> workerVertexProgram = VertexProgram.createVertexProgram(HadoopGraph.open(graphComputerConfiguration), vertexProgramConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program (a worker's task)
                     final String[] vertexComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(workerVertexProgram.getVertexComputeKeys()); // the compute keys as an array
