This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f96e22cd208 [Fix](ai) Fix _exec_plan_fragment_impl meet unknown error 
when call AI_Functions (#58363)
f96e22cd208 is described below

commit f96e22cd2081ad7cae855bb562e36fea17d938b1
Author: linrrarity <[email protected]>
AuthorDate: Fri Nov 28 21:08:08 2025 +0800

    [Fix](ai) Fix _exec_plan_fragment_impl meet unknown error when call 
AI_Functions (#58363)
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary:
    
    When a statement contains certain commands (e.g. `UPDATE`), AI function
    calls do not go through `NereidsCoordinator` and instead fall back to
    `Coordinator`. On that path the FE does not send `AI_Resources` to the
    BE, so the subsequent execution of the AI function fails, and the
    resulting error message is unclear.
    
    This PR also replaces every direct `throw Status` with
    `throw Exception(Status...)`, so that errors surface as an `Exception`
    rather than as a raw `Status`.
    
    ```text
    I20251114 18:00:45.502351 59053 fragment_mgr.cpp:716] query_id: 
5c963987bf8340bc-a56b019c8b0b3300, coord_addr: 
TNetworkAddress(hostname=172.17.6.136, port=9020), total fragment num on 
current host: 1, fe process uuid: 1763114220687, query type: SELECT, report 
audit fe:TNetworkAddress(hostname=172.17.6.136, port=9020), use 
wg:1763112792749,normal
    W20251114 18:00:45.528087 59053 status.h:438] meet error status: 
[INTERNAL_ERROR]AI resources not found
    
            0#  
doris::vectorized::AIFunction<doris::vectorized::FunctionAITranslate>::_init_from_resource(doris::FunctionContext*,
 doris::vectorized::Block const&, std::vector<unsigned int, 
std::allocator<unsigned int> > const&, doris::TAIResource&, 
std::shared_ptr<doris::vectorized::AIAdapter>&) at 
/home/zcp/repo_center/doris_release/doris/be/src/runtime/query_context.h:268
            1#  
doris::vectorized::AIFunction<doris::vectorized::FunctionAITranslate>::execute_impl(doris::FunctionContext*,
 doris::vectorized::Block&, std::vector<unsigned int, std::allocator<unsigned 
int> > const&, unsigned int, unsigned long) const at 
/home/zcp/repo_center/doris_release/doris/be/src/common/status.h:524
            2#  non-virtual thunk to 
doris::vectorized::AIFunction<doris::vectorized::FunctionAITranslate>::execute_impl(doris::FunctionContext*,
 doris::vectorized::Block&, std::vector<unsigned int, std::allocator<unsigned 
int> > const&, unsigned int, unsigned long) const at 
/home/zcp/repo_center/doris_release/doris/be/src/vec/functions/ai/ai_functions.h:0
            3#  
doris::vectorized::PreparedFunctionImpl::default_implementation_for_constant_arguments(doris::FunctionContext*,
 doris::vectorized::Block&, std::vector<unsigned int, std::allocator<unsigned 
int> > const&, unsigned int, unsigned long, bool, bool*) const at 
/home/zcp/repo_center/doris_release/doris/be/src/vec/common/cow.h:0
            4#  
doris::vectorized::PreparedFunctionImpl::execute_without_low_cardinality_columns(doris::FunctionContext*,
 doris::vectorized::Block&, std::vector<unsigned int, std::allocator<unsigned 
int> > const&, unsigned int, unsigned long, bool) const at 
/home/zcp/repo_center/doris_release/doris/be/src/vec/functions/function.cpp:0
            5#  
doris::vectorized::PreparedFunctionImpl::execute(doris::FunctionContext*, 
doris::vectorized::Block&, std::vector<unsigned int, std::allocator<unsigned 
int> > const&, unsigned int, unsigned long, bool) const at 
/home/zcp/repo_center/doris_release/doris/be/src/vec/functions/function.cpp:249
            6#  
doris::vectorized::IFunctionBase::execute(doris::FunctionContext*, 
doris::vectorized::Block&, std::vector<unsigned int, std::allocator<unsigned 
int> > const&, unsigned int, unsigned long, bool) const at 
/home/zcp/repo_center/doris_release/doris/be/src/vec/functions/function.h:192
            7#  
doris::vectorized::VectorizedFnCall::_do_execute(doris::vectorized::VExprContext*,
 doris::vectorized::Block*, int*, std::vector<unsigned int, 
std::allocator<unsigned int> >&) at 
/home/zcp/repo_center/doris_release/doris/be/src/vec/exprs/vectorized_fn_call.cpp:238
            8#  
doris::vectorized::VectorizedFnCall::execute(doris::vectorized::VExprContext*, 
doris::vectorized::Block*, int*) at 
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/stl_vector.h:375
            9#  
doris::vectorized::VExpr::get_const_col(doris::vectorized::VExprContext*, 
std::shared_ptr<doris::ColumnPtrWrapper>*) at 
/home/zcp/repo_center/doris_release/doris/be/src/common/status.h:524
            10# doris::vectorized::VectorizedFnCall::open(doris::RuntimeState*, 
doris::vectorized::VExprContext*, doris::FunctionContext::FunctionStateScope) 
at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:524
            11# doris::vectorized::VExprContext::open(doris::RuntimeState*) at 
/home/zcp/repo_center/doris_release/doris/be/src/vec/exprs/vexpr_context.cpp:0
            12# 
doris::vectorized::VExpr::open(std::vector<std::shared_ptr<doris::vectorized::VExprContext>,
 std::allocator<std::shared_ptr<doris::vectorized::VExprContext> > > const&, 
doris::RuntimeState*) at 
/home/zcp/repo_center/doris_release/doris/be/src/common/status.h:524
            13# 
doris::pipeline::UnionSourceOperatorX::prepare(doris::RuntimeState*) at 
/home/zcp/repo_center/doris_release/doris/be/src/common/status.h:524
            14# doris::pipeline::Pipeline::prepare(doris::RuntimeState*) at 
/home/zcp/repo_center/doris_release/doris/be/src/common/status.h:524
            15# 
doris::pipeline::PipelineFragmentContext::prepare(doris::ThreadPool*) at 
/home/zcp/repo_center/doris_release/doris/be/src/common/status.h:524
            16# 
doris::FragmentMgr::exec_plan_fragment(doris::TPipelineFragmentParams const&, 
doris::QuerySource, std::function<void (doris::RuntimeState*, doris::Status*)> 
const&, doris::TPipelineFragmentParamsList const&) at 
/home/zcp/repo_center/doris_release/doris/be/src/runtime/fragment_mgr.cpp:0
            17# 
doris::FragmentMgr::exec_plan_fragment(doris::TPipelineFragmentParams const&, 
doris::QuerySource, doris::TPipelineFragmentParamsList const&) at 
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/std_function.h:245
            18# 
doris::PInternalService::_exec_plan_fragment_impl(std::__cxx11::basic_string<char,
 std::char_traits<char>, std::allocator<char> > const&, 
doris::PFragmentRequestVersion, bool, std::function<void (doris::RuntimeState*, 
doris::Status*)> const&) at 
/home/zcp/repo_center/doris_release/doris/be/src/common/status.h:524
            19# 
doris::PInternalService::_exec_plan_fragment_in_pthread(google::protobuf::RpcController*,
 doris::PExecPlanFragmentRequest const*, doris::PExecPlanFragmentResult*, 
google::protobuf::Closure*) at 
/home/zcp/repo_center/doris_release/doris/be/src/service/internal_service.cpp:0
            20# doris::WorkThreadPool<false>::work_thread(int) at 
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/atomic_base.h:641
            21# execute_native_thread_routine
            22# start_thread
            23# clone
    I20251114 18:00:45.528275 59053 pipeline_fragment_context.cpp:139] 
PipelineFragmentContext::~PipelineFragmentContext|query_id=5c963987bf8340bc-a56b019c8b0b3300|fragment_id=0
    I20251114 18:00:45.528398 59053 query_context.cpp:240] Query 
5c963987bf8340bc-a56b019c8b0b3300 deconstructed, mem_tracker:
    W20251114 18:00:45.531440 59053 status.h:456] meet error status: 
[INTERNAL_ERROR]_exec_plan_fragment_impl meet unknown error
    
            0#  
doris::PInternalService::_exec_plan_fragment_in_pthread(google::protobuf::RpcController*,
 doris::PExecPlanFragmentRequest const*, doris::PExecPlanFragmentResult*, 
google::protobuf::Closure*) at 
/home/zcp/repo_center/doris_release/doris/be/src/service/internal_service.cpp:0
            1#  doris::WorkThreadPool<false>::work_thread(int) at 
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/atomic_base.h:641
            2#  execute_native_thread_routine
            3#  start_thread
            4#  clone
    W20251114 18:00:45.531484 59053 internal_service.cpp:351] exec plan 
fragment failed, errmsg=[INTERNAL_ERROR]_exec_plan_fragment_impl meet unknown 
error
    ```
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
        - [ ] Yes. <!-- Add document PR link here. eg:
    https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 .../segment_v2/ann_index/ann_index_reader.cpp      |  8 +++---
 be/src/runtime/query_context.h                     | 11 +++-----
 be/src/util/quantile_state.h                       |  2 +-
 .../aggregate_function_ai_agg.h                    | 11 +++++---
 .../exprs/lambda_function/varray_sort_function.cpp |  4 +--
 be/src/vec/functions/ai/ai_functions.h             |  9 ++++---
 be/test/ai/aggregate_function_ai_agg_test.cpp      | 28 +++++++++++++++++++++
 be/test/ai/ai_function_test.cpp                    | 29 ++++++++++++++++++++++
 .../main/java/org/apache/doris/qe/Coordinator.java | 14 +++++++++++
 9 files changed, 96 insertions(+), 20 deletions(-)

diff --git a/be/src/olap/rowset/segment_v2/ann_index/ann_index_reader.cpp 
b/be/src/olap/rowset/segment_v2/ann_index/ann_index_reader.cpp
index 35880336580..f993804c072 100644
--- a/be/src/olap/rowset/segment_v2/ann_index/ann_index_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/ann_index/ann_index_reader.cpp
@@ -125,8 +125,8 @@ Status AnnIndexReader::query(io::IOContext* io_ctx, 
AnnTopNParam* param, AnnInde
             
stats->engine_convert_ns.update(index_search_result.engine_convert_ns);
             
stats->engine_prepare_ns.update(index_search_result.engine_prepare_ns);
         } else {
-            throw Status::NotSupported("Unsupported index type: {}",
-                                       ann_index_type_to_string(_index_type));
+            throw Exception(Status::NotSupported("Unsupported index type: {}",
+                                                 
ann_index_type_to_string(_index_type)));
         }
 
         DCHECK(index_search_result.roaring != nullptr);
@@ -174,8 +174,8 @@ Status AnnIndexReader::range_search(const 
AnnRangeSearchParams& params,
             hnsw_param->bounded_queue = custom_params.hnsw_bounded_queue;
             search_param = std::move(hnsw_param);
         } else {
-            throw Status::NotSupported("Unsupported index type: {}",
-                                       ann_index_type_to_string(_index_type));
+            throw Exception(Status::NotSupported("Unsupported index type: {}",
+                                                 
ann_index_type_to_string(_index_type)));
         }
 
         search_param->is_le_or_lt = params.is_le_or_lt;
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index ef028f20424..7fd9fc29e35 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -266,14 +266,11 @@ public:
 
     void set_ai_resources(std::map<std::string, TAIResource> ai_resources) {
         _ai_resources =
-                std::make_unique<std::map<std::string, 
TAIResource>>(std::move(ai_resources));
+                std::make_shared<std::map<std::string, 
TAIResource>>(std::move(ai_resources));
     }
 
-    const std::map<std::string, TAIResource>& get_ai_resources() const {
-        if (_ai_resources == nullptr) {
-            throw Status::InternalError("AI resources not found");
-        }
-        return *_ai_resources;
+    const std::shared_ptr<std::map<std::string, TAIResource>>& 
get_ai_resources() const {
+        return _ai_resources;
     }
 
     std::unordered_map<TNetworkAddress, std::shared_ptr<PBackendService_Stub>>
@@ -366,7 +363,7 @@ private:
     std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> 
_profile_map;
     std::unordered_map<int, std::shared_ptr<TRuntimeProfileTree>> 
_load_channel_profile_map;
 
-    std::unique_ptr<std::map<std::string, TAIResource>> _ai_resources;
+    std::shared_ptr<std::map<std::string, TAIResource>> _ai_resources;
 
     void _report_query_profile();
 
diff --git a/be/src/util/quantile_state.h b/be/src/util/quantile_state.h
index 2d46989fcc6..23e43860c34 100644
--- a/be/src/util/quantile_state.h
+++ b/be/src/util/quantile_state.h
@@ -64,7 +64,7 @@ public:
     double get_explicit_value_by_percentile(float percentile) const;
 #ifdef BE_TEST
     std::string to_string() const {
-        throw Status::NotSupported("QuantileState::to_string() not 
implemented");
+        throw Exception(Status::NotSupported("QuantileState::to_string() not 
implemented"));
     }
 #endif
     ~QuantileState() = default;
diff --git a/be/src/vec/aggregate_functions/aggregate_function_ai_agg.h 
b/be/src/vec/aggregate_functions/aggregate_function_ai_agg.h
index 89ac8828ba1..d27ceb41312 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_ai_agg.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_ai_agg.h
@@ -141,9 +141,14 @@ public:
             _task = task_ref.to_string();
 
             std::string resource_name = resource_name_ref.to_string();
-            const std::map<std::string, TAIResource>& ai_resources = 
_ctx->get_ai_resources();
-            auto it = ai_resources.find(resource_name);
-            if (it == ai_resources.end()) {
+            const std::shared_ptr<std::map<std::string, TAIResource>>& 
ai_resources =
+                    _ctx->get_ai_resources();
+            if (!ai_resources) {
+                throw Exception(ErrorCode::INTERNAL_ERROR,
+                                "AI resources metadata missing in 
QueryContext");
+            }
+            auto it = ai_resources->find(resource_name);
+            if (it == ai_resources->end()) {
                 throw Exception(ErrorCode::NOT_FOUND, "AI resource not found: 
" + resource_name);
             }
             _ai_config = it->second;
diff --git a/be/src/vec/exprs/lambda_function/varray_sort_function.cpp 
b/be/src/vec/exprs/lambda_function/varray_sort_function.cpp
index 945bb33d95a..1cffdcd5ea7 100644
--- a/be/src/vec/exprs/lambda_function/varray_sort_function.cpp
+++ b/be/src/vec/exprs/lambda_function/varray_sort_function.cpp
@@ -185,9 +185,9 @@ public:
                             auto status =
                                     children[0]->execute(context, 
&lambda_block, &lambda_res_id);
                             if (!status.ok()) [[unlikely]] {
-                                throw Status::InternalError(
+                                throw Exception(Status::InternalError(
                                         "when execute array_sort lambda 
function: {}",
-                                        status.to_string());
+                                        status.to_string()));
                             }
 
                             // raw_res_col maybe columnVector or ColumnConst
diff --git a/be/src/vec/functions/ai/ai_functions.h 
b/be/src/vec/functions/ai/ai_functions.h
index c31a7659614..01efe65f938 100644
--- a/be/src/vec/functions/ai/ai_functions.h
+++ b/be/src/vec/functions/ai/ai_functions.h
@@ -190,10 +190,13 @@ private:
         StringRef resource_name_ref = resource_column.column->get_data_at(0);
         std::string resource_name = std::string(resource_name_ref.data, 
resource_name_ref.size);
 
-        const std::map<std::string, TAIResource>& ai_resources =
+        const std::shared_ptr<std::map<std::string, TAIResource>>& 
ai_resources =
                 context->state()->get_query_ctx()->get_ai_resources();
-        auto it = ai_resources.find(resource_name);
-        if (it == ai_resources.end()) {
+        if (!ai_resources) {
+            return Status::InternalError("AI resources metadata missing in 
QueryContext");
+        }
+        auto it = ai_resources->find(resource_name);
+        if (it == ai_resources->end()) {
             return Status::InvalidArgument("AI resource not found: " + 
resource_name);
         }
         config = it->second;
diff --git a/be/test/ai/aggregate_function_ai_agg_test.cpp 
b/be/test/ai/aggregate_function_ai_agg_test.cpp
index ff7580552ea..911da8583e8 100644
--- a/be/test/ai/aggregate_function_ai_agg_test.cpp
+++ b/be/test/ai/aggregate_function_ai_agg_test.cpp
@@ -413,4 +413,32 @@ TEST_F(AggregateFunctionAIAggTest, 
mock_resource_send_request_test) {
     _agg_function->destroy(place);
 }
 
+TEST_F(AggregateFunctionAIAggTest, missing_ai_resources_metadata_test) {
+    auto empty_query_ctx = MockQueryContext::create();
+    _agg_function->set_query_context(empty_query_ctx.get());
+
+    std::vector<std::string> resources = {"resource_name"};
+    std::vector<std::string> texts = {"test input"};
+    std::vector<std::string> task = {"summarize"};
+    auto col_resource = ColumnHelper::create_column<DataTypeString>(resources);
+    auto col_text = ColumnHelper::create_column<DataTypeString>(texts);
+    auto col_task = ColumnHelper::create_column<DataTypeString>(task);
+
+    std::unique_ptr<char[]> memory(new char[_agg_function->size_of_data()]);
+    AggregateDataPtr place = memory.get();
+    _agg_function->create(place);
+
+    const IColumn* columns[3] = {col_resource.get(), col_text.get(), 
col_task.get()};
+
+    try {
+        _agg_function->add(place, columns, 0, _arena);
+        FAIL() << "Expected exception for missing AI resources";
+    } catch (const Exception& e) {
+        EXPECT_EQ(e.code(), ErrorCode::INTERNAL_ERROR);
+        EXPECT_NE(e.to_string().find("AI resources metadata missing"), 
std::string::npos);
+    }
+
+    _agg_function->destroy(place);
+}
+
 } // namespace doris::vectorized
diff --git a/be/test/ai/ai_function_test.cpp b/be/test/ai/ai_function_test.cpp
index 74a46240be8..14409781417 100644
--- a/be/test/ai/ai_function_test.cpp
+++ b/be/test/ai/ai_function_test.cpp
@@ -551,6 +551,35 @@ TEST(AIFunctionTest, MockResourceSendRequest) {
     ASSERT_EQ(val, "this is a mock response. test input");
 }
 
+TEST(AIFunctionTest, MissingAIResourcesMetadataTest) {
+    auto query_ctx = MockQueryContext::create();
+    TQueryOptions query_options;
+    TQueryGlobals query_globals;
+    RuntimeState runtime_state(TUniqueId(), 0, query_options, query_globals, 
nullptr,
+                               query_ctx.get());
+    auto ctx = FunctionContext::create_context(&runtime_state, {}, {});
+
+    std::vector<std::string> resources = {"resource_name"};
+    std::vector<std::string> texts = {"test"};
+    auto col_resource = ColumnHelper::create_column<DataTypeString>(resources);
+    auto col_text = ColumnHelper::create_column<DataTypeString>(texts);
+
+    Block block;
+    block.insert({std::move(col_resource), std::make_shared<DataTypeString>(), 
"resource"});
+    block.insert({std::move(col_text), std::make_shared<DataTypeString>(), 
"text"});
+    block.insert({nullptr, std::make_shared<DataTypeString>(), "result"});
+
+    ColumnNumbers arguments = {0, 1};
+    size_t result_idx = 2;
+
+    auto sentiment_func = FunctionAISentiment::create();
+    Status exec_status =
+            sentiment_func->execute_impl(ctx.get(), block, arguments, 
result_idx, texts.size());
+
+    ASSERT_FALSE(exec_status.ok());
+    ASSERT_NE(exec_status.to_string().find("AI resources metadata missing"), 
std::string::npos);
+}
+
 TEST(AIFunctionTest, ReturnTypeTest) {
     FunctionAIClassify func_classify;
     DataTypes args;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 9d44f15384e..c0b9f2be457 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -19,8 +19,10 @@ package org.apache.doris.qe;
 
 import org.apache.doris.analysis.DescriptorTable;
 import org.apache.doris.analysis.StorageBackend;
+import org.apache.doris.catalog.AIResource;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.FsBroker;
+import org.apache.doris.catalog.Resource;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.MarkedCountDownLatch;
 import org.apache.doris.common.Pair;
@@ -89,6 +91,7 @@ import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.task.LoadEtlTask;
 import org.apache.doris.thrift.PaloInternalServiceVersion;
+import org.apache.doris.thrift.TAIResource;
 import org.apache.doris.thrift.TBrokerScanRange;
 import org.apache.doris.thrift.TDataSinkType;
 import org.apache.doris.thrift.TDescriptorTable;
@@ -3235,6 +3238,17 @@ public class Coordinator implements CoordInterface {
                     if (ignoreDataDistribution) {
                         params.setParallelInstances(parallelTasksNum);
                     }
+
+                    // Used for AI Functions
+                    Map<String, TAIResource> aiResourceMap = 
Maps.newLinkedHashMap();
+                    for (Resource resource : 
Env.getCurrentEnv().getResourceMgr()
+                                                    
.getResource(Resource.ResourceType.AI)) {
+                        if (resource instanceof AIResource) {
+                            aiResourceMap.put(resource.getName(), 
((AIResource) resource).toThrift());
+                        }
+                    }
+
+                    params.setAiResources(aiResourceMap);
                     res.put(instanceExecParam.host, params);
                     
res.get(instanceExecParam.host).setBucketSeqToInstanceIdx(new HashMap<Integer, 
Integer>());
                     
res.get(instanceExecParam.host).setShuffleIdxToInstanceIdx(new HashMap<Integer, 
Integer>());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to