gabotechs commented on code in PR #18906:
URL: https://github.com/apache/datafusion/pull/18906#discussion_r2558925604


##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -1164,11 +1189,11 @@ impl GroupedHashAggregateStream {
     fn set_input_done_and_produce_output(&mut self) -> Result<()> {
         self.input_done = true;
         self.group_ordering.input_done();
+        self.group_values.input_done();
         let elapsed_compute = self.baseline_metrics.elapsed_compute().clone();
         let timer = elapsed_compute.timer();
         self.exec_state = if self.spill_state.spills.is_empty() {
-            let batch = self.emit(EmitTo::All, false)?;
-            batch.map_or(ExecutionState::Done, ExecutionState::ProducingOutput)
+            ExecutionState::DrainingGroups

Review Comment:
   If that's the case, I imagine there's no reason to keep both modes of 
emitting output, right? Would there still be any situation where we want the 
previous behavior?



##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -1164,11 +1189,11 @@ impl GroupedHashAggregateStream {
     fn set_input_done_and_produce_output(&mut self) -> Result<()> {
         self.input_done = true;
         self.group_ordering.input_done();
+        self.group_values.input_done();
         let elapsed_compute = self.baseline_metrics.elapsed_compute().clone();
         let timer = elapsed_compute.timer();
         self.exec_state = if self.spill_state.spills.is_empty() {
-            let batch = self.emit(EmitTo::All, false)?;
-            batch.map_or(ExecutionState::Done, ExecutionState::ProducingOutput)
+            ExecutionState::DrainingGroups

Review Comment:
   OK, I think I understand better what's happening now. Correct me if I'm 
wrong:
   
   - The previous implementation, upon finishing accumulating all groups, 
bundled everything into one big `RecordBatch` and then yielded slices of it, 
respecting the configured `batch_size`.
   - The current implementation, upon finishing accumulating all groups, 
bundles nothing up front; instead, each `RecordBatch` of size `batch_size` is 
built on demand as the stream gets polled.
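
   To make sure we are talking about the same thing, here is a rough sketch of the two strategies as I understand them. This is not the PR's code: a `Vec<u64>` stands in for the accumulated groups, a `Vec<Vec<u64>>` stands in for the emitted `RecordBatch`es, and `emit_eager` and `LazyDrain` are hypothetical names I made up for illustration.

```rust
/// Eager strategy (my reading of the previous behavior): copy all groups into
/// one big buffer first, then cut it into `batch_size` pieces.
/// Panics if `batch_size` is 0, because `chunks` requires a non-zero size.
fn emit_eager(groups: &[u64], batch_size: usize) -> Vec<Vec<u64>> {
    // Stand-in for building the single large batch up front.
    let big: Vec<u64> = groups.to_vec();
    big.chunks(batch_size).map(|chunk| chunk.to_vec()).collect()
}

/// Lazy strategy (my reading of the new behavior): nothing is built until the
/// consumer asks for the next batch.
struct LazyDrain<'a> {
    groups: &'a [u64],
    offset: usize,
    batch_size: usize,
}

impl<'a> LazyDrain<'a> {
    fn new(groups: &'a [u64], batch_size: usize) -> Self {
        // A zero batch size would never make progress.
        assert!(batch_size > 0, "batch_size must be non-zero");
        Self { groups, offset: 0, batch_size }
    }
}

impl<'a> Iterator for LazyDrain<'a> {
    type Item = Vec<u64>;

    /// Each call plays the role of one poll of the stream: it builds exactly
    /// one batch of at most `batch_size` groups.
    fn next(&mut self) -> Option<Vec<u64>> {
        if self.offset >= self.groups.len() {
            return None;
        }
        let end = (self.offset + self.batch_size).min(self.groups.len());
        let batch = self.groups[self.offset..end].to_vec();
        self.offset = end;
        Some(batch)
    }
}
```

   Both produce the same sequence of batches; the difference is only when the work happens and how much is held at once. In this sketch the eager version holds a full copy of the groups before yielding anything, while the lazy one only ever builds one batch at a time.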
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

