This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 515022487a feat[arrow-ord]: suppport REE comparisons (#9621)
515022487a is described below

commit 515022487af353b92d00acd6f405331bf8d240ea
Author: Alfonso Subiotto Marqués <[email protected]>
AuthorDate: Sun Apr 19 15:11:46 2026 +0200

    feat[arrow-ord]: suppport REE comparisons (#9621)
    
    This commit implements native comparisons on REE-encoded arrays which
    are treated similarly to dictionary indirection.
    
    This commit implements REE to scalar comparisons by operating on the
    physical values only then bulk expanding the boolean result.
    
    REE-to-REE comparisons are also optimized by computing aligned physical
    value runs to minimize comparisons.
    
    Mixed cases (REE vs flat) materialize a logical index mapping similar to
    dictionaries.
    
    This commit also supports `REE<Dict>`.
    
    For comparison, here are the benchmark results with flat arrays as a
    reference on my local machine:
    ```
    eq Int32                time:   [14.955 µs 15.162 µs 15.396 µs]
    eq scalar Int32         time:   [11.379 µs 11.418 µs 11.459 µs]
    
    ree_comparison/eq_ree_scalar(phys=64,log=65536)     time:   [453.31 ns 
454.88 ns 456.43 ns]
    ree_comparison/eq_ree_scalar(phys=1024,log=65536)   time:   [4.1224 µs 
4.1298 µs 4.1368 µs]
    ree_comparison/eq_ree_scalar(phys=32768,log=65536)  time:   [93.506 µs 
94.085 µs 94.993 µs]
    ree_comparison/eq_ree_ree(phys=64,log=65536)        time:   [413.96 ns 
414.82 ns 415.87 ns]
    ree_comparison/eq_ree_ree(phys=1024,log=65536)      time:   [4.1597 µs 
4.1660 µs 4.1749 µs]
    ree_comparison/eq_ree_ree(phys=32768,log=65536)     time:   [128.74 µs 
144.40 µs 161.53 µs]
    ```
    
    As is expected, the more we take advantage of REE encoding, the faster
    the comparisons are.
    
    # Which issue does this PR close?
    
    <!--
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax.
    -->
    
    - Closes #9620
    - Part of #3520
    
    # Rationale for this change
    
    <!--
    Why are you proposing this change? If this is already explained clearly
    in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand
    your changes and offer better suggestions for fixes.
    -->
    Feature enhancement for comparisons
    
    # What changes are included in this PR?
    
    <!--
    There is no need to duplicate the description in the issue here but it
    is sometimes worth providing a summary of the individual changes in this
    PR.
    -->
    Support for REE comparisons, including tests and benchmarks.
    
    # Are these changes tested?
    
    <!--
    We typically require tests for all PRs in order to:
    1. Prevent the code from being accidentally broken by subsequent changes
    2. Serve as another way to document the expected behavior of the code
    
    If tests are not included in your PR, please explain why (for example,
    are they covered by existing tests)?
    -->
    Yes.
    
    # Are there any user-facing changes?
    
    <!--
    If there are user-facing changes then we may require documentation to be
    updated before approving the PR.
    
    If there are any breaking changes to public APIs, please call them out.
    -->
    REE comparisons are now supported.
    
    Signed-off-by: Alfonso Subiotto Marques <[email protected]>
---
 arrow-ord/src/cmp.rs                | 441 +++++++++++++++++++++++++++++++++---
 arrow/benches/comparison_kernels.rs |  36 +++
 2 files changed, 446 insertions(+), 31 deletions(-)

diff --git a/arrow-ord/src/cmp.rs b/arrow-ord/src/cmp.rs
index 54b3b505d0..53ca258870 100644
--- a/arrow-ord/src/cmp.rs
+++ b/arrow-ord/src/cmp.rs
@@ -27,10 +27,10 @@ use arrow_array::cast::AsArray;
 use arrow_array::types::{ByteArrayType, ByteViewType};
 use arrow_array::{
     AnyDictionaryArray, Array, ArrowNativeTypeOp, BooleanArray, Datum, 
FixedSizeBinaryArray,
-    GenericByteArray, GenericByteViewArray, downcast_primitive_array,
+    GenericByteArray, GenericByteViewArray, downcast_primitive_array, 
downcast_run_array,
 };
 use arrow_buffer::bit_util::ceil;
-use arrow_buffer::{BooleanBuffer, NullBuffer};
+use arrow_buffer::{ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, 
NullBuffer};
 use arrow_schema::{ArrowError, DataType};
 use arrow_select::take::take;
 use std::cmp::Ordering;
@@ -239,6 +239,12 @@ fn compare_op(op: Op, lhs: &dyn Datum, rhs: &dyn Datum) -> 
Result<BooleanArray,
     let l_nulls = l.logical_nulls();
     let r_nulls = r.logical_nulls();
 
+    let l_ree_info = ree_info_opt(l);
+    let l = l_ree_info.as_ref().map(|(vals, _)| *vals).unwrap_or(l);
+
+    let r_ree_info = ree_info_opt(r);
+    let r = r_ree_info.as_ref().map(|(vals, _)| *vals).unwrap_or(r);
+
     let l_v = l.as_any_dictionary_opt();
     let l = l_v.map(|x| x.values().as_ref()).unwrap_or(l);
     let l_t = l.data_type();
@@ -257,18 +263,29 @@ fn compare_op(op: Op, lhs: &dyn Datum, rhs: &dyn Datum) 
-> Result<BooleanArray,
         )));
     }
 
