Copilot commented on code in PR #60812:
URL: https://github.com/apache/doris/pull/60812#discussion_r2847233869


##########
be/src/runtime/task_execution_context.cpp:
##########
@@ -19,22 +19,45 @@
 
 #include <glog/logging.h>
 
-#include <condition_variable>
-
 namespace doris {
-void TaskExecutionContext::ref_task_execution_ctx() {
-    ++_has_task_execution_ctx_ref_count;
+
+void TaskExecutionContext::init_sentinel() {
+    _rerun_wait_ctx = std::make_shared<RerunWaitContext>();
+    auto ctx = _rerun_wait_ctx; // shared_ptr copy captured by deleter
+    _sentinel = std::shared_ptr<void>(
+            reinterpret_cast<void*>(1), // dummy pointer, does not manage 
actual resource
+            [ctx](void*) {
+                std::lock_guard<std::mutex> lk(ctx->mtx);
+                ctx->sentinel_destroyed = true;
+                ctx->cv.notify_all();
+            });
 }
 
-void TaskExecutionContext::unref_task_execution_ctx() {
-    --_has_task_execution_ctx_ref_count;
-    if (_has_task_execution_ctx_ref_count == 0) {
-        _notify_cv.notify_all();
+bool TaskExecutionContext::wait_for_sentinel_destruction(std::function<bool()> 
is_cancelled) {
+    if (!_rerun_wait_ctx) {
+        return true;
+    }
+    std::unique_lock<std::mutex> lk(_rerun_wait_ctx->mtx);
+    while (!_rerun_wait_ctx->sentinel_destroyed) {
+        LOG(WARNING) << "Waiting for sentinel destruction";

Review Comment:
   This LOG(WARNING) will be emitted every second while waiting for sentinel 
destruction. Consider using LOG(INFO) or adding a counter to log only 
periodically (e.g., every 10 iterations) to reduce log spam during long waits.
   ```suggestion
       int wait_iterations = 0;
       while (!_rerun_wait_ctx->sentinel_destroyed) {
           if ((wait_iterations++ % 10) == 0) {
               LOG(INFO) << "Waiting for sentinel destruction";
           }
   ```



##########
gensrc/proto/internal_service.proto:
##########
@@ -75,11 +75,10 @@ message PTransmitRecCTEBlockResult {
 
 message PRerunFragmentParams {
     enum Opcode {
-    wait = 1;    // wait fragment execute done
-    release = 2; // release current round's resource
-    rebuild = 3; // rebuild next round pipeline tasks
-    submit = 4;  // submit tasks to execute
-    close = 5;   // close fragment
+    wait_for_close = 1;       // wait for PFC close
+    wait_for_destroy = 2;     // wait for external thread finished and destroy 
PFC
+    recreate_and_submit = 3;  // recreate PFC from saved params + submit
+    final_close = 4;          // close fragment (final round)

Review Comment:
   This is a breaking protocol change. The numeric values 1-4 have been reused 
for opcodes with different semantics (e.g. 2 was `release` and is now 
`wait_for_destroy`; 4 was `submit` and is now `final_close`), and value 5 
(`close`) has been removed. 
This means BE nodes running the new code cannot communicate with BE nodes 
running the old code for recursive CTE operations. During a rolling upgrade, if 
the FE sends rerun_fragment requests to a mix of old and new BE nodes, the old 
BE nodes will misinterpret the commands and execute incorrect operations.
   
   Consider either:
   1. Adding new enum values (6, 7, 8, 9) for the new protocol while keeping 
old values deprecated
   2. Adding version checking to ensure all nodes are upgraded before enabling 
this feature
   3. Documenting that this requires a coordinated upgrade of all BE nodes
   ```suggestion
           // Legacy opcodes (old protocol). These numeric values are kept for
           // backward compatibility and must not be reused with new semantics.
           // New code should avoid using these and prefer the new opcodes 
below.
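           wait = 1;     // wait fragment execute done
           release = 2;  // release current round's resource
           rebuild = 3;  // rebuild next round pipeline tasks
           submit = 4;   // submit tasks to execute
           close = 5;    // close fragment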
   
           // New opcodes (new protocol). These use distinct numeric values to
           // avoid collisions with the legacy protocol during rolling upgrades.
           wait_for_close = 6;       // wait for PFC close
           wait_for_destroy = 7;     // wait for external thread finished and 
destroy PFC
           recreate_and_submit = 8;  // recreate PFC from saved params + submit
           final_close = 9;          // close fragment (final round)
   ```



##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -1450,33 +1457,104 @@ Status FragmentMgr::transmit_rec_cte_block(
 
 Status FragmentMgr::rerun_fragment(const TUniqueId& query_id, int fragment,
                                    PRerunFragmentParams_Opcode stage) {
-    if (auto q_ctx = get_query_ctx(query_id)) {
+    if (stage == PRerunFragmentParams::wait_for_close) {
+        auto fragment_ctx = _pipeline_map.find({query_id, fragment});
+        if (!fragment_ctx) {
+            return Status::NotFound("Fragment context (query-id: {}, 
fragment-id: {}) not found",
+                                    print_id(query_id), fragment);
+        }
+        SCOPED_ATTACH_TASK(fragment_ctx->get_query_ctx());
+        RETURN_IF_ERROR(fragment_ctx->wait_close(false));
+        fragment_ctx->release_resource();
+        return Status::OK();
+    } else if (stage == PRerunFragmentParams::wait_for_destroy) {
+        auto fragment_ctx = _pipeline_map.find({query_id, fragment});
+        if (!fragment_ctx) {
+            return Status::NotFound("Fragment context (query-id: {}, 
fragment-id: {}) not found",
+                                    print_id(query_id), fragment);
+        }
+        SCOPED_ATTACH_TASK(fragment_ctx->get_query_ctx());
+        // Wait for all external sentinel copies to be released
+        bool ok = fragment_ctx->wait_for_sentinel_destruction(
+                [&fragment_ctx]() { return 
fragment_ctx->get_query_ctx()->is_cancelled(); });
+        if (!ok) {
+            return Status::Cancelled("Query has been cancelled while waiting 
for sentinel");
+        }
+        remove_pipeline_context({query_id, fragment});
+        return Status::OK();
+    } else if (stage == PRerunFragmentParams::recreate_and_submit) {
+        auto q_ctx = get_query_ctx(query_id);
+        if (!q_ctx) {
+            return Status::NotFound(
+                    "rerun_fragment: Query context (query-id: {}) not found, 
maybe finished",
+                    print_id(query_id));
+        }
         SCOPED_ATTACH_TASK(q_ctx.get());
+        RerunableFragmentInfo info;
+        {
+            std::lock_guard<std::mutex> lk(_rerunnable_params_lock);
+            auto it = _rerunnable_params_map.find({query_id, fragment});
+            if (it == _rerunnable_params_map.end()) {
+                return Status::NotFound(
+                        "Rerunnable params (query-id: {}, fragment-id: {}) not 
found",
+                        print_id(query_id), fragment);
+            }
+            info = it->second;
+        }
+
+        auto context = std::make_shared<pipeline::PipelineFragmentContext>(
+                q_ctx->query_id(), info.params, q_ctx, _exec_env, 
info.finish_callback,
+                [this](const ReportStatusRequest& req, auto&& ctx) {
+                    return this->trigger_pipeline_context_report(req, 
std::move(ctx));
+                });
+
+        Status prepare_st = Status::OK();
+        ASSIGN_STATUS_IF_CATCH_EXCEPTION(prepare_st = 
context->prepare(_thread_pool.get()),
+                                         prepare_st);
+        if (!prepare_st.ok()) {
+            q_ctx->cancel(prepare_st, info.params.fragment_id);
+            return prepare_st;
+        }
+
+        // Update fragment executing count (remove_pipeline_context 
decremented it)
+        {
+            int64_t now = duration_cast<std::chrono::milliseconds>(
+                                  
std::chrono::system_clock::now().time_since_epoch())
+                                  .count();
+            g_fragment_executing_count << 1;
+            g_fragment_last_active_time.set_value(now);
+        }
+
+        // Insert new PFC into _pipeline_map (old one was removed)
+        _pipeline_map.insert({info.params.query_id, info.params.fragment_id}, 
context);
+
+        // Update QueryContext mapping (must support overwrite)
+        q_ctx->set_pipeline_context(info.params.fragment_id, context);
+
+        RETURN_IF_ERROR(context->submit());
+        return Status::OK();
+
+    } else if (stage == PRerunFragmentParams::final_close) {
         auto fragment_ctx = _pipeline_map.find({query_id, fragment});
         if (!fragment_ctx) {
             return Status::NotFound("Fragment context (query-id: {}, 
fragment-id: {}) not found",
                                     print_id(query_id), fragment);
         }
+        SCOPED_ATTACH_TASK(fragment_ctx->get_query_ctx());
 
-        if (stage == PRerunFragmentParams::wait) {
-            return fragment_ctx->wait_close(false);
-        } else if (stage == PRerunFragmentParams::release) {
-            return fragment_ctx->set_to_rerun();
-        } else if (stage == PRerunFragmentParams::rebuild) {
-            return fragment_ctx->rebuild(_thread_pool.get());
-        } else if (stage == PRerunFragmentParams::submit) {
-            return fragment_ctx->submit();
-        } else if (stage == PRerunFragmentParams::close) {
-            return fragment_ctx->wait_close(true);
-        } else {
-            return Status::InvalidArgument("Unknown rerun fragment opcode: 
{}", stage);
+        // wait_close(true) internally does send_report + 
remove_pipeline_context
+        RETURN_IF_ERROR(fragment_ctx->wait_close(true));
+
+        // Clean up saved params
+        {
+            std::lock_guard<std::mutex> lk(_rerunnable_params_lock);
+            _rerunnable_params_map.erase({query_id, fragment});

Review Comment:
   Memory leak: If a recursive CTE query is cancelled or errors out before 
reaching the final_close stage, the entry in _rerunnable_params_map will never 
be cleaned up. This map stores a full copy of TPipelineFragmentParams, 
TPipelineFragmentParamsList, the callback, and a shared_ptr to QueryContext, 
which can consume significant memory.
   
   Add cleanup logic in cancel_query or remove_query_context to iterate through 
_rerunnable_params_map and erase entries matching the cancelled query_id. 
Because the map key is already {query_id, fragment-id}, an alternative is to 
group the entries by query_id (for example, a map from query_id to a 
per-fragment map) so that all fragments of a query can be erased in one step 
when the query is cancelled or finishes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to