adriangb commented on code in PR #18207:
URL: https://github.com/apache/datafusion/pull/18207#discussion_r2498719132


##########
datafusion/physical-plan/src/spill/spill_pool.rs:
##########
@@ -0,0 +1,1264 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Spill pool for managing spill files with FIFO semantics.
+
+use futures::{Stream, StreamExt};
+use std::collections::VecDeque;
+use std::sync::Arc;
+use std::task::Waker;
+
+use parking_lot::Mutex;
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::Result;
+use datafusion_execution::disk_manager::RefCountedTempFile;
+use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+
+use super::in_progress_spill_file::InProgressSpillFile;
+use super::spill_manager::SpillManager;
+
+/// Shared state between the writer and readers of a spill pool.
+/// This contains the queue of files and coordination state.
+///
+/// It is shared as `Arc<Mutex<SpillPoolShared>>` (a `parking_lot::Mutex`),
+/// see the `shared` field of `SpillPoolWriter`.
+struct SpillPoolShared {
+    /// Queue of ALL files (including the current write file if it exists).
+    /// Readers always read from the front of this queue (FIFO); the writer
+    /// appends each newly created file at the back.
+    files: VecDeque<Arc<Mutex<ActiveSpillFileShared>>>,
+    /// SpillManager for creating files and tracking metrics
+    spill_manager: Arc<SpillManager>,
+    /// Pool-level wakers to notify when new files are available.
+    /// The list is drained (emptied) every time the wakers are woken.
+    wakers: Vec<Waker>,
+}
+
+impl SpillPoolShared {
+    /// Builds an empty pool state: no queued files and no parked readers.
+    fn new(spill_manager: Arc<SpillManager>) -> Self {
+        let files = VecDeque::new();
+        let wakers = Vec::new();
+        Self {
+            files,
+            spill_manager,
+            wakers,
+        }
+    }
+
+    /// Parks a reader's waker at the pool level so it is woken when a new
+    /// file is queued.
+    ///
+    /// A waker that would wake the same task as an already-registered one is
+    /// ignored, which keeps the list free of duplicates.
+    fn register_waker(&mut self, waker: Waker) {
+        let already_registered = self
+            .wakers
+            .iter()
+            .any(|existing| existing.will_wake(&waker));
+        if already_registered {
+            return;
+        }
+        self.wakers.push(waker);
+    }
+
+    /// Wakes every parked pool-level reader and empties the list, so a reader
+    /// has to register again before it can be woken another time.
+    fn wake(&mut self) {
+        self.wakers.drain(..).for_each(Waker::wake);
+    }
+}
+
+/// Writer for a spill pool. Provides exclusive write access with FIFO semantics.
+///
+/// Created by [`channel`]. See that function for architecture diagrams and usage examples.
+///
+/// The writer automatically manages file rotation based on the `max_file_size_bytes`
+/// configured in [`channel`]. When dropped, it finalizes the current file so readers
+/// can access all written data.
+pub struct SpillPoolWriter {
+    /// Maximum size in bytes before rotating to a new file.
+    /// Compared against the running sum of `RecordBatch::get_array_memory_size()`
+    /// of the batches written to the current file, not against its on-disk size.
+    max_file_size_bytes: usize,
+    /// Writer's reference to the current file (also in the shared files queue).
+    /// `None` until the first batch is pushed, and again after `push_batch`
+    /// rotates a full file out.
+    current_write_file: Option<Arc<Mutex<ActiveSpillFileShared>>>,
+    /// Shared state with readers
+    shared: Arc<Mutex<SpillPoolShared>>,
+}
+
+impl SpillPoolWriter {
+    /// Spills a batch to the pool, rotating files when necessary.
+    ///
+    /// The batch is always appended to the current file. If there is no
+    /// current file, a new one is created and published to readers first.
+    /// After the append, if the current file's estimated size exceeds
+    /// `max_file_size_bytes`, that file is finalized and the next call starts
+    /// a new one. Empty batches are skipped.
+    ///
+    /// The size estimate is the sum of `RecordBatch::get_array_memory_size()`
+    /// over the batches written to the file, not the on-disk size of the file.
+    ///
+    /// See [`channel`] for overall architecture and examples.
+    ///
+    /// # File Rotation Logic
+    ///
+    /// ```text
+    /// push_batch()
+    ///      │
+    ///      ▼
+    /// Current file exists?
+    ///      │
+    ///      ├─ No ──▶ Create new file ──▶ Add to shared queue
+    ///      │                               Wake readers
+    ///      ▼
+    /// Write batch to current file
+    ///      │
+    ///      ▼
+    /// estimated_size > max_file_size_bytes?
+    ///      │
+    ///      ├─ No ──▶ Keep current file for next batch
+    ///      │         Wake readers
+    ///      ▼
+    /// Yes: finish() current file
+    ///      Mark writer_finished = true
+    ///      Wake readers
+    ///      │
+    ///      ▼
+    /// Next push_batch() creates new file
+    /// ```
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if disk I/O fails or disk quota is exceeded.
+    ///
+    /// Even on error, no file is left unfinished without an owner:
+    /// - If the append fails, the file stays attached to this writer, so
+    ///   dropping the writer still finalizes it and wakes its readers.
+    /// - If finalizing a full file fails, the file is marked finished and its
+    ///   readers are woken before the error is returned.
+    pub fn push_batch(&mut self, batch: &RecordBatch) -> Result<()> {
+        if batch.num_rows() == 0 {
+            // Skip empty batches
+            return Ok(());
+        }
+
+        let batch_size = batch.get_array_memory_size();
+
+        // Reuse the current file, or create and publish a new one. The `Arc`
+        // is cloned so `self.current_write_file` can be cleared on rotation
+        // while the file is still locked below.
+        let current_file = match &self.current_write_file {
+            Some(file) => Arc::clone(file),
+            None => {
+                let file = self.start_new_file()?;
+                self.current_write_file = Some(Arc::clone(&file));
+                file
+            }
+        };
+
+        let mut file_shared = current_file.lock();
+
+        // Append the batch. If this fails, the file intentionally remains in
+        // `self.current_write_file`, so `Drop` still finalizes it and wakes
+        // readers instead of leaving them waiting on an orphaned file.
+        if let Some(ref mut writer) = file_shared.writer {
+            writer.append_batch(batch)?;
+            file_shared.batches_written += 1;
+            file_shared.estimated_size += batch_size;
+        }
+
+        if file_shared.estimated_size > self.max_file_size_bytes {
+            // Rotate: finish the IPC writer, but mark the file finished and
+            // wake its readers before propagating any error from `finish()`.
+            let finish_result = file_shared
+                .writer
+                .take()
+                .map(|mut writer| writer.finish());
+            file_shared.writer_finished = true;
+            file_shared.wake_all();
+            drop(file_shared);
+
+            // The next push_batch() creates a new file.
+            self.current_write_file = None;
+
+            if let Some(result) = finish_result {
+                result?;
+            }
+        } else {
+            // Wake readers waiting on this specific file for the new batch.
+            file_shared.wake_all();
+        }
+
+        Ok(())
+    }
+
+    /// Creates a new in-progress spill file, appends it to the back of the
+    /// shared queue, and wakes pool-level readers waiting for a new file.
+    fn start_new_file(&self) -> Result<Arc<Mutex<ActiveSpillFileShared>>> {
+        let spill_manager = Arc::clone(&self.shared.lock().spill_manager);
+
+        let writer = spill_manager.create_in_progress_file("SpillPool")?;
+        // Clone the file so readers can access it immediately
+        let file = writer
+            .file()
+            .expect("InProgressSpillFile should always have a file when it is first created")
+            .clone();
+
+        let file_shared = Arc::new(Mutex::new(ActiveSpillFileShared {
+            writer: Some(writer),
+            file: Some(file), // Set immediately so readers can access it
+            batches_written: 0,
+            estimated_size: 0,
+            writer_finished: false,
+            wakers: Vec::new(),
+        }));
+
+        let mut shared = self.shared.lock();
+        shared.files.push_back(Arc::clone(&file_shared));
+        // Wake readers waiting for new files
+        shared.wake();
+        drop(shared);
+
+        Ok(file_shared)
+    }
+}
+
+impl Drop for SpillPoolWriter {
+    /// Finalizes whichever file is still being written so readers can see
+    /// everything that was pushed and stop waiting for more data.
+    fn drop(&mut self) {
+        let Some(active_file) = self.current_write_file.take() else {
+            // No file is in progress: nothing was ever pushed, or `push_batch`
+            // already rotated the previous file out.
+            //
+            // NOTE(review): pool-level wakers in `SpillPoolShared` are not woken
+            // here. Confirm readers waiting for a *new* file have another way
+            // to learn that the writer is gone.
+            return;
+        };
+        let mut state = active_file.lock();
+
+        // A destructor cannot propagate errors, so a failed `finish()` is
+        // deliberately ignored (best effort).
+        let _ = state.writer.take().map(|mut writer| writer.finish());
+
+        // Tell readers no more batches will arrive, then wake them.
+        state.writer_finished = true;
+        state.wake_all();
+    }
+}
+
+/// Creates a paired writer and reader for a spill pool with channel-like 
semantics.
+///
+/// This is the recommended way to create a spill pool. The writer has 
exclusive
+/// write access, and the reader can consume batches in FIFO order. The reader
+/// can start reading immediately while the writer continues to write more 
data.
+///
+/// # Architecture
+///
+/// ```text
+/// ┌─────────────────────────────────────────────────────────────────────────┐

Review Comment:
   A mix. Sonnet 4.5 always gets the formatting slightly wrong, but in some cases I still find
it easier to have it generate them and then review/fix them up than to draw them by hand.
Sometimes it fails miserably and you have to do it yourself...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to