alamb commented on code in PR #7649:
URL: https://github.com/apache/arrow-rs/pull/7649#discussion_r2143666108


##########
arrow-row/src/lib.rs:
##########
@@ -145,9 +145,11 @@ use variable::{decode_binary_view, decode_string_view};
 
 use crate::fixed::{decode_bool, decode_fixed_size_binary, decode_primitive};
 use crate::variable::{decode_binary, decode_string};
+use arrow_array::types::{Int16Type, Int32Type, Int64Type};
 
 mod fixed;
 mod list;
+mod run;

Review Comment:
   I double checked and `run` is consistent with the naming of REEArray 
elsewhere in the crate 👍 



##########
arrow-row/src/run.rs:
##########
@@ -0,0 +1,164 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{null_sentinel, RowConverter, Rows, SortField};
+use arrow_array::types::RunEndIndexType;
+use arrow_array::{Array, RunArray};
+use arrow_buffer::{ArrowNativeType, Buffer};
+use arrow_schema::{ArrowError, SortOptions};
+
+/// Computes the lengths of each row for a RunEndEncodedArray
+pub fn compute_lengths<R: RunEndIndexType>(
+    lengths: &mut [usize],
+    rows: &Rows,
+    array: &RunArray<R>,
+) {
+    // We don't need to use run_ends directly, just access through the array
+    for (idx, length) in lengths.iter_mut().enumerate() {
+        let physical_idx = array.get_physical_index(idx);
+        let is_valid = array.values().is_valid(physical_idx);
+        if is_valid {
+            let row = rows.row(physical_idx);
+            *length += row.data.len() + 1; // 1 for the validity byte
+        } else {
+            *length += 1; // just the null sentinel
+        }
+    }
+}
+
+/// Encodes the provided `RunEndEncodedArray` to `out` with the provided 
`SortOptions`
+///
+/// `rows` should contain the encoded values
+pub fn encode<R: RunEndIndexType>(
+    data: &mut [u8],
+    offsets: &mut [usize],
+    rows: &Rows,
+    opts: SortOptions,
+    array: &RunArray<R>,
+) {
+    for (idx, offset) in offsets.iter_mut().skip(1).enumerate() {
+        let physical_idx = array.get_physical_index(idx);
+        let is_valid = array.values().is_valid(physical_idx);
+
+        let out = &mut data[*offset..];
+        if is_valid {
+            let row = rows.row(physical_idx);
+            out[0] = 1; // valid
+            out[1..1 + row.data.len()].copy_from_slice(row.data);
+            *offset += row.data.len() + 1;
+        } else {
+            out[0] = null_sentinel(opts);
+            *offset += 1;
+        }
+    }
+}
+
+/// Decodes a RunEndEncodedArray from `rows` with the provided `options`
+///
+/// # Safety
+///
+/// `rows` must contain valid data for the provided `converter`
+pub unsafe fn decode<R: RunEndIndexType>(
+    converter: &RowConverter,
+    rows: &mut [&[u8]],
+    field: &SortField,
+    validate_utf8: bool,
+) -> Result<RunArray<R>, ArrowError> {
+    let opts = field.options;
+
+    // Track null values and collect row data to avoid borrow issues
+    let mut valid_flags = Vec::with_capacity(rows.len());

Review Comment:
    I think you could use `BooleanBufferBuilder` here which might be more 
efficient



##########
arrow-row/src/lib.rs:
##########
@@ -400,6 +404,17 @@ impl Codec {
                 };
                 Ok(Self::Dictionary(converter, owned))
             }
