[ https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14179980#comment-14179980 ]
Chris Richardson commented on KAFKA-1716:
-----------------------------------------
All of my ConsumerFetcherThreads look like this:
{quote}
at sun.nio.ch.Net.poll(Native Method)
at sun.nio.ch.SocketChannelImpl.poll(SocketChannelImpl.java:950)
- locked <0x000000077b15f408> (a java.lang.Object)
at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:204)
- locked <0x000000077b15f3d8> (a java.lang.Object)
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
- locked <0x000000077b15f520> (a sun.nio.ch.SocketAdaptor$SocketInputStream)
at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
- locked <0x000000077b15fe60> (a java.lang.Object)
at kafka.utils.Utils$.read(Utils.scala:375)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
- locked <0x000000077b15fdb8> (a java.lang.Object)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
{quote}
And all of my shutdown threads are like this:
{quote}
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006c2c9a0a0> (a java.util.concurrent.CountDownLatch$Sync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
at kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
- locked <0x00000006c2cb9e60> (a java.lang.Object)
at kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
at kafka.consumer.ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171)
at kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167)
{quote}
> hang during shutdown of ZookeeperConsumerConnector
> --------------------------------------------------
>
> Key: KAFKA-1716
> URL: https://issues.apache.org/jira/browse/KAFKA-1716
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.8.1.1
> Reporter: Sean Fay
> Assignee: Neha Narkhede
>
> It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to
> wedge in the case that some consumer fetcher threads receive messages during
> the shutdown process.
> Shutdown thread:
> {code} -- Parking to wait for: java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0
> at jrockit/vm/Locks.park0(J)V(Native Method)
> at jrockit/vm/Locks.park(Locks.java:2230)
> at sun/misc/Unsafe.park(ZJ)V(Native Method)
> at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
> at java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> at java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
> at java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
> at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207)
> at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36)
> at kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
> at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
> at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
> at scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226)
> at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39)
> at scala/collection/mutable/HashMap.foreach(HashMap.scala:98)
> at scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
> ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock]
> at kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
> at kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171)
> at kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167){code}
> ConsumerFetcherThread:
> {code} -- Parking to wait for: java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568
> at jrockit/vm/Locks.park0(J)V(Native Method)
> at jrockit/vm/Locks.park(Locks.java:2230)
> at sun/misc/Unsafe.park(ZJ)V(Native Method)
> at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
> at java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> at java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
> at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> at kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
> at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
> at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224)
> at scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403)
> at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
> at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> at kafka/utils/Utils$.inLock(Utils.scala:538)
> at kafka/server/AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
> at kafka/server/AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> at kafka/utils/ShutdownableThread.run(ShutdownableThread.scala:51)
> at jrockit/vm/RNI.c2java(JJJJJ)V(Native Method){code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)