Avoid starting VP worker iterations that never end

SparkExecutor.executeVertexProgramIteration was written in such a way that an
empty RDD partition would cause it to invoke VertexProgram.workerIterationStart
without ever invoking VertexProgram.workerIterationEnd.

This seems like a contract violation. I have at least one VP that relies on
workerIterationStart|End to allocate and release resources. Failing to invoke
End like this causes a leak in that VP, as it would for any VP that uses that
resource management pattern.
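To illustrate the resource-management pattern the message refers to, here is a
minimal, hypothetical sketch (not part of this commit; the class name and the
ExecutorService resource are illustrative only, and the remaining VertexProgram
methods are omitted, which is why the class is declared abstract):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.tinkerpop.gremlin.process.computer.Memory;
    import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;

    // Hypothetical VP that allocates a per-worker resource in workerIterationStart
    // and releases it in workerIterationEnd.  If a worker invokes Start but never
    // End -- as happened for empty partitions before this change -- the resource leaks.
    public abstract class ResourceManagingVertexProgram<M> implements VertexProgram<M> {

        // per-worker resource, created once per worker iteration
        private transient ExecutorService workerPool;

        @Override
        public void workerIterationStart(final Memory memory) {
            // allocate the resource when the worker's iteration begins
            workerPool = Executors.newFixedThreadPool(4);
        }

        @Override
        public void workerIterationEnd(final Memory memory) {
            // release the resource when the worker's iteration ends;
            // if this is never invoked, the pool's threads are never reclaimed
            workerPool.shutdown();
        }
    }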
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/36e1159a
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/36e1159a
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/36e1159a

Branch: refs/heads/TINKERPOP-1490
Commit: 36e1159a80f539b8bd4a884e5c1cf304ec52c4f9
Parents: b262c7e
Author: Dan LaRocque <dal...@hopcount.org>
Authored: Tue Oct 25 19:37:17 2016 -0500
Committer: Dan LaRocque <dal...@hopcount.org>
Committed: Tue Oct 25 20:37:17 2016 -0400

----------------------------------------------------------------------
 .../gremlin/spark/process/computer/SparkExecutor.java | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/36e1159a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index 8dd2381..6e65e26 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -91,9 +91,15 @@ public final class SparkExecutor {
                 // for each partition of vertices emit a view and their outgoing messages
                 .mapPartitionsToPair(partitionIterator -> {
                     KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
+
+                    // if the partition is empty, return without starting a new VP iteration
+                    if (!partitionIterator.hasNext())
+                        return Collections.emptyList();
+
                     final VertexProgram<M> workerVertexProgram = VertexProgram.createVertexProgram(HadoopGraph.open(graphComputerConfiguration), vertexProgramConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program (a worker's task)
                     final String[] vertexComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(workerVertexProgram.getVertexComputeKeys()); // the compute keys as an array
                     final SparkMessenger<M> messenger = new SparkMessenger<>();
+                    workerVertexProgram.workerIterationStart(memory.asImmutable()); // start the worker
                     return () -> IteratorUtils.map(partitionIterator, vertexViewIncoming -> {
                         final StarGraph.StarVertex vertex = vertexViewIncoming._2()._1().get(); // get the vertex from the vertex writable
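For context, here is a small, self-contained sketch of the invariant the guard
restores: per-partition setup happens only when the partition is non-empty, so
the matching teardown always runs. This is not TinkerPop code; the class and
method names (EmptyPartitionGuardSketch, startWorker, endWorker) are
hypothetical, and unlike the real SparkExecutor, which maps over the partition
lazily and ends the worker when the iterator is exhausted, the sketch drains
the partition eagerly for simplicity.

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Iterator;

    public final class EmptyPartitionGuardSketch {

        // Simulates the per-partition lambda: only start the worker when the
        // partition has at least one element, so the matching end call always runs.
        static <T> Iterable<String> processPartition(final Iterator<T> partitionIterator) {
            // the guard added by the commit: nothing to do, so never call start
            if (!partitionIterator.hasNext())
                return Collections.emptyList();

            startWorker();                    // stands in for workerIterationStart(memory)
            try {
                while (partitionIterator.hasNext())
                    partitionIterator.next(); // stands in for per-vertex processing
            } finally {
                endWorker();                  // stands in for workerIterationEnd(memory)
            }
            return Collections.singletonList("partition processed");
        }

        static void startWorker() { System.out.println("worker iteration start"); }

        static void endWorker() { System.out.println("worker iteration end"); }

        public static void main(final String[] args) {
            processPartition(Collections.emptyIterator());       // empty: neither start nor end
            processPartition(Arrays.asList(1, 2, 3).iterator()); // non-empty: start and end paired
        }
    }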