[GitHub] [spark] zhengruifeng commented on a change in pull request #32350: [SPARK-35231][SQL] logical.Range override maxRowsPerPartition

2021-05-05 Thread GitBox


zhengruifeng commented on a change in pull request #32350:
URL: https://github.com/apache/spark/pull/32350#discussion_r627042329



##
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##
@@ -1618,12 +1618,18 @@ object EliminateLimits extends Rule[LogicalPlan] {
   private def canEliminate(limitExpr: Expression, child: LogicalPlan): Boolean = {
     limitExpr.foldable && child.maxRows.exists { _ <= limitExpr.eval().asInstanceOf[Int] }
   }
+  private def canEliminateLocalLimit(localLimitExpr: Expression, child: LogicalPlan): Boolean = {
+    localLimitExpr.foldable &&
+      child.maxRowsPerPartition.exists { _ <= localLimitExpr.eval().asInstanceOf[Int] }
+  }
 
   def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
     case Limit(l, child) if canEliminate(l, child) =>
       child
     case GlobalLimit(l, child) if canEliminate(l, child) =>
       child
+    case LocalLimit(l, child) if !plan.isStreaming && canEliminateLocalLimit(l, child) =>

Review comment:
   > It is not possible that a user's query reaches this optimization path now?
   
   An end user's query should not reach this path, I think. This path is only there so that a _similar_ test can be added in `CombiningLimitsSuite`.
   
   
   > In a streaming case, maxRowsPerPartition can be filled? (we need the condition !plan.isStreaming here?)
   
   `org.apache.spark.sql.streaming.StreamSuite.SPARK-30657: streaming limit optimization from StreamingLocalLimitExec to LocalLimitExec` fails if this condition is not added.
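   
   For illustration only (this is not code from the PR): the new check boils down to comparing the evaluated limit against the child's per-partition upper bound. A minimal sketch with plain values in place of Catalyst expressions, e.g. for a child known to put at most 10 rows in any partition:
   
   ```scala
   // Hypothetical stand-in for the check above, with the foldable limit
   // expression already evaluated to a plain Int; the names are invented here.
   def localLimitIsRedundant(limit: Int, childMaxRowsPerPartition: Option[Long]): Boolean =
     childMaxRowsPerPartition.exists(_ <= limit)
   
   localLimitIsRedundant(20, Some(10)) // true:  every partition already holds at most 20 rows
   localLimitIsRedundant(5,  Some(10)) // false: the LocalLimit still truncates partitions
   localLimitIsRedundant(20, None)     // false: no known bound, so nothing can be eliminated
   ```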




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng commented on a change in pull request #32350: [SPARK-35231][SQL] logical.Range override maxRowsPerPartition

2021-04-28 Thread GitBox


zhengruifeng commented on a change in pull request #32350:
URL: https://github.com/apache/spark/pull/32350#discussion_r622693358



##
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
##
@@ -161,6 +162,7 @@ case class Filter(condition: Expression, child: LogicalPlan)
   override def output: Seq[Attribute] = child.output
 
   override def maxRows: Option[Long] = child.maxRows
+  override def maxRowsPerPartition: Option[Long] = child.maxRowsPerPartition

Review comment:
   this change is not needed for the added test
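   
   (As context for where the propagated bound originates: per the PR title, it is `logical.Range` that overrides `maxRowsPerPartition`. A minimal sketch of how such an upper bound could be derived, assuming the rows are split roughly evenly across `numSlices` partitions; this is an illustration, not necessarily the PR's exact code.)
   
   ```scala
   // Hypothetical helper, not Catalyst code: an upper bound on the rows per
   // partition when numElements rows are spread over numSlices partitions.
   def rangeMaxRowsPerPartition(numElements: BigInt, numSlices: Int): Option[Long] =
     if (numSlices <= 0 || numElements < 0) None
     else Some(((numElements + numSlices - 1) / numSlices).toLong)
   
   rangeMaxRowsPerPartition(numElements = 100, numSlices = 8) // Some(13)
   ```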
   







[GitHub] [spark] zhengruifeng commented on a change in pull request #32350: [SPARK-35231][SQL] logical.Range override maxRowsPerPartition

2021-04-28 Thread GitBox


zhengruifeng commented on a change in pull request #32350:
URL: https://github.com/apache/spark/pull/32350#discussion_r622693068



##
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
##
@@ -69,6 +69,7 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
   extends OrderPreservingUnaryNode {
   override def output: Seq[Attribute] = projectList.map(_.toAttribute)
   override def maxRows: Option[Long] = child.maxRows
+  override def maxRowsPerPartition: Option[Long] = child.maxRowsPerPartition

Review comment:
   this override is needed for the added test
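   
   As a toy illustration of why: the bound must survive every operator between the `Range` that produces it and the `LocalLimit` being eliminated, and a `None` anywhere in between blocks the rewrite. The classes below are made-up stand-ins, not Catalyst code:
   
   ```scala
   // Made-up stand-ins that only model how the per-partition bound is
   // (or is not) propagated up the plan.
   trait Node { def maxRowsPerPartition: Option[Long] }
   
   case class FakeRange(bound: Long) extends Node {
     def maxRowsPerPartition: Option[Long] = Some(bound)
   }
   
   case class FakeProject(child: Node, propagates: Boolean) extends Node {
     def maxRowsPerPartition: Option[Long] =
       if (propagates) child.maxRowsPerPartition else None
   }
   
   FakeProject(FakeRange(10), propagates = true).maxRowsPerPartition  // Some(10): a LocalLimit(n >= 10) on top can be dropped
   FakeProject(FakeRange(10), propagates = false).maxRowsPerPartition // None: the optimizer cannot prove anything
   ```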



