This is an automated email from the ASF dual-hosted git repository.
BiteTheDDDDt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new fed20f06899 [fix](be) Prevent finalized pipeline task resubmission (#62891)
fed20f06899 is described below
commit fed20f06899663d9a6e1812efa1f5b1b1716e246
Author: Pxl <[email protected]>
AuthorDate: Wed May 6 11:55:48 2026 +0800
[fix](be) Prevent finalized pipeline task resubmission (#62891)
Problem Summary: `Pipeline::make_all_runnable()` wakes upstream/related
pipeline tasks early by calling `set_wake_up_early()` and then
`unblock_all_dependencies()`. `unblock_all_dependencies()` marks every
dependency ready/always-ready, and `Dependency::set_ready()`
synchronously calls back into `PipelineTask::wake_up()` for blocked
tasks. That wake-up path may submit the task again, and
`HybridTaskScheduler::submit()` immediately inspects the task via
`is_blockable()`.
At the same time, once a task returns `done=true`,
`TaskScheduler::task_running_defer` calls `close_task()`, which runs
`close()` and `finalize()`. `finalize()` releases task-owned resources
such as `_sink`, `_operators`, shared states, and `_block`.
Before this fix, dependency pointer lifetime and forced-unblock/finalize
ordering were protected by two separate synchronization mechanisms. That made
the ownership boundary unclear: forced unblocking needs the dependency
containers and raw `Dependency*` pointers to remain valid, and it also
needs task-owned operator/shared state to remain stable while
`set_ready()` may synchronously trigger `wake_up()->submit()`. If
close/finalize releases task resources while forced unblocking is still
firing wake-up callbacks, the delayed wake-up can submit or inspect a
task whose `_sink`/`_operators` have already been released, leading to
crashes or invalid task state/resource access.
This PR uses one clearly named `_dependency_lifecycle_lock` for the
task/dependency lifetime boundary:
- It protects the dependency containers and the raw `Dependency*`
pointers stored in them.
- `unblock_all_dependencies()` holds it while marking dependencies
ready. Since `Dependency::set_ready()` invokes `PipelineTask::wake_up()`
synchronously, wake-ups caused by forced unblocking complete before the
lock is released.
- `close()` takes the same lock only around the short `FINISHED` state
publication, so delayed wake-ups see a stable terminal transition.
- `finalize()` takes the same lock while publishing `FINALIZED` and
releasing task-owned operator/shared state.
- `wake_up()` itself does not take this lock, avoiding re-entrant
deadlock when `set_ready()` calls back into `wake_up()` while
`unblock_all_dependencies()` already holds the lock.
This lock intentionally only serializes forced unblocking with
close/finalize. A normal dependency can still call
`Dependency::set_ready()` from another thread and invoke `wake_up()`
without holding `_dependency_lifecycle_lock`, so the state transition
itself must also be race-safe. This PR therefore keeps
`_state_transition()` as a CAS/retry loop: each transition validates the
observed state and publishes the new state with
`compare_exchange_strong`. If a delayed `wake_up()` observes `BLOCKED`
but `close()/finalize()` publishes `FINISHED/FINALIZED` before the CAS,
the CAS fails, the wake-up retries, sees the terminal state, and treats
`FINISHED/FINALIZED -> RUNNABLE` as an atomic no-op. That prevents a
stale wake-up from overwriting terminal state back to `RUNNABLE` and
resubmitting a task whose resources were already released.
This does not introduce a broad scheduling lock. The long/meaningful
critical section is the same forced-unblock path that already needed
dependency lifetime protection; `close()` only adds a short
state-transition critical section, and `finalize()` already had to wait
before clearing the same state. The combined design makes the intended
lifecycle ordering explicit: forced unblocking keeps task resources
stable while firing synchronous wake-up callbacks, and the CAS state
machine protects delayed wake-ups from racing with terminal transitions
outside that forced-unblock path.
### Release note
None
### Check List (For Author)
- Test: Unit Test
- `./run-be-ut.sh --run --filter=PipelineTaskTest.*`
- `build-support/check-format.sh`
- `build-support/run-clang-tidy.sh --build-dir be/ut_build_ASAN` (failed
due to pre-existing `pipeline_task.cpp` `execute()` complexity warnings,
pre-existing `pipeline_task_test.cpp` `TestBody` complexity warnings,
and `jni-util.h` static_assert errors)
- Behavior changed: No
- Does this need documentation: No
---------
Co-authored-by: Copilot <[email protected]>
---
be/src/exec/pipeline/pipeline_task.cpp | 35 ++++++---
be/src/exec/pipeline/pipeline_task.h | 6 +-
be/test/exec/pipeline/pipeline_task_test.cpp | 105 +++++++++++++++++++++++++++
3 files changed, 135 insertions(+), 11 deletions(-)
diff --git a/be/src/exec/pipeline/pipeline_task.cpp b/be/src/exec/pipeline/pipeline_task.cpp
index b98bdae81de..fe75d7b499d 100644
--- a/be/src/exec/pipeline/pipeline_task.cpp
+++ b/be/src/exec/pipeline/pipeline_task.cpp
@@ -154,7 +154,7 @@ Status PipelineTask::prepare(const std::vector<TScanRangeParams>& scan_range, co
{
const auto& deps =
_state->get_local_state(_source->operator_id())->execution_dependencies();
- std::unique_lock<std::mutex> lc(_dependency_lock);
+ std::unique_lock<std::mutex> lc(_dependency_lifecycle_lock);
std::copy(deps.begin(), deps.end(),
std::inserter(_execution_dependencies,
_execution_dependencies.end()));
}
@@ -200,7 +200,7 @@ Status PipelineTask::_extract_dependencies() {
}
}
{
- std::unique_lock<std::mutex> lc(_dependency_lock);
+ std::unique_lock<std::mutex> lc(_dependency_lifecycle_lock);
read_dependencies.swap(_read_dependencies);
write_dependencies.swap(_write_dependencies);
finish_dependencies.swap(_finish_dependencies);
@@ -345,12 +345,18 @@ bool PipelineTask::_is_blocked() {
}
void PipelineTask::unblock_all_dependencies() {
- // We use a lock to assure all dependencies are not deconstructed here.
- std::unique_lock<std::mutex> lc(_dependency_lock);
+ // Keep dependency pointers and task-owned operator/shared state stable because set_ready() may
+ // synchronously call wake_up() and submit this task.
+ std::unique_lock<std::mutex> lock(_dependency_lifecycle_lock);
auto fragment = _fragment_context.lock();
if (!is_finalized() && fragment) {
try {
DCHECK(_wake_up_early || fragment->is_canceled());
+ DBUG_EXECUTE_IF("PipelineTask::unblock_all_dependencies.before_set_ready", {
+ if (dp->callback.has_value()) {
+ DBUG_RUN_CALLBACK();
+ }
+ });
std::ranges::for_each(_write_dependencies,
[&](Dependency* dep) { dep->set_always_ready(); });
std::ranges::for_each(_finish_dependencies,
@@ -882,8 +888,9 @@ Status PipelineTask::finalize() {
return Status::OK();
}
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(fragment->get_query_ctx()->query_mem_tracker());
+ // Synchronize with unblock_all_dependencies() before clearing state used by wake_up()->submit().
+ std::unique_lock<std::mutex> lock(_dependency_lifecycle_lock);
RETURN_IF_ERROR(_state_transition(State::FINALIZED));
- std::unique_lock<std::mutex> lc(_dependency_lock);
_sink_shared_state.reset();
_op_shared_states.clear();
_shared_state_map.clear();
@@ -920,6 +927,8 @@ Status PipelineTask::close(Status exec_status, bool close_sink) {
}
if (close_sink) {
+ // Synchronize FINISHED with forced unblocking so delayed wake_up() sees a stable state.
+ std::unique_lock<std::mutex> lock(_dependency_lifecycle_lock);
RETURN_IF_ERROR(_state_transition(State::FINISHED));
}
return s;
@@ -939,7 +948,7 @@ std::string PipelineTask::debug_string() {
_index, _opened, _eos, _to_string(_exec_state), _dry_run,
_wake_up_early.load(),
_wake_by, _state_change_watcher.elapsed_time() /
NANOS_PER_SEC, _spilling,
is_running());
- std::unique_lock<std::mutex> lc(_dependency_lock);
+ std::unique_lock<std::mutex> lc(_dependency_lifecycle_lock);
auto* cur_blocked_dep = _blocked_dep;
auto fragment = _fragment_context.lock();
if (is_finalized() || !fragment) {
@@ -1076,12 +1085,18 @@ void PipelineTask::wake_up(Dependency* dep, std::unique_lock<std::mutex>& /* dep
Status PipelineTask::_state_transition(State new_state) {
const auto& table =
_wake_up_early ? WAKE_UP_EARLY_LEGAL_STATE_TRANSITION :
LEGAL_STATE_TRANSITION;
- if (!table[(int)new_state].contains(_exec_state)) {
+ auto current_state = _exec_state.load();
+ if (!table[(int)new_state].contains(current_state)) {
return Status::InternalError(
- "Task state transition from {} to {} is not allowed! Task info: {}",
- _to_string(_exec_state), _to_string(new_state), debug_string());
+ "Task state transition from {} to {} is not allowed! Task: query_id={}, "
+ "instance_id={}, id={}, pipeline={}, open={}, eos={}, dry_run={}, "
+ "wake_up_early={}, wake_by={}, spilling={}, running={}",
+ _to_string(current_state), _to_string(new_state), print_id(_query_id),
+ print_id(_state->fragment_instance_id()), _index, _pipeline_name, _opened,
+ _eos.load(), _dry_run, _wake_up_early.load(), _wake_by.load(), _spilling.load(),
+ is_running());
}
- // FINISHED/FINALIZED → RUNNABLE is legal under wake_up_early (delayed wake_up() arriving
+ // FINISHED/FINALIZED -> RUNNABLE is legal under wake_up_early (delayed wake_up() arriving
// after the task already terminated), but we must not actually move the state backwards
// or update profile info (which would misleadingly show RUNNABLE for a terminated task).
bool need_move = !((_exec_state == State::FINISHED || _exec_state == State::FINALIZED) &&
diff --git a/be/src/exec/pipeline/pipeline_task.h b/be/src/exec/pipeline/pipeline_task.h
index bb1440bfcb7..2268c00a4c7 100644
--- a/be/src/exec/pipeline/pipeline_task.h
+++ b/be/src/exec/pipeline/pipeline_task.h
@@ -271,7 +271,11 @@ private:
Dependency* _blocked_dep = nullptr;
Dependency* _memory_sufficient_dependency;
- std::mutex _dependency_lock;
+ // Protects dependency containers and the raw Dependency pointers they contain. It also
+ // serializes forced dependency unblocking with close()/finalize(): set_ready() may synchronously
+ // call wake_up() and submit this task, so close()/finalize() must not clear operator/shared
+ // state until forced unblocking finishes. wake_up() must not take this lock.
+ std::mutex _dependency_lifecycle_lock;
std::atomic<bool> _running {false};
std::atomic<bool> _eos {false};
diff --git a/be/test/exec/pipeline/pipeline_task_test.cpp b/be/test/exec/pipeline/pipeline_task_test.cpp
index 34d007be4c2..d6294825288 100644
--- a/be/test/exec/pipeline/pipeline_task_test.cpp
+++ b/be/test/exec/pipeline/pipeline_task_test.cpp
@@ -18,6 +18,11 @@
#include <glog/logging.h>
#include <gtest/gtest.h>
+#include <chrono>
+#include <functional>
+#include <future>
+#include <thread>
+
#include "common/config.h"
#include "common/status.h"
#include "exec/operator/operator.h"
@@ -34,6 +39,7 @@
#include "testutil/mock/mock_thread_mem_tracker_mgr.h"
#include "testutil/mock/mock_workload_group_mgr.h"
#include "util/debug_points.h"
+#include "util/defer_op.h"
namespace doris {
@@ -94,6 +100,19 @@ private:
template class OperatorX<DummyOperatorLocalState>;
template class DataSinkOperatorX<DummySinkLocalState>;
+class BlockableSubmitTaskScheduler : public MockTaskScheduler {
+public:
+ Status submit(PipelineTaskSPtr task) override {
+ if (on_submit) {
+ on_submit(task);
+ }
+ static_cast<void>(task->is_blockable());
+ return MockTaskScheduler::submit(task);
+ }
+
+ std::function<void(PipelineTaskSPtr)> on_submit;
+};
+
TEST_F(PipelineTaskTest, TEST_CONSTRUCTOR) {
auto num_instances = 1;
auto pip_id = 0;
@@ -557,6 +576,92 @@ TEST_F(PipelineTaskTest, TEST_STATE_TRANSITION) {
}
}
+TEST_F(PipelineTaskTest, TEST_WAKE_UP_SUBMIT_PROTECTED_FROM_FINALIZE) {
+ auto scheduler = std::make_unique<BlockableSubmitTaskScheduler>();
+ auto* scheduler_ptr = scheduler.get();
+ _query_ctx->_task_scheduler = scheduler_ptr;
+
+ auto num_instances = 1;
+ auto pip_id = 0;
+ auto task_id = 0;
+ auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
+ OperatorPtr source_op;
+ source_op.reset(new DummyOperator());
+ EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
+
+ int op_id = 1;
+ int node_id = 2;
+ int dest_id = 3;
+ DataSinkOperatorPtr sink_op;
+ sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
+ EXPECT_TRUE(pip->set_sink(sink_op).ok());
+
+ auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
+ std::map<int,
+ std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
+ shared_state_map;
+ _runtime_state->resize_op_id_to_local_state(-1);
+ auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
+ profile.get(), shared_state_map, task_id);
+ task->_exec_time_slice = 10'000'000'000ULL;
+
+ std::vector<TScanRangeParams> scan_range;
+ int sender_id = 0;
+ TDataSink tsink;
+ EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
+ task->_wake_up_early = true;
+ auto dep = std::make_shared<Dependency>(0, 0, "test_dep", false);
+ task->_execution_dependencies.push_back(dep.get());
+ EXPECT_EQ(dep->is_blocked_by(task), dep.get());
+ EXPECT_EQ(task->_exec_state, PipelineTask::State::BLOCKED);
+
+ auto origin_enable_debug_points = config::enable_debug_points;
+ config::enable_debug_points = true;
+ Defer debug_point_cleanup {[&]() {
+ DebugPoints::instance()->remove("PipelineTask::unblock_all_dependencies.before_set_ready");
+ config::enable_debug_points = origin_enable_debug_points;
+ _query_ctx->_task_scheduler = _task_scheduler.get();
+ }};
+
+ std::promise<void> wake_up_reached_promise;
+ auto wake_up_reached = wake_up_reached_promise.get_future();
+ std::promise<void> release_wake_up_promise;
+ auto release_wake_up = release_wake_up_promise.get_future();
+ DebugPoints::instance()->add_with_callback(
+ "PipelineTask::unblock_all_dependencies.before_set_ready", std::function<void()>([&]() {
+ wake_up_reached_promise.set_value();
+ release_wake_up.wait();
+ }));
+
+ scheduler_ptr->on_submit = [&](PipelineTaskSPtr submitted_task) {
+ EXPECT_EQ(submitted_task.get(), task.get());
+ EXPECT_NE(task->_sink, nullptr);
+ EXPECT_FALSE(task->_operators.empty());
+ };
+
+ std::thread unblock_thread([&]() { task->unblock_all_dependencies(); });
+ EXPECT_EQ(wake_up_reached.wait_for(std::chrono::seconds(10)), std::future_status::ready);
+
+ std::promise<void> close_started_promise;
+ auto close_started = close_started_promise.get_future();
+ auto close_finalize = std::async(std::launch::async, [&]() {
+ close_started_promise.set_value();
+ EXPECT_TRUE(task->close(Status::OK()).ok());
+ EXPECT_TRUE(task->finalize().ok());
+ });
+ EXPECT_EQ(close_started.wait_for(std::chrono::seconds(10)), std::future_status::ready);
+ EXPECT_EQ(close_finalize.wait_for(std::chrono::milliseconds(100)), std::future_status::timeout);
+
+ release_wake_up_promise.set_value();
+ unblock_thread.join();
+ close_finalize.wait();
+
+ EXPECT_EQ(scheduler_ptr->submit_count(), 1);
+ EXPECT_EQ(task->_exec_state, PipelineTask::State::FINALIZED);
+ EXPECT_EQ(task->_sink, nullptr);
+ EXPECT_TRUE(task->_operators.empty());
+}
+
TEST_F(PipelineTaskTest, TEST_SINK_FINISHED) {
auto num_instances = 1;
auto pip_id = 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]