zhengruifeng commented on a change in pull request #32350: URL: https://github.com/apache/spark/pull/32350#discussion_r627042329
```diff
########## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ##########
@@ -1618,12 +1618,18 @@ object EliminateLimits extends Rule[LogicalPlan] {
   private def canEliminate(limitExpr: Expression, child: LogicalPlan): Boolean = {
     limitExpr.foldable && child.maxRows.exists { _ <= limitExpr.eval().asInstanceOf[Int] }
   }
 
+  private def canEliminateLocalLimit(localLimitExpr: Expression, child: LogicalPlan): Boolean = {
+    localLimitExpr.foldable &&
+      child.maxRowsPerPartition.exists { _ <= localLimitExpr.eval().asInstanceOf[Int] }
+  }
+
   def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
     case Limit(l, child) if canEliminate(l, child) => child
     case GlobalLimit(l, child) if canEliminate(l, child) => child
+    case LocalLimit(l, child) if !plan.isStreaming && canEliminateLocalLimit(l, child) =>
```

Review comment:

> It is not possible that a user's query reaches this optimization path now?

An end user's query should not reach this path, I think. This path only exists so that a _similar_ test can be added in `CombiningLimitsSuite`.

> In a streaming case, maxRowsPerPartition can be filled? (we need the condition !plan.isStreaming here?)

`org.apache.spark.sql.streaming.StreamSuite.SPARK-30657: streaming limit optimization from StreamingLocalLimitExec to LocalLimitExec` fails if we do not add this condition.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
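For readers outside the Spark codebase, the idea behind `canEliminateLocalLimit` can be illustrated with a minimal standalone sketch. The `Plan`, `Scan`, `LocalLimit`, and `eliminateLocalLimit` names below are simplified stand-ins, not Spark's actual Catalyst classes: the point is only that a `LocalLimit` node is a no-op, and can be removed, when every partition of its child is already statically known to produce no more rows than the limit.

```scala
// Simplified stand-in for a logical plan node with a per-partition row bound.
sealed trait Plan {
  // Upper bound on rows produced per partition, when statically known
  // (analogous to LogicalPlan.maxRowsPerPartition in Catalyst).
  def maxRowsPerPartition: Option[Long]
}

case class Scan(rowsPerPartition: Long) extends Plan {
  def maxRowsPerPartition: Option[Long] = Some(rowsPerPartition)
}

case class LocalLimit(limit: Long, child: Plan) extends Plan {
  // A local limit caps each partition at `limit` rows.
  def maxRowsPerPartition: Option[Long] =
    Some(child.maxRowsPerPartition.fold(limit)(math.min(_, limit)))
}

// Remove a LocalLimit whose child is already bounded below the limit
// in every partition; otherwise leave the plan unchanged.
def eliminateLocalLimit(plan: Plan): Plan = plan match {
  case LocalLimit(limit, child)
      if child.maxRowsPerPartition.exists(_ <= limit) =>
    child
  case other => other
}
```

For example, `eliminateLocalLimit(LocalLimit(10, Scan(5)))` drops the limit and returns `Scan(5)`, while `eliminateLocalLimit(LocalLimit(3, Scan(5)))` keeps the plan intact because a partition may still produce more rows than the limit. The streaming guard (`!plan.isStreaming`) from the actual patch is omitted here, since this sketch has no notion of streaming plans.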