alamb commented on code in PR #7180:
URL: https://github.com/apache/arrow-datafusion/pull/7180#discussion_r1285023739


##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -253,11 +251,27 @@ impl ExternalSorter {
     /// Appends an unsorted [`RecordBatch`] to `in_mem_batches`
     ///
     /// Updates memory usage metrics, and possibly triggers spilling to disk
-    async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> {
+    async fn insert_batch(&mut self, mut input: RecordBatch) -> Result<()> {
         if input.num_rows() == 0 {
             return Ok(());
         }
 
+        let mut batch_sorted = false;
+        if self.fetch.map_or(false, |f| f < input.num_rows()) {
+            // Eagerly sort the batch to potentially reduce the number of rows
+            // after applying the fetch parameter; first perform a memory 
reservation
+            // for the sorting procedure.
+            let mut reservation =

Review Comment:
   Ah that makes sense
   
   I think I think you could use (they very newly added) `Reservation::split()` 
to track the short allocation and free it on drop without needing an entirely 
new consumer:
   
   
https://github.com/apache/arrow-datafusion/blob/23547587c2773ddddb9b16cba1eb8ebf2eebd85a/datafusion/execution/src/memory_pool/mod.rs#L225-L231
   
   perhaps like 
   ```rust
   let batch_reservation = self.reservation.split(input.get_arra_memory_size());
   input = sort_batch(&input, &self.expr, self.fetch)?;
   // free allocation for previous input
   // (it would also be freed on drop so the free isn't necessary but may be 
clearer)
   batch_reservation.free();
   ```
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to