Copilot commented on code in PR #61130:
URL: https://github.com/apache/doris/pull/61130#discussion_r2902875823


##########
be/src/runtime/query_context.h:
##########
@@ -394,6 +395,10 @@ class QueryContext : public 
std::enable_shared_from_this<QueryContext> {
     std::map<std::pair<TUniqueId, int>, pipeline::RecCTEScanLocalState*> 
_cte_scan;
     std::mutex _cte_scan_lock;
 
+    // for represent the rf's fragment execution round number of recursive cte
+    std::unordered_map<int, uint32_t> _filter_id_to_stage; // filter id -> 
stage number
+    std::mutex __filter_id_to_stage_mtx;
+

Review Comment:
   `__filter_id_to_stage_mtx` contains a double underscore, which makes it an
identifier reserved for the C++ implementation; declaring it in user code can
lead to undefined behavior. Rename it to a non-reserved identifier (e.g.,
`_filter_id_to_stage_mtx`) and update all uses.



##########
be/src/exec/pipeline/pipeline_fragment_context.cpp:
##########
@@ -179,9 +179,16 @@ void PipelineFragmentContext::cancel(const Status reason) {
     {
         std::lock_guard<std::mutex> l(_task_mutex);
         if (_closed_tasks >= _total_tasks) {
+            if (_need_notify_close) {
+                // if fragment cancelled and waiting for notify to close, need 
to remove from fragment mgr
+                _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, 
_fragment_id});
+                _need_notify_close = false;
+            }
             // All tasks in this PipelineXFragmentContext already closed.
             return;
         }
+        // make fragment release by self after cancel
+        _need_notify_close = false;

Review Comment:
   In `cancel()`, calling `fragment_mgr()->remove_pipeline_context()` when 
`_closed_tasks >= _total_tasks` can cause the fragment to be removed multiple 
times (e.g. if a rerun path already removed it). `remove_pipeline_context()` 
unconditionally decrements `g_fragment_executing_count`, so double-removal will 
corrupt metrics. Consider making removal idempotent (only update metrics if the 
context existed) or ensuring this path cannot run after an earlier removal.



##########
be/src/exec/pipeline/pipeline_fragment_context.h:
##########
@@ -128,11 +130,24 @@ class PipelineFragmentContext : public 
TaskExecutionContext {
     std::string get_load_error_url();
     std::string get_first_error_msg();
 
-    Status wait_close(bool close);
-    Status rebuild(ThreadPool* thread_pool);
-    Status set_to_rerun();
+    std::set<int> get_deregister_runtime_filter() const;
 
-    bool need_notify_close() const { return _need_notify_close; }
+    Status listen_wait_close(const std::shared_ptr<brpc::ClosureGuard>& guard,
+                             bool need_send_report_on_destruction) {
+        if (_wait_close_guard) {
+            return Status::InternalError("Already listening wait close");
+        }
+        if (!_need_notify_close) {
+            return Status::InternalError("Not need to listen wait close");
+        }
+        if (need_send_report_on_destruction) {
+            _need_notify_close = false;
+            return send_report(true);
+        } else {
+            _wait_close_guard = guard;
+        }
+        return Status::OK();

Review Comment:
   `listen_wait_close(..., /*need_send_report_on_destruction=*/true)` 
immediately calls `send_report(true)` and does not retain the `ClosureGuard`, 
so the rerun_fragment RPC can return before tasks actually close. This breaks 
the intended "wait for close" semantics (and can lead to double report / double 
remove when the fragment later closes). Consider storing the guard and 
deferring report/cleanup until `_close_fragment_instance()` (or otherwise 
ensuring the RPC only completes after close).



##########
be/src/runtime/query_context.h:
##########
@@ -410,7 +415,28 @@ class QueryContext : public 
std::enable_shared_from_this<QueryContext> {
     timespec get_query_arrival_timestamp() const { return 
this->_query_arrival_timestamp; }
     QuerySource get_query_source() const { return this->_query_source; }
 
-    const TQueryOptions get_query_options() const { return _query_options; }
+    TQueryOptions get_query_options() const { return _query_options; }
+
+    uint32_t get_stage(int filter_id) {
+        std::lock_guard<std::mutex> lock(__filter_id_to_stage_mtx);
+        return _filter_id_to_stage[filter_id];

Review Comment:
   `get_stage()` uses `_filter_id_to_stage[filter_id]`, and `operator[]`
default-inserts a new entry whenever the key is absent. Since this is called on
RPC paths (e.g. apply/merge/sync), unexpected or invalid filter_ids can grow the
map, and the call mutates state in what looks like a getter. Prefer `find()` and
return a default stage (e.g. 0) without insertion unless the filter_id is
explicitly registered.
   ```suggestion
           auto it = _filter_id_to_stage.find(filter_id);
           if (it == _filter_id_to_stage.end()) {
               return 0;
           }
           return it->second;
   ```



##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -1460,35 +1503,96 @@ Status FragmentMgr::transmit_rec_cte_block(
     }
 }
 
-Status FragmentMgr::rerun_fragment(const TUniqueId& query_id, int fragment,
+Status FragmentMgr::rerun_fragment(const std::shared_ptr<brpc::ClosureGuard>& 
guard,
+                                   const TUniqueId& query_id, int fragment,
                                    PRerunFragmentParams_Opcode stage) {
-    if (auto q_ctx = get_query_ctx(query_id)) {
-        SCOPED_ATTACH_TASK(q_ctx.get());
+    if (stage == PRerunFragmentParams::wait_for_destroy ||
+        stage == PRerunFragmentParams::final_close) {
         auto fragment_ctx = _pipeline_map.find({query_id, fragment});
         if (!fragment_ctx) {
             return Status::NotFound("Fragment context (query-id: {}, 
fragment-id: {}) not found",
                                     print_id(query_id), fragment);
         }
 
-        if (stage == PRerunFragmentParams::wait) {
-            return fragment_ctx->wait_close(false);
-        } else if (stage == PRerunFragmentParams::release) {
-            return fragment_ctx->set_to_rerun();
-        } else if (stage == PRerunFragmentParams::rebuild) {
-            return fragment_ctx->rebuild(_thread_pool.get());
-        } else if (stage == PRerunFragmentParams::submit) {
-            return fragment_ctx->submit();
-        } else if (stage == PRerunFragmentParams::close) {
-            return fragment_ctx->wait_close(true);
-        } else {
-            return Status::InvalidArgument("Unknown rerun fragment opcode: 
{}", stage);
+        if (stage == PRerunFragmentParams::wait_for_destroy) {
+            std::lock_guard<std::mutex> lk(_rerunnable_params_lock);
+            auto it = _rerunnable_params_map.find({query_id, fragment});
+            if (it == _rerunnable_params_map.end()) {
+                auto st = fragment_ctx->listen_wait_close(guard, true);
+                if (!st.ok()) {
+                    LOG(WARNING) << fmt::format(
+                            "wait_for_destroy fragment context (query-id: {}, 
fragment-id: "

Review Comment:
   In the `wait_for_destroy` path, `_rerunnable_params_lock` is held while
calling `fragment_ctx->listen_wait_close(...)` (which can call into
`send_report()` and other subsystems). This increases the risk of lock
inversion and lengthens the critical section. Determine under the lock whether
the rerunnable params exist (the NotFound decision), then release the lock
before invoking methods on `fragment_ctx`.



##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -1460,35 +1503,96 @@ Status FragmentMgr::transmit_rec_cte_block(
     }
 }
 
-Status FragmentMgr::rerun_fragment(const TUniqueId& query_id, int fragment,
+Status FragmentMgr::rerun_fragment(const std::shared_ptr<brpc::ClosureGuard>& 
guard,
+                                   const TUniqueId& query_id, int fragment,
                                    PRerunFragmentParams_Opcode stage) {
-    if (auto q_ctx = get_query_ctx(query_id)) {
-        SCOPED_ATTACH_TASK(q_ctx.get());
+    if (stage == PRerunFragmentParams::wait_for_destroy ||
+        stage == PRerunFragmentParams::final_close) {
         auto fragment_ctx = _pipeline_map.find({query_id, fragment});
         if (!fragment_ctx) {
             return Status::NotFound("Fragment context (query-id: {}, 
fragment-id: {}) not found",
                                     print_id(query_id), fragment);
         }
 
-        if (stage == PRerunFragmentParams::wait) {
-            return fragment_ctx->wait_close(false);
-        } else if (stage == PRerunFragmentParams::release) {
-            return fragment_ctx->set_to_rerun();
-        } else if (stage == PRerunFragmentParams::rebuild) {
-            return fragment_ctx->rebuild(_thread_pool.get());
-        } else if (stage == PRerunFragmentParams::submit) {
-            return fragment_ctx->submit();
-        } else if (stage == PRerunFragmentParams::close) {
-            return fragment_ctx->wait_close(true);
-        } else {
-            return Status::InvalidArgument("Unknown rerun fragment opcode: 
{}", stage);
+        if (stage == PRerunFragmentParams::wait_for_destroy) {
+            std::lock_guard<std::mutex> lk(_rerunnable_params_lock);
+            auto it = _rerunnable_params_map.find({query_id, fragment});
+            if (it == _rerunnable_params_map.end()) {
+                auto st = fragment_ctx->listen_wait_close(guard, true);
+                if (!st.ok()) {
+                    LOG(WARNING) << fmt::format(
+                            "wait_for_destroy fragment context (query-id: {}, 
fragment-id: "
+                            "{}) failed: {}",
+                            print_id(query_id), fragment, st.to_string());
+                }
+                return Status::NotFound(
+                        "Rerunnable params (query-id: {}, fragment-id: {}) not 
found",
+                        print_id(query_id), fragment);
+            }
+
+            it->second.deregister_runtime_filter_ids.merge(
+                    fragment_ctx->get_deregister_runtime_filter());
+        }
+
+        auto* query_ctx = fragment_ctx->get_query_ctx();
+        SCOPED_ATTACH_TASK(query_ctx);
+        RETURN_IF_ERROR(
+                fragment_ctx->listen_wait_close(guard, stage == 
PRerunFragmentParams::final_close));
+        remove_pipeline_context({query_id, fragment});

Review Comment:
   `final_close` currently calls `listen_wait_close(..., true)` and then 
immediately `remove_pipeline_context(...)`. Since `listen_wait_close` (with 
`true`) does not keep the `ClosureGuard`, the RPC may return before tasks are 
closed; additionally, the fragment may later remove itself again in 
`_close_fragment_instance()` after `_need_notify_close` is set false, causing a 
second `remove_pipeline_context()` and incorrect metrics. Consider making 
`final_close` actually wait for close via the guard (like `wait_for_destroy`) 
and avoid removing the context until close is complete (or make removal 
idempotent).
   ```suggestion
           // For both wait_for_destroy and final_close, we need to actually
           // wait for close via the guard, so do not pass 'true' here (which
           // would release the ClosureGuard early). The 'true' case is handled
           // only in the special-cased branch above.
           RETURN_IF_ERROR(fragment_ctx->listen_wait_close(guard, false));
           // Only remove the pipeline context for wait_for_destroy; for
           // final_close, the fragment will remove itself on close completion
           // to avoid double removal and incorrect metrics.
           if (stage == PRerunFragmentParams::wait_for_destroy) {
               remove_pipeline_context({query_id, fragment});
           }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to