Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-08-05 Thread via GitHub


ding-young commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-3157119280

   @adriangb There are still some cases where external sorting gives up with a 
memory allocation failure, especially when 
   (1) internal memory accounting is wrong 
https://github.com/apache/datafusion/issues/14748 (which is also the cause of 
https://github.com/apache/datafusion/issues/16979)
   
   and (2) memory pressure is so high that the multi-level merge step can't 
grow the reservation 
   
https://github.com/apache/datafusion/blob/6043be448fe19333b0f1469026a62c580f7bbb31/datafusion/physical-plan/src/sorts/multi_level_merge.rs#L381
   
   For (1), we need to estimate the memory required for row-formatted batches 
correctly. For (2), we need to further limit the number of spills to merge when 
the multi-level merge fails (https://github.com/apache/datafusion/issues/16908). 
I'm currently working on these follow-up issues, but it will take time.
   
   Until these fixes land, I'd recommend running the query above with a 
smaller `partitions` setting. 
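
A back-of-the-envelope sketch of why fewer partitions can help. It rests on assumptions that are not confirmed anywhere in this thread: that a `fair` pool divides its limit evenly among spillable consumers, that there is one `ExternalSorter` per partition, and that `1g` means 1024^3 bytes. `fair_share` is a hypothetical helper, not a DataFusion API.

```rust
/// Hypothetical helper (not part of DataFusion): bytes each spillable
/// consumer could use if the pool limit were divided evenly among them.
/// The even split is an assumption about how a `fair` pool behaves.
fn fair_share(limit_bytes: u64, spillable_consumers: u64) -> u64 {
    assert!(spillable_consumers > 0, "need at least one consumer");
    limit_bytes / spillable_consumers
}

/// Converts a byte count to MiB for display purposes.
fn to_mib(bytes: u64) -> f64 {
    bytes as f64 / (1024.0 * 1024.0)
}
```

Under those assumptions, `to_mib(fair_share(1 << 30, 12))` is roughly 85 MiB per sorter, while `to_mib(fair_share(1 << 30, 4))` is 256 MiB. The error log quoted later in this thread shows sorter indices up to 11, so that run had at least 12 sorters competing for the 1 GB pool.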


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-08-05 Thread via GitHub


adriangb commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-3156469785

   I'm trying this out for our compaction system and can't get my sort to 
work without hitting memory limits. Note that I am using `datafusion-cli` and 
am not sure whether it has a disk manager, etc. configured, but I figure that 
if I can't get it working, it's probably not obvious how to configure 
`datafusion-cli`, so it's a fair question:
   
   In `q.sql`:
   
   ```sql
   -- About 6.32 GB of parquet compressed (~ 10 x compression ratio)
   -- Split into ~60 ~100 MB files
   CREATE EXTERNAL TABLE t1
   STORED AS PARQUET
   LOCATION '/Users/adriangb/Downloads/data/day=2025-08-05/';
   
   SET datafusion.execution.sort_spill_reservation_bytes = 0;
   
   COPY (
   SELECT *
   FROM t1
   ORDER BY deployment_environment, kind, service_name, trace_id
   )
   TO '/Users/adriangb/Downloads/out.parquet';
   ```
   
   ```shell
   ❯ ./target/release/datafusion-cli --mem-pool-type 'fair' --memory-limit '1g' -f q.sql
   DataFusion CLI v49.0.0
   0 row(s) fetched. 
   Elapsed 0.244 seconds.
   
   0 row(s) fetched. 
   Elapsed 0.000 seconds.
   
   +---------------+-------------------------------+
   | plan_type     | plan                          |
   +---------------+-------------------------------+
   | physical_plan | ┌───────────────────────────┐ |
   |               | │       DataSinkExec        │ |
   |               | └─────────────┬─────────────┘ |
   |               | ┌─────────────┴─────────────┐ |
   |               | │  SortPreservingMergeExec  │ |
   |               | │                           │ |
   |               | │ deployment_environment ASC│ |
   |               | │   NULLS LAST, kind ASC    │ |
   |               | │        NULLS LAST,        │ |
   |               | │       service_name        │ |
   |               | │      ASC NULLS LAST,      │ |
   |               | │    trace_id ASC NULLS     │ |
   |               | │           LAST            │ |
   |               | └─────────────┬─────────────┘ |
   |               | ┌─────────────┴─────────────┐ |
   |               | │         SortExec          │ |
   |               | │                           │ |
   |               | │ deployment_environment@35 │ |
   |               | │  ASC NULLS LAST, kind@6   │ |
   |               | │      ASC NULLS LAST,      │ |
   |               | │      service_name@27      │ |
   |               | │      ASC NULLS LAST,      │ |
   |               | │      trace_id@4 ASC       │ |
   |               | │        NULLS LAST         │ |
   |               | └─────────────┬─────────────┘ |
   |               | ┌─────────────┴─────────────┐ |
   |               | │      DataSourceExec       │ |
   |               | │                           │ |
   |               | │         files: 68         │ |
   |               | │      format: parquet      │ |
   |               | └───────────────────────────┘ |
   |               |                               |
   +---------------+-------------------------------+
   1 row(s) fetched. 
   Elapsed 0.254 seconds.
   
   Not enough memory to continue external sort. Consider increasing the memory 
limit, or decreasing sort_spill_reservation_bytes
   caused by
   Resources exhausted: Additional allocation failed with top memory consumers 
(across reservations) as:
 ExternalSorter[10]#25(can spill: true) consumed 78.2 MB,
 ExternalSorter[11]#27(can spill: true) consumed 77.2 MB,
 ExternalSorter[7]#19(can spill: true) consumed 75.7 MB.
   Error: Failed to allocate additional 90.1 MB for ExternalSorter[6] with 0.0 
B already allocated for this reservation - 82.2 MB remain available for the 
total pool
   ```
   
   I can maybe share the data with some sort of NDA but honestly it's not that 
interesting, it's just a lot of random data.





Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-29 Thread via GitHub


alamb commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2240660130


##
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##
@@ -0,0 +1,449 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Create a stream that do a multi level merge stream
+
+use crate::metrics::BaselineMetrics;
+use crate::{EmptyRecordBatchStream, SpillManager};
+use arrow::array::RecordBatch;
+use std::fmt::{Debug, Formatter};
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::MemoryReservation;
+
+use crate::sorts::sort::get_reserved_byte_for_record_batch_size;
+use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
+use crate::stream::RecordBatchStreamAdapter;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::TryStreamExt;
+use futures::{Stream, StreamExt};
+
+/// Merges a stream of sorted cursors and record batches into a single sorted 
stream
+///
+/// This is a wrapper around 
[`SortPreservingMergeStream`](crate::sorts::merge::SortPreservingMergeStream)
+/// that provide it the sorted streams/files to merge while making sure we can 
merge them in memory.
+/// In case we can't merge all of them in a single pass we will spill the 
intermediate results to disk
+/// and repeat the process.
+///
+/// ## High level Algorithm
+/// 1. Get the maximum amount of sorted in-memory streams and spill files we 
can merge with the available memory
+/// 2. Sort them to a sorted stream
+/// 3. Do we have more spill files to merge?
+///  - Yes: write that sorted stream to a spill file,
+///add that spill file back to the spill files to merge and
+///repeat the process
+///
+///  - No: return that sorted stream as the final output stream
+///
+/// ```text
+/// Initial State: Multiple sorted streams + spill files

Review Comment:
   😍 
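
The "High level Algorithm" steps in the doc comment quoted above can be sketched as a toy in plain Rust. This is not the PR's implementation: sorted runs are in-memory `Vec<i32>` values instead of streams and spill files, and the memory budget is reduced to a hypothetical `max_fan_in` (how many runs fit in one merge pass). `merge_runs` and `multi_level_merge` are illustrative names only.

```rust
use std::cmp::Reverse;
use std::collections::{BinaryHeap, VecDeque};

/// K-way merge of already-sorted runs using a min-heap of (value, run, position).
fn merge_runs(runs: Vec<Vec<i32>>) -> Vec<i32> {
    let mut heap = BinaryHeap::new();
    for (idx, run) in runs.iter().enumerate() {
        if let Some(&first) = run.first() {
            heap.push(Reverse((first, idx, 0usize)));
        }
    }
    let mut out = Vec::with_capacity(runs.iter().map(Vec::len).sum());
    while let Some(Reverse((value, idx, pos))) = heap.pop() {
        out.push(value);
        // Advance within the run that just produced the smallest value.
        if let Some(&next) = runs[idx].get(pos + 1) {
            heap.push(Reverse((next, idx, pos + 1)));
        }
    }
    out
}

/// Merges at most `max_fan_in` runs per pass. While runs remain, the merged
/// result is pushed back as a new run (standing in for "write to a spill file");
/// otherwise it is returned as the final output together with the number of
/// intermediate "spills" that were needed.
fn multi_level_merge(
    runs: Vec<Vec<i32>>,
    max_fan_in: usize,
) -> Result<(Vec<i32>, usize), String> {
    if max_fan_in < 2 {
        // Mirrors the failure mode where not even two runs fit in memory.
        return Err("budget must allow merging at least two runs".to_string());
    }
    let mut pending: VecDeque<Vec<i32>> = runs.into();
    let mut spills = 0;
    loop {
        let take = max_fan_in.min(pending.len());
        let batch: Vec<Vec<i32>> = pending.drain(..take).collect();
        let merged = merge_runs(batch);
        if pending.is_empty() {
            return Ok((merged, spills));
        }
        pending.push_back(merged);
        spills += 1;
    }
}
```

In this toy, four runs with `max_fan_in = 2` need two intermediate spills before the final merge, while `max_fan_in = 4` finishes in a single pass with none.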






Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-29 Thread via GitHub


alamb commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-3133626834

   EPIC!





Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-28 Thread via GitHub


2010YOUY01 commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-3130543883

   > Amazing work! Aside from updating to `main` what do users of DataFusion 
need to do to use this improvement?
   
   Nothing else needed, it will be triggered automatically and it doesn't 
require a configuration to control.





Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-28 Thread via GitHub


adriangb commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-3130446693

   Amazing work! Aside from updating to `main` what do users of DataFusion need 
to do to use this improvement?





Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-28 Thread via GitHub


2010YOUY01 commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-3130380274

   Thanks everyone @rluvaton @ding-young and others who have helped along the 
way 🚀 





Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-28 Thread via GitHub


2010YOUY01 merged PR #15700:
URL: https://github.com/apache/datafusion/pull/15700





Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-25 Thread via GitHub


rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2230996830


##
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##
@@ -0,0 +1,449 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Create a stream that do a multi level merge stream
+
+use crate::metrics::BaselineMetrics;
+use crate::{EmptyRecordBatchStream, SpillManager};
+use arrow::array::RecordBatch;
+use std::fmt::{Debug, Formatter};
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::MemoryReservation;
+
+use crate::sorts::sort::get_reserved_byte_for_record_batch_size;
+use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
+use crate::stream::RecordBatchStreamAdapter;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::TryStreamExt;
+use futures::{Stream, StreamExt};
+
+/// Merges a stream of sorted cursors and record batches into a single sorted 
stream
+///
+/// This is a wrapper around 
[`SortPreservingMergeStream`](crate::sorts::merge::SortPreservingMergeStream)
+/// that provide it the sorted streams/files to merge while making sure we can 
merge them in memory.
+/// In case we can't merge all of them in a single pass we will spill the 
intermediate results to disk
+/// and repeat the process.
+///
+/// ## High level Algorithm
+/// 1. Get the maximum amount of sorted in-memory streams and spill files we 
can merge with the available memory
+/// 2. Sort them to a sorted stream
+/// 3. Do we have more spill files to merge?
+///  - Yes: write that sorted stream to a spill file,
+///add that spill file back to the spill files to merge and
+///repeat the process
+///
+///  - No: return that sorted stream as the final output stream
+///
+/// ```text
+/// Initial State: Multiple sorted streams + spill files
[ASCII diagram garbled and truncated in this archive. Recoverable labels: Phase 1 groups in-memory sorted streams 1-3 and sorted spill file 1 under "Can hold in memory"; sorted spill files 2-4 carry over to Phase 2, again under "Can hold in memory"; the quoted diff is cut off at the start of Phase 3.]

Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-25 Thread via GitHub


rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2230995605


##
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##
@@ -0,0 +1,449 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Create a stream that do a multi level merge stream
+
+use crate::metrics::BaselineMetrics;
+use crate::{EmptyRecordBatchStream, SpillManager};
+use arrow::array::RecordBatch;
+use std::fmt::{Debug, Formatter};
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::MemoryReservation;
+
+use crate::sorts::sort::get_reserved_byte_for_record_batch_size;
+use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
+use crate::stream::RecordBatchStreamAdapter;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::TryStreamExt;
+use futures::{Stream, StreamExt};
+
+/// Merges a stream of sorted cursors and record batches into a single sorted 
stream
+///
+/// This is a wrapper around 
[`SortPreservingMergeStream`](crate::sorts::merge::SortPreservingMergeStream)
+/// that provide it the sorted streams/files to merge while making sure we can 
merge them in memory.
+/// In case we can't merge all of them in a single pass we will spill the 
intermediate results to disk
+/// and repeat the process.
+///
+/// ## High level Algorithm
+/// 1. Get the maximum amount of sorted in-memory streams and spill files we 
can merge with the available memory
+/// 2. Sort them to a sorted stream
+/// 3. Do we have more spill files to merge?
+///  - Yes: write that sorted stream to a spill file,
+///add that spill file back to the spill files to merge and
+///repeat the process
+///
+///  - No: return that sorted stream as the final output stream
+///
+/// ```text
+/// Initial State: Multiple sorted streams + spill files
[ASCII diagram garbled and truncated in this archive. Recoverable labels: Phase 1 groups in-memory sorted streams 1-3 and sorted spill file 1 under "Can hold in memory"; sorted spill files 2-4 carry over to Phase 2, again under "Can hold in memory"; the quoted diff is cut off at the start of Phase 3.]

Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-25 Thread via GitHub


rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2230992741


##
datafusion/physical-plan/src/spill/spill_manager.rs:
##
@@ -17,11 +17,10 @@
 
 //! Define the `SpillManager` struct, which is responsible for reading and 
writing `RecordBatch`es to raw files based on the provided configurations.
 
-use std::sync::Arc;
-
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use datafusion_execution::runtime_env::RuntimeEnv;
+use std::sync::Arc;

Review Comment:
   isn't the import order defined in clippy/fmt?






Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-24 Thread via GitHub


2010YOUY01 commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2230075680


##
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##
@@ -0,0 +1,449 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Create a stream that do a multi level merge stream
+
+use crate::metrics::BaselineMetrics;
+use crate::{EmptyRecordBatchStream, SpillManager};
+use arrow::array::RecordBatch;
+use std::fmt::{Debug, Formatter};
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::MemoryReservation;
+
+use crate::sorts::sort::get_reserved_byte_for_record_batch_size;
+use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
+use crate::stream::RecordBatchStreamAdapter;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::TryStreamExt;
+use futures::{Stream, StreamExt};
+
+/// Merges a stream of sorted cursors and record batches into a single sorted 
stream
+///
+/// This is a wrapper around 
[`SortPreservingMergeStream`](crate::sorts::merge::SortPreservingMergeStream)
+/// that provide it the sorted streams/files to merge while making sure we can 
merge them in memory.
+/// In case we can't merge all of them in a single pass we will spill the 
intermediate results to disk
+/// and repeat the process.
+///
+/// ## High level Algorithm
+/// 1. Get the maximum amount of sorted in-memory streams and spill files we 
can merge with the available memory
+/// 2. Sort them to a sorted stream
+/// 3. Do we have more spill files to merge?
+///  - Yes: write that sorted stream to a spill file,
+///add that spill file back to the spill files to merge and
+///repeat the process
+///
+///  - No: return that sorted stream as the final output stream
+///
+/// ```text
+/// Initial State: Multiple sorted streams + spill files
[ASCII diagram garbled and truncated in this archive. Recoverable labels: Phase 1 groups in-memory sorted streams 1-3 and sorted spill file 1 under "Can hold in memory"; sorted spill files 2-4 carry over to Phase 2, again under "Can hold in memory"; the quoted diff is cut off at the start of Phase 3.]

Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-24 Thread via GitHub


Copilot commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2230055548


##
datafusion/physical-plan/src/spill/spill_manager.rs:
##
@@ -17,11 +17,10 @@
 
 //! Define the `SpillManager` struct, which is responsible for reading and 
writing `RecordBatch`es to raw files based on the provided configurations.
 
-use std::sync::Arc;
-
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use datafusion_execution::runtime_env::RuntimeEnv;
+use std::sync::Arc;

Review Comment:
   [nitpick] The import order has been changed unnecessarily. The original 
import order (std imports first, then external crates) followed Rust 
conventions better.
   ```suggestion
   use std::sync::Arc;
   
   use arrow::datatypes::SchemaRef;
   use arrow::record_batch::RecordBatch;
   use datafusion_execution::runtime_env::RuntimeEnv;
   ```



##
datafusion/physical-plan/src/sorts/streaming_merge.rs:
##
@@ -119,9 +158,22 @@ impl<'a> StreamingMergeBuilder<'a> {
 self
 }
 
+/// Bypass the mempool and avoid using the memory reservation.
+///
+/// This is not marked as `pub` because it is not recommended to use this 
method

Review Comment:
   The comment should explain why this method is not recommended and what the 
risks are of bypassing the memory pool.
   ```suggestion
   ///
   /// This method bypasses the memory pool, which can lead to unregulated 
memory usage.
   /// Using an unbounded memory pool may result in excessive memory 
consumption and
   /// potential system instability if memory usage exceeds available 
resources.
   /// This is not marked as `pub` because it is not recommended to use 
this method
   /// except in testing or controlled scenarios where memory usage is 
closely monitored.
   ```



##
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##
@@ -0,0 +1,449 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Create a stream that do a multi level merge stream
+
+use crate::metrics::BaselineMetrics;
+use crate::{EmptyRecordBatchStream, SpillManager};
+use arrow::array::RecordBatch;
+use std::fmt::{Debug, Formatter};
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::MemoryReservation;
+
+use crate::sorts::sort::get_reserved_byte_for_record_batch_size;
+use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
+use crate::stream::RecordBatchStreamAdapter;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::TryStreamExt;
+use futures::{Stream, StreamExt};
+
+/// Merges a stream of sorted cursors and record batches into a single sorted 
stream
+///
+/// This is a wrapper around 
[`SortPreservingMergeStream`](crate::sorts::merge::SortPreservingMergeStream)
+/// that provide it the sorted streams/files to merge while making sure we can 
merge them in memory.
+/// In case we can't merge all of them in a single pass we will spill the 
intermediate results to disk
+/// and repeat the process.
+///
+/// ## High level Algorithm
+/// 1. Get the maximum amount of sorted in-memory streams and spill files we 
can merge with the available memory
+/// 2. Sort them to a sorted stream
+/// 3. Do we have more spill files to merge?
+///  - Yes: write that sorted stream to a spill file,
+///add that spill file back to the spill files to merge and
+///repeat the process
+///
+///  - No: return that sorted stream as the final output stream
+///
+/// ```text
+/// Initial State: Multiple sorted streams + spill files
[ASCII diagram garbled and truncated in this archive. Recoverable labels: Phase 1, "Can hold in memory", in-memory sorted streams 1 and 2; the quoted diff is cut off here.]

Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-24 Thread via GitHub


rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2227706228


##
datafusion/physical-plan/src/spill/spill_manager.rs:
##
@@ -140,3 +210,19 @@ impl SpillManager {
 Ok(spawn_buffered(stream, self.batch_read_buffer_capacity))
 }
 }
+
+pub(crate) trait GetSlicedSize {
+/// Returns the size of the `RecordBatch` when sliced.
+fn get_sliced_size(&self) -> usize;
+}
+
+impl GetSlicedSize for RecordBatch {
+fn get_sliced_size(&self) -> usize {
+let mut total = 0;
+for array in self.columns() {
+let data = array.to_data();
+total += data.get_slice_memory_size().unwrap();
+}
+total

Review Comment:
   I blindly copy pasted the code 😅
   
   fixed






Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-23 Thread via GitHub


ding-young commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2227210285


##
datafusion/physical-plan/src/spill/spill_manager.rs:
##
@@ -140,3 +210,19 @@ impl SpillManager {
 Ok(spawn_buffered(stream, self.batch_read_buffer_capacity))
 }
 }
+
+pub(crate) trait GetSlicedSize {
+/// Returns the size of the `RecordBatch` when sliced.
+fn get_sliced_size(&self) -> usize;
+}
+
+impl GetSlicedSize for RecordBatch {
+fn get_sliced_size(&self) -> usize {
+let mut total = 0;
+for array in self.columns() {
+let data = array.to_data();
+total += data.get_slice_memory_size().unwrap();
+}
+total

Review Comment:
   Thanks for applying the suggestion! Let's return `Result` instead of 
calling `unwrap()`. Since we already support `From<ArrowError> for 
DataFusionError`, we can simply propagate the error using `?`.
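
   A minimal sketch of that change, using stand-in types so it compiles 
without the Arrow or DataFusion crates. `ArrowError`, `DataFusionError`, 
`Column`, and `Batch` below are simplified placeholders (assumptions), not the 
real definitions; only the shape of the `From` conversion and the `?` 
propagation mirrors the suggestion.

```rust
// Stand-in for Arrow's error type (assumption: reduced to a single variant).
#[derive(Debug, PartialEq)]
pub enum ArrowError {
    ComputeError(String),
}

// Stand-in for DataFusion's error type.
#[derive(Debug, PartialEq)]
pub enum DataFusionError {
    ArrowError(ArrowError),
}

// This conversion is what lets `?` turn an ArrowError into a DataFusionError.
impl From<ArrowError> for DataFusionError {
    fn from(e: ArrowError) -> Self {
        DataFusionError::ArrowError(e)
    }
}

pub type Result<T> = std::result::Result<T, DataFusionError>;

// Hypothetical column: `None` models a column whose size cannot be computed.
pub struct Column {
    pub sliced_bytes: Option<usize>,
}

impl Column {
    // Plays the role of `ArrayData::get_slice_memory_size` in this sketch.
    pub fn get_slice_memory_size(&self) -> std::result::Result<usize, ArrowError> {
        self.sliced_bytes
            .ok_or_else(|| ArrowError::ComputeError("size unavailable".to_string()))
    }
}

// Hypothetical stand-in for `RecordBatch`.
pub struct Batch {
    pub columns: Vec<Column>,
}

pub trait GetSlicedSize {
    /// Returns the size of the batch when sliced, or an error.
    fn get_sliced_size(&self) -> Result<usize>;
}

impl GetSlicedSize for Batch {
    fn get_sliced_size(&self) -> Result<usize> {
        let mut total = 0;
        for column in &self.columns {
            // `?` converts the ArrowError via `From` and returns early on failure.
            total += column.get_slice_memory_size()?;
        }
        Ok(total)
    }
}
```

   Callers then decide how to handle the error instead of the process 
panicking inside the size calculation.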






Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-23 Thread via GitHub


rluvaton commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-3109894011

   Can you please re-review? I don't believe there are any remaining 
actionable comments for me to address.





Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-23 Thread via GitHub


rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2226469366


##
datafusion/physical-plan/src/spill/get_size.rs:
##
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{
+Array, ArrayRef, ArrowPrimitiveType, BooleanArray, FixedSizeBinaryArray,
+FixedSizeListArray, GenericByteArray, GenericListArray, OffsetSizeTrait,
+PrimitiveArray, RecordBatch, StructArray,
+};
+use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer};
+use arrow::datatypes::{ArrowNativeType, ByteArrayType};
+use arrow::downcast_primitive_array;
+use arrow_schema::DataType;
+
+/// TODO - NEED TO MOVE THIS TO ARROW

Review Comment:
   changed!






Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-23 Thread via GitHub


rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2226461212


##
datafusion/physical-plan/src/spill/get_size.rs:
##
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{
+Array, ArrayRef, ArrowPrimitiveType, BooleanArray, FixedSizeBinaryArray,
+FixedSizeListArray, GenericByteArray, GenericListArray, OffsetSizeTrait,
+PrimitiveArray, RecordBatch, StructArray,
+};
+use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer};
+use arrow::datatypes::{ArrowNativeType, ByteArrayType};
+use arrow::downcast_primitive_array;
+use arrow_schema::DataType;
+
+/// TODO - NEED TO MOVE THIS TO ARROW

Review Comment:
   Great! The memory estimate from this API is accurate. I'm so glad this API exists; I will change to it now.






Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-23 Thread via GitHub


rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2226402212


##
datafusion/physical-plan/src/spill/get_size.rs:
##
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{
+Array, ArrayRef, ArrowPrimitiveType, BooleanArray, FixedSizeBinaryArray,
+FixedSizeListArray, GenericByteArray, GenericListArray, OffsetSizeTrait,
+PrimitiveArray, RecordBatch, StructArray,
+};
+use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer};
+use arrow::datatypes::{ArrowNativeType, ByteArrayType};
+use arrow::downcast_primitive_array;
+use arrow_schema::DataType;
+
+/// TODO - NEED TO MOVE THIS TO ARROW

Review Comment:
   I'm not familiar with `Self::get_slice_memory_size`. Is it new? I will test 
it against my current function.






Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-21 Thread via GitHub


2010YOUY01 commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2221120019


##
datafusion/physical-plan/src/spill/get_size.rs:
##
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{
+Array, ArrayRef, ArrowPrimitiveType, BooleanArray, FixedSizeBinaryArray,
+FixedSizeListArray, GenericByteArray, GenericListArray, OffsetSizeTrait,
+PrimitiveArray, RecordBatch, StructArray,
+};
+use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer};
+use arrow::datatypes::{ArrowNativeType, ByteArrayType};
+use arrow::downcast_primitive_array;
+use arrow_schema::DataType;
+
+/// TODO - NEED TO MOVE THIS TO ARROW

Review Comment:
   I see the reason—currently, it's inevitable for aggregation to generate a 
large intermediate batch when spilling.
   ~~Let's add some unit tests for this `get_actual_size()` API before 
merging~~ It seems there's an existing Arrow utility for the slice size 
estimation mentioned above.






Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-21 Thread via GitHub


2010YOUY01 commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2221120019


##
datafusion/physical-plan/src/spill/get_size.rs:
##
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{
+Array, ArrayRef, ArrowPrimitiveType, BooleanArray, FixedSizeBinaryArray,
+FixedSizeListArray, GenericByteArray, GenericListArray, OffsetSizeTrait,
+PrimitiveArray, RecordBatch, StructArray,
+};
+use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer};
+use arrow::datatypes::{ArrowNativeType, ByteArrayType};
+use arrow::downcast_primitive_array;
+use arrow_schema::DataType;
+
+/// TODO - NEED TO MOVE THIS TO ARROW

Review Comment:
   I see the reason—currently, it's inevitable for aggregation to generate a 
large intermediate batch when spilling.
   Let's add some unit tests for this `get_actual_size()` API before merging






Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-21 Thread via GitHub


ding-young commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2221118843


##
datafusion/physical-plan/src/spill/get_size.rs:
##
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{
+Array, ArrayRef, ArrowPrimitiveType, BooleanArray, FixedSizeBinaryArray,
+FixedSizeListArray, GenericByteArray, GenericListArray, OffsetSizeTrait,
+PrimitiveArray, RecordBatch, StructArray,
+};
+use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer};
+use arrow::datatypes::{ArrowNativeType, ByteArrayType};
+use arrow::downcast_primitive_array;
+use arrow_schema::DataType;
+
+/// TODO - NEED TO MOVE THIS TO ARROW

Review Comment:
   Thank you for the clarification. My next question: if so, what do you 
think about using the existing Arrow API 
[`get_slice_memory_size`](https://docs.rs/arrow/latest/arrow/array/struct.ArrayData.html#method.get_slice_memory_size)?
 If its memory estimate is reasonably close to what you had in mind (and if 
the only cause of overestimation is slicing), we might be able to use that 
instead and thereby avoid the extra effort of porting your logic into 
`arrow-rs`.
   
   Like how you added `get_actually_used_size`, we may do the same. 
   ```rust
   impl GetSlicedSize for RecordBatch {
       fn get_sliced_size(&self) -> usize {
           let mut total = 0;
           for array in self.columns() {
               let data = array.to_data();
               total += data.get_slice_memory_size().unwrap();
           }
           total
       }
   }
   ```
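
   To illustrate why a sliced batch can be overestimated, here is a sketch 
with stand-in types (not Arrow's). A zero-copy slice shares its parent's 
buffer, so counting the whole buffer charges every slice for bytes it does not 
cover, while a slice-aware count charges only the viewed range. 
`SharedColumn`, `buffer_size`, and `sliced_size` are hypothetical names used 
only for this illustration.

```rust
use std::sync::Arc;

// Hypothetical zero-copy column: a view (offset, len) over a shared i64 buffer.
#[derive(Clone)]
pub struct SharedColumn {
    buffer: Arc<Vec<i64>>,
    offset: usize,
    len: usize,
}

impl SharedColumn {
    pub fn new(values: Vec<i64>) -> Self {
        let len = values.len();
        Self { buffer: Arc::new(values), offset: 0, len }
    }

    // Zero-copy slice: only the view changes, the buffer stays shared.
    pub fn slice(&self, offset: usize, len: usize) -> Self {
        assert!(offset + len <= self.len, "slice out of bounds");
        Self {
            buffer: Arc::clone(&self.buffer),
            offset: self.offset + offset,
            len,
        }
    }

    // The values visible through this view.
    pub fn values(&self) -> &[i64] {
        &self.buffer[self.offset..self.offset + self.len]
    }

    // Counts the entire backing buffer, even for a small slice.
    pub fn buffer_size(&self) -> usize {
        self.buffer.len() * std::mem::size_of::<i64>()
    }

    // Counts only the bytes covered by this view.
    pub fn sliced_size(&self) -> usize {
        self.len * std::mem::size_of::<i64>()
    }
}
```

   With a 1000-value parent, a 10-value slice reports 8000 bytes under the 
buffer-based count but 80 bytes under the slice-aware count, which is the gap 
a slice-aware estimate is meant to close.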






Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-21 Thread via GitHub


rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2219474642


##
datafusion/physical-plan/src/spill/get_size.rs:
##
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{
+Array, ArrayRef, ArrowPrimitiveType, BooleanArray, FixedSizeBinaryArray,
+FixedSizeListArray, GenericByteArray, GenericListArray, OffsetSizeTrait,
+PrimitiveArray, RecordBatch, StructArray,
+};
+use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer};
+use arrow::datatypes::{ArrowNativeType, ByteArrayType};
+use arrow::downcast_primitive_array;
+use arrow_schema::DataType;
+
+/// TODO - NEED TO MOVE THIS TO ARROW

Review Comment:
   Slicing does indeed cause overestimation. The special-case handling is 
`get_size`, but instead of using it only in aggregation, I've used it everywhere.






Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-21 Thread via GitHub


rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2219148029


##
datafusion/physical-plan/src/spill/spill_manager.rs:
##
@@ -125,6 +133,156 @@ impl SpillManager {
 self.spill_record_batch_and_finish(&batches, request_description)
 }
 
+/// Refer to the documentation for 
[`Self::spill_record_batch_and_finish`]. This method
+/// additionally spills the `RecordBatch` into smaller batches, divided by 
`row_limit`.
+///
+/// # Errors
+/// - Returns an error if spilling would exceed the disk usage limit 
configured
+///   by `max_temp_directory_size` in `DiskManager`
+pub(crate) fn spill_record_batch_by_size_and_return_max_batch_memory(
+&self,
+batch: &RecordBatch,
+request_description: &str,

Review Comment:
   done






Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-21 Thread via GitHub


rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2219057789


##
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##
@@ -0,0 +1,345 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Create a stream that do a multi level merge stream
+
+use crate::metrics::BaselineMetrics;
+use crate::{EmptyRecordBatchStream, SpillManager};
+use arrow::array::RecordBatch;
+use std::fmt::{Debug, Formatter};
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::{
+MemoryConsumer, MemoryPool, MemoryReservation, UnboundedMemoryPool,
+};
+
+use crate::sorts::sort::get_reserved_byte_for_record_batch_size;
+use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
+use crate::stream::RecordBatchStreamAdapter;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::TryStreamExt;
+use futures::{Stream, StreamExt};
+
+/// Merges a stream of sorted cursors and record batches into a single sorted 
stream
+pub(crate) struct MultiLevelMergeBuilder {
+spill_manager: SpillManager,
+schema: SchemaRef,
+sorted_spill_files: Vec,
+sorted_streams: Vec,
+expr: LexOrdering,
+metrics: BaselineMetrics,
+batch_size: usize,
+reservation: MemoryReservation,
+fetch: Option,
+enable_round_robin_tie_breaker: bool,
+
+// This is for avoiding double reservation of memory from our side and the 
sort preserving merge stream
+// side.
+// and doing a lot of code changes to avoid accounting for the memory used 
by the streams
+unbounded_memory_pool: Arc,
+}
+
+impl Debug for MultiLevelMergeBuilder {
+fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+write!(f, "MultiLevelMergeBuilder")
+}
+}
+
+impl MultiLevelMergeBuilder {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+spill_manager: SpillManager,
+schema: SchemaRef,
+sorted_spill_files: Vec,
+sorted_streams: Vec,
+expr: LexOrdering,
+metrics: BaselineMetrics,
+batch_size: usize,
+reservation: MemoryReservation,
+fetch: Option,
+enable_round_robin_tie_breaker: bool,
+) -> Self {
+Self {
+spill_manager,
+schema,
+sorted_spill_files,
+sorted_streams,
+expr,
+metrics,
+batch_size,
+reservation,
+enable_round_robin_tie_breaker,
+fetch,
+unbounded_memory_pool: Arc::new(UnboundedMemoryPool::default()),
+}
+}
+
+pub(crate) fn create_spillable_merge_stream(self) -> 
SendableRecordBatchStream {
+Box::pin(RecordBatchStreamAdapter::new(
+Arc::clone(&self.schema),
+futures::stream::once(self.create_stream()).try_flatten(),
+))
+}
+
+async fn create_stream(mut self) -> Result {
+loop {
+// Hold this for the lifetime of the stream

Review Comment:
   Changed. I made sure the memory reservation is used when merging only 
in-memory streams; otherwise the worst-case scenario is used.
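
   One way to picture a worst-case accounting for spill files is sketched 
below. This is an assumption for illustration, not the logic in this PR: each 
spill file is charged the size of its largest batch, and files are admitted to 
a merge pass until the budget is exhausted. `SpillFileInfo` and 
`files_to_merge` are hypothetical names.

```rust
// Hypothetical description of a spill file: only its largest batch matters here.
pub struct SpillFileInfo {
    pub max_batch_bytes: usize,
}

// Returns how many leading files can be merged in one pass within `budget_bytes`,
// charging each file its largest batch as a worst-case footprint.
// Returns None when fewer than two files fit, since a merge needs at least two inputs.
pub fn files_to_merge(files: &[SpillFileInfo], budget_bytes: usize) -> Option<usize> {
    let mut reserved = 0usize;
    let mut count = 0usize;
    for file in files {
        match reserved.checked_add(file.max_batch_bytes) {
            // The file fits: account for it and keep going.
            Some(next) if next <= budget_bytes => {
                reserved = next;
                count += 1;
            }
            // Over budget (or arithmetic overflow): stop admitting files.
            _ => break,
        }
    }
    if count >= 2 {
        Some(count)
    } else {
        None
    }
}
```

   A `None` result corresponds to the situation where the merge cannot make 
progress under the given budget and has to fail or fall back to some other 
strategy.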



##
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##
@@ -0,0 +1,345 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governi

Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-21 Thread via GitHub


rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2219143884


##
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##
@@ -0,0 +1,345 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Create a stream that do a multi level merge stream
+
+use crate::metrics::BaselineMetrics;
+use crate::{EmptyRecordBatchStream, SpillManager};
+use arrow::array::RecordBatch;
+use std::fmt::{Debug, Formatter};
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::{
+MemoryConsumer, MemoryPool, MemoryReservation, UnboundedMemoryPool,
+};
+
+use crate::sorts::sort::get_reserved_byte_for_record_batch_size;
+use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
+use crate::stream::RecordBatchStreamAdapter;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::TryStreamExt;
+use futures::{Stream, StreamExt};
+
+/// Merges a stream of sorted cursors and record batches into a single sorted 
stream
+pub(crate) struct MultiLevelMergeBuilder {
+spill_manager: SpillManager,
+schema: SchemaRef,
+sorted_spill_files: Vec,
+sorted_streams: Vec,
+expr: LexOrdering,
+metrics: BaselineMetrics,
+batch_size: usize,
+reservation: MemoryReservation,
+fetch: Option,
+enable_round_robin_tie_breaker: bool,
+
+// This is for avoiding double reservation of memory from our side and the 
sort preserving merge stream
+// side.
+// and doing a lot of code changes to avoid accounting for the memory used 
by the streams
+unbounded_memory_pool: Arc,

Review Comment:
   Added, and the temporary unbounded memory pool is now created inside SPM.
   
   I made that function `pub(super)` to avoid exposing it to users, as I 
feel it is a hack.






Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-21 Thread via GitHub


rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2219141893


##
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##
@@ -0,0 +1,345 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Create a stream that do a multi level merge stream
+
+use crate::metrics::BaselineMetrics;
+use crate::{EmptyRecordBatchStream, SpillManager};
+use arrow::array::RecordBatch;
+use std::fmt::{Debug, Formatter};
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::{
+MemoryConsumer, MemoryPool, MemoryReservation, UnboundedMemoryPool,
+};
+
+use crate::sorts::sort::get_reserved_byte_for_record_batch_size;
+use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
+use crate::stream::RecordBatchStreamAdapter;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::TryStreamExt;
+use futures::{Stream, StreamExt};
+
+/// Merges a stream of sorted cursors and record batches into a single sorted 
stream

Review Comment:
   added with diagram






Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-21 Thread via GitHub


rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2219098075


##
datafusion/physical-plan/src/spill/spill_manager.rs:
##
@@ -125,6 +133,156 @@ impl SpillManager {
 self.spill_record_batch_and_finish(&batches, request_description)
 }
 
+/// Refer to the documentation for 
[`Self::spill_record_batch_and_finish`]. This method
+/// additionally spills the `RecordBatch` into smaller batches, divided by 
`row_limit`.
+///
+/// # Errors
+/// - Returns an error if spilling would exceed the disk usage limit 
configured
+///   by `max_temp_directory_size` in `DiskManager`
+pub(crate) fn spill_record_batch_by_size_and_return_max_batch_memory(
+&self,
+batch: &RecordBatch,
+request_description: &str,
+row_limit: usize,
+) -> Result> {
+let total_rows = batch.num_rows();
+let mut batches = Vec::new();
+let mut offset = 0;
+
+// It's ok to calculate all slices first, because slicing is zero-copy.
+while offset < total_rows {
+let length = std::cmp::min(total_rows - offset, row_limit);
+let sliced_batch = batch.slice(offset, length);
+batches.push(sliced_batch);
+offset += length;
+}
+
+let mut in_progress_file = 
self.create_in_progress_file(request_description)?;
+
+let mut max_record_batch_size = 0;
+
+for batch in batches {
+in_progress_file.append_batch(&batch)?;
+
+max_record_batch_size =
+max_record_batch_size.max(batch.get_actually_used_size());
+}
+
+let file = in_progress_file.finish()?;
+
+Ok(file.map(|f| (f, max_record_batch_size)))
+}
+
+/// Spill the `RecordBatch` to disk as smaller batches
+/// split by `batch_size_rows`.
+///
+/// will return the spill file and the size of the largest batch in memory
+pub async fn spill_record_batch_stream_by_size(
+&self,
+stream: &mut SendableRecordBatchStream,
+batch_size_rows: usize,
+request_msg: &str,
+) -> Result> {
+use futures::StreamExt;
+let mut in_progress_file = self.create_in_progress_file(request_msg)?;
+
+let mut max_record_batch_size = 0;
+
+let mut maybe_last_batch: Option = None;
+
+while let Some(batch) = stream.next().await {
+let mut batch = batch?;
+
+if let Some(mut last_batch) = maybe_last_batch.take() {
+assert!(
+last_batch.num_rows() < batch_size_rows,
+"last batch size must be smaller than the requested batch size"
+);
+
+// Get the number of rows to take from current batch so the last_batch
+// will have `batch_size_rows` rows
+let current_batch_offset = std::cmp::min(
+// rows needed to fill
+batch_size_rows - last_batch.num_rows(),
+// Current length of the batch
+batch.num_rows(),
+);
+
+// if have last batch that has less rows than concat and spill
+last_batch = arrow::compute::concat_batches(
+&stream.schema(),
+&[last_batch, batch.slice(0, current_batch_offset)],
+)?;
+
+assert!(last_batch.num_rows() <= batch_size_rows, "must build a batch that is smaller or equal to the requested batch size from the current batch");
+
+// If not enough rows
+if last_batch.num_rows() < batch_size_rows {
+// keep the last batch for next iteration
+maybe_last_batch = Some(last_batch);
+continue;
+}
+
+max_record_batch_size =
+max_record_batch_size.max(last_batch.get_actually_used_size());
+
+in_progress_file.append_batch(&last_batch)?;
+
+if current_batch_offset == batch.num_rows() {
+// No remainder
+continue;
+}
+
+// remainder
+batch = batch.slice(
+current_batch_offset,
+batch.num_rows() - current_batch_offset,
+);
+}
+
+let mut offset = 0;
+let total_rows = batch.num_rows();
+
+// Keep slicing the batch until we have left with a batch that is smaller than

Review Comment:
   You are right, simplified.
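
To illustrate the carry-over logic in `spill_record_batch_stream_by_size` quoted above, here is a minimal sketch that uses plain `Vec<i32>` values in place of `RecordBatch`. The `rechunk` function is hypothetical and is not part of the PR. Flushing the final partial chunk at the end is an assumption, since the quoted hunk is cut off before that point. The sketch also guards against a zero row limit, because a `while offset < total_rows` loop like the one quoted above would never advance with a limit of 0.

```rust
/// Re-chunk variable-sized batches so that every emitted chunk has exactly
/// `batch_size_rows` rows, except possibly the last one.
/// (Hypothetical helper: `Vec<i32>` stands in for `RecordBatch`.)
fn rechunk(batches: Vec<Vec<i32>>, batch_size_rows: usize) -> Vec<Vec<i32>> {
    // A zero limit would make `take` below always 0 and loop forever.
    assert!(batch_size_rows > 0, "batch_size_rows must be non-zero");

    let mut out = Vec::new();
    // Rows carried over from earlier input batches (always < batch_size_rows).
    let mut pending: Vec<i32> = Vec::new();

    for batch in batches {
        let mut rest: &[i32] = &batch;
        while !rest.is_empty() {
            // Take only as many rows as are needed to fill the pending chunk.
            let take = (batch_size_rows - pending.len()).min(rest.len());
            pending.extend_from_slice(&rest[..take]);
            rest = &rest[take..];

            if pending.len() == batch_size_rows {
                // Chunk is full: emit it and start a new, empty one.
                out.push(std::mem::take(&mut pending));
            }
        }
    }

    // Assumption: the trailing partial chunk is emitted rather than dropped.
    if !pending.is_empty() {
        out.push(pending);
    }
    out
}
```

With inputs of 3, 5 and 1 rows and a limit of 4, this yields two full chunks followed by a one-row chunk. Unlike the quoted code, the sketch copies rows instead of slicing, so it says nothing about memory accounting.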



Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-21 Thread via GitHub


rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2219057789


##
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##
@@ -0,0 +1,345 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Create a stream that do a multi level merge stream
+
+use crate::metrics::BaselineMetrics;
+use crate::{EmptyRecordBatchStream, SpillManager};
+use arrow::array::RecordBatch;
+use std::fmt::{Debug, Formatter};
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::{
+MemoryConsumer, MemoryPool, MemoryReservation, UnboundedMemoryPool,
+};
+
+use crate::sorts::sort::get_reserved_byte_for_record_batch_size;
+use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
+use crate::stream::RecordBatchStreamAdapter;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::TryStreamExt;
+use futures::{Stream, StreamExt};
+
+/// Merges a stream of sorted cursors and record batches into a single sorted stream
+pub(crate) struct MultiLevelMergeBuilder {
+spill_manager: SpillManager,
+schema: SchemaRef,
+sorted_spill_files: Vec,
+sorted_streams: Vec,
+expr: LexOrdering,
+metrics: BaselineMetrics,
+batch_size: usize,
+reservation: MemoryReservation,
+fetch: Option,
+enable_round_robin_tie_breaker: bool,
+
+// This is for avoiding double reservation of memory from our side and the sort preserving merge stream
+// side.
+// and doing a lot of code changes to avoid accounting for the memory used by the streams
+unbounded_memory_pool: Arc,
+}
+
+impl Debug for MultiLevelMergeBuilder {
+fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+write!(f, "MultiLevelMergeBuilder")
+}
+}
+
+impl MultiLevelMergeBuilder {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+spill_manager: SpillManager,
+schema: SchemaRef,
+sorted_spill_files: Vec,
+sorted_streams: Vec,
+expr: LexOrdering,
+metrics: BaselineMetrics,
+batch_size: usize,
+reservation: MemoryReservation,
+fetch: Option,
+enable_round_robin_tie_breaker: bool,
+) -> Self {
+Self {
+spill_manager,
+schema,
+sorted_spill_files,
+sorted_streams,
+expr,
+metrics,
+batch_size,
+reservation,
+enable_round_robin_tie_breaker,
+fetch,
+unbounded_memory_pool: Arc::new(UnboundedMemoryPool::default()),
+}
+}
+
+pub(crate) fn create_spillable_merge_stream(self) -> SendableRecordBatchStream {
+Box::pin(RecordBatchStreamAdapter::new(
+Arc::clone(&self.schema),
+futures::stream::once(self.create_stream()).try_flatten(),
+))
+}
+
+async fn create_stream(mut self) -> Result {
+loop {
+// Hold this for the lifetime of the stream

Review Comment:
   Changed, and made sure to use the memory reservation when merging only 
in-memory streams.



Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-20 Thread via GitHub


ding-young commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2218214013


##
datafusion/physical-plan/src/spill/spill_manager.rs:
##
@@ -125,6 +133,156 @@ impl SpillManager {
 self.spill_record_batch_and_finish(&batches, request_description)
 }
 
+/// Refer to the documentation for 
[`Self::spill_record_batch_and_finish`]. This method
+/// additionally spills the `RecordBatch` into smaller batches, divided by 
`row_limit`.
+///
+/// # Errors
+/// - Returns an error if spilling would exceed the disk usage limit 
configured
+///   by `max_temp_directory_size` in `DiskManager`
+pub(crate) fn spill_record_batch_by_size_and_return_max_batch_memory(
+&self,
+batch: &RecordBatch,
+request_description: &str,

Review Comment:
   Nit: it might be better to unify the naming. Some functions use 
`request_msg`, while others use `request_description`.



Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-20 Thread via GitHub


ding-young commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2218105116


##
datafusion/physical-plan/src/spill/get_size.rs:
##
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{
+Array, ArrayRef, ArrowPrimitiveType, BooleanArray, FixedSizeBinaryArray,
+FixedSizeListArray, GenericByteArray, GenericListArray, OffsetSizeTrait,
+PrimitiveArray, RecordBatch, StructArray,
+};
+use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer};
+use arrow::datatypes::{ArrowNativeType, ByteArrayType};
+use arrow::downcast_primitive_array;
+use arrow_schema::DataType;
+
+/// TODO - NEED TO MOVE THIS TO ARROW

Review Comment:
   I've also seen `batch.get_array_memory_size` overestimate memory in 
aggregation. The cause was not Arrow over-allocating buffers, though: it was 
that we **slice** the batch when we spill in 
`spill_record_batch_by_size_(and_return_max_batch_memory)`. Slicing a batch 
does not change the buffer capacity, which `batch.get_array_memory_size` relies 
on, so each sliced batch reports the full buffer memory usage even though the 
buffers are shared between the slices.   
   
   Running `test_single_mode_aggregate_single_mode_aggregate_with_spill()` in 
`aggregation_fuzz` on the [reproducer 
branch](https://github.com/ding-young/datafusion/tree/batch-memory-reproducer) 
shows that naively relying on `batch.get_array_memory_size` for sliced 
batches can lead to overestimation:
   ```
    
    fuzz_cases::aggregate_fuzz::test_single_mode_aggregate_single_mode_aggregate_with_spill stdout
   batch.get_actually_used_size:21492, batch.get_array_memory_size:178946
   batch.get_actually_used_size:21624, batch.get_array_memory_size:178946
   batch.get_actually_used_size:21635, batch.get_array_memory_size:178946
   batch.get_actually_used_size:21620, batch.get_array_memory_size:178946
   batch.get_actually_used_size:21636, batch.get_array_memory_size:178946
   ...
   ```
   
   If slicing is the only cause of the overestimation, I think we can add 
special-case handling for aggregation and keep things simple by using the 
existing `batch.get_array_memory_size` API.  
   
   I wonder whether the overestimation @rluvaton encountered was caused by this 
issue or by something else.
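
The effect described in this comment can be reproduced with a toy model that does not depend on Arrow. In the sketch below, `Slice`, `full_buffer_size`, `used_size` and `slice_all` are hypothetical names chosen for illustration. `full_buffer_size` mimics accounting that looks at the whole shared buffer, and `used_size` mimics accounting that looks only at the rows a slice covers. This is an assumption-laden model of the behavior, not Arrow's actual implementation.

```rust
use std::mem::size_of;
use std::sync::Arc;

/// Toy stand-in for a sliced array: a shared buffer plus a window into it.
struct Slice {
    buffer: Arc<Vec<i64>>,
    offset: usize,
    len: usize,
}

impl Slice {
    /// The rows this slice actually covers.
    fn values(&self) -> &[i64] {
        &self.buffer[self.offset..self.offset + self.len]
    }

    /// Buffer-based accounting: counts the entire shared buffer,
    /// no matter how small the window is.
    fn full_buffer_size(&self) -> usize {
        self.buffer.len() * size_of::<i64>()
    }

    /// Window-based accounting: counts only the rows in this slice.
    fn used_size(&self) -> usize {
        self.values().len() * size_of::<i64>()
    }
}

/// Split one buffer into zero-copy slices of at most `row_limit` rows.
fn slice_all(buffer: &Arc<Vec<i64>>, row_limit: usize) -> Vec<Slice> {
    assert!(row_limit > 0, "row_limit must be non-zero");
    let mut slices = Vec::new();
    let mut offset = 0;
    while offset < buffer.len() {
        let len = row_limit.min(buffer.len() - offset);
        slices.push(Slice { buffer: Arc::clone(buffer), offset, len });
        offset += len;
    }
    slices
}
```

For a 1000-row buffer split into 10 slices, summing `full_buffer_size` over the slices counts the same 8000 bytes ten times, while summing `used_size` counts them once. The constant `get_array_memory_size` value in the log above fits the same pattern.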



Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-19 Thread via GitHub


2010YOUY01 commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2217240477


##
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##
@@ -0,0 +1,342 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Create a stream that do a multi level merge stream
+
+use crate::metrics::BaselineMetrics;
+use crate::{EmptyRecordBatchStream, SpillManager};
+use arrow::array::RecordBatch;
+use std::fmt::{Debug, Formatter};
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::{
+MemoryConsumer, MemoryPool, MemoryReservation, UnboundedMemoryPool,
+};
+
+use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
+use crate::stream::RecordBatchStreamAdapter;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::TryStreamExt;
+use futures::{Stream, StreamExt};
+
+/// Merges a stream of sorted cursors and record batches into a single sorted stream
+pub(crate) struct MultiLevelMergeBuilder {
+spill_manager: SpillManager,
+schema: SchemaRef,
+sorted_spill_files: Vec,
+sorted_streams: Vec,
+expr: LexOrdering,
+metrics: BaselineMetrics,
+batch_size: usize,
+reservation: MemoryReservation,
+fetch: Option,
+enable_round_robin_tie_breaker: bool,
+
+// This is for avoiding double reservation of memory from our side and the sort preserving merge stream
+// side.
+// and doing a lot of code changes to avoid accounting for the memory used by the streams
+unbounded_memory_pool: Arc,
+}
+
+impl Debug for MultiLevelMergeBuilder {
+fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+write!(f, "MultiLevelMergeBuilder")
+}
+}
+
+impl MultiLevelMergeBuilder {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+spill_manager: SpillManager,
+schema: SchemaRef,
+sorted_spill_files: Vec,
+sorted_streams: Vec,
+expr: LexOrdering,
+metrics: BaselineMetrics,
+batch_size: usize,
+reservation: MemoryReservation,
+fetch: Option,
+enable_round_robin_tie_breaker: bool,
+) -> Self {
+Self {
+spill_manager,
+schema,
+sorted_spill_files,
+sorted_streams,
+expr,
+metrics,
+batch_size,
+reservation,
+enable_round_robin_tie_breaker,
+fetch,
+unbounded_memory_pool: Arc::new(UnboundedMemoryPool::default()),
+}
+}
+
+pub(crate) fn create_spillable_merge_stream(self) -> SendableRecordBatchStream {
+Box::pin(RecordBatchStreamAdapter::new(
+Arc::clone(&self.schema),
+futures::stream::once(self.create_stream()).try_flatten(),
+))
+}
+
+async fn create_stream(mut self) -> Result {
+loop {
+// Hold this for the lifetime of the stream
+let mut current_memory_reservation = self.reservation.new_empty();
+let mut stream =
+self.create_sorted_stream(&mut current_memory_reservation)?;
+
+// TODO - add a threshold for number of files to disk even if empty and reading from disk so
+//we can avoid the memory reservation
+
+// If no spill files are left, we can return the stream as this is the last sorted run
+// TODO - We can write to disk before reading it back to avoid having multiple streams in memory
+if self.sorted_spill_files.is_empty() {
+// Attach the memory reservation to the stream as we are done with it
+// but because we replaced the memory reservation of the merge stream, we must hold
+// this to make sure we have enough memory
+return Ok(Box::pin(StreamAttachedReservation::new(
+stream,
+current_memory_reservation,
+)));
+}
+
+// N

Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-19 Thread via GitHub


2010YOUY01 commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2217220186


##
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##
@@ -0,0 +1,345 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Create a stream that do a multi level merge stream
+
+use crate::metrics::BaselineMetrics;
+use crate::{EmptyRecordBatchStream, SpillManager};
+use arrow::array::RecordBatch;
+use std::fmt::{Debug, Formatter};
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::{
+MemoryConsumer, MemoryPool, MemoryReservation, UnboundedMemoryPool,
+};
+
+use crate::sorts::sort::get_reserved_byte_for_record_batch_size;
+use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
+use crate::stream::RecordBatchStreamAdapter;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::TryStreamExt;
+use futures::{Stream, StreamExt};
+
+/// Merges a stream of sorted cursors and record batches into a single sorted stream
+pub(crate) struct MultiLevelMergeBuilder {
+spill_manager: SpillManager,
+schema: SchemaRef,
+sorted_spill_files: Vec,
+sorted_streams: Vec,
+expr: LexOrdering,
+metrics: BaselineMetrics,
+batch_size: usize,
+reservation: MemoryReservation,
+fetch: Option,
+enable_round_robin_tie_breaker: bool,
+
+// This is for avoiding double reservation of memory from our side and the sort preserving merge stream
+// side.
+// and doing a lot of code changes to avoid accounting for the memory used by the streams
+unbounded_memory_pool: Arc,
+}
+
+impl Debug for MultiLevelMergeBuilder {
+fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+write!(f, "MultiLevelMergeBuilder")
+}
+}
+
+impl MultiLevelMergeBuilder {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+spill_manager: SpillManager,
+schema: SchemaRef,
+sorted_spill_files: Vec,
+sorted_streams: Vec,
+expr: LexOrdering,
+metrics: BaselineMetrics,
+batch_size: usize,
+reservation: MemoryReservation,
+fetch: Option,
+enable_round_robin_tie_breaker: bool,
+) -> Self {
+Self {
+spill_manager,
+schema,
+sorted_spill_files,
+sorted_streams,
+expr,
+metrics,
+batch_size,
+reservation,
+enable_round_robin_tie_breaker,
+fetch,
+unbounded_memory_pool: Arc::new(UnboundedMemoryPool::default()),
+}
+}
+
+pub(crate) fn create_spillable_merge_stream(self) -> SendableRecordBatchStream {
+Box::pin(RecordBatchStreamAdapter::new(
+Arc::clone(&self.schema),
+futures::stream::once(self.create_stream()).try_flatten(),
+))
+}
+
+async fn create_stream(mut self) -> Result {
+loop {
+// Hold this for the lifetime of the stream

Review Comment:
   If we have this reservation with the same lifetime as the stream, would it 
be better to create a `MultiLevelMergeStream` and make this reservation a 
struct field?
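
A rough sketch of what this suggestion could look like, using an iterator in place of an async stream. `Reservation` and `WithReservation` are hypothetical toy types written for this example. They are not DataFusion's `MemoryReservation` or the proposed `MultiLevelMergeStream`. The sketch assumes the pool can be modeled as a shared counter that the reservation decrements when dropped.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Toy reservation: adds `size` to a shared counter on creation
/// and subtracts it again when dropped.
struct Reservation {
    pool_used: Rc<Cell<usize>>,
    size: usize,
}

impl Reservation {
    fn new(pool_used: Rc<Cell<usize>>, size: usize) -> Self {
        pool_used.set(pool_used.get() + size);
        Self { pool_used, size }
    }
}

impl Drop for Reservation {
    fn drop(&mut self) {
        // Give the memory back to the pool.
        self.pool_used.set(self.pool_used.get() - self.size);
    }
}

/// Wrapper that owns the reservation as a struct field, so the reservation
/// lives exactly as long as the wrapped iterator does.
struct WithReservation<I> {
    inner: I,
    _reservation: Reservation,
}

impl<I: Iterator> Iterator for WithReservation<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<Self::Item> {
        // Items pass through unchanged; the field only ties lifetimes together.
        self.inner.next()
    }
}
```

Because the reservation is a field, no caller has to remember to keep it alive: the memory is released when the wrapper is dropped, whether or not the iterator was fully consumed.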



##
datafusion/physical-plan/src/sorts/streaming_merge.rs:
##
@@ -131,14 +168,42 @@ impl<'a> StreamingMergeBuilder<'a> {
 enable_round_robin_tie_breaker,
 } = self;
 
-// Early return if streams or expressions are empty:
-if streams.is_empty() {
-return internal_err!("Streams cannot be empty for streaming merge");
-}
+// Early return if expressions are empty:
 let Some(expressions) = expressions else {
 return internal_err!("Sort expressions cannot be empty for streaming merge");
 };
 
+if !sorted_spill_files.is_empty() {

Review Comment:
   I agree it's good to reduce the number of APIs; in that case, the two 
approaches seem to have similar complexity.



##
datafusion/physical-plan/src/sorts/multi

Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-17 Thread via GitHub


rluvaton commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-3084017814

   @2010YOUY01 I've updated the PR based on your comments and replied to some 
of them.


Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-17 Thread via GitHub


rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2213305286


##
datafusion/physical-plan/src/spill/get_size.rs:
##
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{
+Array, ArrayRef, ArrowPrimitiveType, BooleanArray, FixedSizeBinaryArray,
+FixedSizeListArray, GenericByteArray, GenericListArray, OffsetSizeTrait,
+PrimitiveArray, RecordBatch, StructArray,
+};
+use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer};
+use arrow::datatypes::{ArrowNativeType, ByteArrayType};
+use arrow::downcast_primitive_array;
+use arrow_schema::DataType;
+
+/// TODO - NEED TO MOVE THIS TO ARROW

Review Comment:
   I did that because the existing size estimate can overestimate by a lot, 
which causes more spills and significantly reduces performance. In aggregation 
I saw it calculate buffers as 100MB when they were not actually that large.



Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-17 Thread via GitHub


rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2213294534


##
datafusion/physical-plan/src/sorts/streaming_merge.rs:
##
@@ -131,14 +168,42 @@ impl<'a> StreamingMergeBuilder<'a> {
 enable_round_robin_tie_breaker,
 } = self;
 
-// Early return if streams or expressions are empty:
-if streams.is_empty() {
-return internal_err!("Streams cannot be empty for streaming merge");
-}
+// Early return if expressions are empty:
 let Some(expressions) = expressions else {
 return internal_err!("Sort expressions cannot be empty for streaming merge");
 };
 
+if !sorted_spill_files.is_empty() {

Review Comment:
   I wanted a single entry point for users to merge multiple streams, whether 
or not they come from files.
   
   One reason I love DataFusion is that it uses its own public API, meaning I 
can copy and paste the operator code and have it work without needing to copy 
the entire codebase.
   
   I also don't want to make `MultiLevelMergeBuilder` public, for the same 
reason `SortPreservingMergeStream` is not public.



Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-17 Thread via GitHub


rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2213289055


##
datafusion/physical-plan/src/aggregates/row_hash.rs:
##
@@ -1067,14 +1074,13 @@ impl GroupedHashAggregateStream {
 sort_batch(&batch, &expr, None)
 })),
 )));
-for spill in self.spill_state.spills.drain(..) {
-let stream = self.spill_state.spill_manager.read_spill_as_stream(spill)?;
-streams.push(stream);
-}
+
 self.spill_state.is_stream_merging = true;
 self.input = StreamingMergeBuilder::new()
 .with_streams(streams)
 .with_schema(schema)
+.with_spill_manager(self.spill_state.spill_manager.clone())
+.with_sorted_spill_files(std::mem::take(&mut self.spill_state.spills))

Review Comment:
   > The issue is, without special handling, it's possible that in-mem batches 
will take most of the available memory budget, and leave only a very small 
memory part for multi-pass spilling to continue. This can cause slow downs or 
even prevent some cases to finish.
   
   Actually, this case is already handled. Say I have 4 in-memory streams and 
5 spill files.
   
   If I don't have enough memory to merge them together with any spill file, we 
merge only the in-memory streams and spill the result to disk. The next 
iteration is then the same as if we had started with 6 spill files.
   
   This is the same behavior as spilling before, but optimized.
   
   See:
   
https://github.com/rluvaton/datafusion/blob/af6b5c52afe316127cd007cd8464d07c3038a0f9/datafusion/physical-plan/src/sorts/multi_level_merge.rs#L178-L203
   
   
   We first `take` the `in_memory_streams` (and never add them back): 
   
https://github.com/rluvaton/datafusion/blob/af6b5c52afe316127cd007cd8464d07c3038a0f9/datafusion/physical-plan/src/sorts/multi_level_merge.rs#L180
   
   Then we try to get spill files to merge, where the minimum number of spill 
files required is 2 - `in_memory_stream.len()`:
   
https://github.com/rluvaton/datafusion/blob/af6b5c52afe316127cd007cd8464d07c3038a0f9/datafusion/physical-plan/src/sorts/multi_level_merge.rs#L185-L186
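
   A minimal sketch of that rule, assuming a simplified model where each spill 
file has a known read cost and there is a fixed remaining budget. `plan_pass`, 
`spill_file_costs`, and `budget` are hypothetical names; the real logic is in 
the linked lines.

```rust
/// Hypothetical sketch: how many spill files can join the current merge pass.
/// `spill_file_costs[i]` is the memory needed to read spill file `i`, and
/// `budget` is the memory still available. Neither name comes from the PR.
fn plan_pass(
    in_memory_streams: usize,
    spill_file_costs: &[usize],
    budget: usize,
) -> Result<usize, String> {
    // A merge needs at least two inputs overall, so every in-memory stream
    // lowers the number of spill files that must fit (never below zero).
    let min_spill_files = 2usize.saturating_sub(in_memory_streams);

    // Greedily take spill files, in order, while they fit in the budget.
    let mut used = 0;
    let mut taken = 0;
    for &cost in spill_file_costs {
        if used + cost > budget {
            break;
        }
        used += cost;
        taken += 1;
    }

    if taken < min_spill_files {
        return Err(format!(
            "need at least {min_spill_files} spill files, only {taken} fit"
        ));
    }
    Ok(taken)
}
```

   With 4 in-memory streams, 5 spill files of cost 100, and a budget of 50, 
`plan_pass` returns `Ok(0)`: only the in-memory streams are merged, and the 
result is spilled. The next pass then starts with 0 in-memory streams and 6 
spill files, and needs at least 2 of them to fit.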
   
   






Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-17 Thread via GitHub


rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2213223988


##
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##
@@ -0,0 +1,342 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Create a stream that do a multi level merge stream
+
+use crate::metrics::BaselineMetrics;
+use crate::{EmptyRecordBatchStream, SpillManager};
+use arrow::array::RecordBatch;
+use std::fmt::{Debug, Formatter};
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::{
+MemoryConsumer, MemoryPool, MemoryReservation, UnboundedMemoryPool,
+};
+
+use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
+use crate::stream::RecordBatchStreamAdapter;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::TryStreamExt;
+use futures::{Stream, StreamExt};
+
+/// Merges a stream of sorted cursors and record batches into a single sorted stream
+pub(crate) struct MultiLevelMergeBuilder {
+spill_manager: SpillManager,
+schema: SchemaRef,
+sorted_spill_files: Vec<SortedSpillFile>,
+sorted_streams: Vec<SendableRecordBatchStream>,
+expr: LexOrdering,
+metrics: BaselineMetrics,
+batch_size: usize,
+reservation: MemoryReservation,
+fetch: Option<usize>,
+enable_round_robin_tie_breaker: bool,
+
+// This is for avoiding double reservation of memory from our side and the sort preserving merge stream
+// side.
+// and doing a lot of code changes to avoid accounting for the memory used by the streams
+unbounded_memory_pool: Arc<dyn MemoryPool>,
+}
+
+impl Debug for MultiLevelMergeBuilder {
+fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+write!(f, "MultiLevelMergeBuilder")
+}
+}
+
+impl MultiLevelMergeBuilder {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+spill_manager: SpillManager,
+schema: SchemaRef,
+sorted_spill_files: Vec<SortedSpillFile>,
+sorted_streams: Vec<SendableRecordBatchStream>,
+expr: LexOrdering,
+metrics: BaselineMetrics,
+batch_size: usize,
+reservation: MemoryReservation,
+fetch: Option<usize>,
+enable_round_robin_tie_breaker: bool,
+) -> Self {
+Self {
+spill_manager,
+schema,
+sorted_spill_files,
+sorted_streams,
+expr,
+metrics,
+batch_size,
+reservation,
+enable_round_robin_tie_breaker,
+fetch,
+unbounded_memory_pool: Arc::new(UnboundedMemoryPool::default()),
+}
+}
+
+pub(crate) fn create_spillable_merge_stream(self) -> SendableRecordBatchStream {
+Box::pin(RecordBatchStreamAdapter::new(
+Arc::clone(&self.schema),
+futures::stream::once(self.create_stream()).try_flatten(),
+))
+}
+
+async fn create_stream(mut self) -> Result<SendableRecordBatchStream> {
+loop {
+// Hold this for the lifetime of the stream
+let mut current_memory_reservation = self.reservation.new_empty();
+let mut stream =
+self.create_sorted_stream(&mut current_memory_reservation)?;
+
+// TODO - add a threshold for number of files to disk even if empty and reading from disk so
+//we can avoid the memory reservation
+
+// If no spill files are left, we can return the stream as this is the last sorted run
+// TODO - We can write to disk before reading it back to avoid having multiple streams in memory
+if self.sorted_spill_files.is_empty() {
+// Attach the memory reservation to the stream as we are done with it
+// but because we replaced the memory reservation of the merge stream, we must hold
+// this to make sure we have enough memory
+return Ok(Box::pin(StreamAttachedReservation::new(
+stream,
+current_memory_reservation,
+)));
+}
+
+// Nee

Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-17 Thread via GitHub


rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2213137522


##
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##
@@ -0,0 +1,342 @@

Review Comment:
   N

Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-17 Thread via GitHub


rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2213099661


##
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##
@@ -0,0 +1,342 @@

Review Comment:
   W

Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-17 Thread via GitHub


rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2213099661


##
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##
@@ -0,0 +1,342 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Create a stream that do a multi level merge stream
+
+use crate::metrics::BaselineMetrics;
+use crate::{EmptyRecordBatchStream, SpillManager};
+use arrow::array::RecordBatch;
+use std::fmt::{Debug, Formatter};
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::{
+MemoryConsumer, MemoryPool, MemoryReservation, UnboundedMemoryPool,
+};
+
+use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
+use crate::stream::RecordBatchStreamAdapter;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::TryStreamExt;
+use futures::{Stream, StreamExt};
+
+/// Merges a stream of sorted cursors and record batches into a single sorted 
stream
+pub(crate) struct MultiLevelMergeBuilder {
+spill_manager: SpillManager,
+schema: SchemaRef,
+sorted_spill_files: Vec,
+sorted_streams: Vec,
+expr: LexOrdering,
+metrics: BaselineMetrics,
+batch_size: usize,
+reservation: MemoryReservation,
+fetch: Option,
+enable_round_robin_tie_breaker: bool,
+
+// This is for avoiding double reservation of memory from our side and the 
sort preserving merge stream
+// side.
+// and doing a lot of code changes to avoid accounting for the memory used 
by the streams
+unbounded_memory_pool: Arc,
+}
+
+impl Debug for MultiLevelMergeBuilder {
+fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+write!(f, "MultiLevelMergeBuilder")
+}
+}
+
+impl MultiLevelMergeBuilder {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+spill_manager: SpillManager,
+schema: SchemaRef,
+sorted_spill_files: Vec,
+sorted_streams: Vec,
+expr: LexOrdering,
+metrics: BaselineMetrics,
+batch_size: usize,
+reservation: MemoryReservation,
+fetch: Option,
+enable_round_robin_tie_breaker: bool,
+) -> Self {
+Self {
+spill_manager,
+schema,
+sorted_spill_files,
+sorted_streams,
+expr,
+metrics,
+batch_size,
+reservation,
+enable_round_robin_tie_breaker,
+fetch,
+unbounded_memory_pool: Arc::new(UnboundedMemoryPool::default()),
+}
+}
+
+pub(crate) fn create_spillable_merge_stream(self) -> 
SendableRecordBatchStream {
+Box::pin(RecordBatchStreamAdapter::new(
+Arc::clone(&self.schema),
+futures::stream::once(self.create_stream()).try_flatten(),
+))
+}
+
+async fn create_stream(mut self) -> Result {
+loop {
+// Hold this for the lifetime of the stream
+let mut current_memory_reservation = self.reservation.new_empty();
+let mut stream =
+self.create_sorted_stream(&mut current_memory_reservation)?;
+
+// TODO - add a threshold for number of files to disk even if 
empty and reading from disk so
+//we can avoid the memory reservation
+
+// If no spill files are left, we can return the stream as this is 
the last sorted run
+// TODO - We can write to disk before reading it back to avoid 
having multiple streams in memory
+if self.sorted_spill_files.is_empty() {
+// Attach the memory reservation to the stream as we are done 
with it
+// but because we replaced the memory reservation of the merge 
stream, we must hold
+// this to make sure we have enough memory
+return Ok(Box::pin(StreamAttachedReservation::new(
+stream,
+current_memory_reservation,
+)));
+}

Review Comment:
   W

Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-17 Thread via GitHub


rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2213099661


##
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##
@@ -0,0 +1,342 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Create a stream that do a multi level merge stream
+
+use crate::metrics::BaselineMetrics;
+use crate::{EmptyRecordBatchStream, SpillManager};
+use arrow::array::RecordBatch;
+use std::fmt::{Debug, Formatter};
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::{
+MemoryConsumer, MemoryPool, MemoryReservation, UnboundedMemoryPool,
+};
+
+use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
+use crate::stream::RecordBatchStreamAdapter;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::TryStreamExt;
+use futures::{Stream, StreamExt};
+
+/// Merges a stream of sorted cursors and record batches into a single sorted 
stream
+pub(crate) struct MultiLevelMergeBuilder {
+spill_manager: SpillManager,
+schema: SchemaRef,
+sorted_spill_files: Vec,
+sorted_streams: Vec,
+expr: LexOrdering,
+metrics: BaselineMetrics,
+batch_size: usize,
+reservation: MemoryReservation,
+fetch: Option,
+enable_round_robin_tie_breaker: bool,
+
+// This is for avoiding double reservation of memory from our side and the 
sort preserving merge stream
+// side.
+// and doing a lot of code changes to avoid accounting for the memory used 
by the streams
+unbounded_memory_pool: Arc,
+}
+
+impl Debug for MultiLevelMergeBuilder {
+fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+write!(f, "MultiLevelMergeBuilder")
+}
+}
+
+impl MultiLevelMergeBuilder {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+spill_manager: SpillManager,
+schema: SchemaRef,
+sorted_spill_files: Vec<SortedSpillFile>,
+sorted_streams: Vec<SendableRecordBatchStream>,
+expr: LexOrdering,
+metrics: BaselineMetrics,
+batch_size: usize,
+reservation: MemoryReservation,
+fetch: Option<usize>,
+enable_round_robin_tie_breaker: bool,
+) -> Self {
+Self {
+spill_manager,
+schema,
+sorted_spill_files,
+sorted_streams,
+expr,
+metrics,
+batch_size,
+reservation,
+enable_round_robin_tie_breaker,
+fetch,
+unbounded_memory_pool: Arc::new(UnboundedMemoryPool::default()),
+}
+}
+
+pub(crate) fn create_spillable_merge_stream(self) -> SendableRecordBatchStream {
+Box::pin(RecordBatchStreamAdapter::new(
+Arc::clone(&self.schema),
+futures::stream::once(self.create_stream()).try_flatten(),
+))
+}
+
+async fn create_stream(mut self) -> Result<SendableRecordBatchStream> {
+loop {
+// Hold this for the lifetime of the stream
+let mut current_memory_reservation = self.reservation.new_empty();
+let mut stream =
+self.create_sorted_stream(&mut current_memory_reservation)?;
+
+// TODO - add a threshold for number of files to disk even if empty and reading from disk so
+//we can avoid the memory reservation
+
+// If no spill files are left, we can return the stream as this is the last sorted run
+// TODO - We can write to disk before reading it back to avoid having multiple streams in memory
+if self.sorted_spill_files.is_empty() {
+// Attach the memory reservation to the stream as we are done with it
+// but because we replaced the memory reservation of the merge stream, we must hold
+// this to make sure we have enough memory
+return Ok(Box::pin(StreamAttachedReservation::new(
+stream,
+current_memory_reservation,
+)));
+}

Review Comment:
   W

Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-17 Thread via GitHub


rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2212846002


##
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##
@@ -0,0 +1,342 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Create a stream that do a multi level merge stream
+
+use crate::metrics::BaselineMetrics;
+use crate::{EmptyRecordBatchStream, SpillManager};
+use arrow::array::RecordBatch;
+use std::fmt::{Debug, Formatter};
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::{
+MemoryConsumer, MemoryPool, MemoryReservation, UnboundedMemoryPool,
+};
+
+use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
+use crate::stream::RecordBatchStreamAdapter;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::TryStreamExt;
+use futures::{Stream, StreamExt};
+
+/// Merges a stream of sorted cursors and record batches into a single sorted stream
+pub(crate) struct MultiLevelMergeBuilder {
+spill_manager: SpillManager,
+schema: SchemaRef,
+sorted_spill_files: Vec<SortedSpillFile>,
+sorted_streams: Vec<SendableRecordBatchStream>,
+expr: LexOrdering,
+metrics: BaselineMetrics,
+batch_size: usize,
+reservation: MemoryReservation,
+fetch: Option<usize>,
+enable_round_robin_tie_breaker: bool,
+
+// This is for avoiding double reservation of memory from our side and the sort preserving merge stream
+// side.
+// and doing a lot of code changes to avoid accounting for the memory used by the streams
+unbounded_memory_pool: Arc<dyn MemoryPool>,
+}
+
+impl Debug for MultiLevelMergeBuilder {
+fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+write!(f, "MultiLevelMergeBuilder")
+}
+}
+
+impl MultiLevelMergeBuilder {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+spill_manager: SpillManager,
+schema: SchemaRef,
+sorted_spill_files: Vec<SortedSpillFile>,
+sorted_streams: Vec<SendableRecordBatchStream>,
+expr: LexOrdering,
+metrics: BaselineMetrics,
+batch_size: usize,
+reservation: MemoryReservation,
+fetch: Option<usize>,
+enable_round_robin_tie_breaker: bool,
+) -> Self {
+Self {
+spill_manager,
+schema,
+sorted_spill_files,
+sorted_streams,
+expr,
+metrics,
+batch_size,
+reservation,
+enable_round_robin_tie_breaker,
+fetch,
+unbounded_memory_pool: Arc::new(UnboundedMemoryPool::default()),
+}
+}
+
+pub(crate) fn create_spillable_merge_stream(self) -> SendableRecordBatchStream {
+Box::pin(RecordBatchStreamAdapter::new(
+Arc::clone(&self.schema),
+futures::stream::once(self.create_stream()).try_flatten(),
+))
+}
+
+async fn create_stream(mut self) -> Result<SendableRecordBatchStream> {
+loop {
+// Hold this for the lifetime of the stream
+let mut current_memory_reservation = self.reservation.new_empty();
+let mut stream =
+self.create_sorted_stream(&mut current_memory_reservation)?;
+
+// TODO - add a threshold for number of files to disk even if empty and reading from disk so
+//we can avoid the memory reservation
+
+// If no spill files are left, we can return the stream as this is the last sorted run
+// TODO - We can write to disk before reading it back to avoid having multiple streams in memory
+if self.sorted_spill_files.is_empty() {
+// Attach the memory reservation to the stream as we are done with it
+// but because we replaced the memory reservation of the merge stream, we must hold
+// this to make sure we have enough memory
+return Ok(Box::pin(StreamAttachedReservation::new(
+stream,
+current_memory_reservation,
+)));
+}
+
+// Nee

Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-17 Thread via GitHub


rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2212828103


##
datafusion/core/tests/fuzz_cases/sort_fuzz.rs:
##
@@ -377,3 +388,335 @@ fn make_staggered_i32_utf8_batches(len: usize) -> Vec<RecordBatch> {
 
 batches
 }
+

Review Comment:
   moved to the same file with comment



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-17 Thread via GitHub


rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2212827518


##
datafusion/core/tests/fuzz_cases/sort_fuzz.rs:
##
@@ -377,3 +388,335 @@ fn make_staggered_i32_utf8_batches(len: usize) -> Vec<RecordBatch> {
 
 batches
 }
+
+#[tokio::test]
+async fn test_sort_with_limited_memory() -> Result<()> {
+let record_batch_size = 8192;
+let pool_size = 2 * MB as usize;
+let task_ctx = {
+let memory_pool = Arc::new(FairSpillPool::new(pool_size));
+TaskContext::default()
+.with_session_config(
+SessionConfig::new()
+.with_batch_size(record_batch_size)
+.with_sort_spill_reservation_bytes(1),
+)
+.with_runtime(Arc::new(
+RuntimeEnvBuilder::new()
+.with_memory_pool(memory_pool)
+.build()?,
+))
+};
+
+let record_batch_size = pool_size / 16;
+
+// Basic test with a lot of groups that cannot all fit in memory and 1 record batch
+// from each spill file is too much memory
+let spill_count =
+run_sort_test_with_limited_memory(RunSortTestWithLimitedMemoryArgs {
+pool_size,
+task_ctx,
+number_of_record_batches: 100,
+get_size_of_record_batch_to_generate: Box::pin(move |_| record_batch_size),
+memory_behavior: Default::default(),
+})
+.await?;
+
+let total_spill_files_size = spill_count * record_batch_size;
+assert!(
+total_spill_files_size > pool_size,
+"Total spill files size {total_spill_files_size} should be greater than pool size {pool_size}",
+);
+
+Ok(())
+}
+
+#[tokio::test]
+async fn test_sort_with_limited_memory_and_different_sizes_of_record_batch() -> Result<()>
+{
+let record_batch_size = 8192;
+let pool_size = 2 * MB as usize;
+let task_ctx = {
+let memory_pool = Arc::new(FairSpillPool::new(pool_size));
+TaskContext::default()
+.with_session_config(
+SessionConfig::new()
+.with_batch_size(record_batch_size)
+.with_sort_spill_reservation_bytes(1),
+)
+.with_runtime(Arc::new(
+RuntimeEnvBuilder::new()
+.with_memory_pool(memory_pool)
+.build()?,
+))
+};
+
+run_sort_test_with_limited_memory(RunSortTestWithLimitedMemoryArgs {
+pool_size,
+task_ctx,
+number_of_record_batches: 100,
+get_size_of_record_batch_to_generate: Box::pin(move |i| {
+if i % 25 == 1 {
+pool_size / 4
+} else {
+16 * KB
+}
+}),
+memory_behavior: Default::default(),
+})
+.await?;
+
+Ok(())
+}
+
+#[tokio::test]
+async fn test_sort_with_limited_memory_and_different_sizes_of_record_batch_and_changing_memory_reservation(
+) -> Result<()> {
+let record_batch_size = 8192;
+let pool_size = 2 * MB as usize;
+let task_ctx = {
+let memory_pool = Arc::new(FairSpillPool::new(pool_size));
+TaskContext::default()
+.with_session_config(
+SessionConfig::new()
+.with_batch_size(record_batch_size)
+.with_sort_spill_reservation_bytes(1),
+)
+.with_runtime(Arc::new(
+RuntimeEnvBuilder::new()
+.with_memory_pool(memory_pool)
+.build()?,
+))
+};
+
+run_sort_test_with_limited_memory(RunSortTestWithLimitedMemoryArgs {
+pool_size,
+task_ctx,
+number_of_record_batches: 100,
+get_size_of_record_batch_to_generate: Box::pin(move |i| {
+if i % 25 == 1 {
+pool_size / 4
+} else {
+16 * KB
+}
+}),
+memory_behavior: MemoryBehavior::TakeAllMemoryAndReleaseEveryNthBatch(10),
+})
+.await?;
+
+Ok(())
+}
+
+#[tokio::test]
+async fn test_sort_with_limited_memory_and_different_sizes_of_record_batch_and_take_all_memory(
+) -> Result<()> {
+let record_batch_size = 8192;
+let pool_size = 2 * MB as usize;
+let task_ctx = {
+let memory_pool = Arc::new(FairSpillPool::new(pool_size));
+TaskContext::default()
+.with_session_config(
+SessionConfig::new()
+.with_batch_size(record_batch_size)
+.with_sort_spill_reservation_bytes(1),
+)
+.with_runtime(Arc::new(
+RuntimeEnvBuilder::new()
+.with_memory_pool(memory_pool)
+.build()?,
+))
+};
+
+run_sort_test_with_limited_memory(RunSortTestWithLimitedMemoryArgs {
+pool_size,
+task_ctx,
+number_of_record_batches: 100,
+   

Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-08 Thread via GitHub


2010YOUY01 commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2191875936


##
datafusion/physical-plan/src/aggregates/row_hash.rs:
##
@@ -1067,14 +1074,13 @@ impl GroupedHashAggregateStream {
 sort_batch(&batch, &expr, None)
 })),
 )));
-for spill in self.spill_state.spills.drain(..) {
-let stream = self.spill_state.spill_manager.read_spill_as_stream(spill)?;
-streams.push(stream);
-}
+
 self.spill_state.is_stream_merging = true;
 self.input = StreamingMergeBuilder::new()
 .with_streams(streams)
 .with_schema(schema)
+.with_spill_manager(self.spill_state.spill_manager.clone())
+.with_sorted_spill_files(std::mem::take(&mut self.spill_state.spills))

Review Comment:
   Yes, but I think that is likely only a small slowdown.
   Compare it with the case where 90% of the memory is taken by in-memory streams and we have to use the remaining 10% to re-spill many spill files: that case can cause a huge slowdown.






Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-08 Thread via GitHub


rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2191760246


##
datafusion/physical-plan/src/aggregates/row_hash.rs:
##
@@ -1067,14 +1074,13 @@ impl GroupedHashAggregateStream {
 sort_batch(&batch, &expr, None)
 })),
 )));
-for spill in self.spill_state.spills.drain(..) {
-let stream = self.spill_state.spill_manager.read_spill_as_stream(spill)?;
-streams.push(stream);
-}
+
 self.spill_state.is_stream_merging = true;
 self.input = StreamingMergeBuilder::new()
 .with_streams(streams)
 .with_schema(schema)
+.with_spill_manager(self.spill_state.spill_manager.clone())
+.with_sorted_spill_files(std::mem::take(&mut self.spill_state.spills))

Review Comment:
   But take this example:
   I have 1 spill file and 1 in-memory stream. With this change I will spill even though I don't need to.






Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-07 Thread via GitHub


2010YOUY01 commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2191376010


##
datafusion/physical-plan/src/aggregates/row_hash.rs:
##
@@ -1067,14 +1074,13 @@ impl GroupedHashAggregateStream {
 sort_batch(&batch, &expr, None)
 })),
 )));
-for spill in self.spill_state.spills.drain(..) {
-let stream = self.spill_state.spill_manager.read_spill_as_stream(spill)?;
-streams.push(stream);
-}
+
 self.spill_state.is_stream_merging = true;
 self.input = StreamingMergeBuilder::new()
 .with_streams(streams)
 .with_schema(schema)
+.with_spill_manager(self.spill_state.spill_manager.clone())
+.with_sorted_spill_files(std::mem::take(&mut self.spill_state.spills))

Review Comment:
   I suggest spilling all in-memory batches (in `streams`) to disk before this final merging step. Also, let the multi-pass merge operator handle only spill files, so that it doesn't have to handle in-memory batches and spills at the same time.
   
   This is just a simplification for now; we can add an optimization to avoid this re-spill step in the future.
   
   The issue is that, without special handling, the in-memory batches can take most of the available memory budget and leave only a very small part of it for multi-pass spilling to continue. This can cause slowdowns or even prevent some queries from finishing.
   
   We're already doing this in the sort executor, see: 
https://github.com/apache/datafusion/blob/14487ddc275fc1f148f339293664fe7f83d91d09/datafusion/physical-plan/src/sorts/sort.rs#L336-L341
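
To make the concern above concrete, here is a rough sketch (not code from the PR) of how memory held by in-memory batches shrinks the merge fan-in and increases the number of re-spill steps. The names `merge_fan_in` and `respill_steps` are hypothetical, and the model rests on two simplifying assumptions: every open spill file needs one fixed-size batch buffer, and every re-spill step merges `fan_in` files into one new spill file.

```rust
/// How many spill files can be merged at once, assuming (hypothetically) that
/// each open spill file needs `batch_bytes` of memory and that
/// `in_memory_bytes` of the pool are already held by in-memory batches.
fn merge_fan_in(pool_bytes: usize, in_memory_bytes: usize, batch_bytes: usize) -> usize {
    pool_bytes
        .saturating_sub(in_memory_bytes)
        .checked_div(batch_bytes)
        .unwrap_or(0) // a zero batch size is treated as "nothing fits"
}

/// Number of intermediate merge-and-re-spill steps needed before the remaining
/// files fit into one final merge. Returns `None` when fewer than two files
/// fit in memory, because merging a single file makes no progress.
fn respill_steps(mut num_files: usize, fan_in: usize) -> Option<usize> {
    if fan_in < 2 {
        return None;
    }
    let mut steps = 0;
    while num_files > fan_in {
        // `fan_in` files are replaced by one merged spill file.
        num_files = num_files - fan_in + 1;
        steps += 1;
    }
    Some(steps)
}
```

Under these assumptions, with a pool of 100 units, 1 unit per spill file and 50 spill files, an empty pool gives a fan-in of 100 and no re-spill steps, while 90 units held by in-memory batches give a fan-in of 10 and 5 re-spill steps.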






Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-07 Thread via GitHub


2010YOUY01 commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2191389061


##
datafusion/physical-plan/src/sorts/streaming_merge.rs:
##
@@ -131,14 +168,42 @@ impl<'a> StreamingMergeBuilder<'a> {
 enable_round_robin_tie_breaker,
 } = self;
 
-// Early return if streams or expressions are empty:
-if streams.is_empty() {
-return internal_err!("Streams cannot be empty for streaming merge");
-}
+// Early return if expressions are empty:
 let Some(expressions) = expressions else {
 return internal_err!("Sort expressions cannot be empty for streaming merge");
 };
 
+if !sorted_spill_files.is_empty() {

Review Comment:
   The current code flow is
   ```
   Sort/Aggregate
   -> StreamingMerge
   -> MultiLevelMerge (with potential internal re-spills to ensure the final merge can proceed under memory limit)
   -> StreamingMerge
   ```
   So the first `StreamingMerge` -> `MultiLevelMerge` indirection implemented here seems redundant.
   
   How about letting sort/aggregate use `MultiLevelMergeBuilder` directly instead?






Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-07 Thread via GitHub


2010YOUY01 commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2191383325


##
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##
@@ -0,0 +1,342 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Create a stream that do a multi level merge stream
+
+use crate::metrics::BaselineMetrics;
+use crate::{EmptyRecordBatchStream, SpillManager};
+use arrow::array::RecordBatch;
+use std::fmt::{Debug, Formatter};
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::Result;
+use datafusion_execution::memory_pool::{
+MemoryConsumer, MemoryPool, MemoryReservation, UnboundedMemoryPool,
+};
+
+use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
+use crate::stream::RecordBatchStreamAdapter;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::TryStreamExt;
+use futures::{Stream, StreamExt};
+
+/// Merges a stream of sorted cursors and record batches into a single sorted stream
+pub(crate) struct MultiLevelMergeBuilder {
+spill_manager: SpillManager,
+schema: SchemaRef,
+sorted_spill_files: Vec<SortedSpillFile>,
+sorted_streams: Vec<SendableRecordBatchStream>,
+expr: LexOrdering,
+metrics: BaselineMetrics,
+batch_size: usize,
+reservation: MemoryReservation,
+fetch: Option<usize>,
+enable_round_robin_tie_breaker: bool,
+
+// This is for avoiding double reservation of memory from our side and the sort preserving merge stream
+// side.
+// and doing a lot of code changes to avoid accounting for the memory used by the streams
+unbounded_memory_pool: Arc<dyn MemoryPool>,
+}
+
+impl Debug for MultiLevelMergeBuilder {
+fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+write!(f, "MultiLevelMergeBuilder")
+}
+}
+
+impl MultiLevelMergeBuilder {
+#[allow(clippy::too_many_arguments)]
+pub(crate) fn new(
+spill_manager: SpillManager,
+schema: SchemaRef,
+sorted_spill_files: Vec<SortedSpillFile>,
+sorted_streams: Vec<SendableRecordBatchStream>,
+expr: LexOrdering,
+metrics: BaselineMetrics,
+batch_size: usize,
+reservation: MemoryReservation,
+fetch: Option<usize>,
+enable_round_robin_tie_breaker: bool,
+) -> Self {
+Self {
+spill_manager,
+schema,
+sorted_spill_files,
+sorted_streams,
+expr,
+metrics,
+batch_size,
+reservation,
+enable_round_robin_tie_breaker,
+fetch,
+unbounded_memory_pool: Arc::new(UnboundedMemoryPool::default()),
+}
+}
+
+pub(crate) fn create_spillable_merge_stream(self) -> SendableRecordBatchStream {
+Box::pin(RecordBatchStreamAdapter::new(
+Arc::clone(&self.schema),
+futures::stream::once(self.create_stream()).try_flatten(),
+))
+}
+
+async fn create_stream(mut self) -> Result<SendableRecordBatchStream> {
+loop {
+// Hold this for the lifetime of the stream
+let mut current_memory_reservation = self.reservation.new_empty();
+let mut stream =
+self.create_sorted_stream(&mut current_memory_reservation)?;
+
+// TODO - add a threshold for number of files to disk even if empty and reading from disk so
+//we can avoid the memory reservation
+
+// If no spill files are left, we can return the stream as this is the last sorted run
+// TODO - We can write to disk before reading it back to avoid having multiple streams in memory
+if self.sorted_spill_files.is_empty() {
+// Attach the memory reservation to the stream as we are done with it
+// but because we replaced the memory reservation of the merge stream, we must hold
+// this to make sure we have enough memory
+return Ok(Box::pin(StreamAttachedReservation::new(
+stream,
+current_memory_reservation,
+)));
+}
+
+// N

Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-06 Thread via GitHub


rluvaton commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-3043682960

   > > I would appreciate it, it would greatly help me
   > 
   > @rluvaton I opened a pr on your fork. Would you take a look when you have 
some time?
   
   I **really** appreciate the PR, but the changes are too large for me to review, so I just did it myself:
   ```
   +95,274 −36,420
   ```





Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-06 Thread via GitHub


ding-young commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-3043202973

   > I would appreciate it, it would greatly help me
   
   @rluvaton I opened a pr on your fork. Would you take a look when you have 
some time? 





Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-04 Thread via GitHub


rluvaton commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-3036807094

   I would appreciate it, it would greatly help me





Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-04 Thread via GitHub


ding-young commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-3036770691

   @rluvaton If you'd like, I can send a PR to your fork's branch that resolves the merge conflicts, since I already have one. There were only minor diffs to handle when I rebased your branch on main.





Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-04 Thread via GitHub


rluvaton commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-3036724762

   So should I fix this PR's conflicts? It seems like this PR has a chance to be merged.





Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-04 Thread via GitHub


2010YOUY01 commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-3034804482

   https://github.com/apache/datafusion/pull/15700#discussion_r2041372025
   
   I have an idea to fix this concern: add a max merge degree configuration, and do a re-spill if either
   a. SPM's estimated memory exceeds the budget, or
   b. the configured max merge degree has been reached.
   
   I think this approach has two advantages:
   1. If batch size bloat happens after a spill and read-back roundtrip (see https://github.com/apache/datafusion/pull/15700#discussion_r2041372025), a hard merge degree limit that overrides the estimation still lets the query finish.
   2. It is also helpful for tuning speed: even if we have enough memory to perform a very wide merge, limiting it to a narrower merge is still likely to run faster.
   
   I (or possibly @ding-young) can handle this patch in a follow-up PR. I think we can move forward with this one. I'll review it in the next few days.
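
The rule proposed above could be sketched roughly as follows. This is only an illustration under stated assumptions, not the follow-up patch: `MergePlan`, `plan_merge_pass`, and the per-file memory estimates are hypothetical names, and the sketch assumes that each spill file needs a known amount of memory while it is being merged.

```rust
/// Outcome of planning one merge pass (hypothetical type for illustration).
#[derive(Debug, PartialEq)]
enum MergePlan {
    /// All remaining files fit: merge them and stream the result out.
    FinalMerge,
    /// Merge only the first `n` files and re-spill the result to disk.
    Respill(usize),
    /// Fewer than two files can be merged, so no progress is possible.
    CannotProgress,
}

/// Pick how many of the leading spill files to merge in the next pass.
/// `per_file_bytes[i]` is the assumed memory needed to read file `i`.
fn plan_merge_pass(
    per_file_bytes: &[usize],
    budget_bytes: usize,
    max_merge_degree: usize,
) -> MergePlan {
    let mut used = 0usize;
    let mut count = 0usize;
    for &bytes in per_file_bytes {
        // (b) the configured max merge degree has been reached
        if count >= max_merge_degree {
            break;
        }
        // (a) the estimated memory would exceed the budget
        let next = used.saturating_add(bytes);
        if next > budget_bytes {
            break;
        }
        used = next;
        count += 1;
    }
    if count == per_file_bytes.len() {
        MergePlan::FinalMerge
    } else if count >= 2 {
        MergePlan::Respill(count)
    } else {
        MergePlan::CannotProgress
    }
}
```

In this sketch the merge degree limit acts as a hard cap that applies even when the memory estimate says more files would fit, which is the property advantage 1 relies on.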





Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-07-03 Thread via GitHub


ding-young commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-3031592289

   I've rebased this branch on the latest main and tested whether the estimated size changes after we load into memory a `RecordBatch` that was compressed with `lz4_frame`. The result of `get_actually_used_size()` was identical before and after (the arrow-ipc `StreamReader` returns decoded arrays). Of course, since buffer allocations and copies happen internally during decoding, actual system memory usage (which DataFusion doesn't track) may temporarily be higher. So far I've only tested primitive-type arrays with compression, so I'll run a few more tests and see if I can reproduce any of the problematic cases discussed above.
   
   
   
   > Hi @adriangb, thanks for raising this point. I'm currently reviewing both 
this PR and the other cascading merge sort PR (#15610). I'm not taking sides 
between the two approaches, but I agree that accurately estimating memory 
consumption is tricky considering the issues discussed above and the fact that 
compression is now supported in spill files. We may need to think more about 
whether we can special-case scenarios where the memory size changes after 
spilling and reloading, or perhaps add some kind of backup logic to handle such 
situations more gracefully.
   
   





Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-06-30 Thread via GitHub


ding-young commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-3021762356

   Hi @adriangb, thanks for raising this point. I'm currently reviewing both 
this PR and the other cascading merge sort PR 
(https://github.com/apache/datafusion/pull/15610). I'm not taking sides between 
the two approaches, but I agree that accurately estimating memory consumption 
is tricky considering the issues discussed above and the fact that compression 
is now supported in spill files. We may need to think more about whether we can 
special-case scenarios where the memory size changes after spilling and 
reloading, or perhaps add some kind of backup logic to handle such situations 
more gracefully. 





Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-06-30 Thread via GitHub


adriangb commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-3021699053

   In the interest of this valuable work not being lost, is there any way that 
https://github.com/apache/datafusion/pull/15700#discussion_r2041372025 could be 
addressed by a method other than more tests? Could we calculate the actual 
batch sizes every time we load into memory? Even if that's possible, it opens 
up questions of what to do if we load a batch and now exceed our memory budget, 
but maybe it's a path forward?
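
   A rough sketch of what "measure on load" could look like: grow a 
reservation by the measured size of each loaded batch and surface the 
over-budget case to the caller instead of failing outright. `Reservation`, 
`LoadOutcome` and `account_loaded_batch` are hypothetical stand-ins for 
illustration; DataFusion's real memory reservation type has a richer API, and 
what the caller does when over budget (merge fewer streams, re-spill) is left 
open here:

```rust
/// Toy stand-in for a memory reservation with a fixed limit
/// (hypothetical, not DataFusion's real reservation type).
struct Reservation {
    limit: usize,
    used: usize,
}

impl Reservation {
    fn new(limit: usize) -> Self {
        Self { limit, used: 0 }
    }

    /// Grow by `bytes`, or report the failure without changing state.
    fn try_grow(&mut self, bytes: usize) -> Result<(), String> {
        match self.used.checked_add(bytes) {
            Some(total) if total <= self.limit => {
                self.used = total;
                Ok(())
            }
            _ => Err(format!(
                "cannot reserve {} bytes: {} of {} already in use",
                bytes, self.used, self.limit
            )),
        }
    }
}

/// What the merge should do after loading one batch.
#[derive(Debug, PartialEq)]
enum LoadOutcome {
    /// The measured size fit in the budget and is now reserved.
    Accepted,
    /// The batch does not fit; the caller has to pick a fallback,
    /// e.g. merge fewer streams or re-spill.
    OverBudget { measured: usize },
}

/// Account for a batch using its size measured after loading,
/// rather than an estimate recorded at spill time.
fn account_loaded_batch(
    reservation: &mut Reservation,
    measured_bytes: usize,
) -> LoadOutcome {
    match reservation.try_grow(measured_bytes) {
        Ok(()) => LoadOutcome::Accepted,
        Err(_) => LoadOutcome::OverBudget { measured: measured_bytes },
    }
}
```

   The point of returning `OverBudget` rather than an error is that the 
decision stays with the merge logic; by then the batch is already in memory, 
which is exactly the open question raised above.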





Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-05-24 Thread via GitHub


rluvaton commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-2906947109

   > > @2010YOUY01 and @ding-young I wonder if you can review this PR again to 
help @rluvaton get it merged?
   > >
   > > Specifically if it needs more tests perhaps you can help identify which 
are needed
   >
   > I have some concerns about this PR's design direction (see more in 
https://github.com/apache/datafusion/pull/15700#discussion_r2041372025), and I 
don't think it can be addressed by more extensive tests.
   
   Why is that? You raised some concerns about miscalculating the size of the 
record batch; adding tests will make sure we are calculating it correctly.
   





Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-05-24 Thread via GitHub


2010YOUY01 commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-2906805485

   > @2010YOUY01 and @ding-young I wonder if you can review this PR again to 
help @rluvaton get it merged?
   > 
   > Specifically if it needs more tests perhaps you can help identify which 
are needed
   
   I have some concerns about this PR's design direction (see more in 
https://github.com/apache/datafusion/pull/15700#discussion_r2041372025), and I 
don't think it can be addressed by more extensive tests.
   By the way, this PR serves as an alternative to 
https://github.com/apache/datafusion/pull/15610. It's ready for another review, 
except that it needs to merge main.





Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-05-24 Thread via GitHub


ding-young commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-2906803554

   @alamb Sure! I may not be able to provide a detailed review right away, but 
I can definitely help by running the tests added in the PR locally and looking 
into memory accounting for the nested types that have been mentioned. 





Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-05-24 Thread via GitHub


alamb commented on PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#issuecomment-2906787792

   @2010YOUY01 and @ding-young I wonder if you can review this PR again to help 
@rluvaton get it merged? 
   
   Specifically if it needs more tests perhaps you can help identify which are 
needed





Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-05-23 Thread via GitHub


rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2104892196


##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -431,12 +422,16 @@ impl ExternalSorter {
 let batches_to_spill = std::mem::take(globally_sorted_batches);
 self.reservation.free();
 
-let in_progress_file = self.in_progress_spill_file.as_mut().ok_or_else(|| {
-internal_datafusion_err!("In-progress spill file should be initialized")
-})?;
+let (in_progress_file, max_record_batch_size) =
+self.in_progress_spill_file.as_mut().ok_or_else(|| {
+internal_datafusion_err!("In-progress spill file should be initialized")
+})?;
 
 for batch in batches_to_spill {
 in_progress_file.append_batch(&batch)?;
+
+*max_record_batch_size =
+(*max_record_batch_size).max(batch.get_actually_used_size());

Review Comment:
   If I added tests for every type to make sure the memory accounting is 
correct, would you approve?






Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-04-15 Thread via GitHub


2010YOUY01 commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2046041614


##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -431,12 +422,16 @@ impl ExternalSorter {
 let batches_to_spill = std::mem::take(globally_sorted_batches);
 self.reservation.free();
 
-let in_progress_file = self.in_progress_spill_file.as_mut().ok_or_else(|| {
-internal_datafusion_err!("In-progress spill file should be initialized")
-})?;
+let (in_progress_file, max_record_batch_size) =
+self.in_progress_spill_file.as_mut().ok_or_else(|| {
+internal_datafusion_err!("In-progress spill file should be initialized")
+})?;
 
 for batch in batches_to_spill {
 in_progress_file.append_batch(&batch)?;
+
+*max_record_batch_size =
+(*max_record_batch_size).max(batch.get_actually_used_size());

Review Comment:
   I think we should not estimate. Even if the estimate is correct 99% of the 
time, IMO it's impossible to make sure it's always accurate for nested types' 
reader implementations. If the estimate is way off for edge cases, the bug 
would be hard to investigate.
   If we want to follow this optimistic approach, I think the only required 
memory accounting is for buffering batches inside `SortExec`; all the remaining 
memory-tracking code can be deleted to make the implementation much simpler. 
The potential problem is unexpected behavior for non-primitive types (e.g. a 
dictionary array's row format size can explode).
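
   To illustrate the dictionary case with a toy model: assume the row format 
materializes each row's full value instead of keeping the dictionary key. 
`DictColumn` and its methods are hypothetical and only count bytes; they are 
not arrow-rs types, and per-row encoding overhead is ignored:

```rust
/// Toy dictionary-encoded string column: each row stores a 4-byte key
/// into `values` (hypothetical model, not an arrow-rs type).
struct DictColumn {
    values: Vec<String>,
    keys: Vec<u32>,
}

impl DictColumn {
    /// Bytes used by the encoded form: each distinct value once,
    /// plus one key per row.
    fn encoded_bytes(&self) -> usize {
        let value_bytes: usize = self.values.iter().map(|v| v.len()).sum();
        value_bytes + self.keys.len() * std::mem::size_of::<u32>()
    }

    /// Bytes needed if every row carries its full value, as assumed for a
    /// row-oriented sort key. Panics if a key is out of range.
    fn expanded_row_bytes(&self) -> usize {
        self.keys
            .iter()
            .map(|&k| self.values[k as usize].len())
            .sum()
    }
}
```

   With two distinct 100-byte values and 1000 rows, the encoded form takes 
4200 bytes while the expanded rows take 100000 bytes, about 24x more. An 
estimate based on the encoded size alone would miss this.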






Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-04-15 Thread via GitHub


rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2045582560


##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -431,12 +422,16 @@ impl ExternalSorter {
 let batches_to_spill = std::mem::take(globally_sorted_batches);
 self.reservation.free();
 
-let in_progress_file = self.in_progress_spill_file.as_mut().ok_or_else(|| {
-internal_datafusion_err!("In-progress spill file should be initialized")
-})?;
+let (in_progress_file, max_record_batch_size) =
+self.in_progress_spill_file.as_mut().ok_or_else(|| {
+internal_datafusion_err!("In-progress spill file should be initialized")
+})?;
 
 for batch in batches_to_spill {
 in_progress_file.append_batch(&batch)?;
+
+*max_record_batch_size =
+(*max_record_batch_size).max(batch.get_actually_used_size());

Review Comment:
   So, after looking at the code, I came to the conclusion that this is still 
the closest we can get to accurately estimating memory.






Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-04-14 Thread via GitHub


rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2041878079


##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -431,12 +422,16 @@ impl ExternalSorter {
 let batches_to_spill = std::mem::take(globally_sorted_batches);
 self.reservation.free();
 
-let in_progress_file = self.in_progress_spill_file.as_mut().ok_or_else(|| {
-internal_datafusion_err!("In-progress spill file should be initialized")
-})?;
+let (in_progress_file, max_record_batch_size) =
+self.in_progress_spill_file.as_mut().ok_or_else(|| {
+internal_datafusion_err!("In-progress spill file should be initialized")
+})?;
 
 for batch in batches_to_spill {
 in_progress_file.append_batch(&batch)?;
+
+*max_record_batch_size =
+(*max_record_batch_size).max(batch.get_actually_used_size());

Review Comment:
   I'm trying to reproduce that so I can give a better answer. How do you 
create a string view array that causes what you described?






Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-04-14 Thread via GitHub


2010YOUY01 commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2041629246


##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -431,12 +422,16 @@ impl ExternalSorter {
 let batches_to_spill = std::mem::take(globally_sorted_batches);
 self.reservation.free();
 
-let in_progress_file = self.in_progress_spill_file.as_mut().ok_or_else(|| {
-internal_datafusion_err!("In-progress spill file should be initialized")
-})?;
+let (in_progress_file, max_record_batch_size) =
+self.in_progress_spill_file.as_mut().ok_or_else(|| {
+internal_datafusion_err!("In-progress spill file should be initialized")
+})?;
 
 for batch in batches_to_spill {
 in_progress_file.append_batch(&batch)?;
+
+*max_record_batch_size =
+(*max_record_batch_size).max(batch.get_actually_used_size());

Review Comment:
   There might be some types of arrays with complex internal buffer management. 
A simple example:
   Before spilling, a `StringView` array has 10MB of actual content, backed by 
3 * 4MB buffers.
   After spilling and reading back, the reader implementation decides to use 
1 * 16MB buffer instead.
   Different allocation policies cause different fragmentation, so the physical 
memory consumed varies.
   
   Here are some real bugs found recently due to similar reasons (this explains 
why I'm worried about inconsistent memory size for logically equivalent 
batches):
   https://github.com/apache/datafusion/pull/14644
   https://github.com/apache/datafusion/pull/14823
   https://github.com/apache/datafusion/pull/13377
   Note that they were caused only by primitive and string arrays; for more 
complex types like struct, array, or other nested types, I think it's even more 
likely to see such inconsistency.
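
   The arithmetic behind the `StringView` example, as a small sketch. It 
assumes content is packed into fixed-size buffers and that every buffer is 
fully allocated; `allocated_bytes` is a hypothetical helper, not an arrow-rs 
function:

```rust
const MB: usize = 1024 * 1024;

/// Bytes allocated when `content_bytes` of data is stored in buffers of
/// `buffer_bytes` each, assuming every buffer is allocated in full.
fn allocated_bytes(content_bytes: usize, buffer_bytes: usize) -> usize {
    assert!(buffer_bytes > 0, "buffer size must be non-zero");
    // Ceiling division: number of buffers needed to hold the content.
    let buffers = (content_bytes + buffer_bytes - 1) / buffer_bytes;
    buffers * buffer_bytes
}

/// Allocated-but-unused bytes for the same layout.
fn slack_bytes(content_bytes: usize, buffer_bytes: usize) -> usize {
    allocated_bytes(content_bytes, buffer_bytes) - content_bytes
}
```

   For 10MB of content, `allocated_bytes(10 * MB, 4 * MB)` is 12MB while 
`allocated_bytes(10 * MB, 16 * MB)` is 16MB, so the same logical batch 
occupies different amounts of memory depending on the buffer policy.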






Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-04-14 Thread via GitHub


rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2041544882


##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -431,12 +422,16 @@ impl ExternalSorter {
 let batches_to_spill = std::mem::take(globally_sorted_batches);
 self.reservation.free();
 
-let in_progress_file = self.in_progress_spill_file.as_mut().ok_or_else(|| {
-internal_datafusion_err!("In-progress spill file should be initialized")
-})?;
+let (in_progress_file, max_record_batch_size) =
+self.in_progress_spill_file.as_mut().ok_or_else(|| {
+internal_datafusion_err!("In-progress spill file should be initialized")
+})?;
 
 for batch in batches_to_spill {
 in_progress_file.append_batch(&batch)?;
+
+*max_record_batch_size =
+(*max_record_batch_size).max(batch.get_actually_used_size());

Review Comment:
   Unless the actual array content differs before and after the spill, this 
function will **always** return the correct result regardless of the spill 
file format.
   






Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-04-14 Thread via GitHub


rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2041544882


##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -431,12 +422,16 @@ impl ExternalSorter {
 let batches_to_spill = std::mem::take(globally_sorted_batches);
 self.reservation.free();
 
-let in_progress_file = self.in_progress_spill_file.as_mut().ok_or_else(|| {
-internal_datafusion_err!("In-progress spill file should be initialized")
-})?;
+let (in_progress_file, max_record_batch_size) =
+self.in_progress_spill_file.as_mut().ok_or_else(|| {
+internal_datafusion_err!("In-progress spill file should be initialized")
+})?;
 
 for batch in batches_to_spill {
 in_progress_file.append_batch(&batch)?;
+
+*max_record_batch_size =
+(*max_record_batch_size).max(batch.get_actually_used_size());

Review Comment:
   Unless the **actual array content** differs before and after the spill, 
this function will **always** return the correct result regardless of the 
spill file format.
   






Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-04-14 Thread via GitHub


rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2041544882


##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -431,12 +422,16 @@ impl ExternalSorter {
 let batches_to_spill = std::mem::take(globally_sorted_batches);
 self.reservation.free();
 
-let in_progress_file = self.in_progress_spill_file.as_mut().ok_or_else(|| {
-internal_datafusion_err!("In-progress spill file should be initialized")
-})?;
+let (in_progress_file, max_record_batch_size) =
+self.in_progress_spill_file.as_mut().ok_or_else(|| {
+internal_datafusion_err!("In-progress spill file should be initialized")
+})?;
 
 for batch in batches_to_spill {
 in_progress_file.append_batch(&batch)?;
+
+*max_record_batch_size =
+(*max_record_batch_size).max(batch.get_actually_used_size());

Review Comment:
   Unless the **actual array content** differs before and after the spill, 
this function will **always** return the correct result regardless of the 
spill file format, because we calculate the actual array content size.






Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-04-13 Thread via GitHub


2010YOUY01 commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2041372025


##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -431,12 +422,16 @@ impl ExternalSorter {
 let batches_to_spill = std::mem::take(globally_sorted_batches);
 self.reservation.free();
 
-let in_progress_file = self.in_progress_spill_file.as_mut().ok_or_else(|| {
-internal_datafusion_err!("In-progress spill file should be initialized")
-})?;
+let (in_progress_file, max_record_batch_size) =
+self.in_progress_spill_file.as_mut().ok_or_else(|| {
+internal_datafusion_err!("In-progress spill file should be initialized")
+})?;
 
 for batch in batches_to_spill {
 in_progress_file.append_batch(&batch)?;
+
+*max_record_batch_size =
+(*max_record_batch_size).max(batch.get_actually_used_size());

Review Comment:
   I don't think it's realistic to know a batch's size correctly after a 
roundtrip of spilling and reading back with this `get_actually_used_size()` 
implementation. The actual implementation might surprise us. It can also get 
more complex in the future: for example, we might implement extra encodings 
for https://github.com/apache/datafusion/issues/14078, which would make the 
memory size of a batch after reading back even harder to estimate.






Re: [PR] feat: add multi level merge sort that will always fit in memory [datafusion]

2025-04-13 Thread via GitHub


rluvaton commented on code in PR #15700:
URL: https://github.com/apache/datafusion/pull/15700#discussion_r2041222937


##
datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs:
##
@@ -753,3 +765,226 @@ async fn test_single_mode_aggregate_with_spill() -> Result<()> {
 
 Ok(())
 }
+
+/// A Mock ExecutionPlan that can be used for writing tests of other
+/// ExecutionPlans
+pub struct StreamExec {
+/// the results to send back
+stream: Mutex<Option<SendableRecordBatchStream>>,
+/// if true (the default), sends data using a separate task to ensure the
+/// batches are not available without this stream yielding first
+use_task: bool,
+cache: PlanProperties,
+}
+
+impl Debug for StreamExec {
+fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+write!(f, "StreamExec")
+}
+}
+
+impl StreamExec {
+/// Create a new `MockExec` with a single partition that returns
+/// the specified `Results`s.
+///
+/// By default, the batches are not produced immediately (the
+/// caller has to actually yield and another task must run) to
+/// ensure any poll loops are correct. This behavior can be
+/// changed with `with_use_task`
+pub fn new(stream: SendableRecordBatchStream) -> Self {
+let cache = Self::compute_properties(stream.schema());
+Self {
+stream: Mutex::new(Some(stream)),
+use_task: true,
+cache,
+}
+}
+
+/// If `use_task` is true (the default) then the batches are sent
+/// back using a separate task to ensure the underlying stream is
+/// not immediately ready
+pub fn with_use_task(mut self, use_task: bool) -> Self {
+self.use_task = use_task;
+self
+}
+
+/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
+fn compute_properties(schema: SchemaRef) -> PlanProperties {
+PlanProperties::new(
+EquivalenceProperties::new(schema),
+Partitioning::UnknownPartitioning(1),
+EmissionType::Incremental,
+Boundedness::Bounded,
+)
+}
+}
+
+impl DisplayAs for StreamExec {
+fn fmt_as(
+&self,
+t: DisplayFormatType,
+f: &mut Formatter,
+) -> std::fmt::Result {
+match t {
+DisplayFormatType::Default | DisplayFormatType::Verbose => {
+write!(f, "StreamExec:")
+}
+DisplayFormatType::TreeRender => {
+// TODO: collect info
+write!(f, "")
+}
+}
+}
+}
+
+impl ExecutionPlan for StreamExec {
+fn name(&self) -> &'static str {
+Self::static_name()
+}
+
+fn as_any(&self) -> &dyn Any {
+self
+}
+
+fn properties(&self) -> &PlanProperties {
+&self.cache
+}
+
+fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+vec![]
+}
+
+fn with_new_children(
+self: Arc<Self>,
+_: Vec<Arc<dyn ExecutionPlan>>,
+) -> Result<Arc<dyn ExecutionPlan>> {
+unimplemented!()
+}
+
+/// Returns a stream which yields data
+fn execute(
+&self,
+partition: usize,
+_context: Arc<TaskContext>,
+) -> Result<SendableRecordBatchStream> {
+assert_eq!(partition, 0);
+
+let stream = self.stream.lock().unwrap().take();
+
+stream.ok_or(DataFusionError::Internal(
+"Stream already consumed".to_string(),
+))
+}
+}
+
+
+#[tokio::test]
+async fn test_low_cardinality() -> Result<()> {

Review Comment:
   This fails on main with an OOM error.


