Repository: spark
Updated Branches:
  refs/heads/master 23ea89808 -> 7880909c4


[SPARK-21743][SQL][FOLLOW-UP] top-most limit should not cause memory leak

## What changes were proposed in this pull request?

This is a follow-up of https://github.com/apache/spark/pull/18955, fixing a
bug where that change broke whole stage codegen for `Limit`.
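
For context, a minimal reproducer of the affected plan shape (a sketch, not part of the patch; assumes a local `SparkSession` named `spark`):

```scala
// A query whose top-most operator is a Limit. After #18955, planning this
// disabled whole stage codegen for the entire query; this follow-up keeps
// codegen enabled and instead relies on `LocalLimitExec` to stop the
// generated loop once `limit` rows have been produced.
spark.range(1000000L)
  .selectExpr("id * 2 AS doubled")
  .limit(10)
  .collect()
```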

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenc...@databricks.com>

Closes #18993 from cloud-fan/bug.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7880909c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7880909c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7880909c

Branch: refs/heads/master
Commit: 7880909c45916ab76dccac308a9b2c5225a00e89
Parents: 23ea898
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Fri Aug 18 11:19:22 2017 -0700
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Fri Aug 18 11:19:22 2017 -0700

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      |  5 ++-
 .../spark/sql/execution/SparkStrategies.scala   | 37 +++++++++-----------
 .../org/apache/spark/sql/execution/limit.scala  |  8 -----
 3 files changed, 20 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7880909c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index a51b385..e2d7164 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1171,7 +1171,7 @@ object DecimalAggregates extends Rule[LogicalPlan] {
  * Converts local operations (i.e. ones that don't require data exchange) on LocalRelation to
  * another LocalRelation.
  *
- * This is relatively simple as it currently handles only a single case: Project.
+ * This is relatively simple as it currently handles only two cases: Project and Limit.
  */
 object ConvertToLocalRelation extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
@@ -1180,6 +1180,9 @@ object ConvertToLocalRelation extends Rule[LogicalPlan] {
       val projection = new InterpretedProjection(projectList, output)
       projection.initialize(0)
       LocalRelation(projectList.map(_.toAttribute), data.map(projection))
+
+    case Limit(IntegerLiteral(limit), LocalRelation(output, data)) =>
+      LocalRelation(output, data.take(limit))
   }
 
   private def hasUnevaluableExpr(expr: Expression): Boolean = {
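
A small illustration of the new rule case (a sketch, assuming a spark-shell session where `spark` and its implicits are in scope; not from the patch):

```scala
import spark.implicits._

// `Seq(...).toDF` is planned as a LocalRelation, so with the new case the
// optimizer folds the limit into the relation itself, mirroring
// `LocalRelation(output, data.take(limit))` above.
val limited = Seq(1, 2, 3, 4, 5).toDF("n").limit(2)

// The optimized plan should now be a single LocalRelation holding two rows
// rather than a Limit over a LocalTableScan.
println(limited.queryExecution.optimizedPlan)
```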

http://git-wip-us.apache.org/repos/asf/spark/blob/7880909c/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 2e8ce45..c115cb6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -63,29 +63,24 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    */
   object SpecialLimits extends Strategy {
     override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case logical.ReturnAnswer(rootPlan) => rootPlan match {
-        case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
-          execution.TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
-        case logical.Limit(
-            IntegerLiteral(limit),
-            logical.Project(projectList, logical.Sort(order, true, child))) =>
-          execution.TakeOrderedAndProjectExec(
-            limit, order, projectList, planLater(child)) :: Nil
-        case logical.Limit(IntegerLiteral(limit), child) =>
-          // Normally wrapping child with `LocalLimitExec` here is a no-op, because
-          // `CollectLimitExec.executeCollect` will call `LocalLimitExec.executeTake`, which
-          // calls `child.executeTake`. If child supports whole stage codegen, adding this
-          // `LocalLimitExec` can stop the processing of whole stage codegen and trigger the
-          // resource releasing work, after we consume `limit` rows.
-          execution.CollectLimitExec(limit, LocalLimitExec(limit, planLater(child))) :: Nil
+      case ReturnAnswer(rootPlan) => rootPlan match {
+        case Limit(IntegerLiteral(limit), Sort(order, true, child)) =>
+          TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
+        case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child))) =>
+          TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
+        case Limit(IntegerLiteral(limit), child) =>
+          // With whole stage codegen, Spark releases resources only when all the output data of the
+          // query plan are consumed. It's possible that `CollectLimitExec` only consumes a little
+          // data from child plan and finishes the query without releasing resources. Here we wrap
+          // the child plan with `LocalLimitExec`, to stop the processing of whole stage codegen and
+          // trigger the resource releasing work, after we consume `limit` rows.
+          CollectLimitExec(limit, LocalLimitExec(limit, planLater(child))) :: Nil
         case other => planLater(other) :: Nil
       }
-      case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
-        execution.TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
-      case logical.Limit(
-          IntegerLiteral(limit), logical.Project(projectList, logical.Sort(order, true, child))) =>
-        execution.TakeOrderedAndProjectExec(
-          limit, order, projectList, planLater(child)) :: Nil
+      case Limit(IntegerLiteral(limit), Sort(order, true, child)) =>
+        TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
+      case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child))) =>
+        TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
       case _ => Nil
     }
   }
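
The effect of the `LocalLimitExec` wrapper can be pictured with plain iterators (a simplified sketch in ordinary Scala, not Spark internals):

```scala
// Per partition, the local limit behaves like `iter.take(limit)`: the
// producing loop sees "no more rows wanted" after `limit` rows and can run
// its cleanup path, instead of the consumer abandoning a half-consumed
// whole stage codegen iterator and leaking its resources.
def localLimit[T](partition: Iterator[T], limit: Int): Iterator[T] =
  partition.take(limit)

val rows = Iterator.from(1)                 // stand-in for a generated row stream
val firstThree = localLimit(rows, 3).toList // List(1, 2, 3)
```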

http://git-wip-us.apache.org/repos/asf/spark/blob/7880909c/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 7cef556..73a0f87 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -54,14 +54,6 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
   val limit: Int
   override def output: Seq[Attribute] = child.output
 
-  // Do not enable whole stage codegen for a single limit.
-  override def supportCodegen: Boolean = child match {
-    case plan: CodegenSupport => plan.supportCodegen
-    case _ => false
-  }
-
-  override def executeTake(n: Int): Array[InternalRow] = child.executeTake(math.min(n, limit))
-
   protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
     iter.take(limit)
   }
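
To check that a bare limit participates in whole stage codegen again after this change, one can inspect the physical plan (a sketch, assuming a spark-shell session):

```scala
// Operators covered by whole stage codegen are printed with a leading "*";
// after this follow-up the limit should appear inside a WholeStageCodegen
// subtree instead of breaking it apart.
spark.range(100).limit(5).explain()
```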

