This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new f20e2e29b5 Remove unused fetch inside ExternalSorter (#15525)
f20e2e29b5 is described below

commit f20e2e29b5bddf6d7e5b712535bc56ed58c0ef7f
Author: Yongting You <[email protected]>
AuthorDate: Wed Apr 2 18:49:55 2025 +0800

    Remove unused fetch inside ExternalSorter (#15525)
---
 datafusion/physical-plan/src/sorts/sort.rs | 79 ++++++++----------------------
 1 file changed, 20 insertions(+), 59 deletions(-)

diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 1072e9abf4..b04db06ea7 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -207,8 +207,6 @@ struct ExternalSorter {
     expr: Arc<[PhysicalSortExpr]>,
     /// RowConverter corresponding to the sort expressions
     sort_keys_row_converter: Arc<RowConverter>,
-    /// If Some, the maximum number of output rows that will be produced
-    fetch: Option<usize>,
     /// The target number of rows for output batches
     batch_size: usize,
     /// If the in size of buffered memory batches is below this size,
@@ -262,7 +260,6 @@ impl ExternalSorter {
         schema: SchemaRef,
         expr: LexOrdering,
         batch_size: usize,
-        fetch: Option<usize>,
         sort_spill_reservation_bytes: usize,
         sort_in_place_threshold_bytes: usize,
         metrics: &ExecutionPlanMetricsSet,
@@ -307,7 +304,6 @@ impl ExternalSorter {
             expr: expr.into(),
             sort_keys_row_converter: Arc::new(converter),
             metrics,
-            fetch,
             reservation,
             spill_manager,
             merge_reservation,
@@ -330,10 +326,8 @@ impl ExternalSorter {
 
         let size = get_reserved_byte_for_record_batch(&input);
         if self.reservation.try_grow(size).is_err() {
-            self.sort_or_spill_in_mem_batches(false).await?;
-            // We've already freed more than half of reserved memory,
-            // so we can grow the reservation again. There's nothing we can do
-            // if this try_grow fails.
+            self.sort_and_spill_in_mem_batches().await?;
+            // After spilling all in-memory batches, the retry should succeed
             self.reservation.try_grow(size)?;
         }
 
@@ -367,7 +361,7 @@ impl ExternalSorter {
             // `in_mem_batches` and the memory limit is almost reached, merging
             // them with the spilled files at the same time might cause OOM.
             if !self.in_mem_batches.is_empty() {
-                self.sort_or_spill_in_mem_batches(true).await?;
+                self.sort_and_spill_in_mem_batches().await?;
             }
 
             for spill in self.finished_spill_files.drain(..) {
@@ -386,7 +380,7 @@ impl ExternalSorter {
                 .with_expressions(expressions.as_ref())
                 .with_metrics(self.metrics.baseline.clone())
                 .with_batch_size(self.batch_size)
-                .with_fetch(self.fetch)
+                .with_fetch(None)
                 .with_reservation(self.merge_reservation.new_empty())
                 .build()
         } else {
@@ -532,28 +526,15 @@ impl ExternalSorter {
         Ok(())
     }
 
-    /// Sorts the in_mem_batches and potentially spill the sorted batches.
-    ///
-    /// If the memory usage has dropped by a factor of 2, it might be a sort with
-    /// fetch (e.g. sorting 1M rows but only keep the top 100), so we keep the
-    /// sorted entries inside `in_mem_batches` to be sorted in the next iteration.
-    /// Otherwise, we spill the sorted run to free up memory for inserting more batches.
-    ///
-    /// # Arguments
-    ///
-    /// * `force_spill` - If true, the method will spill the in-memory batches
-    ///   even if the memory usage has not dropped by a factor of 2. Otherwise it will
-    ///   only spill when the memory usage has dropped by the pre-defined factor.
-    ///
-    async fn sort_or_spill_in_mem_batches(&mut self, force_spill: bool) -> Result<()> {
+    /// Sorts the in-memory batches and merges them into a single sorted run, then writes
+    /// the result to spill files.
+    async fn sort_and_spill_in_mem_batches(&mut self) -> Result<()> {
         // Release the memory reserved for merge back to the pool so
         // there is some left when `in_mem_sort_stream` requests an
         // allocation. At the end of this function, memory will be
         // reserved again for the next spill.
         self.merge_reservation.free();
 
-        let before = self.reservation.size();
-
         let mut sorted_stream =
             self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
         // After `in_mem_sort_stream()` is constructed, all `in_mem_batches` is taken
@@ -568,7 +549,6 @@ impl ExternalSorter {
         // sort-preserving merge and incrementally append to spill files.
         let mut globally_sorted_batches: Vec<RecordBatch> = vec![];
 
-        let mut spilled = false;
         while let Some(batch) = sorted_stream.next().await {
             let batch = batch?;
             let sorted_size = get_reserved_byte_for_record_batch(&batch);
@@ -579,7 +559,6 @@ impl ExternalSorter {
                 globally_sorted_batches.push(batch);
                 self.consume_and_spill_append(&mut globally_sorted_batches)
                     .await?; // reservation is freed in spill()
-                spilled = true;
             } else {
                 globally_sorted_batches.push(batch);
             }
@@ -589,33 +568,17 @@ impl ExternalSorter {
         // upcoming `self.reserve_memory_for_merge()` may fail due to insufficient memory.
         drop(sorted_stream);
 
-        // Sorting may free up some memory especially when fetch is `Some`. If we have
-        // not freed more than 50% of the memory, then we have to spill to free up more
-        // memory for inserting more batches.
-        if (self.reservation.size() > before / 2) || force_spill {
-            // We have not freed more than 50% of the memory, so we have to spill to
-            // free up more memory
-            self.consume_and_spill_append(&mut globally_sorted_batches)
-                .await?;
-            spilled = true;
-        }
-
-        if spilled {
-            // There might be some buffered batches that haven't trigger a spill yet.
-            self.consume_and_spill_append(&mut globally_sorted_batches)
-                .await?;
-            self.spill_finish().await?;
-        } else {
-            // If the memory limit has reached before calling this function, and it
-            // didn't spill anything, it means this is a sorting with fetch top K
-            // element: after sorting only the top K elements will be kept in memory.
-            // For simplicity, those sorted top K entries are put back to unsorted
-            // `in_mem_batches` to be consumed by the next sort/merge.
-            if !self.in_mem_batches.is_empty() {
-                return internal_err!("in_mem_batches should be cleared before");
-            }
+        self.consume_and_spill_append(&mut globally_sorted_batches)
+            .await?;
+        self.spill_finish().await?;
 
-            self.in_mem_batches = std::mem::take(&mut globally_sorted_batches);
+        // Sanity check after spilling
+        let buffers_cleared_property =
+            self.in_mem_batches.is_empty() && globally_sorted_batches.is_empty();
+        if !buffers_cleared_property {
+            return internal_err!(
+                "in_mem_batches and globally_sorted_batches should be cleared before"
+            );
         }
 
         // Reserve headroom for next sort/merge
@@ -740,7 +703,7 @@ impl ExternalSorter {
             .with_expressions(expressions.as_ref())
             .with_metrics(metrics)
             .with_batch_size(self.batch_size)
-            .with_fetch(self.fetch)
+            .with_fetch(None)
             .with_reservation(self.merge_reservation.new_empty())
             .build()
     }
@@ -761,7 +724,6 @@ impl ExternalSorter {
         );
         let schema = batch.schema();
 
-        let fetch = self.fetch;
         let expressions: LexOrdering = self.expr.iter().cloned().collect();
         let row_converter = Arc::clone(&self.sort_keys_row_converter);
         let stream = futures::stream::once(async move {
@@ -775,9 +737,9 @@ impl ExternalSorter {
             let sorted = if is_multi_column_with_lists(&sort_columns) {
                 // lex_sort_to_indices doesn't support List with more than one column
                 // https://github.com/apache/arrow-rs/issues/5454
-                sort_batch_row_based(&batch, &expressions, row_converter, fetch)?
+                sort_batch_row_based(&batch, &expressions, row_converter, None)?
             } else {
-                sort_batch(&batch, &expressions, fetch)?
+                sort_batch(&batch, &expressions, None)?
             };
 
             metrics.record_output(sorted.num_rows());
@@ -1244,7 +1206,6 @@ impl ExecutionPlan for SortExec {
                     input.schema(),
                     self.expr.clone(),
                     context.session_config().batch_size(),
-                    self.fetch,
                     execution_options.sort_spill_reservation_bytes,
                     execution_options.sort_in_place_threshold_bytes,
                     &self.metrics_set,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to