alamb commented on code in PR #19287:
URL: https://github.com/apache/datafusion/pull/19287#discussion_r2628615678


##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -550,26 +571,39 @@ impl GroupedHashAggregateStream {
             .collect::<Vec<_>>()
             .join(", ");
         let name = format!("GroupedHashAggregateStream[{partition}] 
({agg_fn_names})");
-        let reservation = MemoryConsumer::new(name)
-            .with_can_spill(true)
-            .register(context.memory_pool());
         let group_ordering = GroupOrdering::try_new(&agg.input_order_mode)?;
+        let oom_mode = match agg.mode {
+            // In partial aggregation mode, always prefer to emit incomplete 
results early.
+            AggregateMode::Partial => OutOfMemoryMode::EmitEarly,
+            _ => match group_ordering {
+                // For non-partial aggregation modes, don't use spilling if 
the input
+                // of fully sorted by the grouping expressions. Regular 
emission of completed
+                // group values will handle memory pressure.
+                GroupOrdering::Full(_) => OutOfMemoryMode::ReportError,
+                // For unsorted or partially sorted inputs, use disk spilling
+                GroupOrdering::None | GroupOrdering::Partial(_) => 
OutOfMemoryMode::Spill,
+            },
+        };

Review Comment:
   If you want less indentation, you could do the following, which is
   slightly less verbose. However, it loses the explicit nesting that shows
   group_ordering only applies when the mode is not partial.
   
   ```rust
           // Borrow `group_ordering` so it is not moved into the tuple and stays usable afterwards.
           let oom_mode = match (agg.mode, &group_ordering) {
               // In partial aggregation mode, always prefer to emit incomplete results early.
               (AggregateMode::Partial, _) => OutOfMemoryMode::EmitEarly,
               // For non-partial aggregation modes, don't use spilling if the input
               // is fully sorted by the grouping expressions. Regular emission of completed
               // group values will handle memory pressure.
               (_, GroupOrdering::Full(_)) => OutOfMemoryMode::ReportError,
               // For unsorted or partially sorted inputs, use disk spilling
               (_, GroupOrdering::None | GroupOrdering::Partial(_)) => OutOfMemoryMode::Spill,
           };
   ```
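
To make the trade-off concrete, here is a minimal, self-contained sketch showing that the nested match and the flattened tuple match select the same arm for every input. `Mode`, `Order`, and `OomMode` are simplified stand-ins invented for this illustration; they are not the real `AggregateMode`, `GroupOrdering`, or `OutOfMemoryMode` types, which carry more variants and data.

```rust
// Stand-in types for illustration only (assumptions, not DataFusion's enums).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Mode {
    Partial,
    Final,
}

#[derive(Debug, PartialEq, Eq)]
enum Order {
    None,
    Partial(usize),
    Full(usize),
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum OomMode {
    EmitEarly,
    ReportError,
    Spill,
}

/// Nested form: the indentation makes it visible that the ordering
/// is only consulted when the mode is not `Partial`.
fn nested(mode: Mode, order: &Order) -> OomMode {
    match mode {
        Mode::Partial => OomMode::EmitEarly,
        _ => match order {
            Order::Full(_) => OomMode::ReportError,
            Order::None | Order::Partial(_) => OomMode::Spill,
        },
    }
}

/// Flattened form: one match on a tuple. Arms are tried top to bottom,
/// so the `(Mode::Partial, _)` arm must come first to keep the same behavior.
fn flat(mode: Mode, order: &Order) -> OomMode {
    match (mode, order) {
        (Mode::Partial, _) => OomMode::EmitEarly,
        (_, Order::Full(_)) => OomMode::ReportError,
        (_, Order::None | Order::Partial(_)) => OomMode::Spill,
    }
}
```

Both forms are exhaustive, so the compiler would still flag a newly added ordering variant in either version. The flattened form relies on arm order for correctness, while the nested form encodes the same priority structurally.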



##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -643,6 +678,21 @@ impl GroupedHashAggregateStream {
     }
 }
 
+fn find_sort_options(

Review Comment:
   This could be a useful method on LexOrdering, which would also make it
   more discoverable.



##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -987,25 +1019,50 @@ impl GroupedHashAggregateStream {
             }
         }
 
-        match self.update_memory_reservation() {
-            // Here we can ignore `insufficient_capacity_err` because we will 
spill later,
-            // but at least one batch should fit in the memory
-            Err(DataFusionError::ResourcesExhausted(_))
-                if self.group_values.len() >= self.batch_size =>
+        Ok(())
+    }
+
+    fn update_memory_reservation_with_oom_handling(

Review Comment:
   Maybe some comments explaining what this function does would be useful.
   
   Something like
   ```rust
   /// Tries to update the memory reservation based on newly
   /// ingested groups.
   ///
   /// If increasing the reservation fails, returns a new
   /// execution state after potentially spilling.
   fn update_memory_reservation_with_oom_handling(
   ```



##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -1151,26 +1198,18 @@ impl GroupedHashAggregateStream {
     /// Conduct a streaming merge sort between the batch and spilled data. 
Since the stream is fully
     /// sorted, set `self.group_ordering` to Full, then later we can read with 
[`EmitTo::First`].
     fn update_merged_stream(&mut self) -> Result<()> {
-        let Some(batch) = self.emit(EmitTo::All, true)? else {
-            return Ok(());
-        };
+        // Spill the last remaining rows (if any) to free up as much memory as 
possible.
+        // Since we're already spilling, we can be sure we're memory 
constrained.
+        // Creating an extra spill file won't make much of a difference.
+        self.spill()?;

Review Comment:
   Agreed -- this approach makes sense for now. We can try to optimize it in
   the future if needed.



##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -1060,24 +1117,6 @@ impl GroupedHashAggregateStream {
         Ok(Some(batch))
     }
 
-    /// Optimistically, [`Self::group_aggregate_batch`] allows to exceed the 
memory target slightly
-    /// (~ 1 [`RecordBatch`]) for simplicity. In such cases, spill the data to 
disk and clear the
-    /// memory. Currently only [`GroupOrdering::None`] is supported for 
spilling.
-    fn spill_previous_if_necessary(&mut self, batch: &RecordBatch) -> 
Result<()> {
-        // TODO: support group_ordering for spilling

Review Comment:
   🎉 



##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -712,18 +771,28 @@ impl Stream for GroupedHashAggregateStream {
                                 break 'reading_input;
                             }
 
-                            // Check if we should switch to skip aggregation 
mode
-                            // It's important that we do this before we early 
emit since we've
-                            // already updated the probe.
-                            self.update_skip_aggregation_probe(input_rows);
-                            if let Some(new_state) = 
self.switch_to_skip_aggregation()? {
-                                timer.done();
-                                self.exec_state = new_state;
-                                break 'reading_input;
+                            if self.mode == AggregateMode::Partial {

Review Comment:
   I agree -- I find this flow very easy to follow now



##########
datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs:
##########
@@ -1181,14 +1181,13 @@ impl<const STREAMING: bool> GroupValues for 
GroupValuesColumn<STREAMING> {
         Ok(output)
     }
 
-    fn clear_shrink(&mut self, batch: &RecordBatch) {
-        let count = batch.num_rows();
+    fn clear_shrink(&mut self, num_rows: usize) {

Review Comment:
   nice cleanup



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

