This is an automated email from the ASF dual-hosted git repository.
jeffreyvo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 92a239a54e Implement min, max, sum for run-end-encoded arrays. (#9409)
92a239a54e is described below
commit 92a239a54e33043f05fef98d81d3c7bd2b926467
Author: Bruno <[email protected]>
AuthorDate: Thu Mar 12 07:31:45 2026 +0100
Implement min, max, sum for run-end-encoded arrays. (#9409)
Efficient implementations:
* min & max work directly on the values child array.
* sum folds over run lengths & values, without decompressing the array.
In particular, these implementations take care of the logical offset and
length of run-end-encoded arrays. This is non-trivial:
* We get the physical start & end indices in O(log(#runs)), but those
are incorrect for empty arrays.
* Slicing can happen in the middle of a run. For sum, we need to track
the logical start & end and reduce the run length accordingly.
Finally, one caveat: the aggregation functions only work when the child
values array is a primitive array. That covers almost every real-world case,
but a client might store the values in an unexpected type; such callers get
either `None` or an `Err`, depending on the aggregation function used.
This feature is tracked in
https://github.com/apache/arrow-rs/issues/3520.
---
arrow-arith/src/aggregate.rs | 296 ++++++++++++++++++++++++++++++++++++++++++-
1 file changed, 292 insertions(+), 4 deletions(-)
diff --git a/arrow-arith/src/aggregate.rs b/arrow-arith/src/aggregate.rs
index a043259694..59792d0c5b 100644
--- a/arrow-arith/src/aggregate.rs
+++ b/arrow-arith/src/aggregate.rs
@@ -540,7 +540,7 @@ pub fn min_string_view(array: &StringViewArray) ->
Option<&str> {
/// Returns the sum of values in the array.
///
/// This doesn't detect overflow. Once overflowing, the result will wrap
around.
-/// For an overflow-checking variant, use `sum_array_checked` instead.
+/// For an overflow-checking variant, use [`sum_array_checked`] instead.
pub fn sum_array<T, A: ArrayAccessor<Item = T::Native>>(array: A) ->
Option<T::Native>
where
T: ArrowNumericType,
@@ -567,6 +567,12 @@ where
Some(sum)
}
+ DataType::RunEndEncoded(run_ends, _) => match run_ends.data_type() {
+ DataType::Int16 => ree::sum_wrapping::<types::Int16Type,
T>(&array),
+ DataType::Int32 => ree::sum_wrapping::<types::Int32Type,
T>(&array),
+ DataType::Int64 => ree::sum_wrapping::<types::Int64Type,
T>(&array),
+ _ => unreachable!(),
+ },
_ => sum::<T>(as_primitive_array(&array)),
}
}
@@ -574,7 +580,9 @@ where
/// Returns the sum of values in the array.
///
/// This detects overflow and returns an `Err` for that. For an
non-overflow-checking variant,
-/// use `sum_array` instead.
+/// use [`sum_array`] instead.
+/// Additionally returns an `Err` for run-end-encoded arrays whose values
+/// child array is not a `PrimitiveArray<T>`.
pub fn sum_array_checked<T, A: ArrayAccessor<Item = T::Native>>(
array: A,
) -> Result<Option<T::Native>, ArrowError>
@@ -603,10 +611,110 @@ where
Ok(Some(sum))
}
+ DataType::RunEndEncoded(run_ends, _) => match run_ends.data_type() {
+ DataType::Int16 => ree::sum_checked::<types::Int16Type, T>(&array),
+ DataType::Int32 => ree::sum_checked::<types::Int32Type, T>(&array),
+ DataType::Int64 => ree::sum_checked::<types::Int64Type, T>(&array),
+ _ => unreachable!(),
+ },
_ => sum_checked::<T>(as_primitive_array(&array)),
}
}
+// Logic for summing run-end-encoded arrays.
+mod ree {
+    use std::convert::Infallible;
+
+    use arrow_array::cast::AsArray;
+    use arrow_array::types::RunEndIndexType;
+    use arrow_array::{Array, ArrowNativeTypeOp, ArrowNumericType, PrimitiveArray, TypedRunArray};
+    use arrow_buffer::ArrowNativeType;
+    use arrow_schema::ArrowError;
+
+    /// Downcasts `array` to a [`TypedRunArray`] with `I` run ends and
+    /// `PrimitiveArray<V>` values.
+    ///
+    /// Returns `None` if `array` is not a run array with `I` run ends, or if
+    /// its values child is not a `PrimitiveArray<V>`.
+    fn downcast<'a, I: RunEndIndexType, V: ArrowNumericType>(
+        array: &'a dyn Array,
+    ) -> Option<TypedRunArray<'a, I, PrimitiveArray<V>>> {
+        let array = array.as_run_opt::<I>()?;
+        // We only support RunArray wrapping primitive types.
+        array.downcast::<PrimitiveArray<V>>()
+    }
+
+    /// Computes the sum of the logical values of `array`, wrapping on overflow.
+    ///
+    /// Returns `None` if `array` cannot be downcast (see `downcast`), is
+    /// empty, or contains only nulls.
+    pub(super) fn sum_wrapping<I: RunEndIndexType, V: ArrowNumericType>(
+        array: &dyn Array,
+    ) -> Option<V::Native> {
+        let ree = downcast::<I, V>(array)?;
+        let Ok(sum) = fold(ree, |acc, val, len| -> Result<V::Native, Infallible> {
+            Ok(acc.add_wrapping(val.mul_wrapping(V::Native::usize_as(len))))
+        });
+        sum
+    }
+
+    /// Computes the sum of the logical values of `array`, erroring on overflow.
+    ///
+    /// Returns `Ok(None)` if `array` is empty or contains only nulls. Returns
+    /// an `Err` if its values are not a `PrimitiveArray<V>`, if a run length
+    /// does not fit in the value type, or if the sum overflows.
+    pub(super) fn sum_checked<I: RunEndIndexType, V: ArrowNumericType>(
+        array: &dyn Array,
+    ) -> Result<Option<V::Native>, ArrowError> {
+        let Some(ree) = downcast::<I, V>(array) else {
+            return Err(ArrowError::InvalidArgumentError(
+                "Input run array values are not a PrimitiveArray".to_string(),
+            ));
+        };
+        fold(ree, |acc, val, len| -> Result<V::Native, ArrowError> {
+            // The run length is used as a multiplier, so it must first be
+            // representable in the value type.
+            let Some(len) = V::Native::from_usize(len) else {
+                return Err(ArrowError::ArithmeticOverflow(format!(
+                    "Cannot convert a run length ({:?}) to the value type ({})",
+                    len,
+                    std::any::type_name::<V::Native>()
+                )));
+            };
+            acc.add_checked(val.mul_checked(len)?)
+        })
+    }
+
+    /// Folds `f` over the non-null runs of `array`, honoring its logical
+    /// offset and length.
+    ///
+    /// `f` receives the accumulator, the run's value, and the number of
+    /// logical positions that run covers inside the slice. Returns `Ok(None)`
+    /// when no non-null run is visited (empty or all-null input).
+    fn fold<'a, I: RunEndIndexType, V: ArrowNumericType, F, E>(
+        array: TypedRunArray<'a, I, PrimitiveArray<V>>,
+        mut f: F,
+    ) -> Result<Option<V::Native>, E>
+    where
+        F: FnMut(V::Native, V::Native, usize) -> Result<V::Native, E>,
+    {
+        // `sliced_values` yields the ends of the physical runs overlapping the
+        // slice, already shifted by the slice offset and capped at the slice
+        // length, i.e. relative to the slice start. They must not be clamped
+        // against the absolute `offset()..offset() + len()` range: that would
+        // mis-size runs whenever the offset exceeds a run's relative end
+        // (e.g. `[1, 2, 3, 4, 5, 6]` sliced to `[4, 5, 6]` summed to 12).
+        let run_ends = array.run_ends().sliced_values();
+        // `values_slice` covers the same physical runs, one value per run end.
+        let values_slice = array.run_array().values_slice();
+        let values = values_slice
+            .as_any()
+            .downcast_ref::<PrimitiveArray<V>>()
+            .expect("`downcast` already checked the values are a PrimitiveArray<V>");
+
+        let mut prev_end = 0;
+        let mut acc = V::Native::ZERO;
+        let mut has_non_null_value = false;
+
+        for (run_end, value) in run_ends.zip(values) {
+            // Runs cut by the slice boundaries are already trimmed.
+            let run_end = run_end.as_usize();
+            let run_length = run_end - prev_end;
+            prev_end = run_end;
+
+            if let Some(value) = value {
+                has_non_null_value = true;
+                acc = f(acc, value, run_length)?;
+            }
+        }
+
+        Ok(has_non_null_value.then_some(acc))
+    }
+
+    #[cfg(test)]
+    mod tests {
+        use arrow_array::builder::PrimitiveRunBuilder;
+        use arrow_array::types::{Int16Type, Int32Type};
+
+        /// Regression test: the slice offset (3) exceeds the slice-relative
+        /// end (1) of the first visible run.
+        #[test]
+        fn sum_sliced_past_first_relative_run_end() {
+            let mut builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
+            for v in [1, 2, 3, 4, 5, 6] {
+                builder.append_value(v);
+            }
+            // Logical values [4, 5, 6].
+            let array = builder.finish().slice(3, 3);
+
+            assert_eq!(super::sum_wrapping::<Int16Type, Int32Type>(&array), Some(15));
+            let checked = super::sum_checked::<Int16Type, Int32Type>(&array).unwrap();
+            assert_eq!(checked, Some(15));
+        }
+    }
+}
+
/// Returns the min of values in the array of `ArrowNumericType` type, or
dictionary
/// array with value of `ArrowNumericType` type.
pub fn min_array<T, A: ArrayAccessor<Item = T::Native>>(array: A) ->
Option<T::Native>
@@ -639,6 +747,20 @@ where
{
match array.data_type() {
DataType::Dictionary(_, _) => min_max_helper::<T::Native, _, _>(array,
cmp),
+ DataType::RunEndEncoded(run_ends, _) => {
+ // We can directly perform min/max on the values child array, as
any
+ // run must have non-zero length.
+ let array: &dyn Array = &array;
+ let values = match run_ends.data_type() {
+ DataType::Int16 =>
array.as_run_opt::<types::Int16Type>()?.values_slice(),
+ DataType::Int32 =>
array.as_run_opt::<types::Int32Type>()?.values_slice(),
+ DataType::Int64 =>
array.as_run_opt::<types::Int64Type>()?.values_slice(),
+ _ => return None,
+ };
+ // We only support RunArray wrapping primitive types.
+ let values = values.as_any().downcast_ref::<PrimitiveArray<T>>()?;
+ m(values)
+ }
_ => m(as_primitive_array(&array)),
}
}
@@ -751,7 +873,7 @@ pub fn bool_or(array: &BooleanArray) -> Option<bool> {
/// Returns `Ok(None)` if the array is empty or only contains null values.
///
/// This detects overflow and returns an `Err` for that. For an
non-overflow-checking variant,
-/// use `sum` instead.
+/// use [`sum`] instead.
pub fn sum_checked<T>(array: &PrimitiveArray<T>) -> Result<Option<T::Native>,
ArrowError>
where
T: ArrowNumericType,
@@ -799,7 +921,7 @@ where
/// Returns `None` if the array is empty or only contains null values.
///
/// This doesn't detect overflow in release mode by default. Once overflowing,
the result will
-/// wrap around. For an overflow-checking variant, use `sum_checked` instead.
+/// wrap around. For an overflow-checking variant, use [`sum_checked`] instead.
pub fn sum<T: ArrowNumericType>(array: &PrimitiveArray<T>) -> Option<T::Native>
where
T::Native: ArrowNativeTypeOp,
@@ -1750,4 +1872,170 @@ mod tests {
sum_checked(&a).expect_err("overflow should be detected");
sum_array_checked::<Int32Type, _>(&a).expect_err("overflow should be
detected");
}
+
+    /// Builds a `RunArray` from items convertible to `Option<V::Native>`.
+    fn make_run_array<'a, I: RunEndIndexType, V: ArrowNumericType, ItemType>(
+        values: impl IntoIterator<Item = &'a ItemType>,
+    ) -> RunArray<I>
+    where
+        ItemType: Clone + Into<Option<V::Native>>,
+    {
+        let mut builder = arrow_array::builder::PrimitiveRunBuilder::<I, V>::new();
+        for v in values {
+            builder.append_option(v.clone().into());
+        }
+        builder.finish()
+    }
+
+    #[test]
+    fn test_ree_sum_array_basic() {
+        // Runs: 10 x2, 20 x1, 30 x3 => 20 + 20 + 90 = 130.
+        let ree = make_run_array::<Int16Type, Int32Type, _>(&[10, 10, 20, 30, 30, 30]);
+        let typed = ree.downcast::<Int32Array>().unwrap();
+
+        assert_eq!(sum_array::<Int32Type, _>(typed), Some(130));
+
+        let checked = sum_array_checked::<Int32Type, _>(typed);
+        assert_eq!(checked.unwrap(), Some(130));
+    }
+
+    #[test]
+    fn test_ree_sum_array_empty() {
+        // An array without any runs has nothing to sum.
+        let ree = make_run_array::<Int16Type, Int32Type, i32>(&[]);
+        let typed = ree.downcast::<Int32Array>().unwrap();
+
+        assert!(sum_array::<Int32Type, _>(typed).is_none());
+
+        let checked = sum_array_checked::<Int32Type, _>(typed);
+        assert!(checked.unwrap().is_none());
+    }
+
+    #[test]
+    fn test_ree_sum_array_with_nulls() {
+        // Null runs are skipped: 10 + 20 + 30 = 60.
+        let items = [Some(10), None, Some(20), None, Some(30)];
+        let ree = make_run_array::<Int16Type, Int32Type, _>(&items);
+        let typed = ree.downcast::<Int32Array>().unwrap();
+
+        assert_eq!(sum_array::<Int32Type, _>(typed), Some(60));
+
+        let checked = sum_array_checked::<Int32Type, _>(typed);
+        assert_eq!(checked.unwrap(), Some(60));
+    }
+
+    #[test]
+    fn test_ree_sum_array_with_only_nulls() {
+        let ree = make_run_array::<Int16Type, Int16Type, _>(&[None, None, None, None, None]);
+        let typed = ree.downcast::<Int16Array>().unwrap();
+
+        // Every run is null, so neither variant produces a sum.
+        let wrapping = sum_array::<Int16Type, _>(typed);
+        let checked = sum_array_checked::<Int16Type, _>(typed).unwrap();
+        assert_eq!(wrapping, None);
+        assert_eq!(checked, None);
+    }
+
+    #[test]
+    fn test_ree_sum_array_overflow() {
+        // Two single-element runs: 126 and 2.
+        let ree = make_run_array::<Int16Type, Int8Type, _>(&[126, 2]);
+        let typed = ree.downcast::<Int8Array>().unwrap();
+
+        // 126 + 2 exceeds i8::MAX (127), so the wrapping sum lands on i8::MIN.
+        assert_eq!(sum_array::<Int8Type, _>(typed), Some(i8::MIN));
+
+        // The checked variant reports the same overflow as an error.
+        assert!(sum_array_checked::<Int8Type, _>(typed).is_err());
+    }
+
+    #[test]
+    fn test_ree_sum_array_sliced() {
+        let ree = make_run_array::<Int16Type, UInt8Type, _>(&[0, 10, 10, 10, 20, 30, 30, 30]);
+        // The logical window [2, 7) starts and ends in the middle of a run:
+        // 10 + 10 + 20 + 30 + 30 = 100.
+        let window = ree.slice(2, 5);
+        let typed = window.downcast::<UInt8Array>().unwrap();
+
+        let wrapping = sum_array::<UInt8Type, _>(typed);
+        let checked = sum_array_checked::<UInt8Type, _>(typed).unwrap();
+        assert_eq!(wrapping, Some(100));
+        assert_eq!(checked, Some(100));
+    }
+
+    #[test]
+    fn test_ree_min_max_array_basic() {
+        let ree = make_run_array::<Int16Type, Int32Type, _>(&[30, 30, 10, 20, 20]);
+        let typed = ree.downcast::<Int32Array>().unwrap();
+
+        // The extremes come straight from the run values: 10 and 30.
+        let lowest = min_array::<Int32Type, _>(typed);
+        let highest = max_array::<Int32Type, _>(typed);
+        assert_eq!(lowest, Some(10));
+        assert_eq!(highest, Some(30));
+    }
+
+    #[test]
+    fn test_ree_min_max_array_empty() {
+        // With no runs there is neither a minimum nor a maximum.
+        let ree = make_run_array::<Int16Type, Int32Type, i32>(&[]);
+        let typed = ree.downcast::<Int32Array>().unwrap();
+
+        assert!(min_array::<Int32Type, _>(typed).is_none());
+
+        let highest = max_array::<Int32Type, _>(typed);
+        assert!(highest.is_none());
+    }
+
+    #[test]
+    fn test_ree_min_max_array_float() {
+        let ree = make_run_array::<Int16Type, Float64Type, _>(&[5.5, 5.5, 2.1, 8.9, 8.9]);
+        let typed = ree.downcast::<Float64Array>().unwrap();
+
+        // Exact float equality is fine: min/max return stored run values.
+        let lowest = min_array::<Float64Type, _>(typed);
+        let highest = max_array::<Float64Type, _>(typed);
+        assert_eq!(lowest, Some(2.1));
+        assert_eq!(highest, Some(8.9));
+    }
+
+    #[test]
+    fn test_ree_min_max_array_with_nulls() {
+        let ree = make_run_array::<Int16Type, UInt8Type, _>(&[None, Some(10)]);
+        let typed = ree.downcast::<UInt8Array>().unwrap();
+
+        // The null run is ignored, leaving 10 as both extremes.
+        let lowest = min_array::<UInt8Type, _>(typed);
+        let highest = max_array::<UInt8Type, _>(typed);
+        assert_eq!(lowest, Some(10));
+        assert_eq!(highest, Some(10));
+    }
+
+    #[test]
+    fn test_ree_min_max_array_sliced() {
+        let ree = make_run_array::<Int16Type, Int32Type, _>(&[0, 30, 30, 10, 20, 20, 100]);
+        // The window [1, 6) drops the 0 and 100 runs entirely, so they must
+        // not influence the result.
+        let window = ree.slice(1, 5);
+        let typed = window.downcast::<Int32Array>().unwrap();
+
+        let lowest = min_array::<Int32Type, _>(typed);
+        let highest = max_array::<Int32Type, _>(typed);
+        assert_eq!(lowest, Some(10));
+        assert_eq!(highest, Some(30));
+    }
+
+    #[test]
+    fn test_ree_min_max_array_sliced_mid_run() {
+        let ree = make_run_array::<Int16Type, Int32Type, _>(&[0, 0, 30, 10, 20, 100, 100]);
+        // The window [1, 6) cuts into both the leading 0 run and the trailing
+        // 100 run, so those partially covered runs still count.
+        let window = ree.slice(1, 5);
+        let typed = window.downcast::<Int32Array>().unwrap();
+
+        let lowest = min_array::<Int32Type, _>(typed);
+        let highest = max_array::<Int32Type, _>(typed);
+        assert_eq!(lowest, Some(0));
+        assert_eq!(highest, Some(100));
+    }
}