This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d8df4f6b99b [SPARK-45606][SQL] Release restrictions on multi-layer 
runtime filter
1d8df4f6b99b is described below

commit 1d8df4f6b99b836f4267b888e81d67c75b4dfdcd
Author: Jiaan Geng <belie...@163.com>
AuthorDate: Wed Nov 8 19:43:33 2023 +0800

    [SPARK-45606][SQL] Release restrictions on multi-layer runtime filter
    
    ### What changes were proposed in this pull request?
    Before https://github.com/apache/spark/pull/39170, Spark only supported 
inserting a runtime filter on the application side of a shuffle join at a 
single layer. Considering that it is not worth inserting another runtime filter 
if the column already has one, Spark restricts this at 
https://github.com/apache/spark/blob/7057952f6bc2c5cf97dd408effd1b18bee1cb8f4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala#L346
    For example
    `select * from bf1 join bf2 on bf1.c1 = bf2.c2 and bf1.c1 = bf2.b2 where 
bf2.a2 = 62`
    This SQL has two join conditions. Without the restriction mentioned above, 
two runtime filters would be inserted on `bf1.c1`.
    At that time, the restriction was reasonable.
    
    After https://github.com/apache/spark/pull/39170, Spark supports inserting 
a runtime filter on one side of any shuffle join across multiple layers. So the 
restriction on multi-layer runtime filters mentioned above now looks outdated.
    For example
    `select * from bf1 join bf2 join bf3 on bf1.c1 = bf2.c2 and bf3.c3 = bf1.c1 
where bf2.a2 = 5`
    Assume bf2 is the build side and a runtime filter is inserted for bf1. We 
can't insert the same runtime filter for bf3, because there is already a runtime 
filter on `bf1.c1`.
    This behavior differs from the original behavior and is unexpected.
    
    The change in this PR does not affect the restriction mentioned above.
    
    ### Why are the changes needed?
    Releases the restriction on multi-layer runtime filters and thereby 
expands the optimization surface.
    
    ### Does this PR introduce _any_ user-facing change?
    'No'.
    New feature.
    
    ### How was this patch tested?
    Test cases updated.
    
    Micro benchmark for q9 in TPC-H.
    **TPC-H 100**
    Query | Master(ms) | PR(ms) | Difference(ms) | Percent
    -- | -- | -- | -- | --
    q9 | 26491 | 20725 | 5766| 27.82%
    
    ### Was this patch authored or co-authored using generative AI tooling?
    'No'.
    
    Closes #43449 from beliefer/SPARK-45606.
    
    Authored-by: Jiaan Geng <belie...@163.com>
    Signed-off-by: Jiaan Geng <belie...@163.com>
---
 .../catalyst/optimizer/InjectRuntimeFilter.scala   | 33 ++++++++++------------
 .../spark/sql/InjectRuntimeFilterSuite.scala       |  8 ++----
 2 files changed, 18 insertions(+), 23 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
index 5f5508d6b22c..9c150f1f3308 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
@@ -247,15 +247,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with 
PredicateHelper with J
     }
   }
 
