Repository: spark Updated Branches: refs/heads/master 23ea89808 -> 7880909c4
[SPARK-21743][SQL][FOLLOW-UP] top-most limit should not cause memory leak ## What changes were proposed in this pull request? This is a follow-up of https://github.com/apache/spark/pull/18955 , to fix a bug where we break whole stage codegen for `Limit`. ## How was this patch tested? existing tests. Author: Wenchen Fan <wenc...@databricks.com> Closes #18993 from cloud-fan/bug. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7880909c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7880909c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7880909c Branch: refs/heads/master Commit: 7880909c45916ab76dccac308a9b2c5225a00e89 Parents: 23ea898 Author: Wenchen Fan <wenc...@databricks.com> Authored: Fri Aug 18 11:19:22 2017 -0700 Committer: gatorsmile <gatorsm...@gmail.com> Committed: Fri Aug 18 11:19:22 2017 -0700 ---------------------------------------------------------------------- .../sql/catalyst/optimizer/Optimizer.scala | 5 ++- .../spark/sql/execution/SparkStrategies.scala | 37 +++++++++----------- .../org/apache/spark/sql/execution/limit.scala | 8 ----- 3 files changed, 20 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/7880909c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index a51b385..e2d7164 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1171,7 +1171,7 @@ object DecimalAggregates extends Rule[LogicalPlan] { * Converts local operations (i.e. 
ones that don't require data exchange) on LocalRelation to * another LocalRelation. * - * This is relatively simple as it currently handles only a single case: Project. + * This is relatively simple as it currently handles only two cases: Project and Limit. */ object ConvertToLocalRelation extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { @@ -1180,6 +1180,9 @@ object ConvertToLocalRelation extends Rule[LogicalPlan] { val projection = new InterpretedProjection(projectList, output) projection.initialize(0) LocalRelation(projectList.map(_.toAttribute), data.map(projection)) + + case Limit(IntegerLiteral(limit), LocalRelation(output, data)) => + LocalRelation(output, data.take(limit)) } private def hasUnevaluableExpr(expr: Expression): Boolean = { http://git-wip-us.apache.org/repos/asf/spark/blob/7880909c/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 2e8ce45..c115cb6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -63,29 +63,24 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { */ object SpecialLimits extends Strategy { override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case logical.ReturnAnswer(rootPlan) => rootPlan match { - case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) => - execution.TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil - case logical.Limit( - IntegerLiteral(limit), - logical.Project(projectList, logical.Sort(order, true, child))) => - execution.TakeOrderedAndProjectExec( - limit, order, projectList, planLater(child)) 
:: Nil - case logical.Limit(IntegerLiteral(limit), child) => - // Normally wrapping child with `LocalLimitExec` here is a no-op, because - // `CollectLimitExec.executeCollect` will call `LocalLimitExec.executeTake`, which - // calls `child.executeTake`. If child supports whole stage codegen, adding this - // `LocalLimitExec` can stop the processing of whole stage codegen and trigger the - // resource releasing work, after we consume `limit` rows. - execution.CollectLimitExec(limit, LocalLimitExec(limit, planLater(child))) :: Nil + case ReturnAnswer(rootPlan) => rootPlan match { + case Limit(IntegerLiteral(limit), Sort(order, true, child)) => + TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil + case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child))) => + TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil + case Limit(IntegerLiteral(limit), child) => + // With whole stage codegen, Spark releases resources only when all the output data of the + // query plan are consumed. It's possible that `CollectLimitExec` only consumes a little + // data from child plan and finishes the query without releasing resources. Here we wrap + // the child plan with `LocalLimitExec`, to stop the processing of whole stage codegen and + // trigger the resource releasing work, after we consume `limit` rows. 
+ CollectLimitExec(limit, LocalLimitExec(limit, planLater(child))) :: Nil case other => planLater(other) :: Nil } - case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) => - execution.TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil - case logical.Limit( - IntegerLiteral(limit), logical.Project(projectList, logical.Sort(order, true, child))) => - execution.TakeOrderedAndProjectExec( - limit, order, projectList, planLater(child)) :: Nil + case Limit(IntegerLiteral(limit), Sort(order, true, child)) => + TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil + case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child))) => + TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil case _ => Nil } } http://git-wip-us.apache.org/repos/asf/spark/blob/7880909c/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index 7cef556..73a0f87 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -54,14 +54,6 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { val limit: Int override def output: Seq[Attribute] = child.output - // Do not enable whole stage codegen for a single limit. 
- override def supportCodegen: Boolean = child match { - case plan: CodegenSupport => plan.supportCodegen - case _ => false - } - - override def executeTake(n: Int): Array[InternalRow] = child.executeTake(math.min(n, limit)) - protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter => iter.take(limit) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org