This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7796d8a6331 [SPARK-45531][SQL][DOCS] Add more comments and rename some 
variable name for InjectRuntimeFilter
7796d8a6331 is described below

commit 7796d8a63318d560b08d4d6a8b4d68ea0112bd3e
Author: Jiaan Geng <belie...@163.com>
AuthorDate: Mon Oct 16 15:40:17 2023 +0800

    [SPARK-45531][SQL][DOCS] Add more comments and rename some variable name 
for InjectRuntimeFilter
    
    ### What changes were proposed in this pull request?
    After many improvements, `InjectRuntimeFilter` has become a bit complex. We need 
to add more comments that give more design details and rename some variables so 
that `InjectRuntimeFilter` has better readability and maintainability.
    
    A runtime filter is built around join keys, and the suffix `Exp` is not 
intuitive, so it is better to use the suffix `Key` directly. The renames are as 
follows:
    `filterApplicationSideExp` -> `filterApplicationSideKey`
    `filterCreationSideExp` -> `filterCreationSideKey`
    `findBloomFilterWithExp` -> `findBloomFilterWithKey`
    `expr` -> `joinKey`
    
    ### Why are the changes needed?
    Improve the readability and maintainability.
    
    ### Does this PR introduce _any_ user-facing change?
    'No'.
    
    ### How was this patch tested?
    N/A
    
    ### Was this patch authored or co-authored using generative AI tooling?
    'No'.
    
    Closes #43359 from beliefer/SPARK-45531.
    
    Authored-by: Jiaan Geng <belie...@163.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../catalyst/optimizer/InjectRuntimeFilter.scala   | 76 ++++++++++++----------
 1 file changed, 40 insertions(+), 36 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
index 614ab4a1d01..8737082e571 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
@@ -29,48 +29,50 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 /**
- * Insert a filter on one side of the join if the other side has a selective 
predicate.
- * The filter could be an IN subquery (converted to a semi join), a bloom 
filter, or something
- * else in the future.
+ * Insert a runtime filter on one side of the join (we call this side the 
application side) if
+ * we can extract a runtime filter from the other side (creation side). A 
simple case is that
+ * the creation side is a table scan with a selective filter.
+ * The runtime filter is logically an IN subquery with the join keys 
(converted to a semi join),
+ * but can be something different physically, such as a bloom filter.
  */
 object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with 
