This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 892fdc53269 [SPARK-45108][SQL] Improve the InjectRuntimeFilter for 
check probably shuffle
892fdc53269 is described below

commit 892fdc532696e703b353c4758320d69162fffe8c
Author: Jiaan Geng <belie...@163.com>
AuthorDate: Thu Sep 21 19:52:40 2023 +0800

    [SPARK-45108][SQL] Improve the InjectRuntimeFilter for check probably 
shuffle
    
    ### What changes were proposed in this pull request?
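    `InjectRuntimeFilter` must check whether a shuffle is probably involved, i.e. whether
    the current join is a shuffle join or a broadcast join with a shuffle below it. However,
    the current code may call `isProbablyShuffleJoin` twice when the right side of the `Join`
    node has to be used as the application side.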
    
    ### Why are the changes needed?
    To avoid calling `isProbablyShuffleJoin` twice for the same join.
    
    ### Does this PR introduce _any_ user-facing change?
    'No'.
    This only changes the internal implementation.
    
    ### How was this patch tested?
    Existing test cases.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    'No'.
    
    Closes #42861 from beliefer/SPARK-45108.
    
    Authored-by: Jiaan Geng <belie...@163.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/catalyst/optimizer/InjectRuntimeFilter.scala | 20 +++++++++++---------
 1 file changed, 11 insertions(+), 9 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
index 44c55860375..13554908379 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
@@ -229,19 +229,15 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with 
PredicateHelper with J
    * - The filterApplicationSideJoinExp can be pushed down through joins, 
aggregates and windows
    *   (ie the expression references originate from a single leaf node)
    * - The filter creation side has a selective predicate
-   * - The current join is a shuffle join or a broadcast join that has a 
shuffle below it
    * - The max filterApplicationSide scan size is greater than a configurable 
threshold
    */
   private def extractBeneficialFilterCreatePlan(
       filterApplicationSide: LogicalPlan,
       filterCreationSide: LogicalPlan,
       filterApplicationSideExp: Expression,
-      filterCreationSideExp: Expression,
-      hint: JoinHint): Option[LogicalPlan] = {
+      filterCreationSideExp: Expression): Option[LogicalPlan] = {
     if (findExpressionAndTrackLineageDown(
       filterApplicationSideExp, filterApplicationSide).isDefined &&
-      (isProbablyShuffleJoin(filterApplicationSide, filterCreationSide, hint) 
||
-        probablyHasShuffle(filterApplicationSide)) &&
       satisfyByteSizeRequirement(filterApplicationSide)) {
       extractSelectiveFilterOverScan(filterCreationSide, filterCreationSideExp)
     } else {
@@ -326,15 +322,21 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with 
PredicateHelper with J
             isSimpleExpression(l) && isSimpleExpression(r)) {
             val oldLeft = newLeft
             val oldRight = newRight
-            if (canPruneLeft(joinType)) {
-              extractBeneficialFilterCreatePlan(left, right, l, r, 
hint).foreach {
+            // Check if the current join is a shuffle join or a broadcast join 
that
+            // has a shuffle below it
+            val hasShuffle = isProbablyShuffleJoin(left, right, hint)
+            if (canPruneLeft(joinType) && (hasShuffle || 
probablyHasShuffle(left))) {
+              extractBeneficialFilterCreatePlan(left, right, l, r).foreach {
                 filterCreationSidePlan =>
                   newLeft = injectFilter(l, newLeft, r, filterCreationSidePlan)
               }
             }
             // Did we actually inject on the left? If not, try on the right
-            if (newLeft.fastEquals(oldLeft) && canPruneRight(joinType)) {
-              extractBeneficialFilterCreatePlan(right, left, r, l, 
hint).foreach {
+            // Check if the current join is a shuffle join or a broadcast join 
that
+            // has a shuffle below it
+            if (newLeft.fastEquals(oldLeft) && canPruneRight(joinType) &&
+              (hasShuffle || probablyHasShuffle(right))) {
+              extractBeneficialFilterCreatePlan(right, left, r, l).foreach {
                 filterCreationSidePlan =>
                   newRight = injectFilter(r, newRight, l, 
filterCreationSidePlan)
               }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to