This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 4b80f0e158 feat(parquet): generalize value encoder inputs (#9955)
4b80f0e158 is described below

commit 4b80f0e1587b003aa01082fc3f8b15873800f219
Author: Hippolyte Barraud <[email protected]>
AuthorDate: Wed May 20 17:08:20 2026 -0400

    feat(parquet): generalize value encoder inputs (#9955)
    
    # Which issue does this PR close?
    
    <!--
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax.
    -->
    
    - Spun off from #9653
    - Contributes to #9731
    
    # Rationale for this change
    
    <!--
    Why are you proposing this change? If this is already explained clearly
    in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand
    your changes and offer better suggestions for fixes.
    -->
    
    See #9731
    
    # What changes are included in this PR?
    
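    Changes the `indices` parameter of the `byte_array` encoder methods
    (`FallbackEncoder::encode`, `DictEncoder::encode`, etc.) and of all
    `get_*_array_slice` functions from `&[usize]` to
    `impl ExactSizeIterator<Item = usize>`. The free `encode` helper used
    with `downcast_op!` takes a generic `I: ExactSizeIterator<Item = usize> + Clone`,
    because it iterates the indices more than once (e.g. for statistics and
    the bloom filter). Existing call sites now pass `indices.iter().copied()`.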
    
    # Are these changes tested?
    
    <!--
    We typically require tests for all PRs in order to:
    1. Prevent the code from being accidentally broken by subsequent changes
    2. Serve as another way to document the expected behavior of the code
    
    If tests are not included in your PR, please explain why (for example,
    are they covered by existing tests)?
    
    If this PR claims a performance improvement, please include evidence
    such as benchmark results.
    -->
    
    No new tests were added; this is a signature refactor covered by the
    existing tests, which all pass.
    
    # Are there any user-facing changes?
    
    <!--
    If there are user-facing changes then we may require documentation to be
    updated before approving the PR.
    
    If there are any breaking changes to public APIs, please call them out.
    -->
    
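    None. All modified functions are private (non-`pub`), and the
    `ColumnValueEncoder::write_gather` signature is unchanged.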
    
    Signed-off-by: Hippolyte Barraud <[email protected]>
---
 parquet/src/arrow/arrow_writer/byte_array.rs | 30 +++++++++------
 parquet/src/arrow/arrow_writer/mod.rs        | 57 +++++++++++++++-------------
 2 files changed, 48 insertions(+), 39 deletions(-)

diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs 
b/parquet/src/arrow/arrow_writer/byte_array.rs
index f56f9570ad..9cb0718b4d 100644
--- a/parquet/src/arrow/arrow_writer/byte_array.rs
+++ b/parquet/src/arrow/arrow_writer/byte_array.rs
@@ -165,7 +165,7 @@ impl FallbackEncoder {
     }
 
     /// Encode `values` to the in-progress page
-    fn encode<T>(&mut self, values: T, indices: &[usize])
+    fn encode<T>(&mut self, values: T, indices: impl ExactSizeIterator<Item = 
usize>)
     where
         T: ArrayAccessor + Copy,
         T::Item: AsRef<[u8]>,
@@ -174,7 +174,7 @@ impl FallbackEncoder {
         match &mut self.encoder {
             FallbackEncoderImpl::Plain { buffer } => {
                 for idx in indices {
-                    let value = values.value(*idx);
+                    let value = values.value(idx);
                     let value = value.as_ref();
                     buffer.extend_from_slice((value.len() as u32).as_bytes());
                     buffer.extend_from_slice(value);
@@ -183,7 +183,7 @@ impl FallbackEncoder {
             }
             FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
                 for idx in indices {
-                    let value = values.value(*idx);
+                    let value = values.value(idx);
                     let value = value.as_ref();
                     lengths.put(&[value.len() as i32]).unwrap();
                     buffer.extend_from_slice(value);
@@ -197,7 +197,7 @@ impl FallbackEncoder {
                 suffix_lengths,
             } => {
                 for idx in indices {
-                    let value = values.value(*idx);
+                    let value = values.value(idx);
                     let value = value.as_ref();
                     let mut prefix_length = 0;
 
@@ -343,7 +343,7 @@ struct DictEncoder {
 
 impl DictEncoder {
     /// Encode `values` to the in-progress page
-    fn encode<T>(&mut self, values: T, indices: &[usize])
+    fn encode<T>(&mut self, values: T, indices: impl ExactSizeIterator<Item = 
usize>)
     where
         T: ArrayAccessor + Copy,
         T::Item: AsRef<[u8]>,
@@ -351,7 +351,7 @@ impl DictEncoder {
         self.indices.reserve(indices.len());
 
         for idx in indices {
-            let value = values.value(*idx);
+            let value = values.value(idx);
             let interned = self.interner.intern(value.as_ref());
             self.indices.push(interned);
             self.variable_length_bytes += value.as_ref().len() as i64;
@@ -471,7 +471,13 @@ impl ColumnValueEncoder for ByteArrayEncoder {
     }
 
     fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> 
Result<()> {
-        downcast_op!(values.data_type(), values, encode, indices, self);
+        downcast_op!(
+            values.data_type(),
+            values,
+            encode,
+            indices.iter().copied(),
+            self
+        );
         Ok(())
     }
 
@@ -554,15 +560,16 @@ impl ColumnValueEncoder for ByteArrayEncoder {
 /// Encodes the provided `values` and `indices` to `encoder`
 ///
 /// This is a free function so it can be used with `downcast_op!`
-fn encode<T>(values: T, indices: &[usize], encoder: &mut ByteArrayEncoder)
+fn encode<T, I>(values: T, indices: I, encoder: &mut ByteArrayEncoder)
 where
     T: ArrayAccessor + Copy,
     T::Item: Copy + Ord + AsRef<[u8]>,
+    I: ExactSizeIterator<Item = usize> + Clone,
 {
     if encoder.statistics_enabled != EnabledStatistics::None {
         if let Some(accumulator) = encoder.geo_stats_accumulator.as_mut() {
-            update_geo_stats_accumulator(accumulator.as_mut(), values, 
indices.iter().cloned());
-        } else if let Some((min, max)) = compute_min_max(values, 
indices.iter().cloned()) {
+            update_geo_stats_accumulator(accumulator.as_mut(), values, 
indices.clone());
+        } else if let Some((min, max)) = compute_min_max(values, 
indices.clone()) {
             if encoder.min_value.as_ref().is_none_or(|m| m > &min) {
                 encoder.min_value = Some(min);
             }
@@ -575,8 +582,7 @@ where
 
     // encode the values into bloom filter if enabled
     if let Some(bloom_filter) = &mut encoder.bloom_filter {
-        let valid = indices.iter().cloned();
-        for idx in valid {
+        for idx in indices.clone() {
             bloom_filter.insert(values.value(idx).as_ref());
         }
     }
diff --git a/parquet/src/arrow/arrow_writer/mod.rs 
b/parquet/src/arrow/arrow_writer/mod.rs
index 04a7983914..79542caed9 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -1387,7 +1387,7 @@ fn write_leaf(
         }
         ColumnWriter::BoolColumnWriter(typed) => {
             let array = column.as_boolean();
-            let values = get_bool_array_slice(array, indices);
+            let values = get_bool_array_slice(array, indices.iter().copied());
             typed.write_batch_internal(
                 values.as_slice(),
                 None,
@@ -1504,11 +1504,11 @@ fn write_leaf(
                 ArrowDataType::Interval(interval_unit) => match interval_unit {
                     IntervalUnit::YearMonth => {
                         let array = 
column.as_primitive::<IntervalYearMonthType>();
-                        get_interval_ym_array_slice(array, indices)
+                        get_interval_ym_array_slice(array, 
indices.iter().copied())
                     }
                     IntervalUnit::DayTime => {
                         let array = 
column.as_primitive::<IntervalDayTimeType>();
-                        get_interval_dt_array_slice(array, indices)
+                        get_interval_dt_array_slice(array, 
indices.iter().copied())
                     }
                     _ => {
                         return Err(ParquetError::NYI(format!(
@@ -1518,27 +1518,27 @@ fn write_leaf(
                 },
                 ArrowDataType::FixedSizeBinary(_) => {
                     let array = column.as_fixed_size_binary();
-                    get_fsb_array_slice(array, indices)
+                    get_fsb_array_slice(array, indices.iter().copied())
                 }
                 ArrowDataType::Decimal32(_, _) => {
                     let array = column.as_primitive::<Decimal32Type>();
-                    get_decimal_32_array_slice(array, indices)
+                    get_decimal_32_array_slice(array, indices.iter().copied())
                 }
                 ArrowDataType::Decimal64(_, _) => {
                     let array = column.as_primitive::<Decimal64Type>();
-                    get_decimal_64_array_slice(array, indices)
+                    get_decimal_64_array_slice(array, indices.iter().copied())
                 }
                 ArrowDataType::Decimal128(_, _) => {
                     let array = column.as_primitive::<Decimal128Type>();
-                    get_decimal_128_array_slice(array, indices)
+                    get_decimal_128_array_slice(array, indices.iter().copied())
                 }
                 ArrowDataType::Decimal256(_, _) => {
                     let array = column.as_primitive::<Decimal256Type>();
-                    get_decimal_256_array_slice(array, indices)
+                    get_decimal_256_array_slice(array, indices.iter().copied())
                 }
                 ArrowDataType::Float16 => {
                     let array = column.as_primitive::<Float16Type>();
-                    get_float_16_array_slice(array, indices)
+                    get_float_16_array_slice(array, indices.iter().copied())
                 }
                 _ => {
                     return Err(ParquetError::NYI(
@@ -1575,10 +1575,13 @@ fn write_primitive<E: ColumnValueEncoder>(
     )
 }
 
-fn get_bool_array_slice(array: &arrow_array::BooleanArray, indices: &[usize]) 
-> Vec<bool> {
+fn get_bool_array_slice(
+    array: &arrow_array::BooleanArray,
+    indices: impl ExactSizeIterator<Item = usize>,
+) -> Vec<bool> {
     let mut values = Vec::with_capacity(indices.len());
     for i in indices {
-        values.push(array.value(*i))
+        values.push(array.value(i))
     }
     values
 }
@@ -1587,11 +1590,11 @@ fn get_bool_array_slice(array: 
&arrow_array::BooleanArray, indices: &[usize]) ->
 /// An Arrow YearMonth interval only stores months, thus only the first 4 
bytes are populated.
 fn get_interval_ym_array_slice(
     array: &arrow_array::IntervalYearMonthArray,
-    indices: &[usize],
+    indices: impl ExactSizeIterator<Item = usize>,
 ) -> Vec<FixedLenByteArray> {
     let mut values = Vec::with_capacity(indices.len());
     for i in indices {
-        let mut value = array.value(*i).to_le_bytes().to_vec();
+        let mut value = array.value(i).to_le_bytes().to_vec();
         let mut suffix = vec![0; 8];
         value.append(&mut suffix);
         values.push(FixedLenByteArray::from(ByteArray::from(value)))
@@ -1603,12 +1606,12 @@ fn get_interval_ym_array_slice(
 /// An Arrow DayTime interval only stores days and millis, thus the first 4 
bytes are not populated.
 fn get_interval_dt_array_slice(
     array: &arrow_array::IntervalDayTimeArray,
-    indices: &[usize],
+    indices: impl ExactSizeIterator<Item = usize>,
 ) -> Vec<FixedLenByteArray> {
     let mut values = Vec::with_capacity(indices.len());
     for i in indices {
         let mut out = [0; 12];
-        let value = array.value(*i);
+        let value = array.value(i);
         out[4..8].copy_from_slice(&value.days.to_le_bytes());
         out[8..12].copy_from_slice(&value.milliseconds.to_le_bytes());
         values.push(FixedLenByteArray::from(ByteArray::from(out.to_vec())));
@@ -1618,12 +1621,12 @@ fn get_interval_dt_array_slice(
 
 fn get_decimal_32_array_slice(
     array: &arrow_array::Decimal32Array,
-    indices: &[usize],
+    indices: impl ExactSizeIterator<Item = usize>,
 ) -> Vec<FixedLenByteArray> {
     let mut values = Vec::with_capacity(indices.len());
     let size = decimal_length_from_precision(array.precision());
     for i in indices {
-        let as_be_bytes = array.value(*i).to_be_bytes();
+        let as_be_bytes = array.value(i).to_be_bytes();
         let resized_value = as_be_bytes[(4 - size)..].to_vec();
         values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
     }
@@ -1632,12 +1635,12 @@ fn get_decimal_32_array_slice(
 
 fn get_decimal_64_array_slice(
     array: &arrow_array::Decimal64Array,
-    indices: &[usize],
+    indices: impl ExactSizeIterator<Item = usize>,
 ) -> Vec<FixedLenByteArray> {
     let mut values = Vec::with_capacity(indices.len());
     let size = decimal_length_from_precision(array.precision());
     for i in indices {
-        let as_be_bytes = array.value(*i).to_be_bytes();
+        let as_be_bytes = array.value(i).to_be_bytes();
         let resized_value = as_be_bytes[(8 - size)..].to_vec();
         values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
     }
@@ -1646,12 +1649,12 @@ fn get_decimal_64_array_slice(
 
 fn get_decimal_128_array_slice(
     array: &arrow_array::Decimal128Array,
-    indices: &[usize],
+    indices: impl ExactSizeIterator<Item = usize>,
 ) -> Vec<FixedLenByteArray> {
     let mut values = Vec::with_capacity(indices.len());
     let size = decimal_length_from_precision(array.precision());
     for i in indices {
-        let as_be_bytes = array.value(*i).to_be_bytes();
+        let as_be_bytes = array.value(i).to_be_bytes();
         let resized_value = as_be_bytes[(16 - size)..].to_vec();
         values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
     }
@@ -1660,12 +1663,12 @@ fn get_decimal_128_array_slice(
 
 fn get_decimal_256_array_slice(
     array: &arrow_array::Decimal256Array,
-    indices: &[usize],
+    indices: impl ExactSizeIterator<Item = usize>,
 ) -> Vec<FixedLenByteArray> {
     let mut values = Vec::with_capacity(indices.len());
     let size = decimal_length_from_precision(array.precision());
     for i in indices {
-        let as_be_bytes = array.value(*i).to_be_bytes();
+        let as_be_bytes = array.value(i).to_be_bytes();
         let resized_value = as_be_bytes[(32 - size)..].to_vec();
         values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
     }
@@ -1674,11 +1677,11 @@ fn get_decimal_256_array_slice(
 
 fn get_float_16_array_slice(
     array: &arrow_array::Float16Array,
-    indices: &[usize],
+    indices: impl ExactSizeIterator<Item = usize>,
 ) -> Vec<FixedLenByteArray> {
     let mut values = Vec::with_capacity(indices.len());
     for i in indices {
-        let value = array.value(*i).to_le_bytes().to_vec();
+        let value = array.value(i).to_le_bytes().to_vec();
         values.push(FixedLenByteArray::from(ByteArray::from(value)));
     }
     values
@@ -1686,11 +1689,11 @@ fn get_float_16_array_slice(
 
 fn get_fsb_array_slice(
     array: &arrow_array::FixedSizeBinaryArray,
-    indices: &[usize],
+    indices: impl ExactSizeIterator<Item = usize>,
 ) -> Vec<FixedLenByteArray> {
     let mut values = Vec::with_capacity(indices.len());
     for i in indices {
-        let value = array.value(*i).to_vec();
+        let value = array.value(i).to_vec();
         values.push(FixedLenByteArray::from(ByteArray::from(value)))
     }
     values

Reply via email to