andygrove opened a new issue, #4813:
URL: https://github.com/apache/datafusion-comet/issues/4813

   ## Describe the bug
   
   An aggregate whose intermediate buffer format is incompatible between Spark and Comet (`CometAggregateExpressionSerde.supportsMixedPartialFinal == false`) can be **split across Comet and Spark within a distinct-aggregate rewrite**, so that a Comet-encoded partial buffer is handed to a Spark aggregate (or vice versa). Depending on the aggregate, this produces either a hard crash or silently wrong results.
   
   This is a pre-existing gap in the mixed-execution guards added for #1389. Those guards only handle a `Partial` stage that feeds directly into a `Final` stage:
   
   - `CometExecRule.tagUnsafePartialAggregates` / `findPartialAggInPlan` only look for a `Partial` aggregate feeding *directly* into a non-convertible `Final`, and **stop at intermediate aggregate stages**.
   - `CometBaseAggregate.doConvert` only guards the `Final` mode (`sparkFinalMode`), not intermediate `PartialMerge` stages.
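To illustrate the gap, here is a minimal Python model of a guard that only recognizes a `Partial` stage sitting directly below a `Final`. It is not Comet's Scala code: `Agg`, `direct_pair_guard`, the mode strings, and the omission of exchanges between stages are all simplifying assumptions. On the four-stage distinct rewrite described in this issue, the search ends at the first `PartialMerge`, so the `Partial` stage that produces the incompatible buffer is never tagged.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class Agg:
    """Hypothetical aggregate stage: maps each aggregate function to its mode."""
    modes: Dict[str, str]
    child: Optional["Agg"] = None
    tagged_fallback: bool = False


def direct_pair_guard(final: Agg, fn: str) -> bool:
    """Tag the stage directly below `final` if it runs `fn` in Partial mode.

    Any other intermediate aggregate stage (for example PartialMerge) ends the
    search, mirroring the limitation described in this issue.
    """
    child = final.child
    if child is not None and child.modes.get(fn) == "Partial":
        child.tagged_fallback = True
        return True
    return False


# Simple Partial -> Final plan: the guard finds and tags the Partial stage.
simple_partial = Agg({"pa": "Partial"})
simple_plan = Agg({"pa": "Final"}, simple_partial)

# Four-stage distinct rewrite, built bottom-up.
partial = Agg({"pa": "Partial"})
merge = Agg({"pa": "PartialMerge"}, partial)
merge_and_distinct = Agg({"pa": "PartialMerge", "count_distinct": "Partial"}, merge)
distinct_plan = Agg({"pa": "Final", "count_distinct": "Final"}, merge_and_distinct)

print(direct_pair_guard(simple_plan, "pa"))    # True
print(direct_pair_guard(distinct_plan, "pa"))  # False: stops at PartialMerge
print(partial.tagged_fallback)                 # False: the buffer producer is missed
```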
   
   Spark's single-distinct rewrite (`AggUtils.planAggregateWithOneDistinct`) 
produces a **four-stage** plan where the non-distinct aggregate's `Partial` is 
separated from its `Final` by intermediate `PartialMerge` stages:
   
   ```
   Final(pa, count)            <- consumes buffers
     PartialMerge(pa) + Partial(count distinct)
       PartialMerge(pa)        <- consumes buffers, keyed by the distinct column
         Partial(pa)           <- produces the incompatible buffer
   ```
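To make the shape concrete, the plan above can be modeled as a bottom-up list of stages, each mapping an aggregate function to its mode. Listing every stage that touches a function's buffer shows that the buffer of the incompatible aggregate (`pa`) is handed across three stage boundaries, while a direct `Partial → Final` guard only examines one. `STAGES`, `buffer_chain`, `handoffs`, and the name `count_distinct` are hypothetical and used only for this sketch.

```python
from typing import Dict, List, Tuple

# Stages of the four-stage plan, listed bottom-up. Each maps an aggregate
# function to the mode it runs in at that stage.
STAGES: List[Dict[str, str]] = [
    {"pa": "Partial"},
    {"pa": "PartialMerge"},
    {"pa": "PartialMerge", "count_distinct": "Partial"},
    {"pa": "Final", "count_distinct": "Final"},
]


def buffer_chain(stages: List[Dict[str, str]], fn: str) -> List[int]:
    """Indices of every stage that produces or consumes fn's buffer."""
    return [i for i, stage in enumerate(stages) if fn in stage]


def handoffs(stages: List[Dict[str, str]], fn: str) -> List[Tuple[int, int]]:
    """(producer, consumer) stage pairs across which fn's buffer is passed."""
    chain = buffer_chain(stages, fn)
    return list(zip(chain, chain[1:]))


print(handoffs(STAGES, "pa"))              # [(0, 1), (1, 2), (2, 3)]
print(handoffs(STAGES, "count_distinct"))  # [(2, 3)]
```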
   
   If any stage of this chain falls back to Spark while another converts to Comet (e.g. because the distinct aggregate or its grouping key is unsupported by Comet, or because a stage is disabled with the `spark.comet.exec.*HashAggregate.enabled` test configs), the incompatible buffer crosses the engine boundary.
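A boundary crossing can be detected by comparing the engines of each producer/consumer pair along the buffer's chain. The sketch below is a hypothetical Python model, not Comet's rule code; `engine_crossings`, `PA_CHAIN`, and the per-stage engine assignments in the usage lines are assumptions chosen to match the two scenarios above (which stages fall back when the distinct aggregate is unsupported is assumed, not taken from Comet).

```python
from typing import List, Tuple


def engine_crossings(engines: List[str], chain: List[int]) -> List[Tuple[int, int]]:
    """(producer, consumer) pairs in `chain` whose stages run in different engines.

    engines[i] is "comet" or "spark" for stage i, counted bottom-up; chain lists
    the stages that produce or consume one incompatible aggregate's buffer.
    """
    return [(a, b) for a, b in zip(chain, chain[1:]) if engines[a] != engines[b]]


PA_CHAIN = [0, 1, 2, 3]  # Partial, PartialMerge, PartialMerge + distinct, Final

# Assumed: distinct aggregate unsupported, so its two stages fall back.
print(engine_crossings(["comet", "comet", "spark", "spark"], PA_CHAIN))  # [(1, 2)]
# Assumed: only the Final falls back (finalHashAggregate disabled).
print(engine_crossings(["comet", "comet", "comet", "spark"], PA_CHAIN))  # [(2, 3)]
```

Any non-empty result means a Comet-encoded buffer reaches a Spark stage or the reverse.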
   
   ## Steps to reproduce
   
   Combine any incompatible-buffer aggregate with a distinct aggregate and force the plan to split. For example, with a `TypedImperativeAggregate`:
   
   ```sql
   SELECT collect_set(v), count(DISTINCT k) FROM t GROUP BY g
   ```
   
   Forcing the split with `spark.comet.exec.finalHashAggregate.enabled=false` (Comet runs the partial and merge stages, Spark runs the final stage) crashes, because Spark's aggregate tries to deserialize a Comet-encoded buffer.
   
   ## Expected behavior
   
   When an incompatible-buffer aggregate cannot run entirely in one engine 
across the whole distinct-rewrite chain, the entire chain for that aggregate 
must fall back to Spark, as already happens for the simple `Partial → Final` 
case.
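A minimal sketch of the expected behavior, assuming each stage's standalone convertibility is already known: whenever an incompatible aggregate's chain would be split across engines, every stage in that chain falls back to Spark. This is a hypothetical Python model, not the actual fix in `CometExecRule`; `assign_engines` and the list-based encoding of stages and chains are assumptions.

```python
from typing import List


def assign_engines(can_convert: List[bool], chains: List[List[int]]) -> List[str]:
    """Assign "comet"/"spark" per stage so no incompatible-buffer chain is split.

    can_convert[i]: whether stage i (bottom-up) could run in Comet on its own.
    chains: for each incompatible-buffer aggregate, the stages that produce or
    consume its buffer.
    """
    engines = ["comet" if ok else "spark" for ok in can_convert]
    changed = True
    # Repeat until stable: one chain's fallback can split another chain that
    # shares a stage. Stages only move from Comet to Spark, so this terminates.
    while changed:
        changed = False
        for chain in chains:
            if len({engines[i] for i in chain}) > 1:  # chain split across engines
                for i in chain:
                    engines[i] = "spark"
                changed = True
    return engines


# The distinct stage (index 2) cannot convert: the whole chain falls back.
print(assign_engines([True, True, False, True], [[0, 1, 2, 3]]))
```

Iterating to a fixed point, rather than making a single pass, handles plans where several incompatible aggregates share stages.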
   
   ## Additional context
   
   This surfaced during the `approx_percentile` / `percentile_approx` work (#4801): Spark's `ObjectHashAggregateSuite` test "randomized aggregation test - [typed, with distinct]" crashed with a native panic:
   
   ```
   quantile_summaries.rs: range end index 100 out of range for slice of length 96
   ```
   
   because Comet's Greenwald-Khanna digest is wildly different from Spark's `PercentileDigest`, so the mismatch fails loudly at the engine boundary. Other affected aggregates (`collect_set`, `collect_list`, exact `percentile`, and the declarative `avg` / decimal `sum` / variance family) have structurally similar buffers and instead risk **silent wrong results**, which is why this went unnoticed.
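Why some mismatches panic while others silently return wrong values can be shown with a toy model. Both layouts below are invented for illustration and are NOT the real Spark or Comet buffer formats; the function names are hypothetical. A same-sized buffer with reordered fields decodes "successfully" into garbage, while a structurally different buffer makes the reader run past the end of the data, the Python analogue of the Rust out-of-range slice panic above.

```python
import struct

# Hypothetical buffer layouts for illustration only; NOT the real Spark or
# Comet formats.


def produce_avg(total: float, count: int) -> bytes:
    # Producer writes (sum: f64, count: i64).
    return struct.pack("<dq", total, count)


def consume_avg(buf: bytes) -> float:
    # Consumer expects (count: i64, sum: f64): same size, different order,
    # so decoding "succeeds" and the result is silently wrong.
    count, total = struct.unpack("<qd", buf)
    return total / count


def produce_digest(samples: list) -> bytes:
    # Producer writes an i64 sample count followed by one f64 per sample.
    return struct.pack(f"<q{len(samples)}d", len(samples), *samples)


def consume_digest(buf: bytes) -> list:
    # Consumer expects the same header but a (value, weight) f64 pair per
    # sample, so it reads past the end of the buffer and fails loudly.
    (n,) = struct.unpack_from("<q", buf, 0)
    flat = struct.unpack_from(f"<{2 * n}d", buf, 8)
    return list(zip(flat[0::2], flat[1::2]))


print(consume_avg(produce_avg(10.0, 4)))  # 0.0, not 2.5, and no error
try:
    consume_digest(produce_digest([1.0, 2.0, 3.0]))
except struct.error as err:
    print("decode failed:", err)
```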
   
   Affected: any aggregate with `supportsMixedPartialFinal == false` used 
together with a distinct aggregate.
   