This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new d11604cf4 RATIS-2325. Create GrpcStubPool for GrpcServerProtocolClient 
(#1283)
d11604cf4 is described below

commit d11604cf4d920a47ae3ebcd66c2c088ffe234264
Author: Symious <[email protected]>
AuthorDate: Sat Sep 20 09:38:36 2025 +0800

    RATIS-2325. Create GrpcStubPool for GrpcServerProtocolClient (#1283)
---
 .../java/org/apache/ratis/grpc/GrpcConfigKeys.java |   9 ++
 .../grpc/server/GrpcServerProtocolClient.java      |  38 +++++-
 .../apache/ratis/grpc/server/GrpcServicesImpl.java |   4 +-
 .../org/apache/ratis/grpc/server/GrpcStubPool.java | 132 +++++++++++++++++++++
 4 files changed, 179 insertions(+), 4 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
index f21a9b99f..cef62779d 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
@@ -282,6 +282,15 @@ public interface GrpcConfigKeys {
     static void setTlsConf(Parameters parameters, GrpcTlsConfig conf) {
       parameters.put(TLS_CONF_PARAMETER, conf, TLS_CONF_CLASS);
     }
+
+    /**
+     * Property key for the stub pool size. In this commit, GrpcServicesImpl reads the value
+     * and passes it to GrpcServerProtocolClient, which uses it as the number of
+     * channel/stub pairs in its GrpcStubPool.
+     */
+    String STUB_POOL_SIZE_KEY = PREFIX + ".stub.pool.size";
+    /** Value passed as the default when {@link #STUB_POOL_SIZE_KEY} is looked up. */
+    int STUB_POOL_SIZE_DEFAULT = 10;
+    /**
+     * Reads the stub pool size from the given properties, passing
+     * {@link #STUB_POOL_SIZE_DEFAULT} as the default value.
+     *
+     * @param properties the properties to read from
+     * @return the configured stub pool size
+     */
+    static int stubPoolSize(RaftProperties properties) {
+      return get(properties::getInt, STUB_POOL_SIZE_KEY, 
STUB_POOL_SIZE_DEFAULT, getDefaultLog());
+    }
+    /**
+     * Stores the stub pool size in the given properties under {@link #STUB_POOL_SIZE_KEY}.
+     *
+     * @param properties the properties to update
+     * @param size the stub pool size to store
+     */
+    static void setStubPoolSize(RaftProperties properties, int size) {
+      setInt(properties::setInt, STUB_POOL_SIZE_KEY, size);
+    }
   }
 
   String MESSAGE_SIZE_MAX_KEY = PREFIX + ".message.size.max";
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
index 4a280ab33..2e936bb0b 100644
--- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
@@ -47,6 +47,7 @@ import java.io.Closeable;
 public class GrpcServerProtocolClient implements Closeable {
   // Common channel
   private final ManagedChannel channel;
+  private final GrpcStubPool<RaftServerProtocolServiceStub> pool;
   // Channel and stub for heartbeat
   private ManagedChannel hbChannel;
   private RaftServerProtocolServiceStub hbAsyncStub;
@@ -59,7 +60,7 @@ public class GrpcServerProtocolClient implements Closeable {
   //visible for using in log / error messages AND to use in instrumented tests
   private final RaftPeerId raftPeerId;
 
-  public GrpcServerProtocolClient(RaftPeer target, int flowControlWindow,
+  public GrpcServerProtocolClient(RaftPeer target, int connections, int 
flowControlWindow,
       TimeDuration requestTimeout, GrpcTlsConfig tlsConfig, boolean 
separateHBChannel) {
     raftPeerId = target.getId();
     LOG.info("Build channel for {}", target);
@@ -72,6 +73,8 @@ public class GrpcServerProtocolClient implements Closeable {
       hbAsyncStub = RaftServerProtocolServiceGrpc.newStub(hbChannel);
     }
     requestTimeoutDuration = requestTimeout;
+    this.pool = new GrpcStubPool<RaftServerProtocolServiceStub>(target, 
connections,
+            ch -> RaftServerProtocolServiceGrpc.newStub(ch), tlsConfig);
   }
 
   private ManagedChannel buildChannel(RaftPeer target, int flowControlWindow,
@@ -107,6 +110,7 @@ public class GrpcServerProtocolClient implements Closeable {
       GrpcUtil.shutdownManagedChannel(hbChannel);
     }
     GrpcUtil.shutdownManagedChannel(channel);
+    pool.close();
   }
 
   public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) {
@@ -125,8 +129,36 @@ public class GrpcServerProtocolClient implements Closeable 
{
   }
 
   void readIndex(ReadIndexRequestProto request, 
StreamObserver<ReadIndexReplyProto> s) {
-    asyncStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), 
requestTimeoutDuration.getUnit())
-        .readIndex(request, s);
+    GrpcStubPool.PooledStub<RaftServerProtocolServiceStub> p;
+    try {
+      p = pool.acquire();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      s.onError(e); return;
+    }
+    p.getStub().withDeadlineAfter(requestTimeoutDuration.getDuration(), 
requestTimeoutDuration.getUnit())
+            .readIndex(request, new StreamObserver<ReadIndexReplyProto>() {
+              @Override
+              public void onNext(ReadIndexReplyProto v) {
+                s.onNext(v);
+              }
+              @Override
+              public void onError(Throwable t) {
+                try {
+                  s.onError(t);
+                } finally {
+                  p.release();
+                }
+              }
+              @Override
+              public void onCompleted() {
+                try {
+                  s.onCompleted();
+                } finally {
+                  p.release();
+                }
+              }
+            });
   }
 
   CallStreamObserver<AppendEntriesRequestProto> appendEntries(
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java
index 853a420d1..b686be0a2 100644
--- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java
@@ -113,6 +113,7 @@ public final class GrpcServicesImpl
     private String serverHost;
     private int serverPort;
     private GrpcTlsConfig serverTlsConfig;
+    private int serverStubPoolSize;
 
     private SizeInBytes messageSizeMax;
     private SizeInBytes flowControlWindow;
@@ -135,6 +136,7 @@ public final class GrpcServicesImpl
       this.flowControlWindow = GrpcConfigKeys.flowControlWindow(properties, 
LOG::info);
       this.requestTimeoutDuration = 
RaftServerConfigKeys.Rpc.requestTimeout(properties);
       this.separateHeartbeatChannel = 
GrpcConfigKeys.Server.heartbeatChannel(properties);
+      this.serverStubPoolSize = GrpcConfigKeys.Server.stubPoolSize(properties);
 
       final SizeInBytes appenderBufferSize = 
RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties);
       final SizeInBytes gap = SizeInBytes.ONE_MB;
@@ -155,7 +157,7 @@ public final class GrpcServicesImpl
     }
 
+    /**
+     * Creates a {@code GrpcServerProtocolClient} for the given peer from this object's
+     * settings: stub pool size, flow control window, request timeout, TLS config and
+     * the separate-heartbeat-channel flag.
+     *
+     * @param target the peer the new client connects to
+     * @return the newly created client
+     */
     private GrpcServerProtocolClient newGrpcServerProtocolClient(RaftPeer 
target) {
-      return new GrpcServerProtocolClient(target, 
flowControlWindow.getSizeInt(),
+      return new GrpcServerProtocolClient(target, serverStubPoolSize, 
flowControlWindow.getSizeInt(),
           requestTimeoutDuration, serverTlsConfig, separateHeartbeatChannel);
     }
 
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java
new file mode 100644
index 000000000..fcfb0f1b8
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.server;
+
+import org.apache.ratis.grpc.GrpcTlsConfig;
+import org.apache.ratis.grpc.GrpcUtil;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
+import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
+import org.apache.ratis.thirdparty.io.grpc.netty.NegotiationType;
+import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
+import org.apache.ratis.thirdparty.io.grpc.stub.AbstractStub;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelOption;
+import org.apache.ratis.thirdparty.io.netty.channel.WriteBufferWaterMark;
+import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
+import 
org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+
+/**
+ * A fixed-size pool of gRPC channels to a single peer. Each channel is paired with a
+ * stub created on it and a semaphore that bounds how many callers may hold the stub
+ * at the same time. All channels share one {@link NioEventLoopGroup}.
+ *
+ * Callers obtain a stub with {@link #acquire()} and must call
+ * {@link PooledStub#release()} once per successful acquire.
+ *
+ * @param <S> the stub type
+ */
+final class GrpcStubPool<S extends AbstractStub<S>> {
+  public static final Logger LOG = LoggerFactory.getLogger(GrpcStubPool.class);
+
+  /**
+   * A channel, the stub created on it, and the permits limiting its concurrent holders.
+   *
+   * @param <S> the stub type
+   */
+  static final class PooledStub<S extends AbstractStub<S>> {
+    private final ManagedChannel ch;
+    private final S stub;
+    private final Semaphore permits;
+
+    PooledStub(ManagedChannel ch, S stub, int maxInflight) {
+      this.ch = ch;
+      this.stub = stub;
+      this.permits = new Semaphore(maxInflight);
+    }
+
+    /** @return the stub bound to this entry's channel */
+    S getStub() {
+      return stub;
+    }
+
+    /** Returns one permit; call exactly once per successful {@link GrpcStubPool#acquire()}. */
+    void release() {
+      permits.release();
+    }
+  }
+
+  private final List<PooledStub<S>> pool;
+  // Not read anywhere in this class at present.
+  private final AtomicInteger rr = new AtomicInteger();
+  private final NioEventLoopGroup elg;
+  private final int size;
+
+  /**
+   * Creates a pool using max(2, availableProcessors / 2) event loop threads
+   * and at most 16 concurrent holders per stub.
+   *
+   * @param target the peer to connect to
+   * @param n the number of channel/stub pairs; {@link #acquire()} requires it to be positive
+   * @param stubFactory creates a stub on a given channel
+   * @param tlsConfig the TLS configuration, or null for plaintext
+   */
+  GrpcStubPool(RaftPeer target, int n, Function<ManagedChannel, S> stubFactory, GrpcTlsConfig tlsConfig) {
+    this(target, n, stubFactory, tlsConfig,
+        Math.max(2, Runtime.getRuntime().availableProcessors() / 2), 16);
+  }
+
+  /**
+   * Creates a pool of {@code n} channels to {@code target}.
+   * If any step fails, the channels built so far and the event loop group are shut down
+   * before the failure is rethrown, so a failed construction does not leak them.
+   *
+   * @param target the peer to connect to
+   * @param n the number of channel/stub pairs; {@link #acquire()} requires it to be positive
+   * @param stubFactory creates a stub on a given channel
+   * @param tlsConf the TLS configuration, or null for plaintext
+   * @param elgThreads the number of threads of the shared event loop group
+   * @param maxInflightPerConn the number of permits of each pooled stub
+   * @throws RuntimeException if the SSL context cannot be built
+   */
+  GrpcStubPool(RaftPeer target, int n,
+               Function<ManagedChannel, S> stubFactory, GrpcTlsConfig tlsConf,
+               int elgThreads, int maxInflightPerConn) {
+    // Allocate the lists first: a negative n is rejected here, before any resource exists.
+    final List<PooledStub<S>> tmp = new ArrayList<>(n);
+    final List<ManagedChannel> opened = new ArrayList<>(n);
+    this.elg = new NioEventLoopGroup(elgThreads);
+    try {
+      for (int i = 0; i < n; i++) {
+        final ManagedChannel ch = newChannel(target, tlsConf);
+        opened.add(ch);
+        tmp.add(new PooledStub<>(ch, stubFactory.apply(ch), maxInflightPerConn));
+        // getState(true) asks an idle channel to start connecting right away.
+        ch.getState(true);
+      }
+    } catch (RuntimeException | Error e) {
+      // Release everything created so far; nobody else holds a reference to it.
+      for (ManagedChannel ch : opened) {
+        ch.shutdown();
+      }
+      elg.shutdownGracefully();
+      throw e;
+    }
+    this.pool = Collections.unmodifiableList(tmp);
+    this.size = n;
+  }
+
+  /** Builds one channel to the target on the shared event loop group, with TLS if configured. */
+  private ManagedChannel newChannel(RaftPeer target, GrpcTlsConfig tlsConf) {
+    final NettyChannelBuilder channelBuilder = NettyChannelBuilder.forTarget(target.getAddress())
+        .eventLoopGroup(elg)
+        .channelType(NioSocketChannel.class)
+        .keepAliveTime(30, TimeUnit.SECONDS)
+        .keepAliveWithoutCalls(true)
+        .idleTimeout(24, TimeUnit.HOURS)
+        // Write buffer water marks: low = 64 KiB, high = 128 KiB.
+        .withOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(64 << 10, 128 << 10));
+    if (tlsConf != null) {
+      LOG.debug("Setting TLS for {}", target.getAddress());
+      SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
+      GrpcUtil.setTrustManager(sslContextBuilder, tlsConf.getTrustManager());
+      if (tlsConf.getMtlsEnabled()) {
+        GrpcUtil.setKeyManager(sslContextBuilder, tlsConf.getKeyManager());
+      }
+      try {
+        channelBuilder.useTransportSecurity().sslContext(sslContextBuilder.build());
+      } catch (Exception ex) {
+        throw new RuntimeException(ex);
+      }
+    } else {
+      channelBuilder.negotiationType(NegotiationType.PLAINTEXT);
+    }
+    return channelBuilder.build();
+  }
+
+  /**
+   * Borrows a pooled stub. Starting at a random index, the first entry with a free
+   * permit is returned without blocking; if every entry is saturated, this method
+   * blocks on the entry at the starting index until a permit is released.
+   *
+   * @return the pooled stub whose permit was taken
+   * @throws InterruptedException if interrupted while waiting for a permit
+   */
+  PooledStub<S> acquire() throws InterruptedException {
+    // Note: nextInt rejects a non-positive bound, so an empty pool fails here.
+    final int start = ThreadLocalRandom.current().nextInt(size);
+    for (int k = 0; k < size; k++) {
+      PooledStub<S> p = pool.get((start + k) % size);
+      if (p.permits.tryAcquire()) {
+        return p;
+      }
+    }
+    final PooledStub<S> p = pool.get(start);
+    p.permits.acquire();
+    return p;
+  }
+
+  /** Shuts down every pooled channel, then the shared event loop group. */
+  public void close() {
+    for (PooledStub<S> p : pool) {
+      p.ch.shutdown();
+    }
+    elg.shutdownGracefully();
+  }
+}

Reply via email to