This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new fcb04595b4c [fix](be) Catch std exceptions in pipeline scheduler (#65019)
fcb04595b4c is described below

commit fcb04595b4ca7bd95fcf0c02e7027a5a1119fcc3
Author: HappenLee <[email protected]>
AuthorDate: Tue Jun 30 19:36:17 2026 +0800

    [fix](be) Catch std exceptions in pipeline scheduler (#65019)
    
    ### What problem does this PR solve?
    
    Issue Number: None
    
    Related PR: #64934
    
    Problem Summary: Backport #64934 to branch-4.1. Pipeline task execution
    is wrapped by ASSIGN_STATUS_IF_CATCH_EXCEPTION, which converts Doris
    exceptions to Status but lets std::exception-derived exceptions escape
    the scheduler worker thread. When task execution throws an STL exception
    such as std::out_of_range, the worker can terminate the BE process
    instead of returning a query error. This change adds a scheduler-level
    std::exception fallback around the existing Doris exception handling and
    preserves branch-4.1's pipeline tracer logic, converting std::exception
    failures to InternalError so the existing cancel and close path handles
    the failed task. A unit test covers a task whose execute() throws
    std::out_of_range.
    
    ### Release note
    
    Fix pipeline task execution to return a query error for
    std::exception-derived failures instead of aborting the backend.
    
    ### Check List (For Author)
    
    - Test:
    - Unit Test: ./run-be-ut.sh -j 48 --run
    --filter=PipelineTaskTest.TEST_SCHEDULER_CATCH_STD_EXCEPTION
    - Manual test: PATH=/mnt/disk6/common/ldb_toolchain_toucan/bin:$PATH
    build-support/clang-format.sh
    - Manual test: PATH=/mnt/disk6/common/ldb_toolchain_toucan/bin:$PATH
    build-support/check-format.sh
    - Manual test: git diff --check upstream/branch-4.1..HEAD
    - Behavior changed: Yes. std::exception-derived failures thrown from
    pipeline task execution now become query errors instead of escaping the
    scheduler worker.
    - Does this need documentation: No
---
 be/src/exec/pipeline/task_scheduler.cpp      | 41 ++++++------
 be/test/exec/pipeline/pipeline_task_test.cpp | 96 ++++++++++++++++++++++++++++
 2 files changed, 119 insertions(+), 18 deletions(-)

diff --git a/be/src/exec/pipeline/task_scheduler.cpp 
b/be/src/exec/pipeline/task_scheduler.cpp
index fb6cf4e80f5..1a052fc1ffa 100644
--- a/be/src/exec/pipeline/task_scheduler.cpp
+++ b/be/src/exec/pipeline/task_scheduler.cpp
@@ -27,6 +27,7 @@
 #include <algorithm>
 #include <chrono> // IWYU pragma: keep
 #include <cstddef>
+#include <exception>
 #include <functional>
 #include <memory>
 #include <mutex>
@@ -150,24 +151,28 @@ void TaskScheduler::_do_work(int index) {
         }
 
         // Main logics of execution
-        ASSIGN_STATUS_IF_CATCH_EXCEPTION(
-                //TODO: use a better enclose to abstracting these
-                if 
(ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) {
-                    TUniqueId query_id = fragment_context->get_query_id();
-                    std::string task_name = task->task_name();
-
-                    std::thread::id tid = std::this_thread::get_id();
-                    uint64_t thread_id = *reinterpret_cast<uint64_t*>(&tid);
-                    uint64_t start_time = MonotonicMicros();
-
-                    status = task->execute(&done);
-
-                    uint64_t end_time = MonotonicMicros();
-                    ExecEnv::GetInstance()->pipeline_tracer_context()->record(
-                            {query_id, task_name, 
static_cast<uint32_t>(index), thread_id,
-                             start_time, end_time});
-                } else { status = task->execute(&done); },
-                status);
+        try {
+            ASSIGN_STATUS_IF_CATCH_EXCEPTION(
+                    //TODO: use a better enclose to abstracting these
+                    if 
(ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) {
+                        TUniqueId query_id = fragment_context->get_query_id();
+                        std::string task_name = task->task_name();
+
+                        std::thread::id tid = std::this_thread::get_id();
+                        uint64_t thread_id = 
*reinterpret_cast<uint64_t*>(&tid);
+                        uint64_t start_time = MonotonicMicros();
+
+                        status = task->execute(&done);
+
+                        uint64_t end_time = MonotonicMicros();
+                        
ExecEnv::GetInstance()->pipeline_tracer_context()->record(
+                                {query_id, task_name, 
static_cast<uint32_t>(index), thread_id,
+                                 start_time, end_time});
+                    } else { status = task->execute(&done); },
+                    status);
+        } catch (const std::exception& e) {
+            status = Status::InternalError("Catch std::exception: {}", 
e.what());
+        }
         fragment_context->trigger_report_if_necessary();
     }
 }
