This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 83208ee1a88 [pick](branch-2.1) pick #43960 #43929 #44177 (#44240)
83208ee1a88 is described below
commit 83208ee1a8815ca95a9531853141b920c4992a6e
Author: Xinyi Zou <[email protected]>
AuthorDate: Tue Nov 19 17:25:16 2024 +0800
[pick](branch-2.1) pick #43960 #43929 #44177 (#44240)
pick #43960 #43929 #44177
---
be/src/common/config.cpp | 25 +++++++++++++++++--
be/src/common/config.h | 28 ++++++++++++++++++----
be/src/pipeline/exec/result_sink_operator.cpp | 19 ++++++++-------
be/src/service/internal_service.cpp | 9 ++++---
.../data_types/serde/data_type_number_serde.cpp | 21 +++++++++++-----
.../arrowflight/DorisFlightSqlProducer.java | 11 +++++++--
.../arrowflight/FlightSqlConnectProcessor.java | 8 ++++---
7 files changed, 92 insertions(+), 29 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index d5b67c2c128..c41f19a3e27 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -63,8 +63,29 @@ DEFINE_Int32(brpc_port, "8060");
DEFINE_Int32(arrow_flight_sql_port, "-1");
-DEFINE_mString(public_access_ip, "");
-DEFINE_Int32(public_access_port, "-1");
+// If external clients cannot directly reach the BE address given by priority_networks, set
+// public_host to an address that those clients can reach.
+// There are usually two usage scenarios:
+// 1. In a production environment, it is often inconvenient to expose Doris BE nodes to the external network.
+//    However, a reverse proxy (such as Nginx) can be placed in front of all Doris BE nodes, so that an external
+//    client connecting to Nginx is randomly routed to one of the Doris BE nodes. In this case, set public_host
+//    to the host of Nginx.
+// 2. If priority_networks is an internal network IP and the BE node has its own independent external IP,
+//    but Doris currently does not support modifying priority_networks, set public_host to the real external IP.
+DEFINE_mString(public_host, "");
+
+// If the BE node is connected to the external network through a reverse proxy such as Nginx
+// and Arrow Flight SQL is needed, add a server block in Nginx that reverse-proxies
+// `Nginx:arrow_flight_sql_proxy_port` to `BE_priority_networks:arrow_flight_sql_port`.
+// Arrow Flight SQL runs over gRPC, so the proxy must use grpc_pass. For example, with
+// arrow_flight_sql_proxy_port = 8167:
+// upstream arrowflight {
+//     server 10.16.10.8:8069;
+//     server 10.16.10.8:8068;
+// }
+// server {
+//     listen 8167 http2;
+//     listen [::]:8167 http2;
+//     server_name doris.arrowflight.com;
+//     location / {
+//         grpc_pass grpc://arrowflight;
+//     }
+// }
+DEFINE_Int32(arrow_flight_sql_proxy_port, "-1");
// the number of bthreads for brpc, the default value is set to -1,
// which means the number of bthreads is #cpu-cores
diff --git a/be/src/common/config.h b/be/src/common/config.h
index aca5b6b829a..7693af0f7ae 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -100,11 +100,29 @@ DECLARE_Int32(brpc_port);
// Default -1, do not start arrow flight sql server.
DECLARE_Int32(arrow_flight_sql_port);
-// If priority_networks is incorrect but cannot be modified, set
public_access_ip as BE’s real IP.
-// For ADBC client fetch result, default is empty, the ADBC client uses the
backend ip to fetch the result.
-// If ADBC client cannot access the backend ip, can set public_access_ip to
modify the fetch result ip.
-DECLARE_mString(public_access_ip);
-DECLARE_Int32(public_access_port);
+// If external clients cannot directly reach the BE address given by priority_networks, set
+// public_host to an address that those clients can reach.
+// There are usually two usage scenarios:
+// 1. In a production environment, it is often inconvenient to expose Doris BE nodes to the external network.
+//    However, a reverse proxy (such as Nginx) can be placed in front of all Doris BE nodes, so that an external
+//    client connecting to Nginx is randomly routed to one of the Doris BE nodes. In this case, set public_host
+//    to the host of Nginx.
+// 2. If priority_networks is an internal network IP and the BE node has its own independent external IP,
+//    but Doris currently does not support modifying priority_networks, set public_host to the real external IP.
+DECLARE_mString(public_host);
+
+// If the BE node is connected to the external network through a reverse proxy such as Nginx
+// and Arrow Flight SQL is needed, add a server block in Nginx that reverse-proxies
+// `Nginx:arrow_flight_sql_proxy_port` to `BE_priority_networks:arrow_flight_sql_port`.
+// Arrow Flight SQL runs over gRPC, so the proxy must use grpc_pass. For example, with
+// arrow_flight_sql_proxy_port = 8167:
+// upstream arrowflight {
+//     server 10.16.10.8:8069;
+//     server 10.16.10.8:8068;
+// }
+// server {
+//     listen 8167 http2;
+//     listen [::]:8167 http2;
+//     server_name doris.arrowflight.com;
+//     location / {
+//         grpc_pass grpc://arrowflight;
+//     }
+// }
+DECLARE_Int32(arrow_flight_sql_proxy_port);
// the number of bthreads for brpc, the default value is set to -1,
// which means the number of bthreads is #cpu-cores
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp
b/be/src/pipeline/exec/result_sink_operator.cpp
index d2dfa89cdd6..0ef1164b2e5 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -71,6 +71,17 @@ Status ResultSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& info)
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
state->fragment_instance_id(), p._result_sink_buffer_size_rows,
&_sender, true, state));
((PipBufferControlBlock*)_sender.get())->set_dependency(_dependency->shared_from_this());
+
+ _output_vexpr_ctxs.resize(p._output_vexpr_ctxs.size());
+ for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
+ RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state,
_output_vexpr_ctxs[i]));
+ }
+ if (p._sink_type == TResultSinkType::ARROW_FLIGHT_PROTOCAL) {
+ std::shared_ptr<arrow::Schema> arrow_schema;
+ RETURN_IF_ERROR(get_arrow_schema_from_expr_ctxs(_output_vexpr_ctxs,
&arrow_schema,
+ state->timezone()));
+ _sender->register_arrow_schema(arrow_schema);
+ }
return Status::OK();
}
@@ -79,10 +90,6 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
SCOPED_TIMER(_open_timer);
RETURN_IF_ERROR(Base::open(state));
auto& p = _parent->cast<ResultSinkOperatorX>();
- _output_vexpr_ctxs.resize(p._output_vexpr_ctxs.size());
- for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
- RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state,
_output_vexpr_ctxs[i]));
- }
// create writer based on sink type
switch (p._sink_type) {
case TResultSinkType::MYSQL_PROTOCAL: {
@@ -96,10 +103,6 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
break;
}
case TResultSinkType::ARROW_FLIGHT_PROTOCAL: {
- std::shared_ptr<arrow::Schema> arrow_schema;
- RETURN_IF_ERROR(get_arrow_schema_from_expr_ctxs(_output_vexpr_ctxs,
&arrow_schema,
- state->timezone()));
- _sender->register_arrow_schema(arrow_schema);
_writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter(
_sender.get(), _output_vexpr_ctxs, _profile));
break;
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index a82ab9988b1..701fc6c018d 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -923,6 +923,7 @@ void
PInternalServiceImpl::fetch_arrow_flight_schema(google::protobuf::RpcContro
auto st = ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(
UniqueId(request->finst_id()).to_thrift(), &schema);
if (!st.ok()) {
+ LOG(WARNING) << "fetch arrow flight schema failed, errmsg=" << st;
st.to_protobuf(result->mutable_status());
return;
}
@@ -931,9 +932,11 @@ void
PInternalServiceImpl::fetch_arrow_flight_schema(google::protobuf::RpcContro
st = serialize_arrow_schema(&schema, &schema_str);
if (st.ok()) {
result->set_schema(std::move(schema_str));
- if (!config::public_access_ip.empty() &&
config::public_access_port != -1) {
- result->set_be_arrow_flight_ip(config::public_access_ip);
- result->set_be_arrow_flight_port(config::public_access_port);
+ if (!config::public_host.empty()) {
+ result->set_be_arrow_flight_ip(config::public_host);
+ }
+ if (config::arrow_flight_sql_proxy_port != -1) {
+
result->set_be_arrow_flight_port(config::arrow_flight_sql_proxy_port);
}
}
st.to_protobuf(result->mutable_status());
diff --git a/be/src/vec/data_types/serde/data_type_number_serde.cpp
b/be/src/vec/data_types/serde/data_type_number_serde.cpp
index efa41e346bf..f4fb6bbbb1f 100644
--- a/be/src/vec/data_types/serde/data_type_number_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_number_serde.cpp
@@ -78,12 +78,21 @@ void DataTypeNumberSerDe<T>::write_column_to_arrow(const
IColumn& column, const
auto arrow_null_map = revert_null_map(null_map, start, end);
auto arrow_null_map_data = arrow_null_map.empty() ? nullptr :
arrow_null_map.data();
if constexpr (std::is_same_v<T, UInt8>) {
- ARROW_BUILDER_TYPE& builder =
assert_cast<ARROW_BUILDER_TYPE&>(*array_builder);
- checkArrowStatus(
- builder.AppendValues(reinterpret_cast<const
uint8_t*>(col_data.data() + start),
- end - start,
- reinterpret_cast<const
uint8_t*>(arrow_null_map_data)),
- column.get_name(), array_builder->type()->name());
+ auto* null_builder = dynamic_cast<arrow::NullBuilder*>(array_builder);
+ if (null_builder) {
+ for (size_t i = start; i < end; ++i) {
+ checkArrowStatus(null_builder->AppendNull(), column.get_name(),
+ null_builder->type()->name());
+ }
+ } else {
+ ARROW_BUILDER_TYPE& builder =
assert_cast<ARROW_BUILDER_TYPE&>(*array_builder);
+ checkArrowStatus(
+ builder.AppendValues(reinterpret_cast<const
uint8_t*>(col_data.data() + start),
+ end - start,
+ reinterpret_cast<const
uint8_t*>(arrow_null_map_data)),
+ column.get_name(), array_builder->type()->name());
+ }
+
} else if constexpr (std::is_same_v<T, Int128>) {
auto& string_builder =
assert_cast<arrow::StringBuilder&>(*array_builder);
for (size_t i = start; i < end; ++i) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
index 6f45f3faac7..758f30469bf 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
@@ -249,8 +249,15 @@ public class DorisFlightSqlProducer implements
FlightSqlProducer, AutoCloseable
// The query results of Arrow Flight SQL will be
randomly saved on a Doris BE node.
// If it is different from the Doris BE node randomly
routed by nginx,
// data forwarding needs to be done inside the Doris
BE node.
- location =
Location.forGrpcInsecure(flightSQLConnectProcessor.getPublicAccessAddr().hostname,
-
flightSQLConnectProcessor.getPublicAccessAddr().port);
+ if
(flightSQLConnectProcessor.getPublicAccessAddr().isSetPort()) {
+ location = Location.forGrpcInsecure(
+
flightSQLConnectProcessor.getPublicAccessAddr().hostname,
+
flightSQLConnectProcessor.getPublicAccessAddr().port);
+ } else {
+ location = Location.forGrpcInsecure(
+
flightSQLConnectProcessor.getPublicAccessAddr().hostname,
+
connectContext.getResultFlightServerAddr().port);
+ }
} else {
location =
Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname,
connectContext.getResultFlightServerAddr().port);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
index 6724065f99a..20b377eb5c3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
@@ -131,9 +131,11 @@ public class FlightSqlConnectProcessor extends
ConnectProcessor implements AutoC
throw new RuntimeException(String.format("fetch arrow flight
schema failed, queryId: %s, errmsg: %s",
DebugUtil.printId(tid), resultStatus));
}
- if (pResult.hasBeArrowFlightIp() &&
pResult.hasBeArrowFlightPort()) {
- publicAccessAddr.hostname =
pResult.getBeArrowFlightIp().toStringUtf8();
- publicAccessAddr.port = pResult.getBeArrowFlightPort();
+ if (pResult.hasBeArrowFlightIp()) {
+
publicAccessAddr.setHostname(pResult.getBeArrowFlightIp().toStringUtf8());
+ }
+ if (pResult.hasBeArrowFlightPort()) {
+ publicAccessAddr.setPort(pResult.getBeArrowFlightPort());
}
if (pResult.hasSchema() && pResult.getSchema().size() > 0) {
RootAllocator rootAllocator = new
RootAllocator(Integer.MAX_VALUE);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]