This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new f20e2e29b5 Remove unused fetch inside ExternalSorter (#15525)
f20e2e29b5 is described below
commit f20e2e29b5bddf6d7e5b712535bc56ed58c0ef7f
Author: Yongting You <[email protected]>
AuthorDate: Wed Apr 2 18:49:55 2025 +0800
Remove unused fetch inside ExternalSorter (#15525)
---
datafusion/physical-plan/src/sorts/sort.rs | 79 ++++++++----------------------
1 file changed, 20 insertions(+), 59 deletions(-)
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 1072e9abf4..b04db06ea7 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -207,8 +207,6 @@ struct ExternalSorter {
expr: Arc<[PhysicalSortExpr]>,
/// RowConverter corresponding to the sort expressions
sort_keys_row_converter: Arc<RowConverter>,
- /// If Some, the maximum number of output rows that will be produced
- fetch: Option<usize>,
/// The target number of rows for output batches
batch_size: usize,
/// If the in size of buffered memory batches is below this size,
@@ -262,7 +260,6 @@ impl ExternalSorter {
schema: SchemaRef,
expr: LexOrdering,
batch_size: usize,
- fetch: Option<usize>,
sort_spill_reservation_bytes: usize,
sort_in_place_threshold_bytes: usize,
metrics: &ExecutionPlanMetricsSet,
@@ -307,7 +304,6 @@ impl ExternalSorter {
expr: expr.into(),
sort_keys_row_converter: Arc::new(converter),
metrics,
- fetch,
reservation,
spill_manager,
merge_reservation,
@@ -330,10 +326,8 @@ impl ExternalSorter {
let size = get_reserved_byte_for_record_batch(&input);
if self.reservation.try_grow(size).is_err() {
- self.sort_or_spill_in_mem_batches(false).await?;
- // We've already freed more than half of reserved memory,
- // so we can grow the reservation again. There's nothing we can do
- // if this try_grow fails.
+ self.sort_and_spill_in_mem_batches().await?;
+ // After spilling all in-memory batches, the retry should succeed
self.reservation.try_grow(size)?;
}
@@ -367,7 +361,7 @@ impl ExternalSorter {
// `in_mem_batches` and the memory limit is almost reached, merging them with the spilled files at the same time might cause OOM.
if !self.in_mem_batches.is_empty() {
- self.sort_or_spill_in_mem_batches(true).await?;
+ self.sort_and_spill_in_mem_batches().await?;
}
for spill in self.finished_spill_files.drain(..) {
@@ -386,7 +380,7 @@ impl ExternalSorter {
.with_expressions(expressions.as_ref())
.with_metrics(self.metrics.baseline.clone())
.with_batch_size(self.batch_size)
- .with_fetch(self.fetch)
+ .with_fetch(None)
.with_reservation(self.merge_reservation.new_empty())
.build()
} else {
@@ -532,28 +526,15 @@ impl ExternalSorter {
Ok(())
}
- /// Sorts the in_mem_batches and potentially spill the sorted batches.
- ///
- /// If the memory usage has dropped by a factor of 2, it might be a sort with
- /// fetch (e.g. sorting 1M rows but only keep the top 100), so we keep the
- /// sorted entries inside `in_mem_batches` to be sorted in the next iteration.
- /// Otherwise, we spill the sorted run to free up memory for inserting more batches.
- ///
- /// # Arguments
- ///
- /// * `force_spill` - If true, the method will spill the in-memory batches
- /// even if the memory usage has not dropped by a factor of 2. Otherwise it will
- /// only spill when the memory usage has dropped by the pre-defined factor.
- ///
- async fn sort_or_spill_in_mem_batches(&mut self, force_spill: bool) -> Result<()> {
+ /// Sorts the in-memory batches and merges them into a single sorted run, then writes
+ /// the result to spill files.
+ async fn sort_and_spill_in_mem_batches(&mut self) -> Result<()> {
// Release the memory reserved for merge back to the pool so
// there is some left when `in_mem_sort_stream` requests an
// allocation. At the end of this function, memory will be
// reserved again for the next spill.
self.merge_reservation.free();
- let before = self.reservation.size();
-
let mut sorted_stream =
self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
// After `in_mem_sort_stream()` is constructed, all `in_mem_batches` is taken
@@ -568,7 +549,6 @@ impl ExternalSorter {
// sort-preserving merge and incrementally append to spill files.
let mut globally_sorted_batches: Vec<RecordBatch> = vec![];
- let mut spilled = false;
while let Some(batch) = sorted_stream.next().await {
let batch = batch?;
let sorted_size = get_reserved_byte_for_record_batch(&batch);
@@ -579,7 +559,6 @@ impl ExternalSorter {
globally_sorted_batches.push(batch);
self.consume_and_spill_append(&mut globally_sorted_batches)
.await?; // reservation is freed in spill()
- spilled = true;
} else {
globally_sorted_batches.push(batch);
}
@@ -589,33 +568,17 @@ impl ExternalSorter {
// upcoming `self.reserve_memory_for_merge()` may fail due to insufficient memory.
drop(sorted_stream);
- // Sorting may free up some memory especially when fetch is `Some`. If we have
- // not freed more than 50% of the memory, then we have to spill to free up more
- // memory for inserting more batches.
- if (self.reservation.size() > before / 2) || force_spill {
- // We have not freed more than 50% of the memory, so we have to spill to
- // free up more memory
- self.consume_and_spill_append(&mut globally_sorted_batches)
- .await?;
- spilled = true;
- }
-
- if spilled {
- // There might be some buffered batches that haven't trigger a spill yet.
- self.consume_and_spill_append(&mut globally_sorted_batches)
- .await?;
- self.spill_finish().await?;
- } else {
- // If the memory limit has reached before calling this function, and it
- // didn't spill anything, it means this is a sorting with fetch top K
- // element: after sorting only the top K elements will be kept in memory.
- // For simplicity, those sorted top K entries are put back to unsorted
- // `in_mem_batches` to be consumed by the next sort/merge.
- if !self.in_mem_batches.is_empty() {
- return internal_err!("in_mem_batches should be cleared before");
- }
+ self.consume_and_spill_append(&mut globally_sorted_batches)
+ .await?;
+ self.spill_finish().await?;
- self.in_mem_batches = std::mem::take(&mut globally_sorted_batches);
+ // Sanity check after spilling
+ let buffers_cleared_property =
+ self.in_mem_batches.is_empty() && globally_sorted_batches.is_empty();
+ if !buffers_cleared_property {
+ return internal_err!(
+ "in_mem_batches and globally_sorted_batches should be cleared before"
+ );
}
// Reserve headroom for next sort/merge
@@ -740,7 +703,7 @@ impl ExternalSorter {
.with_expressions(expressions.as_ref())
.with_metrics(metrics)
.with_batch_size(self.batch_size)
- .with_fetch(self.fetch)
+ .with_fetch(None)
.with_reservation(self.merge_reservation.new_empty())
.build()
}
@@ -761,7 +724,6 @@ impl ExternalSorter {
);
let schema = batch.schema();
- let fetch = self.fetch;
let expressions: LexOrdering = self.expr.iter().cloned().collect();
let row_converter = Arc::clone(&self.sort_keys_row_converter);
let stream = futures::stream::once(async move {
@@ -775,9 +737,9 @@ impl ExternalSorter {
let sorted = if is_multi_column_with_lists(&sort_columns) {
// lex_sort_to_indices doesn't support List with more than one
column
// https://github.com/apache/arrow-rs/issues/5454
- sort_batch_row_based(&batch, &expressions, row_converter, fetch)?
+ sort_batch_row_based(&batch, &expressions, row_converter, None)?
} else {
- sort_batch(&batch, &expressions, fetch)?
+ sort_batch(&batch, &expressions, None)?
};
metrics.record_output(sorted.num_rows());
@@ -1244,7 +1206,6 @@ impl ExecutionPlan for SortExec {
input.schema(),
self.expr.clone(),
context.session_config().batch_size(),
- self.fetch,
execution_options.sort_spill_reservation_bytes,
execution_options.sort_in_place_threshold_bytes,
&self.metrics_set,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]