This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new fcb04595b4c [fix](be) Catch std exceptions in pipeline scheduler
(#65019)
fcb04595b4c is described below
commit fcb04595b4ca7bd95fcf0c02e7027a5a1119fcc3
Author: HappenLee <[email protected]>
AuthorDate: Tue Jun 30 19:36:17 2026 +0800
[fix](be) Catch std exceptions in pipeline scheduler (#65019)
### What problem does this PR solve?
Issue Number: None
Related PR: #64934
Problem Summary: Backport #64934 to branch-4.1. Pipeline task execution
is wrapped by ASSIGN_STATUS_IF_CATCH_EXCEPTION, which converts Doris
exceptions to Status but lets std::exception-derived exceptions escape
the scheduler worker thread. When task execution throws an STL exception
such as std::out_of_range, the exception leaves the worker thread
uncaught and can terminate the BE process instead of returning a query
error. This change adds a scheduler-level std::exception fallback around
the existing Doris exception handling while preserving branch-4.1's
pipeline tracer logic. The fallback converts std::exception failures to
InternalError so that the existing cancel and close path handles the
failed task. A unit test covers a task whose execute() throws
std::out_of_range.
### Release note
Fix pipeline task execution to return a query error for
std::exception-derived failures instead of aborting the backend.
### Check List (For Author)
- Test:
    - Unit Test: ./run-be-ut.sh -j 48 --run --filter=PipelineTaskTest.TEST_SCHEDULER_CATCH_STD_EXCEPTION
    - Manual test: PATH=/mnt/disk6/common/ldb_toolchain_toucan/bin:$PATH build-support/clang-format.sh
    - Manual test: PATH=/mnt/disk6/common/ldb_toolchain_toucan/bin:$PATH build-support/check-format.sh
    - Manual test: git diff --check upstream/branch-4.1..HEAD
- Behavior changed: Yes. std::exception-derived failures thrown from
pipeline task execution now become query errors instead of escaping the
scheduler worker.
- Does this need documentation: No
---
be/src/exec/pipeline/task_scheduler.cpp | 41 ++++++------
be/test/exec/pipeline/pipeline_task_test.cpp | 96 ++++++++++++++++++++++++++++
2 files changed, 119 insertions(+), 18 deletions(-)
diff --git a/be/src/exec/pipeline/task_scheduler.cpp
b/be/src/exec/pipeline/task_scheduler.cpp
index fb6cf4e80f5..1a052fc1ffa 100644
--- a/be/src/exec/pipeline/task_scheduler.cpp
+++ b/be/src/exec/pipeline/task_scheduler.cpp
@@ -27,6 +27,7 @@
#include <algorithm>
#include <chrono> // IWYU pragma: keep
#include <cstddef>
+#include <exception>
#include <functional>
#include <memory>
#include <mutex>
@@ -150,24 +151,28 @@ void TaskScheduler::_do_work(int index) {
}
// Main logics of execution
- ASSIGN_STATUS_IF_CATCH_EXCEPTION(
- //TODO: use a better enclose to abstracting these
- if
(ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) {
- TUniqueId query_id = fragment_context->get_query_id();
- std::string task_name = task->task_name();
-
- std::thread::id tid = std::this_thread::get_id();
- uint64_t thread_id = *reinterpret_cast<uint64_t*>(&tid);
- uint64_t start_time = MonotonicMicros();
-
- status = task->execute(&done);
-
- uint64_t end_time = MonotonicMicros();
- ExecEnv::GetInstance()->pipeline_tracer_context()->record(
- {query_id, task_name,
static_cast<uint32_t>(index), thread_id,
- start_time, end_time});
- } else { status = task->execute(&done); },
- status);
+ try {
+ ASSIGN_STATUS_IF_CATCH_EXCEPTION(
+ //TODO: use a better enclose to abstracting these
+ if
(ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) {
+ TUniqueId query_id = fragment_context->get_query_id();
+ std::string task_name = task->task_name();
+
+ std::thread::id tid = std::this_thread::get_id();
+ uint64_t thread_id =
*reinterpret_cast<uint64_t*>(&tid);
+ uint64_t start_time = MonotonicMicros();
+
+ status = task->execute(&done);
+
+ uint64_t end_time = MonotonicMicros();
+
ExecEnv::GetInstance()->pipeline_tracer_context()->record(
+ {query_id, task_name,
static_cast<uint32_t>(index), thread_id,
+ start_time, end_time});
+ } else { status = task->execute(&done); },
+ status);
+ } catch (const std::exception& e) {
+ status = Status::InternalError("Catch std::exception: {}",
e.what());
+ }
fragment_context->trigger_report_if_necessary();
}
}
diff --git a/be/test/exec/pipeline/pipeline_task_test.cpp
b/be/test/exec/pipeline/pipeline_task_test.cpp
index 29555a8d2ae..0780498e29f 100644
--- a/be/test/exec/pipeline/pipeline_task_test.cpp
+++ b/be/test/exec/pipeline/pipeline_task_test.cpp
@@ -20,6 +20,9 @@
#include <atomic>
#include <chrono>
+#include <future>
+#include <stdexcept>
+#include <string>
#include <thread>
#include "common/config.h"
@@ -30,15 +33,18 @@
#include "exec/pipeline/dummy_task_queue.h"
#include "exec/pipeline/pipeline.h"
#include "exec/pipeline/pipeline_fragment_context.h"
+#include "exec/pipeline/pipeline_tracing.h"
#include "exec/pipeline/task_scheduler.h"
#include "exec/pipeline/thrift_builder.h"
#include "exec/spill/spill_file.h"
+#include "load/stream_load/new_load_stream_mgr.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "testutil/mock/mock_runtime_state.h"
#include "testutil/mock/mock_thread_mem_tracker_mgr.h"
#include "testutil/mock/mock_workload_group_mgr.h"
#include "util/debug_points.h"
+#include "util/defer_op.h"
namespace doris {
@@ -51,6 +57,7 @@ public:
void SetUp() override {
_thread_mem_tracker_mgr =
std::move(thread_context()->thread_mem_tracker_mgr);
thread_context()->thread_mem_tracker_mgr =
std::make_unique<MockThreadMemTrackerMgr>();
+ ExecEnv::GetInstance()->_pipeline_tracer_ctx =
std::make_unique<PipelineTracerContext>();
_query_options = TQueryOptionsBuilder()
.set_enable_local_exchange(true)
.set_enable_local_shuffle(true)
@@ -117,6 +124,34 @@ private:
std::atomic<int>* _blockable_checks;
};
+class ThrowStdExceptionTask final : public PipelineTask {
+public:
+ ThrowStdExceptionTask(PipelinePtr& pipeline, uint32_t task_id,
RuntimeState* state,
+ std::shared_ptr<PipelineFragmentContext>
fragment_context,
+ RuntimeProfile* parent_profile,
+ std::map<int,
std::pair<std::shared_ptr<BasicSharedState>,
+
std::vector<std::shared_ptr<Dependency>>>>
+ shared_state_map,
+ int task_idx, std::promise<std::string>*
close_status)
+ : PipelineTask(pipeline, task_id, state, fragment_context,
parent_profile,
+ std::move(shared_state_map), task_idx),
+ _close_status(close_status) {}
+
+ Status execute(bool* /*done*/) override {
+ throw std::out_of_range("pipeline task std exception");
+ }
+
+ Status close(Status exec_status, bool /*close_sink*/) override {
+ _close_status->set_value(exec_status.to_string());
+ return Status::OK();
+ }
+
+ Status finalize() override { return Status::OK(); }
+
+private:
+ std::promise<std::string>* _close_status;
+};
+
TEST_F(PipelineTaskTest, TEST_CONSTRUCTOR) {
auto num_instances = 1;
auto pip_id = 0;
@@ -585,6 +620,67 @@ TEST_F(PipelineTaskTest,
TEST_CLOSED_TASK_REJECTS_HYBRID_SUBMIT_BEFORE_FINALIZE)
EXPECT_TRUE(task->finalize().ok());
}
+TEST_F(PipelineTaskTest, TEST_SCHEDULER_CATCH_STD_EXCEPTION) {
+ auto num_instances = 1;
+ auto pip_id = 0;
+ auto task_id = 0;
+ auto pip = std::make_shared<Pipeline>(pip_id, num_instances,
num_instances);
+ OperatorPtr source_op;
+ source_op.reset(new DummyOperator());
+ EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
+
+ int op_id = 1;
+ int node_id = 2;
+ int dest_id = 3;
+ DataSinkOperatorPtr sink_op;
+ sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
+ EXPECT_TRUE(pip->set_sink(sink_op).ok());
+
+ auto profile = std::make_shared<RuntimeProfile>("Pipeline : " +
std::to_string(pip_id));
+ std::map<int,
+ std::pair<std::shared_ptr<BasicSharedState>,
std::vector<std::shared_ptr<Dependency>>>>
+ shared_state_map;
+ _context->_runtime_state = std::move(_runtime_state);
+ auto* runtime_state = _context->_runtime_state.get();
+ runtime_state->resize_op_id_to_local_state(-1);
+ std::promise<std::string> close_status;
+ auto close_status_future = close_status.get_future();
+ auto task = std::make_shared<ThrowStdExceptionTask>(pip, task_id,
runtime_state, _context,
+ profile.get(),
shared_state_map, task_id,
+ &close_status);
+ task->_exec_time_slice = 10'000'000'000ULL;
+ std::vector<TScanRangeParams> scan_range;
+ int sender_id = 0;
+ TDataSink tsink;
+ ASSERT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
+ pip->incr_created_tasks(task_id, task.get());
+ _context->_pip_id_to_pipeline[pip->id()] = pip.get();
+ _context->_total_tasks = 2;
+
+ auto* exec_env = ExecEnv::GetInstance();
+ bool need_clear_new_load_stream_mgr = exec_env->new_load_stream_mgr() ==
nullptr;
+ if (need_clear_new_load_stream_mgr) {
+ exec_env->set_new_load_stream_mgr(NewLoadStreamMgr::create_unique());
+ }
+ Defer clear_new_load_stream_mgr {[&]() {
+ if (need_clear_new_load_stream_mgr) {
+ exec_env->clear_new_load_stream_mgr();
+ }
+ }};
+
+ HybridTaskScheduler scheduler(1, 1, "test_hybrid_task_scheduler", nullptr);
+ Defer stop_scheduler {[&]() { scheduler.stop(); }};
+ ASSERT_TRUE(scheduler.start().ok());
+ ASSERT_TRUE(scheduler.submit(task).ok());
+ ASSERT_EQ(close_status_future.wait_for(std::chrono::seconds(5)),
std::future_status::ready);
+
+ auto close_status_string = close_status_future.get();
+ EXPECT_NE(close_status_string.find("Catch std::exception: pipeline task
std exception"),
+ std::string::npos)
+ << close_status_string;
+ EXPECT_TRUE(_context->is_canceled());
+}
+
TEST_F(PipelineTaskTest, TEST_FINALIZED_TASK_REJECTS_HYBRID_SUBMIT) {
auto num_instances = 1;
auto pip_id = 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]