This is an automated email from the ASF dual-hosted git repository.
akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 84e9ce8ebf Simplifications (#12639)
84e9ce8ebf is described below
commit 84e9ce8ebf27c376c43b9eddf6b6bbb282426731
Author: Mustafa Akur <[email protected]>
AuthorDate: Fri Sep 27 08:35:31 2024 -0700
Simplifications (#12639)
---
datafusion/physical-plan/src/sorts/sort.rs | 23 ++++++++++-------------
1 file changed, 10 insertions(+), 13 deletions(-)
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index fb03ceb15c..64434e7a4a 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -40,12 +40,13 @@ use crate::{
SendableRecordBatchStream, Statistics,
};
-use arrow::compute::{concat_batches, lexsort_to_indices, take, SortColumn};
+use arrow::compute::{concat_batches, lexsort_to_indices, SortColumn};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use arrow::row::{RowConverter, SortField};
use arrow_array::{Array, RecordBatchOptions, UInt32Array};
use arrow_schema::DataType;
+use datafusion_common::utils::get_arrayref_at_indices;
use datafusion_common::{internal_err, Result};
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
@@ -350,12 +351,8 @@ impl ExternalSorter {
self.fetch,
self.reservation.new_empty(),
)
- } else if !self.in_mem_batches.is_empty() {
- self.in_mem_sort_stream(self.metrics.baseline.clone())
} else {
- Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(
- &self.schema,
- ))))
+ self.in_mem_sort_stream(self.metrics.baseline.clone())
}
}
@@ -500,7 +497,11 @@ impl ExternalSorter {
&mut self,
metrics: BaselineMetrics,
) -> Result<SendableRecordBatchStream> {
- assert_ne!(self.in_mem_batches.len(), 0);
+ if self.in_mem_batches.is_empty() {
+ return Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(
+ &self.schema,
+ ))));
+ }
// The elapsed compute timer is updated when the value is dropped.
// There is no need for an explicit call to drop.
@@ -508,7 +509,7 @@ impl ExternalSorter {
let _timer = elapsed_compute.timer();
if self.in_mem_batches.len() == 1 {
- let batch = self.in_mem_batches.remove(0);
+ let batch = self.in_mem_batches.swap_remove(0);
let reservation = self.reservation.take();
return self.sort_batch_stream(batch, metrics, reservation);
}
@@ -616,11 +617,7 @@ pub fn sort_batch(
lexsort_to_indices(&sort_columns, fetch)?
};
- let columns = batch
- .columns()
- .iter()
- .map(|c| take(c.as_ref(), &indices, None))
- .collect::<Result<_, _>>()?;
+ let columns = get_arrayref_at_indices(batch.columns(), &indices)?;
let options =
RecordBatchOptions::new().with_row_count(Some(indices.len()));
Ok(RecordBatch::try_new_with_options(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]