This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 739aae15540 [SPARK-41509][SQL] Only execute `Murmur3Hash` on aggregate expressions for semi-join runtime filter

739aae15540 is described below

commit 739aae15540ff5957037ad88df3139e3934e3712
Author: Jiaan Geng <belie...@163.com>
AuthorDate: Wed Dec 21 21:10:46 2022 +0800

[SPARK-41509][SQL] Only execute `Murmur3Hash` on aggregate expressions for semi-join runtime filter

### What changes were proposed in this pull request?

Currently, Spark's runtime filter supports the bloom filter and the in-subquery filter. The in-subquery filter always executes `Murmur3Hash` before aggregating the join key. Because the data size before aggregation is larger than after it, we can instead execute `Murmur3Hash` only on the aggregate expressions; this reduces the number of calls to `Murmur3Hash` and improves performance.

### Why are the changes needed?

Improve performance for the semi-join runtime filter.

### Does this PR introduce _any_ user-facing change?

No. This only updates the internal implementation.

### How was this patch tested?

Manually tested with TPC-DS, using:

```
spark.sql.optimizer.runtime.bloomFilter.enabled=false
spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled=true
```

TPC-DS data size: 2 TB. The improvement holds for the TPC-DS queries below, with no regression in the other queries.

| TPC-DS Query | Before (seconds) | After (seconds) | Speedup (percent) |
| ---- | ---- | ---- | ---- |
| q23a | 542.03 | 539.06 | 0.55% |
| q23b | 776.7 | 769.74 | 0.90% |
| q24a | 442.25 | 436.75 | 1.26% |
| q24b | 436.16 | 432.86 | 0.76% |
| q50 | 200.92 | 193.36 | 3.91% |
| q64 | 426.73 | 421.03 | 1.35% |
| q67 | 987.79 | 956.82 | 3.24% |
| q93 | 397.26 | 393.9 | 0.85% |
| All TPC-DS | 8392.49 | 8302.56 | 1.08% |

Closes #39049 from beliefer/SPARK-41509.
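The counting argument behind this optimization — grouping on the raw join key and hashing only the aggregated output means the hash runs once per distinct key instead of once per input row — can be sketched in plain Scala. This is an illustration only, with no Spark dependencies; `hashCalls` and the stand-in `hash` function are hypothetical, not Spark APIs:

```scala
// Sketch of the SPARK-41509 idea: hashing after aggregation invokes the hash
// function once per distinct key rather than once per input row.
object HashAfterAggregate {
  // Count how many times the stand-in hash is invoked for a given plan shape.
  // hashFirst = true  models the old plan: hash every row, then deduplicate.
  // hashFirst = false models the new plan: deduplicate keys, then hash.
  def hashCalls(rows: Seq[String], hashFirst: Boolean): Int = {
    var calls = 0
    def hash(key: String): Int = { calls += 1; key.hashCode }  // stand-in for Murmur3Hash
    if (hashFirst) rows.map(hash).distinct else rows.distinct.map(hash)
    calls
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq("a", "b", "a", "c", "b", "a", "c", "a")  // 8 rows, 3 distinct keys
    println(s"hash-then-aggregate: ${hashCalls(rows, hashFirst = true)} calls")
    println(s"aggregate-then-hash: ${hashCalls(rows, hashFirst = false)} calls")
  }
}
```

Both plan shapes produce the same set of distinct hash values, so the semi-join filter's contents are unchanged; only the number of hash invocations differs.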
Authored-by: Jiaan Geng <belie...@163.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
index df04a248a27..161abff8fe3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
@@ -101,7 +101,8 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
     require(filterApplicationSideExp.dataType == filterCreationSideExp.dataType)
     val actualFilterKeyExpr = mayWrapWithHash(filterCreationSideExp)
     val alias = Alias(actualFilterKeyExpr, actualFilterKeyExpr.toString)()
-    val aggregate = ColumnPruning(Aggregate(Seq(alias), Seq(alias), filterCreationSidePlan))
+    val aggregate =
+      ColumnPruning(Aggregate(Seq(filterCreationSideExp), Seq(alias), filterCreationSidePlan))
     if (!canBroadcastBySize(aggregate, conf)) {
       // Skip the InSubquery filter if the size of `aggregate` is beyond broadcast join threshold,
       // i.e., the semi-join will be a shuffled join, which is not worthwhile.

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org