This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 78700d939c4 [SPARK-38797][SQL] Runtime Filter supports pruning side has window
78700d939c4 is described below

commit 78700d939c42404ce6bd420094e13a258875949b
Author: Yuming Wang <yumw...@ebay.com>
AuthorDate: Thu Apr 14 08:39:15 2022 +0800

    [SPARK-38797][SQL] Runtime Filter supports pruning side has window
    
    ### What changes were proposed in this pull request?
    
    1. Makes row-level runtime filtering support a pruning side that has a window. For example:
        ```sql
        SELECT *
        FROM   (SELECT *,
                       Row_number() OVER (PARTITION BY c1 ORDER BY f1) rn
                FROM   bf1) bf1
               JOIN bf2
                 ON bf1.c1 = bf2.c2
        WHERE  bf2.a2 = 62
        ```
    
        After this PR:
        ```
        == Optimized Logical Plan ==
        Join Inner, (c1#45922 = c2#45928), Statistics(sizeInBytes=12.3 MiB)
        :- Window [row_number() windowspecdefinition(c1#45922, f1#45925 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#45976], [c1#45922], [f1#45925 ASC NULLS FIRST], Statistics(sizeInBytes=3.7 KiB)
        :  +- Filter (isnotnull(c1#45922) AND might_contain(scalar-subquery#45993 [], xxhash64(c1#45922, 42))), Statistics(sizeInBytes=3.3 KiB)
        :     :  +- Aggregate [bloom_filter_agg(xxhash64(c2#45928, 42), 1000000, 8388608, 0, 0) AS bloomFilter#45992], Statistics(sizeInBytes=108.0 B, rowCount=1)
        :     :     +- Project [c2#45928], Statistics(sizeInBytes=1278.0 B)
        :     :        +- Filter ((isnotnull(a2#45926) AND (a2#45926 = 62)) AND isnotnull(c2#45928)), Statistics(sizeInBytes=3.3 KiB)
        :     :           +- Relation default.bf2[a2#45926,b2#45927,c2#45928,d2#45929,e2#45930,f2#45931] parquet, Statistics(sizeInBytes=3.3 KiB)
        :     +- Relation default.bf1[a1#45920,b1#45921,c1#45922,d1#45923,e1#45924,f1#45925] parquet, Statistics(sizeInBytes=3.3 KiB)
        +- Filter ((isnotnull(a2#45926) AND (a2#45926 = 62)) AND isnotnull(c2#45928)), Statistics(sizeInBytes=3.3 KiB)
           +- Relation default.bf2[a2#45926,b2#45927,c2#45928,d2#45929,e2#45930,f2#45931] parquet, Statistics(sizeInBytes=3.3 KiB)
        ```
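
        For reference, a minimal spark-shell sketch of how this plan shape can be reproduced. The table layouts mirror the bf1/bf2 schemas from the test suite below, and the config names are the runtime bloom filter settings as of Spark 3.3; treat the exact values as illustrative, not part of this commit:
        ```scala
        import org.apache.spark.sql.SparkSession

        val spark = SparkSession.builder().master("local[*]").getOrCreate()

        // Hypothetical tables mirroring the bf1/bf2 schemas used in the tests.
        spark.range(1000)
          .selectExpr("id AS a1", "id AS b1", "id AS c1", "id AS d1", "id AS e1", "id AS f1")
          .write.saveAsTable("bf1")
        spark.range(1000)
          .selectExpr("id AS a2", "id AS b2", "id AS c2", "id AS d2", "id AS e2", "id AS f2")
          .write.saveAsTable("bf2")

        // Enable runtime bloom filters and lower the application-side scan size
        // threshold so the small demo tables qualify for filter injection.
        spark.conf.set("spark.sql.optimizer.runtime.bloomFilter.enabled", "true")
        spark.conf.set("spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold", "3000")

        spark.sql("""
          SELECT *
          FROM   (SELECT *, row_number() OVER (PARTITION BY c1 ORDER BY f1) rn
                  FROM   bf1) bf1
                 JOIN bf2 ON bf1.c1 = bf2.c2
          WHERE  bf2.a2 = 62
        """).explain("extended")  // expect might_contain(...) above the Window on the bf1 side
        ```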
    
    2. Makes sure injected filters can be pushed through a Shuffle when the current join is a broadcast join.
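
        A short sketch of that broadcast case, continuing the hypothetical tables from the sketch above (the DataFrame API calls are standard; whether the filter is actually injected still depends on the thresholds):
        ```scala
        import org.apache.spark.sql.expressions.Window
        import org.apache.spark.sql.functions.{broadcast, col, row_number}

        // With bf2 broadcast, the join itself adds no shuffle, but the window on
        // the bf1 side still requires one; the injected might_contain filter
        // should sit below that shuffle so the bf1 scan is pruned before the
        // exchange.
        val windowed = spark.table("bf1")
          .withColumn("rn", row_number().over(Window.partitionBy("c1").orderBy("f1")))
        val bf2Selective = broadcast(spark.table("bf2").where(col("a2") === 62))
        val joined = windowed.join(bf2Selective, windowed("c1") === bf2Selective("c2"))
        joined.explain(true)
        ```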
    
    ### Why are the changes needed?
    
    Improve query performance.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit test.
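    For reference, such suites are typically run with something like `build/sbt "sql/testOnly *InjectRuntimeFilterSuite"` in the standard Spark dev workflow.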
    
    Closes #36080 from wangyum/SPARK-38797.
    
    Lead-authored-by: Yuming Wang <yumw...@ebay.com>
    Co-authored-by: Yuming Wang <wgy...@gmail.com>
    Signed-off-by: Yuming Wang <yumw...@ebay.com>
---
 .../catalyst/optimizer/InjectRuntimeFilter.scala   |  5 +++--
 .../spark/sql/InjectRuntimeFilterSuite.scala       | 26 ++++++++++++++++++++++
 2 files changed, 29 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
index 134292ae30d..01c1786e05a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
@@ -141,6 +141,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
     plan.exists {
       case Join(left, right, _, _, hint) => isProbablyShuffleJoin(left, right, hint)
       case _: Aggregate => true
+      case _: Window => true
       case _ => false
     }
   }
@@ -172,8 +173,8 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
 
   /**
    * Check that:
-   * - The filterApplicationSideJoinExp can be pushed down through joins and aggregates (ie the
-   *   expression references originate from a single leaf node)
+   * - The filterApplicationSideJoinExp can be pushed down through joins, aggregates and windows
+   *   (ie the expression references originate from a single leaf node)
    * - The filter creation side has a selective predicate
    * - The current join is a shuffle join or a broadcast join that has a shuffle below it
    * - The max filterApplicationSide scan size is greater than a configurable threshold
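
The first hunk above extends a shuffle-detection check: `plan.exists` walks the whole plan tree and short-circuits on the first operator that implies an exchange, and `Window` now counts alongside `Aggregate` and shuffle joins. A self-contained toy sketch of that pattern; the case classes and the helper name are illustrative stand-ins for Catalyst's operators, and the `Join` case drops the hint inspection done by the real rule:
```scala
// Toy plan nodes standing in for Catalyst's LogicalPlan operators.
sealed trait Plan {
  def children: Seq[Plan]
  // Mirrors TreeNode.exists: true if this node or any descendant matches.
  def exists(p: Plan => Boolean): Boolean = p(this) || children.exists(_.exists(p))
}
case object Leaf extends Plan { val children: Seq[Plan] = Nil }
case class Window(child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }
case class Aggregate(child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }
case class Join(left: Plan, right: Plan) extends Plan { val children: Seq[Plan] = Seq(left, right) }

// Simplified version of the check in the hunk: does anything below this point
// probably introduce a shuffle?
def probablyHasShuffle(plan: Plan): Boolean = plan.exists {
  case _: Join      => true // the real rule also inspects join hints here
  case _: Aggregate => true
  case _: Window    => true // the new case added by this commit
  case _            => false
}

assert(probablyHasShuffle(Window(Leaf))) // true only after this change
assert(!probablyHasShuffle(Leaf))
```
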
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
index 726fa341b5c..6065f232109 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
@@ -539,4 +539,30 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
         """.stripMargin)
     }
   }
+
+  test("Runtime Filter supports pruning side has Aggregate") {
+    withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000") {
+      assertRewroteWithBloomFilter(
+        """
+          |SELECT *
+          |FROM   (SELECT c1 AS aliased_c1, d1 FROM bf1 GROUP BY c1, d1) bf1
+          |       JOIN bf2 ON bf1.aliased_c1 = bf2.c2
+          |WHERE  bf2.a2 = 62
+        """.stripMargin)
+    }
+  }
+
+  test("Runtime Filter supports pruning side has Window") {
+    withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000") {
+      assertRewroteWithBloomFilter(
+        """
+          |SELECT *
+          |FROM   (SELECT *,
+          |               Row_number() OVER (PARTITION BY c1 ORDER BY f1) rn
+          |        FROM   bf1) bf1
+          |       JOIN bf2 ON bf1.c1 = bf2.c2
+          |WHERE  bf2.a2 = 62
+        """.stripMargin)
+    }
+  }
 }

