2010YOUY01 commented on code in PR #18207: URL: https://github.com/apache/datafusion/pull/18207#discussion_r2458620331
########## datafusion/physical-plan/src/spill/spill_pool.rs: ########## @@ -0,0 +1,1099 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! SpillPool: A reusable abstraction for managing spill files with FIFO semantics. +//! +//! # Overview +//! +//! The `SpillPool` provides a centralized mechanism for spilling record batches to disk +//! when memory is constrained. It manages a collection of spill files, each containing +//! multiple batches, with configurable maximum file sizes. +//! +//! # Design +//! +//! - **FIFO (Queue) semantics**: Batches are read in the order they were spilled +//! - **File handle reuse**: Multiple batches are written to the same file to minimize syscalls +//! - **Automatic file rotation**: When a file exceeds `max_file_size_bytes`, rotate to a new file +//! - **Sequential reading**: Uses IPC Stream format's natural sequential access pattern +//! - **Automatic cleanup**: Files are deleted once fully consumed +//! +//! # Usage Example +//! +//! ```ignore +//! use std::sync::Arc; +//! use parking_lot::Mutex; +//! +//! let pool = SpillPool::new( +//! 100 * 1024 * 1024, // 100MB max per file +//! spill_manager, +//! schema, +//! ); +//! let pool = Arc::new(Mutex::new(pool)); +//! +//! 
// Spill batches - automatically rotates files when size limit reached +//! { +//! let mut pool = pool.lock(); +//! pool.push_batch(batch1)?; +//! pool.push_batch(batch2)?; +//! pool.flush()?; // Finalize current file +//! pool.finalize(); // Signal no more writes +//! } +//! +//! // Read back in FIFO order using a stream +//! let mut stream = SpillPool::reader(pool, spill_manager); +//! let batch1 = stream.next().await.unwrap()?; // Returns batch1 +//! let batch2 = stream.next().await.unwrap()?; // Returns batch2 +//! // stream.next() returns None after finalize +//! ``` + +use std::collections::VecDeque; +use std::sync::Arc; +use std::task::Waker; + +use parking_lot::Mutex; + +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; +use datafusion_common::Result; +use datafusion_execution::disk_manager::RefCountedTempFile; +use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream}; + +use super::in_progress_spill_file::InProgressSpillFile; +use super::spill_manager::SpillManager; + +/// A pool of spill files that manages batch-level spilling with FIFO semantics. +/// +/// Batches are written sequentially to files, with automatic rotation when the +/// configured size limit is reached. Reading is done via an infinite stream +/// that can read concurrently while writes continue. +/// +/// # Thread Safety +/// +/// `SpillPool` is not thread-safe and should be used from a single thread or +/// protected with appropriate synchronization (e.g., `Arc<Mutex<SpillPool>>`). 
+pub struct SpillPool { + /// Maximum size in bytes before rotating to a new file + max_file_size_bytes: usize, + /// Queue of spill files (front = oldest, back = newest) + files: VecDeque<RefCountedTempFile>, + /// Current file being written to (if any) + current_write_file: Option<InProgressSpillFile>, + /// Size of current write file in bytes (estimated) + current_write_size: usize, + /// Number of batches written to current file + current_batch_count: usize, Review Comment: I think we can track them inside `InProgressSpillFile`, and expose an API. This approach can simplify `SpillPool` a bit. ########## datafusion/physical-plan/src/spill/spill_pool.rs: ########## @@ -0,0 +1,1099 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! SpillPool: A reusable abstraction for managing spill files with FIFO semantics. +//! +//! # Overview +//! +//! The `SpillPool` provides a centralized mechanism for spilling record batches to disk +//! when memory is constrained. It manages a collection of spill files, each containing +//! multiple batches, with configurable maximum file sizes. +//! +//! # Design +//! +//! - **FIFO (Queue) semantics**: Batches are read in the order they were spilled +//! 
- **File handle reuse**: Multiple batches are written to the same file to minimize syscalls +//! - **Automatic file rotation**: When a file exceeds `max_file_size_bytes`, rotate to a new file +//! - **Sequential reading**: Uses IPC Stream format's natural sequential access pattern +//! - **Automatic cleanup**: Files are deleted once fully consumed +//! +//! # Usage Example +//! +//! ```ignore +//! use std::sync::Arc; +//! use parking_lot::Mutex; +//! +//! let pool = SpillPool::new( +//! 100 * 1024 * 1024, // 100MB max per file +//! spill_manager, +//! schema, +//! ); +//! let pool = Arc::new(Mutex::new(pool)); +//! +//! // Spill batches - automatically rotates files when size limit reached +//! { +//! let mut pool = pool.lock(); +//! pool.push_batch(batch1)?; +//! pool.push_batch(batch2)?; +//! pool.flush()?; // Finalize current file +//! pool.finalize(); // Signal no more writes +//! } +//! +//! // Read back in FIFO order using a stream +//! let mut stream = SpillPool::reader(pool, spill_manager); +//! let batch1 = stream.next().await.unwrap()?; // Returns batch1 +//! let batch2 = stream.next().await.unwrap()?; // Returns batch2 +//! // stream.next() returns None after finalize +//! ``` + +use std::collections::VecDeque; +use std::sync::Arc; +use std::task::Waker; + +use parking_lot::Mutex; + +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; +use datafusion_common::Result; +use datafusion_execution::disk_manager::RefCountedTempFile; +use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream}; + +use super::in_progress_spill_file::InProgressSpillFile; +use super::spill_manager::SpillManager; + +/// A pool of spill files that manages batch-level spilling with FIFO semantics. +/// +/// Batches are written sequentially to files, with automatic rotation when the +/// configured size limit is reached. Reading is done via an infinite stream +/// that can read concurrently while writes continue. 
+/// +/// # Thread Safety +/// +/// `SpillPool` is not thread-safe and should be used from a single thread or +/// protected with appropriate synchronization (e.g., `Arc<Mutex<SpillPool>>`). +pub struct SpillPool { + /// Maximum size in bytes before rotating to a new file + max_file_size_bytes: usize, + /// Queue of spill files (front = oldest, back = newest) + files: VecDeque<RefCountedTempFile>, + /// Current file being written to (if any) + current_write_file: Option<InProgressSpillFile>, + /// Size of current write file in bytes (estimated) + current_write_size: usize, + /// Number of batches written to current file + current_batch_count: usize, + /// SpillManager for creating files and tracking metrics + spill_manager: Arc<SpillManager>, + /// Schema for batches (used by SpillPoolStream to implement RecordBatchStream) + schema: SchemaRef, Review Comment: To avoid duplication, the `schema` inside `spill_manager` can be used instead. ########## datafusion/physical-plan/src/spill/spill_pool.rs: ########## @@ -0,0 +1,1099 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! SpillPool: A reusable abstraction for managing spill files with FIFO semantics. +//! +//! # Overview +//! +//! 
The `SpillPool` provides a centralized mechanism for spilling record batches to disk +//! when memory is constrained. It manages a collection of spill files, each containing +//! multiple batches, with configurable maximum file sizes. +//! +//! # Design +//! +//! - **FIFO (Queue) semantics**: Batches are read in the order they were spilled +//! - **File handle reuse**: Multiple batches are written to the same file to minimize syscalls +//! - **Automatic file rotation**: When a file exceeds `max_file_size_bytes`, rotate to a new file +//! - **Sequential reading**: Uses IPC Stream format's natural sequential access pattern +//! - **Automatic cleanup**: Files are deleted once fully consumed +//! +//! # Usage Example +//! +//! ```ignore +//! use std::sync::Arc; +//! use parking_lot::Mutex; +//! +//! let pool = SpillPool::new( +//! 100 * 1024 * 1024, // 100MB max per file +//! spill_manager, +//! schema, +//! ); +//! let pool = Arc::new(Mutex::new(pool)); +//! +//! // Spill batches - automatically rotates files when size limit reached +//! { +//! let mut pool = pool.lock(); +//! pool.push_batch(batch1)?; +//! pool.push_batch(batch2)?; +//! pool.flush()?; // Finalize current file +//! pool.finalize(); // Signal no more writes +//! } +//! +//! // Read back in FIFO order using a stream +//! let mut stream = SpillPool::reader(pool, spill_manager); +//! let batch1 = stream.next().await.unwrap()?; // Returns batch1 +//! let batch2 = stream.next().await.unwrap()?; // Returns batch2 +//! // stream.next() returns None after finalize +//! 
``` + +use std::collections::VecDeque; +use std::sync::Arc; +use std::task::Waker; + +use parking_lot::Mutex; + +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; +use datafusion_common::Result; +use datafusion_execution::disk_manager::RefCountedTempFile; +use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream}; + +use super::in_progress_spill_file::InProgressSpillFile; +use super::spill_manager::SpillManager; + +/// A pool of spill files that manages batch-level spilling with FIFO semantics. +/// +/// Batches are written sequentially to files, with automatic rotation when the +/// configured size limit is reached. Reading is done via an infinite stream +/// that can read concurrently while writes continue. +/// +/// # Thread Safety +/// +/// `SpillPool` is not thread-safe and should be used from a single thread or +/// protected with appropriate synchronization (e.g., `Arc<Mutex<SpillPool>>`). +pub struct SpillPool { + /// Maximum size in bytes before rotating to a new file + max_file_size_bytes: usize, + /// Queue of spill files (front = oldest, back = newest) + files: VecDeque<RefCountedTempFile>, + /// Current file being written to (if any) + current_write_file: Option<InProgressSpillFile>, + /// Size of current write file in bytes (estimated) + current_write_size: usize, + /// Number of batches written to current file + current_batch_count: usize, + /// SpillManager for creating files and tracking metrics + spill_manager: Arc<SpillManager>, + /// Schema for batches (used by SpillPoolStream to implement RecordBatchStream) + schema: SchemaRef, + /// Wakers to notify when new data is available for readers + wakers: Vec<Waker>, + /// Flag indicating no more writes will occur + finalized: bool, +} + +impl SpillPool { + /// Creates a new SpillPool with FIFO semantics. 
+ /// + /// # Arguments + /// + /// * `max_file_size_bytes` - Maximum size per file before rotation (e.g., 100MB) + /// * `spill_manager` - Manager for file creation and metrics + /// * `schema` - Schema for record batches + pub fn new( + max_file_size_bytes: usize, + spill_manager: Arc<SpillManager>, + schema: SchemaRef, + ) -> Self { + Self { + max_file_size_bytes, + files: VecDeque::new(), + current_write_file: None, + current_write_size: 0, + current_batch_count: 0, + spill_manager, + schema, + wakers: Vec::new(), + finalized: false, + } + } + + /// Marks the pool as finalized, indicating no more writes will occur. + /// This allows readers to know when to stop waiting for more data. + pub fn finalize(&mut self) { + self.finalized = true; + self.wake(); // Wake readers to check finalized status + } + + /// Returns true if the pool has been finalized + pub fn is_finalized(&self) -> bool { + self.finalized + } + + /// Creates a stream reader for this pool. + /// + /// The stream automatically handles file rotation and can read concurrently + /// while writes continue to the pool. When the stream catches up to the writer, + /// it will return `Poll::Pending` and wait for more data. + /// + /// # Arguments + /// + /// * `pool` - Shared reference to the SpillPool + /// * `spill_manager` - Manager for creating streams from spill files + /// + /// # Returns + /// + /// A `SpillPoolStream` that returns batches in FIFO order and ends when the pool + /// is finalized and all data has been read + pub fn reader( + pool: Arc<Mutex<Self>>, + spill_manager: Arc<SpillManager>, + ) -> SendableRecordBatchStream { + Box::pin(SpillPoolStream::new(pool, spill_manager)) + } + + /// Spills a batch to the pool, rotating files when necessary. + /// + /// If the current file would exceed `max_file_size_bytes` after adding + /// this batch, the file is finalized and a new one is started. + /// + /// # Errors + /// + /// Returns an error if disk I/O fails or disk quota is exceeded. 
+ pub fn push_batch(&mut self, batch: &RecordBatch) -> Result<()> { + if batch.num_rows() == 0 { + // Skip empty batches + return Ok(()); + } + + let batch_size = batch.get_array_memory_size(); + + // Check if we need to rotate to a new file + let needs_rotation = if self.current_write_file.is_some() { + // Rotate if adding this batch would exceed the max file size + self.current_write_size + batch_size > self.max_file_size_bytes + } else { + // No current file, need to create one + true + }; + + if needs_rotation && self.current_write_file.is_some() { + // Finish current file and add to queue + self.finish_current_file()?; + } + + // Create new file if needed + if self.current_write_file.is_none() { + self.current_write_file = + Some(self.spill_manager.create_in_progress_file("SpillPool")?); + self.current_write_size = 0; + self.current_batch_count = 0; + } + + // Append batch to current file + if let Some(ref mut file) = self.current_write_file { + file.append_batch(batch)?; Review Comment: A potential follow-up to do: https://github.com/apache/datafusion/issues/18261 ########## datafusion/physical-plan/src/spill/spill_pool.rs: ########## @@ -0,0 +1,1099 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
SpillPool: A reusable abstraction for managing spill files with FIFO semantics. +//! +//! # Overview +//! +//! The `SpillPool` provides a centralized mechanism for spilling record batches to disk +//! when memory is constrained. It manages a collection of spill files, each containing +//! multiple batches, with configurable maximum file sizes. +//! +//! # Design +//! +//! - **FIFO (Queue) semantics**: Batches are read in the order they were spilled +//! - **File handle reuse**: Multiple batches are written to the same file to minimize syscalls +//! - **Automatic file rotation**: When a file exceeds `max_file_size_bytes`, rotate to a new file +//! - **Sequential reading**: Uses IPC Stream format's natural sequential access pattern +//! - **Automatic cleanup**: Files are deleted once fully consumed +//! +//! # Usage Example +//! +//! ```ignore +//! use std::sync::Arc; +//! use parking_lot::Mutex; +//! +//! let pool = SpillPool::new( +//! 100 * 1024 * 1024, // 100MB max per file +//! spill_manager, +//! schema, +//! ); +//! let pool = Arc::new(Mutex::new(pool)); +//! +//! // Spill batches - automatically rotates files when size limit reached +//! { +//! let mut pool = pool.lock(); +//! pool.push_batch(batch1)?; +//! pool.push_batch(batch2)?; +//! pool.flush()?; // Finalize current file +//! pool.finalize(); // Signal no more writes +//! } +//! +//! // Read back in FIFO order using a stream +//! let mut stream = SpillPool::reader(pool, spill_manager); +//! let batch1 = stream.next().await.unwrap()?; // Returns batch1 +//! let batch2 = stream.next().await.unwrap()?; // Returns batch2 +//! // stream.next() returns None after finalize +//! 
``` + +use std::collections::VecDeque; +use std::sync::Arc; +use std::task::Waker; + +use parking_lot::Mutex; + +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; +use datafusion_common::Result; +use datafusion_execution::disk_manager::RefCountedTempFile; +use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream}; + +use super::in_progress_spill_file::InProgressSpillFile; +use super::spill_manager::SpillManager; + +/// A pool of spill files that manages batch-level spilling with FIFO semantics. +/// +/// Batches are written sequentially to files, with automatic rotation when the +/// configured size limit is reached. Reading is done via an infinite stream +/// that can read concurrently while writes continue. +/// +/// # Thread Safety +/// +/// `SpillPool` is not thread-safe and should be used from a single thread or +/// protected with appropriate synchronization (e.g., `Arc<Mutex<SpillPool>>`). +pub struct SpillPool { + /// Maximum size in bytes before rotating to a new file + max_file_size_bytes: usize, + /// Queue of spill files (front = oldest, back = newest) + files: VecDeque<RefCountedTempFile>, + /// Current file being written to (if any) + current_write_file: Option<InProgressSpillFile>, + /// Size of current write file in bytes (estimated) + current_write_size: usize, + /// Number of batches written to current file + current_batch_count: usize, + /// SpillManager for creating files and tracking metrics + spill_manager: Arc<SpillManager>, + /// Schema for batches (used by SpillPoolStream to implement RecordBatchStream) + schema: SchemaRef, + /// Wakers to notify when new data is available for readers + wakers: Vec<Waker>, + /// Flag indicating no more writes will occur + finalized: bool, +} + +impl SpillPool { + /// Creates a new SpillPool with FIFO semantics. 
+ /// + /// # Arguments + /// + /// * `max_file_size_bytes` - Maximum size per file before rotation (e.g., 100MB) + /// * `spill_manager` - Manager for file creation and metrics + /// * `schema` - Schema for record batches + pub fn new( + max_file_size_bytes: usize, + spill_manager: Arc<SpillManager>, + schema: SchemaRef, + ) -> Self { + Self { + max_file_size_bytes, + files: VecDeque::new(), + current_write_file: None, + current_write_size: 0, + current_batch_count: 0, + spill_manager, + schema, + wakers: Vec::new(), + finalized: false, + } + } + + /// Marks the pool as finalized, indicating no more writes will occur. + /// This allows readers to know when to stop waiting for more data. + pub fn finalize(&mut self) { + self.finalized = true; + self.wake(); // Wake readers to check finalized status + } + + /// Returns true if the pool has been finalized + pub fn is_finalized(&self) -> bool { + self.finalized + } + + /// Creates a stream reader for this pool. + /// + /// The stream automatically handles file rotation and can read concurrently + /// while writes continue to the pool. When the stream catches up to the writer, + /// it will return `Poll::Pending` and wait for more data. + /// + /// # Arguments + /// + /// * `pool` - Shared reference to the SpillPool + /// * `spill_manager` - Manager for creating streams from spill files + /// + /// # Returns + /// + /// A `SpillPoolStream` that returns batches in FIFO order and ends when the pool + /// is finalized and all data has been read + pub fn reader( + pool: Arc<Mutex<Self>>, + spill_manager: Arc<SpillManager>, + ) -> SendableRecordBatchStream { + Box::pin(SpillPoolStream::new(pool, spill_manager)) + } + + /// Spills a batch to the pool, rotating files when necessary. + /// + /// If the current file would exceed `max_file_size_bytes` after adding + /// this batch, the file is finalized and a new one is started. + /// + /// # Errors + /// + /// Returns an error if disk I/O fails or disk quota is exceeded. 
+ pub fn push_batch(&mut self, batch: &RecordBatch) -> Result<()> { + if batch.num_rows() == 0 { + // Skip empty batches + return Ok(()); + } + + let batch_size = batch.get_array_memory_size(); + + // Check if we need to rotate to a new file + let needs_rotation = if self.current_write_file.is_some() { + // Rotate if adding this batch would exceed the max file size + self.current_write_size + batch_size > self.max_file_size_bytes + } else { + // No current file, need to create one + true + }; + + if needs_rotation && self.current_write_file.is_some() { + // Finish current file and add to queue + self.finish_current_file()?; + } + + // Create new file if needed + if self.current_write_file.is_none() { + self.current_write_file = + Some(self.spill_manager.create_in_progress_file("SpillPool")?); + self.current_write_size = 0; + self.current_batch_count = 0; + } + + // Append batch to current file + if let Some(ref mut file) = self.current_write_file { + file.append_batch(batch)?; + } + + self.current_write_size += batch_size; + self.current_batch_count += 1; + + // Wake any waiting readers + self.wake(); + + Ok(()) + } + + /// Registers a waker to be notified when new data is available + fn register_waker(&mut self, waker: Waker) { + // Only register if not already present (avoid duplicates) + if !self.wakers.iter().any(|w| w.will_wake(&waker)) { + self.wakers.push(waker); + } + } + + /// Wakes all registered readers + fn wake(&mut self) { + for waker in self.wakers.drain(..) { + waker.wake(); + } + } + + /// Finalizes the current write file and adds it to the files queue. + /// + /// This is called automatically when files reach the size limit, but can + /// also be called explicitly to ensure all pending data is available for reading. + pub fn flush(&mut self) -> Result<()> { + if self.current_write_file.is_some() { + self.finish_current_file()?; + } + Ok(()) + } + + // Private helper methods + + /// Finishes the current write file and moves it to the files queue. 
+ fn finish_current_file(&mut self) -> Result<()> { + if let Some(mut file) = self.current_write_file.take() { + // Finish writing to get the final file + let finished_file = file.finish()?; + + if let Some(temp_file) = finished_file { + // Add to queue + self.files.push_back(temp_file); + } + + // Reset write state + self.current_write_size = 0; + self.current_batch_count = 0; + + // Wake any waiting readers since a new complete file is available + self.wake(); + } + + Ok(()) + } +} + +impl Drop for SpillPool { + fn drop(&mut self) { + // Flush any pending writes to ensure metrics are accurate + // We ignore errors here since Drop doesn't allow returning errors + let _ = self.flush(); + } +} + +/// A stream that reads from a SpillPool in FIFO order. +/// +/// The stream automatically handles file rotation and reads from completed files. +/// When no completed files are available, it returns `Poll::Pending` and waits +/// for the writer to complete more files. +/// +/// The stream ends (`Poll::Ready(None)`) when the pool is finalized and all data has been read. +struct SpillPoolStream { + /// Shared reference to the spill pool + spill_pool: Arc<Mutex<SpillPool>>, + /// SpillManager for creating streams from spill files + spill_manager: Arc<SpillManager>, Review Comment: Is it possible to use the `spill_manager` inside `spill_pool`, and eliminate this field? ########## datafusion/common/src/config.rs: ########## @@ -517,6 +517,20 @@ config_namespace! { /// batches and merged. pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024 + /// Maximum size in bytes for individual spill files before rotating to a new file. + /// + /// When operators spill data to disk (e.g., RepartitionExec, SortExec), they write + /// multiple batches to the same file until this size limit is reached, then rotate + /// to a new file. This reduces syscall overhead compared to one-file-per-batch + /// while preventing files from growing too large. 
+ /// + /// A larger value reduces file creation overhead but may hold more disk space. + /// A smaller value creates more files but allows finer-grained space reclamation + /// (especially in LIFO mode where files are truncated after reading). + /// + /// Default: 100 MB Review Comment: Maybe 128MB default can satisfy folks with alignment OCD (half joking) ########## datafusion/common/src/config.rs: ########## @@ -517,6 +517,20 @@ config_namespace! { /// batches and merged. pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024 + /// Maximum size in bytes for individual spill files before rotating to a new file. + /// + /// When operators spill data to disk (e.g., RepartitionExec, SortExec), they write + /// multiple batches to the same file until this size limit is reached, then rotate + /// to a new file. This reduces syscall overhead compared to one-file-per-batch + /// while preventing files from growing too large. + /// + /// A larger value reduces file creation overhead but may hold more disk space. + /// A smaller value creates more files but allows finer-grained space reclamation + /// (especially in LIFO mode where files are truncated after reading). Review Comment: Now that we're reclaiming disk space at 'chunked file' granularity, perhaps the truncation approach doesn't need to be mentioned, since it doesn't have a real use yet. 
########## datafusion/physical-plan/src/repartition/mod.rs: ########## @@ -1331,58 +1393,79 @@ impl Stream for PerPartitionStream { mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>> { + use futures::StreamExt; + loop { - match &mut self.state { - RepartitionStreamState::ReceivingFromChannel => { - let value = futures::ready!(self.receiver.recv().poll_unpin(cx)); - match value { - Some(Some(v)) => match v { - Ok(RepartitionBatch::Memory(batch)) => { - // Release memory and return - self.reservation - .lock() - .shrink(batch.get_array_memory_size()); - return Poll::Ready(Some(Ok(batch))); - } - Ok(RepartitionBatch::Spilled { spill_file, size }) => { - // Read from disk - SpillReaderStream uses tokio::fs internally - // Pass the original size for validation - let stream = self - .spill_manager - .read_spill_as_stream(spill_file, Some(size))?; - self.state = - RepartitionStreamState::ReadingSpilledBatch(stream); - // Continue loop to poll the stream immediately - } - Err(e) => { - return Poll::Ready(Some(Err(e))); - } - }, - Some(None) => { - // Input partition has finished sending batches - return Poll::Ready(None); - } - None => return Poll::Ready(None), + // First, check if there's a spilled batch available + match self.spill_stream.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(batch))) => { + // Got a spilled batch + return Poll::Ready(Some(Ok(batch))); + } + Poll::Ready(Some(Err(e))) => { + return Poll::Ready(Some(Err(e))); + } + Poll::Ready(None) => { + // Spill stream ended - all spilled data has been read + return Poll::Ready(None); + } + Poll::Pending => { + // No spilled data available + if self.input_finished { + // Input finished and no more spilled data - we're done + return Poll::Ready(None); } + // Otherwise check the channel Review Comment: `PerPartitionStream` is for the order-preserving case of `RepartitionExec`, it seems a bit tricky to get the order correct, I recommend to find the existing tests for order-preserving 
repartition, and include spilling in them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
