This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
new f3366a8243 (brpc) using pooled connection and enlarge brpc connection
timeout and retry times (#12458)
f3366a8243 is described below
commit f3366a82435a83ef766c8a8540d4c07bb34d8c6a
Author: Yongqiang YANG <[email protected]>
AuthorDate: Thu Sep 8 14:39:30 2022 +0800
(brpc) using pooled connection and enlarge brpc connection timeout and
retry times (#12458)
When a connection failure happen, doris fails queries using the connection.
We should lower the impact of a connection failure by using pooled
connection
and enlaring connection timeout and retry times.
---
be/src/util/brpc_client_cache.h | 5 ++++-
be/src/vec/sink/vdata_stream_sender.h | 6 ++++--
2 files changed, 8 insertions(+), 3 deletions(-)
diff --git a/be/src/util/brpc_client_cache.h b/be/src/util/brpc_client_cache.h
index aef96b6ec6..8cd7e11b13 100644
--- a/be/src/util/brpc_client_cache.h
+++ b/be/src/util/brpc_client_cache.h
@@ -79,7 +79,7 @@ public:
std::shared_ptr<T> get_new_client_no_cache(const std::string& host_port,
const std::string& protocol =
"baidu_std",
- const std::string& connect_type
= "") {
+ const std::string& connect_type
= "pooled") {
brpc::ChannelOptions options;
if constexpr (std::is_same_v<T, PFunctionService_Stub>) {
options.protocol = config::function_service_protocol;
@@ -89,6 +89,9 @@ public:
if (connect_type != "") {
options.connection_type = connect_type;
}
+ options.connect_timeout_ms = 2000;
+ options.max_retry = 10;
+
std::unique_ptr<brpc::Channel> channel(new brpc::Channel());
int ret_code = 0;
if (host_port.find("://") == std::string::npos) {
diff --git a/be/src/vec/sink/vdata_stream_sender.h
b/be/src/vec/sink/vdata_stream_sender.h
index 94aefeda99..154c8c667a 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -241,8 +241,10 @@ private:
brpc::Join(call_id);
if (cntl->Failed()) {
std::string err = fmt::format(
- "failed to send brpc batch, error={}, error_text={},
client: {}",
- berror(cntl->ErrorCode()), cntl->ErrorText(),
BackendOptions::get_localhost());
+ "failed to send brpc batch, error={}, error_text={},
client: {}, "
+ "latency = {}",
+ berror(cntl->ErrorCode()), cntl->ErrorText(),
BackendOptions::get_localhost(),
+ cntl->latency_us());
LOG(WARNING) << err;
return Status::ThriftRpcError(err);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]