This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 7796d8a6331 [SPARK-45531][SQL][DOCS] Add more comments and rename some variable name for InjectRuntimeFilter 7796d8a6331 is described below commit 7796d8a63318d560b08d4d6a8b4d68ea0112bd3e Author: Jiaan Geng <belie...@163.com> AuthorDate: Mon Oct 16 15:40:17 2023 +0800 [SPARK-45531][SQL][DOCS] Add more comments and rename some variable name for InjectRuntimeFilter ### What changes were proposed in this pull request? After many improvements, `InjectRuntimeFilter` is a bit complex. We need to add more comments to give more design details and rename some variable names so that `InjectRuntimeFilter` has better readability and maintainability. The core of a runtime filter is join keys, but the suffix `Exp` is not intuitive, so it's better to use the suffix `Key` directly. So rename as follows: `filterApplicationSideExp` -> `filterApplicationSideKey` `filterCreationSideExp` -> `filterCreationSideKey` `findBloomFilterWithExp` -> `findBloomFilterWithKey` `expr` -> `joinKey` ### Why are the changes needed? Improve the readability and maintainability. ### Does this PR introduce _any_ user-facing change? 'No'. ### How was this patch tested? N/A ### Was this patch authored or co-authored using generative AI tooling? 'No'. Closes #43359 from beliefer/SPARK-45531. 
Authored-by: Jiaan Geng <belie...@163.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../catalyst/optimizer/InjectRuntimeFilter.scala | 76 ++++++++++++---------- 1 file changed, 40 insertions(+), 36 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala index 614ab4a1d01..8737082e571 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala @@ -29,48 +29,50 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ /** - * Insert a filter on one side of the join if the other side has a selective predicate. - * The filter could be an IN subquery (converted to a semi join), a bloom filter, or something - * else in the future. + * Insert a runtime filter on one side of the join (we call this side the application side) if + * we can extract a runtime filter from the other side (creation side). A simple case is that + * the creation side is a table scan with a selective filter. + * The runtime filter is logically an IN subquery with the join keys (converted to a semi join), + * but can be something different physically, such as a bloom filter. */ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with JoinSelectionHelper { - // Wraps `expr` with a hash function if its byte size is larger than an integer. - private def mayWrapWithHash(expr: Expression): Expression = { - if (expr.dataType.defaultSize > IntegerType.defaultSize) { - new Murmur3Hash(Seq(expr)) + // Wraps `joinKey` with a hash function if its byte size is larger than an integer. 
+ private def mayWrapWithHash(joinKey: Expression): Expression = { + if (joinKey.dataType.defaultSize > IntegerType.defaultSize) { + new Murmur3Hash(Seq(joinKey)) } else { - expr + joinKey } } private def injectFilter( - filterApplicationSideExp: Expression, + filterApplicationSideKey: Expression, filterApplicationSidePlan: LogicalPlan, - filterCreationSideExp: Expression, + filterCreationSideKey: Expression, filterCreationSidePlan: LogicalPlan): LogicalPlan = { require(conf.runtimeFilterBloomFilterEnabled || conf.runtimeFilterSemiJoinReductionEnabled) if (conf.runtimeFilterBloomFilterEnabled) { injectBloomFilter( - filterApplicationSideExp, + filterApplicationSideKey, filterApplicationSidePlan, - filterCreationSideExp, + filterCreationSideKey, filterCreationSidePlan ) } else { injectInSubqueryFilter( - filterApplicationSideExp, + filterApplicationSideKey, filterApplicationSidePlan, - filterCreationSideExp, + filterCreationSideKey, filterCreationSidePlan ) } } private def injectBloomFilter( - filterApplicationSideExp: Expression, + filterApplicationSideKey: Expression, filterApplicationSidePlan: LogicalPlan, - filterCreationSideExp: Expression, + filterCreationSideKey: Expression, filterCreationSidePlan: LogicalPlan): LogicalPlan = { // Skip if the filter creation side is too big if (filterCreationSidePlan.stats.sizeInBytes > conf.runtimeFilterCreationSideThreshold) { @@ -79,9 +81,9 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J val rowCount = filterCreationSidePlan.stats.rowCount val bloomFilterAgg = if (rowCount.isDefined && rowCount.get.longValue > 0L) { - new BloomFilterAggregate(new XxHash64(Seq(filterCreationSideExp)), rowCount.get.longValue) + new BloomFilterAggregate(new XxHash64(Seq(filterCreationSideKey)), rowCount.get.longValue) } else { - new BloomFilterAggregate(new XxHash64(Seq(filterCreationSideExp))) + new BloomFilterAggregate(new XxHash64(Seq(filterCreationSideKey))) } val alias = 
Alias(bloomFilterAgg.toAggregateExpression(), "bloomFilter")() @@ -89,26 +91,26 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J ConstantFolding(ColumnPruning(Aggregate(Nil, Seq(alias), filterCreationSidePlan))) val bloomFilterSubquery = ScalarSubquery(aggregate, Nil) val filter = BloomFilterMightContain(bloomFilterSubquery, - new XxHash64(Seq(filterApplicationSideExp))) + new XxHash64(Seq(filterApplicationSideKey))) Filter(filter, filterApplicationSidePlan) } private def injectInSubqueryFilter( - filterApplicationSideExp: Expression, + filterApplicationSideKey: Expression, filterApplicationSidePlan: LogicalPlan, - filterCreationSideExp: Expression, + filterCreationSideKey: Expression, filterCreationSidePlan: LogicalPlan): LogicalPlan = { - require(filterApplicationSideExp.dataType == filterCreationSideExp.dataType) - val actualFilterKeyExpr = mayWrapWithHash(filterCreationSideExp) + require(filterApplicationSideKey.dataType == filterCreationSideKey.dataType) + val actualFilterKeyExpr = mayWrapWithHash(filterCreationSideKey) val alias = Alias(actualFilterKeyExpr, actualFilterKeyExpr.toString)() val aggregate = - ColumnPruning(Aggregate(Seq(filterCreationSideExp), Seq(alias), filterCreationSidePlan)) + ColumnPruning(Aggregate(Seq(filterCreationSideKey), Seq(alias), filterCreationSidePlan)) if (!canBroadcastBySize(aggregate, conf)) { // Skip the InSubquery filter if the size of `aggregate` is beyond broadcast join threshold, // i.e., the semi-join will be a shuffled join, which is not worthwhile. 
return filterApplicationSidePlan } - val filter = InSubquery(Seq(mayWrapWithHash(filterApplicationSideExp)), + val filter = InSubquery(Seq(mayWrapWithHash(filterApplicationSideKey)), ListQuery(aggregate, numCols = aggregate.output.length)) Filter(filter, filterApplicationSidePlan) } @@ -117,11 +119,13 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J * Extracts a sub-plan which is a simple filter over scan from the input plan. The simple * filter should be selective and the filter condition (including expressions in the child * plan referenced by the filter condition) should be a simple expression, so that we do - * not add a subquery that might have an expensive computation. + * not add a subquery that might have an expensive computation. The extracted sub-plan should + * produce a superset of the entire creation side output data, so that it's still correct to + * use the sub-plan to build the runtime filter to prune the application side. */ private def extractSelectiveFilterOverScan( plan: LogicalPlan, - filterCreationSideExp: Expression): Option[LogicalPlan] = { + filterCreationSideKey: Expression): Option[LogicalPlan] = { @tailrec def extract( p: LogicalPlan, @@ -157,10 +161,10 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J // Runtime filters use one side of the [[Join]] to build a set of join key values and prune // the other side of the [[Join]]. It's also OK to use a superset of the join key values // (ignore null values) to do the pruning. 
- if (left.output.exists(_.semanticEquals(filterCreationSideExp))) { + if (left.output.exists(_.semanticEquals(filterCreationSideKey))) { extract(left, AttributeSet.empty, hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = left) - } else if (right.output.exists(_.semanticEquals(filterCreationSideExp))) { + } else if (right.output.exists(_.semanticEquals(filterCreationSideKey))) { extract(right, AttributeSet.empty, hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = right) } else { @@ -226,7 +230,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J /** * Extracts the beneficial filter creation plan with check show below: - * - The filterApplicationSideJoinExp can be pushed down through joins, aggregates and windows + * - The filterApplicationSideKey can be pushed down through joins, aggregates and windows * (ie the expression references originate from a single leaf node) * - The filter creation side has a selective predicate * - The max filterApplicationSide scan size is greater than a configurable threshold @@ -234,12 +238,12 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J private def extractBeneficialFilterCreatePlan( filterApplicationSide: LogicalPlan, filterCreationSide: LogicalPlan, - filterApplicationSideExp: Expression, - filterCreationSideExp: Expression): Option[LogicalPlan] = { + filterApplicationSideKey: Expression, + filterCreationSideKey: Expression): Option[LogicalPlan] = { if (findExpressionAndTrackLineageDown( - filterApplicationSideExp, filterApplicationSide).isDefined && + filterApplicationSideKey, filterApplicationSide).isDefined && satisfyByteSizeRequirement(filterApplicationSide)) { - extractSelectiveFilterOverScan(filterCreationSide, filterCreationSideExp) + extractSelectiveFilterOverScan(filterCreationSide, filterCreationSideKey) } else { None } @@ -276,10 +280,10 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with 
J right: LogicalPlan, leftKey: Expression, rightKey: Expression): Boolean = { - findBloomFilterWithExp(left, leftKey) || findBloomFilterWithExp(right, rightKey) + findBloomFilterWithKey(left, leftKey) || findBloomFilterWithKey(right, rightKey) } - private def findBloomFilterWithExp(plan: LogicalPlan, key: Expression): Boolean = { + private def findBloomFilterWithKey(plan: LogicalPlan, key: Expression): Boolean = { plan.exists { case Filter(condition, _) => splitConjunctivePredicates(condition).exists { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org