Dandandan commented on code in PR #21006:
URL: https://github.com/apache/datafusion/pull/21006#discussion_r2955780115


##########
datafusion/functions-nested/src/sort.rs:
##########
@@ -208,55 +211,150 @@ fn array_sort_generic<OffsetSize: OffsetSizeTrait>(
     list_array: &GenericListArray<OffsetSize>,
     field: FieldRef,
     sort_options: Option<SortOptions>,
+) -> Result<ArrayRef> {
+    let values = list_array.values();
+
+    if values.data_type().is_primitive() {
+        array_sort_direct(list_array, field, sort_options)
+    } else {
+        array_sort_batch_indices(list_array, field, sort_options)
+    }
+}
+
+/// Sort each row using `compute::sort()` and concatenate the results.
+///
+/// This is efficient for primitive element types because Arrow's sort kernel
+/// does the sorting in-place.
+fn array_sort_direct<OffsetSize: OffsetSizeTrait>(
+    list_array: &GenericListArray<OffsetSize>,
+    field: FieldRef,
+    sort_options: Option<SortOptions>,
 ) -> Result<ArrayRef> {
     let row_count = list_array.len();
+    let values = list_array.values();
 
-    let mut array_lengths = vec![];
-    let mut arrays = vec![];
+    let mut array_lengths = Vec::with_capacity(row_count);
+    let mut sorted_arrays = Vec::with_capacity(row_count);
     for i in 0..row_count {
         if list_array.is_null(i) {
             array_lengths.push(0);
         } else {
             let arr_ref = list_array.value(i);
+            let sorted = compute::sort(arr_ref.as_ref(), sort_options)?;
+            array_lengths.push(sorted.len());
+            sorted_arrays.push(sorted);
+        }
+    }
 
-            // arrow sort kernel does not support Structs, so use
+    let sorted_values: ArrayRef = if sorted_arrays.is_empty() {
+        values.slice(0, 0)
+    } else {
+        let elements: Vec<&dyn Array> =
+            sorted_arrays.iter().map(|a| a.as_ref()).collect();
+        Arc::new(compute::concat(&elements)?)
+    };
+
+    Ok(Arc::new(GenericListArray::<OffsetSize>::try_new(
+        field,
+        OffsetBuffer::from_lengths(array_lengths),
+        sorted_values,
+        list_array.nulls().cloned(),
+    )?))
+}
+
+/// Sort each row by collecting sort indices, then materialize with a single
+/// `take()` at the end.
+///
+/// This is efficient for non-primitive element types because Arrow's sort
+/// kernel would internally call `sort_to_indices()` + `take()` per row anyway.
+/// Batching into a single `take()` avoids N per-row allocations and the final
+/// `concat()`.
+fn array_sort_batch_indices<OffsetSize: OffsetSizeTrait>(
+    list_array: &GenericListArray<OffsetSize>,
+    field: FieldRef,
+    sort_options: Option<SortOptions>,
+) -> Result<ArrayRef> {
+    let row_count = list_array.len();
+    let values = list_array.values();
+    let offsets = list_array.offsets();
+
+    let total_values = offsets[row_count].as_usize() - offsets[0].as_usize();
+    let mut indices: Vec<OffsetSize> = Vec::with_capacity(total_values);
+    let mut new_offsets = Vec::with_capacity(row_count + 1);
+    new_offsets.push(OffsetSize::usize_as(0));
+
+    let is_struct = matches!(values.data_type(), DataType::Struct(_));
+
+    for (row_index, window) in offsets.windows(2).enumerate() {
+        let start = window[0];
+        let end = window[1];
+
+        if list_array.is_null(row_index) {
+            new_offsets.push(new_offsets[row_index]);
+            continue;
+        }
+
+        let len = (end - start).as_usize();
+        if len <= 1 {
+            // 0 or 1 elements: already sorted, push identity indices
+            indices.extend((start.as_usize()..end.as_usize()).map(OffsetSize::usize_as));
+        } else {
+            let sliced = values.slice(start.as_usize(), len);
+            // Arrow's sort kernel does not support Struct arrays, so use
             // lexsort_to_indices instead:
             // https://github.com/apache/arrow-rs/issues/6911#issuecomment-2562928843
-            let sorted_array = match arr_ref.data_type() {
-                DataType::Struct(_) => {
-                    let sort_columns: Vec<SortColumn> = vec![SortColumn {
-                        values: Arc::clone(&arr_ref),
-                        options: sort_options,
-                    }];
-                    let indices = compute::lexsort_to_indices(&sort_columns, None)?;
-                    compute::take(arr_ref.as_ref(), &indices, None)?
-                }
-                _ => {
-                    let arr_ref = arr_ref.as_ref();
-                    compute::sort(arr_ref, sort_options)?
-                }
+            let sorted_indices = if is_struct {
+                let sort_columns = vec![SortColumn {
+                    values: sliced,
+                    options: sort_options,
+                }];
+                compute::lexsort_to_indices(&sort_columns, None)?
+            } else {
+                compute::sort_to_indices(&sliced, sort_options, None)?
             };
-            array_lengths.push(sorted_array.len());
-            arrays.push(sorted_array);
+            for &local_idx in sorted_indices.values() {
+                indices.push(start + OffsetSize::usize_as(local_idx as usize));
+            }
         }
+
+        new_offsets.push(new_offsets[row_index] + (end - start));
     }
 
-    let elements = arrays
-        .iter()
-        .map(|a| a.as_ref())
-        .collect::<Vec<&dyn Array>>();
+    let sorted_values = if indices.is_empty() {
+        values.slice(0, 0)
+    } else {
+        take_by_indices::<OffsetSize>(values, &indices)?
+    };
+
+    Ok(Arc::new(GenericListArray::<OffsetSize>::try_new(
+        field,
+        OffsetBuffer::<OffsetSize>::new(new_offsets.into()),
+        sorted_values,
+        list_array.nulls().cloned(),
+    )?))
+}
 
-    let list_arr = if elements.is_empty() {
-        GenericListArray::<OffsetSize>::new_null(field, row_count)
+/// Select elements from `values` at the given `indices` using `compute::take`.
+fn take_by_indices<OffsetSize: OffsetSizeTrait>(
+    values: &ArrayRef,
+    indices: &[OffsetSize],
+) -> Result<ArrayRef> {
+    let indices_array: ArrayRef = if OffsetSize::IS_LARGE {
+        Arc::new(UInt64Array::from(

Review Comment:
   We could avoid creating this `UInt64Array`: `indices` is already a
   `Vec<OffsetSize>`, so it could be converted to a `UInt64Array` /
   `UInt32Array` without an extra allocation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to