pepijnve commented on code in PR #19287:
URL: https://github.com/apache/datafusion/pull/19287#discussion_r2633940518


##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -1115,62 +1156,30 @@ impl GroupedHashAggregateStream {
     }
 
     /// Clear memory and shirk capacities to the size of the batch.
-    fn clear_shrink(&mut self, batch: &RecordBatch) {
-        self.group_values.clear_shrink(batch);
+    fn clear_shrink(&mut self, num_rows: usize) {
+        self.group_values.clear_shrink(num_rows);
         self.current_group_indices.clear();
-        self.current_group_indices.shrink_to(batch.num_rows());
+        self.current_group_indices.shrink_to(num_rows);
     }
 
     /// Clear memory and shirk capacities to zero.
     fn clear_all(&mut self) {
-        let s = self.schema();
-        self.clear_shrink(&RecordBatch::new_empty(s));
-    }
-
-    /// Emit if the used memory exceeds the target for partial aggregation.
-    /// Currently only [`GroupOrdering::None`] is supported for early emitting.
-    /// TODO: support group_ordering for early emitting
-    ///
-    /// Returns `Some(ExecutionState)` if the state should be changed, None 
otherwise.
-    fn emit_early_if_necessary(&mut self) -> Result<Option<ExecutionState>> {
-        if self.group_values.len() >= self.batch_size
-            && matches!(self.group_ordering, GroupOrdering::None)
-            && self.update_memory_reservation().is_err()
-        {
-            assert_eq!(self.mode, AggregateMode::Partial);
-            let n = self.group_values.len() / self.batch_size * 
self.batch_size;
-            if let Some(batch) = self.emit(EmitTo::First(n), false)? {
-                return Ok(Some(ExecutionState::ProducingOutput(batch)));
-            };
-        }
-        Ok(None)
+        self.clear_shrink(0);
     }
 
     /// At this point, all the inputs are read and there are some spills.
     /// Emit the remaining rows and create a batch.
     /// Conduct a streaming merge sort between the batch and spilled data. 
Since the stream is fully
     /// sorted, set `self.group_ordering` to Full, then later we can read with 
[`EmitTo::First`].

Review Comment:
   Good catch. Yes, I'll revise that comment a bit.
   
   I've made some adjustments that allow `benchmarks/src/bin/external_aggr.rs` 
to run to completion again (it was broken on `main`). I haven't yet inspected 
the query plans it uses, though, to see which code path it's actually covering.
   
   Because it wasn't working on `main`, though, it's not possible to use it 
for comparative testing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to