This is an automated email from the ASF dual-hosted git repository.
HappenLee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3bd324632a7 [fix](be) Catch std exceptions in pipeline scheduler
(#64934)
3bd324632a7 is described below
commit 3bd324632a773fa1b6cb5df841697a0f10f65024
Author: HappenLee <[email protected]>
AuthorDate: Tue Jun 30 00:18:37 2026 +0800
[fix](be) Catch std exceptions in pipeline scheduler (#64934)
Problem Summary: Pipeline task execution is wrapped by
ASSIGN_STATUS_IF_CATCH_EXCEPTION, which converts Doris exceptions to
Status but lets other std::exception-derived exceptions escape the scheduler
worker thread. When task execution throws a standard library exception such as
std::out_of_range, the exception can propagate out of the worker thread; an
exception that leaves a thread's top-level function triggers std::terminate,
so the BE process aborts instead of returning a query error. This change adds
a scheduler-level std::exception fallback around the existing Doris exception
handling and converts the caught exception into an InternalError status, so the
existing cancel and close path handles the failed task. A unit test covers a
task whose execute() throws std::out_of_range.
### Release note
Fix pipeline task execution to return a query error for
std::exception-derived failures instead of aborting the backend.
### Check List (For Author)
- Test:
- Unit Test: ./run-be-ut.sh --run
--filter=PipelineTaskTest.TEST_SCHEDULER_CATCH_STD_EXCEPTION
- Manual test: PATH=/mnt/disk6/common/ldb_toolchain_toucan/bin:$PATH
build-support/clang-format.sh
- Manual test: PATH=/mnt/disk6/common/ldb_toolchain_toucan/bin:$PATH
build-support/check-format.sh
- Manual test: git diff --check
- Manual test: build-support/run-clang-tidy.sh --build-dir
be/ut_build_ASAN
- Behavior changed: Yes. std::exception-derived failures thrown from
pipeline task execution now become query errors instead of escaping the
scheduler worker.
- Does this need documentation: No
---
be/src/exec/pipeline/task_scheduler.cpp | 7 ++-
be/test/exec/pipeline/pipeline_task_test.cpp | 92 ++++++++++++++++++++++++++++
2 files changed, 98 insertions(+), 1 deletion(-)
diff --git a/be/src/exec/pipeline/task_scheduler.cpp
b/be/src/exec/pipeline/task_scheduler.cpp
index 8e5150370ed..51e890c7605 100644
--- a/be/src/exec/pipeline/task_scheduler.cpp
+++ b/be/src/exec/pipeline/task_scheduler.cpp
@@ -26,6 +26,7 @@
#include <algorithm>
#include <chrono> // IWYU pragma: keep
#include <cstddef>
+#include <exception>
#include <functional>
#include <memory>
#include <mutex>
@@ -146,7 +147,11 @@ void TaskScheduler::_do_work(int index) {
}
// Main logics of execution
- ASSIGN_STATUS_IF_CATCH_EXCEPTION(status = task->execute(&done),
status);
+ try {
+ ASSIGN_STATUS_IF_CATCH_EXCEPTION(status = task->execute(&done),
status);
+ } catch (const std::exception& e) {
+ status = Status::InternalError("Catch std::exception: {}",
e.what());
+ }
fragment_context->trigger_report_if_necessary();
}
}
diff --git a/be/test/exec/pipeline/pipeline_task_test.cpp
b/be/test/exec/pipeline/pipeline_task_test.cpp
index 3c4adb63fc2..cd00dff86d9 100644
--- a/be/test/exec/pipeline/pipeline_task_test.cpp
+++ b/be/test/exec/pipeline/pipeline_task_test.cpp
@@ -22,6 +22,8 @@
#include <chrono>
#include <functional>
#include <future>
+#include <stdexcept>
+#include <string>
#include <thread>
#include "common/config.h"
@@ -34,6 +36,7 @@
#include "exec/pipeline/pipeline_fragment_context.h"
#include "exec/pipeline/thrift_builder.h"
#include "exec/spill/spill_file.h"
+#include "load/stream_load/new_load_stream_mgr.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "testutil/mock/mock_runtime_state.h"
@@ -134,6 +137,34 @@ private:
std::atomic<int>* _blockable_checks;
};
+// Test-only PipelineTask whose execute() always throws std::out_of_range
+// (a standard library exception, not a Doris exception). close() records the
+// string form of the Status it is given into the caller-provided promise, so
+// the test can inspect what the scheduler passed to close().
+class ThrowStdExceptionTask final : public PipelineTask {
+public:
+ ThrowStdExceptionTask(PipelinePtr& pipeline, uint32_t task_id,
RuntimeState* state,
+ std::shared_ptr<PipelineFragmentContext>
fragment_context,
+ RuntimeProfile* parent_profile,
+ std::map<int,
std::pair<std::shared_ptr<BasicSharedState>,
+
std::vector<std::shared_ptr<Dependency>>>>
+ shared_state_map,
+ int task_idx, std::promise<std::string>*
close_status)
+ : PipelineTask(pipeline, task_id, state, fragment_context,
parent_profile,
+ std::move(shared_state_map), task_idx),
+ _close_status(close_status) {}
+
+ // Always throws; never returns a Status.
+ Status execute(bool* /*done*/) override {
+ throw std::out_of_range("pipeline task std exception");
+ }
+
+ // Publishes exec_status to the test. NOTE: std::promise::set_value can only
+ // succeed once; a second close() call would throw std::future_error.
+ Status close(Status exec_status, bool /*close_sink*/) override {
+ _close_status->set_value(exec_status.to_string());
+ return Status::OK();
+ }
+
+ Status finalize() override { return Status::OK(); }
+
+private:
+ // Non-owning; must remain valid until close() has run.
+ std::promise<std::string>* _close_status;
+};
+
TEST_F(PipelineTaskTest, TEST_CONSTRUCTOR) {
auto num_instances = 1;
auto pip_id = 0;
@@ -726,6 +757,67 @@ TEST_F(PipelineTaskTest,
TEST_CLOSED_TASK_REJECTS_HYBRID_SUBMIT_BEFORE_FINALIZE)
EXPECT_TRUE(task->finalize().ok());
}
+// Submits a ThrowStdExceptionTask to a real HybridTaskScheduler and checks
+// that the std::exception thrown from execute() reaches close() as an error
+// Status containing "Catch std::exception: ...", and that the fragment context
+// ends up canceled, instead of the exception escaping the worker thread.
+TEST_F(PipelineTaskTest, TEST_SCHEDULER_CATCH_STD_EXCEPTION) {
+ auto num_instances = 1;
+ auto pip_id = 0;
+ auto task_id = 0;
+ auto pip = std::make_shared<Pipeline>(pip_id, num_instances,
num_instances);
+ OperatorPtr source_op;
+ source_op.reset(new DummyOperator());
+ EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
+
+ int op_id = 1;
+ int node_id = 2;
+ int dest_id = 3;
+ DataSinkOperatorPtr sink_op;
+ sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
+ EXPECT_TRUE(pip->set_sink(sink_op).ok());
+
+ auto profile = std::make_shared<RuntimeProfile>("Pipeline : " +
std::to_string(pip_id));
+ std::map<int,
+ std::pair<std::shared_ptr<BasicSharedState>,
std::vector<std::shared_ptr<Dependency>>>>
+ shared_state_map;
+ _context->_runtime_state = std::move(_runtime_state);
+ auto* runtime_state = _context->_runtime_state.get();
+ runtime_state->resize_op_id_to_local_state(-1);
+ // The task's close() fulfills this promise with the Status it received.
+ std::promise<std::string> close_status;
+ auto close_status_future = close_status.get_future();
+ auto task = std::make_shared<ThrowStdExceptionTask>(pip, task_id,
runtime_state, _context,
+ profile.get(),
shared_state_map, task_id,
+ &close_status);
+ task->_exec_time_slice = 10'000'000'000ULL;
+ std::vector<TScanRangeParams> scan_range;
+ int sender_id = 0;
+ TDataSink tsink;
+ ASSERT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
+ pip->incr_created_tasks(task_id, task.get());
+ _context->_pip_id_to_pipeline[pip->id()] = pip.get();
+ // NOTE(review): presumably set above the single submitted task so the
+ // fragment context is not treated as finished once this task closes — confirm.
+ _context->_total_tasks = 2;
+
+ // Install a NewLoadStreamMgr only if none exists, and remove it again at
+ // scope exit. NOTE(review): presumably needed by the cancel/report path — confirm.
+ auto* exec_env = ExecEnv::GetInstance();
+ bool need_clear_new_load_stream_mgr = exec_env->new_load_stream_mgr() ==
nullptr;
+ if (need_clear_new_load_stream_mgr) {
+ exec_env->set_new_load_stream_mgr(NewLoadStreamMgr::create_unique());
+ }
+ Defer clear_new_load_stream_mgr {[&]() {
+ if (need_clear_new_load_stream_mgr) {
+ exec_env->clear_new_load_stream_mgr();
+ }
+ }};
+
+ HybridTaskScheduler scheduler(1, 1, "test_hybrid_task_scheduler", nullptr);
+ // Stop the scheduler on every exit path, including a failed ASSERT below.
+ Defer stop_scheduler {[&]() { scheduler.stop(); }};
+ ASSERT_TRUE(scheduler.start().ok());
+ ASSERT_TRUE(scheduler.submit(task).ok());
+ // Bounded wait so a regression fails the test instead of hanging it.
+ ASSERT_EQ(close_status_future.wait_for(std::chrono::seconds(5)),
std::future_status::ready);
+
+ auto close_status_string = close_status_future.get();
+ EXPECT_NE(close_status_string.find("Catch std::exception: pipeline task
std exception"),
+ std::string::npos)
+ << close_status_string;
+ EXPECT_TRUE(_context->is_canceled());
+}
+
TEST_F(PipelineTaskTest, TEST_FINALIZED_TASK_REJECTS_HYBRID_SUBMIT) {
auto num_instances = 1;
auto pip_id = 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]