Repository: spark
Updated Branches:
  refs/heads/branch-1.6 94af69c9b -> d061b8522

[SPARK-12678][CORE] MapPartitionsRDD clearDependencies

MapPartitionsRDD was keeping a reference to `prev` after a call to `clearDependencies` which could lead to memory leak.

Author: Guillaume Poulin <poulin.guilla...@gmail.com>

Closes #10623 from gpoulin/map_partition_deps.

(cherry picked from commit b6738520374637347ab5ae6c801730cdb6b35daa)
Signed-off-by: Reynold Xin <r...@databricks.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d061b852
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d061b852
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d061b852

Branch: refs/heads/branch-1.6
Commit: d061b852274c12784f3feb96c0cdcab39989f8e7
Parents: 94af69c
Author: Guillaume Poulin <poulin.guilla...@gmail.com>
Authored: Wed Jan 6 21:34:46 2016 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed Jan 6 21:34:54 2016 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/d061b852/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
index 4312d3a..e4587c9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
@@ -25,7 +25,7 @@ import org.apache.spark.{Partition, TaskContext}
  * An RDD that applies the provided function to every partition of the parent RDD.
  */
 private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
-    prev: RDD[T],
+    var prev: RDD[T],
     f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
     preservesPartitioning: Boolean = false)
   extends RDD[U](prev) {
@@ -36,4 +36,9 @@ private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
 
   override def compute(split: Partition, context: TaskContext): Iterator[U] =
     f(context, split.index, firstParent[T].iterator(split, context))
+
+  override def clearDependencies() {
+    super.clearDependencies()
+    prev = null
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org