[ https://issues.apache.org/jira/browse/KAFKA-13457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
David Jacot resolved KAFKA-13457.
---------------------------------
Fix Version/s: 3.2.0
Reviewer: David Jacot
Resolution: Fixed
> SocketChannel in Acceptor#accept is not closed upon IOException
> ---------------------------------------------------------------
>
> Key: KAFKA-13457
> URL: https://issues.apache.org/jira/browse/KAFKA-13457
> Project: Kafka
> Issue Type: Bug
> Components: network
> Affects Versions: 2.8.0
> Reporter: Haoze Wu
> Priority: Major
> Fix For: 3.2.0
>
>
> When kafka.network.Acceptor in SocketServer.scala accepts a new
> connection in its `accept` method, it handles
> `TooManyConnectionsException` and `ConnectionThrottledException`. However,
> the socketChannel calls on lines 720, 721, and 722 inside the try block
> can also throw an IOException, which is not handled.
>
> {code:java}
> //core/src/main/scala/kafka/network/SocketServer.scala
> // Acceptor class
> private def accept(key: SelectionKey): Option[SocketChannel] = {
>   val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
>   val socketChannel = serverSocketChannel.accept()
>   try {
>     connectionQuotas.inc(endPoint.listenerName,
>       socketChannel.socket.getInetAddress, blockedPercentMeter)
>     socketChannel.configureBlocking(false)       // line 720
>     socketChannel.socket().setTcpNoDelay(true)   // line 721
>     socketChannel.socket().setKeepAlive(true)    // line 722
>     if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
>       socketChannel.socket().setSendBufferSize(sendBufferSize)
>     Some(socketChannel)
>   } catch {
>     // There is no case for IOException: it escapes to the caller and
>     // socketChannel is left open.
>     case e: TooManyConnectionsException =>
>       info(s"Rejected connection from ${e.ip}, address already has the configured maximum of ${e.count} connections.")
>       close(endPoint.listenerName, socketChannel)
>       None
>     case e: ConnectionThrottledException =>
>       val ip = socketChannel.socket.getInetAddress
>       debug(s"Delaying closing of connection from $ip for ${e.throttleTimeMs} ms")
>       val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
>       throttledSockets += DelayedCloseSocket(socketChannel, endThrottleTimeMs)
>       None
>   }
> }
> {code}
> This IOException propagates to the caller, `acceptNewConnections`, where it
> is caught at line 706 and only logged as an error. The socketChannel that
> threw the IOException is never closed.
>
> {code:java}
> //core/src/main/scala/kafka/network/SocketServer.scala
> private def acceptNewConnections(): Unit = {
>   val ready = nioSelector.select(500)
>   if (ready > 0) {
>     val keys = nioSelector.selectedKeys()
>     val iter = keys.iterator()
>     while (iter.hasNext && isRunning) {
>       try {
>         val key = iter.next
>         iter.remove()
>         if (key.isAcceptable) {
>           accept(key).foreach { socketChannel =>
>             ...
>             } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
>           }
>         } else
>           throw new IllegalStateException("Unrecognized key state for acceptor thread.")
>       } catch {
>         case e: Throwable => error("Error while accepting connection", e) // line 706
>       }
>     }
>   }
> }
> {code}
> During testing, we found that this caused our Kafka clients to fail with
> InvalidReplicationFactorException for 40+ seconds when creating new
> topics. After about 40 seconds, the clients could create new topics
> successfully.
> We verified that after adding socketChannel.close() when an IOException is
> thrown, the symptoms disappear, and the clients no longer have to wait
> 40 seconds before they work again.
>
>
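A minimal sketch of the close-on-IOException fix described in the report, assuming Scala 2.13 or 3 and using plain java.nio instead of Kafka's classes. The hypothetical helper `AcceptSketch.configureAccepted` (not a Kafka API) applies the same socket options as lines 720-722 and, if any of them throws an IOException, closes the freshly accepted channel and returns None instead of letting the exception escape to a caller that only logs it. Inside Kafka's `Acceptor`, the equivalent would presumably be an extra `case e: IOException` branch in `accept` that calls the existing `close(endPoint.listenerName, socketChannel)`, as the `TooManyConnectionsException` branch already does; that placement is an assumption, not the committed patch.

```scala
import java.io.IOException
import java.nio.channels.SocketChannel

// Hypothetical sketch, not Kafka's API: configure a freshly accepted channel
// and close it if any option call fails with an IOException.
object AcceptSketch {

  // Same option calls as lines 720-722 of the original accept().
  val defaultConfigure: SocketChannel => Unit = { ch =>
    ch.configureBlocking(false)
    ch.socket().setTcpNoDelay(true)
    ch.socket().setKeepAlive(true)
  }

  // Returns Some(channel) when configuration succeeds. Otherwise closes the
  // channel so its file descriptor is not leaked, and returns None.
  def configureAccepted(ch: SocketChannel,
                        configure: SocketChannel => Unit = defaultConfigure): Option[SocketChannel] =
    try {
      configure(ch)
      Some(ch)
    } catch {
      case e: IOException =>
        System.err.println(s"Error while configuring accepted connection, closing it: $e")
        // Best-effort close; a failure here leaves nothing further to clean up.
        try ch.close() catch { case _: IOException => () }
        None
    }
}
```

Passing a `configure` function that throws simulates the failing option call, so the close-on-error path can be exercised on a loopback connection without waiting for a real network fault.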
--
This message was sent by Atlassian Jira
(v8.20.1#820001)