mbutrovich opened a new pull request, #4812:
URL: https://github.com/apache/datafusion-comet/pull/4812

   ## Which issue does this PR close?
   
   Closes #4774.
   
   ## Rationale for this change
   
   Comet silently produces wrong results when the same Iceberg table is scanned 
more than once with different pushed-down filters (for example a `FULL OUTER 
JOIN` or `UNION ALL` of two differently-filtered reads). Spark's non-AQE 
`ReuseExchangeAndSubquery` keys exchange reuse off `Exchange.canonicalized`, 
which delegates to the scan's canonical form. 
`CometIcebergNativeScanExec.equals`/`hashCode`/`doCanonicalize` identified a 
scan by `metadataLocation`, `output`, `serializedPlanOpt`, and `runtimeFilters` 
only. None of these carry the pushed static filters, which live in the 
`@transient originalPlan` that canonicalization nulls out. Two scans of the 
same table+snapshot with different filters therefore canonicalize identically, 
and reuse collapses them into one, so one branch reads the other branch's data.
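
The collision can be illustrated with a small stand-alone model. This is only a sketch under stated assumptions, not Comet's code: `ScanNode`, `OriginalPlan`, `reuse`, and the metadata path are hypothetical stand-ins, and `reuse` merely imitates "keep the first node seen per canonical form".

```scala
// Simplified model of the bug. All names are hypothetical, not Comet's classes.
object ReuseCollisionSketch {
  // Stand-in for the transient original plan that carries the pushed filters.
  final case class OriginalPlan(pushedFilters: Seq[String])

  // Stand-in for the scan node. Equality ignores originalPlan, mirroring a
  // node identified only by fields that do not carry the static filters.
  final class ScanNode(
      val metadataLocation: String,
      val output: Seq[String],
      val originalPlan: Option[OriginalPlan]) {

    // Canonicalization drops the plan, as it would for a nulled transient field.
    def canonicalized: ScanNode = new ScanNode(metadataLocation, output, None)

    override def equals(other: Any): Boolean = other match {
      case that: ScanNode =>
        metadataLocation == that.metadataLocation && output == that.output
      case _ => false
    }

    override def hashCode(): Int = (metadataLocation, output).hashCode()
  }

  // Imitation of exchange reuse: keep the first node seen per canonical form.
  def reuse(nodes: Seq[ScanNode]): Seq[ScanNode] = {
    val seen = scala.collection.mutable.HashMap.empty[ScanNode, ScanNode]
    nodes.map(n => seen.getOrElseUpdate(n.canonicalized, n))
  }

  def main(args: Array[String]): Unit = {
    val location = "warehouse/t/metadata/v1.metadata.json" // hypothetical path
    val a = new ScanNode(location, Seq("id", "part"), Some(OriginalPlan(Seq("part = 'A'"))))
    val b = new ScanNode(location, Seq("id", "part"), Some(OriginalPlan(Seq("part = 'B'"))))

    // Both entries resolve to branch A's scan; branch B's filter is gone.
    reuse(Seq(a, b)).foreach(n => println(n.originalPlan.map(_.pushedFilters)))
  }
}
```

In this model the second branch silently becomes a reference to the first, which is the "one branch reads the other branch's data" symptom described above.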
   
   Vanilla Spark avoids this because `BatchScanExec.equals` compares 
`scan.toBatch`, and Iceberg's `SparkBatch.equals` compares `table.name()` plus 
`SparkScan.hashCode()`, which folds in pushed filters, snapshot, branch, and 
read schema. Spark also keeps the `scan` reference alive through 
`doCanonicalize`, so the fingerprint survives. Comet's V1 Parquet scan 
(`CometNativeScanExec`) is not affected because its filters are top-level 
`partitionFilters`/`dataFilters` fields that already participate in equality and 
are preserved by canonicalization.
   
   ## What changes are included in this PR?
   
   - Add a non-transient `scanHashCode: Int` field to 
`CometIcebergNativeScanExec`, captured from `scanExec.scan.hashCode()` in the 
`apply` factory while the transient scan is still available.
   - Include `scanHashCode` in `equals` and `hashCode`, and carry it through 
`doCanonicalize` (it is the only distinguishing field left once `originalPlan` 
is nulled) and `convertBlock`.
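
The shape of the fix can be sketched in isolation. This is a minimal model, not the actual patch: `FakeScan` and `ScanNode` are hypothetical stand-ins, and `FakeScan`'s case-class `hashCode` is assumed to play the role of a scan hash that folds in the pushed filters.

```scala
// Simplified model of the fix. All names are hypothetical, not Comet's classes.
object ScanHashCodeSketch {
  // Stand-in for the scan; its case-class hashCode covers the pushed filters.
  final case class FakeScan(table: String, pushedFilters: Seq[String])

  // scanHashCode is a plain Int, so it survives canonicalization even though
  // the scan reference itself is dropped.
  final class ScanNode(
      val metadataLocation: String,
      val scanHashCode: Int,
      val scan: Option[FakeScan]) {

    // Drop the scan, but carry the fingerprint through.
    def canonicalized: ScanNode = new ScanNode(metadataLocation, scanHashCode, None)

    override def equals(other: Any): Boolean = other match {
      case that: ScanNode =>
        metadataLocation == that.metadataLocation && scanHashCode == that.scanHashCode
      case _ => false
    }

    override def hashCode(): Int = (metadataLocation, scanHashCode).hashCode()
  }

  object ScanNode {
    // Factory captures the fingerprint while the scan is still available.
    def apply(metadataLocation: String, scan: FakeScan): ScanNode =
      new ScanNode(metadataLocation, scan.hashCode(), Some(scan))
  }

  def main(args: Array[String]): Unit = {
    val location = "warehouse/t/metadata/v1.metadata.json" // hypothetical path
    val a = ScanNode(location, FakeScan("t", Seq("part = 'A'")))
    val b = ScanNode(location, FakeScan("t", Seq("part = 'B'")))
    val aAgain = ScanNode(location, FakeScan("t", Seq("part = 'A'")))

    println(a.canonicalized == b.canonicalized)      // differently filtered scans
    println(a.canonicalized == aAgain.canonicalized) // identical scans
  }
}
```

In this sketch, differently filtered scans no longer canonicalize to the same node, while identical scans still compare equal and remain eligible for reuse. Because the fingerprint is a 32-bit hash, two distinct scans could in principle collide; the sketch does not try to address that.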
   
   ## How are these changes tested?
   
   New test in `CometIcebergNativeSuite`: "exchange reuse must not collapse 
scans with different pushed filters (#4774)". It runs a `UNION ALL` of two 
aggregations over the same partitioned Iceberg table with different partition 
filters, with AQE disabled and `spark.sql.exchange.reuse=true`. It fails on `main` 
(branch B reuses branch A's exchange, yielding A twice and dropping B) and 
passes with this change. Beyond `checkSparkAnswerAndOperator`, it asserts both 
`CometIcebergNativeScanExec` nodes survive and that no `ReusedExchangeExec` 
collapsed the branches. The existing AQE DPP broadcast-reuse tests continue to 
pass, confirming legitimate reuse (identical scans still share an exchange) is 
preserved.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

