This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 8d4beaeb32 Optimize length calculation in row encoding for 
fixed-length columns (#7564)
8d4beaeb32 is described below

commit 8d4beaeb323a12e228c33e3e25ba14005993fb94
Author: Christian <9384305+c...@users.noreply.github.com>
AuthorDate: Mon Jun 9 18:13:51 2025 +0200

    Optimize length calculation in row encoding for fixed-length columns (#7564)
    
    # Rationale for this change
    
    When converting data into row format, a significant portion of cycles is
    spent determining the lengths of the rows to be created. For columns
    with fixed-size elements (determined by datatype), this calculation can
    be optimized by avoiding writes to an intermediate vector for length
    tracking.
    
    # What changes are included in this PR?
    
    - Implements `LengthTracker` which only materializes lengths for
    variable-size columns
    - Updates length calculation in `row_lengths(..)` and offset computation
    in `RowConverter::append` to use the `LengthTracker`
    
    # Are there any user-facing changes?
    
    No.
---
 arrow-row/src/lib.rs | 222 +++++++++++++++++++++++++++++++++++----------------
 1 file changed, 151 insertions(+), 71 deletions(-)

diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs
index 3316a17788..089bf43ebe 100644
--- a/arrow-row/src/lib.rs
+++ b/arrow-row/src/lib.rs
@@ -625,33 +625,8 @@ impl RowConverter {
 
         let write_offset = rows.num_rows();
         let lengths = row_lengths(columns, &encoders);
-
-        // We initialize the offsets shifted down by one row index.
-        //
-        // As the rows are appended to the offsets will be incremented to match
-        //
-        // For example, consider the case of 3 rows of length 3, 4, and 6 
respectively.
-        // The offsets would be initialized to `0, 0, 3, 7`
-        //
-        // Writing the first row entirely would yield `0, 3, 3, 7`
-        // The second, `0, 3, 7, 7`
-        // The third, `0, 3, 7, 13`
-        //
-        // This would be the final offsets for reading
-        //
-        // In this way offsets tracks the position during writing whilst 
eventually serving
-        // as identifying the offsets of the written rows
-        rows.offsets.reserve(lengths.len());
-        let mut cur_offset = rows.offsets[write_offset];
-        for l in lengths {
-            rows.offsets.push(cur_offset);
-            cur_offset = cur_offset.checked_add(l).expect("overflow");
-        }
-
-        // Note this will not zero out any trailing data in `rows.buffer`,
-        // e.g. resulting from a call to `Rows::clear`, relying instead on the
-        // encoders not assuming a zero-initialized buffer
-        rows.buffer.resize(cur_offset, 0);
+        let total = lengths.extend_offsets(rows.offsets[write_offset], &mut 
rows.offsets);
+        rows.buffer.resize(total, 0);
 
         for ((column, field), encoder) in 
columns.iter().zip(self.fields.iter()).zip(encoders) {
             // We encode a column at a time to minimise dispatch overheads
@@ -1173,49 +1148,156 @@ fn null_sentinel(options: SortOptions) -> u8 {
     }
 }
 
+/// Stores the lengths of the rows. Only materializes a per-row length vector
+/// once a variable-size column is encountered; purely fixed-size columns are
+/// tracked as a single shared length.
+enum LengthTracker {
+    /// Fixed state: All rows have length `length`
+    Fixed { length: usize, num_rows: usize },
+    /// Variable state: The length of row `i` is `lengths[i] + fixed_length`
+    Variable {
+        fixed_length: usize,
+        lengths: Vec<usize>,
+    },
+}
+
+impl LengthTracker {
+    fn new(num_rows: usize) -> Self {
+        Self::Fixed {
+            length: 0,
+            num_rows,
+        }
+    }
+
+    /// Adds a column of fixed-length elements, each of size `new_length` to 
the LengthTracker
+    fn push_fixed(&mut self, new_length: usize) {
+        match self {
+            LengthTracker::Fixed { length, .. } => *length += new_length,
+            LengthTracker::Variable { fixed_length, .. } => *fixed_length += 
new_length,
+        }
+    }
+
+    /// Adds a column of possibly variable-length elements, element `i` has 
length `new_lengths.nth(i)`
+    fn push_variable(&mut self, new_lengths: impl ExactSizeIterator<Item = 
usize>) {
+        match self {
+            LengthTracker::Fixed { length, .. } => {
+                *self = LengthTracker::Variable {
+                    fixed_length: *length,
+                    lengths: new_lengths.collect(),
+                }
+            }
+            LengthTracker::Variable { lengths, .. } => {
+                assert_eq!(lengths.len(), new_lengths.len());
+                lengths
+                    .iter_mut()
+                    .zip(new_lengths)
+                    .for_each(|(length, new_length)| *length += new_length);
+            }
+        }
+    }
+
+    /// Materializes the per-row lengths (converting the Fixed state into the
+    /// Variable state if necessary) and returns them as a mutable slice
+    fn materialized(&mut self) -> &mut [usize] {
+        if let LengthTracker::Fixed { length, num_rows } = *self {
+            *self = LengthTracker::Variable {
+                fixed_length: length,
+                lengths: vec![0; num_rows],
+            };
+        }
+
+        match self {
+            LengthTracker::Variable { lengths, .. } => lengths,
+            LengthTracker::Fixed { .. } => unreachable!(),
+        }
+    }
+
+    /// Initializes the offsets using the tracked lengths. Returns the final
+    /// offset, i.e. `initial_offset` plus the sum of the lengths of the rows
+    /// added.
+    ///
+    /// We initialize the offsets shifted down by one row index.
+    ///
+    /// As the rows are appended to the offsets will be incremented to match
+    ///
+    /// For example, consider the case of 3 rows of length 3, 4, and 6 
respectively.
+    /// The offsets would be initialized to `0, 0, 3, 7`
+    ///
+    /// Writing the first row entirely would yield `0, 3, 3, 7`
+    /// The second, `0, 3, 7, 7`
+    /// The third, `0, 3, 7, 13`
+    ///
+    /// This would be the final offsets for reading
+    ///
+    /// In this way offsets tracks the position during writing whilst eventually
+    /// serving as identifying the offsets of the written rows
+    fn extend_offsets(&self, initial_offset: usize, offsets: &mut Vec<usize>) 
-> usize {
+        match self {
+            LengthTracker::Fixed { length, num_rows } => {
+                offsets.extend((0..*num_rows).map(|i| initial_offset + i * 
length));
+
+                initial_offset + num_rows * length
+            }
+            LengthTracker::Variable {
+                fixed_length,
+                lengths,
+            } => {
+                let mut acc = initial_offset;
+
+                offsets.extend(lengths.iter().map(|length| {
+                    let current = acc;
+                    acc += length + fixed_length;
+                    current
+                }));
+
+                acc
+            }
+        }
+    }
+}
+
 /// Computes the length of each encoded [`Rows`] and returns an empty [`Rows`]
-fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> Vec<usize> {
+fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> LengthTracker {
     use fixed::FixedLengthEncoding;
 
     let num_rows = cols.first().map(|x| x.len()).unwrap_or(0);
-    let mut lengths = vec![0; num_rows];
+    let mut tracker = LengthTracker::new(num_rows);
 
     for (array, encoder) in cols.iter().zip(encoders) {
         match encoder {
             Encoder::Stateless => {
                 downcast_primitive_array! {
-                    array => lengths.iter_mut().for_each(|x| *x += 
fixed::encoded_len(array)),
+                    array => tracker.push_fixed(fixed::encoded_len(array)),
                     DataType::Null => {},
-                    DataType::Boolean => lengths.iter_mut().for_each(|x| *x += 
bool::ENCODED_LEN),
-                    DataType::Binary => as_generic_binary_array::<i32>(array)
-                        .iter()
-                        .zip(lengths.iter_mut())
-                        .for_each(|(slice, length)| *length += 
variable::encoded_len(slice)),
-                    DataType::LargeBinary => 
as_generic_binary_array::<i64>(array)
-                        .iter()
-                        .zip(lengths.iter_mut())
-                        .for_each(|(slice, length)| *length += 
variable::encoded_len(slice)),
-                    DataType::BinaryView => 
array.as_binary_view().iter().zip(lengths.iter_mut()).for_each(|(slice, 
length)| {
-                            *length += variable::encoded_len(slice)
-                        }),
-                    DataType::Utf8 => array.as_string::<i32>()
-                        .iter()
-                        .zip(lengths.iter_mut())
-                        .for_each(|(slice, length)| {
-                            *length += variable::encoded_len(slice.map(|x| 
x.as_bytes()))
-                        }),
-                    DataType::LargeUtf8 => array.as_string::<i64>()
-                        .iter()
-                        .zip(lengths.iter_mut())
-                        .for_each(|(slice, length)| {
-                            *length += variable::encoded_len(slice.map(|x| 
x.as_bytes()))
-                        }),
-                    DataType::Utf8View => 
array.as_string_view().iter().zip(lengths.iter_mut()).for_each(|(slice, 
length)| {
-                        *length += variable::encoded_len(slice.map(|x| 
x.as_bytes()))
-                    }),
+                    DataType::Boolean => tracker.push_fixed(bool::ENCODED_LEN),
+                    DataType::Binary => tracker.push_variable(
+                        as_generic_binary_array::<i32>(array)
+                            .iter()
+                            .map(|slice| variable::encoded_len(slice))
+                    ),
+                    DataType::LargeBinary => tracker.push_variable(
+                        as_generic_binary_array::<i64>(array)
+                            .iter()
+                            .map(|slice| variable::encoded_len(slice))
+                    ),
+                    DataType::BinaryView => tracker.push_variable(
+                        array.as_binary_view()
+                            .iter()
+                            .map(|slice| variable::encoded_len(slice))
+                    ),
+                    DataType::Utf8 => tracker.push_variable(
+                        array.as_string::<i32>()
+                            .iter()
+                            .map(|slice| variable::encoded_len(slice.map(|x| 
x.as_bytes())))
+                    ),
+                    DataType::LargeUtf8 => tracker.push_variable(
+                        array.as_string::<i64>()
+                            .iter()
+                            .map(|slice| variable::encoded_len(slice.map(|x| 
x.as_bytes())))
+                    ),
+                    DataType::Utf8View => tracker.push_variable(
+                        array.as_string_view()
+                            .iter()
+                            .map(|slice| variable::encoded_len(slice.map(|x| 
x.as_bytes())))
+                    ),
                     DataType::FixedSizeBinary(len) => {
                         let len = len.to_usize().unwrap();
-                        lengths.iter_mut().for_each(|x| *x += 1 + len)
+                        tracker.push_fixed(1 + len)
                     }
                     _ => unimplemented!("unsupported data type: {}", 
array.data_type()),
                 }
@@ -1223,38 +1305,36 @@ fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) 
-> Vec<usize> {
             Encoder::Dictionary(values, null) => {
                 downcast_dictionary_array! {
                     array => {
-                        for (v, length) in 
array.keys().iter().zip(lengths.iter_mut()) {
-                            *length += match v {
+                        tracker.push_variable(
+                            array.keys().iter().map(|v| match v {
                                 Some(k) => values.row(k.as_usize()).data.len(),
                                 None => null.data.len(),
-                            }
-                        }
+                            })
+                        )
                     }
                     _ => unreachable!(),
                 }
             }
             Encoder::Struct(rows, null) => {
                 let array = as_struct_array(array);
-                lengths.iter_mut().enumerate().for_each(|(idx, length)| {
-                    match array.is_valid(idx) {
-                        true => *length += 1 + rows.row(idx).as_ref().len(),
-                        false => *length += 1 + null.data.len(),
-                    }
-                });
+                tracker.push_variable((0..array.len()).map(|idx| match 
array.is_valid(idx) {
+                    true => 1 + rows.row(idx).as_ref().len(),
+                    false => 1 + null.data.len(),
+                }));
             }
             Encoder::List(rows) => match array.data_type() {
                 DataType::List(_) => {
-                    list::compute_lengths(&mut lengths, rows, 
as_list_array(array))
+                    list::compute_lengths(tracker.materialized(), rows, 
as_list_array(array))
                 }
                 DataType::LargeList(_) => {
-                    list::compute_lengths(&mut lengths, rows, 
as_large_list_array(array))
+                    list::compute_lengths(tracker.materialized(), rows, 
as_large_list_array(array))
                 }
                 _ => unreachable!(),
             },
         }
     }
 
-    lengths
+    tracker
 }
 
 /// Encodes a column to the provided [`Rows`] incrementing the offsets as it 
progresses

Reply via email to