This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new d11604cf4 RATIS-2325. Create GrpcStubPool for GrpcServerProtocolClient
(#1283)
d11604cf4 is described below
commit d11604cf4d920a47ae3ebcd66c2c088ffe234264
Author: Symious <[email protected]>
AuthorDate: Sat Sep 20 09:38:36 2025 +0800
RATIS-2325. Create GrpcStubPool for GrpcServerProtocolClient (#1283)
---
.../java/org/apache/ratis/grpc/GrpcConfigKeys.java | 9 ++
.../grpc/server/GrpcServerProtocolClient.java | 38 +++++-
.../apache/ratis/grpc/server/GrpcServicesImpl.java | 4 +-
.../org/apache/ratis/grpc/server/GrpcStubPool.java | 132 +++++++++++++++++++++
4 files changed, 179 insertions(+), 4 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
index f21a9b99f..cef62779d 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
@@ -282,6 +282,15 @@ public interface GrpcConfigKeys {
static void setTlsConf(Parameters parameters, GrpcTlsConfig conf) {
parameters.put(TLS_CONF_PARAMETER, conf, TLS_CONF_CLASS);
}
+
+ String STUB_POOL_SIZE_KEY = PREFIX + ".stub.pool.size"; // property key for the size of the per-peer gRPC stub pool
+ int STUB_POOL_SIZE_DEFAULT = 10; // default handed to get(..) in stubPoolSize below
+ static int stubPoolSize(RaftProperties properties) { // reads the pool size; GrpcServicesImpl passes it on as the "connections" argument of GrpcServerProtocolClient
+ return get(properties::getInt, STUB_POOL_SIZE_KEY,
STUB_POOL_SIZE_DEFAULT, getDefaultLog());
+ }
+ static void setStubPoolSize(RaftProperties properties, int size) { // NOTE(review): no range check is passed here; a non-positive size is not usable by GrpcStubPool - confirm whether a minimum should be enforced
+ setInt(properties::setInt, STUB_POOL_SIZE_KEY, size);
+ }
}
String MESSAGE_SIZE_MAX_KEY = PREFIX + ".message.size.max";
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
index 4a280ab33..2e936bb0b 100644
---
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
+++
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
@@ -47,6 +47,7 @@ import java.io.Closeable;
public class GrpcServerProtocolClient implements Closeable {
// Common channel
private final ManagedChannel channel;
+ private final GrpcStubPool<RaftServerProtocolServiceStub> pool;
// Channel and stub for heartbeat
private ManagedChannel hbChannel;
private RaftServerProtocolServiceStub hbAsyncStub;
@@ -59,7 +60,7 @@ public class GrpcServerProtocolClient implements Closeable {
//visible for using in log / error messages AND to use in instrumented tests
private final RaftPeerId raftPeerId;
- public GrpcServerProtocolClient(RaftPeer target, int flowControlWindow,
+ public GrpcServerProtocolClient(RaftPeer target, int connections, int
flowControlWindow,
TimeDuration requestTimeout, GrpcTlsConfig tlsConfig, boolean
separateHBChannel) {
raftPeerId = target.getId();
LOG.info("Build channel for {}", target);
@@ -72,6 +73,8 @@ public class GrpcServerProtocolClient implements Closeable {
hbAsyncStub = RaftServerProtocolServiceGrpc.newStub(hbChannel);
}
requestTimeoutDuration = requestTimeout;
+ this.pool = new GrpcStubPool<RaftServerProtocolServiceStub>(target,
connections,
+ ch -> RaftServerProtocolServiceGrpc.newStub(ch), tlsConfig);
}
private ManagedChannel buildChannel(RaftPeer target, int flowControlWindow,
@@ -107,6 +110,7 @@ public class GrpcServerProtocolClient implements Closeable {
GrpcUtil.shutdownManagedChannel(hbChannel);
}
GrpcUtil.shutdownManagedChannel(channel);
+ pool.close();
}
public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) {
@@ -125,8 +129,36 @@ public class GrpcServerProtocolClient implements Closeable
{
}
void readIndex(ReadIndexRequestProto request,
StreamObserver<ReadIndexReplyProto> s) { // sends readIndex asynchronously on a stub taken from the pool; s receives the forwarded callbacks
- asyncStub.withDeadlineAfter(requestTimeoutDuration.getDuration(),
requestTimeoutDuration.getUnit())
- .readIndex(request, s);
+ GrpcStubPool.PooledStub<RaftServerProtocolServiceStub> p;
+ try {
+ p = pool.acquire(); // may block the calling thread: acquire() falls back to a blocking Semaphore.acquire()
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt flag before reporting the failure
+ s.onError(e); return; // nothing was acquired, so there is nothing to release
+ }
+ p.getStub().withDeadlineAfter(requestTimeoutDuration.getDuration(),
requestTimeoutDuration.getUnit()) // NOTE(review): if this call chain throws before the observer below is invoked, p is not released here - confirm whether the stub can throw synchronously
+ .readIndex(request, new StreamObserver<ReadIndexReplyProto>() {
+ @Override
+ public void onNext(ReadIndexReplyProto v) {
+ s.onNext(v); // forwarded unchanged; the permit is kept until onError/onCompleted
+ }
+ @Override
+ public void onError(Throwable t) {
+ try {
+ s.onError(t);
+ } finally {
+ p.release(); // give the permit back even if s.onError throws
+ }
+ }
+ @Override
+ public void onCompleted() {
+ try {
+ s.onCompleted();
+ } finally {
+ p.release(); // give the permit back even if s.onCompleted throws
+ }
+ }
+ });
}
CallStreamObserver<AppendEntriesRequestProto> appendEntries(
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java
index 853a420d1..b686be0a2 100644
---
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java
+++
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java
@@ -113,6 +113,7 @@ public final class GrpcServicesImpl
private String serverHost;
private int serverPort;
private GrpcTlsConfig serverTlsConfig;
+ private int serverStubPoolSize;
private SizeInBytes messageSizeMax;
private SizeInBytes flowControlWindow;
@@ -135,6 +136,7 @@ public final class GrpcServicesImpl
this.flowControlWindow = GrpcConfigKeys.flowControlWindow(properties,
LOG::info);
this.requestTimeoutDuration =
RaftServerConfigKeys.Rpc.requestTimeout(properties);
this.separateHeartbeatChannel =
GrpcConfigKeys.Server.heartbeatChannel(properties);
+ this.serverStubPoolSize = GrpcConfigKeys.Server.stubPoolSize(properties);
final SizeInBytes appenderBufferSize =
RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties);
final SizeInBytes gap = SizeInBytes.ONE_MB;
@@ -155,7 +157,7 @@ public final class GrpcServicesImpl
}
private GrpcServerProtocolClient newGrpcServerProtocolClient(RaftPeer
target) { // creates the GrpcServerProtocolClient for the given peer from this service's settings
- return new GrpcServerProtocolClient(target,
flowControlWindow.getSizeInt(),
+ return new GrpcServerProtocolClient(target, serverStubPoolSize, // serverStubPoolSize fills the new "connections" constructor parameter
flowControlWindow.getSizeInt(),
requestTimeoutDuration, serverTlsConfig, separateHeartbeatChannel);
}
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java
new file mode 100644
index 000000000..fcfb0f1b8
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.server;
+
+import org.apache.ratis.grpc.GrpcTlsConfig;
+import org.apache.ratis.grpc.GrpcUtil;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
+import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
+import org.apache.ratis.thirdparty.io.grpc.netty.NegotiationType;
+import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
+import org.apache.ratis.thirdparty.io.grpc.stub.AbstractStub;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelOption;
+import org.apache.ratis.thirdparty.io.netty.channel.WriteBufferWaterMark;
+import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
+import
org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+
+final class GrpcStubPool<S extends AbstractStub<S>> {
+ public static final Logger LOG = LoggerFactory.getLogger(GrpcStubPool.class);
+
+ static final class PooledStub<S extends AbstractStub<S>> {
+ private final ManagedChannel ch;
+ private final S stub;
+ private final Semaphore permits;
+
+ PooledStub(ManagedChannel ch, S stub, int maxInflight) {
+ this.ch = ch;
+ this.stub = stub;
+ this.permits = new Semaphore(maxInflight);
+ }
+
+ S getStub() {
+ return stub;
+ }
+
+ void release() {
+ permits.release();
+ }
+ }
+
+ private final List<PooledStub<S>> pool;
+ private final AtomicInteger rr = new AtomicInteger();
+ private final NioEventLoopGroup elg;
+ private final int size;
+
+ GrpcStubPool(RaftPeer target, int n, Function<ManagedChannel, S>
stubFactory, GrpcTlsConfig tlsConfig) {
+ this(target, n, stubFactory, tlsConfig, Math.max(2,
Runtime.getRuntime().availableProcessors() / 2), 16);
+ }
+
+ GrpcStubPool(RaftPeer target, int n,
+ Function<ManagedChannel, S> stubFactory, GrpcTlsConfig tlsConf,
+ int elgThreads, int maxInflightPerConn) {
+ this.elg = new NioEventLoopGroup(elgThreads);
+ ArrayList<PooledStub<S>> tmp = new ArrayList<>(n);
+ for (int i = 0; i < n; i++) {
+ NettyChannelBuilder channelBuilder =
NettyChannelBuilder.forTarget(target.getAddress())
+ .eventLoopGroup(elg)
+ .channelType(NioSocketChannel.class)
+ .keepAliveTime(30, TimeUnit.SECONDS)
+ .keepAliveWithoutCalls(true)
+ .idleTimeout(24, TimeUnit.HOURS)
+ .withOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new
WriteBufferWaterMark(64 << 10, 128 << 10));
+ if (tlsConf != null) {
+ LOG.debug("Setting TLS for {}", target.getAddress());
+ SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
+ GrpcUtil.setTrustManager(sslContextBuilder, tlsConf.getTrustManager());
+ if (tlsConf.getMtlsEnabled()) {
+ GrpcUtil.setKeyManager(sslContextBuilder, tlsConf.getKeyManager());
+ }
+ try {
+
channelBuilder.useTransportSecurity().sslContext(sslContextBuilder.build());
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ } else {
+ channelBuilder.negotiationType(NegotiationType.PLAINTEXT);
+ }
+ ManagedChannel ch = channelBuilder.build();
+ tmp.add(new PooledStub<>(ch, stubFactory.apply(ch), maxInflightPerConn));
+ ch.getState(true);
+ }
+ this.pool = Collections.unmodifiableList(tmp);
+ this.size = n;
+ }
+
+ PooledStub<S> acquire() throws InterruptedException {
+ final int start = ThreadLocalRandom.current().nextInt(size);
+ for (int k = 0; k < size; k++) {
+ PooledStub<S> p = pool.get((start + k) % size);
+ if (p.permits.tryAcquire()) {
+ return p;
+ }
+ }
+ final PooledStub<S> p = pool.get(start);
+ p.permits.acquire();
+ return p;
+ }
+
+ public void close() {
+ for (PooledStub p : pool) {
+ p.ch.shutdown();
+ }
+ elg.shutdownGracefully();
+ }
+}