JoinSelectionHelper {
 
-  // Wraps `expr` with a hash function if its byte size is larger than an 
integer.
-  private def mayWrapWithHash(expr: Expression): Expression = {
-    if (expr.dataType.defaultSize > IntegerType.defaultSize) {
-      new Murmur3Hash(Seq(expr))
+  // Wraps `joinKey` with a hash function if its byte size is larger than an 
integer.
+  private def mayWrapWithHash(joinKey: Expression): Expression = {
+    if (joinKey.dataType.defaultSize > IntegerType.defaultSize) {
+      new Murmur3Hash(Seq(joinKey))
     } else {
-      expr
+      joinKey
     }
   }
 
   private def injectFilter(
-      filterApplicationSideExp: Expression,
+      filterApplicationSideKey: Expression,
       filterApplicationSidePlan: LogicalPlan,
-      filterCreationSideExp: Expression,
+      filterCreationSideKey: Expression,
       filterCreationSidePlan: LogicalPlan): LogicalPlan = {
     require(conf.runtimeFilterBloomFilterEnabled || 
conf.runtimeFilterSemiJoinReductionEnabled)
     if (conf.runtimeFilterBloomFilterEnabled) {
       injectBloomFilter(
-        filterApplicationSideExp,
+        filterApplicationSideKey,
         filterApplicationSidePlan,
-        filterCreationSideExp,
+        filterCreationSideKey,
         filterCreationSidePlan
       )
     } else {
       injectInSubqueryFilter(
-        filterApplicationSideExp,
+        filterApplicationSideKey,
         filterApplicationSidePlan,
-        filterCreationSideExp,
+        filterCreationSideKey,
         filterCreationSidePlan
       )
     }
   }
 
   private def injectBloomFilter(
-      filterApplicationSideExp: Expression,
+      filterApplicationSideKey: Expression,
       filterApplicationSidePlan: LogicalPlan,
-      filterCreationSideExp: Expression,
+      filterCreationSideKey: Expression,
       filterCreationSidePlan: LogicalPlan): LogicalPlan = {
     // Skip if the filter creation side is too big
     if (filterCreationSidePlan.stats.sizeInBytes > 
conf.runtimeFilterCreationSideThreshold) {
@@ -79,9 +81,9 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with 
PredicateHelper with J
     val rowCount = filterCreationSidePlan.stats.rowCount
     val bloomFilterAgg =
       if (rowCount.isDefined && rowCount.get.longValue > 0L) {
-        new BloomFilterAggregate(new XxHash64(Seq(filterCreationSideExp)), 
rowCount.get.longValue)
+        new BloomFilterAggregate(new XxHash64(Seq(filterCreationSideKey)), 
rowCount.get.longValue)
       } else {
-        new BloomFilterAggregate(new XxHash64(Seq(filterCreationSideExp)))
+        new BloomFilterAggregate(new XxHash64(Seq(filterCreationSideKey)))
       }
 
     val alias = Alias(bloomFilterAgg.toAggregateExpression(), "bloomFilter")()
@@ -89,26 +91,26 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with 
PredicateHelper with J
       ConstantFolding(ColumnPruning(Aggregate(Nil, Seq(alias), 
filterCreationSidePlan)))
     val bloomFilterSubquery = ScalarSubquery(aggregate, Nil)
     val filter = BloomFilterMightContain(bloomFilterSubquery,
-      new XxHash64(Seq(filterApplicationSideExp)))
+      new XxHash64(Seq(filterApplicationSideKey)))
     Filter(filter, filterApplicationSidePlan)
   }
 
   private def injectInSubqueryFilter(
-      filterApplicationSideExp: Expression,
+      filterApplicationSideKey: Expression,
       filterApplicationSidePlan: LogicalPlan,
-      filterCreationSideExp: Expression,
+      filterCreationSideKey: Expression,
       filterCreationSidePlan: LogicalPlan): LogicalPlan = {
-    require(filterApplicationSideExp.dataType == 
filterCreationSideExp.dataType)
-    val actualFilterKeyExpr = mayWrapWithHash(filterCreationSideExp)
+    require(filterApplicationSideKey.dataType == 
filterCreationSideKey.dataType)
+    val actualFilterKeyExpr = mayWrapWithHash(filterCreationSideKey)
     val alias = Alias(actualFilterKeyExpr, actualFilterKeyExpr.toString)()
     val aggregate =
-      ColumnPruning(Aggregate(Seq(filterCreationSideExp), Seq(alias), 
filterCreationSidePlan))
+      ColumnPruning(Aggregate(Seq(filterCreationSideKey), Seq(alias), 
filterCreationSidePlan))
     if (!canBroadcastBySize(aggregate, conf)) {
       // Skip the InSubquery filter if the size of `aggregate` is beyond 
broadcast join threshold,
       // i.e., the semi-join will be a shuffled join, which is not worthwhile.
       return filterApplicationSidePlan
     }
-    val filter = InSubquery(Seq(mayWrapWithHash(filterApplicationSideExp)),
+    val filter = InSubquery(Seq(mayWrapWithHash(filterApplicationSideKey)),
       ListQuery(aggregate, numCols = aggregate.output.length))
     Filter(filter, filterApplicationSidePlan)
   }
@@ -117,11 +119,13 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with 
PredicateHelper with J
    * Extracts a sub-plan which is a simple filter over scan from the input 
plan. The simple
    * filter should be selective and the filter condition (including 
expressions in the child
    * plan referenced by the filter condition) should be a simple expression, 
so that we do
-   * not add a subquery that might have an expensive computation.
+   * not add a subquery that might have an expensive computation. The 
extracted sub-plan should
+   * produce a superset of the entire creation side output data, so that it's 
still correct to
+   * use the sub-plan to build the runtime filter to prune the application 
side.
    */
   private def extractSelectiveFilterOverScan(
       plan: LogicalPlan,
-      filterCreationSideExp: Expression): Option[LogicalPlan] = {
+      filterCreationSideKey: Expression): Option[LogicalPlan] = {
     @tailrec
     def extract(
         p: LogicalPlan,
@@ -157,10 +161,10 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with 
PredicateHelper with J
         // Runtime filters use one side of the [[Join]] to build a set of join 
key values and prune
         // the other side of the [[Join]]. It's also OK to use a superset of 
the join key values
         // (ignore null values) to do the pruning.
-        if (left.output.exists(_.semanticEquals(filterCreationSideExp))) {
+        if (left.output.exists(_.semanticEquals(filterCreationSideKey))) {
           extract(left, AttributeSet.empty,
             hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = 
left)
-        } else if 
(right.output.exists(_.semanticEquals(filterCreationSideExp))) {
+        } else if 
(right.output.exists(_.semanticEquals(filterCreationSideKey))) {
           extract(right, AttributeSet.empty,
             hasHitFilter = false, hasHitSelectiveFilter = false, currentPlan = 
right)
         } else {
@@ -226,7 +230,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with 
PredicateHelper with J
 
   /**
    * Extracts the beneficial filter creation plan with check show below:
-   * - The filterApplicationSideJoinExp can be pushed down through joins, 
aggregates and windows
+   * - The filterApplicationSideKey can be pushed down through joins, 
aggregates and windows
    *   (ie the expression references originate from a single leaf node)
    * - The filter creation side has a selective predicate
    * - The max filterApplicationSide scan size is greater than a configurable 
threshold
@@ -234,12 +238,12 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with 
PredicateHelper with J
   private def extractBeneficialFilterCreatePlan(
       filterApplicationSide: LogicalPlan,
       filterCreationSide: LogicalPlan,
-      filterApplicationSideExp: Expression,
-      filterCreationSideExp: Expression): Option[LogicalPlan] = {
+      filterApplicationSideKey: Expression,
+      filterCreationSideKey: Expression): Option[LogicalPlan] = {
     if (findExpressionAndTrackLineageDown(
-      filterApplicationSideExp, filterApplicationSide).isDefined &&
+      filterApplicationSideKey, filterApplicationSide).isDefined &&
       satisfyByteSizeRequirement(filterApplicationSide)) {
-      extractSelectiveFilterOverScan(filterCreationSide, filterCreationSideExp)
+      extractSelectiveFilterOverScan(filterCreationSide, filterCreationSideKey)
     } else {
       None
     }
@@ -276,10 +280,10 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with 
PredicateHelper with J
       right: LogicalPlan,
       leftKey: Expression,
       rightKey: Expression): Boolean = {
-    findBloomFilterWithExp(left, leftKey) || findBloomFilterWithExp(right, 
rightKey)
+    findBloomFilterWithKey(left, leftKey) || findBloomFilterWithKey(right, 
rightKey)
   }
 
-  private def findBloomFilterWithExp(plan: LogicalPlan, key: Expression): 
Boolean = {
+  private def findBloomFilterWithKey(plan: LogicalPlan, key: Expression): 
Boolean = {
     plan.exists {
       case Filter(condition, _) =>
         splitConjunctivePredicates(condition).exists {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to