This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ee3e5eaa9f9 [fix] Handle TLS close_notify to avoid
SslClosedEngineException: SSLEngine closed already (#24986)
ee3e5eaa9f9 is described below
commit ee3e5eaa9f9364287061a70dbb41f9bc9c2adb0c
Author: Lari Hotari <[email protected]>
AuthorDate: Sat Nov 15 10:11:13 2025 +0200
[fix] Handle TLS close_notify to avoid SslClosedEngineException: SSLEngine
closed already (#24986)
---
.../org/apache/pulsar/client/impl/ClientCnx.java | 13 ------------
.../pulsar/common/protocol/PulsarDecoder.java | 24 ++++++++++++++++++++++
2 files changed, 24 insertions(+), 13 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 9cccbecf3aa..07df92f501e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -29,7 +29,6 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.unix.Errors.NativeIoException;
-import io.netty.handler.ssl.SslHandshakeCompletionEvent;
import io.netty.util.concurrent.Promise;
import io.opentelemetry.api.common.Attributes;
import java.net.InetSocketAddress;
@@ -1442,18 +1441,6 @@ public class ClientCnx extends PulsarHandler {
}
}
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
- if (evt instanceof SslHandshakeCompletionEvent) {
- SslHandshakeCompletionEvent sslHandshakeCompletionEvent =
(SslHandshakeCompletionEvent) evt;
- if (sslHandshakeCompletionEvent.cause() != null) {
- log.warn("{} Got ssl handshake exception {}", ctx.channel(),
- sslHandshakeCompletionEvent);
- }
- }
- ctx.fireUserEventTriggered(evt);
- }
-
protected void closeWithException(Throwable e) {
if (ctx != null) {
connectionFuture.completeExceptionally(e);
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
index 33307a8cacc..9017805c75b 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
@@ -24,6 +24,8 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundInvoker;
import io.netty.handler.codec.haproxy.HAProxyMessage;
+import io.netty.handler.ssl.SslCloseCompletionEvent;
+import io.netty.handler.ssl.SslHandshakeCompletionEvent;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandAckResponse;
@@ -749,4 +751,26 @@ public abstract class PulsarDecoder extends
ChannelInboundHandlerAdapter {
private void writeAndFlush(ChannelOutboundInvoker ctx, ByteBuf cmd) {
NettyChannelUtil.writeAndFlushWithVoidPromise(ctx, cmd);
}
+
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
+ if (evt instanceof SslHandshakeCompletionEvent) {
+ // log handshake failures
+ SslHandshakeCompletionEvent sslHandshakeCompletionEvent =
(SslHandshakeCompletionEvent) evt;
+ if (!sslHandshakeCompletionEvent.isSuccess()) {
+ log.warn("[{}] TLS handshake failed. {}", ctx.channel(),
sslHandshakeCompletionEvent);
+ }
+ } else if (evt instanceof SslCloseCompletionEvent) {
+ // handle TLS close_notify event and immediately close the channel
+ // this is not handled by Netty by default
+ // See https://datatracker.ietf.org/doc/html/rfc8446#section-6.1
for more details
+ SslCloseCompletionEvent sslCloseCompletionEvent =
(SslCloseCompletionEvent) evt;
+ if (sslCloseCompletionEvent.isSuccess() &&
ctx.channel().isActive()) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Received a TLS close_notify, closing the
channel.", ctx.channel());
+ }
+ ctx.close();
+ }
+ }
+ ctx.fireUserEventTriggered(evt);
+ }
}