+    let l_side = SideInfo {
+        is_scalar: l_s,
+        dict: l_v,
+        ree: l_ree_info.as_ref().map(|(_, info)| info),
+    };
+    let r_side = SideInfo {
+        is_scalar: r_s,
+        dict: r_v,
+        ree: r_ree_info.as_ref().map(|(_, info)| info),
+    };
+
     // Defer computation as may not be necessary
     let values = || -> BooleanBuffer {
         let d = downcast_primitive_array! {
-            (l, r) => apply(op, l.values().as_ref(), l_s, l_v, 
r.values().as_ref(), r_s, r_v),
-            (Boolean, Boolean) => apply(op, l.as_boolean(), l_s, l_v, 
r.as_boolean(), r_s, r_v),
-            (Utf8, Utf8) => apply(op, l.as_string::<i32>(), l_s, l_v, 
r.as_string::<i32>(), r_s, r_v),
-            (Utf8View, Utf8View) => apply(op, l.as_string_view(), l_s, l_v, 
r.as_string_view(), r_s, r_v),
-            (LargeUtf8, LargeUtf8) => apply(op, l.as_string::<i64>(), l_s, 
l_v, r.as_string::<i64>(), r_s, r_v),
-            (Binary, Binary) => apply(op, l.as_binary::<i32>(), l_s, l_v, 
r.as_binary::<i32>(), r_s, r_v),
-            (BinaryView, BinaryView) => apply(op, l.as_binary_view(), l_s, 
l_v, r.as_binary_view(), r_s, r_v),
-            (LargeBinary, LargeBinary) => apply(op, l.as_binary::<i64>(), l_s, 
l_v, r.as_binary::<i64>(), r_s, r_v),
-            (FixedSizeBinary(_), FixedSizeBinary(_)) => apply(op, 
l.as_fixed_size_binary(), l_s, l_v, r.as_fixed_size_binary(), r_s, r_v),
+            (l, r) => apply(op, l.values().as_ref(), &l_side, 
r.values().as_ref(), &r_side),
+            (Boolean, Boolean) => apply(op, l.as_boolean(), &l_side, 
r.as_boolean(), &r_side),
+            (Utf8, Utf8) => apply(op, l.as_string::<i32>(), &l_side, 
r.as_string::<i32>(), &r_side),
+            (Utf8View, Utf8View) => apply(op, l.as_string_view(), &l_side, 
r.as_string_view(), &r_side),
+            (LargeUtf8, LargeUtf8) => apply(op, l.as_string::<i64>(), &l_side, 
r.as_string::<i64>(), &r_side),
+            (Binary, Binary) => apply(op, l.as_binary::<i32>(), &l_side, 
r.as_binary::<i32>(), &r_side),
+            (BinaryView, BinaryView) => apply(op, l.as_binary_view(), &l_side, 
r.as_binary_view(), &r_side),
+            (LargeBinary, LargeBinary) => apply(op, l.as_binary::<i64>(), 
&l_side, r.as_binary::<i64>(), &r_side),
+            (FixedSizeBinary(_), FixedSizeBinary(_)) => apply(op, 
l.as_fixed_size_binary(), &l_side, r.as_fixed_size_binary(), &r_side),
             (Null, Null) => None,
             _ => unreachable!(),
         };
@@ -340,29 +357,44 @@ fn compare_op(op: Op, lhs: &dyn Datum, rhs: &dyn Datum) 
-> Result<BooleanArray,
     })
 }
 
