[ 
https://issues.apache.org/jira/browse/KAFKA-9796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-9796.
-----------------------------------
    Fix Version/s: 2.6.0
         Reviewer: Rajini Sivaram
       Resolution: Fixed

> Broker shutdown could be stuck forever under certain conditions
> ---------------------------------------------------------------
>
>                 Key: KAFKA-9796
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9796
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: David Jacot
>            Assignee: David Jacot
>            Priority: Major
>             Fix For: 2.6.0
>
>
> During broker initialisation, the Acceptor threads are started early so that 
> the bound port is known, while the start of the processors is delayed until 
> the end of the initialisation sequence. We have found that the shutdown of a 
> broker can be stuck forever under the following conditions:
>  - the shutdown procedure is started before the processors are started;
>  - the `newConnections` queues of the processors are full; and
>  - an extra new connection has been accepted but can't be queued up in a 
> processor.
> For instance, this could happen if a `NodeExistsException` is raised when the 
> broker tries to register itself in ZK.
> When the above conditions are met, the shutdown procedure triggers the 
> shutdown of the acceptor threads and waits until they have stopped (first 
> thread dump below). If an acceptor has a pending connection which can't be 
> queued up in a processor, it waits until space becomes available in that 
> processor's `newConnections` queue (second thread dump below). As the 
> processors are not started, the new connection queues are never drained, so 
> the acceptor thread is never released.
> *Shutdown waits for the acceptor to shut down*
> {noformat}
> "main" #1 prio=5 os_prio=0 cpu=3626.89ms elapsed=106360.56s 
> tid=0x00007f625001c800 nid=0x272 waiting on condition  [0x00007f6257ca4000]
>    java.lang.Thread.State: WAITING (parking)
>       at jdk.internal.misc.Unsafe.park(java.base@11.0.5/Native Method)
>       - parking to wait for  <0x0000000689a61800> (a 
> java.util.concurrent.CountDownLatch$Sync)
>       at 
> java.util.concurrent.locks.LockSupport.park(java.base@11.0.5/LockSupport.java:194)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.5/AbstractQueuedSynchronizer.java:885)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.5/AbstractQueuedSynchronizer.java:1039)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.5/AbstractQueuedSynchronizer.java:1345)
>       at 
> java.util.concurrent.CountDownLatch.await(java.base@11.0.5/CountDownLatch.java:232)
>       at kafka.network.AbstractServerThread.shutdown(SocketServer.scala:430)
>       at kafka.network.Acceptor.shutdown(SocketServer.scala:521)
>       at 
> kafka.network.SocketServer.$anonfun$stopProcessingRequests$2(SocketServer.scala:267)
>       at 
> kafka.network.SocketServer.$anonfun$stopProcessingRequests$2$adapted(SocketServer.scala:267)
>       at 
> kafka.network.SocketServer$$Lambda$604/0x0000000840540840.apply(Unknown 
> Source)
>       at scala.collection.Iterator.foreach(Iterator.scala:941)
>       at scala.collection.Iterator.foreach$(Iterator.scala:941)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
>       at 
> scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:213)
>       at 
> kafka.network.SocketServer.stopProcessingRequests(SocketServer.scala:267)
>       - locked <0x0000000689a61ac0> (a kafka.network.SocketServer)
>       at kafka.server.KafkaServer.$anonfun$shutdown$5(KafkaServer.scala:806)
>       at 
> kafka.server.KafkaServer$$Lambda$602/0x000000084052b040.apply$mcV$sp(Unknown 
> Source)
>       at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:68)
>       at kafka.server.KafkaServer.shutdown(KafkaServer.scala:806)
>       at kafka.server.KafkaServer.startup(KafkaServer.scala:522)
>       at 
> kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
>       at kafka.Kafka$.main(Kafka.scala:82)
>       at kafka.Kafka.main(Kafka.scala)
> {noformat}
> *Acceptor waits on processor to accept the new connection*
> {noformat}
> "data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-9092" #54 
> prio=5 os_prio=0 cpu=16.23ms elapsed=106346.62s tid=0x00007f62523b5000 
> nid=0x2ca waiting on condition  [0x00007f6157130000]
>    java.lang.Thread.State: WAITING (parking)
>       at jdk.internal.misc.Unsafe.park(java.base@11.0.5/Native Method)
>       - parking to wait for  <0x0000000689a7cad8> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>       at 
> java.util.concurrent.locks.LockSupport.park(java.base@11.0.5/LockSupport.java:194)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(java.base@11.0.5/AbstractQueuedSynchronizer.java:2081)
>       at 
> java.util.concurrent.ArrayBlockingQueue.put(java.base@11.0.5/ArrayBlockingQueue.java:367)
>       at kafka.network.Processor.accept(SocketServer.scala:1020)
>       at kafka.network.Acceptor.assignNewConnection(SocketServer.scala:639)
>       at kafka.network.Acceptor.$anonfun$run$1(SocketServer.scala:566)
>       at kafka.network.Acceptor.run(SocketServer.scala:550)
>       at java.lang.Thread.run(java.base@11.0.5/Thread.java:834)
> {noformat}
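
The two dumps above show a latch wait in `shutdown` that depends on a thread parked in `ArrayBlockingQueue.put`. As an illustration only, here is a minimal Java sketch of one way such a hang can be avoided: the acceptor hands over the connection with a timed `offer` and re-checks a running flag between attempts. All class and method names (`AcceptorShutdownSketch`, `SketchAcceptor`, `demo`) are hypothetical and do not come from Kafka's `SocketServer`; this is not necessarily how the issue was fixed in 2.6.0.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch, not Kafka code: an acceptor that cannot block forever
// on a full connection queue once shutdown has been requested.
class AcceptorShutdownSketch {

    static final class SketchAcceptor implements Runnable {
        private final BlockingQueue<String> newConnections;
        private final AtomicBoolean running = new AtomicBoolean(true);
        private final CountDownLatch shutdownLatch = new CountDownLatch(1);

        SketchAcceptor(BlockingQueue<String> newConnections) {
            this.newConnections = newConnections;
        }

        @Override
        public void run() {
            try {
                String pending = "extra-connection";
                // A plain put() would park here forever if nothing drains the
                // queue. A timed offer() lets the loop re-check the flag.
                while (running.get()) {
                    if (newConnections.offer(pending, 50, TimeUnit.MILLISECONDS)) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // Always release whoever is waiting in shutdown().
                shutdownLatch.countDown();
            }
        }

        // Requests shutdown and waits up to timeoutMs for the thread to finish.
        boolean shutdown(long timeoutMs) throws InterruptedException {
            running.set(false);
            return shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
        }
    }

    // Reproduces the reported conditions: the queue is full and no processor
    // is draining it when shutdown starts. Returns true if the acceptor stopped.
    static boolean demo() {
        try {
            BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
            queue.put("already-queued");
            SketchAcceptor acceptor = new SketchAcceptor(queue);
            Thread thread = new Thread(acceptor, "sketch-acceptor");
            thread.setDaemon(true);
            thread.start();
            return acceptor.shutdown(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("acceptor stopped: " + demo());
    }
}
```

In this sketch the pending connection is simply dropped when shutdown wins the race; a real server would also need to close the accepted socket.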



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
