This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new a026e7da2f perf: Optimize heap handling in TopK operator (#20556)
a026e7da2f is described below

commit a026e7da2fe88f1923770ec3249c5f0e7b2ea6b0
Author: Adam Gutglick <[email protected]>
AuthorDate: Thu Feb 26 12:31:09 2026 +0000

    perf: Optimize heap handling in TopK operator (#20556)
    
    ## Which issue does this PR close?
    
    <!--
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax. For example
    `Closes #123` indicates that this PR will close issue #123.
    -->
    
    - Closes #.
    
    ## Rationale for this change
    
    This change makes a significant performance improvement in the `TopK`
    operator, which is a commonly used operator.
    
    ## What changes are included in this PR?
    
    Instead of doing two operations on the inner heap (pop then push), we
    use `BinaryHeap::peek_mut`, which allows us to replace the heap item
    in-place and then sift it to its proper location in the heap.
    
    Some SLT results seem to change. The only explanation I can find for it
    is that pop/push and the sift-down that `PeekMut` uses have some subtle
    differences that resolve ties in a different way, ending up with a
    slightly different result.
    
    On my macbook, running the `topk_aggregate` benchmark, most benchmarks
    are not changed significantly, aside from the following:
    ```
    distinct 10000000 rows desc [no TopK]
                            time:   [554.69 ms 903.25 ms 1.3318 s]
                            change: [−82.888% −69.587% −47.591%] (p = 0.00 < 0.05)
                            Performance has improved.
    Found 17 outliers among 100 measurements (17.00%)
      5 (5.00%) high mild
      12 (12.00%) high severe
    
    Benchmarking distinct 10000000 rows asc [no TopK]: Warming up for 3.0000 s
    Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 113.7s, or reduce sample count to 10.
    distinct 10000000 rows asc [no TopK]
                            time:   [405.87 ms 702.47 ms 1.0583 s]
                            change: [−86.490% −75.215% −51.486%] (p = 0.00 < 0.05)
                            Performance has improved.
    Found 17 outliers among 100 measurements (17.00%)
      3 (3.00%) high mild
      14 (14.00%) high severe
    
    distinct 10000000 rows desc [TopK]
                            time:   [6.8372 ms 6.9933 ms 7.1523 ms]
                            change: [−0.5254% +2.2409% +5.0920%] (p = 0.13 > 0.05)
                            No change in performance detected.
    Found 2 outliers among 100 measurements (2.00%)
      2 (2.00%) high mild
    
    distinct 10000000 rows asc [TopK]
                            time:   [6.8731 ms 6.9952 ms 7.1226 ms]
                            change: [+3.3252% +5.3824% +7.5131%] (p = 0.00 < 0.05)
                            Performance has regressed.
    Found 2 outliers among 100 measurements (2.00%)
      2 (2.00%) high mild
    ```
    
    ## Are these changes tested?
    
    Existing test suite.
    
    ## Are there any user-facing changes?
    
    No API changes. It seems that some ordering might change in queries that
    use the `TopK` operator, but in a way that seems correct.
---
 datafusion/physical-plan/src/topk/mod.rs      | 44 ++++++++++-----------------
 datafusion/sqllogictest/test_files/limit.slt  |  8 ++---
 datafusion/sqllogictest/test_files/window.slt |  8 ++---
 3 files changed, 24 insertions(+), 36 deletions(-)

diff --git a/datafusion/physical-plan/src/topk/mod.rs 
b/datafusion/physical-plan/src/topk/mod.rs
index 4b93e6a188..e0b91f2516 100644
--- a/datafusion/physical-plan/src/topk/mod.rs
+++ b/datafusion/physical-plan/src/topk/mod.rs
@@ -724,8 +724,8 @@ impl TopKHeap {
         let row = row.as_ref();
 
         // Reuse storage for evicted item if possible
-        let new_top_k = if self.inner.len() == self.k {
-            let prev_min = self.inner.pop().unwrap();
+        if self.inner.len() == self.k {
+            let mut prev_min = self.inner.peek_mut().unwrap();
 
             // Update batch use
             if prev_min.batch_id == batch_entry.id {
@@ -736,15 +736,16 @@ impl TopKHeap {
 
             // update memory accounting
             self.owned_bytes -= prev_min.owned_size();
-            prev_min.with_new_row(row, batch_id, index)
-        } else {
-            TopKRow::new(row, batch_id, index)
-        };
 
-        self.owned_bytes += new_top_k.owned_size();
+            prev_min.replace_with(row, batch_id, index);
 
-        // put the new row into the heap
-        self.inner.push(new_top_k)
+            self.owned_bytes += prev_min.owned_size();
+        } else {
+            let new_row = TopKRow::new(row, batch_id, index);
+            self.owned_bytes += new_row.owned_size();
+            // put the new row into the heap
+            self.inner.push(new_row);
+        };
     }
 
     /// Returns the values stored in this heap, from values low to
@@ -911,26 +912,13 @@ impl TopKRow {
         }
     }
 
-    /// Create a new  TopKRow reusing the existing allocation
-    fn with_new_row(
-        self,
-        new_row: impl AsRef<[u8]>,
-        batch_id: u32,
-        index: usize,
-    ) -> Self {
-        let Self {
-            mut row,
-            batch_id: _,
-            index: _,
-        } = self;
-        row.clear();
-        row.extend_from_slice(new_row.as_ref());
+    // Replace the existing row capacity with new values
+    fn replace_with(&mut self, new_row: impl AsRef<[u8]>, batch_id: u32, 
index: usize) {
+        self.row.clear();
+        self.row.extend_from_slice(new_row.as_ref());
 
-        Self {
-            row,
-            batch_id,
-            index,
-        }
+        self.batch_id = batch_id;
+        self.index = index;
     }
 
     /// Returns the number of bytes owned by this row in the heap (not
diff --git a/datafusion/sqllogictest/test_files/limit.slt 
b/datafusion/sqllogictest/test_files/limit.slt
index ec8363f51a..ff3c49485a 100644
--- a/datafusion/sqllogictest/test_files/limit.slt
+++ b/datafusion/sqllogictest/test_files/limit.slt
@@ -679,19 +679,19 @@ ON t1.b = t2.b
 ORDER BY t1.b desc, c desc, c2 desc;
 ----
 3 98 96
-3 98 89
+3 98 87
 3 98 82
 3 98 79
 3 97 96
-3 97 89
+3 97 87
 3 97 82
 3 97 79
 3 96 96
-3 96 89
+3 96 87
 3 96 82
 3 96 79
 3 95 96
-3 95 89
+3 95 87
 3 95 82
 3 95 79
 
diff --git a/datafusion/sqllogictest/test_files/window.slt 
b/datafusion/sqllogictest/test_files/window.slt
index 8ac8724683..c3e6f39adb 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -4387,9 +4387,9 @@ LIMIT 5;
 ----
 78 50
 63 38
-3 53
+NULL 19
 24 31
-14 94
+24 56
 
 # result should be same with above, when LAG/LEAD algorithm work with pruned 
data.
 # decreasing batch size, causes data to be produced in smaller chunks at the 
source.
@@ -4406,9 +4406,9 @@ LIMIT 5;
 ----
 78 50
 63 38
-3 53
+NULL 19
 24 31
-14 94
+24 56
 
 statement ok
 set datafusion.execution.batch_size = 100;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to