This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new d411b0288 RATIS-2331. Reuse SslContext in gRPC. (#1288)
d411b0288 is described below
commit d411b028882f2e397cee27cc5bf51e2f0dc4dd35
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Sat Sep 27 11:00:09 2025 -0700
RATIS-2331. Reuse SslContext in gRPC. (#1288)
---
.../main/java/org/apache/ratis/util/LifeCycle.java | 3 +
.../java/org/apache/ratis/grpc/GrpcFactory.java | 78 +++++++--------
.../main/java/org/apache/ratis/grpc/GrpcUtil.java | 40 ++++++++
.../grpc/client/GrpcClientProtocolClient.java | 28 ++----
.../ratis/grpc/client/GrpcClientProtocolProxy.java | 108 ---------------------
.../apache/ratis/grpc/client/GrpcClientRpc.java | 6 +-
.../grpc/server/GrpcServerProtocolClient.java | 36 +++----
.../apache/ratis/grpc/server/GrpcServicesImpl.java | 56 +++++------
.../org/apache/ratis/grpc/server/GrpcStubPool.java | 29 ++----
9 files changed, 134 insertions(+), 250 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
b/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
index 9870fe371..e96ba88a5 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
@@ -117,6 +117,9 @@ public class LifeCycle {
if (LOG.isTraceEnabled()) {
LOG.trace("TRACE", new Throwable());
}
+ if (to == EXCEPTION) {
+ LOG.error("{} has failed ({} -> {})", name, from, to, new
Throwable("TRACE"));
+ }
Preconditions.assertTrue(isValid(from, to),
"ILLEGAL TRANSITION: In %s, %s -> %s", name, from, to);
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
index 331d1a858..1053cab80 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
@@ -32,11 +32,15 @@ import org.apache.ratis.server.ServerFactory;
import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.leader.LeaderState;
import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;
+import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext;
import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.function.BiFunction;
import java.util.function.Consumer;
+import java.util.function.Supplier;
public class GrpcFactory implements ServerFactory, ClientFactory {
@@ -65,19 +69,32 @@ public class GrpcFactory implements ServerFactory,
ClientFactory {
return value;
}
- private final GrpcServices.Customizer servicesCustomizer;
+ static final BiFunction<GrpcTlsConfig, SslContext, SslContext>
BUILD_SSL_CONTEXT_FOR_SERVER
+ = (tlsConf, defaultContext) -> tlsConf == null ? defaultContext :
GrpcUtil.buildSslContextForServer(tlsConf);
+
+ static final BiFunction<GrpcTlsConfig, SslContext, SslContext>
BUILD_SSL_CONTEXT_FOR_CLIENT
+ = (tlsConf, defaultContext) -> tlsConf == null ? defaultContext :
GrpcUtil.buildSslContextForClient(tlsConf);
- private final GrpcTlsConfig tlsConfig;
- private final GrpcTlsConfig adminTlsConfig;
- private final GrpcTlsConfig clientTlsConfig;
- private final GrpcTlsConfig serverTlsConfig;
+ static final class SslContexts {
+ private final SslContext adminSslContext;
+ private final SslContext clientSslContext;
+ private final SslContext serverSslContext;
- public static Parameters newRaftParameters(GrpcTlsConfig conf) {
- final Parameters p = new Parameters();
- GrpcConfigKeys.TLS.setConf(p, conf);
- return p;
+ private SslContexts(GrpcTlsConfig tlsConfig, GrpcTlsConfig adminTlsConfig,
+ GrpcTlsConfig clientTlsConfig, GrpcTlsConfig serverTlsConfig,
+ BiFunction<GrpcTlsConfig, SslContext, SslContext> buildMethod) {
+ final SslContext defaultSslContext = buildMethod.apply(tlsConfig, null);
+ this.adminSslContext = buildMethod.apply(adminTlsConfig,
defaultSslContext);
+ this.clientSslContext = buildMethod.apply(clientTlsConfig,
defaultSslContext);
+ this.serverSslContext = buildMethod.apply(serverTlsConfig,
defaultSslContext);
+ }
}
+ private final GrpcServices.Customizer servicesCustomizer;
+
+ private final Supplier<SslContexts> forServerSupplier;
+ private final Supplier<SslContexts> forClientSupplier;
+
public GrpcFactory(Parameters parameters) {
this(GrpcConfigKeys.Server.servicesCustomizer(parameters),
GrpcConfigKeys.TLS.conf(parameters),
@@ -87,35 +104,15 @@ public class GrpcFactory implements ServerFactory,
ClientFactory {
);
}
- public GrpcFactory(GrpcTlsConfig tlsConfig) {
- this(null, tlsConfig, null, null, null);
- }
-
private GrpcFactory(GrpcServices.Customizer servicesCustomizer,
GrpcTlsConfig tlsConfig, GrpcTlsConfig adminTlsConfig,
GrpcTlsConfig clientTlsConfig, GrpcTlsConfig serverTlsConfig) {
this.servicesCustomizer = servicesCustomizer;
- this.tlsConfig = tlsConfig;
- this.adminTlsConfig = adminTlsConfig;
- this.clientTlsConfig = clientTlsConfig;
- this.serverTlsConfig = serverTlsConfig;
- }
-
- public GrpcTlsConfig getTlsConfig() {
- return tlsConfig;
- }
-
- public GrpcTlsConfig getAdminTlsConfig() {
- return adminTlsConfig != null ? adminTlsConfig : tlsConfig;
- }
-
- public GrpcTlsConfig getClientTlsConfig() {
- return clientTlsConfig != null ? clientTlsConfig : tlsConfig;
- }
-
- public GrpcTlsConfig getServerTlsConfig() {
- return serverTlsConfig != null ? serverTlsConfig : tlsConfig;
+ this.forServerSupplier = MemoizedSupplier.valueOf(() -> new SslContexts(
+ tlsConfig, adminTlsConfig, clientTlsConfig, serverTlsConfig,
BUILD_SSL_CONTEXT_FOR_SERVER));
+ this.forClientSupplier = MemoizedSupplier.valueOf(() -> new SslContexts(
+ tlsConfig, adminTlsConfig, clientTlsConfig, serverTlsConfig,
BUILD_SSL_CONTEXT_FOR_CLIENT));
}
@Override
@@ -131,19 +128,24 @@ public class GrpcFactory implements ServerFactory,
ClientFactory {
@Override
public GrpcServices newRaftServerRpc(RaftServer server) {
checkPooledByteBufAllocatorUseCacheForAllThreads(LOG::info);
+
+ final SslContexts forServer = forServerSupplier.get();
+ final SslContexts forClient = forClientSupplier.get();
return GrpcServicesImpl.newBuilder()
.setServer(server)
.setCustomizer(servicesCustomizer)
- .setAdminTlsConfig(getAdminTlsConfig())
- .setServerTlsConfig(getServerTlsConfig())
- .setClientTlsConfig(getClientTlsConfig())
+ .setAdminSslContext(forServer.adminSslContext)
+ .setServerSslContextForServer(forServer.serverSslContext)
+ .setServerSslContextForClient(forClient.serverSslContext)
+ .setClientSslContext(forServer.clientSslContext)
.build();
}
@Override
public GrpcClientRpc newRaftClientRpc(ClientId clientId, RaftProperties
properties) {
checkPooledByteBufAllocatorUseCacheForAllThreads(LOG::debug);
- return new GrpcClientRpc(clientId, properties,
- getAdminTlsConfig(), getClientTlsConfig());
+
+ final SslContexts forClient = forClientSupplier.get();
+ return new GrpcClientRpc(clientId, properties, forClient.adminSslContext,
forClient.clientSslContext);
}
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
index 2f9ee01ec..8dcfb6544 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
@@ -28,7 +28,10 @@ import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
import org.apache.ratis.thirdparty.io.grpc.Metadata;
import org.apache.ratis.thirdparty.io.grpc.Status;
import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
+import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.apache.ratis.thirdparty.io.netty.handler.ssl.ClientAuth;
+import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
@@ -39,6 +42,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.KeyManager;
+import javax.net.ssl.SSLException;
import javax.net.ssl.TrustManager;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
@@ -46,6 +50,8 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
+import static
org.apache.ratis.thirdparty.io.netty.handler.ssl.SslProvider.OPENSSL;
+
public interface GrpcUtil {
Logger LOG = LoggerFactory.getLogger(GrpcUtil.class);
@@ -299,4 +305,38 @@ public interface GrpcUtil {
b.keyManager(privateKey.get(), certificates.get());
}
}
+
+ static SslContext buildSslContextForServer(GrpcTlsConfig tlsConf) {
+ if (tlsConf == null) {
+ return null;
+ }
+ SslContextBuilder b =
initSslContextBuilderForServer(tlsConf.getKeyManager());
+ if (tlsConf.getMtlsEnabled()) {
+ b.clientAuth(ClientAuth.REQUIRE);
+ setTrustManager(b, tlsConf.getTrustManager());
+ }
+ b = GrpcSslContexts.configure(b, OPENSSL);
+ try {
+ return b.build();
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Failed to buildSslContextForServer
from tlsConfig " + tlsConf, e);
+ }
+ }
+
+ static SslContext buildSslContextForClient(GrpcTlsConfig tlsConf) {
+ if (tlsConf == null) {
+ return null;
+ }
+
+ final SslContextBuilder b = GrpcSslContexts.forClient();
+ setTrustManager(b, tlsConf.getTrustManager());
+ if (tlsConf.getMtlsEnabled()) {
+ setKeyManager(b, tlsConf.getKeyManager());
+ }
+ try {
+ return b.build();
+ } catch (SSLException e) {
+ throw new IllegalArgumentException("Failed to buildSslContextForClient
from tlsConfig " + tlsConf, e);
+ }
+ }
}
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index 3b9d51268..159919fab 100644
---
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -21,7 +21,6 @@ import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
-import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.grpc.GrpcUtil;
import org.apache.ratis.grpc.metrics.intercept.client.MetricClientInterceptor;
import org.apache.ratis.proto.RaftProtos.GroupInfoReplyProto;
@@ -49,11 +48,10 @@ import
org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
-import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
import org.apache.ratis.thirdparty.io.grpc.netty.NegotiationType;
import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
-import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
+import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.SizeInBytes;
@@ -97,7 +95,7 @@ public class GrpcClientProtocolClient implements Closeable {
private final MetricClientInterceptor metricClientInterceptor;
GrpcClientProtocolClient(ClientId id, RaftPeer target, RaftProperties
properties,
- GrpcTlsConfig adminTlsConfig, GrpcTlsConfig clientTlsConfig) {
+ SslContext adminSslContext, SslContext clientSslContext) {
this.name = JavaUtils.memoize(() -> id + "->" + target.getId());
this.target = target;
final SizeInBytes flowControlWindow =
GrpcConfigKeys.flowControlWindow(properties, LOG::debug);
@@ -110,11 +108,9 @@ public class GrpcClientProtocolClient implements Closeable
{
.filter(x -> !x.isEmpty()).orElse(target.getAddress());
final boolean separateAdminChannel = !Objects.equals(clientAddress,
adminAddress);
- clientChannel = buildChannel(clientAddress, clientTlsConfig,
- flowControlWindow, maxMessageSize);
+ clientChannel = buildChannel(clientAddress, clientSslContext,
flowControlWindow, maxMessageSize);
adminChannel = separateAdminChannel
- ? buildChannel(adminAddress, adminTlsConfig,
- flowControlWindow, maxMessageSize)
+ ? buildChannel(adminAddress, adminSslContext, flowControlWindow,
maxMessageSize)
: clientChannel;
asyncStub = RaftClientProtocolServiceGrpc.newStub(clientChannel);
@@ -124,26 +120,16 @@ public class GrpcClientProtocolClient implements
Closeable {
RaftClientConfigKeys.Rpc.watchRequestTimeout(properties);
}
- private ManagedChannel buildChannel(String address, GrpcTlsConfig tlsConf,
+ private ManagedChannel buildChannel(String address, SslContext sslContext,
SizeInBytes flowControlWindow, SizeInBytes maxMessageSize) {
NettyChannelBuilder channelBuilder =
NettyChannelBuilder.forTarget(address);
// ignore any http proxy for grpc
channelBuilder.proxyDetector(uri -> null);
- if (tlsConf != null) {
+ if (sslContext != null) {
LOG.debug("Setting TLS for {}", address);
- SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
- GrpcUtil.setTrustManager(sslContextBuilder, tlsConf.getTrustManager());
- if (tlsConf.getMtlsEnabled()) {
- GrpcUtil.setKeyManager(sslContextBuilder, tlsConf.getKeyManager());
- }
- try {
- channelBuilder.useTransportSecurity().sslContext(
- sslContextBuilder.build());
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
+ channelBuilder.useTransportSecurity().sslContext(sslContext);
} else {
channelBuilder.negotiationType(NegotiationType.PLAINTEXT);
}
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolProxy.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolProxy.java
deleted file mode 100644
index 95119ef7d..000000000
---
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolProxy.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc.client;
-
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.grpc.GrpcTlsConfig;
-import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
-import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
-import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
-import org.apache.ratis.protocol.RaftPeer;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.function.Function;
-
-public class GrpcClientProtocolProxy implements Closeable {
- private final GrpcClientProtocolClient proxy;
- private final Function<RaftPeer, CloseableStreamObserver>
responseHandlerCreation;
- private RpcSession currentSession;
-
- public GrpcClientProtocolProxy(ClientId clientId, RaftPeer target,
- Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation,
- RaftProperties properties, GrpcTlsConfig tlsConfig) {
- proxy = new GrpcClientProtocolClient(clientId, target, properties,
tlsConfig, tlsConfig);
- this.responseHandlerCreation = responseHandlerCreation;
- }
-
- @Override
- public void close() throws IOException {
- closeCurrentSession();
- proxy.close();
- }
-
- @Override
- public String toString() {
- return "ProxyTo:" + proxy.getTarget();
- }
-
- public void closeCurrentSession() {
- if (currentSession != null) {
- currentSession.close();
- currentSession = null;
- }
- }
-
- public void onNext(RaftClientRequestProto request) {
- if (currentSession == null) {
- currentSession = new RpcSession(
- responseHandlerCreation.apply(proxy.getTarget()));
- }
- currentSession.requestObserver.onNext(request);
- }
-
- public void onError() {
- if (currentSession != null) {
- currentSession.onError();
- }
- }
-
- public interface CloseableStreamObserver
- extends StreamObserver<RaftClientReplyProto>, Closeable {
- }
-
- class RpcSession implements Closeable {
- private final StreamObserver<RaftClientRequestProto> requestObserver;
- private final CloseableStreamObserver responseHandler;
- private boolean hasError = false;
-
- RpcSession(CloseableStreamObserver responseHandler) {
- this.responseHandler = responseHandler;
- this.requestObserver = proxy.ordered(responseHandler);
- }
-
- void onError() {
- hasError = true;
- }
-
- @Override
- public void close() {
- if (!hasError) {
- try {
- requestObserver.onCompleted();
- } catch (Exception ignored) {
- }
- }
- try {
- responseHandler.close();
- } catch (IOException ignored) {
- }
- }
- }
-}
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index b825429ae..4010ade27 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -21,7 +21,6 @@ import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.client.impl.RaftClientRpcWithProxy;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
-import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.grpc.GrpcUtil;
import org.apache.ratis.protocol.*;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
@@ -36,6 +35,7 @@ import
org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
import org.apache.ratis.proto.RaftProtos.TransferLeadershipRequestProto;
import org.apache.ratis.proto.RaftProtos.SnapshotManagementRequestProto;
import org.apache.ratis.proto.RaftProtos.LeaderElectionManagementRequestProto;
+import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.PeerProxyMap;
@@ -54,9 +54,9 @@ public class GrpcClientRpc extends
RaftClientRpcWithProxy<GrpcClientProtocolClie
private final int maxMessageSize;
public GrpcClientRpc(ClientId clientId, RaftProperties properties,
- GrpcTlsConfig adminTlsConfig, GrpcTlsConfig clientTlsConfig) {
+ SslContext adminSslContext, SslContext clientSslContext) {
super(new PeerProxyMap<>(clientId.toString(),
- p -> new GrpcClientProtocolClient(clientId, p, properties,
adminTlsConfig, clientTlsConfig)));
+ p -> new GrpcClientProtocolClient(clientId, p, properties,
adminSslContext, clientSslContext)));
this.clientId = clientId;
this.maxMessageSize = GrpcConfigKeys.messageSizeMax(properties,
LOG::debug).getSizeInt();
}
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
index 2e936bb0b..a0a17dc9f 100644
---
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
+++
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,13 +17,11 @@
*/
package org.apache.ratis.grpc.server;
-import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.grpc.GrpcUtil;
import org.apache.ratis.grpc.util.StreamObserverWithTimeout;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.util.ServerStringUtils;
import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
-import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
import org.apache.ratis.thirdparty.io.grpc.netty.NegotiationType;
import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver;
@@ -33,7 +31,7 @@ import
org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc;
import
org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceBlockingStub;
import
org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceStub;
import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
+import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,7 +42,7 @@ import java.io.Closeable;
* This is a RaftClient implementation that supports streaming data to the raft
* ring. The stream implementation utilizes gRPC.
*/
-public class GrpcServerProtocolClient implements Closeable {
+class GrpcServerProtocolClient implements Closeable {
// Common channel
private final ManagedChannel channel;
private final GrpcStubPool<RaftServerProtocolServiceStub> pool;
@@ -60,42 +58,30 @@ public class GrpcServerProtocolClient implements Closeable {
//visible for using in log / error messages AND to use in instrumented tests
private final RaftPeerId raftPeerId;
- public GrpcServerProtocolClient(RaftPeer target, int connections, int
flowControlWindow,
- TimeDuration requestTimeout, GrpcTlsConfig tlsConfig, boolean
separateHBChannel) {
+ GrpcServerProtocolClient(RaftPeer target, int connections, int
flowControlWindow,
+ TimeDuration requestTimeout, SslContext sslContext, boolean
separateHBChannel) {
raftPeerId = target.getId();
LOG.info("Build channel for {}", target);
useSeparateHBChannel = separateHBChannel;
- channel = buildChannel(target, flowControlWindow, tlsConfig);
+ channel = buildChannel(target, flowControlWindow, sslContext);
blockingStub = RaftServerProtocolServiceGrpc.newBlockingStub(channel);
asyncStub = RaftServerProtocolServiceGrpc.newStub(channel);
if (useSeparateHBChannel) {
- hbChannel = buildChannel(target, flowControlWindow, tlsConfig);
+ hbChannel = buildChannel(target, flowControlWindow, sslContext);
hbAsyncStub = RaftServerProtocolServiceGrpc.newStub(hbChannel);
}
requestTimeoutDuration = requestTimeout;
- this.pool = new GrpcStubPool<RaftServerProtocolServiceStub>(target,
connections,
- ch -> RaftServerProtocolServiceGrpc.newStub(ch), tlsConfig);
+ this.pool = new GrpcStubPool<>(target, connections,
RaftServerProtocolServiceGrpc::newStub, sslContext);
}
- private ManagedChannel buildChannel(RaftPeer target, int flowControlWindow,
- GrpcTlsConfig tlsConfig) {
+ private ManagedChannel buildChannel(RaftPeer target, int flowControlWindow,
SslContext sslContext) {
NettyChannelBuilder channelBuilder =
NettyChannelBuilder.forTarget(target.getAddress());
// ignore any http proxy for grpc
channelBuilder.proxyDetector(uri -> null);
- if (tlsConfig!= null) {
- SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
- GrpcUtil.setTrustManager(sslContextBuilder, tlsConfig.getTrustManager());
- if (tlsConfig.getMtlsEnabled()) {
- GrpcUtil.setKeyManager(sslContextBuilder, tlsConfig.getKeyManager());
- }
- try {
-
channelBuilder.useTransportSecurity().sslContext(sslContextBuilder.build());
- } catch (Exception ex) {
- throw new IllegalArgumentException("Failed to build SslContext,
peerId=" + raftPeerId
- + ", tlsConfig=" + tlsConfig, ex);
- }
+ if (sslContext != null) {
+ channelBuilder.useTransportSecurity().sslContext(sslContext);
} else {
channelBuilder.negotiationType(NegotiationType.PLAINTEXT);
}
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java
index b686be0a2..b1af0960d 100644
---
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java
+++
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java
@@ -19,8 +19,6 @@ package org.apache.ratis.grpc.server;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
-import org.apache.ratis.grpc.GrpcTlsConfig;
-import org.apache.ratis.grpc.GrpcUtil;
import org.apache.ratis.grpc.metrics.MessageMetrics;
import org.apache.ratis.grpc.metrics.intercept.server.MetricServerInterceptor;
import org.apache.ratis.protocol.AdminAsynchronousProtocol;
@@ -34,13 +32,11 @@ import org.apache.ratis.server.RaftServerRpcWithProxy;
import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
import org.apache.ratis.thirdparty.io.grpc.ServerInterceptor;
import org.apache.ratis.thirdparty.io.grpc.ServerInterceptors;
-import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder;
import org.apache.ratis.thirdparty.io.grpc.Server;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelOption;
-import org.apache.ratis.thirdparty.io.netty.handler.ssl.ClientAuth;
-import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
+import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext;
import org.apache.ratis.proto.RaftProtos.*;
import org.apache.ratis.util.*;
@@ -56,8 +52,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
-import static
org.apache.ratis.thirdparty.io.netty.handler.ssl.SslProvider.OPENSSL;
-
/** A grpc implementation of {@link org.apache.ratis.server.RaftServerRpc}. */
public final class GrpcServicesImpl
extends RaftServerRpcWithProxy<GrpcServerProtocolClient,
PeerProxyMap<GrpcServerProtocolClient>>
@@ -106,13 +100,14 @@ public final class GrpcServicesImpl
private String adminHost;
private int adminPort;
- private GrpcTlsConfig adminTlsConfig;
+ private SslContext adminSslContext;
private String clientHost;
private int clientPort;
- private GrpcTlsConfig clientTlsConfig;
+ private SslContext clientSslContext;
private String serverHost;
private int serverPort;
- private GrpcTlsConfig serverTlsConfig;
+ private SslContext serverSslContextForServer;
+ private SslContext serverSslContextForClient;
private int serverStubPoolSize;
private SizeInBytes messageSizeMax;
@@ -158,7 +153,7 @@ public final class GrpcServicesImpl
private GrpcServerProtocolClient newGrpcServerProtocolClient(RaftPeer
target) {
return new GrpcServerProtocolClient(target, serverStubPoolSize,
flowControlWindow.getSizeInt(),
- requestTimeoutDuration, serverTlsConfig, separateHeartbeatChannel);
+ requestTimeoutDuration, serverSslContextForClient,
separateHeartbeatChannel);
}
private ExecutorService newExecutor() {
@@ -188,18 +183,18 @@ public final class GrpcServicesImpl
}
private NettyServerBuilder newNettyServerBuilderForServer() {
- return newNettyServerBuilder(serverHost, serverPort, serverTlsConfig);
+ return newNettyServerBuilder(serverHost, serverPort,
serverSslContextForServer);
}
private NettyServerBuilder newNettyServerBuilderForAdmin() {
- return newNettyServerBuilder(adminHost, adminPort, adminTlsConfig);
+ return newNettyServerBuilder(adminHost, adminPort, adminSslContext);
}
private NettyServerBuilder newNettyServerBuilderForClient() {
- return newNettyServerBuilder(clientHost, clientPort, clientTlsConfig);
+ return newNettyServerBuilder(clientHost, clientPort, clientSslContext);
}
- private NettyServerBuilder newNettyServerBuilder(String hostname, int
port, GrpcTlsConfig tlsConfig) {
+ private NettyServerBuilder newNettyServerBuilder(String hostname, int
port, SslContext sslContext) {
final InetSocketAddress address = hostname == null || hostname.isEmpty()
?
new InetSocketAddress(port) : new InetSocketAddress(hostname, port);
final NettyServerBuilder nettyServerBuilder =
NettyServerBuilder.forAddress(address)
@@ -207,19 +202,9 @@ public final class GrpcServicesImpl
.maxInboundMessageSize(messageSizeMax.getSizeInt())
.flowControlWindow(flowControlWindow.getSizeInt());
- if (tlsConfig != null) {
+ if (sslContext != null) {
LOG.info("Setting TLS for {}", address);
- SslContextBuilder sslContextBuilder =
GrpcUtil.initSslContextBuilderForServer(tlsConfig.getKeyManager());
- if (tlsConfig.getMtlsEnabled()) {
- sslContextBuilder.clientAuth(ClientAuth.REQUIRE);
- GrpcUtil.setTrustManager(sslContextBuilder,
tlsConfig.getTrustManager());
- }
- sslContextBuilder = GrpcSslContexts.configure(sslContextBuilder,
OPENSSL);
- try {
- nettyServerBuilder.sslContext(sslContextBuilder.build());
- } catch (Exception ex) {
- throw new IllegalArgumentException("Failed to build SslContext,
tlsConfig=" + tlsConfig, ex);
- }
+ nettyServerBuilder.sslContext(sslContext);
}
return nettyServerBuilder;
}
@@ -253,18 +238,23 @@ public final class GrpcServicesImpl
return new GrpcServicesImpl(this);
}
- public Builder setAdminTlsConfig(GrpcTlsConfig config) {
- this.adminTlsConfig = config;
+ public Builder setAdminSslContext(SslContext adminSslContext) {
+ this.adminSslContext = adminSslContext;
+ return this;
+ }
+
+ public Builder setClientSslContext(SslContext clientSslContext) {
+ this.clientSslContext = clientSslContext;
return this;
}
- public Builder setClientTlsConfig(GrpcTlsConfig config) {
- this.clientTlsConfig = config;
+ public Builder setServerSslContextForServer(SslContext
serverSslContextForServer) {
+ this.serverSslContextForServer = serverSslContextForServer;
return this;
}
- public Builder setServerTlsConfig(GrpcTlsConfig config) {
- this.serverTlsConfig = config;
+ public Builder setServerSslContextForClient(SslContext
serverSslContextForClient) {
+ this.serverSslContextForClient = serverSslContextForClient;
return this;
}
}
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java
index fcfb0f1b8..c949707a4 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java
@@ -17,11 +17,8 @@
*/
package org.apache.ratis.grpc.server;
-import org.apache.ratis.grpc.GrpcTlsConfig;
-import org.apache.ratis.grpc.GrpcUtil;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
-import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
import org.apache.ratis.thirdparty.io.grpc.netty.NegotiationType;
import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
import org.apache.ratis.thirdparty.io.grpc.stub.AbstractStub;
@@ -29,7 +26,7 @@ import
org.apache.ratis.thirdparty.io.netty.channel.ChannelOption;
import org.apache.ratis.thirdparty.io.netty.channel.WriteBufferWaterMark;
import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import
org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
+import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,7 +36,6 @@ import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
final class GrpcStubPool<S extends AbstractStub<S>> {
@@ -66,16 +62,14 @@ final class GrpcStubPool<S extends AbstractStub<S>> {
}
private final List<PooledStub<S>> pool;
- private final AtomicInteger rr = new AtomicInteger();
private final NioEventLoopGroup elg;
private final int size;
- GrpcStubPool(RaftPeer target, int n, Function<ManagedChannel, S>
stubFactory, GrpcTlsConfig tlsConfig) {
- this(target, n, stubFactory, tlsConfig, Math.max(2,
Runtime.getRuntime().availableProcessors() / 2), 16);
+ GrpcStubPool(RaftPeer target, int n, Function<ManagedChannel, S>
stubFactory, SslContext sslContext) {
+ this(target, n, stubFactory, sslContext, Math.max(2,
Runtime.getRuntime().availableProcessors() / 2), 16);
}
- GrpcStubPool(RaftPeer target, int n,
- Function<ManagedChannel, S> stubFactory, GrpcTlsConfig tlsConf,
+ GrpcStubPool(RaftPeer target, int n, Function<ManagedChannel, S>
stubFactory, SslContext sslContext,
int elgThreads, int maxInflightPerConn) {
this.elg = new NioEventLoopGroup(elgThreads);
ArrayList<PooledStub<S>> tmp = new ArrayList<>(n);
@@ -87,18 +81,9 @@ final class GrpcStubPool<S extends AbstractStub<S>> {
.keepAliveWithoutCalls(true)
.idleTimeout(24, TimeUnit.HOURS)
.withOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new
WriteBufferWaterMark(64 << 10, 128 << 10));
- if (tlsConf != null) {
+ if (sslContext != null) {
LOG.debug("Setting TLS for {}", target.getAddress());
- SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
- GrpcUtil.setTrustManager(sslContextBuilder, tlsConf.getTrustManager());
- if (tlsConf.getMtlsEnabled()) {
- GrpcUtil.setKeyManager(sslContextBuilder, tlsConf.getKeyManager());
- }
- try {
-
channelBuilder.useTransportSecurity().sslContext(sslContextBuilder.build());
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
+ channelBuilder.useTransportSecurity().sslContext(sslContext);
} else {
channelBuilder.negotiationType(NegotiationType.PLAINTEXT);
}
@@ -124,7 +109,7 @@ final class GrpcStubPool<S extends AbstractStub<S>> {
}
public void close() {
- for (PooledStub p : pool) {
+ for (PooledStub<S> p : pool) {
p.ch.shutdown();
}
elg.shutdownGracefully();