This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 832562f028 [ISSUE #8920] Refactor SSL context loading process to
support multiple protocols dynamic loading (#9483)
832562f028 is described below
commit 832562f0281a1bb9e331fa07d614e6e5e83f149d
Author: EnableAsync <[email protected]>
AuthorDate: Fri Jun 27 09:56:19 2025 +0800
[ISSUE #8920] Refactor SSL context loading process to support multiple
protocols dynamic loading (#9483)
* feat(proxy): 添加 gRPC 和 Remoting 服务器的 TLS 证书热更新支持
- 在 GrpcServer 和 RemotingProtocolServer 中添加文件监视服务,用于监控 TLS 证书和密钥的变化
- 实现证书和密钥变更时重新加载 SSL 上下文的逻辑
- 优化 ProxyAndTlsProtocolNegotiator 中的 SSL 上下文加载过程
- 添加日志记录,方便调试和监控 TLS 相关操作
* refactor(proxy): 重构 gRPC 证书监控逻辑并添加单元测试
- 重构 GrpcServer 中的证书监控逻辑,提取到独立的 GrpcCertKeyFileWatchListener 类中
- 优化证书变更处理流程,提高代码可读性和维护性
- 新增 GrpcServerTest 类,为 gRPC服务器和证书监控添加单元测试- 测试覆盖了各种证书变更场景,包括单独变更和组合变更
- 验证了证书变更时 SSLContext 的重新加载和错误处理
Signed-off-by: Async <[email protected]>
* refactor(proxy): 重构 gRPC 证书监控逻辑并添加单元测试
- 重构 GrpcServer 中的证书监控逻辑,提取到独立的 GrpcCertKeyFileWatchListener 类中
- 优化证书变更处理流程,提高代码可读性和维护性
- 新增 GrpcServerTest 类,为 gRPC服务器和证书监控添加单元测试- 测试覆盖了各种证书变更场景,包括单独变更和组合变更
- 验证了证书变更时 SSLContext 的重新加载和错误处理
Signed-off-by: Async <[email protected]>
* fix: code format
Signed-off-by: Async <[email protected]>
* test: add test cases
Signed-off-by: Async <[email protected]>
* fix: code format
Signed-off-by: Async <[email protected]>
* refactor(proxy): 重构 TLS证书更新逻辑
- 移除 FileWatchService,改用 TlsCertificateManager 统一管理 TLS证书
- 实现 TlsContextReloadListener 接口,响应 TLS 证书更新
- 优化 GrpcServer 和 RemotingProtocolServer 中的 TLS 证书更新逻辑
- 新增单元测试验证 TLS 证书更新功能
Signed-off-by: Async <[email protected]>
* test(proxy): 优化 TLS 相关测试用例
- 重构了多个测试类中的重复代码- 提高了测试的可读性和维护性
- 确保在测试中正确关闭资源
Signed-off-by: Async <[email protected]>
* refactor(proxy): 优化代码导入结构
- 移除了不必要的导入项
- 显式导入了所有活动类,提高了代码的可读性和维护性
Signed-off-by: Async <[email protected]>
* update
* fix: no static
Signed-off-by: Async <[email protected]>
* fix: add SingletonHolder for TlsCertificateManager
Signed-off-by: Async <[email protected]>
* refactor
* refactor(proxy): 重构 TLS证书管理
- 将 TlsCertificateManager 实例化移至 ProxyStartup 类
- 更新 GrpcServer 和 RemotingProtocolServer 类以使用 TlsCertificateManager
- 移除冗余的 TLS 证书管理相关测试用例
- 优化 TLS 上下文重载逻辑
Signed-off-by: Async <[email protected]>
* refactor(proxy): 优化日志信息内容
- 将 cert file changed 日志信息改为更通用的 File changed
- 保持代码风格一致性,提高日志的可读性和维护性
Signed-off-by: Async <[email protected]>
* test(proxy): 重构并增强 TlsCertificateManager 测试用例- 重新设计测试用例,使用临时文件模拟证书和密钥
- 增加对 TlsCertificateManager 各种方法的单元测试
- 涉及到的测试场景包括:
- 构造函数
- 启动和关闭
- 注册和注销监听器
- 文件变更通知(证书、密钥、未知文件等)
- 多个监听器的情况
- 监听器抛出异常的情况
- 增加对内部 CertKeyFileWatchListener 的测试
Signed-off-by: Async <[email protected]>
* refactor
* test(proxy): 优化 TlsCertificateManager 单元测试
-移除了未使用的 import 语句
- 替换了 import static语句,使其更加有序
- 删除了未使用的静态方法断言(verify、times、never)
- 重置了 mock 对象以避免测试之间的干扰
Signed-off-by: Async <[email protected]>
* fix format
Signed-off-by: Async <[email protected]>
* fix format
Signed-off-by: Async <[email protected]>
* fix format
Signed-off-by: Async <[email protected]>
* fix format
Signed-off-by: Async <[email protected]>
* fix format
Signed-off-by: Async <[email protected]>
* fix format
Signed-off-by: Async <[email protected]>
* fix format
Signed-off-by: Async <[email protected]>
* fix format
Signed-off-by: Async <[email protected]>
---------
Signed-off-by: Async <[email protected]>
---
.../org/apache/rocketmq/proxy/ProxyStartup.java | 10 +-
.../org/apache/rocketmq/proxy/grpc/GrpcServer.java | 40 ++-
.../rocketmq/proxy/grpc/GrpcServerBuilder.java | 17 +-
.../proxy/grpc/ProxyAndTlsProtocolNegotiator.java | 87 ++++---
.../remoting/MultiProtocolRemotingServer.java | 4 +-
.../proxy/remoting/MultiProtocolTlsHelper.java | 4 +-
.../proxy/remoting/RemotingProtocolServer.java | 24 +-
.../http2proxy/Http2ProtocolProxyHandler.java | 4 +-
.../proxy/service/cert/TlsCertificateManager.java | 121 +++++++++
.../service/cert/TlsCertificateManagerTest.java | 289 +++++++++++++++++++++
.../remoting/netty/NettyRemotingClient.java | 10 +-
.../remoting/netty/NettyRemotingServer.java | 6 +-
.../apache/rocketmq/srvutil/FileWatchService.java | 13 +-
13 files changed, 561 insertions(+), 68 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
index e37ba975fe..131faffa38 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
@@ -44,6 +44,7 @@ import org.apache.rocketmq.proxy.metrics.ProxyMetricsManager;
import org.apache.rocketmq.proxy.processor.DefaultMessagingProcessor;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
import org.apache.rocketmq.proxy.remoting.RemotingProtocolServer;
+import org.apache.rocketmq.proxy.service.cert.TlsCertificateManager;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.srvutil.ServerUtil;
@@ -76,8 +77,13 @@ public class ProxyStartup {
MessagingProcessor messagingProcessor = createMessagingProcessor();
+ // tls cert update
+ TlsCertificateManager tlsCertificateManager = new
TlsCertificateManager();
+
PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(tlsCertificateManager);
+
// create grpcServer
- GrpcServer grpcServer = GrpcServerBuilder.newBuilder(executor,
ConfigurationManager.getProxyConfig().getGrpcServerPort())
+ GrpcServer grpcServer = GrpcServerBuilder.newBuilder(executor,
+ ConfigurationManager.getProxyConfig().getGrpcServerPort(),
tlsCertificateManager)
.addService(createServiceProcessor(messagingProcessor))
.addService(ChannelzService.newInstance(100))
.addService(ProtoReflectionService.newInstance())
@@ -86,7 +92,7 @@ public class ProxyStartup {
.build();
PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(grpcServer);
- RemotingProtocolServer remotingServer = new
RemotingProtocolServer(messagingProcessor);
+ RemotingProtocolServer remotingServer = new
RemotingProtocolServer(messagingProcessor, tlsCertificateManager);
PROXY_START_AND_SHUTDOWN.appendStartAndShutdown(remotingServer);
// start servers one by one.
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServer.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServer.java
index d5b896fe14..af3d6b4c6c 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServer.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServer.java
@@ -17,12 +17,17 @@
package org.apache.rocketmq.proxy.grpc;
-import java.util.concurrent.TimeUnit;
+import com.google.common.annotations.VisibleForTesting;
import io.grpc.Server;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.StartAndShutdown;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.common.utils.StartAndShutdown;
+import org.apache.rocketmq.proxy.service.cert.TlsCertificateManager;
+
+import java.io.IOException;
+import java.security.cert.CertificateException;
+import java.util.concurrent.TimeUnit;
public class GrpcServer implements StartAndShutdown {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
@@ -33,23 +38,50 @@ public class GrpcServer implements StartAndShutdown {
private final TimeUnit unit;
- protected GrpcServer(Server server, long timeout, TimeUnit unit) {
+ private final TlsCertificateManager tlsCertificateManager;
+ @VisibleForTesting final GrpcTlsReloadHandler tlsReloadHandler;
+
+ protected GrpcServer(Server server, long timeout, TimeUnit unit,
+ TlsCertificateManager tlsCertificateManager) throws Exception {
this.server = server;
this.timeout = timeout;
this.unit = unit;
+ this.tlsCertificateManager = tlsCertificateManager;
+ this.tlsReloadHandler = new GrpcTlsReloadHandler();
}
public void start() throws Exception {
+ // Register the TLS context reload handler
+ tlsCertificateManager.registerReloadListener(this.tlsReloadHandler);
+
this.server.start();
log.info("grpc server start successfully.");
}
public void shutdown() {
try {
+ // Unregister the TLS context reload handler
+
tlsCertificateManager.unregisterReloadListener(this.tlsReloadHandler);
+
this.server.shutdown().awaitTermination(timeout, unit);
+
log.info("grpc server shutdown successfully.");
} catch (Exception e) {
e.printStackTrace();
+ log.error("Failed to shutdown grpc server", e);
+ }
+ }
+
+ @VisibleForTesting
+ class GrpcTlsReloadHandler implements
TlsCertificateManager.TlsContextReloadListener {
+ @Override
+ public void onTlsContextReload() {
+ try {
+ ProxyAndTlsProtocolNegotiator.loadSslContext();
+ log.info("SslContext reloaded for grpc server");
+ } catch (CertificateException | IOException e) {
+ log.error("Failed to reload SslContext for server", e);
+ }
}
}
-}
\ No newline at end of file
+}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
index ab00b967e6..163e799f41 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/GrpcServerBuilder.java
@@ -34,6 +34,7 @@ import
org.apache.rocketmq.proxy.grpc.interceptor.HeaderInterceptor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.proxy.service.cert.TlsCertificateManager;
public class GrpcServerBuilder {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
@@ -43,11 +44,15 @@ public class GrpcServerBuilder {
protected TimeUnit unit = TimeUnit.SECONDS;
- public static GrpcServerBuilder newBuilder(ThreadPoolExecutor executor,
int port) {
- return new GrpcServerBuilder(executor, port);
+ protected TlsCertificateManager tlsCertificateManager;
+
+ public static GrpcServerBuilder newBuilder(ThreadPoolExecutor executor,
int port,
+ TlsCertificateManager tlsCertificateManager) {
+ return new GrpcServerBuilder(executor, port, tlsCertificateManager);
}
- protected GrpcServerBuilder(ThreadPoolExecutor executor, int port) {
+ protected GrpcServerBuilder(ThreadPoolExecutor executor, int port,
TlsCertificateManager tlsCertificateManager) {
+ this.tlsCertificateManager = tlsCertificateManager;
serverBuilder = NettyServerBuilder.forPort(port);
serverBuilder.protocolNegotiator(new ProxyAndTlsProtocolNegotiator());
@@ -71,7 +76,7 @@ public class GrpcServerBuilder {
}
serverBuilder.maxInboundMessageSize(maxInboundMessageSize)
- .maxConnectionIdle(idleTimeMills, TimeUnit.MILLISECONDS);
+ .maxConnectionIdle(idleTimeMills, TimeUnit.MILLISECONDS);
log.info("grpc server has built. port: {}, bossLoopNum: {},
workerLoopNum: {}, maxInboundMessageSize: {}",
port, bossLoopNum, workerLoopNum, maxInboundMessageSize);
@@ -98,8 +103,8 @@ public class GrpcServerBuilder {
return this;
}
- public GrpcServer build() {
- return new GrpcServer(this.serverBuilder.build(), time, unit);
+ public GrpcServer build() throws Exception {
+ return new GrpcServer(this.serverBuilder.build(), time, unit,
tlsCertificateManager);
}
public GrpcServerBuilder configInterceptor() {
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
index e0a6099ccc..43e2c8ae34 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/ProxyAndTlsProtocolNegotiator.java
@@ -36,16 +36,23 @@ import
io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyMessageDecoder
import
io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
import io.grpc.netty.shaded.io.netty.handler.codec.haproxy.HAProxyTLV;
import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth;
+import io.grpc.netty.shaded.io.netty.handler.ssl.OpenSsl;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler;
+import io.grpc.netty.shaded.io.netty.handler.ssl.SslProvider;
+
import
io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.grpc.netty.shaded.io.netty.handler.ssl.util.SelfSignedCertificate;
import io.grpc.netty.shaded.io.netty.util.AsciiString;
import io.grpc.netty.shaded.io.netty.util.CharsetUtil;
+
+import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.security.cert.CertificateException;
import java.util.List;
+
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.constant.HAProxyConstants;
@@ -73,7 +80,13 @@ public class ProxyAndTlsProtocolNegotiator implements
InternalProtocolNegotiator
private static SslContext sslContext;
public ProxyAndTlsProtocolNegotiator() {
- sslContext = loadSslContext();
+ try {
+ loadSslContext();
+ log.info("SslContext created for proxy server");
+ } catch (IOException | CertificateException e) {
+ log.error("SslContext init error", e);
+ throw new RuntimeException(e);
+ }
}
@Override
@@ -90,35 +103,36 @@ public class ProxyAndTlsProtocolNegotiator implements
InternalProtocolNegotiator
public void close() {
}
- private static SslContext loadSslContext() {
- try {
- ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
- if (proxyConfig.isTlsTestModeEnable()) {
- SelfSignedCertificate selfSignedCertificate = new
SelfSignedCertificate();
- return
GrpcSslContexts.forServer(selfSignedCertificate.certificate(),
- selfSignedCertificate.privateKey())
- .trustManager(InsecureTrustManagerFactory.INSTANCE)
- .clientAuth(ClientAuth.NONE)
- .build();
- } else {
- String tlsKeyPath =
ConfigurationManager.getProxyConfig().getTlsKeyPath();
- String tlsCertPath =
ConfigurationManager.getProxyConfig().getTlsCertPath();
- try (InputStream serverKeyInputStream = Files.newInputStream(
- Paths.get(tlsKeyPath));
- InputStream serverCertificateStream =
Files.newInputStream(
- Paths.get(tlsCertPath))) {
- SslContext res =
GrpcSslContexts.forServer(serverCertificateStream,
- serverKeyInputStream)
- .trustManager(InsecureTrustManagerFactory.INSTANCE)
- .clientAuth(ClientAuth.NONE)
- .build();
- log.info("grpc load TLS configured OK");
- return res;
- }
+ public static void loadSslContext() throws CertificateException,
IOException {
+ ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+ SslProvider provider;
+ if (OpenSsl.isAvailable()) {
+ provider = SslProvider.OPENSSL;
+ log.info("Using OpenSSL provider");
+ } else {
+ provider = SslProvider.JDK;
+ log.info("Using JDK SSL provider");
+ }
+ if (proxyConfig.isTlsTestModeEnable()) {
+ SelfSignedCertificate selfSignedCertificate = new
SelfSignedCertificate();
+ sslContext =
GrpcSslContexts.forServer(selfSignedCertificate.certificate(),
selfSignedCertificate.privateKey())
+ .sslProvider(provider)
+ .trustManager(InsecureTrustManagerFactory.INSTANCE)
+ .clientAuth(ClientAuth.NONE)
+ .build();
+ } else {
+ String tlsCertPath =
ConfigurationManager.getProxyConfig().getTlsCertPath();
+ String tlsKeyPath =
ConfigurationManager.getProxyConfig().getTlsKeyPath();
+ try (InputStream serverKeyInputStream = Files.newInputStream(
+ Paths.get(tlsKeyPath));
+ InputStream serverCertificateStream = Files.newInputStream(
+ Paths.get(tlsCertPath))) {
+ sslContext = GrpcSslContexts.forServer(serverCertificateStream,
+ serverKeyInputStream)
+ .trustManager(InsecureTrustManagerFactory.INSTANCE)
+ .clientAuth(ClientAuth.NONE)
+ .build();
}
- } catch (Exception e) {
- log.error("grpc tls set failed. msg: {}, e:", e.getMessage(), e);
- throw new RuntimeException("grpc tls set failed: " +
e.getMessage());
}
}
@@ -135,15 +149,14 @@ public class ProxyAndTlsProtocolNegotiator implements
InternalProtocolNegotiator
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out) {
try {
- ProtocolDetectionResult<HAProxyProtocolVersion> ha =
HAProxyMessageDecoder.detectProtocol(
- in);
+ ProtocolDetectionResult<HAProxyProtocolVersion> ha =
HAProxyMessageDecoder.detectProtocol(in);
if (ha.state() == ProtocolDetectionState.NEEDS_MORE_DATA) {
return;
}
if (ha.state() == ProtocolDetectionState.DETECTED) {
ctx.pipeline().addAfter(ctx.name(), HA_PROXY_DECODER, new
HAProxyMessageDecoder())
- .addAfter(HA_PROXY_DECODER, HA_PROXY_HANDLER, new
HAProxyMessageHandler())
- .addAfter(HA_PROXY_HANDLER, TLS_MODE_HANDLER, new
TlsModeHandler(grpcHandler));
+ .addAfter(HA_PROXY_DECODER, HA_PROXY_HANDLER, new
HAProxyMessageHandler())
+ .addAfter(HA_PROXY_HANDLER, TLS_MODE_HANDLER, new
TlsModeHandler(grpcHandler));
} else {
ctx.pipeline().addAfter(ctx.name(), TLS_MODE_HANDLER, new
TlsModeHandler(grpcHandler));
}
@@ -209,7 +222,7 @@ public class ProxyAndTlsProtocolNegotiator implements
InternalProtocolNegotiator
msg.tlvs().forEach(tlv -> handleHAProxyTLV(tlv, builder));
}
pne = InternalProtocolNegotiationEvent
-
.withAttributes(InternalProtocolNegotiationEvent.getDefault(), builder.build());
+
.withAttributes(InternalProtocolNegotiationEvent.getDefault(), builder.build());
} finally {
msg.release();
}
@@ -244,9 +257,9 @@ public class ProxyAndTlsProtocolNegotiator implements
InternalProtocolNegotiator
public TlsModeHandler(GrpcHttp2ConnectionHandler grpcHandler) {
this.ssl = InternalProtocolNegotiators.serverTls(sslContext)
- .newHandler(grpcHandler);
+ .newHandler(grpcHandler);
this.plaintext = InternalProtocolNegotiators.serverPlaintext()
- .newHandler(grpcHandler);
+ .newHandler(grpcHandler);
}
@Override
@@ -258,7 +271,7 @@ public class ProxyAndTlsProtocolNegotiator implements
InternalProtocolNegotiator
} else if (TlsMode.DISABLED.equals(tlsMode)) {
ctx.pipeline().addAfter(ctx.name(), null, this.plaintext);
} else {
- // in SslHandler.isEncrypted, it need at least 5 bytes to
judge is encrypted or not
+ // in SslHandler.isEncrypted, it needs at least 5 bytes to
judge is encrypted or not
if (in.readableBytes() < SSL_RECORD_HEADER_LENGTH) {
return;
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
index d7c2820b27..7bbca44a50 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
@@ -68,9 +68,9 @@ public class MultiProtocolRemotingServer extends
NettyRemotingServer {
if (tlsMode != TlsMode.DISABLED) {
try {
sslContext = MultiProtocolTlsHelper.buildSslContext();
- log.info("SSLContext created for server");
+ log.info("SslContext created for multi protocol remoting
server");
} catch (CertificateException | IOException e) {
- throw new
ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "Failed to create
SSLContext for server", e);
+ throw new
ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "Failed to create
SslContext for server", e);
}
}
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolTlsHelper.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolTlsHelper.java
index 5a21ec68e5..b874e8351d 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolTlsHelper.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolTlsHelper.java
@@ -61,12 +61,12 @@ public class MultiProtocolTlsHelper extends TlsHelper {
log.info("Using JDK SSL provider");
}
- SslContextBuilder sslContextBuilder = null;
+ SslContextBuilder sslContextBuilder;
if (tlsTestModeEnable) {
SelfSignedCertificate selfSignedCertificate = new
SelfSignedCertificate();
sslContextBuilder = SslContextBuilder
.forServer(selfSignedCertificate.certificate(),
selfSignedCertificate.privateKey())
- .sslProvider(SslProvider.OPENSSL)
+ .sslProvider(provider)
.clientAuth(ClientAuth.OPTIONAL);
} else {
sslContextBuilder = SslContextBuilder.forServer(
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
index 3fae056a2a..da130769ab 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
@@ -46,6 +46,7 @@ import
org.apache.rocketmq.proxy.remoting.pipeline.AuthenticationPipeline;
import org.apache.rocketmq.proxy.remoting.pipeline.AuthorizationPipeline;
import org.apache.rocketmq.proxy.remoting.pipeline.ContextInitPipeline;
import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
+import org.apache.rocketmq.proxy.service.cert.TlsCertificateManager;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RemotingServer;
@@ -88,8 +89,11 @@ public class RemotingProtocolServer implements
StartAndShutdown, RemotingProxyOu
protected final ThreadPoolExecutor topicRouteExecutor;
protected final ThreadPoolExecutor defaultExecutor;
protected final ScheduledExecutorService timerExecutor;
+ protected final TlsCertificateManager tlsCertificateManager;
+ protected final RemotingTlsReloadHandler tlsReloadHandler;
- public RemotingProtocolServer(MessagingProcessor messagingProcessor) {
+
+ public RemotingProtocolServer(MessagingProcessor messagingProcessor,
TlsCertificateManager tlsCertificateManager) throws Exception {
this.messagingProcessor = messagingProcessor;
this.remotingChannelManager = new RemotingChannelManager(this,
messagingProcessor.getProxyRelayService());
@@ -114,6 +118,8 @@ public class RemotingProtocolServer implements
StartAndShutdown, RemotingProxyOu
System.setProperty(TlsSystemConfig.TLS_SERVER_CERTPATH,
config.getTlsCertPath());
TlsSystemConfig.tlsServerKeyPath = config.getTlsKeyPath();
System.setProperty(TlsSystemConfig.TLS_SERVER_KEYPATH,
config.getTlsKeyPath());
+ this.tlsCertificateManager = tlsCertificateManager;
+ this.tlsReloadHandler = new RemotingTlsReloadHandler();
this.clientHousekeepingService = new
ClientHousekeepingService(this.clientManagerActivity);
@@ -191,6 +197,16 @@ public class RemotingProtocolServer implements
StartAndShutdown, RemotingProxyOu
this.registerRemotingServer(this.defaultRemotingServer);
}
+ protected class RemotingTlsReloadHandler implements
TlsCertificateManager.TlsContextReloadListener {
+ @Override
+ public void onTlsContextReload() {
+ if (defaultRemotingServer instanceof NettyRemotingServer) {
+ ((NettyRemotingServer) defaultRemotingServer).loadSslContext();
+ log.info("SslContext reloaded for remoting server");
+ }
+ }
+ }
+
protected void registerRemotingServer(RemotingServer remotingServer) {
remotingServer.registerProcessor(RequestCode.SEND_MESSAGE,
sendMessageActivity, this.sendMessageExecutor);
remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2,
sendMessageActivity, this.sendMessageExecutor);
@@ -226,6 +242,9 @@ public class RemotingProtocolServer implements
StartAndShutdown, RemotingProxyOu
@Override
public void shutdown() throws Exception {
+ // Unregister the TLS context reload handler
+ tlsCertificateManager.unregisterReloadListener(this.tlsReloadHandler);
+
this.defaultRemotingServer.shutdown();
this.remotingChannelManager.shutdown();
this.sendMessageExecutor.shutdown();
@@ -238,6 +257,9 @@ public class RemotingProtocolServer implements
StartAndShutdown, RemotingProxyOu
@Override
public void start() throws Exception {
+ // Register the TLS context reload handler
+ tlsCertificateManager.registerReloadListener(this.tlsReloadHandler);
+
this.remotingChannelManager.start();
this.defaultRemotingServer.start();
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
index 103b099bbe..4ab0a01f70 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
@@ -75,8 +75,8 @@ public class Http2ProtocolProxyHandler implements
ProtocolHandler {
.build();
}
} catch (SSLException e) {
- log.error("Failed to create SSLContext for
Http2ProtocolProxyHandler", e);
- throw new RuntimeException("Failed to create SSLContext for
Http2ProtocolProxyHandler", e);
+ log.error("Failed to create SslContext for
Http2ProtocolProxyHandler", e);
+ throw new RuntimeException("Failed to create SslContext for
Http2ProtocolProxyHandler", e);
}
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/cert/TlsCertificateManager.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/cert/TlsCertificateManager.java
new file mode 100644
index 0000000000..0e6f43baa3
--- /dev/null
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/cert/TlsCertificateManager.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.proxy.service.cert;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.StartAndShutdown;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
+import org.apache.rocketmq.srvutil.FileWatchService;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TlsCertificateManager implements StartAndShutdown {
+ private static final Logger log =
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+
+ private final FileWatchService fileWatchService;
+ private final List<TlsContextReloadListener> reloadListeners = new
ArrayList<>();
+
+ public TlsCertificateManager() {
+ try {
+ this.fileWatchService = new FileWatchService(
+ new String[] {
+ ConfigurationManager.getProxyConfig().getTlsCertPath(),
+ ConfigurationManager.getProxyConfig().getTlsKeyPath()
+ },
+ new CertKeyFileWatchListener(),
+ 60 * 60 * 1000 /* 1 hour */
+ );
+ } catch (Exception e) {
+ log.error("Failed to initialize TLS certificate watch service", e);
+ throw new RuntimeException("Failed to initialize TLS certificate
manager", e);
+ }
+ }
+
+ public FileWatchService getFileWatchService() {
+ return this.fileWatchService;
+ }
+
+ public void registerReloadListener(TlsContextReloadListener listener) {
+ if (listener != null) {
+ this.reloadListeners.add(listener);
+ }
+ }
+
+ public void unregisterReloadListener(TlsContextReloadListener listener) {
+ if (listener != null) {
+ this.reloadListeners.remove(listener);
+ }
+ }
+
+ public List<TlsContextReloadListener> getReloadListeners() {
+ return this.reloadListeners;
+ }
+
+ @Override
+ public void start() throws Exception {
+ this.fileWatchService.start();
+ log.info("TLS certificate manager started successfully, start
watching: {} {}",
+ ConfigurationManager.getProxyConfig().getTlsCertPath(),
+ ConfigurationManager.getProxyConfig().getTlsKeyPath()
+ );
+ }
+
+ @Override
+ public void shutdown() throws Exception {
+ this.fileWatchService.shutdown();
+ log.info("TLS certificate manager shutdown successfully");
+ }
+
+ private class CertKeyFileWatchListener implements
FileWatchService.Listener {
+ private boolean certChanged = false;
+ private boolean keyChanged = false;
+
+ @Override
+ public void onChanged(String path) {
+ log.info("File changed: {}", path);
+ if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
+ certChanged = true;
+ } else if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
+ keyChanged = true;
+ }
+
+ if (certChanged && keyChanged) {
+ log.info("The certificate and private key changed, reload the
ssl context");
+ notifyContextReload();
+ certChanged = false;
+ keyChanged = false;
+ }
+ }
+
+ private void notifyContextReload() {
+ for (TlsContextReloadListener listener : reloadListeners) {
+ try {
+ listener.onTlsContextReload();
+ } catch (Exception e) {
+ log.error("Failed to notify TLS context reload to
listener: " + listener, e);
+ }
+ }
+ }
+ }
+
+ // Interface for listeners interested in TLS context reload events
+ public interface TlsContextReloadListener {
+ void onTlsContextReload();
+ }
+}
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/cert/TlsCertificateManagerTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/cert/TlsCertificateManagerTest.java
new file mode 100644
index 0000000000..8c0f1ef549
--- /dev/null
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/cert/TlsCertificateManagerTest.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.proxy.service.cert;
+
+import java.io.FileWriter;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
+import org.apache.rocketmq.proxy.config.ProxyConfig;
+import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
+import org.apache.rocketmq.srvutil.FileWatchService;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.File;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.nio.file.Path;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@ExtendWith(MockitoExtension.class)
+public class TlsCertificateManagerTest {
+
+ @TempDir
+ Path tempDir;
+
+ private TlsCertificateManager manager;
+
+ @Mock
+ private ProxyConfig proxyConfig;
+
+ @Mock
+ private TlsCertificateManager.TlsContextReloadListener listener1;
+
+ @Mock
+ private TlsCertificateManager.TlsContextReloadListener listener2;
+
+ private File certFile;
+ private File keyFile;
+ private FileWatchService.Listener fileWatchListener;
+ private Field configField;
+ private ProxyConfig originalConfig;
+
+ @BeforeAll
+ public static void setUpAll() throws Exception {
+ ConfigurationManager.initEnv();
+ ConfigurationManager.intConfig();
+ }
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ // Create temporary certificate and key files
+ certFile = new File(tempDir.toFile(), "server.crt");
+ keyFile = new File(tempDir.toFile(), "server.key");
+ try (FileWriter certWriter = new FileWriter(certFile);
+ FileWriter keyWriter = new FileWriter(keyFile)) {
+ certWriter.write("test certificate content");
+ keyWriter.write("test key content");
+ }
+
+ // Set TlsSystemConfig paths
+ TlsSystemConfig.tlsServerCertPath = certFile.getAbsolutePath();
+ TlsSystemConfig.tlsServerKeyPath = keyFile.getAbsolutePath();
+
+ // Create the TlsCertificateManager
+ manager = new TlsCertificateManager();
+
+ // Extract the file watch listener using reflection
+ fileWatchListener = extractFileWatchListener(manager);
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ // Restore the original config
+ if (configField != null && originalConfig != null) {
+ configField.set(null, originalConfig);
+ }
+ }
+
+ private FileWatchService.Listener
extractFileWatchListener(TlsCertificateManager manager) throws Exception {
+ Field fileWatchServiceField =
TlsCertificateManager.class.getDeclaredField("fileWatchService");
+ fileWatchServiceField.setAccessible(true);
+ FileWatchService fileWatchService = (FileWatchService)
fileWatchServiceField.get(manager);
+
+ Field listenerField =
FileWatchService.class.getDeclaredField("listener");
+ listenerField.setAccessible(true);
+ return (FileWatchService.Listener) listenerField.get(fileWatchService);
+ }
+
+ @Test
+ public void testConstructor() {
+ // The constructor should initialize the FileWatchService with the
correct paths
+ assertNotNull(manager);
+ }
+
+ @Test
+ public void testStartAndShutdown() throws Exception {
+ TlsCertificateManager managerSpy = spy(manager);
+
+ Field watchServiceField =
TlsCertificateManager.class.getDeclaredField("fileWatchService");
+ watchServiceField.setAccessible(true);
+ FileWatchService watchService = (FileWatchService)
watchServiceField.get(managerSpy);
+ FileWatchService watchServiceSpy = spy(watchService);
+ watchServiceField.set(managerSpy, watchServiceSpy);
+
+ managerSpy.start();
+ verify(watchServiceSpy).start();
+
+ managerSpy.shutdown();
+ verify(watchServiceSpy).shutdown();
+ }
+
+ @Test
+ public void testRegisterAndUnregisterListener() {
+ manager.registerReloadListener(listener1);
+
+ List<TlsCertificateManager.TlsContextReloadListener> listeners =
manager.getReloadListeners();
+ assertEquals(1, listeners.size());
+ assertTrue(listeners.contains(listener1));
+
+ manager.registerReloadListener(listener2);
+ assertEquals(2, listeners.size());
+ assertTrue(listeners.contains(listener2));
+
+ manager.unregisterReloadListener(listener1);
+ assertEquals(1, listeners.size());
+ assertFalse(listeners.contains(listener1));
+ assertTrue(listeners.contains(listener2));
+
+ manager.registerReloadListener(null);
+ assertEquals(1, listeners.size()); // Should remain unchanged
+
+ manager.unregisterReloadListener(null);
+ assertEquals(1, listeners.size()); // Should remain unchanged
+ }
+
+ @Test
+ public void testFileChangeNotification_CertOnly() throws Exception {
+ manager.registerReloadListener(listener1);
+
+ fileWatchListener.onChanged(certFile.getAbsolutePath());
+
+ verify(listener1, never()).onTlsContextReload();
+ }
+
+ @Test
+ public void testFileChangeNotification_KeyOnly() throws Exception {
+ manager.registerReloadListener(listener1);
+
+ fileWatchListener.onChanged(keyFile.getAbsolutePath());
+
+ verify(listener1, never()).onTlsContextReload();
+ }
+
+ @Test
+ public void testFileChangeNotification_BothFiles() throws Exception {
+ manager.registerReloadListener(listener1);
+
+ fileWatchListener.onChanged(certFile.getAbsolutePath());
+ fileWatchListener.onChanged(keyFile.getAbsolutePath());
+
+ verify(listener1, times(1)).onTlsContextReload();
+ }
+
+ @Test
+ public void testFileChangeNotification_MultipleListeners() throws
Exception {
+ manager.registerReloadListener(listener1);
+ manager.registerReloadListener(listener2);
+
+ fileWatchListener.onChanged(certFile.getAbsolutePath());
+ fileWatchListener.onChanged(keyFile.getAbsolutePath());
+
+ verify(listener1, times(1)).onTlsContextReload();
+ verify(listener2, times(1)).onTlsContextReload();
+ }
+
+ @Test
+ public void testFileChangeNotification_BothFilesReverseOrder() throws
Exception {
+ manager.registerReloadListener(listener1);
+
+ fileWatchListener.onChanged(keyFile.getAbsolutePath());
+ fileWatchListener.onChanged(certFile.getAbsolutePath());
+
+ verify(listener1, times(1)).onTlsContextReload();
+ }
+
+ @Test
+ public void testFileChangeNotification_RepeatedChanges() throws Exception {
+ manager.registerReloadListener(listener1);
+
+ fileWatchListener.onChanged(certFile.getAbsolutePath());
+ fileWatchListener.onChanged(keyFile.getAbsolutePath());
+
+ verify(listener1, times(1)).onTlsContextReload();
+
+ fileWatchListener.onChanged(certFile.getAbsolutePath());
+ fileWatchListener.onChanged(keyFile.getAbsolutePath());
+
+ verify(listener1, times(2)).onTlsContextReload();
+ }
+
+ @Test
+ public void testFileChangeNotification_UnknownFile() throws Exception {
+ manager.registerReloadListener(listener1);
+
+ fileWatchListener.onChanged("/unknown/file/path");
+
+ verify(listener1, never()).onTlsContextReload();
+ }
+
+ @Test
+ public void testFileChangeNotification_ListenerThrowsException() throws
Exception {
+ TlsCertificateManager.TlsContextReloadListener exceptionListener =
mock(TlsCertificateManager.TlsContextReloadListener.class);
+ doThrow(new RuntimeException("Test
exception")).when(exceptionListener).onTlsContextReload();
+
+ manager.registerReloadListener(exceptionListener);
+ manager.registerReloadListener(listener1);
+
+ fileWatchListener.onChanged(certFile.getAbsolutePath());
+ fileWatchListener.onChanged(keyFile.getAbsolutePath());
+
+ verify(exceptionListener, times(1)).onTlsContextReload();
+ verify(listener1, times(1)).onTlsContextReload();
+ }
+
+ @Test
+ public void testInnerCertKeyFileWatchListener() throws Exception {
+ Class<?> innerClass = null;
+ for (Class<?> clazz :
TlsCertificateManager.class.getDeclaredClasses()) {
+ if (clazz.getSimpleName().equals("CertKeyFileWatchListener")) {
+ innerClass = clazz;
+ break;
+ }
+ }
+
+ assertNotNull(innerClass, "CertKeyFileWatchListener class not found");
+
+ Constructor<?> constructor =
innerClass.getDeclaredConstructor(TlsCertificateManager.class);
+ constructor.setAccessible(true);
+ Object innerListener = constructor.newInstance(manager);
+
+ manager.registerReloadListener(listener1);
+
+ Method onChangedMethod = innerClass.getDeclaredMethod("onChanged",
String.class);
+ onChangedMethod.setAccessible(true);
+
+ onChangedMethod.invoke(innerListener, certFile.getAbsolutePath());
+ verify(listener1, never()).onTlsContextReload();
+
+ onChangedMethod.invoke(innerListener, keyFile.getAbsolutePath());
+ verify(listener1, times(1)).onTlsContextReload();
+
+ reset(listener1);
+
+ onChangedMethod.invoke(innerListener, certFile.getAbsolutePath());
+ verify(listener1, never()).onTlsContextReload();
+ }
+}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 92ced6b01a..8c9da13b40 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -167,10 +167,10 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
sslContext = TlsHelper.buildSslContext(true);
LOGGER.info("SSL enabled for client");
} catch (IOException e) {
- LOGGER.error("Failed to create SSLContext", e);
+ LOGGER.error("Failed to create SslContext", e);
} catch (CertificateException e) {
- LOGGER.error("Failed to create SSLContext", e);
- throw new RuntimeException("Failed to create SSLContext", e);
+ LOGGER.error("Failed to create SslContext", e);
+ throw new RuntimeException("Failed to create SslContext", e);
}
}
}
@@ -209,7 +209,7 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
pipeline.addFirst(defaultEventExecutorGroup,
"sslHandler", sslContext.newHandler(ch.alloc()));
LOGGER.info("Prepend SSL handler");
} else {
- LOGGER.warn("Connections are insecure as
SSLContext is null!");
+ LOGGER.warn("Connections are insecure as
SslContext is null!");
}
}
ch.pipeline().addLast(
@@ -337,7 +337,7 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
"sslHandler",
sslContext.newHandler(ch.alloc()));
LOGGER.info("Prepend SSL handler");
} else {
- LOGGER.warn("Connections are insecure as
SSLContext is null!");
+ LOGGER.warn("Connections are insecure as
SslContext is null!");
}
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 7ed804483b..d56d6faa33 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -184,9 +184,9 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
if (tlsMode != TlsMode.DISABLED) {
try {
sslContext = TlsHelper.buildSslContext(false);
- log.info("SSLContext created for server");
+ log.info("SslContext created for server");
} catch (CertificateException | IOException e) {
- log.error("Failed to create SSLContext for server", e);
+ log.error("Failed to create SslContext for server", e);
}
}
}
@@ -514,7 +514,7 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
log.info("Handlers prepended to channel pipeline
to establish SSL connection");
} else {
ctx.close();
- log.error("Trying to establish an SSL connection
but sslContext is null");
+ log.error("Trying to establish an SSL connection
but SslContext is null");
}
break;
diff --git
a/srvutil/src/main/java/org/apache/rocketmq/srvutil/FileWatchService.java
b/srvutil/src/main/java/org/apache/rocketmq/srvutil/FileWatchService.java
index 0c0690da5c..9b203e88ef 100644
--- a/srvutil/src/main/java/org/apache/rocketmq/srvutil/FileWatchService.java
+++ b/srvutil/src/main/java/org/apache/rocketmq/srvutil/FileWatchService.java
@@ -34,15 +34,20 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
public class FileWatchService extends LifecycleAwareServiceThread {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
+ private static final int DEFAULT_WATCH_INTERVAL = 500;
private final Map<String, String> currentHash = new HashMap<>();
private final Listener listener;
- private static final int WATCH_INTERVAL = 500;
+ private final int watchInterval;
private final MessageDigest md = MessageDigest.getInstance("MD5");
- public FileWatchService(final String[] watchFiles,
- final Listener listener) throws Exception {
+ public FileWatchService(final String[] watchFiles, final Listener
listener) throws Exception {
+ this(watchFiles, listener, DEFAULT_WATCH_INTERVAL);
+ }
+
+ public FileWatchService(final String[] watchFiles, final Listener
listener, int watchInterval) throws Exception {
this.listener = listener;
+ this.watchInterval = watchInterval;
for (String file : watchFiles) {
if (!Strings.isNullOrEmpty(file) && new File(file).exists()) {
currentHash.put(file, md5Digest(file));
@@ -61,7 +66,7 @@ public class FileWatchService extends
LifecycleAwareServiceThread {
while (!this.isStopped()) {
try {
- this.waitForRunning(WATCH_INTERVAL);
+ this.waitForRunning(watchInterval);
for (Map.Entry<String, String> entry : currentHash.entrySet())
{
String newHash = md5Digest(entry.getKey());
if (!newHash.equals(entry.getValue())) {