[ 
https://issues.apache.org/jira/browse/KAFKA-5721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-5721.
-----------------------------------
       Resolution: Fixed
    Fix Version/s: 0.11.0.2

The specific deadlock in the stack trace in this JIRA was fixed under 
KAFKA-3994 (0.10.2.0). There have been a couple of other deadlocks which have 
been fixed since. The two releases that don't have any known deadlocks are 
1.0.0 and 0.11.0.2.

> Kafka broker stops after network failure
> ----------------------------------------
>
>                 Key: KAFKA-5721
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5721
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Val Feron
>             Fix For: 0.11.0.2
>
>         Attachments: thread_dump_kafka.out
>
>
> +Cluster description+
> * 3 brokers
> * version 0.10.1.1
> * running on AWS
> +Description+
> The following will happen at random intervals, on random brokers
> From the logs here is the information I could gather :
> # Shrinking Intra-cluster replication on a random broker (I suppose it could 
> be a temporary network failure but couldn't produce evidence of it)
> # System starts showing close to no activity @02:27:20 (note that it's not 
> load related as it happens at very quiet times)
> !https://i.stack.imgur.com/g1Pem.png!
> # From there, this kafka broker doesn't process messages which is expected 
> IMO as it dropped out of the cluster replication.
> # Now the real issue appears as the number of connections in CLOSE_WAIT is 
> constantly increasing until it reaches the configured ulimit of the 
> system/process, ending up crashing the kafka process.
> Now, I've been changing limits to see if kafka would eventually join again 
> the ISR before crashing but even with a limit that's very high, kafka just 
> seems stuck in a weird state and never recovers.
> Note that between the time when the faulty broker is on its own and the time 
> it crashes, kafka is listening and kafka producer.
> For this single crash, I could see 320 errors like this from the producers :
> {code}java.util.concurrent.ExecutionException: 
> org.springframework.kafka.core.KafkaProducerException: Failed to send; nested 
> exception is org.apache.kafka.common.errors.NotLeaderForPartitionException: 
> This server is not the leader for that topic-partition.
> {code}
> ----
> The configuration being the default one and the use being quite standard, I'm 
> wondering if I missed something.
> I put in place a script that check the number of kafka file descriptors and 
> restarts the service when it gets abnormally high, which does the trick for 
> now but I still lose messages when it crashes.
> I'm available to make any verification / test you need.
> Could see some similarities with ticket KAFKA-5007 too.
> cc [~junrao]
> Confirming the logs seen at the time of failure :
> {code}
>  /var/log/kafka/kafkaServer.out:[2017-08-09 22:13:29,045] WARN 
> [ReplicaFetcherThread-0-2], Error in fetch 
> kafka.server.ReplicaFetcherThread$FetchRequest@4a63f79 
> (kafka.server.ReplicaFetcherThread)
>     /var/log/kafka/kafkaServer.out-java.io.IOException: Connection to 2 was 
> disconnected before the response was read
>     /var/log/kafka/kafkaServer.out-   at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:115)
>     /var/log/kafka/kafkaServer.out-   at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:112)
>     /var/log/kafka/kafkaServer.out-   at 
> scala.Option.foreach(Option.scala:257)
>     /var/log/kafka/kafkaServer.out-   at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:112)
>     /var/log/kafka/kafkaServer.out-   at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:108)
>     /var/log/kafka/kafkaServer.out-   at 
> kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:137)
>     /var/log/kafka/kafkaServer.out-   at 
> kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
>     /var/log/kafka/kafkaServer.out-   at 
> kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:108)
>     /var/log/kafka/kafkaServer.out-   at 
> kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:253)
>     /var/log/kafka/kafkaServer.out-   at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:238)
>     /var/log/kafka/kafkaServer.out-   at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
>     /var/log/kafka/kafkaServer.out-   at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
>     /var/log/kafka/kafkaServer.out-   at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
>     /var/log/kafka/kafkaServer.out-   at 
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> {code}
> {code}
>     /var/log/kafka/kafkaServer.out:[2017-08-09 22:17:39,940] WARN 
> [ReplicaFetcherThread-0-2], Error in fetch 
> kafka.server.ReplicaFetcherThread$FetchRequest@3a8493a8 
> (kafka.server.ReplicaFetcherThread)
>     /var/log/kafka/kafkaServer.out-java.io.IOException: Connection 
> [masked:xxx.xxx.xxx.xxx]:9092 (id: 2 rack: null) failed
>     /var/log/kafka/kafkaServer.out-   at 
> kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:83)
>     /var/log/kafka/kafkaServer.out-   at 
> kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:93)
>     /var/log/kafka/kafkaServer.out-   at 
> kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:248)
>     /var/log/kafka/kafkaServer.out-   at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:238)
>     /var/log/kafka/kafkaServer.out-   at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
>     /var/log/kafka/kafkaServer.out-   at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
>     /var/log/kafka/kafkaServer.out-   at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
>     /var/log/kafka/kafkaServer.out-   at 
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> {code}
> Finally, here is the thread dump of the faulty node at the time of failure 
> (taken when the number of connections in any state on port 9092 > 1000) :
> {code}
> Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.111-b14 mixed mode):
> "Attach Listener" #57 daemon prio=9 os_prio=0 tid=0x00007f8d94003000 
> nid=0x20fd waiting on condition [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
> "executor-Produce" #56 prio=5 os_prio=0 tid=0x00007f8d78002800 nid=0x1c02 
> waiting on condition [0x00007f8d402fd000]
>    java.lang.Thread.State: WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000000c06d8008> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>       at 
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
>       at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> "executor-Heartbeat" #55 prio=5 os_prio=0 tid=0x00007f8d98040800 nid=0x1bfe 
> waiting for monitor entry [0x00007f8d417fe000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>       at 
> kafka.coordinator.GroupCoordinator.onExpireHeartbeat(GroupCoordinator.scala:739)
>       - waiting to lock <0x00000000c8941218> (a 
> kafka.coordinator.GroupMetadata)
>       at 
> kafka.coordinator.DelayedHeartbeat.onExpiration(DelayedHeartbeat.scala:33)
>       at kafka.server.DelayedOperation.run(DelayedOperation.scala:107)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> "kafka-scheduler-9" #54 daemon prio=5 os_prio=0 tid=0x00007f8d58010800 
> nid=0x6b09 waiting on condition [0x00007f8ca30f9000]
>    java.lang.Thread.State: WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000000c03239e8> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
>       at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> "kafka-scheduler-8" #53 daemon prio=5 os_prio=0 tid=0x000000000243d000 
> nid=0x6b08 waiting on condition [0x00007f8ca3bfa000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000000c03239e8> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>       at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
>       at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> "kafka-scheduler-7" #52 daemon prio=5 os_prio=0 tid=0x00007f8dc003d800 
> nid=0x6b07 waiting on condition [0x00007f8ca3cfb000]
>    java.lang.Thread.State: WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000000c03239e8> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
>       at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> "ReplicaFetcherThread-0-1" #51 prio=5 os_prio=0 tid=0x00007f8dc003a800 
> nid=0x6b06 runnable [0x00007f8ca3dfc000]
>    java.lang.Thread.State: RUNNABLE
>       at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>       at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>       at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
>       at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>       - locked <0x00000000c009c370> (a sun.nio.ch.Util$3)
>       - locked <0x00000000c009c360> (a java.util.Collections$UnmodifiableSet)
>       - locked <0x00000000c009c380> (a sun.nio.ch.EPollSelectorImpl)
>       at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>       at org.apache.kafka.common.network.Selector.select(Selector.java:470)
>       at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
>       at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
>       at 
> kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:136)
>       at 
> kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
>       at 
> kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:108)
>       at 
> kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:253)
>       at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:238)
>       at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
>       at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> "executor-Fetch" #50 prio=5 os_prio=0 tid=0x00007f8d74003000 nid=0x6b05 
> waiting on condition [0x00007f8ca3efd000]
>    java.lang.Thread.State: WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000000c068a9c8> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>       at 
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
>       at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> "ReplicaFetcherThread-0-3" #49 prio=5 os_prio=0 tid=0x00007f8dc0026800 
> nid=0x6b04 runnable [0x00007f8ca3ffe000]
>    java.lang.Thread.State: RUNNABLE
>       at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>       at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>       at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
>       at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>       - locked <0x00000000c009c840> (a sun.nio.ch.Util$3)
>       - locked <0x00000000c009c830> (a java.util.Collections$UnmodifiableSet)
>       - locked <0x00000000c009c850> (a sun.nio.ch.EPollSelectorImpl)
>       at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>       at org.apache.kafka.common.network.Selector.select(Selector.java:470)
>       at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
>       at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
>       at 
> kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:136)
>       at 
> kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
>       at 
> kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:108)
>       at 
> kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:253)
>       at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:238)
>       at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
>       at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
>       at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
>       at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> "kafka-request-handler-7" #48 daemon prio=5 os_prio=0 tid=0x00007f8df0d57000 
> nid=0x6b01 waiting for monitor entry [0x00007f8d5c3fe000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>       at 
> kafka.coordinator.GroupCoordinator.doCommitOffsets(GroupCoordinator.scala:432)
>       - waiting to lock <0x00000000c8941218> (a 
> kafka.coordinator.GroupMetadata)
>       at 
> kafka.coordinator.GroupCoordinator.handleCommitOffsets(GroupCoordinator.scala:420)
>       at kafka.server.KafkaApis.handleOffsetCommitRequest(KafkaApis.scala:322)
>       at kafka.server.KafkaApis.handle(KafkaApis.scala:86)
>       at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>       at java.lang.Thread.run(Thread.java:745)
> "kafka-request-handler-6" #47 daemon prio=5 os_prio=0 tid=0x00007f8df0d55000 
> nid=0x6b00 waiting for monitor entry [0x00007f8d681eb000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>       at 
> kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:308)
>       - waiting to lock <0x00000000da1804a8> (a java.util.LinkedList)
>       at 
> kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:234)
>       at 
> kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:199)
>       at 
> kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:909)
>       at 
> kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:902)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>       at 
> kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:902)
>       at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:475)
>       at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:523)
>       at kafka.server.KafkaApis.handle(KafkaApis.scala:79)
>       at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>       at java.lang.Thread.run(Thread.java:745)
> "kafka-request-handler-5" #46 daemon prio=5 os_prio=0 tid=0x00007f8df0d53000 
> nid=0x6aff waiting for monitor entry [0x00007f8d682ec000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>       at 
> kafka.coordinator.GroupCoordinator.doCommitOffsets(GroupCoordinator.scala:432)
>       - waiting to lock <0x00000000c8941218> (a 
> kafka.coordinator.GroupMetadata)
>       at 
> kafka.coordinator.GroupCoordinator.handleCommitOffsets(GroupCoordinator.scala:420)
>       at kafka.server.KafkaApis.handleOffsetCommitRequest(KafkaApis.scala:322)
>       at kafka.server.KafkaApis.handle(KafkaApis.scala:86)
>       at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>       at java.lang.Thread.run(Thread.java:745)
> "kafka-request-handler-4" #45 daemon prio=5 os_prio=0 tid=0x00007f8df0d51000 
> nid=0x6afe waiting for monitor entry [0x00007f8d683ed000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>       at 
> kafka.coordinator.GroupCoordinator.doCommitOffsets(GroupCoordinator.scala:432)
>       - waiting to lock <0x00000000c8941218> (a 
> kafka.coordinator.GroupMetadata)
>       at 
> kafka.coordinator.GroupCoordinator.handleCommitOffsets(GroupCoordinator.scala:420)
>       at kafka.server.KafkaApis.handleOffsetCommitRequest(KafkaApis.scala:322)
>       at kafka.server.KafkaApis.handle(KafkaApis.scala:86)
>       at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>       at java.lang.Thread.run(Thread.java:745)
> "kafka-request-handler-3" #44 daemon prio=5 os_prio=0 tid=0x00007f8df0d4f800 
> nid=0x6afd waiting for monitor entry [0x00007f8d684ee000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>       at 
> kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:308)
>       - waiting to lock <0x00000000da1804a8> (a java.util.LinkedList)
>       at 
> kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:234)
>       at 
> kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:199)
>       at 
> kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:909)
>       at 
> kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:902)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>       at 
> kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:902)
>       at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:475)
>       at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:523)
>       at kafka.server.KafkaApis.handle(KafkaApis.scala:79)
>       at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>       at java.lang.Thread.run(Thread.java:745)
> "kafka-request-handler-2" #43 daemon prio=5 os_prio=0 tid=0x00007f8df0d4a800 
> nid=0x6afc waiting for monitor entry [0x00007f8d685ef000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>       at 
> kafka.coordinator.GroupCoordinator.handleHeartbeat(GroupCoordinator.scala:351)
>       - waiting to lock <0x00000000c8941218> (a 
> kafka.coordinator.GroupMetadata)
>       at kafka.server.KafkaApis.handleHeartbeatRequest(KafkaApis.scala:1095)
>       at kafka.server.KafkaApis.handle(KafkaApis.scala:90)
>       at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>       at java.lang.Thread.run(Thread.java:745)
> "kafka-request-handler-1" #42 daemon prio=5 os_prio=0 tid=0x00007f8df0d49000 
> nid=0x6afb waiting for monitor entry [0x00007f8d686f0000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>       at 
> kafka.coordinator.GroupMetadataManager.kafka$coordinator$GroupMetadataManager$$putCacheCallback$2(GroupMetadataManager.scala:301)
>       - waiting to lock <0x00000000c8941218> (a 
> kafka.coordinator.GroupMetadata)
>       at 
> kafka.coordinator.GroupMetadataManager$$anonfun$prepareStoreOffsets$1.apply(GroupMetadataManager.scala:357)
>       at 
> kafka.coordinator.GroupMetadataManager$$anonfun$prepareStoreOffsets$1.apply(GroupMetadataManager.scala:357)
>       at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:123)
>       at 
> kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
>       at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:105)
>       at 
> kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:315)
>       - locked <0x00000000da180450> (a kafka.server.DelayedProduce)
>       - locked <0x00000000da1804a8> (a java.util.LinkedList)
>       at 
> kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:234)
>       at 
> kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:199)
>       at 
> kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:909)
>       at 
> kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:902)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>       at 
> kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:902)
>       at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:475)
>       at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:523)
>       at kafka.server.KafkaApis.handle(KafkaApis.scala:79)
>       at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>       at java.lang.Thread.run(Thread.java:745)
> "kafka-request-handler-0" #41 daemon prio=5 os_prio=0 tid=0x00007f8df0d46800 
> nid=0x6afa waiting for monitor entry [0x00007f8d68a78000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>       at 
> kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:308)
>       - waiting to lock <0x00000000da1804a8> (a java.util.LinkedList)
>       at 
> kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:234)
>       at 
> kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:199)
>       at 
> kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:909)
>       at 
> kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:902)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>       at 
> kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:902)
>       at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:475)
>       at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:523)
>       at kafka.server.KafkaApis.handle(KafkaApis.scala:79)
>       at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>       at java.lang.Thread.run(Thread.java:745)
> "group-metadata-manager-0" #40 daemon prio=5 os_prio=0 tid=0x00007f8df0d2b000 
> nid=0x6af9 waiting for monitor entry [0x00007f8d68b79000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>       at 
> kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:308)
>       - waiting to lock <0x00000000da1804a8> (a java.util.LinkedList)
>       at 
> kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:234)
>       at 
> kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:199)
>       at 
> kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:374)
>       at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:457)
>       at 
> kafka.coordinator.GroupMetadataManager$$anonfun$cleanupGroupMetadata$1$$anonfun$apply$10.apply(GroupMetadataManager.scala:600)
>       at 
> kafka.coordinator.GroupMetadataManager$$anonfun$cleanupGroupMetadata$1$$anonfun$apply$10.apply(GroupMetadataManager.scala:593)
>       at scala.Option.foreach(Option.scala:257)
>       at 
> kafka.coordinator.GroupMetadataManager$$anonfun$cleanupGroupMetadata$1.apply(GroupMetadataManager.scala:593)
>       - locked <0x00000000c8941218> (a kafka.coordinator.GroupMetadata)
>       at 
> kafka.coordinator.GroupMetadataManager$$anonfun$cleanupGroupMetadata$1.apply(GroupMetadataManager.scala:579)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>       at kafka.utils.Pool$$anon$1.foreach(Pool.scala:89)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at kafka.utils.Pool.foreach(Pool.scala:26)
>       at 
> kafka.coordinator.GroupMetadataManager.cleanupGroupMetadata(GroupMetadataManager.scala:579)
>       at 
> kafka.coordinator.GroupMetadataManager$$anonfun$1.apply$mcV$sp(GroupMetadataManager.scala:101)
>       at 
> kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
>       at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:58)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> "ExpirationReaper-2" #39 prio=5 os_prio=0 tid=0x00007f8df0d02000 nid=0x6af8 
> waiting on condition [0x00007f8d68c7a000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000000c068c358> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>       at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>       at java.util.concurrent.DelayQueue.poll(DelayQueue.java:259)
>       at kafka.utils.timer.SystemTimer.advanceClock(Timer.scala:106)
>       at 
> kafka.server.DelayedOperationPurgatory.advanceClock(DelayedOperation.scala:350)
>       at 
> kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper.doWork(DelayedOperation.scala:374)
>       at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> "ExpirationReaper-2" #38 prio=5 os_prio=0 tid=0x00007f8df0cf0800 nid=0x6af7 
> waiting on condition [0x00007f8d68d7b000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000000c068c5b0> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>       at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>       at java.util.concurrent.DelayQueue.poll(DelayQueue.java:259)
>       at kafka.utils.timer.SystemTimer.advanceClock(Timer.scala:106)
>       at 
> kafka.server.DelayedOperationPurgatory.advanceClock(DelayedOperation.scala:350)
>       at 
> kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper.doWork(DelayedOperation.scala:374)
>       at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> "ExpirationReaper-2" #37 prio=5 os_prio=0 tid=0x00007f8df0cdc800 nid=0x6af6 
> waiting on condition [0x00007f8d68e7c000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000000c068c808> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>       at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>       at java.util.concurrent.DelayQueue.poll(DelayQueue.java:259)
>       at kafka.utils.timer.SystemTimer.advanceClock(Timer.scala:106)
>       at 
> kafka.server.DelayedOperationPurgatory.advanceClock(DelayedOperation.scala:350)
>       at 
> kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper.doWork(DelayedOperation.scala:374)
>       at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> "kafka-scheduler-6" #36 daemon prio=5 os_prio=0 tid=0x00007f8d6007a800 
> nid=0x6af5 waiting on condition [0x00007f8d68f7d000]
>    java.lang.Thread.State: WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000000c03239e8> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
>       at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> "kafka-scheduler-5" #35 daemon prio=5 os_prio=0 tid=0x00007f8df0c58000 
> nid=0x6af4 waiting on condition [0x00007f8d6907e000]
>    java.lang.Thread.State: WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000000c03239e8> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
>       at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> "kafka-scheduler-4" #34 daemon prio=5 os_prio=0 tid=0x00007f8d58018800 
> nid=0x6af3 waiting on condition [0x00007f8d6917f000]
>    java.lang.Thread.State: WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000000c03239e8> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
>       at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> "kafka-scheduler-3" #33 daemon prio=5 os_prio=0 tid=0x00007f8df0c56800 
> nid=0x6af2 waiting on condition [0x00007f8d69280000]
>    java.lang.Thread.State: WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000000c03239e8> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
>       at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> "metrics-meter-tick-thread-2" #32 daemon prio=5 os_prio=0 
> tid=0x00007f8df0c54000 nid=0x6af1 waiting on condition [0x00007f8d6a7f3000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000000c068caf8> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>       at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
>       at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> "ExpirationReaper-2" #31 prio=5 os_prio=0 tid=0x00007f8df0c4a800 nid=0x6af0 
> waiting on condition [0x00007f8d6a8f4000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000000c068de68> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>       at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>       at java.util.concurrent.DelayQueue.poll(DelayQueue.java:259)
>       at kafka.utils.timer.SystemTimer.advanceClock(Timer.scala:106)
>       at 
> kafka.server.DelayedOperationPurgatory.advanceClock(DelayedOperation.scala:350)
>       at 
> kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper.doWork(DelayedOperation.scala:374)
>       at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> "ExpirationReaper-2" #30 prio=5 os_prio=0 tid=0x00007f8df0c3d000 nid=0x6aef 
> waiting on condition [0x00007f8d6a9f5000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000000c068e0c0> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>       at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>       at java.util.concurrent.DelayQueue.poll(DelayQueue.java:259)
>       at kafka.utils.timer.SystemTimer.advanceClock(Timer.scala:106)
>       at 
> kafka.server.DelayedOperationPurgatory.advanceClock(DelayedOperation.scala:350)
>       at 
> kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper.doWork(DelayedOperation.scala:374)
>       at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> "kafka-socket-acceptor-PLAINTEXT-9092" #29 prio=5 os_prio=0 
> tid=0x00007f8df0bde000 nid=0x6aee runnable [0x00007f8d6aaf6000]
>    java.lang.Thread.State: RUNNABLE
>       at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>       at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>       at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
>       at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>       - locked <0x00000000c068e348> (a sun.nio.ch.Util$3)
>       - locked <0x00000000c068e338> (a java.util.Collections$UnmodifiableSet)
>       - locked <0x00000000c068e358> (a sun.nio.ch.EPollSelectorImpl)
>       at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>       at kafka.network.Acceptor.run(SocketServer.scala:260)
>       at java.lang.Thread.run(Thread.java:745)
> "kafka-network-thread-2-PLAINTEXT-1" #28 prio=5 os_prio=0 
> tid=0x00007f8df0bdc000 nid=0x6aed waiting on condition [0x00007f8d6abf7000]
>    java.lang.Thread.State: WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000000c06ba8d0> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>       at 
> java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:353)
>       at kafka.network.RequestChannel.sendRequest(RequestChannel.scala:212)
>       at 
> kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:493)
>       at 
> kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:487)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at 
> kafka.network.Processor.processCompletedReceives(SocketServer.scala:487)
>       at kafka.network.Processor.run(SocketServer.scala:417)
>       at java.lang.Thread.run(Thread.java:745)
> "kafka-network-thread-2-PLAINTEXT-0" #27 prio=5 os_prio=0 
> tid=0x00007f8df0bda800 nid=0x6aec waiting on condition [0x00007f8d6acf8000]
>    java.lang.Thread.State: WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000000c06ba8d0> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>       at 
> java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:353)
>       at kafka.network.RequestChannel.sendRequest(RequestChannel.scala:212)
>       at 
> kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:493)
>       at 
> kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:487)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at 
> kafka.network.Processor.processCompletedReceives(SocketServer.scala:487)
>       at kafka.network.Processor.run(SocketServer.scala:417)
>       at java.lang.Thread.run(Thread.java:745)
> "kafka-log-cleaner-thread-0" #23 prio=5 os_prio=0 tid=0x00007f8df08d4800 
> nid=0x6aeb waiting on condition [0x00007f8d6aff9000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000000c068eb60> (a 
> java.util.concurrent.CountDownLatch$Sync)
>       at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
>       at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
>       at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:258)
>       at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:218)
>       at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> "kafka-scheduler-2" #26 daemon prio=5 os_prio=0 tid=0x00007f8df08d2000 
> nid=0x6aea waiting on condition [0x00007f8d6b235000]
>    java.lang.Thread.State: WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000000c03239e8> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
>       at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> "kafka-scheduler-1" #25 daemon prio=5 os_prio=0 tid=0x00007f8df08cf800 
> nid=0x6ae9 waiting on condition [0x00007f8d6b336000]
>    java.lang.Thread.State: WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000000c03239e8> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
>       at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> "kafka-scheduler-0" #24 daemon prio=5 os_prio=0 tid=0x00007f8df08ce800 
> nid=0x6ae8 waiting on condition [0x00007f8db41d2000]
>    java.lang.Thread.State: WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000000c03239e8> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
>       at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> "metrics-meter-tick-thread-1" #22 daemon prio=5 os_prio=0 
> tid=0x00007f8df0b2a800 nid=0x6ae7 waiting on condition [0x00007f8d6bbfe000]
>    java.lang.Thread.State: WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000000c068caf8> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
>       at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> "main-EventThread" #19 daemon prio=5 os_prio=0 tid=0x00007f8df0a81800 
> nid=0x6a8c waiting on condition [0x00007f8db42d3000]
>    java.lang.Thread.State: WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000000c0172470> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>       at 
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
>       at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:501)
> "main-SendThread(xxx.xxx.xxx.xxx:2181)" #18 daemon prio=5 os_prio=0 
> tid=0x00007f8df0a87800 nid=0x6a8b runnable [0x00007f8db43d4000]
>    java.lang.Thread.State: RUNNABLE
>       at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>       at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>       at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
>       at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>       - locked <0x00000000c0172588> (a sun.nio.ch.Util$3)
>       - locked <0x00000000c0172598> (a java.util.Collections$UnmodifiableSet)
>       - locked <0x00000000c0172540> (a sun.nio.ch.EPollSelectorImpl)
>       at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>       at 
> org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:349)
>       at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
> "ZkClient-EventThread-17-xxx.xxx.22.69:2181,xxx.xxx.37.169:2181,xxx.xxx.47.202:2181"
>  #17 daemon prio=5 os_prio=0 tid=0x00007f8df0a71000 nid=0x6a8a waiting on 
> condition [0x00007f8db44d5000]
>    java.lang.Thread.State: WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000000c0175d18> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>       at 
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
>       at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:67)
> "ThrottledRequestReaper-Produce" #16 prio=5 os_prio=0 tid=0x00007f8df0a62800 
> nid=0x6a89 waiting on condition [0x00007f8db45d6000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000000c0175f40> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>       at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>       at java.util.concurrent.DelayQueue.poll(DelayQueue.java:259)
>       at 
> kafka.server.ClientQuotaManager$ThrottledRequestReaper.doWork(ClientQuotaManager.scala:158)
>       at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> "ThrottledRequestReaper-Fetch" #15 prio=5 os_prio=0 tid=0x00007f8df0a5d800 
> nid=0x6a88 waiting on condition [0x00007f8db46d7000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000000c0176140> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>       at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>       at java.util.concurrent.DelayQueue.poll(DelayQueue.java:259)
>       at 
> kafka.server.ClientQuotaManager$ThrottledRequestReaper.doWork(ClientQuotaManager.scala:158)
>       at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> "SensorExpiryThread" #14 daemon prio=5 os_prio=0 tid=0x00007f8df0a56000 
> nid=0x6a87 waiting on condition [0x00007f8db47d8000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000000c0176188> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>       at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
>       at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> "RMI TCP Accept-0" #11 daemon prio=5 os_prio=0 tid=0x00007f8df0811000 
> nid=0x6a85 runnable [0x00007f8db4dda000]
>    java.lang.Thread.State: RUNNABLE
>       at java.net.PlainSocketImpl.socketAccept(Native Method)
>       at 
> java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
>       at java.net.ServerSocket.implAccept(ServerSocket.java:545)
>       at java.net.ServerSocket.accept(ServerSocket.java:513)
>       at 
> sun.management.jmxremote.LocalRMIServerSocketFactory$1.accept(LocalRMIServerSocketFactory.java:52)
>       at 
> sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:400)
>       at 
> sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:372)
>       at java.lang.Thread.run(Thread.java:745)
> "Service Thread" #9 daemon prio=9 os_prio=0 tid=0x00007f8df047c800 nid=0x6a84 
> runnable [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
> "C1 CompilerThread2" #8 daemon prio=9 os_prio=0 tid=0x00007f8df0471800 
> nid=0x6a83 waiting on condition [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
> "C2 CompilerThread1" #7 daemon prio=9 os_prio=0 tid=0x00007f8df046f000 
> nid=0x6a82 waiting on condition [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
> "C2 CompilerThread0" #6 daemon prio=9 os_prio=0 tid=0x00007f8df046d000 
> nid=0x6a81 waiting on condition [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
> "Signal Dispatcher" #5 daemon prio=9 os_prio=0 tid=0x00007f8df046b000 
> nid=0x6a80 runnable [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
> "Surrogate Locker Thread (Concurrent GC)" #4 daemon prio=9 os_prio=0 
> tid=0x00007f8df0469800 nid=0x6a7f waiting on condition [0x0000000000000000]
>    java.lang.Thread.State: RUNNABLE
> "Finalizer" #3 daemon prio=8 os_prio=0 tid=0x00007f8df0437000 nid=0x6a7e in 
> Object.wait() [0x00007f8dc8293000]
>    java.lang.Thread.State: WAITING (on object monitor)
>       at java.lang.Object.wait(Native Method)
>       at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
>       - locked <0x00000000c0177d78> (a java.lang.ref.ReferenceQueue$Lock)
>       at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
>       at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)
> "Reference Handler" #2 daemon prio=10 os_prio=0 tid=0x00007f8df0432800 
> nid=0x6a7d in Object.wait() [0x00007f8dc8394000]
>    java.lang.Thread.State: WAITING (on object monitor)
>       at java.lang.Object.wait(Native Method)
>       at java.lang.Object.wait(Object.java:502)
>       at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
>       - locked <0x00000000c0179958> (a java.lang.ref.Reference$Lock)
>       at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)
> "main" #1 prio=5 os_prio=0 tid=0x00007f8df000c800 nid=0x6a70 waiting on 
> condition [0x00007f8df6ee2000]
>    java.lang.Thread.State: WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000000c068f298> (a 
> java.util.concurrent.CountDownLatch$Sync)
>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>       at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>       at kafka.server.KafkaServer.awaitShutdown(KafkaServer.scala:625)
>       at 
> kafka.server.KafkaServerStartable.awaitShutdown(KafkaServerStartable.scala:70)
>       at kafka.Kafka$.main(Kafka.scala:68)
>       at kafka.Kafka.main(Kafka.scala)
> "VM Thread" os_prio=0 tid=0x00007f8df042a800 nid=0x6a7c runnable 
> "Gang worker#0 (Parallel GC Threads)" os_prio=0 tid=0x00007f8df0023800 
> nid=0x6a71 runnable 
> "Gang worker#1 (Parallel GC Threads)" os_prio=0 tid=0x00007f8df0025000 
> nid=0x6a72 runnable 
> "Gang worker#2 (Parallel GC Threads)" os_prio=0 tid=0x00007f8df0027000 
> nid=0x6a73 runnable 
> "Gang worker#3 (Parallel GC Threads)" os_prio=0 tid=0x00007f8df0028800 
> nid=0x6a74 runnable 
> "G1 Main Concurrent Mark GC Thread" os_prio=0 tid=0x00007f8df0043800 
> nid=0x6a7a runnable 
> "Gang worker#0 (G1 Parallel Marking Threads)" os_prio=0 
> tid=0x00007f8df0045800 nid=0x6a7b runnable 
> "G1 Concurrent Refinement Thread#0" os_prio=0 tid=0x00007f8df0032800 
> nid=0x6a79 runnable 
> "G1 Concurrent Refinement Thread#1" os_prio=0 tid=0x00007f8df0030800 
> nid=0x6a78 runnable 
> "G1 Concurrent Refinement Thread#2" os_prio=0 tid=0x00007f8df002f000 
> nid=0x6a77 runnable 
> "G1 Concurrent Refinement Thread#3" os_prio=0 tid=0x00007f8df002d000 
> nid=0x6a76 runnable 
> "G1 Concurrent Refinement Thread#4" os_prio=0 tid=0x00007f8df002b000 
> nid=0x6a75 runnable 
> "VM Periodic Task Thread" os_prio=0 tid=0x00007f8df0826800 nid=0x6a86 waiting 
> on condition 
> JNI global references: 287
> Found one Java-level deadlock:
> =============================
> "executor-Heartbeat":
>   waiting to lock monitor 0x00007f8dbc3df928 (object 0x00000000c8941218, a 
> kafka.coordinator.GroupMetadata),
>   which is held by "group-metadata-manager-0"
> "group-metadata-manager-0":
>   waiting to lock monitor 0x00007f8dac031f58 (object 0x00000000da1804a8, a 
> java.util.LinkedList),
>   which is held by "kafka-request-handler-1"
> "kafka-request-handler-1":
>   waiting to lock monitor 0x00007f8dbc3df928 (object 0x00000000c8941218, a 
> kafka.coordinator.GroupMetadata),
>   which is held by "group-metadata-manager-0"
> Java stack information for the threads listed above:
> ===================================================
> "executor-Heartbeat":
>       at 
> kafka.coordinator.GroupCoordinator.onExpireHeartbeat(GroupCoordinator.scala:739)
>       - waiting to lock <0x00000000c8941218> (a 
> kafka.coordinator.GroupMetadata)
>       at 
> kafka.coordinator.DelayedHeartbeat.onExpiration(DelayedHeartbeat.scala:33)
>       at kafka.server.DelayedOperation.run(DelayedOperation.scala:107)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> "group-metadata-manager-0":
>       at 
> kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:308)
>       - waiting to lock <0x00000000da1804a8> (a java.util.LinkedList)
>       at 
> kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:234)
>       at 
> kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:199)
>       at 
> kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:374)
>       at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:457)
>       at 
> kafka.coordinator.GroupMetadataManager$$anonfun$cleanupGroupMetadata$1$$anonfun$apply$10.apply(GroupMetadataManager.scala:600)
>       at 
> kafka.coordinator.GroupMetadataManager$$anonfun$cleanupGroupMetadata$1$$anonfun$apply$10.apply(GroupMetadataManager.scala:593)
>       at scala.Option.foreach(Option.scala:257)
>       at 
> kafka.coordinator.GroupMetadataManager$$anonfun$cleanupGroupMetadata$1.apply(GroupMetadataManager.scala:593)
>       - locked <0x00000000c8941218> (a kafka.coordinator.GroupMetadata)
>       at 
> kafka.coordinator.GroupMetadataManager$$anonfun$cleanupGroupMetadata$1.apply(GroupMetadataManager.scala:579)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>       at kafka.utils.Pool$$anon$1.foreach(Pool.scala:89)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at kafka.utils.Pool.foreach(Pool.scala:26)
>       at 
> kafka.coordinator.GroupMetadataManager.cleanupGroupMetadata(GroupMetadataManager.scala:579)
>       at 
> kafka.coordinator.GroupMetadataManager$$anonfun$1.apply$mcV$sp(GroupMetadataManager.scala:101)
>       at 
> kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
>       at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:58)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> "kafka-request-handler-1":
>       at 
> kafka.coordinator.GroupMetadataManager.kafka$coordinator$GroupMetadataManager$$putCacheCallback$2(GroupMetadataManager.scala:301)
>       - waiting to lock <0x00000000c8941218> (a 
> kafka.coordinator.GroupMetadata)
>       at 
> kafka.coordinator.GroupMetadataManager$$anonfun$prepareStoreOffsets$1.apply(GroupMetadataManager.scala:357)
>       at 
> kafka.coordinator.GroupMetadataManager$$anonfun$prepareStoreOffsets$1.apply(GroupMetadataManager.scala:357)
>       at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:123)
>       at 
> kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
>       at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:105)
>       at 
> kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:315)
>       - locked <0x00000000da180450> (a kafka.server.DelayedProduce)
>       - locked <0x00000000da1804a8> (a java.util.LinkedList)
>       at 
> kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:234)
>       at 
> kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:199)
>       at 
> kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:909)
>       at 
> kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:902)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>       at 
> kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:902)
>       at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:475)
>       at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:523)
>       at kafka.server.KafkaApis.handle(KafkaApis.scala:79)
>       at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>       at java.lang.Thread.run(Thread.java:745)
> Found 1 deadlock.
> {code}
> Also attached as text file if that's easier for you.
> (I've masked the ip addresses)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to