lide-reed opened a new pull request, #64177:
URL: https://github.com/apache/doris/pull/64177
…ows when the rk=1 group exceeds batch_size
Issue Number: close #64172
Problem Summary:
A window query that filters on `dense_rank()` / `rank()`, e.g.
```sql
SELECT ... FROM (
SELECT *, dense_rank() OVER (PARTITION BY p ORDER BY o DESC) AS rk FROM t
) WHERE rk = 1;
```
is rewritten into a `VPartitionTopN` node (`partition limit = 1`,
`TWO_PHASE_LOCAL_PTOPN`). When a partition's `dense_rank = 1` group has more
rows than `batch_size`, the query silently returns only ~`batch_size` rows per
pipeline instance instead of the whole group. No error is raised.
The root cause is in `PartitionSorter::_read_row_rank()`
(`be/src/exec/sort/partition_sorter.cpp`). Its `Defer` signals EOS whenever
`_get_enough_data()` is true:
```cpp
Defer defer {[&]() {
    if (merged_rows == 0 || _get_enough_data()) { // BUG
        *eos = true;
    }
}};
```
For `DENSE_RANK`, `_get_enough_data()` is
`_output_distinct_rows >= _partition_inner_limit`. With
`partition_inner_limit = 1`, it becomes true right after the *first* row of the
group is emitted. When the loop exits because `merged_rows` reached
`batch_size`, the `Defer` sets `*eos = true`, so
`PartitionSortSourceOperator` advances `_sort_idx` to the next sorter and
drops the rest of the current `dense_rank = 1` group. Observed row count ends
up being `batch_size * num_pipeline_instances`.
`row_number()` is not affected: its `_get_enough_data()` uses
`_output_total_rows`, so the condition only becomes true after exactly
`partition_inner_limit` rows have been emitted.
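To make the difference between the two stop conditions concrete, here is a minimal model. It is a sketch, not Doris code: `ToyCounters`, `enough_for_dense_rank`, `enough_for_row_number`, and `after_first_row_of_group` are hypothetical names, and only the two comparisons described above are taken from this PR.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for the sorter's counters; not the real PartitionSorter.
struct ToyCounters {
    std::size_t output_total_rows = 0;     // rows emitted so far
    std::size_t output_distinct_rows = 0;  // distinct ORDER BY values emitted so far
    std::size_t partition_inner_limit = 1; // e.g. WHERE rk = 1
};

// DENSE_RANK-style condition: counts distinct groups, not rows.
bool enough_for_dense_rank(const ToyCounters& c) {
    return c.output_distinct_rows >= c.partition_inner_limit;
}

// ROW_NUMBER-style condition: counts emitted rows.
bool enough_for_row_number(const ToyCounters& c) {
    return c.output_total_rows >= c.partition_inner_limit;
}

// Counter state right after the first row of the first group was emitted.
ToyCounters after_first_row_of_group() {
    ToyCounters c;
    c.output_total_rows = 1;
    c.output_distinct_rows = 1;
    return c;
}
```

With `partition_inner_limit = 1`, both predicates hold for `after_first_row_of_group()`. For `row_number()` that is the right moment to stop; for `dense_rank()` the remaining rows of the same group are still pending, so the predicate alone cannot be used as an EOS signal.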
Introduce a `bool finished` flag that is set to true only when:
- the loop reaches the boundary of the next distinct group with the limit
already satisfied (`if (_get_enough_data()) { finished = true; return; }`), or
- the input queue is fully drained.
The `Defer` then signals EOS only on `merged_rows == 0 || finished`, so the
same `dense_rank = 1` group is drained across multiple `get_next()` calls.
```diff
 Status PartitionSorter::_read_row_rank(Block* output_block, bool* eos, int batch_size) {
     ...
     size_t merged_rows = 0;
+    bool finished = false;
     Defer defer {[&]() {
-        if (merged_rows == 0 || _get_enough_data()) {
+        if (merged_rows == 0 || finished) {
             *eos = true;
         }
     }};
     while (queue.is_valid() && merged_rows < batch_size) {
         ...
         for (...; merged_rows < batch_size; ...) {
             ...
             if (!cmp_res) {
                 if (_get_enough_data()) {
+                    finished = true;
                     return Status::OK();
                 }
                 *_previous_row = *current;
                 _output_distinct_rows++;
             }
             ...
         }
     }
+
+    if (!queue.is_valid()) {
+        finished = true;
+    }
     return Status::OK();
 }
```
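The effect of the change can be reproduced with a small standalone model. This is a simplified sketch, not the actual `PartitionSorter`: `ToySorter`, `make_rows`, and `drain` are hypothetical names, rows are reduced to one integer sort key, and the `use_fix` switch selects between the old EOS condition and the `finished` flag.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Toy model of one sorter holding the already-sorted rows of one partition.
// Each int is the ORDER BY key of a row. Hypothetical, not Doris code.
struct ToySorter {
    std::vector<int> rows;
    std::size_t pos = 0;
    std::size_t output_distinct_rows = 0;
    std::size_t partition_inner_limit = 1;
    bool has_previous = false;
    int previous = 0;
    bool use_fix = false;

    bool enough() const { return output_distinct_rows >= partition_inner_limit; }

    // Emits up to batch_size rows, returns how many, and sets *eos.
    std::size_t get_next(std::size_t batch_size, bool* eos) {
        std::size_t merged_rows = 0;
        bool finished = false;
        while (pos < rows.size() && merged_rows < batch_size) {
            int current = rows[pos];
            if (!has_previous || current != previous) {  // new distinct group
                if (enough()) {
                    finished = true;  // limit satisfied at a group boundary
                    break;
                }
                previous = current;
                has_previous = true;
                ++output_distinct_rows;
            }
            ++pos;
            ++merged_rows;
        }
        if (pos >= rows.size()) {
            finished = true;  // input fully drained
        }
        // Old condition: enough(); new condition: finished.
        *eos = (merged_rows == 0) || (use_fix ? finished : enough());
        return merged_rows;
    }
};

// Builds first_group rows with key 2 followed by second_group rows with key 1.
std::vector<int> make_rows(std::size_t first_group, std::size_t second_group) {
    std::vector<int> rows(first_group, 2);
    rows.insert(rows.end(), second_group, 1);
    return rows;
}

// Calls get_next() until EOS and returns the total number of rows emitted.
std::size_t drain(ToySorter sorter, std::size_t batch_size) {
    std::size_t total = 0;
    bool eos = false;
    while (!eos) {
        total += sorter.get_next(batch_size, &eos);
    }
    return total;
}
```

In this model, a first group of 5000 rows with `batch_size = 4096` yields 4096 rows under the old condition and 5000 rows with `use_fix = true`, which mirrors the setup of the new unit tests.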
Fix `dense_rank()` / `rank()` window functions with a top-n filter (e.g.
`WHERE rk = 1`) silently dropping rows when the matching group is larger than
`batch_size`.
- Test
- [ ] Regression test
- [x] Unit Test <!-- new BE unit tests in
be/test/exec/sort/partition_sorter_test.cpp -->
- [x] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
Manual test (single BE, single tablet, single partition value; the bug is
deterministic):
```sql
CREATE TABLE t_min (ts DATE NOT NULL, pk BIGINT NOT NULL, val INT)
DUPLICATE KEY(ts, pk) DISTRIBUTED BY HASH(pk) BUCKETS 1
PROPERTIES ("replication_num" = "1");
INSERT INTO t_min SELECT DATE '2024-01-02', 1, number FROM numbers("number" = "10000");
INSERT INTO t_min SELECT DATE '2024-01-01', 1, number FROM numbers("number" = "3");
SET parallel_pipeline_task_num = 1;
SELECT /*+ SET_VAR(batch_size = 1024) */ ts, COUNT(*) cnt FROM (
    SELECT t.*, dense_rank() OVER (PARTITION BY pk ORDER BY ts DESC) dr FROM t_min t
) x WHERE dr = 1 GROUP BY ts ORDER BY ts;
```
| batch_size | cnt before (buggy) | cnt after (fixed) |
|---|---|---|
| 256 | 256 | 10000 |
| 1024 | 1024 | 10000 |
| 4096 | 4096 | 10000 |
| 16384 | 10000 | 10000 |
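The "before" column is consistent with each instance stopping after one batch. As a quick arithmetic check for the single-instance case above (`buggy_count` is a hypothetical helper, not part of Doris):

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Hypothetical helper: rows returned by the buggy path for ONE pipeline
// instance when the rank-1 group contains group_rows rows.
std::size_t buggy_count(std::size_t batch_size, std::size_t group_rows) {
    // The sorter reports EOS after its first batch, so at most batch_size
    // rows of the group are emitted.
    return std::min(batch_size, group_rows);
}
```

For the 10000-row group this gives 256, 1024, 4096, and 10000 for the four batch sizes in the table; only the last one is large enough to hide the bug.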
Also verified on a 3-BE cluster against a 15M-row production table: before
the fix the same query returned varying wrong counts
(e.g. 149990 / 109350 across runs, and `8 * batch_size` exactly under
`parallel_pipeline_task_num = 1` per instance); after the fix it is stable
and matches the golden value from `SET enable_partition_topn = false`.
New unit tests:
`test_dense_rank_first_group_exceeds_batch_size_regression` and
`test_rank_first_group_exceeds_batch_size_regression` build a first group
of 5000 rows with `batch_size = 4096` and assert that all 5000 rows are
emitted across multiple `get_next()` calls (they fail before the fix,
pass after).
- Behavior changed:
- [x] No. <!-- It fixes wrong results; correct queries are unaffected,
and row_number()/other window functions are unchanged. -->
- Does this need documentation?
- [x] No.
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]