This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 0c2f5799276 [fix](be) Avoid finalized pipeline task submit crash (#64953)
0c2f5799276 is described below

commit 0c2f579927673120195fb48c828094d0824b0730
Author: Pxl <[email protected]>
AuthorDate: Tue Jun 30 14:08:24 2026 +0800

    [fix](be) Avoid finalized pipeline task submit crash (#64953)
    
    ### What problem does this PR solve?
    
    Issue Number: None
    
    Related PR: #64899
    
    Problem Summary: A runtime filter dependency can wake a pipeline task
    while another thread is closing or finalizing the same task.
    HybridTaskScheduler::submit() synchronously calls
    PipelineTask::is_blockable() before enqueueing the task, and
    is_blockable() reads _sink and _operators. After close/finalize starts
    releasing terminal task resources, a late submit can therefore
    dereference cleared task resources and crash.
    
    This change adds a small submit gate on PipelineTask. Terminal
    close/finalize closes that gate under a dedicated lock, and
    HybridTaskScheduler::submit() checks the gate under the same lock before
    calling is_blockable(). The lock only covers the blockable check and
    does not extend to the actual queue submit.
    
    (cherry picked from commit 1dd3207f04a7311ddec655990fca58d68b0b0b9d)
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test: Unit Test / Static check
        - build-support/clang-format.sh
        - build-support/check-format.sh
        - git diff --check
    - Reused the compile command from be/ut_build_ASAN/compile_commands.json
    to run -fsyntax-only for be/test/exec/pipeline/pipeline_task_test.cpp.
    - Attempted: ./run-be-ut.sh --run --filter=PipelineTaskTest.TEST_CLOSED_TASK_REJECTS_HYBRID_SUBMIT_BEFORE_FINALIZE:PipelineTaskTest.TEST_FINALIZED_TASK_REJECTS_HYBRID_SUBMIT.
    It was interrupted after confirming the changed pipeline_task.cpp and
    task_scheduler.cpp objects compiled, because branch-4.1 BE UT uses one
    large doris_be_test binary and the filtered run expanded into a near
    full UT build.
    - Attempted: ninja -C be/ut_build_ASAN -j1
    test/CMakeFiles/doris_be_test.dir/exec/pipeline/pipeline_task_test.cpp.o.
    It was stopped after it began rebuilding a large OpenBLAS dependency
    chain; the test file was instead checked for compile errors with the
    compile_commands.json -fsyntax-only command above.
    - Behavior changed: No
    - Does this need documentation: No
---
 be/src/exec/pipeline/pipeline_task.cpp       |   9 +++
 be/src/exec/pipeline/pipeline_task.h         |   9 +++
 be/src/exec/pipeline/task_scheduler.cpp      |  10 ++-
 be/test/exec/pipeline/pipeline_task_test.cpp | 104 +++++++++++++++++++++++++++
 4 files changed, 131 insertions(+), 1 deletion(-)

diff --git a/be/src/exec/pipeline/pipeline_task.cpp 
b/be/src/exec/pipeline/pipeline_task.cpp
index 19da7499872..e05e94e2d34 100644
--- a/be/src/exec/pipeline/pipeline_task.cpp
+++ b/be/src/exec/pipeline/pipeline_task.cpp
@@ -325,6 +325,11 @@ bool PipelineTask::is_blockable() const {
            _sink->is_blockable(_state);
 }
 
+void PipelineTask::_stop_accepting_submit() {
+    std::unique_lock<std::mutex> lock(_blockable_check_lock);
+    _accept_submit = false;
+}
+
 bool PipelineTask::_is_blocked() {
     // `_dry_run = true` means we do not need data from source operator.
     if (!_dry_run) {
@@ -881,6 +886,7 @@ Status PipelineTask::finalize() {
         return Status::OK();
     }
     
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(fragment->get_query_ctx()->query_mem_tracker());
+    _stop_accepting_submit();
     RETURN_IF_ERROR(_state_transition(State::FINALIZED));
     std::unique_lock<std::mutex> lc(_dependency_lock);
     _sink_shared_state.reset();
@@ -894,6 +900,9 @@ Status PipelineTask::finalize() {
 }
 
 Status PipelineTask::close(Status exec_status, bool close_sink) {
+    if (close_sink) {
+        _stop_accepting_submit();
+    }
     int64_t close_ns = 0;
     Status s;
     {
diff --git a/be/src/exec/pipeline/pipeline_task.h 
b/be/src/exec/pipeline/pipeline_task.h
index c5404daaf02..68d2195e559 100644
--- a/be/src/exec/pipeline/pipeline_task.h
+++ b/be/src/exec/pipeline/pipeline_task.h
@@ -19,6 +19,7 @@
 
 #include <cstdint>
 #include <memory>
+#include <mutex>
 #include <string>
 #include <vector>
 
@@ -183,6 +184,10 @@ protected:
     PipelineTask() : _index(0) {}
 
 private:
+    friend class HybridTaskScheduler;
+
+    void _stop_accepting_submit();
+
     // Whether this task is blocked before execution (FE 2-phase commit 
trigger, runtime filters)
     bool _wait_to_start();
     // Whether this task is blocked during execution (read dependency, write 
dependency)
@@ -268,6 +273,10 @@ private:
 
     Dependency* _memory_sufficient_dependency;
     std::mutex _dependency_lock;
+    // Guards _accept_submit and keeps HybridTaskScheduler::submit() from 
reading _sink/_operators
+    // in is_blockable() while terminal close/finalize is closing the submit 
gate.
+    std::mutex _blockable_check_lock;
+    bool _accept_submit = true;
 
     std::atomic<bool> _running {false};
     std::atomic<bool> _eos {false};
diff --git a/be/src/exec/pipeline/task_scheduler.cpp 
b/be/src/exec/pipeline/task_scheduler.cpp
index 20931c03aa0..fb6cf4e80f5 100644
--- a/be/src/exec/pipeline/task_scheduler.cpp
+++ b/be/src/exec/pipeline/task_scheduler.cpp
@@ -189,7 +189,15 @@ void TaskScheduler::stop() {
 }
 
 Status HybridTaskScheduler::submit(PipelineTaskSPtr task) {
-    if (task->is_blockable()) {
+    bool blockable = false;
+    {
+        std::unique_lock<std::mutex> 
blockable_check_lock(task->_blockable_check_lock);
+        if (!task->_accept_submit) {
+            return Status::OK();
+        }
+        blockable = task->is_blockable();
+    }
+    if (blockable) {
         return _blocking_scheduler.submit(task);
     } else {
         return _simple_scheduler.submit(task);
diff --git a/be/test/exec/pipeline/pipeline_task_test.cpp 
b/be/test/exec/pipeline/pipeline_task_test.cpp
index 08d84f4f8e2..29555a8d2ae 100644
--- a/be/test/exec/pipeline/pipeline_task_test.cpp
+++ b/be/test/exec/pipeline/pipeline_task_test.cpp
@@ -18,6 +18,10 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
+#include <atomic>
+#include <chrono>
+#include <thread>
+
 #include "common/config.h"
 #include "common/status.h"
 #include "exec/operator/operator.h"
@@ -26,6 +30,7 @@
 #include "exec/pipeline/dummy_task_queue.h"
 #include "exec/pipeline/pipeline.h"
 #include "exec/pipeline/pipeline_fragment_context.h"
+#include "exec/pipeline/task_scheduler.h"
 #include "exec/pipeline/thrift_builder.h"
 #include "exec/spill/spill_file.h"
 #include "runtime/exec_env.h"
@@ -94,6 +99,24 @@ private:
 template class OperatorX<DummyOperatorLocalState>;
 template class DataSinkOperatorX<DummySinkLocalState>;
 
+class CountingBlockableSinkOperator final : public 
DataSinkOperatorX<DummySinkLocalState> {
+public:
+    CountingBlockableSinkOperator(int op_id, int node_id, int dest_id,
+                                  std::atomic<int>* blockable_checks)
+            : DataSinkOperatorX<DummySinkLocalState>(op_id, node_id, dest_id),
+              _blockable_checks(blockable_checks) {}
+
+    Status sink(RuntimeState* state, Block* in_block, bool eos) override { 
return Status::OK(); }
+
+    bool is_blockable(RuntimeState* state) const override {
+        _blockable_checks->fetch_add(1, std::memory_order_relaxed);
+        return DataSinkOperatorX<DummySinkLocalState>::is_blockable(state);
+    }
+
+private:
+    std::atomic<int>* _blockable_checks;
+};
+
 TEST_F(PipelineTaskTest, TEST_CONSTRUCTOR) {
     auto num_instances = 1;
     auto pip_id = 0;
@@ -519,6 +542,87 @@ TEST_F(PipelineTaskTest, TEST_STATE_TRANSITION) {
     }
 }
 
+TEST_F(PipelineTaskTest, 
TEST_CLOSED_TASK_REJECTS_HYBRID_SUBMIT_BEFORE_FINALIZE) {
+    auto num_instances = 1;
+    auto pip_id = 0;
+    auto task_id = 0;
+    auto pip = std::make_shared<Pipeline>(pip_id, num_instances, 
num_instances);
+    OperatorPtr source_op;
+    source_op.reset(new DummyOperator());
+    EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
+
+    int op_id = 1;
+    int node_id = 2;
+    int dest_id = 3;
+    std::atomic<int> blockable_checks = 0;
+    DataSinkOperatorPtr sink_op;
+    sink_op.reset(new CountingBlockableSinkOperator(op_id, node_id, dest_id, 
&blockable_checks));
+    EXPECT_TRUE(pip->set_sink(sink_op).ok());
+
+    auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + 
std::to_string(pip_id));
+    std::map<int,
+             std::pair<std::shared_ptr<BasicSharedState>, 
std::vector<std::shared_ptr<Dependency>>>>
+            shared_state_map;
+    _runtime_state->resize_op_id_to_local_state(-1);
+    auto task = std::make_shared<PipelineTask>(pip, task_id, 
_runtime_state.get(), _context,
+                                               profile.get(), 
shared_state_map, task_id);
+    task->_exec_time_slice = 10'000'000'000ULL;
+
+    std::vector<TScanRangeParams> scan_range;
+    int sender_id = 0;
+    TDataSink tsink;
+    EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
+    EXPECT_TRUE(task->close(Status::OK()).ok());
+    EXPECT_EQ(task->_exec_state, PipelineTask::State::FINISHED);
+    EXPECT_NE(task->_sink, nullptr);
+    EXPECT_FALSE(task->_operators.empty());
+    EXPECT_FALSE(task->_accept_submit);
+
+    HybridTaskScheduler scheduler(1, 1, "test_hybrid_task_scheduler", nullptr);
+    EXPECT_TRUE(scheduler.submit(task).ok());
+    EXPECT_EQ(blockable_checks.load(std::memory_order_relaxed), 0);
+    scheduler.stop();
+    EXPECT_TRUE(task->finalize().ok());
+}
+
+TEST_F(PipelineTaskTest, TEST_FINALIZED_TASK_REJECTS_HYBRID_SUBMIT) {
+    auto num_instances = 1;
+    auto pip_id = 0;
+    auto task_id = 0;
+    auto pip = std::make_shared<Pipeline>(pip_id, num_instances, 
num_instances);
+    OperatorPtr source_op;
+    source_op.reset(new DummyOperator());
+    EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
+
+    int op_id = 1;
+    int node_id = 2;
+    int dest_id = 3;
+    DataSinkOperatorPtr sink_op;
+    sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
+    EXPECT_TRUE(pip->set_sink(sink_op).ok());
+
+    auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + 
std::to_string(pip_id));
+    std::map<int,
+             std::pair<std::shared_ptr<BasicSharedState>, 
std::vector<std::shared_ptr<Dependency>>>>
+            shared_state_map;
+    _runtime_state->resize_op_id_to_local_state(-1);
+    auto task = std::make_shared<PipelineTask>(pip, task_id, 
_runtime_state.get(), _context,
+                                               profile.get(), 
shared_state_map, task_id);
+    task->_exec_time_slice = 10'000'000'000ULL;
+
+    std::vector<TScanRangeParams> scan_range;
+    int sender_id = 0;
+    TDataSink tsink;
+    EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
+    EXPECT_TRUE(task->close(Status::OK()).ok());
+    EXPECT_TRUE(task->finalize().ok());
+    EXPECT_EQ(task->_sink, nullptr);
+
+    HybridTaskScheduler scheduler(1, 1, "test_hybrid_task_scheduler", nullptr);
+    EXPECT_TRUE(scheduler.submit(task).ok());
+    scheduler.stop();
+}
+
 TEST_F(PipelineTaskTest, TEST_SINK_FINISHED) {
     auto num_instances = 1;
     auto pip_id = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to