yikf commented on a change in pull request #34738:
URL: https://github.com/apache/spark/pull/34738#discussion_r758361092



##########
File path: 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownLimit.java
##########
@@ -33,4 +34,9 @@
    * Pushes down LIMIT to the data source.
    */
   boolean pushLimit(int limit);
+
+  /**
+   * Pushes down top N to the data source.
+   */
+  boolean pushTopN(SortValue[] orders, int limit);

Review comment:
       There is a little strange that `pushTopN` with the return value of the 
`Boolean`, How about adding two methods like `pushTopN` and `pushedTopN`, In 
this way, the responsibilities of each method are cleaner. FYI

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -255,7 +256,20 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] 
with PredicateHelper {
             sHolder.pushedLimit = Some(limitValue)
           }
           globalLimit
-        case _ => globalLimit
+        case _ =>
+          child transform {
+            case sort @ Sort(order, _, ScanOperation(_, filter, sHolder: 
ScanBuilderHolder))
+              if filter.length == 0 =>

Review comment:
       How about using `filter.isEmpty`?

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -255,7 +256,20 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] 
with PredicateHelper {
             sHolder.pushedLimit = Some(limitValue)
           }
           globalLimit
-        case _ => globalLimit
+        case _ =>
+          child transform {
+            case sort @ Sort(order, _, ScanOperation(_, filter, sHolder: 
ScanBuilderHolder))
+              if filter.length == 0 =>
+              val orders = DataSourceStrategy.translateSortOrders(order)
+              val topNPushed = PushDownUtils.pushTopN(sHolder.builder, 
orders.toArray, limitValue)
+              if (topNPushed) {
+                sHolder.pushedLimit = Some(limitValue)
+                sHolder.sortValues = orders
+              }
+              sort

Review comment:
       The limit with sort has been pushed done, Whether the sort node can be 
remove?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to