Copilot commented on code in PR #61282:
URL: https://github.com/apache/doris/pull/61282#discussion_r2924038306
##########
be/src/pipeline/exec/streaming_aggregation_operator.cpp:
##########
@@ -561,6 +591,183 @@ void
StreamingAggLocalState::_destroy_agg_status(vectorized::AggregateDataPtr da
}
}
+vectorized::MutableColumns StreamingAggLocalState::_get_keys_hash_table() {
+ return std::visit(
+ vectorized::Overload {
+ [&](std::monostate& arg) {
+ throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"uninited hash table");
+ return vectorized::MutableColumns();
+ },
+ [&](auto&& agg_method) -> vectorized::MutableColumns {
+ vectorized::MutableColumns key_columns;
+ for (int i = 0; i < _probe_expr_ctxs.size(); ++i) {
+ key_columns.emplace_back(
+
_probe_expr_ctxs[i]->root()->data_type()->create_column());
+ }
+ auto& data = *agg_method.hash_table;
+ bool has_null_key = data.has_null_key_data();
+ const auto size = data.size() - has_null_key;
+ using KeyType =
std::decay_t<decltype(agg_method)>::Key;
+ std::vector<KeyType> keys(size);
+
+ uint32_t num_rows = 0;
+ auto iter = _aggregate_data_container->begin();
+ {
+ while (iter != _aggregate_data_container->end()) {
+ keys[num_rows] = iter.get_key<KeyType>();
+ ++iter;
+ ++num_rows;
+ }
+ }
+ agg_method.insert_keys_into_columns(keys, key_columns,
num_rows);
+ if (has_null_key) {
Review Comment:
`_get_keys_hash_table()` appends a NULL key row when `has_null_key` is true,
but it doesn’t assert the same invariants enforced elsewhere (e.g. only 1
group-by key and that key column is nullable). Add `DCHECK(key_columns.size()
== 1)` / `DCHECK(key_columns[0]->is_nullable())` (and/or handle multi-key
correctly) before `insert_data(nullptr, 0)` to avoid producing columns with
inconsistent sizes if the invariant ever changes.
```suggestion
if (has_null_key) {
DCHECK_EQ(key_columns.size(), 1);
DCHECK(key_columns[0]->is_nullable());
```
##########
be/src/vec/exec/scan/scanner.h:
##########
@@ -209,6 +219,8 @@ class Scanner {
// Used in common subexpression elimination to compute intermediate
results.
std::vector<vectorized::VExprContextSPtrs> _intermediate_projections;
vectorized::Block _origin_block;
+ vectorized::Block _padding_block;
+ bool _alreay_eos = false;
Review Comment:
Typo in the new member name `_alreay_eos` (should be `_already_eos`). Since
this flag is part of the scanner state machine, keeping the misspelling will
make future maintenance and code searches (e.g. grepping for `already_eos`)
harder; please rename it consistently in both the header and the implementation.
```suggestion
bool _already_eos = false;
```
##########
be/src/pipeline/exec/streaming_aggregation_operator.h:
##########
@@ -95,10 +104,70 @@ class StreamingAggLocalState MOCK_REMOVE(final) : public
PipelineXLocalState<Fak
// group by k1,k2
vectorized::VExprContextSPtrs _probe_expr_ctxs;
std::unique_ptr<AggregateDataContainer> _aggregate_data_container =
nullptr;
- bool _should_limit_output = false;
bool _reach_limit = false;
size_t _input_num_rows = 0;
+ int64_t limit = -1;
+ int need_do_sort_limit = -1;
+ bool do_sort_limit = false;
+ vectorized::MutableColumns limit_columns;
+ int limit_columns_min = -1;
+ vectorized::PaddedPODArray<uint8_t> need_computes;
+ std::vector<uint8_t> cmp_res;
+ std::vector<int> order_directions;
+ std::vector<int> null_directions;
Review Comment:
New sort-limit-related state in `StreamingAggLocalState` uses non-standard
member naming (`limit`, `do_sort_limit`, etc.) compared to the rest of this
class (mostly `_`-prefixed members). Please rename these to match the existing
member naming convention to make the state machine easier to follow (e.g.
`_sort_limit`, `_need_do_sort_limit`, `_do_sort_limit`, etc.).
```suggestion
int64_t _sort_limit = -1;
int _need_do_sort_limit = -1;
bool _do_sort_limit = false;
vectorized::MutableColumns _limit_columns;
int _limit_columns_min = -1;
vectorized::PaddedPODArray<uint8_t> _need_computes;
std::vector<uint8_t> _cmp_res;
std::vector<int> _order_directions;
std::vector<int> _null_directions;
```
##########
be/src/pipeline/exec/streaming_aggregation_operator.h:
##########
@@ -198,7 +266,14 @@ class StreamingAggOperatorX MOCK_REMOVE(final) : public
StatefulOperatorX<Stream
std::vector<size_t> _make_nullable_keys;
bool _have_conjuncts;
RowDescriptor _agg_fn_output_row_descriptor;
- std::vector<TExpr> _partition_exprs;
+<<<<<<< HEAD
+ // For sort limit
+ bool _do_sort_limit = false;
+ int64_t _sort_limit = -1;
+ std::vector<int> _order_directions;
+ std::vector<int> _null_directions;
+
+ const std::vector<TExpr> _partition_exprs;
Review Comment:
This header still contains unresolved merge-conflict markers (e.g. `<<<<<<<
HEAD`) and the member declarations around them are incomplete/ambiguous. Please
resolve the conflict and ensure the final `_partition_exprs` type matches its
usage (it is assigned in `StreamingAggOperatorX::update_operator`, so it cannot
be `const`).
```suggestion
// For sort limit
bool _do_sort_limit = false;
int64_t _sort_limit = -1;
std::vector<int> _order_directions;
std::vector<int> _null_directions;
std::vector<TExpr> _partition_exprs;
```
##########
be/src/vec/exec/scan/scanner.cpp:
##########
@@ -79,8 +79,39 @@ Status Scanner::init(RuntimeState* state, const
VExprContextSPtrs& conjuncts) {
Status Scanner::get_block_after_projects(RuntimeState* state,
vectorized::Block* block, bool* eos) {
auto& row_descriptor = _local_state->_parent->row_descriptor();
if (_output_row_descriptor) {
-
_origin_block.clear_column_data(row_descriptor.num_materialized_slots());
- RETURN_IF_ERROR(get_block(state, &_origin_block, eos));
+ if (_alreay_eos) {
+ *eos = true;
+ _padding_block.swap(_origin_block);
+ } else {
+
_origin_block.clear_column_data(row_descriptor.num_materialized_slots());
+ const auto min_batch_size = std::max(state->batch_size() / 2, 1);
Review Comment:
`get_block_after_projects` reads `*eos` in the `while (_padding_block.rows()
< min_batch_size && !*eos)` condition before this method has set it. This
relies on all callers initializing `*eos`, which is easy to violate and could
lead to undefined behavior or skipping the first `get_block()` call. Initialize
`*eos` to `false` before the loop (or restructure the loop to fetch before
checking `*eos`).
```suggestion
const auto min_batch_size = std::max(state->batch_size() / 2, 1);
*eos = false;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]