This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new b6e57321b4 Use the upstream arrow-rs coalesce kernel (#17193)
b6e57321b4 is described below

commit b6e57321b4531c7fbf5a18e49c6c5e385de5cf8a
Author: Qi Zhu <[email protected]>
AuthorDate: Fri Oct 31 17:47:44 2025 +0800

    Use the upstream arrow-rs coalesce kernel (#17193)
    
    ## Which issue does this PR close?
    
    Use the upstream arrow-rs coalesce kernel, and support
    `LimitedBatchCoalescer` for DataFusion
    
    ## Rationale for this change
    
    Use the upstream arrow-rs coalesce kernel, and also support
    `LimitedBatchCoalescer` for DataFusion. There is some future work based
    on this, for example
    [Push limit into joins](https://github.com/apache/datafusion/issues/18295),
    which will optimize joins.
    
    ## What changes are included in this PR?
    
    <!--
    There is no need to duplicate the description in the issue here but it
    is sometimes worth providing a summary of the individual changes in this
    PR.
    -->
    
    ## Are these changes tested?
    
    <!--
    We typically require tests for all PRs in order to:
    1. Prevent the code from being accidentally broken by subsequent changes
    2. Serve as another way to document the expected behavior of the code
    
    If tests are not included in your PR, please explain why (for example,
    are they covered by existing tests)?
    -->
    
    ## Are there any user-facing changes?
    
    <!--
    If there are user-facing changes then we may require documentation to be
    updated before approving the PR.
    -->
    
    <!--
    If there are any breaking changes to public APIs, please add the `api
    change` label.
    -->
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
    Co-authored-by: Daniël Heres <[email protected]>
---
 datafusion/physical-plan/src/coalesce/mod.rs     | 444 ++++++-----------------
 datafusion/physical-plan/src/coalesce_batches.rs | 132 ++-----
 2 files changed, 140 insertions(+), 436 deletions(-)

diff --git a/datafusion/physical-plan/src/coalesce/mod.rs 
b/datafusion/physical-plan/src/coalesce/mod.rs
index 5962362d76..8405a660f0 100644
--- a/datafusion/physical-plan/src/coalesce/mod.rs
+++ b/datafusion/physical-plan/src/coalesce/mod.rs
@@ -15,76 +15,38 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::array::{
-    builder::StringViewBuilder, cast::AsArray, Array, ArrayRef, RecordBatch,
-    RecordBatchOptions,
-};
-use arrow::compute::concat_batches;
+use arrow::array::RecordBatch;
+use arrow::compute::BatchCoalescer;
 use arrow::datatypes::SchemaRef;
-use std::sync::Arc;
+use datafusion_common::{internal_err, Result};
 
-/// Concatenate multiple [`RecordBatch`]es
-///
-/// `BatchCoalescer` concatenates multiple small [`RecordBatch`]es, produced by
-/// operations such as `FilterExec` and `RepartitionExec`, into larger ones for
-/// more efficient processing by subsequent operations.
-///
-/// # Background
-///
-/// Generally speaking, larger [`RecordBatch`]es are more efficient to process
-/// than smaller record batches (until the CPU cache is exceeded) because there
-/// is fixed processing overhead per batch. DataFusion tries to operate on
-/// batches of `target_batch_size` rows to amortize this overhead
-///
-/// ```text
-/// ┌────────────────────┐
-/// │    RecordBatch     │
-/// │   num_rows = 23    │
-/// └────────────────────┘                 ┌────────────────────┐
-///                                        │                    │
-/// ┌────────────────────┐     Coalesce    │                    │
-/// │                    │      Batches    │                    │
-/// │    RecordBatch     │                 │                    │
-/// │   num_rows = 50    │  ─ ─ ─ ─ ─ ─ ▶  │                    │
-/// │                    │                 │    RecordBatch     │
-/// │                    │                 │   num_rows = 106   │
-/// └────────────────────┘                 │                    │
-///                                        │                    │
-/// ┌────────────────────┐                 │                    │
-/// │                    │                 │                    │
-/// │    RecordBatch     │                 │                    │
-/// │   num_rows = 33    │                 └────────────────────┘
-/// │                    │
-/// └────────────────────┘
-/// ```
-///
-/// # Notes:
-///
-/// 1. Output rows are produced in the same order as the input rows
-///
-/// 2. The output is a sequence of batches, with all but the last being at 
least
-///    `target_batch_size` rows.
-///
-/// 3. Eventually this may also be able to handle other optimizations such as a
-///    combined filter/coalesce operation.
+/// Concatenate multiple [`RecordBatch`]es and apply a limit
 ///
+/// See [`BatchCoalescer`] for more details on how this works.
 #[derive(Debug)]
-pub struct BatchCoalescer {
-    /// The input schema
-    schema: SchemaRef,
-    /// Minimum number of rows for coalesces batches
-    target_batch_size: usize,
+pub struct LimitedBatchCoalescer {
+    /// The arrow structure that builds the output batches
+    inner: BatchCoalescer,
     /// Total number of rows returned so far
     total_rows: usize,
-    /// Buffered batches
-    buffer: Vec<RecordBatch>,
-    /// Buffered row count
-    buffered_rows: usize,
     /// Limit: maximum number of rows to fetch, `None` means fetch all rows
     fetch: Option<usize>,
+    /// Indicates if the coalescer is finished
+    finished: bool,
+}
+
+/// Status returned by [`LimitedBatchCoalescer::push_batch`]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum PushBatchStatus {
+    /// The limit has **not** been reached, and more batches can be pushed
+    Continue,
+    /// The limit **has** been reached after processing this batch
+    /// The caller should call [`LimitedBatchCoalescer::finish`]
+    /// to flush any buffered rows and stop pushing more batches.
+    LimitReached,
 }
 
-impl BatchCoalescer {
+impl LimitedBatchCoalescer {
     /// Create a new `BatchCoalescer`
     ///
     /// # Arguments
@@ -98,197 +60,95 @@ impl BatchCoalescer {
         fetch: Option<usize>,
     ) -> Self {
         Self {
-            schema,
-            target_batch_size,
+            inner: BatchCoalescer::new(schema, target_batch_size)
+                .with_biggest_coalesce_batch_size(Some(target_batch_size / 2)),
             total_rows: 0,
-            buffer: vec![],
-            buffered_rows: 0,
             fetch,
+            finished: false,
         }
     }
 
     /// Return the schema of the output batches
     pub fn schema(&self) -> SchemaRef {
-        Arc::clone(&self.schema)
+        self.inner.schema()
     }
 
-    /// Push next batch, and returns [`CoalescerState`] indicating the current
-    /// state of the buffer.
-    pub fn push_batch(&mut self, batch: RecordBatch) -> CoalescerState {
-        let batch = gc_string_view_batch(&batch);
-        if self.limit_reached(&batch) {
-            CoalescerState::LimitReached
-        } else if self.target_reached(batch) {
-            CoalescerState::TargetReached
-        } else {
-            CoalescerState::Continue
+    /// Pushes the next [`RecordBatch`] into the coalescer and returns its 
status.
+    ///
+    /// # Arguments
+    /// * `batch` - The [`RecordBatch`] to append.
+    ///
+    /// # Returns
+    /// * [`PushBatchStatus::Continue`] - More batches can still be pushed.
+    /// * [`PushBatchStatus::LimitReached`] - The row limit was reached after 
processing
+    ///   this batch. The caller should call [`Self::finish`] before 
retrieving the
+    ///   remaining buffered batches.
+    ///
+    /// # Errors
+    /// Returns an error if called after [`Self::finish`] or if the internal 
push
+    /// operation fails.
+    pub fn push_batch(&mut self, batch: RecordBatch) -> 
Result<PushBatchStatus> {
+        if self.finished {
+            return internal_err!(
+                "LimitedBatchCoalescer: cannot push batch after finish"
+            );
         }
-    }
 
-    /// Return true if the there is no data buffered
-    pub fn is_empty(&self) -> bool {
-        self.buffer.is_empty()
-    }
+        // if we are at the limit, return LimitReached
+        if let Some(fetch) = self.fetch {
+            // limit previously reached
+            if self.total_rows >= fetch {
+                return Ok(PushBatchStatus::LimitReached);
+            }
 
-    /// Checks if the buffer will reach the specified limit after getting
-    /// `batch`.
-    ///
-    /// If fetch would be exceeded, slices the received batch, updates the
-    /// buffer with it, and returns `true`.
-    ///
-    /// Otherwise: does nothing and returns `false`.
-    fn limit_reached(&mut self, batch: &RecordBatch) -> bool {
-        match self.fetch {
-            Some(fetch) if self.total_rows + batch.num_rows() >= fetch => {
+            // limit now reached
+            if self.total_rows + batch.num_rows() >= fetch {
                 // Limit is reached
                 let remaining_rows = fetch - self.total_rows;
                 debug_assert!(remaining_rows > 0);
 
-                let batch = batch.slice(0, remaining_rows);
-                self.buffered_rows += batch.num_rows();
-                self.total_rows = fetch;
-                self.buffer.push(batch);
-                true
+                let batch_head = batch.slice(0, remaining_rows);
+                self.total_rows += batch_head.num_rows();
+                self.inner.push_batch(batch_head)?;
+                return Ok(PushBatchStatus::LimitReached);
             }
-            _ => false,
         }
-    }
 
-    /// Updates the buffer with the given batch.
-    ///
-    /// If the target batch size is reached, returns `true`. Otherwise, returns
-    /// `false`.
-    fn target_reached(&mut self, batch: RecordBatch) -> bool {
-        if batch.num_rows() == 0 {
-            false
-        } else {
-            self.total_rows += batch.num_rows();
-            self.buffered_rows += batch.num_rows();
-            self.buffer.push(batch);
-            self.buffered_rows >= self.target_batch_size
-        }
+        // Limit not reached, push the entire batch
+        self.total_rows += batch.num_rows();
+        self.inner.push_batch(batch)?;
+
+        Ok(PushBatchStatus::Continue)
     }
 
-    /// Concatenates and returns all buffered batches, and clears the buffer.
-    pub fn finish_batch(&mut self) -> datafusion_common::Result<RecordBatch> {
-        let batch = concat_batches(&self.schema, &self.buffer)?;
-        self.buffer.clear();
-        self.buffered_rows = 0;
-        Ok(batch)
+    /// Return true if there is no data buffered
+    pub fn is_empty(&self) -> bool {
+        self.inner.is_empty()
     }
-}
 
-/// Indicates the state of the [`BatchCoalescer`] buffer after the
-/// [`BatchCoalescer::push_batch()`] operation.
-///
-/// The caller should take different actions, depending on the variant 
returned.
-pub enum CoalescerState {
-    /// Neither the limit nor the target batch size is reached.
+    /// Complete the current buffered batch and finish the coalescer
     ///
-    /// Action: continue pushing batches.
-    Continue,
-    /// The limit has been reached.
-    ///
-    /// Action: call [`BatchCoalescer::finish_batch()`] to get the final
-    /// buffered results as a batch and finish the query.
-    LimitReached,
-    /// The specified minimum number of rows a batch should have is reached.
-    ///
-    /// Action: call [`BatchCoalescer::finish_batch()`] to get the current
-    /// buffered results as a batch and then continue pushing batches.
-    TargetReached,
-}
-
-/// Heuristically compact `StringViewArray`s to reduce memory usage, if needed
-///
-/// Decides when to consolidate the StringView into a new buffer to reduce
-/// memory usage and improve string locality for better performance.
-///
-/// This differs from `StringViewArray::gc` because:
-/// 1. It may not compact the array depending on a heuristic.
-/// 2. It uses a precise block size to reduce the number of buffers to track.
-///
-/// # Heuristic
-///
-/// If the average size of each view is larger than 32 bytes, we compact the 
array.
-///
-/// `StringViewArray` include pointers to buffer that hold the underlying data.
-/// One of the great benefits of `StringViewArray` is that many operations
-/// (e.g., `filter`) can be done without copying the underlying data.
-///
-/// However, after a while (e.g., after `FilterExec` or `HashJoinExec`) the
-/// `StringViewArray` may only refer to a small portion of the buffer,
-/// significantly increasing memory usage.
-fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch {
-    let new_columns: Vec<ArrayRef> = batch
-        .columns()
-        .iter()
-        .map(|c| {
-            // Try to re-create the `StringViewArray` to prevent holding the 
underlying buffer too long.
-            let Some(s) = c.as_string_view_opt() else {
-                return Arc::clone(c);
-            };
-
-            // Fast path: if the data buffers are empty, we can return the 
original array
-            if s.data_buffers().is_empty() {
-                return Arc::clone(c);
-            }
-
-            let ideal_buffer_size: usize = s
-                .views()
-                .iter()
-                .map(|v| {
-                    let len = (*v as u32) as usize;
-                    if len > 12 {
-                        len
-                    } else {
-                        0
-                    }
-                })
-                .sum();
-
-            // We don't use get_buffer_memory_size here, because gc is for the 
contents of the
-            // data buffers, not views and nulls.
-            let actual_buffer_size =
-                s.data_buffers().iter().map(|b| b.capacity()).sum::<usize>();
-
-            // Re-creating the array copies data and can be time consuming.
-            // We only do it if the array is sparse
-            if actual_buffer_size > (ideal_buffer_size * 2) {
-                // We set the block size to `ideal_buffer_size` so that the 
new StringViewArray only has one buffer, which accelerate later concat_batches.
-                // See https://github.com/apache/arrow-rs/issues/6094 for more 
details.
-                let mut builder = StringViewBuilder::with_capacity(s.len());
-                if ideal_buffer_size > 0 {
-                    builder = builder.with_fixed_block_size(ideal_buffer_size 
as u32);
-                }
-
-                for v in s.iter() {
-                    builder.append_option(v);
-                }
-
-                let gc_string = builder.finish();
-
-                debug_assert!(gc_string.data_buffers().len() <= 1); // buffer 
count can be 0 if the `ideal_buffer_size` is 0
+    /// Any subsequent calls to `push_batch()` will return an Err
+    pub fn finish(&mut self) -> Result<()> {
+        self.inner.finish_buffered_batch()?;
+        self.finished = true;
+        Ok(())
+    }
 
-                Arc::new(gc_string)
-            } else {
-                Arc::clone(c)
-            }
-        })
-        .collect();
-    let mut options = RecordBatchOptions::new();
-    options = options.with_row_count(Some(batch.num_rows()));
-    RecordBatch::try_new_with_options(batch.schema(), new_columns, &options)
-        .expect("Failed to re-create the gc'ed record batch")
+    /// Return the next completed batch, if any
+    pub fn next_completed_batch(&mut self) -> Option<RecordBatch> {
+        self.inner.next_completed_batch()
+    }
 }
 
 #[cfg(test)]
 mod tests {
-    use std::ops::Range;
-
     use super::*;
+    use std::ops::Range;
+    use std::sync::Arc;
 
-    use arrow::array::{builder::ArrayBuilder, StringViewArray, UInt32Array};
+    use arrow::array::UInt32Array;
+    use arrow::compute::concat_batches;
     use arrow::datatypes::{DataType, Field, Schema};
 
     #[test]
@@ -296,9 +156,9 @@ mod tests {
         let batch = uint32_batch(0..8);
         Test::new()
             .with_batches(std::iter::repeat_n(batch, 10))
-            // expected output is batches of at least 20 rows (except for the 
final batch)
+            // expected output is batches of exactly 21 rows (except for the 
final batch)
             .with_target_batch_size(21)
-            .with_expected_output_sizes(vec![24, 24, 24, 8])
+            .with_expected_output_sizes(vec![21, 21, 21, 17])
             .run()
     }
 
@@ -311,7 +171,7 @@ mod tests {
             // expected to behave the same as `test_concat_batches`
             .with_target_batch_size(21)
             .with_fetch(Some(100))
-            .with_expected_output_sizes(vec![24, 24, 24, 8])
+            .with_expected_output_sizes(vec![21, 21, 21, 17])
             .run();
     }
 
@@ -323,7 +183,7 @@ mod tests {
             // input is 10 batches x 8 rows (80 rows) with fetch limit of 50
             .with_target_batch_size(21)
             .with_fetch(Some(50))
-            .with_expected_output_sizes(vec![24, 24, 2])
+            .with_expected_output_sizes(vec![21, 21, 8])
             .run();
     }
 
@@ -333,7 +193,7 @@ mod tests {
         Test::new()
             .with_batches(std::iter::repeat_n(batch, 10))
             // input is 10 batches x 8 rows (80 rows) with fetch limit of 48
-            .with_target_batch_size(21)
+            .with_target_batch_size(24)
             .with_fetch(Some(48))
             .with_expected_output_sizes(vec![24, 24])
             .run();
@@ -362,7 +222,7 @@ mod tests {
             .run()
     }
 
-    /// Test for [`BatchCoalescer`]
+    /// Test for [`LimitedBatchCoalescer`]
     ///
     /// Pushes the input batches to the coalescer and verifies that the 
resulting
     /// batches have the expected number of rows and contents.
@@ -435,26 +295,32 @@ mod tests {
             let single_input_batch = concat_batches(&schema, 
&input_batches).unwrap();
 
             let mut coalescer =
-                BatchCoalescer::new(Arc::clone(&schema), target_batch_size, 
fetch);
+                LimitedBatchCoalescer::new(Arc::clone(&schema), 
target_batch_size, fetch);
 
             let mut output_batches = vec![];
             for batch in input_batches {
-                match coalescer.push_batch(batch) {
-                    CoalescerState::Continue => {}
-                    CoalescerState::LimitReached => {
-                        output_batches.push(coalescer.finish_batch().unwrap());
-                        break;
+                match coalescer.push_batch(batch).unwrap() {
+                    PushBatchStatus::Continue => {
+                        // continue pushing batches
                     }
-                    CoalescerState::TargetReached => {
-                        coalescer.buffered_rows = 0;
-                        output_batches.push(coalescer.finish_batch().unwrap());
+                    PushBatchStatus::LimitReached => {
+                        break;
                     }
                 }
             }
-            if coalescer.buffered_rows != 0 {
-                output_batches.extend(coalescer.buffer);
+            coalescer.finish().unwrap();
+            while let Some(batch) = coalescer.next_completed_batch() {
+                output_batches.push(batch);
             }
 
+            let actual_output_sizes: Vec<usize> =
+                output_batches.iter().map(|b| b.num_rows()).collect();
+            assert_eq!(
+                expected_output_sizes, actual_output_sizes,
+                "Unexpected number of rows in output batches\n\
+                
Expected\n{expected_output_sizes:#?}\nActual:{actual_output_sizes:#?}"
+            );
+
             // make sure we got the expected number of output batches and 
content
             let mut starting_idx = 0;
             assert_eq!(expected_output_sizes.len(), output_batches.len());
@@ -498,110 +364,6 @@ mod tests {
         .unwrap()
     }
 
-    #[test]
-    fn test_gc_string_view_batch_small_no_compact() {
-        // view with only short strings (no buffers) --> no need to compact
-        let array = StringViewTest {
-            rows: 1000,
-            strings: vec![Some("a"), Some("b"), Some("c")],
-        }
-        .build();
-
-        let gc_array = do_gc(array.clone());
-        compare_string_array_values(&array, &gc_array);
-        assert_eq!(array.data_buffers().len(), 0);
-        assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); 
// no compaction
-    }
-
-    #[test]
-    fn test_gc_string_view_test_batch_empty() {
-        let schema = Schema::empty();
-        let batch = RecordBatch::new_empty(schema.into());
-        let output_batch = gc_string_view_batch(&batch);
-        assert_eq!(batch.num_columns(), output_batch.num_columns());
-        assert_eq!(batch.num_rows(), output_batch.num_rows());
-    }
-
-    #[test]
-    fn test_gc_string_view_batch_large_no_compact() {
-        // view with large strings (has buffers) but full --> no need to 
compact
-        let array = StringViewTest {
-            rows: 1000,
-            strings: vec![Some("This string is longer than 12 bytes")],
-        }
-        .build();
-
-        let gc_array = do_gc(array.clone());
-        compare_string_array_values(&array, &gc_array);
-        assert_eq!(array.data_buffers().len(), 5);
-        assert_eq!(array.data_buffers().len(), gc_array.data_buffers().len()); 
// no compaction
-    }
-
-    #[test]
-    fn test_gc_string_view_batch_large_slice_compact() {
-        // view with large strings (has buffers) and only partially used  --> 
no need to compact
-        let array = StringViewTest {
-            rows: 1000,
-            strings: vec![Some("this string is longer than 12 bytes")],
-        }
-        .build();
-
-        // slice only 11 rows, so most of the buffer is not used
-        let array = array.slice(11, 22);
-
-        let gc_array = do_gc(array.clone());
-        compare_string_array_values(&array, &gc_array);
-        assert_eq!(array.data_buffers().len(), 5);
-        assert_eq!(gc_array.data_buffers().len(), 1); // compacted into a 
single buffer
-    }
-
-    /// Compares the values of two string view arrays
-    fn compare_string_array_values(arr1: &StringViewArray, arr2: 
&StringViewArray) {
-        assert_eq!(arr1.len(), arr2.len());
-        for (s1, s2) in arr1.iter().zip(arr2.iter()) {
-            assert_eq!(s1, s2);
-        }
-    }
-
-    /// runs garbage collection on string view array
-    /// and ensures the number of rows are the same
-    fn do_gc(array: StringViewArray) -> StringViewArray {
-        let batch =
-            RecordBatch::try_from_iter(vec![("a", Arc::new(array) as 
ArrayRef)]).unwrap();
-        let gc_batch = gc_string_view_batch(&batch);
-        assert_eq!(batch.num_rows(), gc_batch.num_rows());
-        assert_eq!(batch.schema(), gc_batch.schema());
-        gc_batch
-            .column(0)
-            .as_any()
-            .downcast_ref::<StringViewArray>()
-            .unwrap()
-            .clone()
-    }
-
-    /// Describes parameters for creating a `StringViewArray`
-    struct StringViewTest {
-        /// The number of rows in the array
-        rows: usize,
-        /// The strings to use in the array (repeated over and over
-        strings: Vec<Option<&'static str>>,
-    }
-
-    impl StringViewTest {
-        /// Create a `StringViewArray` with the parameters specified in this 
struct
-        fn build(self) -> StringViewArray {
-            let mut builder =
-                
StringViewBuilder::with_capacity(100).with_fixed_block_size(8192);
-            loop {
-                for &v in self.strings.iter() {
-                    builder.append_option(v);
-                    if builder.len() >= self.rows {
-                        return builder.finish();
-                    }
-                }
-            }
-        }
-    }
     fn batch_to_pretty_strings(batch: &RecordBatch) -> String {
         arrow::util::pretty::pretty_format_batches(std::slice::from_ref(batch))
             .unwrap()
diff --git a/datafusion/physical-plan/src/coalesce_batches.rs 
b/datafusion/physical-plan/src/coalesce_batches.rs
index 397bd9a377..eb3c3b5bef 100644
--- a/datafusion/physical-plan/src/coalesce_batches.rs
+++ b/datafusion/physical-plan/src/coalesce_batches.rs
@@ -34,7 +34,7 @@ use datafusion_common::Result;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::PhysicalExpr;
 
-use crate::coalesce::{BatchCoalescer, CoalescerState};
+use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus};
 use crate::execution_plan::CardinalityEffect;
 use crate::filter_pushdown::{
     ChildPushdownResult, FilterDescription, FilterPushdownPhase,
@@ -53,7 +53,7 @@ use futures::stream::{Stream, StreamExt};
 /// buffering and returns the final batch once the number of collected rows
 /// reaches the `fetch` value.
 ///
-/// See [`BatchCoalescer`] for more information
+/// See [`LimitedBatchCoalescer`] for more information
 #[derive(Debug, Clone)]
 pub struct CoalesceBatchesExec {
     /// The input plan
@@ -182,14 +182,13 @@ impl ExecutionPlan for CoalesceBatchesExec {
     ) -> Result<SendableRecordBatchStream> {
         Ok(Box::pin(CoalesceBatchesStream {
             input: self.input.execute(partition, context)?,
-            coalescer: BatchCoalescer::new(
+            coalescer: LimitedBatchCoalescer::new(
                 self.input.schema(),
                 self.target_batch_size,
                 self.fetch,
             ),
             baseline_metrics: BaselineMetrics::new(&self.metrics, partition),
-            // Start by pulling data
-            inner_state: CoalesceBatchesStreamState::Pull,
+            completed: false,
         }))
     }
 
@@ -249,12 +248,11 @@ struct CoalesceBatchesStream {
     /// The input plan
     input: SendableRecordBatchStream,
     /// Buffer for combining batches
-    coalescer: BatchCoalescer,
+    coalescer: LimitedBatchCoalescer,
     /// Execution metrics
     baseline_metrics: BaselineMetrics,
-    /// The current inner state of the stream. This state dictates the current
-    /// action or operation to be performed in the streaming process.
-    inner_state: CoalesceBatchesStreamState,
+    /// is the input stream exhausted or limit reached?
+    completed: bool,
 }
 
 impl Stream for CoalesceBatchesStream {
@@ -274,50 +272,6 @@ impl Stream for CoalesceBatchesStream {
     }
 }
 
-/// Enumeration of possible states for `CoalesceBatchesStream`.
-/// It represents different stages in the lifecycle of a stream of record 
batches.
-///
-/// An example of state transition:
-/// Notation:
-/// `[3000]`: A batch with size 3000
-/// `{[2000], [3000]}`: `CoalesceBatchStream`'s internal buffer with 2 batches 
buffered
-/// Input of `CoalesceBatchStream` will generate three batches `[2000], 
[3000], [4000]`
-/// The coalescing procedure will go through the following steps with 4096 
coalescing threshold:
-/// 1. Read the first batch and get it buffered.
-/// - initial state: `Pull`
-/// - initial buffer: `{}`
-/// - updated buffer: `{[2000]}`
-/// - next state: `Pull`
-/// 2. Read the second batch, the coalescing target is reached since 2000 + 
3000 > 4096
-/// - initial state: `Pull`
-/// - initial buffer: `{[2000]}`
-/// - updated buffer: `{[2000], [3000]}`
-/// - next state: `ReturnBuffer`
-/// 4. Two batches in the batch get merged and consumed by the upstream 
operator.
-/// - initial state: `ReturnBuffer`
-/// - initial buffer: `{[2000], [3000]}`
-/// - updated buffer: `{}`
-/// - next state: `Pull`
-/// 5. Read the third input batch.
-/// - initial state: `Pull`
-/// - initial buffer: `{}`
-/// - updated buffer: `{[4000]}`
-/// - next state: `Pull`
-/// 5. The input is ended now. Jump to exhaustion state preparing the 
finalized data.
-/// - initial state: `Pull`
-/// - initial buffer: `{[4000]}`
-/// - updated buffer: `{[4000]}`
-/// - next state: `Exhausted`
-#[derive(Debug, Clone, Eq, PartialEq)]
-enum CoalesceBatchesStreamState {
-    /// State to pull a new batch from the input stream.
-    Pull,
-    /// State to return a buffered batch.
-    ReturnBuffer,
-    /// State indicating that the stream is exhausted.
-    Exhausted,
-}
-
 impl CoalesceBatchesStream {
     fn poll_next_inner(
         self: &mut Pin<&mut Self>,
@@ -325,51 +279,39 @@ impl CoalesceBatchesStream {
     ) -> Poll<Option<Result<RecordBatch>>> {
         let cloned_time = self.baseline_metrics.elapsed_compute().clone();
         loop {
-            match &self.inner_state {
-                CoalesceBatchesStreamState::Pull => {
-                    // Attempt to pull the next batch from the input stream.
-                    let input_batch = ready!(self.input.poll_next_unpin(cx));
-                    // Start timing the operation. The timer records time upon 
being dropped.
-                    let _timer = cloned_time.timer();
-
-                    match input_batch {
-                        Some(Ok(batch)) => match 
self.coalescer.push_batch(batch) {
-                            CoalescerState::Continue => {}
-                            CoalescerState::LimitReached => {
-                                self.inner_state = 
CoalesceBatchesStreamState::Exhausted;
-                            }
-                            CoalescerState::TargetReached => {
-                                self.inner_state =
-                                    CoalesceBatchesStreamState::ReturnBuffer;
-                            }
-                        },
-                        None => {
-                            // End of input stream, but buffered batches might 
still be present.
-                            self.inner_state = 
CoalesceBatchesStreamState::Exhausted;
+            // If there is any completed batch ready, return it
+            if let Some(batch) = self.coalescer.next_completed_batch() {
+                return Poll::Ready(Some(Ok(batch)));
+            }
+            if self.completed {
+                // If input is done and no batches are ready, return None to 
signal end of stream.
+                return Poll::Ready(None);
+            }
+            // Attempt to pull the next batch from the input stream.
+            let input_batch = ready!(self.input.poll_next_unpin(cx));
+            // Start timing the operation. The timer records time upon being 
dropped.
+            let _timer = cloned_time.timer();
+
+            match input_batch {
+                None => {
+                    // Input stream is exhausted, finalize any remaining 
batches
+                    self.completed = true;
+                    self.coalescer.finish()?;
+                }
+                Some(Ok(batch)) => {
+                    match self.coalescer.push_batch(batch)? {
+                        PushBatchStatus::Continue => {
+                            // Keep pushing more batches
+                        }
+                        PushBatchStatus::LimitReached => {
+                            // limit was reached, so stop early
+                            self.completed = true;
+                            self.coalescer.finish()?;
                         }
-                        other => return Poll::Ready(other),
                     }
                 }
-                CoalesceBatchesStreamState::ReturnBuffer => {
-                    let _timer = cloned_time.timer();
-                    // Combine buffered batches into one batch and return it.
-                    let batch = self.coalescer.finish_batch()?;
-                    // Set to pull state for the next iteration.
-                    self.inner_state = CoalesceBatchesStreamState::Pull;
-                    return Poll::Ready(Some(Ok(batch)));
-                }
-                CoalesceBatchesStreamState::Exhausted => {
-                    // Handle the end of the input stream.
-                    return if self.coalescer.is_empty() {
-                        // If buffer is empty, return None indicating the 
stream is fully consumed.
-                        Poll::Ready(None)
-                    } else {
-                        let _timer = cloned_time.timer();
-                        // If the buffer still contains batches, prepare to 
return them.
-                        let batch = self.coalescer.finish_batch()?;
-                        Poll::Ready(Some(Ok(batch)))
-                    };
-                }
+                // Error case
+                other => return Poll::Ready(other),
             }
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to