This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f96e22cd208 [Fix](ai) Fix _exec_plan_fragment_impl meet unknown error
when call AI_Functions (#58363)
f96e22cd208 is described below
commit f96e22cd2081ad7cae855bb562e36fea17d938b1
Author: linrrarity <[email protected]>
AuthorDate: Fri Nov 28 21:08:08 2025 +0800
[Fix](ai) Fix _exec_plan_fragment_impl meet unknown error when call
AI_Functions (#58363)
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
When a query statement contains certain commands (e.g. `UPDATE`), the AI
function call does not go through `NereidsCoordinator` and falls back to
`Coordinator`. In this case, the FE does not send `AI_Resources` to the
BE, which leads to errors in subsequent queries, and the error messages
are not clear.
This PR also replaces every direct `throw Status` with
`throw Exception(Status...)`, so that the errors are surfaced as
`Exception` rather than raw `Status`.
```text
I20251114 18:00:45.502351 59053 fragment_mgr.cpp:716] query_id:
5c963987bf8340bc-a56b019c8b0b3300, coord_addr:
TNetworkAddress(hostname=172.17.6.136, port=9020), total fragment num on
current host: 1, fe process uuid: 1763114220687, query type: SELECT, report
audit fe:TNetworkAddress(hostname=172.17.6.136, port=9020), use
wg:1763112792749,normal
W20251114 18:00:45.528087 59053 status.h:438] meet error status:
[INTERNAL_ERROR]AI resources not found
0#
doris::vectorized::AIFunction<doris::vectorized::FunctionAITranslate>::_init_from_resource(doris::FunctionContext*,
doris::vectorized::Block const&, std::vector<unsigned int,
std::allocator<unsigned int> > const&, doris::TAIResource&,
std::shared_ptr<doris::vectorized::AIAdapter>&) at
/home/zcp/repo_center/doris_release/doris/be/src/runtime/query_context.h:268
1#
doris::vectorized::AIFunction<doris::vectorized::FunctionAITranslate>::execute_impl(doris::FunctionContext*,
doris::vectorized::Block&, std::vector<unsigned int, std::allocator<unsigned
int> > const&, unsigned int, unsigned long) const at
/home/zcp/repo_center/doris_release/doris/be/src/common/status.h:524
2# non-virtual thunk to
doris::vectorized::AIFunction<doris::vectorized::FunctionAITranslate>::execute_impl(doris::FunctionContext*,
doris::vectorized::Block&, std::vector<unsigned int, std::allocator<unsigned
int> > const&, unsigned int, unsigned long) const at
/home/zcp/repo_center/doris_release/doris/be/src/vec/functions/ai/ai_functions.h:0
3#
doris::vectorized::PreparedFunctionImpl::default_implementation_for_constant_arguments(doris::FunctionContext*,
doris::vectorized::Block&, std::vector<unsigned int, std::allocator<unsigned
int> > const&, unsigned int, unsigned long, bool, bool*) const at
/home/zcp/repo_center/doris_release/doris/be/src/vec/common/cow.h:0
4#
doris::vectorized::PreparedFunctionImpl::execute_without_low_cardinality_columns(doris::FunctionContext*,
doris::vectorized::Block&, std::vector<unsigned int, std::allocator<unsigned
int> > const&, unsigned int, unsigned long, bool) const at
/home/zcp/repo_center/doris_release/doris/be/src/vec/functions/function.cpp:0
5#
doris::vectorized::PreparedFunctionImpl::execute(doris::FunctionContext*,
doris::vectorized::Block&, std::vector<unsigned int, std::allocator<unsigned
int> > const&, unsigned int, unsigned long, bool) const at
/home/zcp/repo_center/doris_release/doris/be/src/vec/functions/function.cpp:249
6#
doris::vectorized::IFunctionBase::execute(doris::FunctionContext*,
doris::vectorized::Block&, std::vector<unsigned int, std::allocator<unsigned
int> > const&, unsigned int, unsigned long, bool) const at
/home/zcp/repo_center/doris_release/doris/be/src/vec/functions/function.h:192
7#
doris::vectorized::VectorizedFnCall::_do_execute(doris::vectorized::VExprContext*,
doris::vectorized::Block*, int*, std::vector<unsigned int,
std::allocator<unsigned int> >&) at
/home/zcp/repo_center/doris_release/doris/be/src/vec/exprs/vectorized_fn_call.cpp:238
8#
doris::vectorized::VectorizedFnCall::execute(doris::vectorized::VExprContext*,
doris::vectorized::Block*, int*) at
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/stl_vector.h:375
9#
doris::vectorized::VExpr::get_const_col(doris::vectorized::VExprContext*,
std::shared_ptr<doris::ColumnPtrWrapper>*) at
/home/zcp/repo_center/doris_release/doris/be/src/common/status.h:524
10# doris::vectorized::VectorizedFnCall::open(doris::RuntimeState*,
doris::vectorized::VExprContext*, doris::FunctionContext::FunctionStateScope)
at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:524
11# doris::vectorized::VExprContext::open(doris::RuntimeState*) at
/home/zcp/repo_center/doris_release/doris/be/src/vec/exprs/vexpr_context.cpp:0
12#
doris::vectorized::VExpr::open(std::vector<std::shared_ptr<doris::vectorized::VExprContext>,
std::allocator<std::shared_ptr<doris::vectorized::VExprContext> > > const&,
doris::RuntimeState*) at
/home/zcp/repo_center/doris_release/doris/be/src/common/status.h:524
13#
doris::pipeline::UnionSourceOperatorX::prepare(doris::RuntimeState*) at
/home/zcp/repo_center/doris_release/doris/be/src/common/status.h:524
14# doris::pipeline::Pipeline::prepare(doris::RuntimeState*) at
/home/zcp/repo_center/doris_release/doris/be/src/common/status.h:524
15#
doris::pipeline::PipelineFragmentContext::prepare(doris::ThreadPool*) at
/home/zcp/repo_center/doris_release/doris/be/src/common/status.h:524
16#
doris::FragmentMgr::exec_plan_fragment(doris::TPipelineFragmentParams const&,
doris::QuerySource, std::function<void (doris::RuntimeState*, doris::Status*)>
const&, doris::TPipelineFragmentParamsList const&) at
/home/zcp/repo_center/doris_release/doris/be/src/runtime/fragment_mgr.cpp:0
17#
doris::FragmentMgr::exec_plan_fragment(doris::TPipelineFragmentParams const&,
doris::QuerySource, doris::TPipelineFragmentParamsList const&) at
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/std_function.h:245
18#
doris::PInternalService::_exec_plan_fragment_impl(std::__cxx11::basic_string<char,
std::char_traits<char>, std::allocator<char> > const&,
doris::PFragmentRequestVersion, bool, std::function<void (doris::RuntimeState*,
doris::Status*)> const&) at
/home/zcp/repo_center/doris_release/doris/be/src/common/status.h:524
19#
doris::PInternalService::_exec_plan_fragment_in_pthread(google::protobuf::RpcController*,
doris::PExecPlanFragmentRequest const*, doris::PExecPlanFragmentResult*,
google::protobuf::Closure*) at
/home/zcp/repo_center/doris_release/doris/be/src/service/internal_service.cpp:0
20# doris::WorkThreadPool<false>::work_thread(int) at
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/atomic_base.h:641
21# execute_native_thread_routine
22# start_thread
23# clone
I20251114 18:00:45.528275 59053 pipeline_fragment_context.cpp:139]
PipelineFragmentContext::~PipelineFragmentContext|query_id=5c963987bf8340bc-a56b019c8b0b3300|fragment_id=0
I20251114 18:00:45.528398 59053 query_context.cpp:240] Query
5c963987bf8340bc-a56b019c8b0b3300 deconstructed, mem_tracker:
W20251114 18:00:45.531440 59053 status.h:456] meet error status:
[INTERNAL_ERROR]_exec_plan_fragment_impl meet unknown error
0#
doris::PInternalService::_exec_plan_fragment_in_pthread(google::protobuf::RpcController*,
doris::PExecPlanFragmentRequest const*, doris::PExecPlanFragmentResult*,
google::protobuf::Closure*) at
/home/zcp/repo_center/doris_release/doris/be/src/service/internal_service.cpp:0
1# doris::WorkThreadPool<false>::work_thread(int) at
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/atomic_base.h:641
2# execute_native_thread_routine
3# start_thread
4# clone
W20251114 18:00:45.531484 59053 internal_service.cpp:351] exec plan
fragment failed, errmsg=[INTERNAL_ERROR]_exec_plan_fragment_impl meet unknown
error
```
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
.../segment_v2/ann_index/ann_index_reader.cpp | 8 +++---
be/src/runtime/query_context.h | 11 +++-----
be/src/util/quantile_state.h | 2 +-
.../aggregate_function_ai_agg.h | 11 +++++---
.../exprs/lambda_function/varray_sort_function.cpp | 4 +--
be/src/vec/functions/ai/ai_functions.h | 9 ++++---
be/test/ai/aggregate_function_ai_agg_test.cpp | 28 +++++++++++++++++++++
be/test/ai/ai_function_test.cpp | 29 ++++++++++++++++++++++
.../main/java/org/apache/doris/qe/Coordinator.java | 14 +++++++++++
9 files changed, 96 insertions(+), 20 deletions(-)
diff --git a/be/src/olap/rowset/segment_v2/ann_index/ann_index_reader.cpp
b/be/src/olap/rowset/segment_v2/ann_index/ann_index_reader.cpp
index 35880336580..f993804c072 100644
--- a/be/src/olap/rowset/segment_v2/ann_index/ann_index_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/ann_index/ann_index_reader.cpp
@@ -125,8 +125,8 @@ Status AnnIndexReader::query(io::IOContext* io_ctx,
AnnTopNParam* param, AnnInde
stats->engine_convert_ns.update(index_search_result.engine_convert_ns);
stats->engine_prepare_ns.update(index_search_result.engine_prepare_ns);
} else {
- throw Status::NotSupported("Unsupported index type: {}",
- ann_index_type_to_string(_index_type));
+ throw Exception(Status::NotSupported("Unsupported index type: {}",
+
ann_index_type_to_string(_index_type)));
}
DCHECK(index_search_result.roaring != nullptr);
@@ -174,8 +174,8 @@ Status AnnIndexReader::range_search(const
AnnRangeSearchParams& params,
hnsw_param->bounded_queue = custom_params.hnsw_bounded_queue;
search_param = std::move(hnsw_param);
} else {
- throw Status::NotSupported("Unsupported index type: {}",
- ann_index_type_to_string(_index_type));
+ throw Exception(Status::NotSupported("Unsupported index type: {}",
+
ann_index_type_to_string(_index_type)));
}
search_param->is_le_or_lt = params.is_le_or_lt;
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index ef028f20424..7fd9fc29e35 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -266,14 +266,11 @@ public:
void set_ai_resources(std::map<std::string, TAIResource> ai_resources) {
_ai_resources =
- std::make_unique<std::map<std::string,
TAIResource>>(std::move(ai_resources));
+ std::make_shared<std::map<std::string,
TAIResource>>(std::move(ai_resources));
}
- const std::map<std::string, TAIResource>& get_ai_resources() const {
- if (_ai_resources == nullptr) {
- throw Status::InternalError("AI resources not found");
- }
- return *_ai_resources;
+ const std::shared_ptr<std::map<std::string, TAIResource>>&
get_ai_resources() const {
+ return _ai_resources;
}
std::unordered_map<TNetworkAddress, std::shared_ptr<PBackendService_Stub>>
@@ -366,7 +363,7 @@ private:
std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
_profile_map;
std::unordered_map<int, std::shared_ptr<TRuntimeProfileTree>>
_load_channel_profile_map;
- std::unique_ptr<std::map<std::string, TAIResource>> _ai_resources;
+ std::shared_ptr<std::map<std::string, TAIResource>> _ai_resources;
void _report_query_profile();
diff --git a/be/src/util/quantile_state.h b/be/src/util/quantile_state.h
index 2d46989fcc6..23e43860c34 100644
--- a/be/src/util/quantile_state.h
+++ b/be/src/util/quantile_state.h
@@ -64,7 +64,7 @@ public:
double get_explicit_value_by_percentile(float percentile) const;
#ifdef BE_TEST
std::string to_string() const {
- throw Status::NotSupported("QuantileState::to_string() not
implemented");
+ throw Exception(Status::NotSupported("QuantileState::to_string() not
implemented"));
}
#endif
~QuantileState() = default;
diff --git a/be/src/vec/aggregate_functions/aggregate_function_ai_agg.h
b/be/src/vec/aggregate_functions/aggregate_function_ai_agg.h
index 89ac8828ba1..d27ceb41312 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_ai_agg.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_ai_agg.h
@@ -141,9 +141,14 @@ public:
_task = task_ref.to_string();
std::string resource_name = resource_name_ref.to_string();
- const std::map<std::string, TAIResource>& ai_resources =
_ctx->get_ai_resources();
- auto it = ai_resources.find(resource_name);
- if (it == ai_resources.end()) {
+ const std::shared_ptr<std::map<std::string, TAIResource>>&
ai_resources =
+ _ctx->get_ai_resources();
+ if (!ai_resources) {
+ throw Exception(ErrorCode::INTERNAL_ERROR,
+ "AI resources metadata missing in
QueryContext");
+ }
+ auto it = ai_resources->find(resource_name);
+ if (it == ai_resources->end()) {
throw Exception(ErrorCode::NOT_FOUND, "AI resource not found:
" + resource_name);
}
_ai_config = it->second;
diff --git a/be/src/vec/exprs/lambda_function/varray_sort_function.cpp
b/be/src/vec/exprs/lambda_function/varray_sort_function.cpp
index 945bb33d95a..1cffdcd5ea7 100644
--- a/be/src/vec/exprs/lambda_function/varray_sort_function.cpp
+++ b/be/src/vec/exprs/lambda_function/varray_sort_function.cpp
@@ -185,9 +185,9 @@ public:
auto status =
children[0]->execute(context,
&lambda_block, &lambda_res_id);
if (!status.ok()) [[unlikely]] {
- throw Status::InternalError(
+ throw Exception(Status::InternalError(
"when execute array_sort lambda
function: {}",
- status.to_string());
+ status.to_string()));
}
// raw_res_col maybe columnVector or ColumnConst
diff --git a/be/src/vec/functions/ai/ai_functions.h
b/be/src/vec/functions/ai/ai_functions.h
index c31a7659614..01efe65f938 100644
--- a/be/src/vec/functions/ai/ai_functions.h
+++ b/be/src/vec/functions/ai/ai_functions.h
@@ -190,10 +190,13 @@ private:
StringRef resource_name_ref = resource_column.column->get_data_at(0);
std::string resource_name = std::string(resource_name_ref.data,
resource_name_ref.size);
- const std::map<std::string, TAIResource>& ai_resources =
+ const std::shared_ptr<std::map<std::string, TAIResource>>&
ai_resources =
context->state()->get_query_ctx()->get_ai_resources();
- auto it = ai_resources.find(resource_name);
- if (it == ai_resources.end()) {
+ if (!ai_resources) {
+ return Status::InternalError("AI resources metadata missing in
QueryContext");
+ }
+ auto it = ai_resources->find(resource_name);
+ if (it == ai_resources->end()) {
return Status::InvalidArgument("AI resource not found: " +
resource_name);
}
config = it->second;
diff --git a/be/test/ai/aggregate_function_ai_agg_test.cpp
b/be/test/ai/aggregate_function_ai_agg_test.cpp
index ff7580552ea..911da8583e8 100644
--- a/be/test/ai/aggregate_function_ai_agg_test.cpp
+++ b/be/test/ai/aggregate_function_ai_agg_test.cpp
@@ -413,4 +413,32 @@ TEST_F(AggregateFunctionAIAggTest,
mock_resource_send_request_test) {
_agg_function->destroy(place);
}
+TEST_F(AggregateFunctionAIAggTest, missing_ai_resources_metadata_test) {
+ auto empty_query_ctx = MockQueryContext::create();
+ _agg_function->set_query_context(empty_query_ctx.get());
+
+ std::vector<std::string> resources = {"resource_name"};
+ std::vector<std::string> texts = {"test input"};
+ std::vector<std::string> task = {"summarize"};
+ auto col_resource = ColumnHelper::create_column<DataTypeString>(resources);
+ auto col_text = ColumnHelper::create_column<DataTypeString>(texts);
+ auto col_task = ColumnHelper::create_column<DataTypeString>(task);
+
+ std::unique_ptr<char[]> memory(new char[_agg_function->size_of_data()]);
+ AggregateDataPtr place = memory.get();
+ _agg_function->create(place);
+
+ const IColumn* columns[3] = {col_resource.get(), col_text.get(),
col_task.get()};
+
+ try {
+ _agg_function->add(place, columns, 0, _arena);
+ FAIL() << "Expected exception for missing AI resources";
+ } catch (const Exception& e) {
+ EXPECT_EQ(e.code(), ErrorCode::INTERNAL_ERROR);
+ EXPECT_NE(e.to_string().find("AI resources metadata missing"),
std::string::npos);
+ }
+
+ _agg_function->destroy(place);
+}
+
} // namespace doris::vectorized
diff --git a/be/test/ai/ai_function_test.cpp b/be/test/ai/ai_function_test.cpp
index 74a46240be8..14409781417 100644
--- a/be/test/ai/ai_function_test.cpp
+++ b/be/test/ai/ai_function_test.cpp
@@ -551,6 +551,35 @@ TEST(AIFunctionTest, MockResourceSendRequest) {
ASSERT_EQ(val, "this is a mock response. test input");
}
+TEST(AIFunctionTest, MissingAIResourcesMetadataTest) {
+ auto query_ctx = MockQueryContext::create();
+ TQueryOptions query_options;
+ TQueryGlobals query_globals;
+ RuntimeState runtime_state(TUniqueId(), 0, query_options, query_globals,
nullptr,
+ query_ctx.get());
+ auto ctx = FunctionContext::create_context(&runtime_state, {}, {});
+
+ std::vector<std::string> resources = {"resource_name"};
+ std::vector<std::string> texts = {"test"};
+ auto col_resource = ColumnHelper::create_column<DataTypeString>(resources);
+ auto col_text = ColumnHelper::create_column<DataTypeString>(texts);
+
+ Block block;
+ block.insert({std::move(col_resource), std::make_shared<DataTypeString>(),
"resource"});
+ block.insert({std::move(col_text), std::make_shared<DataTypeString>(),
"text"});
+ block.insert({nullptr, std::make_shared<DataTypeString>(), "result"});
+
+ ColumnNumbers arguments = {0, 1};
+ size_t result_idx = 2;
+
+ auto sentiment_func = FunctionAISentiment::create();
+ Status exec_status =
+ sentiment_func->execute_impl(ctx.get(), block, arguments,
result_idx, texts.size());
+
+ ASSERT_FALSE(exec_status.ok());
+ ASSERT_NE(exec_status.to_string().find("AI resources metadata missing"),
std::string::npos);
+}
+
TEST(AIFunctionTest, ReturnTypeTest) {
FunctionAIClassify func_classify;
DataTypes args;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 9d44f15384e..c0b9f2be457 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -19,8 +19,10 @@ package org.apache.doris.qe;
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.StorageBackend;
+import org.apache.doris.catalog.AIResource;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FsBroker;
+import org.apache.doris.catalog.Resource;
import org.apache.doris.common.Config;
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.Pair;
@@ -89,6 +91,7 @@ import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.LoadEtlTask;
import org.apache.doris.thrift.PaloInternalServiceVersion;
+import org.apache.doris.thrift.TAIResource;
import org.apache.doris.thrift.TBrokerScanRange;
import org.apache.doris.thrift.TDataSinkType;
import org.apache.doris.thrift.TDescriptorTable;
@@ -3235,6 +3238,17 @@ public class Coordinator implements CoordInterface {
if (ignoreDataDistribution) {
params.setParallelInstances(parallelTasksNum);
}
+
+ // Used for AI Functions
+ Map<String, TAIResource> aiResourceMap =
Maps.newLinkedHashMap();
+ for (Resource resource :
Env.getCurrentEnv().getResourceMgr()
+
.getResource(Resource.ResourceType.AI)) {
+ if (resource instanceof AIResource) {
+ aiResourceMap.put(resource.getName(),
((AIResource) resource).toThrift());
+ }
+ }
+
+ params.setAiResources(aiResourceMap);
res.put(instanceExecParam.host, params);
res.get(instanceExecParam.host).setBucketSeqToInstanceIdx(new HashMap<Integer,
Integer>());
res.get(instanceExecParam.host).setShuffleIdxToInstanceIdx(new HashMap<Integer,
Integer>());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]