This is an automated email from the ASF dual-hosted git repository.

HappenLee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3bd324632a7 [fix](be) Catch std exceptions in pipeline scheduler (#64934)
3bd324632a7 is described below

commit 3bd324632a773fa1b6cb5df841697a0f10f65024
Author: HappenLee <[email protected]>
AuthorDate: Tue Jun 30 00:18:37 2026 +0800

    [fix](be) Catch std exceptions in pipeline scheduler (#64934)
    
    Problem Summary: Pipeline task execution is wrapped by
    ASSIGN_STATUS_IF_CATCH_EXCEPTION, which converts Doris exceptions to
    Status but lets std::exception-derived exceptions escape the scheduler
    worker thread. When task execution throws an STL exception such as
    std::out_of_range, the worker can terminate the BE process instead of
    returning a query error. This change adds a scheduler-level
    std::exception fallback around the existing Doris exception handling and
    converts it to an InternalError so the existing cancel and close path
    handles the failed task. A unit test covers a task whose execute()
    throws std::out_of_range.
    
    ### Release note
    
    Fix pipeline task execution to return a query error for
    std::exception-derived failures instead of aborting the backend.
    
    ### Check List (For Author)
    
    - Test:
    - Unit Test: ./run-be-ut.sh --run
    --filter=PipelineTaskTest.TEST_SCHEDULER_CATCH_STD_EXCEPTION
    - Manual test: PATH=/mnt/disk6/common/ldb_toolchain_toucan/bin:$PATH
    build-support/clang-format.sh
    - Manual test: PATH=/mnt/disk6/common/ldb_toolchain_toucan/bin:$PATH
    build-support/check-format.sh
    - Manual test: git diff --check
    - Manual test: build-support/run-clang-tidy.sh --build-dir
    be/ut_build_ASAN
    - Behavior changed: Yes. std::exception-derived failures thrown from
    pipeline task execution now become query errors instead of escaping the
    scheduler worker.
    - Does this need documentation: No
---
 be/src/exec/pipeline/task_scheduler.cpp      |  7 ++-
 be/test/exec/pipeline/pipeline_task_test.cpp | 92 ++++++++++++++++++++++++++++
 2 files changed, 98 insertions(+), 1 deletion(-)

diff --git a/be/src/exec/pipeline/task_scheduler.cpp b/be/src/exec/pipeline/task_scheduler.cpp
index 8e5150370ed..51e890c7605 100644
--- a/be/src/exec/pipeline/task_scheduler.cpp
+++ b/be/src/exec/pipeline/task_scheduler.cpp
@@ -26,6 +26,7 @@
 #include <algorithm>
 #include <chrono> // IWYU pragma: keep
 #include <cstddef>
+#include <exception>
 #include <functional>
 #include <memory>
 #include <mutex>
@@ -146,7 +147,11 @@ void TaskScheduler::_do_work(int index) {
         }
 
         // Main logics of execution
-        ASSIGN_STATUS_IF_CATCH_EXCEPTION(status = task->execute(&done), status);
+        try {
+            ASSIGN_STATUS_IF_CATCH_EXCEPTION(status = task->execute(&done), status);
+        } catch (const std::exception& e) {
+            status = Status::InternalError("Catch std::exception: {}", e.what());
+        }
         fragment_context->trigger_report_if_necessary();
     }
 }
diff --git a/be/test/exec/pipeline/pipeline_task_test.cpp b/be/test/exec/pipeline/pipeline_task_test.cpp
index 3c4adb63fc2..cd00dff86d9 100644
--- a/be/test/exec/pipeline/pipeline_task_test.cpp
+++ b/be/test/exec/pipeline/pipeline_task_test.cpp
@@ -22,6 +22,8 @@
 #include <chrono>
 #include <functional>
 #include <future>
+#include <stdexcept>
+#include <string>
 #include <thread>
 
 #include "common/config.h"
@@ -34,6 +36,7 @@
 #include "exec/pipeline/pipeline_fragment_context.h"
 #include "exec/pipeline/thrift_builder.h"
 #include "exec/spill/spill_file.h"
+#include "load/stream_load/new_load_stream_mgr.h"
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
 #include "testutil/mock/mock_runtime_state.h"
@@ -134,6 +137,34 @@ private:
     std::atomic<int>* _blockable_checks;
 };
 
