This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 84e9ce8ebf Simplifications (#12639)
84e9ce8ebf is described below

commit 84e9ce8ebf27c376c43b9eddf6b6bbb282426731
Author: Mustafa Akur <[email protected]>
AuthorDate: Fri Sep 27 08:35:31 2024 -0700

    Simplifications (#12639)
---
 datafusion/physical-plan/src/sorts/sort.rs | 23 ++++++++++-------------
 1 file changed, 10 insertions(+), 13 deletions(-)

diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index fb03ceb15c..64434e7a4a 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -40,12 +40,13 @@ use crate::{
     SendableRecordBatchStream, Statistics,
 };
 
-use arrow::compute::{concat_batches, lexsort_to_indices, take, SortColumn};
+use arrow::compute::{concat_batches, lexsort_to_indices, SortColumn};
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use arrow::row::{RowConverter, SortField};
 use arrow_array::{Array, RecordBatchOptions, UInt32Array};
 use arrow_schema::DataType;
+use datafusion_common::utils::get_arrayref_at_indices;
 use datafusion_common::{internal_err, Result};
 use datafusion_execution::disk_manager::RefCountedTempFile;
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
@@ -350,12 +351,8 @@ impl ExternalSorter {
                 self.fetch,
                 self.reservation.new_empty(),
             )
-        } else if !self.in_mem_batches.is_empty() {
-            self.in_mem_sort_stream(self.metrics.baseline.clone())
         } else {
-            Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(
-                &self.schema,
-            ))))
+            self.in_mem_sort_stream(self.metrics.baseline.clone())
         }
     }
 
@@ -500,7 +497,11 @@ impl ExternalSorter {
         &mut self,
         metrics: BaselineMetrics,
     ) -> Result<SendableRecordBatchStream> {
-        assert_ne!(self.in_mem_batches.len(), 0);
+        if self.in_mem_batches.is_empty() {
+            return Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(
+                &self.schema,
+            ))));
+        }
 
         // The elapsed compute timer is updated when the value is dropped.
         // There is no need for an explicit call to drop.
@@ -508,7 +509,7 @@ impl ExternalSorter {
         let _timer = elapsed_compute.timer();
 
         if self.in_mem_batches.len() == 1 {
-            let batch = self.in_mem_batches.remove(0);
+            let batch = self.in_mem_batches.swap_remove(0);
             let reservation = self.reservation.take();
             return self.sort_batch_stream(batch, metrics, reservation);
         }
@@ -616,11 +617,7 @@ pub fn sort_batch(
         lexsort_to_indices(&sort_columns, fetch)?
     };
 
-    let columns = batch
-        .columns()
-        .iter()
-        .map(|c| take(c.as_ref(), &indices, None))
-        .collect::<Result<_, _>>()?;
+    let columns = get_arrayref_at_indices(batch.columns(), &indices)?;
 
     let options = 
RecordBatchOptions::new().with_row_count(Some(indices.len()));
     Ok(RecordBatch::try_new_with_options(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to