This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new af978c8  [SPARK-36183][SQL] Push down limit 1 through Aggregate if it 
is group only
af978c8 is described below

commit af978c87f10c89ee3a7c927ab9b039a2b84a492a
Author: Yuming Wang <yumw...@ebay.com>
AuthorDate: Tue Jul 20 20:24:07 2021 +0800

    [SPARK-36183][SQL] Push down limit 1 through Aggregate if it is group only
    
    ### What changes were proposed in this pull request?
    
    Push down limit 1 and turn `Aggregate` into `Project` through `Aggregate` 
if it is group only. For example:
    ```sql
    create table t1 using parquet as select id from range(100000000L);
    create table t2 using parquet as select id from range(100000000L);
    create view v1 as select * from t1 union select * from t2;
    select * from v1 limit 1;
    ```
    
    Before this PR | After this PR
    -- | --
    
![image](https://user-images.githubusercontent.com/5399861/125975690-55663515-c4c5-4a04-aedf-f8ba37581ba7.png)
 | 
![image](https://user-images.githubusercontent.com/5399861/126168972-b2675e09-4f93-4026-b1be-af317205e57f.png)
    
    ### Why are the changes needed?
    
    Improve query performance. This is a real case from the cluster:
    
![image](https://user-images.githubusercontent.com/5399861/125976597-18cb68d6-b22a-4d80-b270-01b2b13d1ef5.png)
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit test.
    
    Closes #33397 from wangyum/SPARK-36183.
    
    Authored-by: Yuming Wang <yumw...@ebay.com>
    Signed-off-by: Yuming Wang <yumw...@ebay.com>
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  5 +++++
 .../catalyst/optimizer/LimitPushdownSuite.scala    | 24 ++++++++++++++++++++++
 2 files changed, 29 insertions(+)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index aa2221b..8cc6378 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -647,6 +647,11 @@ object LimitPushDown extends Rule[LogicalPlan] {
     // There is a Project between LocalLimit and Join if they do not have the 
same output.
     case LocalLimit(exp, project @ Project(_, join: Join)) =>
       LocalLimit(exp, project.copy(child = pushLocalLimitThroughJoin(exp, 
join)))
+    // Push down limit 1 through Aggregate and turn Aggregate into Project if 
it is group only.
+    case Limit(le @ IntegerLiteral(1), a: Aggregate) if a.groupOnly =>
+      Limit(le, Project(a.output, LocalLimit(le, a.child)))
+    case Limit(le @ IntegerLiteral(1), p @ Project(_, a: Aggregate)) if 
a.groupOnly =>
+      Limit(le, p.copy(child = Project(a.output, LocalLimit(le, a.child))))
   }
 }
 
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
index c2503e3..848416b 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
@@ -239,4 +239,28 @@ class LimitPushdownSuite extends PlanTest {
       Limit(5, LocalLimit(5, x).join(y, LeftOuter, 
joinCondition).select("x.a".attr)).analyze
     comparePlans(optimized, correctAnswer)
   }
+
+  test("SPARK-36183: Push down limit 1 through Aggregate if it is group only") 
{
+    // Push down when it is group only and limit 1.
+    comparePlans(
+      Optimize.execute(x.groupBy("x.a".attr)("x.a".attr).limit(1).analyze),
+      LocalLimit(1, x).select("x.a".attr).limit(1).analyze)
+
+    comparePlans(
+      
Optimize.execute(x.groupBy("x.a".attr)("x.a".attr).select("x.a".attr).limit(1).analyze),
+      LocalLimit(1, x).select("x.a".attr).select("x.a".attr).limit(1).analyze)
+
+    comparePlans(
+      
Optimize.execute(x.union(y).groupBy("x.a".attr)("x.a".attr).limit(1).analyze),
+      LocalLimit(1, LocalLimit(1, x).union(LocalLimit(1, 
y))).select("x.a".attr).limit(1).analyze)
+
+    // No push down
+    comparePlans(
+      Optimize.execute(x.groupBy("x.a".attr)("x.a".attr).limit(2).analyze),
+      x.groupBy("x.a".attr)("x.a".attr).limit(2).analyze)
+
+    comparePlans(
+      Optimize.execute(x.groupBy("x.a".attr)("x.a".attr, 
count("x.a".attr)).limit(1).analyze),
+      x.groupBy("x.a".attr)("x.a".attr, count("x.a".attr)).limit(1).analyze)
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to