maropu commented on a change in pull request #32350: URL: https://github.com/apache/spark/pull/32350#discussion_r624574030
########## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ########## @@ -1618,12 +1618,18 @@ object EliminateLimits extends Rule[LogicalPlan] { private def canEliminate(limitExpr: Expression, child: LogicalPlan): Boolean = { limitExpr.foldable && child.maxRows.exists { _ <= limitExpr.eval().asInstanceOf[Int] } } + private def canEliminateLocalLimit(localLimitExpr: Expression, child: LogicalPlan): Boolean = { + localLimitExpr.foldable && + child.maxRowsPerPartition.exists { _ <= localLimitExpr.eval().asInstanceOf[Int] } + } def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { case Limit(l, child) if canEliminate(l, child) => child case GlobalLimit(l, child) if canEliminate(l, child) => child + case LocalLimit(l, child) if !plan.isStreaming && canEliminateLocalLimit(l, child) => Review comment: Is it currently possible for a user's query to reach this optimization path at all? ########## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala ########## @@ -374,6 +374,8 @@ package object dsl { def limit(limitExpr: Expression): LogicalPlan = Limit(limitExpr, logicalPlan) + def localLimit(limitExpr: Expression): LogicalPlan = LocalLimit(limitExpr, logicalPlan) Review comment: Since this is used only once now, could you use `LocalLimit` directly in the test? 
########## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ########## @@ -1618,12 +1618,18 @@ object EliminateLimits extends Rule[LogicalPlan] { private def canEliminate(limitExpr: Expression, child: LogicalPlan): Boolean = { limitExpr.foldable && child.maxRows.exists { _ <= limitExpr.eval().asInstanceOf[Int] } } + private def canEliminateLocalLimit(localLimitExpr: Expression, child: LogicalPlan): Boolean = { + localLimitExpr.foldable && + child.maxRowsPerPartition.exists { _ <= localLimitExpr.eval().asInstanceOf[Int] } + } def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { case Limit(l, child) if canEliminate(l, child) => child case GlobalLimit(l, child) if canEliminate(l, child) => child + case LocalLimit(l, child) if !plan.isStreaming && canEliminateLocalLimit(l, child) => Review comment: In the streaming case, can `maxRowsPerPartition` actually be defined? (Do we need the `!plan.isStreaming` condition here?) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org