+/// Per-side metadata for a comparison operand.
+struct SideInfo<'a> {
+    is_scalar: bool,
+    dict: Option<&'a dyn AnyDictionaryArray>,
+    ree: Option<&'a ReeInfo<'a>>,
+}
+
+impl SideInfo<'_> {
+    fn has_indirection(&self) -> bool {
+        self.dict.is_some() || self.ree.is_some()
+    }
+}
+
 /// Perform a potentially vectored `op` on the provided `ArrayOrd`
 fn apply<T: ArrayOrd>(
     op: Op,
     l: T,
-    l_s: bool,
-    l_v: Option<&dyn AnyDictionaryArray>,
+    l_info: &SideInfo,
     r: T,
-    r_s: bool,
-    r_v: Option<&dyn AnyDictionaryArray>,
+    r_info: &SideInfo,
 ) -> Option<BooleanBuffer> {
     if l.len() == 0 || r.len() == 0 {
         return None; // Handle empty dictionaries
     }
 
-    if !l_s && !r_s && (l_v.is_some() || r_v.is_some()) {
-        // Not scalar and at least one side has a dictionary, need to perform 
vectored comparison
-        let l_v = l_v
-            .map(|x| x.normalized_keys())
-            .unwrap_or_else(|| (0..l.len()).collect());
+    if !l_info.is_scalar
+        && !r_info.is_scalar
+        && (l_info.has_indirection() || r_info.has_indirection())
+    {
+        // Both non-scalar with indirection. Pure REE-vs-REE uses segment-based
+        // bulk comparison; other combinations fall back to index vectors.
+        if let (Some(li), None, Some(ri), None) = (l_info.ree, l_info.dict, 
r_info.ree, r_info.dict)
+        {
+            return Some(apply_ree(op, l, li, r, ri));
+        }
 
-        let r_v = r_v
-            .map(|x| x.normalized_keys())
-            .unwrap_or_else(|| (0..r.len()).collect());
+        let l_v = logical_indices(l.len(), l_info.dict, l_info.ree);
+        let r_v = logical_indices(r.len(), r_info.dict, r_info.ree);
 
         assert_eq!(l_v.len(), r_v.len()); // Sanity check
 
@@ -375,8 +407,12 @@ fn apply<T: ArrayOrd>(
             Op::GreaterEqual => apply_op_vectored(l, &l_v, r, &r_v, true, 
T::is_lt),
         })
     } else {
-        let l_s = l_s.then(|| l_v.map(|x| 
x.normalized_keys()[0]).unwrap_or_default());
-        let r_s = r_s.then(|| r_v.map(|x| 
x.normalized_keys()[0]).unwrap_or_default());
+        let l_s = l_info
+            .is_scalar
+            .then(|| scalar_index(l_info.dict, l_info.ree));
+        let r_s = r_info
+            .is_scalar
+            .then(|| scalar_index(r_info.dict, r_info.ree));
 
         let buffer = match op {
             Op::Equal | Op::NotDistinct => apply_op(l, l_s, r, r_s, false, 
T::is_eq),
@@ -387,16 +423,87 @@ fn apply<T: ArrayOrd>(
             Op::GreaterEqual => apply_op(l, l_s, r, r_s, true, T::is_lt),
         };
 
-        // If a side had a dictionary, and was not scalar, we need to 
materialize this
-        Some(match (l_v, r_v) {
-            (Some(l_v), _) if l_s.is_none() => take_bits(l_v, buffer),
-            (_, Some(r_v)) if r_s.is_none() => take_bits(r_v, buffer),
-            _ => buffer,
+        // Expand the physical-length result back to logical length.
+        // Find the non-scalar side that needs expansion (at most one).
+        let side = if l_s.is_none() { l_info } else { r_info };
+        let buffer = match side.dict {
+            Some(d) => take_bits(d, buffer),
+            None => buffer,
+        };
+        Some(match side.ree {
+            Some(info) => expand_from_runs(info, buffer),
+            None => buffer,
         })
     }
 }
 
-/// Perform a take operation on `buffer` with the given dictionary
+/// Build a logical→physical index vector for one side of a non-scalar 
comparison.
+fn logical_indices(
+    len: usize,
+    dict: Option<&dyn AnyDictionaryArray>,
+    ree: Option<&ReeInfo>,
+) -> Vec<usize> {
+    match (dict, ree) {
+        (Some(d), Some(info)) => {
+            let keys = d.normalized_keys();
+            ree_physical_indices(info)
+                .iter()
+                .map(|&i| keys[i])
+                .collect()
+        }
+        (Some(d), None) => d.normalized_keys(),
+        (None, Some(info)) => ree_physical_indices(info),
+        (None, None) => (0..len).collect(),
+    }
+}
+
+fn ree_physical_indices(info: &ReeInfo) -> Vec<usize> {
+    let run_ends = info.run_ends_as_usize();
+    let end = info.offset + info.len;
+    let mut indices = Vec::with_capacity(info.len);
+    let mut pos = info.offset;
+    for (physical, &run_end) in 
run_ends.iter().enumerate().skip(info.start_physical) {
+        let run_end = run_end.min(end);
+        indices.extend(std::iter::repeat_n(physical, run_end - pos));
+        pos = run_end;
+        if pos >= end {
+            break;
+        }
+    }
+    indices
+}
+
+fn scalar_index(dict: Option<&dyn AnyDictionaryArray>, ree: Option<&ReeInfo>) 
-> usize {
+    let idx = ree.map(|r| r.start_physical).unwrap_or_default();
+    dict.map(|d| d.normalized_keys()[idx]).unwrap_or(idx)
+}
+
+/// Expand a physical-length BooleanBuffer to logical length by bulk-appending
+/// each run's result. Zero allocation — downcasts internally to access typed
+/// run_ends directly.
+fn expand_from_runs(info: &ReeInfo, buffer: BooleanBuffer) -> BooleanBuffer {
+    let array = info.array;
+    downcast_run_array!(
+        array => {
+            let run_ends = array.run_ends().values();
+            let end = info.offset + info.len;
+            let mut builder = BooleanBufferBuilder::new(info.len);
+            let mut pos = info.offset;
+            for (physical, re) in 
run_ends.iter().enumerate().skip(info.start_physical) {
+                let run_end = re.as_usize().min(end);
+                // SAFETY: physical < buffer.len() (one entry per run in the 
values array)
+                builder.append_n(run_end - pos, unsafe { 
buffer.value_unchecked(physical) });
+                pos = run_end;
+                if pos >= end {
+                    break;
+                }
+            }
+            builder.finish()
+        },
+        _ => unreachable!()
+    )
+}
+
 fn take_bits(v: &dyn AnyDictionaryArray, buffer: BooleanBuffer) -> 
BooleanBuffer {
     let array = take(&BooleanArray::new(buffer, None), v.keys(), 
None).unwrap();
     array.as_boolean().values().clone()
@@ -493,6 +600,62 @@ fn apply_op_vectored<T: ArrayOrd>(
     })
 }
 
+/// Dispatch `op` for a REE-vs-REE comparison (no dictionary on either side)
+/// using segment-based bulk comparison.
+fn apply_ree<T: ArrayOrd>(op: Op, l: T, l_info: &ReeInfo, r: T, r_info: 
&ReeInfo) -> BooleanBuffer {
+    match op {
+        Op::Equal | Op::NotDistinct => apply_op_ree_segments(l, l_info, r, 
r_info, false, T::is_eq),
+        Op::NotEqual | Op::Distinct => apply_op_ree_segments(l, l_info, r, 
r_info, true, T::is_eq),
+        Op::Less => apply_op_ree_segments(l, l_info, r, r_info, false, 
T::is_lt),
+        Op::LessEqual => apply_op_ree_segments(r, r_info, l, l_info, true, 
T::is_lt),
+        Op::Greater => apply_op_ree_segments(r, r_info, l, l_info, false, 
T::is_lt),
+        Op::GreaterEqual => apply_op_ree_segments(l, l_info, r, r_info, true, 
T::is_lt),
+    }
+}
+
+/// Compare two REE arrays by walking both run_ends simultaneously, comparing
+/// once per aligned segment and bulk-filling the result.
+fn apply_op_ree_segments<T: ArrayOrd>(
+    l: T,
+    l_info: &ReeInfo,
+    r: T,
+    r_info: &ReeInfo,
+    neg: bool,
+    op: fn(T::Item, T::Item) -> bool,
+) -> BooleanBuffer {
+    let l_re = l_info.run_ends_as_usize();
+    let r_re = r_info.run_ends_as_usize();
+    let end = l_info.len;
+    let mut builder = BooleanBufferBuilder::new(l_info.len);
+    let mut l_phys = l_info.start_physical;
+    let mut r_phys = r_info.start_physical;
+    let mut pos = 0usize;
+
+    while pos < end {
+        // SAFETY: l_phys/r_phys are bounded by their respective run counts —
+        // they advance only when pos reaches a run boundary, and pos < end
+        // guarantees we haven't exhausted all runs.
+        // Subtract each side's offset to convert absolute run-ends to logical
+        // coordinates so that arrays with different offsets align correctly.
+        let l_end = (unsafe { *l_re.get_unchecked(l_phys) } - 
l_info.offset).min(end);
+        let r_end = (unsafe { *r_re.get_unchecked(r_phys) } - 
r_info.offset).min(end);
+        let seg_end = l_end.min(r_end);
+
+        let result = unsafe { op(l.value_unchecked(l_phys), 
r.value_unchecked(r_phys)) } ^ neg;
+        builder.append_n(seg_end - pos, result);
+
+        pos = seg_end;
+        if pos >= l_end {
+            l_phys += 1;
+        }
+        if pos >= r_end {
+            r_phys += 1;
+        }
+    }
+
+    builder.finish()
+}
+
 trait ArrayOrd {
     type Item: Copy;
 
@@ -715,10 +878,48 @@ pub fn compare_byte_view<T: ByteViewType>(
     unsafe { GenericByteViewArray::compare_unchecked(left, left_idx, right, 
right_idx) }
 }
 
+/// Run-end encoding metadata for one side of a comparison. Holds a reference
+/// to the original REE array for deferred typed access to run_ends.
+struct ReeInfo<'a> {
+    array: &'a dyn Array,
+    offset: usize,
+    start_physical: usize,
+    len: usize,
+}
+
+impl ReeInfo<'_> {
+    /// Materialize run_ends as `Vec<usize>`.
+    fn run_ends_as_usize(&self) -> Vec<usize> {
+        let array = self.array;
+        downcast_run_array!(
+            array => array.run_ends().values().iter().map(|v| 
v.as_usize()).collect(),
+            _ => unreachable!()
+        )
+    }
+}
+
+/// If `array` is RunEndEncoded, return its physical values array and run 
metadata.
+fn ree_info_opt(array: &dyn Array) -> Option<(&dyn Array, ReeInfo<'_>)> {
+    downcast_run_array!(
+        array => {
+            let run_ends = array.run_ends();
+            let info = ReeInfo {
+                array,
+                offset: run_ends.offset(),
+                start_physical: run_ends.get_start_physical_index(),
+                len: run_ends.len(),
+            };
+            Some((array.values().as_ref(), info))
+        },
+        _ => None
+    )
+}
+
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;
 
