This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 4e7a2fa553 Improve `take_bytes` perf in the null cases between 10-25%
(#9625)
4e7a2fa553 is described below
commit 4e7a2fa553e2e5e7385f6c4e77984a354d40c813
Author: Adam Gutglick <[email protected]>
AuthorDate: Wed Jun 3 14:15:20 2026 +0100
Improve `take_bytes` perf in the null cases between 10-25% (#9625)
# Which issue does this PR close?
- Closes #NNN.
# Rationale for this change
This change only improves performance. I was profiling some code downstream
and got curious about how `take_bytes` works.
# What changes are included in this PR?
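The main idea is to use a two-pass approach:
1. Compute the output offsets and, on the nullable path, collect the
   `(start, end)` byte range of each selected value.
2. Copy the byte data via raw pointer writes (`std::ptr::copy_nonoverlapping`)
   into a buffer reserved up front.
This PR also reduces the number of code paths from four (one for each
nullability combination) to two: a fast path for when the output contains
no nulls, and a nullable path.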
# Are these changes tested?
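Yes, by the existing tests plus new tests for taking from a sliced byte array
and for offset overflow on the nullable path.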
# Are there any user-facing changes?
None
---------
Signed-off-by: Adam Gutglick <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
Co-authored-by: Claude Opus 4.8 (1M context) <[email protected]>
---
arrow-select/src/take.rs | 241 ++++++++++++++++++++++++++++++++---------------
1 file changed, 164 insertions(+), 77 deletions(-)
diff --git a/arrow-select/src/take.rs b/arrow-select/src/take.rs
index cbb65ac915..c245247417 100644
--- a/arrow-select/src/take.rs
+++ b/arrow-select/src/take.rs
@@ -495,6 +495,7 @@ fn take_bytes<T: ByteArrayType, IndexType:
ArrowPrimitiveType>(
array: &GenericByteArray<T>,
indices: &PrimitiveArray<IndexType>,
) -> Result<GenericByteArray<T>, ArrowError> {
+ let mut values: Vec<u8> = Vec::new();
let mut offsets = Vec::with_capacity(indices.len() + 1);
offsets.push(T::Offset::default());
@@ -502,92 +503,116 @@ fn take_bytes<T: ByteArrayType, IndexType:
ArrowPrimitiveType>(
let mut capacity = 0;
let nulls = take_nulls(array.nulls(), indices);
- let (offsets, values) = if array.null_count() == 0 && indices.null_count()
== 0 {
- offsets.reserve(indices.len());
- for index in indices.values() {
- let index = index.as_usize();
- capacity += input_offsets[index + 1].as_usize() -
input_offsets[index].as_usize();
- offsets.push(
- T::Offset::from_usize(capacity)
- .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
- );
- }
- let mut values = Vec::with_capacity(capacity);
-
- for index in indices.values() {
- values.extend_from_slice(array.value(index.as_usize()).as_ref());
- }
- (offsets, values)
- } else if indices.null_count() == 0 {
- offsets.reserve(indices.len());
- for index in indices.values() {
- let index = index.as_usize();
- if array.is_valid(index) {
- capacity += input_offsets[index + 1].as_usize() -
input_offsets[index].as_usize();
+ // Branch on output nulls — `None` means every output slot is valid.
+ match nulls.as_ref().filter(|n| n.null_count() > 0) {
+ // Fast path: no nulls in output, every index is valid.
+ None => {
+ for index in indices.values() {
+ let index = index.as_usize();
+ let start = input_offsets[index].as_usize();
+ let end = input_offsets[index + 1].as_usize();
+ capacity += end - start;
+ offsets.push(
+ T::Offset::from_usize(capacity)
+ .ok_or_else(||
ArrowError::OffsetOverflowError(capacity))?,
+ );
}
- offsets.push(
- T::Offset::from_usize(capacity)
- .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
- );
- }
- let mut values = Vec::with_capacity(capacity);
- for index in indices.values() {
- let index = index.as_usize();
- if array.is_valid(index) {
- values.extend_from_slice(array.value(index).as_ref());
- }
- }
- (offsets, values)
- } else if array.null_count() == 0 {
- offsets.reserve(indices.len());
- for (i, index) in indices.values().iter().enumerate() {
- let index = index.as_usize();
- if indices.is_valid(i) {
- capacity += input_offsets[index + 1].as_usize() -
input_offsets[index].as_usize();
+ values.reserve(capacity);
+
+ let dst = values.spare_capacity_mut();
+ debug_assert!(dst.len() >= capacity);
+ let mut offset = 0;
+
+ for index in indices.values() {
+ // SAFETY: in-bounds proven by the first loop's bounds-checked
offset access.
+ // dst asserted above to include the required capacity.
+ unsafe {
+ let data: &[u8] =
array.value_unchecked(index.as_usize()).as_ref();
+ std::ptr::copy_nonoverlapping(
+ data.as_ptr(),
+
dst.get_unchecked_mut(offset..).as_mut_ptr().cast::<u8>(),
+ data.len(),
+ );
+ offset += data.len();
+ }
}
- offsets.push(
- T::Offset::from_usize(capacity)
- .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
- );
- }
- let mut values = Vec::with_capacity(capacity);
- for (i, index) in indices.values().iter().enumerate() {
- if indices.is_valid(i) {
-
values.extend_from_slice(array.value(index.as_usize()).as_ref());
+ // SAFETY: wrote exactly `capacity` bytes above; reserved on line
above.
+ unsafe {
+ values.set_len(capacity);
}
}
- (offsets, values)
- } else {
- let nulls = nulls.as_ref().unwrap();
- offsets.reserve(indices.len());
- for (i, index) in indices.values().iter().enumerate() {
- let index = index.as_usize();
- if nulls.is_valid(i) {
- capacity += input_offsets[index + 1].as_usize() -
input_offsets[index].as_usize();
+ // Nullable path: only process valid (non-null) output positions.
+ Some(output_nulls) => {
+ let mut source_ranges = Vec::with_capacity(indices.len() -
output_nulls.null_count());
+ let mut last_filled = 0;
+
+ // Pre-fill offsets; we overwrite valid positions below.
+ offsets.resize(indices.len() + 1, T::Offset::default());
+
+ // Pass 1: find all valid ranges that need to be copied.
+ for i in output_nulls.valid_indices() {
+ let current_offset = T::Offset::from_usize(capacity)
+ .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?;
+ // Fill offsets for skipped null slots so they get zero-length
ranges.
+ if last_filled < i {
+ offsets[last_filled + 1..=i].fill(current_offset);
+ }
+
+ // SAFETY: `i` comes from a validity bitmap over `indices`, so
it is in-bounds.
+ let index = unsafe { indices.value_unchecked(i) }.as_usize();
+ let start = input_offsets[index].as_usize();
+ let end = input_offsets[index + 1].as_usize();
+ capacity += end - start;
+ offsets[i + 1] = T::Offset::from_usize(capacity)
+ .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?;
+
+ source_ranges.push((start, end));
+ last_filled = i + 1;
}
- offsets.push(
- T::Offset::from_usize(capacity)
- .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
+
+ // Fill trailing null offsets after the last valid position.
+ let final_offset = T::Offset::from_usize(capacity)
+ .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?;
+ offsets[last_filled + 1..].fill(final_offset);
+ // Pass 2: copy byte data for all collected ranges.
+ values.reserve(capacity);
+ debug_assert_eq!(
+ source_ranges.iter().map(|(s, e)| e - s).sum::<usize>(),
+ capacity,
+ "capacity must equal total bytes across all ranges"
);
- }
- let mut values = Vec::with_capacity(capacity);
-
- for (i, index) in indices.values().iter().enumerate() {
- // check index is valid before using index. The value in
- // NULL index slots may not be within bounds of array
- let index = index.as_usize();
- if nulls.is_valid(i) {
- values.extend_from_slice(array.value(index).as_ref());
+
+ let src = array.value_data();
+ let src = src.as_ptr();
+ let dst = values.spare_capacity_mut();
+ debug_assert!(dst.len() >= capacity);
+
+ let mut offset = 0;
+
+ for (start, end) in source_ranges.into_iter() {
+ let value_len = end - start;
+ // SAFETY: caller guarantees each (start, end) is in-bounds of
`src`.
+ // `dst` asserted above to include the required capacity.
+ // The regions don't overlap (src is input, dst is a fresh
allocation).
+ unsafe {
+ std::ptr::copy_nonoverlapping(
+ src.add(start),
+
dst.get_unchecked_mut(offset..).as_mut_ptr().cast::<u8>(),
+ value_len,
+ );
+ offset += value_len;
+ }
}
+ // SAFETY: caller guarantees `capacity` == total bytes across all
ranges,
+ // so the loop above wrote exactly `capacity` bytes.
+ unsafe { values.set_len(capacity) };
}
- (offsets, values)
};
- T::Offset::from_usize(values.len())
- .ok_or_else(|| ArrowError::OffsetOverflowError(values.len()))?;
-
+ // SAFETY: offsets are monotonically increasing and in-bounds of `values`,
+ // and `nulls` (if present) has length == `indices.len()`.
let array = unsafe {
let offsets = OffsetBuffer::new_unchecked(offsets.into());
GenericByteArray::<T>::new_unchecked(offsets, values.into(), nulls)
@@ -1728,6 +1753,41 @@ mod tests {
assert_eq!(result.as_ref(), &expected);
}
+ /// Take from a *sliced* byte array, i.e. one whose value offsets do not
+ /// start at zero. This exercises copying byte data out of an array with a
+ /// non-zero base offset for both the no-null fast path and the nullable
+ /// path (null indices and selected null values).
+ #[test]
+ fn test_take_bytes_sliced_values() {
+ let values = StringArray::from(vec![
+ Some("aaa"),
+ Some("bbb"),
+ None,
+ Some("ccccc"),
+ Some("dd"),
+ None,
+ Some("eeee"),
+ ]);
+ // Slice so the underlying value offsets no longer start at 0:
+ // sliced == [None, "ccccc", "dd", None, "eeee"]
+ let sliced = values.slice(2, 5);
+
+ // Fast path: every output slot is valid (no null indices, no null
+ // values selected).
+ let indices = Int32Array::from(vec![1, 2, 4, 1]);
+ let result = take(&sliced, &indices, None).unwrap();
+ let expected =
+ StringArray::from(vec![Some("ccccc"), Some("dd"), Some("eeee"),
Some("ccccc")]);
+ assert_eq!(result.as_string::<i32>(), &expected);
+
+ // Nullable path: a null index (position 1) and selected null values
+ // (sliced indices 0 and 3 are null).
+ let indices = Int32Array::from(vec![Some(1), None, Some(0), Some(4),
Some(3)]);
+ let result = take(&sliced, &indices, None).unwrap();
+ let expected = StringArray::from(vec![Some("ccccc"), None, None,
Some("eeee"), None]);
+ assert_eq!(result.as_string::<i32>(), &expected);
+ }
+
fn _test_byte_view<T>()
where
T: ByteViewType,
@@ -2808,11 +2868,38 @@ mod tests {
assert_eq!(array.len(), 3);
}
+ /// Fixture for the offset-overflow tests: a single large value plus the
+ /// number of times it must be selected so the cumulative offset exceeds
+ /// `i32::MAX`. Using a large value keeps the index count (and the test
+ /// runtime) small.
+ fn offset_overflow_fixture() -> (StringArray, usize) {
+ let value_len = 1_000_000;
+ let values = StringArray::from(vec![Some("a".repeat(value_len))]);
+ let n = i32::MAX as usize / value_len + 1;
+ (values, n)
+ }
+
#[test]
fn test_take_bytes_offset_overflow() {
- let indices = Int32Array::from(vec![0; (i32::MAX >> 4) as usize]);
- let text = ('a'..='z').collect::<String>();
- let values = StringArray::from(vec![Some(text.clone())]);
+ let (values, n) = offset_overflow_fixture();
+ let indices = Int32Array::from(vec![0; n]);
+ assert!(matches!(
+ take(&values, &indices, None),
+ Err(ArrowError::OffsetOverflowError(_))
+ ));
+ }
+
+ /// The offset-overflow error must also be produced on the nullable code
+ /// path (when the output contains nulls), not only on the no-null fast
path.
+ #[test]
+ fn test_take_bytes_offset_overflow_nullable() {
+ let (values, n) = offset_overflow_fixture();
+ // A null index forces the output to contain nulls, exercising the
+ // nullable code path.
+ let validity =
+
NullBuffer::from_iter(std::iter::once(false).chain(std::iter::repeat_n(true,
n)));
+ let indices = Int32Array::new(vec![0i32; n + 1].into(),
Some(validity));
+
assert!(matches!(
take(&values, &indices, None),
Err(ArrowError::OffsetOverflowError(_))