github-actions[bot] commented on code in PR #62190:
URL: https://github.com/apache/doris/pull/62190#discussion_r3048925372


##########
be/src/exec/pipeline/dependency.cpp:
##########
@@ -62,25 +62,42 @@ void Dependency::_add_block_task(std::shared_ptr<PipelineTask> task) {
     _blocked_task.push_back(task);
 }
 
-void Dependency::set_ready() {
+void Dependency::set_ready() noexcept {
     if (_ready) {
         return;
     }
-    std::vector<std::weak_ptr<PipelineTask>> local_block_task {};
-    {
-        std::unique_lock<std::mutex> lc(_task_lock);
-        if (_ready) {
-            return;
-        }
-        _watcher.stop();
-        _ready = true;
-        local_block_task.swap(_blocked_task);
-    }
-    for (auto task : local_block_task) {
-        if (auto t = task.lock()) {
+    try {
+        std::vector<std::weak_ptr<PipelineTask>> local_block_task {};
+        {
             std::unique_lock<std::mutex> lc(_task_lock);
-            THROW_IF_ERROR(t->wake_up(this, lc));
+            if (_ready) {
+                return;
+            }
+            _watcher.stop();
+            _ready = true;
+            local_block_task.swap(_blocked_task);
+        }
+        for (const auto& task : local_block_task) {
+            if (auto t = task.lock()) {
+                std::unique_lock<std::mutex> lc(_task_lock);
+                auto st = t->wake_up(this, lc);
+                if (!st.ok()) {

Review Comment:
   `PipelineTask::wake_up()` is not an atomic "enqueue me" operation. It first 
clears `_blocked_dep` and transitions the task to `RUNNABLE`, and only then 
calls `submit()` (`pipeline_task.cpp:1047-1055`). If `submit()` returns non-OK 
here, this new branch only cancels the fragment and returns. The task has 
already been removed from `_blocked_task`, is no longer blocked on this 
dependency, and was never inserted into the scheduler queue, so no worker 
thread will ever call `close_task()` / `decrement_running_task()` for it. 
   `PipelineFragmentContext::cancel()` only calls `unblock_all_dependencies()`, 
   which does not repair that stranded-task state. As a result, a scheduler 
   failure can become a permanent hang with a leaked running-task count.
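
   To make the failure mode concrete, here is a minimal toy model of the 
   sequence described above. It is not Doris code: `ToyTask`, `ToyScheduler`, 
   `ToyFragment`, `finalize()` and `run_scenario()` are hypothetical names 
   invented for illustration, and the "repair" step is only one possible shape 
   of a fix, assuming the task can be closed safely from the waking thread.

```cpp
#include <cassert>
#include <deque>
#include <memory>

// Toy model only. Every type and function here is hypothetical.
enum class State { BLOCKED, RUNNABLE, FINISHED };

struct ToyTask {
    State state = State::BLOCKED;
    bool blocked_on_dep = true;
};

struct ToyScheduler {
    bool accept = true;  // set to false to simulate submit() returning non-OK
    std::deque<std::shared_ptr<ToyTask>> queue;

    bool submit(const std::shared_ptr<ToyTask>& t) {
        if (!accept) return false;
        queue.push_back(t);
        return true;
    }
};

struct ToyFragment {
    int running_tasks = 0;
    bool cancelled = false;

    // Models a cancel that marks the fragment but does not touch tasks.
    void cancel() { cancelled = true; }

    // Hypothetical repair step: close the stranded task and release its slot.
    void finalize(ToyTask& t) {
        assert(t.state != State::FINISHED);
        t.state = State::FINISHED;
        --running_tasks;
    }
};

// Two-step wake-up: the state transition happens before the enqueue.
bool wake_up(const std::shared_ptr<ToyTask>& t, ToyScheduler& s) {
    t->blocked_on_dep = false;
    t->state = State::RUNNABLE;
    return s.submit(t);
}

// Returns how many tasks are still counted as running after a failed submit.
int run_scenario(bool repair) {
    ToyScheduler sched;
    sched.accept = false;
    ToyFragment frag;
    auto task = std::make_shared<ToyTask>();
    frag.running_tasks = 1;

    if (!wake_up(task, sched)) {
        frag.cancel();
        if (repair) frag.finalize(*task);
    }
    return frag.running_tasks;
}
```

   With `repair == false` the task is RUNNABLE, sits in no queue, and the 
   counter never drops. With `repair == true` the counter is released. Whether 
   the real fix belongs in `wake_up()`, `set_ready()`, or `cancel()` is a 
   design decision for the PR author.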



##########
be/src/exec/pipeline/dependency.cpp:
##########
@@ -62,25 +62,42 @@ void Dependency::_add_block_task(std::shared_ptr<PipelineTask> task) {
     _blocked_task.push_back(task);
 }
 
-void Dependency::set_ready() {
+void Dependency::set_ready() noexcept {
     if (_ready) {
         return;
     }
-    std::vector<std::weak_ptr<PipelineTask>> local_block_task {};
-    {
-        std::unique_lock<std::mutex> lc(_task_lock);
-        if (_ready) {
-            return;
-        }
-        _watcher.stop();
-        _ready = true;
-        local_block_task.swap(_blocked_task);
-    }
-    for (auto task : local_block_task) {
-        if (auto t = task.lock()) {
+    try {
+        std::vector<std::weak_ptr<PipelineTask>> local_block_task {};
+        {
             std::unique_lock<std::mutex> lc(_task_lock);
-            THROW_IF_ERROR(t->wake_up(this, lc));
+            if (_ready) {
+                return;
+            }
+            _watcher.stop();
+            _ready = true;
+            local_block_task.swap(_blocked_task);
+        }
+        for (const auto& task : local_block_task) {
+            if (auto t = task.lock()) {
+                std::unique_lock<std::mutex> lc(_task_lock);
+                auto st = t->wake_up(this, lc);
+                if (!st.ok()) {
+                    LOG(WARNING) << "Dependency::set_ready(): failed to wake_up task, cancelling "
+                                    "query. dep="
+                                 << _name << ", task=" << t->debug_string() << ", status=" << st;
+                    if (auto frag = t->fragment_context().lock()) {
+                        frag->cancel(Status::InternalError(
+                                "wake_up failed in Dependency::set_ready: {}", st.to_string()));
+                    }
+                }
+            }
         }
+    } catch (const std::exception& e) {
+        // Recovery itself threw (e.g. OOM during logging/cancel). Best effort: nothing more we can do for this task.

Review Comment:
   This still aborts the BE process on the exact `std::exception` path the PR 
description says should be handled gracefully. In particular, `std::bad_alloc` 
during logging or `frag->cancel()` will now hit `CHECK(false)` and crash the 
process instead of degrading to a query-local failure. If the intent is truly 
`noexcept` + best-effort recovery, this branch needs to avoid process 
termination; otherwise the PR description should not claim graceful handling 
for standard exceptions.
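
   As a rough sketch of what a non-terminating `noexcept` path could look 
   like, the following standalone example swallows exceptions per task and 
   only counts them. `WakeResult` and `wake_all_best_effort()` are hypothetical 
   names, not part of the PR; the handlers deliberately avoid logging or any 
   other allocation, on the assumption that the recovery code itself may be 
   what ran out of memory.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <new>
#include <vector>

// Hypothetical result type: how many wake-ups succeeded and how many threw.
struct WakeResult {
    int woken = 0;
    int failed = 0;
};

// Hypothetical stand-in for a best-effort set_ready(): it never throws and
// never terminates the process because of an exception from a callback.
WakeResult wake_all_best_effort(
        const std::vector<std::function<void()>>& wakeups) noexcept {
    WakeResult r;
    for (const auto& w : wakeups) {
        try {
            w();  // may throw, e.g. std::bad_alloc
            ++r.woken;
        } catch (const std::exception&) {
            // No logging and no allocation here: just record the failure.
            ++r.failed;
        } catch (...) {
            // Non-standard exceptions must not escape a noexcept function.
            ++r.failed;
        }
    }
    return r;
}
```

   Catching inside the loop means one failing task does not prevent the 
   remaining tasks from being woken, which a single outer `try` block would 
   not guarantee.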



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
