This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 7ea7cdc55a Optimize `take_fixed_size_binary` For Predefined Value 
Lengths (#9535)
7ea7cdc55a is described below

commit 7ea7cdc55a20162346e2e006ac4589a30f7bfdbb
Author: Tobias Schwarzinger <[email protected]>
AuthorDate: Wed Mar 18 20:36:23 2026 +0100

    Optimize `take_fixed_size_binary` For Predefined Value Lengths (#9535)
    
    # Which issue does this PR close?
    
    - Related to https://github.com/apache/arrow-rs/issues/279
    
    # Rationale for this change
    
    The `take` kernel is very important for many operations (e.g.,
    `HashJoin` in DataFusion IIRC). Currently, there is a gap between the
    performance of the take kernel for primitive arrays (e.g.,
    `DataType::UInt32`) and fixed size binary arrays of the same length
    (e.g., `FixedSizeBinary<4>`).
    
    In our case this led to a performance reduction when moving from an
    integer-based id column to a fixed-size-binary-based id column. This PR
    aims to address parts of this gap.
    
    The 16-byte case would especially benefit operations on UUID columns.
    
    # What changes are included in this PR?
    
    - Add `take_fixed_size` that can be called for a set of predefined
    fsb-lengths that we want to support. This is a "flat buffer" version of
    the `take_native` kernel.
    
    # Are these changes tested?
    
    I've added another test that still exercises the non-optimized code
    path.
    
    # Are there any user-facing changes?
    
    No
---
 arrow-select/src/take.rs      | 173 ++++++++++++++++++++++++++++++++++--------
 arrow/benches/take_kernels.rs |  16 +++-
 2 files changed, 156 insertions(+), 33 deletions(-)

diff --git a/arrow-select/src/take.rs b/arrow-select/src/take.rs
index 43c13e66fb..ee813f5353 100644
--- a/arrow-select/src/take.rs
+++ b/arrow-select/src/take.rs
@@ -18,6 +18,7 @@
 //! Defines take kernel for [Array]
 
 use std::fmt::Display;
+use std::mem::ManuallyDrop;
 use std::sync::Arc;
 
 use arrow_array::builder::{BufferBuilder, UInt32Builder};
@@ -723,46 +724,127 @@ fn take_fixed_size_binary<IndexType: ArrowPrimitiveType>(
         ArrowError::InvalidArgumentError(format!("Cannot convert size '{}' to 
usize", size))
     })?;
 
-    let values_buffer = values.values().as_slice();
-    let mut values_buffer_builder = BufferBuilder::new(indices.len() * 
size_usize);
-
-    if indices.null_count() == 0 {
-        let array_iter = indices.values().iter().map(|idx| {
-            let offset = idx.as_usize() * size_usize;
-            &values_buffer[offset..offset + size_usize]
-        });
-        for slice in array_iter {
-            values_buffer_builder.append_slice(slice);
-        }
-    } else {
-        // The indices nullability cannot be ignored here because the values 
buffer may contain
-        // nulls which should not cause a panic.
-        let array_iter = indices.iter().map(|idx| {
-            idx.map(|idx| {
-                let offset = idx.as_usize() * size_usize;
-                &values_buffer[offset..offset + size_usize]
-            })
-        });
-        for slice in array_iter {
-            match slice {
-                None => values_buffer_builder.append_n(size_usize, 0),
-                Some(slice) => values_buffer_builder.append_slice(slice),
-            }
-        }
-    }
+    let result_buffer = match size_usize {
+        1 => take_fixed_size::<IndexType, 1>(values.values(), indices),
+        2 => take_fixed_size::<IndexType, 2>(values.values(), indices),
+        4 => take_fixed_size::<IndexType, 4>(values.values(), indices),
+        8 => take_fixed_size::<IndexType, 8>(values.values(), indices),
+        16 => take_fixed_size::<IndexType, 16>(values.values(), indices),
+        _ => take_fixed_size_binary_buffer_dynamic_length(values, indices, 
size_usize),
+    };
 
-    let values_buffer = values_buffer_builder.finish();
     let value_nulls = take_nulls(values.nulls(), indices);
     let final_nulls = NullBuffer::union(value_nulls.as_ref(), indices.nulls());
-
     let array_data = ArrayDataBuilder::new(DataType::FixedSizeBinary(size))
         .len(indices.len())
         .nulls(final_nulls)
         .offset(0)
-        .add_buffer(values_buffer)
+        .add_buffer(result_buffer)
         .build()?;
 
-    Ok(FixedSizeBinaryArray::from(array_data))
+    return Ok(FixedSizeBinaryArray::from(array_data));
+
+    /// Implementation of the take kernel for fixed size binary arrays.
+    #[inline(never)]
+    fn take_fixed_size_binary_buffer_dynamic_length<IndexType: 
ArrowPrimitiveType>(
+        values: &FixedSizeBinaryArray,
+        indices: &PrimitiveArray<IndexType>,
+        size_usize: usize,
+    ) -> Buffer {
+        let values_buffer = values.values().as_slice();
+        let mut values_buffer_builder = BufferBuilder::new(indices.len() * 
size_usize);
+
+        if indices.null_count() == 0 {
+            let array_iter = indices.values().iter().map(|idx| {
+                let offset = idx.as_usize() * size_usize;
+                &values_buffer[offset..offset + size_usize]
+            });
+            for slice in array_iter {
+                values_buffer_builder.append_slice(slice);
+            }
+        } else {
+            // The indices nullability cannot be ignored here because the 
values buffer may contain
+            // nulls which should not cause a panic.
+            let array_iter = indices.iter().map(|idx| {
+                idx.map(|idx| {
+                    let offset = idx.as_usize() * size_usize;
+                    &values_buffer[offset..offset + size_usize]
+                })
+            });
+            for slice in array_iter {
+                match slice {
+                    None => values_buffer_builder.append_n(size_usize, 0),
+                    Some(slice) => values_buffer_builder.append_slice(slice),
+                }
+            }
+        }
+
+        values_buffer_builder.finish()
+    }
+}
+
+/// Implements the take kernel semantics over a flat [`Buffer`], interpreting 
it as a slice of
+/// `&[[u8; N]]`, where `N` is a compile-time constant. The usage of a flat 
[`Buffer`] allows using
+/// this kernel without an available [`ArrowPrimitiveType`] (e.g., for `[u8; 
5]`).
+///
+/// # Using This Function in the Primitive Take Kernel
+///
+/// This function is basically the same as [`take_native`] but just on a flat 
[`Buffer`] instead of
+/// the primitive [`ScalarBuffer`]. Ideally, the [`take_primitive`] kernel 
should just use this
+/// more general function. However, the "idiomatic code" requires the
+/// 
[feature(generic_const_exprs)](https://github.com/rust-lang/rust/issues/76560) 
for calling
+/// `take_fixed_size<I, { size_of::<T::Native> () } >(...)`. Once this feature 
has been stabilized,
+/// we can use this function also in the primitive kernels.
+fn take_fixed_size<IndexType: ArrowPrimitiveType, const N: usize>(
+    buffer: &Buffer,
+    indices: &PrimitiveArray<IndexType>,
+) -> Buffer {
+    assert_eq!(
+        buffer.len() % N,
+        0,
+        "Invalid array length in take_fixed_size"
+    );
+
+    let ptr = buffer.as_ptr();
+    let chunk_ptr = ptr.cast::<[u8; N]>();
+    let chunk_len = buffer.len() / N;
+    let buffer: &[[u8; N]] = unsafe {
+        // SAFETY: interpret an already valid slice as a slice of N-byte 
chunks. N divides buffer
+        // length without remainder.
+        std::slice::from_raw_parts(chunk_ptr, chunk_len)
+    };
+
+    let result_buffer = match indices.nulls().filter(|n| n.null_count() > 0) {
+        Some(n) => indices
+            .values()
+            .iter()
+            .enumerate()
+            .map(|(idx, index)| match buffer.get(index.as_usize()) {
+                Some(v) => *v,
+                // SAFETY: idx<indices.len()
+                None => match unsafe { n.inner().value_unchecked(idx) } {
+                    false => [0u8; N],
+                    true => panic!("Out-of-bounds index {index:?}"),
+                },
+            })
+            .collect::<Vec<_>>(),
+        None => indices
+            .values()
+            .iter()
+            .map(|index| buffer[index.as_usize()])
+            .collect::<Vec<_>>(),
+    };
+
+    let mut vec = ManuallyDrop::new(result_buffer); // Prevent de-allocation
+    let ptr = vec.as_mut_ptr();
+    let len = vec.len();
+    let cap = vec.capacity();
+    let result_buffer = unsafe {
+        // SAFETY: flattening an already valid Vec.
+        Vec::from_raw_parts(ptr.cast::<u8>(), len * N, cap * N)
+    };
+
+    Buffer::from_vec(result_buffer)
 }
 
 /// `take` implementation for dictionary arrays
@@ -2150,6 +2232,35 @@ mod tests {
         );
     }
 
+    /// The [`take_fixed_size_binary`] kernel contains optimizations that 
provide a faster
+    /// implementation for commonly-used value lengths. This test uses a value 
length that is not
+    /// optimized to test both code paths.
+    #[test]
+    fn test_take_fixed_size_binary_with_nulls_indices_not_optimized_length() {
+        let fsb = FixedSizeBinaryArray::try_from_sparse_iter_with_size(
+            [
+                Some(vec![0x01, 0x01, 0x01, 0x01, 0x01]),
+                Some(vec![0x02, 0x02, 0x02, 0x02, 0x01]),
+                Some(vec![0x03, 0x03, 0x03, 0x03, 0x01]),
+                Some(vec![0x04, 0x04, 0x04, 0x04, 0x01]),
+            ]
+            .into_iter(),
+            5,
+        )
+        .unwrap();
+
+        // The two middle indices are null -> Should be null in the output.
+        let indices = UInt32Array::from(vec![Some(0), None, None, Some(3)]);
+
+        let result = take_fixed_size_binary(&fsb, &indices, 5).unwrap();
+        assert_eq!(result.len(), 4);
+        assert_eq!(result.null_count(), 2);
+        assert_eq!(
+            result.nulls().unwrap().iter().collect::<Vec<_>>(),
+            vec![true, false, false, true]
+        );
+    }
+
     #[test]
     #[should_panic(expected = "index out of bounds: the len is 4 but the index 
is 1000")]
     fn test_take_list_out_of_bounds() {
diff --git a/arrow/benches/take_kernels.rs b/arrow/benches/take_kernels.rs
index 37b83a5e33..fb23177168 100644
--- a/arrow/benches/take_kernels.rs
+++ b/arrow/benches/take_kernels.rs
@@ -195,14 +195,26 @@ fn add_benchmark(c: &mut Criterion) {
 
     let values = create_fsb_array(1024, 0.0, 12);
     let indices = create_random_index(1024, 0.0);
-    c.bench_function("take primitive fsb value len: 12, indices: 1024", |b| {
+    c.bench_function("take fsb value len: 12, indices: 1024", |b| {
         b.iter(|| bench_take(&values, &indices))
     });
 
     let values = create_fsb_array(1024, 0.5, 12);
     let indices = create_random_index(1024, 0.0);
+    c.bench_function("take fsb value len: 12, null values, indices: 1024", |b| 
{
+        b.iter(|| bench_take(&values, &indices))
+    });
+
+    let values = create_fsb_array(1024, 0.0, 16);
+    let indices = create_random_index(1024, 0.0);
+    c.bench_function("take fsb value optimized len: 16, indices: 1024", |b| {
+        b.iter(|| bench_take(&values, &indices))
+    });
+
+    let values = create_fsb_array(1024, 0.5, 16);
+    let indices = create_random_index(1024, 0.0);
     c.bench_function(
-        "take primitive fsb value len: 12, null values, indices: 1024",
+        "take fsb value optimized len: 16, null values, indices: 1024",
         |b| b.iter(|| bench_take(&values, &indices)),
     );
 }

Reply via email to