This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new c02aef01bbc [SPARK-40501][SQL] Add PushProjectionThroughLimit for Optimizer
c02aef01bbc is described below

commit c02aef01bbc1530884c23850224293336a5430a2
Author: panbingkun <pbk1...@gmail.com>
AuthorDate: Mon Sep 26 15:50:07 2022 +0800

[SPARK-40501][SQL] Add PushProjectionThroughLimit for Optimizer

### What changes were proposed in this pull request?
This PR adds a `PushProjectionThroughLimit` rule to the `Optimizer` to improve query performance.

### Why are the changes needed?
The rule "normalizes" the order of the project and limit operators, **{project(..., limit(...)) => limit(..., project(...))}**, so that there are more chances to merge adjacent projects or limits **{e.g. by SpecialLimits}**.

---

When I query a big table (size per day: 10T; 1219 columns) with `limit 1`:

#### A. Scenario 1 (run SQL in spark-sql)
- The results are fetched quickly.
- The `CollectLimitExec` optimization takes effect.

1. SQL: `select * from xxx where ..._day = '20220919' limit 1`
<img width="628" alt="image" src="https://user-images.githubusercontent.com/15246973/191184857-fa3d7f08-f0ea-4d70-a406-48828a3bb761.png">
2. Spark UI:
<img width="1416" alt="image" src="https://user-images.githubusercontent.com/15246973/191204519-86de05d3-40d4-4acd-9833-86bf318ecb72.png">

#### B. Scenario 2 (run SQL in spark-shell)
- Fetching the results takes a long time (still running after 20 minutes...).

1. Code: `spark.sql("select * from xxx where ..._day = '20220919' limit 1").show()`
<img width="557" alt="image" src="https://user-images.githubusercontent.com/15246973/191211875-c29c3bae-1339-414b-84bc-2195545b8c35.png">
2. Spark UI:
<img width="1419" alt="image" src="https://user-images.githubusercontent.com/15246973/191212244-22108810-dd66-46bd-bea7-a7dab70a1a06.png">

#### C. Scenario 3 (run SQL in spark-shell)
- The results are fetched quickly.

1. Code: `spark.sql("select * from xxx where ..._day = '20220919'").show(1)`
<img width="544" alt="image" src="https://user-images.githubusercontent.com/15246973/191215182-bf278f71-c3ee-4028-8372-c9ed69431b6f.png">
2. Spark UI:
<img width="1417" alt="image" src="https://user-images.githubusercontent.com/15246973/191215437-3a291b34-5257-485a-bc18-6c0e0865d7ce.png">

## The difference between Scenario 2 and Scenario 3 lies in the "Optimized Logical Plan"
<img width="543" alt="image" src="https://user-images.githubusercontent.com/15246973/191216863-367c764d-2aa0-4c79-b240-0ebb6735937f.png">
<img width="544" alt="image" src="https://user-images.githubusercontent.com/15246973/191217175-02213b0c-5d09-4154-85f7-18751734afd0.png">

# After this PR:
#### Scenario 2 (run SQL in spark-shell)
- The results are fetched quickly.
- The `CollectLimitExec` optimization takes effect.

1. Code: `spark.sql("select * from xxx where ..._day = '20220919' limit 1").show()`
<img width="553" alt="image" src="https://user-images.githubusercontent.com/15246973/191880203-2f951644-b59b-4dc8-9e80-c02c458fc28b.png">
2. Spark UI:
<img width="1417" alt="image" src="https://user-images.githubusercontent.com/15246973/191219627-82655d01-1d5e-44ee-af49-e8da51c9ca72.png">

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added a new unit test & passed GA.

Closes #37941 from panbingkun/improve_shell_limit.
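As a rough sketch of what the new rule does to a logical plan, using the Catalyst test DSL (the relation and column names below are illustrative, not taken from the patch):

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.optimizer.PushProjectionThroughLimit
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

// Illustrative two-column relation; any resolved plan works the same way.
val relation = LocalRelation($"a".int, $"b".int)

// Before: a projection sits on top of the limit, i.e.
// Project([a], GlobalLimit(5, LocalLimit(5, relation))).
val before = relation.limit(5).select($"a").analyze

// After one application of the rule, the projection is pushed below
// both limit operators:
// GlobalLimit(5, LocalLimit(5, Project([a], relation))).
val after = PushProjectionThroughLimit(before)
```

With the limit back on top of the optimized plan, the planner can match it with a specialized limit strategy (producing `CollectLimitExec`), which is the difference between Scenario 2 and Scenario 3 above.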
Authored-by: panbingkun <pbk1...@gmail.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  1 +
 .../optimizer/PushProjectionThroughLimit.scala     | 39 ++++++++++
 .../PushProjectionThroughLimitSuite.scala          | 90 ++++++++++++++++++++++
 3 files changed, 130 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 3ac75554a2b..2664fd63806 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -81,6 +81,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
     Seq(
       // Operator push down
       PushProjectionThroughUnion,
+      PushProjectionThroughLimit,
       ReorderJoin,
       EliminateOuterJoin,
       PushDownPredicates,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughLimit.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughLimit.scala
new file mode 100644
index 00000000000..6280cc5e42c
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughLimit.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LocalLimit, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.{LIMIT, PROJECT}
+
+/**
+ * Pushes Project operator through Limit operator.
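+ *
+ * For example, Project(a, GlobalLimit(5, LocalLimit(5, child))) is rewritten to
+ * GlobalLimit(5, LocalLimit(5, Project(a, child))). The rule only applies when
+ * all project expressions are deterministic.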
+ */
+object PushProjectionThroughLimit extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
+    _.containsAllPatterns(PROJECT, LIMIT)) {
+
+    case p @ Project(projectList, limit @ LocalLimit(_, child))
+        if projectList.forall(_.deterministic) =>
+      limit.copy(child = p.copy(projectList, child))
+
+    case p @ Project(projectList, g @ GlobalLimit(_, limit @ LocalLimit(_, child)))
+        if projectList.forall(_.deterministic) =>
+      g.copy(child = limit.copy(child = p.copy(projectList, child)))
+  }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughLimitSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughLimitSuite.scala
new file mode 100644
index 00000000000..7e45fc5aeb3
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughLimitSuite.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+class PushProjectionThroughLimitSuite extends PlanTest {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches = Batch("Optimizer Batch",
+      FixedPoint(100),
+      PushProjectionThroughLimit,
+      EliminateLimits) :: Nil
+  }
+
+  test("SPARK-40501: push projection through limit") {
+    val testRelation = LocalRelation.fromExternalRows(
+      Seq("a".attr.int, "b".attr.int, "c".attr.int),
+      1.to(20).map(_ => Row(1, 2, 3)))
+
+    val query1 = testRelation
+      .limit(10)
+      .select('a, 'b, 'c)
+      .limit(15).analyze
+    val optimized1 = Optimize.execute(query1)
+    val expected1 = testRelation
+      .select('a, 'b, 'c)
+      .limit(10).analyze
+    comparePlans(optimized1, expected1)
+
+    val query2 = testRelation
+      .sortBy($"a".asc)
+      .limit(10)
+      .select('a, 'b, 'c)
+      .limit(15).analyze
+    val optimized2 = Optimize.execute(query2)
+    val expected2 = testRelation
+      .sortBy($"a".asc)
+      .select('a, 'b, 'c)
+      .limit(10).analyze
+    comparePlans(optimized2, expected2)
+
+    val query3 = testRelation
+      .limit(10)
+      .select('a, 'b, 'c)
+      .limit(20)
+      .select('a)
+      .limit(15).analyze
+    val optimized3 = Optimize.execute(query3)
+    val expected3 = testRelation
+      .select('a, 'b, 'c)
+      .select('a)
+      .limit(10).analyze
+    comparePlans(optimized3, expected3)
+
+    val query4 = testRelation
+      .sortBy($"a".asc)
+      .limit(10)
+      .select('a, 'b, 'c)
+      .limit(20)
+      .select('a)
+      .limit(15).analyze
+    val optimized4 = Optimize.execute(query4)
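+    // The outer limits (15 and 20) are eliminated because the innermost
+    // limit already bounds the row count to 10, the smallest of the three.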
+    val expected4 = testRelation
+      .sortBy($"a".asc)
+      .select('a, 'b, 'c)
+      .select('a)
+      .limit(10).analyze
+    comparePlans(optimized4, expected4)
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org