This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b9a1a5523b [Bug](pipeline) fix wake up early without terminate call (#61679)
2b9a1a5523b is described below

commit 2b9a1a5523ba837f28393ba96c4416acd6dd9239
Author: Pxl <[email protected]>
AuthorDate: Thu Mar 26 16:02:33 2026 +0800

    [Bug](pipeline) fix wake up early without terminate call (#61679)
    
    ```
    Thread A (executing HashJoin Build Task)          Thread B (all downstream pipelines finished)
    ────────────────────────────────────────          ──────────────────────────────────
    Defer starts executing:
      line 475: reads _wake_up_early → false
                                                      decrement_running_task() triggers
                                                      make_all_runnable():
                                                        line 127: set_wake_up_early() → true
                                                        line 132: terminate()
                                                          → finish_dep.set_always_ready()
    
      line 481: else if (_eos && !_spilling &&
                  !_is_pending_finish())
                _is_pending_finish() = false ← because finish_dep is now always_ready!
      line 483: *done = true
      ← Note: _sink->terminate() is never called!
    
    close_task():
      task->close(OK):
    ```
    
    
    This pull request addresses a subtle race condition in the pipeline task
    execution logic and adds a targeted test to verify the fix. The main
    improvement ensures that operator termination is reliably triggered even
    in the presence of concurrent state changes, preventing operators from
    being left in an inconsistent state. Additionally, the pull request
    introduces a debug point for precise testing and includes minor test
    code cleanups.
    
    **Race condition fix and test coverage:**
    
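    * Fixed a race condition in `PipelineTask::execute()` by moving the
    operator `terminate()` calls in the `Defer` block so that they run after
    the `_is_pending_finish()` check, guarded by a second read of
    `_wake_up_early`. This ensures `terminate()` is always called if required,
    even when another thread sets the wake-up flag and terminates the task
    between the two checks. Added a debug point to simulate the race for
    testing.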
    * Added a new test `TEST_TERMINATE_RACE_FIX` in `pipeline_task_test.cpp`
    that uses the debug point to reliably reproduce and verify the race
    condition fix, ensuring operator termination is not skipped.
    
    **Test infrastructure and cleanup:**
    
    * Included `debug_points.h` and `common/config.h` in
    `pipeline_task_test.cpp` to support debug point injection and
    configuration toggling for the new test (see new lines 21 and 36 of
    `pipeline_task_test.cpp` in the diff below).
    * Minor formatting cleanup in an existing test case for readability.
    (Note: this cleanup does not appear in the diff below; the changes to
    `pipeline_task_test.cpp` consist only of the two `#include` additions and
    the new test.)
---
 be/src/exec/pipeline/pipeline_task.cpp       | 32 +++++++++-
 be/test/exec/pipeline/pipeline_task_test.cpp | 91 ++++++++++++++++++++++++++++
 2 files changed, 120 insertions(+), 3 deletions(-)

diff --git a/be/src/exec/pipeline/pipeline_task.cpp 
b/be/src/exec/pipeline/pipeline_task.cpp
index ffd1817683f..1b82530ebb8 100644
--- a/be/src/exec/pipeline/pipeline_task.cpp
+++ b/be/src/exec/pipeline/pipeline_task.cpp
@@ -473,15 +473,41 @@ Status PipelineTask::execute(bool* done) {
 
         // If task is woke up early, we should terminate all operators, and 
this task could be closed immediately.
         if (_wake_up_early) {
-            terminate();
-            THROW_IF_ERROR(_root->terminate(_state));
-            THROW_IF_ERROR(_sink->terminate(_state));
             _eos = true;
             *done = true;
         } else if (_eos && !_spilling &&
                    (fragment_context->is_canceled() || !_is_pending_finish())) 
{
+            // Debug point for testing the race condition fix: inject 
set_wake_up_early() +
+            // terminate() here to simulate Thread B writing A then B between 
Thread A's two
+            // reads of _wake_up_early.
+            DBUG_EXECUTE_IF("PipelineTask::execute.wake_up_early_in_else_if", {
+                set_wake_up_early();
+                terminate();
+            });
             *done = true;
         }
+
+        // NOTE: The terminate() call is intentionally placed AFTER the 
_is_pending_finish() check
+        // above, not before. This ordering is critical to avoid a race 
condition:
+        //
+        // Pipeline::make_all_runnable() writes in this order:
+        //   (A) set_wake_up_early()  ->  (B) terminate() [sets 
finish_dep._always_ready]
+        //
+        // If we checked _wake_up_early (A) before _is_pending_finish() (B), 
there would be a
+        // window where Thread A reads _wake_up_early=false, then Thread B 
writes both A and B,
+        // then Thread A reads _is_pending_finish()=false (due to 
_always_ready). Thread A would
+        // then set *done=true without ever calling operator terminate(), 
causing close() to run
+        // on operators that were never properly terminated (e.g. 
RuntimeFilterProducer still in
+        // WAITING_FOR_SYNCED_SIZE state when insert() is called).
+        //
+        // By reading _is_pending_finish() (B) before the second read of 
_wake_up_early (A),
+        // if Thread A observes B's effect (_always_ready=true), it is 
guaranteed to also observe
+        // A's effect (_wake_up_early=true) on this second read, ensuring 
terminate() is called.
+        if (_wake_up_early) {
+            terminate();
+            THROW_IF_ERROR(_root->terminate(_state));
+            THROW_IF_ERROR(_sink->terminate(_state));
+        }
     }};
     const auto query_id = _state->query_id();
     // If this task is already EOS and block is empty (which means we already 
output all blocks),
diff --git a/be/test/exec/pipeline/pipeline_task_test.cpp 
b/be/test/exec/pipeline/pipeline_task_test.cpp
index b47f00af89f..db61819c4da 100644
--- a/be/test/exec/pipeline/pipeline_task_test.cpp
+++ b/be/test/exec/pipeline/pipeline_task_test.cpp
@@ -18,6 +18,7 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
+#include "common/config.h"
 #include "common/status.h"
 #include "exec/operator/operator.h"
 #include "exec/operator/spill_utils.h"
@@ -32,6 +33,7 @@
 #include "testutil/mock/mock_runtime_state.h"
 #include "testutil/mock/mock_thread_mem_tracker_mgr.h"
 #include "testutil/mock/mock_workload_group_mgr.h"
+#include "util/debug_points.h"
 
 namespace doris {
 
@@ -1534,4 +1536,93 @@ TEST_F(PipelineTaskTest, TEST_REVOKE_MEMORY) {
     }
 }
 
+// Test for the race condition fix between _wake_up_early and 
_is_pending_finish().
+//
+// The race: Pipeline::make_all_runnable() writes in order (A) 
set_wake_up_early -> (B) terminate()
+// [sets finish_dep._always_ready]. In execute()'s Defer block, if Thread A 
reads _wake_up_early=false
+// (A), then Thread B writes A and B, then Thread A reads 
_is_pending_finish()=false (due to
+// _always_ready from B), Thread A would set *done=true without calling 
operator terminate().
+//
+// The fix: terminate() is called after _is_pending_finish() in the Defer. So 
if Thread A sees B's
+// effect (_always_ready=true), it must also see A's effect 
(_wake_up_early=true) on the subsequent
+// read, ensuring terminate() is always called.
+//
+// This test uses a debug point injected into the else-if branch to simulate 
the exact bad timing:
+// the debug point fires set_wake_up_early() + terminate() after 
_is_pending_finish() returns false
+// (due to finish_dep being naturally unblocked) but before the second 
_wake_up_early check.
+TEST_F(PipelineTaskTest, TEST_TERMINATE_RACE_FIX) {
+    auto num_instances = 1;
+    auto pip_id = 0;
+    auto task_id = 0;
+    auto pip = std::make_shared<Pipeline>(pip_id, num_instances, 
num_instances);
+    {
+        OperatorPtr source_op;
+        source_op.reset(new DummyOperator());
+        EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
+
+        int op_id = 1;
+        int node_id = 2;
+        int dest_id = 3;
+        DataSinkOperatorPtr sink_op;
+        sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
+        EXPECT_TRUE(pip->set_sink(sink_op).ok());
+    }
+    auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + 
std::to_string(pip_id));
+    std::map<int,
+             std::pair<std::shared_ptr<BasicSharedState>, 
std::vector<std::shared_ptr<Dependency>>>>
+            shared_state_map;
+    _runtime_state->resize_op_id_to_local_state(-1);
+    auto task = std::make_shared<PipelineTask>(pip, task_id, 
_runtime_state.get(), _context,
+                                               profile.get(), 
shared_state_map, task_id);
+    task->_exec_time_slice = 10'000'000'000ULL;
+    {
+        std::vector<TScanRangeParams> scan_range;
+        int sender_id = 0;
+        TDataSink tsink;
+        EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
+    }
+    _query_ctx->get_execution_dependency()->set_ready();
+
+    // Get the sink's finish dependency and block it to simulate a pending 
async operation
+    // (e.g. runtime filter size sync RPC in flight).
+    auto* sink_finish_dep =
+            
_runtime_state->get_sink_local_state()->cast<DummySinkLocalState>().finishdependency();
+    EXPECT_NE(sink_finish_dep, nullptr);
+    sink_finish_dep->block();
+
+    // Drive the task to EOS so it will enter the Defer's pending-finish check.
+    task->_operators.front()->cast<DummyOperator>()._eos = true;
+    {
+        bool done = false;
+        EXPECT_TRUE(task->execute(&done).ok());
+        // EOS reached but still blocked on finish dependency: not done yet.
+        EXPECT_TRUE(task->_eos);
+        EXPECT_FALSE(done);
+        EXPECT_FALSE(task->_wake_up_early);
+    }
+
+    // Now unblock the finish dependency (simulates the async op completing) 
and activate the
+    // debug point. The debug point fires inside the else-if branch — after 
_is_pending_finish()
+    // returns false but before the second _wake_up_early read — and calls 
set_wake_up_early() +
+    // terminate(). This precisely reproduces the race where Thread B's writes 
land between
+    // Thread A's two reads of _wake_up_early.
+    sink_finish_dep->set_ready();
+    config::enable_debug_points = true;
+    
DebugPoints::instance()->add("PipelineTask::execute.wake_up_early_in_else_if");
+    {
+        bool done = false;
+        EXPECT_TRUE(task->execute(&done).ok());
+        EXPECT_TRUE(task->_eos);
+        EXPECT_TRUE(done);
+        // The key assertion: even though the task took the else-if path (not 
the
+        // if(_wake_up_early) path), operator terminate() must have been 
called because the
+        // second read of _wake_up_early correctly observed the value set by 
the debug point.
+        EXPECT_TRUE(task->_wake_up_early);
+        
EXPECT_TRUE(task->_operators.front()->cast<DummyOperator>()._terminated);
+        EXPECT_TRUE(task->_sink->cast<DummySinkOperatorX>()._terminated);
+    }
+    DebugPoints::instance()->clear();
+    config::enable_debug_points = false;
+}
+
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to