This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 04e53d2  [SPARK-27342][SQL] Optimize Limit 0 queries
04e53d2 is described below

commit 04e53d2e3c9dc993b8ba88c5ef0bcf0a8a9b06b2
Author: Aayushmaan Jain <aayushmaan.jai...@gmail.com>
AuthorDate: Thu Apr 4 21:19:40 2019 -0700

    [SPARK-27342][SQL] Optimize Limit 0 queries

    ## What changes were proposed in this pull request?
    With this change, unnecessary file scans are avoided in the case of Limit 0 queries.

    I added a case (rule) to `PropagateEmptyRelation` to replace `GlobalLimit 0` and `LocalLimit 0`
    nodes with an empty `LocalRelation`. This prunes the subtree under the Limit 0 node and further
    allows other rules of `PropagateEmptyRelation` to optimize the Logical Plan - while remaining
    semantically consistent with the Limit 0 query.

    For instance:
    **Query:**
    `SELECT * FROM table1 INNER JOIN (SELECT * FROM table2 LIMIT 0) AS table2 ON table1.id = table2.id`

    **Optimized Plan without fix:**
    ```
    Join Inner, (id#79 = id#87)
    :- Filter isnotnull(id#79)
    :  +- Relation[id#79,num1#80] parquet
    +- Filter isnotnull(id#87)
       +- GlobalLimit 0
          +- LocalLimit 0
             +- Relation[id#87,num2#88] parquet
    ```

    **Optimized Plan with fix:**
    `LocalRelation <empty>, [id#75, num1#76, id#77, num2#78]`

    ## How was this patch tested?
    Added unit tests to verify Limit 0 optimization for:
    - Simple query containing Limit 0
    - Inner Join, Left Outer Join, Right Outer Join, and Full Outer Join queries containing Limit 0 as one of their children
    - Nested Inner Joins between 3 tables with one of them having a Limit 0 clause
    - Intersect query wherein one of the subqueries was a Limit 0 query

    Closes #24271 from aayushmaanjain/optimize-limit0.

    Authored-by: Aayushmaan Jain <aayushmaan.jai...@gmail.com>
    Signed-off-by: gatorsmile <gatorsm...@gmail.com>
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  36 +++++++
 .../optimizer/OptimizeLimitZeroSuite.scala         | 108 +++++++++++++++++++++
 2 files changed, 144 insertions(+)
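[Editor's note: as an illustration only (not part of the patch below), a minimal spark-shell sketch of how the effect of this change can be observed; the temp view names `t1`/`t2` and the sample data are hypothetical stand-ins for the Parquet tables in the example above.]

```
// Hypothetical setup: two small temp views standing in for table1/table2.
spark.range(10).selectExpr("id", "id AS num1").createOrReplaceTempView("t1")
spark.range(10).selectExpr("id", "id AS num2").createOrReplaceTempView("t2")

val df = spark.sql(
  "SELECT * FROM t1 INNER JOIN (SELECT * FROM t2 LIMIT 0) AS t2 ON t1.id = t2.id")

// With the OptimizeLimitZero rule in place, the optimized plan should collapse to an
// empty LocalRelation instead of keeping the join and the scans of both inputs.
println(df.queryExecution.optimizedPlan)
df.explain(true)
```

The payoff is largest when the inputs are file-based (e.g. Parquet) tables, since pruning the Limit 0 subtree means neither side of the join needs to be scanned at all.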
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 6319d47..d0368be 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -137,6 +137,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
     // since the other rules might make two separate Unions operators adjacent.
     Batch("Union", Once,
       CombineUnions) ::
+    Batch("OptimizeLimitZero", Once,
+      OptimizeLimitZero) ::
     // Run this once earlier. This might simplify the plan and reduce cost of optimizer.
     // For example, a query such as Filter(LocalRelation) would go through all the heavy
     // optimizer rules that are triggered when there is a filter
@@ -1681,3 +1683,37 @@ object RemoveRepetitionFromGroupExpressions extends Rule[LogicalPlan] {
     }
   }
 }
+
+/**
+ * Replaces GlobalLimit 0 and LocalLimit 0 nodes (subtree) with empty Local Relation, as they don't
+ * return any rows.
+ */
+object OptimizeLimitZero extends Rule[LogicalPlan] {
+  // returns empty Local Relation corresponding to given plan
+  private def empty(plan: LogicalPlan) =
+    LocalRelation(plan.output, data = Seq.empty, isStreaming = plan.isStreaming)
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    // Nodes below GlobalLimit or LocalLimit can be pruned if the limit value is zero (0).
+    // Any subtree in the logical plan that has GlobalLimit 0 or LocalLimit 0 as its root is
+    // semantically equivalent to an empty relation.
+    //
+    // In such cases, the effects of Limit 0 can be propagated through the Logical Plan by replacing
+    // the (Global/Local) Limit subtree with an empty LocalRelation, thereby pruning the subtree
+    // below and triggering other optimization rules of PropagateEmptyRelation to propagate the
+    // changes up the Logical Plan.
+    //
+    // Replace Global Limit 0 nodes with empty Local Relation
+    case gl @ GlobalLimit(IntegerLiteral(0), _) =>
+      empty(gl)
+
+    // Note: For all SQL queries, if a LocalLimit 0 node exists in the Logical Plan, then a
+    // GlobalLimit 0 node would also exist. Thus, the above case would be sufficient to handle
+    // almost all cases. However, if a user explicitly creates a Logical Plan with LocalLimit 0 node
+    // then the following rule will handle that case as well.
+    //
+    // Replace Local Limit 0 nodes with empty Local Relation
+    case ll @ LocalLimit(IntegerLiteral(0), _) =>
+      empty(ll)
+  }
+}
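[Editor's note: a rough sketch of what the new rule does in isolation, using the Catalyst test DSL; this is an illustration, not part of the patch, and the relation/column names are made up. Applied to a Limit 0 subtree, the rule returns an empty `LocalRelation` with the same output attributes, which `PropagateEmptyRelation` can then propagate further up the plan.]

```
// Sketch: OptimizeLimitZero applied directly to a LIMIT 0 plan built with the Catalyst DSL.
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.optimizer.OptimizeLimitZero
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

val relation = LocalRelation.fromExternalRows(Seq('id.int, 'num2.int), data = Seq(Row(1, 2)))

// relation.limit(0) builds GlobalLimit(0, LocalLimit(0, relation)).
val limited = relation.limit(0).analyze

// The rule replaces the whole Limit 0 subtree with an empty LocalRelation that
// preserves the original output attributes, so parent rules see an empty relation.
val pruned = OptimizeLimitZero(limited)
// pruned: LocalRelation <empty>, [id, num2]
```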
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeLimitZeroSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeLimitZeroSuite.scala
new file mode 100644
index 0000000..cf875ef
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeLimitZeroSuite.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.{Distinct, GlobalLimit, LocalLimit, LocalRelation, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.types.IntegerType
+
+// Test class to verify correct functioning of OptimizeLimitZero rule in various scenarios
+class OptimizeLimitZeroSuite extends PlanTest {
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("OptimizeLimitZero", Once,
+        ReplaceIntersectWithSemiJoin,
+        OptimizeLimitZero,
+        PropagateEmptyRelation) :: Nil
+  }
+
+  val testRelation1 = LocalRelation.fromExternalRows(Seq('a.int), data = Seq(Row(1)))
+  val testRelation2 = LocalRelation.fromExternalRows(Seq('b.int), data = Seq(Row(1)))
+
+  test("Limit 0: return empty local relation") {
+    val query = testRelation1.limit(0)
+
+    val optimized = Optimize.execute(query.analyze)
+    val correctAnswer = LocalRelation('a.int)
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("Limit 0: individual LocalLimit 0 node") {
+    val query = LocalLimit(0, testRelation1)
+
+    val optimized = Optimize.execute(query.analyze)
+    val correctAnswer = LocalRelation('a.int)
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("Limit 0: individual GlobalLimit 0 node") {
+    val query = GlobalLimit(0, testRelation1)
+
+    val optimized = Optimize.execute(query.analyze)
+    val correctAnswer = LocalRelation('a.int)
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  Seq(
+    (Inner, LocalRelation('a.int, 'b.int)),
+    (LeftOuter, Project(Seq('a, Literal(null).cast(IntegerType).as('b)), testRelation1).analyze),
+    (RightOuter, LocalRelation('a.int, 'b.int)),
+    (FullOuter, Project(Seq('a, Literal(null).cast(IntegerType).as('b)), testRelation1).analyze)
+  ).foreach { case (jt, correctAnswer) =>
+    test(s"Limit 0: for join type $jt") {
+      val query = testRelation1
+        .join(testRelation2.limit(0), joinType = jt, condition = Some('a.attr == 'b.attr))
+
+      val optimized = Optimize.execute(query.analyze)
+
+      comparePlans(optimized, correctAnswer)
+    }
+  }
+
+  test("Limit 0: 3-way join") {
+    val testRelation3 = LocalRelation.fromExternalRows(Seq('c.int), data = Seq(Row(1)))
+
+    val subJoinQuery = testRelation1
+      .join(testRelation2, joinType = Inner, condition = Some('a.attr == 'b.attr))
+    val query = subJoinQuery
+      .join(testRelation3.limit(0), joinType = Inner, condition = Some('a.attr == 'c.attr))
+
+    val optimized = Optimize.execute(query.analyze)
+    val correctAnswer = LocalRelation('a.int, 'b.int, 'c.int)
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("Limit 0: intersect") {
+    val query = testRelation1
+      .intersect(testRelation1.limit(0), isAll = false)
+
+    val optimized = Optimize.execute(query.analyze)
+    val correctAnswer = Distinct(LocalRelation('a.int))
+
+    comparePlans(optimized, correctAnswer)
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org