Re: [PR] POC: Cascaded spill merge and re-spill [datafusion]

2025-04-12 Thread via GitHub


2010YOUY01 commented on PR #15610:
URL: https://github.com/apache/datafusion/pull/15610#issuecomment-2798789296

   I have made the following updates:
   - Addressed review comments
   - Introduced a new configuration option for max merge degree
   - Added tests
   
   It's ready for another look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] POC: Cascaded spill merge and re-spill [datafusion]

2025-04-12 Thread via GitHub


2010YOUY01 commented on PR #15610:
URL: https://github.com/apache/datafusion/pull/15610#issuecomment-2798787828

   Benchmark results:
   (I think an extra round of re-spill causes no significant regression when running on a machine with fast SSDs.)
   
   ### Environment
   MacBook Pro with an M4 Pro chip (disk bandwidth is around 8000 MB/s)
   ### Sorting 'thin' table
   1. Run datafusion-cli with `cargo run --profile release-nonlto -- --mem-pool-type fair -m 100M`
   2. Execute `explain analyze select * from generate_series(1, 10) as t1(v1) order by v1;`
   
   Main: 37s (merges ~170 spill files at once)
   PR (with `sort_max_spill_merge_degree = 16`, one round of re-spill): 43s
   PR (with `sort_max_spill_merge_degree = 10`, two rounds of re-spill): 49s
   ### Sorting 'fat' table
   Run `sort_tpch` benchmark q7
   ```
   // Q7: 3 sort keys {(INTEGER, 7), (BIGINT, 10k), (BIGINT, 1.5M)} + 12 all other columns
   r#"
   SELECT l_linenumber, l_suppkey, l_orderkey, 
  l_partkey, l_quantity, l_extendedprice, l_discount, l_tax,
  l_returnflag, l_linestatus, l_shipdate, l_commitdate,
  l_receiptdate, l_shipinstruct, l_shipmode
   FROM lineitem
   ORDER BY l_linenumber, l_suppkey, l_orderkey
   "#,
   ```
   Benchmark command
   ```sh
   cargo run --profile release-nonlto --bin dfbench -- sort-tpch -p /Users/yongting/Code/datafusion/benchmarks/data/tpch_sf10 -q 7 --memory-limit 1.2G
   ```
   Notes:
   - The `target_partitions` config is set to 14; the configurations and results below depend on this setting.
   - For the PR's benchmark runs, `sort_max_spill_merge_degree` is manually changed to 6. As a result:
     - under a 1.2G memory limit, 1 round of re-spill is triggered
     - under a 500M memory limit, 2 rounds of re-spill are triggered
   
   ### Result
   Main (1.2G): 
   ```
   Q7 iteration 0 took 9374.7 ms and returned 59986052 rows
   Q7 iteration 1 took 8117.6 ms and returned 59986052 rows
   Q7 iteration 2 took 8549.1 ms and returned 59986052 rows
   Q7 avg time: 8680.47 ms
   ```
   
   Main (500M):
   ```
   Fail with OOM
   ```
   
   PR (1.2G):
   ```
   Q7 iteration 0 took 10723.6 ms and returned 59986052 rows
   Q7 iteration 1 took 12962.8 ms and returned 59986052 rows
   Q7 iteration 2 took 11739.7 ms and returned 59986052 rows
   Q7 avg time: 11808.71 ms
   ```
   
   PR (500M):
   ```
   Q7 iteration 0 took 16233.1 ms and returned 59986052 rows
   Q7 iteration 1 took 18568.4 ms and returned 59986052 rows
   Q7 iteration 2 took 19173.4 ms and returned 59986052 rows
   Q7 avg time: 17991.67 ms
   ```





Re: [PR] POC: Cascaded spill merge and re-spill [datafusion]

2025-04-12 Thread via GitHub


2010YOUY01 commented on PR #15610:
URL: https://github.com/apache/datafusion/pull/15610#issuecomment-2798782989

   I haven't implemented the parallel merge optimization for now. My major concern is that it requires one extra configuration option, so users would have to learn and correctly set 2 configs for each individual query to get the most efficient cascaded spill merge execution (see the `Intended optimization` section below for what those 2 configs are), which is not ideal.
   So I'd like to defer the implementation for a bit and think about whether there is a simpler approach (for example, collecting stats internally and auto-tuning the related configs).
   Also, I think the current implementation is good enough to cover common cases (by my rough estimate, sorting the TPCH-SF1000 lineitem table with 16GB of memory requires only one round of re-spill).
   
   Here is the optimization I originally had in mind. I'll put it into a separate issue if it makes sense.
   ### Example scenario
   For one partition's `SortExec`, 100 runs are spilled, and we set `sort_max_spill_merge_degree` to 4.
   ### Current Implementation
   
![image](https://github.com/user-attachments/assets/2d4bba36-2b9c-4fc1-8eac-780900dc1fb8)
   Each merge step combines 4 existing spills into one spill file. This repeats until at most 4 spills remain, at which point the final result can be produced.
   Each entry is re-spilled $\lfloor \log_4 100 \rfloor = 3$ times.
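   
   The count above can be checked with a small sketch. It assumes every pass merges groups of `degree` files into one file and that the final merge can start once at most `degree` files remain; `respill_rounds` is a hypothetical helper for illustration, not a function from the PR. Under these assumptions it also agrees with the thin-table benchmark, where roughly 170 spill files needed one round at degree 16 and two rounds at degree 10.

```rust
/// Number of re-spill passes needed before the final merge.
/// Assumption (not taken from the PR code): each pass merges groups of
/// `degree` files into one, and the final merge runs once at most
/// `degree` files remain.
fn respill_rounds(mut files: usize, degree: usize) -> usize {
    assert!(degree >= 2, "merge degree must be at least 2");
    let mut rounds = 0;
    while files > degree {
        // Ceiling division: every group of up to `degree` files becomes one file.
        files = (files + degree - 1) / degree;
        rounds += 1;
    }
    rounds
}

fn main() {
    for (files, degree) in [(100, 4), (100, 16), (170, 16), (170, 10)] {
        println!(
            "{files} files, merge degree {degree}: {} re-spill round(s)",
            respill_rounds(files, degree)
        );
    }
}
```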
   
   ### Intended optimization
   If the memory pool is large enough to hold more buffers at a time, we can use them while `sort_max_spill_merge_degree` stays limited to 4 (a merge degree that is too large can hurt performance in some cases).
   One additional config, `sort_buffer_batch_capacity`, would be introduced. With it set to `16` in the above example, the execution would look like:
   
![image](https://github.com/user-attachments/assets/5ed8e884-4f45-4abd-b27c-bea6fce8f46d)
   Then, inside each merge step, 16 spill files are combined and re-spilled. Each entry only needs to be re-spilled $\lfloor \log_{16} 100 \rfloor = 1$ time.
   With this approach, we can achieve an optimal re-spill count and also enable parallel merge.
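   
   A minimal sketch of one such merge step, under stated assumptions: sorted in-memory `Vec<i32>` runs stand in for spill files, and the step is modeled as a two-level tree in which groups of `degree` runs are merged independently (the part that could run in parallel) and the group outputs are merged once more before a single re-spill. The tree shape and the names `merge_runs` and `merge_step` are illustrative guesses, not the PR's design. A real implementation would stream batches between the levels instead of materializing them.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// K-way merge of sorted runs using a min-heap of (value, run index, offset).
fn merge_runs(runs: &[Vec<i32>]) -> Vec<i32> {
    let mut heap = BinaryHeap::new();
    for (run_idx, run) in runs.iter().enumerate() {
        if let Some(&first) = run.first() {
            heap.push(Reverse((first, run_idx, 0usize)));
        }
    }
    let mut merged = Vec::with_capacity(runs.iter().map(Vec::len).sum());
    while let Some(Reverse((value, run_idx, offset))) = heap.pop() {
        merged.push(value);
        // Refill the heap with the next element from the run we just consumed.
        if let Some(&next) = runs[run_idx].get(offset + 1) {
            heap.push(Reverse((next, run_idx, offset + 1)));
        }
    }
    merged
}

/// One merge step: up to `degree * degree` runs are split into groups of
/// `degree`, each group is merged on its own (independent work that could
/// run in parallel), and the group outputs are merged into one sorted run.
fn merge_step(runs: &[Vec<i32>], degree: usize) -> Vec<i32> {
    assert!(degree >= 2, "merge degree must be at least 2");
    assert!(runs.len() <= degree * degree, "too many runs for one step");
    let partial: Vec<Vec<i32>> = runs.chunks(degree).map(merge_runs).collect();
    merge_runs(&partial)
}

fn main() {
    // 16 sorted runs, merged with degree 4 in a single step.
    let runs: Vec<Vec<i32>> = (0..16).map(|i| vec![i, i + 16, i + 32]).collect();
    let merged = merge_step(&runs, 4);
    assert!(merged.windows(2).all(|pair| pair[0] <= pair[1]));
    println!("merged {} values from {} runs", merged.len(), runs.len());
}
```

   No single merge ever has more than `degree` inputs, yet 16 runs are combined with only one write of the output.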
   
   





Re: [PR] POC: Cascaded spill merge and re-spill [datafusion]

2025-04-10 Thread via GitHub


alamb commented on PR #15610:
URL: https://github.com/apache/datafusion/pull/15610#issuecomment-2794812727

   Yes, I totally agree: when possible, SQL (or DataFrame) is a better level to test at, because it is the API that most users care about, not the internal details.





Re: [PR] POC: Cascaded spill merge and re-spill [datafusion]

2025-04-09 Thread via GitHub


2010YOUY01 commented on PR #15610:
URL: https://github.com/apache/datafusion/pull/15610#issuecomment-2791514957

   > And some other thoughts:
   > 
   > 1. This is a pretty complicated program, maybe we should write some unit tests to make sure it doesn't break for future modifications?
   
   I'll try to do most of the testing, and cover edge cases, in the integration tests at https://github.com/apache/datafusion/blob/main/datafusion/core/tests/fuzz_cases/sort_fuzz.rs and https://github.com/apache/datafusion/blob/main/datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs instead of writing extensive unit tests.
   
   I think we should promote tests to a higher level (SQL) when possible, because that API is much more stable and easier to manage. If a feature is tested mostly through unit tests and someone later refactors the component away, those tests are likely to get lost, because the person refactoring might assume the feature is already covered by integration tests.
   
   I first heard this idea in a talk by the DuckDB developers: https://youtu.be/BgC79Zt2fPs?si=WiziGqJ8Dlz6-MMW
   





Re: [PR] POC: Cascaded spill merge and re-spill [datafusion]

2025-04-09 Thread via GitHub


2010YOUY01 commented on PR #15610:
URL: https://github.com/apache/datafusion/pull/15610#issuecomment-2791500223

   Thank you all for the review!
   
   @qstommyshu I agree with the implementation-level feedback. I will address it in the refactor.
   
   @alamb Regarding parallel merging: I was assuming that if `max_spill_merge_degree` is configured to 10, then memory is so limited that each partition can only hold 10 batches at the same time, so parallel merging is not possible in that case.
   However, @rluvaton's PR made me realize that an operator may be able to hold 100 batches under the memory limit at the same time, while we still want to merge them 10 at a time for performance.
   
   I think the next steps are:
   1. Contribute benchmarks for external sort.
   2. Refactor this PR to avoid always re-spilling, and do parallel merging when possible.





Re: [PR] POC: Cascaded spill merge and re-spill [datafusion]

2025-04-09 Thread via GitHub


qstommyshu commented on code in PR #15610:
URL: https://github.com/apache/datafusion/pull/15610#discussion_r2036048151


##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -535,56 +457,262 @@ impl ExternalSorter {
 // reserved again for the next spill.
 self.merge_reservation.free();
 
-let mut sorted_stream =
+let sorted_stream =
 self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
+debug!("SPM stream is constructed");

Review Comment:
   Clean up debug logs if they are not needed?






Re: [PR] POC: Cascaded spill merge and re-spill [datafusion]

2025-04-09 Thread via GitHub


qstommyshu commented on code in PR #15610:
URL: https://github.com/apache/datafusion/pull/15610#discussion_r2036031118


##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -535,56 +457,262 @@ impl ExternalSorter {
         // reserved again for the next spill.
         self.merge_reservation.free();
 
-        let mut sorted_stream =
+        let sorted_stream =
             self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
+        debug!("SPM stream is constructed");
+
         // After `in_mem_sort_stream()` is constructed, all `in_mem_batches` is taken
         // to construct a globally sorted stream.
         if !self.in_mem_batches.is_empty() {
             return internal_err!(
                 "in_mem_batches should be empty after constructing sorted stream"
             );
         }
-        // 'global' here refers to all buffered batches when the memory limit is
-        // reached. This variable will buffer the sorted batches after
-        // sort-preserving merge and incrementally append to spill files.
-        let mut globally_sorted_batches: Vec<RecordBatch> = vec![];
 
+        let spill_file = self.write_stream_to_spill_file(sorted_stream).await?;
+        self.finished_spill_files.push(spill_file);
+
+        // Reserve headroom for next sort/merge
+        self.reserve_memory_for_merge()?;
+
+        Ok(())
+    }
+
+    /// Create a new spill file, and write all batches from the stream to the file.
+    ///
+    /// Note: After the spill is done, the memory reservation will be freed to 0,
+    /// because `sorted_stream` holds all buffered batches.
+    async fn write_stream_to_spill_file(
+        &mut self,
+        mut sorted_stream: SendableRecordBatchStream,
+    ) -> Result {
+        // Release the memory reserved for merge back to the pool so there is some
+        // left when the executed stream requests an allocation (now the stream to
+        // write are SortPreservingMergeStream, which requires memory).
+        // At the end of this function, memory will be reserved again for the next spill.
+        self.merge_reservation.free();
+
+        let mut in_progress_spill_file =
+            self.spill_manager.create_in_progress_file("Sorting")?;
+
+        // Incrementally append globally sorted batches to the spill file, because
+        // there might not be enough memory to materialize all batches at once.
         while let Some(batch) = sorted_stream.next().await {
-            let batch = batch?;
-            let sorted_size = get_reserved_byte_for_record_batch(&batch);
-            if self.reservation.try_grow(sorted_size).is_err() {
-                // Although the reservation is not enough, the batch is
-                // already in memory, so it's okay to combine it with previously
-                // sorted batches, and spill together.
-                globally_sorted_batches.push(batch);
-                self.consume_and_spill_append(&mut globally_sorted_batches)
-                    .await?; // reservation is freed in spill()
-            } else {
-                globally_sorted_batches.push(batch);
-            }
+            let mut batch = vec![batch?];
+            Self::organize_stringview_arrays(&mut batch)?;
+            in_progress_spill_file.append_batch(&batch[0])?;
         }
 
         // Drop early to free up memory reserved by the sorted stream, otherwise the
         // upcoming `self.reserve_memory_for_merge()` may fail due to insufficient memory.
         drop(sorted_stream);
 
-        self.consume_and_spill_append(&mut globally_sorted_batches)
-            .await?;
-        self.spill_finish().await?;
+        // Reserve headroom for next sort/merge
+        self.reserve_memory_for_merge()?;
 
-        // Sanity check after spilling
-        let buffers_cleared_property =
-            self.in_mem_batches.is_empty() && globally_sorted_batches.is_empty();
-        if !buffers_cleared_property {
-            return internal_err!(
-                "in_mem_batches and globally_sorted_batches should be cleared before"
-            );
+        let spill_file = in_progress_spill_file.finish()?.ok_or_else(|| {
+            internal_datafusion_err!("Writing stream with 0 batch is not allowed")
+        })?;
+
+        Ok(spill_file)
+    }
+
+    /// Sort-preserving merges the spilled files into a single file.
+    ///
+    /// All of input spill files are sorted by sort keys within each file, and the
+    /// returned file is also sorted by sort keys.
+    ///
+    /// This method consumes the input spill files, and returns a new compacted
+    /// spill file. After returnning, the input files will be cleaned up (deleted).
+    ///
+    /// # Example:
+    /// Input spill files:
+    /// SpillFile1 (sorted by SortKeys):
+    /// [batch1(100 rows)], [batch2(100 rows)]
+    /// SpillFile2 (sorted by SortKeys):
+    /// [batch1(100 rows)]
+    ///
+    /// After merging, it returns a new s

Re: [PR] POC: Cascaded spill merge and re-spill [datafusion]

2025-04-09 Thread via GitHub


qstommyshu commented on code in PR #15610:
URL: https://github.com/apache/datafusion/pull/15610#discussion_r2036027203


##
datafusion/physical-plan/src/sorts/sort.rs:
##

Re: [PR] POC: Cascaded spill merge and re-spill [datafusion]

2025-04-09 Thread via GitHub


qstommyshu commented on code in PR #15610:
URL: https://github.com/apache/datafusion/pull/15610#discussion_r2036013722


##
datafusion/physical-plan/src/sorts/sort.rs:
##

Re: [PR] POC: Cascaded spill merge and re-spill [datafusion]

2025-04-09 Thread via GitHub


alamb commented on code in PR #15610:
URL: https://github.com/apache/datafusion/pull/15610#discussion_r2035293849


##
datafusion/physical-plan/src/sorts/sort.rs:
##

Re: [PR] POC: Cascaded spill merge and re-spill [datafusion]

2025-04-08 Thread via GitHub


2010YOUY01 commented on code in PR #15610:
URL: https://github.com/apache/datafusion/pull/15610#discussion_r2034370539


##
datafusion/physical-plan/src/sorts/sort.rs:
##

Re: [PR] POC: Cascaded spill merge and re-spill [datafusion]

2025-04-08 Thread via GitHub


2010YOUY01 commented on code in PR #15610:
URL: https://github.com/apache/datafusion/pull/15610#discussion_r2034369554


##
datafusion/physical-plan/src/sorts/sort.rs:
##
@@ -535,56 +457,262 @@ impl ExternalSorter {
 // reserved again for the next spill.
 self.merge_reservation.free();
 
-let mut sorted_stream =
+let sorted_stream =
 self.in_mem_sort_stream(self.metrics.baseline.intermediate())?;
+debug!("SPM stream is constructed");
+
 // After `in_mem_sort_stream()` is constructed, all `in_mem_batches` is taken
 // to construct a globally sorted stream.
 if !self.in_mem_batches.is_empty() {
 return internal_err!(
 "in_mem_batches should be empty after constructing sorted stream"
 );
 }
-// 'global' here refers to all buffered batches when the memory limit is
-// reached. This variable will buffer the sorted batches after
-// sort-preserving merge and incrementally append to spill files.
-let mut globally_sorted_batches: Vec = vec![];
 
+let spill_file = self.write_stream_to_spill_file(sorted_stream).await?;
+self.finished_spill_files.push(spill_file);
+
+// Reserve headroom for next sort/merge
+self.reserve_memory_for_merge()?;
+
+Ok(())
+}
+
+/// Create a new spill file, and write all batches from the stream to the file.
+///
+/// Note: After the spill is done, the memory reservation will be freed to 0,
+/// because `sorted_stream` holds all buffered batches.
+async fn write_stream_to_spill_file(
+&mut self,
+mut sorted_stream: SendableRecordBatchStream,
+) -> Result {
+// Release the memory reserved for merge back to the pool so there is some
+// left when the executed stream requests an allocation (now the stream to
+// write is a SortPreservingMergeStream, which requires memory).
+// At the end of this function, memory will be reserved again for the next spill.
+self.merge_reservation.free();
+
+let mut in_progress_spill_file =
+self.spill_manager.create_in_progress_file("Sorting")?;
+
+// Incrementally append globally sorted batches to the spill file, because
+// there might not be enough memory to materialize all batches at once.
 while let Some(batch) = sorted_stream.next().await {
-let batch = batch?;
-let sorted_size = get_reserved_byte_for_record_batch(&batch);
-if self.reservation.try_grow(sorted_size).is_err() {
-// Although the reservation is not enough, the batch is
-// already in memory, so it's okay to combine it with previously
-// sorted batches, and spill together.
-globally_sorted_batches.push(batch);
-self.consume_and_spill_append(&mut globally_sorted_batches)
-.await?; // reservation is freed in spill()
-} else {
-globally_sorted_batches.push(batch);
-}
+let mut batch = vec![batch?];
+Self::organize_stringview_arrays(&mut batch)?;
+in_progress_spill_file.append_batch(&batch[0])?;
 }
 
 // Drop early to free up memory reserved by the sorted stream, otherwise the
 // upcoming `self.reserve_memory_for_merge()` may fail due to insufficient memory.
 drop(sorted_stream);
 
-self.consume_and_spill_append(&mut globally_sorted_batches)
-.await?;
-self.spill_finish().await?;
+// Reserve headroom for next sort/merge
+self.reserve_memory_for_merge()?;
 
-// Sanity check after spilling
-let buffers_cleared_property =
-self.in_mem_batches.is_empty() && globally_sorted_batches.is_empty();
-if !buffers_cleared_property {
-return internal_err!(
-"in_mem_batches and globally_sorted_batches should be cleared before"
-);
+let spill_file = in_progress_spill_file.finish()?.ok_or_else(|| {
+internal_datafusion_err!("Writing stream with 0 batch is not allowed")
+})?;
+
+Ok(spill_file)
+}
+
+/// Sort-preserving merges the spilled files into a single file.
+///
+/// All input spill files are sorted by sort keys within each file, and the
+/// returned file is also sorted by sort keys.
+///
+/// This method consumes the input spill files, and returns a new compacted
+/// spill file. After returning, the input files will be cleaned up (deleted).
+///
+/// # Example:
+/// Input spill files:
+/// SpillFile1 (sorted by SortKeys):
+/// [batch1(100 rows)], [batch2(100 rows)]
+/// SpillFile2 (sorted by SortKeys):
+/// [batch1(100 rows)]
+///
+/// After merging, it returns a new s

Re: [PR] POC: Cascaded spill merge and re-spill [datafusion]

2025-04-08 Thread via GitHub


qstommyshu commented on code in PR #15610:
URL: https://github.com/apache/datafusion/pull/15610#discussion_r2034287537


##
datafusion/physical-plan/src/sorts/sort.rs:
##

Re: [PR] POC: Cascaded spill merge and re-spill [datafusion]

2025-04-08 Thread via GitHub


qstommyshu commented on code in PR #15610:
URL: https://github.com/apache/datafusion/pull/15610#discussion_r2034251397


##
datafusion/physical-plan/src/sorts/sort.rs:
##

Re: [PR] POC: Cascaded spill merge and re-spill [datafusion]

2025-04-07 Thread via GitHub


2010YOUY01 commented on PR #15610:
URL: https://github.com/apache/datafusion/pull/15610#issuecomment-2782275431

   This PR and https://github.com/apache/datafusion/pull/15608 both implement multi-level merge for `SortExec`, but for different purposes:
   ### This PR
   - Lets memory-limited sort queries run even when the memory budget is very tight (i.e. num-spill-files * batch-size > memory limit)
   - Always re-spills at each merge step
   ### #15608
   - Reduces the merge degree for performance (reading spills will stall for a shorter amount of time)
   - Never re-spills
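
To make the "always re-spill" strategy concrete, here is a minimal sketch of a cascaded merge. It is not code from either PR: sorted `Vec<i64>` runs stand in for spill files, and `merge_runs` and `cascaded_merge` are hypothetical names chosen for illustration. While more runs remain than the maximum merge degree allows, they are merged in groups and each group's output becomes a new run, which corresponds to one round of re-spill.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge already-sorted runs into one sorted run. This stands in for a
/// sort-preserving merge over spill files (hypothetical helper).
fn merge_runs(runs: &[Vec<i64>]) -> Vec<i64> {
    // Heap entries are (value, run index, offset within the run).
    // `Reverse` turns the max-heap into a min-heap.
    let mut heap = BinaryHeap::new();
    for (i, run) in runs.iter().enumerate() {
        if let Some(&v) = run.first() {
            heap.push(Reverse((v, i, 0usize)));
        }
    }
    let mut out = Vec::with_capacity(runs.iter().map(Vec::len).sum());
    while let Some(Reverse((v, i, pos))) = heap.pop() {
        out.push(v);
        // Refill the heap from the run the smallest value came from.
        if let Some(&next) = runs[i].get(pos + 1) {
            heap.push(Reverse((next, i, pos + 1)));
        }
    }
    out
}

/// Cascaded merge: while more than `max_degree` runs remain, merge them in
/// groups of at most `max_degree` and keep each group's output as a new run
/// (the in-memory analogue of a re-spill). Returns the fully sorted output
/// and the number of re-spill rounds that were needed.
fn cascaded_merge(mut runs: Vec<Vec<i64>>, max_degree: usize) -> (Vec<i64>, usize) {
    assert!(max_degree >= 2, "a merge needs at least two inputs");
    let mut rounds = 0;
    while runs.len() > max_degree {
        runs = runs.chunks(max_degree).map(merge_runs).collect();
        rounds += 1;
    }
    (merge_runs(&runs), rounds)
}
```

With 170 input runs, this sketch needs one re-spill round at a merge degree of 16 (170 runs become 11) and two rounds at a degree of 10 (170 become 17, then 2), which matches the round counts reported in the benchmark comment earlier in this thread.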
   
   I think we should refine the existing PR to be:
   1. Prioritize stable execution of memory-limited queries over performance.
      - I think the optimizations mentioned below are somewhat complex. We should first resolve the remaining known correctness issues in external sort, strengthen the tests, and then proceed with later optimizations more confidently.
   2. Extensible for future performance optimization
      - When the memory budget allows, don't always re-spill
      - Consider pre-fetching future spill reads to avoid blocking reads
      - For other steps that require merging in `SortExec`, the multi-pass merging utility should be reusable for performance. For example, if we have enough memory to keep all input buffers in memory, it should be able to do multi-level merging.

   (The first two points are related to https://github.com/apache/datafusion/issues/15323; the third point is tracked in https://github.com/apache/datafusion/issues/7181.)
   
   To summarize, I think this PR needs to be restructured to make future optimizations easier to implement. I don’t have a solid idea yet, so I’ll keep thinking and also wait to hear more opinions.
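
The "don't always re-spill when the memory budget allows" idea can be sketched as a small planning step. This is a hypothetical illustration, not part of the PR: `MergePlan` and `plan_merge` are made-up names, and the cost model (one buffered batch per open spill file) is the simplified one implied by the condition "num-spill-files * batch-size > memory limit" above.

```rust
/// Hypothetical outcome of planning the merge of spilled runs.
#[derive(Debug, PartialEq)]
enum MergePlan {
    /// Every spill file can keep one batch in memory at the same time:
    /// merge all files in one pass and never re-spill.
    SinglePass,
    /// Too many files for the budget: merge at most `degree` files per
    /// step and re-spill the intermediate results.
    Cascaded { degree: usize },
}

/// Choose a plan under the simplified assumption that a merge buffers
/// exactly one batch of `batch_bytes` per input spill file.
/// Returns `None` when the budget cannot hold even two batches, because
/// no merge is possible in that case.
fn plan_merge(
    num_spill_files: usize,
    batch_bytes: usize,
    memory_limit: usize,
) -> Option<MergePlan> {
    // `checked_div` also rejects a zero batch size.
    let max_degree = memory_limit.checked_div(batch_bytes)?;
    if max_degree < 2 {
        return None;
    }
    if num_spill_files <= max_degree {
        Some(MergePlan::SinglePass)
    } else {
        Some(MergePlan::Cascaded { degree: max_degree })
    }
}
```

A real planner would also have to account for the memory used by the merge itself and by the output batches, so the degree computed here should be read as an upper bound.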





Re: [PR] POC: Cascaded spill merge and re-spill [datafusion]

2025-04-07 Thread via GitHub


2010YOUY01 commented on PR #15610:
URL: https://github.com/apache/datafusion/pull/15610#issuecomment-2782279871

   > BTW, row_hash uses the sort preserving merge stream as well and has a similar problem, I think this should be a solution outside the sort exec
   
   I think the spilling-related problem in external aggregation is still a larger-than-memory sort: the current aggregation implementation re-implements the sort spilling logic that already exists in `ExternalSorter`. So this implementation should be reusable by `row_hash` (with some modifications).





Re: [PR] POC: Cascaded spill merge and re-spill [datafusion]

2025-04-07 Thread via GitHub


2010YOUY01 commented on PR #15610:
URL: https://github.com/apache/datafusion/pull/15610#issuecomment-2782282126

   > Also, to have a fully working larger than memory sort, you need to spill in
   > 
   > https://github.com/apache/datafusion/blob/362fcdfc7b9e00cb6126a0cbc41c9abb2637c563/datafusion/physical-plan/src/sorts/builder.rs#L74
   > 
   > In case the memory reservation is failing
   
   Could you elaborate? I don't get it.





Re: [PR] POC: Cascaded spill merge and re-spill [datafusion]

2025-04-07 Thread via GitHub


2010YOUY01 commented on code in PR #15610:
URL: https://github.com/apache/datafusion/pull/15610#discussion_r2030611077


##
datafusion/physical-plan/src/sorts/sort.rs:
##

Re: [PR] POC: Cascaded spill merge and re-spill [datafusion]

2025-04-06 Thread via GitHub


rluvaton commented on PR #15610:
URL: https://github.com/apache/datafusion/pull/15610#issuecomment-2782032011

   Also, to have a fully working sort, you need to spill in

https://github.com/apache/datafusion/blob/362fcdfc7b9e00cb6126a0cbc41c9abb2637c563/datafusion/physical-plan/src/sorts/builder.rs#L74
   
   in case the memory reservation fails.





Re: [PR] POC: Cascaded spill merge and re-spill [datafusion]

2025-04-06 Thread via GitHub


rluvaton commented on code in PR #15610:
URL: https://github.com/apache/datafusion/pull/15610#discussion_r2030454805


##
datafusion/physical-plan/src/sorts/sort.rs:
##

Re: [PR] POC: Cascaded spill merge and re-spill [datafusion]

2025-04-06 Thread via GitHub


rluvaton commented on PR #15610:
URL: https://github.com/apache/datafusion/pull/15610#issuecomment-2782018619

   BTW, row_hash uses the sort-preserving merge stream as well and has a similar problem. I think this should be a solution outside the sort exec.

