ahmed-mez opened a new issue, #18907: https://github.com/apache/datafusion/issues/18907
### Describe the bug

When a `GroupedHashAggregateStream` finishes processing its input, it transitions to emitting the accumulated groups. Currently, this emission happens by calling `emit(EmitTo::All)`, which materializes all groups into a single `RecordBatch` before slicing it into smaller batches for downstream consumption.

For aggregations with a large number of groups (e.g., >500k) or complex grouping keys (e.g., Strings, Lists), this single `emit` call becomes a CPU-intensive, blocking operation that can stall the async runtime for hundreds of milliseconds or even seconds. This "long poll" prevents other tasks on the same thread from running, causing latency spikes and "hiccups" in the system.

### To Reproduce

I have created a reproducer in https://github.com/apache/datafusion/pull/18906 that simulates this behavior by performing a GROUP BY on a dataset with 1,000,000 groups using complex keys (UInt32 + String + LargeList). The reproducer demonstrates two scenarios:

- Current Behavior (Blocking): emitting all groups at once blocks the thread for ~2.8s (on my machine).
- Desired Behavior (Chunked): emitting groups incrementally keeps poll times low and allows results to start streaming in ~2.2s.

### Expected behavior

The aggregation operator should respect the `batch_size` configuration during the emission phase. Instead of materializing all groups at once, it should:

1. Transition to a "draining" state when input is done.
2. Iteratively emit groups in chunks of `batch_size`.
3. Yield control back to the async runtime between chunks.

This would prevent blocking the thread and allow for:

- Lower Latency: downstream operators receive the first batch of results much earlier.
- Better Concurrency: other tasks can run interleaved with the emission.
- True Streaming: results are streamed rather than buffered.

### Additional context

This issue was identified during an internal investigation of "hiccups" (long polls) in our query engine.
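To illustrate why a single `emit(EmitTo::All)` call turns into a long poll while chunked emission does not, here is a minimal, self-contained sketch that does not depend on DataFusion. `EmitTo` is a local enum modeled on DataFusion's `EmitTo::All` / `EmitTo::First(n)` variants; `GroupValues`, `emit_all_then_slice`, and `emit_chunked` are hypothetical names. Group keys are plain `String`s in a `VecDeque`, and the per-call work is approximated by the number of groups handled in the largest single `emit` call.

```rust
use std::collections::VecDeque;

/// Local stand-in modeled on DataFusion's `EmitTo` (not the real type).
#[derive(Debug, Clone, Copy)]
enum EmitTo {
    /// Emit every accumulated group in one call.
    All,
    /// Emit only the first `n` groups; the rest stay buffered.
    First(usize),
}

/// Hypothetical group store holding one `String` key per group.
struct GroupValues {
    groups: VecDeque<String>,
}

impl GroupValues {
    fn is_empty(&self) -> bool {
        self.groups.is_empty()
    }

    /// Removes and returns the requested groups in insertion order.
    fn emit(&mut self, emit_to: EmitTo) -> Vec<String> {
        match emit_to {
            // The cost of this single call grows with the total group count.
            EmitTo::All => self.groups.drain(..).collect(),
            EmitTo::First(n) => {
                let n = n.min(self.groups.len());
                // Draining a prefix of a VecDeque leaves the rest in place.
                self.groups.drain(..n).collect()
            }
        }
    }
}

/// Current behavior: materialize every group, then slice into batches.
/// Returns (groups handled by the largest single emit call, output batches).
fn emit_all_then_slice(store: &mut GroupValues, batch_size: usize) -> (usize, Vec<Vec<String>>) {
    assert!(batch_size > 0, "batch_size must be positive");
    let all = store.emit(EmitTo::All);
    let largest_call = all.len();
    let batches = all.chunks(batch_size).map(|c| c.to_vec()).collect();
    (largest_call, batches)
}

/// Desired behavior: each emit call handles at most `batch_size` groups.
fn emit_chunked(store: &mut GroupValues, batch_size: usize) -> (usize, Vec<Vec<String>>) {
    assert!(batch_size > 0, "batch_size must be positive");
    let mut largest_call = 0;
    let mut batches = Vec::new();
    while !store.is_empty() {
        let batch = store.emit(EmitTo::First(batch_size));
        largest_call = largest_call.max(batch.len());
        batches.push(batch);
    }
    (largest_call, batches)
}

fn main() {
    let make = || GroupValues {
        groups: (0..10_000).map(|i| format!("key-{i}")).collect(),
    };
    let (all_call, all_batches) = emit_all_then_slice(&mut make(), 1024);
    let (chunk_call, chunk_batches) = emit_chunked(&mut make(), 1024);
    println!("all at once: {all_call} groups in one call, {} batches", all_batches.len());
    println!("chunked: at most {chunk_call} groups per call, {} batches", chunk_batches.len());
}
```

With 10,000 groups and a `batch_size` of 1024, both strategies produce the same 10 batches, but the first handles all 10,000 groups in one call while the second never handles more than 1,024 in a single call.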
We observed `AggregateExec` stalling the runtime for >1s when processing queries with high-cardinality grouping keys (e.g., ~10M groups). The problematic path was traced to:

`GroupedHashAggregateStream::emit` -> `self.group_values.emit` -> `GroupValues::emit(EmitTo::All)` -> `self.row_converter.convert_rows`

We observed that increasing `target_partitions` mitigates the issue (by reducing the number of groups per partition), but a proper fix at the operator level that emits in chunks is preferred.

We have a working implementation (along with the reproducer above) that introduces a `DrainingGroups` state to `GroupedHashAggregateStream` to handle this incremental emission. We would like to contribute this fix in https://github.com/apache/datafusion/pull/18906 and are open to considering alternative fixes or implementations.
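The proposed draining behavior can be sketched as a small state machine. This is a hypothetical, synchronous model, not the code in the linked PR: `AggStream`, `State`, and `poll_next` are illustrative names (only `DrainingGroups` comes from this issue), the aggregate is a simple `COUNT(*) ... GROUP BY key` over `u32` keys kept in a `BTreeMap`, and each `Some` returned by `poll_next` stands in for the point where an async operator would return `Poll::Ready` and hand control back to the runtime.

```rust
use std::collections::{BTreeMap, VecDeque};

/// Hypothetical stream states; only `DrainingGroups` is named in the issue.
#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    /// Consuming input and updating per-group counts.
    ReadingInput,
    /// Input is exhausted; groups are handed out `batch_size` at a time.
    DrainingGroups,
    /// All groups have been emitted.
    Done,
}

/// Hypothetical stand-in for a hash aggregate computing COUNT(*) per key.
struct AggStream {
    state: State,
    input: VecDeque<Vec<u32>>,  // pending input batches of group keys
    counts: BTreeMap<u32, u64>, // accumulator: key -> row count
    batch_size: usize,
}

impl AggStream {
    fn new(input: Vec<Vec<u32>>, batch_size: usize) -> Self {
        assert!(batch_size > 0, "batch_size must be positive");
        Self { state: State::ReadingInput, input: input.into(), counts: BTreeMap::new(), batch_size }
    }

    /// Models one poll. Each `Some` is a point where an async operator would
    /// yield to the runtime; `None` means the stream is finished.
    fn poll_next(&mut self) -> Option<Vec<(u32, u64)>> {
        loop {
            match self.state {
                State::ReadingInput => match self.input.pop_front() {
                    // Simplification: input is folded in without returning;
                    // a real operator would also yield while polling its input.
                    Some(batch) => {
                        for key in batch {
                            *self.counts.entry(key).or_insert(0) += 1;
                        }
                    }
                    // Input done: switch to draining instead of emitting everything.
                    None => self.state = State::DrainingGroups,
                },
                State::DrainingGroups => {
                    // Bounded work: take at most `batch_size` groups per poll.
                    let mut out = Vec::with_capacity(self.batch_size.min(self.counts.len()));
                    while out.len() < self.batch_size {
                        match self.counts.pop_first() {
                            Some(group) => out.push(group),
                            None => break,
                        }
                    }
                    if out.is_empty() {
                        self.state = State::Done;
                        continue;
                    }
                    return Some(out);
                }
                State::Done => return None,
            }
        }
    }
}

fn main() {
    // Three input batches over five distinct keys; at most two groups per poll.
    let mut stream = AggStream::new(vec![vec![1, 2, 3], vec![2, 3, 4], vec![5, 1]], 2);
    while let Some(batch) = stream.poll_next() {
        println!("{batch:?}");
    }
}
```

Running `main` prints three batches of at most two groups each. For scale: ~10M groups spread across 16 partitions (an example value) is about 625,000 groups per partition, so raising `target_partitions` alone still leaves each partition emitting 625,000 groups in one call, whereas draining at DataFusion's default `batch_size` of 8192 splits that into 77 polls.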
