Repository: spark
Updated Branches:
  refs/heads/branch-1.6 94af69c9b -> d061b8522


[SPARK-12678][CORE] MapPartitionsRDD clearDependencies

MapPartitionsRDD was keeping a reference to `prev` after a call to
`clearDependencies`, which could lead to a memory leak.
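
For readers skimming the diff below, the pattern is sketched here in plain
Scala, deliberately stripped of Spark's real RDD machinery: a transformation
keeps its parent in a `var` and nulls it when dependencies are cleared, so
truncating the lineage (for example after checkpointing) no longer pins the
whole parent chain. The names `Lineage`, `Mapped` and `markCheckpointed` are
illustrative only, not Spark API.

// Self-contained sketch of the fix's pattern; not Spark's real RDD classes.
abstract class Lineage(parent: Lineage) {
  // The base class tracks dependencies and can drop them, e.g. when a
  // checkpoint truncates the lineage.
  @volatile private var deps: Seq[Lineage] =
    if (parent != null) Seq(parent) else Nil
  def dependencies: Seq[Lineage] = deps
  protected def clearDependencies(): Unit = { deps = Nil }
  def markCheckpointed(): Unit = clearDependencies()
}

// A subclass that caches its own reference to the parent must override
// clearDependencies() and drop that reference too, exactly as the diff
// below does for MapPartitionsRDD's `prev`.
class Mapped(var prev: Lineage) extends Lineage(prev) {
  override protected def clearDependencies(): Unit = {
    super.clearDependencies()
    prev = null  // without this, `prev` keeps the whole parent chain reachable
  }
}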

Author: Guillaume Poulin <poulin.guilla...@gmail.com>

Closes #10623 from gpoulin/map_partition_deps.

(cherry picked from commit b6738520374637347ab5ae6c801730cdb6b35daa)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d061b852
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d061b852
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d061b852

Branch: refs/heads/branch-1.6
Commit: d061b852274c12784f3feb96c0cdcab39989f8e7
Parents: 94af69c
Author: Guillaume Poulin <poulin.guilla...@gmail.com>
Authored: Wed Jan 6 21:34:46 2016 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed Jan 6 21:34:54 2016 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala    | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d061b852/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
index 4312d3a..e4587c9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
@@ -25,7 +25,7 @@ import org.apache.spark.{Partition, TaskContext}
  * An RDD that applies the provided function to every partition of the parent RDD.
  */
 private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
-    prev: RDD[T],
+    var prev: RDD[T],
     f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
     preservesPartitioning: Boolean = false)
   extends RDD[U](prev) {
@@ -36,4 +36,9 @@ private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
 
   override def compute(split: Partition, context: TaskContext): Iterator[U] =
     f(context, split.index, firstParent[T].iterator(split, context))
+
+  override def clearDependencies() {
+    super.clearDependencies()
+    prev = null
+  }
 }
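
A hedged usage sketch of why this matters in practice follows. It assumes
local mode and a writable /tmp checkpoint directory, and relies on Spark
clearing an RDD's dependencies once its checkpoint has been materialized;
with this patch the upstream RDD is then collectible, provided nothing else
still references it.

// Usage sketch (assumptions: local mode, writable /tmp checkpoint dir).
import org.apache.spark.{SparkConf, SparkContext}

object MapPartitionsCheckpointDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("map-partitions-checkpoint-demo").setMaster("local[2]"))
    sc.setCheckpointDir("/tmp/spark-checkpoints")

    val upstream = sc.parallelize(1 to 1000000)          // the RDD we want to become collectible
    val mapped   = upstream.mapPartitions(_.map(_ * 2))  // a MapPartitionsRDD whose prev is `upstream`

    mapped.checkpoint()
    mapped.count()  // materializes the checkpoint; Spark then clears the old dependencies

    // The lineage now starts from the checkpointed data; with this patch the
    // MapPartitionsRDD no longer pins `upstream` through its prev field either.
    println(mapped.toDebugString)
    sc.stop()
  }
}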

