[ https://issues.apache.org/jira/browse/KAFKA-16701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17869167#comment-17869167 ]
bboyleonp commented on KAFKA-16701:
-----------------------------------

[~gharris1727]

I added custom logging to narrow down the issue and found the following.

Log for test `closingChannelSendFailure` on JDK 17 ({color:#de350b}Fail{color})
{code:java}
[2024-07-28 19:46:31,751] DEBUG Leon: before receiveRequest (kafka:700)
[2024-07-28 19:46:31,751] DEBUG Leon: request obtained by callbackQueue is null (kafka.request.logger:471)
[2024-07-28 19:46:31,751] DEBUG Leon: the connectionQuotas of inetAddress is 1 (kafka.network.SocketServerTest$TestableProcessor:62)
[2024-07-28 19:46:32,053] DEBUG Leon: the connectionQuotas of inetAddress is 0 (kafka.network.SocketServerTest$TestableProcessor:62)
[2024-07-28 19:46:32,053] ERROR Exception while processing disconnection of 127.0.0.1:51325-127.0.0.1:51327-0 (kafka.network.SocketServerTest$TestableProcessor:76)
java.lang.IllegalArgumentException: Attempted to decrease connection count for address with no connections, address: /127.0.0.1
	at kafka.network.ConnectionQuotas.$anonfun$dec$1(SocketServer.scala:1535)
	at scala.collection.mutable.HashMap.getOrElse(HashMap.scala:451)
	at kafka.network.ConnectionQuotas.dec(SocketServer.scala:1535)
	at kafka.network.Processor.$anonfun$processDisconnected$1(SocketServer.scala:1225)
	at java.base/java.util.HashMap$KeySet.forEach(HashMap.java:1008)
	at kafka.network.Processor.processDisconnected(SocketServer.scala:1216)
	at kafka.network.Processor.run(SocketServer.scala:1019)
	at java.base/java.lang.Thread.run(Thread.java:840)
[2024-07-28 19:46:41,755] DEBUG Leon: request obtained by requestQueue is null (kafka.request.logger:476)
[2024-07-28 19:46:41,755] DEBUG Leon: before finally::proxyServer.close (kafka:703)
{code}

Log for test `closingChannelSendFailure` on JDK 11 ({color:#00875a}Success{color})
{code:java}
[2024-07-28 19:24:48,265] DEBUG Leon: before receiveRequest (kafka:700)
[2024-07-28 19:24:48,265] DEBUG Leon: request obtained by callbackQueue is null (kafka.request.logger:471)
[2024-07-28 19:24:48,265] DEBUG Completed request:{"isForwarded":false,"requestHeader":{"requestApiKey":0,"requestApiVersion":11,"correlationId":-1,"clientId":"","requestApiKeyName":"PRODUCE"},"request":{"transactionalId":null,"acks":0,"timeoutMs":10000,"topicData":[]},"response":"","connection":"127.0.0.1:50229-127.0.0.1:50231-0","totalTimeMs":104.595,"requestQueueTimeMs":0.0,"localTimeMs":3.2080946725E7,"remoteTimeMs":0.0,"throttleTimeMs":0,"responseQueueTimeMs":0.316,"sendTimeMs":0.03,"securityProtocol":"SSL","principal":"User:ANONYMOUS","listener":"SSL","clientInformation":{"softwareName":"unknown","softwareVersion":"unknown"}} (kafka.request.logger:279)
[2024-07-28 19:24:48,265] TRACE Socket server received empty response to send, registering for read: Response(type=NoOp, request=Request(processor=0, connectionId=127.0.0.1:50229-127.0.0.1:50231-0, session=org.apache.kafka.network.Session@242ff747, listenerName=ListenerName(SSL), securityProtocol=SSL, buffer=java.nio.HeapByteBuffer[pos=20 lim=20 cap=20], envelope=None)) (kafka.network.SocketServerTest$TestableProcessor:54)
[2024-07-28 19:24:48,266] TRACE Processor 0 received request: RequestHeader(apiKey=PRODUCE, apiVersion=11, clientId=, correlationId=-1, headerVersion=2) -- {acks=0,timeout=10000,partitionSizes=[]} (kafka.network.RequestChannel$:45)
[2024-07-28 19:24:48,266] DEBUG Leon: request obtained by requestQueue is Request(processor=0, connectionId=127.0.0.1:50229-127.0.0.1:50231-0, session=org.apache.kafka.network.Session@5329f6b3, listenerName=ListenerName(SSL), securityProtocol=SSL, buffer=java.nio.HeapByteBuffer[pos=20 lim=20 cap=20], envelope=None) (kafka.request.logger:476)
[2024-07-28 19:24:48,267] DEBUG Leon: before finally::proxyServer.close (kafka:703)
[2024-07-28 19:24:48,267] DEBUG Leon: before sslConnect (kafka:1469)
[2024-07-28 19:24:48,268] DEBUG Leon: before sendRequest (kafka:1471)
{code}

This test calls `receiveRequest` in _RequestChannel.scala_ to poll `callbackQueue` and `requestQueue`. I found a daemon in _SocketServer.scala_ that monitors connections; the count of valid connections is maintained by `connectionQuotas`. It seems that the connections on JDK 17 are disconnected unexpectedly. *Please find the 2 lines marked in red.*

*Could you help verify whether you see the same behavior in your environment as well?*

Here's the code snippet that I use to print `connectionQuotas`.
{code:java}
private def processDisconnected(): Unit = {
  selector.disconnected.keySet.forEach { connectionId =>
    try {
      val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
        throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
      }.remoteHost
      inflightResponses.remove(connectionId).foreach(updateRequestMetrics)
      // the channel has been closed by the selector but the quotas still need to be updated
      val inetAddress = InetAddress.getByName(remoteHost)
      debug(s"Leon: the connectionQuotas of inetAddress is ${connectionQuotas.get(inetAddress)}")
      connectionQuotas.dec(listenerName, InetAddress.getByName(remoteHost))
    } catch {
      case e: Throwable => processException(s"Exception while processing disconnection of $connectionId", e)
    }
  }
}
{code}

`connectionQuotas` rarely drops this quickly. I could find only 2 such cases under JDK 17, and `closingChannelSendFailure` is the only one where it drops to 0.

At this stage, I cannot come up with a valid root cause for this issue. Any ideas?

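For readers following the JDK 17 stack trace, here is a minimal sketch of the per-address counting invariant it appears to reflect: decrementing an address that has no recorded connections throws. This is a simplified, hypothetical model and not Kafka's actual `ConnectionQuotas`; the class name `SimpleConnectionCounts` and its methods are assumptions made for illustration only. In the failing log, the count is already 0 when `dec` is reached, which is the situation the last `dec` call below reproduces.

```scala
import java.net.InetAddress
import scala.collection.mutable

// Hypothetical, simplified model of per-address connection counting.
// NOT Kafka's ConnectionQuotas: it only mirrors the inc/dec invariant
// suggested by the exception message in the JDK 17 log.
class SimpleConnectionCounts {
  private val counts = mutable.Map.empty[InetAddress, Int]

  // Current number of tracked connections for the address (0 if none).
  def get(address: InetAddress): Int = counts.synchronized {
    counts.getOrElse(address, 0)
  }

  // Record one more connection for the address.
  def inc(address: InetAddress): Unit = counts.synchronized {
    counts.update(address, counts.getOrElse(address, 0) + 1)
  }

  // Record one fewer connection; fail if the address has no entry.
  def dec(address: InetAddress): Unit = counts.synchronized {
    val count = counts.getOrElse(address,
      throw new IllegalArgumentException(
        s"Attempted to decrease connection count for address with no connections, address: $address"))
    if (count == 1) counts.remove(address)
    else counts.update(address, count - 1)
  }
}

object SimpleConnectionCountsDemo {
  def main(args: Array[String]): Unit = {
    val counts = new SimpleConnectionCounts
    val address = InetAddress.getByName("127.0.0.1")

    counts.inc(address) // one connection accepted
    counts.dec(address) // first disconnect is processed normally
    println(s"count after first dec: ${counts.get(address)}")

    // A second disconnect for the same address finds no entry and throws.
    try counts.dec(address)
    catch { case e: IllegalArgumentException => println(e.getMessage) }
  }
}
```

Under this model, the exception can only occur if more disconnects than connects are processed for an address, so one thing worth checking is whether the same address is decremented twice on JDK 17.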
> Some SocketServerTest buffered close tests flaky failing locally
> ----------------------------------------------------------------
>
>                 Key: KAFKA-16701
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16701
>             Project: Kafka
>          Issue Type: Test
>          Components: core, unit tests
>    Affects Versions: 3.5.0, 3.6.0, 3.7.0
>            Reporter: Greg Harris
>            Assignee: bboyleonp
>            Priority: Major
>              Labels: flaky-test
>
> These tests are failing for me on a local development environment, but don't
> appear to be flaky or failing in CI. They only appear to fail for JDK >= 17.
> I'm using an M1 Mac, so it is possible that either the Mac's linear port
> allocation, or a native implementation is impacting this.
>
> closingChannelSendFailure()
>
> {noformat}
> java.lang.AssertionError: receiveRequest timed out
>     at kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148)
>     at kafka.network.SocketServerTest.makeChannelWithBufferedRequestsAndCloseRemote(SocketServerTest.scala:690)
>     at kafka.network.SocketServerTest.$anonfun$verifySendFailureAfterRemoteClose$1(SocketServerTest.scala:1434)
>     at kafka.network.SocketServerTest.verifySendFailureAfterRemoteClose(SocketServerTest.scala:1430)
>     at kafka.network.SocketServerTest.closingChannelSendFailure(SocketServerTest.scala:1425){noformat}
>
> closingChannelWithBufferedReceivesFailedSend()
>
> {noformat}
> java.lang.AssertionError: receiveRequest timed out
>     at kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148)
>     at kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1591)
>     at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
>     at kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1590)
>     at kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1553)
>     at kafka.network.SocketServerTest.closingChannelWithBufferedReceivesFailedSend(SocketServerTest.scala:1520){noformat}
>
> closingChannelWithCompleteAndIncompleteBufferedReceives()
>
> {noformat}
> java.lang.AssertionError: receiveRequest timed out
>     at kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148)
>     at kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1591)
>     at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
>     at kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1590)
>     at kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1553)
>     at kafka.network.SocketServerTest.closingChannelWithCompleteAndIncompleteBufferedReceives(SocketServerTest.scala:1511)
> {noformat}
>
> remoteCloseWithBufferedReceives()
>
> {noformat}
> java.lang.AssertionError: receiveRequest timed out
>     at kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148)
>     at kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1591)
>     at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
>     at kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1590)
>     at kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1553)
>     at kafka.network.SocketServerTest.remoteCloseWithBufferedReceives(SocketServerTest.scala:1453){noformat}

--
This message was sent by Atlassian Jira (v8.20.10#820010)