Dandandan commented on code in PR #7597: URL: https://github.com/apache/arrow-rs/pull/7597#discussion_r2125919969
########## arrow-select/src/coalesce.rs: ########## @@ -0,0 +1,596 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`BatchCoalescer`] concatenates multiple [`RecordBatch`]es after +//! operations such as [`filter`] and [`take`]. +//! +//! [`filter`]: crate::filter::filter +//! [`take`]: crate::take::take +use crate::concat::concat_batches; +use arrow_array::{ + builder::StringViewBuilder, cast::AsArray, Array, ArrayRef, RecordBatch, RecordBatchOptions, +}; +use arrow_schema::{ArrowError, SchemaRef}; +use std::collections::VecDeque; +use std::sync::Arc; + +// Originally From DataFusion's coalesce module: +// https://github.com/apache/datafusion/blob/9d2f04996604e709ee440b65f41e7b882f50b788/datafusion/physical-plan/src/coalesce/mod.rs#L26-L25 + +/// Concatenate multiple [`RecordBatch`]es +/// +/// Implements the common pattern of incrementally creating output +/// [`RecordBatch`]es of a specific size from an input stream of +/// [`RecordBatch`]es. 
+/// +/// This is useful after operations such as [`filter`] and [`take`] that produce +/// smaller batches, and we want to coalesce them into larger ones. +/// +/// [`filter`]: crate::filter::filter +/// [`take`]: crate::take::take +/// +/// See: <https://github.com/apache/arrow-rs/issues/6692> +/// +/// # Example +/// ``` +/// use arrow_array::record_batch; +/// use arrow_select::coalesce::{BatchCoalescer}; +/// let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap(); +/// let batch2 = record_batch!(("a", Int32, [4, 5])).unwrap(); +/// +/// // Create a `BatchCoalescer` that will produce batches of 4 rows (the last batch may be smaller) +/// let target_batch_size = 4; +/// let mut coalescer = BatchCoalescer::new(batch1.schema(), 4); +/// +/// // push the batches +/// coalescer.push_batch(batch1).unwrap(); +/// // only pushed 3 rows (fewer than the 4 needed to produce a batch) +/// assert!(coalescer.next_batch().is_none()); +/// coalescer.push_batch(batch2).unwrap(); +/// // now we have 5 rows, so we can produce a batch +/// let finished = coalescer.next_batch().unwrap(); +/// // 4 rows came out (target batch size is 4) +/// let expected = record_batch!(("a", Int32, [1, 2, 3, 4])).unwrap(); +/// assert_eq!(finished, expected); +/// +/// // Have no more input, but still have an in-progress batch +/// assert!(coalescer.next_batch().is_none()); +/// // We can finish the batch, which will produce the remaining rows +/// coalescer.finish_batch().unwrap(); +/// let expected = record_batch!(("a", Int32, [5])).unwrap(); +/// assert_eq!(coalescer.next_batch().unwrap(), expected); +/// +/// // The coalescer is now empty +/// assert!(coalescer.next_batch().is_none()); +/// ``` +/// +/// # Background +/// +/// Generally speaking, larger [`RecordBatch`]es are more efficient to process +/// than smaller [`RecordBatch`]es (until the CPU cache is exceeded) because +/// there is a fixed processing overhead per batch. This coalescer builds up these +/// larger batches incrementally.
+/// +/// ```text +/// ┌────────────────────┐ +/// │ RecordBatch │ +/// │ num_rows = 100 │ +/// └────────────────────┘ ┌────────────────────┐ +/// │ │ +/// ┌────────────────────┐ Coalesce │ │ +/// │ │ Batches │ │ +/// │ RecordBatch │ │ │ +/// │ num_rows = 200 │ ─ ─ ─ ─ ─ ─ ▶ │ │ +/// │ │ │ RecordBatch │ +/// │ │ │ num_rows = 400 │ +/// └────────────────────┘ │ │ +/// │ │ +/// ┌────────────────────┐ │ │ +/// │ │ │ │ +/// │ RecordBatch │ │ │ +/// │ num_rows = 100 │ └────────────────────┘ +/// │ │ +/// └────────────────────┘ +/// ``` +/// +/// # Notes: +/// +/// 1. Output rows are produced in the same order as the input rows +/// +/// 2. The output is a sequence of batches, with all but the last being at exactly +/// `target_batch_size` rows. +/// +/// 3. Eventually this may also be able to handle other optimizations such as a +/// combined filter/coalesce operation. See <https://github.com/apache/arrow-rs/issues/6692> +/// +#[derive(Debug)] +pub struct BatchCoalescer { + /// The input schema + schema: SchemaRef, + /// output batch size + batch_size: usize, + /// In-progress buffered batches + buffer: Vec<RecordBatch>, + /// Buffered row count. Always less than `batch_size` + buffered_rows: usize, + /// Completed batches + completed: VecDeque<RecordBatch>, +} + +impl BatchCoalescer { + /// Create a new `BatchCoalescer` + /// + /// # Arguments + /// - `schema` - the schema of the output batches + /// - `batch_size` - the number of rows in each output batch. + /// Typical values are `4096` or `8192` rows. 
+ /// + pub fn new(schema: SchemaRef, batch_size: usize) -> Self { + Self { + schema, + batch_size, + buffer: vec![], + // We will for sure store at least one completed batch + completed: VecDeque::with_capacity(1), + buffered_rows: 0, + } + } + + /// Return the schema of the output batches + pub fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + /// Push next batch into the Coalescer + /// + /// See [`Self::next_batch()`] to retrieve any completed batches. + pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError> { + if batch.num_rows() == 0 { + // If the batch is empty, we don't need to do anything + return Ok(()); + } + + let mut batch = gc_string_view_batch(&batch); + + // If pushing this batch would exceed the target batch size, + // finish the current batch and start a new one + while batch.num_rows() > (self.batch_size - self.buffered_rows) { + let remaining_rows = self.batch_size - self.buffered_rows; + debug_assert!(remaining_rows > 0); + let head_batch = batch.slice(0, remaining_rows); + batch = batch.slice(remaining_rows, batch.num_rows() - remaining_rows); + self.buffered_rows += head_batch.num_rows(); + self.buffer.push(head_batch); + self.finish_batch()?; + } + // Add the remaining rows to the buffer + self.buffered_rows += batch.num_rows(); + self.buffer.push(batch); + Ok(()) + } + + /// Concatenates any buffered batches into a single `RecordBatch`, + /// appends it to the queue of completed batches, and clears the + /// in-progress buffer. + /// + /// Normally this is called when the input stream is exhausted, and + /// we want to finalize the last batch of rows. + /// + /// See [`Self::next_batch()`] for the completed batches.
+ pub fn finish_batch(&mut self) -> Result<(), ArrowError> { + let batch = concat_batches(&self.schema, &self.buffer)?; + self.buffer.clear(); + self.buffered_rows = 0; + self.completed.push_back(batch); + Ok(()) + } + + /// Returns the next completed batch, if any + pub fn next_batch(&mut self) -> Option<RecordBatch> { + self.completed.pop_front() + } +} + +/// Heuristically compact `StringViewArray`s to reduce memory usage, if needed +/// +/// Decides when to consolidate the StringView into a new buffer to reduce +/// memory usage and improve string locality for better performance. +/// +/// This differs from `StringViewArray::gc` because: +/// 1. It may not compact the array depending on a heuristic. +/// 2. It uses a precise block size to reduce the number of buffers to track. +/// +/// # Heuristic +/// +/// If the average size of each view is larger than 32 bytes, we compact the array. +/// +/// A `StringViewArray` contains views that point into buffers holding the underlying data. +/// One of the great benefits of `StringViewArray` is that many operations +/// (e.g., `filter`) can be done without copying the underlying data. +/// +/// However, after a while (e.g., after `FilterExec` or `HashJoinExec`) the +/// `StringViewArray` may only refer to a small portion of the buffer, +/// significantly increasing memory usage. +fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch { + let new_columns: Vec<ArrayRef> = batch + .columns() + .iter() + .map(|c| { + // Try to re-create the `StringViewArray` to prevent holding the underlying buffer too long. + let Some(s) = c.as_string_view_opt() else { + return Arc::clone(c); + }; + let ideal_buffer_size: usize = s Review Comment: I think it makes sense to have another fast path here before looking into views: if the data buffers are small compared to the views, gc doesn't have much impact.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org