This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 153335925fd [fix](be) Avoid finalized pipeline task submit crash
(#64899)
153335925fd is described below
commit 153335925fd05f12283d786e07e56ec2974d399e
Author: Pxl <[email protected]>
AuthorDate: Mon Jun 29 12:09:09 2026 +0800
[fix](be) Avoid finalized pipeline task submit crash (#64899)
### What problem does this PR solve?
Issue Number: None
Related PR: None
Problem Summary: A runtime filter dependency can wake a pipeline task
while another thread is closing or finalizing the same task.
`HybridTaskScheduler::submit()` synchronously calls
`PipelineTask::is_blockable()` before enqueueing, and `is_blockable()`
reads `_sink` and `_operators`. After terminal close/finalize starts
releasing those resources, a late submit can dereference cleared task
resources and crash.
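This PR adds a small submit gate on `PipelineTask`. Terminal
close/finalize (`close()` with `close_sink` set, and `finalize()`)
closes that gate under a dedicated lock (`_blockable_check_lock`), and
`HybridTaskScheduler::submit()` checks it under the same lock before
calling `is_blockable()`. If the gate is already closed, `submit()`
returns `Status::OK()` without enqueueing the task. The lock only
covers the blockable check; it does not cover the actual queue submit.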
### Release note
None
### Check List (For Author)
- Test: Unit Test / Static check
- `./run-be-ut.sh --run
--filter=PipelineTaskTest.TEST_FINALIZED_TASK_REJECTS_HYBRID_SUBMIT`
- `build-support/check-format.sh`
- `CLANG_TIDY_BINARY=/mnt/disk6/common/ldb_toolchain_028/bin/clang-tidy
build-support/run-clang-tidy.sh --base upstream/master --build-dir
be/ut_build_ASAN`
- `git diff --check`
- Behavior changed: No
- Does this need documentation: No
---
be/src/exec/pipeline/pipeline_task.cpp | 9 +++
be/src/exec/pipeline/pipeline_task.h | 9 +++
be/src/exec/pipeline/task_scheduler.cpp | 10 ++-
be/test/exec/pipeline/pipeline_task_test.cpp | 102 +++++++++++++++++++++++++++
4 files changed, 129 insertions(+), 1 deletion(-)
diff --git a/be/src/exec/pipeline/pipeline_task.cpp
b/be/src/exec/pipeline/pipeline_task.cpp
index dfbb0955145..064e34fe2ca 100644
--- a/be/src/exec/pipeline/pipeline_task.cpp
+++ b/be/src/exec/pipeline/pipeline_task.cpp
@@ -322,6 +322,11 @@ bool PipelineTask::is_blockable() const {
_sink->is_blockable(_state);
}
+void PipelineTask::_stop_accepting_submit() {
+ std::unique_lock<std::mutex> lock(_blockable_check_lock);
+ _accept_submit = false;
+}
+
bool PipelineTask::_is_blocked() {
// `_dry_run = true` means we do not need data from source operator.
if (!_dry_run) {
@@ -887,6 +892,7 @@ Status PipelineTask::finalize() {
return Status::OK();
}
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(fragment->get_query_ctx()->query_mem_tracker());
+ _stop_accepting_submit();
// Synchronize with unblock_all_dependencies() before clearing state used
by wake_up()->submit().
std::unique_lock<std::mutex> lock(_dependency_lifecycle_lock);
RETURN_IF_ERROR(_state_transition(State::FINALIZED));
@@ -901,6 +907,9 @@ Status PipelineTask::finalize() {
}
Status PipelineTask::close(Status exec_status, bool close_sink) {
+ if (close_sink) {
+ _stop_accepting_submit();
+ }
int64_t close_ns = 0;
Status s;
{
diff --git a/be/src/exec/pipeline/pipeline_task.h
b/be/src/exec/pipeline/pipeline_task.h
index 2268c00a4c7..dbe5d57f23d 100644
--- a/be/src/exec/pipeline/pipeline_task.h
+++ b/be/src/exec/pipeline/pipeline_task.h
@@ -19,6 +19,7 @@
#include <cstdint>
#include <memory>
+#include <mutex>
#include <string>
#include <vector>
@@ -187,6 +188,10 @@ protected:
PipelineTask() : _index(0) {}
private:
+ friend class HybridTaskScheduler;
+
+ void _stop_accepting_submit();
+
// Whether this task is blocked before execution (FE 2-phase commit
trigger, runtime filters)
bool _wait_to_start();
// Whether this task is blocked during execution (read dependency, write
dependency)
@@ -276,6 +281,10 @@ private:
// call wake_up() and submit this task, so close()/finalize() must not
clear operator/shared
// state until forced unblocking finishes. wake_up() must not take this
lock.
std::mutex _dependency_lifecycle_lock;
+ // Guards _accept_submit and keeps HybridTaskScheduler::submit() from
reading _sink/_operators
+ // in is_blockable() while terminal close/finalize is closing the submit
gate.
+ std::mutex _blockable_check_lock;
+ bool _accept_submit = true;
std::atomic<bool> _running {false};
std::atomic<bool> _eos {false};
diff --git a/be/src/exec/pipeline/task_scheduler.cpp
b/be/src/exec/pipeline/task_scheduler.cpp
index a565d5d54eb..8e5150370ed 100644
--- a/be/src/exec/pipeline/task_scheduler.cpp
+++ b/be/src/exec/pipeline/task_scheduler.cpp
@@ -168,7 +168,15 @@ void TaskScheduler::stop() {
}
Status HybridTaskScheduler::submit(PipelineTaskSPtr task) {
- if (task->is_blockable()) {
+ bool blockable = false;
+ {
+ std::unique_lock<std::mutex>
blockable_check_lock(task->_blockable_check_lock);
+ if (!task->_accept_submit) {
+ return Status::OK();
+ }
+ blockable = task->is_blockable();
+ }
+ if (blockable) {
return _blocking_scheduler.submit(task);
} else {
return _simple_scheduler.submit(task);
diff --git a/be/test/exec/pipeline/pipeline_task_test.cpp
b/be/test/exec/pipeline/pipeline_task_test.cpp
index d6294825288..3c4adb63fc2 100644
--- a/be/test/exec/pipeline/pipeline_task_test.cpp
+++ b/be/test/exec/pipeline/pipeline_task_test.cpp
@@ -18,6 +18,7 @@
#include <glog/logging.h>
#include <gtest/gtest.h>
+#include <atomic>
#include <chrono>
#include <functional>
#include <future>
@@ -113,6 +114,26 @@ public:
std::function<void(PipelineTaskSPtr)> on_submit;
};
+class CountingBlockableSinkOperator final : public
DataSinkOperatorX<DummySinkLocalState> {
+public:
+ CountingBlockableSinkOperator(int op_id, int node_id, int dest_id,
+ std::atomic<int>* blockable_checks)
+ : DataSinkOperatorX<DummySinkLocalState>(op_id, node_id, dest_id),
+ _blockable_checks(blockable_checks) {}
+
+ Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override {
+ return Status::OK();
+ }
+
+ bool is_blockable(RuntimeState* state) const override {
+ _blockable_checks->fetch_add(1, std::memory_order_relaxed);
+ return DataSinkOperatorX<DummySinkLocalState>::is_blockable(state);
+ }
+
+private:
+ std::atomic<int>* _blockable_checks;
+};
+
TEST_F(PipelineTaskTest, TEST_CONSTRUCTOR) {
auto num_instances = 1;
auto pip_id = 0;
@@ -662,6 +683,87 @@ TEST_F(PipelineTaskTest,
TEST_WAKE_UP_SUBMIT_PROTECTED_FROM_FINALIZE) {
EXPECT_TRUE(task->_operators.empty());
}
+TEST_F(PipelineTaskTest,
TEST_CLOSED_TASK_REJECTS_HYBRID_SUBMIT_BEFORE_FINALIZE) {
+ auto num_instances = 1;
+ auto pip_id = 0;
+ auto task_id = 0;
+ auto pip = std::make_shared<Pipeline>(pip_id, num_instances,
num_instances);
+ OperatorPtr source_op;
+ source_op.reset(new DummyOperator());
+ EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
+
+ int op_id = 1;
+ int node_id = 2;
+ int dest_id = 3;
+ std::atomic<int> blockable_checks = 0;
+ DataSinkOperatorPtr sink_op;
+ sink_op.reset(new CountingBlockableSinkOperator(op_id, node_id, dest_id,
&blockable_checks));
+ EXPECT_TRUE(pip->set_sink(sink_op).ok());
+
+ auto profile = std::make_shared<RuntimeProfile>("Pipeline : " +
std::to_string(pip_id));
+ std::map<int,
+ std::pair<std::shared_ptr<BasicSharedState>,
std::vector<std::shared_ptr<Dependency>>>>
+ shared_state_map;
+ _runtime_state->resize_op_id_to_local_state(-1);
+ auto task = std::make_shared<PipelineTask>(pip, task_id,
_runtime_state.get(), _context,
+ profile.get(),
shared_state_map, task_id);
+ task->_exec_time_slice = 10'000'000'000ULL;
+
+ std::vector<TScanRangeParams> scan_range;
+ int sender_id = 0;
+ TDataSink tsink;
+ EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
+ EXPECT_TRUE(task->close(Status::OK()).ok());
+ EXPECT_EQ(task->_exec_state, PipelineTask::State::FINISHED);
+ EXPECT_NE(task->_sink, nullptr);
+ EXPECT_FALSE(task->_operators.empty());
+ EXPECT_FALSE(task->_accept_submit);
+
+ HybridTaskScheduler scheduler(1, 1, "test_hybrid_task_scheduler", nullptr);
+ EXPECT_TRUE(scheduler.submit(task).ok());
+ EXPECT_EQ(blockable_checks.load(std::memory_order_relaxed), 0);
+ scheduler.stop();
+ EXPECT_TRUE(task->finalize().ok());
+}
+
+TEST_F(PipelineTaskTest, TEST_FINALIZED_TASK_REJECTS_HYBRID_SUBMIT) {
+ auto num_instances = 1;
+ auto pip_id = 0;
+ auto task_id = 0;
+ auto pip = std::make_shared<Pipeline>(pip_id, num_instances,
num_instances);
+ OperatorPtr source_op;
+ source_op.reset(new DummyOperator());
+ EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
+
+ int op_id = 1;
+ int node_id = 2;
+ int dest_id = 3;
+ DataSinkOperatorPtr sink_op;
+ sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
+ EXPECT_TRUE(pip->set_sink(sink_op).ok());
+
+ auto profile = std::make_shared<RuntimeProfile>("Pipeline : " +
std::to_string(pip_id));
+ std::map<int,
+ std::pair<std::shared_ptr<BasicSharedState>,
std::vector<std::shared_ptr<Dependency>>>>
+ shared_state_map;
+ _runtime_state->resize_op_id_to_local_state(-1);
+ auto task = std::make_shared<PipelineTask>(pip, task_id,
_runtime_state.get(), _context,
+ profile.get(),
shared_state_map, task_id);
+ task->_exec_time_slice = 10'000'000'000ULL;
+
+ std::vector<TScanRangeParams> scan_range;
+ int sender_id = 0;
+ TDataSink tsink;
+ EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
+ EXPECT_TRUE(task->close(Status::OK()).ok());
+ EXPECT_TRUE(task->finalize().ok());
+ EXPECT_EQ(task->_sink, nullptr);
+
+ HybridTaskScheduler scheduler(1, 1, "test_hybrid_task_scheduler", nullptr);
+ EXPECT_TRUE(scheduler.submit(task).ok());
+ scheduler.stop();
+}
+
TEST_F(PipelineTaskTest, TEST_SINK_FINISHED) {
auto num_instances = 1;
auto pip_id = 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]