This is an automated email from the ASF dual-hosted git repository. dheres pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push: new 8d4beaeb32 Optimize length calculation in row encoding for fixed-length columns (#7564) 8d4beaeb32 is described below commit 8d4beaeb323a12e228c33e3e25ba14005993fb94 Author: Christian <9384305+c...@users.noreply.github.com> AuthorDate: Mon Jun 9 18:13:51 2025 +0200 Optimize length calculation in row encoding for fixed-length columns (#7564) # Rationale for this change When converting data into row format, a significant portion of cycles is spent determining the lengths of the rows to be created. For columns with fixed-size elements (determined by datatype), this calculation can be optimized by avoiding writes to an intermediate vector for length tracking. # What changes are included in this PR? - Implements `LengthTracker` which only materializes lengths for variable-size columns - Updates length calculation in `row_lengths(..)` and offset computation in `RowConverter::append` to use the `LengthTracker` # Are there any user-facing changes? No. --- arrow-row/src/lib.rs | 222 +++++++++++++++++++++++++++++++++++---------------- 1 file changed, 151 insertions(+), 71 deletions(-) diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs index 3316a17788..089bf43ebe 100644 --- a/arrow-row/src/lib.rs +++ b/arrow-row/src/lib.rs @@ -625,33 +625,8 @@ impl RowConverter { let write_offset = rows.num_rows(); let lengths = row_lengths(columns, &encoders); - - // We initialize the offsets shifted down by one row index. - // - // As the rows are appended to the offsets will be incremented to match - // - // For example, consider the case of 3 rows of length 3, 4, and 6 respectively. 
- // The offsets would be initialized to `0, 0, 3, 7` - // - // Writing the first row entirely would yield `0, 3, 3, 7` - // The second, `0, 3, 7, 7` - // The third, `0, 3, 7, 13` - // - // This would be the final offsets for reading - // - // In this way offsets tracks the position during writing whilst eventually serving - // as identifying the offsets of the written rows - rows.offsets.reserve(lengths.len()); - let mut cur_offset = rows.offsets[write_offset]; - for l in lengths { - rows.offsets.push(cur_offset); - cur_offset = cur_offset.checked_add(l).expect("overflow"); - } - - // Note this will not zero out any trailing data in `rows.buffer`, - // e.g. resulting from a call to `Rows::clear`, relying instead on the - // encoders not assuming a zero-initialized buffer - rows.buffer.resize(cur_offset, 0); + let total = lengths.extend_offsets(rows.offsets[write_offset], &mut rows.offsets); + rows.buffer.resize(total, 0); for ((column, field), encoder) in columns.iter().zip(self.fields.iter()).zip(encoders) { // We encode a column at a time to minimise dispatch overheads @@ -1173,49 +1148,156 @@ fn null_sentinel(options: SortOptions) -> u8 { } } +/// Stores the lengths of the rows. Lazily materializes lengths for columns with fixed-size types. +enum LengthTracker { + /// Fixed state: All rows have length `length` + Fixed { length: usize, num_rows: usize }, + /// Variable state: The length of row `i` is `lengths[i] + fixed_length` + Variable { + fixed_length: usize, + lengths: Vec<usize>, + }, +} + +impl LengthTracker { + fn new(num_rows: usize) -> Self { + Self::Fixed { + length: 0, + num_rows, + } + } + + /// Adds a column of fixed-length elements, each of size `new_length` to the LengthTracker + fn push_fixed(&mut self, new_length: usize) { + match self { + LengthTracker::Fixed { length, .. } => *length += new_length, + LengthTracker::Variable { fixed_length, .. 
} => *fixed_length += new_length, + } + } + + /// Adds a column of possibly variable-length elements, element `i` has length `new_lengths.nth(i)` + fn push_variable(&mut self, new_lengths: impl ExactSizeIterator<Item = usize>) { + match self { + LengthTracker::Fixed { length, .. } => { + *self = LengthTracker::Variable { + fixed_length: *length, + lengths: new_lengths.collect(), + } + } + LengthTracker::Variable { lengths, .. } => { + assert_eq!(lengths.len(), new_lengths.len()); + lengths + .iter_mut() + .zip(new_lengths) + .for_each(|(length, new_length)| *length += new_length); + } + } + } + + /// Returns the tracked row lengths as a slice + fn materialized(&mut self) -> &mut [usize] { + if let LengthTracker::Fixed { length, num_rows } = *self { + *self = LengthTracker::Variable { + fixed_length: length, + lengths: vec![0; num_rows], + }; + } + + match self { + LengthTracker::Variable { lengths, .. } => lengths, + LengthTracker::Fixed { .. } => unreachable!(), + } + } + + /// Initializes the offsets using the tracked lengths. Returns the sum of the + /// lengths of the rows added. + /// + /// We initialize the offsets shifted down by one row index. + /// + /// As the rows are appended to the offsets will be incremented to match + /// + /// For example, consider the case of 3 rows of length 3, 4, and 6 respectively. 
+ /// The offsets would be initialized to `0, 0, 3, 7` + /// + /// Writing the first row entirely would yield `0, 3, 3, 7` + /// The second, `0, 3, 7, 7` + /// The third, `0, 3, 7, 13` + /// + /// This would be the final offsets for reading + /// + /// In this way offsets tracks the position during writing whilst eventually serving as identifying the offsets of the written rows + fn extend_offsets(&self, initial_offset: usize, offsets: &mut Vec<usize>) -> usize { + match self { + LengthTracker::Fixed { length, num_rows } => { + offsets.extend((0..*num_rows).map(|i| initial_offset + i * length)); + + initial_offset + num_rows * length + } + LengthTracker::Variable { + fixed_length, + lengths, + } => { + let mut acc = initial_offset; + + offsets.extend(lengths.iter().map(|length| { + let current = acc; + acc += length + fixed_length; + current + })); + + acc + } + } + } +} + /// Computes the length of each encoded [`Rows`] and returns an empty [`Rows`] -fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> Vec<usize> { +fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> LengthTracker { use fixed::FixedLengthEncoding; let num_rows = cols.first().map(|x| x.len()).unwrap_or(0); - let mut lengths = vec![0; num_rows]; + let mut tracker = LengthTracker::new(num_rows); for (array, encoder) in cols.iter().zip(encoders) { match encoder { Encoder::Stateless => { downcast_primitive_array! 
{ - array => lengths.iter_mut().for_each(|x| *x += fixed::encoded_len(array)), + array => tracker.push_fixed(fixed::encoded_len(array)), DataType::Null => {}, - DataType::Boolean => lengths.iter_mut().for_each(|x| *x += bool::ENCODED_LEN), - DataType::Binary => as_generic_binary_array::<i32>(array) - .iter() - .zip(lengths.iter_mut()) - .for_each(|(slice, length)| *length += variable::encoded_len(slice)), - DataType::LargeBinary => as_generic_binary_array::<i64>(array) - .iter() - .zip(lengths.iter_mut()) - .for_each(|(slice, length)| *length += variable::encoded_len(slice)), - DataType::BinaryView => array.as_binary_view().iter().zip(lengths.iter_mut()).for_each(|(slice, length)| { - *length += variable::encoded_len(slice) - }), - DataType::Utf8 => array.as_string::<i32>() - .iter() - .zip(lengths.iter_mut()) - .for_each(|(slice, length)| { - *length += variable::encoded_len(slice.map(|x| x.as_bytes())) - }), - DataType::LargeUtf8 => array.as_string::<i64>() - .iter() - .zip(lengths.iter_mut()) - .for_each(|(slice, length)| { - *length += variable::encoded_len(slice.map(|x| x.as_bytes())) - }), - DataType::Utf8View => array.as_string_view().iter().zip(lengths.iter_mut()).for_each(|(slice, length)| { - *length += variable::encoded_len(slice.map(|x| x.as_bytes())) - }), + DataType::Boolean => tracker.push_fixed(bool::ENCODED_LEN), + DataType::Binary => tracker.push_variable( + as_generic_binary_array::<i32>(array) + .iter() + .map(|slice| variable::encoded_len(slice)) + ), + DataType::LargeBinary => tracker.push_variable( + as_generic_binary_array::<i64>(array) + .iter() + .map(|slice| variable::encoded_len(slice)) + ), + DataType::BinaryView => tracker.push_variable( + array.as_binary_view() + .iter() + .map(|slice| variable::encoded_len(slice)) + ), + DataType::Utf8 => tracker.push_variable( + array.as_string::<i32>() + .iter() + .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes()))) + ), + DataType::LargeUtf8 => tracker.push_variable( + 
array.as_string::<i64>() + .iter() + .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes()))) + ), + DataType::Utf8View => tracker.push_variable( + array.as_string_view() + .iter() + .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes()))) + ), DataType::FixedSizeBinary(len) => { let len = len.to_usize().unwrap(); - lengths.iter_mut().for_each(|x| *x += 1 + len) + tracker.push_fixed(1 + len) } _ => unimplemented!("unsupported data type: {}", array.data_type()), } @@ -1223,38 +1305,36 @@ fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> Vec<usize> { Encoder::Dictionary(values, null) => { downcast_dictionary_array! { array => { - for (v, length) in array.keys().iter().zip(lengths.iter_mut()) { - *length += match v { + tracker.push_variable( + array.keys().iter().map(|v| match v { Some(k) => values.row(k.as_usize()).data.len(), None => null.data.len(), - } - } + }) + ) } _ => unreachable!(), } } Encoder::Struct(rows, null) => { let array = as_struct_array(array); - lengths.iter_mut().enumerate().for_each(|(idx, length)| { - match array.is_valid(idx) { - true => *length += 1 + rows.row(idx).as_ref().len(), - false => *length += 1 + null.data.len(), - } - }); + tracker.push_variable((0..array.len()).map(|idx| match array.is_valid(idx) { + true => 1 + rows.row(idx).as_ref().len(), + false => 1 + null.data.len(), + })); } Encoder::List(rows) => match array.data_type() { DataType::List(_) => { - list::compute_lengths(&mut lengths, rows, as_list_array(array)) + list::compute_lengths(tracker.materialized(), rows, as_list_array(array)) } DataType::LargeList(_) => { - list::compute_lengths(&mut lengths, rows, as_large_list_array(array)) + list::compute_lengths(tracker.materialized(), rows, as_large_list_array(array)) } _ => unreachable!(), }, } } - lengths + tracker } /// Encodes a column to the provided [`Rows`] incrementing the offsets as it progresses