This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 4e7a2fa553 Improve `take_bytes` perf in the null cases between 10-25% 
(#9625)
4e7a2fa553 is described below

commit 4e7a2fa553e2e5e7385f6c4e77984a354d40c813
Author: Adam Gutglick <[email protected]>
AuthorDate: Wed Jun 3 14:15:20 2026 +0100

    Improve `take_bytes` perf in the null cases between 10-25% (#9625)
    
    # Which issue does this PR close?
    
    
    - Closes #NNN.
    
    # Rationale for this change
    
    Just improves performance, I was profiling some things downstream and
    got curious about how it works.
    
    # What changes are included in this PR?
    
    The main idea is to use a two-pass approach:
    1. Compute byte offsets and collects (start, end) byte ranges
    2. Copy byte data via raw pointer writes (`copy_byte_ranges`)
    
    This PR also reduces the branching from 4 (one for each nullability
    combination) to only two.
    
    
    # Are these changes tested?
    
    Existing tests
    
    # Are there any user-facing changes?
    
    None
    
    ---------
    
    Signed-off-by: Adam Gutglick <[email protected]>
    Co-authored-by: Andrew Lamb <[email protected]>
    Co-authored-by: Claude Opus 4.8 (1M context) <[email protected]>
---
 arrow-select/src/take.rs | 241 ++++++++++++++++++++++++++++++++---------------
 1 file changed, 164 insertions(+), 77 deletions(-)

diff --git a/arrow-select/src/take.rs b/arrow-select/src/take.rs
index cbb65ac915..c245247417 100644
--- a/arrow-select/src/take.rs
+++ b/arrow-select/src/take.rs
@@ -495,6 +495,7 @@ fn take_bytes<T: ByteArrayType, IndexType: 
ArrowPrimitiveType>(
     array: &GenericByteArray<T>,
     indices: &PrimitiveArray<IndexType>,
 ) -> Result<GenericByteArray<T>, ArrowError> {
+    let mut values: Vec<u8> = Vec::new();
     let mut offsets = Vec::with_capacity(indices.len() + 1);
     offsets.push(T::Offset::default());
 
@@ -502,92 +503,116 @@ fn take_bytes<T: ByteArrayType, IndexType: 
ArrowPrimitiveType>(
     let mut capacity = 0;
     let nulls = take_nulls(array.nulls(), indices);
 
-    let (offsets, values) = if array.null_count() == 0 && indices.null_count() 
== 0 {
-        offsets.reserve(indices.len());
-        for index in indices.values() {
-            let index = index.as_usize();
-            capacity += input_offsets[index + 1].as_usize() - 
input_offsets[index].as_usize();
-            offsets.push(
-                T::Offset::from_usize(capacity)
-                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
-            );
-        }
-        let mut values = Vec::with_capacity(capacity);
-
-        for index in indices.values() {
-            values.extend_from_slice(array.value(index.as_usize()).as_ref());
-        }
-        (offsets, values)
-    } else if indices.null_count() == 0 {
-        offsets.reserve(indices.len());
-        for index in indices.values() {
-            let index = index.as_usize();
-            if array.is_valid(index) {
-                capacity += input_offsets[index + 1].as_usize() - 
input_offsets[index].as_usize();
+    // Branch on output nulls — `None` means every output slot is valid.
+    match nulls.as_ref().filter(|n| n.null_count() > 0) {
+        // Fast path: no nulls in output, every index is valid.
+        None => {
+            for index in indices.values() {
+                let index = index.as_usize();
+                let start = input_offsets[index].as_usize();
+                let end = input_offsets[index + 1].as_usize();
+                capacity += end - start;
+                offsets.push(
+                    T::Offset::from_usize(capacity)
+                        .ok_or_else(|| 
ArrowError::OffsetOverflowError(capacity))?,
+                );
             }
-            offsets.push(
-                T::Offset::from_usize(capacity)
-                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
-            );
-        }
-        let mut values = Vec::with_capacity(capacity);
 
-        for index in indices.values() {
-            let index = index.as_usize();
-            if array.is_valid(index) {
-                values.extend_from_slice(array.value(index).as_ref());
-            }
-        }
-        (offsets, values)
-    } else if array.null_count() == 0 {
-        offsets.reserve(indices.len());
-        for (i, index) in indices.values().iter().enumerate() {
-            let index = index.as_usize();
-            if indices.is_valid(i) {
-                capacity += input_offsets[index + 1].as_usize() - 
input_offsets[index].as_usize();
+            values.reserve(capacity);
+
+            let dst = values.spare_capacity_mut();
+            debug_assert!(dst.len() >= capacity);
+            let mut offset = 0;
+
+            for index in indices.values() {
+                // SAFETY: in-bounds proven by the first loop's bounds-checked 
offset access.
+                // dst asserted above to include the required capacity.
+                unsafe {
+                    let data: &[u8] = 
array.value_unchecked(index.as_usize()).as_ref();
+                    std::ptr::copy_nonoverlapping(
+                        data.as_ptr(),
+                        
dst.get_unchecked_mut(offset..).as_mut_ptr().cast::<u8>(),
+                        data.len(),
+                    );
+                    offset += data.len();
+                }
             }
-            offsets.push(
-                T::Offset::from_usize(capacity)
-                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
-            );
-        }
-        let mut values = Vec::with_capacity(capacity);
 
-        for (i, index) in indices.values().iter().enumerate() {
-            if indices.is_valid(i) {
-                
values.extend_from_slice(array.value(index.as_usize()).as_ref());
+            // SAFETY: wrote exactly `capacity` bytes above; reserved on line 
above.
+            unsafe {
+                values.set_len(capacity);
             }
         }
-        (offsets, values)
-    } else {
-        let nulls = nulls.as_ref().unwrap();
-        offsets.reserve(indices.len());
-        for (i, index) in indices.values().iter().enumerate() {
-            let index = index.as_usize();
-            if nulls.is_valid(i) {
-                capacity += input_offsets[index + 1].as_usize() - 
input_offsets[index].as_usize();
+        // Nullable path: only process valid (non-null) output positions.
+        Some(output_nulls) => {
+            let mut source_ranges = Vec::with_capacity(indices.len() - 
output_nulls.null_count());
+            let mut last_filled = 0;
+
+            // Pre-fill offsets; we overwrite valid positions below.
+            offsets.resize(indices.len() + 1, T::Offset::default());
+
+            // Pass 1: find all valid ranges that need to be copied.
+            for i in output_nulls.valid_indices() {
+                let current_offset = T::Offset::from_usize(capacity)
+                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?;
+                // Fill offsets for skipped null slots so they get zero-length 
ranges.
+                if last_filled < i {
+                    offsets[last_filled + 1..=i].fill(current_offset);
+                }
+
+                // SAFETY: `i` comes from a validity bitmap over `indices`, so 
it is in-bounds.
+                let index = unsafe { indices.value_unchecked(i) }.as_usize();
+                let start = input_offsets[index].as_usize();
+                let end = input_offsets[index + 1].as_usize();
+                capacity += end - start;
+                offsets[i + 1] = T::Offset::from_usize(capacity)
+                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?;
+
+                source_ranges.push((start, end));
+                last_filled = i + 1;
             }
-            offsets.push(
-                T::Offset::from_usize(capacity)
-                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
+
+            // Fill trailing null offsets after the last valid position.
+            let final_offset = T::Offset::from_usize(capacity)
+                .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?;
+            offsets[last_filled + 1..].fill(final_offset);
+            // Pass 2: copy byte data for all collected ranges.
+            values.reserve(capacity);
+            debug_assert_eq!(
+                source_ranges.iter().map(|(s, e)| e - s).sum::<usize>(),
+                capacity,
+                "capacity must equal total bytes across all ranges"
             );
-        }
-        let mut values = Vec::with_capacity(capacity);
-
-        for (i, index) in indices.values().iter().enumerate() {
-            // check index is valid before using index. The value in
-            // NULL index slots may not be within bounds of array
-            let index = index.as_usize();
-            if nulls.is_valid(i) {
-                values.extend_from_slice(array.value(index).as_ref());
+
+            let src = array.value_data();
+            let src = src.as_ptr();
+            let dst = values.spare_capacity_mut();
+            debug_assert!(dst.len() >= capacity);
+
+            let mut offset = 0;
+
+            for (start, end) in source_ranges.into_iter() {
+                let value_len = end - start;
+                // SAFETY: caller guarantees each (start, end) is in-bounds of 
`src`.
+                // `dst` asserted above to include the required capacity.
+                // The regions don't overlap (src is input, dst is a fresh 
allocation).
+                unsafe {
+                    std::ptr::copy_nonoverlapping(
+                        src.add(start),
+                        
dst.get_unchecked_mut(offset..).as_mut_ptr().cast::<u8>(),
+                        value_len,
+                    );
+                    offset += value_len;
+                }
             }
+            // SAFETY: caller guarantees `capacity` == total bytes across all 
ranges,
+            // so the loop above wrote exactly `capacity` bytes.
+            unsafe { values.set_len(capacity) };
         }
-        (offsets, values)
     };
 
-    T::Offset::from_usize(values.len())
-        .ok_or_else(|| ArrowError::OffsetOverflowError(values.len()))?;
-
+    // SAFETY: offsets are monotonically increasing and in-bounds of `values`,
+    // and `nulls` (if present) has length == `indices.len()`.
     let array = unsafe {
         let offsets = OffsetBuffer::new_unchecked(offsets.into());
         GenericByteArray::<T>::new_unchecked(offsets, values.into(), nulls)
@@ -1728,6 +1753,41 @@ mod tests {
         assert_eq!(result.as_ref(), &expected);
     }
 
+    /// Take from a *sliced* byte array, i.e. one whose value offsets do not
+    /// start at zero. This exercises copying byte data out of an array with a
+    /// non-zero base offset for both the no-null fast path and the nullable
+    /// path (null indices and selected null values).
+    #[test]
+    fn test_take_bytes_sliced_values() {
+        let values = StringArray::from(vec![
+            Some("aaa"),
+            Some("bbb"),
+            None,
+            Some("ccccc"),
+            Some("dd"),
+            None,
+            Some("eeee"),
+        ]);
+        // Slice so the underlying value offsets no longer start at 0:
+        // sliced == [None, "ccccc", "dd", None, "eeee"]
+        let sliced = values.slice(2, 5);
+
+        // Fast path: every output slot is valid (no null indices, no null
+        // values selected).
+        let indices = Int32Array::from(vec![1, 2, 4, 1]);
+        let result = take(&sliced, &indices, None).unwrap();
+        let expected =
+            StringArray::from(vec![Some("ccccc"), Some("dd"), Some("eeee"), 
Some("ccccc")]);
+        assert_eq!(result.as_string::<i32>(), &expected);
+
+        // Nullable path: a null index (position 1) and selected null values
+        // (sliced indices 0 and 3 are null).
+        let indices = Int32Array::from(vec![Some(1), None, Some(0), Some(4), 
Some(3)]);
+        let result = take(&sliced, &indices, None).unwrap();
+        let expected = StringArray::from(vec![Some("ccccc"), None, None, 
Some("eeee"), None]);
+        assert_eq!(result.as_string::<i32>(), &expected);
+    }
+
     fn _test_byte_view<T>()
     where
         T: ByteViewType,
@@ -2808,11 +2868,38 @@ mod tests {
         assert_eq!(array.len(), 3);
     }
 
+    /// Fixture for the offset-overflow tests: a single large value plus the
+    /// number of times it must be selected so the cumulative offset exceeds
+    /// `i32::MAX`. Using a large value keeps the index count (and the test
+    /// runtime) small.
+    fn offset_overflow_fixture() -> (StringArray, usize) {
+        let value_len = 1_000_000;
+        let values = StringArray::from(vec![Some("a".repeat(value_len))]);
+        let n = i32::MAX as usize / value_len + 1;
+        (values, n)
+    }
+
     #[test]
     fn test_take_bytes_offset_overflow() {
-        let indices = Int32Array::from(vec![0; (i32::MAX >> 4) as usize]);
-        let text = ('a'..='z').collect::<String>();
-        let values = StringArray::from(vec![Some(text.clone())]);
+        let (values, n) = offset_overflow_fixture();
+        let indices = Int32Array::from(vec![0; n]);
+        assert!(matches!(
+            take(&values, &indices, None),
+            Err(ArrowError::OffsetOverflowError(_))
+        ));
+    }
+
+    /// The offset-overflow error must also be produced on the nullable code
+    /// path (when the output contains nulls), not only on the no-null fast 
path.
+    #[test]
+    fn test_take_bytes_offset_overflow_nullable() {
+        let (values, n) = offset_overflow_fixture();
+        // A null index forces the output to contain nulls, exercising the
+        // nullable code path.
+        let validity =
+            
NullBuffer::from_iter(std::iter::once(false).chain(std::iter::repeat_n(true, 
n)));
+        let indices = Int32Array::new(vec![0i32; n + 1].into(), 
Some(validity));
+
         assert!(matches!(
             take(&values, &indices, None),
             Err(ArrowError::OffsetOverflowError(_))

Reply via email to