This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch 3.7 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.7 by this push: new 0e46be5bf07 KAFKA-16471 invoke SSLEngine::closeInbound on SslTransportLayer close (#15655) 0e46be5bf07 is described below commit 0e46be5bf078f6ca734b7502a1f2b10cf8bc4eee Author: Gaurav Narula <gaurav_naru...@apple.com> AuthorDate: Fri Apr 5 05:11:03 2024 +0100 KAFKA-16471 invoke SSLEngine::closeInbound on SslTransportLayer close (#15655) Invokes `SSLEngine::closeInbound` after we flush close_notify alert tothe socket. This fixes memory leak in Netty/OpenSSL based SSLEngine which only free native resources once closeInbound has been invoked. Reviewers: Omnia Ibrahim <o.g.h.ibra...@gmail.com>, Luke Chen <show...@gmail.com>, Chia-Ping Tsai <chia7...@gmail.com> --- .../kafka/common/network/SslTransportLayer.java | 8 +++++ .../common/network/SslTransportLayerTest.java | 35 ++++++++++++++++++++++ 2 files changed, 43 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java index 904c5216a40..870b1c7f7dc 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java @@ -199,6 +199,14 @@ public class SslTransportLayer implements TransportLayer { } catch (IOException ie) { log.debug("Failed to send SSL Close message", ie); } finally { + try { + sslEngine.closeInbound(); + } catch (SSLException e) { + // This log is for debugging purposes as an exception might occur frequently + // at this point due to peers not following the TLS specs and failing to send a close_notify alert. + // Even if they do, currently, we do not read data from the socket after invoking close(). + log.debug("SSLEngine.closeInBound() raised an exception.", e); + } socketChannel.socket().close(); socketChannel.close(); netReadBuffer = null; diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java index d92f4facb3c..e65d53b2b7a 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java @@ -36,6 +36,7 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.test.TestSslUtils; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -47,6 +48,7 @@ import java.io.File; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.SelectionKey; @@ -65,6 +67,7 @@ import java.util.stream.Stream; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLException; import javax.net.ssl.SSLParameters; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -72,6 +75,12 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assumptions.assumeTrue; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; /** * Tests for the SSL transport layer. These use a test harness that runs a simple socket server that echos back responses. @@ -1467,4 +1476,30 @@ public class SslTransportLayerTest { } } } + + @Test + public void testSSLEngineCloseInboundInvokedOnClose() throws IOException { + // Given + SSLEngine sslEngine = mock(SSLEngine.class); + Socket socket = mock(Socket.class); + SocketChannel socketChannel = mock(SocketChannel.class); + SelectionKey selectionKey = mock(SelectionKey.class); + when(socketChannel.socket()).thenReturn(socket); + when(selectionKey.channel()).thenReturn(socketChannel); + doThrow(new SSLException("Mock exception")).when(sslEngine).closeInbound(); + SslTransportLayer sslTransportLayer = new SslTransportLayer( + "test-channel", + selectionKey, + sslEngine, + mock(ChannelMetadataRegistry.class) + ); + + // When + sslTransportLayer.close(); + + // Then + verify(sslEngine, times(1)).closeOutbound(); + verify(sslEngine, times(1)).closeInbound(); + verifyNoMoreInteractions(sslEngine); + } }