This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new fc514c2506 perf: Optimize set operations to avoid RowConverter 
deserialization overhead (#20623)
fc514c2506 is described below

commit fc514c2506b4614771a72a3c3be34d98d4eb2935
Author: Neil Conway <[email protected]>
AuthorDate: Tue Mar 10 09:12:52 2026 -0700

    perf: Optimize set operations to avoid RowConverter deserialization 
overhead (#20623)
    
    ## Which issue does this PR close?
    
    - Closes #20622.
    
    ## Rationale for this change
    
    Several array set operations (e.g., `array_distinct`, `array_union`,
    `array_intersect`, `array_except`) share a similar structure:
    
    * Convert the input(s) using `RowConverter`, ideally in bulk
    * Apply the set operation as appropriate, which involves adding or
    removing elements from the candidate set of result `Rows`
    * Convert the final set of `Rows` back into `ArrayRef`
    
    We can do better for the final step: instead of converting from `Rows`
    back into `ArrayRef`, we can just track which indices in the input(s)
    correspond to the values we want to return. We can then grab those
    values with a single `take`, which avoids the `Row` -> `ArrayRef`
    deserialization overhead. This is a 5-20% performance win, depending on
    the set operation and the characteristics of the input.
    
    The only wrinkle is that for `intersect` and `union`, because there are
    multiple inputs we need to concatenate the inputs together so that we
    have a single index space. It turns out that this optimization is a win,
    even when incurring the `concat` overhead.
    
    ## What changes are included in this PR?
    
    * Add a benchmark for `array_except`
    * Implement this optimization for `array_distinct`, `array_union`,
    `array_intersect`, `array_except`
    
    ## Are these changes tested?
    
    Yes, and benchmarked.
    
    ## Are there any user-facing changes?
    
    No.
---
 .../functions-nested/benches/array_set_ops.rs      | 21 ++++++
 datafusion/functions-nested/src/except.rs          | 42 ++++++++----
 datafusion/functions-nested/src/set_ops.rs         | 80 +++++++++++++++-------
 3 files changed, 106 insertions(+), 37 deletions(-)

diff --git a/datafusion/functions-nested/benches/array_set_ops.rs 
b/datafusion/functions-nested/benches/array_set_ops.rs
index e3146921d7..087e48d076 100644
--- a/datafusion/functions-nested/benches/array_set_ops.rs
+++ b/datafusion/functions-nested/benches/array_set_ops.rs
@@ -23,6 +23,7 @@ use criterion::{
 };
 use datafusion_common::config::ConfigOptions;
 use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};
+use datafusion_functions_nested::except::ArrayExcept;
 use datafusion_functions_nested::set_ops::{ArrayDistinct, ArrayIntersect, 
ArrayUnion};
 use rand::SeedableRng;
 use rand::prelude::SliceRandom;
@@ -38,6 +39,7 @@ const SEED: u64 = 42;
 fn criterion_benchmark(c: &mut Criterion) {
     bench_array_union(c);
     bench_array_intersect(c);
+    bench_array_except(c);
     bench_array_distinct(c);
 }
 
@@ -98,6 +100,25 @@ fn bench_array_intersect(c: &mut Criterion) {
     group.finish();
 }
 
+fn bench_array_except(c: &mut Criterion) {
+    let mut group = c.benchmark_group("array_except");
+    let udf = ArrayExcept::new();
+
+    for (overlap_label, overlap_ratio) in &[("high_overlap", 0.8), 
("low_overlap", 0.2)] {
+        for &array_size in ARRAY_SIZES {
+            let (array1, array2) =
+                create_arrays_with_overlap(NUM_ROWS, array_size, 
*overlap_ratio);
+            group.bench_with_input(
+                BenchmarkId::new(*overlap_label, array_size),
+                &array_size,
+                |b, _| b.iter(|| invoke_udf(&udf, &array1, &array2)),
+            );
+        }
+    }
+
+    group.finish();
+}
+
 fn bench_array_distinct(c: &mut Criterion) {
     let mut group = c.benchmark_group("array_distinct");
     let udf = ArrayDistinct::new();
diff --git a/datafusion/functions-nested/src/except.rs 
b/datafusion/functions-nested/src/except.rs
index 19a4e9573e..932eecf4b8 100644
--- a/datafusion/functions-nested/src/except.rs
+++ b/datafusion/functions-nested/src/except.rs
@@ -19,8 +19,12 @@
 
 use crate::utils::{check_datatypes, make_scalar_function};
 use arrow::array::new_null_array;
-use arrow::array::{Array, ArrayRef, GenericListArray, OffsetSizeTrait, 
cast::AsArray};
+use arrow::array::{
+    Array, ArrayRef, GenericListArray, OffsetSizeTrait, UInt32Array, 
UInt64Array,
+    cast::AsArray,
+};
 use arrow::buffer::{NullBuffer, OffsetBuffer};
+use arrow::compute::take;
 use arrow::datatypes::{DataType, FieldRef};
 use arrow::row::{RowConverter, SortField};
 use datafusion_common::utils::{ListCoercion, take_function_args};
@@ -179,7 +183,7 @@ fn general_except<OffsetSize: OffsetSizeTrait>(
     let mut offsets = Vec::<OffsetSize>::with_capacity(l.len() + 1);
     offsets.push(OffsetSize::usize_as(0));
 
-    let mut rows = Vec::with_capacity(l_values.num_rows());
+    let mut indices: Vec<usize> = Vec::with_capacity(l_values.num_rows());
     let mut dedup = HashSet::new();
 
     let nulls = NullBuffer::union(l.nulls(), r.nulls());
@@ -193,7 +197,7 @@ fn general_except<OffsetSize: OffsetSizeTrait>(
             .as_ref()
             .is_some_and(|nulls| nulls.is_null(list_index))
         {
-            offsets.push(OffsetSize::usize_as(rows.len()));
+            offsets.push(OffsetSize::usize_as(indices.len()));
             continue;
         }
 
@@ -204,22 +208,32 @@ fn general_except<OffsetSize: OffsetSizeTrait>(
         for element_index in l_start.as_usize()..l_end.as_usize() {
             let left_row = l_values.row(element_index);
             if dedup.insert(left_row) {
-                rows.push(left_row);
+                indices.push(element_index);
             }
         }
 
-        offsets.push(OffsetSize::usize_as(rows.len()));
+        offsets.push(OffsetSize::usize_as(indices.len()));
         dedup.clear();
     }
 
-    if let Some(values) = converter.convert_rows(rows)?.first() {
-        Ok(GenericListArray::<OffsetSize>::new(
-            field.to_owned(),
-            OffsetBuffer::new(offsets.into()),
-            values.to_owned(),
-            nulls,
-        ))
+    // Gather distinct left-side values by index.
+    // Use UInt64Array for LargeList to support values arrays exceeding 
u32::MAX.
+    let values = if indices.is_empty() {
+        arrow::array::new_empty_array(&l.value_type())
+    } else if OffsetSize::IS_LARGE {
+        let indices =
+            UInt64Array::from(indices.into_iter().map(|i| i as 
u64).collect::<Vec<_>>());
+        take(l.values().as_ref(), &indices, None)?
     } else {
-        internal_err!("array_except failed to convert rows")
-    }
+        let indices =
+            UInt32Array::from(indices.into_iter().map(|i| i as 
u32).collect::<Vec<_>>());
+        take(l.values().as_ref(), &indices, None)?
+    };
+
+    Ok(GenericListArray::<OffsetSize>::new(
+        field.to_owned(),
+        OffsetBuffer::new(offsets.into()),
+        values,
+        nulls,
+    ))
 }
diff --git a/datafusion/functions-nested/src/set_ops.rs 
b/datafusion/functions-nested/src/set_ops.rs
index 150559111f..a3d2573747 100644
--- a/datafusion/functions-nested/src/set_ops.rs
+++ b/datafusion/functions-nested/src/set_ops.rs
@@ -19,9 +19,11 @@
 
 use crate::utils::make_scalar_function;
 use arrow::array::{
-    Array, ArrayRef, GenericListArray, OffsetSizeTrait, new_empty_array, 
new_null_array,
+    Array, ArrayRef, GenericListArray, OffsetSizeTrait, UInt32Array, 
UInt64Array,
+    new_empty_array, new_null_array,
 };
 use arrow::buffer::{NullBuffer, OffsetBuffer};
+use arrow::compute::{concat, take};
 use arrow::datatypes::DataType::{LargeList, List, Null};
 use arrow::datatypes::{DataType, Field, FieldRef};
 use arrow::row::{RowConverter, SortField};
@@ -373,12 +375,28 @@ fn generic_set_lists<OffsetSize: OffsetSizeTrait>(
     let rows_l = converter.convert_columns(&[Arc::clone(l.values())])?;
     let rows_r = converter.convert_columns(&[Arc::clone(r.values())])?;
 
+    // Combine value arrays so indices from both sides share a single index 
space.
+    let combined_values = concat(&[l.values().as_ref(), r.values().as_ref()])?;
+    let r_offset = l.values().len();
+
     match set_op {
         SetOp::Union => generic_set_loop::<OffsetSize, true>(
-            l, r, &rows_l, &rows_r, field, &converter,
+            l,
+            r,
+            &rows_l,
+            &rows_r,
+            field,
+            &combined_values,
+            r_offset,
         ),
         SetOp::Intersect => generic_set_loop::<OffsetSize, false>(
-            l, r, &rows_l, &rows_r, field, &converter,
+            l,
+            r,
+            &rows_l,
+            &rows_r,
+            field,
+            &combined_values,
+            r_offset,
         ),
     }
 }
@@ -391,7 +409,8 @@ fn generic_set_loop<OffsetSize: OffsetSizeTrait, const 
IS_UNION: bool>(
     rows_l: &arrow::row::Rows,
     rows_r: &arrow::row::Rows,
     field: Arc<Field>,
-    converter: &RowConverter,
+    combined_values: &ArrayRef,
+    r_offset: usize,
 ) -> Result<ArrayRef> {
     let l_offsets = l.value_offsets();
     let r_offsets = r.value_offsets();
@@ -406,7 +425,7 @@ fn generic_set_loop<OffsetSize: OffsetSizeTrait, const 
IS_UNION: bool>(
         rows_l.num_rows().min(rows_r.num_rows())
     };
 
-    let mut final_rows = Vec::with_capacity(initial_capacity);
+    let mut indices: Vec<usize> = Vec::with_capacity(initial_capacity);
 
     // Reuse hash sets across iterations
     let mut seen = HashSet::new();
@@ -430,25 +449,27 @@ fn generic_set_loop<OffsetSize: OffsetSizeTrait, const 
IS_UNION: bool>(
             for idx in l_start..l_end {
                 let row = rows_l.row(idx);
                 if seen.insert(row) {
-                    final_rows.push(row);
+                    indices.push(idx);
                 }
             }
             for idx in r_start..r_end {
                 let row = rows_r.row(idx);
                 if seen.insert(row) {
-                    final_rows.push(row);
+                    indices.push(idx + r_offset);
                 }
             }
         } else {
             let l_len = l_end - l_start;
             let r_len = r_end - r_start;
 
-            // Select shorter side for lookup, longer side for probing
-            let (lookup_rows, lookup_range, probe_rows, probe_range) = if 
l_len < r_len {
-                (rows_l, l_start..l_end, rows_r, r_start..r_end)
-            } else {
-                (rows_r, r_start..r_end, rows_l, l_start..l_end)
-            };
+            // Select shorter side for lookup, longer side for probing.
+            // Track the probe side's offset into the combined values array.
+            let (lookup_rows, lookup_range, probe_rows, probe_range, 
probe_offset) =
+                if l_len < r_len {
+                    (rows_l, l_start..l_end, rows_r, r_start..r_end, r_offset)
+                } else {
+                    (rows_r, r_start..r_end, rows_l, l_start..l_end, 0)
+                };
             lookup_set.clear();
             lookup_set.reserve(lookup_range.len());
 
@@ -461,18 +482,25 @@ fn generic_set_loop<OffsetSize: OffsetSizeTrait, const 
IS_UNION: bool>(
             for idx in probe_range {
                 let row = probe_rows.row(idx);
                 if lookup_set.contains(&row) && seen.insert(row) {
-                    final_rows.push(row);
+                    indices.push(idx + probe_offset);
                 }
             }
         }
         result_offsets.push(last_offset + OffsetSize::usize_as(seen.len()));
     }
 
-    let final_values = if final_rows.is_empty() {
+    // Gather distinct values by index from the combined values array.
+    // Use UInt64Array for LargeList to support values arrays exceeding 
u32::MAX.
+    let final_values = if indices.is_empty() {
         new_empty_array(&l.value_type())
+    } else if OffsetSize::IS_LARGE {
+        let indices =
+            UInt64Array::from(indices.into_iter().map(|i| i as 
u64).collect::<Vec<_>>());
+        take(combined_values.as_ref(), &indices, None)?
     } else {
-        let arrays = converter.convert_rows(final_rows)?;
-        Arc::clone(&arrays[0])
+        let indices =
+            UInt32Array::from(indices.into_iter().map(|i| i as 
u32).collect::<Vec<_>>());
+        take(combined_values.as_ref(), &indices, None)?
     };
 
     let arr = GenericListArray::<OffsetSize>::try_new(
@@ -539,7 +567,7 @@ fn general_array_distinct<OffsetSize: OffsetSizeTrait>(
     // Convert all values to row format in a single batch for performance
     let converter = RowConverter::new(vec![SortField::new(dt.clone())])?;
     let rows = converter.convert_columns(&[Arc::clone(array.values())])?;
-    let mut final_rows = Vec::with_capacity(rows.num_rows());
+    let mut indices: Vec<usize> = Vec::with_capacity(rows.num_rows());
     let mut seen = HashSet::new();
     for i in 0..array.len() {
         let last_offset = *offsets.last().unwrap();
@@ -559,18 +587,24 @@ fn general_array_distinct<OffsetSize: OffsetSizeTrait>(
         for idx in start..end {
             let row = rows.row(idx);
             if seen.insert(row) {
-                final_rows.push(row);
+                indices.push(idx);
             }
         }
         offsets.push(last_offset + OffsetSize::usize_as(seen.len()));
     }
 
-    // Convert all collected distinct rows back
-    let final_values = if final_rows.is_empty() {
+    // Gather distinct values in a single pass, using the computed `indices`.
+    // Use UInt64Array for LargeList to support values arrays exceeding 
u32::MAX.
+    let final_values = if indices.is_empty() {
         new_empty_array(&dt)
+    } else if OffsetSize::IS_LARGE {
+        let indices =
+            UInt64Array::from(indices.into_iter().map(|i| i as 
u64).collect::<Vec<_>>());
+        take(array.values().as_ref(), &indices, None)?
     } else {
-        let arrays = converter.convert_rows(final_rows)?;
-        Arc::clone(&arrays[0])
+        let indices =
+            UInt32Array::from(indices.into_iter().map(|i| i as 
u32).collect::<Vec<_>>());
+        take(array.values().as_ref(), &indices, None)?
     };
 
     Ok(Arc::new(GenericListArray::<OffsetSize>::try_new(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to