[ https://issues.apache.org/jira/browse/KAFKA-9796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Rajini Sivaram resolved KAFKA-9796.
-----------------------------------
    Fix Version/s: 2.6.0
         Reviewer: Rajini Sivaram
       Resolution: Fixed

> Broker shutdown could be stuck forever under certain conditions
> ---------------------------------------------------------------
>
>                 Key: KAFKA-9796
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9796
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: David Jacot
>            Assignee: David Jacot
>            Priority: Major
>             Fix For: 2.6.0
>
>
> During the broker initialisation, the Acceptor threads are started early to
> know the bound port, while the start of the processors is delayed to the end
> of the initialisation sequence. We have found out that the shutdown of a
> broker could be stuck forever under the following conditions:
> - the shutdown procedure is started before the processors are started;
> - the `newConnections` queues of the processors are full; and
> - an extra new connection has been accepted but can't be queued up in a
> processor.
> For instance, this could happen if a `NodeExistsException` is raised when the
> broker tries to register itself in ZK.
> When the above conditions hold, the shutdown procedure triggers the shutdown
> of the acceptor threads and waits until they have stopped (first thread dump
> below). If an acceptor has a pending connection which can't be queued up in a
> processor, it ends up waiting until space is made in the queue to accept the
> new connection (second thread dump below). As the processors are not started,
> the new connection queues are never drained, so the acceptor thread is never
> released.
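The blocking pattern described above can be reproduced in isolation. The following is a minimal sketch, not Kafka's code and not the fix that went into 2.6.0: the class `AcceptorShutdownHang`, the method `shutdownCompletes`, and the queue capacity of 1 are all hypothetical and chosen for illustration. It only assumes what the report and the thread dumps show, namely an acceptor blocked in `ArrayBlockingQueue.put` and a shutdown path waiting on a `CountDownLatch`. A timed `await` is used so that the demo terminates instead of hanging.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class AcceptorShutdownHang {

    /**
     * Simulates one acceptor and one processor queue (hypothetical names).
     * Returns true if the acceptor terminated within timeoutMs after shutdown
     * was requested, false if it was still blocked.
     */
    public static boolean shutdownCompletes(boolean processorStarted, long timeoutMs)
            throws InterruptedException {
        // The processor's newConnections queue, already full.
        BlockingQueue<Integer> newConnections = new ArrayBlockingQueue<>(1);
        newConnections.put(0);

        AtomicBoolean running = new AtomicBoolean(true);
        CountDownLatch shutdownLatch = new CountDownLatch(1);

        // Acceptor: it has already accepted a connection, so it tries to queue
        // it before looking at the running flag again.
        Thread acceptor = new Thread(() -> {
            try {
                int connection = 1;
                do {
                    newConnections.put(connection++); // blocks while the queue is full
                } while (running.get());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                shutdownLatch.countDown();
            }
        }, "acceptor");
        acceptor.setDaemon(true);

        // Processor: drains the queue, but only if it was ever started.
        Thread processor = new Thread(() -> {
            try {
                while (true) {
                    newConnections.take();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "processor");
        processor.setDaemon(true);

        acceptor.start();
        if (processorStarted) {
            processor.start();
        }

        // Shutdown path: clear the flag, then wait for the acceptor to finish.
        // The real shutdown waits without a timeout; the timeout here only
        // keeps the demo from hanging.
        running.set(false);
        boolean completed = shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);

        // Clean up the demo threads.
        acceptor.interrupt();
        processor.interrupt();
        return completed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("processors not started: completed="
                + shutdownCompletes(false, 300));
        System.out.println("processors started:     completed="
                + shutdownCompletes(true, 5000));
    }
}
```

In this sketch the flag alone cannot release the acceptor, because the thread is parked inside `put` and never re-reads it. Only draining the queue or interrupting the thread unblocks it, which mirrors the report's observation that the queues are never drained when the processors were not started.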
> *Shutdown wait on acceptor to shutdown*
> {noformat}
> "main" #1 prio=5 os_prio=0 cpu=3626.89ms elapsed=106360.56s tid=0x00007f625001c800 nid=0x272 waiting on condition [0x00007f6257ca4000]
>    java.lang.Thread.State: WAITING (parking)
>       at jdk.internal.misc.Unsafe.park(java.base@11.0.5/Native Method)
>       - parking to wait for <0x0000000689a61800> (a java.util.concurrent.CountDownLatch$Sync)
>       at java.util.concurrent.locks.LockSupport.park(java.base@11.0.5/LockSupport.java:194)
>       at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.5/AbstractQueuedSynchronizer.java:885)
>       at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.5/AbstractQueuedSynchronizer.java:1039)
>       at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.5/AbstractQueuedSynchronizer.java:1345)
>       at java.util.concurrent.CountDownLatch.await(java.base@11.0.5/CountDownLatch.java:232)
>       at kafka.network.AbstractServerThread.shutdown(SocketServer.scala:430)
>       at kafka.network.Acceptor.shutdown(SocketServer.scala:521)
>       at kafka.network.SocketServer.$anonfun$stopProcessingRequests$2(SocketServer.scala:267)
>       at kafka.network.SocketServer.$anonfun$stopProcessingRequests$2$adapted(SocketServer.scala:267)
>       at kafka.network.SocketServer$$Lambda$604/0x0000000840540840.apply(Unknown Source)
>       at scala.collection.Iterator.foreach(Iterator.scala:941)
>       at scala.collection.Iterator.foreach$(Iterator.scala:941)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
>       at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:213)
>       at kafka.network.SocketServer.stopProcessingRequests(SocketServer.scala:267)
>       - locked <0x0000000689a61ac0> (a kafka.network.SocketServer)
>       at kafka.server.KafkaServer.$anonfun$shutdown$5(KafkaServer.scala:806)
>       at kafka.server.KafkaServer$$Lambda$602/0x000000084052b040.apply$mcV$sp(Unknown Source)
>       at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:68)
>       at kafka.server.KafkaServer.shutdown(KafkaServer.scala:806)
>       at kafka.server.KafkaServer.startup(KafkaServer.scala:522)
>       at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
>       at kafka.Kafka$.main(Kafka.scala:82)
>       at kafka.Kafka.main(Kafka.scala)
> {noformat}
> *Acceptor waits on processor to accept the new connection*
> {noformat}
> "data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-9092" #54 prio=5 os_prio=0 cpu=16.23ms elapsed=106346.62s tid=0x00007f62523b5000 nid=0x2ca waiting on condition [0x00007f6157130000]
>    java.lang.Thread.State: WAITING (parking)
>       at jdk.internal.misc.Unsafe.park(java.base@11.0.5/Native Method)
>       - parking to wait for <0x0000000689a7cad8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>       at java.util.concurrent.locks.LockSupport.park(java.base@11.0.5/LockSupport.java:194)
>       at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(java.base@11.0.5/AbstractQueuedSynchronizer.java:2081)
>       at java.util.concurrent.ArrayBlockingQueue.put(java.base@11.0.5/ArrayBlockingQueue.java:367)
>       at kafka.network.Processor.accept(SocketServer.scala:1020)
>       at kafka.network.Acceptor.assignNewConnection(SocketServer.scala:639)
>       at kafka.network.Acceptor.$anonfun$run$1(SocketServer.scala:566)
>       at kafka.network.Acceptor.run(SocketServer.scala:550)
>       at java.lang.Thread.run(java.base@11.0.5/Thread.java:834)
> {noformat}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)