This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
new faf2d9097c use pooled stub to call rpc on be instead of one stub
(#12459)
faf2d9097c is described below
commit faf2d9097cd560125e390d76899cc1ef1b3e3a77
Author: Yongqiang YANG <[email protected]>
AuthorDate: Thu Sep 8 14:39:16 2022 +0800
use pooled stub to call rpc on be instead of one stub (#12459)
* use pooled stub to call rpc on be instead of one stub
A channel is closed when a timeout or an exception happens; if only
one stub is used, then all queries would fail.
If we don't close the channel, grpc-java sometimes gets stuck without
sending any RPC.
* enable retry on gRPC calls to BE and keep the connection alive without calls
---
.../org/apache/doris/rpc/BackendServiceClient.java | 3 ++-
.../org/apache/doris/rpc/BackendServiceProxy.java | 19 ++++++++++++++++---
2 files changed, 18 insertions(+), 4 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
index 82bd1508bd..704190c7c7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
@@ -34,7 +34,7 @@ import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
public class BackendServiceClient {
public static final Logger LOG =
LogManager.getLogger(BackendServiceClient.class);
- private static final int MAX_RETRY_NUM = 0;
+ private static final int MAX_RETRY_NUM = 10;
private final TNetworkAddress address;
private final PBackendServiceGrpc.PBackendServiceFutureStub stub;
private final PBackendServiceGrpc.PBackendServiceBlockingStub blockingStub;
@@ -44,6 +44,7 @@ public class BackendServiceClient {
this.address = address;
channel = NettyChannelBuilder.forAddress(address.getHostname(),
address.getPort())
.flowControlWindow(Config.grpc_max_message_size_bytes)
+ .keepAliveWithoutCalls(true)
.maxInboundMessageSize(Config.grpc_max_message_size_bytes).enableRetry().maxRetryAttempts(MAX_RETRY_NUM)
.usePlaintext().build();
stub = PBackendServiceGrpc.newFutureStub(channel);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index 95148aa8e1..f9f8a83ce9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -38,6 +38,7 @@ import org.apache.thrift.protocol.TCompactProtocol;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
public class BackendServiceProxy {
@@ -51,12 +52,24 @@ public class BackendServiceProxy {
serviceMap = Maps.newConcurrentMap();
}
- private static class SingletonHolder {
- private static final BackendServiceProxy INSTANCE = new
BackendServiceProxy();
+ private static class Holder {
+ private static final int PROXY_NUM = 20;
+ private static BackendServiceProxy[] proxies = new
BackendServiceProxy[PROXY_NUM];
+ private static AtomicInteger count = new AtomicInteger();
+
+ static {
+ for (int i = 0; i < proxies.length; i++) {
+ proxies[i] = new BackendServiceProxy();
+ }
+ }
+
+ static BackendServiceProxy get() {
+ return proxies[count.addAndGet(1) % PROXY_NUM];
+ }
}
public static BackendServiceProxy getInstance() {
- return SingletonHolder.INSTANCE;
+ return Holder.get();
}
public void removeProxy(TNetworkAddress address) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]