2010YOUY01 commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2191383325


##########
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##########
@@ -0,0 +1,342 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Create a stream that performs a multi-level merge
+
+use crate::metrics::BaselineMetrics;
+use crate::{EmptyRecordBatchStream, SpillManager};
+use arrow::array::RecordBatch;
+use std::fmt::{Debug, Formatter};
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::{
+    MemoryConsumer, MemoryPool, MemoryReservation, UnboundedMemoryPool,
+};
+
+use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
+use crate::stream::RecordBatchStreamAdapter;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::TryStreamExt;
+use futures::{Stream, StreamExt};
+
+/// Merges a stream of sorted cursors and record batches into a single sorted stream
+pub(crate) struct MultiLevelMergeBuilder {
+    spill_manager: SpillManager,
+    schema: SchemaRef,
+    sorted_spill_files: Vec<SortedSpillFile>,
+    sorted_streams: Vec<SendableRecordBatchStream>,
+    expr: LexOrdering,
+    metrics: BaselineMetrics,
+    batch_size: usize,
+    reservation: MemoryReservation,
+    fetch: Option<usize>,
+    enable_round_robin_tie_breaker: bool,
+
+    // This avoids double reservation of memory, from both our side and the sort preserving
+    // merge stream side, without a lot of code changes to stop accounting for the memory
+    // used by the streams
+    unbounded_memory_pool: Arc<dyn MemoryPool>,
+}
+
+impl Debug for MultiLevelMergeBuilder {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "MultiLevelMergeBuilder")
+    }
+}
+
+impl MultiLevelMergeBuilder {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        spill_manager: SpillManager,
+        schema: SchemaRef,
+        sorted_spill_files: Vec<SortedSpillFile>,
+        sorted_streams: Vec<SendableRecordBatchStream>,
+        expr: LexOrdering,
+        metrics: BaselineMetrics,
+        batch_size: usize,
+        reservation: MemoryReservation,
+        fetch: Option<usize>,
+        enable_round_robin_tie_breaker: bool,
+    ) -> Self {
+        Self {
+            spill_manager,
+            schema,
+            sorted_spill_files,
+            sorted_streams,
+            expr,
+            metrics,
+            batch_size,
+            reservation,
+            enable_round_robin_tie_breaker,
+            fetch,
+            unbounded_memory_pool: Arc::new(UnboundedMemoryPool::default()),
+        }
+    }
+
+    pub(crate) fn create_spillable_merge_stream(self) -> SendableRecordBatchStream {
+        Box::pin(RecordBatchStreamAdapter::new(
+            Arc::clone(&self.schema),
+            futures::stream::once(self.create_stream()).try_flatten(),
+        ))
+    }
+
+    async fn create_stream(mut self) -> Result<SendableRecordBatchStream> {
+        loop {
+            // Hold this for the lifetime of the stream
+            let mut current_memory_reservation = self.reservation.new_empty();
+            let mut stream =
+                self.create_sorted_stream(&mut current_memory_reservation)?;
+
+            // TODO - add a threshold for number of files to disk even if empty and reading from disk so
+            //        we can avoid the memory reservation
+
+            // If no spill files are left, we can return the stream as this is the last sorted run
+            // TODO - We can write to disk before reading it back to avoid having multiple streams in memory
+            if self.sorted_spill_files.is_empty() {
+                // Attach the memory reservation to the stream as we are done with it,
+                // but because we replaced the memory reservation of the merge stream,
+                // we must hold this one to make sure we have enough memory
+                return Ok(Box::pin(StreamAttachedReservation::new(
+                    stream,
+                    current_memory_reservation,
+                )));
+            }
+
+            // Need to sort to a spill file
+            let Some((spill_file, max_record_batch_memory)) = self
+                .spill_manager
+                .spill_record_batch_stream_by_size(
+                    &mut stream,
+                    self.batch_size,
+                    "MultiLevelMergeBuilder intermediate spill",
+                )
+                .await?
+            else {
+                continue;
+            };
+
+            // Add the spill file
+            self.sorted_spill_files.push(SortedSpillFile {
+                file: spill_file,
+                max_record_batch_memory,
+            });
+        }
+    }
+
+    fn create_sorted_stream(
+        &mut self,
+        memory_reservation: &mut MemoryReservation,
+    ) -> Result<SendableRecordBatchStream> {
+        match (self.sorted_spill_files.len(), self.sorted_streams.len()) {
+            // No data so empty batch
+            (0, 0) => Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(
+                &self.schema,
+            )))),
+
+            // Only in-memory stream, return that
+            (0, 1) => Ok(self.sorted_streams.remove(0)),
+
+            // Only single sorted spill file so return it
+            (1, 0) => {
+                let spill_file = self.sorted_spill_files.remove(0);
+
+                self.spill_manager.read_spill_as_stream(spill_file.file)
+            }
+
+            // Only in-memory streams, so merge them all in a single pass
+            (0, _) => {
+                let sorted_stream = mem::take(&mut self.sorted_streams);
+                self.create_new_merge_sort(
+                    sorted_stream,
+                    // If we have no sorted spill files left, this is the last run
+                    true,
+                )
+            }
+
+            // Need to merge multiple streams
+            (_, _) => {
+                // Don't account for the existing streams' memory
+                // as we are not holding the memory for them
+                let mut sorted_streams = mem::take(&mut self.sorted_streams);
+
+                let (sorted_spill_files, buffer_size) = self
+                    .get_sorted_spill_files_to_merge(
+                        2,
+                        // we must have at least 2 streams to merge
+                        2_usize.saturating_sub(sorted_streams.len()),
+                        memory_reservation,
+                    )?;
+
+                for spill in sorted_spill_files {
+                    let stream = self
+                        .spill_manager
+                        .clone()
+                        .with_batch_read_buffer_capacity(buffer_size)
+                        .read_spill_as_stream(spill.file)?;
+                    sorted_streams.push(stream);
+                }
+
+                self.create_new_merge_sort(
+                    sorted_streams,
+                    // If we have no sorted spill files left, this is the last run
+                    self.sorted_spill_files.is_empty(),
+                )
+            }
+        }
+    }
+
+    fn create_new_merge_sort(
+        &mut self,
+        streams: Vec<SendableRecordBatchStream>,
+        is_output: bool,
+    ) -> Result<SendableRecordBatchStream> {
+        StreamingMergeBuilder::new()
+            .with_schema(Arc::clone(&self.schema))
+            .with_expressions(&self.expr)
+            .with_batch_size(self.batch_size)
+            .with_fetch(self.fetch)
+            .with_metrics(if is_output {
+                // Only add the metrics to the last run
+                self.metrics.clone()
+            } else {
+                self.metrics.intermediate()
+            })
+            .with_round_robin_tie_breaker(self.enable_round_robin_tie_breaker)
+            .with_streams(streams)
+            // Don't track memory used by this stream as we reserve that memory for worst-case scenarios
+            // (reserving memory for the biggest batch in each stream)
+            // This is a hack
+            .with_reservation(
+                MemoryConsumer::new("merge stream mock memory")
+                    .register(&self.unbounded_memory_pool),
+            )
+            .build()
+    }
+
+    /// Return the sorted spill files to use for the next phase, and the buffer size.
+    /// This will try to get as many spill files as possible to merge, and if there is not
+    /// enough memory it will try to reduce the buffer size until we have enough streams to
+    /// merge; otherwise it will return an error
+    fn get_sorted_spill_files_to_merge(
+        &mut self,
+        buffer_size: usize,

Review Comment:
   nit: I think `size` is ambiguous because it sounds like 'memory size in bytes'; perhaps `buffer_len`?
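
   For illustration, the rename might look like this (just a sketch of the suggested signature, body elided):
   ```rust
   fn get_sorted_spill_files_to_merge(
       &mut self,
       // Number of batches to buffer per stream (a count, not bytes)
       buffer_len: usize,
       minimum_number_of_required_streams: usize,
       reservation: &mut MemoryReservation,
   ) -> Result<(Vec<SortedSpillFile>, usize)> {
       // ... body unchanged, with `buffer_size` renamed to `buffer_len`
   }
   ```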



##########
datafusion/core/tests/fuzz_cases/stream_exec.rs:
##########
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_schema::SchemaRef;
+use datafusion_common::DataFusionError;
+use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
+use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
+use datafusion_physical_plan::{
+    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
+};
+use std::any::Any;
+use std::fmt::{Debug, Formatter};
+use std::sync::{Arc, Mutex};
+
+/// Execution plan that returns the stream on the first call to `execute`;
+/// further calls to `execute` will return an error
+pub struct StreamExec {

Review Comment:
   nit: I think `OneShotExec` or `OnceExec` is more descriptive.
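
   For illustration, the rename could look roughly like this (a hedged sketch reusing the imports already in this file; `OnceExec` is the hypothetical name):
   ```rust
   pub struct OnceExec {
       /// Taken on the first call to `execute`; `None` afterwards
       stream: Mutex<Option<SendableRecordBatchStream>>,
       properties: PlanProperties,
   }

   impl OnceExec {
       fn take_stream(&self) -> Result<SendableRecordBatchStream, DataFusionError> {
           self.stream.lock().unwrap().take().ok_or_else(|| {
               DataFusionError::Internal("OnceExec can only be executed once".to_string())
           })
       }
   }
   ```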



##########
datafusion/core/tests/fuzz_cases/sort_fuzz.rs:
##########
@@ -377,3 +388,335 @@ fn make_staggered_i32_utf8_batches(len: usize) -> Vec<RecordBatch> {
 
     batches
 }
+

Review Comment:
   It would be great to add a top-level comment for this group of tests, like 'Testing spilling sort queries, with another memory-consuming neighbor (the mock operator with `MemoryBehavior`)'.
   The same for aggregate fuzz tests (perhaps we can structure them into the same new file?)
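
   For example, something like this at the top of the group (a sketch, exact wording up to you):
   ```rust
   // Tests for spilling sort queries under a constrained memory pool, with another
   // memory-consuming neighbor (the mock operator configured via `MemoryBehavior`)
   // growing and releasing memory while the sort runs.
   ```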



##########
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##########
@@ -0,0 +1,342 @@
+            // Don't track memory used by this stream as we reserve that memory for worst-case scenarios
+            // (reserving memory for the biggest batch in each stream)
+            // This is a hack
+            .with_reservation(
+                MemoryConsumer::new("merge stream mock memory")
+                    .register(&self.unbounded_memory_pool),
+            )
Review Comment:
   We should free the reservation and let the SPM operator use the global memory pool, to validate that it can continue within the memory limit.



##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -220,12 +220,12 @@ struct ExternalSorter {
 
     /// During external sorting, in-memory intermediate data will be appended to
     /// this file incrementally. Once finished, this file will be moved to [`Self::finished_spill_files`].
-    in_progress_spill_file: Option<InProgressSpillFile>,
+    in_progress_spill_file: Option<(InProgressSpillFile, usize)>,

Review Comment:
   Let's update the comment for this newly added `usize`.
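
   Perhaps something like this (a sketch; please adjust if the `usize` tracks something else):
   ```rust
   /// During external sorting, in-memory intermediate data will be appended to
   /// this file incrementally. Once finished, this file will be moved to
   /// [`Self::finished_spill_files`]. The `usize` is the largest memory footprint
   /// of any single record batch written to this file so far.
   in_progress_spill_file: Option<(InProgressSpillFile, usize)>,
   ```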



##########
datafusion/core/tests/fuzz_cases/sort_fuzz.rs:
##########
@@ -377,3 +388,335 @@ fn make_staggered_i32_utf8_batches(len: usize) -> Vec<RecordBatch> {
 
     batches
 }
+
+#[tokio::test]
+async fn test_sort_with_limited_memory() -> Result<()> {
+    let record_batch_size = 8192;
+    let pool_size = 2 * MB as usize;
+    let task_ctx = {
+        let memory_pool = Arc::new(FairSpillPool::new(pool_size));
+        TaskContext::default()
+            .with_session_config(
+                SessionConfig::new()
+                    .with_batch_size(record_batch_size)
+                    .with_sort_spill_reservation_bytes(1),
+            )
+            .with_runtime(Arc::new(
+                RuntimeEnvBuilder::new()
+                    .with_memory_pool(memory_pool)
+                    .build()?,
+            ))
+    };
+
+    let record_batch_size = pool_size / 16;
+
+    // Basic test with a lot of groups that cannot all fit in memory and 1 record batch
+    // from each spill file is too much memory
+    let spill_count =
+        run_sort_test_with_limited_memory(RunSortTestWithLimitedMemoryArgs {
+            pool_size,
+            task_ctx,
+            number_of_record_batches: 100,
+            get_size_of_record_batch_to_generate: Box::pin(move |_| record_batch_size),
+            memory_behavior: Default::default(),
+        })
+        .await?;
+
+    let total_spill_files_size = spill_count * record_batch_size;
+    assert!(
+        total_spill_files_size > pool_size,
+        "Total spill files size {total_spill_files_size} should be greater 
than pool size {pool_size}",
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_sort_with_limited_memory_and_different_sizes_of_record_batch() -> Result<()> {
+    let record_batch_size = 8192;
+    let pool_size = 2 * MB as usize;
+    let task_ctx = {
+        let memory_pool = Arc::new(FairSpillPool::new(pool_size));
+        TaskContext::default()
+            .with_session_config(
+                SessionConfig::new()
+                    .with_batch_size(record_batch_size)
+                    .with_sort_spill_reservation_bytes(1),
+            )
+            .with_runtime(Arc::new(
+                RuntimeEnvBuilder::new()
+                    .with_memory_pool(memory_pool)
+                    .build()?,
+            ))
+    };
+
+    run_sort_test_with_limited_memory(RunSortTestWithLimitedMemoryArgs {
+        pool_size,
+        task_ctx,
+        number_of_record_batches: 100,
+        get_size_of_record_batch_to_generate: Box::pin(move |i| {
+            if i % 25 == 1 {
+                pool_size / 4
+            } else {
+                16 * KB
+            }
+        }),
+        memory_behavior: Default::default(),
+    })
+    .await?;
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_sort_with_limited_memory_and_different_sizes_of_record_batch_and_changing_memory_reservation(
+) -> Result<()> {
+    let record_batch_size = 8192;
+    let pool_size = 2 * MB as usize;
+    let task_ctx = {
+        let memory_pool = Arc::new(FairSpillPool::new(pool_size));
+        TaskContext::default()
+            .with_session_config(
+                SessionConfig::new()
+                    .with_batch_size(record_batch_size)
+                    .with_sort_spill_reservation_bytes(1),
+            )
+            .with_runtime(Arc::new(
+                RuntimeEnvBuilder::new()
+                    .with_memory_pool(memory_pool)
+                    .build()?,
+            ))
+    };
+
+    run_sort_test_with_limited_memory(RunSortTestWithLimitedMemoryArgs {
+        pool_size,
+        task_ctx,
+        number_of_record_batches: 100,
+        get_size_of_record_batch_to_generate: Box::pin(move |i| {
+            if i % 25 == 1 {
+                pool_size / 4
+            } else {
+                16 * KB
+            }
+        }),
+        memory_behavior: MemoryBehavior::TakeAllMemoryAndReleaseEveryNthBatch(10),
+    })
+    .await?;
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_sort_with_limited_memory_and_different_sizes_of_record_batch_and_take_all_memory(
+) -> Result<()> {
+    let record_batch_size = 8192;
+    let pool_size = 2 * MB as usize;
+    let task_ctx = {
+        let memory_pool = Arc::new(FairSpillPool::new(pool_size));
+        TaskContext::default()
+            .with_session_config(
+                SessionConfig::new()
+                    .with_batch_size(record_batch_size)
+                    .with_sort_spill_reservation_bytes(1),
+            )
+            .with_runtime(Arc::new(
+                RuntimeEnvBuilder::new()
+                    .with_memory_pool(memory_pool)
+                    .build()?,
+            ))
+    };
+
+    run_sort_test_with_limited_memory(RunSortTestWithLimitedMemoryArgs {
+        pool_size,
+        task_ctx,
+        number_of_record_batches: 100,
+        get_size_of_record_batch_to_generate: Box::pin(move |i| {
+            if i % 25 == 1 {
+                pool_size / 4
+            } else {
+                16 * KB
+            }
+        }),
+        memory_behavior: MemoryBehavior::TakeAllMemoryAtTheBeginning,
+    })
+    .await?;
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_sort_with_limited_memory_and_large_record_batch() -> Result<()> {
+    let record_batch_size = 8192;
+    let pool_size = 2 * MB as usize;
+    let task_ctx = {
+        let memory_pool = Arc::new(FairSpillPool::new(pool_size));
+        TaskContext::default()
+            .with_session_config(
+                SessionConfig::new()
+                    .with_batch_size(record_batch_size)
+                    .with_sort_spill_reservation_bytes(1),
+            )
+            .with_runtime(Arc::new(
+                RuntimeEnvBuilder::new()
+                    .with_memory_pool(memory_pool)
+                    .build()?,
+            ))
+    };
+
+    // Test that the merge degree of the multi-level merge sort cannot be a fixed size when there is not enough memory
+    run_sort_test_with_limited_memory(RunSortTestWithLimitedMemoryArgs {
+        pool_size,
+        task_ctx,
+        number_of_record_batches: 100,
+        get_size_of_record_batch_to_generate: Box::pin(move |_| pool_size / 4),
+        memory_behavior: Default::default(),
+    })
+    .await?;
+
+    Ok(())
+}
+
+struct RunSortTestWithLimitedMemoryArgs {

Review Comment:
   These utilities seem to be copied and pasted from `aggregate_fuzz.rs`; how about refactoring them into a common module?
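
   For example (a sketch; module name and fields are just a suggestion based on what is visible here):
   ```rust
   // datafusion/core/tests/fuzz_cases/spilling_fuzz_utils.rs (hypothetical shared module)
   pub struct RunTestWithLimitedMemoryArgs {
       pub pool_size: usize,
       pub task_ctx: TaskContext,
       pub number_of_record_batches: usize,
       pub get_size_of_record_batch_to_generate: Pin<Box<dyn Fn(usize) -> usize + Send>>,
       pub memory_behavior: MemoryBehavior,
   }
   ```
   Then both `sort_fuzz.rs` and `aggregate_fuzz.rs` can share the args struct and the runner.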



##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -1067,14 +1074,13 @@ impl GroupedHashAggregateStream {
                 sort_batch(&batch, &expr, None)
             })),
         )));
-        for spill in self.spill_state.spills.drain(..) {
-            let stream = self.spill_state.spill_manager.read_spill_as_stream(spill)?;
-            streams.push(stream);
-        }
+
         self.spill_state.is_stream_merging = true;
         self.input = StreamingMergeBuilder::new()
             .with_streams(streams)
             .with_schema(schema)
+            .with_spill_manager(self.spill_state.spill_manager.clone())
+            .with_sorted_spill_files(std::mem::take(&mut self.spill_state.spills))

Review Comment:
   I suggest spilling all in-memory batches (in `streams`) to disk before this final merging step. Also, let the multi-pass merge operator handle only spill files, so it doesn't have to handle in-memory batches and spill files at the same time.

   This is just a simplification for now; we can add an optimization to avoid this re-spill step in the future.

   The issue is that, without special handling, it's possible that in-memory batches will take most of the available memory budget, leaving only a very small part for multi-pass spilling to continue. This can cause slowdowns or even make some cases fail.

   We're already doing this in the sort executor, see: https://github.com/apache/datafusion/blob/14487ddc275fc1f148f339293664fe7f83d91d09/datafusion/physical-plan/src/sorts/sort.rs#L336-L341
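
   A rough sketch of the idea (hedged; `spill_sorted_stream` is a hypothetical helper on top of the existing `SpillManager`):
   ```rust
   // Before the final merge, turn every in-memory sorted stream into a spill file,
   // so the multi-level merge only ever deals with spill files
   for mut stream in streams.drain(..) {
       // Hypothetical helper: writes the stream to disk and reports the largest
       // batch's memory footprint, mirroring what the sort executor already does
       let (file, max_record_batch_memory) =
           spill_sorted_stream(&self.spill_state.spill_manager, &mut stream).await?;
       self.spill_state.spills.push(SortedSpillFile {
           file,
           max_record_batch_memory,
       });
   }
   ```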



##########
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##########
@@ -0,0 +1,342 @@
+    fn get_sorted_spill_files_to_merge(
+        &mut self,
+        buffer_size: usize,
+        minimum_number_of_required_streams: usize,
+        reservation: &mut MemoryReservation,
+    ) -> Result<(Vec<SortedSpillFile>, usize)> {
+        assert_ne!(buffer_size, 0, "Buffer size must be greater than 0");
+        let mut number_of_spills_to_read_for_current_phase = 0;
+
+        for spill in &self.sorted_spill_files {
+            // For memory pools that are not shared this is good, for others it is not,
+            // and there should be some upper limit to memory reservation so we won't starve the system
+            match reservation.try_grow(spill.max_record_batch_memory * buffer_size) {

Review Comment:
   SPM can use another `*2` memory for its internal intermediate data, see https://github.com/apache/datafusion/blob/14487ddc275fc1f148f339293664fe7f83d91d09/datafusion/physical-plan/src/sorts/sort.rs#L776-L788
   Here we might need an additional `*2` for the estimation.
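
   i.e. something like this (a sketch of the adjusted estimate):
   ```rust
   // Reserve 2x per stream: one batch for the read buffer plus roughly one more
   // held inside the sort-preserving merge as intermediate data
   let estimated = spill.max_record_batch_memory * 2 * buffer_size;
   match reservation.try_grow(estimated) {
       Ok(()) => { /* this spill file fits in the current phase */ }
       Err(_) => { /* reduce the buffer size or stop taking more files */ }
   }
   ```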



##########
datafusion/physical-plan/src/spill/get_size.rs:
##########
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{
+    Array, ArrayRef, ArrowPrimitiveType, BooleanArray, FixedSizeBinaryArray,
+    FixedSizeListArray, GenericByteArray, GenericListArray, OffsetSizeTrait,
+    PrimitiveArray, RecordBatch, StructArray,
+};
+use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer};
+use arrow::datatypes::{ArrowNativeType, ByteArrayType};
+use arrow::downcast_primitive_array;
+use arrow_schema::DataType;
+
+/// TODO - NEED TO MOVE THIS TO ARROW

Review Comment:
   Can we use in-memory size for now, both for simplicity and to stay conservative in our memory estimation?
   Once it's implemented on the Arrow side, we can consider switching to it.
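
   For reference, a conservative in-memory estimate is already available on `RecordBatch` (a minimal sketch):
   ```rust
   use arrow::array::RecordBatch;

   /// Conservative per-batch estimate: the total capacity of all buffers
   /// currently backing the batch's arrays
   fn batch_memory_size(batch: &RecordBatch) -> usize {
       batch.get_array_memory_size()
   }
   ```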



##########
datafusion/physical-plan/src/sorts/streaming_merge.rs:
##########
@@ -131,14 +168,42 @@ impl<'a> StreamingMergeBuilder<'a> {
             enable_round_robin_tie_breaker,
         } = self;
 
-        // Early return if streams or expressions are empty:
-        if streams.is_empty() {
-            return internal_err!("Streams cannot be empty for streaming merge");
-        }
+        // Early return if expressions are empty:
         let Some(expressions) = expressions else {
+            return internal_err!("Sort expressions cannot be empty for streaming merge");
         };
 
+        if !sorted_spill_files.is_empty() {

Review Comment:
   The current code flow is
   ```
   Sort/Join
   -> StreamingMerge
   -> MultiLevelMerge (with potential internal re-spills to ensure the final merge can proceed under memory limit)
   -> StreamingMerge
   ```
   Then the first `StreamingMerge` -> `MultiLevelMerge` indirection implemented here seems redundant.

   How about letting sort/aggregate directly use `MultiLevelMergeBuilder` instead?
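
   i.e. roughly (a hedged sketch; the arguments follow `MultiLevelMergeBuilder::new` as added in this PR):
   ```rust
   // Build the multi-level merge directly once spill files exist, skipping the
   // StreamingMergeBuilder indirection
   let stream = MultiLevelMergeBuilder::new(
       spill_manager,
       schema,
       sorted_spill_files,
       sorted_streams,
       expr,
       metrics,
       batch_size,
       reservation,
       fetch,
       enable_round_robin_tie_breaker,
   )
   .create_spillable_merge_stream();
   ```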



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

