gruuya commented on code in PR #7180:
URL: https://github.com/apache/arrow-datafusion/pull/7180#discussion_r1286970765


##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -465,16 +484,23 @@ impl ExternalSorter {
         // This is a very rough heuristic and likely could be refined further
         if self.reservation.size() < 1048576 {
             // Concatenate memory batches together and sort
-            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
+            let (_, batches): (Vec<bool>, Vec<RecordBatch>) =
+                std::mem::take(&mut self.in_mem_batches).into_iter().unzip();
+            let batch = concat_batches(&self.schema, &batches)?;
             self.in_mem_batches.clear();
-            return self.sort_batch_stream(batch, metrics);
+            // Even if all individual batches were themselves sorted, the resulting concatenated one
+            // isn't guaranteed to be sorted, so we must perform sorting on the stream.
+            return self.sort_batch_stream(batch, false, metrics);

Review Comment:
   So I did try to test this approach as well, and then saw some improvements that seemed too good to be true. I went and re-ran the benchmarks again and the improvements held, until they didn't at some point 🤷🏻‍♂️ (fwiw, I'm running the benchmarks on a cloud VM, not dedicated hardware).
   
   In hindsight, the sorting benchmarks don't actually use a memory limit, so there were no spills and this code path wasn't exercised. I did try running the benchmarks with memory limits on, but then I hit a `Dictionary replacement detected when writing IPC file format.` arrow error during spilling. This seems to be a general problem, as it happens on the main branch too, though I haven't investigated further.
   
   Either way, I'll add this check now even without benchmarking it, since it seems it can only help.
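   The pattern in the diff — draining the buffered `(sorted_flag, batch)` pairs with `std::mem::take`, splitting them with `unzip`, concatenating, and then sorting anyway — can be sketched with plain `Vec`s. This is a hypothetical stand-in, not DataFusion's actual code: `Sorter` and `Vec<i32>` batches here substitute for `ExternalSorter` and arrow `RecordBatch`es:
   
   ```rust
   // Hypothetical stand-in for ExternalSorter's buffered state: each entry
   // pairs a "was this batch individually sorted?" flag with the batch data.
   struct Sorter {
       in_mem_batches: Vec<(bool, Vec<i32>)>,
   }
   
   impl Sorter {
       fn concat_and_sort(&mut self) -> Vec<i32> {
           // Drain the buffer in place (mem::take leaves an empty Vec behind,
           // making a follow-up clear() redundant) and split flags from data.
           let (_flags, batches): (Vec<bool>, Vec<Vec<i32>>) =
               std::mem::take(&mut self.in_mem_batches).into_iter().unzip();
           // Concatenating individually sorted batches does not yield a
           // globally sorted result, so a final sort is still required.
           let mut out: Vec<i32> = batches.into_iter().flatten().collect();
           out.sort();
           out
       }
   }
   
   fn main() {
       let mut s = Sorter {
           // Each batch is sorted on its own, but the concatenation
           // [3, 7, 1, 5] would not be.
           in_mem_batches: vec![(true, vec![3, 7]), (true, vec![1, 5])],
       };
       assert_eq!(s.concat_and_sort(), vec![1, 3, 5, 7]);
       assert!(s.in_mem_batches.is_empty()); // drained by mem::take
       println!("ok");
   }
   ```
   
   One side observation: because `std::mem::take` already leaves the buffer empty, the `self.in_mem_batches.clear();` left in the diff is redundant (though harmless).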



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
