andygrove opened a new issue, #4813:
URL: https://github.com/apache/datafusion-comet/issues/4813
## Describe the bug
An aggregate whose intermediate buffer format is incompatible between Spark
and Comet (`CometAggregateExpressionSerde.supportsMixedPartialFinal == false`)
can be **split across Comet and Spark within a distinct-aggregate rewrite**, so
a Comet-encoded partial buffer is handed to a Spark aggregate (or vice versa).
Depending on the aggregate this produces a hard crash or silently wrong results.
This is a pre-existing gap in the mixed-execution guards added for #1389.
Those guards only handle a direct single-stage `Partial → Final` pair:
- `CometExecRule.tagUnsafePartialAggregates` / `findPartialAggInPlan` only
  looks for a `Partial` aggregate feeding *directly* into a non-convertible
  `Final`, and **stops at intermediate aggregate stages**.
- `CometBaseAggregate.doConvert` only guards the `Final` mode
  (`sparkFinalMode`), not intermediate `PartialMerge` stages.
Spark's single-distinct rewrite (`AggUtils.planAggregateWithOneDistinct`)
produces a **four-stage** plan where the non-distinct aggregate's `Partial` is
separated from its `Final` by intermediate `PartialMerge` stages:
```
Final(pa, count) <- consumes buffers
PartialMerge(pa) + Partial(count distinct)
PartialMerge(pa) <- consumes buffers, keyed by the distinct column
Partial(pa) <- produces the incompatible buffer
```
If any stage of this chain falls back to Spark while another converts to
Comet (e.g. because the distinct aggregate or its grouping key is unsupported
by Comet, or via the `spark.comet.exec.*HashAggregate.enabled` test configs),
the incompatible buffer crosses the engine boundary.
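
The condition described above can be modelled without Spark. The following is a minimal Python sketch, not Comet code: `Stage`, `crossing_points`, and the `"comet"` / `"spark"` engine tags are hypothetical names chosen for illustration. It walks the four-stage chain bottom-up and reports every adjacent pair of stages where the buffer for `pa` changes engine.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Stage:
    name: str         # label of the physical aggregate stage
    engine: str       # "comet" or "spark" (hypothetical tags)
    carries_pa: bool  # True if the stage produces or consumes pa's buffer


def crossing_points(stages):
    """Return (producer, consumer) pairs where pa's buffer changes engine.

    `stages` is ordered bottom-up: the Partial stage first, the Final last.
    """
    chain = [s for s in stages if s.carries_pa]
    return [
        (lower.name, upper.name)
        for lower, upper in zip(chain, chain[1:])
        if lower.engine != upper.engine
    ]


# The four-stage plan from the diagram, with only the Final falling back.
plan = [
    Stage("Partial(pa)", "comet", True),
    Stage("PartialMerge(pa)", "comet", True),
    Stage("PartialMerge(pa) + Partial(count distinct)", "comet", True),
    Stage("Final(pa, count)", "spark", True),
]

for producer, consumer in crossing_points(plan):
    print(f"buffer crosses engines: {producer} -> {consumer}")
```

In this model a guard that inspects only a `Partial` feeding directly into a `Final` never sees the crossing, because the two are separated by the `PartialMerge` stages.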
## Steps to reproduce
Any incompatible-buffer aggregate combined with a distinct aggregate, forced
to split. For example with a `TypedImperativeAggregate`:
```sql
SELECT collect_set(v), count(DISTINCT k) FROM t GROUP BY g
```
Forcing the split with `spark.comet.exec.finalHashAggregate.enabled=false`
(Comet does the partial/merge, Spark does the final) crashes, because Spark's
aggregate tries to deserialize a Comet-encoded buffer.
## Expected behavior
When an incompatible-buffer aggregate cannot run entirely in one engine
across the whole distinct-rewrite chain, the entire chain for that aggregate
must fall back to Spark, as already happens for the simple `Partial → Final`
case.
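
The expected rule can be stated as a small decision function. This is a hedged sketch in Python rather than the actual Comet implementation: `plan_engines`, `convertible`, and `mixed_safe` are hypothetical names, with `mixed_safe` standing in for `supportsMixedPartialFinal`.

```python
def plan_engines(convertible, mixed_safe):
    """Pick an engine for each stage of one aggregate's chain.

    convertible: list of bools, bottom-up, True if Comet could run that stage.
    mixed_safe:  True if both engines share the intermediate buffer format
                 (a stand-in for supportsMixedPartialFinal).
    """
    if mixed_safe:
        # Compatible buffers: each stage may be decided independently.
        return ["comet" if ok else "spark" for ok in convertible]
    # Incompatible buffers: every stage runs in Comet, or none does.
    engine = "comet" if all(convertible) else "spark"
    return [engine] * len(convertible)


# Four-stage distinct rewrite where only the Final cannot convert.
print(plan_engines([True, True, True, False], mixed_safe=False))
print(plan_engines([True, True, True, False], mixed_safe=True))
```

With `mixed_safe=False`, a single non-convertible stage sends all four stages to Spark, which mirrors the existing behavior for the simple `Partial → Final` case.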
## Additional context
Surfaced by the `approx_percentile` / `percentile_approx` work (#4801):
Spark's `ObjectHashAggregateSuite` "randomized aggregation test - [typed, with
distinct]" crashed with a native panic:
```
quantile_summaries.rs: range end index 100 out of range for slice of length 96
```
because the buffer for Comet's Greenwald-Khanna digest differs substantially
from Spark's `PercentileDigest`, so the mismatch fails loudly at the boundary.
Other affected aggregates (`collect_set`, `collect_list`, exact `percentile`,
and the declarative `avg` / decimal `sum` / variance family) have buffers that
are structurally similar in both engines and instead risk
**silent wrong results**, which is why this went unnoticed.
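
The difference between the loud and the silent failure can be illustrated with a toy binary buffer. The layouts below are invented for this sketch and are not the actual Spark or Comet buffer formats; the function names are hypothetical as well.

```python
import struct


def encode_writer(total, count):
    # Hypothetical writer layout: float64 sum, then int64 count (16 bytes).
    return struct.pack("<dq", total, count)


def decode_same_size(buf):
    # Hypothetical reader with the two fields in the opposite order.
    # The size matches (16 bytes), so decoding succeeds with wrong values.
    count, total = struct.unpack("<qd", buf)
    return total, count


def decode_longer(buf):
    # Hypothetical reader expecting a third field (24 bytes).
    # The size does not match, so decoding raises struct.error.
    return struct.unpack("<dqd", buf)


buf = encode_writer(10.0, 4)

# Same size, different interpretation: no error, but not (10.0, 4).
print(decode_same_size(buf))

# Different size: the mismatch is detected immediately.
try:
    decode_longer(buf)
except struct.error as exc:
    print("decode failed:", exc)
```

A reader whose layout happens to have the same size as the writer's decodes without complaint and returns garbage, which is the silent case; a size mismatch is caught at decode time, which is analogous to the out-of-range slice panic above.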
Affected: any aggregate with `supportsMixedPartialFinal == false` used
together with a distinct aggregate.