Avoid starting VP worker iterations that never end

SparkExecutor.executeVertexProgramIteration was written in such a way
that an empty RDD partition would cause it to invoke
VertexProgram.workerIterationStart without ever invoking
VertexProgram.workerIterationEnd.  This seems like a contract
violation.  I have at least one VertexProgram (VP) that relies on the
workerIterationStart/End pair to allocate and release resources.
Failing to invoke workerIterationEnd causes a resource leak in that
VP, as it would in any VP that uses the same resource management
pattern.
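
For illustration, below is a minimal, self-contained toy model of the
resource management pattern described above.  This is plain Java, not
TinkerPop code: the class and method names are hypothetical, and the
way end() is tied to iterator exhaustion is an assumption about how
SparkExecutor drives workerIterationEnd.

import java.util.Collections;
import java.util.Iterator;

// Toy model of the lifecycle bug: a per-partition "worker" acquires a
// resource in start() and releases it in end(); end() is only reached
// while draining the partition, so an empty partition leaks.
public class EmptyPartitionLeakDemo {

    static int openResources = 0;

    static void start() { openResources++; }    // e.g. open a connection per worker
    static void end()   { openResources--; }    // e.g. close it again

    static void processPartition(final Iterator<String> partition, final boolean guardEmpty) {
        if (guardEmpty && !partition.hasNext())
            return;                              // the fix: don't start() when there is no work
        start();
        while (partition.hasNext()) {
            partition.next();                    // "process" the element
            if (!partition.hasNext())
                end();                           // end() only fires after the last element
        }
    }

    public static void main(final String[] args) {
        processPartition(Collections.<String>emptyList().iterator(), false);
        System.out.println("unguarded, empty partition -> leaked resources: " + openResources); // prints 1
        openResources = 0;
        processPartition(Collections.<String>emptyList().iterator(), true);
        System.out.println("guarded, empty partition   -> leaked resources: " + openResources); // prints 0
    }
}

With the guard in place, start()/end() (and, by analogy,
workerIterationStart/End) are either both invoked or both skipped for
a given partition.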


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/36e1159a
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/36e1159a
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/36e1159a

Branch: refs/heads/TINKERPOP-1490
Commit: 36e1159a80f539b8bd4a884e5c1cf304ec52c4f9
Parents: b262c7e
Author: Dan LaRocque <dal...@hopcount.org>
Authored: Tue Oct 25 19:37:17 2016 -0500
Committer: Dan LaRocque <dal...@hopcount.org>
Committed: Tue Oct 25 20:37:17 2016 -0400

----------------------------------------------------------------------
 .../gremlin/spark/process/computer/SparkExecutor.java          | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/36e1159a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index 8dd2381..6e65e26 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -91,9 +91,15 @@ public final class SparkExecutor {
                 // for each partition of vertices emit a view and their outgoing messages
                 .mapPartitionsToPair(partitionIterator -> {
                     KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
+
+                    // if the partition is empty, return without starting a new VP iteration
+                    if (!partitionIterator.hasNext())
+                        return Collections.emptyList();
+
                     final VertexProgram<M> workerVertexProgram = VertexProgram.createVertexProgram(HadoopGraph.open(graphComputerConfiguration), vertexProgramConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program (a worker's task)
                     final String[] vertexComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(workerVertexProgram.getVertexComputeKeys()); // the compute keys as an array
                     final SparkMessenger<M> messenger = new SparkMessenger<>();
+
                     workerVertexProgram.workerIterationStart(memory.asImmutable()); // start the worker
                     return () -> IteratorUtils.map(partitionIterator, vertexViewIncoming -> {
                         final StarGraph.StarVertex vertex = vertexViewIncoming._2()._1().get(); // get the vertex from the vertex writable