-  private def hasBloomFilter(
-      left: LogicalPlan,
-      right: LogicalPlan,
-      leftKey: Expression,
-      rightKey: Expression): Boolean = {
-    findBloomFilterWithKey(left, leftKey) || findBloomFilterWithKey(right, 
rightKey)
-  }
-
-  private def findBloomFilterWithKey(plan: LogicalPlan, key: Expression): 
Boolean = {
+  private def hasBloomFilter(plan: LogicalPlan, key: Expression): Boolean = {
     plan.exists {
       case Filter(condition, _) =>
         splitConjunctivePredicates(condition).exists {
@@ -277,28 +269,33 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with 
PredicateHelper with J
         leftKeys.lazyZip(rightKeys).foreach((l, r) => {
           // Check if:
           // 1. There is already a DPP filter on the key
-          // 2. There is already a bloom filter on the key
-          // 3. The keys are simple cheap expressions
+          // 2. The keys are simple cheap expressions
           if (filterCounter < numFilterThreshold &&
             !hasDynamicPruningSubquery(left, right, l, r) &&
-            !hasBloomFilter(newLeft, newRight, l, r) &&
             isSimpleExpression(l) && isSimpleExpression(r)) {
             val oldLeft = newLeft
             val oldRight = newRight
-            // Check if the current join is a shuffle join or a broadcast join 
that
-            // has a shuffle below it
+            // Check if:
+            // 1. The current join type supports prune the left side with 
runtime filter
+            // 2. The current join is a shuffle join or a broadcast join that
+            //    has a shuffle below it
+            // 3. There is no bloom filter on the left key yet
             val hasShuffle = isProbablyShuffleJoin(left, right, hint)
-            if (canPruneLeft(joinType) && (hasShuffle || 
probablyHasShuffle(left))) {
+            if (canPruneLeft(joinType) && (hasShuffle || 
probablyHasShuffle(left)) &&
+              !hasBloomFilter(newLeft, l)) {
               extractBeneficialFilterCreatePlan(left, right, l, r).foreach {
                 case (filterCreationSideKey, filterCreationSidePlan) =>
                   newLeft = injectFilter(l, newLeft, filterCreationSideKey, 
filterCreationSidePlan)
               }
             }
             // Did we actually inject on the left? If not, try on the right
-            // Check if the current join is a shuffle join or a broadcast join 
that
-            // has a shuffle below it
+            // Check if:
+            // 1. The current join type supports prune the right side with 
runtime filter
+            // 2. The current join is a shuffle join or a broadcast join that
+            //    has a shuffle below it
+            // 3. There is no bloom filter on the right key yet
             if (newLeft.fastEquals(oldLeft) && canPruneRight(joinType) &&
-              (hasShuffle || probablyHasShuffle(right))) {
+              (hasShuffle || probablyHasShuffle(right)) && 
!hasBloomFilter(newRight, r)) {
               extractBeneficialFilterCreatePlan(right, left, r, l).foreach {
                 case (filterCreationSideKey, filterCreationSidePlan) =>
                   newRight = injectFilter(
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
index 2e57975ee6d1..fc1524be1317 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
@@ -335,14 +335,12 @@ class InjectRuntimeFilterSuite extends QueryTest with 
SQLTestUtils with SharedSp
         "bf1.c1 = bf2.c2 where bf2.a2 = 5) as a join bf3 on bf3.c3 = a.c1", 2)
       assertRewroteWithBloomFilter("select * from (select * from bf1 right 
join bf2 on " +
         "bf1.c1 = bf2.c2 where bf2.a2 = 5) as a join bf3 on bf3.c3 = a.c1", 2)
-      // Can't leverage the transitivity of join keys due to runtime filters 
already exists.
-      // bf2 as creation side and inject runtime filter for bf1.
       assertRewroteWithBloomFilter("select * from bf1 join bf2 join bf3 on 
bf1.c1 = bf2.c2 " +
-        "and bf3.c3 = bf1.c1 where bf2.a2 = 5")
+        "and bf3.c3 = bf1.c1 where bf2.a2 = 5", 2)
       assertRewroteWithBloomFilter("select * from bf1 left outer join bf2 join 
bf3 on " +
-        "bf1.c1 = bf2.c2 and bf3.c3 = bf1.c1 where bf2.a2 = 5")
+        "bf1.c1 = bf2.c2 and bf3.c3 = bf1.c1 where bf2.a2 = 5", 2)
       assertRewroteWithBloomFilter("select * from bf1 right outer join bf2 
join bf3 on " +
-        "bf1.c1 = bf2.c2 and bf3.c3 = bf1.c1 where bf2.a2 = 5")
+        "bf1.c1 = bf2.c2 and bf3.c3 = bf1.c1 where bf2.a2 = 5", 2)
     }
 
     
withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key
 -> "3000",


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to