schenksj commented on code in PR #4700:
URL: https://github.com/apache/datafusion-comet/pull/4700#discussion_r3488603418
##########
spark/src/main/scala/org/apache/spark/sql/comet/operators.scala:
##########
@@ -901,6 +943,43 @@ abstract class CometLeafExec extends CometNativeExec with LeafExecNode {
}
}
+/**
+ * Marker trait for scan execs that surface planning data (a `commonData` block + per-partition
+ * task bytes keyed by `sourceKey`) so that a parent `CometNativeExec` can find and inject the
+ * data when the scan is fused into a larger native subtree.
+ *
+ * Implemented by `CometNativeScanExec` and the contrib's `CometDeltaNativeScanExec` -- without
+ * it, [[PlanDataInjector.findAllPlanData]] cannot collect the per-partition tasks and the
+ * parent's native execution receives an empty input. (`CometIcebergNativeScanExec` does NOT use
+ * this trait; it has a dedicated `findAllPlanData` case.)
+ *
+ * Each implementation also resolves its own DPP subqueries via `ensureSubqueriesResolved`
+ * (overridden from [[CometLeafExec]]) before `commonData`/`perPartitionData` are read.
+ */
+trait CometScanWithPlanData {
Review Comment:
Done in dfde184b5. I went with the self-type:
`trait CometScanWithPlanData { self: CometLeafExec => ... }`.
"Is a leaf scan" is now a compile-time requirement (an impl that forgets to
extend `CometLeafExec` no longer compiles), and `findAllPlanData` matches the
compound type `case s: CometLeafExec with CometScanWithPlanData` and calls
`ensureSubqueriesResolved()` directly, so the runtime `case _ =>` silent-skip
is gone. The stub test you suggested now exercises this exact path
(`CometLeafExec with CometScanWithPlanData` through `findAllPlanData`).
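
For readers following along, here is a minimal sketch of the self-type plus compound-type match described above. It uses hypothetical stand-ins (`LeafExecStub`, `ScanWithPlanDataStub`, `GoodScan`, `PlanDataDemo`) rather than the real Comet classes, so it only illustrates the shape and is not the code in the commit:

```scala
// Hypothetical stand-in for CometLeafExec; not the real class.
abstract class LeafExecStub {
  var resolved: Boolean = false
  def ensureSubqueriesResolved(): Unit = { resolved = true }
}

// The self-type makes "is a leaf exec" a compile-time requirement.
trait ScanWithPlanDataStub { self: LeafExecStub =>
  def sourceKey: String
  def commonData: Array[Byte]
}

// Compiles because it extends LeafExecStub as the self-type demands.
final class GoodScan extends LeafExecStub with ScanWithPlanDataStub {
  val sourceKey: String = "scan-0"
  val commonData: Array[Byte] = Array[Byte](1, 2, 3)
}

// `class BadScan extends ScanWithPlanDataStub { ... }` would be rejected
// by the compiler because the self-type is not satisfied.

object PlanDataDemo {
  // Matching the compound type gives access to members of both types,
  // so subqueries can be resolved before the plan data is read.
  def collect(nodes: Seq[AnyRef]): Map[String, Array[Byte]] =
    nodes.collect {
      case s: LeafExecStub with ScanWithPlanDataStub =>
        s.ensureSubqueriesResolved()
        s.sourceKey -> s.commonData
    }.toMap
}
```

Nodes that are not both a leaf exec and a plan-data scan simply fall outside the partial function, and there is no way to define a plan-data scan that is not also a leaf exec.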
##########
spark/src/main/scala/org/apache/spark/sql/comet/operators.scala:
##########
@@ -901,6 +943,43 @@ abstract class CometLeafExec extends CometNativeExec with LeafExecNode {
}
}
+/**
+ * Marker trait for scan execs that surface planning data (a `commonData` block + per-partition
+ * task bytes keyed by `sourceKey`) so that a parent `CometNativeExec` can find and inject the
+ * data when the scan is fused into a larger native subtree.
+ *
+ * Implemented by `CometNativeScanExec` and the contrib's `CometDeltaNativeScanExec` -- without
+ * it, [[PlanDataInjector.findAllPlanData]] cannot collect the per-partition tasks and the
+ * parent's native execution receives an empty input. (`CometIcebergNativeScanExec` does NOT use
+ * this trait; it has a dedicated `findAllPlanData` case.)
+ *
+ * Each implementation also resolves its own DPP subqueries via `ensureSubqueriesResolved`
+ * (overridden from [[CometLeafExec]]) before `commonData`/`perPartitionData` are read.
+ */
+trait CometScanWithPlanData {
+ def sourceKey: String
+ def commonData: Array[Byte]
+ def perPartitionData: Array[Array[Byte]]
+
+ // DPP / partition filters that may carry AQE SubqueryAdaptiveBroadcast
+ // subqueries needing rewrite by CometPlanAdaptiveDynamicPruningFilters.
+ // Default empty: scans with dedicated handling (CometNativeScanExec,
+ // CometIcebergNativeScanExec) don't use this path.
+ def dynamicPruningFilters: Seq[Expression] = Nil
+
+ // Install rewritten DPP filters on this scan. Implementers whose filters live
+ // in a @transient field (which TreeNode.makeCopy can't carry, #3510) update
+ // them via a transient side-channel and return `this` -- so the optimizer
+ // rule's rewrite lands on the SAME instance that executes, instead of a copy
+ // that gets dropped when the enclosing native block is rebuilt. Only called
+ // when `dynamicPruningFilters` is non-empty, so the default is never reached
+ // for scans that leave it empty.
+ def withDynamicPruningFilters(filters: Seq[Expression]): SparkPlan =
Review Comment:
Done in dfde184b5. Mirrored the marker: `withDynamicPruningFilters` now
carries a `TODO(#3510)` noting that the mutate-and-return-`this` workaround
collapses back to a normal copy once `makeCopy` preserves `@transient` fields,
and pointing at the matching TODO in `CometPlanAdaptiveDynamicPruningFilters`.
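
As a rough illustration of why the method mutates and returns `this`, here is a sketch with a hypothetical `ScanStub` case class. It assumes `String` filters in place of `Expression`, and uses case-class `copy()` as an analogy for `makeCopy` rebuilding a node from its constructor arguments; it is not the contrib scan's implementation:

```scala
// Hypothetical stand-in for a scan whose DPP filters live outside the
// constructor arguments.
final case class ScanStub(table: String) {
  // Not a constructor parameter, so copy() cannot carry it over.
  @transient private var dppFilters: Seq[String] = Nil

  def dynamicPruningFilters: Seq[String] = dppFilters

  // Install the rewritten filters in place and hand back the same instance,
  // so the rewrite lands on the object that will actually execute.
  def withDynamicPruningFilters(filters: Seq[String]): ScanStub = {
    dppFilters = filters
    this
  }
}
```

In this sketch, `scan.withDynamicPruningFilters(...)` returns `scan` itself, while `scan.copy()` yields a new instance whose filters are empty again, which is the loss the side-channel avoids.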
##########
spark/src/main/scala/org/apache/comet/rules/CometPlanAdaptiveDynamicPruningFilters.scala:
##########
@@ -83,6 +83,15 @@ case object CometPlanAdaptiveDynamicPruningFilters
if icebergScan.runtimeFilters.exists(hasCometSAB) =>
logDebug("Converting AQE DPP for CometIcebergNativeScanExec")
convertIcebergScanDPP(icebergScan, plan)
+ // Comet scans whose DPP filters live in a @transient field (the contrib's
+ // CometDeltaNativeScanExec). transformExpressions/makeCopy can't rewrite
+ // them, and a rewritten copy is orphaned when the enclosing native block
+ // is rebuilt (#3510). The scan's `withDynamicPruningFilters` installs the
+ // rewrite in place and returns `this`, so it lands on the executing
+ // instance.
+ case p: CometScanWithPlanData if p.dynamicPruningFilters.exists(hasCometSAB) =>
+ logDebug(s"Converting AQE DPP for ${p.getClass.getSimpleName} in place")
+ p.withDynamicPruningFilters(p.dynamicPruningFilters.map(f => convertFilter(f, plan)))
case p: SparkPlan
Review Comment:
Thanks — addressed pre-emptively in dfde184b5 rather than deferring, since
it was cheap and forward-safe: the non-Comet arm now also excludes
`CometScanWithPlanData` (`&& !p.isInstanceOf[CometScanWithPlanData]`), so a
future trait scan with a wrapped SAB but empty `dynamicPruningFilters` can't be
misrouted to `convertNonCometNodeDPP`. No behavior change today
(`CometNativeScanExec` was already excluded by name).
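
A simplified sketch of the routing this guards against, using hypothetical stubs (`RoutedPlanStub`, `RoutedScanStub`, `TraitScan`, `OtherNode`, `DppRouter`) and a plain `nonEmpty` check standing in for `exists(hasCometSAB)`; the arm names are illustrative only:

```scala
// Hypothetical stand-ins; the real rule matches on SparkPlan nodes.
trait RoutedPlanStub { def hasWrappedSab: Boolean }
trait RoutedScanStub extends RoutedPlanStub {
  def dynamicPruningFilters: Seq[String]
}

final case class TraitScan(hasWrappedSab: Boolean, dynamicPruningFilters: Seq[String])
    extends RoutedScanStub
final case class OtherNode(hasWrappedSab: Boolean) extends RoutedPlanStub

object DppRouter {
  def route(p: RoutedPlanStub): String = p match {
    // Trait scans with filters to rewrite are handled in place.
    case s: RoutedScanStub if s.dynamicPruningFilters.nonEmpty => "in-place"
    // The non-Comet arm explicitly excludes trait scans, so one with an
    // empty filter list cannot fall through into it.
    case n if n.hasWrappedSab && !n.isInstanceOf[RoutedScanStub] => "non-comet"
    case _ => "unchanged"
  }
}
```

Without the `isInstanceOf` exclusion, `TraitScan(hasWrappedSab = true, Nil)` would match the second arm in this sketch; with it, the node is left unchanged.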
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]