szehon-ho commented on code in PR #55887:
URL: https://github.com/apache/spark/pull/55887#discussion_r3247503308


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala:
##########
@@ -187,6 +193,90 @@ object PushDownUtils extends Logging {
     }
   }
 
+  /**
+   * Pushes runtime filters into `scan` and re-plans its input partitions. For scans whose
+   * `outputPartitioning` is a [[KeyedPartitioning]] (SPJ-active), validates that the data source
+   * preserved the original partitioning and pads with `None` to preserve key alignment with the
+   * pre-filter partition set.
+   *
+   * Must be called at execute time: runtime filters carry [[DynamicPruningExpression]] and
+   * scalar-subquery references whose values are only resolved after their broadcast/subquery
+   * side completes. Callers should wrap the result in a `lazy val` so the mutating

Review Comment:
   nit: don't need to mention `lazy val` explicitly, but we can keep the recommendation that callers ensure it's called once per V2 scan instance
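For illustration, here is a minimal Scala sketch of one way a caller could satisfy the once-per-instance contract. A `lazy val` is only one option, which is why the Scaladoc does not need to prescribe it. `FakeScan` and `ScanExecSketch` are hypothetical stand-ins, not Spark classes, and the integer "partitions" are a toy model.

```scala
// Hypothetical stand-in for a V2 scan whose runtime-filter pushdown mutates its state.
final class FakeScan(var partitions: Seq[Int]) {
  var pushCount: Int = 0

  // Mutating pushdown: records the call and prunes partitions in place.
  def pushRuntimeFilters(keep: Int => Boolean): Unit = {
    pushCount += 1
    partitions = partitions.filter(keep)
  }
}

// Hypothetical exec node: the filtered partitions are computed on first access
// (at execute time, once filter values are known) and then cached, so the
// mutating push runs at most once for this instance.
final class ScanExecSketch(scan: FakeScan, keep: Int => Boolean) {
  lazy val filteredPartitions: Seq[Int] = {
    scan.pushRuntimeFilters(keep)
    scan.partitions
  }
}

val scan = new FakeScan(Seq(1, 2, 3, 4))
val exec = new ScanExecSketch(scan, _ % 2 == 0)
val pushesBeforeAccess = scan.pushCount // construction alone does not push
val firstAccess = exec.filteredPartitions
val secondAccess = exec.filteredPartitions // served from the cached value
```

Any other memoization, such as a guarded `Option` field, would meet the same contract. What matters is that the push happens once per scan instance.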



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala:
##########
@@ -187,6 +193,90 @@ object PushDownUtils extends Logging {
     }
   }
 
+  /**
+   * Pushes runtime filters into `scan` and re-plans its input partitions. For scans whose
+   * `outputPartitioning` is a [[KeyedPartitioning]] (SPJ-active), validates that the data source
+   * preserved the original partitioning and pads with `None` to preserve key alignment with the
+   * pre-filter partition set.
+   *
+   * Must be called at execute time: runtime filters carry [[DynamicPruningExpression]] and
+   * scalar-subquery references whose values are only resolved after their broadcast/subquery
+   * side completes. Callers should wrap the result in a `lazy val` so the mutating
+   * [[pushRuntimeFilters]] call runs at most once per scan instance.
+   *
+   * @param scan                      the V2 scan to push filters into
+   * @param runtimeFilters            runtime filters to translate and push
+   * @param partitionPredicateSchema  by-name schema for iterative [[PartitionPredicate]] pushdown
+   * @param output                    scan output attributes
+   * @param outputPartitioning        Spark-side output partitioning (used for SPJ validation)
+   * @param inputPartitions           by-name original (unfiltered) partitions; consulted only when
+   *                                  no runtime filters fire, so callers can compute it lazily
+   * @return one entry per original input partition: `Some(part)` for surviving partitions and
+   *         `None` for partition keys whose splits were entirely pruned (SPJ alignment)
+   */
+  def filterAndPlanPartitions(
+      scan: Scan,
+      runtimeFilters: Seq[Expression],
+      partitionPredicateSchema: => Option[Seq[PartitionPredicateField]],
+      output: Seq[AttributeReference],
+      outputPartitioning: Partitioning,
+      inputPartitions: => Seq[InputPartition]): Seq[Option[InputPartition]] = {
+    val filtered = pushRuntimeFilters(scan, runtimeFilters, partitionPredicateSchema, output)
+    if (filtered) {
+      // call toBatch again to get filtered partitions
+      val newPartitions = scan.toBatch.planInputPartitions()
+
+      outputPartitioning match {
+        case k: KeyedPartitioning =>

Review Comment:
   let's just add a small comment here (or above) noting that this block pads the expected partitions with empty (`None`) entries for the SPJ case
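To show what that comment would describe, here is a simplified Scala sketch of a padding step. It assumes, hypothetically, that each partition key maps to at most one split. `KeyedPart` and `padForSpj` are illustrative names, not Spark APIs. Real input partitions expose their key through `HasPartitionKey`, and the PR's actual logic may differ.

```scala
// Hypothetical, simplified input partition that carries its partition key.
// Assumption: at most one split per key in both the original and re-planned sets.
final case class KeyedPart(key: String, split: String)

// Pads the re-planned partitions so there is exactly one entry per original partition:
// Some(newPart) where the key survived filtering, None where it was pruned entirely.
def padForSpj(
    original: Seq[KeyedPart],
    replanned: Seq[KeyedPart]): Seq[Option[KeyedPart]] = {
  val originalKeys = original.map(_.key).toSet
  // The source must not introduce keys that were absent before filtering;
  // otherwise the two sides of the storage-partitioned join would no longer line up.
  val unexpected = replanned.map(_.key).filterNot(originalKeys.contains)
  if (unexpected.nonEmpty) {
    throw new IllegalStateException(
      s"Runtime filtering changed the partitioning; unexpected keys: ${unexpected.mkString(", ")}")
  }
  val replannedByKey = replanned.map(p => p.key -> p).toMap
  // Preserve the original key order so positions stay aligned with the other join side.
  original.map(p => replannedByKey.get(p.key))
}
```

Keeping one slot per original key, even an empty one, is what lets partition `i` on one side still pair with partition `i` on the other side after pruning.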



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala:
##########
@@ -187,6 +193,90 @@ object PushDownUtils extends Logging {
     }
   }
 
+  /**
+   * Pushes runtime filters into `scan` and re-plans its input partitions. For scans whose
+   * `outputPartitioning` is a [[KeyedPartitioning]] (SPJ-active), validates that the data source
+   * preserved the original partitioning and pads with `None` to preserve key alignment with the
+   * pre-filter partition set.
+   *
+   * Must be called at execute time: runtime filters carry [[DynamicPruningExpression]] and
+   * scalar-subquery references whose values are only resolved after their broadcast/subquery
+   * side completes. Callers should wrap the result in a `lazy val` so the mutating
+   * [[pushRuntimeFilters]] call runs at most once per scan instance.
+   *
+   * @param scan                      the V2 scan to push filters into
+   * @param runtimeFilters            runtime filters to translate and push
+   * @param partitionPredicateSchema  by-name schema for iterative [[PartitionPredicate]] pushdown
+   * @param output                    scan output attributes
+   * @param outputPartitioning        Spark-side output partitioning (used for SPJ validation)
+   * @param inputPartitions           by-name original (unfiltered) partitions; consulted only when
+   *                                  no runtime filters fire, so callers can compute it lazily
+   * @return one entry per original input partition: `Some(part)` for surviving partitions and
+   *         `None` for partition keys whose splits were entirely pruned (SPJ alignment)
+   */
+  def filterAndPlanPartitions(

Review Comment:
   what do you guys think of `replanWithRuntimeFilters()`? cc @gengliangwang



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala:
##########
@@ -187,6 +193,90 @@ object PushDownUtils extends Logging {
     }
   }
 
+  /**
+   * Pushes runtime filters into `scan` and re-plans its input partitions. For scans whose
+   * `outputPartitioning` is a [[KeyedPartitioning]] (SPJ-active), validates that the data source
+   * preserved the original partitioning and pads with `None` to preserve key alignment with the
+   * pre-filter partition set.
+   *
+   * Must be called at execute time: runtime filters carry [[DynamicPruningExpression]] and
+   * scalar-subquery references whose values are only resolved after their broadcast/subquery
+   * side completes. Callers should wrap the result in a `lazy val` so the mutating
+   * [[pushRuntimeFilters]] call runs at most once per scan instance.
+   *
+   * @param scan                      the V2 scan to push filters into
+   * @param runtimeFilters            runtime filters to translate and push
+   * @param partitionPredicateSchema  by-name schema for iterative [[PartitionPredicate]] pushdown
+   * @param output                    scan output attributes
+   * @param outputPartitioning        Spark-side output partitioning (used for SPJ validation)
+   * @param inputPartitions           by-name original (unfiltered) partitions; consulted only when

Review Comment:
   should we rename it to `originalPartitions`? Also, it's a bit awkward to decipher what the comment implies for the caller ("so callers can compute it lazily"); I would just leave that part out
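To make the by-name behavior concrete, here is a minimal sketch. `replanSketch` is a hypothetical method, not the PR's code, and its fallback parameter uses the proposed `originalPartitions` name. A by-name argument is evaluated only on the path that references it, so the signature alone already tells callers that an expensive fallback costs nothing when filters fire.

```scala
// Hypothetical method shaped like the one under review: `originalPartitions` is by-name,
// so its argument expression is evaluated only if the body actually references it.
def replanSketch(
    filtersFired: Boolean,
    replanned: Seq[String],
    originalPartitions: => Seq[String]): Seq[String] = {
  if (filtersFired) replanned // fallback is never evaluated on this path
  else originalPartitions     // evaluated here, and only here
}

// Counts how often the (hypothetically expensive) fallback is computed.
var originalEvaluations = 0
def expensiveOriginalPartitions(): Seq[String] = {
  originalEvaluations += 1
  Seq("p0", "p1")
}

val whenFired = replanSketch(
  filtersFired = true, replanned = Seq("p1"), originalPartitions = expensiveOriginalPartitions())
val evaluationsAfterFired = originalEvaluations
val whenNotFired = replanSketch(
  filtersFired = false, replanned = Seq("p1"), originalPartitions = expensiveOriginalPartitions())
```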



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala:
##########
@@ -187,6 +193,90 @@ object PushDownUtils extends Logging {
     }
   }
 
+  /**
+   * Pushes runtime filters into `scan` and re-plans its input partitions. For scans whose
+   * `outputPartitioning` is a [[KeyedPartitioning]] (SPJ-active), validates that the data source
+   * preserved the original partitioning and pads with `None` to preserve key alignment with the
+   * pre-filter partition set.
+   *
+   * Must be called at execute time: runtime filters carry [[DynamicPruningExpression]] and
+   * scalar-subquery references whose values are only resolved after their broadcast/subquery
+   * side completes. Callers should wrap the result in a `lazy val` so the mutating
+   * [[pushRuntimeFilters]] call runs at most once per scan instance.
+   *
+   * @param scan                      the V2 scan to push filters into
+   * @param runtimeFilters            runtime filters to translate and push
+   * @param partitionPredicateSchema  by-name schema for iterative [[PartitionPredicate]] pushdown
+   * @param output                    scan output attributes
+   * @param outputPartitioning        Spark-side output partitioning (used for SPJ validation)
+   * @param inputPartitions           by-name original (unfiltered) partitions; consulted only when

Review Comment:
   also, I would skip mentioning 'by-name' in the Scaladoc; it's already apparent from the arg definition



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
