Dandandan commented on code in PR #7180:
URL: https://github.com/apache/arrow-datafusion/pull/7180#discussion_r1287692850


##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -253,11 +261,23 @@ impl ExternalSorter {
     /// Appends an unsorted [`RecordBatch`] to `in_mem_batches`
     ///
     /// Updates memory usage metrics, and possibly triggers spilling to disk
-    async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> {
+    async fn insert_batch(&mut self, mut input: RecordBatch) -> Result<()> {
         if input.num_rows() == 0 {
             return Ok(());
         }
 
+        let mut batch_sorted = false;
+        if self
+            .fetch
+            .map_or(false, |f| f <= input.num_rows() && f <= 100)
+        {
+            // Eagerly sort the batch to potentially reduce the number of rows
+            // after applying the fetch parameter.
+            // Currently only applied for fetch of 100 rows or less.
+            input = sort_batch(&input, &self.expr, self.fetch)?;

Review Comment:
   I wonder whether you could recover part of the perf difference by concat + 
sorting only once every `n` (say every 10) batches. The limit is much more 
selective when applied to 81920 rows than to 8192, so the total work avoided 
is larger, and the final merge would also run over fewer batches.
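The suggestion could be sketched roughly as follows. This is an illustrative simplification, not DataFusion code: `BufferedSorter`, `insert_batch`, `flush`, and `every_n` are hypothetical names, and plain `Vec<i32>` stands in for `RecordBatch`, `concat`, and `sort_batch`.

```rust
// Hypothetical sketch: buffer `every_n` batches, then concat + sort once and
// keep only the top `fetch` rows, instead of sorting each batch eagerly.
struct BufferedSorter {
    buffer: Vec<Vec<i32>>, // stand-in for unsorted RecordBatches
    every_n: usize,        // concat + sort once this many batches accumulate
    fetch: usize,          // limit: rows to retain after each sort
    sorted_run: Vec<i32>,  // current top-`fetch` rows seen so far
}

impl BufferedSorter {
    fn new(every_n: usize, fetch: usize) -> Self {
        Self { buffer: Vec::new(), every_n, fetch, sorted_run: Vec::new() }
    }

    /// Buffer a batch; trigger a concat + sort once every `every_n` batches.
    fn insert_batch(&mut self, batch: Vec<i32>) {
        if batch.is_empty() {
            return;
        }
        self.buffer.push(batch);
        if self.buffer.len() >= self.every_n {
            self.flush();
        }
    }

    /// Concat buffered batches with the current run, sort once,
    /// and truncate to `fetch` rows.
    fn flush(&mut self) {
        let mut all: Vec<i32> = std::mem::take(&mut self.sorted_run);
        for b in self.buffer.drain(..) {
            all.extend(b);
        }
        all.sort_unstable();
        all.truncate(self.fetch);
        self.sorted_run = all;
    }

    /// Flush any remaining buffered batches and return the final run.
    fn finish(mut self) -> Vec<i32> {
        self.flush();
        self.sorted_run
    }
}

fn main() {
    // 100 descending values split into batches of 8, with fetch = 3:
    // only one sort per 10 batches, yet the 3 smallest values survive.
    let mut sorter = BufferedSorter::new(10, 3);
    for chunk in (0..100).rev().collect::<Vec<i32>>().chunks(8) {
        sorter.insert_batch(chunk.to_vec());
    }
    println!("{:?}", sorter.finish()); // prints [0, 1, 2]
}
```

The trade-off is sorting larger runs (e.g. 81920 rows at once rather than ten sorts of 8192), in exchange for applying the highly selective truncation less often and handing the merge phase far fewer, already-reduced batches.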




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
