Repository: spark
Updated Branches:
  refs/heads/branch-1.0 70109da21 -> 797c663ae


[SPARK-2529] Clean closures in foreach and foreachPartition.

Author: Reynold Xin <r...@apache.org>

Closes #1583 from rxin/closureClean and squashes the following commits:

8982fe6 [Reynold Xin] [SPARK-2529] Clean closures in foreach and foreachPartition.

(cherry picked from commit eb82abd8e3d25c912fa75201cf4f429aab8d73c7)
Signed-off-by: Reynold Xin <r...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/797c663a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/797c663a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/797c663a

Branch: refs/heads/branch-1.0
Commit: 797c663ae715820ffac5880ecb098a8d3b56fb20
Parents: 70109da
Author: Reynold Xin <r...@apache.org>
Authored: Fri Jul 25 01:10:05 2014 -0700
Committer: Reynold Xin <r...@apache.org>
Committed: Fri Jul 25 01:10:16 2014 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/797c663a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index da2dc58..9439618 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -712,14 +712,16 @@ abstract class RDD[T: ClassTag](
    * Applies a function f to all elements of this RDD.
    */
   def foreach(f: T => Unit) {
-    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(f))
+    val cleanF = sc.clean(f)
+    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
   }
 
   /**
    * Applies a function f to each partition of this RDD.
    */
   def foreachPartition(f: Iterator[T] => Unit) {
-    sc.runJob(this, (iter: Iterator[T]) => f(iter))
+    val cleanF = sc.clean(f)
+    sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
   }
 
   /**

Reply via email to