lide-reed opened a new issue, #64172: URL: https://github.com/apache/doris/issues/64172
### Search before asking

- [x] I had searched in the [issues](https://github.com/apache/doris/issues?q=is%3Aissue) and found no similar issues.

### Version

Reproduced on the current `master`. The same buggy code path (`PartitionSorter::_read_row_rank`) is also present in 2.1 / 3.x / 4.x. The reproduction below was verified on 4.1.1.

### What's Wrong?

A window query that filters on `dense_rank()` (or `rank()`), such as

```sql
SELECT ...
FROM (
    SELECT *, dense_rank() OVER (PARTITION BY p ORDER BY o DESC) AS rk
    FROM t
)
WHERE rk = 1;
```

is rewritten by Nereids into a `VPartitionTopN` node with `partition limit = 1` and `partition topn phase: TWO_PHASE_LOCAL_PTOPN`.

When the `dense_rank = 1` group of a partition contains **more rows than `batch_size`**, the query **silently returns only ~`batch_size` rows per pipeline instance** instead of the full group. No error is raised; the result is just wrong.

`PartitionSorter::_read_row_rank()` declares EOS via a `Defer` (`be/src/exec/sort/partition_sorter.cpp`):

```cpp
Status PartitionSorter::_read_row_rank(Block* output_block, bool* eos, int batch_size) {
    ...
    size_t merged_rows = 0;
    Defer defer {[&]() {
        if (merged_rows == 0 || _get_enough_data()) { // <-- BUG
            *eos = true;
        }
    }};

    while (queue.is_valid() && merged_rows < batch_size) {
        ...
        for (...; merged_rows < batch_size; ...) {
            bool cmp_res = _previous_row->impl && _previous_row->compare_two_rows(current->impl);
            if (!cmp_res) {
                if (_get_enough_data()) {
                    return Status::OK();
                }
                *_previous_row = *current;
                _output_distinct_rows++;
            }
            // emit row
            ...
            merged_rows++;
            _output_total_rows++;
            ...
        }
    }
    return Status::OK();
}
```

For `DENSE_RANK`, `_get_enough_data()` is (`be/src/exec/sort/partition_sorter.h`):

```cpp
bool _get_enough_data() const {
    if (_top_n_algorithm == TopNAlgorithm::DENSE_RANK) {
        return _output_distinct_rows >= _partition_inner_limit;
    } else {
        return _output_total_rows >= _partition_inner_limit;
    }
}
```

With `partition_inner_limit = 1` (i.e. `WHERE rk = 1`), `_output_distinct_rows` becomes `1` right after the **first** row of the `dense_rank = 1` group is emitted, so `_get_enough_data()` returns `true` immediately. The rest of that group has **not** been emitted yet; it is supposed to be drained across multiple `get_next()` calls. But when the inner loop exits because `merged_rows` reached `batch_size`, the `Defer` still sets `*eos = true`, because `_get_enough_data()` is already `true`.

The source operator then treats the sorter as fully drained (`be/src/exec/operator/partition_sort_source_operator.cpp`):

```cpp
if (local_state._sort_idx < sorter_size) {
    RETURN_IF_ERROR(
            sorters[local_state._sort_idx]->get_next(state, output_block, &current_eos));
}
if (current_eos) {
    local_state._sort_idx++; // advance to next sorter, dropping the rest
    ...
}
```

so it advances `_sort_idx` to the next sorter and **permanently drops all remaining rows of the current `dense_rank = 1` group**.

Net effect: each pipeline instance emits at most `batch_size` rows of the `dense_rank = 1` group, so the observed row count is roughly `batch_size * number_of_pipeline_instances`, regardless of the group's true size.

`row_number()` is **not** affected: its `_get_enough_data()` uses `_output_total_rows >= _partition_inner_limit`, which correctly becomes true only after exactly `partition_inner_limit` rows have been emitted.

### What You Expected?

The query should return the **entire** `dense_rank = 1` group (all rows that share the top sort-key value within each partition), independent of `batch_size` or the number of pipeline instances.

### How to Reproduce?
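The EOS logic described above can also be reproduced without a cluster. What follows is a minimal standalone sketch, not Doris code. It makes two assumptions. First, it models a single partition whose rows are reduced to their sort keys, already in `ORDER BY` order in a plain `std::vector<int>`; the real sorter merges sorted blocks through a queue. Second, `DenseRankModel`, `drain` and the simplified `Defer` are hypothetical stand-ins. The `fixed` flag switches between the current `Defer` condition (`_get_enough_data()`) and the `finished` flag proposed under "Anything Else?".

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <optional>
#include <utility>
#include <vector>

// Simplified stand-in for Doris's Defer: runs the callback on scope exit.
struct Defer {
    std::function<void()> fn;
    ~Defer() { fn(); }
};

// Hypothetical model of PartitionSorter for DENSE_RANK on one partition.
// Rows are represented only by their sort keys, already in ORDER BY order;
// the real sorter merges sorted blocks through a queue instead.
class DenseRankModel {
public:
    DenseRankModel(std::vector<int> sorted_keys, std::size_t limit, bool fixed)
        : keys_(std::move(sorted_keys)), limit_(limit), fixed_(fixed) {}

    // Mirrors the shape of _read_row_rank: appends at most batch_size keys.
    void read_row_rank(std::vector<int>& out, bool* eos, std::size_t batch_size) {
        assert(eos != nullptr);
        std::size_t merged_rows = 0;
        bool finished = false;  // only consulted by the fixed variant
        Defer defer{[&]() {
            // Buggy: "limit reached" is treated as "input exhausted".
            // Fixed: EOS only at a real group boundary or when input runs out.
            bool stop = fixed_ ? finished : get_enough_data();
            if (merged_rows == 0 || stop) {
                *eos = true;
            }
        }};

        while (pos_ < keys_.size() && merged_rows < batch_size) {
            int current = keys_[pos_];
            bool same_group = previous_.has_value() && *previous_ == current;
            if (!same_group) {
                if (get_enough_data()) {
                    finished = true;  // next distinct group would exceed the limit
                    return;
                }
                previous_ = current;
                ++output_distinct_rows_;
            }
            out.push_back(current);  // emit row
            ++pos_;
            ++merged_rows;
        }
        if (pos_ == keys_.size()) {
            finished = true;  // input fully drained
        }
    }

private:
    bool get_enough_data() const { return output_distinct_rows_ >= limit_; }

    std::vector<int> keys_;
    std::size_t pos_ = 0;
    std::size_t limit_;
    bool fixed_;
    std::optional<int> previous_;
    std::size_t output_distinct_rows_ = 0;
};

// Drives the model like the source operator: keep calling until eos is set
// (at which point the real operator moves on to the next sorter).
std::size_t drain(std::vector<int> keys, std::size_t limit, std::size_t batch_size,
                  bool fixed) {
    DenseRankModel sorter(std::move(keys), limit, fixed);
    std::size_t total = 0;
    bool eos = false;
    while (!eos) {
        std::vector<int> block;
        sorter.read_row_rank(block, &eos, batch_size);
        total += block.size();
    }
    return total;
}
```

The test data has the same shape as the SQL reproduction: `keys` holds 10000 copies of one key followed by 3 copies of a smaller key. With it, `drain(keys, 1, 256, false)` returns 256 and `drain(keys, 1, 256, true)` returns 10000. With `batch_size = 16384` both variants return 10000, which matches the pattern in the reported output.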
Single BE, single tablet, single partition value. No special data layout is needed (the bug is deterministic):

```sql
DROP DATABASE IF EXISTS bug_repro;
CREATE DATABASE bug_repro;
USE bug_repro;

CREATE TABLE t_min (
    ts  DATE   NOT NULL,
    pk  BIGINT NOT NULL,
    val INT
)
DUPLICATE KEY(ts, pk)
DISTRIBUTED BY HASH(pk) BUCKETS 1
PROPERTIES ("replication_num" = "1");

-- dense_rank = 1 group (ts = 2024-01-02): 10000 rows, far larger than batch_size.
-- Total rows (10003) < PARTITION_SORT_ROWS_THRESHOLD (20000 in release build),
-- so partial sort is not triggered and the returned count is exactly batch_size.
INSERT INTO t_min SELECT DATE '2024-01-02', 1, number FROM numbers("number" = "10000");

-- dense_rank = 2 group (ts = 2024-01-01): 3 rows (just to create a 2nd distinct ts).
INSERT INTO t_min SELECT DATE '2024-01-01', 1, number FROM numbers("number" = "3");

-- Single instance for a clean, deterministic count.
SET parallel_pipeline_task_num = 1;

SELECT /*+ SET_VAR(batch_size = 256) */ ts, COUNT(*) cnt
FROM (
    SELECT t.*, dense_rank() OVER (PARTITION BY pk ORDER BY ts DESC) dr
    FROM t_min t
) x
WHERE dr = 1
GROUP BY ts
ORDER BY ts;

SELECT /*+ SET_VAR(batch_size = 1024) */ ts, COUNT(*) cnt
FROM (
    SELECT t.*, dense_rank() OVER (PARTITION BY pk ORDER BY ts DESC) dr
    FROM t_min t
) x
WHERE dr = 1
GROUP BY ts
ORDER BY ts;

SELECT /*+ SET_VAR(batch_size = 4096) */ ts, COUNT(*) cnt
FROM (
    SELECT t.*, dense_rank() OVER (PARTITION BY pk ORDER BY ts DESC) dr
    FROM t_min t
) x
WHERE dr = 1
GROUP BY ts
ORDER BY ts;

SELECT /*+ SET_VAR(batch_size = 16384) */ ts, COUNT(*) cnt
FROM (
    SELECT t.*, dense_rank() OVER (PARTITION BY pk ORDER BY ts DESC) dr
    FROM t_min t
) x
WHERE dr = 1
GROUP BY ts
ORDER BY ts;
```

**Actual output (buggy):**

```
batch_size = 256   -> ts=2024-01-02, cnt=  256
batch_size = 1024  -> ts=2024-01-02, cnt= 1024
batch_size = 4096  -> ts=2024-01-02, cnt= 4096
batch_size = 16384 -> ts=2024-01-02, cnt=10000   (>= group size, so nothing dropped)
```

`cnt` is exactly `min(batch_size, group_size)`. The row count tracks `batch_size`, which is clearly wrong.

**Expected output (correct):** always

```
ts=2024-01-02, cnt=10000
```

#### Confirm via EXPLAIN and the golden path

```sql
EXPLAIN
SELECT /*+ SET_VAR(batch_size = 1024) */ ts, COUNT(*) cnt
FROM (
    SELECT t.*, dense_rank() OVER (PARTITION BY pk ORDER BY ts DESC) dr
    FROM t_min t
) x
WHERE dr = 1
GROUP BY ts
ORDER BY ts;
-- contains:
--   VPartitionTopN
--     functions: dense_rank
--     partition limit: 1
--     partition topn phase: TWO_PHASE_LOCAL_PTOPN

-- Disabling partition top-n routes through plain VAnalytic and is always correct:
SELECT /*+ SET_VAR(enable_partition_topn = false, batch_size = 256) */ ts, COUNT(*) cnt
FROM (
    SELECT t.*, dense_rank() OVER (PARTITION BY pk ORDER BY ts DESC) dr
    FROM t_min t
) x
WHERE dr = 1
GROUP BY ts
ORDER BY ts;
-- ts=2024-01-02, cnt=10000   (stable for any batch_size)
```

### Anything Else?

Suggested fix: the `Defer` must not treat `_get_enough_data() == true` as "input exhausted". Introduce a `bool finished` flag that is set to `true` only when (a) the loop hits the boundary of the next distinct group with the limit already satisfied (the `if (_get_enough_data()) return;` branch), or (b) the input queue is fully drained. The `Defer` then signals EOS only when `finished` is true (or when no rows were emitted at all):

```cpp
Status PartitionSorter::_read_row_rank(Block* output_block, bool* eos, int batch_size) {
    ...
    size_t merged_rows = 0;
    bool finished = false;                      // NEW
    Defer defer {[&]() {
        if (merged_rows == 0 || finished) {     // CHANGED: was _get_enough_data()
            *eos = true;
        }
    }};

    while (queue.is_valid() && merged_rows < batch_size) {
        ...
        for (...; merged_rows < batch_size; ...) {
            ...
            if (!cmp_res) {
                if (_get_enough_data()) {
                    finished = true;            // NEW: real group boundary reached
                    return Status::OK();
                }
                *_previous_row = *current;
                _output_distinct_rows++;
            }
            ...
        }
    }
    if (!queue.is_valid()) {                    // NEW: input fully drained
        finished = true;
    }
    return Status::OK();
}
```

With this fix, the same group is drained across multiple `get_next()` calls and the full `dense_rank = 1` group is returned for any `batch_size`.

Verified on a 3-BE cluster: every tested value (`batch_size = 256 / 1024 / 2048 / 10240 / 20480`) returns the correct, stable row count after the fix.

I'm happy to submit a PR with the fix and BE unit tests covering both `DENSE_RANK` and `RANK` with a first group larger than `batch_size`.

### Are you willing to submit PR?

- [x] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
