[ https://issues.apache.org/jira/browse/KAFKA-6529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358354#comment-16358354 ]
ASF GitHub Bot commented on KAFKA-6529: --------------------------------------- rajinisivaram closed pull request #4517: KAFKA-6529: Stop file descriptor leak when client disconnects with staged receives URL: https://github.com/apache/kafka/pull/4517 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 6bfcfd21a90..ed037b3a8f7 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -325,9 +325,9 @@ public void send(Send send) { } catch (Exception e) { // update the state for consistency, the channel will be discarded after `close` channel.state(ChannelState.FAILED_SEND); - // ensure notification via `disconnected` + // ensure notification via `disconnected` when `failedSends` are processed in the next poll this.failedSends.add(connectionId); - close(channel, false); + close(channel, false, false); if (!(e instanceof CancelledKeyException)) { log.error("Unexpected exception during send, closing connection {} and rethrowing exception {}", connectionId, e); @@ -450,6 +450,7 @@ void pollSelectionKeys(Set<SelectionKey> selectionKeys, if (idleExpiryManager != null) idleExpiryManager.update(channel.id(), currentTimeNanos); + boolean sendFailed = false; try { /* complete any connections that have finished their handshake (either normally or immediately) */ @@ -491,7 +492,13 @@ void pollSelectionKeys(Set<SelectionKey> selectionKeys, /* if channel is ready write to any sockets that have space in their buffer and for which we have data */ if (channel.ready() && key.isWritable()) { - Send send = channel.write(); + 
Send send = null; + try { + send = channel.write(); + } catch (Exception e) { + sendFailed = true; + throw e; + } if (send != null) { this.completedSends.add(send); this.sensors.recordBytesSent(channel.id(), send.size()); @@ -500,7 +507,7 @@ void pollSelectionKeys(Set<SelectionKey> selectionKeys, /* cancel any defunct sockets */ if (!key.isValid()) - close(channel, true); + close(channel, true, true); } catch (Exception e) { String desc = channel.socketDescription(); @@ -510,7 +517,7 @@ else if (e instanceof AuthenticationException) // will be logged later as error log.debug("Connection with {} disconnected due to authentication exception", desc, e); else log.warn("Unexpected error from {}; closing connection", desc, e); - close(channel, true); + close(channel, !sendFailed, true); } finally { maybeRecordTimePerConnection(channel, channelStartTimeNanos); } @@ -620,7 +627,7 @@ private void maybeCloseOldestConnection(long currentTimeNanos) { log.trace("About to close the idle connection from {} due to being idle for {} millis", connectionId, (currentTimeNanos - expiredConnection.getValue()) / 1000 / 1000); channel.state(ChannelState.EXPIRED); - close(channel, true); + close(channel, true, true); } } } @@ -674,7 +681,7 @@ public void close(String id) { // There is no disconnect notification for local close, but updating // channel state here anyway to avoid confusion. channel.state(ChannelState.LOCAL_CLOSE); - close(channel, false); + close(channel, false, false); } else { KafkaChannel closingChannel = this.closingChannels.remove(id); // Close any closing channel, leave the channel in the state in which closing was triggered @@ -694,7 +701,10 @@ public void close(String id) { * closed immediately. The channel will not be added to disconnected list and it is the * responsibility of the caller to handle disconnect notifications. 
*/ - private void close(KafkaChannel channel, boolean processOutstanding) { + private void close(KafkaChannel channel, boolean processOutstanding, boolean notifyDisconnect) { + + if (processOutstanding && !notifyDisconnect) + throw new IllegalStateException("Disconnect notification required for remote disconnect after processing outstanding requests"); channel.disconnect(); @@ -712,8 +722,9 @@ private void close(KafkaChannel channel, boolean processOutstanding) { if (processOutstanding && deque != null && !deque.isEmpty()) { // stagedReceives will be moved to completedReceives later along with receives from other channels closingChannels.put(channel.id(), channel); + log.debug("Tracking closing connection {} to process outstanding requests", channel.id()); } else - doClose(channel, processOutstanding); + doClose(channel, notifyDisconnect); this.channels.remove(channel.id()); if (idleExpiryManager != null) diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 13299c7cc29..8ee4c36ba44 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -148,7 +148,7 @@ class SocketServerTest extends JUnitSuite { } def sendAndReceiveRequest(socket: Socket, server: SocketServer): RequestChannel.Request = { - sendRequest(socket, producerRequestBytes) + sendRequest(socket, producerRequestBytes()) receiveRequest(server.requestChannel) } @@ -157,11 +157,10 @@ class SocketServerTest extends JUnitSuite { server.metrics.close() } - private def producerRequestBytes: Array[Byte] = { + private def producerRequestBytes(ack: Short = 0): Array[Byte] = { val correlationId = -1 val clientId = "" val ackTimeoutMs = 10000 - val ack = 0: Short val emptyRequest = ProduceRequest.Builder.forCurrentMagic(ack, ackTimeoutMs, new HashMap[TopicPartition, MemoryRecords]()).build() @@ -177,7 +176,7 @@ class SocketServerTest extends 
JUnitSuite { @Test def simpleRequest() { val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT) - val serializedBytes = producerRequestBytes + val serializedBytes = producerRequestBytes() // Test PLAINTEXT socket sendRequest(plainSocket, serializedBytes) @@ -206,7 +205,7 @@ class SocketServerTest extends JUnitSuite { @Test def testGracefulClose() { val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT) - val serializedBytes = producerRequestBytes + val serializedBytes = producerRequestBytes() for (_ <- 0 until 10) sendRequest(plainSocket, serializedBytes) @@ -221,7 +220,7 @@ class SocketServerTest extends JUnitSuite { @Test def testNoOpAction(): Unit = { val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT) - val serializedBytes = producerRequestBytes + val serializedBytes = producerRequestBytes() for (_ <- 0 until 3) sendRequest(plainSocket, serializedBytes) @@ -235,7 +234,7 @@ class SocketServerTest extends JUnitSuite { @Test def testConnectionId() { val sockets = (1 to 5).map(_ => connect(protocol = SecurityProtocol.PLAINTEXT)) - val serializedBytes = producerRequestBytes + val serializedBytes = producerRequestBytes() val requests = sockets.map{socket => sendRequest(socket, serializedBytes) @@ -264,7 +263,7 @@ class SocketServerTest extends JUnitSuite { try { overrideServer.startup() - val serializedBytes = producerRequestBytes + val serializedBytes = producerRequestBytes() // Connection with no staged receives val socket1 = connect(overrideServer, protocol = SecurityProtocol.PLAINTEXT) @@ -347,7 +346,7 @@ class SocketServerTest extends JUnitSuite { // Send requests to `channel1` until a receive is staged and advance time beyond idle time so that `channel1` is // closed with staged receives and is in Selector.closingChannels - val serializedBytes = producerRequestBytes + val serializedBytes = producerRequestBytes() val request = sendRequestsUntilStagedReceive(overrideServer, socket1, serializedBytes) time.sleep(idleTimeMs + 1) 
TestUtils.waitUntilTrue(() => openChannel.isEmpty, "Idle channel not closed") @@ -437,7 +436,7 @@ class SocketServerTest extends JUnitSuite { TestUtils.waitUntilTrue(() => server.connectionCount(address) < conns.length, "Failed to decrement connection count after close") val conn2 = connect() - val serializedBytes = producerRequestBytes + val serializedBytes = producerRequestBytes() sendRequest(conn2, serializedBytes) val request = server.requestChannel.receiveRequest(2000) assertNotNull(request) @@ -456,7 +455,7 @@ class SocketServerTest extends JUnitSuite { val conns = (0 until overrideNum).map(_ => connect(overrideServer)) // it should succeed - val serializedBytes = producerRequestBytes + val serializedBytes = producerRequestBytes() sendRequest(conns.last, serializedBytes) val request = overrideServer.requestChannel.receiveRequest(2000) assertNotNull(request) @@ -539,7 +538,7 @@ class SocketServerTest extends JUnitSuite { try { overrideServer.startup() conn = connect(overrideServer) - val serializedBytes = producerRequestBytes + val serializedBytes = producerRequestBytes() sendRequest(conn, serializedBytes) val channel = overrideServer.requestChannel @@ -564,6 +563,54 @@ class SocketServerTest extends JUnitSuite { } } + @Test + def testClientDisconnectionWithStagedReceivesFullyProcessed() { + val serverMetrics = new Metrics + @volatile var selector: TestableSelector = null + val overrideConnectionId = "127.0.0.1:1-127.0.0.1:2-0" + val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM, credentialProvider) { + override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName, + protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = { + new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas, + config.connectionsMaxIdleMs, listenerName, protocol, config, metrics, credentialProvider, memoryPool, new LogContext()) { + override protected[network] def 
connectionId(socket: Socket): String = overrideConnectionId + override protected[network] def createSelector(channelBuilder: ChannelBuilder): Selector = { + val testableSelector = new TestableSelector(config, channelBuilder, time, metrics) + selector = testableSelector + testableSelector + } + } + } + } + + def openChannel: Option[KafkaChannel] = overrideServer.processor(0).channel(overrideConnectionId) + def openOrClosingChannel: Option[KafkaChannel] = overrideServer.processor(0).openOrClosingChannel(overrideConnectionId) + + try { + overrideServer.startup() + val socket = connect(overrideServer) + + TestUtils.waitUntilTrue(() => openChannel.nonEmpty, "Channel not found") + + // Setup channel to client with staged receives so when client disconnects + // it will be stored in Selector.closingChannels + val serializedBytes = producerRequestBytes(1) + val request = sendRequestsUntilStagedReceive(overrideServer, socket, serializedBytes) + + // Set SoLinger to 0 to force a hard disconnect via TCP RST + socket.setSoLinger(true, 0) + socket.close() + + // Complete request with socket exception so that the channel is removed from Selector.closingChannels + processRequest(overrideServer.requestChannel, request) + TestUtils.waitUntilTrue(() => openOrClosingChannel.isEmpty, "Channel not closed after failed send") + assertTrue("Unexpected completed send", selector.completedSends.isEmpty) + } finally { + overrideServer.shutdown() + serverMetrics.close() + } + } + /* * Test that we update request metrics if the channel has been removed from the selector when the broker calls * `selector.send` (selector closes old connections, for example). 
@@ -578,7 +625,7 @@ class SocketServerTest extends JUnitSuite { try { overrideServer.startup() conn = connect(overrideServer) - val serializedBytes = producerRequestBytes + val serializedBytes = producerRequestBytes() sendRequest(conn, serializedBytes) val channel = overrideServer.requestChannel val request = receiveRequest(channel) @@ -697,7 +744,7 @@ class SocketServerTest extends JUnitSuite { testableSelector.updateMinWakeup(2) val sockets = (1 to 2).map(_ => connect(testableServer)) - sockets.foreach(sendRequest(_, producerRequestBytes)) + sockets.foreach(sendRequest(_, producerRequestBytes())) testableServer.testableSelector.addFailure(SelectorOperation.Send) sockets.foreach(_ => processRequest(testableServer.requestChannel)) @@ -720,7 +767,7 @@ class SocketServerTest extends JUnitSuite { testableSelector.updateMinWakeup(2) val sockets = (1 to 2).map(_ => connect(testableServer)) - sockets.foreach(sendRequest(_, producerRequestBytes)) + sockets.foreach(sendRequest(_, producerRequestBytes())) val requestChannel = testableServer.requestChannel val requests = sockets.map(_ => receiveRequest(requestChannel)) @@ -748,7 +795,7 @@ class SocketServerTest extends JUnitSuite { testableSelector.updateMinWakeup(2) val sockets = (1 to 2).map(_ => connect(testableServer)) - val serializedBytes = producerRequestBytes + val serializedBytes = producerRequestBytes() val request = sendRequestsUntilStagedReceive(testableServer, sockets(0), serializedBytes) sendRequest(sockets(1), serializedBytes) @@ -782,7 +829,7 @@ class SocketServerTest extends JUnitSuite { testableSelector.cachedCompletedReceives.minPerPoll = 2 testableSelector.addFailure(SelectorOperation.Mute) - sockets.foreach(sendRequest(_, producerRequestBytes)) + sockets.foreach(sendRequest(_, producerRequestBytes())) val requests = sockets.map(_ => receiveRequest(requestChannel)) testableSelector.waitForOperations(SelectorOperation.Mute, 2) testableServer.waitForChannelClose(testableSelector.allFailedChannels.head, 
locallyClosed = true) @@ -906,7 +953,7 @@ class SocketServerTest extends JUnitSuite { // Check new channel behaves as expected val (socket, connectionId) = connectAndProcessRequest(testableServer) - assertArrayEquals(producerRequestBytes, receiveResponse(socket)) + assertArrayEquals(producerRequestBytes(), receiveResponse(socket)) assertNotNull("Channel should not have been closed", selector.channel(connectionId)) assertNull("Channel should not be closing", selector.closingChannel(connectionId)) socket.close() ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Broker leaks memory and file descriptors after sudden client disconnects > ------------------------------------------------------------------------ > > Key: KAFKA-6529 > URL: https://issues.apache.org/jira/browse/KAFKA-6529 > Project: Kafka > Issue Type: Bug > Components: network > Affects Versions: 1.0.0, 0.11.0.2 > Reporter: Graham Campbell > Priority: Major > Fix For: 1.1.0, 0.11.0.3, 1.0.2 > > > If a producer forcefully disconnects from a broker while it has staged > receives, that connection enters a limbo state where it is no longer > processed by the SocketServer.Processor, leaking the file descriptor for the > socket and the memory used for the staged receive queue for that connection. > We noticed this during an upgrade from 0.9.0.2 to 0.11.0.2. Immediately after > the rolling restart to upgrade, open file descriptors on the brokers started > climbing uncontrollably. In a few cases brokers reached our configured max > open files limit of 100k and crashed before we rolled back. > We tracked this down to a buildup of muted connections in the > Selector.closingChannels list. 
If a client disconnects from the broker with > multiple pending produce requests, when the broker attempts to send an ack to > the client it receives an IOException because the TCP socket has been closed. > This triggers the Selector to close the channel, but because it still has > pending requests, it adds it to Selector.closingChannels to process those > requests. However, because that exception was triggered by trying to send a > response, the SocketServer.Processor has marked the channel as muted and will > no longer process it at all. > *Reproduced by:* > Starting a Kafka broker/cluster > Client produces several messages and then disconnects abruptly (e.g. > _./rdkafka_performance -P -x 100 -b broker:9092 -t test_topic_) > Broker then leaks file descriptor previously used for TCP socket and memory > for unprocessed messages > *Proposed solution (which we've implemented internally)* > Whenever an exception is encountered when writing to a socket in > Selector.pollSelectionKeys(...) record that that connection failed a send by > adding the KafkaChannel ID to Selector.failedSends. Then re-raise the > exception to still trigger the socket disconnection logic. Since every > exception raised in this function triggers a disconnect, we also treat any > exception while writing to the socket as a failed send. -- This message was sent by Atlassian JIRA (v7.6.3#76005)