kazuyukitanimura commented on code in PR #19287:
URL: https://github.com/apache/datafusion/pull/19287#discussion_r2632832100
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -1115,62 +1156,30 @@ impl GroupedHashAggregateStream {
     }

     /// Clear memory and shrink capacities to the size of the batch.
-    fn clear_shrink(&mut self, batch: &RecordBatch) {
-        self.group_values.clear_shrink(batch);
+    fn clear_shrink(&mut self, num_rows: usize) {
+        self.group_values.clear_shrink(num_rows);
         self.current_group_indices.clear();
-        self.current_group_indices.shrink_to(batch.num_rows());
+        self.current_group_indices.shrink_to(num_rows);
     }

     /// Clear memory and shrink capacities to zero.
     fn clear_all(&mut self) {
-        let s = self.schema();
-        self.clear_shrink(&RecordBatch::new_empty(s));
-    }
-
-    /// Emit if the used memory exceeds the target for partial aggregation.
-    /// Currently only [`GroupOrdering::None`] is supported for early emitting.
-    /// TODO: support group_ordering for early emitting
-    ///
-    /// Returns `Some(ExecutionState)` if the state should be changed, None otherwise.
-    fn emit_early_if_necessary(&mut self) -> Result<Option<ExecutionState>> {
-        if self.group_values.len() >= self.batch_size
-            && matches!(self.group_ordering, GroupOrdering::None)
-            && self.update_memory_reservation().is_err()
-        {
-            assert_eq!(self.mode, AggregateMode::Partial);
-            let n = self.group_values.len() / self.batch_size * self.batch_size;
-            if let Some(batch) = self.emit(EmitTo::First(n), false)? {
-                return Ok(Some(ExecutionState::ProducingOutput(batch)));
-            };
-        }
-        Ok(None)
+        self.clear_shrink(0);
     }

     /// At this point, all the inputs are read and there are some spills.
     /// Emit the remaining rows and create a batch.
     /// Conduct a streaming merge sort between the batch and spilled data. Since the stream is fully
     /// sorted, set `self.group_ordering` to Full, then later we can read with [`EmitTo::First`].
Review Comment:
Should we update this comment because we now always spill the remaining rows?
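For example (just a rough guess at the wording, based on my understanding that the remaining in-memory rows are now spilled instead of being emitted into a batch and merged with the spill files), something along these lines:

```rust
// Hypothetical replacement wording only -- not verified against the new code path.
/// At this point, all the inputs are read and there are some spills.
/// Spill the remaining in-memory rows as well, then conduct a streaming
/// merge sort across all the spill files. Since the merged stream is fully
/// sorted, set `self.group_ordering` to Full so later we can read with
/// [`EmitTo::First`].
```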
Also, is there any way to measure the impact of this change in a micro benchmark?
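One option would be a small criterion bench that runs a high-cardinality GROUP BY under a constrained memory pool, so the spill path in `GroupedHashAggregateStream` is actually exercised and we can compare timings before/after. A rough sketch below (not part of this PR): the bench name, row count, key distribution, and 16 MiB pool size are all made up, and the `RuntimeEnvBuilder` calls may need adjusting to the current API.

```rust
// Rough sketch of a micro benchmark. The 16 MiB pool and 2M-row input are
// arbitrary; the intent is only to force the grouped hash aggregation to
// spill so the measured time reflects the spill/merge path.
use std::sync::Arc;

use criterion::{criterion_group, criterion_main, Criterion};
use datafusion::arrow::array::{ArrayRef, Float64Array, Int64Array};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::prelude::{SessionConfig, SessionContext};
use tokio::runtime::Runtime;

fn make_batch(rows: usize) -> RecordBatch {
    // Many distinct keys so the group state grows well past the memory limit.
    let keys: ArrayRef = Arc::new(Int64Array::from_iter_values(
        (0..rows as i64).map(|i| (i * 7919) % 1_000_000),
    ));
    let vals: ArrayRef = Arc::new(Float64Array::from_iter_values(
        (0..rows).map(|i| i as f64),
    ));
    RecordBatch::try_from_iter(vec![("k", keys), ("v", vals)]).unwrap()
}

fn bench_spilling_aggregate(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();
    let batch = make_batch(2_000_000);

    c.bench_function("high_cardinality_group_by_with_spill", |b| {
        b.iter(|| {
            rt.block_on(async {
                // Small memory pool so the aggregation has to spill.
                let env = RuntimeEnvBuilder::new()
                    .with_memory_limit(16 * 1024 * 1024, 1.0)
                    .build_arc()
                    .unwrap();
                let ctx = SessionContext::new_with_config_rt(SessionConfig::new(), env);
                ctx.register_batch("t", batch.clone()).unwrap();
                let df = ctx
                    .sql("SELECT k, SUM(v), COUNT(*) FROM t GROUP BY k")
                    .await
                    .unwrap();
                df.collect().await.unwrap()
            })
        })
    });
}

criterion_group!(benches, bench_spilling_aggregate);
criterion_main!(benches);
```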