This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new a026e7da2f perf: Optimize heap handling in TopK operator (#20556)
a026e7da2f is described below
commit a026e7da2fe88f1923770ec3249c5f0e7b2ea6b0
Author: Adam Gutglick <[email protected]>
AuthorDate: Thu Feb 26 12:31:09 2026 +0000
perf: Optimize heap handling in TopK operator (#20556)
## Which issue does this PR close?
<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes #123` indicates that this PR will close issue #123.
-->
- Closes #.
## Rationale for this change
This change to make a significant performance impact in the `TopK`
operator, which is a commonly used operator.
## What changes are included in this PR?
Instead of doing two operations on the inner heap (pop than push), we
use `Binary::peek_mut`, which allows us to replace the heap item
in-place and then sift it to its proper location in the heap.
Some SLT results seem to change, the only explanation I can find for it
is that pop/push vs the sift_down that `PeekMut` uses have some subtle
differences that resolve ties in a different way, ending up with a
slightly different result.
On my macbook, running the `topk_aggregate` benchmark, most benchmarks
are not changed significantly, aside from the following:
```
distinct 10000000 rows desc [no TopK]
time: [554.69 ms 903.25 ms 1.3318 s]
change: [−82.888% −69.587% −47.591%] (p = 0.00 <
0.05)
Performance has improved.
Found 17 outliers among 100 measurements (17.00%)
5 (5.00%) high mild
12 (12.00%) high severe
Benchmarking distinct 10000000 rows asc [no TopK]: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase
target time to 113.7s, or reduce sample count to 10.
distinct 10000000 rows asc [no TopK]
time: [405.87 ms 702.47 ms 1.0583 s]
change: [−86.490% −75.215% −51.486%] (p = 0.00 <
0.05)
Performance has improved.
Found 17 outliers among 100 measurements (17.00%)
3 (3.00%) high mild
14 (14.00%) high severe
distinct 10000000 rows desc [TopK]
time: [6.8372 ms 6.9933 ms 7.1523 ms]
change: [−0.5254% +2.2409% +5.0920%] (p = 0.13 >
0.05)
No change in performance detected.
Found 2 outliers among 100 measurements (2.00%)
2 (2.00%) high mild
distinct 10000000 rows asc [TopK]
time: [6.8731 ms 6.9952 ms 7.1226 ms]
change: [+3.3252% +5.3824% +7.5131%] (p = 0.00 <
0.05)
Performance has regressed.
Found 2 outliers among 100 measurements (2.00%)
2 (2.00%) high mild
```
## Are these changes tested?
Existing test suite.
## Are there any user-facing changes?
No API changes, seems like some ordering might change in queries that
use the `TopK` operator, but in a way that seems correct.
---
datafusion/physical-plan/src/topk/mod.rs | 44 ++++++++++-----------------
datafusion/sqllogictest/test_files/limit.slt | 8 ++---
datafusion/sqllogictest/test_files/window.slt | 8 ++---
3 files changed, 24 insertions(+), 36 deletions(-)
diff --git a/datafusion/physical-plan/src/topk/mod.rs
b/datafusion/physical-plan/src/topk/mod.rs
index 4b93e6a188..e0b91f2516 100644
--- a/datafusion/physical-plan/src/topk/mod.rs
+++ b/datafusion/physical-plan/src/topk/mod.rs
@@ -724,8 +724,8 @@ impl TopKHeap {
let row = row.as_ref();
// Reuse storage for evicted item if possible
- let new_top_k = if self.inner.len() == self.k {
- let prev_min = self.inner.pop().unwrap();
+ if self.inner.len() == self.k {
+ let mut prev_min = self.inner.peek_mut().unwrap();
// Update batch use
if prev_min.batch_id == batch_entry.id {
@@ -736,15 +736,16 @@ impl TopKHeap {
// update memory accounting
self.owned_bytes -= prev_min.owned_size();
- prev_min.with_new_row(row, batch_id, index)
- } else {
- TopKRow::new(row, batch_id, index)
- };
- self.owned_bytes += new_top_k.owned_size();
+ prev_min.replace_with(row, batch_id, index);
- // put the new row into the heap
- self.inner.push(new_top_k)
+ self.owned_bytes += prev_min.owned_size();
+ } else {
+ let new_row = TopKRow::new(row, batch_id, index);
+ self.owned_bytes += new_row.owned_size();
+ // put the new row into the heap
+ self.inner.push(new_row);
+ };
}
/// Returns the values stored in this heap, from values low to
@@ -911,26 +912,13 @@ impl TopKRow {
}
}
- /// Create a new TopKRow reusing the existing allocation
- fn with_new_row(
- self,
- new_row: impl AsRef<[u8]>,
- batch_id: u32,
- index: usize,
- ) -> Self {
- let Self {
- mut row,
- batch_id: _,
- index: _,
- } = self;
- row.clear();
- row.extend_from_slice(new_row.as_ref());
+ // Replace the existing row capacity with new values
+ fn replace_with(&mut self, new_row: impl AsRef<[u8]>, batch_id: u32,
index: usize) {
+ self.row.clear();
+ self.row.extend_from_slice(new_row.as_ref());
- Self {
- row,
- batch_id,
- index,
- }
+ self.batch_id = batch_id;
+ self.index = index;
}
/// Returns the number of bytes owned by this row in the heap (not
diff --git a/datafusion/sqllogictest/test_files/limit.slt
b/datafusion/sqllogictest/test_files/limit.slt
index ec8363f51a..ff3c49485a 100644
--- a/datafusion/sqllogictest/test_files/limit.slt
+++ b/datafusion/sqllogictest/test_files/limit.slt
@@ -679,19 +679,19 @@ ON t1.b = t2.b
ORDER BY t1.b desc, c desc, c2 desc;
----
3 98 96
-3 98 89
+3 98 87
3 98 82
3 98 79
3 97 96
-3 97 89
+3 97 87
3 97 82
3 97 79
3 96 96
-3 96 89
+3 96 87
3 96 82
3 96 79
3 95 96
-3 95 89
+3 95 87
3 95 82
3 95 79
diff --git a/datafusion/sqllogictest/test_files/window.slt
b/datafusion/sqllogictest/test_files/window.slt
index 8ac8724683..c3e6f39adb 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -4387,9 +4387,9 @@ LIMIT 5;
----
78 50
63 38
-3 53
+NULL 19
24 31
-14 94
+24 56
# result should be same with above, when LAG/LEAD algorithm work with pruned
data.
# decreasing batch size, causes data to be produced in smaller chunks at the
source.
@@ -4406,9 +4406,9 @@ LIMIT 5;
----
78 50
63 38
-3 53
+NULL 19
24 31
-14 94
+24 56
statement ok
set datafusion.execution.batch_size = 100;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]