diff --git a/be/test/exec/pipeline/pipeline_task_test.cpp 
b/be/test/exec/pipeline/pipeline_task_test.cpp
index 29555a8d2ae..0780498e29f 100644
--- a/be/test/exec/pipeline/pipeline_task_test.cpp
+++ b/be/test/exec/pipeline/pipeline_task_test.cpp
@@ -20,6 +20,9 @@
 
 #include <atomic>
 #include <chrono>
+#include <future>
+#include <stdexcept>
+#include <string>
 #include <thread>
 
 #include "common/config.h"
@@ -30,15 +33,18 @@
 #include "exec/pipeline/dummy_task_queue.h"
 #include "exec/pipeline/pipeline.h"
 #include "exec/pipeline/pipeline_fragment_context.h"
+#include "exec/pipeline/pipeline_tracing.h"
 #include "exec/pipeline/task_scheduler.h"
 #include "exec/pipeline/thrift_builder.h"
 #include "exec/spill/spill_file.h"
+#include "load/stream_load/new_load_stream_mgr.h"
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
 #include "testutil/mock/mock_runtime_state.h"
 #include "testutil/mock/mock_thread_mem_tracker_mgr.h"
 #include "testutil/mock/mock_workload_group_mgr.h"
 #include "util/debug_points.h"
