kosiew commented on code in PR #16734:
URL: https://github.com/apache/datafusion/pull/16734#discussion_r2206751878


##########
datafusion/physical-plan/src/stream.rs:
##########
@@ -522,6 +524,155 @@ impl Stream for ObservedStream {
     }
 }
 
+pin_project! {
+    /// Stream wrapper that splits large [`RecordBatch`]es into smaller batches.
+    ///
+    /// This ensures upstream operators receive batches no larger than
+    /// `batch_size`, which can improve parallelism when data sources
+    /// generate very large batches.
+    ///
+    /// # Fields
+    ///
+    /// - `current_batch`: The batch currently being split, if any
+    /// - `offset`: Index of the next row to split from `current_batch`.
+    ///   This tracks our position within the current batch being split.
+    ///
+    /// # Invariants
+    ///
+    /// - `offset` is always ≤ `current_batch.num_rows()` when `current_batch` is `Some`
+    /// - When `current_batch` is `None`, `offset` is always 0
+    /// - `batch_size` is always > 0
+pub struct BatchSplitStream {
+        #[pin]
+        input: SendableRecordBatchStream,
+        schema: SchemaRef,
+        batch_size: usize,
+        metrics: SplitMetrics,
+        current_batch: Option<RecordBatch>,
+        offset: usize,
+    }
+}
+
+impl BatchSplitStream {
+    /// Create a new [`BatchSplitStream`]
+    pub fn new(
+        input: SendableRecordBatchStream,
+        batch_size: usize,
+        metrics: SplitMetrics,
+    ) -> Self {
+        let schema = input.schema();
+        Self {
+            input,
+            schema,
+            batch_size,
+            metrics,
+            current_batch: None,
+            offset: 0,
+        }
+    }
+
+    /// Attempt to produce the next sliced batch from the current batch.
+    ///
+    /// Returns `Some(batch)` if a slice was produced, `None` if the current batch
+    /// is exhausted and we need to poll upstream for more data.
+    fn next_sliced_batch(&mut self) -> Option<Result<RecordBatch>> {
+        let batch = self.current_batch.take()?;
+
+        // Wrap slicing logic in a panic-safe block

Review Comment:
   Amended



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to