This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 739aae15540 [SPARK-41509][SQL] Only execute `Murmur3Hash` on aggregate expressions for semi-join runtime filter
739aae15540 is described below

commit 739aae15540ff5957037ad88df3139e3934e3712
Author: Jiaan Geng <belie...@163.com>
AuthorDate: Wed Dec 21 21:10:46 2022 +0800

    [SPARK-41509][SQL] Only execute `Murmur3Hash` on aggregate expressions for semi-join runtime filter
    
    ### What changes were proposed in this pull request?
    Currently, Spark's runtime filter supports both the bloom filter and the in-subquery filter.
    The in-subquery filter always executes `Murmur3Hash` before aggregating the join key.
    
    Because the data size before aggregation is larger than after it, we can execute `Murmur3Hash` only on the aggregated output. This reduces the number of calls to `Murmur3Hash` and improves performance.
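
The saving can be sketched in plain Python (a counting stand-in, not Spark's actual `Murmur3Hash` implementation): with duplicate join keys, deduplicating first and hashing afterwards invokes the hash once per distinct key instead of once per row, while producing the same set of hashed values.

```python
hash_calls = 0

def murmur3_like_hash(value):
    """Stand-in for Murmur3Hash that counts how often it is invoked."""
    global hash_calls
    hash_calls += 1
    return hash(value)  # Python's builtin hash as a placeholder

join_keys = [1, 2, 2, 3, 3, 3, 3]  # creation-side rows, with duplicates

# Before: grouping on the hashed key, so every row is hashed
# before deduplication.
hash_calls = 0
before = set(murmur3_like_hash(k) for k in join_keys)
calls_before = hash_calls  # one call per row

# After: grouping on the raw key, hashing only the distinct values
# produced by the aggregate.
hash_calls = 0
after = set(murmur3_like_hash(k) for k in set(join_keys))
calls_after = hash_calls  # one call per distinct key

assert before == after  # the filter contents are identical
print(calls_before, calls_after)  # 7 3
```

The filter built either way is the same; only the number of hash invocations changes, which is why this is purely an internal optimization.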
    
    ### Why are the changes needed?
    Improve performance for semi-join runtime filter.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    This just updates the internal implementation.
    
    ### How was this patch tested?
    Manually tested with TPC-DS using the following configuration:
    ```
    spark.sql.optimizer.runtime.bloomFilter.enabled=false
    spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled=true
    ```
    
    TPC-DS data size: 2TB.
    This improvement applies to the TPC-DS queries below, with no regression for the other queries.
    
    | TPC-DS Query | Before (Seconds) | After (Seconds) | Speedup (Percent) |
    |  ----  | ----  | ----  | ----  |
    | q23a | 542.03 | 539.06 | 0.55% |
    | q23b | 776.7 | 769.74 | 0.90% |
    | q24a | 442.25 | 436.75 | 1.26% |
    | q24b | 436.16 | 432.86 | 0.76% |
    | q50 | 200.92 | 193.36 | 3.91% |
    | q64 | 426.73 | 421.03 | 1.35% |
    | q67 | 987.79 | 956.82 | 3.24% |
    | q93 | 397.26 | 393.9 | 0.85% |
    | All TPC-DS | 8392.49 | 8302.56 | 1.08% |
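
The Speedup column appears to be computed as `before / after - 1` (a reading of the table's numbers, not stated explicitly in the source); a quick check against a few rows:

```python
# Spot-check the Speedup (Percent) column: before / after - 1,
# using three rows taken from the table above.
rows = {
    "q23a": (542.03, 539.06),
    "q50": (200.92, 193.36),
    "All TPC-DS": (8392.49, 8302.56),
}
for name, (before, after) in rows.items():
    speedup = before / after - 1
    print(f"{name}: {speedup:.2%}")  # matches the table: 0.55%, 3.91%, 1.08%
```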
    
    Closes #39049 from beliefer/SPARK-41509.
    
    Authored-by: Jiaan Geng <belie...@163.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala  | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
index df04a248a27..161abff8fe3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
@@ -101,7 +101,8 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
     require(filterApplicationSideExp.dataType == filterCreationSideExp.dataType)
     val actualFilterKeyExpr = mayWrapWithHash(filterCreationSideExp)
     val alias = Alias(actualFilterKeyExpr, actualFilterKeyExpr.toString)()
-    val aggregate = ColumnPruning(Aggregate(Seq(alias), Seq(alias), filterCreationSidePlan))
+    val aggregate =
+      ColumnPruning(Aggregate(Seq(filterCreationSideExp), Seq(alias), filterCreationSidePlan))
     if (!canBroadcastBySize(aggregate, conf)) {
       // Skip the InSubquery filter if the size of `aggregate` is beyond broadcast join threshold,
       // i.e., the semi-join will be a shuffled join, which is not worthwhile.