+#include "util/defer_op.h"
 
 namespace doris {
 
@@ -51,6 +57,7 @@ public:
     void SetUp() override {
         _thread_mem_tracker_mgr = 
std::move(thread_context()->thread_mem_tracker_mgr);
         thread_context()->thread_mem_tracker_mgr = 
std::make_unique<MockThreadMemTrackerMgr>();
+        ExecEnv::GetInstance()->_pipeline_tracer_ctx = 
std::make_unique<PipelineTracerContext>();
         _query_options = TQueryOptionsBuilder()
                                  .set_enable_local_exchange(true)
                                  .set_enable_local_shuffle(true)
@@ -117,6 +124,34 @@ private:
     std::atomic<int>* _blockable_checks;
 };
 
+class ThrowStdExceptionTask final : public PipelineTask {
+public:
+    ThrowStdExceptionTask(PipelinePtr& pipeline, uint32_t task_id, 
RuntimeState* state,
+                          std::shared_ptr<PipelineFragmentContext> 
fragment_context,
+                          RuntimeProfile* parent_profile,
+                          std::map<int, 
std::pair<std::shared_ptr<BasicSharedState>,
+                                                  
std::vector<std::shared_ptr<Dependency>>>>
+                                  shared_state_map,
+                          int task_idx, std::promise<std::string>* 
close_status)
+            : PipelineTask(pipeline, task_id, state, fragment_context, 
parent_profile,
+                           std::move(shared_state_map), task_idx),
+              _close_status(close_status) {}
+
+    Status execute(bool* /*done*/) override {
+        throw std::out_of_range("pipeline task std exception");
+    }
+
+    Status close(Status exec_status, bool /*close_sink*/) override {
+        _close_status->set_value(exec_status.to_string());
+        return Status::OK();
+    }
+
+    Status finalize() override { return Status::OK(); }
+
+private:
+    std::promise<std::string>* _close_status;
+};
+
 TEST_F(PipelineTaskTest, TEST_CONSTRUCTOR) {
     auto num_instances = 1;
     auto pip_id = 0;
@@ -585,6 +620,67 @@ TEST_F(PipelineTaskTest, 
TEST_CLOSED_TASK_REJECTS_HYBRID_SUBMIT_BEFORE_FINALIZE)
     EXPECT_TRUE(task->finalize().ok());
 }
 
+TEST_F(PipelineTaskTest, TEST_SCHEDULER_CATCH_STD_EXCEPTION) {
+    auto num_instances = 1;
+    auto pip_id = 0;
+    auto task_id = 0;
+    auto pip = std::make_shared<Pipeline>(pip_id, num_instances, 
num_instances);
+    OperatorPtr source_op;
+    source_op.reset(new DummyOperator());
+    EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
+
+    int op_id = 1;
+    int node_id = 2;
+    int dest_id = 3;
+    DataSinkOperatorPtr sink_op;
+    sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
+    EXPECT_TRUE(pip->set_sink(sink_op).ok());
+
+    auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + 
std::to_string(pip_id));
+    std::map<int,
+             std::pair<std::shared_ptr<BasicSharedState>, 
std::vector<std::shared_ptr<Dependency>>>>
+            shared_state_map;
+    _context->_runtime_state = std::move(_runtime_state);
+    auto* runtime_state = _context->_runtime_state.get();
+    runtime_state->resize_op_id_to_local_state(-1);
+    std::promise<std::string> close_status;
+    auto close_status_future = close_status.get_future();
+    auto task = std::make_shared<ThrowStdExceptionTask>(pip, task_id, 
runtime_state, _context,
+                                                        profile.get(), 
shared_state_map, task_id,
+                                                        &close_status);
+    task->_exec_time_slice = 10'000'000'000ULL;
+    std::vector<TScanRangeParams> scan_range;
+    int sender_id = 0;
+    TDataSink tsink;
+    ASSERT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
+    pip->incr_created_tasks(task_id, task.get());
+    _context->_pip_id_to_pipeline[pip->id()] = pip.get();
+    _context->_total_tasks = 2;
+
+    auto* exec_env = ExecEnv::GetInstance();
+    bool need_clear_new_load_stream_mgr = exec_env->new_load_stream_mgr() == 
nullptr;
+    if (need_clear_new_load_stream_mgr) {
+        exec_env->set_new_load_stream_mgr(NewLoadStreamMgr::create_unique());
+    }
+    Defer clear_new_load_stream_mgr {[&]() {
+        if (need_clear_new_load_stream_mgr) {
+            exec_env->clear_new_load_stream_mgr();
+        }
+    }};
+
+    HybridTaskScheduler scheduler(1, 1, "test_hybrid_task_scheduler", nullptr);
+    Defer stop_scheduler {[&]() { scheduler.stop(); }};
+    ASSERT_TRUE(scheduler.start().ok());
+    ASSERT_TRUE(scheduler.submit(task).ok());
+    ASSERT_EQ(close_status_future.wait_for(std::chrono::seconds(5)), 
std::future_status::ready);
+
+    auto close_status_string = close_status_future.get();
+    EXPECT_NE(close_status_string.find("Catch std::exception: pipeline task 
std exception"),
+              std::string::npos)
+            << close_status_string;
+    EXPECT_TRUE(_context->is_canceled());
+}
+
 TEST_F(PipelineTaskTest, TEST_FINALIZED_TASK_REJECTS_HYBRID_SUBMIT) {
     auto num_instances = 1;
     auto pip_id = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to