andygrove commented on code in PR #1746:
URL: https://github.com/apache/datafusion-comet/pull/1746#discussion_r2096187472
##########
spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala:
##########
@@ -2376,12 +2375,26 @@ object QueryPlanSerde extends Logging with
CometExprShim {
val cond = exprToProto(condition, child.output)
if (cond.isDefined && childOp.nonEmpty) {
+ // We need to determine whether to use DataFusion's FilterExec or
Comet's
+ // FilterExec. The difference is that DataFusion's implementation
will sometimes pass
+ // batches through whereas the Comet implementation guarantees that
a copy is always
+ // made, which is critical when using `native_comet` scans due to
buffer re-use
+
+ // TODO this could be optimized more to stop walking the tree on
hitting
+ // certain operators such as join or aggregate which will copy
batches
+ def containsNativeCometScan(plan: SparkPlan): Boolean = {
+ plan match {
+ case w: CometScanWrapper =>
containsNativeCometScan(w.originalPlan)
+ case scan: CometScanExec => scan.scanImpl ==
CometConf.SCAN_NATIVE_COMET
Review Comment:
We only need to use Comet's FilterExec if any of the scans are
`NATIVE_COMET` due to the buffer re-use in that scan.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]