andygrove opened a new pull request, #4207:
URL: https://github.com/apache/datafusion-comet/pull/4207

   ## Which issue does this PR close?
   
   Closes #4122.
   
   ## Rationale for this change
   
   On Spark 4.1, SPARK-52921 added `UNION_OUTPUT_PARTITIONING`: when all 
children of a `UnionExec` share the same hash/single partitioning, the union 
itself reports that same partitioning. Downstream operators (e.g. a final hash 
aggregate) then skip an otherwise-required shuffle, and Spark's row-based 
`UnionExec.doExecute` keeps the partitioning invariant by routing through 
`SQLPartitioningAwareUnionRDD` (each output partition unions partition *i* from 
every child).
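The sketch below shows the invariant `SQLPartitioningAwareUnionRDD` keeps. It is a toy Scala model, not Spark code: a "partitioned dataset" is a `Vector` of partitions, and the object and method names are hypothetical. Output partition *i* is built from partition *i* of every child, so the children's partition count and key placement both carry over to the union.

```scala
// Toy model only: plain collections stand in for RDDs (hypothetical names).
object PartitionAwareUnionSketch {
  type Partitioned[A] = Vector[Vector[A]]

  // Output partition i = partition i of child 0 ++ partition i of child 1 ++ ...
  // Every child must have the same number of partitions for this to make sense.
  def partitionAwareUnion[A](children: Seq[Partitioned[A]]): Partitioned[A] = {
    require(children.nonEmpty, "union needs at least one child")
    val n = children.head.length
    require(children.forall(_.length == n), "children must have equal partition counts")
    Vector.tabulate(n)(i => children.toVector.flatMap(child => child(i)))
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical children, hash-partitioned by value % 2 into 2 partitions.
    val left: Partitioned[Int]  = Vector(Vector(2, 4), Vector(1, 3))
    val right: Partitioned[Int] = Vector(Vector(2), Vector(3, 5))
    // Still 2 partitions, and each value stays in the partition its hash picks.
    println(partitionAwareUnion(Seq(left, right))) // Vector(Vector(2, 4, 2), Vector(1, 3, 3, 5))
  }
}
```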
   
   `CometUnionExec` silently broke both halves of that contract:
   
   - `doExecuteColumnar` used `sparkContext.union(...)`, which concatenates 
partitions: each output partition holds exactly one partition from one child, 
so rows with the same key coming from different children land in different 
output partitions.
   - `outputPartitioning` delegated to the frozen `originalPlan` snapshot 
captured at `CometExecRule` time, so AQE's post-stage coalescing was invisible 
to it.
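A toy Scala model of the first bug follows, using plain collections and hypothetical names rather than Spark APIs. It mimics the shape of `sparkContext.union` (children's partitions laid end to end) and checks whether every key still lives in a single partition, which is what a downstream aggregate relies on once it has skipped the shuffle.

```scala
// Toy model only: plain collections stand in for RDDs (hypothetical names).
object ConcatUnionSketch {
  type Partitioned[A] = Vector[Vector[A]]

  // Mimics sparkContext.union: partitions are concatenated, so the output
  // has the sum of the children's partition counts.
  def concatUnion[A](children: Seq[Partitioned[A]]): Partitioned[A] =
    children.toVector.flatten

  // True when every distinct key appears in exactly one partition.
  def keysColocated[A](parts: Partitioned[A]): Boolean =
    parts.flatMap(_.distinct).groupBy(identity).forall { case (_, hits) => hits.size == 1 }

  def main(args: Array[String]): Unit = {
    // Hypothetical children, hash-partitioned by value % 2 into 2 partitions.
    val left: Partitioned[Int]  = Vector(Vector(2, 4), Vector(1, 3))
    val right: Partitioned[Int] = Vector(Vector(2), Vector(3, 5))
    val out = concatUnion(Seq(left, right))
    println(out.length)         // 4, not the 2 partitions the children declared
    println(keysColocated(out)) // false: keys 2 and 3 each span two partitions
  }
}
```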
   
   The result: `EXCEPT ALL` / `INTERSECT ALL` queries whose sides are themselves 
`GROUP BY` aggregates silently returned wrong results (e.g. `EXCEPT ALL` 
returning `{2, 3}` instead of `{3}`). Two Spark 4.1.1 `SQLQueryTestSuite` files 
(`except-all.sql`, `intersect-all.sql`) were disabled for Comet because of this.
   
   ## What changes are included in this PR?
   
   - Override `CometUnionExec.outputPartitioning` to recompute from the live 
`children` rather than `originalPlan`.
   - Route `doExecuteColumnar` through a new `ShimCometUnionExec.unionRDDs` 
helper that uses `SQLPartitioningAwareUnionRDD` on Spark 4.1+ when a known 
partitioning is declared (with a partition-count sanity check and a safe 
fallback to plain concatenation), and keeps the `sparkContext.union` behavior on 
pre-4.1 Spark, where `UnionExec.outputPartitioning` is always `UnknownPartitioning`.
   - Add `CometSetOpWithGroupBySuite` covering the two queries from the Spark 
SQL tests.
   - Remove the `spark.comet.enabled = false` guards at the top of 
`except-all.sql` and `intersect-all.sql` in `dev/diffs/4.1.1.diff`.
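The planning side of the first two changes can be modeled as follows. This is a self-contained Scala sketch, not the PR's code: `Part`, `Hash`, `Single`, `Unknown`, `unionPartitioning`, and `chooseStrategy` are hypothetical stand-ins for Spark's `Partitioning` classes, `CometUnionExec.outputPartitioning`, and `ShimCometUnionExec.unionRDDs`. It assumes the union declares a partitioning only when every child reports an identical one (real Spark has to relate partitioning expressions to the union's output attributes, which this toy ignores) and that the sanity check means "every child has exactly the declared number of partitions".

```scala
// Toy model only; all names are hypothetical stand-ins, not Spark or Comet APIs.
object UnionPlanningSketch {
  sealed trait Part { def numPartitions: Int }
  final case class Hash(keys: Seq[String], numPartitions: Int) extends Part
  case object Single extends Part { val numPartitions = 1 }
  final case class Unknown(numPartitions: Int) extends Part

  // Recomputed from the current children (not a frozen snapshot), so a change
  // in the children's partitioning, e.g. after AQE coalescing, is picked up.
  def unionPartitioning(children: Seq[Part]): Part = children.distinct match {
    case Seq(p @ (_: Hash | Single)) => p
    // Toy choice: a concatenated union has the sum of the children's partitions.
    case _ => Unknown(children.map(_.numPartitions).sum)
  }

  sealed trait Strategy
  case object PartitionAware extends Strategy
  case object Concat extends Strategy

  // Partition-aware union only on 4.1+, with a known partitioning, and when
  // every child really has the declared partition count; otherwise concatenate.
  def chooseStrategy(isSpark41Plus: Boolean, declared: Part, childCounts: Seq[Int]): Strategy = {
    val known = declared match {
      case _: Unknown => false
      case _          => true
    }
    val countsMatch = childCounts.nonEmpty && childCounts.forall(_ == declared.numPartitions)
    if (isSpark41Plus && known && countsMatch) PartitionAware else Concat
  }

  def main(args: Array[String]): Unit = {
    val h = Hash(Seq("k"), 4)
    val declared = unionPartitioning(Seq(h, h))
    println(chooseStrategy(isSpark41Plus = true, declared, Seq(4, 4)))  // PartitionAware
    println(chooseStrategy(isSpark41Plus = true, declared, Seq(4, 3)))  // Concat
    println(chooseStrategy(isSpark41Plus = false, declared, Seq(4, 4))) // Concat
    println(unionPartitioning(Seq(h, Hash(Seq("k"), 2))))               // Unknown(6)
  }
}
```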
   
   ## How are these changes tested?
   
   - New `CometSetOpWithGroupBySuite` passes on Spark 3.5 and Spark 4.1.1 
profiles.
   - Existing `CometExecSuite` (246 tests) passes on Spark 3.5.


-- 
This is an automated message from the Apache Git Service.