This is an automated email from the ASF dual-hosted git repository.
zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f7bc15f9811 [fix](arrow-flight-sql) Fix FE not found arrow flight
schema (#43960)
f7bc15f9811 is described below
commit f7bc15f9811668cfb8085aa97632337c4ab7a286
Author: Xinyi Zou <[email protected]>
AuthorDate: Mon Nov 18 11:08:36 2024 +0800
[fix](arrow-flight-sql) Fix FE not found arrow flight schema (#43960)
### What problem does this PR solve?
Problem Summary:
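After the first query phase (`exec_plan_fragment`), FE fetches the Arrow
schema from BE, but BE only generates the Arrow schema in the second phase
(`ResultSinkLocalState::open`). If FE asks before the second phase has run,
the schema is not yet registered and the fetch fails.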
Therefore, this PR moves Arrow schema generation into the first phase, in
`ResultSinkLocalState::init`.
Fixes the following error:
```
rrmsg: Status [errorCode=NOT_FOUND, errorMsg=(172.16.212.191)[NOT_FOUND]FE
not found arrow flight schema, maybe query has been canceled], error code:
null, error msg:
java.lang.RuntimeException: fetch arrow flight schema failed, finstId:
3573efbeb10c44a7-956531d8e15d1630, errmsg: Status [errorCode=NOT_FOUND,
errorMsg=(172.16.212.191)[NOT_FOUND]FE not found arrow flight schema, maybe
query has been canceled]
at
org.apache.doris.service.arrowflight.FlightSqlConnectProcessor.fetchArrowFlightSchema(FlightSqlConnectProcessor.java:126)
~[doris-fe.jar:1.2-SNAPSHOT]
at
org.apache.doris.service.arrowflight.DorisFlightSqlProducer.executeQueryStatement(DorisFlightSqlProducer.java:229)
~[doris-fe.jar:1.2-SNAPSHOT]
at
org.apache.doris.service.arrowflight.DorisFlightSqlProducer.getFlightInfoStatement(DorisFlightSqlProducer.java:260)
~[doris-fe.jar:1.2-SNAPSHOT]
```
---
be/src/pipeline/exec/result_sink_operator.cpp | 21 ++++++++++++---------
be/src/service/internal_service.cpp | 1 +
2 files changed, 13 insertions(+), 9 deletions(-)
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp
b/be/src/pipeline/exec/result_sink_operator.cpp
index a3f1133f00e..f8196910021 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -46,14 +46,25 @@ Status ResultSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& info)
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_profile, timer_name, 1);
auto fragment_instance_id = state->fragment_instance_id();
+ auto& p = _parent->cast<ResultSinkOperatorX>();
if (state->query_options().enable_parallel_result_sink) {
_sender = _parent->cast<ResultSinkOperatorX>()._sender;
} else {
- auto& p = _parent->cast<ResultSinkOperatorX>();
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
fragment_instance_id, p._result_sink_buffer_size_rows,
&_sender, state));
}
_sender->set_dependency(fragment_instance_id,
_dependency->shared_from_this());
+
+ _output_vexpr_ctxs.resize(p._output_vexpr_ctxs.size());
+ for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
+ RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state,
_output_vexpr_ctxs[i]));
+ }
+ if (p._sink_type == TResultSinkType::ARROW_FLIGHT_PROTOCAL) {
+ std::shared_ptr<arrow::Schema> arrow_schema;
+ RETURN_IF_ERROR(get_arrow_schema_from_expr_ctxs(_output_vexpr_ctxs,
&arrow_schema,
+ state->timezone()));
+ _sender->register_arrow_schema(arrow_schema);
+ }
return Status::OK();
}
@@ -62,10 +73,6 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
SCOPED_TIMER(_open_timer);
RETURN_IF_ERROR(Base::open(state));
auto& p = _parent->cast<ResultSinkOperatorX>();
- _output_vexpr_ctxs.resize(p._output_vexpr_ctxs.size());
- for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
- RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state,
_output_vexpr_ctxs[i]));
- }
// create writer based on sink type
switch (p._sink_type) {
case TResultSinkType::MYSQL_PROTOCAL: {
@@ -79,10 +86,6 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
break;
}
case TResultSinkType::ARROW_FLIGHT_PROTOCAL: {
- std::shared_ptr<arrow::Schema> arrow_schema;
- RETURN_IF_ERROR(get_arrow_schema_from_expr_ctxs(_output_vexpr_ctxs,
&arrow_schema,
- state->timezone()));
- _sender->register_arrow_schema(arrow_schema);
_writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter(
_sender.get(), _output_vexpr_ctxs, _profile));
break;
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index 29eb01bad2a..be99278ab54 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -903,6 +903,7 @@ void
PInternalService::fetch_arrow_flight_schema(google::protobuf::RpcController
auto st = ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(
UniqueId(request->finst_id()).to_thrift(), &schema);
if (!st.ok()) {
+ LOG(WARNING) << "fetch arrow flight schema failed, errmsg=" << st;
st.to_protobuf(result->mutable_status());
return;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]