Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
ding-young commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-3157119280

@adriangb There are still some cases in which external sorting fails with a memory allocation error, especially when:

(1) internal memory accounting is wrong, https://github.com/apache/datafusion/issues/14748 (which is also the cause of https://github.com/apache/datafusion/issues/16979), and
(2) memory pressure is so high that the multi-level merge step can't grow its reservation: https://github.com/apache/datafusion/blob/6043be448fe19333b0f1469026a62c580f7bbb31/datafusion/physical-plan/src/sorts/multi_level_merge.rs#L381

For (1), we need to correctly estimate the memory required for a row-formatted batch. For (2), we need to further limit the number of spills to merge when the multi-level merge fails (https://github.com/apache/datafusion/issues/16908). I'm currently working on these follow-up issues, but it will take time. Until these fixes land, I'd recommend trying to run the query above with a smaller `partitions` setting.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
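To make the "smaller `partitions` setting" advice above concrete, here is a rough sketch of why fewer concurrent sorters may help. It assumes that a fair-style pool divides the memory not held by unspillable consumers evenly among the spillable ones; `fair_share_bytes` is a hypothetical helper for illustration, not a DataFusion API, and the 1 GB figure only approximates `--memory-limit '1g'`.

```rust
/// Hypothetical helper (not a DataFusion API): the per-consumer share,
/// assuming memory not held by unspillable consumers is divided evenly
/// among the spillable consumers.
fn fair_share_bytes(pool_bytes: u64, unspillable_bytes: u64, spillable_consumers: u64) -> u64 {
    let spillable_budget = pool_bytes.saturating_sub(unspillable_bytes);
    if spillable_consumers == 0 {
        // No spillable consumer registered: the whole budget is free.
        return spillable_budget;
    }
    spillable_budget / spillable_consumers
}

fn main() {
    const MB: u64 = 1_000_000;
    // Assumption: roughly what a 1 GB memory limit provides.
    let pool = 1_000 * MB;
    for sorters in [12u64, 8, 4, 2] {
        let share = fair_share_bytes(pool, 0, sorters);
        println!("{sorters:>2} sorters: about {} MB each", share / MB);
    }
}
```

Under these assumptions, halving the number of concurrent sorters doubles what each one may reserve, which gives the merge step more room to grow its reservation.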
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
adriangb commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-3156469785

I'm trying this out for our compaction system and am not able to get my sort to work without hitting memory limits. Note that I am using `datafusion-cli` and am not sure whether it has a disk manager, etc. configured. Still, I figure that if I can't get it working, it's probably not obvious how to configure `datafusion-cli`, so it's a fair question.

In `q.sql`:

```sql
-- About 6.32 GB of parquet compressed (~ 10 x compression ratio)
-- Split into ~60 ~100 MB files
CREATE EXTERNAL TABLE t1
STORED AS PARQUET
LOCATION '/Users/adriangb/Downloads/data/day=2025-08-05/';

SET datafusion.execution.sort_spill_reservation_bytes = 0;

COPY (
    SELECT *
    FROM t1
    ORDER BY deployment_environment, kind, service_name, trace_id
) TO '/Users/adriangb/Downloads/out.parquet';
```

```shell
❯ ./target/release/datafusion-cli --mem-pool-type 'fair' --memory-limit '1g' -f q.sql
DataFusion CLI v49.0.0
0 row(s) fetched.
Elapsed 0.244 seconds.

0 row(s) fetched.
Elapsed 0.000 seconds.

+---------------+-------------------------------+
| plan_type     | plan                          |
+---------------+-------------------------------+
| physical_plan | ┌───────────────────────────┐ |
|               | │       DataSinkExec        │ |
|               | └─────────────┬─────────────┘ |
|               | ┌─────────────┴─────────────┐ |
|               | │  SortPreservingMergeExec  │ |
|               | │                           │ |
|               | │ deployment_environment ASC│ |
|               | │   NULLS LAST, kind ASC    │ |
|               | │        NULLS LAST,        │ |
|               | │       service_name        │ |
|               | │      ASC NULLS LAST,      │ |
|               | │    trace_id ASC NULLS     │ |
|               | │           LAST            │ |
|               | └─────────────┬─────────────┘ |
|               | ┌─────────────┴─────────────┐ |
|               | │         SortExec          │ |
|               | │                           │ |
|               | │ deployment_environment@35 │ |
|               | │  ASC NULLS LAST, kind@6   │ |
|               | │      ASC NULLS LAST,      │ |
|               | │      service_name@27      │ |
|               | │      ASC NULLS LAST,      │ |
|               | │      trace_id@4 ASC       │ |
|               | │        NULLS LAST         │ |
|               | └─────────────┬─────────────┘ |
|               | ┌─────────────┴─────────────┐ |
|               | │      DataSourceExec       │ |
|               | │                           │ |
|               | │         files: 68         │ |
|               | │      format: parquet      │ |
|               | └───────────────────────────┘ |
|               |                               |
+---------------+-------------------------------+
1 row(s) fetched.
Elapsed 0.254 seconds.

Not enough memory to continue external sort. Consider increasing the memory limit, or decreasing sort_spill_reservation_bytes
caused by
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
  ExternalSorter[10]#25(can spill: true) consumed 78.2 MB,
  ExternalSorter[11]#27(can spill: true) consumed 77.2 MB,
  ExternalSorter[7]#19(can spill: true) consumed 75.7 MB.
Error: Failed to allocate additional 90.1 MB for ExternalSorter[6] with 0.0 B already allocated for this reservation - 82.2 MB remain available for the total pool
```

I can maybe share the data under some sort of NDA, but honestly it's not that interesting; it's just a lot of random data.
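The final error line above boils down to a request that is larger than what the pool has left (90.1 MB requested, 82.2 MB available). A minimal sketch of that check follows. `BoundedPool` is a hypothetical, simplified stand-in and not DataFusion's `MemoryPool`; it ignores the per-consumer limits a fair pool would also apply, and the byte counts are the rounded figures from the error message.

```rust
/// Hypothetical, simplified bounded pool (not DataFusion's `MemoryPool`).
#[derive(Debug)]
struct BoundedPool {
    limit: u64,
    used: u64,
}

impl BoundedPool {
    fn new(limit: u64) -> Self {
        Self { limit, used: 0 }
    }

    /// Bytes that can still be reserved.
    fn available(&self) -> u64 {
        self.limit - self.used
    }

    /// Reserve `additional` bytes, or fail without changing the pool.
    fn try_grow(&mut self, additional: u64) -> Result<(), String> {
        if additional > self.available() {
            return Err(format!(
                "failed to allocate additional {additional} bytes, {} bytes remain available",
                self.available()
            ));
        }
        self.used += additional;
        Ok(())
    }
}

fn main() {
    let mut pool = BoundedPool::new(1_000_000_000);
    // Other sorters already hold most of the pool.
    pool.try_grow(917_800_000).expect("fits in an empty pool");
    // A 90.1 MB request cannot be served from the remaining 82.2 MB.
    match pool.try_grow(90_100_000) {
        Ok(()) => println!("reservation granted"),
        Err(e) => println!("error: {e}"),
    }
}
```

In this simplified model the request fails as a whole: nothing is reserved, so the consumer would have to spill, retry with a smaller request, or give up.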
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
alamb commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2240660130
##
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##
@@ -0,0 +1,449 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Create a stream that do a multi level merge stream
+
+use crate::metrics::BaselineMetrics;
+use crate::{EmptyRecordBatchStream, SpillManager};
+use arrow::array::RecordBatch;
+use std::fmt::{Debug, Formatter};
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::MemoryReservation;
+
+use crate::sorts::sort::get_reserved_byte_for_record_batch_size;
+use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
+use crate::stream::RecordBatchStreamAdapter;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::TryStreamExt;
+use futures::{Stream, StreamExt};
+
+/// Merges a stream of sorted cursors and record batches into a single sorted stream
+///
+/// This is a wrapper around [`SortPreservingMergeStream`](crate::sorts::merge::SortPreservingMergeStream)
+/// that provide it the sorted streams/files to merge while making sure we can merge them in memory.
+/// In case we can't merge all of them in a single pass we will spill the intermediate results to disk
+/// and repeat the process.
+///
+/// ## High level Algorithm
+/// 1. Get the maximum amount of sorted in-memory streams and spill files we can merge with the available memory
+/// 2. Sort them to a sorted stream
+/// 3. Do we have more spill files to merge?
+///  - Yes: write that sorted stream to a spill file,
+///    add that spill file back to the spill files to merge and
+///    repeat the process
+///
+///  - No: return that sorted stream as the final output stream
+///
+/// ```text
+/// Initial State: Multiple sorted streams + spill files
Review Comment:
😍
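The "High level Algorithm" in the quoted doc comment can be sketched with plain vectors standing in for sorted streams and spill files. This is only an illustration of the idea, not the PR's implementation: `merge_runs`, `multi_level_merge`, and `max_fan_in` are hypothetical names, and `max_fan_in` stands in for "how many streams fit in the available memory".

```rust
use std::cmp::Reverse;
use std::collections::{BinaryHeap, VecDeque};

/// K-way merge of already-sorted runs using a min-heap of
/// (value, run index, position in run).
fn merge_runs(runs: &[Vec<i32>]) -> Vec<i32> {
    let mut heap = BinaryHeap::new();
    for (run_idx, run) in runs.iter().enumerate() {
        if let Some(&first) = run.first() {
            heap.push(Reverse((first, run_idx, 0usize)));
        }
    }
    let mut out = Vec::with_capacity(runs.iter().map(Vec::len).sum());
    while let Some(Reverse((value, run_idx, pos))) = heap.pop() {
        out.push(value);
        if let Some(&next) = runs[run_idx].get(pos + 1) {
            heap.push(Reverse((next, run_idx, pos + 1)));
        }
    }
    out
}

/// Multi-level merge: never merge more than `max_fan_in` runs in one pass.
fn multi_level_merge(runs: Vec<Vec<i32>>, max_fan_in: usize) -> Vec<i32> {
    assert!(max_fan_in >= 2, "each pass must merge at least two runs");
    let mut pending: VecDeque<Vec<i32>> = runs.into();
    loop {
        // Step 1: take as many runs as one pass can handle.
        let take = pending.len().min(max_fan_in);
        let batch: Vec<Vec<i32>> = pending.drain(..take).collect();
        // Step 2: merge them into a single sorted run.
        let merged = merge_runs(&batch);
        // Step 3: nothing left means this is the final output; otherwise
        // put the result back (the "spill") and repeat.
        if pending.is_empty() {
            return merged;
        }
        pending.push_back(merged);
    }
}

fn main() {
    let runs = vec![vec![1, 4, 9], vec![2, 3, 10], vec![0, 7], vec![5, 6, 8], vec![11]];
    println!("{:?}", multi_level_merge(runs, 2));
}
```

With a fan-in of 2 and five runs, the sketch needs four passes; a larger fan-in means fewer passes but more data held at once, which is the trade-off the memory reservation decides in the real operator.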
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
alamb commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-3133626834

EPIC!
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
2010YOUY01 commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-3130543883

> Amazing work! Aside from updating to `main` what do users of DataFusion need to do to use this improvement?

Nothing else is needed. It is triggered automatically and requires no configuration option to control it.
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
adriangb commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-3130446693

Amazing work! Aside from updating to `main`, what do users of DataFusion need to do to use this improvement?
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
2010YOUY01 commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-3130380274

Thanks everyone @rluvaton @ding-young and others who have helped along the way 🚀
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
2010YOUY01 merged PR #15700:
URL: https://github.com/apache/datafusion/pull/15700
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2230996830
##
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##
@@ -0,0 +1,449 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Create a stream that do a multi level merge stream
+
+use crate::metrics::BaselineMetrics;
+use crate::{EmptyRecordBatchStream, SpillManager};
+use arrow::array::RecordBatch;
+use std::fmt::{Debug, Formatter};
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::MemoryReservation;
+
+use crate::sorts::sort::get_reserved_byte_for_record_batch_size;
+use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
+use crate::stream::RecordBatchStreamAdapter;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::TryStreamExt;
+use futures::{Stream, StreamExt};
+
+/// Merges a stream of sorted cursors and record batches into a single sorted stream
+///
+/// This is a wrapper around [`SortPreservingMergeStream`](crate::sorts::merge::SortPreservingMergeStream)
+/// that provide it the sorted streams/files to merge while making sure we can merge them in memory.
+/// In case we can't merge all of them in a single pass we will spill the intermediate results to disk
+/// and repeat the process.
+///
+/// ## High level Algorithm
+/// 1. Get the maximum amount of sorted in-memory streams and spill files we can merge with the available memory
+/// 2. Sort them to a sorted stream
+/// 3. Do we have more spill files to merge?
+///  - Yes: write that sorted stream to a spill file,
+///    add that spill file back to the spill files to merge and
+///    repeat the process
+///
+///  - No: return that sorted stream as the final output stream
+///
+/// ```text
+/// Initial State: Multiple sorted streams + spill files
+/// [Diagram (truncated): the Phase 1 box "Can hold in memory" contains in-memory sorted streams 1, 2, 3 and sorted spill file 1; sorted spill files 2, 3, 4 carry over to the Phase 2 box "Can hold in memory"; a Phase 3 box "Can hold in memory" follows]
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2230995605
##
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##
@@ -0,0 +1,449 @@
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2230992741
##
datafusion/physical-plan/src/spill/spill_manager.rs:
##
@@ -17,11 +17,10 @@
//! Define the `SpillManager` struct, which is responsible for reading and writing `RecordBatch`es to raw files based on the provided configurations.
-use std::sync::Arc;
-
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_execution::runtime_env::RuntimeEnv;
+use std::sync::Arc;
Review Comment:
Isn't the import order defined in clippy/fmt?
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
2010YOUY01 commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2230075680
##
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##
@@ -0,0 +1,449 @@
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
Copilot commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2230055548
##
datafusion/physical-plan/src/spill/spill_manager.rs:
##
@@ -17,11 +17,10 @@
//! Define the `SpillManager` struct, which is responsible for reading and writing `RecordBatch`es to raw files based on the provided configurations.
-use std::sync::Arc;
-
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_execution::runtime_env::RuntimeEnv;
+use std::sync::Arc;
Review Comment:
[nitpick] The import order has been changed unnecessarily. The original
import order (std imports first, then external crates) followed Rust
conventions better.
```suggestion
use std::sync::Arc;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_execution::runtime_env::RuntimeEnv;
```
##
datafusion/physical-plan/src/sorts/streaming_merge.rs:
##
@@ -119,9 +158,22 @@ impl<'a> StreamingMergeBuilder<'a> {
        self
    }
+    /// Bypass the mempool and avoid using the memory reservation.
+    ///
+    /// This is not marked as `pub` because it is not recommended to use this method
Review Comment:
The comment should explain why this method is not recommended and what the
risks are of bypassing the memory pool.
```suggestion
    ///
    /// This method bypasses the memory pool, which can lead to unregulated memory usage.
    /// Using an unbounded memory pool may result in excessive memory consumption and
    /// potential system instability if memory usage exceeds available resources.
    /// This is not marked as `pub` because it is not recommended to use this method
    /// except in testing or controlled scenarios where memory usage is closely monitored.
```
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2227706228
##
datafusion/physical-plan/src/spill/spill_manager.rs:
##
@@ -140,3 +210,19 @@ impl SpillManager {
        Ok(spawn_buffered(stream, self.batch_read_buffer_capacity))
    }
}
+
+pub(crate) trait GetSlicedSize {
+    /// Returns the size of the `RecordBatch` when sliced.
+    fn get_sliced_size(&self) -> usize;
+}
+
+impl GetSlicedSize for RecordBatch {
+    fn get_sliced_size(&self) -> usize {
+        let mut total = 0;
+        for array in self.columns() {
+            let data = array.to_data();
+            total += data.get_slice_memory_size().unwrap();
+        }
+        total
Review Comment:
I blindly copy pasted the code 😅
fixed
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
ding-young commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2227210285
##
datafusion/physical-plan/src/spill/spill_manager.rs:
##
@@ -140,3 +210,19 @@ impl SpillManager {
        Ok(spawn_buffered(stream, self.batch_read_buffer_capacity))
    }
}
+
+pub(crate) trait GetSlicedSize {
+    /// Returns the size of the `RecordBatch` when sliced.
+    fn get_sliced_size(&self) -> usize;
+}
+
+impl GetSlicedSize for RecordBatch {
+    fn get_sliced_size(&self) -> usize {
+        let mut total = 0;
+        for array in self.columns() {
+            let data = array.to_data();
+            total += data.get_slice_memory_size().unwrap();
+        }
+        total
Review Comment:
Thanks for applying the suggestion! Let's return `Result` instead of
calling `unwrap()`. Since we already support `From<ArrowError> for
DataFusionError`, we can simply propagate the error using `?`.
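A minimal, self-contained sketch of the pattern suggested above: return `Result` and let `?` convert the lower-level error through a `From` impl. All types here (`LowLevelError`, `EngineError`, `Column`) are hypothetical stand-ins so the example runs without external crates; they are not the Arrow or DataFusion types.

```rust
/// Hypothetical stand-in for a lower-level error (the role `ArrowError` plays).
#[derive(Debug)]
struct LowLevelError(String);

/// Hypothetical stand-in for the engine-level error (the role
/// `DataFusionError` plays).
#[derive(Debug)]
enum EngineError {
    LowLevel(LowLevelError),
}

/// This conversion is what lets `?` turn one error type into the other.
impl From<LowLevelError> for EngineError {
    fn from(e: LowLevelError) -> Self {
        EngineError::LowLevel(e)
    }
}

/// Hypothetical column whose size computation can fail.
struct Column {
    bytes: Option<usize>,
}

impl Column {
    fn slice_memory_size(&self) -> Result<usize, LowLevelError> {
        self.bytes
            .ok_or_else(|| LowLevelError("size unavailable".to_string()))
    }
}

/// Sums the column sizes, propagating the first failure instead of panicking.
fn sliced_size(columns: &[Column]) -> Result<usize, EngineError> {
    let mut total = 0;
    for column in columns {
        // `?` returns early with `EngineError::from(err)` on failure.
        total += column.slice_memory_size()?;
    }
    Ok(total)
}

fn main() {
    let ok = [Column { bytes: Some(8) }, Column { bytes: Some(16) }];
    println!("{:?}", sliced_size(&ok));

    let bad = [Column { bytes: Some(8) }, Column { bytes: None }];
    println!("{:?}", sliced_size(&bad));
}
```

The caller then decides how to handle the failure, rather than the whole query panicking inside a size calculation.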
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-3109894011

Can you please re-review? I don't believe there are any actionable comments left on my side.
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2226469366
##
datafusion/physical-plan/src/spill/get_size.rs:
##
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{
+Array, ArrayRef, ArrowPrimitiveType, BooleanArray, FixedSizeBinaryArray,
+FixedSizeListArray, GenericByteArray, GenericListArray, OffsetSizeTrait,
+PrimitiveArray, RecordBatch, StructArray,
+};
+use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer};
+use arrow::datatypes::{ArrowNativeType, ByteArrayType};
+use arrow::downcast_primitive_array;
+use arrow_schema::DataType;
+
+/// TODO - NEED TO MOVE THIS TO ARROW
Review Comment:
changed!
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2226461212
##
datafusion/physical-plan/src/spill/get_size.rs:
##
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{
+Array, ArrayRef, ArrowPrimitiveType, BooleanArray, FixedSizeBinaryArray,
+FixedSizeListArray, GenericByteArray, GenericListArray, OffsetSizeTrait,
+PrimitiveArray, RecordBatch, StructArray,
+};
+use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer};
+use arrow::datatypes::{ArrowNativeType, ByteArrayType};
+use arrow::downcast_primitive_array;
+use arrow_schema::DataType;
+
+/// TODO - NEED TO MOVE THIS TO ARROW
Review Comment:
Great, this memory estimate is accurate! I'm glad this API exists; I will change it now.
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2226402212
##
datafusion/physical-plan/src/spill/get_size.rs:
##
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{
+Array, ArrayRef, ArrowPrimitiveType, BooleanArray, FixedSizeBinaryArray,
+FixedSizeListArray, GenericByteArray, GenericListArray, OffsetSizeTrait,
+PrimitiveArray, RecordBatch, StructArray,
+};
+use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer};
+use arrow::datatypes::{ArrowNativeType, ByteArrayType};
+use arrow::downcast_primitive_array;
+use arrow_schema::DataType;
+
+/// TODO - NEED TO MOVE THIS TO ARROW
Review Comment:
I'm not familiar with `Self::get_slice_memory_size`. Is it new? I will test
it against my current function.
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
2010YOUY01 commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2221120019
##
datafusion/physical-plan/src/spill/get_size.rs:
##
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{
+Array, ArrayRef, ArrowPrimitiveType, BooleanArray, FixedSizeBinaryArray,
+FixedSizeListArray, GenericByteArray, GenericListArray, OffsetSizeTrait,
+PrimitiveArray, RecordBatch, StructArray,
+};
+use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer};
+use arrow::datatypes::{ArrowNativeType, ByteArrayType};
+use arrow::downcast_primitive_array;
+use arrow_schema::DataType;
+
+/// TODO - NEED TO MOVE THIS TO ARROW
Review Comment:
I see the reason—currently, it's inevitable for aggregation to generate a
large intermediate batch when spilling.
~~Let's add some unit tests for this `get_actual_size()` API before
merging~~ It seems there's an existing Arrow utility for the slice size
estimation mentioned above.
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
2010YOUY01 commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2221120019
##
datafusion/physical-plan/src/spill/get_size.rs:
##
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{
+Array, ArrayRef, ArrowPrimitiveType, BooleanArray, FixedSizeBinaryArray,
+FixedSizeListArray, GenericByteArray, GenericListArray, OffsetSizeTrait,
+PrimitiveArray, RecordBatch, StructArray,
+};
+use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer};
+use arrow::datatypes::{ArrowNativeType, ByteArrayType};
+use arrow::downcast_primitive_array;
+use arrow_schema::DataType;
+
+/// TODO - NEED TO MOVE THIS TO ARROW
Review Comment:
I see the reason—currently, it's inevitable for aggregation to generate a
large intermediate batch when spilling.
Let's add some unit tests for this `get_actual_size()` API before merging
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
ding-young commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2221118843
##
datafusion/physical-plan/src/spill/get_size.rs:
##
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{
+Array, ArrayRef, ArrowPrimitiveType, BooleanArray, FixedSizeBinaryArray,
+FixedSizeListArray, GenericByteArray, GenericListArray, OffsetSizeTrait,
+PrimitiveArray, RecordBatch, StructArray,
+};
+use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer};
+use arrow::datatypes::{ArrowNativeType, ByteArrayType};
+use arrow::downcast_primitive_array;
+use arrow_schema::DataType;
+
+/// TODO - NEED TO MOVE THIS TO ARROW
Review Comment:
Thank you for the clarification. My next question: if so, what do you
think about using the existing Arrow API
[`get_slice_memory_size`](https://docs.rs/arrow/latest/arrow/array/struct.ArrayData.html#method.get_slice_memory_size)?
If its memory estimate is reasonably close to what you had in mind (and if
slicing is the only cause of overestimation), we might be able to use that
instead and avoid the extra effort of porting your logic into `arrow-rs`.
Just as you added `get_actually_used_size`, we could add a similar helper:
```rust
impl GetSlicedSize for RecordBatch {
    fn get_sliced_size(&self) -> usize {
        let mut total = 0;
        for array in self.columns() {
            let data = array.to_data();
            total += data.get_slice_memory_size().unwrap();
        }
        total
    }
}
```
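To illustrate why a zero-copy slice can be overestimated, here is a toy model that does not use Arrow at all. `ToyColumn`, `backing_size` and `sliced_size` are hypothetical names chosen for this sketch; they only mimic the difference between counting the whole shared buffer and counting the bytes visible through the slice.

```rust
use std::sync::Arc;

/// Toy fixed-width column: a shared buffer plus an (offset, len) view.
#[derive(Clone)]
pub struct ToyColumn {
    buffer: Arc<Vec<u8>>,
    width: usize,  // bytes per value
    offset: usize, // index of the first visible value
    len: usize,    // number of visible values
}

impl ToyColumn {
    pub fn new(values: usize, width: usize) -> Self {
        Self {
            buffer: Arc::new(vec![0u8; values * width]),
            width,
            offset: 0,
            len: values,
        }
    }

    /// Zero-copy slice: the buffer is shared, only the view changes.
    pub fn slice(&self, offset: usize, len: usize) -> Self {
        assert!(offset + len <= self.len, "slice out of bounds");
        Self {
            buffer: Arc::clone(&self.buffer),
            width: self.width,
            offset: self.offset + offset,
            len,
        }
    }

    /// Size of the whole backing buffer, regardless of the view.
    pub fn backing_size(&self) -> usize {
        self.buffer.len()
    }

    /// Size of only the bytes visible through the view.
    pub fn sliced_size(&self) -> usize {
        self.len * self.width
    }
}
```

In this model a 10-value slice of a 1000-value column still reports the full buffer through `backing_size`, which is the kind of overestimation a slice-aware size is meant to avoid.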
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2219474642
##
datafusion/physical-plan/src/spill/get_size.rs:
##
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{
+Array, ArrayRef, ArrowPrimitiveType, BooleanArray, FixedSizeBinaryArray,
+FixedSizeListArray, GenericByteArray, GenericListArray, OffsetSizeTrait,
+PrimitiveArray, RecordBatch, StructArray,
+};
+use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer};
+use arrow::datatypes::{ArrowNativeType, ByteArrayType};
+use arrow::downcast_primitive_array;
+use arrow_schema::DataType;
+
+/// TODO - NEED TO MOVE THIS TO ARROW
Review Comment:
Slicing does indeed cause overestimation, but the special-case handling is
`get_size`; instead of using it only in aggregation, I've used it everywhere.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2219148029
##
datafusion/physical-plan/src/spill/spill_manager.rs:
##
@@ -125,6 +133,156 @@ impl SpillManager {
self.spill_record_batch_and_finish(&batches, request_description)
}
+/// Refer to the documentation for
[`Self::spill_record_batch_and_finish`]. This method
+/// additionally spills the `RecordBatch` into smaller batches, divided by
`row_limit`.
+///
+/// # Errors
+/// - Returns an error if spilling would exceed the disk usage limit
configured
+/// by `max_temp_directory_size` in `DiskManager`
+pub(crate) fn spill_record_batch_by_size_and_return_max_batch_memory(
+&self,
+batch: &RecordBatch,
+request_description: &str,
Review Comment:
done
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2219057789
##
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##
@@ -0,0 +1,345 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Create a stream that do a multi level merge stream
+
+use crate::metrics::BaselineMetrics;
+use crate::{EmptyRecordBatchStream, SpillManager};
+use arrow::array::RecordBatch;
+use std::fmt::{Debug, Formatter};
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::{
+MemoryConsumer, MemoryPool, MemoryReservation, UnboundedMemoryPool,
+};
+
+use crate::sorts::sort::get_reserved_byte_for_record_batch_size;
+use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
+use crate::stream::RecordBatchStreamAdapter;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::TryStreamExt;
+use futures::{Stream, StreamExt};
+
+/// Merges a stream of sorted cursors and record batches into a single sorted
stream
+pub(crate) struct MultiLevelMergeBuilder {
+spill_manager: SpillManager,
+schema: SchemaRef,
+sorted_spill_files: Vec,
+sorted_streams: Vec,
+expr: LexOrdering,
+metrics: BaselineMetrics,
+batch_size: usize,
+reservation: MemoryReservation,
+fetch: Option,
+enable_round_robin_tie_breaker: bool,
+
+// This is for avoiding double reservation of memory from our side and the
sort preserving merge stream
+// side.
+// and doing a lot of code changes to avoid accounting for the memory used
by the streams
+unbounded_memory_pool: Arc,
+}
+
+impl Debug for MultiLevelMergeBuilder {
+fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+write!(f, "MultiLevelMergeBuilder")
+}
+}
+
+impl MultiLevelMergeBuilder {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+spill_manager: SpillManager,
+schema: SchemaRef,
+sorted_spill_files: Vec,
+sorted_streams: Vec,
+expr: LexOrdering,
+metrics: BaselineMetrics,
+batch_size: usize,
+reservation: MemoryReservation,
+fetch: Option,
+enable_round_robin_tie_breaker: bool,
+) -> Self {
+Self {
+spill_manager,
+schema,
+sorted_spill_files,
+sorted_streams,
+expr,
+metrics,
+batch_size,
+reservation,
+enable_round_robin_tie_breaker,
+fetch,
+unbounded_memory_pool: Arc::new(UnboundedMemoryPool::default()),
+}
+}
+
+pub(crate) fn create_spillable_merge_stream(self) ->
SendableRecordBatchStream {
+Box::pin(RecordBatchStreamAdapter::new(
+Arc::clone(&self.schema),
+futures::stream::once(self.create_stream()).try_flatten(),
+))
+}
+
+async fn create_stream(mut self) -> Result {
+loop {
+// Hold this for the lifetime of the stream
Review Comment:
Changed. I made sure to use the memory reservation when merging only
in-memory streams, and to use the worst-case scenario otherwise.
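As a rough illustration of the worst-case idea, the sketch below picks how many sorted spill files can be merged in one pass under a memory budget, then counts the passes until one run remains. This is a simplified model under stated assumptions, not the PR's code: `files_for_next_pass`, `count_merge_passes`, the `buffered_batches` parameter, and the rule that a merged run's largest batch equals the largest input batch are all hypothetical.

```rust
/// Pick how many spill files (taken from the front) fit in one merge pass.
/// `max_batch_bytes[i]` is the largest batch of file `i`; each open file is
/// assumed to need `buffered_batches` such batches in memory (worst case).
pub fn files_for_next_pass(
    max_batch_bytes: &[usize],
    budget: usize,
    buffered_batches: usize,
) -> usize {
    let mut used = 0usize;
    let mut count = 0usize;
    for &bytes in max_batch_bytes {
        let need = bytes.saturating_mul(buffered_batches);
        match used.checked_add(need) {
            Some(total) if total <= budget => {
                used = total;
                count += 1;
            }
            _ => break,
        }
    }
    count
}

/// Count the merge passes needed until a single sorted run remains.
/// Returns `None` when fewer than two files fit, since no progress is possible.
pub fn count_merge_passes(
    mut files: Vec<usize>,
    budget: usize,
    buffered_batches: usize,
) -> Option<usize> {
    let mut passes = 0;
    while files.len() > 1 {
        let n = files_for_next_pass(&files, budget, buffered_batches);
        if n < 2 {
            return None;
        }
        // `files` keeps the group being merged; `remaining` holds the rest.
        let mut remaining = files.split_off(n);
        // Assumption: the merged run's largest batch is the largest input batch.
        let merged_max = files.iter().copied().max().unwrap_or(0);
        remaining.push(merged_max);
        files = remaining;
        passes += 1;
    }
    Some(passes)
}
```

The `None` case corresponds to the situation where memory pressure is so high that even a two-way merge cannot be reserved.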
##
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##
@@ -0,0 +1,345 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governi
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2219143884
##
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##
@@ -0,0 +1,345 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Create a stream that do a multi level merge stream
+
+use crate::metrics::BaselineMetrics;
+use crate::{EmptyRecordBatchStream, SpillManager};
+use arrow::array::RecordBatch;
+use std::fmt::{Debug, Formatter};
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::{
+MemoryConsumer, MemoryPool, MemoryReservation, UnboundedMemoryPool,
+};
+
+use crate::sorts::sort::get_reserved_byte_for_record_batch_size;
+use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
+use crate::stream::RecordBatchStreamAdapter;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::TryStreamExt;
+use futures::{Stream, StreamExt};
+
+/// Merges a stream of sorted cursors and record batches into a single sorted
stream
+pub(crate) struct MultiLevelMergeBuilder {
+spill_manager: SpillManager,
+schema: SchemaRef,
+sorted_spill_files: Vec,
+sorted_streams: Vec,
+expr: LexOrdering,
+metrics: BaselineMetrics,
+batch_size: usize,
+reservation: MemoryReservation,
+fetch: Option,
+enable_round_robin_tie_breaker: bool,
+
+// This is for avoiding double reservation of memory from our side and the
sort preserving merge stream
+// side.
+// and doing a lot of code changes to avoid accounting for the memory used
by the streams
+unbounded_memory_pool: Arc,
Review Comment:
Added, and created the temporary unbounded memory pool inside SPM.
I made that function `pub(super)` to avoid exposing it to users, as I
feel it is a hack.
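The double-accounting problem can be sketched with a toy pool abstraction. `ToyPool`, `BoundedPool` and `UnboundedPool` below are hypothetical types for illustration only; they are not the DataFusion `MemoryPool` API.

```rust
use std::cell::Cell;

/// Toy memory pool interface (hypothetical, for illustration).
pub trait ToyPool {
    fn try_grow(&self, bytes: usize) -> Result<(), String>;
    fn reserved(&self) -> usize;
}

/// Pool that rejects reservations beyond a fixed limit.
pub struct BoundedPool {
    limit: usize,
    used: Cell<usize>,
}

impl BoundedPool {
    pub fn new(limit: usize) -> Self {
        Self { limit, used: Cell::new(0) }
    }
}

impl ToyPool for BoundedPool {
    fn try_grow(&self, bytes: usize) -> Result<(), String> {
        let next = self
            .used
            .get()
            .checked_add(bytes)
            .ok_or_else(|| "overflow".to_string())?;
        if next > self.limit {
            return Err(format!("cannot reserve {bytes} bytes"));
        }
        self.used.set(next);
        Ok(())
    }

    fn reserved(&self) -> usize {
        self.used.get()
    }
}

/// Pool that only tracks usage and never rejects a reservation.
#[derive(Default)]
pub struct UnboundedPool {
    used: Cell<usize>,
}

impl ToyPool for UnboundedPool {
    fn try_grow(&self, bytes: usize) -> Result<(), String> {
        self.used.set(self.used.get().saturating_add(bytes));
        Ok(())
    }

    fn reserved(&self) -> usize {
        self.used.get()
    }
}
```

If the outer merge has already reserved the worst case from the shared bounded pool, handing the inner stream a scratch unbounded pool keeps the same bytes from being counted a second time.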
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2219141893
##
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##
@@ -0,0 +1,345 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Create a stream that do a multi level merge stream
+
+use crate::metrics::BaselineMetrics;
+use crate::{EmptyRecordBatchStream, SpillManager};
+use arrow::array::RecordBatch;
+use std::fmt::{Debug, Formatter};
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::{
+MemoryConsumer, MemoryPool, MemoryReservation, UnboundedMemoryPool,
+};
+
+use crate::sorts::sort::get_reserved_byte_for_record_batch_size;
+use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
+use crate::stream::RecordBatchStreamAdapter;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::TryStreamExt;
+use futures::{Stream, StreamExt};
+
+/// Merges a stream of sorted cursors and record batches into a single sorted
stream
Review Comment:
added with diagram
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2219098075
##
datafusion/physical-plan/src/spill/spill_manager.rs:
##
@@ -125,6 +133,156 @@ impl SpillManager {
self.spill_record_batch_and_finish(&batches, request_description)
}
+/// Refer to the documentation for
[`Self::spill_record_batch_and_finish`]. This method
+/// additionally spills the `RecordBatch` into smaller batches, divided by
`row_limit`.
+///
+/// # Errors
+/// - Returns an error if spilling would exceed the disk usage limit
configured
+/// by `max_temp_directory_size` in `DiskManager`
+pub(crate) fn spill_record_batch_by_size_and_return_max_batch_memory(
+&self,
+batch: &RecordBatch,
+request_description: &str,
+row_limit: usize,
+) -> Result> {
+let total_rows = batch.num_rows();
+let mut batches = Vec::new();
+let mut offset = 0;
+
+// It's ok to calculate all slices first, because slicing is zero-copy.
+while offset < total_rows {
+let length = std::cmp::min(total_rows - offset, row_limit);
+let sliced_batch = batch.slice(offset, length);
+batches.push(sliced_batch);
+offset += length;
+}
+
+let mut in_progress_file =
self.create_in_progress_file(request_description)?;
+
+let mut max_record_batch_size = 0;
+
+for batch in batches {
+in_progress_file.append_batch(&batch)?;
+
+max_record_batch_size =
+max_record_batch_size.max(batch.get_actually_used_size());
+}
+
+let file = in_progress_file.finish()?;
+
+Ok(file.map(|f| (f, max_record_batch_size)))
+}
+
+/// Spill the `RecordBatch` to disk as smaller batches
+/// split by `batch_size_rows`.
+///
+/// will return the spill file and the size of the largest batch in memory
+pub async fn spill_record_batch_stream_by_size(
+&self,
+stream: &mut SendableRecordBatchStream,
+batch_size_rows: usize,
+request_msg: &str,
+) -> Result> {
+use futures::StreamExt;
+let mut in_progress_file = self.create_in_progress_file(request_msg)?;
+
+let mut max_record_batch_size = 0;
+
+let mut maybe_last_batch: Option = None;
+
+while let Some(batch) = stream.next().await {
+let mut batch = batch?;
+
+if let Some(mut last_batch) = maybe_last_batch.take() {
+assert!(
+last_batch.num_rows() < batch_size_rows,
+"last batch size must be smaller than the requested batch
size"
+);
+
+// Get the number of rows to take from current batch so the
last_batch
+// will have `batch_size_rows` rows
+let current_batch_offset = std::cmp::min(
+// rows needed to fill
+batch_size_rows - last_batch.num_rows(),
+// Current length of the batch
+batch.num_rows(),
+);
+
+// if have last batch that has less rows than concat and spill
+last_batch = arrow::compute::concat_batches(
+&stream.schema(),
+&[last_batch, batch.slice(0, current_batch_offset)],
+)?;
+
+assert!(last_batch.num_rows() <= batch_size_rows, "must build
a batch that is smaller or equal to the requested batch size from the current
batch");
+
+// If not enough rows
+if last_batch.num_rows() < batch_size_rows {
+// keep the last batch for next iteration
+maybe_last_batch = Some(last_batch);
+continue;
+}
+
+max_record_batch_size =
+
max_record_batch_size.max(last_batch.get_actually_used_size());
+
+in_progress_file.append_batch(&last_batch)?;
+
+if current_batch_offset == batch.num_rows() {
+// No remainder
+continue;
+}
+
+// remainder
+batch = batch.slice(
+current_batch_offset,
+batch.num_rows() - current_batch_offset,
+);
+}
+
+let mut offset = 0;
+let total_rows = batch.num_rows();
+
+// Keep slicing the batch until we have left with a batch that is
smaller than
Review Comment:
You are right, simplified.
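For reference, the re-chunking that the quoted function performs can be expressed compactly. This is a sketch on plain `Vec<i32>` rows, assuming copying is acceptable; the hypothetical `rechunk` helper is not the PR's code, which works on `RecordBatch` slices and also tracks the largest batch size.

```rust
/// Re-chunk a sequence of row batches so that every emitted chunk has exactly
/// `rows_per_chunk` rows, except possibly the last one.
pub fn rechunk(batches: Vec<Vec<i32>>, rows_per_chunk: usize) -> Vec<Vec<i32>> {
    assert!(rows_per_chunk > 0, "chunk size must be positive");
    let mut out = Vec::new();
    // Rows carried over from previous batches that do not yet fill a chunk.
    let mut pending: Vec<i32> = Vec::with_capacity(rows_per_chunk);
    for batch in batches {
        let mut rest = batch.as_slice();
        while !rest.is_empty() {
            // Take only as many rows as are needed to fill the pending chunk.
            let take = (rows_per_chunk - pending.len()).min(rest.len());
            pending.extend_from_slice(&rest[..take]);
            rest = &rest[take..];
            if pending.len() == rows_per_chunk {
                let full =
                    std::mem::replace(&mut pending, Vec::with_capacity(rows_per_chunk));
                out.push(full);
            }
        }
    }
    // Flush the final, possibly short, chunk.
    if !pending.is_empty() {
        out.push(pending);
    }
    out
}
```

A single loop with one carry-over buffer handles both the "fill the previous partial batch" case and the "slice the current batch" case.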
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2219057789
##
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##
@@ -0,0 +1,345 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Create a stream that do a multi level merge stream
+
+use crate::metrics::BaselineMetrics;
+use crate::{EmptyRecordBatchStream, SpillManager};
+use arrow::array::RecordBatch;
+use std::fmt::{Debug, Formatter};
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::{
+MemoryConsumer, MemoryPool, MemoryReservation, UnboundedMemoryPool,
+};
+
+use crate::sorts::sort::get_reserved_byte_for_record_batch_size;
+use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
+use crate::stream::RecordBatchStreamAdapter;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::TryStreamExt;
+use futures::{Stream, StreamExt};
+
+/// Merges a stream of sorted cursors and record batches into a single sorted stream
+pub(crate) struct MultiLevelMergeBuilder {
+spill_manager: SpillManager,
+schema: SchemaRef,
+sorted_spill_files: Vec<SortedSpillFile>,
+sorted_streams: Vec<SendableRecordBatchStream>,
+expr: LexOrdering,
+metrics: BaselineMetrics,
+batch_size: usize,
+reservation: MemoryReservation,
+fetch: Option<usize>,
+enable_round_robin_tie_breaker: bool,
+
+// This is for avoiding double reservation of memory from our side and the sort preserving merge stream
+// side.
+// and doing a lot of code changes to avoid accounting for the memory used by the streams
+unbounded_memory_pool: Arc<dyn MemoryPool>,
+}
+
+impl Debug for MultiLevelMergeBuilder {
+fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+write!(f, "MultiLevelMergeBuilder")
+}
+}
+
+impl MultiLevelMergeBuilder {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+spill_manager: SpillManager,
+schema: SchemaRef,
+sorted_spill_files: Vec<SortedSpillFile>,
+sorted_streams: Vec<SendableRecordBatchStream>,
+expr: LexOrdering,
+metrics: BaselineMetrics,
+batch_size: usize,
+reservation: MemoryReservation,
+fetch: Option<usize>,
+enable_round_robin_tie_breaker: bool,
+) -> Self {
+Self {
+spill_manager,
+schema,
+sorted_spill_files,
+sorted_streams,
+expr,
+metrics,
+batch_size,
+reservation,
+enable_round_robin_tie_breaker,
+fetch,
+unbounded_memory_pool: Arc::new(UnboundedMemoryPool::default()),
+}
+}
+
+pub(crate) fn create_spillable_merge_stream(self) -> SendableRecordBatchStream {
+Box::pin(RecordBatchStreamAdapter::new(
+Arc::clone(&self.schema),
+futures::stream::once(self.create_stream()).try_flatten(),
+))
+}
+
+async fn create_stream(mut self) -> Result<SendableRecordBatchStream> {
+loop {
+// Hold this for the lifetime of the stream
Review Comment:
Changed, and made sure the memory reservation is used when merging only
in-memory streams.
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
ding-young commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2218214013
##
datafusion/physical-plan/src/spill/spill_manager.rs:
##
@@ -125,6 +133,156 @@ impl SpillManager {
self.spill_record_batch_and_finish(&batches, request_description)
}
+/// Refer to the documentation for [`Self::spill_record_batch_and_finish`]. This method
+/// additionally spills the `RecordBatch` into smaller batches, divided by `row_limit`.
+///
+/// # Errors
+/// - Returns an error if spilling would exceed the disk usage limit configured
+/// by `max_temp_directory_size` in `DiskManager`
+pub(crate) fn spill_record_batch_by_size_and_return_max_batch_memory(
+&self,
+batch: &RecordBatch,
+request_description: &str,
Review Comment:
Nit: it might be better to unify the naming. Some functions use
`request_msg`, while others use `request_description`.
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
ding-young commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2218105116
##
datafusion/physical-plan/src/spill/get_size.rs:
##
@@ -0,0 +1,216 @@
+use arrow::array::{
+Array, ArrayRef, ArrowPrimitiveType, BooleanArray, FixedSizeBinaryArray,
+FixedSizeListArray, GenericByteArray, GenericListArray, OffsetSizeTrait,
+PrimitiveArray, RecordBatch, StructArray,
+};
+use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer};
+use arrow::datatypes::{ArrowNativeType, ByteArrayType};
+use arrow::downcast_primitive_array;
+use arrow_schema::DataType;
+
+/// TODO - NEED TO MOVE THIS TO ARROW
Review Comment:
I've also seen a case where using `batch.get_array_memory_size` leads to
overestimation in aggregation. That is not because Arrow overallocates
buffers, but because we **"slice"** the batch when we spill in
`spill_record_batch_by_size_(and_return_max_batch_memory)`. Slicing a batch
does not change the buffer capacity, which `batch.get_array_memory_size` relies
on, so each sliced batch reports the full buffer memory usage even though the
buffers are shared between the slices.
Running `test_single_mode_aggregate_single_mode_aggregate_with_spill()` in
`aggregation_fuzz` on the [reproducer
branch](https://github.com/ding-young/datafusion/tree/batch-memory-reproducer)
shows that naively relying on `batch.get_array_memory_size` for sliced batches
can lead to overestimation.
```
fuzz_cases::aggregate_fuzz::test_single_mode_aggregate_single_mode_aggregate_with_spill
stdout
batch.get_actually_used_size:21492, batch.get_array_memory_size:178946
batch.get_actually_used_size:21624, batch.get_array_memory_size:178946
batch.get_actually_used_size:21635, batch.get_array_memory_size:178946
batch.get_actually_used_size:21620, batch.get_array_memory_size:178946
batch.get_actually_used_size:21636, batch.get_array_memory_size:178946
...
```
Since sort does not use the `spill_record_batch_by_size` API, no slicing is
involved there, so (I believe) only aggregation suffers from this issue. If
slicing is the only cause of the overestimation, I think we can just add
special-case handling for aggregation and keep things simple with the existing
`batch.get_array_memory_size` API.
I wonder whether the overestimation @rluvaton encountered was caused by this
issue or by something else.
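The slicing effect described in this comment can be sketched with a small std-only model. This is not the Arrow API: `ToyColumn`, `buffer_based_size`, `used_size` and `account_slices` are hypothetical names, and a "row" is assumed to be one byte. The sketch only shows why summing a buffer-based size over zero-copy slices counts the shared buffer once per slice.

```rust
use std::sync::Arc;

/// Simplified stand-in for one column of a batch: a shared byte buffer plus
/// an (offset, len) view into it. This is NOT the Arrow API.
#[derive(Clone)]
struct ToyColumn {
    buffer: Arc<Vec<u8>>,
    offset: usize,
    len: usize,
}

impl ToyColumn {
    fn new(data: Vec<u8>) -> Self {
        let len = data.len();
        Self { buffer: Arc::new(data), offset: 0, len }
    }

    /// Zero-copy slice: the buffer is shared, only the view changes.
    fn slice(&self, offset: usize, len: usize) -> Self {
        assert!(offset + len <= self.len, "slice out of bounds");
        Self {
            buffer: Arc::clone(&self.buffer),
            offset: self.offset + offset,
            len,
        }
    }

    /// Bytes visible through this view.
    fn bytes(&self) -> &[u8] {
        &self.buffer[self.offset..self.offset + self.len]
    }

    /// Buffer-based accounting: every slice reports the whole shared buffer.
    fn buffer_based_size(&self) -> usize {
        self.buffer.len()
    }

    /// View-based accounting: only the bytes this slice can actually see.
    fn used_size(&self) -> usize {
        self.bytes().len()
    }
}

/// Splits `column` into slices of at most `row_limit` bytes and returns
/// `(sum of buffer-based sizes, sum of used sizes)`.
fn account_slices(column: &ToyColumn, row_limit: usize) -> (usize, usize) {
    assert!(row_limit > 0, "row_limit must be positive");
    let (mut buffer_total, mut used_total) = (0, 0);
    let mut offset = 0;
    while offset < column.len {
        let len = row_limit.min(column.len - offset);
        let slice = column.slice(offset, len);
        buffer_total += slice.buffer_based_size();
        used_total += slice.used_size();
        offset += len;
    }
    (buffer_total, used_total)
}
```

In this model, splitting a 1000-byte column into four 250-byte slices gives a buffer-based total of 4000 bytes against 1000 bytes actually in view. That is the same shape as the log above, where every slice reports 178946 bytes while using about 21.5 KB.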
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
2010YOUY01 commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2217240477
##
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##
@@ -0,0 +1,342 @@
+//! Create a stream that do a multi level merge stream
+
+use crate::metrics::BaselineMetrics;
+use crate::{EmptyRecordBatchStream, SpillManager};
+use arrow::array::RecordBatch;
+use std::fmt::{Debug, Formatter};
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::{
+MemoryConsumer, MemoryPool, MemoryReservation, UnboundedMemoryPool,
+};
+
+use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
+use crate::stream::RecordBatchStreamAdapter;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::TryStreamExt;
+use futures::{Stream, StreamExt};
+
+/// Merges a stream of sorted cursors and record batches into a single sorted stream
+pub(crate) struct MultiLevelMergeBuilder {
+spill_manager: SpillManager,
+schema: SchemaRef,
+sorted_spill_files: Vec<SortedSpillFile>,
+sorted_streams: Vec<SendableRecordBatchStream>,
+expr: LexOrdering,
+metrics: BaselineMetrics,
+batch_size: usize,
+reservation: MemoryReservation,
+fetch: Option<usize>,
+enable_round_robin_tie_breaker: bool,
+
+// This is for avoiding double reservation of memory from our side and the sort preserving merge stream
+// side.
+// and doing a lot of code changes to avoid accounting for the memory used by the streams
+unbounded_memory_pool: Arc<dyn MemoryPool>,
+}
+
+impl Debug for MultiLevelMergeBuilder {
+fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+write!(f, "MultiLevelMergeBuilder")
+}
+}
+
+impl MultiLevelMergeBuilder {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+spill_manager: SpillManager,
+schema: SchemaRef,
+sorted_spill_files: Vec<SortedSpillFile>,
+sorted_streams: Vec<SendableRecordBatchStream>,
+expr: LexOrdering,
+metrics: BaselineMetrics,
+batch_size: usize,
+reservation: MemoryReservation,
+fetch: Option<usize>,
+enable_round_robin_tie_breaker: bool,
+) -> Self {
+Self {
+spill_manager,
+schema,
+sorted_spill_files,
+sorted_streams,
+expr,
+metrics,
+batch_size,
+reservation,
+enable_round_robin_tie_breaker,
+fetch,
+unbounded_memory_pool: Arc::new(UnboundedMemoryPool::default()),
+}
+}
+
+pub(crate) fn create_spillable_merge_stream(self) -> SendableRecordBatchStream {
+Box::pin(RecordBatchStreamAdapter::new(
+Arc::clone(&self.schema),
+futures::stream::once(self.create_stream()).try_flatten(),
+))
+}
+
+async fn create_stream(mut self) -> Result<SendableRecordBatchStream> {
+loop {
+// Hold this for the lifetime of the stream
+let mut current_memory_reservation = self.reservation.new_empty();
+let mut stream =
+self.create_sorted_stream(&mut current_memory_reservation)?;
+
+// TODO - add a threshold for number of files to disk even if empty and reading from disk so
+// we can avoid the memory reservation
+
+// If no spill files are left, we can return the stream as this is the last sorted run
+// TODO - We can write to disk before reading it back to avoid having multiple streams in memory
+if self.sorted_spill_files.is_empty() {
+// Attach the memory reservation to the stream as we are done with it
+// but because we replaced the memory reservation of the merge stream, we must hold
+// this to make sure we have enough memory
+return Ok(Box::pin(StreamAttachedReservation::new(
+stream,
+current_memory_reservation,
+)));
+}
+
+// N
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
2010YOUY01 commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2217220186
##
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##
@@ -0,0 +1,345 @@
+//! Create a stream that do a multi level merge stream
+
+use crate::metrics::BaselineMetrics;
+use crate::{EmptyRecordBatchStream, SpillManager};
+use arrow::array::RecordBatch;
+use std::fmt::{Debug, Formatter};
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::{
+MemoryConsumer, MemoryPool, MemoryReservation, UnboundedMemoryPool,
+};
+
+use crate::sorts::sort::get_reserved_byte_for_record_batch_size;
+use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
+use crate::stream::RecordBatchStreamAdapter;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::TryStreamExt;
+use futures::{Stream, StreamExt};
+
+/// Merges a stream of sorted cursors and record batches into a single sorted stream
+pub(crate) struct MultiLevelMergeBuilder {
+spill_manager: SpillManager,
+schema: SchemaRef,
+sorted_spill_files: Vec<SortedSpillFile>,
+sorted_streams: Vec<SendableRecordBatchStream>,
+expr: LexOrdering,
+metrics: BaselineMetrics,
+batch_size: usize,
+reservation: MemoryReservation,
+fetch: Option<usize>,
+enable_round_robin_tie_breaker: bool,
+
+// This is for avoiding double reservation of memory from our side and the sort preserving merge stream
+// side.
+// and doing a lot of code changes to avoid accounting for the memory used by the streams
+unbounded_memory_pool: Arc<dyn MemoryPool>,
+}
+
+impl Debug for MultiLevelMergeBuilder {
+fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+write!(f, "MultiLevelMergeBuilder")
+}
+}
+
+impl MultiLevelMergeBuilder {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+spill_manager: SpillManager,
+schema: SchemaRef,
+sorted_spill_files: Vec<SortedSpillFile>,
+sorted_streams: Vec<SendableRecordBatchStream>,
+expr: LexOrdering,
+metrics: BaselineMetrics,
+batch_size: usize,
+reservation: MemoryReservation,
+fetch: Option<usize>,
+enable_round_robin_tie_breaker: bool,
+) -> Self {
+Self {
+spill_manager,
+schema,
+sorted_spill_files,
+sorted_streams,
+expr,
+metrics,
+batch_size,
+reservation,
+enable_round_robin_tie_breaker,
+fetch,
+unbounded_memory_pool: Arc::new(UnboundedMemoryPool::default()),
+}
+}
+
+pub(crate) fn create_spillable_merge_stream(self) -> SendableRecordBatchStream {
+Box::pin(RecordBatchStreamAdapter::new(
+Arc::clone(&self.schema),
+futures::stream::once(self.create_stream()).try_flatten(),
+))
+}
+
+async fn create_stream(mut self) -> Result<SendableRecordBatchStream> {
+loop {
+// Hold this for the lifetime of the stream
Review Comment:
If we have this reservation with the same lifetime as the stream, would it
be better to create a `MultiLevelMergeStream` and make this reservation a
struct field?
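A minimal sketch of what this suggestion could look like, using std-only stand-ins. `ToyPool`, `ToyReservation` and `MergeStreamWithReservation` are hypothetical names, not DataFusion types, and a plain `Iterator` stands in for the async stream. The assumption illustrated is only that a reservation stored as a struct field is released exactly when the stream itself is dropped.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical stand-in for a memory pool: it only tracks reserved bytes.
#[derive(Default)]
struct ToyPool {
    reserved: Cell<usize>,
}

/// Hypothetical stand-in for a reservation; gives its bytes back on drop.
struct ToyReservation {
    pool: Rc<ToyPool>,
    size: usize,
}

impl ToyReservation {
    fn new(pool: &Rc<ToyPool>, size: usize) -> Self {
        pool.reserved.set(pool.reserved.get() + size);
        Self { pool: Rc::clone(pool), size }
    }
}

impl Drop for ToyReservation {
    fn drop(&mut self) {
        self.pool.reserved.set(self.pool.reserved.get() - self.size);
    }
}

/// A stream wrapper that owns its reservation as a field, so both share
/// one lifetime without any extra bookkeeping.
struct MergeStreamWithReservation<I: Iterator<Item = i32>> {
    inner: I,
    _reservation: ToyReservation,
}

impl<I: Iterator<Item = i32>> Iterator for MergeStreamWithReservation<I> {
    type Item = i32;

    fn next(&mut self) -> Option<i32> {
        self.inner.next()
    }
}

/// Returns (bytes reserved while the stream is alive, bytes reserved after
/// it has been consumed and dropped, rows produced).
fn reserved_during_and_after(size: usize) -> (usize, usize, Vec<i32>) {
    let pool = Rc::new(ToyPool::default());
    let stream = MergeStreamWithReservation {
        inner: vec![1, 2, 3].into_iter(),
        _reservation: ToyReservation::new(&pool, size),
    };
    let during = pool.reserved.get();
    // `collect` takes the stream by value, so it is dropped once drained.
    let rows: Vec<i32> = stream.collect();
    let after = pool.reserved.get();
    (during, after, rows)
}
```

Calling `reserved_during_and_after(64)` in this model reports 64 bytes held while the stream exists and 0 afterwards, with no explicit release call.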
##
datafusion/physical-plan/src/sorts/streaming_merge.rs:
##
@@ -131,14 +168,42 @@ impl<'a> StreamingMergeBuilder<'a> {
enable_round_robin_tie_breaker,
} = self;
-// Early return if streams or expressions are empty:
-if streams.is_empty() {
-return internal_err!("Streams cannot be empty for streaming merge");
-}
+// Early return if expressions are empty:
let Some(expressions) = expressions else {
return internal_err!("Sort expressions cannot be empty for streaming merge");
};
+if !sorted_spill_files.is_empty() {
Review Comment:
I agree it's good to reduce the number of APIs; in that case the two
approaches seem to have similar complexity.
##
datafusion/physical-plan/src/sorts/multi
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on PR #15700: URL: https://github.com/apache/datafusion/pull/15700#issuecomment-3084017814 @2010YOUY01 I've updated the PR based on your comments and replied to some of them.
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2213305286
##
datafusion/physical-plan/src/spill/get_size.rs:
##
@@ -0,0 +1,216 @@
+use arrow::array::{
+Array, ArrayRef, ArrowPrimitiveType, BooleanArray, FixedSizeBinaryArray,
+FixedSizeListArray, GenericByteArray, GenericListArray, OffsetSizeTrait,
+PrimitiveArray, RecordBatch, StructArray,
+};
+use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer};
+use arrow::datatypes::{ArrowNativeType, ByteArrayType};
+use arrow::downcast_primitive_array;
+use arrow_schema::DataType;
+
+/// TODO - NEED TO MOVE THIS TO ARROW
Review Comment:
I did that because it can overestimate by a lot, which causes more spills
and seriously reduces performance. I added this after seeing buffers
calculated as 100MB for aggregation when they were not actually that large.
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2213294534
##
datafusion/physical-plan/src/sorts/streaming_merge.rs:
##
@@ -131,14 +168,42 @@ impl<'a> StreamingMergeBuilder<'a> {
enable_round_robin_tie_breaker,
} = self;
-// Early return if streams or expressions are empty:
-if streams.is_empty() {
-return internal_err!("Streams cannot be empty for streaming merge");
-}
+// Early return if expressions are empty:
let Some(expressions) = expressions else {
return internal_err!("Sort expressions cannot be empty for streaming merge");
};
+if !sorted_spill_files.is_empty() {
Review Comment:
I wanted a single entry point for users to merge multiple streams, whether
or not they are backed by files.
The reason I love DataFusion is that it uses its own public API, meaning I
can copy-paste the operator code and have it work without copying the entire
codebase.
I also don't want to make `MultiLevelMergeBuilder` public, for the same
reason `SortPreservingMergeStream` is not public.
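The single-entry-point idea can be sketched as a toy builder that picks the merge strategy internally. `ToyMergeBuilder` and `MergePlan` are hypothetical, std-only stand-ins rather than the real `StreamingMergeBuilder`. Performing the empty-streams check only on the no-spill-file path is an assumption drawn from the diff above, which removes the early check and adds the `sorted_spill_files` branch.

```rust
/// Which merge strategy the single entry point picked (toy model).
#[derive(Debug, PartialEq)]
enum MergePlan {
    /// Only in-memory streams: one direct k-way merge.
    Direct { streams: usize },
    /// Spill files are present: hand off to a multi-level merge.
    MultiLevel { streams: usize, spill_files: usize },
}

/// Hypothetical builder; field and method names only mirror the diff.
#[derive(Default)]
struct ToyMergeBuilder {
    streams: Vec<Vec<i32>>,
    sorted_spill_files: Vec<String>,
}

impl ToyMergeBuilder {
    fn with_streams(mut self, streams: Vec<Vec<i32>>) -> Self {
        self.streams = streams;
        self
    }

    fn with_sorted_spill_files(mut self, files: Vec<String>) -> Self {
        self.sorted_spill_files = files;
        self
    }

    /// Callers never name the multi-level type; the builder dispatches.
    fn build(self) -> Result<MergePlan, String> {
        if !self.sorted_spill_files.is_empty() {
            return Ok(MergePlan::MultiLevel {
                streams: self.streams.len(),
                spill_files: self.sorted_spill_files.len(),
            });
        }
        // Assumed: without spill files, empty input is still an error.
        if self.streams.is_empty() {
            return Err("Streams cannot be empty for streaming merge".to_string());
        }
        Ok(MergePlan::Direct { streams: self.streams.len() })
    }
}
```

With this shape the multi-level type can stay crate-private, while callers use the same builder whether or not any spill files exist.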
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2213289055
##
datafusion/physical-plan/src/aggregates/row_hash.rs:
##
@@ -1067,14 +1074,13 @@ impl GroupedHashAggregateStream {
sort_batch(&batch, &expr, None)
})),
)));
-for spill in self.spill_state.spills.drain(..) {
-let stream = self.spill_state.spill_manager.read_spill_as_stream(spill)?;
-streams.push(stream);
-}
+
self.spill_state.is_stream_merging = true;
self.input = StreamingMergeBuilder::new()
.with_streams(streams)
.with_schema(schema)
+.with_spill_manager(self.spill_state.spill_manager.clone())
+.with_sorted_spill_files(std::mem::take(&mut self.spill_state.spills))
Review Comment:
> The issue is, without special handling, it's possible that in-mem batches
will take most of the available memory budget, and leave only a very small
memory part for multi-pass spilling to continue. This can cause slow downs or
even prevent some cases to finish.
Actually, this case is already handled. Say I have 4 in-memory streams and
5 spill files, and there is not enough memory to merge them together with any
spill file: we merge only the in-memory streams and spill the result to disk,
so the next iteration looks the same as if we had started with 6 spill files.
This is the same behavior as spilling beforehand, but optimized.
See:
https://github.com/rluvaton/datafusion/blob/af6b5c52afe316127cd007cd8464d07c3038a0f9/datafusion/physical-plan/src/sorts/multi_level_merge.rs#L178-L203
We first `take` the `in_memory_streams` (and never add them back):
https://github.com/rluvaton/datafusion/blob/af6b5c52afe316127cd007cd8464d07c3038a0f9/datafusion/physical-plan/src/sorts/multi_level_merge.rs#L180
Then we try to get spill files to merge, but the minimum number of spill
files required is 2 - `in_memory_stream.len()`:
https://github.com/rluvaton/datafusion/blob/af6b5c52afe316127cd007cd8464d07c3038a0f9/datafusion/physical-plan/src/sorts/multi_level_merge.rs#L185-L186
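The pass-by-pass behavior described in this comment can be sketched as a small simulation. Everything here is a simplified model under stated assumptions, not the code behind the links: every spill file is assumed to need the same `per_file_memory` to read, the in-memory streams are assumed to hold `in_memory_bytes` until they are merged, subtraction is assumed to saturate at zero, and the function names are hypothetical.

```rust
/// How many spill files one pass can merge, or an error if the pass would
/// end up with fewer inputs than the minimum.
fn spill_files_for_pass(
    in_memory_streams: usize,
    spill_files: usize,
    per_file_memory: usize,
    available_memory: usize,
) -> Result<usize, String> {
    // As many files as fit in the free memory (all of them if they cost 0).
    let fit = available_memory
        .checked_div(per_file_memory)
        .unwrap_or(spill_files)
        .min(spill_files);
    // In-memory streams count toward the two-input minimum.
    let min_files = 2usize.saturating_sub(in_memory_streams).min(spill_files);
    if fit < min_files {
        return Err(format!(
            "need {} spill files but only {} fit in memory",
            min_files, fit
        ));
    }
    Ok(fit)
}

/// Counts merge passes until no spill file is left on disk.
fn count_merge_passes(
    in_memory_streams: usize,
    in_memory_bytes: usize,
    spill_files: usize,
    per_file_memory: usize,
    memory_budget: usize,
) -> Result<usize, String> {
    let (mut in_memory, mut held, mut files) =
        (in_memory_streams, in_memory_bytes, spill_files);
    let mut passes = 0;
    loop {
        passes += 1;
        let available = memory_budget.saturating_sub(held);
        let merged =
            spill_files_for_pass(in_memory, files, per_file_memory, available)?;
        files -= merged;
        if files == 0 {
            // Nothing left on disk: this pass is the final sorted run.
            return Ok(passes);
        }
        // Intermediate pass: its output is spilled as one new file, and the
        // in-memory streams are consumed and never added back.
        files += 1;
        in_memory = 0;
        held = 0;
    }
}
```

For the example in the comment (4 in-memory streams, 5 spill files), take an illustrative 300-byte budget with 250 bytes held in memory and 100 bytes per file. The first pass then merges only the in-memory streams and leaves 6 spill files, and the model finishes in 4 passes.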
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2213223988
##
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##
@@ -0,0 +1,342 @@
+//! Create a stream that do a multi level merge stream
+
+use crate::metrics::BaselineMetrics;
+use crate::{EmptyRecordBatchStream, SpillManager};
+use arrow::array::RecordBatch;
+use std::fmt::{Debug, Formatter};
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::{
+    MemoryConsumer, MemoryPool, MemoryReservation, UnboundedMemoryPool,
+};
+
+use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
+use crate::stream::RecordBatchStreamAdapter;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::TryStreamExt;
+use futures::{Stream, StreamExt};
+
+/// Merges a stream of sorted cursors and record batches into a single sorted stream
+pub(crate) struct MultiLevelMergeBuilder {
+    spill_manager: SpillManager,
+    schema: SchemaRef,
+    sorted_spill_files: Vec<SortedSpillFile>,
+    sorted_streams: Vec<SendableRecordBatchStream>,
+    expr: LexOrdering,
+    metrics: BaselineMetrics,
+    batch_size: usize,
+    reservation: MemoryReservation,
+    fetch: Option<usize>,
+    enable_round_robin_tie_breaker: bool,
+
+    // This is for avoiding double reservation of memory from our side and the sort preserving merge stream
+    // side.
+    // and doing a lot of code changes to avoid accounting for the memory used by the streams
+    unbounded_memory_pool: Arc<dyn MemoryPool>,
+}
+
+impl Debug for MultiLevelMergeBuilder {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "MultiLevelMergeBuilder")
+    }
+}
+
+impl MultiLevelMergeBuilder {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        spill_manager: SpillManager,
+        schema: SchemaRef,
+        sorted_spill_files: Vec<SortedSpillFile>,
+        sorted_streams: Vec<SendableRecordBatchStream>,
+        expr: LexOrdering,
+        metrics: BaselineMetrics,
+        batch_size: usize,
+        reservation: MemoryReservation,
+        fetch: Option<usize>,
+        enable_round_robin_tie_breaker: bool,
+    ) -> Self {
+        Self {
+            spill_manager,
+            schema,
+            sorted_spill_files,
+            sorted_streams,
+            expr,
+            metrics,
+            batch_size,
+            reservation,
+            enable_round_robin_tie_breaker,
+            fetch,
+            unbounded_memory_pool: Arc::new(UnboundedMemoryPool::default()),
+        }
+    }
+
+    pub(crate) fn create_spillable_merge_stream(self) -> SendableRecordBatchStream {
+        Box::pin(RecordBatchStreamAdapter::new(
+            Arc::clone(&self.schema),
+            futures::stream::once(self.create_stream()).try_flatten(),
+        ))
+    }
+
+    async fn create_stream(mut self) -> Result<SendableRecordBatchStream> {
+        loop {
+            // Hold this for the lifetime of the stream
+            let mut current_memory_reservation = self.reservation.new_empty();
+            let mut stream =
+                self.create_sorted_stream(&mut current_memory_reservation)?;
+
+            // TODO - add a threshold for number of files to disk even if empty and reading from disk so
+            // we can avoid the memory reservation
+
+            // If no spill files are left, we can return the stream as this is the last sorted run
+            // TODO - We can write to disk before reading it back to avoid having multiple streams in memory
+            if self.sorted_spill_files.is_empty() {
+                // Attach the memory reservation to the stream as we are done with it
+                // but because we replaced the memory reservation of the merge stream, we must hold
+                // this to make sure we have enough memory
+                return Ok(Box::pin(StreamAttachedReservation::new(
+                    stream,
+                    current_memory_reservation,
+                )));
+            }
+
+            // Nee
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2213137522
##
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##
@@ -0,0 +1,342 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Create a stream that do a multi level merge stream
+
+use crate::metrics::BaselineMetrics;
+use crate::{EmptyRecordBatchStream, SpillManager};
+use arrow::array::RecordBatch;
+use std::fmt::{Debug, Formatter};
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::{
+    MemoryConsumer, MemoryPool, MemoryReservation, UnboundedMemoryPool,
+};
+
+use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
+use crate::stream::RecordBatchStreamAdapter;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::TryStreamExt;
+use futures::{Stream, StreamExt};
+
+/// Merges a stream of sorted cursors and record batches into a single sorted stream
+pub(crate) struct MultiLevelMergeBuilder {
+    spill_manager: SpillManager,
+    schema: SchemaRef,
+    sorted_spill_files: Vec<SortedSpillFile>,
+    sorted_streams: Vec<SendableRecordBatchStream>,
+    expr: LexOrdering,
+    metrics: BaselineMetrics,
+    batch_size: usize,
+    reservation: MemoryReservation,
+    fetch: Option<usize>,
+    enable_round_robin_tie_breaker: bool,
+
+    // This is for avoiding double reservation of memory from our side and the sort preserving merge stream
+    // side.
+    // and doing a lot of code changes to avoid accounting for the memory used by the streams
+    unbounded_memory_pool: Arc<dyn MemoryPool>,
+}
+
+impl Debug for MultiLevelMergeBuilder {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "MultiLevelMergeBuilder")
+    }
+}
+
+impl MultiLevelMergeBuilder {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        spill_manager: SpillManager,
+        schema: SchemaRef,
+        sorted_spill_files: Vec<SortedSpillFile>,
+        sorted_streams: Vec<SendableRecordBatchStream>,
+        expr: LexOrdering,
+        metrics: BaselineMetrics,
+        batch_size: usize,
+        reservation: MemoryReservation,
+        fetch: Option<usize>,
+        enable_round_robin_tie_breaker: bool,
+    ) -> Self {
+        Self {
+            spill_manager,
+            schema,
+            sorted_spill_files,
+            sorted_streams,
+            expr,
+            metrics,
+            batch_size,
+            reservation,
+            enable_round_robin_tie_breaker,
+            fetch,
+            unbounded_memory_pool: Arc::new(UnboundedMemoryPool::default()),
+        }
+    }
+
+    pub(crate) fn create_spillable_merge_stream(self) -> SendableRecordBatchStream {
+        Box::pin(RecordBatchStreamAdapter::new(
+            Arc::clone(&self.schema),
+            futures::stream::once(self.create_stream()).try_flatten(),
+        ))
+    }
+
+    async fn create_stream(mut self) -> Result<SendableRecordBatchStream> {
+        loop {
+            // Hold this for the lifetime of the stream
+            let mut current_memory_reservation = self.reservation.new_empty();
+            let mut stream =
+                self.create_sorted_stream(&mut current_memory_reservation)?;
+
+            // TODO - add a threshold for number of files to disk even if empty and reading from disk so
+            // we can avoid the memory reservation
+
+            // If no spill files are left, we can return the stream as this is the last sorted run
+            // TODO - We can write to disk before reading it back to avoid having multiple streams in memory
+            if self.sorted_spill_files.is_empty() {
+                // Attach the memory reservation to the stream as we are done with it
+                // but because we replaced the memory reservation of the merge stream, we must hold
+                // this to make sure we have enough memory
+                return Ok(Box::pin(StreamAttachedReservation::new(
+                    stream,
+                    current_memory_reservation,
+                )));
+            }
Review Comment:
N
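
The hunk quoted above returns the final stream wrapped together with its memory reservation, so the reserved bytes stay accounted for until the stream is dropped. A minimal sketch of that ownership pattern follows, using only the standard library. `ToyPool`, `ToyReservation` and `WithReservation` are hypothetical names made up for illustration; they are not DataFusion types, and the real `StreamAttachedReservation` wraps an async stream rather than an iterator.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Toy memory pool (hypothetical): only counts the bytes currently reserved.
#[derive(Default)]
struct ToyPool {
    used: Cell<usize>,
}

/// Toy reservation (hypothetical): returns its bytes to the pool when dropped.
struct ToyReservation {
    pool: Rc<ToyPool>,
    size: usize,
}

impl ToyReservation {
    fn new(pool: &Rc<ToyPool>, size: usize) -> Self {
        // Account for the bytes as soon as the reservation exists.
        pool.used.set(pool.used.get() + size);
        Self {
            pool: Rc::clone(pool),
            size,
        }
    }
}

impl Drop for ToyReservation {
    fn drop(&mut self) {
        // Give the bytes back once the owner of the reservation goes away.
        self.pool.used.set(self.pool.used.get() - self.size);
    }
}

/// Iterator wrapper that owns a reservation for as long as it is alive.
struct WithReservation<I> {
    inner: I,
    _reservation: ToyReservation,
}

impl<I: Iterator> Iterator for WithReservation<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        self.inner.next()
    }
}
```

Because the wrapper owns the reservation, the pool counter only falls back once the wrapper itself is dropped, which is the property the comment in the hunk asks for.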
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2213099661
##
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##
@@ -0,0 +1,342 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Create a stream that do a multi level merge stream
+
+use crate::metrics::BaselineMetrics;
+use crate::{EmptyRecordBatchStream, SpillManager};
+use arrow::array::RecordBatch;
+use std::fmt::{Debug, Formatter};
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::{
+    MemoryConsumer, MemoryPool, MemoryReservation, UnboundedMemoryPool,
+};
+
+use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
+use crate::stream::RecordBatchStreamAdapter;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::TryStreamExt;
+use futures::{Stream, StreamExt};
+
+/// Merges a stream of sorted cursors and record batches into a single sorted stream
+pub(crate) struct MultiLevelMergeBuilder {
+    spill_manager: SpillManager,
+    schema: SchemaRef,
+    sorted_spill_files: Vec<SortedSpillFile>,
+    sorted_streams: Vec<SendableRecordBatchStream>,
+    expr: LexOrdering,
+    metrics: BaselineMetrics,
+    batch_size: usize,
+    reservation: MemoryReservation,
+    fetch: Option<usize>,
+    enable_round_robin_tie_breaker: bool,
+
+    // This is for avoiding double reservation of memory from our side and the sort preserving merge stream
+    // side.
+    // and doing a lot of code changes to avoid accounting for the memory used by the streams
+    unbounded_memory_pool: Arc<dyn MemoryPool>,
+}
+
+impl Debug for MultiLevelMergeBuilder {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "MultiLevelMergeBuilder")
+    }
+}
+
+impl MultiLevelMergeBuilder {
+    #[allow(clippy::too_many_arguments)]
+    pub(crate) fn new(
+        spill_manager: SpillManager,
+        schema: SchemaRef,
+        sorted_spill_files: Vec<SortedSpillFile>,
+        sorted_streams: Vec<SendableRecordBatchStream>,
+        expr: LexOrdering,
+        metrics: BaselineMetrics,
+        batch_size: usize,
+        reservation: MemoryReservation,
+        fetch: Option<usize>,
+        enable_round_robin_tie_breaker: bool,
+    ) -> Self {
+        Self {
+            spill_manager,
+            schema,
+            sorted_spill_files,
+            sorted_streams,
+            expr,
+            metrics,
+            batch_size,
+            reservation,
+            enable_round_robin_tie_breaker,
+            fetch,
+            unbounded_memory_pool: Arc::new(UnboundedMemoryPool::default()),
+        }
+    }
+
+    pub(crate) fn create_spillable_merge_stream(self) -> SendableRecordBatchStream {
+        Box::pin(RecordBatchStreamAdapter::new(
+            Arc::clone(&self.schema),
+            futures::stream::once(self.create_stream()).try_flatten(),
+        ))
+    }
+
+    async fn create_stream(mut self) -> Result<SendableRecordBatchStream> {
+        loop {
+            // Hold this for the lifetime of the stream
+            let mut current_memory_reservation = self.reservation.new_empty();
+            let mut stream =
+                self.create_sorted_stream(&mut current_memory_reservation)?;
+
+            // TODO - add a threshold for number of files to disk even if empty and reading from disk so
+            // we can avoid the memory reservation
+
+            // If no spill files are left, we can return the stream as this is the last sorted run
+            // TODO - We can write to disk before reading it back to avoid having multiple streams in memory
+            if self.sorted_spill_files.is_empty() {
+                // Attach the memory reservation to the stream as we are done with it
+                // but because we replaced the memory reservation of the merge stream, we must hold
+                // this to make sure we have enough memory
+                return Ok(Box::pin(StreamAttachedReservation::new(
+                    stream,
+                    current_memory_reservation,
+                )));
+            }
Review Comment:
W
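
The loop in `create_stream` quoted above keeps building a sorted stream until no spill files are left, which is the general shape of a multi-level merge. As a rough illustration of that idea only, the sketch below merges in-memory `Vec<i32>` runs in groups of at most `max_fan_in` until a single run remains. The function names and the `max_fan_in` parameter are assumptions made for this example; the PR itself works on spill files and decides how many to merge from the memory reservation, not from a fixed count.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge several sorted runs into one sorted run with a min-heap (k-way merge).
fn merge_runs(runs: &[Vec<i32>]) -> Vec<i32> {
    let mut heap = BinaryHeap::new();
    // Seed the heap with the first element of every non-empty run.
    for (run_idx, run) in runs.iter().enumerate() {
        if let Some(&first) = run.first() {
            heap.push(Reverse((first, run_idx, 0usize)));
        }
    }
    let mut out = Vec::with_capacity(runs.iter().map(Vec::len).sum());
    while let Some(Reverse((value, run_idx, pos))) = heap.pop() {
        out.push(value);
        // Advance the cursor of the run the value came from.
        if let Some(&next) = runs[run_idx].get(pos + 1) {
            heap.push(Reverse((next, run_idx, pos + 1)));
        }
    }
    out
}

/// Merge at most `max_fan_in` runs per pass until one run is left.
/// `max_fan_in` is a stand-in for "how many inputs fit in the memory budget".
fn multi_level_merge(mut runs: Vec<Vec<i32>>, max_fan_in: usize) -> Vec<i32> {
    assert!(max_fan_in >= 2, "each pass must merge at least two runs");
    while runs.len() > 1 {
        let take = max_fan_in.min(runs.len());
        // Remove a group of runs, merge it, and queue the result as a new run.
        let group: Vec<Vec<i32>> = runs.drain(..take).collect();
        runs.push(merge_runs(&group));
    }
    runs.pop().unwrap_or_default()
}
```

Each pass removes at least two runs and adds one back, so the loop terminates; a smaller `max_fan_in` trades extra passes for a lower peak number of inputs held at once.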
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2212846002
##
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##
@@ -0,0 +1,342 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Create a stream that do a multi level merge stream
+
+use crate::metrics::BaselineMetrics;
+use crate::{EmptyRecordBatchStream, SpillManager};
+use arrow::array::RecordBatch;
+use std::fmt::{Debug, Formatter};
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::{
+MemoryConsumer, MemoryPool, MemoryReservation, UnboundedMemoryPool,
+};
+
+use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
+use crate::stream::RecordBatchStreamAdapter;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::TryStreamExt;
+use futures::{Stream, StreamExt};
+
+/// Merges a stream of sorted cursors and record batches into a single sorted
stream
+pub(crate) struct MultiLevelMergeBuilder {
+spill_manager: SpillManager,
+schema: SchemaRef,
+sorted_spill_files: Vec,
+sorted_streams: Vec,
+expr: LexOrdering,
+metrics: BaselineMetrics,
+batch_size: usize,
+reservation: MemoryReservation,
+fetch: Option,
+enable_round_robin_tie_breaker: bool,
+
+// This is for avoiding double reservation of memory from our side and the
sort preserving merge stream
+// side.
+// and doing a lot of code changes to avoid accounting for the memory used
by the streams
+unbounded_memory_pool: Arc,
+}
+
+impl Debug for MultiLevelMergeBuilder {
+fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+write!(f, "MultiLevelMergeBuilder")
+}
+}
+
+impl MultiLevelMergeBuilder {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+spill_manager: SpillManager,
+schema: SchemaRef,
+sorted_spill_files: Vec<SortedSpillFile>,
+sorted_streams: Vec<SendableRecordBatchStream>,
+expr: LexOrdering,
+metrics: BaselineMetrics,
+batch_size: usize,
+reservation: MemoryReservation,
+fetch: Option<usize>,
+enable_round_robin_tie_breaker: bool,
+) -> Self {
+Self {
+spill_manager,
+schema,
+sorted_spill_files,
+sorted_streams,
+expr,
+metrics,
+batch_size,
+reservation,
+enable_round_robin_tie_breaker,
+fetch,
+unbounded_memory_pool: Arc::new(UnboundedMemoryPool::default()),
+}
+}
+
+pub(crate) fn create_spillable_merge_stream(self) -> SendableRecordBatchStream {
+Box::pin(RecordBatchStreamAdapter::new(
+Arc::clone(&self.schema),
+futures::stream::once(self.create_stream()).try_flatten(),
+))
+}
+
+async fn create_stream(mut self) -> Result<SendableRecordBatchStream> {
+loop {
+// Hold this for the lifetime of the stream
+let mut current_memory_reservation = self.reservation.new_empty();
+let mut stream =
+self.create_sorted_stream(&mut current_memory_reservation)?;
+
+// TODO - add a threshold for number of files to disk even if empty and reading from disk so
+//we can avoid the memory reservation
+
+// If no spill files are left, we can return the stream as this is the last sorted run
+// TODO - We can write to disk before reading it back to avoid having multiple streams in memory
+if self.sorted_spill_files.is_empty() {
+// Attach the memory reservation to the stream as we are done with it
+// but because we replaced the memory reservation of the merge stream, we must hold
+// this to make sure we have enough memory
+return Ok(Box::pin(StreamAttachedReservation::new(
+stream,
+current_memory_reservation,
+)));
+}
+
+// Nee
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2212828103
##
datafusion/core/tests/fuzz_cases/sort_fuzz.rs:
##
@@ -377,3 +388,335 @@ fn make_staggered_i32_utf8_batches(len: usize) -> Vec<RecordBatch> {
batches
}
+
Review Comment:
moved to the same file with comment
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2212827518
##
datafusion/core/tests/fuzz_cases/sort_fuzz.rs:
##
@@ -377,3 +388,335 @@ fn make_staggered_i32_utf8_batches(len: usize) -> Vec<RecordBatch> {
batches
}
+
+#[tokio::test]
+async fn test_sort_with_limited_memory() -> Result<()> {
+let record_batch_size = 8192;
+let pool_size = 2 * MB as usize;
+let task_ctx = {
+let memory_pool = Arc::new(FairSpillPool::new(pool_size));
+TaskContext::default()
+.with_session_config(
+SessionConfig::new()
+.with_batch_size(record_batch_size)
+.with_sort_spill_reservation_bytes(1),
+)
+.with_runtime(Arc::new(
+RuntimeEnvBuilder::new()
+.with_memory_pool(memory_pool)
+.build()?,
+))
+};
+
+let record_batch_size = pool_size / 16;
+
+// Basic test with a lot of groups that cannot all fit in memory and 1 record batch
+// from each spill file is too much memory
+let spill_count =
+run_sort_test_with_limited_memory(RunSortTestWithLimitedMemoryArgs {
+pool_size,
+task_ctx,
+number_of_record_batches: 100,
+get_size_of_record_batch_to_generate: Box::pin(move |_| record_batch_size),
+memory_behavior: Default::default(),
+})
+.await?;
+
+let total_spill_files_size = spill_count * record_batch_size;
+assert!(
+total_spill_files_size > pool_size,
+"Total spill files size {total_spill_files_size} should be greater than pool size {pool_size}",
+);
+
+Ok(())
+}
+
+#[tokio::test]
+async fn test_sort_with_limited_memory_and_different_sizes_of_record_batch() -> Result<()>
+{
+let record_batch_size = 8192;
+let pool_size = 2 * MB as usize;
+let task_ctx = {
+let memory_pool = Arc::new(FairSpillPool::new(pool_size));
+TaskContext::default()
+.with_session_config(
+SessionConfig::new()
+.with_batch_size(record_batch_size)
+.with_sort_spill_reservation_bytes(1),
+)
+.with_runtime(Arc::new(
+RuntimeEnvBuilder::new()
+.with_memory_pool(memory_pool)
+.build()?,
+))
+};
+
+run_sort_test_with_limited_memory(RunSortTestWithLimitedMemoryArgs {
+pool_size,
+task_ctx,
+number_of_record_batches: 100,
+get_size_of_record_batch_to_generate: Box::pin(move |i| {
+if i % 25 == 1 {
+pool_size / 4
+} else {
+16 * KB
+}
+}),
+memory_behavior: Default::default(),
+})
+.await?;
+
+Ok(())
+}
+
+#[tokio::test]
+async fn test_sort_with_limited_memory_and_different_sizes_of_record_batch_and_changing_memory_reservation(
+) -> Result<()> {
+let record_batch_size = 8192;
+let pool_size = 2 * MB as usize;
+let task_ctx = {
+let memory_pool = Arc::new(FairSpillPool::new(pool_size));
+TaskContext::default()
+.with_session_config(
+SessionConfig::new()
+.with_batch_size(record_batch_size)
+.with_sort_spill_reservation_bytes(1),
+)
+.with_runtime(Arc::new(
+RuntimeEnvBuilder::new()
+.with_memory_pool(memory_pool)
+.build()?,
+))
+};
+
+run_sort_test_with_limited_memory(RunSortTestWithLimitedMemoryArgs {
+pool_size,
+task_ctx,
+number_of_record_batches: 100,
+get_size_of_record_batch_to_generate: Box::pin(move |i| {
+if i % 25 == 1 {
+pool_size / 4
+} else {
+16 * KB
+}
+}),
+memory_behavior: MemoryBehavior::TakeAllMemoryAndReleaseEveryNthBatch(10),
+})
+.await?;
+
+Ok(())
+}
+
+#[tokio::test]
+async fn test_sort_with_limited_memory_and_different_sizes_of_record_batch_and_take_all_memory(
+) -> Result<()> {
+let record_batch_size = 8192;
+let pool_size = 2 * MB as usize;
+let task_ctx = {
+let memory_pool = Arc::new(FairSpillPool::new(pool_size));
+TaskContext::default()
+.with_session_config(
+SessionConfig::new()
+.with_batch_size(record_batch_size)
+.with_sort_spill_reservation_bytes(1),
+)
+.with_runtime(Arc::new(
+RuntimeEnvBuilder::new()
+.with_memory_pool(memory_pool)
+.build()?,
+))
+};
+
+run_sort_test_with_limited_memory(RunSortTestWithLimitedMemoryArgs {
+pool_size,
+task_ctx,
+number_of_record_batches: 100,
+
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2212846002
##
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##
@@ -0,0 +1,342 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Create a stream that do a multi level merge stream
+
+use crate::metrics::BaselineMetrics;
+use crate::{EmptyRecordBatchStream, SpillManager};
+use arrow::array::RecordBatch;
+use std::fmt::{Debug, Formatter};
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::{
+MemoryConsumer, MemoryPool, MemoryReservation, UnboundedMemoryPool,
+};
+
+use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
+use crate::stream::RecordBatchStreamAdapter;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::TryStreamExt;
+use futures::{Stream, StreamExt};
+
+/// Merges a stream of sorted cursors and record batches into a single sorted stream
+pub(crate) struct MultiLevelMergeBuilder {
+spill_manager: SpillManager,
+schema: SchemaRef,
+sorted_spill_files: Vec<SortedSpillFile>,
+sorted_streams: Vec<SendableRecordBatchStream>,
+expr: LexOrdering,
+metrics: BaselineMetrics,
+batch_size: usize,
+reservation: MemoryReservation,
+fetch: Option<usize>,
+enable_round_robin_tie_breaker: bool,
+
+// This is for avoiding double reservation of memory from our side and the sort preserving merge stream
+// side.
+// and doing a lot of code changes to avoid accounting for the memory used by the streams
+unbounded_memory_pool: Arc<dyn MemoryPool>,
+}
+
+impl Debug for MultiLevelMergeBuilder {
+fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+write!(f, "MultiLevelMergeBuilder")
+}
+}
+
+impl MultiLevelMergeBuilder {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+spill_manager: SpillManager,
+schema: SchemaRef,
+sorted_spill_files: Vec<SortedSpillFile>,
+sorted_streams: Vec<SendableRecordBatchStream>,
+expr: LexOrdering,
+metrics: BaselineMetrics,
+batch_size: usize,
+reservation: MemoryReservation,
+fetch: Option<usize>,
+enable_round_robin_tie_breaker: bool,
+) -> Self {
+Self {
+spill_manager,
+schema,
+sorted_spill_files,
+sorted_streams,
+expr,
+metrics,
+batch_size,
+reservation,
+enable_round_robin_tie_breaker,
+fetch,
+unbounded_memory_pool: Arc::new(UnboundedMemoryPool::default()),
+}
+}
+
+pub(crate) fn create_spillable_merge_stream(self) -> SendableRecordBatchStream {
+Box::pin(RecordBatchStreamAdapter::new(
+Arc::clone(&self.schema),
+futures::stream::once(self.create_stream()).try_flatten(),
+))
+}
+
+async fn create_stream(mut self) -> Result<SendableRecordBatchStream> {
+loop {
+// Hold this for the lifetime of the stream
+let mut current_memory_reservation = self.reservation.new_empty();
+let mut stream =
+self.create_sorted_stream(&mut current_memory_reservation)?;
+
+// TODO - add a threshold for number of files to disk even if empty and reading from disk so
+//we can avoid the memory reservation
+
+// If no spill files are left, we can return the stream as this is the last sorted run
+// TODO - We can write to disk before reading it back to avoid having multiple streams in memory
+if self.sorted_spill_files.is_empty() {
+// Attach the memory reservation to the stream as we are done with it
+// but because we replaced the memory reservation of the merge stream, we must hold
+// this to make sure we have enough memory
+return Ok(Box::pin(StreamAttachedReservation::new(
+stream,
+current_memory_reservation,
+)));
+}
+
+// Nee
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
2010YOUY01 commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2191875936
##
datafusion/physical-plan/src/aggregates/row_hash.rs:
##
@@ -1067,14 +1074,13 @@ impl GroupedHashAggregateStream {
sort_batch(&batch, &expr, None)
})),
)));
-for spill in self.spill_state.spills.drain(..) {
-let stream = self.spill_state.spill_manager.read_spill_as_stream(spill)?;
-streams.push(stream);
-}
+
self.spill_state.is_stream_merging = true;
self.input = StreamingMergeBuilder::new()
.with_streams(streams)
.with_schema(schema)
+.with_spill_manager(self.spill_state.spill_manager.clone())
+.with_sorted_spill_files(std::mem::take(&mut self.spill_state.spills))
Review Comment:
Yes, but I think that is likely a small slowdown.
Compare it to the case where 90% of the memory is taken by in-memory streams
and we have to use the remaining 10% to re-spill lots of spills: that case can
cause a huge slowdown.
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2191760246
##
datafusion/physical-plan/src/aggregates/row_hash.rs:
##
@@ -1067,14 +1074,13 @@ impl GroupedHashAggregateStream {
sort_batch(&batch, &expr, None)
})),
)));
-for spill in self.spill_state.spills.drain(..) {
-let stream = self.spill_state.spill_manager.read_spill_as_stream(spill)?;
-streams.push(stream);
-}
+
self.spill_state.is_stream_merging = true;
self.input = StreamingMergeBuilder::new()
.with_streams(streams)
.with_schema(schema)
+.with_spill_manager(self.spill_state.spill_manager.clone())
+.with_sorted_spill_files(std::mem::take(&mut self.spill_state.spills))
Review Comment:
But take this example:
I have 1 spill file and 1 in-memory stream; now I will spill even though I
don't need to.
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
2010YOUY01 commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2191376010
##
datafusion/physical-plan/src/aggregates/row_hash.rs:
##
@@ -1067,14 +1074,13 @@ impl GroupedHashAggregateStream {
sort_batch(&batch, &expr, None)
})),
)));
-for spill in self.spill_state.spills.drain(..) {
-let stream = self.spill_state.spill_manager.read_spill_as_stream(spill)?;
-streams.push(stream);
-}
+
self.spill_state.is_stream_merging = true;
self.input = StreamingMergeBuilder::new()
.with_streams(streams)
.with_schema(schema)
+.with_spill_manager(self.spill_state.spill_manager.clone())
+.with_sorted_spill_files(std::mem::take(&mut self.spill_state.spills))
Review Comment:
I suggest spilling all in-memory batches (in `streams`) to disk before this
final merging step. Also, let the multi-pass merge operator handle only spill
files, so that it does not have to handle in-memory batches and spills at the
same time.
This is just a simplification for now; we can add an optimization to avoid
this re-spill step in the future.
The issue is that, without special handling, in-memory batches can take most
of the available memory budget and leave only a very small part of it for
multi-pass spilling to continue. This can cause slowdowns or even prevent some
cases from finishing.
We're already doing this in the sort executor, see:
https://github.com/apache/datafusion/blob/14487ddc275fc1f148f339293664fe7f83d91d09/datafusion/physical-plan/src/sorts/sort.rs#L336-L341
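To make the budget argument above concrete, here is a rough back-of-the-envelope sketch. It is not DataFusion code: `merge_fan_in` and `merge_passes` are hypothetical helpers, and the model assumes that every spill file needs the same amount of memory for its current batch and that each pass merges up to `fan_in` runs into one new run.

```rust
/// Hypothetical helper: how many spill files can be merged at once when the
/// in-memory streams keep `in_memory_bytes` of the pool for themselves.
fn merge_fan_in(
    pool_bytes: usize,
    in_memory_bytes: usize,
    per_file_batch_bytes: usize,
) -> usize {
    if per_file_batch_bytes == 0 {
        return 0; // degenerate input, treated as "cannot merge"
    }
    pool_bytes.saturating_sub(in_memory_bytes) / per_file_batch_bytes
}

/// Hypothetical helper: number of passes needed to reduce `files` sorted runs
/// to a single run. Each pass replaces up to `fan_in` runs with one run, so it
/// removes at most `fan_in - 1` runs. Returns `None` when no progress is
/// possible because fewer than two runs fit in memory.
fn merge_passes(files: usize, fan_in: usize) -> Option<usize> {
    if files <= 1 {
        return Some(0);
    }
    if fan_in < 2 {
        return None;
    }
    // Ceiling division of (files - 1) by (fan_in - 1).
    Some((files - 1 + fan_in - 2) / (fan_in - 1))
}
```

With a pool of 1000 units, 900 units held by in-memory streams, 50 units per spill-file batch and 40 spill files, this model gives a fan-in of 2 and 39 passes. Spilling the in-memory data first (41 files, whole pool available) gives a fan-in of 20 and 3 passes, which is the kind of gap the comment describes.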
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
2010YOUY01 commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2191389061
##
datafusion/physical-plan/src/sorts/streaming_merge.rs:
##
@@ -131,14 +168,42 @@ impl<'a> StreamingMergeBuilder<'a> {
enable_round_robin_tie_breaker,
} = self;
-// Early return if streams or expressions are empty:
-if streams.is_empty() {
-return internal_err!("Streams cannot be empty for streaming merge");
-}
+// Early return if expressions are empty:
let Some(expressions) = expressions else {
return internal_err!("Sort expressions cannot be empty for streaming merge");
};
};
+if !sorted_spill_files.is_empty() {
Review Comment:
The current code flow is
```
Sort/Aggregate
-> StreamingMerge
-> MultiLevelMerge (with potential internal re-spills to ensure the final merge can proceed under memory limit)
-> StreamingMerge
```
The first `StreamingMerge` -> `MultiLevelMerge` indirection implemented
here therefore seems redundant.
How about letting sort/aggregate use `MultiLevelMergeBuilder` directly instead?
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
2010YOUY01 commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2191383325
##
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##
@@ -0,0 +1,342 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Create a stream that do a multi level merge stream
+
+use crate::metrics::BaselineMetrics;
+use crate::{EmptyRecordBatchStream, SpillManager};
+use arrow::array::RecordBatch;
+use std::fmt::{Debug, Formatter};
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::{
+MemoryConsumer, MemoryPool, MemoryReservation, UnboundedMemoryPool,
+};
+
+use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
+use crate::stream::RecordBatchStreamAdapter;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::TryStreamExt;
+use futures::{Stream, StreamExt};
+
+/// Merges a stream of sorted cursors and record batches into a single sorted stream
+pub(crate) struct MultiLevelMergeBuilder {
+spill_manager: SpillManager,
+schema: SchemaRef,
+sorted_spill_files: Vec<SortedSpillFile>,
+sorted_streams: Vec<SendableRecordBatchStream>,
+expr: LexOrdering,
+metrics: BaselineMetrics,
+batch_size: usize,
+reservation: MemoryReservation,
+fetch: Option<usize>,
+enable_round_robin_tie_breaker: bool,
+
+// This is for avoiding double reservation of memory from our side and the sort preserving merge stream
+// side.
+// and doing a lot of code changes to avoid accounting for the memory used by the streams
+unbounded_memory_pool: Arc<dyn MemoryPool>,
+}
+
+impl Debug for MultiLevelMergeBuilder {
+fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+write!(f, "MultiLevelMergeBuilder")
+}
+}
+
+impl MultiLevelMergeBuilder {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+spill_manager: SpillManager,
+schema: SchemaRef,
+sorted_spill_files: Vec<SortedSpillFile>,
+sorted_streams: Vec<SendableRecordBatchStream>,
+expr: LexOrdering,
+metrics: BaselineMetrics,
+batch_size: usize,
+reservation: MemoryReservation,
+fetch: Option<usize>,
+enable_round_robin_tie_breaker: bool,
+) -> Self {
+Self {
+spill_manager,
+schema,
+sorted_spill_files,
+sorted_streams,
+expr,
+metrics,
+batch_size,
+reservation,
+enable_round_robin_tie_breaker,
+fetch,
+unbounded_memory_pool: Arc::new(UnboundedMemoryPool::default()),
+}
+}
+
+pub(crate) fn create_spillable_merge_stream(self) -> SendableRecordBatchStream {
+Box::pin(RecordBatchStreamAdapter::new(
+Arc::clone(&self.schema),
+futures::stream::once(self.create_stream()).try_flatten(),
+))
+}
+
+async fn create_stream(mut self) -> Result<SendableRecordBatchStream> {
+loop {
+// Hold this for the lifetime of the stream
+let mut current_memory_reservation = self.reservation.new_empty();
+let mut stream =
+self.create_sorted_stream(&mut current_memory_reservation)?;
+
+// TODO - add a threshold for number of files to disk even if empty and reading from disk so
+//we can avoid the memory reservation
+
+// If no spill files are left, we can return the stream as this is the last sorted run
+// TODO - We can write to disk before reading it back to avoid having multiple streams in memory
+if self.sorted_spill_files.is_empty() {
+// Attach the memory reservation to the stream as we are done with it
+// but because we replaced the memory reservation of the merge stream, we must hold
+// this to make sure we have enough memory
+return Ok(Box::pin(StreamAttachedReservation::new(
+stream,
+current_memory_reservation,
+)));
+}
+
+// N
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-3043682960

> > I would appreciate it, it would greatly help me
>
> @rluvaton I opened a pr on your fork. Would you take a look when you have some time?

I **really** appreciate the PR, but the changes are too large for me to review, so I just did it myself:
```
+95,274 −36,420
```
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
ding-young commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-3043202973

> I would appreciate it, it would greatly help me

@rluvaton I opened a PR on your fork. Would you take a look when you have some time?
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-3036807094

I would appreciate it, it would greatly help me.
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
ding-young commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-3036770691

@rluvaton If you’d like, I can send a PR to your fork's branch that resolves the merge conflicts, since I already have one. There were only minor diffs to handle when I rebased your branch on main.
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-3036724762

So should I fix this PR's conflicts? It seems like this PR has a chance to be merged.
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
2010YOUY01 commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-3034804482

https://github.com/apache/datafusion/pull/15700#discussion_r2041372025

I have an idea to fix this concern: add a max merge degree configuration, and do a re-spill if either
a. SPM's estimated memory exceeds the budget, or
b. the configured max merge degree has been reached.

I think this approach has two advantages:
1. If batch size bloat happens after a spill and read-back round trip (see https://github.com/apache/datafusion/pull/15700#discussion_r2041372025), a hard merge degree limit that overrides the estimation lets the query still finish.
2. It also helps when tuning for speed: even if we have enough memory to perform a very wide merge, limiting it to a smaller merge is still likely to run faster.

I (or possibly @ding-young) can handle this patch in a follow-up PR. I think we can move forward with this one; I’ll review it in the next few days.
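For illustration only, the two re-spill conditions proposed above (estimated memory over budget, or max merge degree reached) could be sketched like this. It is a toy model, not the DataFusion implementation: `files_in_next_pass` and `count_merge_passes` are hypothetical names, each spill file is represented only by the estimated memory needed to hold one of its batches, and a merged run is assumed to need as much as the largest of its inputs.

```rust
/// Hypothetical planner step: how many spill files (taken from the front of
/// the list) to merge in the next pass. It stops as soon as either
/// (a) the estimated memory would exceed `budget`, or
/// (b) `max_merge_degree` files have been selected.
fn files_in_next_pass(
    per_file_bytes: &[usize],
    budget: usize,
    max_merge_degree: usize,
) -> usize {
    let mut used = 0usize;
    let mut count = 0usize;
    for &bytes in per_file_bytes {
        if count >= max_merge_degree || used.saturating_add(bytes) > budget {
            break;
        }
        used += bytes;
        count += 1;
    }
    count
}

/// Hypothetical driver: repeatedly merges the selected files into one new
/// run (a "re-spill") until a single run is left, and returns the number of
/// passes. Returns `None` if fewer than two files fit, so no progress can be
/// made.
fn count_merge_passes(
    mut files: Vec<usize>,
    budget: usize,
    max_merge_degree: usize,
) -> Option<usize> {
    let mut passes = 0;
    while files.len() > 1 {
        let n = files_in_next_pass(&files, budget, max_merge_degree);
        if n < 2 {
            return None;
        }
        // Assumption: the merged run needs as much memory per batch as the
        // largest of its inputs.
        let merged = files.drain(..n).max().unwrap_or(0);
        files.push(merged);
        passes += 1;
    }
    Some(passes)
}
```

In this model, eight files of 100 units each with a budget of 1000 merge in one pass when the degree limit is 16, but take three passes when the limit is 4. The degree cap is therefore a trade-off between pass count and merge width, and it still bounds the merge when the per-file estimate turns out to be too low.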
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
ding-young commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-3031592289

I've rebased this branch on the latest main and tested whether the estimated size changes after we load into memory a `RecordBatch` that was compressed with `lz4_frame`. The result of `get_actually_used_size()` was identical before and after (the arrow-ipc `StreamReader` returns decoded arrays). Of course, since buffer allocations and copies happen internally during decoding, actual system memory usage (which DataFusion doesn't track) may temporarily be higher. Anyway, I've only tested primitive-type arrays with compression, so I'll run a few more tests and try to see if I can reproduce any of the problematic cases discussed above.

> Hi @adriangb, thanks for raising this point. I'm currently reviewing both this PR and the other cascading merge sort PR (#15610). I'm not taking sides between the two approaches, but I agree that accurately estimating memory consumption is tricky considering issues discussed above and the fact that now compression is supported in spill files. We may need to think more about whether we can special-case scenarios where the memory size changes after spilling and reloading, or perhaps add some kind of backup logic to handle such situations more gracefully.
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
ding-young commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-3021762356

Hi @adriangb, thanks for raising this point. I'm currently reviewing both this PR and the other cascading merge sort PR (https://github.com/apache/datafusion/pull/15610). I'm not taking sides between the two approaches, but I agree that accurately estimating memory consumption is tricky, considering the issues discussed above and the fact that compression is now supported in spill files. We may need to think more about whether we can special-case scenarios where the memory size changes after spilling and reloading, or perhaps add some kind of backup logic to handle such situations more gracefully.
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
adriangb commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-3021699053

In the interest of this valuable work not being lost, is there any way that https://github.com/apache/datafusion/pull/15700#discussion_r2041372025 could be addressed by a method that's not more tests? Could we calculate the actual batch sizes every time we load into memory? Even if that is possible, it opens up the question of what to do if we load a batch and now exceed our memory budget, but maybe it's a path forward?
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-2906947109

> > @2010YOUY01 and @ding-young I wonder if you can review this PR again to help @rluvaton get it merged?
> >
> > Specifically if it needs more tests perhaps you can help identify which are needed
>
> I have some concerns about this PR's design direction (see more in https://github.com/apache/datafusion/pull/15700#discussion_r2041372025), and I don't think it can be addressed by more extensive tests.

Why is that? You raised some concerns about miscalculating the size of the record batch; adding tests will make sure we are calculating it correctly.
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
2010YOUY01 commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-2906805485

> @2010YOUY01 and @ding-young I wonder if you can review this PR again to help @rluvaton get it merged?
>
> Specifically if it needs more tests perhaps you can help identify which are needed

I have some concerns about this PR's design direction (see more in https://github.com/apache/datafusion/pull/15700#discussion_r2041372025), and I don't think it can be addressed by more extensive tests.

By the way, this PR serves as an alternative to https://github.com/apache/datafusion/pull/15610. It's ready for another review, except that it needs to merge main.
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
ding-young commented on PR #15700: URL: https://github.com/apache/datafusion/pull/15700#issuecomment-2906803554 @alamb Sure! I may not be able to provide a detailed review right away, but I can definitely help by running the tests added in the PR locally and looking into the memory accounting for the nested types that have been mentioned.
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
alamb commented on PR #15700: URL: https://github.com/apache/datafusion/pull/15700#issuecomment-2906787792 @2010YOUY01 and @ding-young I wonder if you can review this PR again to help @rluvaton get it merged? Specifically, if it needs more tests, perhaps you can help identify which are needed.
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2104892196
##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -431,12 +422,16 @@ impl ExternalSorter {
let batches_to_spill = std::mem::take(globally_sorted_batches);
self.reservation.free();
-let in_progress_file = self.in_progress_spill_file.as_mut().ok_or_else(|| {
-internal_datafusion_err!("In-progress spill file should be initialized")
-})?;
+let (in_progress_file, max_record_batch_size) =
+self.in_progress_spill_file.as_mut().ok_or_else(|| {
+internal_datafusion_err!("In-progress spill file should be initialized")
+})?;
for batch in batches_to_spill {
in_progress_file.append_batch(&batch)?;
+
+*max_record_batch_size =
+(*max_record_batch_size).max(batch.get_actually_used_size());
Review Comment:
If I added tests for every type to make sure the memory accounting is
correct, would you approve?
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
2010YOUY01 commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2046041614
##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -431,12 +422,16 @@ impl ExternalSorter {
let batches_to_spill = std::mem::take(globally_sorted_batches);
self.reservation.free();
-let in_progress_file = self.in_progress_spill_file.as_mut().ok_or_else(|| {
-internal_datafusion_err!("In-progress spill file should be initialized")
-})?;
+let (in_progress_file, max_record_batch_size) =
+self.in_progress_spill_file.as_mut().ok_or_else(|| {
+internal_datafusion_err!("In-progress spill file should be initialized")
+})?;
for batch in batches_to_spill {
in_progress_file.append_batch(&batch)?;
+
+*max_record_batch_size =
+(*max_record_batch_size).max(batch.get_actually_used_size());
Review Comment:
I think we should not estimate. Even if the estimate is correct 99% of the
time, IMO it's impossible to make sure it's always accurate for the reader
implementation of nested types. If the estimate is way off in edge cases, the
resulting bug would be hard to investigate.
If we want to follow this optimistic approach, I think the only required
memory accounting is while buffering batches inside `SortExec`; all the
remaining memory-tracking code can be deleted to make the implementation much
simpler. The potential problem is unexpected behavior for non-primitive types
(e.g. a dictionary array's row format size can explode).
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2045582560
##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -431,12 +422,16 @@ impl ExternalSorter {
let batches_to_spill = std::mem::take(globally_sorted_batches);
self.reservation.free();
-let in_progress_file = self.in_progress_spill_file.as_mut().ok_or_else(|| {
-internal_datafusion_err!("In-progress spill file should be initialized")
-})?;
+let (in_progress_file, max_record_batch_size) =
+self.in_progress_spill_file.as_mut().ok_or_else(|| {
+internal_datafusion_err!("In-progress spill file should be initialized")
+})?;
for batch in batches_to_spill {
in_progress_file.append_batch(&batch)?;
+
+*max_record_batch_size =
+(*max_record_batch_size).max(batch.get_actually_used_size());
Review Comment:
After looking at the code, I came to the conclusion that this is still the
closest we can get to accurately estimating memory.
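
To make the role of the tracked value concrete, here is a minimal sketch of how a per-file running maximum like `max_record_batch_size` could bound the number of spill files merged in one pass. It assumes, hypothetically, that a merge pass needs room for one maximum-size batch per open spill file. `SpillFile`, `record_batch_written`, and `merge_fan_in` are illustrative names, not DataFusion APIs, and the one-batch-per-file rule is an assumption rather than what this PR implements:

```rust
/// Hypothetical per-spill-file metadata: the largest batch written to the file.
#[derive(Debug, Clone, Copy)]
pub struct SpillFile {
    pub max_record_batch_size: usize,
}

/// Running maximum, mirroring the update shown in the hunk above.
pub fn record_batch_written(file: &mut SpillFile, batch_size: usize) {
    file.max_record_batch_size = file.max_record_batch_size.max(batch_size);
}

/// Number of leading spill files that can be merged in one pass when each
/// open file needs room for one batch of its maximum size.
/// Assumption: one in-flight batch per file; a real merge may need more
/// (for example, row-format copies of the sort keys).
pub fn merge_fan_in(files: &[SpillFile], memory_budget: usize) -> usize {
    let mut reserved = 0usize;
    let mut count = 0;
    for file in files {
        match reserved.checked_add(file.max_record_batch_size) {
            Some(total) if total <= memory_budget => {
                reserved = total;
                count += 1;
            }
            // Either the budget is exhausted or the sum overflowed: stop here.
            _ => break,
        }
    }
    count
}
```

If the tracked maximum underestimates what the reader allocates, a bound like this admits too many files, which is the failure mode being debated in this thread.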
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2041878079
##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -431,12 +422,16 @@ impl ExternalSorter {
let batches_to_spill = std::mem::take(globally_sorted_batches);
self.reservation.free();
-let in_progress_file = self.in_progress_spill_file.as_mut().ok_or_else(|| {
-internal_datafusion_err!("In-progress spill file should be initialized")
-})?;
+let (in_progress_file, max_record_batch_size) =
+self.in_progress_spill_file.as_mut().ok_or_else(|| {
+internal_datafusion_err!("In-progress spill file should be initialized")
+})?;
for batch in batches_to_spill {
in_progress_file.append_batch(&batch)?;
+
+*max_record_batch_size =
+(*max_record_batch_size).max(batch.get_actually_used_size());
Review Comment:
I'm trying to reproduce that so I can give a better answer. How do you create
a string view array that causes what you described?
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
2010YOUY01 commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2041629246
##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -431,12 +422,16 @@ impl ExternalSorter {
let batches_to_spill = std::mem::take(globally_sorted_batches);
self.reservation.free();
-let in_progress_file = self.in_progress_spill_file.as_mut().ok_or_else(|| {
-internal_datafusion_err!("In-progress spill file should be initialized")
-})?;
+let (in_progress_file, max_record_batch_size) =
+self.in_progress_spill_file.as_mut().ok_or_else(|| {
+internal_datafusion_err!("In-progress spill file should be initialized")
+})?;
for batch in batches_to_spill {
in_progress_file.append_batch(&batch)?;
+
+*max_record_batch_size =
+(*max_record_batch_size).max(batch.get_actually_used_size());
Review Comment:
Some array types have complex internal buffer management. A simple example:
Before spilling, a `StringView` array has 10MB of actual content, backed by
3 * 4MB buffers.
After spilling and reading back, the reader implementation decides to use
1 * 16MB buffer instead.
Different allocation policies cause different fragmentation, so the physical
memory consumed varies.
Here are some real bugs found recently due to similar causes (this explains
why I'm worried about inconsistent memory sizes for logically equivalent
batches):
https://github.com/apache/datafusion/pull/14644
https://github.com/apache/datafusion/pull/14823
https://github.com/apache/datafusion/pull/13377
Note that these were caused only by primitive and string arrays; for more
complex types like struct, array, or other nested types, I think such
inconsistencies are even more likely.
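
The `StringView` example above can be written out as a small model. This is only a sketch: `Buffer`, `content_size`, and `allocated_size` are hypothetical names for illustration and do not correspond to Arrow or DataFusion APIs, and the 4 + 4 + 2 MB split of the content across the three buffers is an assumption made so the numbers add up to 10MB.

```rust
/// Hypothetical model of one variable-length data buffer: `capacity` bytes
/// are allocated, but only `used` bytes hold live content.
#[derive(Debug, Clone, Copy)]
pub struct Buffer {
    pub capacity: usize,
    pub used: usize,
}

const MB: usize = 1024 * 1024;

/// Bytes of live content, independent of how they are laid out in buffers.
pub fn content_size(buffers: &[Buffer]) -> usize {
    buffers.iter().map(|b| b.used).sum()
}

/// Bytes actually held from the allocator.
pub fn allocated_size(buffers: &[Buffer]) -> usize {
    buffers.iter().map(|b| b.capacity).sum()
}

/// Before spilling: 10 MB of content spread over three 4 MB buffers
/// (assumed split: 4 + 4 + 2).
pub fn before_spill() -> Vec<Buffer> {
    vec![
        Buffer { capacity: 4 * MB, used: 4 * MB },
        Buffer { capacity: 4 * MB, used: 4 * MB },
        Buffer { capacity: 4 * MB, used: 2 * MB },
    ]
}

/// After reading back: the same 10 MB of content in one 16 MB buffer.
pub fn after_read_back() -> Vec<Buffer> {
    vec![Buffer { capacity: 16 * MB, used: 10 * MB }]
}
```

In this model the content size is 10MB in both layouts, while the allocated size is 12MB before the spill and 16MB after reading back. A content-based estimate is stable across the round trip, but it is not the number the allocator sees.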
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2041544882
##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -431,12 +422,16 @@ impl ExternalSorter {
let batches_to_spill = std::mem::take(globally_sorted_batches);
self.reservation.free();
-let in_progress_file = self.in_progress_spill_file.as_mut().ok_or_else(|| {
-internal_datafusion_err!("In-progress spill file should be initialized")
-})?;
+let (in_progress_file, max_record_batch_size) =
+self.in_progress_spill_file.as_mut().ok_or_else(|| {
+internal_datafusion_err!("In-progress spill file should be initialized")
+})?;
for batch in batches_to_spill {
in_progress_file.append_batch(&batch)?;
+
+*max_record_batch_size =
+(*max_record_batch_size).max(batch.get_actually_used_size());
Review Comment:
Unless the actual array content differs before and after the spill, this
function will **always** return the correct result regardless of the spill
file format.
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2041544882
##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -431,12 +422,16 @@ impl ExternalSorter {
let batches_to_spill = std::mem::take(globally_sorted_batches);
self.reservation.free();
-let in_progress_file = self.in_progress_spill_file.as_mut().ok_or_else(|| {
-internal_datafusion_err!("In-progress spill file should be initialized")
-})?;
+let (in_progress_file, max_record_batch_size) =
+self.in_progress_spill_file.as_mut().ok_or_else(|| {
+internal_datafusion_err!("In-progress spill file should be initialized")
+})?;
for batch in batches_to_spill {
in_progress_file.append_batch(&batch)?;
+
+*max_record_batch_size =
+(*max_record_batch_size).max(batch.get_actually_used_size());
Review Comment:
Unless the **actual array content** differs before and after the spill, this
function will **always** return the correct result regardless of the spill
file format.
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2041544882
##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -431,12 +422,16 @@ impl ExternalSorter {
let batches_to_spill = std::mem::take(globally_sorted_batches);
self.reservation.free();
-let in_progress_file = self.in_progress_spill_file.as_mut().ok_or_else(|| {
-internal_datafusion_err!("In-progress spill file should be initialized")
-})?;
+let (in_progress_file, max_record_batch_size) =
+self.in_progress_spill_file.as_mut().ok_or_else(|| {
+internal_datafusion_err!("In-progress spill file should be initialized")
+})?;
for batch in batches_to_spill {
in_progress_file.append_batch(&batch)?;
+
+*max_record_batch_size =
+(*max_record_batch_size).max(batch.get_actually_used_size());
Review Comment:
Unless the **actual array content** differs before and after the spill, this
function will **always** return the correct result regardless of the spill
file format, because we calculate the actual array content size.
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
2010YOUY01 commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2041372025
##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -431,12 +422,16 @@ impl ExternalSorter {
let batches_to_spill = std::mem::take(globally_sorted_batches);
self.reservation.free();
-let in_progress_file = self.in_progress_spill_file.as_mut().ok_or_else(|| {
-internal_datafusion_err!("In-progress spill file should be initialized")
-})?;
+let (in_progress_file, max_record_batch_size) =
+self.in_progress_spill_file.as_mut().ok_or_else(|| {
+internal_datafusion_err!("In-progress spill file should be initialized")
+})?;
for batch in batches_to_spill {
in_progress_file.append_batch(&batch)?;
+
+*max_record_batch_size =
+(*max_record_batch_size).max(batch.get_actually_used_size());
Review Comment:
I don't think it's realistic to know a batch's size correctly after a round
trip of spilling and reading back with this `get_actually_used_size()`
implementation. The actual implementation might surprise us. It can also get
more complex in the future: for example, we might implement extra encodings
for https://github.com/apache/datafusion/issues/14078, which would make the
memory size of a batch after reading back even harder to estimate.
Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]
rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2041222937
##
datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs:
##
@@ -753,3 +765,226 @@ async fn test_single_mode_aggregate_with_spill() -> Result<()> {
Ok(())
}
+
+/// A Mock ExecutionPlan that can be used for writing tests of other
+/// ExecutionPlans
+pub struct StreamExec {
+/// the results to send back
+stream: Mutex<Option<SendableRecordBatchStream>>,
+/// if true (the default), sends data using a separate task to ensure the
+/// batches are not available without this stream yielding first
+use_task: bool,
+cache: PlanProperties,
+}
+
+impl Debug for StreamExec {
+fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+write!(f, "StreamExec")
+}
+}
+
+impl StreamExec {
+/// Create a new `MockExec` with a single partition that returns
+/// the specified `Results`s.
+///
+/// By default, the batches are not produced immediately (the
+/// caller has to actually yield and another task must run) to
+/// ensure any poll loops are correct. This behavior can be
+/// changed with `with_use_task`
+pub fn new(stream: SendableRecordBatchStream) -> Self {
+let cache = Self::compute_properties(stream.schema());
+Self {
+stream: Mutex::new(Some(stream)),
+use_task: true,
+cache,
+}
+}
+
+/// If `use_task` is true (the default) then the batches are sent
+/// back using a separate task to ensure the underlying stream is
+/// not immediately ready
+pub fn with_use_task(mut self, use_task: bool) -> Self {
+self.use_task = use_task;
+self
+}
+
+/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
+fn compute_properties(schema: SchemaRef) -> PlanProperties {
+PlanProperties::new(
+EquivalenceProperties::new(schema),
+Partitioning::UnknownPartitioning(1),
+EmissionType::Incremental,
+Boundedness::Bounded,
+)
+}
+}
+
+impl DisplayAs for StreamExec {
+fn fmt_as(
+&self,
+t: DisplayFormatType,
+f: &mut Formatter,
+) -> std::fmt::Result {
+match t {
+DisplayFormatType::Default | DisplayFormatType::Verbose => {
+write!(f, "StreamExec:")
+}
+DisplayFormatType::TreeRender => {
+// TODO: collect info
+write!(f, "")
+}
+}
+}
+}
+
+impl ExecutionPlan for StreamExec {
+fn name(&self) -> &'static str {
+Self::static_name()
+}
+
+fn as_any(&self) -> &dyn Any {
+self
+}
+
+fn properties(&self) -> &PlanProperties {
+&self.cache
+}
+
+fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+vec![]
+}
+
+fn with_new_children(
+self: Arc<Self>,
+_: Vec<Arc<dyn ExecutionPlan>>,
+) -> Result<Arc<dyn ExecutionPlan>> {
+unimplemented!()
+}
+
+/// Returns a stream which yields data
+fn execute(
+&self,
+partition: usize,
+_context: Arc<TaskContext>,
+) -> Result<SendableRecordBatchStream> {
+assert_eq!(partition, 0);
+
+let stream = self.stream.lock().unwrap().take();
+
+stream.ok_or(DataFusionError::Internal(
+"Stream already consumed".to_string(),
+))
+}
+}
+
+
+#[tokio::test]
+async fn test_low_cardinality() -> Result<()> {
Review Comment:
This test fails on main with an OOM error.
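
For readers skimming the `StreamExec` mock in the hunk above: its `execute` hands out the wrapped stream exactly once by taking it out of a `Mutex<Option<...>>`, and returns an error on any later call. A standalone sketch of that take-once pattern, using only the standard library (`TakeOnce` is a hypothetical name for illustration, and a `String` error stands in for `DataFusionError::Internal`):

```rust
use std::sync::Mutex;

/// Hypothetical stand-in for the mock plan: holds a value that may be
/// handed out at most once, even through a shared `&self` reference.
pub struct TakeOnce<T> {
    slot: Mutex<Option<T>>,
}

impl<T> TakeOnce<T> {
    pub fn new(value: T) -> Self {
        Self {
            slot: Mutex::new(Some(value)),
        }
    }

    /// Returns the value on the first call and an error on every later call.
    pub fn take(&self) -> Result<T, String> {
        self.slot
            .lock()
            .unwrap()
            // `Option::take` leaves `None` behind, so the value cannot be reused.
            .take()
            .ok_or_else(|| "Stream already consumed".to_string())
    }
}
```

The interior `Mutex` is what lets a method taking `&self` move the value out; the same constraint applies to `ExecutionPlan::execute`, which also takes `&self`.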
