github-actions[bot] commented on code in PR #61130:
URL: https://github.com/apache/doris/pull/61130#discussion_r2902959715


##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -280,13 +280,14 @@ Status ConcurrentContextMap<Key, Value, ValueType>::apply_if_not_exists(
 }
 
 template <typename Key, typename Value, typename ValueType>
-void ConcurrentContextMap<Key, Value, ValueType>::erase(const Key& query_id) {
+bool ConcurrentContextMap<Key, Value, ValueType>::erase(const Key& query_id) {
     auto id = get_map_id(query_id, _internal_map.size());
     {
         std::unique_lock lock(*_internal_map[id].first);
         auto& map = _internal_map[id].second;
-        map.erase(query_id);
+        return map.erase(query_id) != 0;
     }
+    return false;

Review Comment:
   **[HIGH] Unreachable code.** The `return map.erase(query_id) != 0;` at line 288 always returns from within the braced block, so this `return false;` on line 290 is dead code. The compiler may warn about this. Consider removing the outer braces to make the control flow clearer:
   ```cpp
   bool ConcurrentContextMap<Key, Value, ValueType>::erase(const Key& query_id) {
       auto id = get_map_id(query_id, _internal_map.size());
       std::unique_lock lock(*_internal_map[id].first);
       auto& map = _internal_map[id].second;
       return map.erase(query_id) != 0;
   }
   ```



##########
be/src/exec/pipeline/pipeline_fragment_context.cpp:
##########
@@ -179,9 +179,16 @@ void PipelineFragmentContext::cancel(const Status reason) {
     {
         std::lock_guard<std::mutex> l(_task_mutex);
         if (_closed_tasks >= _total_tasks) {
+            if (_need_notify_close) {
+                // if fragment cancelled and waiting for notify to close, need to remove from fragment mgr
+                _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});

Review Comment:
   **[NOTE] Cancel path and `_wait_close_guard`.** When `cancel()` runs and all tasks are already closed with `_need_notify_close` true, this code removes the PFC from `_pipeline_map` and sets `_need_notify_close = false`. However, if `listen_wait_close()` was previously called and stored a guard in `_wait_close_guard`, this path does NOT clear `_wait_close_guard`. The guard will only be released when the PFC is destroyed.
   
   This means the brpc response for `wait_for_destroy` is deferred until PFC destruction rather than being sent immediately on cancel. The caller (`RecCTESourceOperatorX`) will block on `brpc::Join()` until then. Consider explicitly setting `_wait_close_guard = nullptr` here to send the response promptly.
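   
   A minimal standalone sketch of the timing difference, using stand-ins for `brpc::Closure` / `brpc::ClosureGuard` (all names here are illustrative, not the real brpc or Doris types): clearing the stored guard on the cancel path fires the deferred response immediately, instead of at context destruction:
   ```cpp
   #include <cstdio>
   #include <memory>
   
   // Stand-in for the rpc closure: Run() represents sending the brpc response.
   struct FakeClosure {
       void Run() { std::puts("brpc response sent"); }
   };
   
   // Stand-in for brpc::ClosureGuard: runs the closure when destroyed.
   struct FakeClosureGuard {
       explicit FakeClosureGuard(FakeClosure* c) : _c(c) {}
       ~FakeClosureGuard() {
           if (_c != nullptr) _c->Run();
       }
       FakeClosure* _c;
   };
   
   // Minimal PFC-like holder: the guard lives as long as the context unless
   // someone clears it explicitly.
   struct ContextSketch {
       std::shared_ptr<FakeClosureGuard> _wait_close_guard;
   
       void cancel_with_eager_release() {
           // The suggested fix: drop the guard on the cancel path so the
           // wait_for_destroy response goes out now, not at PFC destruction.
           _wait_close_guard = nullptr;
           std::puts("cancel finished");
       }
   };
   
   int main() {
       FakeClosure done;
       ContextSketch ctx;
       ctx._wait_close_guard = std::make_shared<FakeClosureGuard>(&done);
       ctx.cancel_with_eager_release();
       // "brpc response sent" is printed before "cancel finished" because the
       // guard is released inside the cancel path.
       return 0;
   }
   ```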



##########
be/src/runtime/query_context.h:
##########
@@ -410,7 +415,28 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
     timespec get_query_arrival_timestamp() const { return this->_query_arrival_timestamp; }
     QuerySource get_query_source() const { return this->_query_source; }
 
-    const TQueryOptions get_query_options() const { return _query_options; }
+    TQueryOptions get_query_options() const { return _query_options; }
+
+    uint32_t get_stage(int filter_id) {
+        std::lock_guard<std::mutex> lock(_filter_id_to_stage_mtx);
+        return _filter_id_to_stage[filter_id];

Review Comment:
   **[LOW] Map insertion on read.** `operator[]` on `_filter_id_to_stage` inserts a default entry (0) for any filter_id that doesn't exist. For non-CTE queries, every runtime filter stage check will insert an entry. Consider using `find()` with a default return:
   ```cpp
   uint32_t get_stage(int filter_id) {
       std::lock_guard<std::mutex> lock(_filter_id_to_stage_mtx);
       auto it = _filter_id_to_stage.find(filter_id);
       return it != _filter_id_to_stage.end() ? it->second : 0;
   }
   ```



##########
be/src/exec/runtime_filter/runtime_filter_mgr.cpp:
##########
@@ -372,6 +388,12 @@ Status RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext
     butil::IOBuf request_attachment;
 
     PPublishFilterRequestV2 apply_request;
+    if (auto q_ctx = ctx.lock(); q_ctx) {
+        apply_request.set_stage(q_ctx->get_stage(cnt_val.runtime_filter_desc.filter_id));

Review Comment:
   **[NOTE] Stage read outside `cnt_val.mtx`.** `_send_rf_to_target` is called after releasing `cnt_val.mtx` (from `merge()`), but here it reads the stage via `q_ctx->get_stage()`. This is safe because `get_stage()` has its own lock. However, there's a subtle race: between releasing `cnt_val.mtx` in `merge()` and reaching here, `GlobalMergeContext::reset()` could run (acquiring `cnt_val.mtx`), incrementing `stage`. This would cause the publish to carry the NEW stage number rather than the old one, potentially being rejected by receivers.
   
   In practice this is likely prevented by the sequential nature of the recursive CTE protocol (`wait_for_destroy` completes before `reset_global_rf`), but worth a comment.
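   
   If a comment alone feels too fragile, one hardening option is to snapshot the stage while `cnt_val.mtx` is still held in `merge()` and pass the snapshot to the publish path. A standalone sketch of that pattern (all types here are illustrative stand-ins, not the real Doris classes):
   ```cpp
   #include <cstdint>
   #include <cstdio>
   #include <mutex>
   
   struct MergeContextSketch {
       std::mutex mtx;      // stands in for cnt_val.mtx
       uint32_t stage = 0;  // stands in for the per-filter stage counter
   };
   
   // merge(): capture the stage under the lock and hand it to the publish path.
   uint32_t merge_and_snapshot(MergeContextSketch& ctx) {
       std::lock_guard<std::mutex> lk(ctx.mtx);
       // ... merge work would happen here ...
       return ctx.stage;  // snapshot taken before the lock is released
   }
   
   // reset(): bumps the stage, as GlobalMergeContext::reset() does between rounds.
   void reset(MergeContextSketch& ctx) {
       std::lock_guard<std::mutex> lk(ctx.mtx);
       ++ctx.stage;
   }
   
   int main() {
       MergeContextSketch ctx;
       uint32_t publish_stage = merge_and_snapshot(ctx);
       reset(ctx);  // a reset sneaking in between merge() and the publish
       // The publish still carries the stage of the round it merged for,
       // not the incremented live value (single-threaded read is safe here).
       std::printf("published=%u live=%u\n", (unsigned)publish_stage,
                   (unsigned)ctx.stage);
       return 0;
   }
   ```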



##########
be/src/exec/pipeline/pipeline_fragment_context.h:
##########
@@ -128,11 +130,24 @@ class PipelineFragmentContext : public TaskExecutionContext {
     std::string get_load_error_url();
     std::string get_first_error_msg();
 
-    Status wait_close(bool close);
-    Status rebuild(ThreadPool* thread_pool);
-    Status set_to_rerun();
+    std::set<int> get_deregister_runtime_filter() const;
 
-    bool need_notify_close() const { return _need_notify_close; }
+    Status listen_wait_close(const std::shared_ptr<brpc::ClosureGuard>& guard,

Review Comment:
   **[MEDIUM] Thread safety concern.** `listen_wait_close()` reads/writes `_need_notify_close` and `_wait_close_guard` without holding `_task_mutex`. Meanwhile, `cancel()` modifies `_need_notify_close` under `_task_mutex`. This creates a TOCTOU race:
   
   1. `listen_wait_close()` checks `_need_notify_close` is true (line 140)
   2. `cancel()` runs, sets `_need_notify_close = false` (line 191 of .cpp)
   3. `listen_wait_close()` sets `_wait_close_guard = guard` (line 147)
   4. `_close_fragment_instance()` sees `!_need_notify_close`, enters the normal close path, never clears `_wait_close_guard`
   
   The brpc response would be deferred until PFC destruction, which may work in practice but is fragile. Consider acquiring `_task_mutex` in this method, or document why the race is benign.
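   
   A standalone sketch of the locked variant, assuming the check-and-store can be made one critical section (names mirror the PR, but the types are simplified stand-ins, and the `bool` return signaling "cancel already won" is an assumption, not the real signature):
   ```cpp
   #include <cstdio>
   #include <memory>
   #include <mutex>
   
   struct PfcSketch {
       std::mutex _task_mutex;
       bool _need_notify_close = true;
       std::shared_ptr<int> _wait_close_guard;  // stand-in for the brpc guard
   
       // Check _need_notify_close and park the guard atomically under
       // _task_mutex, so cancel() cannot slip in between the two steps.
       // Returns false if the fragment already stopped waiting, so the
       // caller can respond immediately instead of parking the guard.
       bool listen_wait_close(std::shared_ptr<int> guard) {
           std::lock_guard<std::mutex> lk(_task_mutex);
           if (!_need_notify_close) {
               return false;  // cancel() already won; don't park the guard
           }
           _wait_close_guard = std::move(guard);
           return true;
       }
   
       void cancel() {
           std::lock_guard<std::mutex> lk(_task_mutex);
           _need_notify_close = false;
           _wait_close_guard = nullptr;  // release any parked guard promptly
       }
   };
   
   int main() {
       PfcSketch pfc;
       pfc.cancel();  // cancel arrives first
       bool parked = pfc.listen_wait_close(std::make_shared<int>(0));
       // Under the lock, the late listener observes the cancel and declines
       // to park, so no guard is left dangling until destruction.
       std::printf("parked=%s\n", parked ? "true" : "false");
       return 0;
   }
   ```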



##########
gensrc/proto/internal_service.proto:
##########
@@ -75,11 +75,10 @@ message PTransmitRecCTEBlockResult {
 
 message PRerunFragmentParams {
     enum Opcode {
-    wait = 1;    // wait fragment execute done
-    release = 2; // release current round's resource
-    rebuild = 3; // rebuild next round pipeline tasks
-    submit = 4;  // submit tasks to execute
-    close = 5;   // close fragment
+    wait_for_destroy = 1;     // deregister RF, destroy old PFC, async wait for tasks to close via brpc closure
+    rebuild = 2;              // rebuild next round pipeline tasks
+    submit = 3;               // submit tasks to execute
+    final_close = 4;          // async wait for tasks to close, send report, and clean up (last round)
     }

Review Comment:
   **[MEDIUM] Rolling upgrade compatibility.** The enum values have been renumbered: `rebuild` changed from 3→2, `submit` from 4→3. Since `RecCTESourceOperatorX::_fragments_to_reset` can target remote BEs (the addresses come from the FE plan), during a rolling upgrade an old BE sending `rebuild=3` would be interpreted as `submit=3` by a new BE, causing incorrect behavior.
   
   Consider keeping the original numeric values to maintain wire compatibility:
   ```protobuf
   enum Opcode {
       wait_for_destroy = 1;
       rebuild = 3;          // keep original value
       submit = 4;           // keep original value
       final_close = 5;      // keep original value
   }
   ```
   Protobuf enum values don't need to be contiguous.



##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -1460,35 +1504,96 @@ Status FragmentMgr::transmit_rec_cte_block(
     }
 }
 
-Status FragmentMgr::rerun_fragment(const TUniqueId& query_id, int fragment,
+Status FragmentMgr::rerun_fragment(const std::shared_ptr<brpc::ClosureGuard>& guard,
+                                   const TUniqueId& query_id, int fragment,
                                    PRerunFragmentParams_Opcode stage) {
-    if (auto q_ctx = get_query_ctx(query_id)) {
-        SCOPED_ATTACH_TASK(q_ctx.get());
+    if (stage == PRerunFragmentParams::wait_for_destroy ||
+        stage == PRerunFragmentParams::final_close) {
         auto fragment_ctx = _pipeline_map.find({query_id, fragment});
         if (!fragment_ctx) {
             return Status::NotFound("Fragment context (query-id: {}, fragment-id: {}) not found",
                                     print_id(query_id), fragment);
         }
 
-        if (stage == PRerunFragmentParams::wait) {
-            return fragment_ctx->wait_close(false);
-        } else if (stage == PRerunFragmentParams::release) {
-            return fragment_ctx->set_to_rerun();
-        } else if (stage == PRerunFragmentParams::rebuild) {
-            return fragment_ctx->rebuild(_thread_pool.get());
-        } else if (stage == PRerunFragmentParams::submit) {
-            return fragment_ctx->submit();
-        } else if (stage == PRerunFragmentParams::close) {
-            return fragment_ctx->wait_close(true);
-        } else {
-            return Status::InvalidArgument("Unknown rerun fragment opcode: {}", stage);
+        if (stage == PRerunFragmentParams::wait_for_destroy) {
+            std::lock_guard<std::mutex> lk(_rerunnable_params_lock);
+            auto it = _rerunnable_params_map.find({query_id, fragment});
+            if (it == _rerunnable_params_map.end()) {
+                auto st = fragment_ctx->listen_wait_close(guard, true);
+                if (!st.ok()) {
+                    LOG(WARNING) << fmt::format(
+                            "wait_for_destroy fragment context (query-id: {}, fragment-id: "
+                            "{}) failed: {}",
+                            print_id(query_id), fragment, st.to_string());
+                }
+                return Status::NotFound(
+                        "Rerunnable params (query-id: {}, fragment-id: {}) not found",
+                        print_id(query_id), fragment);
+            }
+
+            it->second.deregister_runtime_filter_ids.merge(
+                    fragment_ctx->get_deregister_runtime_filter());
         }
+
+        auto* query_ctx = fragment_ctx->get_query_ctx();
+        SCOPED_ATTACH_TASK(query_ctx);
+        RETURN_IF_ERROR(
+                fragment_ctx->listen_wait_close(guard, stage == PRerunFragmentParams::final_close));
+        remove_pipeline_context({query_id, fragment});
+        return Status::OK();
+    } else if (stage == PRerunFragmentParams::rebuild) {
+        auto q_ctx = get_query_ctx(query_id);
+        if (!q_ctx) {
+            return Status::NotFound(
+                    "rerun_fragment: Query context (query-id: {}) not found, maybe finished",
+                    print_id(query_id));
+        }
+        SCOPED_ATTACH_TASK(q_ctx.get());
+        RerunableFragmentInfo info;
+        {
+            std::lock_guard<std::mutex> lk(_rerunnable_params_lock);
+            auto it = _rerunnable_params_map.find({query_id, fragment});
+            if (it == _rerunnable_params_map.end()) {
+                return Status::NotFound("rebuild (query-id: {}, fragment-id: {}) not found",
+                                        print_id(query_id), fragment);
+            }
+            it->second.stage++;
+            RETURN_IF_ERROR(q_ctx->update_filters_stage(it->second.stage,
+                                                        it->second.deregister_runtime_filter_ids));
+            info = it->second;

Review Comment:
   **[LOW] Large struct copy under lock.** `info = it->second` copies the entire `RerunableFragmentInfo`, which includes `TPipelineFragmentParams` (a potentially large thrift struct with the full plan) and `TPipelineFragmentParamsList`. This copy happens while holding `_rerunnable_params_lock`.
   
   Consider either:
   1. Moving the heavy parts out of the struct copy (e.g., store `params` as `shared_ptr<const TPipelineFragmentParams>`)
   2. Or extracting just the needed fields under the lock and releasing it before the PFC construction
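   
   A standalone sketch of option (1), with illustrative stand-ins for `TPipelineFragmentParams` and `RerunableFragmentInfo`: once the heavy payload sits behind a `shared_ptr<const ...>`, the copy taken under `_rerunnable_params_lock` is a refcount bump rather than a deep copy of the plan:
   ```cpp
   #include <cstdio>
   #include <memory>
   #include <mutex>
   #include <string>
   
   struct HeavyParamsSketch {
       std::string serialized_plan;  // stands in for the large thrift payload
   };
   
   struct RerunableFragmentInfoSketch {
       int stage = 0;
       // Shared, never mutated after insertion, so const is safe to share.
       std::shared_ptr<const HeavyParamsSketch> params;
   };
   
   int main() {
       std::mutex rerunnable_params_lock;
       RerunableFragmentInfoSketch stored;
       stored.params = std::make_shared<const HeavyParamsSketch>(
               HeavyParamsSketch {std::string(1 << 20, 'x')});  // ~1 MiB "plan"
   
       RerunableFragmentInfoSketch info;
       {
           std::lock_guard<std::mutex> lk(rerunnable_params_lock);
           info = stored;  // copies an int and a refcount, not the 1 MiB payload
       }
       // Both copies point at the same immutable payload.
       std::printf("shared=%s\n",
                   info.params.get() == stored.params.get() ? "true" : "false");
       return 0;
   }
   ```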



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

