This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 9ec679b1f4 Clean up ExternalSorter and use upstream converter (#16109)
9ec679b1f4 is described below

commit 9ec679b1f412cee8020ffb09674d20df35c98976
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue May 20 16:27:24 2025 -0400

    Clean up ExternalSorter and use upstream converter (#16109)
---
 datafusion/physical-plan/src/sorts/sort.rs | 138 ++---------------------------
 1 file changed, 6 insertions(+), 132 deletions(-)

diff --git a/datafusion/physical-plan/src/sorts/sort.rs 
b/datafusion/physical-plan/src/sorts/sort.rs
index 635b93b81b..683983d9e6 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -44,15 +44,10 @@ use crate::{
     Statistics,
 };
 
-use arrow::array::{
-    Array, RecordBatch, RecordBatchOptions, StringViewArray, UInt32Array,
-};
-use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays, 
SortColumn};
-use arrow::datatypes::{DataType, SchemaRef};
-use arrow::row::{RowConverter, Rows, SortField};
-use datafusion_common::{
-    exec_datafusion_err, internal_datafusion_err, internal_err, 
DataFusionError, Result,
-};
+use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray};
+use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays};
+use arrow::datatypes::SchemaRef;
+use datafusion_common::{internal_datafusion_err, internal_err, 
DataFusionError, Result};
 use datafusion_execution::disk_manager::RefCountedTempFile;
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use datafusion_execution::runtime_env::RuntimeEnv;
@@ -206,8 +201,6 @@ struct ExternalSorter {
     schema: SchemaRef,
     /// Sort expressions
     expr: Arc<[PhysicalSortExpr]>,
-    /// RowConverter corresponding to the sort expressions
-    sort_keys_row_converter: Arc<RowConverter>,
     /// The target number of rows for output batches
     batch_size: usize,
     /// If the in size of buffered memory batches is below this size,
@@ -275,22 +268,6 @@ impl ExternalSorter {
             MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]"))
                 .register(&runtime.memory_pool);
 
-        // Construct RowConverter for sort keys
-        let sort_fields = expr
-            .iter()
-            .map(|e| {
-                let data_type = e
-                    .expr
-                    .data_type(&schema)
-                    .map_err(|e| e.context("Resolving sort expression data 
type"))?;
-                Ok(SortField::new_with_options(data_type, e.options))
-            })
-            .collect::<Result<Vec<_>>>()?;
-
-        let converter = RowConverter::new(sort_fields).map_err(|e| {
-            exec_datafusion_err!("Failed to create RowConverter: {:?}", e)
-        })?;
-
         let spill_manager = SpillManager::new(
             Arc::clone(&runtime),
             metrics.spill_metrics.clone(),
@@ -303,7 +280,6 @@ impl ExternalSorter {
             in_progress_spill_file: None,
             finished_spill_files: vec![],
             expr: expr.into(),
-            sort_keys_row_converter: Arc::new(converter),
             metrics,
             reservation,
             spill_manager,
@@ -728,22 +704,10 @@ impl ExternalSorter {
         let schema = batch.schema();
 
         let expressions: LexOrdering = self.expr.iter().cloned().collect();
-        let row_converter = Arc::clone(&self.sort_keys_row_converter);
         let stream = futures::stream::once(async move {
             let _timer = metrics.elapsed_compute().timer();
 
-            let sort_columns = expressions
-                .iter()
-                .map(|expr| expr.evaluate_to_sort_column(&batch))
-                .collect::<Result<Vec<_>>>()?;
-
-            let sorted = if is_multi_column_with_lists(&sort_columns) {
-                // lex_sort_to_indices doesn't support List with more than one 
column
-                // https://github.com/apache/arrow-rs/issues/5454
-                sort_batch_row_based(&batch, &expressions, row_converter, 
None)?
-            } else {
-                sort_batch(&batch, &expressions, None)?
-            };
+            let sorted = sort_batch(&batch, &expressions, None)?;
 
             metrics.record_output(sorted.num_rows());
             drop(batch);
@@ -834,45 +798,6 @@ impl Debug for ExternalSorter {
     }
 }
 
-/// Converts rows into a sorted array of indices based on their order.
-/// This function returns the indices that represent the sorted order of the 
rows.
-fn rows_to_indices(rows: Rows, limit: Option<usize>) -> Result<UInt32Array> {
-    let mut sort: Vec<_> = rows.iter().enumerate().collect();
-    sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
-
-    let mut len = rows.num_rows();
-    if let Some(limit) = limit {
-        len = limit.min(len);
-    }
-    let indices =
-        UInt32Array::from_iter_values(sort.iter().take(len).map(|(i, _)| *i as 
u32));
-    Ok(indices)
-}
-
-/// Sorts a `RecordBatch` by converting its sort columns into Arrow Row Format 
for faster comparison.
-fn sort_batch_row_based(
-    batch: &RecordBatch,
-    expressions: &LexOrdering,
-    row_converter: Arc<RowConverter>,
-    fetch: Option<usize>,
-) -> Result<RecordBatch> {
-    let sort_columns = expressions
-        .iter()
-        .map(|expr| expr.evaluate_to_sort_column(batch).map(|col| col.values))
-        .collect::<Result<Vec<_>>>()?;
-    let rows = row_converter.convert_columns(&sort_columns)?;
-    let indices = rows_to_indices(rows, fetch)?;
-    let columns = take_arrays(batch.columns(), &indices, None)?;
-
-    let options = 
RecordBatchOptions::new().with_row_count(Some(indices.len()));
-
-    Ok(RecordBatch::try_new_with_options(
-        batch.schema(),
-        columns,
-        &options,
-    )?)
-}
-
 pub fn sort_batch(
     batch: &RecordBatch,
     expressions: &LexOrdering,
@@ -883,14 +808,7 @@ pub fn sort_batch(
         .map(|expr| expr.evaluate_to_sort_column(batch))
         .collect::<Result<Vec<_>>>()?;
 
-    let indices = if is_multi_column_with_lists(&sort_columns) {
-        // lex_sort_to_indices doesn't support List with more than one column
-        // https://github.com/apache/arrow-rs/issues/5454
-        lexsort_to_indices_multi_columns(sort_columns, fetch)?
-    } else {
-        lexsort_to_indices(&sort_columns, fetch)?
-    };
-
+    let indices = lexsort_to_indices(&sort_columns, fetch)?;
     let mut columns = take_arrays(batch.columns(), &indices, None)?;
 
     // The columns may be larger than the unsorted columns in `batch` 
especially for variable length
@@ -909,50 +827,6 @@ pub fn sort_batch(
     )?)
 }
 
-#[inline]
-fn is_multi_column_with_lists(sort_columns: &[SortColumn]) -> bool {
-    sort_columns.iter().any(|c| {
-        matches!(
-            c.values.data_type(),
-            DataType::List(_) | DataType::LargeList(_) | 
DataType::FixedSizeList(_, _)
-        )
-    })
-}
-
-pub(crate) fn lexsort_to_indices_multi_columns(
-    sort_columns: Vec<SortColumn>,
-    limit: Option<usize>,
-) -> Result<UInt32Array> {
-    let (fields, columns) = sort_columns.into_iter().fold(
-        (vec![], vec![]),
-        |(mut fields, mut columns), sort_column| {
-            fields.push(SortField::new_with_options(
-                sort_column.values.data_type().clone(),
-                sort_column.options.unwrap_or_default(),
-            ));
-            columns.push(sort_column.values);
-            (fields, columns)
-        },
-    );
-
-    // Note: row converter is reused through `sort_batch_row_based()`, this 
function
-    // is not used during normal sort execution, but it's kept temporarily 
because
-    // it's inside a public interface `sort_batch()`.
-    let converter = RowConverter::new(fields)?;
-    let rows = converter.convert_columns(&columns)?;
-    let mut sort: Vec<_> = rows.iter().enumerate().collect();
-    sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
-
-    let mut len = rows.num_rows();
-    if let Some(limit) = limit {
-        len = limit.min(len);
-    }
-    let indices =
-        UInt32Array::from_iter_values(sort.iter().take(len).map(|(i, _)| *i as 
u32));
-
-    Ok(indices)
-}
-
 /// Sort execution plan.
 ///
 /// Support sorting datasets that are larger than the memory allotted


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to