[ https://issues.apache.org/jira/browse/KAFKA-6043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ismael Juma resolved KAFKA-6043.
--------------------------------
    Resolution: Auto Closed

The old consumer is deprecated and no further changes are planned. Please upgrade to the Java consumer whenever possible.

> Kafka 8.1.1 - .ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110) blocked
> ------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6043
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6043
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.8.1.1
>            Reporter: Ramkumar
>         Attachments: 6043.v1, stdout.LRMIID-158150.zip
>
>
> We are using a 3-node Kafka cluster. Around this cluster we have a
> RESTful service that provides HTTP APIs for clients. The service keeps
> the consumer connection in a cache whose entries expire after 60 minutes,
> at which point the consumer connection is disconnected.
> However, we see that the thread shutting down the ZookeeperConsumerConnector
> is blocked and the consumer object is still hanging in the JVM.
> Can you please let me know whether a solution has been identified for this?
> Below is the output from the thread dump taken when this occurred:
>
> pool-25100-thread-1" prio=10 tid=0x00007f711804e820 nid=0x6726 waiting for monitor entry [0x00007f6cd999b000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>         at kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:161)
>         - waiting to lock <0x000000076a922c40> (a java.lang.Object)
>         at kafka.javaapi.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:110)
>         at com.att.nsa.cambria.backends.kafka.KafkaConsumer$2.call(KafkaConsumer.java:207)
>         at com.att.nsa.cambria.backends.kafka.KafkaConsumer$2.call(KafkaConsumer.java:201)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>
> T--APPC-LCM-READ-E2E_T2_watcher_executor" prio=10 tid=0x00007f7180198030 nid=0x55a2 waiting on condition [0x00007f6c9f4fa000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for <0x000000076b2e1508> (a java.util.concurrent.CountDownLatch$Sync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
>         at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
>         at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
>         at kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
>         at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
>         at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
>         at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>         at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>         at kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
>         - locked <0x000000076a922340> (a java.lang.Object)
>         at kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
>         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala:524)
>         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:562)
>         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:457)
>         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:408)
>         at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:402)
>         - locked <0x000000076a922c40> (a java.lang.Object)
>         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:355)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
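In the thread dump, the shutdown caller is BLOCKED on monitor <0x000000076a922c40>, which is held by the rebalance thread while that thread is parked on a CountDownLatch inside ShutdownableThread.shutdown. For anyone still on the old consumer, the following is a minimal sketch of one possible caller-side mitigation, not a fix for the hang itself: run the shutdown call on a separate daemon thread and stop waiting after a timeout, so the cache-eviction path is not held up indefinitely. The class name BoundedShutdown, the method shutdownWithTimeout, and the stand-in "rebalancer" thread are all hypothetical and are not part of Kafka; the sketch uses only java.util.concurrent, and in real code the Runnable would presumably wrap the connector's shutdown() call.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of Kafka.
public class BoundedShutdown {

    /**
     * Runs shutdownAction on a daemon thread and waits at most the given
     * timeout. Returns true if the action finished in time, false otherwise.
     */
    public static boolean shutdownWithTimeout(Runnable shutdownAction, long timeout, TimeUnit unit) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "bounded-shutdown");
            t.setDaemon(true); // a stuck shutdown must not keep the JVM alive
            return t;
        });
        Future<?> future = executor.submit(shutdownAction);
        try {
            future.get(timeout, unit);
            return true;
        } catch (TimeoutException e) {
            // The worker may still be BLOCKED on a monitor; interrupting it
            // will not release it, so we only stop waiting for it.
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException("shutdown action failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final Object rebalanceLock = new Object();                   // stands in for the contended monitor
        final CountDownLatch fetcherStopped = new CountDownLatch(1); // never counted down in this demo
        final CountDownLatch lockHeld = new CountDownLatch(1);

        // Stand-in for the rebalance thread: holds the monitor while parked on a latch.
        Thread rebalancer = new Thread(() -> {
            synchronized (rebalanceLock) {
                lockHeld.countDown();
                try {
                    fetcherStopped.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "stand-in-rebalancer");
        rebalancer.setDaemon(true);
        rebalancer.start();
        lockHeld.await(); // make sure the monitor is already held

        // Stand-in for the shutdown call: it needs the same monitor.
        boolean finished = shutdownWithTimeout(() -> {
            synchronized (rebalanceLock) {
                // real cleanup would go here
            }
        }, 500, TimeUnit.MILLISECONDS);

        System.out.println("shutdown finished in time: " + finished);
    }
}
```

With this approach the caller regains control after the timeout, but the blocked worker thread and the consumer object remain in the JVM until the underlying wait ends, so it only limits the impact on the calling service. Moving to the Java consumer, as the resolution recommends, avoids this code path altogether.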