alamb commented on code in PR #7649:
URL: https://github.com/apache/arrow-rs/pull/7649#discussion_r2153011523


##########
arrow-row/src/run.rs:
##########
@@ -0,0 +1,695 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{variable, RowConverter, Rows, SortField};
+use arrow_array::types::RunEndIndexType;
+use arrow_array::{PrimitiveArray, RunArray};
+use arrow_buffer::{ArrowNativeType, ScalarBuffer};
+use arrow_schema::{ArrowError, SortOptions};
+
+/// Computes the lengths of each row for a RunEndEncodedArray
+pub fn compute_lengths<R: RunEndIndexType>(
+    lengths: &mut [usize],
+    rows: &Rows,
+    array: &RunArray<R>,
+) {
+    let run_ends = array.run_ends().values();
+    let mut logical_start = 0;
+
+    // Iterate over each run and apply the same length to all logical positions in the run
+    for (physical_idx, &run_end) in run_ends.iter().enumerate() {
+        let logical_end = run_end.as_usize();
+        let row = rows.row(physical_idx);
+        let encoded_len = variable::encoded_len(Some(row.data));
+
+        // Add the same length for all logical positions in this run
+        for length in &mut lengths[logical_start..logical_end] {
+            *length += encoded_len;
+        }
+
+        logical_start = logical_end;
+    }
+}
+
+/// Encodes the provided `RunEndEncodedArray` to `out` with the provided `SortOptions`
+///
+/// `rows` should contain the encoded values
+pub fn encode<R: RunEndIndexType>(
+    data: &mut [u8],
+    offsets: &mut [usize],
+    rows: &Rows,
+    opts: SortOptions,
+    array: &RunArray<R>,
+) {
+    let run_ends = array.run_ends();
+
+    let mut logical_idx = 0;
+    let mut offset_idx = 1; // Skip first offset
+
+    // Iterate over each run
+    for physical_idx in 0..run_ends.values().len() {
+        let run_end = run_ends.values()[physical_idx].as_usize();
+
+        // Process all elements in this run
+        while logical_idx < run_end && offset_idx < offsets.len() {
+            let offset = &mut offsets[offset_idx];
+            let out = &mut data[*offset..];
+
+            // Use variable-length encoding to make the data self-describing
+            let row = rows.row(physical_idx);
+            let bytes_written = variable::encode_one(out, Some(row.data), opts);
+            *offset += bytes_written;
+
+            logical_idx += 1;
+            offset_idx += 1;
+        }
+
+        // Break if we've processed all offsets
+        if offset_idx >= offsets.len() {
+            break;
+        }
+    }
+}
+
+/// Decodes a RunEndEncodedArray from `rows` with the provided `options`
+///
+/// # Safety
+///
+/// `rows` must contain valid data for the provided `converter`
+pub unsafe fn decode<R: RunEndIndexType>(
+    converter: &RowConverter,
+    rows: &mut [&[u8]],
+    field: &SortField,
+    validate_utf8: bool,
+) -> Result<RunArray<R>, ArrowError> {
+    if rows.is_empty() {
+        let values = converter.convert_raw(&mut [], validate_utf8)?;
+        let run_ends_array = PrimitiveArray::<R>::new(ScalarBuffer::from(vec![]), None);
+        return RunArray::<R>::try_new(&run_ends_array, &values[0]);
+    }
+
+    // Decode each row's REE data and collect the decoded values
+    let mut decoded_values = Vec::new();
+    let mut run_ends = Vec::new();
+    let mut unique_row_indices = Vec::new();
+
+    // Process each row to extract its REE data (following decode_binary pattern)
+    let mut decoded_data = Vec::new();
+    for (idx, row) in rows.iter_mut().enumerate() {
+        decoded_data.clear();
+        // Extract the decoded value data from this row
+        let consumed = variable::decode_blocks(row, field.options, |block| {
+            decoded_data.extend_from_slice(block);
+        });
+
+        // Handle bit inversion for descending sort (following decode_binary pattern)
+        if field.options.descending {
+            decoded_data.iter_mut().for_each(|b| *b = !*b);
+        }
+
+        // Update the row to point past the consumed REE data
+        *row = &row[consumed..];
+
+        // Check if this decoded value is the same as the previous one to identify runs
+        let is_new_run =
+            idx == 0 || decoded_data != decoded_values[*unique_row_indices.last().unwrap()];
+
+        if is_new_run {
+            // This is a new unique value - end the previous run if any
+            if idx > 0 {
+                run_ends.push(R::Native::usize_as(idx));
+            }
+            unique_row_indices.push(decoded_values.len());
+            decoded_values.push(decoded_data.clone());
+        }
+    }
+    // Add the final run end
+    run_ends.push(R::Native::usize_as(rows.len()));
+
+    // Convert the unique decoded values using the row converter
+    let mut unique_rows: Vec<&[u8]> = decoded_values.iter().map(|v| v.as_slice()).collect();
+    let values = if unique_rows.is_empty() {
+        converter.convert_raw(&mut [], validate_utf8)?
+    } else {
+        converter.convert_raw(&mut unique_rows, validate_utf8)?
+    };
+
+    // Create run ends array
+    let run_ends_array = PrimitiveArray::<R>::new(ScalarBuffer::from(run_ends), None);
+
+    // Create the RunEndEncodedArray
+    RunArray::<R>::try_new(&run_ends_array, &values[0])
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::{RowConverter, SortField};
+    use arrow_array::types::Int32Type;
+    use arrow_array::{Array, Int64Array, RunArray, StringArray};
+    use arrow_schema::{DataType, SortOptions};
+    use std::sync::Arc;
+
+    #[test]
+    fn test_run_end_encoded_supports_datatype() {
+        // Test that the RowConverter correctly supports run-end encoded arrays
+        assert!(RowConverter::supports_datatype(&DataType::RunEndEncoded(
+            Arc::new(arrow_schema::Field::new("run_ends", DataType::Int32, false)),
+            Arc::new(arrow_schema::Field::new("values", DataType::Utf8, true)),
+        )));
+    }
+
+    #[test]
+    fn test_run_end_encoded_round_trip_int64s() {
+        // Test round-trip correctness for RunEndEncodedArray with Int64 values making sure it
+        // doesn't just work with eg. strings (which are all the other tests).
+
+        let values = Int64Array::from(vec![100, 200, 100, 300]);
+        let run_ends = vec![2, 3, 5, 6];
+        let array: RunArray<Int32Type> =
+            RunArray::try_new(&arrow_array::PrimitiveArray::from(run_ends), &values).unwrap();
+
+        let converter = RowConverter::new(vec![SortField::new(DataType::RunEndEncoded(
+            Arc::new(arrow_schema::Field::new("run_ends", DataType::Int32, false)),
+            Arc::new(arrow_schema::Field::new("values", DataType::Int64, true)),
+        ))])
+        .unwrap();
+
+        let rows = converter
+            .convert_columns(&[Arc::new(array.clone())])
+            .unwrap();
+
+        let arrays = converter.convert_rows(&rows).unwrap();
+        let result = arrays[0]
+            .as_any()
+            .downcast_ref::<RunArray<Int32Type>>()
+            .unwrap();
+
+        assert_eq!(array.run_ends().values(), result.run_ends().values());

Review Comment:
   https://github.com/apache/arrow-rs/issues/7691



##########
arrow-row/src/run.rs:
##########
@@ -0,0 +1,695 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{variable, RowConverter, Rows, SortField};
+use arrow_array::types::RunEndIndexType;
+use arrow_array::{PrimitiveArray, RunArray};
+use arrow_buffer::{ArrowNativeType, ScalarBuffer};
+use arrow_schema::{ArrowError, SortOptions};
+
+/// Computes the lengths of each row for a RunEndEncodedArray
+pub fn compute_lengths<R: RunEndIndexType>(
+    lengths: &mut [usize],
+    rows: &Rows,
+    array: &RunArray<R>,
+) {
+    let run_ends = array.run_ends().values();
+    let mut logical_start = 0;
+
+    // Iterate over each run and apply the same length to all logical positions in the run
+    for (physical_idx, &run_end) in run_ends.iter().enumerate() {
+        let logical_end = run_end.as_usize();
+        let row = rows.row(physical_idx);
+        let encoded_len = variable::encoded_len(Some(row.data));
+
+        // Add the same length for all logical positions in this run
+        for length in &mut lengths[logical_start..logical_end] {
+            *length += encoded_len;
+        }
+
+        logical_start = logical_end;
+    }
+}
+
+/// Encodes the provided `RunEndEncodedArray` to `out` with the provided `SortOptions`
+///
+/// `rows` should contain the encoded values
+pub fn encode<R: RunEndIndexType>(
+    data: &mut [u8],
+    offsets: &mut [usize],
+    rows: &Rows,
+    opts: SortOptions,
+    array: &RunArray<R>,
+) {
+    let run_ends = array.run_ends();
+
+    let mut logical_idx = 0;
+    let mut offset_idx = 1; // Skip first offset
+
+    // Iterate over each run
+    for physical_idx in 0..run_ends.values().len() {
+        let run_end = run_ends.values()[physical_idx].as_usize();
+
+        // Process all elements in this run
+        while logical_idx < run_end && offset_idx < offsets.len() {
+            let offset = &mut offsets[offset_idx];
+            let out = &mut data[*offset..];
+
+            // Use variable-length encoding to make the data self-describing
+            let row = rows.row(physical_idx);
+            let bytes_written = variable::encode_one(out, Some(row.data), opts);
+            *offset += bytes_written;
+
+            logical_idx += 1;
+            offset_idx += 1;
+        }
+
+        // Break if we've processed all offsets
+        if offset_idx >= offsets.len() {
+            break;
+        }
+    }
+}
+
+/// Decodes a RunEndEncodedArray from `rows` with the provided `options`
+///
+/// # Safety
+///
+/// `rows` must contain valid data for the provided `converter`
+pub unsafe fn decode<R: RunEndIndexType>(
+    converter: &RowConverter,
+    rows: &mut [&[u8]],
+    field: &SortField,
+    validate_utf8: bool,
+) -> Result<RunArray<R>, ArrowError> {
+    if rows.is_empty() {
+        let values = converter.convert_raw(&mut [], validate_utf8)?;
+        let run_ends_array = PrimitiveArray::<R>::new(ScalarBuffer::from(vec![]), None);
+        return RunArray::<R>::try_new(&run_ends_array, &values[0]);
+    }
+
+    // Decode each row's REE data and collect the decoded values
+    let mut decoded_values = Vec::new();
+    let mut run_ends = Vec::new();
+    let mut unique_row_indices = Vec::new();
+
+    // Process each row to extract its REE data (following decode_binary pattern)
+    let mut decoded_data = Vec::new();
+    for (idx, row) in rows.iter_mut().enumerate() {
+        decoded_data.clear();
+        // Extract the decoded value data from this row
+        let consumed = variable::decode_blocks(row, field.options, |block| {
+            decoded_data.extend_from_slice(block);
+        });
+
+        // Handle bit inversion for descending sort (following decode_binary pattern)
+        if field.options.descending {
+            decoded_data.iter_mut().for_each(|b| *b = !*b);
+        }
+
+        // Update the row to point past the consumed REE data
+        *row = &row[consumed..];
+
+        // Check if this decoded value is the same as the previous one to identify runs
+        let is_new_run =
+            idx == 0 || decoded_data != decoded_values[*unique_row_indices.last().unwrap()];
+
+        if is_new_run {
+            // This is a new unique value - end the previous run if any
+            if idx > 0 {
+                run_ends.push(R::Native::usize_as(idx));
+            }
+            unique_row_indices.push(decoded_values.len());
+            decoded_values.push(decoded_data.clone());
+        }
+    }
+    // Add the final run end
+    run_ends.push(R::Native::usize_as(rows.len()));
+
+    // Convert the unique decoded values using the row converter
+    let mut unique_rows: Vec<&[u8]> = decoded_values.iter().map(|v| v.as_slice()).collect();
+    let values = if unique_rows.is_empty() {
+        converter.convert_raw(&mut [], validate_utf8)?
+    } else {
+        converter.convert_raw(&mut unique_rows, validate_utf8)?
+    };
+
+    // Create run ends array
+    let run_ends_array = PrimitiveArray::<R>::new(ScalarBuffer::from(run_ends), None);
+
+    // Create the RunEndEncodedArray
+    RunArray::<R>::try_new(&run_ends_array, &values[0])
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::{RowConverter, SortField};
+    use arrow_array::types::Int32Type;
+    use arrow_array::{Array, Int64Array, RunArray, StringArray};
+    use arrow_schema::{DataType, SortOptions};
+    use std::sync::Arc;
+
+    #[test]
+    fn test_run_end_encoded_supports_datatype() {
+        // Test that the RowConverter correctly supports run-end encoded arrays
+        assert!(RowConverter::supports_datatype(&DataType::RunEndEncoded(
+            Arc::new(arrow_schema::Field::new("run_ends", DataType::Int32, false)),
+            Arc::new(arrow_schema::Field::new("values", DataType::Utf8, true)),
+        )));
+    }
+
+    #[test]
+    fn test_run_end_encoded_round_trip_int64s() {
+        // Test round-trip correctness for RunEndEncodedArray with Int64 values making sure it
+        // doesn't just work with eg. strings (which are all the other tests).
+
+        let values = Int64Array::from(vec![100, 200, 100, 300]);
+        let run_ends = vec![2, 3, 5, 6];
+        let array: RunArray<Int32Type> =
+            RunArray::try_new(&arrow_array::PrimitiveArray::from(run_ends), &values).unwrap();
+
+        let converter = RowConverter::new(vec![SortField::new(DataType::RunEndEncoded(
+            Arc::new(arrow_schema::Field::new("run_ends", DataType::Int32, false)),
+            Arc::new(arrow_schema::Field::new("values", DataType::Int64, true)),
+        ))])
+        .unwrap();
+
+        let rows = converter
+            .convert_columns(&[Arc::new(array.clone())])
+            .unwrap();
+
+        let arrays = converter.convert_rows(&rows).unwrap();
+        let result = arrays[0]
+            .as_any()
+            .downcast_ref::<RunArray<Int32Type>>()
+            .unwrap();
+
+        assert_eq!(array.run_ends().values(), result.run_ends().values());

Review Comment:
    https://github.com/apache/arrow-rs/issues/7691



##########
arrow-row/src/run.rs:
##########
@@ -0,0 +1,695 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{variable, RowConverter, Rows, SortField};
+use arrow_array::types::RunEndIndexType;
+use arrow_array::{PrimitiveArray, RunArray};
+use arrow_buffer::{ArrowNativeType, ScalarBuffer};
+use arrow_schema::{ArrowError, SortOptions};
+
+/// Computes the lengths of each row for a RunEndEncodedArray
+pub fn compute_lengths<R: RunEndIndexType>(
+    lengths: &mut [usize],
+    rows: &Rows,
+    array: &RunArray<R>,
+) {
+    let run_ends = array.run_ends().values();
+    let mut logical_start = 0;
+
+    // Iterate over each run and apply the same length to all logical positions in the run
+    for (physical_idx, &run_end) in run_ends.iter().enumerate() {
+        let logical_end = run_end.as_usize();
+        let row = rows.row(physical_idx);
+        let encoded_len = variable::encoded_len(Some(row.data));
+
+        // Add the same length for all logical positions in this run
+        for length in &mut lengths[logical_start..logical_end] {
+            *length += encoded_len;
+        }
+
+        logical_start = logical_end;
+    }
+}
+
+/// Encodes the provided `RunEndEncodedArray` to `out` with the provided `SortOptions`
+///
+/// `rows` should contain the encoded values
+pub fn encode<R: RunEndIndexType>(
+    data: &mut [u8],
+    offsets: &mut [usize],
+    rows: &Rows,
+    opts: SortOptions,
+    array: &RunArray<R>,
+) {
+    let run_ends = array.run_ends();
+
+    let mut logical_idx = 0;
+    let mut offset_idx = 1; // Skip first offset
+
+    // Iterate over each run
+    for physical_idx in 0..run_ends.values().len() {
+        let run_end = run_ends.values()[physical_idx].as_usize();
+
+        // Process all elements in this run
+        while logical_idx < run_end && offset_idx < offsets.len() {
+            let offset = &mut offsets[offset_idx];
+            let out = &mut data[*offset..];
+
+            // Use variable-length encoding to make the data self-describing
+            let row = rows.row(physical_idx);
+            let bytes_written = variable::encode_one(out, Some(row.data), opts);
+            *offset += bytes_written;
+
+            logical_idx += 1;
+            offset_idx += 1;
+        }
+
+        // Break if we've processed all offsets
+        if offset_idx >= offsets.len() {
+            break;
+        }
+    }
+}
+
+/// Decodes a RunEndEncodedArray from `rows` with the provided `options`
+///
+/// # Safety
+///
+/// `rows` must contain valid data for the provided `converter`
+pub unsafe fn decode<R: RunEndIndexType>(
+    converter: &RowConverter,
+    rows: &mut [&[u8]],
+    field: &SortField,
+    validate_utf8: bool,
+) -> Result<RunArray<R>, ArrowError> {
+    if rows.is_empty() {
+        let values = converter.convert_raw(&mut [], validate_utf8)?;
+        let run_ends_array = PrimitiveArray::<R>::new(ScalarBuffer::from(vec![]), None);
+        return RunArray::<R>::try_new(&run_ends_array, &values[0]);
+    }
+
+    // Decode each row's REE data and collect the decoded values
+    let mut decoded_values = Vec::new();
+    let mut run_ends = Vec::new();
+    let mut unique_row_indices = Vec::new();
+
+    // Process each row to extract its REE data (following decode_binary pattern)
+    let mut decoded_data = Vec::new();
+    for (idx, row) in rows.iter_mut().enumerate() {
+        decoded_data.clear();
+        // Extract the decoded value data from this row
+        let consumed = variable::decode_blocks(row, field.options, |block| {
+            decoded_data.extend_from_slice(block);
+        });
+
+        // Handle bit inversion for descending sort (following decode_binary pattern)
+        if field.options.descending {
+            decoded_data.iter_mut().for_each(|b| *b = !*b);
+        }
+
+        // Update the row to point past the consumed REE data
+        *row = &row[consumed..];
+
+        // Check if this decoded value is the same as the previous one to identify runs
+        let is_new_run =
+            idx == 0 || decoded_data != decoded_values[*unique_row_indices.last().unwrap()];
+
+        if is_new_run {
+            // This is a new unique value - end the previous run if any
+            if idx > 0 {
+                run_ends.push(R::Native::usize_as(idx));
+            }
+            unique_row_indices.push(decoded_values.len());
+            decoded_values.push(decoded_data.clone());
+        }
+    }
+    // Add the final run end
+    run_ends.push(R::Native::usize_as(rows.len()));
+
+    // Convert the unique decoded values using the row converter
+    let mut unique_rows: Vec<&[u8]> = decoded_values.iter().map(|v| v.as_slice()).collect();
+    let values = if unique_rows.is_empty() {
+        converter.convert_raw(&mut [], validate_utf8)?
+    } else {
+        converter.convert_raw(&mut unique_rows, validate_utf8)?
+    };
+
+    // Create run ends array
+    let run_ends_array = PrimitiveArray::<R>::new(ScalarBuffer::from(run_ends), None);
+
+    // Create the RunEndEncodedArray
+    RunArray::<R>::try_new(&run_ends_array, &values[0])
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::{RowConverter, SortField};
+    use arrow_array::types::Int32Type;
+    use arrow_array::{Array, Int64Array, RunArray, StringArray};
+    use arrow_schema::{DataType, SortOptions};
+    use std::sync::Arc;
+
+    #[test]
+    fn test_run_end_encoded_supports_datatype() {
+        // Test that the RowConverter correctly supports run-end encoded arrays
+        assert!(RowConverter::supports_datatype(&DataType::RunEndEncoded(
+            Arc::new(arrow_schema::Field::new("run_ends", DataType::Int32, false)),
+            Arc::new(arrow_schema::Field::new("values", DataType::Utf8, true)),
+        )));
+    }
+
+    #[test]
+    fn test_run_end_encoded_round_trip_int64s() {
+        // Test round-trip correctness for RunEndEncodedArray with Int64 values making sure it
+        // doesn't just work with eg. strings (which are all the other tests).
+
+        let values = Int64Array::from(vec![100, 200, 100, 300]);
+        let run_ends = vec![2, 3, 5, 6];
+        let array: RunArray<Int32Type> =
+            RunArray::try_new(&arrow_array::PrimitiveArray::from(run_ends), &values).unwrap();
+
+        let converter = RowConverter::new(vec![SortField::new(DataType::RunEndEncoded(
+            Arc::new(arrow_schema::Field::new("run_ends", DataType::Int32, false)),
+            Arc::new(arrow_schema::Field::new("values", DataType::Int64, true)),
+        ))])
+        .unwrap();
+
+        let rows = converter
+            .convert_columns(&[Arc::new(array.clone())])
+            .unwrap();
+
+        let arrays = converter.convert_rows(&rows).unwrap();
+        let result = arrays[0]
+            .as_any()
+            .downcast_ref::<RunArray<Int32Type>>()
+            .unwrap();
+
+        assert_eq!(array.run_ends().values(), result.run_ends().values());

Review Comment:
   - https://github.com/apache/arrow-rs/issues/7691



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to