+            DataType::RunEndEncoded(_, values) => {
+                // Similar to List implementation

Review Comment:
   Maybe we can pull the transformation into a documented helper function (not 
needed, I just was confused for a bit until I read the comments in the 
List/LargeList implementation



##########
arrow-row/src/run_test.rs:
##########
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   I think it is more standard in this repo to put unit tests like this in the 
same module (aka I would expect this to be in `arrow-row/src/run.rs`
   
   Is there any reason it is in a different module?



##########
arrow-row/src/run.rs:
##########
@@ -0,0 +1,164 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{null_sentinel, RowConverter, Rows, SortField};
+use arrow_array::types::RunEndIndexType;
+use arrow_array::{Array, RunArray};
+use arrow_buffer::{ArrowNativeType, Buffer};
+use arrow_schema::{ArrowError, SortOptions};
+
+/// Computes the lengths of each row for a RunEndEncodedArray
+pub fn compute_lengths<R: RunEndIndexType>(
+    lengths: &mut [usize],
+    rows: &Rows,
+    array: &RunArray<R>,
+) {
+    // We don't need to use run_ends directly, just access through the array
+    for (idx, length) in lengths.iter_mut().enumerate() {
+        let physical_idx = array.get_physical_index(idx);
+        let is_valid = array.values().is_valid(physical_idx);
+        if is_valid {
+            let row = rows.row(physical_idx);
+            *length += row.data.len() + 1; // 1 for the validity byte
+        } else {
+            *length += 1; // just the null sentinel
+        }
+    }
+}
+
+/// Encodes the provided `RunEndEncodedArray` to `out` with the provided 
`SortOptions`
+///
+/// `rows` should contain the encoded values

Review Comment:
   it would be really helpful if the format that this was creating was 
documented somewhere (so I don't have to reverse engineer it from the code to 
try and double check it)



##########
arrow-row/src/run_test.rs:
##########
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[cfg(test)]
+mod tests {
+    use crate::{RowConverter, SortField};
+    use arrow_array::types::Int32Type;
+    use arrow_array::RunArray;
+    use arrow_schema::DataType;
+    use std::sync::Arc;
+
+    #[test]
+    fn test_run_end_encoded_supports_datatype() {
+        // Test that the RowConverter correctly supports run-end encoded arrays
+        assert!(RowConverter::supports_datatype(&DataType::RunEndEncoded(
+            Arc::new(arrow_schema::Field::new("run_ends", DataType::Int32, 
false)),
+            Arc::new(arrow_schema::Field::new("values", DataType::Utf8, true)),
+        )));
+    }
+
+    #[test]
+    fn test_run_end_encoded_sorting() {
+        // Create two run arrays with different values
+        let run_array1: RunArray<Int32Type> = vec!["b", "b", 
"a"].into_iter().collect();
+        let run_array2: RunArray<Int32Type> = vec!["a", "a", 
"b"].into_iter().collect();
+
+        let converter = 
RowConverter::new(vec![SortField::new(DataType::RunEndEncoded(
+            Arc::new(arrow_schema::Field::new("run_ends", DataType::Int32, 
false)),
+            Arc::new(arrow_schema::Field::new("values", DataType::Utf8, true)),
+        ))])
+        .unwrap();
+
+        let rows1 = 
converter.convert_columns(&[Arc::new(run_array1)]).unwrap();
+        let rows2 = 
converter.convert_columns(&[Arc::new(run_array2)]).unwrap();
+
+        // Compare rows - the second row should be less than the first
+        assert!(rows2.row(0) < rows1.row(0));
+        assert!(rows2.row(1) < rows1.row(0));
+        assert!(rows1.row(2) < rows2.row(2));
+    }

Review Comment:
   I think we should add a few more tests;
   
   1. Round tripping (e.g that converting rows1 to an `Array` results in an 
array that is equal to `run_array1` -- same for `rows2`)
   1. Encoding / Decoding REE Arrays that have nulls 
   2. Encoding/Decding REE arrays of some other value type (Int64Array for 
example)
   3. Descending / nulls first/last sort orders



##########
arrow-row/src/run.rs:
##########
@@ -0,0 +1,164 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{null_sentinel, RowConverter, Rows, SortField};
+use arrow_array::types::RunEndIndexType;
+use arrow_array::{Array, RunArray};
+use arrow_buffer::{ArrowNativeType, Buffer};
+use arrow_schema::{ArrowError, SortOptions};
+
+/// Computes the lengths of each row for a RunEndEncodedArray
+pub fn compute_lengths<R: RunEndIndexType>(
+    lengths: &mut [usize],
+    rows: &Rows,
+    array: &RunArray<R>,
+) {
+    // We don't need to use run_ends directly, just access through the array
+    for (idx, length) in lengths.iter_mut().enumerate() {
+        let physical_idx = array.get_physical_index(idx);
+        let is_valid = array.values().is_valid(physical_idx);
+        if is_valid {
+            let row = rows.row(physical_idx);
+            *length += row.data.len() + 1; // 1 for the validity byte
+        } else {
+            *length += 1; // just the null sentinel
+        }
+    }
+}
+
+/// Encodes the provided `RunEndEncodedArray` to `out` with the provided 
`SortOptions`
+///
+/// `rows` should contain the encoded values
+pub fn encode<R: RunEndIndexType>(
+    data: &mut [u8],
+    offsets: &mut [usize],
+    rows: &Rows,
+    opts: SortOptions,
+    array: &RunArray<R>,
+) {
+    for (idx, offset) in offsets.iter_mut().skip(1).enumerate() {
+        let physical_idx = array.get_physical_index(idx);
+        let is_valid = array.values().is_valid(physical_idx);
+
+        let out = &mut data[*offset..];
+        if is_valid {
+            let row = rows.row(physical_idx);
+            out[0] = 1; // valid
+            out[1..1 + row.data.len()].copy_from_slice(row.data);
+            *offset += row.data.len() + 1;
+        } else {
+            out[0] = null_sentinel(opts);
+            *offset += 1;
+        }
+    }
+}
+
+/// Decodes a RunEndEncodedArray from `rows` with the provided `options`
+///
+/// # Safety
+///
+/// `rows` must contain valid data for the provided `converter`
+pub unsafe fn decode<R: RunEndIndexType>(
+    converter: &RowConverter,
+    rows: &mut [&[u8]],
+    field: &SortField,
+    validate_utf8: bool,
+) -> Result<RunArray<R>, ArrowError> {
+    let opts = field.options;
+
+    // Track null values and collect row data to avoid borrow issues
+    let mut valid_flags = Vec::with_capacity(rows.len());
+    let mut row_data = Vec::with_capacity(rows.len());
+
+    // First pass: collect valid flags and data for each row
+    for row in rows.iter() {
+        let is_valid = row[0] != null_sentinel(opts);
+        valid_flags.push(is_valid);
+        if is_valid {
+            row_data.push(&row[1..]);
+        } else {
+            row_data.push(&[][..]);
+        }
+    }
+
+    // Now build run ends and values
+    let mut run_ends = Vec::new();
+    let mut values_data = Vec::new();
+    let mut current_value_idx = 0;
+    let mut current_run_end = 0;
+
+    for (idx, is_valid) in valid_flags.iter().enumerate() {
+        current_run_end += 1;
+
+        if idx == 0 {
+            // Add the first row data
+            values_data.push(row_data[idx]);
+            run_ends.push(R::Native::usize_as(current_run_end));
+            continue;
+        }
+
+        // Check if this row is different from the previous one
+        let value_changed = if !is_valid {
+            // Null value - check if previous was null
+            !valid_flags[idx - 1]
+        } else if !valid_flags[idx - 1] {
+            // Previous was null, this is not
+            true
+        } else {
+            // Both are valid, compare data
+            let prev_data = row_data[idx - 1];
+            let curr_data = row_data[idx];
+            prev_data != curr_data
+        };
+
+        if value_changed {
+            // Start a new run
+            current_value_idx += 1;
+            values_data.push(row_data[idx]);
+            run_ends.push(R::Native::usize_as(current_run_end));
+        } else {
+            // Update the current run end
+            run_ends[current_value_idx] = R::Native::usize_as(current_run_end);
+        }
+    }
+
+    // Convert collected values to arrays
+    let mut values_rows = values_data.clone();

Review Comment:
   is this clone necesssary? It seems like values_data isn't used afterwards



##########
arrow-row/src/run.rs:
##########
@@ -0,0 +1,164 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{null_sentinel, RowConverter, Rows, SortField};
+use arrow_array::types::RunEndIndexType;
+use arrow_array::{Array, RunArray};
+use arrow_buffer::{ArrowNativeType, Buffer};
+use arrow_schema::{ArrowError, SortOptions};
+
+/// Computes the lengths of each row for a RunEndEncodedArray
+pub fn compute_lengths<R: RunEndIndexType>(
+    lengths: &mut [usize],
+    rows: &Rows,
+    array: &RunArray<R>,
+) {
+    // We don't need to use run_ends directly, just access through the array
+    for (idx, length) in lengths.iter_mut().enumerate() {
+        let physical_idx = array.get_physical_index(idx);
+        let is_valid = array.values().is_valid(physical_idx);
+        if is_valid {
+            let row = rows.row(physical_idx);
+            *length += row.data.len() + 1; // 1 for the validity byte
+        } else {
+            *length += 1; // just the null sentinel
+        }
+    }
+}
+
+/// Encodes the provided `RunEndEncodedArray` to `out` with the provided 
`SortOptions`
+///
+/// `rows` should contain the encoded values
+pub fn encode<R: RunEndIndexType>(
+    data: &mut [u8],
+    offsets: &mut [usize],
+    rows: &Rows,
+    opts: SortOptions,
+    array: &RunArray<R>,
+) {
+    for (idx, offset) in offsets.iter_mut().skip(1).enumerate() {
+        let physical_idx = array.get_physical_index(idx);
+        let is_valid = array.values().is_valid(physical_idx);
+
+        let out = &mut data[*offset..];
+        if is_valid {
+            let row = rows.row(physical_idx);
+            out[0] = 1; // valid
+            out[1..1 + row.data.len()].copy_from_slice(row.data);
+            *offset += row.data.len() + 1;
+        } else {
+            out[0] = null_sentinel(opts);
+            *offset += 1;
+        }
+    }
+}
+
+/// Decodes a RunEndEncodedArray from `rows` with the provided `options`
+///
+/// # Safety
+///
+/// `rows` must contain valid data for the provided `converter`
+pub unsafe fn decode<R: RunEndIndexType>(
+    converter: &RowConverter,
+    rows: &mut [&[u8]],
+    field: &SortField,
+    validate_utf8: bool,
+) -> Result<RunArray<R>, ArrowError> {
+    let opts = field.options;
+
+    // Track null values and collect row data to avoid borrow issues
+    let mut valid_flags = Vec::with_capacity(rows.len());
+    let mut row_data = Vec::with_capacity(rows.len());
+
+    // First pass: collect valid flags and data for each row
+    for row in rows.iter() {
+        let is_valid = row[0] != null_sentinel(opts);
+        valid_flags.push(is_valid);
+        if is_valid {
+            row_data.push(&row[1..]);
+        } else {
+            row_data.push(&[][..]);
+        }
+    }
+
+    // Now build run ends and values
+    let mut run_ends = Vec::new();
+    let mut values_data = Vec::new();
+    let mut current_value_idx = 0;
+    let mut current_run_end = 0;
+
+    for (idx, is_valid) in valid_flags.iter().enumerate() {
+        current_run_end += 1;
+
+        if idx == 0 {
+            // Add the first row data
+            values_data.push(row_data[idx]);
+            run_ends.push(R::Native::usize_as(current_run_end));
+            continue;
+        }
+
+        // Check if this row is different from the previous one
+        let value_changed = if !is_valid {
+            // Null value - check if previous was null
+            !valid_flags[idx - 1]
+        } else if !valid_flags[idx - 1] {
+            // Previous was null, this is not
+            true
+        } else {
+            // Both are valid, compare data
+            let prev_data = row_data[idx - 1];
+            let curr_data = row_data[idx];
+            prev_data != curr_data
+        };
+
+        if value_changed {
+            // Start a new run
+            current_value_idx += 1;
+            values_data.push(row_data[idx]);
+            run_ends.push(R::Native::usize_as(current_run_end));
+        } else {
+            // Update the current run end
+            run_ends[current_value_idx] = R::Native::usize_as(current_run_end);
+        }
+    }
+
+    // Convert collected values to arrays
+    let mut values_rows = values_data.clone();
+    let values = converter.convert_raw(&mut values_rows, validate_utf8)?;
+
+    // Create run ends array
+    // Get the count of elements before we move the vector
+    let element_count = run_ends.len();
+    let buffer = Buffer::from_vec(run_ends);
+    let run_ends_array = arrow_array::PrimitiveArray::<R>::new(
+        arrow_buffer::ScalarBuffer::new(buffer, 0, element_count),
+        None,
+    );
+
+    // Update rows to consume the data we've processed
+    for i in 0..rows.len() {

Review Comment:
   I don't fully understand the consumption code, but it seems consistent with 
the ListArray implemenation



##########
arrow-row/src/run.rs:
##########
@@ -0,0 +1,164 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{null_sentinel, RowConverter, Rows, SortField};
+use arrow_array::types::RunEndIndexType;
+use arrow_array::{Array, RunArray};
+use arrow_buffer::{ArrowNativeType, Buffer};
+use arrow_schema::{ArrowError, SortOptions};
+
+/// Computes the lengths of each row for a RunEndEncodedArray
+pub fn compute_lengths<R: RunEndIndexType>(
+    lengths: &mut [usize],
+    rows: &Rows,
+    array: &RunArray<R>,
+) {
+    // We don't need to use run_ends directly, just access through the array
+    for (idx, length) in lengths.iter_mut().enumerate() {
+        let physical_idx = array.get_physical_index(idx);
+        let is_valid = array.values().is_valid(physical_idx);
+        if is_valid {
+            let row = rows.row(physical_idx);
+            *length += row.data.len() + 1; // 1 for the validity byte
+        } else {
+            *length += 1; // just the null sentinel
+        }
+    }
+}
+
+/// Encodes the provided `RunEndEncodedArray` to `out` with the provided 
`SortOptions`
+///
+/// `rows` should contain the encoded values
+pub fn encode<R: RunEndIndexType>(
+    data: &mut [u8],
+    offsets: &mut [usize],
+    rows: &Rows,
+    opts: SortOptions,
+    array: &RunArray<R>,
+) {
+    for (idx, offset) in offsets.iter_mut().skip(1).enumerate() {
+        let physical_idx = array.get_physical_index(idx);
+        let is_valid = array.values().is_valid(physical_idx);
+
+        let out = &mut data[*offset..];
+        if is_valid {
+            let row = rows.row(physical_idx);
+            out[0] = 1; // valid
+            out[1..1 + row.data.len()].copy_from_slice(row.data);
+            *offset += row.data.len() + 1;
+        } else {
+            out[0] = null_sentinel(opts);
+            *offset += 1;
+        }
+    }
+}
+
+/// Decodes a RunEndEncodedArray from `rows` with the provided `options`
+///
+/// # Safety
+///
+/// `rows` must contain valid data for the provided `converter`
+pub unsafe fn decode<R: RunEndIndexType>(
+    converter: &RowConverter,
+    rows: &mut [&[u8]],
+    field: &SortField,
+    validate_utf8: bool,
+) -> Result<RunArray<R>, ArrowError> {
+    let opts = field.options;
+
+    // Track null values and collect row data to avoid borrow issues
+    let mut valid_flags = Vec::with_capacity(rows.len());
+    let mut row_data = Vec::with_capacity(rows.len());
+
+    // First pass: collect valid flags and data for each row
+    for row in rows.iter() {
+        let is_valid = row[0] != null_sentinel(opts);
+        valid_flags.push(is_valid);
+        if is_valid {
+            row_data.push(&row[1..]);
+        } else {
+            row_data.push(&[][..]);
+        }
+    }
+
+    // Now build run ends and values
+    let mut run_ends = Vec::new();
+    let mut values_data = Vec::new();
+    let mut current_value_idx = 0;
+    let mut current_run_end = 0;
+
+    for (idx, is_valid) in valid_flags.iter().enumerate() {
+        current_run_end += 1;
+
+        if idx == 0 {
+            // Add the first row data
+            values_data.push(row_data[idx]);
+            run_ends.push(R::Native::usize_as(current_run_end));
+            continue;
+        }
+
+        // Check if this row is different from the previous one
+        let value_changed = if !is_valid {
+            // Null value - check if previous was null
+            !valid_flags[idx - 1]
+        } else if !valid_flags[idx - 1] {
+            // Previous was null, this is not
+            true
+        } else {
+            // Both are valid, compare data
+            let prev_data = row_data[idx - 1];
+            let curr_data = row_data[idx];
+            prev_data != curr_data
+        };
+
+        if value_changed {
+            // Start a new run
+            current_value_idx += 1;
+            values_data.push(row_data[idx]);
+            run_ends.push(R::Native::usize_as(current_run_end));
+        } else {
+            // Update the current run end
+            run_ends[current_value_idx] = R::Native::usize_as(current_run_end);
+        }
+    }
+
+    // Convert collected values to arrays
+    let mut values_rows = values_data.clone();
+    let values = converter.convert_raw(&mut values_rows, validate_utf8)?;
+
+    // Create run ends array
+    // Get the count of elements before we move the vector
+    let element_count = run_ends.len();
+    let buffer = Buffer::from_vec(run_ends);
+    let run_ends_array = arrow_array::PrimitiveArray::<R>::new(

Review Comment:
   I think you could just do `PrimitiveArray::<R>::from(run_ends` (which will 
internally do the buffer dance you are doing)



##########
arrow-row/src/run.rs:
##########
@@ -0,0 +1,164 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{null_sentinel, RowConverter, Rows, SortField};
+use arrow_array::types::RunEndIndexType;
+use arrow_array::{Array, RunArray};
+use arrow_buffer::{ArrowNativeType, Buffer};
+use arrow_schema::{ArrowError, SortOptions};
+
+/// Computes the lengths of each row for a RunEndEncodedArray
+pub fn compute_lengths<R: RunEndIndexType>(
+    lengths: &mut [usize],
+    rows: &Rows,
+    array: &RunArray<R>,
+) {
+    // We don't need to use run_ends directly, just access through the array
+    for (idx, length) in lengths.iter_mut().enumerate() {
+        let physical_idx = array.get_physical_index(idx);
+        let is_valid = array.values().is_valid(physical_idx);
+        if is_valid {
+            let row = rows.row(physical_idx);
+            *length += row.data.len() + 1; // 1 for the validity byte
+        } else {
+            *length += 1; // just the null sentinel
+        }
+    }
+}
+
+/// Encodes the provided `RunEndEncodedArray` to `out` with the provided 
`SortOptions`
+///
+/// `rows` should contain the encoded values
+pub fn encode<R: RunEndIndexType>(
+    data: &mut [u8],
+    offsets: &mut [usize],
+    rows: &Rows,
+    opts: SortOptions,
+    array: &RunArray<R>,
+) {
+    for (idx, offset) in offsets.iter_mut().skip(1).enumerate() {
+        let physical_idx = array.get_physical_index(idx);

Review Comment:
   This code is effectively going to deocde the REE array (though I don't 
really see any way around that)
   
   I do think you could make it more efficient by iterating over each run and 
then copying the value in a loop (rather than this which does a binary search 
on the run ends for each `idx`
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to