alamb commented on code in PR #7671:
URL: https://github.com/apache/arrow-rs/pull/7671#discussion_r2153033096


##########
arrow-data/src/transform/run.rs:
##########
@@ -0,0 +1,560 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::{ArrayData, Extend, _MutableArrayData};
+use arrow_buffer::{ArrowNativeType, Buffer, ToByteSlice};
+use arrow_schema::DataType;
+
+/// Generic helper to get the last run end value from a run ends array
+fn get_last_run_end<T: ArrowNativeType>(run_ends_data: 
&super::MutableArrayData) -> T {
+    if run_ends_data.data.len == 0 {
+        T::default()
+    } else {
+        // Convert buffer to typed slice and get the last element
+        let buffer = Buffer::from(run_ends_data.data.buffer1.as_slice());
+        let typed_slice: &[T] = buffer.typed_data();
+        if typed_slice.len() >= run_ends_data.data.len {
+            typed_slice[run_ends_data.data.len - 1]
+        } else {
+            T::default()
+        }
+    }
+}
+
+/// Extends the `MutableArrayData` with null values.
+///
+/// For RunEndEncoded, this adds nulls by extending the run_ends array
+/// and values array appropriately.
+pub fn extend_nulls(mutable: &mut _MutableArrayData, len: usize) {
+    if len == 0 {
+        return;
+    }
+
+    // For REE, we always need to add a value entry when adding a new run
+    // The values array should have one entry per run, not per logical element
+    mutable.child_data[1].extend_nulls(1);
+
+    // Determine the run end type from the data type
+    let run_end_type = if let DataType::RunEndEncoded(run_ends_field, _) = 
&mutable.data_type {
+        run_ends_field.data_type()
+    } else {
+        panic!("extend_nulls called on non-RunEndEncoded array");
+    };
+
+    // Use a macro to handle all run end types generically
+    macro_rules! extend_nulls_impl {
+        ($run_end_type:ty) => {{
+            let last_run_end = 
get_last_run_end::<$run_end_type>(&mutable.child_data[0]);
+            let new_value = last_run_end + <$run_end_type as 
ArrowNativeType>::usize_as(len);

Review Comment:
   I wonder if we have to worry about overflow here -- for a run end type like
   `Int16`, what will happen if we try to extend past 32K (`i16::MAX` = 32,767) values? 🤔



##########
arrow-data/src/transform/run.rs:
##########
@@ -0,0 +1,560 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::{ArrayData, Extend, _MutableArrayData};
+use arrow_buffer::{ArrowNativeType, Buffer, ToByteSlice};
+use arrow_schema::DataType;
+
+/// Generic helper to get the last run end value from a run ends array
+fn get_last_run_end<T: ArrowNativeType>(run_ends_data: 
&super::MutableArrayData) -> T {
+    if run_ends_data.data.len == 0 {
+        T::default()
+    } else {
+        // Convert buffer to typed slice and get the last element
+        let buffer = Buffer::from(run_ends_data.data.buffer1.as_slice());
+        let typed_slice: &[T] = buffer.typed_data();
+        if typed_slice.len() >= run_ends_data.data.len {
+            typed_slice[run_ends_data.data.len - 1]
+        } else {
+            T::default()
+        }
+    }
+}
+
+/// Extends the `MutableArrayData` with null values.
+///
+/// For RunEndEncoded, this adds nulls by extending the run_ends array
+/// and values array appropriately.
+pub fn extend_nulls(mutable: &mut _MutableArrayData, len: usize) {
+    if len == 0 {
+        return;
+    }
+
+    // For REE, we always need to add a value entry when adding a new run
+    // The values array should have one entry per run, not per logical element
+    mutable.child_data[1].extend_nulls(1);
+
+    // Determine the run end type from the data type
+    let run_end_type = if let DataType::RunEndEncoded(run_ends_field, _) = 
&mutable.data_type {
+        run_ends_field.data_type()
+    } else {
+        panic!("extend_nulls called on non-RunEndEncoded array");
+    };
+
+    // Use a macro to handle all run end types generically
+    macro_rules! extend_nulls_impl {
+        ($run_end_type:ty) => {{
+            let last_run_end = 
get_last_run_end::<$run_end_type>(&mutable.child_data[0]);
+            let new_value = last_run_end + <$run_end_type as 
ArrowNativeType>::usize_as(len);
+            mutable.child_data[0]
+                .data
+                .buffer1
+                .extend_from_slice(new_value.to_byte_slice());
+        }};
+    }
+
+    // Apply the appropriate implementation based on run end type
+    match run_end_type {
+        DataType::Int16 => extend_nulls_impl!(i16),
+        DataType::Int32 => extend_nulls_impl!(i32),
+        DataType::Int64 => extend_nulls_impl!(i64),
+        _ => panic!(
+            "Invalid run end type for RunEndEncoded array: {:?}",
+            run_end_type
+        ),
+    };
+
+    mutable.child_data[0].data.len += 1;
+}
+
+/// Build run ends bytes and values range directly for batch processing
+fn build_extend_arrays<T: ArrowNativeType + std::ops::Add<Output = T>>(
+    buffer: &Buffer,
+    length: usize,
+    start: usize,
+    len: usize,
+    dest_last_run_end: T,
+) -> (Vec<u8>, Option<(usize, usize)>) {
+    let mut run_ends_bytes = Vec::new();
+    let mut values_range: Option<(usize, usize)> = None;
+    let end = start + len;
+    let mut prev_end = 0;
+    let mut current_run_end = dest_last_run_end;
+
+    // Convert buffer to typed slice once
+    let typed_slice: &[T] = buffer.typed_data();
+
+    for i in 0..length {
+        if i < typed_slice.len() {
+            let run_end = typed_slice[i].to_usize().unwrap();
+
+            if prev_end <= start && run_end > start {
+                let start_offset = start - prev_end;
+                let end_offset = if run_end >= end {
+                    end - prev_end
+                } else {
+                    run_end - prev_end
+                };
+                current_run_end = current_run_end + T::usize_as(end_offset - 
start_offset);
+                
run_ends_bytes.extend_from_slice(current_run_end.to_byte_slice());
+
+                // Start the range
+                if values_range.is_none() {
+                    values_range = Some((i, i + 1));
+                } else {
+                    values_range = Some((values_range.unwrap().0, i + 1));
+                }
+            } else if prev_end >= start && run_end <= end {
+                current_run_end = current_run_end + T::usize_as(run_end - 
prev_end);
+                
run_ends_bytes.extend_from_slice(current_run_end.to_byte_slice());
+
+                // Extend the range
+                if values_range.is_none() {
+                    values_range = Some((i, i + 1));
+                } else {
+                    values_range = Some((values_range.unwrap().0, i + 1));
+                }
+            } else if prev_end < end && run_end >= end {
+                current_run_end = current_run_end + T::usize_as(end - 
prev_end);
+                
run_ends_bytes.extend_from_slice(current_run_end.to_byte_slice());
+
+                // Extend the range and break
+                if values_range.is_none() {
+                    values_range = Some((i, i + 1));
+                } else {
+                    values_range = Some((values_range.unwrap().0, i + 1));
+                }
+                break;
+            }
+
+            prev_end = run_end;
+            if prev_end >= end {
+                break;
+            }
+        } else {
+            break;
+        }
+    }
+    (run_ends_bytes, values_range)
+}
+
+/// Process extends using batch operations
+fn process_extends_batch<T: ArrowNativeType>(
+    mutable: &mut _MutableArrayData,
+    source_array_idx: usize,
+    run_ends_bytes: Vec<u8>,
+    values_range: Option<(usize, usize)>,
+) {
+    if run_ends_bytes.is_empty() {
+        return;
+    }
+
+    // Batch extend the run_ends array with all bytes at once
+    mutable.child_data[0]
+        .data
+        .buffer1
+        .extend_from_slice(&run_ends_bytes);
+    mutable.child_data[0].data.len += run_ends_bytes.len() / 
std::mem::size_of::<T>();
+
+    // Batch extend the values array using the range
+    let (start_idx, end_idx) =
+        values_range.expect("values_range should be Some if run_ends_bytes is 
not empty");
+    mutable.child_data[1].extend(source_array_idx, start_idx, end_idx);
+}
+
+/// Returns a function that extends the run encoded array.
+///
+/// It finds the physical indices in the source array that correspond to the 
logical range to copy, and adjusts the runs to the logical indices of the array 
to extend. The values are copied from the source array to the destination array 
verbatim.
+pub fn build_extend(array: &ArrayData) -> Extend {
+    Box::new(
+        move |mutable: &mut _MutableArrayData, array_idx: usize, start: usize, 
len: usize| {
+            if len == 0 {
+                return;
+            }
+
+            // We need to analyze the source array's run structure
+            let source_run_ends = &array.child_data()[0];
+            let source_buffer = &source_run_ends.buffers()[0];
+
+            // Get the run end type from the mutable array
+            let dest_run_end_type =
+                if let DataType::RunEndEncoded(run_ends_field, _) = 
&mutable.data_type {
+                    run_ends_field.data_type()
+                } else {
+                    panic!("extend called on non-RunEndEncoded mutable array");
+                };
+
+            // Build run ends and values indices directly for batch processing
+            macro_rules! build_and_process_impl {
+                ($run_end_type:ty) => {{
+                    let dest_last_run_end =
+                        
get_last_run_end::<$run_end_type>(&mutable.child_data[0]);
+                    let (run_ends_bytes, values_range) = 
build_extend_arrays::<$run_end_type>(
+                        source_buffer,
+                        source_run_ends.len(),
+                        start,
+                        len,
+                        dest_last_run_end,
+                    );
+                    process_extends_batch::<$run_end_type>(
+                        mutable,
+                        array_idx,
+                        run_ends_bytes,
+                        values_range,
+                    );
+                }};
+            }
+
+            match dest_run_end_type {
+                DataType::Int16 => build_and_process_impl!(i16),
+                DataType::Int32 => build_and_process_impl!(i32),
+                DataType::Int64 => build_and_process_impl!(i64),
+                _ => panic!(
+                    "Invalid run end type for RunEndEncoded array: {:?}",
+                    dest_run_end_type
+                ),
+            }
+        },
+    )
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::transform::MutableArrayData;
+    use crate::{ArrayData, ArrayDataBuilder};
+    use arrow_buffer::Buffer;
+    use arrow_schema::{DataType, Field};
+    use std::sync::Arc;
+
+    fn create_run_array_data(run_ends: Vec<i32>, values: ArrayData) -> 
ArrayData {
+        let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int32, 
false));
+        let values_field = Arc::new(Field::new("values", 
values.data_type().clone(), true));
+        let data_type = DataType::RunEndEncoded(run_ends_field, values_field);
+
+        let last_run_end = if run_ends.is_empty() {
+            0
+        } else {
+            run_ends[run_ends.len() - 1] as usize
+        };
+
+        let run_ends_buffer = Buffer::from_vec(run_ends);
+        let run_ends_data = ArrayDataBuilder::new(DataType::Int32)
+            .len(run_ends_buffer.len() / std::mem::size_of::<i32>())
+            .add_buffer(run_ends_buffer)
+            .build()
+            .unwrap();
+
+        ArrayDataBuilder::new(data_type)
+            .len(last_run_end)
+            .add_child_data(run_ends_data)
+            .add_child_data(values)
+            .build()
+            .unwrap()
+    }
+
+    fn create_run_array_data_int16(run_ends: Vec<i16>, values: ArrayData) -> 
ArrayData {
+        let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int16, 
false));
+        let values_field = Arc::new(Field::new("values", 
values.data_type().clone(), true));
+        let data_type = DataType::RunEndEncoded(run_ends_field, values_field);
+
+        let last_run_end = if run_ends.is_empty() {
+            0
+        } else {
+            run_ends[run_ends.len() - 1] as usize
+        };
+
+        let run_ends_buffer = Buffer::from_vec(run_ends);
+        let run_ends_data = ArrayDataBuilder::new(DataType::Int16)
+            .len(run_ends_buffer.len() / std::mem::size_of::<i16>())
+            .add_buffer(run_ends_buffer)
+            .build()
+            .unwrap();
+
+        ArrayDataBuilder::new(data_type)
+            .len(last_run_end)
+            .add_child_data(run_ends_data)
+            .add_child_data(values)
+            .build()
+            .unwrap()
+    }
+
+    fn create_run_array_data_int64(run_ends: Vec<i64>, values: ArrayData) -> 
ArrayData {
+        let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int64, 
false));
+        let values_field = Arc::new(Field::new("values", 
values.data_type().clone(), true));
+        let data_type = DataType::RunEndEncoded(run_ends_field, values_field);
+
+        let last_run_end = if run_ends.is_empty() {
+            0
+        } else {
+            run_ends[run_ends.len() - 1] as usize
+        };
+
+        let run_ends_buffer = Buffer::from_vec(run_ends);
+        let run_ends_data = ArrayDataBuilder::new(DataType::Int64)
+            .len(run_ends_buffer.len() / std::mem::size_of::<i64>())
+            .add_buffer(run_ends_buffer)
+            .build()
+            .unwrap();
+
+        ArrayDataBuilder::new(data_type)
+            .len(last_run_end)
+            .add_child_data(run_ends_data)
+            .add_child_data(values)
+            .build()
+            .unwrap()
+    }
+
+    fn create_int32_array_data(values: Vec<i32>) -> ArrayData {
+        let buffer = Buffer::from_vec(values);
+        ArrayDataBuilder::new(DataType::Int32)
+            .len(buffer.len() / std::mem::size_of::<i32>())
+            .add_buffer(buffer)
+            .build()
+            .unwrap()
+    }
+
+    fn create_string_dict_array_data(values: Vec<&str>, dict_values: 
Vec<&str>) -> ArrayData {
+        // Create dictionary values (strings)
+        let dict_offsets: Vec<i32> = dict_values
+            .iter()
+            .scan(0i32, |acc, s| {
+                let offset = *acc;
+                *acc += s.len() as i32;
+                Some(offset)
+            })
+            .chain(std::iter::once(
+                dict_values.iter().map(|s| s.len()).sum::<usize>() as i32,
+            ))
+            .collect();
+
+        let dict_data: Vec<u8> = dict_values.iter().flat_map(|s| 
s.bytes()).collect();
+
+        let dict_array = ArrayDataBuilder::new(DataType::Utf8)
+            .len(dict_values.len())
+            .add_buffer(Buffer::from_vec(dict_offsets))
+            .add_buffer(Buffer::from_vec(dict_data))
+            .build()
+            .unwrap();
+
+        // Create keys array
+        let keys: Vec<i32> = values
+            .iter()
+            .map(|v| dict_values.iter().position(|d| d == v).unwrap() as i32)
+            .collect();
+
+        // Create dictionary array
+        let dict_type = DataType::Dictionary(Box::new(DataType::Int32), 
Box::new(DataType::Utf8));
+
+        ArrayDataBuilder::new(dict_type)
+            .len(values.len())
+            .add_buffer(Buffer::from_vec(keys))
+            .add_child_data(dict_array)
+            .build()
+            .unwrap()
+    }
+
+    #[test]
+    fn test_extend_nulls_int32() {
+        // Create values array with one value
+        let values = create_int32_array_data(vec![42]);
+
+        // Create REE array with Int32 run ends
+        let ree_array = create_run_array_data(vec![5], values);
+
+        let mut mutable = MutableArrayData::new(vec![&ree_array], true, 10);
+
+        // First, we need to copy the existing data
+        mutable.extend(0, 0, 5);
+
+        // Then add nulls
+        mutable.extend_nulls(3);
+
+        // Verify the run ends were extended correctly
+        let result = mutable.freeze();

Review Comment:
   Should this test also verify the nulls (not just the run ends)?



##########
arrow-data/src/transform/run.rs:
##########
@@ -0,0 +1,560 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::{ArrayData, Extend, _MutableArrayData};
+use arrow_buffer::{ArrowNativeType, Buffer, ToByteSlice};
+use arrow_schema::DataType;
+
+/// Generic helper to get the last run end value from a run ends array
+fn get_last_run_end<T: ArrowNativeType>(run_ends_data: 
&super::MutableArrayData) -> T {
+    if run_ends_data.data.len == 0 {
+        T::default()
+    } else {
+        // Convert buffer to typed slice and get the last element
+        let buffer = Buffer::from(run_ends_data.data.buffer1.as_slice());
+        let typed_slice: &[T] = buffer.typed_data();
+        if typed_slice.len() >= run_ends_data.data.len {
+            typed_slice[run_ends_data.data.len - 1]
+        } else {
+            T::default()
+        }
+    }
+}
+
+/// Extends the `MutableArrayData` with null values.
+///
+/// For RunEndEncoded, this adds nulls by extending the run_ends array
+/// and values array appropriately.
+pub fn extend_nulls(mutable: &mut _MutableArrayData, len: usize) {
+    if len == 0 {
+        return;
+    }
+
+    // For REE, we always need to add a value entry when adding a new run
+    // The values array should have one entry per run, not per logical element
+    mutable.child_data[1].extend_nulls(1);
+
+    // Determine the run end type from the data type
+    let run_end_type = if let DataType::RunEndEncoded(run_ends_field, _) = 
&mutable.data_type {
+        run_ends_field.data_type()
+    } else {
+        panic!("extend_nulls called on non-RunEndEncoded array");
+    };
+
+    // Use a macro to handle all run end types generically
+    macro_rules! extend_nulls_impl {
+        ($run_end_type:ty) => {{
+            let last_run_end = 
get_last_run_end::<$run_end_type>(&mutable.child_data[0]);
+            let new_value = last_run_end + <$run_end_type as 
ArrowNativeType>::usize_as(len);
+            mutable.child_data[0]
+                .data
+                .buffer1
+                .extend_from_slice(new_value.to_byte_slice());
+        }};
+    }
+
+    // Apply the appropriate implementation based on run end type
+    match run_end_type {
+        DataType::Int16 => extend_nulls_impl!(i16),
+        DataType::Int32 => extend_nulls_impl!(i32),
+        DataType::Int64 => extend_nulls_impl!(i64),
+        _ => panic!(
+            "Invalid run end type for RunEndEncoded array: {:?}",
+            run_end_type
+        ),
+    };
+
+    mutable.child_data[0].data.len += 1;
+}
+
+/// Build run ends bytes and values range directly for batch processing
+fn build_extend_arrays<T: ArrowNativeType + std::ops::Add<Output = T>>(
+    buffer: &Buffer,
+    length: usize,
+    start: usize,
+    len: usize,
+    dest_last_run_end: T,
+) -> (Vec<u8>, Option<(usize, usize)>) {
+    let mut run_ends_bytes = Vec::new();
+    let mut values_range: Option<(usize, usize)> = None;
+    let end = start + len;
+    let mut prev_end = 0;
+    let mut current_run_end = dest_last_run_end;
+
+    // Convert buffer to typed slice once
+    let typed_slice: &[T] = buffer.typed_data();
+
+    for i in 0..length {
+        if i < typed_slice.len() {
+            let run_end = typed_slice[i].to_usize().unwrap();
+
+            if prev_end <= start && run_end > start {
+                let start_offset = start - prev_end;
+                let end_offset = if run_end >= end {
+                    end - prev_end
+                } else {
+                    run_end - prev_end
+                };
+                current_run_end = current_run_end + T::usize_as(end_offset - 
start_offset);
+                
run_ends_bytes.extend_from_slice(current_run_end.to_byte_slice());
+
+                // Start the range
+                if values_range.is_none() {
+                    values_range = Some((i, i + 1));
+                } else {
+                    values_range = Some((values_range.unwrap().0, i + 1));
+                }
+            } else if prev_end >= start && run_end <= end {
+                current_run_end = current_run_end + T::usize_as(run_end - 
prev_end);
+                
run_ends_bytes.extend_from_slice(current_run_end.to_byte_slice());
+
+                // Extend the range
+                if values_range.is_none() {
+                    values_range = Some((i, i + 1));
+                } else {
+                    values_range = Some((values_range.unwrap().0, i + 1));
+                }
+            } else if prev_end < end && run_end >= end {
+                current_run_end = current_run_end + T::usize_as(end - 
prev_end);
+                
run_ends_bytes.extend_from_slice(current_run_end.to_byte_slice());
+
+                // Extend the range and break
+                if values_range.is_none() {
+                    values_range = Some((i, i + 1));
+                } else {
+                    values_range = Some((values_range.unwrap().0, i + 1));
+                }
+                break;
+            }
+
+            prev_end = run_end;
+            if prev_end >= end {
+                break;
+            }
+        } else {
+            break;
+        }
+    }
+    (run_ends_bytes, values_range)
+}
+
+/// Process extends using batch operations
+fn process_extends_batch<T: ArrowNativeType>(
+    mutable: &mut _MutableArrayData,
+    source_array_idx: usize,
+    run_ends_bytes: Vec<u8>,
+    values_range: Option<(usize, usize)>,
+) {
+    if run_ends_bytes.is_empty() {
+        return;
+    }
+
+    // Batch extend the run_ends array with all bytes at once
+    mutable.child_data[0]
+        .data
+        .buffer1
+        .extend_from_slice(&run_ends_bytes);
+    mutable.child_data[0].data.len += run_ends_bytes.len() / 
std::mem::size_of::<T>();
+
+    // Batch extend the values array using the range
+    let (start_idx, end_idx) =
+        values_range.expect("values_range should be Some if run_ends_bytes is 
not empty");
+    mutable.child_data[1].extend(source_array_idx, start_idx, end_idx);
+}
+
+/// Returns a function that extends the run encoded array.
+///
+/// It finds the physical indices in the source array that correspond to the 
logical range to copy, and adjusts the runs to the logical indices of the array 
to extend. The values are copied from the source array to the destination array 
verbatim.
+pub fn build_extend(array: &ArrayData) -> Extend {
+    Box::new(
+        move |mutable: &mut _MutableArrayData, array_idx: usize, start: usize, 
len: usize| {
+            if len == 0 {
+                return;
+            }
+
+            // We need to analyze the source array's run structure
+            let source_run_ends = &array.child_data()[0];
+            let source_buffer = &source_run_ends.buffers()[0];
+
+            // Get the run end type from the mutable array
+            let dest_run_end_type =
+                if let DataType::RunEndEncoded(run_ends_field, _) = 
&mutable.data_type {
+                    run_ends_field.data_type()
+                } else {
+                    panic!("extend called on non-RunEndEncoded mutable array");
+                };
+
+            // Build run ends and values indices directly for batch processing
+            macro_rules! build_and_process_impl {
+                ($run_end_type:ty) => {{
+                    let dest_last_run_end =
+                        
get_last_run_end::<$run_end_type>(&mutable.child_data[0]);
+                    let (run_ends_bytes, values_range) = 
build_extend_arrays::<$run_end_type>(
+                        source_buffer,
+                        source_run_ends.len(),
+                        start,
+                        len,
+                        dest_last_run_end,
+                    );
+                    process_extends_batch::<$run_end_type>(
+                        mutable,
+                        array_idx,
+                        run_ends_bytes,
+                        values_range,
+                    );
+                }};
+            }
+
+            match dest_run_end_type {
+                DataType::Int16 => build_and_process_impl!(i16),
+                DataType::Int32 => build_and_process_impl!(i32),
+                DataType::Int64 => build_and_process_impl!(i64),
+                _ => panic!(
+                    "Invalid run end type for RunEndEncoded array: {:?}",
+                    dest_run_end_type
+                ),
+            }
+        },
+    )
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::transform::MutableArrayData;
+    use crate::{ArrayData, ArrayDataBuilder};
+    use arrow_buffer::Buffer;
+    use arrow_schema::{DataType, Field};
+    use std::sync::Arc;
+
+    fn create_run_array_data(run_ends: Vec<i32>, values: ArrayData) -> 
ArrayData {
+        let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int32, 
false));
+        let values_field = Arc::new(Field::new("values", 
values.data_type().clone(), true));
+        let data_type = DataType::RunEndEncoded(run_ends_field, values_field);
+
+        let last_run_end = if run_ends.is_empty() {
+            0
+        } else {
+            run_ends[run_ends.len() - 1] as usize
+        };
+
+        let run_ends_buffer = Buffer::from_vec(run_ends);
+        let run_ends_data = ArrayDataBuilder::new(DataType::Int32)
+            .len(run_ends_buffer.len() / std::mem::size_of::<i32>())
+            .add_buffer(run_ends_buffer)
+            .build()
+            .unwrap();
+
+        ArrayDataBuilder::new(data_type)
+            .len(last_run_end)
+            .add_child_data(run_ends_data)
+            .add_child_data(values)
+            .build()
+            .unwrap()
+    }
+
+    fn create_run_array_data_int16(run_ends: Vec<i16>, values: ArrayData) -> 
ArrayData {
+        let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int16, 
false));
+        let values_field = Arc::new(Field::new("values", 
values.data_type().clone(), true));
+        let data_type = DataType::RunEndEncoded(run_ends_field, values_field);
+
+        let last_run_end = if run_ends.is_empty() {
+            0
+        } else {
+            run_ends[run_ends.len() - 1] as usize
+        };
+
+        let run_ends_buffer = Buffer::from_vec(run_ends);
+        let run_ends_data = ArrayDataBuilder::new(DataType::Int16)
+            .len(run_ends_buffer.len() / std::mem::size_of::<i16>())
+            .add_buffer(run_ends_buffer)
+            .build()
+            .unwrap();
+
+        ArrayDataBuilder::new(data_type)
+            .len(last_run_end)
+            .add_child_data(run_ends_data)
+            .add_child_data(values)
+            .build()
+            .unwrap()
+    }
+
+    fn create_run_array_data_int64(run_ends: Vec<i64>, values: ArrayData) -> 
ArrayData {
+        let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int64, 
false));
+        let values_field = Arc::new(Field::new("values", 
values.data_type().clone(), true));
+        let data_type = DataType::RunEndEncoded(run_ends_field, values_field);
+
+        let last_run_end = if run_ends.is_empty() {
+            0
+        } else {
+            run_ends[run_ends.len() - 1] as usize
+        };
+
+        let run_ends_buffer = Buffer::from_vec(run_ends);
+        let run_ends_data = ArrayDataBuilder::new(DataType::Int64)
+            .len(run_ends_buffer.len() / std::mem::size_of::<i64>())
+            .add_buffer(run_ends_buffer)
+            .build()
+            .unwrap();
+
+        ArrayDataBuilder::new(data_type)
+            .len(last_run_end)
+            .add_child_data(run_ends_data)
+            .add_child_data(values)
+            .build()
+            .unwrap()
+    }
+
+    fn create_int32_array_data(values: Vec<i32>) -> ArrayData {
+        let buffer = Buffer::from_vec(values);
+        ArrayDataBuilder::new(DataType::Int32)
+            .len(buffer.len() / std::mem::size_of::<i32>())
+            .add_buffer(buffer)
+            .build()
+            .unwrap()
+    }
+
+    fn create_string_dict_array_data(values: Vec<&str>, dict_values: 
Vec<&str>) -> ArrayData {
+        // Create dictionary values (strings)
+        let dict_offsets: Vec<i32> = dict_values
+            .iter()
+            .scan(0i32, |acc, s| {
+                let offset = *acc;
+                *acc += s.len() as i32;
+                Some(offset)
+            })
+            .chain(std::iter::once(
+                dict_values.iter().map(|s| s.len()).sum::<usize>() as i32,
+            ))
+            .collect();
+
+        let dict_data: Vec<u8> = dict_values.iter().flat_map(|s| 
s.bytes()).collect();
+
+        let dict_array = ArrayDataBuilder::new(DataType::Utf8)
+            .len(dict_values.len())
+            .add_buffer(Buffer::from_vec(dict_offsets))
+            .add_buffer(Buffer::from_vec(dict_data))
+            .build()
+            .unwrap();
+
+        // Create keys array
+        let keys: Vec<i32> = values
+            .iter()
+            .map(|v| dict_values.iter().position(|d| d == v).unwrap() as i32)
+            .collect();
+
+        // Create dictionary array
+        let dict_type = DataType::Dictionary(Box::new(DataType::Int32), 
Box::new(DataType::Utf8));
+
+        ArrayDataBuilder::new(dict_type)
+            .len(values.len())
+            .add_buffer(Buffer::from_vec(keys))
+            .add_child_data(dict_array)
+            .build()
+            .unwrap()
+    }
+
+    #[test]
+    fn test_extend_nulls_int32() {
+        // Create values array with one value
+        let values = create_int32_array_data(vec![42]);
+
+        // Create REE array with Int32 run ends
+        let ree_array = create_run_array_data(vec![5], values);
+
+        let mut mutable = MutableArrayData::new(vec![&ree_array], true, 10);
+
+        // First, we need to copy the existing data
+        mutable.extend(0, 0, 5);
+
+        // Then add nulls
+        mutable.extend_nulls(3);
+
+        // Verify the run ends were extended correctly
+        let result = mutable.freeze();
+        let run_ends_buffer = &result.child_data()[0].buffers()[0];
+        let run_ends_slice = run_ends_buffer.as_slice();
+
+        // Should have two run ends now: original 5 and new 8 (5 + 3)
+        assert_eq!(result.child_data()[0].len(), 2);
+        let first_run_end = 
i32::from_ne_bytes(run_ends_slice[0..4].try_into().unwrap());
+        let second_run_end = 
i32::from_ne_bytes(run_ends_slice[4..8].try_into().unwrap());
+        assert_eq!(first_run_end, 5);
+        assert_eq!(second_run_end, 8);
+    }
+
+    #[test]
+    fn test_extend_nulls_int16() {
+        // Create values array with one value
+        let values = create_int32_array_data(vec![42]);
+
+        // Create REE array with Int16 run ends
+        let ree_array = create_run_array_data_int16(vec![5i16], values);
+
+        let mut mutable = MutableArrayData::new(vec![&ree_array], true, 10);
+
+        // First, we need to copy the existing data
+        mutable.extend(0, 0, 5);
+
+        // Then add nulls
+        mutable.extend_nulls(3);

Review Comment:
   It seems like the `3` nulls added here don't match the `5` values that are copied above.
   



##########
arrow-data/src/transform/run.rs:
##########
@@ -0,0 +1,560 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::{ArrayData, Extend, _MutableArrayData};
+use arrow_buffer::{ArrowNativeType, Buffer, ToByteSlice};
+use arrow_schema::DataType;
+
+/// Generic helper to get the last run end value from a run ends array
+fn get_last_run_end<T: ArrowNativeType>(run_ends_data: 
&super::MutableArrayData) -> T {
+    if run_ends_data.data.len == 0 {
+        T::default()
+    } else {
+        // Convert buffer to typed slice and get the last element
+        let buffer = Buffer::from(run_ends_data.data.buffer1.as_slice());
+        let typed_slice: &[T] = buffer.typed_data();
+        if typed_slice.len() >= run_ends_data.data.len {
+            typed_slice[run_ends_data.data.len - 1]
+        } else {
+            T::default()
+        }
+    }
+}
+
+/// Extends the `MutableArrayData` with null values.
+///
+/// For RunEndEncoded, this adds nulls by extending the run_ends array
+/// and values array appropriately.
+pub fn extend_nulls(mutable: &mut _MutableArrayData, len: usize) {
+    if len == 0 {
+        return;
+    }
+
+    // For REE, we always need to add a value entry when adding a new run
+    // The values array should have one entry per run, not per logical element
+    mutable.child_data[1].extend_nulls(1);
+
+    // Determine the run end type from the data type
+    let run_end_type = if let DataType::RunEndEncoded(run_ends_field, _) = 
&mutable.data_type {
+        run_ends_field.data_type()
+    } else {
+        panic!("extend_nulls called on non-RunEndEncoded array");
+    };
+
+    // Use a macro to handle all run end types generically
+    macro_rules! extend_nulls_impl {
+        ($run_end_type:ty) => {{
+            let last_run_end = 
get_last_run_end::<$run_end_type>(&mutable.child_data[0]);
+            let new_value = last_run_end + <$run_end_type as 
ArrowNativeType>::usize_as(len);
+            mutable.child_data[0]
+                .data
+                .buffer1
+                .extend_from_slice(new_value.to_byte_slice());
+        }};
+    }
+
+    // Apply the appropriate implementation based on run end type
+    match run_end_type {
+        DataType::Int16 => extend_nulls_impl!(i16),
+        DataType::Int32 => extend_nulls_impl!(i32),
+        DataType::Int64 => extend_nulls_impl!(i64),
+        _ => panic!(
+            "Invalid run end type for RunEndEncoded array: {:?}",
+            run_end_type
+        ),
+    };
+
+    mutable.child_data[0].data.len += 1;
+}
+
+/// Build run ends bytes and values range directly for batch processing
+fn build_extend_arrays<T: ArrowNativeType + std::ops::Add<Output = T>>(
+    buffer: &Buffer,
+    length: usize,
+    start: usize,
+    len: usize,
+    dest_last_run_end: T,
+) -> (Vec<u8>, Option<(usize, usize)>) {
+    let mut run_ends_bytes = Vec::new();
+    let mut values_range: Option<(usize, usize)> = None;
+    let end = start + len;
+    let mut prev_end = 0;
+    let mut current_run_end = dest_last_run_end;
+
+    // Convert buffer to typed slice once
+    let typed_slice: &[T] = buffer.typed_data();
+
+    for i in 0..length {
+        if i < typed_slice.len() {
+            let run_end = typed_slice[i].to_usize().unwrap();
+
+            if prev_end <= start && run_end > start {
+                let start_offset = start - prev_end;
+                let end_offset = if run_end >= end {
+                    end - prev_end
+                } else {
+                    run_end - prev_end
+                };
+                current_run_end = current_run_end + T::usize_as(end_offset - 
start_offset);
+                
run_ends_bytes.extend_from_slice(current_run_end.to_byte_slice());
+
+                // Start the range
+                if values_range.is_none() {
+                    values_range = Some((i, i + 1));
+                } else {
+                    values_range = Some((values_range.unwrap().0, i + 1));
+                }
+            } else if prev_end >= start && run_end <= end {
+                current_run_end = current_run_end + T::usize_as(run_end - 
prev_end);
+                
run_ends_bytes.extend_from_slice(current_run_end.to_byte_slice());
+
+                // Extend the range
+                if values_range.is_none() {
+                    values_range = Some((i, i + 1));
+                } else {
+                    values_range = Some((values_range.unwrap().0, i + 1));
+                }
+            } else if prev_end < end && run_end >= end {
+                current_run_end = current_run_end + T::usize_as(end - 
prev_end);
+                
run_ends_bytes.extend_from_slice(current_run_end.to_byte_slice());
+
+                // Extend the range and break
+                if values_range.is_none() {
+                    values_range = Some((i, i + 1));
+                } else {
+                    values_range = Some((values_range.unwrap().0, i + 1));
+                }
+                break;
+            }
+
+            prev_end = run_end;
+            if prev_end >= end {
+                break;
+            }
+        } else {
+            break;
+        }
+    }
+    (run_ends_bytes, values_range)
+}
+
+/// Process extends using batch operations
+fn process_extends_batch<T: ArrowNativeType>(
+    mutable: &mut _MutableArrayData,
+    source_array_idx: usize,
+    run_ends_bytes: Vec<u8>,
+    values_range: Option<(usize, usize)>,
+) {
+    if run_ends_bytes.is_empty() {
+        return;
+    }
+
+    // Batch extend the run_ends array with all bytes at once
+    mutable.child_data[0]
+        .data
+        .buffer1
+        .extend_from_slice(&run_ends_bytes);
+    mutable.child_data[0].data.len += run_ends_bytes.len() / 
std::mem::size_of::<T>();
+
+    // Batch extend the values array using the range
+    let (start_idx, end_idx) =
+        values_range.expect("values_range should be Some if run_ends_bytes is 
not empty");
+    mutable.child_data[1].extend(source_array_idx, start_idx, end_idx);
+}
+
+/// Returns a function that extends the run encoded array.
+///
+/// It finds the physical indices in the source array that correspond to the 
logical range to copy, and adjusts the runs to the logical indices of the array 
to extend. The values are copied from the source array to the destination array 
verbatim.
+pub fn build_extend(array: &ArrayData) -> Extend {
+    Box::new(
+        move |mutable: &mut _MutableArrayData, array_idx: usize, start: usize, 
len: usize| {
+            if len == 0 {
+                return;
+            }
+
+            // We need to analyze the source array's run structure
+            let source_run_ends = &array.child_data()[0];
+            let source_buffer = &source_run_ends.buffers()[0];
+
+            // Get the run end type from the mutable array
+            let dest_run_end_type =
+                if let DataType::RunEndEncoded(run_ends_field, _) = 
&mutable.data_type {
+                    run_ends_field.data_type()
+                } else {
+                    panic!("extend called on non-RunEndEncoded mutable array");
+                };
+
+            // Build run ends and values indices directly for batch processing
+            macro_rules! build_and_process_impl {
+                ($run_end_type:ty) => {{
+                    let dest_last_run_end =
+                        
get_last_run_end::<$run_end_type>(&mutable.child_data[0]);
+                    let (run_ends_bytes, values_range) = 
build_extend_arrays::<$run_end_type>(
+                        source_buffer,
+                        source_run_ends.len(),
+                        start,
+                        len,
+                        dest_last_run_end,
+                    );
+                    process_extends_batch::<$run_end_type>(
+                        mutable,
+                        array_idx,
+                        run_ends_bytes,
+                        values_range,
+                    );
+                }};
+            }
+
+            match dest_run_end_type {
+                DataType::Int16 => build_and_process_impl!(i16),
+                DataType::Int32 => build_and_process_impl!(i32),
+                DataType::Int64 => build_and_process_impl!(i64),
+                _ => panic!(
+                    "Invalid run end type for RunEndEncoded array: {:?}",
+                    dest_run_end_type
+                ),
+            }
+        },
+    )
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::transform::MutableArrayData;
+    use crate::{ArrayData, ArrayDataBuilder};
+    use arrow_buffer::Buffer;
+    use arrow_schema::{DataType, Field};
+    use std::sync::Arc;
+
+    fn create_run_array_data(run_ends: Vec<i32>, values: ArrayData) -> 
ArrayData {
+        let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int32, 
false));
+        let values_field = Arc::new(Field::new("values", 
values.data_type().clone(), true));
+        let data_type = DataType::RunEndEncoded(run_ends_field, values_field);
+
+        let last_run_end = if run_ends.is_empty() {
+            0
+        } else {
+            run_ends[run_ends.len() - 1] as usize
+        };
+
+        let run_ends_buffer = Buffer::from_vec(run_ends);
+        let run_ends_data = ArrayDataBuilder::new(DataType::Int32)
+            .len(run_ends_buffer.len() / std::mem::size_of::<i32>())
+            .add_buffer(run_ends_buffer)
+            .build()
+            .unwrap();
+
+        ArrayDataBuilder::new(data_type)
+            .len(last_run_end)
+            .add_child_data(run_ends_data)
+            .add_child_data(values)
+            .build()
+            .unwrap()
+    }
+
+    fn create_run_array_data_int16(run_ends: Vec<i16>, values: ArrayData) -> 
ArrayData {
+        let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int16, 
false));
+        let values_field = Arc::new(Field::new("values", 
values.data_type().clone(), true));
+        let data_type = DataType::RunEndEncoded(run_ends_field, values_field);
+
+        let last_run_end = if run_ends.is_empty() {
+            0
+        } else {
+            run_ends[run_ends.len() - 1] as usize
+        };
+
+        let run_ends_buffer = Buffer::from_vec(run_ends);
+        let run_ends_data = ArrayDataBuilder::new(DataType::Int16)
+            .len(run_ends_buffer.len() / std::mem::size_of::<i16>())
+            .add_buffer(run_ends_buffer)
+            .build()
+            .unwrap();
+
+        ArrayDataBuilder::new(data_type)
+            .len(last_run_end)
+            .add_child_data(run_ends_data)
+            .add_child_data(values)
+            .build()
+            .unwrap()
+    }
+
+    fn create_run_array_data_int64(run_ends: Vec<i64>, values: ArrayData) -> 
ArrayData {
+        let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int64, 
false));
+        let values_field = Arc::new(Field::new("values", 
values.data_type().clone(), true));
+        let data_type = DataType::RunEndEncoded(run_ends_field, values_field);
+
+        let last_run_end = if run_ends.is_empty() {
+            0
+        } else {
+            run_ends[run_ends.len() - 1] as usize
+        };
+
+        let run_ends_buffer = Buffer::from_vec(run_ends);
+        let run_ends_data = ArrayDataBuilder::new(DataType::Int64)
+            .len(run_ends_buffer.len() / std::mem::size_of::<i64>())
+            .add_buffer(run_ends_buffer)
+            .build()
+            .unwrap();
+
+        ArrayDataBuilder::new(data_type)
+            .len(last_run_end)
+            .add_child_data(run_ends_data)
+            .add_child_data(values)
+            .build()
+            .unwrap()
+    }
+
+    fn create_int32_array_data(values: Vec<i32>) -> ArrayData {
+        let buffer = Buffer::from_vec(values);
+        ArrayDataBuilder::new(DataType::Int32)
+            .len(buffer.len() / std::mem::size_of::<i32>())
+            .add_buffer(buffer)
+            .build()
+            .unwrap()
+    }
+
+    fn create_string_dict_array_data(values: Vec<&str>, dict_values: 
Vec<&str>) -> ArrayData {
+        // Create dictionary values (strings)
+        let dict_offsets: Vec<i32> = dict_values
+            .iter()
+            .scan(0i32, |acc, s| {
+                let offset = *acc;
+                *acc += s.len() as i32;
+                Some(offset)
+            })
+            .chain(std::iter::once(
+                dict_values.iter().map(|s| s.len()).sum::<usize>() as i32,
+            ))
+            .collect();
+
+        let dict_data: Vec<u8> = dict_values.iter().flat_map(|s| 
s.bytes()).collect();
+
+        let dict_array = ArrayDataBuilder::new(DataType::Utf8)
+            .len(dict_values.len())
+            .add_buffer(Buffer::from_vec(dict_offsets))
+            .add_buffer(Buffer::from_vec(dict_data))
+            .build()
+            .unwrap();
+
+        // Create keys array
+        let keys: Vec<i32> = values
+            .iter()
+            .map(|v| dict_values.iter().position(|d| d == v).unwrap() as i32)
+            .collect();
+
+        // Create dictionary array
+        let dict_type = DataType::Dictionary(Box::new(DataType::Int32), 
Box::new(DataType::Utf8));
+
+        ArrayDataBuilder::new(dict_type)
+            .len(values.len())
+            .add_buffer(Buffer::from_vec(keys))
+            .add_child_data(dict_array)
+            .build()
+            .unwrap()
+    }
+
+    #[test]
+    fn test_extend_nulls_int32() {
+        // Create values array with one value
+        let values = create_int32_array_data(vec![42]);
+
+        // Create REE array with Int32 run ends
+        let ree_array = create_run_array_data(vec![5], values);
+
+        let mut mutable = MutableArrayData::new(vec![&ree_array], true, 10);
+
+        // First, we need to copy the existing data
+        mutable.extend(0, 0, 5);
+
+        // Then add nulls
+        mutable.extend_nulls(3);
+
+        // Verify the run ends were extended correctly
+        let result = mutable.freeze();
+        let run_ends_buffer = &result.child_data()[0].buffers()[0];
+        let run_ends_slice = run_ends_buffer.as_slice();
+
+        // Should have two run ends now: original 5 and new 8 (5 + 3)
+        assert_eq!(result.child_data()[0].len(), 2);
+        let first_run_end = 
i32::from_ne_bytes(run_ends_slice[0..4].try_into().unwrap());
+        let second_run_end = 
i32::from_ne_bytes(run_ends_slice[4..8].try_into().unwrap());
+        assert_eq!(first_run_end, 5);
+        assert_eq!(second_run_end, 8);
+    }
+
+    #[test]
+    fn test_extend_nulls_int16() {

Review Comment:
   I think it is also important here to try to extend the data past 32k (i.e., 
past the 32,767 limit that `i16` run ends can encode) and ensure we get a reasonable error.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to