Avoid starting VP worker iterations that never end

SparkExecutor.executeVertexProgramIteration was written in such a way that an empty RDD partition would cause it to invoke VertexProgram.workerIterationStart without ever invoking VertexProgram.workerIterationEnd. This seems like a contract violation. I have at least one VP that relies on workerIterationStart|End to allocate and release resources. Failing to invoke End like this causes a leak in that VP, as it would for any VP that uses that resource management pattern.
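The leak described above can be reproduced in a minimal sketch in plain Java, with no Spark or TinkerPop dependency. `WorkerLifecycle`, `ResourceHoldingProgram`, `buggyPartition`, `fixedPartition` and `leakedAfter` are hypothetical names for illustration only, not TinkerPop or Spark APIs; the counter stands in for whatever resource a real VP allocates in its start hook. The sketch compares calling the start hook before the empty-partition check with checking for an empty partition first.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

public class PartitionLifecycleDemo {

    // Hypothetical stand-in for a VP's per-worker start/end hooks
    // (not the real TinkerPop VertexProgram interface).
    interface WorkerLifecycle {
        void workerIterationStart();
        void workerIterationEnd();
    }

    // Acquires a "resource" in start and releases it in end.
    static final class ResourceHoldingProgram implements WorkerLifecycle {
        static int openResources = 0; // resources currently held, across all instances

        @Override
        public void workerIterationStart() { openResources++; }

        @Override
        public void workerIterationEnd() { openResources--; }
    }

    // Buggy ordering: start runs before the emptiness check, so an empty
    // partition returns early and end is never called.
    static Iterator<String> buggyPartition(Iterator<String> partition) {
        WorkerLifecycle program = new ResourceHoldingProgram();
        program.workerIterationStart();
        if (!partition.hasNext())
            return Collections.emptyIterator(); // leak: end is skipped
        List<String> out = new ArrayList<>();
        while (partition.hasNext())
            out.add(partition.next().toUpperCase());
        program.workerIterationEnd();
        return out.iterator();
    }

    // Fixed ordering: return on an empty partition before starting the
    // iteration, so every start is paired with an end.
    static Iterator<String> fixedPartition(Iterator<String> partition) {
        if (!partition.hasNext())
            return Collections.emptyIterator();
        WorkerLifecycle program = new ResourceHoldingProgram();
        program.workerIterationStart();
        List<String> out = new ArrayList<>();
        while (partition.hasNext())
            out.add(partition.next().toUpperCase());
        program.workerIterationEnd();
        return out.iterator();
    }

    // Runs one empty and one non-empty partition; returns resources still held.
    static int leakedAfter(boolean useFix) {
        ResourceHoldingProgram.openResources = 0;
        List<List<String>> partitions = new ArrayList<>();
        partitions.add(Collections.emptyList());
        partitions.add(Arrays.asList("a", "b"));
        for (List<String> partition : partitions) {
            if (useFix)
                fixedPartition(partition.iterator());
            else
                buggyPartition(partition.iterator());
        }
        return ResourceHoldingProgram.openResources;
    }

    public static void main(String[] args) {
        System.out.println("buggy leaked: " + leakedAfter(false)); // buggy leaked: 1
        System.out.println("fixed leaked: " + leakedAfter(true));  // fixed leaked: 0
    }
}
```

The one leaked resource in the buggy run comes from the empty partition, which reaches the start hook but never the end hook. The sketch processes each partition eagerly for simplicity and does not model how SparkExecutor itself sequences these calls.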
(cherry picked from commit 36e1159a80f539b8bd4a884e5c1cf304ec52c4f9; this is the same change, except that it tracks the Spark 1.6 to 2.0 switch from functions that return iterables to functions that return iterators)


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/f2606364
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/f2606364
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/f2606364

Branch: refs/heads/master
Commit: f2606364af3a1bf3b9815b2499a1181082ea414f
Parents: 3919875
Author: Dan LaRocque
Authored: Tue Oct 25 19:37:17 2016 -0500
Committer: Marko A. Rodriguez
Committed: Tue Nov 29 04:57:14 2016 -0700

----------------------------------------------------------------------
 .../tinkerpop/gremlin/spark/process/computer/SparkExecutor.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f2606364/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index b8dfdd3..e9372d0 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -94,7 +94,7 @@ public final class SparkExecutor {
 
                     // if the partition is empty, return without starting a new VP iteration
                     if (!partitionIterator.hasNext())
-                        return Collections.emptyList();
+                        return Collections.emptyIterator();
 
                     final VertexProgram<M> workerVertexProgram = VertexProgram.createVertexProgram(HadoopGraph.open(graphComputerConfiguration), vertexProgramConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program (a worker's task)
                     final String[] vertexComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(workerVertexProgram.getVertexComputeKeys()); // the compute keys as an array
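The one-line diff follows from the signature change noted in the cherry-pick message: in Spark 1.6 the per-partition functions passed to `mapPartitionsToPair` returned an `Iterable`, while in Spark 2.0 they return an `Iterator`, so the empty-partition guard must return `Collections.emptyIterator()` rather than `Collections.emptyList()`. The sketch below illustrates this in plain Java; `IterableStyle`, `IteratorStyle`, `DOUBLE_16`, `DOUBLE_20` and `drain` are hypothetical stand-ins, not Spark's actual function interfaces (which, among other differences, declare `throws Exception`).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

public class PartitionFunctionShapes {

    // Hypothetical stand-in for the Spark 1.6 shape: per-partition
    // functions return an Iterable.
    interface IterableStyle<T, R> {
        Iterable<R> call(Iterator<T> partition);
    }

    // Hypothetical stand-in for the Spark 2.0 shape: they return an Iterator.
    interface IteratorStyle<T, R> {
        Iterator<R> call(Iterator<T> partition);
    }

    // 1.6-style: the empty-partition guard returns an empty Iterable.
    static final IterableStyle<Integer, Integer> DOUBLE_16 = partition -> {
        if (!partition.hasNext())
            return Collections.emptyList();
        List<Integer> out = new ArrayList<>();
        partition.forEachRemaining(x -> out.add(2 * x));
        return out;
    };

    // 2.0-style: the same guard must return an empty Iterator instead;
    // returning Collections.emptyList() here would not compile.
    static final IteratorStyle<Integer, Integer> DOUBLE_20 = partition -> {
        if (!partition.hasNext())
            return Collections.emptyIterator();
        List<Integer> out = new ArrayList<>();
        partition.forEachRemaining(x -> out.add(2 * x));
        return out.iterator();
    };

    // Collects the remaining elements of an iterator into a list.
    static List<Integer> drain(Iterator<Integer> it) {
        List<Integer> out = new ArrayList<>();
        it.forEachRemaining(out::add);
        return out;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3);
        System.out.println(drain(DOUBLE_16.call(data.iterator()).iterator())); // [2, 4, 6]
        System.out.println(drain(DOUBLE_20.call(data.iterator())));            // [2, 4, 6]
        System.out.println(drain(DOUBLE_20.call(Collections.<Integer>emptyIterator()))); // []
    }
}
```

Both shapes compute the same result; only the container returned from the function, including the empty-partition early return, differs.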
