[ https://issues.apache.org/jira/browse/KAFKA-13457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
David Jacot updated KAFKA-13457:
--------------------------------
    Summary: SocketChannel in Acceptor#accept is not closed upon IOException  (was: socketChannel in Acceptor#accept is not closed upon IOException)

> SocketChannel in Acceptor#accept is not closed upon IOException
> ---------------------------------------------------------------
>
>                 Key: KAFKA-13457
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13457
>             Project: Kafka
>          Issue Type: Bug
>          Components: network
>    Affects Versions: 2.8.0
>            Reporter: Haoze Wu
>            Priority: Major
>
> When the kafka.network.Acceptor in SocketServer.scala accepts a new
> connection in the `accept` function, it handles the
> `TooManyConnectionsException` and `ConnectionThrottledException`. However,
> the socketChannel operations (line 720, 721, or 722) within the try block
> may also throw an IOException, which is not handled.
>
> {code:java}
> //core/src/main/scala/kafka/network/SocketServer.scala
> // Acceptor class
> private def accept(key: SelectionKey): Option[SocketChannel] = {
>   val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
>   val socketChannel = serverSocketChannel.accept()     // line 717
>   try {
>     connectionQuotas.inc(endPoint.listenerName, socketChannel.socket.getInetAddress, blockedPercentMeter)
>     socketChannel.configureBlocking(false)             // line 720
>     socketChannel.socket().setTcpNoDelay(true)         // line 721
>     socketChannel.socket().setKeepAlive(true)          // line 722
>     if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
>       socketChannel.socket().setSendBufferSize(sendBufferSize)
>     Some(socketChannel)
>   } catch {
>     case e: TooManyConnectionsException =>
>       info(s"Rejected connection from ${e.ip}, address already has the configured maximum of ${e.count} connections.")
>       close(endPoint.listenerName, socketChannel)
>       None
>     case e: ConnectionThrottledException =>
>       val ip = socketChannel.socket.getInetAddress
>       debug(s"Delaying closing of connection from $ip for ${e.throttleTimeMs} ms")
>       val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
>       throttledSockets += DelayedCloseSocket(socketChannel, endThrottleTimeMs)
>       None
>   }
> }
> {code}
> The thrown IOException is caught in the caller `acceptNewConnections` at
> line 706, which only prints an error message. The socketChannel that threw
> the IOException is not closed.
>
> {code:java}
> //core/src/main/scala/kafka/network/SocketServer.scala
> private def acceptNewConnections(): Unit = {
>   val ready = nioSelector.select(500)
>   if (ready > 0) {
>     val keys = nioSelector.selectedKeys()
>     val iter = keys.iterator()
>     while (iter.hasNext && isRunning) {
>       try {
>         val key = iter.next
>         iter.remove()
>         if (key.isAcceptable) {
>           accept(key).foreach { socketChannel =>
>             ...
>             } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
>           }
>         } else
>           throw new IllegalStateException("Unrecognized key state for acceptor thread.")
>       } catch {
>         case e: Throwable => error("Error while accepting connection", e)   // line 706
>       }
>     }
>   }
> }
> {code}
> During testing we found that this causes our Kafka clients to experience
> errors (InvalidReplicationFactorException) for 40+ seconds when creating new
> topics. After 40 seconds, the clients are able to create new topics
> successfully.
> We verified that after adding socketChannel.close() upon IOException, the
> symptoms disappear, so the clients no longer need to wait 40 seconds before
> working again.
>
> --
> This message was sent by Atlassian Jira
> (v8.20.1#820001)
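
For illustration only, the close-on-IOException handling described in the report could be sketched roughly as follows. This is not the actual Kafka patch: `AcceptSketch` and `configureOrClose` are hypothetical names, and the sketch uses only plain java.nio calls rather than Kafka's `Acceptor`, `connectionQuotas`, or `close(listenerName, socketChannel)` helper.

```scala
import java.io.IOException
import java.net.InetSocketAddress
import java.nio.channels.{ServerSocketChannel, SocketChannel}

object AcceptSketch {
  // Hypothetical helper (not Kafka code): runs the socket configuration step
  // and closes the accepted channel if that step throws an IOException.
  def configureOrClose(channel: SocketChannel)(configure: SocketChannel => Unit): Option[SocketChannel] =
    try {
      configure(channel)
      Some(channel)
    } catch {
      case _: IOException =>
        // Without this close, the accepted channel would stay open.
        try channel.close() catch { case _: IOException => () }
        None
    }

  def main(args: Array[String]): Unit = {
    // Loopback server on an ephemeral port, plus one client connection.
    val server = ServerSocketChannel.open()
    server.bind(new InetSocketAddress("127.0.0.1", 0))
    val client = SocketChannel.open(new InetSocketAddress("127.0.0.1", server.socket().getLocalPort))
    val accepted = server.accept()

    // Simulate a failure in one of the socket operations (lines 720-722 in the report).
    val result = configureOrClose(accepted) { ch =>
      ch.configureBlocking(false)
      throw new IOException("simulated failure while configuring the socket")
    }

    println(s"result=$result, accepted.isOpen=${accepted.isOpen}")
    client.close()
    server.close()
  }
}
```

In the real `accept` method, the equivalent change would presumably be an extra `case e: IOException` branch alongside the two existing ones; whether it should also adjust the connection quota is outside what the report states.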