This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 04e53d2  [SPARK-27342][SQL] Optimize Limit 0 queries
04e53d2 is described below

commit 04e53d2e3c9dc993b8ba88c5ef0bcf0a8a9b06b2
Author: Aayushmaan Jain <aayushmaan.jai...@gmail.com>
AuthorDate: Thu Apr 4 21:19:40 2019 -0700

    [SPARK-27342][SQL] Optimize Limit 0 queries
    
    ## What changes were proposed in this pull request?
    With this change, unnecessary file scans are avoided for Limit 0 queries.
    
    I added a new optimizer rule, `OptimizeLimitZero`, that replaces `GlobalLimit 0` and
    `LocalLimit 0` nodes with an empty `LocalRelation`. This prunes the subtree under the
    Limit 0 node and further allows the other rules of `PropagateEmptyRelation` to optimize
    the Logical Plan - while remaining semantically consistent with the Limit 0 query.
    
    For instance:
    **Query:**
    `SELECT * FROM table1 INNER JOIN (SELECT * FROM table2 LIMIT 0) AS table2 ON table1.id = table2.id`
    
    **Optimized Plan without fix:**
    ```
    Join Inner, (id#79 = id#87)
    :- Filter isnotnull(id#79)
    :  +- Relation[id#79,num1#80] parquet
    +- Filter isnotnull(id#87)
       +- GlobalLimit 0
          +- LocalLimit 0
             +- Relation[id#87,num2#88] parquet
    ```
    
    **Optimized Plan with fix:**
    `LocalRelation <empty>, [id#75, num1#76, id#77, num2#78]`
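
    To see the effect end-to-end, here is a minimal spark-shell sketch (the paths and
    table names are hypothetical, for illustration only):

    ```
    // Hypothetical setup: two small Parquet-backed tables registered as temp views
    // (spark-shell provides the implicits needed for toDF).
    Seq((1, 10), (2, 20)).toDF("id", "num1").write.parquet("/tmp/table1")
    Seq((1, 100), (2, 200)).toDF("id", "num2").write.parquet("/tmp/table2")
    spark.read.parquet("/tmp/table1").createOrReplaceTempView("table1")
    spark.read.parquet("/tmp/table2").createOrReplaceTempView("table2")

    // With this fix, the optimized plan printed by explain should collapse to an
    // empty LocalRelation instead of scanning table1.
    spark.sql(
      """SELECT * FROM table1 INNER JOIN
        |(SELECT * FROM table2 LIMIT 0) AS table2
        |ON table1.id = table2.id""".stripMargin).explain(true)
    ```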
    
    ## How was this patch tested?
    Added unit tests to verify the Limit 0 optimization for the following cases (a
    minimal sketch of the test pattern follows the list):
    - A simple query containing Limit 0
    - Inner Join, Left Outer Join, Right Outer Join, and Full Outer Join queries
      containing Limit 0 as one of their children
    - Nested Inner Joins between 3 tables, with one of them having a Limit 0 clause
    - An Intersect query in which one of the subqueries is a Limit 0 query
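
    A minimal sketch of the test pattern (simplified from the suite added below, which
    supplies the imports and the `Optimize` rule executor used here):

    ```
    // Optimize a plan with Limit 0 and verify it collapses to an empty
    // LocalRelation carrying the same single-column schema.
    val testRelation = LocalRelation.fromExternalRows(Seq('a.int), data = Seq(Row(1)))
    val query = testRelation.limit(0)
    val optimized = Optimize.execute(query.analyze)
    comparePlans(optimized, LocalRelation('a.int))
    ```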
    
    Closes #24271 from aayushmaanjain/optimize-limit0.
    
    Authored-by: Aayushmaan Jain <aayushmaan.jai...@gmail.com>
    Signed-off-by: gatorsmile <gatorsm...@gmail.com>
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  36 +++++++
 .../optimizer/OptimizeLimitZeroSuite.scala         | 108 +++++++++++++++++++++
 2 files changed, 144 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 6319d47..d0368be 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -137,6 +137,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
     //   since the other rules might make two separate Unions operators adjacent.
     Batch("Union", Once,
       CombineUnions) ::
+    Batch("OptimizeLimitZero", Once,
+      OptimizeLimitZero) ::
     // Run this once earlier. This might simplify the plan and reduce cost of optimizer.
     // For example, a query such as Filter(LocalRelation) would go through all the heavy
     // optimizer rules that are triggered when there is a filter
@@ -1681,3 +1683,37 @@ object RemoveRepetitionFromGroupExpressions extends Rule[LogicalPlan] {
       }
   }
 }
+
+/**
+ * Replaces GlobalLimit 0 and LocalLimit 0 nodes (subtree) with empty Local Relation, as they don't
+ * return any rows.
+ */
+object OptimizeLimitZero extends Rule[LogicalPlan] {
+  // returns empty Local Relation corresponding to given plan
+  private def empty(plan: LogicalPlan) =
+    LocalRelation(plan.output, data = Seq.empty, isStreaming = plan.isStreaming)
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    // Nodes below GlobalLimit or LocalLimit can be pruned if the limit value is zero (0).
+    // Any subtree in the logical plan that has GlobalLimit 0 or LocalLimit 0 as its root is
+    // semantically equivalent to an empty relation.
+    //
+    // In such cases, the effects of Limit 0 can be propagated through the Logical Plan by replacing
+    // the (Global/Local) Limit subtree with an empty LocalRelation, thereby pruning the subtree
+    // below and triggering other optimization rules of PropagateEmptyRelation to propagate the
+    // changes up the Logical Plan.
+    //
+    // Replace Global Limit 0 nodes with empty Local Relation
+    case gl @ GlobalLimit(IntegerLiteral(0), _) =>
+      empty(gl)
+
+    // Note: For all SQL queries, if a LocalLimit 0 node exists in the Logical Plan, then a
+    // GlobalLimit 0 node would also exist. Thus, the above case would be sufficient to handle
+    // almost all cases. However, if a user explicitly creates a Logical Plan with LocalLimit 0 node
+    // then the following rule will handle that case as well.
+    //
+    // Replace Local Limit 0 nodes with empty Local Relation
+    case ll @ LocalLimit(IntegerLiteral(0), _) =>
+      empty(ll)
+  }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeLimitZeroSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeLimitZeroSuite.scala
new file mode 100644
index 0000000..cf875ef
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeLimitZeroSuite.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.{Distinct, GlobalLimit, LocalLimit, LocalRelation, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.types.IntegerType
+
+// Test class to verify correct functioning of OptimizeLimitZero rule in various scenarios
+class OptimizeLimitZeroSuite extends PlanTest {
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("OptimizeLimitZero", Once,
+        ReplaceIntersectWithSemiJoin,
+        OptimizeLimitZero,
+        PropagateEmptyRelation) :: Nil
+  }
+
+  val testRelation1 = LocalRelation.fromExternalRows(Seq('a.int), data = Seq(Row(1)))
+  val testRelation2 = LocalRelation.fromExternalRows(Seq('b.int), data = Seq(Row(1)))
+
+  test("Limit 0: return empty local relation") {
+    val query = testRelation1.limit(0)
+
+    val optimized = Optimize.execute(query.analyze)
+    val correctAnswer = LocalRelation('a.int)
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("Limit 0: individual LocalLimit 0 node") {
+    val query = LocalLimit(0, testRelation1)
+
+    val optimized = Optimize.execute(query.analyze)
+    val correctAnswer = LocalRelation('a.int)
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("Limit 0: individual GlobalLimit 0 node") {
+    val query = GlobalLimit(0, testRelation1)
+
+    val optimized = Optimize.execute(query.analyze)
+    val correctAnswer = LocalRelation('a.int)
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  Seq(
+    (Inner, LocalRelation('a.int, 'b.int)),
+    (LeftOuter, Project(Seq('a, Literal(null).cast(IntegerType).as('b)), testRelation1).analyze),
+    (RightOuter, LocalRelation('a.int, 'b.int)),
+    (FullOuter, Project(Seq('a, Literal(null).cast(IntegerType).as('b)), testRelation1).analyze)
+  ).foreach { case (jt, correctAnswer) =>
+      test(s"Limit 0: for join type $jt") {
+        val query = testRelation1
+          .join(testRelation2.limit(0), joinType = jt, condition = Some('a.attr === 'b.attr))
+
+        val optimized = Optimize.execute(query.analyze)
+
+        comparePlans(optimized, correctAnswer)
+      }
+  }
+
+  test("Limit 0: 3-way join") {
+    val testRelation3 = LocalRelation.fromExternalRows(Seq('c.int), data = Seq(Row(1)))
+
+    val subJoinQuery = testRelation1
+      .join(testRelation2, joinType = Inner, condition = Some('a.attr === 'b.attr))
+    val query = subJoinQuery
+      .join(testRelation3.limit(0), joinType = Inner, condition = Some('a.attr === 'c.attr))
+
+    val optimized = Optimize.execute(query.analyze)
+    val correctAnswer = LocalRelation('a.int, 'b.int, 'c.int)
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("Limit 0: intersect") {
+    val query = testRelation1
+      .intersect(testRelation1.limit(0), isAll = false)
+
+    val optimized = Optimize.execute(query.analyze)
+    val correctAnswer = Distinct(LocalRelation('a.int))
+
+    comparePlans(optimized, correctAnswer)
+  }
+}

