This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new a5af6438b8 Improve `concat` performance, and add `append_array` for
some array builder implementations (#7309)
a5af6438b8 is described below
commit a5af6438b88d3724720c2755c8c8261694559963
Author: Raz Luvaton <[email protected]>
AuthorDate: Sun Apr 6 14:38:25 2025 +0300
Improve `concat` performance, and add `append_array` for some array builder
implementations (#7309)
* feat: add `append_buffer` for `NullBufferBuilder`
* feat: add `append_array` for `PrimitiveBuilder`
* feat: add `append_array` for `BooleanBuilder`
* test: add test that the underlying null values are added as is
* wip
* format and lint
* add special implementation for concat primitives and booleans improving
perf by 50%
* add more tests for generic bytes builder
* add special implementation for bytes in concat
* manually concat primitives
* add large array impl
* wip
* remove unsafe API and use primitive builder in concat
* lint and format
* fix concat primitives to use the input array data type
* format
* add back the capacity for binary because dictionaries call concat_fallback
* add tests and update comment
* extract benchmark changes to different PR
https://github.com/apache/arrow-rs/pull/7376
---
arrow-array/src/builder/boolean_builder.rs | 61 ++++++-
arrow-array/src/builder/generic_bytes_builder.rs | 219 ++++++++++++++++++++++-
arrow-array/src/builder/primitive_builder.rs | 84 ++++++++-
arrow-select/src/concat.rs | 51 +++++-
4 files changed, 411 insertions(+), 4 deletions(-)
diff --git a/arrow-array/src/builder/boolean_builder.rs
b/arrow-array/src/builder/boolean_builder.rs
index 60ed86ce80..a0bd5745d2 100644
--- a/arrow-array/src/builder/boolean_builder.rs
+++ b/arrow-array/src/builder/boolean_builder.rs
@@ -16,7 +16,7 @@
// under the License.
use crate::builder::{ArrayBuilder, BooleanBufferBuilder};
-use crate::{ArrayRef, BooleanArray};
+use crate::{Array, ArrayRef, BooleanArray};
use arrow_buffer::Buffer;
use arrow_buffer::NullBufferBuilder;
use arrow_data::ArrayData;
@@ -146,6 +146,18 @@ impl BooleanBuilder {
}
}
+    /// Appends every slot of `array` (values and validity) to this builder.
+    ///
+    /// Values are copied verbatim, including the values stored underneath null
+    /// slots. When `array` has no null buffer, every appended slot is valid.
+    #[inline]
+    pub fn append_array(&mut self, array: &BooleanArray) {
+        self.values_builder.append_buffer(array.values());
+        match array.nulls() {
+            Some(nulls) => self.null_buffer_builder.append_buffer(nulls),
+            None => self.null_buffer_builder.append_n_non_nulls(array.len()),
+        }
+    }
+
/// Builds the [BooleanArray] and reset this builder.
pub fn finish(&mut self) -> BooleanArray {
let len = self.len();
@@ -232,6 +244,7 @@ impl Extend<Option<bool>> for BooleanBuilder {
mod tests {
use super::*;
use crate::Array;
+ use arrow_buffer::{BooleanBuffer, NullBuffer};
#[test]
fn test_boolean_array_builder() {
@@ -346,4 +359,50 @@ mod tests {
let values = array.iter().map(|x| x.unwrap()).collect::<Vec<_>>();
assert_eq!(&values, &[true, true, true, false, false])
}
+
+    #[test]
+    fn test_append_array() {
+        let input = vec![
+            Some(true),
+            None,
+            Some(true),
+            None,
+            Some(false),
+            None,
+            None,
+            None,
+            Some(false),
+            Some(false),
+            Some(false),
+            Some(true),
+            Some(false),
+        ];
+
+        // Append three consecutive chunks; the result must equal the unsplit input
+        let mut builder = BooleanBuilder::new();
+        for chunk in [&input[..5], &input[5..8], &input[8..]] {
+            builder.append_array(&BooleanArray::from(chunk.to_vec()));
+        }
+
+        assert_eq!(builder.finish(), BooleanArray::from(input));
+    }
+
+    #[test]
+    fn test_append_array_add_underlying_null_values() {
+        // Slots 2 and 3 are null but still hold `true`/`false`, which must be copied verbatim
+        let values = BooleanBuffer::from(vec![true, false, true, false]);
+        let nulls = NullBuffer::from(&[true, true, false, false]);
+        let array = BooleanArray::new(values, Some(nulls));
+
+        let mut builder = BooleanBuilder::new();
+        builder.append_array(&array);
+        let actual = builder.finish();
+
+        assert_eq!(actual, array);
+        assert_eq!(actual.values(), array.values());
+    }
}
diff --git a/arrow-array/src/builder/generic_bytes_builder.rs
b/arrow-array/src/builder/generic_bytes_builder.rs
index e2be96615b..ae82921b0b 100644
--- a/arrow-array/src/builder/generic_bytes_builder.rs
+++ b/arrow-array/src/builder/generic_bytes_builder.rs
@@ -17,7 +17,7 @@
use crate::builder::{ArrayBuilder, BufferBuilder, UInt8BufferBuilder};
use crate::types::{ByteArrayType, GenericBinaryType, GenericStringType};
-use crate::{ArrayRef, GenericByteArray, OffsetSizeTrait};
+use crate::{Array, ArrayRef, GenericByteArray, OffsetSizeTrait};
use arrow_buffer::NullBufferBuilder;
use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
use arrow_data::ArrayDataBuilder;
@@ -129,6 +129,48 @@ impl<T: ByteArrayType> GenericByteBuilder<T> {
self.offsets_builder.append(self.next_offset());
}
+    /// Appends all values and nulls of `array` to this builder.
+    ///
+    /// The bytes referenced by `array` are copied verbatim, including the bytes
+    /// underneath null slots. `array` may be sliced: only the byte range
+    /// `offsets[0]..offsets[len]` of its value buffer is copied, and its offsets
+    /// are rebased onto the end of this builder's value buffer.
+    ///
+    /// # Panics
+    ///
+    /// Panics with "byte array offset overflow" if the total length of the value
+    /// buffer after appending does not fit in `T::Offset` (for example more than
+    /// `i32::MAX` bytes for `i32` offsets). The builder is left unchanged in that case.
+    #[inline]
+    pub fn append_array(&mut self, array: &GenericByteArray<T>) {
+        if array.len() == 0 {
+            return;
+        }
+
+        let offsets = array.offsets();
+        // Byte range of `array.values()` that belongs to this (possibly sliced) array
+        let start = offsets[0].as_usize();
+        let end = offsets[array.len()].as_usize();
+
+        // Check up front that the last rebased offset fits in `T::Offset`. Offsets are
+        // non-decreasing, so every intermediate rebased offset then fits as well.
+        // Without this check `offset + shift` below can silently wrap in release builds,
+        // and `finish` would produce an array with corrupt (e.g. negative) offsets.
+        // Checking before any mutation also keeps the builder consistent if we panic.
+        T::Offset::from_usize(self.value_builder.len() + (end - start))
+            .expect("byte array offset overflow");
+
+        let next_offset = self.next_offset();
+        if next_offset == offsets[0] {
+            // The offsets are already contiguous with this builder (for example when
+            // the first appended array is not sliced), so append them without rebasing
+            self.offsets_builder.append_slice(&offsets[1..]);
+        } else {
+            // Rebase every offset onto the end of this builder's value buffer.
+            // Collecting into an intermediate Vec and appending it once is faster than
+            // pushing each offset individually into the buffer builder.
+            let shift: T::Offset = next_offset - offsets[0];
+            let rebased: Vec<T::Offset> =
+                offsets[1..].iter().map(|&offset| offset + shift).collect();
+            self.offsets_builder.append_slice(&rebased);
+        }
+
+        self.value_builder
+            .append_slice(&array.values().as_slice()[start..end]);
+
+        match array.nulls() {
+            Some(nulls) => self.null_buffer_builder.append_buffer(nulls),
+            None => self.null_buffer_builder.append_n_non_nulls(array.len()),
+        }
+    }
+
/// Builds the [`GenericByteArray`] and reset this builder.
pub fn finish(&mut self) -> GenericByteArray<T> {
let array_type = T::DATA_TYPE;
@@ -358,6 +400,7 @@ mod tests {
use super::*;
use crate::array::Array;
use crate::GenericStringArray;
+ use arrow_buffer::NullBuffer;
use std::fmt::Write as _;
use std::io::Write as _;
@@ -593,4 +636,178 @@ mod tests {
&["foo".as_bytes(), "bar\n".as_bytes(), "fizbuz".as_bytes()]
)
}
+
+    #[test]
+    fn test_append_array_without_nulls() {
+        let input = vec![
+            "hello", "world", "how", "are", "you", "doing", "today", "I", "am", "doing", "well",
+            "thank", "you", "for", "asking",
+        ];
+
+        // Append the input as three consecutive, independently built arrays
+        let mut builder = GenericStringBuilder::<i32>::new();
+        for range in [0..3, 3..7, 7..input.len()] {
+            let part = GenericStringArray::<i32>::from(input[range].to_vec());
+            builder.append_array(&part);
+        }
+
+        let expected = GenericStringArray::<i32>::from(input);
+        assert_eq!(builder.finish(), expected);
+    }
+
+    #[test]
+    fn test_append_array_with_nulls() {
+        let input = vec![
+            Some("hello"),
+            None,
+            Some("how"),
+            None,
+            None,
+            None,
+            None,
+            Some("I"),
+            Some("am"),
+            Some("doing"),
+            Some("well"),
+        ];
+
+        // Append the input as three consecutive, independently built arrays
+        let mut builder = GenericStringBuilder::<i32>::new();
+        for range in [0..3, 3..7, 7..input.len()] {
+            let part = GenericStringArray::<i32>::from(input[range].to_vec());
+            builder.append_array(&part);
+        }
+
+        let expected = GenericStringArray::<i32>::from(input);
+        assert_eq!(builder.finish(), expected);
+    }
+
+    #[test]
+    fn test_append_empty_array() {
+        let empty = GenericStringArray::<i32>::from(Vec::<&str>::new());
+
+        let mut builder = GenericStringBuilder::<i32>::new();
+        builder.append_array(&empty);
+
+        assert_eq!(builder.finish().len(), 0);
+    }
+
+    #[test]
+    fn test_append_array_with_offset_not_starting_at_0() {
+        let full_array = GenericStringArray::<i32>::from(vec![
+            Some("hello"),
+            None,
+            Some("how"),
+            None,
+            None,
+            None,
+            None,
+            Some("I"),
+            Some("am"),
+            Some("doing"),
+            Some("well"),
+        ]);
+
+        // Slice from the middle so that neither the first nor the last offset of the
+        // slice lines up with the ends of the underlying value buffer
+        let sliced = full_array.slice(1, 4);
+        assert_ne!(sliced.offsets()[0].as_usize(), 0);
+        assert_ne!(sliced.offsets().last(), full_array.offsets().last());
+
+        let mut builder = GenericStringBuilder::<i32>::new();
+        builder.append_array(&sliced);
+
+        let expected = GenericStringArray::<i32>::from(vec![None, Some("how"), None, None]);
+        assert_eq!(builder.finish(), expected);
+    }
+
+    #[test]
+    fn test_append_underlying_null_values_added_as_is() {
+        // Builds a string array from `values`, then replaces its validity with `nulls`
+        // while keeping the bytes stored underneath the slots that become null
+        fn with_nulls(values: Vec<&str>, nulls: NullBuffer) -> GenericStringArray<i32> {
+            let (offsets, buffer, _) = GenericStringArray::<i32>::from(values).into_parts();
+            GenericStringArray::<i32>::new(offsets, buffer, Some(nulls))
+        }
+
+        let words = [
+            "hello", "world", "how", "are", "you", "doing", "today", "I", "am", "doing", "well",
+            "thank", "you", "for", "asking",
+        ];
+
+        let first = with_nulls(
+            words[..9].to_vec(),
+            NullBuffer::from(&[true, false, true, false, false, true, true, true, false]),
+        );
+        let second = with_nulls(
+            words[9..].to_vec(),
+            NullBuffer::from(&[false, false, true, false, true, true]),
+        );
+
+        let mut builder = GenericStringBuilder::<i32>::new();
+        builder.append_array(&first);
+        builder.append_array(&second);
+        let actual = builder.finish();
+
+        // Validity of `first` followed by validity of `second`
+        let validity = [
+            true, false, true, false, false, true, true, true, false, false, false, true, false,
+            true, true,
+        ];
+        let expected: GenericStringArray<i32> = words
+            .iter()
+            .zip(validity)
+            .map(|(word, is_valid)| is_valid.then_some(*word))
+            .collect();
+        assert_eq!(actual, expected);
+
+        // The bytes underneath null slots must have been copied as well
+        let expected_underlying_buffer = Buffer::from(words.concat().as_bytes());
+        assert_eq!(actual.values(), &expected_underlying_buffer);
+    }
+
+    #[test]
+    fn append_array_with_continues_indices() {
+        let full_array = GenericStringArray::<i32>::from(vec![
+            "hello", "world", "how", "are", "you", "doing", "today", "I", "am", "doing", "well",
+            "thank", "you", "for", "asking",
+        ]);
+        let len = full_array.len();
+
+        // Each slice starts exactly where the previous one ended, so its first offset
+        // equals the builder's next offset and the offsets are appended without rebasing
+        let mut builder = GenericStringBuilder::<i32>::new();
+        for (offset, length) in [(0, 3), (3, 4), (7, len - 7)] {
+            builder.append_array(&full_array.slice(offset, length));
+        }
+
+        assert_eq!(builder.finish(), full_array);
+    }
}
diff --git a/arrow-array/src/builder/primitive_builder.rs
b/arrow-array/src/builder/primitive_builder.rs
index 3191fea6e4..41c65fe34e 100644
--- a/arrow-array/src/builder/primitive_builder.rs
+++ b/arrow-array/src/builder/primitive_builder.rs
@@ -17,7 +17,7 @@
use crate::builder::{ArrayBuilder, BufferBuilder};
use crate::types::*;
-use crate::{ArrayRef, PrimitiveArray};
+use crate::{Array, ArrayRef, PrimitiveArray};
use arrow_buffer::NullBufferBuilder;
use arrow_buffer::{Buffer, MutableBuffer};
use arrow_data::ArrayData;
@@ -255,6 +255,28 @@ impl<T: ArrowPrimitiveType> PrimitiveBuilder<T> {
self.values_builder.append_slice(values);
}
+    /// Appends every slot of `array` (values and validity) to this builder.
+    ///
+    /// Values are copied verbatim, including the values stored underneath null
+    /// slots. When `array` has no null buffer, every appended slot is valid.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the data type of `array` differs from the data type of `self`
+    /// (for example a different decimal precision/scale).
+    #[inline]
+    pub fn append_array(&mut self, array: &PrimitiveArray<T>) {
+        assert_eq!(
+            &self.data_type,
+            array.data_type(),
+            "array data type mismatch"
+        );
+
+        self.values_builder.append_slice(array.values());
+        match array.nulls() {
+            Some(nulls) => self.null_buffer_builder.append_buffer(nulls),
+            None => self.null_buffer_builder.append_n_non_nulls(array.len()),
+        }
+    }
+
/// Appends values from a trusted length iterator.
///
/// # Safety
@@ -366,6 +388,7 @@ impl<P: ArrowPrimitiveType> Extend<Option<P::Native>> for
PrimitiveBuilder<P> {
#[cfg(test)]
mod tests {
use super::*;
+ use arrow_buffer::{NullBuffer, ScalarBuffer};
use arrow_schema::TimeUnit;
use crate::array::Array;
@@ -615,4 +638,63 @@ mod tests {
let array = builder.finish();
assert_eq!(array.values(), &[1, 2, 3, 5, 2, 4, 4, 2, 4, 6, 2]);
}
+
+    #[test]
+    fn test_primitive_array_append_array() {
+        let input = vec![
+            Some(1),
+            None,
+            Some(3),
+            None,
+            Some(5),
+            None,
+            None,
+            None,
+            Some(7),
+            Some(9),
+            Some(8),
+            Some(6),
+            Some(4),
+        ];
+
+        // Append three consecutive chunks; the result must equal the unsplit input
+        let mut builder = Int32Array::builder(5);
+        for chunk in [&input[..5], &input[5..8], &input[8..]] {
+            builder.append_array(&Int32Array::from(chunk.to_vec()));
+        }
+
+        assert_eq!(builder.finish(), Int32Array::from(input));
+    }
+
+    #[test]
+    fn test_append_array_add_underlying_null_values() {
+        // Slots 2 and 3 are null but still hold 4 and 5, which must be copied verbatim
+        let values = ScalarBuffer::from(vec![2, 3, 4, 5]);
+        let nulls = NullBuffer::from(&[true, true, false, false]);
+        let array = Int32Array::new(values, Some(nulls));
+
+        let mut builder = Int32Array::builder(5);
+        builder.append_array(&array);
+        let actual = builder.finish();
+
+        assert_eq!(actual, array);
+        assert_eq!(actual.values(), array.values());
+    }
+
+    #[test]
+    #[should_panic(expected = "array data type mismatch")]
+    fn test_invalid_with_data_type_in_append_array() {
+        let mut source = Decimal128Builder::new().with_data_type(DataType::Decimal128(1, 2));
+        source.append_value(1);
+        let array = source.finish();
+
+        // Same native type, different precision/scale: must be rejected
+        let mut builder = Decimal128Builder::new().with_data_type(DataType::Decimal128(2, 3));
+        builder.append_array(&array)
+    }
}
diff --git a/arrow-select/src/concat.rs b/arrow-select/src/concat.rs
index 09a151dd16..b489984784 100644
--- a/arrow-select/src/concat.rs
+++ b/arrow-select/src/concat.rs
@@ -31,6 +31,7 @@
//! ```
use crate::dictionary::{merge_dictionary_values,
should_merge_dictionary_values};
+use arrow_array::builder::{BooleanBuilder, GenericByteBuilder,
PrimitiveBuilder};
use arrow_array::cast::AsArray;
use arrow_array::types::*;
use arrow_array::*;
@@ -193,12 +194,54 @@ fn concat_lists<OffsetSize: OffsetSizeTrait>(
Ok(Arc::new(array))
}
+/// Concatenates primitive arrays of type `T` by appending each input into one
+/// pre-sized [`PrimitiveBuilder`].
+///
+/// The output takes the data type of the first input (so details such as a
+/// timestamp timezone or decimal precision/scale are kept), and the builder's
+/// `append_array` asserts that every input has that same data type.
+fn concat_primitives<T: ArrowPrimitiveType>(
+    arrays: &[&dyn Array],
+) -> Result<ArrayRef, ArrowError> {
+    let total_len: usize = arrays.iter().map(|array| array.len()).sum();
+    let data_type = arrays[0].data_type().clone();
+    let mut builder = PrimitiveBuilder::<T>::with_capacity(total_len).with_data_type(data_type);
+
+    arrays
+        .iter()
+        .for_each(|array| builder.append_array(array.as_primitive()));
+
+    Ok(Arc::new(builder.finish()))
+}
+
+/// Concatenates boolean arrays by appending each input (values and null buffer,
+/// copied as is) into one pre-sized [`BooleanBuilder`].
+fn concat_boolean(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
+    let total_len: usize = arrays.iter().map(|array| array.len()).sum();
+    let mut builder = BooleanBuilder::with_capacity(total_len);
+
+    arrays
+        .iter()
+        .for_each(|array| builder.append_array(array.as_boolean()));
+
+    Ok(Arc::new(builder.finish()))
+}
+
+/// Concatenates byte arrays (dispatched here for `Utf8`, `LargeUtf8`, `Binary` and
+/// `LargeBinary`) by appending each input into one [`GenericByteBuilder`] pre-sized
+/// with the item and byte capacities reported by `binary_capacity`.
+fn concat_bytes<T: ByteArrayType>(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
+    // Any other capacity shape is treated as impossible; presumably `binary_capacity`
+    // always returns `Capacities::Binary(_, Some(_))` (defined outside this view).
+    let Capacities::Binary(item_capacity, Some(bytes_capacity)) = binary_capacity::<T>(arrays)
+    else {
+        unreachable!()
+    };
+
+    let mut builder = GenericByteBuilder::<T>::with_capacity(item_capacity, bytes_capacity);
+    arrays
+        .iter()
+        .for_each(|array| builder.append_array(array.as_bytes::<T>()));
+
+    Ok(Arc::new(builder.finish()))
+}
+
+// Used with `downcast_integer!` in `concat`: returns early from the enclosing function
+// with the concatenation of the dictionary arrays in `$arrays`, for key type `$t`.
+// NOTE(review): the return type of `concat_dictionaries` is not visible here; if it
+// already returns `ArrayRef`, `Arc::new(..) as _` wraps it in a second `Arc` — confirm.
macro_rules! dict_helper {
($t:ty, $arrays:expr) => {
return Ok(Arc::new(concat_dictionaries::<$t>($arrays)?) as _)
};
}
+// Used with `downcast_primitive!` in `concat`: returns early from the enclosing
+// function with the concatenation of `$arrays` as primitive type `$t`.
+// `concat_primitives` already returns an `ArrayRef`, so it is returned directly;
+// `Arc::new(..) as _` would wrap it in a second, redundant `Arc`.
+macro_rules! primitive_concat {
+    ($t:ty, $arrays:expr) => {
+        return concat_primitives::<$t>($arrays)
+    };
+}
+
fn get_capacity(arrays: &[&dyn Array], data_type: &DataType) -> Capacities {
match data_type {
DataType::Utf8 => binary_capacity::<Utf8Type>(arrays),
@@ -254,7 +297,9 @@ pub fn concat(arrays: &[&dyn Array]) -> Result<ArrayRef,
ArrowError> {
return Err(ArrowError::InvalidArgumentError(error_message));
}
- match d {
+ downcast_primitive! {
+ d => (primitive_concat, arrays),
+ DataType::Boolean => concat_boolean(arrays),
DataType::Dictionary(k, _) => {
downcast_integer! {
k.as_ref() => (dict_helper, arrays),
@@ -263,6 +308,10 @@ pub fn concat(arrays: &[&dyn Array]) -> Result<ArrayRef,
ArrowError> {
}
DataType::List(field) => concat_lists::<i32>(arrays, field),
DataType::LargeList(field) => concat_lists::<i64>(arrays, field),
+ DataType::Utf8 => concat_bytes::<Utf8Type>(arrays),
+ DataType::LargeUtf8 => concat_bytes::<LargeUtf8Type>(arrays),
+ DataType::Binary => concat_bytes::<BinaryType>(arrays),
+ DataType::LargeBinary => concat_bytes::<LargeBinaryType>(arrays),
_ => {
let capacity = get_capacity(arrays, d);
concat_fallback(arrays, capacity)