This is an automated email from the ASF dual-hosted git repository. yumwang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new af978c8 [SPARK-36183][SQL] Push down limit 1 through Aggregate if it is group only af978c8 is described below commit af978c87f10c89ee3a7c927ab9b039a2b84a492a Author: Yuming Wang <yumw...@ebay.com> AuthorDate: Tue Jul 20 20:24:07 2021 +0800 [SPARK-36183][SQL] Push down limit 1 through Aggregate if it is group only ### What changes were proposed in this pull request? Push down limit 1 through `Aggregate` and turn the `Aggregate` into a `Project` if it is group only. For example: ```sql create table t1 using parquet as select id from range(100000000L); create table t2 using parquet as select id from range(100000000L); create view v1 as select * from t1 union select * from t2; select * from v1 limit 1; ``` Before this PR | After this PR -- | -- ![image](https://user-images.githubusercontent.com/5399861/125975690-55663515-c4c5-4a04-aedf-f8ba37581ba7.png) | ![image](https://user-images.githubusercontent.com/5399861/126168972-b2675e09-4f93-4026-b1be-af317205e57f.png) ### Why are the changes needed? Improve query performance. This is a real case from the cluster: ![image](https://user-images.githubusercontent.com/5399861/125976597-18cb68d6-b22a-4d80-b270-01b2b13d1ef5.png) ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test. Closes #33397 from wangyum/SPARK-36183. 
Authored-by: Yuming Wang <yumw...@ebay.com> Signed-off-by: Yuming Wang <yumw...@ebay.com> --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 5 +++++ .../catalyst/optimizer/LimitPushdownSuite.scala | 24 ++++++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index aa2221b..8cc6378 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -647,6 +647,11 @@ object LimitPushDown extends Rule[LogicalPlan] { // There is a Project between LocalLimit and Join if they do not have the same output. case LocalLimit(exp, project @ Project(_, join: Join)) => LocalLimit(exp, project.copy(child = pushLocalLimitThroughJoin(exp, join))) + // Push down limit 1 through Aggregate and turn Aggregate into Project if it is group only. 
+ case Limit(le @ IntegerLiteral(1), a: Aggregate) if a.groupOnly => + Limit(le, Project(a.output, LocalLimit(le, a.child))) + case Limit(le @ IntegerLiteral(1), p @ Project(_, a: Aggregate)) if a.groupOnly => + Limit(le, p.copy(child = Project(a.output, LocalLimit(le, a.child)))) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala index c2503e3..848416b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala @@ -239,4 +239,28 @@ class LimitPushdownSuite extends PlanTest { Limit(5, LocalLimit(5, x).join(y, LeftOuter, joinCondition).select("x.a".attr)).analyze comparePlans(optimized, correctAnswer) } + + test("SPARK-36183: Push down limit 1 through Aggregate if it is group only") { + // Push down when it is group only and limit 1. 
+ comparePlans( + Optimize.execute(x.groupBy("x.a".attr)("x.a".attr).limit(1).analyze), + LocalLimit(1, x).select("x.a".attr).limit(1).analyze) + + comparePlans( + Optimize.execute(x.groupBy("x.a".attr)("x.a".attr).select("x.a".attr).limit(1).analyze), + LocalLimit(1, x).select("x.a".attr).select("x.a".attr).limit(1).analyze) + + comparePlans( + Optimize.execute(x.union(y).groupBy("x.a".attr)("x.a".attr).limit(1).analyze), + LocalLimit(1, LocalLimit(1, x).union(LocalLimit(1, y))).select("x.a".attr).limit(1).analyze) + + // No push down + comparePlans( + Optimize.execute(x.groupBy("x.a".attr)("x.a".attr).limit(2).analyze), + x.groupBy("x.a".attr)("x.a".attr).limit(2).analyze) + + comparePlans( + Optimize.execute(x.groupBy("x.a".attr)("x.a".attr, count("x.a".attr)).limit(1).analyze), + x.groupBy("x.a".attr)("x.a".attr, count("x.a".attr)).limit(1).analyze) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org