Dandandan commented on code in PR #7513:
URL: https://github.com/apache/arrow-rs/pull/7513#discussion_r2122298413
##########
arrow-select/src/incremental_batch_builder.rs:
##########
@@ -0,0 +1,355 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`IncrementalRecordBatchBuilder`] for incrementally building RecordBatches from other arrays
+
+use crate::filter::{FilterBuilder, FilterPredicate, SlicesIterator};
+use crate::incremental_array_builder::{GenericIncrementalArrayBuilder, IncrementalArrayBuilder};
+use arrow_array::builder::{BinaryViewBuilder, StringViewBuilder};
+use arrow_array::{BooleanArray, RecordBatch};
+use arrow_schema::{ArrowError, DataType, SchemaRef};
+use std::borrow::Cow;
+use std::collections::VecDeque;
+
+type ArrayBuilderImpl = Box<dyn IncrementalArrayBuilder>;
+
+/// Incrementally creates `RecordBatch`es of limited size
+///
+/// This structure implements the common pattern of incrementally creating
+/// output batches of a specific size from an input stream of arrays.
+///
+/// See: <https://github.com/apache/arrow-rs/issues/6692>
+///
+/// This is a convenience over [`IncrementalArrayBuilder`], which is used to
+/// build arrays for each column in the `RecordBatch`.
+///
+/// The rows taken from the input arrays are chosen using one of the
+/// following mechanisms:
+///
+/// 1. `concat`-enated: all rows from the input array are appended
+/// 2. `filter`-ed: the input array is filtered using a `BooleanArray`
+/// 3. `take`-n: a subset of the input array is selected based on the indices
+///    provided in a `UInt32Array` or similar
+///
+/// This structure handles multiple columns, managing one builder per field
+pub struct IncrementalRecordBatchBuilder {
+    /// The schema of the RecordBatches being built
+    schema: SchemaRef,
+    /// The maximum size, in rows, of the arrays being built
+    batch_size: usize,
+    /// Should we 'optimize' the predicate before applying it?
+    optimize_predicate: bool,
+    /// batches that are "finished" (have batch_size rows)
+    finished: VecDeque<RecordBatch>,
+    /// The current arrays being built
+    current: Vec<ArrayBuilderImpl>,
+}
+
+impl std::fmt::Debug for IncrementalRecordBatchBuilder {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("IncrementalRecordBatchBuilder")
+            .field("schema", &self.schema)
+            .field("batch_size", &self.batch_size)
+            .field("optimize_predicate", &self.optimize_predicate)
+            .field("finished", &self.finished.len())
+            .field("current", &self.current.len())
+            .finish()
+    }
+}
+
+impl IncrementalRecordBatchBuilder {
+    /// Creates a new builder with the specified batch size and schema
+    ///
+    /// There must be at least one column in the schema, and the batch size
+    /// must be greater than 0.
+    pub fn try_new(schema: SchemaRef, batch_size: usize) -> Result<Self, ArrowError> {
+        if schema.fields().is_empty() {
+            return Err(ArrowError::InvalidArgumentError(
+                "IncrementalRecordBatchBuilder Schema must have at least one field".to_string(),
+            ));
+        }
+
+        if batch_size == 0 {
+            return Err(ArrowError::InvalidArgumentError(
+                "IncrementalRecordBatchBuilder batch size must be greater than 0".to_string(),
+            ));
+        }
+
+        let current = schema
+            .fields()
+            .iter()
+            .map(|field| instantiate_builder(field.data_type(), batch_size))
+            .collect::<Vec<_>>();
+
+        // Optimize the predicate if we will use it more than once
+        // (have more than 1 array)
+        let optimize_predicate = schema.fields().len() > 1
+            || schema
+                .fields()
+                .iter()
+                .any(|f| FilterBuilder::multiple_arrays(f.data_type()));
+
+        Ok(Self {
+            schema,
+            batch_size,
+            optimize_predicate,
+            finished: VecDeque::new(),
+            current,
+        })
+    }
+
+    /// Return the current schema of the builder
+    #[allow(dead_code)]
+    pub fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
+
+    /// Combines all arrays in `current` into a new `RecordBatch` in `finished`
+    /// and returns the number of rows in the batch added to `self.finished`
+    fn finish_current(&mut self) -> Result<usize, ArrowError> {
+        debug_assert!(
+            self.current
+                .iter()
+                .all(|b| b.is_empty() == self.current[0].is_empty()),
+            "All builders in current must match is_empty"
+        );
+
+        if self.current[0].is_empty() {
+            // no rows in progress, so nothing to do
+            return Ok(0);
+        }
+        let new_arrays: Vec<_> = self
+            .current
+            .iter_mut()
+            .map(|builder| builder.finish())
+            .collect();
+        let batch = RecordBatch::try_new(self.schema.clone(), new_arrays)?;
+
+        let num_rows = batch.num_rows();
+        self.finished.push_back(batch);
+        Ok(num_rows)
+    }
+
+    /// Returns the number of rows currently in progress
+    pub fn num_current_rows(&self) -> usize {
+        debug_assert!(
+            self.current
+                .iter()
+                .all(|b| b.len() == self.current[0].len()),
+            "All builders in current must have the same length"
+        );
+        self.current[0].len()
+    }
+
+    /// Return the next `RecordBatch` if it is ready, or `None` if not
+    ///
+    /// This allows the builder to be used in a streaming fashion where rows are
+    /// added incrementally and produced in batches.
+    ///
+    /// Each batch returned has `batch_size` rows.
+    #[allow(dead_code)]
+    pub fn next_batch(&mut self) -> Option<RecordBatch> {
+        // return the oldest finished batch, if any
+        self.finished.pop_front()
+    }
+
+    /// Finalize this builder, returning any remaining batches
+    pub fn build(mut self) -> Result<VecDeque<RecordBatch>, ArrowError> {
+        self.finish_current()?;
+        let Self { finished, .. } = self;
+        Ok(finished)
+    }
+
+    /// Appends all rows from the input batch to the current arrays where
+    /// `filter_array` is `true`.
+    ///
+    /// This method optimizes for the case where the filter selects all or no
+    /// rows, and ensures all output arrays in `current` are at most
+    /// `batch_size` rows long.
+    pub fn append_filtered(

Review Comment:
   I think for any binary data it makes sense to buffer the `BooleanArray` up to
   the `batch_size` row limit (or a cap on the number of buffered arrays, if the
   buffer gets too large) so that the final capacity of the data buffer can be
   determined in one go.
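   To make the suggestion concrete, here is a minimal, self-contained sketch of
   the buffering idea, not the PR's implementation: `BufferedFilter`, its
   fields, and the flush-on-`batch_size` trigger are hypothetical names, the
   example uses `StringArray` to stand in for "any binary data", and it assumes
   filters without nulls. The point is that `true_count` plus one pass over the
   selected values lets the output data buffer be sized exactly once instead of
   growing incrementally:

   ```rust
   use arrow_array::builder::StringBuilder;
   use arrow_array::{BooleanArray, StringArray};

   /// Buffers (input, filter) pairs so the output data buffer can be sized once
   struct BufferedFilter {
       batch_size: usize,
       /// (input, filter) pairs whose selected rows have not been copied yet
       pending: Vec<(StringArray, BooleanArray)>,
       /// total number of selected (true) rows across `pending`
       pending_rows: usize,
   }

   impl BufferedFilter {
       fn new(batch_size: usize) -> Self {
           Self {
               batch_size,
               pending: Vec::new(),
               pending_rows: 0,
           }
       }

       /// Buffer one input array and its filter, flushing once at least
       /// `batch_size` rows have been selected. For simplicity the flush may
       /// emit slightly more than `batch_size` rows; the real builder would
       /// split the output instead.
       fn append_filtered(
           &mut self,
           input: StringArray,
           filter: BooleanArray,
       ) -> Option<StringArray> {
           // assumes the filter contains no nulls, so `true_count` and
           // `set_indices` below agree on which rows are selected
           self.pending_rows += filter.true_count();
           self.pending.push((input, filter));
           (self.pending_rows >= self.batch_size).then(|| self.flush())
       }

       /// Copy all selected rows out in one go
       fn flush(&mut self) -> StringArray {
           // First pass: total byte length of the selected values, so the
           // data buffer is allocated exactly once
           let data_capacity: usize = self
               .pending
               .iter()
               .map(|(input, filter)| {
                   filter
                       .values()
                       .set_indices()
                       .map(|i| input.value(i).len())
                       .sum::<usize>()
               })
               .sum();

           // Second pass: copy the selected values into the pre-sized builder
           let mut output = StringBuilder::with_capacity(self.pending_rows, data_capacity);
           for (input, filter) in self.pending.drain(..) {
               for i in filter.values().set_indices() {
                   output.append_value(input.value(i));
               }
           }
           self.pending_rows = 0;
           output.finish()
       }
   }
   ```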
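   Separately, for anyone skimming the diff, a hypothetical end-to-end usage
   sketch of the new builder, based only on the doc comments above. The
   `append_filtered` signature is an assumption (the diff is cut off at its
   parameter list), the input batches are assumed to match the schema, and
   `input_stream` / `process` are placeholder parameters:

   ```rust
   use std::sync::Arc;

   use arrow_array::{BooleanArray, RecordBatch};
   use arrow_schema::{ArrowError, DataType, Field, Schema};

   fn consume(
       input_stream: impl Iterator<Item = (RecordBatch, BooleanArray)>,
       mut process: impl FnMut(RecordBatch),
   ) -> Result<(), ArrowError> {
       let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
       let mut builder = IncrementalRecordBatchBuilder::try_new(Arc::clone(&schema), 1024)?;

       for (batch, filter) in input_stream {
           // assumed signature; the diff above ends at `pub fn append_filtered(`
           builder.append_filtered(&batch, &filter)?;

           // drain any batches that have reached `batch_size` rows
           while let Some(ready) = builder.next_batch() {
               process(ready);
           }
       }

       // flush whatever is left (a final batch with fewer than `batch_size` rows)
       for remainder in builder.build()? {
           process(remainder);
       }
       Ok(())
   }
   ```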