This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 9c85ac608f perf: Fix quadratic behavior of `to_array_of_size` (#20459)
9c85ac608f is described below

commit 9c85ac608fced2eef0ede0c3d5defe860b3b9b2d
Author: Neil Conway <[email protected]>
AuthorDate: Tue Feb 24 08:53:10 2026 -0500

    perf: Fix quadratic behavior of `to_array_of_size` (#20459)
    
    ## Which issue does this PR close?
    
    - Closes #20458.
    - Closes #18159.
    
    ## Rationale for this change
    
    When `array_to_size(n)` was called on a `List`-like object containing a
    `StringViewArray` with `b` data buffers, the previous implementation
    returned a list containing a `StringViewArray` with `n*b` buffers, which
    results in catastrophically bad performance if `b` grows even somewhat
    large.
    
    This issue was previously noticed causing poor nested loop join
    performance. #18161 adjusted the NLJ code to avoid calling
    `to_array_of_size` for this reason, but didn't attempt to fix the
    underlying issue in `to_array_of_size`. This PR doesn't attempt to
    revert the change to the NLJ code: the special-case code added in #18161
    is still slightly faster than `to_array_of_size` after this
    optimization. It might be possible to address that in a future PR.
    
    ## What changes are included in this PR?
    * Instead of using `repeat_n` + `concat` to merge together `n` copies of
    the `StringViewArray`, we instead use `take`, which preserves the same
    number of buffers as the input `StringViewArray`.
    * Add a new benchmark for this situation
    * Add more unit tests for `to_array_of_size`
    
    ## Are these changes tested?
    
    Yes and benchmarked.
    
    ## Are there any user-facing changes?
    
    No.
    
    ## AI usage
    
    Iterated on the problem with Claude Code; I understand the problem and
    the solution.
---
 datafusion/common/Cargo.toml                       |   4 +
 datafusion/common/benches/scalar_to_array.rs       | 107 +++++++++++++++++++++
 datafusion/common/src/scalar/mod.rs                |  96 ++++++++++++++++--
 .../physical-plan/src/joins/nested_loop_join.rs    |   7 +-
 4 files changed, 204 insertions(+), 10 deletions(-)

diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml
index 82e7aafcee..e4ba71e45c 100644
--- a/datafusion/common/Cargo.toml
+++ b/datafusion/common/Cargo.toml
@@ -57,6 +57,10 @@ sql = ["sqlparser"]
 harness = false
 name = "with_hashes"
 
+[[bench]]
+harness = false
+name = "scalar_to_array"
+
 [dependencies]
 ahash = { workspace = true }
 apache-avro = { workspace = true, features = [
diff --git a/datafusion/common/benches/scalar_to_array.rs 
b/datafusion/common/benches/scalar_to_array.rs
new file mode 100644
index 0000000000..90a152e515
--- /dev/null
+++ b/datafusion/common/benches/scalar_to_array.rs
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Benchmarks for `ScalarValue::to_array_of_size`, focusing on List
+//! scalars.
+
+use arrow::array::{Array, ArrayRef, AsArray, StringViewBuilder};
+use arrow::datatypes::{DataType, Field};
+use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
+use datafusion_common::ScalarValue;
+use datafusion_common::utils::SingleRowListArrayBuilder;
+use std::sync::Arc;
+
+/// Build a `ScalarValue::List` of `num_elements` Utf8View strings whose
+/// inner StringViewArray has `num_buffers` data buffers.
+fn make_list_scalar(num_elements: usize, num_buffers: usize) -> ScalarValue {
+    let elements_per_buffer = num_elements.div_ceil(num_buffers);
+
+    let mut small_arrays: Vec<ArrayRef> = Vec::new();
+    let mut remaining = num_elements;
+    for buf_idx in 0..num_buffers {
+        let count = remaining.min(elements_per_buffer);
+        if count == 0 {
+            break;
+        }
+        let start = buf_idx * elements_per_buffer;
+        let mut builder = StringViewBuilder::with_capacity(count);
+        for i in start..start + count {
+            builder.append_value(format!("{i:024x}"));
+        }
+        small_arrays.push(Arc::new(builder.finish()) as ArrayRef);
+        remaining -= count;
+    }
+
+    let refs: Vec<&dyn Array> = small_arrays.iter().map(|a| 
a.as_ref()).collect();
+    let concated = arrow::compute::concat(&refs).unwrap();
+
+    let list_array = SingleRowListArrayBuilder::new(concated)
+        .with_field(&Field::new_list_field(DataType::Utf8View, true))
+        .build_list_array();
+    ScalarValue::List(Arc::new(list_array))
+}
+
+/// We want to measure the cost of doing the conversion and then also accessing
+/// the results, to model what would happen during query evaluation.
+fn consume_list_array(arr: &ArrayRef) {
+    let list_arr = arr.as_list::<i32>();
+    let mut total_len: usize = 0;
+    for i in 0..list_arr.len() {
+        let inner = list_arr.value(i);
+        let sv = inner.as_string_view();
+        for j in 0..sv.len() {
+            total_len += sv.value(j).len();
+        }
+    }
+    std::hint::black_box(total_len);
+}
+
+fn bench_list_to_array_of_size(c: &mut Criterion) {
+    let mut group = c.benchmark_group("list_to_array_of_size");
+
+    let num_elements = 1245;
+    let scalar_1buf = make_list_scalar(num_elements, 1);
+    let scalar_50buf = make_list_scalar(num_elements, 50);
+
+    for batch_size in [256, 1024] {
+        group.bench_with_input(
+            BenchmarkId::new("1_buffer", batch_size),
+            &batch_size,
+            |b, &sz| {
+                b.iter(|| {
+                    let arr = scalar_1buf.to_array_of_size(sz).unwrap();
+                    consume_list_array(&arr);
+                });
+            },
+        );
+        group.bench_with_input(
+            BenchmarkId::new("50_buffers", batch_size),
+            &batch_size,
+            |b, &sz| {
+                b.iter(|| {
+                    let arr = scalar_50buf.to_array_of_size(sz).unwrap();
+                    consume_list_array(&arr);
+                });
+            },
+        );
+    }
+
+    group.finish();
+}
+
+criterion_group!(benches, bench_list_to_array_of_size);
+criterion_main!(benches);
diff --git a/datafusion/common/src/scalar/mod.rs 
b/datafusion/common/src/scalar/mod.rs
index f24df860c4..c21d3e21f0 100644
--- a/datafusion/common/src/scalar/mod.rs
+++ b/datafusion/common/src/scalar/mod.rs
@@ -3008,7 +3008,7 @@ impl ScalarValue {
     ///
     /// Errors if `self` is
     /// - a decimal that fails be converted to a decimal array of size
-    /// - a `FixedsizeList` that fails to be concatenated into an array of size
+    /// - a `FixedSizeList` that fails to be concatenated into an array of size
     /// - a `List` that fails to be concatenated into an array of size
     /// - a `Dictionary` that fails be converted to a dictionary array of size
     pub fn to_array_of_size(&self, size: usize) -> Result<ArrayRef> {
@@ -3434,13 +3434,22 @@ impl ScalarValue {
         }
     }
 
+    /// Repeats the rows of `arr` `size` times, producing an array with
+    /// `arr.len() * size` total rows.
     fn list_to_array_of_size(arr: &dyn Array, size: usize) -> Result<ArrayRef> 
{
-        let arrays = repeat_n(arr, size).collect::<Vec<_>>();
-        let ret = match !arrays.is_empty() {
-            true => arrow::compute::concat(arrays.as_slice())?,
-            false => arr.slice(0, 0),
-        };
-        Ok(ret)
+        if size == 0 {
+            return Ok(arr.slice(0, 0));
+        }
+
+        // Examples: given `arr = [[A, B, C]]` and `size = 3`, `indices = [0, 
0, 0]` and
+        // the result is `[[A, B, C], [A, B, C], [A, B, C]]`.
+        //
+        // Given `arr = [[A, B], [C]]` and `size = 2`, `indices = [0, 1, 0, 
1]` and the
+        // result is `[[A, B], [C], [A, B], [C]]`. (But in practice, we are 
always called
+        // with `arr.len() == 1`.)
+        let n = arr.len() as u32;
+        let indices = UInt32Array::from_iter_values((0..size).flat_map(|_| 
0..n));
+        Ok(arrow::compute::take(arr, &indices, None)?)
     }
 
     /// Retrieve ScalarValue for each row in `array`
@@ -5532,6 +5541,79 @@ mod tests {
         assert_eq!(empty_array.len(), 0);
     }
 
+    #[test]
+    fn test_to_array_of_size_list_size_one() {
+        // size=1 takes the fast path (Arc::clone)
+        let arr = ListArray::from_iter_primitive::<Int32Type, _, 
_>(vec![Some(vec![
+            Some(10),
+            Some(20),
+        ])]);
+        let sv = ScalarValue::List(Arc::new(arr.clone()));
+        let result = sv.to_array_of_size(1).unwrap();
+        assert_eq!(result.as_list::<i32>(), &arr);
+    }
+
+    #[test]
+    fn test_to_array_of_size_list_empty_inner() {
+        // A list scalar containing an empty list: [[]]
+        let arr = ListArray::from_iter_primitive::<Int32Type, _, 
_>(vec![Some(vec![])]);
+        let sv = ScalarValue::List(Arc::new(arr));
+        let result = sv.to_array_of_size(3).unwrap();
+        let result_list = result.as_list::<i32>();
+        assert_eq!(result_list.len(), 3);
+        for i in 0..3 {
+            assert_eq!(result_list.value(i).len(), 0);
+        }
+    }
+
+    #[test]
+    fn test_to_array_of_size_large_list() {
+        let arr =
+            LargeListArray::from_iter_primitive::<Int32Type, _, 
_>(vec![Some(vec![
+                Some(100),
+                Some(200),
+            ])]);
+        let sv = ScalarValue::LargeList(Arc::new(arr));
+        let result = sv.to_array_of_size(3).unwrap();
+        let expected = LargeListArray::from_iter_primitive::<Int32Type, _, 
_>(vec![
+            Some(vec![Some(100), Some(200)]),
+            Some(vec![Some(100), Some(200)]),
+            Some(vec![Some(100), Some(200)]),
+        ]);
+        assert_eq!(result.as_list::<i64>(), &expected);
+    }
+
+    #[test]
+    fn test_list_to_array_of_size_multi_row() {
+        // Call list_to_array_of_size directly with arr.len() > 1
+        let arr = Int32Array::from(vec![Some(10), None, Some(30)]);
+        let result = ScalarValue::list_to_array_of_size(&arr, 3).unwrap();
+        let result = result.as_primitive::<Int32Type>();
+        assert_eq!(
+            result.iter().collect::<Vec<_>>(),
+            vec![
+                Some(10),
+                None,
+                Some(30),
+                Some(10),
+                None,
+                Some(30),
+                Some(10),
+                None,
+                Some(30),
+            ]
+        );
+    }
+
+    #[test]
+    fn test_to_array_of_size_null_list() {
+        let dt = 
DataType::List(Arc::new(Field::new_list_field(DataType::Int32, true)));
+        let sv = ScalarValue::try_from(&dt).unwrap();
+        let result = sv.to_array_of_size(3).unwrap();
+        assert_eq!(result.len(), 3);
+        assert_eq!(result.null_count(), 3);
+    }
+
     /// See https://github.com/apache/datafusion/issues/18870
     #[test]
     fn test_to_array_of_size_for_none_fsb() {
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs 
b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index 5b2cebb360..33fec9e181 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -2011,9 +2011,10 @@ fn build_row_join_batch(
             // Broadcast the single build-side row to match the filtered
             // probe-side batch length
             let original_left_array = 
build_side_batch.column(column_index.index);
-            // Avoid using `ScalarValue::to_array_of_size()` for 
`List(Utf8View)` to avoid
-            // deep copies for buffers inside `Utf8View` array. See below for 
details.
-            // https://github.com/apache/datafusion/issues/18159
+
+            // Use `arrow::compute::take` directly for `List(Utf8View)` rather
+            // than going through `ScalarValue::to_array_of_size()`, which
+            // avoids some intermediate allocations.
             //
             // In other cases, `to_array_of_size()` is faster.
             match original_left_array.data_type() {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to