Copilot commented on code in PR #61282:
URL: https://github.com/apache/doris/pull/61282#discussion_r2924038306


##########
be/src/pipeline/exec/streaming_aggregation_operator.cpp:
##########
@@ -561,6 +591,183 @@ void 
StreamingAggLocalState::_destroy_agg_status(vectorized::AggregateDataPtr da
     }
 }
 
+vectorized::MutableColumns StreamingAggLocalState::_get_keys_hash_table() {
+    return std::visit(
+            vectorized::Overload {
+                    [&](std::monostate& arg) {
+                        throw doris::Exception(ErrorCode::INTERNAL_ERROR, 
"uninited hash table");
+                        return vectorized::MutableColumns();
+                    },
+                    [&](auto&& agg_method) -> vectorized::MutableColumns {
+                        vectorized::MutableColumns key_columns;
+                        for (int i = 0; i < _probe_expr_ctxs.size(); ++i) {
+                            key_columns.emplace_back(
+                                    
_probe_expr_ctxs[i]->root()->data_type()->create_column());
+                        }
+                        auto& data = *agg_method.hash_table;
+                        bool has_null_key = data.has_null_key_data();
+                        const auto size = data.size() - has_null_key;
+                        using KeyType = 
std::decay_t<decltype(agg_method)>::Key;
+                        std::vector<KeyType> keys(size);
+
+                        uint32_t num_rows = 0;
+                        auto iter = _aggregate_data_container->begin();
+                        {
+                            while (iter != _aggregate_data_container->end()) {
+                                keys[num_rows] = iter.get_key<KeyType>();
+                                ++iter;
+                                ++num_rows;
+                            }
+                        }
+                        agg_method.insert_keys_into_columns(keys, key_columns, 
num_rows);
+                        if (has_null_key) {

Review Comment:
   `_get_keys_hash_table()` appends a NULL key row when `has_null_key` is true,
but it doesn’t assert the same invariants enforced elsewhere (e.g. only 1
group-by key and that key column is nullable). Add
`DCHECK(key_columns.size() == 1)` / `DCHECK(key_columns[0]->is_nullable())`
(and/or handle multi-key correctly) before `insert_data(nullptr, 0)` to avoid
producing columns with inconsistent sizes if the invariant ever changes.
   ```suggestion
                           if (has_null_key) {
                               DCHECK_EQ(key_columns.size(), 1);
                               DCHECK(key_columns[0]->is_nullable());
   ```



##########
be/src/vec/exec/scan/scanner.h:
##########
@@ -209,6 +219,8 @@ class Scanner {
     // Used in common subexpression elimination to compute intermediate 
results.
     std::vector<vectorized::VExprContextSPtrs> _intermediate_projections;
     vectorized::Block _origin_block;
+    vectorized::Block _padding_block;
+    bool _alreay_eos = false;

Review Comment:
   Typo in the new member name `_alreay_eos` (should be `_already_eos`). Since 
this flag is part of the scanner state machine, keeping the misspelling will 
make future maintenance/error searches harder; please rename it consistently in 
both the header and implementation.
   ```suggestion
       bool _already_eos = false;
   ```



##########
be/src/pipeline/exec/streaming_aggregation_operator.h:
##########
@@ -95,10 +104,70 @@ class StreamingAggLocalState MOCK_REMOVE(final) : public 
PipelineXLocalState<Fak
     // group by k1,k2
     vectorized::VExprContextSPtrs _probe_expr_ctxs;
     std::unique_ptr<AggregateDataContainer> _aggregate_data_container = 
nullptr;
-    bool _should_limit_output = false;
     bool _reach_limit = false;
     size_t _input_num_rows = 0;
 
+    int64_t limit = -1;
+    int need_do_sort_limit = -1;
+    bool do_sort_limit = false;
+    vectorized::MutableColumns limit_columns;
+    int limit_columns_min = -1;
+    vectorized::PaddedPODArray<uint8_t> need_computes;
+    std::vector<uint8_t> cmp_res;
+    std::vector<int> order_directions;
+    std::vector<int> null_directions;

Review Comment:
   New sort-limit related state in `StreamingAggLocalState` uses non-standard 
member naming (`limit`, `do_sort_limit`, etc.) compared to the rest of this 
class (mostly `_prefixed` members). Please rename these to match the existing 
member naming convention to make the state machine easier to follow (e.g. 
`_sort_limit`, `_need_do_sort_limit`, `_do_sort_limit`, etc.).
   ```suggestion
       int64_t _sort_limit = -1;
       int _need_do_sort_limit = -1;
       bool _do_sort_limit = false;
       vectorized::MutableColumns _limit_columns;
       int _limit_columns_min = -1;
       vectorized::PaddedPODArray<uint8_t> _need_computes;
       std::vector<uint8_t> _cmp_res;
       std::vector<int> _order_directions;
       std::vector<int> _null_directions;
   ```



##########
be/src/pipeline/exec/streaming_aggregation_operator.h:
##########
@@ -198,7 +266,14 @@ class StreamingAggOperatorX MOCK_REMOVE(final) : public 
StatefulOperatorX<Stream
     std::vector<size_t> _make_nullable_keys;
     bool _have_conjuncts;
     RowDescriptor _agg_fn_output_row_descriptor;
-    std::vector<TExpr> _partition_exprs;
+<<<<<<< HEAD
+    // For sort limit
+    bool _do_sort_limit = false;
+    int64_t _sort_limit = -1;
+    std::vector<int> _order_directions;
+    std::vector<int> _null_directions;
+
+    const std::vector<TExpr> _partition_exprs;

Review Comment:
   This header still contains unresolved merge-conflict markers (e.g.
`<<<<<<< HEAD`) and the member declarations around them are
incomplete/ambiguous. Please resolve the conflict and ensure the final
`_partition_exprs` type matches its usage (it is assigned in
`StreamingAggOperatorX::update_operator`, so it cannot be `const`).
   ```suggestion
       // For sort limit
       bool _do_sort_limit = false;
       int64_t _sort_limit = -1;
       std::vector<int> _order_directions;
       std::vector<int> _null_directions;
   
       std::vector<TExpr> _partition_exprs;
   ```



##########
be/src/vec/exec/scan/scanner.cpp:
##########
@@ -79,8 +79,39 @@ Status Scanner::init(RuntimeState* state, const 
VExprContextSPtrs& conjuncts) {
 Status Scanner::get_block_after_projects(RuntimeState* state, 
vectorized::Block* block, bool* eos) {
     auto& row_descriptor = _local_state->_parent->row_descriptor();
     if (_output_row_descriptor) {
-        
_origin_block.clear_column_data(row_descriptor.num_materialized_slots());
-        RETURN_IF_ERROR(get_block(state, &_origin_block, eos));
+        if (_alreay_eos) {
+            *eos = true;
+            _padding_block.swap(_origin_block);
+        } else {
+            
_origin_block.clear_column_data(row_descriptor.num_materialized_slots());
+            const auto min_batch_size = std::max(state->batch_size() / 2, 1);

Review Comment:
   `get_block_after_projects` reads `*eos` in the
`while (_padding_block.rows() < min_batch_size && !*eos)` condition before this
method has set it. This relies on all callers initializing `*eos`, which is easy
to violate and could lead to undefined behavior or skipping the first
`get_block()` call. Initialize `*eos` to `false` before the loop (or restructure
the loop to fetch before checking `*eos`).
   ```suggestion
               const auto min_batch_size = std::max(state->batch_size() / 2, 1);
               *eos = false;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to