This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 9ec679b1f4 Clean up ExternalSorter and use upstream converter (#16109)
9ec679b1f4 is described below
commit 9ec679b1f412cee8020ffb09674d20df35c98976
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue May 20 16:27:24 2025 -0400
Clean up ExternalSorter and use upstream converter (#16109)
---
datafusion/physical-plan/src/sorts/sort.rs | 138 ++---------------------------
1 file changed, 6 insertions(+), 132 deletions(-)
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 635b93b81b..683983d9e6 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -44,15 +44,10 @@ use crate::{
Statistics,
};
-use arrow::array::{
- Array, RecordBatch, RecordBatchOptions, StringViewArray, UInt32Array,
-};
-use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays, SortColumn};
-use arrow::datatypes::{DataType, SchemaRef};
-use arrow::row::{RowConverter, Rows, SortField};
-use datafusion_common::{
-    exec_datafusion_err, internal_datafusion_err, internal_err, DataFusionError, Result,
-};
+use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray};
+use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays};
+use arrow::datatypes::SchemaRef;
+use datafusion_common::{internal_datafusion_err, internal_err, DataFusionError, Result};
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::runtime_env::RuntimeEnv;
@@ -206,8 +201,6 @@ struct ExternalSorter {
schema: SchemaRef,
/// Sort expressions
expr: Arc<[PhysicalSortExpr]>,
- /// RowConverter corresponding to the sort expressions
- sort_keys_row_converter: Arc<RowConverter>,
/// The target number of rows for output batches
batch_size: usize,
/// If the in size of buffered memory batches is below this size,
@@ -275,22 +268,6 @@ impl ExternalSorter {
MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]"))
.register(&runtime.memory_pool);
- // Construct RowConverter for sort keys
- let sort_fields = expr
- .iter()
- .map(|e| {
- let data_type = e
- .expr
- .data_type(&schema)
-                .map_err(|e| e.context("Resolving sort expression data type"))?;
- Ok(SortField::new_with_options(data_type, e.options))
- })
- .collect::<Result<Vec<_>>>()?;
-
- let converter = RowConverter::new(sort_fields).map_err(|e| {
- exec_datafusion_err!("Failed to create RowConverter: {:?}", e)
- })?;
-
let spill_manager = SpillManager::new(
Arc::clone(&runtime),
metrics.spill_metrics.clone(),
@@ -303,7 +280,6 @@ impl ExternalSorter {
in_progress_spill_file: None,
finished_spill_files: vec![],
expr: expr.into(),
- sort_keys_row_converter: Arc::new(converter),
metrics,
reservation,
spill_manager,
@@ -728,22 +704,10 @@ impl ExternalSorter {
let schema = batch.schema();
let expressions: LexOrdering = self.expr.iter().cloned().collect();
- let row_converter = Arc::clone(&self.sort_keys_row_converter);
let stream = futures::stream::once(async move {
let _timer = metrics.elapsed_compute().timer();
- let sort_columns = expressions
- .iter()
- .map(|expr| expr.evaluate_to_sort_column(&batch))
- .collect::<Result<Vec<_>>>()?;
-
- let sorted = if is_multi_column_with_lists(&sort_columns) {
-            // lex_sort_to_indices doesn't support List with more than one column
- // https://github.com/apache/arrow-rs/issues/5454
-            sort_batch_row_based(&batch, &expressions, row_converter, None)?
- } else {
- sort_batch(&batch, &expressions, None)?
- };
+ let sorted = sort_batch(&batch, &expressions, None)?;
metrics.record_output(sorted.num_rows());
drop(batch);
@@ -834,45 +798,6 @@ impl Debug for ExternalSorter {
}
}
-/// Converts rows into a sorted array of indices based on their order.
-/// This function returns the indices that represent the sorted order of the rows.
-fn rows_to_indices(rows: Rows, limit: Option<usize>) -> Result<UInt32Array> {
- let mut sort: Vec<_> = rows.iter().enumerate().collect();
- sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
-
- let mut len = rows.num_rows();
- if let Some(limit) = limit {
- len = limit.min(len);
- }
- let indices =
-        UInt32Array::from_iter_values(sort.iter().take(len).map(|(i, _)| *i as u32));
- Ok(indices)
-}
-
-/// Sorts a `RecordBatch` by converting its sort columns into Arrow Row Format for faster comparison.
-fn sort_batch_row_based(
- batch: &RecordBatch,
- expressions: &LexOrdering,
- row_converter: Arc<RowConverter>,
- fetch: Option<usize>,
-) -> Result<RecordBatch> {
- let sort_columns = expressions
- .iter()
- .map(|expr| expr.evaluate_to_sort_column(batch).map(|col| col.values))
- .collect::<Result<Vec<_>>>()?;
- let rows = row_converter.convert_columns(&sort_columns)?;
- let indices = rows_to_indices(rows, fetch)?;
- let columns = take_arrays(batch.columns(), &indices, None)?;
-
-    let options = RecordBatchOptions::new().with_row_count(Some(indices.len()));
-
- Ok(RecordBatch::try_new_with_options(
- batch.schema(),
- columns,
- &options,
- )?)
-}
-
pub fn sort_batch(
batch: &RecordBatch,
expressions: &LexOrdering,
@@ -883,14 +808,7 @@ pub fn sort_batch(
.map(|expr| expr.evaluate_to_sort_column(batch))
.collect::<Result<Vec<_>>>()?;
- let indices = if is_multi_column_with_lists(&sort_columns) {
- // lex_sort_to_indices doesn't support List with more than one column
- // https://github.com/apache/arrow-rs/issues/5454
- lexsort_to_indices_multi_columns(sort_columns, fetch)?
- } else {
- lexsort_to_indices(&sort_columns, fetch)?
- };
-
+ let indices = lexsort_to_indices(&sort_columns, fetch)?;
let mut columns = take_arrays(batch.columns(), &indices, None)?;
    // The columns may be larger than the unsorted columns in `batch` especially for variable length
@@ -909,50 +827,6 @@ pub fn sort_batch(
)?)
}
-#[inline]
-fn is_multi_column_with_lists(sort_columns: &[SortColumn]) -> bool {
- sort_columns.iter().any(|c| {
- matches!(
- c.values.data_type(),
-            DataType::List(_) | DataType::LargeList(_) | DataType::FixedSizeList(_, _)
- )
- })
-}
-
-pub(crate) fn lexsort_to_indices_multi_columns(
- sort_columns: Vec<SortColumn>,
- limit: Option<usize>,
-) -> Result<UInt32Array> {
- let (fields, columns) = sort_columns.into_iter().fold(
- (vec![], vec![]),
- |(mut fields, mut columns), sort_column| {
- fields.push(SortField::new_with_options(
- sort_column.values.data_type().clone(),
- sort_column.options.unwrap_or_default(),
- ));
- columns.push(sort_column.values);
- (fields, columns)
- },
- );
-
-    // Note: row converter is reused through `sort_batch_row_based()`, this function
-    // is not used during normal sort execution, but it's kept temporarily because
- // it's inside a public interface `sort_batch()`.
- let converter = RowConverter::new(fields)?;
- let rows = converter.convert_columns(&columns)?;
- let mut sort: Vec<_> = rows.iter().enumerate().collect();
- sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
-
- let mut len = rows.num_rows();
- if let Some(limit) = limit {
- len = limit.min(len);
- }
- let indices =
-        UInt32Array::from_iter_values(sort.iter().take(len).map(|(i, _)| *i as u32));
-
- Ok(indices)
-}
-
/// Sort execution plan.
///
/// Support sorting datasets that are larger than the memory allotted
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]