This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c02aef01bbc [SPARK-40501][SQL] Add PushProjectionThroughLimit for 
Optimizer
c02aef01bbc is described below

commit c02aef01bbc1530884c23850224293336a5430a2
Author: panbingkun <pbk1...@gmail.com>
AuthorDate: Mon Sep 26 15:50:07 2022 +0800

    [SPARK-40501][SQL] Add PushProjectionThroughLimit for Optimizer
    
    ### What changes were proposed in this pull request?
    The pr aim to add `PushProjectionThroughLimit` for `Optimizer`, improve 
query performance.
    
    ### Why are the changes needed?
    "normalize" the order of project and limit operator **{project(..., 
limit(...)) => limit(..., project(...))}**, so that we can have more chances to 
merge adjacent projects or limits **{eg: by SpecialLimits}**.
    
    ---
    When I query a big table(size / per day: 10T, column size: 1219) with limit 
1
    #### A.Scenario 1(run sql in spark-sql) - The results will be fetched soon 
- The optimization of CollectLimitExec has taken effect
    1.SQL: select * from xxx where ..._day = '20220919' limit 1
    <img width="628" alt="image" 
src="https://user-images.githubusercontent.com/15246973/191184857-fa3d7f08-f0ea-4d70-a406-48828a3bb761.png";>
    2.Spark UI:
    <img width="1416" alt="image" 
src="https://user-images.githubusercontent.com/15246973/191204519-86de05d3-40d4-4acd-9833-86bf318ecb72.png";>
    
    #### B.Scenario 2(run sql in spark-shell) - It took a long time to fetch 
out(still running after 20 minutes...)
    1.Code: spark.sql("select * from xxx where ..._day = '20220919' limit 
1").show()
    <img width="557" alt="image" 
src="https://user-images.githubusercontent.com/15246973/191211875-c29c3bae-1339-414b-84bc-2195545b8c35.png";>
    2.Spark UI:
    <img width="1419" alt="image" 
src="https://user-images.githubusercontent.com/15246973/191212244-22108810-dd66-46bd-bea7-a7dab70a1a06.png";>
    
    #### C.Scenario 3(run sql in spark-shell) - The results will be fetched soon
    1.Code: spark.sql("select * from xxx where ..._day = '20220919'").show(1)
    <img width="544" alt="image" 
src="https://user-images.githubusercontent.com/15246973/191215182-bf278f71-c3ee-4028-8372-c9ed69431b6f.png";>
    2.Spark UI:
    <img width="1417" alt="image" 
src="https://user-images.githubusercontent.com/15246973/191215437-3a291b34-5257-485a-bc18-6c0e0865d7ce.png";>
    
    ## The diff between Scenario 2 and Scenario3 is focus on "Optimized Logical 
Plan"
    <img width="543" alt="image" 
src="https://user-images.githubusercontent.com/15246973/191216863-367c764d-2aa0-4c79-b240-0ebb6735937f.png";>
    <img width="544" alt="image" 
src="https://user-images.githubusercontent.com/15246973/191217175-02213b0c-5d09-4154-85f7-18751734afd0.png";>
    
    # After pr:
    #### Scenario 2(run sql in spark-shell) - The results will be fetched soon 
- The optimization of CollectLimitExec has taken effect
    1.Code: spark.sql("select * from xxx where ..._day = '20220919' limit 
1").show()
    <img width="553" alt="image" 
src="https://user-images.githubusercontent.com/15246973/191880203-2f951644-b59b-4dc8-9e80-c02c458fc28b.png";>
    2.Spark UI:
    <img width="1417" alt="image" 
src="https://user-images.githubusercontent.com/15246973/191219627-82655d01-1d5e-44ee-af49-e8da51c9ca72.png";>
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Add new UT & Pass GA.
    
    Closes #37941 from panbingkun/improve_shell_limit.
    
    Authored-by: panbingkun <pbk1...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  1 +
 .../optimizer/PushProjectionThroughLimit.scala     | 39 ++++++++++
 .../PushProjectionThroughLimitSuite.scala          | 90 ++++++++++++++++++++++
 3 files changed, 130 insertions(+)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 3ac75554a2b..2664fd63806 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -81,6 +81,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
       Seq(
         // Operator push down
         PushProjectionThroughUnion,
+        PushProjectionThroughLimit,
         ReorderJoin,
         EliminateOuterJoin,
         PushDownPredicates,
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughLimit.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughLimit.scala
new file mode 100644
index 00000000000..6280cc5e42c
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughLimit.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LocalLimit, 
LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.{LIMIT, PROJECT}
+
+/**
+ * Pushes Project operator through Limit operator.
+ */
+object PushProjectionThroughLimit extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
+    _.containsAllPatterns(PROJECT, LIMIT)) {
+
+    case p @ Project(projectList, limit @ LocalLimit(_, child))
+        if projectList.forall(_.deterministic) =>
+      limit.copy(child = p.copy(projectList, child))
+
+    case p @ Project(projectList, g @ GlobalLimit(_, limit @ LocalLimit(_, 
child)))
+        if projectList.forall(_.deterministic) =>
+      g.copy(child = limit.copy(child = p.copy(projectList, child)))
+  }
+}
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughLimitSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughLimitSuite.scala
new file mode 100644
index 00000000000..7e45fc5aeb3
--- /dev/null
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughLimitSuite.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+class PushProjectionThroughLimitSuite extends PlanTest {
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches = Batch("Optimizer Batch",
+      FixedPoint(100),
+      PushProjectionThroughLimit,
+      EliminateLimits) :: Nil
+  }
+
+  test("SPARK-40501: push projection through limit") {
+    val testRelation = LocalRelation.fromExternalRows(
+      Seq("a".attr.int, "b".attr.int, "c".attr.int),
+      1.to(20).map(_ => Row(1, 2, 3)))
+
+    val query1 = testRelation
+      .limit(10)
+      .select('a, 'b, 'c')
+      .limit(15).analyze
+    val optimized1 = Optimize.execute(query1)
+    val expected1 = testRelation
+      .select('a, 'b, 'c')
+      .limit(10).analyze
+    comparePlans(optimized1, expected1)
+
+    val query2 = testRelation
+      .sortBy($"a".asc)
+      .limit(10)
+      .select('a, 'b, 'c')
+      .limit(15).analyze
+    val optimized2 = Optimize.execute(query2)
+    val expected2 = testRelation
+      .sortBy($"a".asc)
+      .select('a, 'b, 'c')
+      .limit(10).analyze
+    comparePlans(optimized2, expected2)
+
+    val query3 = testRelation
+      .limit(10)
+      .select('a, 'b, 'c')
+      .limit(20)
+      .select('a)
+      .limit(15).analyze
+    val optimized3 = Optimize.execute(query3)
+    val expected3 = testRelation
+      .select('a, 'b, 'c')
+      .select('a)
+      .limit(10).analyze
+    comparePlans(optimized3, expected3)
+
+    val query4 = testRelation
+      .sortBy($"a".asc)
+      .limit(10)
+      .select('a, 'b, 'c')
+      .limit(20)
+      .select('a)
+      .limit(15).analyze
+    val optimized4 = Optimize.execute(query4)
+    val expected4 = testRelation
+      .sortBy($"a".asc)
+      .select('a, 'b, 'c')
+      .select('a)
+      .limit(10).analyze
+    comparePlans(optimized4, expected4)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to