maropu commented on a change in pull request #32350:
URL: https://github.com/apache/spark/pull/32350#discussion_r624574030



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -1618,12 +1618,18 @@ object EliminateLimits extends Rule[LogicalPlan] {
   private def canEliminate(limitExpr: Expression, child: LogicalPlan): Boolean 
= {
     limitExpr.foldable && child.maxRows.exists { _ <= 
limitExpr.eval().asInstanceOf[Int] }
   }
+  private def canEliminateLocalLimit(localLimitExpr: Expression, child: 
LogicalPlan): Boolean = {
+    localLimitExpr.foldable &&
+      child.maxRowsPerPartition.exists { _ <= 
localLimitExpr.eval().asInstanceOf[Int] }
+  }
 
   def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
     case Limit(l, child) if canEliminate(l, child) =>
       child
     case GlobalLimit(l, child) if canEliminate(l, child) =>
       child
+    case LocalLimit(l, child) if !plan.isStreaming && 
canEliminateLocalLimit(l, child) =>

Review comment:
       Is it possible that a user's query can reach this optimization path
now?

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
##########
@@ -374,6 +374,8 @@ package object dsl {
 
       def limit(limitExpr: Expression): LogicalPlan = Limit(limitExpr, 
logicalPlan)
 
+      def localLimit(limitExpr: Expression): LogicalPlan = 
LocalLimit(limitExpr, logicalPlan)

Review comment:
       Since this is used only once now, could you use `LocalLimit` directly in 
the test?

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -1618,12 +1618,18 @@ object EliminateLimits extends Rule[LogicalPlan] {
   private def canEliminate(limitExpr: Expression, child: LogicalPlan): Boolean 
= {
     limitExpr.foldable && child.maxRows.exists { _ <= 
limitExpr.eval().asInstanceOf[Int] }
   }
+  private def canEliminateLocalLimit(localLimitExpr: Expression, child: 
LogicalPlan): Boolean = {
+    localLimitExpr.foldable &&
+      child.maxRowsPerPartition.exists { _ <= 
localLimitExpr.eval().asInstanceOf[Int] }
+  }
 
   def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
     case Limit(l, child) if canEliminate(l, child) =>
       child
     case GlobalLimit(l, child) if canEliminate(l, child) =>
       child
+    case LocalLimit(l, child) if !plan.isStreaming && 
canEliminateLocalLimit(l, child) =>

Review comment:
       In a streaming case, `maxRowsPerPartition` can be filled? (we need the 
condition `!plan.isStreaming` here?)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

Reply via email to