2010YOUY01 commented on code in PR #15700: URL: https://github.com/apache/datafusion/pull/15700#discussion_r2191383325
########## datafusion/physical-plan/src/sorts/multi_level_merge.rs: ########## @@ -0,0 +1,342 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Create a stream that do a multi level merge stream + +use crate::metrics::BaselineMetrics; +use crate::{EmptyRecordBatchStream, SpillManager}; +use arrow::array::RecordBatch; +use std::fmt::{Debug, Formatter}; +use std::mem; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use arrow::datatypes::SchemaRef; +use datafusion_common::Result; +use datafusion_execution::memory_pool::{ + MemoryConsumer, MemoryPool, MemoryReservation, UnboundedMemoryPool, +}; + +use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder}; +use crate::stream::RecordBatchStreamAdapter; +use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream}; +use datafusion_physical_expr_common::sort_expr::LexOrdering; +use futures::TryStreamExt; +use futures::{Stream, StreamExt}; + +/// Merges a stream of sorted cursors and record batches into a single sorted stream +pub(crate) struct MultiLevelMergeBuilder { + spill_manager: SpillManager, + schema: SchemaRef, + sorted_spill_files: Vec<SortedSpillFile>, + sorted_streams: Vec<SendableRecordBatchStream>, + expr: LexOrdering, + metrics: BaselineMetrics, + batch_size: usize, + reservation: MemoryReservation, + fetch: Option<usize>, + enable_round_robin_tie_breaker: bool, + + // This is for avoiding double reservation of memory from our side and the sort preserving merge stream + // side. + // and doing a lot of code changes to avoid accounting for the memory used by the streams + unbounded_memory_pool: Arc<dyn MemoryPool>, +} + +impl Debug for MultiLevelMergeBuilder { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "MultiLevelMergeBuilder") + } +} + +impl MultiLevelMergeBuilder { + #[allow(clippy::too_many_arguments)] + pub(crate) fn new( + spill_manager: SpillManager, + schema: SchemaRef, + sorted_spill_files: Vec<SortedSpillFile>, + sorted_streams: Vec<SendableRecordBatchStream>, + expr: LexOrdering, + metrics: BaselineMetrics, + batch_size: usize, + reservation: MemoryReservation, + fetch: Option<usize>, + enable_round_robin_tie_breaker: bool, + ) -> Self { + Self { + spill_manager, + schema, + sorted_spill_files, + sorted_streams, + expr, + metrics, + batch_size, + reservation, + enable_round_robin_tie_breaker, + fetch, + unbounded_memory_pool: Arc::new(UnboundedMemoryPool::default()), + } + } + + pub(crate) fn create_spillable_merge_stream(self) -> SendableRecordBatchStream { + Box::pin(RecordBatchStreamAdapter::new( + Arc::clone(&self.schema), + futures::stream::once(self.create_stream()).try_flatten(), + )) + } + + async fn create_stream(mut self) -> Result<SendableRecordBatchStream> { + loop { + // Hold this for the lifetime of the stream + let mut current_memory_reservation = self.reservation.new_empty(); + let mut stream = + self.create_sorted_stream(&mut current_memory_reservation)?; + + // TODO - add a threshold for number of files to disk even if empty and reading from disk so + // we can avoid the memory reservation + + // If no spill files are left, we can return the stream as this is the last sorted run + // TODO - We can write to disk before reading it back to avoid having multiple streams in memory + if self.sorted_spill_files.is_empty() { + // Attach the memory reservation to the stream as we are done with it + // but because we replaced the memory reservation of the merge stream, we must hold + // this to make sure we have enough memory + return Ok(Box::pin(StreamAttachedReservation::new( + stream, + current_memory_reservation, + ))); + } + + // Need to sort to a spill file + let Some((spill_file, max_record_batch_memory)) = self + .spill_manager + .spill_record_batch_stream_by_size( + &mut stream, + self.batch_size, + "MultiLevelMergeBuilder intermediate spill", + ) + .await? + else { + continue; + }; + + // Add the spill file + self.sorted_spill_files.push(SortedSpillFile { + file: spill_file, + max_record_batch_memory, + }); + } + } + + fn create_sorted_stream( + &mut self, + memory_reservation: &mut MemoryReservation, + ) -> Result<SendableRecordBatchStream> { + match (self.sorted_spill_files.len(), self.sorted_streams.len()) { + // No data so empty batch + (0, 0) => Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone( + &self.schema, + )))), + + // Only in-memory stream, return that + (0, 1) => Ok(self.sorted_streams.remove(0)), + + // Only single sorted spill file so return it + (1, 0) => { + let spill_file = self.sorted_spill_files.remove(0); + + self.spill_manager.read_spill_as_stream(spill_file.file) + } + + // Only in memory streams, so merge them all in a single pass + (0, _) => { + let sorted_stream = mem::take(&mut self.sorted_streams); + self.create_new_merge_sort( + sorted_stream, + // If we have no sorted spill files left, this is the last run + true, + ) + } + + // Need to merge multiple streams + (_, _) => { + // Don't account for existing streams memory + // as we are not holding the memory for them + let mut sorted_streams = mem::take(&mut self.sorted_streams); + + let (sorted_spill_files, buffer_size) = self + .get_sorted_spill_files_to_merge( + 2, + // we must have at least 2 streams to merge + 2_usize.saturating_sub(sorted_streams.len()), + memory_reservation, + )?; + + for spill in sorted_spill_files { + let stream = self + .spill_manager + .clone() + .with_batch_read_buffer_capacity(buffer_size) + .read_spill_as_stream(spill.file)?; + sorted_streams.push(stream); + } + + self.create_new_merge_sort( + sorted_streams, + // If we have no sorted spill files left, this is the last run + self.sorted_spill_files.is_empty(), + ) + } + } + } + + fn create_new_merge_sort( + &mut self, + streams: Vec<SendableRecordBatchStream>, + is_output: bool, + ) -> Result<SendableRecordBatchStream> { + StreamingMergeBuilder::new() + .with_schema(Arc::clone(&self.schema)) + .with_expressions(&self.expr) + .with_batch_size(self.batch_size) + .with_fetch(self.fetch) + .with_metrics(if is_output { + // Only add the metrics to the last run + self.metrics.clone() + } else { + self.metrics.intermediate() + }) + .with_round_robin_tie_breaker(self.enable_round_robin_tie_breaker) + .with_streams(streams) + // Don't track memory used by this stream as we reserve that memory by worst case sceneries + // (reserving memory for the biggest batch in each stream) + // This is a hack + .with_reservation( + MemoryConsumer::new("merge stream mock memory") + .register(&self.unbounded_memory_pool), + ) + .build() + } + + /// Return the sorted spill files to use for the next phase, and the buffer size + /// This will try to get as many spill files as possible to merge, and if we don't have enough streams + /// it will try to reduce the buffer size until we have enough streams to merge + /// otherwise it will return an error + fn get_sorted_spill_files_to_merge( + &mut self, + buffer_size: usize, Review Comment: nit: I think size is ambiguous because it sounds like 'memory size in bytes', perhaps `buffer_len`? ########## datafusion/core/tests/fuzz_cases/stream_exec.rs: ########## @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_schema::SchemaRef; +use datafusion_common::DataFusionError; +use datafusion_execution::{SendableRecordBatchStream, TaskContext}; +use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; +use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; +use datafusion_physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, +}; +use std::any::Any; +use std::fmt::{Debug, Formatter}; +use std::sync::{Arc, Mutex}; + +/// Execution plan that return the stream on the call to `execute`. further calls to `execute` will +/// return an error +pub struct StreamExec { Review Comment: nit: I think `OneShotExec` or `OnceExec` is more descriptive. ########## datafusion/core/tests/fuzz_cases/sort_fuzz.rs: ########## @@ -377,3 +388,335 @@ fn make_staggered_i32_utf8_batches(len: usize) -> Vec<RecordBatch> { batches } + Review Comment: It would be great to add a top-level comment for this group of tests, like 'Testing spilling sort queries, with another memory-consuming neighbor (the mock operator with `MemoryBehavior`)'. The same for aggregate fuzz tests (perhaps we can structure them into the same new file?) ########## datafusion/physical-plan/src/sorts/multi_level_merge.rs: ########## @@ -0,0 +1,342 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Create a stream that do a multi level merge stream + +use crate::metrics::BaselineMetrics; +use crate::{EmptyRecordBatchStream, SpillManager}; +use arrow::array::RecordBatch; +use std::fmt::{Debug, Formatter}; +use std::mem; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use arrow::datatypes::SchemaRef; +use datafusion_common::Result; +use datafusion_execution::memory_pool::{ + MemoryConsumer, MemoryPool, MemoryReservation, UnboundedMemoryPool, +}; + +use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder}; +use crate::stream::RecordBatchStreamAdapter; +use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream}; +use datafusion_physical_expr_common::sort_expr::LexOrdering; +use futures::TryStreamExt; +use futures::{Stream, StreamExt}; + +/// Merges a stream of sorted cursors and record batches into a single sorted stream +pub(crate) struct MultiLevelMergeBuilder { + spill_manager: SpillManager, + schema: SchemaRef, + sorted_spill_files: Vec<SortedSpillFile>, + sorted_streams: Vec<SendableRecordBatchStream>, + expr: LexOrdering, + metrics: BaselineMetrics, + batch_size: usize, + reservation: MemoryReservation, + fetch: Option<usize>, + enable_round_robin_tie_breaker: bool, + + // This is for avoiding double reservation of memory from our side and the sort preserving merge stream + // side. + // and doing a lot of code changes to avoid accounting for the memory used by the streams + unbounded_memory_pool: Arc<dyn MemoryPool>, +} + +impl Debug for MultiLevelMergeBuilder { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "MultiLevelMergeBuilder") + } +} + +impl MultiLevelMergeBuilder { + #[allow(clippy::too_many_arguments)] + pub(crate) fn new( + spill_manager: SpillManager, + schema: SchemaRef, + sorted_spill_files: Vec<SortedSpillFile>, + sorted_streams: Vec<SendableRecordBatchStream>, + expr: LexOrdering, + metrics: BaselineMetrics, + batch_size: usize, + reservation: MemoryReservation, + fetch: Option<usize>, + enable_round_robin_tie_breaker: bool, + ) -> Self { + Self { + spill_manager, + schema, + sorted_spill_files, + sorted_streams, + expr, + metrics, + batch_size, + reservation, + enable_round_robin_tie_breaker, + fetch, + unbounded_memory_pool: Arc::new(UnboundedMemoryPool::default()), + } + } + + pub(crate) fn create_spillable_merge_stream(self) -> SendableRecordBatchStream { + Box::pin(RecordBatchStreamAdapter::new( + Arc::clone(&self.schema), + futures::stream::once(self.create_stream()).try_flatten(), + )) + } + + async fn create_stream(mut self) -> Result<SendableRecordBatchStream> { + loop { + // Hold this for the lifetime of the stream + let mut current_memory_reservation = self.reservation.new_empty(); + let mut stream = + self.create_sorted_stream(&mut current_memory_reservation)?; + + // TODO - add a threshold for number of files to disk even if empty and reading from disk so + // we can avoid the memory reservation + + // If no spill files are left, we can return the stream as this is the last sorted run + // TODO - We can write to disk before reading it back to avoid having multiple streams in memory + if self.sorted_spill_files.is_empty() { + // Attach the memory reservation to the stream as we are done with it + // but because we replaced the memory reservation of the merge stream, we must hold + // this to make sure we have enough memory + return Ok(Box::pin(StreamAttachedReservation::new( + stream, + current_memory_reservation, + ))); + } + + // Need to sort to a spill file + let Some((spill_file, max_record_batch_memory)) = self + .spill_manager + .spill_record_batch_stream_by_size( + &mut stream, + self.batch_size, + "MultiLevelMergeBuilder intermediate spill", + ) + .await? + else { + continue; + }; + + // Add the spill file + self.sorted_spill_files.push(SortedSpillFile { + file: spill_file, + max_record_batch_memory, + }); + } + } + + fn create_sorted_stream( + &mut self, + memory_reservation: &mut MemoryReservation, + ) -> Result<SendableRecordBatchStream> { + match (self.sorted_spill_files.len(), self.sorted_streams.len()) { + // No data so empty batch + (0, 0) => Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone( + &self.schema, + )))), + + // Only in-memory stream, return that + (0, 1) => Ok(self.sorted_streams.remove(0)), + + // Only single sorted spill file so return it + (1, 0) => { + let spill_file = self.sorted_spill_files.remove(0); + + self.spill_manager.read_spill_as_stream(spill_file.file) + } + + // Only in memory streams, so merge them all in a single pass + (0, _) => { + let sorted_stream = mem::take(&mut self.sorted_streams); + self.create_new_merge_sort( + sorted_stream, + // If we have no sorted spill files left, this is the last run + true, + ) + } + + // Need to merge multiple streams + (_, _) => { + // Don't account for existing streams memory + // as we are not holding the memory for them + let mut sorted_streams = mem::take(&mut self.sorted_streams); + + let (sorted_spill_files, buffer_size) = self + .get_sorted_spill_files_to_merge( + 2, + // we must have at least 2 streams to merge + 2_usize.saturating_sub(sorted_streams.len()), + memory_reservation, + )?; + + for spill in sorted_spill_files { + let stream = self + .spill_manager + .clone() + .with_batch_read_buffer_capacity(buffer_size) + .read_spill_as_stream(spill.file)?; + sorted_streams.push(stream); + } + + self.create_new_merge_sort( + sorted_streams, + // If we have no sorted spill files left, this is the last run + self.sorted_spill_files.is_empty(), + ) + } + } + } + + fn create_new_merge_sort( + &mut self, + streams: Vec<SendableRecordBatchStream>, + is_output: bool, + ) -> Result<SendableRecordBatchStream> { + StreamingMergeBuilder::new() + .with_schema(Arc::clone(&self.schema)) + .with_expressions(&self.expr) + .with_batch_size(self.batch_size) + .with_fetch(self.fetch) + .with_metrics(if is_output { + // Only add the metrics to the last run + self.metrics.clone() + } else { + self.metrics.intermediate() + }) + .with_round_robin_tie_breaker(self.enable_round_robin_tie_breaker) + .with_streams(streams) + // Don't track memory used by this stream as we reserve that memory by worst case sceneries + // (reserving memory for the biggest batch in each stream) + // This is a hack + .with_reservation( + MemoryConsumer::new("merge stream mock memory") Review Comment: We should free the reservation and let the SPM operator use the global memory pool, to validate it can continue with the memory limit. ########## datafusion/physical-plan/src/sorts/sort.rs: ########## @@ -220,12 +220,12 @@ struct ExternalSorter { /// During external sorting, in-memory intermediate data will be appended to /// this file incrementally. Once finished, this file will be moved to [`Self::finished_spill_files`]. - in_progress_spill_file: Option<InProgressSpillFile>, + in_progress_spill_file: Option<(InProgressSpillFile, usize)>, Review Comment: Let's update the comment for this newly added `usize`. ########## datafusion/core/tests/fuzz_cases/sort_fuzz.rs: ########## @@ -377,3 +388,335 @@ fn make_staggered_i32_utf8_batches(len: usize) -> Vec<RecordBatch> { batches } + +#[tokio::test] +async fn test_sort_with_limited_memory() -> Result<()> { + let record_batch_size = 8192; + let pool_size = 2 * MB as usize; + let task_ctx = { + let memory_pool = Arc::new(FairSpillPool::new(pool_size)); + TaskContext::default() + .with_session_config( + SessionConfig::new() + .with_batch_size(record_batch_size) + .with_sort_spill_reservation_bytes(1), + ) + .with_runtime(Arc::new( + RuntimeEnvBuilder::new() + .with_memory_pool(memory_pool) + .build()?, + )) + }; + + let record_batch_size = pool_size / 16; + + // Basic test with a lot of groups that cannot all fit in memory and 1 record batch + // from each spill file is too much memory + let spill_count = + run_sort_test_with_limited_memory(RunSortTestWithLimitedMemoryArgs { + pool_size, + task_ctx, + number_of_record_batches: 100, + get_size_of_record_batch_to_generate: Box::pin(move |_| record_batch_size), + memory_behavior: Default::default(), + }) + .await?; + + let total_spill_files_size = spill_count * record_batch_size; + assert!( + total_spill_files_size > pool_size, + "Total spill files size {total_spill_files_size} should be greater than pool size {pool_size}", + ); + + Ok(()) +} + +#[tokio::test] +async fn test_sort_with_limited_memory_and_different_sizes_of_record_batch() -> Result<()> +{ + let record_batch_size = 8192; + let pool_size = 2 * MB as usize; + let task_ctx = { + let memory_pool = Arc::new(FairSpillPool::new(pool_size)); + TaskContext::default() + .with_session_config( + SessionConfig::new() + .with_batch_size(record_batch_size) + .with_sort_spill_reservation_bytes(1), + ) + .with_runtime(Arc::new( + RuntimeEnvBuilder::new() + .with_memory_pool(memory_pool) + .build()?, + )) + }; + + run_sort_test_with_limited_memory(RunSortTestWithLimitedMemoryArgs { + pool_size, + task_ctx, + number_of_record_batches: 100, + get_size_of_record_batch_to_generate: Box::pin(move |i| { + if i % 25 == 1 { + pool_size / 4 + } else { + 16 * KB + } + }), + memory_behavior: Default::default(), + }) + .await?; + + Ok(()) +} + +#[tokio::test] +async fn test_sort_with_limited_memory_and_different_sizes_of_record_batch_and_changing_memory_reservation( +) -> Result<()> { + let record_batch_size = 8192; + let pool_size = 2 * MB as usize; + let task_ctx = { + let memory_pool = Arc::new(FairSpillPool::new(pool_size)); + TaskContext::default() + .with_session_config( + SessionConfig::new() + .with_batch_size(record_batch_size) + .with_sort_spill_reservation_bytes(1), + ) + .with_runtime(Arc::new( + RuntimeEnvBuilder::new() + .with_memory_pool(memory_pool) + .build()?, + )) + }; + + run_sort_test_with_limited_memory(RunSortTestWithLimitedMemoryArgs { + pool_size, + task_ctx, + number_of_record_batches: 100, + get_size_of_record_batch_to_generate: Box::pin(move |i| { + if i % 25 == 1 { + pool_size / 4 + } else { + 16 * KB + } + }), + memory_behavior: MemoryBehavior::TakeAllMemoryAndReleaseEveryNthBatch(10), + }) + .await?; + + Ok(()) +} + +#[tokio::test] +async fn test_sort_with_limited_memory_and_different_sizes_of_record_batch_and_take_all_memory( +) -> Result<()> { + let record_batch_size = 8192; + let pool_size = 2 * MB as usize; + let task_ctx = { + let memory_pool = Arc::new(FairSpillPool::new(pool_size)); + TaskContext::default() + .with_session_config( + SessionConfig::new() + .with_batch_size(record_batch_size) + .with_sort_spill_reservation_bytes(1), + ) + .with_runtime(Arc::new( + RuntimeEnvBuilder::new() + .with_memory_pool(memory_pool) + .build()?, + )) + }; + + run_sort_test_with_limited_memory(RunSortTestWithLimitedMemoryArgs { + pool_size, + task_ctx, + number_of_record_batches: 100, + get_size_of_record_batch_to_generate: Box::pin(move |i| { + if i % 25 == 1 { + pool_size / 4 + } else { + 16 * KB + } + }), + memory_behavior: MemoryBehavior::TakeAllMemoryAtTheBeginning, + }) + .await?; + + Ok(()) +} + +#[tokio::test] +async fn test_sort_with_limited_memory_and_large_record_batch() -> Result<()> { + let record_batch_size = 8192; + let pool_size = 2 * MB as usize; + let task_ctx = { + let memory_pool = Arc::new(FairSpillPool::new(pool_size)); + TaskContext::default() + .with_session_config( + SessionConfig::new() + .with_batch_size(record_batch_size) + .with_sort_spill_reservation_bytes(1), + ) + .with_runtime(Arc::new( + RuntimeEnvBuilder::new() + .with_memory_pool(memory_pool) + .build()?, + )) + }; + + // Test that the merge degree of multi level merge sort cannot be fixed size when there is not enough memory + run_sort_test_with_limited_memory(RunSortTestWithLimitedMemoryArgs { + pool_size, + task_ctx, + number_of_record_batches: 100, + get_size_of_record_batch_to_generate: Box::pin(move |_| pool_size / 4), + memory_behavior: Default::default(), + }) + .await?; + + Ok(()) +} + +struct RunSortTestWithLimitedMemoryArgs { Review Comment: This utilities seems to be copied and pasted from `aggregate_fuzz.rs`, how about refactor it into a common module? ########## datafusion/physical-plan/src/aggregates/row_hash.rs: ########## @@ -1067,14 +1074,13 @@ impl GroupedHashAggregateStream { sort_batch(&batch, &expr, None) })), ))); - for spill in self.spill_state.spills.drain(..) { - let stream = self.spill_state.spill_manager.read_spill_as_stream(spill)?; - streams.push(stream); - } + self.spill_state.is_stream_merging = true; self.input = StreamingMergeBuilder::new() .with_streams(streams) .with_schema(schema) + .with_spill_manager(self.spill_state.spill_manager.clone()) + .with_sorted_spill_files(std::mem::take(&mut self.spill_state.spills)) Review Comment: I suggest to spill all in-memory batches (in `streams`) to disk, before this final merging step. Also, let the multi pass merge operator also only handle spill files, and don't have to handle in-mem batches and spills at the same time. This is just a simplification for now, we can do a optimization to avoid this re-spill step in the future. The issue is, without special handling, it's possible that in-mem batches will take most of the available memory budget, and leave only a very small memory part for multi-pass spilling to continue. This can cause slow downs or even prevent some cases to fail. We're already doing this in sort executor, see: https://github.com/apache/datafusion/blob/14487ddc275fc1f148f339293664fe7f83d91d09/datafusion/physical-plan/src/sorts/sort.rs#L336-L341 ########## datafusion/physical-plan/src/sorts/multi_level_merge.rs: ########## @@ -0,0 +1,342 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Create a stream that do a multi level merge stream + +use crate::metrics::BaselineMetrics; +use crate::{EmptyRecordBatchStream, SpillManager}; +use arrow::array::RecordBatch; +use std::fmt::{Debug, Formatter}; +use std::mem; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use arrow::datatypes::SchemaRef; +use datafusion_common::Result; +use datafusion_execution::memory_pool::{ + MemoryConsumer, MemoryPool, MemoryReservation, UnboundedMemoryPool, +}; + +use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder}; +use crate::stream::RecordBatchStreamAdapter; +use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream}; +use datafusion_physical_expr_common::sort_expr::LexOrdering; +use futures::TryStreamExt; +use futures::{Stream, StreamExt}; + +/// Merges a stream of sorted cursors and record batches into a single sorted stream +pub(crate) struct MultiLevelMergeBuilder { + spill_manager: SpillManager, + schema: SchemaRef, + sorted_spill_files: Vec<SortedSpillFile>, + sorted_streams: Vec<SendableRecordBatchStream>, + expr: LexOrdering, + metrics: BaselineMetrics, + batch_size: usize, + reservation: MemoryReservation, + fetch: Option<usize>, + enable_round_robin_tie_breaker: bool, + + // This is for avoiding double reservation of memory from our side and the sort preserving merge stream + // side. + // and doing a lot of code changes to avoid accounting for the memory used by the streams + unbounded_memory_pool: Arc<dyn MemoryPool>, +} + +impl Debug for MultiLevelMergeBuilder { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "MultiLevelMergeBuilder") + } +} + +impl MultiLevelMergeBuilder { + #[allow(clippy::too_many_arguments)] + pub(crate) fn new( + spill_manager: SpillManager, + schema: SchemaRef, + sorted_spill_files: Vec<SortedSpillFile>, + sorted_streams: Vec<SendableRecordBatchStream>, + expr: LexOrdering, + metrics: BaselineMetrics, + batch_size: usize, + reservation: MemoryReservation, + fetch: Option<usize>, + enable_round_robin_tie_breaker: bool, + ) -> Self { + Self { + spill_manager, + schema, + sorted_spill_files, + sorted_streams, + expr, + metrics, + batch_size, + reservation, + enable_round_robin_tie_breaker, + fetch, + unbounded_memory_pool: Arc::new(UnboundedMemoryPool::default()), + } + } + + pub(crate) fn create_spillable_merge_stream(self) -> SendableRecordBatchStream { + Box::pin(RecordBatchStreamAdapter::new( + Arc::clone(&self.schema), + futures::stream::once(self.create_stream()).try_flatten(), + )) + } + + async fn create_stream(mut self) -> Result<SendableRecordBatchStream> { + loop { + // Hold this for the lifetime of the stream + let mut current_memory_reservation = self.reservation.new_empty(); + let mut stream = + self.create_sorted_stream(&mut current_memory_reservation)?; + + // TODO - add a threshold for number of files to disk even if empty and reading from disk so + // we can avoid the memory reservation + + // If no spill files are left, we can return the stream as this is the last sorted run + // TODO - We can write to disk before reading it back to avoid having multiple streams in memory + if self.sorted_spill_files.is_empty() { + // Attach the memory reservation to the stream as we are done with it + // but because we replaced the memory reservation of the merge stream, we must hold + // this to make sure we have enough memory + return Ok(Box::pin(StreamAttachedReservation::new( + stream, + current_memory_reservation, + ))); + } + + // Need to sort to a spill file + let Some((spill_file, max_record_batch_memory)) = self + .spill_manager + .spill_record_batch_stream_by_size( + &mut stream, + self.batch_size, + "MultiLevelMergeBuilder intermediate spill", + ) + .await? + else { + continue; + }; + + // Add the spill file + self.sorted_spill_files.push(SortedSpillFile { + file: spill_file, + max_record_batch_memory, + }); + } + } + + fn create_sorted_stream( + &mut self, + memory_reservation: &mut MemoryReservation, + ) -> Result<SendableRecordBatchStream> { + match (self.sorted_spill_files.len(), self.sorted_streams.len()) { + // No data so empty batch + (0, 0) => Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone( + &self.schema, + )))), + + // Only in-memory stream, return that + (0, 1) => Ok(self.sorted_streams.remove(0)), + + // Only single sorted spill file so return it + (1, 0) => { + let spill_file = self.sorted_spill_files.remove(0); + + self.spill_manager.read_spill_as_stream(spill_file.file) + } + + // Only in memory streams, so merge them all in a single pass + (0, _) => { + let sorted_stream = mem::take(&mut self.sorted_streams); + self.create_new_merge_sort( + sorted_stream, + // If we have no sorted spill files left, this is the last run + true, + ) + } + + // Need to merge multiple streams + (_, _) => { + // Don't account for existing streams memory + // as we are not holding the memory for them + let mut sorted_streams = mem::take(&mut self.sorted_streams); + + let (sorted_spill_files, buffer_size) = self + .get_sorted_spill_files_to_merge( + 2, + // we must have at least 2 streams to merge + 2_usize.saturating_sub(sorted_streams.len()), + memory_reservation, + )?; + + for spill in sorted_spill_files { + let stream = self + .spill_manager + .clone() + .with_batch_read_buffer_capacity(buffer_size) + .read_spill_as_stream(spill.file)?; + sorted_streams.push(stream); + } + + self.create_new_merge_sort( + sorted_streams, + // If we have no sorted spill files left, this is the last run + self.sorted_spill_files.is_empty(), + ) + } + } + } + + fn create_new_merge_sort( + &mut self, + streams: Vec<SendableRecordBatchStream>, + is_output: bool, + ) -> Result<SendableRecordBatchStream> { + StreamingMergeBuilder::new() + .with_schema(Arc::clone(&self.schema)) + .with_expressions(&self.expr) + .with_batch_size(self.batch_size) + .with_fetch(self.fetch) + .with_metrics(if is_output { + // Only add the metrics to the last run + self.metrics.clone() + } else { + self.metrics.intermediate() + }) + .with_round_robin_tie_breaker(self.enable_round_robin_tie_breaker) + .with_streams(streams) + // Don't track memory used by this stream as we reserve that memory by worst case sceneries + // (reserving memory for the biggest batch in each stream) + // This is a hack + .with_reservation( + MemoryConsumer::new("merge stream mock memory") + .register(&self.unbounded_memory_pool), + ) + .build() + } + + /// Return the sorted spill files to use for the next phase, and the buffer size + /// This will try to get as many spill files as possible to merge, and if we don't have enough streams + /// it will try to reduce the buffer size until we have enough streams to merge + /// otherwise it will return an error + fn get_sorted_spill_files_to_merge( + &mut self, + buffer_size: usize, + minimum_number_of_required_streams: usize, + reservation: &mut MemoryReservation, + ) -> Result<(Vec<SortedSpillFile>, usize)> { + assert_ne!(buffer_size, 0, "Buffer size must be greater than 0"); + let mut number_of_spills_to_read_for_current_phase = 0; + + for spill in &self.sorted_spill_files { + // For memory pools that are not shared this is good, for other this is not + // and there should be some upper limit to memory reservation so we won't starve the system + match reservation.try_grow(spill.max_record_batch_memory * buffer_size) { Review Comment: SPM can use another `*2` memory for its internal intermediate data, see https://github.com/apache/datafusion/blob/14487ddc275fc1f148f339293664fe7f83d91d09/datafusion/physical-plan/src/sorts/sort.rs#L776-L788 Here we might need an additional `*2` for the estimation. ########## datafusion/physical-plan/src/spill/get_size.rs: ########## @@ -0,0 +1,216 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ + Array, ArrayRef, ArrowPrimitiveType, BooleanArray, FixedSizeBinaryArray, + FixedSizeListArray, GenericByteArray, GenericListArray, OffsetSizeTrait, + PrimitiveArray, RecordBatch, StructArray, +}; +use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer}; +use arrow::datatypes::{ArrowNativeType, ByteArrayType}; +use arrow::downcast_primitive_array; +use arrow_schema::DataType; + +/// TODO - NEED TO MOVE THIS TO ARROW Review Comment: Can we use in-memory size for now, both for simplicity and to stay conservative in our memory estimation? Once it's implemented on the Arrow side, we can consider switching to it. ########## datafusion/physical-plan/src/sorts/streaming_merge.rs: ########## @@ -131,14 +168,42 @@ impl<'a> StreamingMergeBuilder<'a> { enable_round_robin_tie_breaker, } = self; - // Early return if streams or expressions are empty: - if streams.is_empty() { - return internal_err!("Streams cannot be empty for streaming merge"); - } + // Early return if expressions are empty: let Some(expressions) = expressions else { return internal_err!("Sort expressions cannot be empty for streaming merge"); }; + if !sorted_spill_files.is_empty() { Review Comment: The current code flow is ``` Sort/Join -> StreamingMerge -> MultiLevelMerge (with potential internal re-spills to ensure the final merge can proceed under memory limit) -> StreamingMerge ``` Then the first `StreamingMerge` -> `MultiLevelMerge` indirection implemented here seems redundant. How about let sort/aggregate directly use `MultiLevelMergeBuilder` instead? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org