ahmed-mez opened a new issue, #18907:
URL: https://github.com/apache/datafusion/issues/18907

   ### Describe the bug
   
   When a `GroupedHashAggregateStream` finishes processing its input, it 
transitions to emitting the accumulated groups. Currently, this emission 
happens by calling `emit(EmitTo::All)`, which materializes all groups into a 
single `RecordBatch` before slicing it into smaller batches for downstream 
consumption.
   
   For aggregations with a large number of groups (e.g., >500k) or complex 
grouping keys (e.g., Strings, Lists), this single emit call becomes a 
CPU-intensive blocking operation that can stall the async runtime for hundreds 
of milliseconds or even seconds. This "long poll" prevents other tasks on the 
same thread from running, causing latency spikes and "hiccups" in the system.
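
   The emit-then-slice pattern described above can be illustrated with a 
minimal, std-only sketch. `GroupTable`, `emit_all`, and 
`emit_all_then_slice` are hypothetical stand-ins rather than DataFusion 
APIs, and the `to_uppercase` call is only a placeholder for the expensive 
row conversion. The point is that all conversion work happens inside one 
call before the first batch exists.

```rust
// Hypothetical stand-in for the accumulated group state; not DataFusion's types.
struct GroupTable {
    keys: Vec<String>,
}

impl GroupTable {
    /// Stand-in for `emit(EmitTo::All)`: converts every group in a single call.
    fn emit_all(&mut self) -> Vec<String> {
        std::mem::take(&mut self.keys)
            .into_iter()
            .map(|k| k.to_uppercase()) // placeholder for the costly row conversion
            .collect()
    }
}

/// Materialize everything first, then slice into `batch_size` pieces.
/// Assumes `batch_size > 0`. The full conversion cost is paid before the
/// first batch is available to the caller.
fn emit_all_then_slice(table: &mut GroupTable, batch_size: usize) -> Vec<Vec<String>> {
    let all = table.emit_all();
    all.chunks(batch_size).map(|c| c.to_vec()).collect()
}

fn main() {
    let mut table = GroupTable {
        keys: (0..10).map(|i| format!("g{i}")).collect(),
    };
    let batches = emit_all_then_slice(&mut table, 4);
    println!("{} batches, first has {} rows", batches.len(), batches[0].len());
}
```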
   
   ### To Reproduce
   
   I have created a reproducer in 
https://github.com/apache/datafusion/pull/18906 that simulates this behavior by 
performing a GROUP BY on a dataset with 1,000,000 groups using complex keys 
(UInt32 + String + LargeList).
   
   The reproducer demonstrates two scenarios:
   - Current Behavior (Blocking): Emitting all groups at once blocks the 
thread for ~2.8s (on my machine).
   - Desired Behavior (Chunked): Emitting groups incrementally keeps poll 
times low and allows results to start streaming in ~2.2s.
   
   ### Expected behavior
   
   The aggregation operator should respect the `batch_size` configuration 
during the emission phase. Instead of materializing all groups at once, it 
should:
   1. Transition to a "draining" state when input is done.
   2. Iteratively emit groups in chunks of `batch_size`.
   3. Yield control back to the async runtime between chunks.
   
   This would prevent blocking the thread and allow for:
   - Lower Latency: Downstream operators receive the first batch of results 
much earlier.
   - Better Concurrency: Other tasks can run interleaved with the emission.
   - True Streaming: Results are streamed rather than buffered.
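
   The three steps listed above could look roughly like the following 
std-only sketch. It is not the implementation in the linked PR: `State`, 
`AggStream`, `input_done`, and `poll_next` are hypothetical names, the 
groups are plain integers, and the async `Poll` machinery is reduced to a 
synchronous call that returns `None` when there is nothing to emit. Each 
call does at most `batch_size` groups of work and then returns, which is 
where a real stream would yield to the runtime.

```rust
use std::collections::VecDeque;

/// Hypothetical states; only the idea of a draining state comes from the issue.
#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    ReadingInput,
    Draining,
    Done,
}

struct AggStream {
    state: State,
    groups: VecDeque<u64>, // stand-in for the accumulated group values
    batch_size: usize,     // assumed to be greater than zero
}

impl AggStream {
    /// Step 1: switch to draining once the input is exhausted.
    fn input_done(&mut self) {
        self.state = State::Draining;
    }

    /// Steps 2 and 3: emit at most `batch_size` groups, then return to the caller.
    fn poll_next(&mut self) -> Option<Vec<u64>> {
        match self.state {
            State::ReadingInput | State::Done => None,
            State::Draining => {
                let n = self.batch_size.min(self.groups.len());
                let batch: Vec<u64> = self.groups.drain(..n).collect();
                if self.groups.is_empty() {
                    self.state = State::Done;
                }
                if batch.is_empty() { None } else { Some(batch) }
            }
        }
    }
}

fn main() {
    let mut stream = AggStream {
        state: State::ReadingInput,
        groups: (0..10).collect(),
        batch_size: 4,
    };
    stream.input_done();
    while let Some(batch) = stream.poll_next() {
        println!("emitted {} groups", batch.len());
    }
}
```

   With 10 groups and a `batch_size` of 4, the loop in `main` receives 
batches of 4, 4, and 2 groups, and the work per call stays bounded by 
`batch_size` regardless of the total number of groups.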
   
   ### Additional context
   
   This issue was identified during an internal investigation of "hiccups" 
(long polls) in our query engine. We observed `AggregateExec` stalling the 
runtime for >1s when processing queries with high-cardinality grouping keys 
(e.g., ~10M groups).
   
   The problematic path was traced to:
   `GroupedHashAggregateStream::emit` -> `self.group_values.emit` -> 
`GroupValues::emit(EmitTo::All)` -> `self.row_converter.convert_rows`.
   
   We observed that increasing `target_partitions` mitigates the issue by 
reducing the number of groups per partition, but a proper fix at the operator 
level that emits in chunks is preferred.
   We have a working implementation (along with the reproducer above) that 
introduces a `DrainingGroups` state to `GroupedHashAggregateStream` to handle 
this incremental emission. We would like to contribute this fix in 
https://github.com/apache/datafusion/pull/18906, and we are open to 
alternative fixes or implementations.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

