zhuqi-lucas commented on code in PR #7597:
URL: https://github.com/apache/arrow-rs/pull/7597#discussion_r2127998724


##########
arrow-select/src/coalesce.rs:
##########
@@ -0,0 +1,629 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`BatchCoalescer`] concatenates multiple [`RecordBatch`]es after
+//! operations such as [`filter`] and [`take`].
+//!
+//! [`filter`]: crate::filter::filter
+//! [`take`]: crate::take::take
+use crate::concat::concat_batches;
+use arrow_array::{
+    builder::StringViewBuilder, cast::AsArray, Array, ArrayRef, RecordBatch, 
RecordBatchOptions,
+};
+use arrow_schema::{ArrowError, SchemaRef};
+use std::collections::VecDeque;
+use std::sync::Arc;
+
+// Originally From DataFusion's coalesce module:
+// 
https://github.com/apache/datafusion/blob/9d2f04996604e709ee440b65f41e7b882f50b788/datafusion/physical-plan/src/coalesce/mod.rs#L26-L25
+
+/// Concatenate multiple [`RecordBatch`]es
+///
+/// Implements the common pattern of incrementally creating output
+/// [`RecordBatch`]es of a specific size from an input stream of
+/// [`RecordBatch`]es.
+///
+/// This is useful after operations such as [`filter`] and [`take`] that
+/// produce smaller batches, and we want to coalesce them into larger
+/// batches for further processing.
+///
+/// [`filter`]: crate::filter::filter
+/// [`take`]: crate::take::take
+///
+/// See: <https://github.com/apache/arrow-rs/issues/6692>
+///
+/// # Example
+/// ```
+/// use arrow_array::record_batch;
+/// use arrow_select::coalesce::{BatchCoalescer};
+/// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
+/// let batch2 = record_batch!(("a", Int32, [4, 5])).unwrap();
+///
+/// // Create a `BatchCoalescer` that will produce batches of exactly 4 rows
+/// let target_batch_size = 4;
+/// let mut coalescer = BatchCoalescer::new(batch1.schema(), 4);
+///
+/// // push the batches
+/// coalescer.push_batch(batch1).unwrap();
+/// // only pushed 3 rows (not yet 4, so not enough to produce a batch)
+/// assert!(coalescer.next_completed_batch().is_none());
+/// coalescer.push_batch(batch2).unwrap();
+/// // now we have 5 rows, so we can produce a batch
+/// let finished = coalescer.next_completed_batch().unwrap();
+/// // 4 rows came out (target batch size is 4)
+/// let expected = record_batch!(("a", Int32, [1, 2, 3, 4])).unwrap();
+/// assert_eq!(finished, expected);
+///
+/// // Have no more input, but still have an in-progress batch
+/// assert!(coalescer.next_completed_batch().is_none());
+/// // We can finish the batch, which will produce the remaining rows
+/// coalescer.finish_buffered_batch().unwrap();
+/// let expected = record_batch!(("a", Int32, [5])).unwrap();
+/// assert_eq!(coalescer.next_completed_batch().unwrap(), expected);
+///
+/// // The coalescer is now empty
+/// assert!(coalescer.next_completed_batch().is_none());
+/// ```
+///
+/// # Background
+///
+/// Generally speaking, larger [`RecordBatch`]es are more efficient to process
+/// than smaller [`RecordBatch`]es (until the CPU cache is exceeded) because
+/// there is fixed processing overhead per batch. This coalescer builds up 
these
+/// larger batches incrementally.
+///
+/// ```text
+/// ┌────────────────────┐
+/// │    RecordBatch     │
+/// │   num_rows = 100   │
+/// └────────────────────┘                 ┌────────────────────┐
+///                                        │                    │
+/// ┌────────────────────┐     Coalesce    │                    │
+/// │                    │      Batches    │                    │
+/// │    RecordBatch     │                 │                    │
+/// │   num_rows = 200   │  ─ ─ ─ ─ ─ ─ ▶  │                    │
+/// │                    │                 │    RecordBatch     │
+/// │                    │                 │   num_rows = 400   │
+/// └────────────────────┘                 │                    │
+///                                        │                    │
+/// ┌────────────────────┐                 │                    │
+/// │                    │                 │                    │
+/// │    RecordBatch     │                 │                    │
+/// │   num_rows = 100   │                 └────────────────────┘
+/// │                    │
+/// └────────────────────┘
+/// ```
+///
+/// # Notes:
+///
+/// 1. Output rows are produced in the same order as the input rows
+///
+/// 2. The output is a sequence of batches, with all but the last having
+///    exactly `target_batch_size` rows.
+///
+/// 3. Eventually this may also be able to handle other optimizations such as a
+///    combined filter/coalesce operation. See 
<https://github.com/apache/arrow-rs/issues/6692>
+///
+#[derive(Debug)]
+pub struct BatchCoalescer {
+    /// The input schema
+    schema: SchemaRef,
+    /// output batch size
+    batch_size: usize,
+    /// In-progress buffered batches
+    buffer: Vec<RecordBatch>,
+    /// Buffered row count. Always less than `batch_size`
+    buffered_rows: usize,
+    /// Completed batches
+    completed: VecDeque<RecordBatch>,
+}
+
+impl BatchCoalescer {
+    /// Create a new `BatchCoalescer`
+    ///
+    /// # Arguments
+    /// - `schema` - the schema of the output batches
+    /// - `batch_size` - the number of rows in each output batch.
+    ///   Typical values are `4096` or `8192` rows.
+    ///
+    pub fn new(schema: SchemaRef, batch_size: usize) -> Self {
+        Self {
+            schema,
+            batch_size,
+            buffer: vec![],
+            // We will for sure store at least one completed batch
+            completed: VecDeque::with_capacity(1),
+            buffered_rows: 0,
+        }
+    }
+
+    /// Return the schema of the output batches
+    pub fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.schema)
+    }
+
+    /// Push next batch into the Coalescer
+    ///
+    /// See [`Self::next_completed_batch()`] to retrieve any completed batches.
+    pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError> 
{
+        if batch.num_rows() == 0 {
+            // If the batch is empty, we don't need to do anything
+            return Ok(());
+        }
+
+        let mut batch = gc_string_view_batch(&batch);
+
+        // If pushing this batch would exceed the target batch size,
+        // finish the current batch and start a new one
+        while batch.num_rows() > (self.batch_size - self.buffered_rows) {
+            let remaining_rows = self.batch_size - self.buffered_rows;
+            debug_assert!(remaining_rows > 0);
+            let head_batch = batch.slice(0, remaining_rows);
+            batch = batch.slice(remaining_rows, batch.num_rows() - 
remaining_rows);
+            self.buffered_rows += head_batch.num_rows();
+            self.buffer.push(head_batch);
+            self.finish_buffered_batch()?;
+        }
+        // Add the remaining rows to the buffer
+        self.buffered_rows += batch.num_rows();
+        self.buffer.push(batch);
+
+        // If we have reached the target batch size, finalize the buffered 
batch
+        if self.buffered_rows >= self.batch_size {
+            self.finish_buffered_batch()?;
+        }
+        Ok(())
+    }
+
+    /// Concatenates any buffered batches into a single `RecordBatch` and
+    /// clears any output buffers
+    ///
+    /// Normally this is called when the input stream is exhausted, and
+    /// we want to finalize the last batch of rows.
+    ///
+    /// See [`Self::next_completed_batch()`] for the completed batches.
+    pub fn finish_buffered_batch(&mut self) -> Result<(), ArrowError> {
+        if self.buffer.is_empty() {
+            return Ok(());
+        }
+        let batch = concat_batches(&self.schema, &self.buffer)?;
+        self.buffer.clear();
+        self.buffered_rows = 0;
+        self.completed.push_back(batch);
+        Ok(())
+    }
+
+    /// Returns true if there is no buffered data and no completed batches
+    pub fn is_empty(&self) -> bool {
+        self.buffer.is_empty() && self.completed.is_empty()
+    }
+
+    /// Returns true if there are any completed batches
+    pub fn has_completed_batch(&self) -> bool {
+        !self.completed.is_empty()
+    }
+
+    /// Returns the next completed batch, if any
+    pub fn next_completed_batch(&mut self) -> Option<RecordBatch> {
+        self.completed.pop_front()
+    }
+}
+
+/// Heuristically compact `StringViewArray`s to reduce memory usage, if needed
+///
+/// Decides when to consolidate the StringView into a new buffer to reduce
+/// memory usage and improve string locality for better performance.
+///
+/// This differs from `StringViewArray::gc` because:
+/// 1. It may not compact the array depending on a heuristic.
+/// 2. It uses a precise block size to reduce the number of buffers to track.
+///
+/// # Heuristic
+///
+/// If the average size of each view is larger than 32 bytes, we compact the 
array.

Review Comment:
   Yeah @Dandandan, it seems there is no 32-byte check in the current code implementation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to