Repository: spark Updated Branches: refs/heads/branch-1.4 9dcf4f78f -> 86f141c90
[SPARK-7116] [SQL] [PYSPARK] Remove cache() causing memory leak This patch simply removes a `cache()` on an intermediate RDD when evaluating Python UDFs. Author: ksonj <k...@siberie.de> Closes #5973 from ksonj/udf and squashes the following commits: db5b564 [ksonj] removed TODO about cleaning up fe70c54 [ksonj] Remove cache() causing memory leak (cherry picked from commit dec8f53719597119034dffbe43b2a9e5fd963083) Signed-off-by: Michael Armbrust <mich...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/86f141c9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/86f141c9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/86f141c9 Branch: refs/heads/branch-1.4 Commit: 86f141c90af0f2186fdae89513fdb1264af5fb40 Parents: 9dcf4f7 Author: ksonj <k...@siberie.de> Authored: Thu May 7 12:04:19 2015 -0700 Committer: Michael Armbrust <mich...@databricks.com> Committed: Thu May 7 12:05:03 2015 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/sql/execution/pythonUdfs.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/86f141c9/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala index 7a43bfd..58cb198 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala @@ -219,8 +219,8 @@ case class EvaluatePython( /** * :: DeveloperApi :: - * Uses PythonRDD to evaluate a [[PythonUDF]], one partition of tuples at a time. The input
- * data is cached and zipped with the result of the udf evaluation. + * Uses PythonRDD to evaluate a [[PythonUDF]], one partition of tuples at a time. + * The input data is zipped with the result of the udf evaluation. */ @DeveloperApi case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child: SparkPlan) @@ -229,8 +229,7 @@ case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child: def children: Seq[SparkPlan] = child :: Nil def execute(): RDD[Row] = { - // TODO: Clean up after ourselves? - val childResults = child.execute().map(_.copy()).cache() + val childResults = child.execute().map(_.copy()) val parent = childResults.mapPartitions { iter => val pickle = new Pickler --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org