alamb commented on code in PR #19287:
URL: https://github.com/apache/datafusion/pull/19287#discussion_r2628615678
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -550,26 +571,39 @@ impl GroupedHashAggregateStream {
.collect::<Vec<_>>()
.join(", ");
    let name = format!("GroupedHashAggregateStream[{partition}] ({agg_fn_names})");
- let reservation = MemoryConsumer::new(name)
- .with_can_spill(true)
- .register(context.memory_pool());
let group_ordering = GroupOrdering::try_new(&agg.input_order_mode)?;
+        let oom_mode = match agg.mode {
+            // In partial aggregation mode, always prefer to emit incomplete results early.
+            AggregateMode::Partial => OutOfMemoryMode::EmitEarly,
+            _ => match group_ordering {
+                // For non-partial aggregation modes, don't use spilling if the input
+                // is fully sorted by the grouping expressions. Regular emission of completed
+                // group values will handle memory pressure.
+                GroupOrdering::Full(_) => OutOfMemoryMode::ReportError,
+                // For unsorted or partially sorted inputs, use disk spilling
+                GroupOrdering::None | GroupOrdering::Partial(_) => OutOfMemoryMode::Spill,
+            },
+        };
Review Comment:
If you want less indentation, you could do the following, which is slightly less verbose. However, it loses the explicit nesting that shows `group_ordering` only applies when the mode is not partial:
```rust
let oom_mode = match (agg.mode, group_ordering) {
    // In partial aggregation mode, always prefer to emit incomplete results early.
    (AggregateMode::Partial, _) => OutOfMemoryMode::EmitEarly,
    // For non-partial aggregation modes, don't use spilling if the input
    // is fully sorted by the grouping expressions. Regular emission of completed
    // group values will handle memory pressure.
    (_, GroupOrdering::Full(_)) => OutOfMemoryMode::ReportError,
    // For unsorted or partially sorted inputs, use disk spilling
    (_, GroupOrdering::None | GroupOrdering::Partial(_)) => OutOfMemoryMode::Spill,
};
```
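For readers less familiar with the flattened tuple-match pattern being suggested, here is a minimal, self-contained sketch. The enums (`Mode`, `GroupOrder`, `OomMode`) are stand-ins for illustration only, not DataFusion's actual types:

```rust
#[derive(Debug, PartialEq)]
enum OomMode {
    EmitEarly,
    ReportError,
    Spill,
}

enum Mode {
    Partial,
    Final,
}

enum GroupOrder {
    NoOrder,
    PartialSort,
    Full,
}

// Matching on a tuple flattens the two-level decision into one match,
// at the cost of hiding that the ordering only matters for non-partial modes.
fn pick_oom_mode(mode: Mode, ordering: GroupOrder) -> OomMode {
    match (mode, ordering) {
        // Partial aggregation: emit incomplete results early.
        (Mode::Partial, _) => OomMode::EmitEarly,
        // Fully sorted input: regular emission handles memory pressure.
        (_, GroupOrder::Full) => OomMode::ReportError,
        // Unsorted or partially sorted input: spill to disk.
        (_, GroupOrder::NoOrder | GroupOrder::PartialSort) => OomMode::Spill,
    }
}

fn main() {
    assert_eq!(pick_oom_mode(Mode::Partial, GroupOrder::Full), OomMode::EmitEarly);
    assert_eq!(pick_oom_mode(Mode::Final, GroupOrder::Full), OomMode::ReportError);
    assert_eq!(pick_oom_mode(Mode::Final, GroupOrder::NoOrder), OomMode::Spill);
}
```

Note the or-pattern in the last arm keeps the match exhaustive without a catch-all, so adding a new ordering variant would force this code to be revisited.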
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -643,6 +678,21 @@ impl GroupedHashAggregateStream {
}
}
+fn find_sort_options(
Review Comment:
This could potentially be a useful function on `LexOrdering`, which would make it more discoverable.
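To sketch the shape such a method might take, here is a self-contained toy version. `LexOrdering`, `SortExpr`, `SortOptions`, and the method name `find_sort_options` are simplified stand-ins here, not DataFusion's real API:

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
struct SortOptions {
    descending: bool,
    nulls_first: bool,
}

struct SortExpr {
    column: String,
    options: SortOptions,
}

// Toy lexicographic ordering: a sequence of sort expressions.
struct LexOrdering {
    exprs: Vec<SortExpr>,
}

impl LexOrdering {
    /// Returns the sort options for `column` if it participates
    /// in this ordering, or `None` otherwise.
    fn find_sort_options(&self, column: &str) -> Option<SortOptions> {
        self.exprs
            .iter()
            .find(|e| e.column == column)
            .map(|e| e.options)
    }
}

fn main() {
    let ordering = LexOrdering {
        exprs: vec![SortExpr {
            column: "a".to_string(),
            options: SortOptions { descending: false, nulls_first: true },
        }],
    };
    assert_eq!(
        ordering.find_sort_options("a"),
        Some(SortOptions { descending: false, nulls_first: true })
    );
    assert_eq!(ordering.find_sort_options("b"), None);
}
```

Hanging the lookup off the ordering type means callers discover it through autocomplete rather than hunting for a free function in `row_hash.rs`.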
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -987,25 +1019,50 @@ impl GroupedHashAggregateStream {
}
}
-        match self.update_memory_reservation() {
-            // Here we can ignore `insufficient_capacity_err` because we will spill later,
-            // but at least one batch should fit in the memory
-            Err(DataFusionError::ResourcesExhausted(_))
-                if self.group_values.len() >= self.batch_size =>
+ Ok(())
+ }
+
+ fn update_memory_reservation_with_oom_handling(
Review Comment:
Maybe some comments explaining what this function does would be useful. Something like:
```rust
/// Tries to update the memory reservation based on newly
/// ingested groups.
///
/// If increasing the reservation fails, returns
/// a new execution state after potentially spilling.
fn update_memory_reservation_with_oom_handling(
```
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -1151,26 +1198,18 @@ impl GroupedHashAggregateStream {
    /// Conduct a streaming merge sort between the batch and spilled data. Since the stream is fully
    /// sorted, set `self.group_ordering` to Full, then later we can read with [`EmitTo::First`].
fn update_merged_stream(&mut self) -> Result<()> {
- let Some(batch) = self.emit(EmitTo::All, true)? else {
- return Ok(());
- };
+        // Spill the last remaining rows (if any) to free up as much memory as possible.
+        // Since we're already spilling, we can be sure we're memory constrained.
+        // Creating an extra spill file won't make much of a difference.
+        self.spill()?;
Review Comment:
Agreed -- this approach makes sense for now. We can try to optimize it in the future if needed.
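For context on the streaming-merge step being discussed, here is a self-contained toy of the underlying idea: several sorted runs (standing in for spill files, represented here as plain `Vec<i32>` rather than sorted `RecordBatch` streams) merged into one fully sorted output via a min-heap, so completed groups can be emitted incrementally:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// K-way streaming merge of sorted runs. Each heap entry is
// (next value, run index, offset within run), smallest value first.
fn streaming_merge(runs: Vec<Vec<i32>>) -> Vec<i32> {
    let mut heap = BinaryHeap::new();
    for (run_idx, run) in runs.iter().enumerate() {
        if let Some(&v) = run.first() {
            heap.push(Reverse((v, run_idx, 0)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((v, run_idx, offset))) = heap.pop() {
        out.push(v);
        // Advance the run the popped value came from.
        if let Some(&next) = runs[run_idx].get(offset + 1) {
            heap.push(Reverse((next, run_idx, offset + 1)));
        }
    }
    out
}

fn main() {
    let merged = streaming_merge(vec![vec![1, 4], vec![2, 3], vec![5]]);
    assert_eq!(merged, vec![1, 2, 3, 4, 5]);
}
```

Spilling the final in-memory rows first, as the PR does, means every input to the merge is on disk and the merge itself needs only one buffered row (batch) per run.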
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -1060,24 +1117,6 @@ impl GroupedHashAggregateStream {
Ok(Some(batch))
}
-    /// Optimistically, [`Self::group_aggregate_batch`] allows to exceed the memory target slightly
-    /// (~ 1 [`RecordBatch`]) for simplicity. In such cases, spill the data to disk and clear the
-    /// memory. Currently only [`GroupOrdering::None`] is supported for spilling.
-    fn spill_previous_if_necessary(&mut self, batch: &RecordBatch) -> Result<()> {
- // TODO: support group_ordering for spilling
Review Comment:
🎉
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -712,18 +771,28 @@ impl Stream for GroupedHashAggregateStream {
break 'reading_input;
}
-                        // Check if we should switch to skip aggregation mode
-                        // It's important that we do this before we early emit since we've
-                        // already updated the probe.
-                        self.update_skip_aggregation_probe(input_rows);
-                        if let Some(new_state) = self.switch_to_skip_aggregation()? {
-                            timer.done();
-                            self.exec_state = new_state;
-                            break 'reading_input;
+                        if self.mode == AggregateMode::Partial {
Review Comment:
I agree -- I find this flow very easy to follow now
##########
datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs:
##########
@@ -1181,14 +1181,13 @@ impl<const STREAMING: bool> GroupValues for GroupValuesColumn<STREAMING> {
Ok(output)
}
- fn clear_shrink(&mut self, batch: &RecordBatch) {
- let count = batch.num_rows();
+ fn clear_shrink(&mut self, num_rows: usize) {
Review Comment:
nice cleanup
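To illustrate why the narrowed signature is a cleanup, here is a self-contained stand-in (the `Groups` struct is hypothetical, not the PR's `GroupValuesColumn`): the caller passes only the row count it already has, so no `RecordBatch` needs to be threaded through just to call `num_rows()` on it:

```rust
struct Groups {
    values: Vec<u64>,
}

impl Groups {
    /// Clear stored group values and shrink the allocation toward
    /// `num_rows`, the expected size of the next batch.
    fn clear_shrink(&mut self, num_rows: usize) {
        self.values.clear();
        self.values.shrink_to(num_rows);
    }
}

fn main() {
    let mut groups = Groups { values: (0..100).collect() };
    groups.clear_shrink(8);
    assert!(groups.values.is_empty());
}
```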
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]