This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 892fdc53269 [SPARK-45108][SQL] Improve the InjectRuntimeFilter for check probably shuffle 892fdc53269 is described below commit 892fdc532696e703b353c4758320d69162fffe8c Author: Jiaan Geng <belie...@163.com> AuthorDate: Thu Sep 21 19:52:40 2023 +0800 [SPARK-45108][SQL] Improve the InjectRuntimeFilter for check probably shuffle ### What changes were proposed in this pull request? `InjectRuntimeFilter` needs to check whether the join is probably a shuffle join. However, the current code may lead to duplicate calls of `isProbablyShuffleJoin` if we need the right side of the `Join` node as the application side. ### Why are the changes needed? To avoid the duplicate call of `isProbablyShuffleJoin`. ### Does this PR introduce _any_ user-facing change? 'No'. It only updates the internal implementation. ### How was this patch tested? Existing test cases. ### Was this patch authored or co-authored using generative AI tooling? 'No'. Closes #42861 from beliefer/SPARK-45108. 
Authored-by: Jiaan Geng <belie...@163.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../sql/catalyst/optimizer/InjectRuntimeFilter.scala | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala index 44c55860375..13554908379 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala @@ -229,19 +229,15 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J * - The filterApplicationSideJoinExp can be pushed down through joins, aggregates and windows * (ie the expression references originate from a single leaf node) * - The filter creation side has a selective predicate - * - The current join is a shuffle join or a broadcast join that has a shuffle below it * - The max filterApplicationSide scan size is greater than a configurable threshold */ private def extractBeneficialFilterCreatePlan( filterApplicationSide: LogicalPlan, filterCreationSide: LogicalPlan, filterApplicationSideExp: Expression, - filterCreationSideExp: Expression, - hint: JoinHint): Option[LogicalPlan] = { + filterCreationSideExp: Expression): Option[LogicalPlan] = { if (findExpressionAndTrackLineageDown( filterApplicationSideExp, filterApplicationSide).isDefined && - (isProbablyShuffleJoin(filterApplicationSide, filterCreationSide, hint) || - probablyHasShuffle(filterApplicationSide)) && satisfyByteSizeRequirement(filterApplicationSide)) { extractSelectiveFilterOverScan(filterCreationSide, filterCreationSideExp) } else { @@ -326,15 +322,21 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J isSimpleExpression(l) && isSimpleExpression(r)) { val oldLeft = newLeft 
val oldRight = newRight - if (canPruneLeft(joinType)) { - extractBeneficialFilterCreatePlan(left, right, l, r, hint).foreach { + // Check if the current join is a shuffle join or a broadcast join that + // has a shuffle below it + val hasShuffle = isProbablyShuffleJoin(left, right, hint) + if (canPruneLeft(joinType) && (hasShuffle || probablyHasShuffle(left))) { + extractBeneficialFilterCreatePlan(left, right, l, r).foreach { filterCreationSidePlan => newLeft = injectFilter(l, newLeft, r, filterCreationSidePlan) } } // Did we actually inject on the left? If not, try on the right - if (newLeft.fastEquals(oldLeft) && canPruneRight(joinType)) { - extractBeneficialFilterCreatePlan(right, left, r, l, hint).foreach { + // Check if the current join is a shuffle join or a broadcast join that + // has a shuffle below it + if (newLeft.fastEquals(oldLeft) && canPruneRight(joinType) && + (hasShuffle || probablyHasShuffle(right))) { + extractBeneficialFilterCreatePlan(right, left, r, l).foreach { filterCreationSidePlan => newRight = injectFilter(r, newRight, l, filterCreationSidePlan) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org