+    use arrow_array::types::Int32Type;
     use arrow_array::{DictionaryArray, Int32Array, Scalar, StringArray};
     use arrow_buffer::{Buffer, ScalarBuffer};
 
@@ -1092,6 +1293,184 @@ mod tests {
         !array.data_buffers().is_empty()
     }
 
+    fn ree_str(runs: &[(Option<&str>, i32)]) -> 
arrow_array::RunArray<Int32Type> {
+        let mut ends = Vec::new();
+        let mut vals = Vec::new();
+        let mut end = 0i32;
+        for &(v, n) in runs {
+            end += n;
+            ends.push(end);
+            vals.push(v);
+        }
+        arrow_array::RunArray::try_new(&Int32Array::from(ends), 
&StringArray::from(vals)).unwrap()
+    }
+
+    #[test]
+    fn test_ree_scalar() {
+        let a = ree_str(&[(Some("a"), 3), (Some("b"), 2)]);
+
+        let s = Scalar::new(StringArray::from(vec!["b"]));
+        assert_eq!(
+            eq(&a, &s).unwrap(),
+            BooleanArray::from(vec![false, false, false, true, true])
+        );
+        assert_eq!(
+            neq(&a, &s).unwrap(),
+            BooleanArray::from(vec![true, true, true, false, false])
+        );
+        assert_eq!(
+            lt(&a, &s).unwrap(),
+            BooleanArray::from(vec![true, true, true, false, false])
+        );
+        assert_eq!(lt_eq(&a, &s).unwrap(), BooleanArray::from(vec![true; 5]));
+        assert_eq!(gt(&a, &s).unwrap(), BooleanArray::from(vec![false; 5]));
+        assert_eq!(
+            gt_eq(&a, &s).unwrap(),
+            BooleanArray::from(vec![false, false, false, true, true])
+        );
+
+        // Scalar on left side
+        let scalar = Scalar::new(ree_str(&[(Some("a"), 1)]));
+        assert_eq!(
+            eq(&scalar, &a).unwrap(),
+            BooleanArray::from(vec![true, true, true, false, false])
+        );
+        assert_eq!(
+            lt_eq(&scalar, &a).unwrap(),
+            BooleanArray::from(vec![true; 5])
+        );
+
+        // REE-wrapped scalar (DataFusion's ScalarValue::RunEndEncoded)
+        assert_eq!(
+            eq(&a, &Scalar::new(ree_str(&[(Some("a"), 1)]))).unwrap(),
+            BooleanArray::from(vec![true, true, true, false, false]),
+        );
+
+        // Single run
+        let a = ree_str(&[(Some("x"), 100)]);
+        let r = eq(&a, &Scalar::new(StringArray::from(vec!["x"]))).unwrap();
+        assert_eq!(r.true_count(), 100);
+    }
+
+    #[test]
+    fn test_ree_ree() {
+        // Different run boundaries, all ops.
+        let a = ree_str(&[(Some("a"), 3), (Some("b"), 2)]);
+        let b = ree_str(&[(Some("a"), 2), (Some("b"), 3)]);
+        // a=[a,a,a,b,b] vs b=[a,a,b,b,b]
+        assert_eq!(
+            eq(&a, &b).unwrap(),
+            BooleanArray::from(vec![true, true, false, true, true])
+        );
+        assert_eq!(
+            neq(&a, &b).unwrap(),
+            BooleanArray::from(vec![false, false, true, false, false])
+        );
+        assert_eq!(
+            lt(&a, &b).unwrap(),
+            BooleanArray::from(vec![false, false, true, false, false])
+        );
+        assert_eq!(
+            gt_eq(&a, &b).unwrap(),
+            BooleanArray::from(vec![true, true, false, true, true])
+        );
+    }
+
+    #[test]
+    fn test_ree_sliced() {
+        // Scalar with sliced REE
+        let a = ree_str(&[(Some("a"), 3), (Some("b"), 2)]).slice(2, 3);
+        let s = Scalar::new(StringArray::from(vec!["b"]));
+        assert_eq!(
+            eq(&a, &s).unwrap(),
+            BooleanArray::from(vec![false, true, true])
+        );
+
+        // Both sides sliced, REE vs REE
+        let a = ree_str(&[(Some("a"), 3), (Some("b"), 2)]).slice(1, 4);
+        let b = ree_str(&[(Some("a"), 2), (Some("b"), 3)]).slice(1, 4);
+        assert_eq!(
+            eq(&a, &b).unwrap(),
+            BooleanArray::from(vec![true, false, true, true])
+        );
+    }
+
+    #[test]
+    fn test_ree_sliced_different_offsets() {
+        // left expands to ["a", "a", "b", "b"]
+        let a = ree_str(&[(Some("a"), 3), (Some("b"), 2)]).slice(1, 4);
+        // right expands to ["a", "a", "b", "b"]
+        let b = ree_str(&[(Some("a"), 2), (Some("b"), 3)]).slice(0, 4);
+        assert_eq!(
+            eq(&a, &b).unwrap(),
+            BooleanArray::from(vec![true, true, true, true])
+        );
+    }
+
+    #[test]
+    fn test_ree_nullable() {
+        let a = ree_str(&[(Some("a"), 2), (None, 1), (Some("b"), 2)]);
+
+        // Scalar: null-aware ops
+        let s = Scalar::new(StringArray::from(vec!["a"]));
+        assert_eq!(
+            not_distinct(&a, &s).unwrap(),
+            BooleanArray::from(vec![true, true, false, false, false])
+        );
+        assert_eq!(
+            distinct(&a, &s).unwrap(),
+            BooleanArray::from(vec![false, false, true, true, true])
+        );
+
+        // REE vs REE with nulls
+        let b = ree_str(&[(Some("a"), 3), (None, 2)]);
+        assert_eq!(
+            eq(&a, &b).unwrap(),
+            BooleanArray::from(vec![Some(true), Some(true), None, None, None])
+        );
+    }
+
+    #[test]
+    fn test_ree_mixed() {
+        let a = ree_str(&[(Some("a"), 3), (Some("b"), 2)]);
+
+        // REE vs plain array
+        let b = StringArray::from(vec!["a", "a", "b", "b", "b"]);
+        assert_eq!(
+            eq(&a, &b).unwrap(),
+            BooleanArray::from(vec![true, true, false, true, true])
+        );
+
+        // REE wrapping a DictionaryArray
+        let dict = DictionaryArray::new(
+            Int32Array::from(vec![1, 0]),
+            Arc::new(StringArray::from(vec!["x", "y"])),
+        );
+        let ree_dict =
+            arrow_array::RunArray::try_new(&Int32Array::from(vec![3, 5]), 
&dict).unwrap();
+        let s = Scalar::new(StringArray::from(vec!["y"]));
+        assert_eq!(
+            eq(&ree_dict, &s).unwrap(),
+            BooleanArray::from(vec![true, true, true, false, false])
+        );
+
+        // Numeric REE (Int32 values)
+        let ree_int = arrow_array::RunArray::try_new(
+            &Int32Array::from(vec![3, 5]),
+            &Int32Array::from(vec![10, 20]),
+        )
+        .unwrap();
+        assert_eq!(
+            eq(&ree_int, &Scalar::new(Int32Array::from(vec![10]))).unwrap(),
+            BooleanArray::from(vec![true, true, true, false, false])
+        );
+
+        // Empty REE
+        let empty_a = ree_str(&[(Some("a"), 1)]).slice(0, 0);
+        let empty_b = ree_str(&[(Some("b"), 1)]).slice(0, 0);
+        assert_eq!(eq(&empty_a, &empty_b).unwrap().len(), 0);
+    }
+
     #[test]
     fn test_compare_byte_view() {
         let a = arrow_array::StringViewArray::from(vec![
diff --git a/arrow/benches/comparison_kernels.rs 
b/arrow/benches/comparison_kernels.rs
index 00c01374b6..3b3e72241c 100644
--- a/arrow/benches/comparison_kernels.rs
+++ b/arrow/benches/comparison_kernels.rs
@@ -530,6 +530,42 @@ fn add_benchmark(c: &mut Criterion) {
     c.bench_function("eq dictionary[10] string[4])", |b| {
         b.iter(|| eq(&dict_arr_a, &dict_arr_b).unwrap())
     });
+
+    // RunEndEncoded benchmarks
+
+    let mut group = c.benchmark_group("ree_comparison");
+
+    for (physical, logical) in [(64, SIZE), (1024, SIZE), (SIZE / 2, SIZE)] {
+        let ree_a = create_primitive_run_array::<Int32Type, 
Int32Type>(logical, physical);
+        let ree_b = create_primitive_run_array::<Int32Type, 
Int32Type>(logical, physical);
+        let scalar = Int32Array::from(vec![1]);
+
+        let tag = format!("phys={physical},log={logical}");
+
+        group.bench_function(format!("eq_ree_scalar({tag})"), |b| {
+            b.iter(|| eq(&ree_a, &Scalar::new(&scalar)).unwrap())
+        });
+
+        group.bench_function(format!("lt_ree_scalar({tag})"), |b| {
+            b.iter(|| lt(&ree_a, &Scalar::new(&scalar)).unwrap())
+        });
+
+        group.bench_function(format!("eq_ree_ree({tag})"), |b| {
+            b.iter(|| eq(&ree_a, &ree_b).unwrap())
+        });
+
+        group.bench_function(format!("lt_ree_ree({tag})"), |b| {
+            b.iter(|| lt(&ree_a, &ree_b).unwrap())
+        });
+
+        let flat = create_primitive_array_with_seed::<Int32Type>(logical, 0., 
42);
+
+        group.bench_function(format!("eq_ree_flat({tag})"), |b| {
+            b.iter(|| eq(&ree_a, &flat).unwrap())
+        });
+    }
+
+    group.finish();
 }
 
 criterion_group!(benches, add_benchmark);

Reply via email to