+class ThrowStdExceptionTask final : public PipelineTask {
+public:
+    ThrowStdExceptionTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state,
+                          std::shared_ptr<PipelineFragmentContext> fragment_context,
+                          RuntimeProfile* parent_profile,
+                          std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
+                                                  std::vector<std::shared_ptr<Dependency>>>>
+                                  shared_state_map,
+                          int task_idx, std::promise<std::string>* close_status)
+            : PipelineTask(pipeline, task_id, state, fragment_context, parent_profile,
+                           std::move(shared_state_map), task_idx),
+              _close_status(close_status) {}
+
+    Status execute(bool* /*done*/) override {
+        throw std::out_of_range("pipeline task std exception");
+    }
+
+    Status close(Status exec_status, bool /*close_sink*/) override {
+        _close_status->set_value(exec_status.to_string());
+        return Status::OK();
+    }
+
+    Status finalize() override { return Status::OK(); }
+
+private:
+    std::promise<std::string>* _close_status;
+};
+
 TEST_F(PipelineTaskTest, TEST_CONSTRUCTOR) {
     auto num_instances = 1;
     auto pip_id = 0;
@@ -726,6 +757,67 @@ TEST_F(PipelineTaskTest, TEST_CLOSED_TASK_REJECTS_HYBRID_SUBMIT_BEFORE_FINALIZE)
     EXPECT_TRUE(task->finalize().ok());
 }
 
+TEST_F(PipelineTaskTest, TEST_SCHEDULER_CATCH_STD_EXCEPTION) {
+    auto num_instances = 1;
+    auto pip_id = 0;
+    auto task_id = 0;
+    auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
+    OperatorPtr source_op;
+    source_op.reset(new DummyOperator());
+    EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
+
+    int op_id = 1;
+    int node_id = 2;
+    int dest_id = 3;
+    DataSinkOperatorPtr sink_op;
+    sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
+    EXPECT_TRUE(pip->set_sink(sink_op).ok());
+
+    auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
+    std::map<int,
+             std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
+            shared_state_map;
+    _context->_runtime_state = std::move(_runtime_state);
+    auto* runtime_state = _context->_runtime_state.get();
+    runtime_state->resize_op_id_to_local_state(-1);
+    std::promise<std::string> close_status;
+    auto close_status_future = close_status.get_future();
+    auto task = std::make_shared<ThrowStdExceptionTask>(pip, task_id, runtime_state, _context,
+                                                        profile.get(), shared_state_map, task_id,
+                                                        &close_status);
+    task->_exec_time_slice = 10'000'000'000ULL;
+    std::vector<TScanRangeParams> scan_range;
+    int sender_id = 0;
+    TDataSink tsink;
+    ASSERT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
+    pip->incr_created_tasks(task_id, task.get());
+    _context->_pip_id_to_pipeline[pip->id()] = pip.get();
+    _context->_total_tasks = 2;
+
+    auto* exec_env = ExecEnv::GetInstance();
+    bool need_clear_new_load_stream_mgr = exec_env->new_load_stream_mgr() == nullptr;
+    if (need_clear_new_load_stream_mgr) {
+        exec_env->set_new_load_stream_mgr(NewLoadStreamMgr::create_unique());
+    }
+    Defer clear_new_load_stream_mgr {[&]() {
+        if (need_clear_new_load_stream_mgr) {
+            exec_env->clear_new_load_stream_mgr();
+        }
+    }};
+
+    HybridTaskScheduler scheduler(1, 1, "test_hybrid_task_scheduler", nullptr);
+    Defer stop_scheduler {[&]() { scheduler.stop(); }};
+    ASSERT_TRUE(scheduler.start().ok());
+    ASSERT_TRUE(scheduler.submit(task).ok());
+    ASSERT_EQ(close_status_future.wait_for(std::chrono::seconds(5)), std::future_status::ready);
+
+    auto close_status_string = close_status_future.get();
+    EXPECT_NE(close_status_string.find("Catch std::exception: pipeline task std exception"),
+              std::string::npos)
+            << close_status_string;
+    EXPECT_TRUE(_context->is_canceled());
+}
+
 TEST_F(PipelineTaskTest, TEST_FINALIZED_TASK_REJECTS_HYBRID_SUBMIT) {
     auto num_instances = 1;
     auto pip_id = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to