[ 
https://issues.apache.org/jira/browse/KAFKA-16701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17869167#comment-17869167
 ] 

bboyleonp commented on KAFKA-16701:
-----------------------------------

[~gharris1727] I added custom log statements to narrow down the issue and 
found the following.

 

Log for test `closingChannelSendFailure` on JDK 17 ({color:#de350b}Fail{color})

 
{code:java}
[2024-07-28 19:46:31,751] DEBUG Leon: before receiveRequest (kafka:700)
[2024-07-28 19:46:31,751] DEBUG Leon: request obtained by callbackQueue is null (kafka.request.logger:471)
[2024-07-28 19:46:31,751] DEBUG Leon: the connectionQuotas of inetAddress is 1 (kafka.network.SocketServerTest$TestableProcessor:62)
[2024-07-28 19:46:32,053] DEBUG Leon: the connectionQuotas of inetAddress is 0 (kafka.network.SocketServerTest$TestableProcessor:62)
[2024-07-28 19:46:32,053] ERROR Exception while processing disconnection of 127.0.0.1:51325-127.0.0.1:51327-0 (kafka.network.SocketServerTest$TestableProcessor:76)
java.lang.IllegalArgumentException: Attempted to decrease connection count for address with no connections, address: /127.0.0.1
        at kafka.network.ConnectionQuotas.$anonfun$dec$1(SocketServer.scala:1535)
        at scala.collection.mutable.HashMap.getOrElse(HashMap.scala:451)
        at kafka.network.ConnectionQuotas.dec(SocketServer.scala:1535)
        at kafka.network.Processor.$anonfun$processDisconnected$1(SocketServer.scala:1225)
        at java.base/java.util.HashMap$KeySet.forEach(HashMap.java:1008)
        at kafka.network.Processor.processDisconnected(SocketServer.scala:1216)
        at kafka.network.Processor.run(SocketServer.scala:1019)
        at java.base/java.lang.Thread.run(Thread.java:840)
[2024-07-28 19:46:41,755] DEBUG Leon: request obtained by requestQueue is null (kafka.request.logger:476)
[2024-07-28 19:46:41,755] DEBUG Leon: before finally::proxyServer.close (kafka:703) {code}
 

 

Log for test `closingChannelSendFailure` on JDK 11 
({color:#00875a}Success{color})

 
{code:java}
[2024-07-28 19:24:48,265] DEBUG Leon: before receiveRequest (kafka:700)
[2024-07-28 19:24:48,265] DEBUG Leon: request obtained by callbackQueue is null (kafka.request.logger:471)
[2024-07-28 19:24:48,265] DEBUG Completed request:{"isForwarded":false,"requestHeader":{"requestApiKey":0,"requestApiVersion":11,"correlationId":-1,"clientId":"","requestApiKeyName":"PRODUCE"},"request":{"transactionalId":null,"acks":0,"timeoutMs":10000,"topicData":[]},"response":"","connection":"127.0.0.1:50229-127.0.0.1:50231-0","totalTimeMs":104.595,"requestQueueTimeMs":0.0,"localTimeMs":3.2080946725E7,"remoteTimeMs":0.0,"throttleTimeMs":0,"responseQueueTimeMs":0.316,"sendTimeMs":0.03,"securityProtocol":"SSL","principal":"User:ANONYMOUS","listener":"SSL","clientInformation":{"softwareName":"unknown","softwareVersion":"unknown"}} (kafka.request.logger:279)
[2024-07-28 19:24:48,265] TRACE Socket server received empty response to send, registering for read: Response(type=NoOp, request=Request(processor=0, connectionId=127.0.0.1:50229-127.0.0.1:50231-0, session=org.apache.kafka.network.Session@242ff747, listenerName=ListenerName(SSL), securityProtocol=SSL, buffer=java.nio.HeapByteBuffer[pos=20 lim=20 cap=20], envelope=None)) (kafka.network.SocketServerTest$TestableProcessor:54)
[2024-07-28 19:24:48,266] TRACE Processor 0 received request: RequestHeader(apiKey=PRODUCE, apiVersion=11, clientId=, correlationId=-1, headerVersion=2) -- {acks=0,timeout=10000,partitionSizes=[]} (kafka.network.RequestChannel$:45)
[2024-07-28 19:24:48,266] DEBUG Leon: request obtained by requestQueue is Request(processor=0, connectionId=127.0.0.1:50229-127.0.0.1:50231-0, session=org.apache.kafka.network.Session@5329f6b3, listenerName=ListenerName(SSL), securityProtocol=SSL, buffer=java.nio.HeapByteBuffer[pos=20 lim=20 cap=20], envelope=None) (kafka.request.logger:476)
[2024-07-28 19:24:48,267] DEBUG Leon: before finally::proxyServer.close (kafka:703)
[2024-07-28 19:24:48,267] DEBUG Leon: before sslConnect (kafka:1469)
[2024-07-28 19:24:48,268] DEBUG Leon: before sendRequest (kafka:1471) {code}
 

 

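This test calls `receiveRequest` in _RequestChannel.scala_, which polls 
`callbackQueue` and then `requestQueue`. In the failing JDK 17 run both polls 
return null; in the JDK 11 run the `requestQueue` poll returns the request.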
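
The polling order seen in the logs can be sketched as follows. This is a minimal, hypothetical stand-in: the class `TwoQueueChannel` and its `receive` method are invented for illustration and are not Kafka's `RequestChannel`. It assumes, based only on the order of the debug lines above, that the callback queue is checked first without blocking and the request queue is then polled with a timeout.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// Hypothetical stand-in for a request channel with two queues.
// Not Kafka's RequestChannel; names are for illustration only.
final class TwoQueueChannel(capacity: Int) {
  val callbackQueue = new ArrayBlockingQueue[String](capacity)
  val requestQueue = new ArrayBlockingQueue[String](capacity)

  // Returns null when nothing arrives before the timeout, which is the
  // situation behind a "request obtained by requestQueue is null" line.
  def receive(timeoutMs: Long): String = {
    val callback = callbackQueue.poll() // non-blocking check
    if (callback != null) callback
    else requestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS) // blocks up to timeoutMs
  }
}
```

With such a structure, a request that never reaches `requestQueue` shows up only as a null result after the full timeout, which is consistent with the 10-second gap between the JDK 17 log entries.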

I found a daemon in _SocketServer.scala_ that monitors and counts the valid 
connections; the count is maintained by `connectionQuotas`. It seems that the 
connections on JDK 17 are disconnected unexpectedly. *Please see the 2 lines 
marked in red.*

*Could you check whether you see the same behavior in your environment as 
well?* Here is the code snippet I used to print out the value from 
`connectionQuotas`.
{code:java}
private def processDisconnected(): Unit = {
  selector.disconnected.keySet.forEach { connectionId =>
    try {
      val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
        throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
      }.remoteHost
      inflightResponses.remove(connectionId).foreach(updateRequestMetrics)
      // the channel has been closed by the selector but the quotas still need to be updated
      val inetAddress = InetAddress.getByName(remoteHost)
      debug(s"Leon: the connectionQuotas of inetAddress is ${connectionQuotas.get(inetAddress)}")
      connectionQuotas.dec(listenerName, InetAddress.getByName(remoteHost))
    } catch {
      case e: Throwable => processException(s"Exception while processing disconnection of $connectionId", e)
    }
  }
} {code}
 

The `connectionQuotas` count rarely drops this quickly. I could find only 2 
such cases under JDK 17, and `closingChannelSendFailure` is the only one where 
the count drops to 0.
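
To make the 1, 0, exception sequence in the JDK 17 log easier to reason about, here is a small sketch of a per-address counter. It is a simplified, hypothetical model: the class `AddressCounts` is invented for illustration and is not Kafka's `ConnectionQuotas`. It assumes that an address with no entry is reported as 0 and that decrementing such an address throws, which matches the exception message in the log.

```scala
import java.net.InetAddress
import scala.collection.mutable

// Hypothetical, simplified per-address connection counter.
// Not Kafka's ConnectionQuotas; it only models the counting behavior.
final class AddressCounts {
  private val counts = mutable.HashMap.empty[InetAddress, Int]

  def inc(address: InetAddress): Unit = counts.synchronized {
    counts.update(address, counts.getOrElse(address, 0) + 1)
  }

  // An address without an entry is reported as 0.
  def get(address: InetAddress): Int = counts.synchronized {
    counts.getOrElse(address, 0)
  }

  // Decrementing an address without an entry is treated as a bug.
  def dec(address: InetAddress): Unit = counts.synchronized {
    val count = counts.getOrElse(address,
      throw new IllegalArgumentException(
        s"Attempted to decrease connection count for address with no connections, address: $address"))
    if (count == 1) counts.remove(address) // last connection: drop the entry
    else counts.update(address, count - 1)
  }
}
```

In this sketch, one `inc` followed by two `dec` calls for the same address throws on the second `dec`. That has the same shape as the JDK 17 log, but it does not explain why a second disconnection is processed there.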

At this stage, I cannot come up with a valid root cause for this issue. Any 
ideas?

> Some SocketServerTest buffered close tests flaky failing locally
> ----------------------------------------------------------------
>
>                 Key: KAFKA-16701
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16701
>             Project: Kafka
>          Issue Type: Test
>          Components: core, unit tests
>    Affects Versions: 3.5.0, 3.6.0, 3.7.0
>            Reporter: Greg Harris
>            Assignee: bboyleonp
>            Priority: Major
>              Labels: flaky-test
>
> These tests are failing for me on a local development environment, but don't 
> appear to be flaky or failing in CI. They only appear to fail for JDK >= 17. 
> I'm using an M1 Mac, so it is possible that either the Mac's linear port 
> allocation, or a native implementation is impacting this.
> closingChannelSendFailure()
>  
> {noformat}
> java.lang.AssertionError: receiveRequest timed out
>       at 
> kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148)
>       at 
> kafka.network.SocketServerTest.makeChannelWithBufferedRequestsAndCloseRemote(SocketServerTest.scala:690)
>       at 
> kafka.network.SocketServerTest.$anonfun$verifySendFailureAfterRemoteClose$1(SocketServerTest.scala:1434)
>       at 
> kafka.network.SocketServerTest.verifySendFailureAfterRemoteClose(SocketServerTest.scala:1430)
>       at 
> kafka.network.SocketServerTest.closingChannelSendFailure(SocketServerTest.scala:1425){noformat}
> closingChannelWithBufferedReceivesFailedSend()
>  
> {noformat}
> java.lang.AssertionError: receiveRequest timed out
>       at 
> kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148)
>       at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1591)
>       at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
>       at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1590)
>       at 
> kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1553)
>       at 
> kafka.network.SocketServerTest.closingChannelWithBufferedReceivesFailedSend(SocketServerTest.scala:1520){noformat}
> closingChannelWithCompleteAndIncompleteBufferedReceives()
> {noformat}
> java.lang.AssertionError: receiveRequest timed out
>       at 
> kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148)
>       at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1591)
>       at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
>       at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1590)
>       at 
> kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1553)
>       at 
> kafka.network.SocketServerTest.closingChannelWithCompleteAndIncompleteBufferedReceives(SocketServerTest.scala:1511)
>  {noformat}
> remoteCloseWithBufferedReceives()
> {noformat}
> java.lang.AssertionError: receiveRequest timed out
>       at 
> kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148)
>       at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1591)
>       at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
>       at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1590)
>       at 
> kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1553)
>       at 
> kafka.network.SocketServerTest.remoteCloseWithBufferedReceives(SocketServerTest.scala:1453){noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
