This is an automated email from the ASF dual-hosted git repository.
zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new cb7bed8f07e [fix](arrow-flight-sql) Fix arrow flight result sink
(#36827)
cb7bed8f07e is described below
commit cb7bed8f07eb847f6a59b102192a30b9316c47c0
Author: Xinyi Zou <[email protected]>
AuthorDate: Wed Jun 26 14:14:11 2024 +0800
[fix](arrow-flight-sql) Fix arrow flight result sink (#36827)
1. Get the arrow flight result schema by query id instead of instance id.
2. Getting the arrow flight result is a synchronous method, so it must wait until
data is ready before returning the result; introduced by #36035 and #36667.
TODO: waiting for data blocks the pipeline, so use a request pool to
hold requests that are waiting for data.
---
be/src/pipeline/exec/result_sink_operator.cpp | 3 +--
be/src/runtime/buffer_control_block.cpp | 13 ++++++++-----
be/src/runtime/buffer_control_block.h | 4 ++++
.../arrowflight/DorisFlightSqlProducer.java | 2 +-
.../arrowflight/FlightSqlConnectProcessor.java | 22 +++++++++++-----------
5 files changed, 25 insertions(+), 19 deletions(-)
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp
b/be/src/pipeline/exec/result_sink_operator.cpp
index 378fea18eea..0495e48b7dc 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -80,8 +80,7 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
case TResultSinkType::ARROW_FLIGHT_PROTOCAL: {
std::shared_ptr<arrow::Schema> arrow_schema;
RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs,
&arrow_schema));
-
state->exec_env()->result_mgr()->register_arrow_schema(state->fragment_instance_id(),
- arrow_schema);
+
state->exec_env()->result_mgr()->register_arrow_schema(state->query_id(),
arrow_schema);
_writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter(
_sender.get(), _output_vexpr_ctxs, _profile, arrow_schema));
break;
diff --git a/be/src/runtime/buffer_control_block.cpp
b/be/src/runtime/buffer_control_block.cpp
index a1a83b22840..845afb9a84b 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -151,10 +151,6 @@ Status BufferControlBlock::add_arrow_batch(RuntimeState*
state,
int num_rows = result->num_rows();
- if (_is_cancelled) {
- return Status::Cancelled("Cancelled");
- }
-
// TODO: merge RocordBatch, ToStructArray -> Make again
_arrow_flight_batch_queue.push_back(std::move(result));
@@ -162,6 +158,7 @@ Status BufferControlBlock::add_arrow_batch(RuntimeState*
state,
_instance_rows_in_queue.emplace_back();
_instance_rows[state->fragment_instance_id()] += num_rows;
_instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows;
+ _arrow_data_arrival.notify_one();
_update_dependency();
return Status::OK();
}
@@ -212,6 +209,10 @@ Status
BufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>*
return Status::Cancelled("Cancelled");
}
+ while (_arrow_flight_batch_queue.empty() && !_is_cancelled && !_is_close) {
+ _arrow_data_arrival.wait_for(l, std::chrono::seconds(1));
+ }
+
if (_is_cancelled) {
return Status::Cancelled("Cancelled");
}
@@ -234,7 +235,7 @@ Status
BufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>*
_update_dependency();
return Status::OK();
}
- return Status::InternalError("Abnormal Ending");
+ return Status::InternalError("Get Arrow Batch Abnormal Ending");
}
Status BufferControlBlock::close(const TUniqueId& id, Status exec_status) {
@@ -250,6 +251,7 @@ Status BufferControlBlock::close(const TUniqueId& id,
Status exec_status) {
_is_close = true;
_status = exec_status;
+ _arrow_data_arrival.notify_all();
if (!_waiting_rpc.empty()) {
if (_status.ok()) {
@@ -269,6 +271,7 @@ Status BufferControlBlock::close(const TUniqueId& id,
Status exec_status) {
void BufferControlBlock::cancel() {
std::unique_lock<std::mutex> l(_lock);
_is_cancelled = true;
+ _arrow_data_arrival.notify_all();
for (auto& ctx : _waiting_rpc) {
ctx->on_failure(Status::Cancelled("Cancelled"));
}
diff --git a/be/src/runtime/buffer_control_block.h
b/be/src/runtime/buffer_control_block.h
index 12cbc72ff52..d8bb6e0f506 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -124,6 +124,10 @@ protected:
// protects all subsequent data in this block
std::mutex _lock;
+ // get arrow flight result is a sync method, need wait for data ready and
return result.
+ // TODO, waiting for data will block pipeline, so use a request pool to
save requests waiting for data.
+ std::condition_variable _arrow_data_arrival;
+
std::deque<GetResultBatchCtx*> _waiting_rpc;
// only used for FE using return rows to check limit
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
index 2c7aaae4f2a..af6d85c954e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
@@ -225,7 +225,7 @@ public class DorisFlightSqlProducer implements
FlightSqlProducer, AutoCloseable
} else {
// Now only query stmt will pull results from BE.
final ByteString handle = ByteString.copyFromUtf8(
- DebugUtil.printId(connectContext.getFinstId()) + ":" +
query);
+ DebugUtil.printId(connectContext.queryId()) + ":" +
query);
Schema schema =
flightSQLConnectProcessor.fetchArrowFlightSchema(5000);
if (schema == null) {
throw CallStatus.INTERNAL.withDescription("fetch arrow
flight schema is null").toRuntimeException();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
index a4aa5a88c8f..f91d63ed90d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
@@ -102,13 +102,13 @@ public class FlightSqlConnectProcessor extends
ConnectProcessor implements AutoC
public Schema fetchArrowFlightSchema(int timeoutMs) {
TNetworkAddress address = ctx.getResultInternalServiceAddr();
- TUniqueId tid = ctx.getFinstId();
+ TUniqueId tid = ctx.queryId();
ArrayList<Expr> resultOutputExprs = ctx.getResultOutputExprs();
- Types.PUniqueId finstId =
Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build();
+ Types.PUniqueId queryId =
Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build();
try {
InternalService.PFetchArrowFlightSchemaRequest request =
InternalService.PFetchArrowFlightSchemaRequest.newBuilder()
- .setFinstId(finstId)
+ .setFinstId(queryId)
.build();
Future<InternalService.PFetchArrowFlightSchemaResult> future
@@ -116,12 +116,12 @@ public class FlightSqlConnectProcessor extends
ConnectProcessor implements AutoC
InternalService.PFetchArrowFlightSchemaResult pResult;
pResult = future.get(timeoutMs, TimeUnit.MILLISECONDS);
if (pResult == null) {
- throw new RuntimeException(String.format("fetch arrow flight
schema timeout, finstId: %s",
+ throw new RuntimeException(String.format("fetch arrow flight
schema timeout, queryId: %s",
DebugUtil.printId(tid)));
}
Status resultStatus = new Status(pResult.getStatus());
if (resultStatus.getErrorCode() != TStatusCode.OK) {
- throw new RuntimeException(String.format("fetch arrow flight
schema failed, finstId: %s, errmsg: %s",
+ throw new RuntimeException(String.format("fetch arrow flight
schema failed, queryId: %s, errmsg: %s",
DebugUtil.printId(tid), resultStatus.toString()));
}
if (pResult.hasBeArrowFlightIp()) {
@@ -138,7 +138,7 @@ public class FlightSqlConnectProcessor extends
ConnectProcessor implements AutoC
List<FieldVector> fieldVectors = root.getFieldVectors();
if (fieldVectors.size() != resultOutputExprs.size()) {
throw new RuntimeException(String.format(
- "Schema size %s' is not equal to arrow field
size %s, finstId: %s.",
+ "Schema size %s' is not equal to arrow field
size %s, queryId: %s.",
fieldVectors.size(), resultOutputExprs.size(),
DebugUtil.printId(tid)));
}
return root.getSchema();
@@ -146,24 +146,24 @@ public class FlightSqlConnectProcessor extends
ConnectProcessor implements AutoC
throw new RuntimeException("Read Arrow Flight Schema
failed.", e);
}
} else {
- throw new RuntimeException(String.format("get empty arrow
flight schema, finstId: %s",
+ throw new RuntimeException(String.format("get empty arrow
flight schema, queryId: %s",
DebugUtil.printId(tid)));
}
} catch (RpcException e) {
throw new RuntimeException(String.format(
- "arrow flight schema fetch catch rpc exception, finstId:
%s,backend: %s",
+ "arrow flight schema fetch catch rpc exception, queryId:
%s,backend: %s",
DebugUtil.printId(tid), address), e);
} catch (InterruptedException e) {
throw new RuntimeException(String.format(
- "arrow flight schema future get interrupted exception,
finstId: %s,backend: %s",
+ "arrow flight schema future get interrupted exception,
queryId: %s,backend: %s",
DebugUtil.printId(tid), address), e);
} catch (ExecutionException e) {
throw new RuntimeException(String.format(
- "arrow flight schema future get execution exception,
finstId: %s,backend: %s",
+ "arrow flight schema future get execution exception,
queryId: %s,backend: %s",
DebugUtil.printId(tid), address), e);
} catch (TimeoutException e) {
throw new RuntimeException(String.format(
- "arrow flight schema fetch timeout, finstId: %s,backend:
%s",
+ "arrow flight schema fetch timeout, queryId: %s,backend:
%s",
DebugUtil.printId(tid), address), e);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]