[ https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14356289#comment-14356289 ]

Ashwin Jayaprakash commented on KAFKA-1716:
-------------------------------------------

We upgraded to Kafka 0.8.2 last week and now we can reproduce this issue every 
time on our Kafka consumer JVMs.

Our setup is like this. We start {{ConsumerConnector}} instances dynamically 
based on a configurable property. Each of those {{ConsumerConnector}} instances 
creates a {{ConsumerIterator}}. Right now we have 4 such instances in each JVM. 
Naturally we have 4 separate threads consuming from those 4 iterators in 
parallel. 
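The threading layout described above (N connectors, each drained by its own consumer thread, plus a separate thread that later shuts them down) can be modeled without Kafka. This is a minimal sketch under stated assumptions: `FakeConnector` and the `POISON` end-of-stream marker are hypothetical stand-ins for {{ConsumerConnector}} and its {{ConsumerIterator}}, backed by a plain `LinkedBlockingQueue`. It does not reproduce the hang; it only shows the structure the shutdown thread has to tear down.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

// Toy model of the setup: one consumer thread per connector.
// FakeConnector is hypothetical; it is NOT the Kafka ConsumerConnector API.
class ConsumerPoolSketch {
    static final String POISON = "__shutdown__"; // hypothetical end-of-stream marker

    static final class FakeConnector {
        final BlockingQueue<String> stream = new LinkedBlockingQueue<>();

        // Unblocks the consuming thread, loosely like shutdown() ending the iterator.
        void shutdown() {
            stream.add(POISON);
        }
    }

    // Returns the total number of messages consumed across all consumer threads.
    static int run(int numConnectors, int messagesEach) {
        List<FakeConnector> connectors = new ArrayList<>();
        List<Future<Integer>> counts = new ArrayList<>();
        ExecutorService consumerThreads = Executors.newFixedThreadPool(numConnectors);
        try {
            for (int i = 0; i < numConnectors; i++) {
                FakeConnector c = new FakeConnector();
                for (int m = 0; m < messagesEach; m++) {
                    c.stream.add("msg-" + m);
                }
                connectors.add(c);
                // One dedicated thread per connector, consuming until the marker arrives.
                counts.add(consumerThreads.submit(() -> {
                    int n = 0;
                    while (!POISON.equals(c.stream.take())) {
                        n++;
                    }
                    return n;
                }));
            }
            // A separate "shutdown thread" (here, the caller) closes connectors one by one.
            for (FakeConnector c : connectors) {
                c.shutdown();
            }
            int total = 0;
            for (Future<Integer> f : counts) {
                total += f.get();
            }
            return total;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            consumerThreads.shutdownNow();
        }
    }
}
```

For example, `ConsumerPoolSketch.run(4, 10)` starts 4 consumer threads and returns 40 once all of them have drained their streams.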

All this worked OK until recently, when we ran into problems with consumer
rebalancing and an overloaded ZK subtree (see
http://markmail.org/thread/gnodacjjya6r573m). While trying to address that, we
changed two settings from their defaults: {{rebalance.max.retries=16}} and
{{rebalance.backoff.ms=10000}}. Note that we upgraded to 0.8.2 at the same time.
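The two overrides can be written as high-level consumer properties. This is a minimal sketch, assuming the property names exactly as quoted above; the `zookeeper.connect` and `group.id` values are placeholders, not values from this report. It also works out what the new settings allow: up to 16 backoff sleeps of 10,000 ms each, i.e. roughly 16 × 10 s = 160 s per consumer spent only sleeping between failed rebalance attempts, before counting the time the attempts themselves take.

```java
import java.util.Properties;

// Sketch of the consumer settings discussed above.
// zookeeper.connect and group.id are placeholder values (assumptions).
class RebalanceConfigSketch {
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // placeholder
        props.put("group.id", "example-group");          // placeholder
        props.put("rebalance.max.retries", "16");
        props.put("rebalance.backoff.ms", "10000");
        return props;
    }

    // Upper bound on time spent sleeping between failed rebalance attempts,
    // excluding the time each attempt itself takes.
    static long worstCaseBackoffMs(Properties props) {
        int retries = Integer.parseInt(props.getProperty("rebalance.max.retries"));
        long backoffMs = Long.parseLong(props.getProperty("rebalance.backoff.ms"));
        return retries * backoffMs;
    }
}
```

With these values `worstCaseBackoffMs(consumerProps())` evaluates to 160000 (160 s).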

Every time we shut down the JVM, we first shut down the consumers one by one
before exiting. With these recent changes, the JVM exit gets stuck because:
# The shutdown thread is separate from the 4 consumer threads (and from the
background threads that ZK and Kafka create)
# The shutdown thread shuts down the first consumer, which exits quickly and
gracefully
# Meanwhile, the second, third and fourth consumers are trying to rebalance the
partitions
# The shutdown thread then calls shutdown on the second consumer
## It appears to make some progress in shutting down the second consumer, but
then blocks on a monitor held by the {{xx_watcher_executor}} thread
## This looks like a deadlock, but the {{xx_watcher_executor}} thread is
actually holding that monitor (acquired in {{syncedRebalance}}) while it sleeps
between rebalance retries
# The shutdown therefore takes a long time, because each of the 3 remaining
consumers retries the rebalance 16 times ({{rebalance.max.retries}}) before
giving up
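The locking pattern in the steps above can be reproduced in plain Java. This is a toy model, not Kafka code: `RebalanceLockSketch`, `rebalanceWithRetries` and the thread name are hypothetical. The only part taken from the thread dumps is the pattern itself, where the rebalance thread calls `Thread.sleep` inside a `synchronized` block (as {{syncedRebalance}} does while holding <0x2b5e>) and `shutdown()` needs the same monitor. Because the sleeping thread eventually gives up and releases the lock, `shutdown()` does return, which is why this shows up as a long stall rather than a true deadlock.

```java
import java.util.concurrent.CountDownLatch;

// Toy model: names are hypothetical; only the "sleep while holding a monitor"
// pattern mirrors the thread dumps.
class RebalanceLockSketch {
    private final Object rebalanceLock = new Object(); // stands in for <0x2b5e>

    // Like syncedRebalance: the monitor is held across every failed attempt and backoff.
    void rebalanceWithRetries(int maxRetries, long backoffMs, CountDownLatch lockAcquired)
            throws InterruptedException {
        synchronized (rebalanceLock) {
            lockAcquired.countDown();
            for (int i = 0; i < maxRetries; i++) {
                // Assume every attempt fails (e.g. partitions still owned elsewhere).
                Thread.sleep(backoffMs); // sleeps WITHOUT releasing rebalanceLock
            }
        }
    }

    // Like shutdown(): needs the same monitor, so it stays BLOCKED until the retries run out.
    void shutdown() {
        synchronized (rebalanceLock) {
            // release fetchers, ZK client, etc. (omitted in this sketch)
        }
    }

    // Returns how long shutdown() waited for the monitor, in milliseconds.
    static long measureShutdownWaitMs(int maxRetries, long backoffMs) {
        RebalanceLockSketch connector = new RebalanceLockSketch();
        CountDownLatch lockAcquired = new CountDownLatch(1);
        Thread watcher = new Thread(() -> {
            try {
                connector.rebalanceWithRetries(maxRetries, backoffMs, lockAcquired);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "example_watcher_executor");
        watcher.start();
        try {
            lockAcquired.await(); // make sure the watcher owns the monitor first
            long start = System.nanoTime();
            connector.shutdown(); // blocks, like Thread-13 in the dump
            long waitedMs = (System.nanoTime() - start) / 1_000_000;
            watcher.join();
            return waitedMs;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

With small numbers such as `measureShutdownWaitMs(3, 100)`, shutdown waits roughly 300 ms; with the settings above (16 retries, 10,000 ms backoff) the same pattern can hold the monitor for up to about 160 s.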

The thread dumps below show both sides of the lock.

{code}
"Thread-13@8222" prio=5 tid=0x53 nid=NA waiting for monitor entry
  java.lang.Thread.State: BLOCKED
         waiting for indexer-group_on-localhost-pid-58357-kafka-message-source-id-820_watcher_executor@8160 to release lock on <0x2b5e> (a java.lang.Object)
          at kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:191)
          at kafka.javaapi.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:119)
          at xx.yy.zz.processor.kafka.consumer.KafkaMessageSource.close(KafkaMessageSource.java:239)
          at xx.yy.zz.pipeline.source.MessageSourceStage.stop(MessageSourceStage.java:162)
          at xx.yy.common.util.lifecycle.LifecycleHelper.stopAll(LifecycleHelper.java:53)
          at xx.yy.zz.pipeline.framework.Pipeline.stop(Pipeline.java:205)
          at xx.yy.common.util.lifecycle.LifecycleHelper.stopAll(LifecycleHelper.java:53)
          at xx.yy.zz.pipeline.framework.Pipelines.stop(Pipelines.java:225)
          at sun.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1)
          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.lang.reflect.Method.invoke(Method.java:606)
          at xx.yy.AppLifecycle.callAnnotatedMethods(AppLifecycle.java:163)
          at xx.yy.AppLifecycle.stop(AppLifecycle.java:144)
          - locked <0x2b31> (a xx.yy.AppLifecycle)
          at xx.yy.AppLifecycle$6.stop(AppLifecycle.java:247)
          at io.dropwizard.lifecycle.JettyManaged.doStop(JettyManaged.java:32)
          at org.eclipse.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:90)
          - locked <0x2b83> (a java.lang.Object)
          at org.eclipse.jetty.util.component.ContainerLifeCycle.stop(ContainerLifeCycle.java:129)
          at org.eclipse.jetty.util.component.ContainerLifeCycle.doStop(ContainerLifeCycle.java:148)
          at org.eclipse.jetty.server.handler.AbstractHandler.doStop(AbstractHandler.java:71)
          at org.eclipse.jetty.server.Server.doStop(Server.java:410)
          at org.eclipse.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:90)
          - locked <0x2b84> (a java.lang.Object)
          at org.eclipse.jetty.util.thread.ShutdownThread.run(ShutdownThread.java:133)

"indexer-group_on-localhost-pid-58357-kafka-message-source-id-820_watcher_executor@8160" daemon prio=5 tid=0x74 nid=NA sleeping
  java.lang.Thread.State: TIMED_WAITING
         blocks Thread-13@8222
          at java.lang.Thread.sleep(Thread.java:-1)
          at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:627)
          at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
          at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:602)
          at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:599)
          at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:599)
          at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
          at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:598)
          - locked <0x2b5e> (a java.lang.Object)
          at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:551)
{code}
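The same "who is waiting on whom" information in the dump can be read from inside a running JVM through the standard `java.lang.management` API. This is a minimal sketch; the class and method names `BlockedThreadReport`, `blockedOnOwners` and `hasDeadlockCycle` are hypothetical. `ThreadMXBean.findDeadlockedThreads()` only reports cycles of threads waiting on each other, so for the situation above (one thread sleeping while it holds the monitor) it is expected to report nothing, which matches the stall-not-deadlock reading of the dump.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

// Reads lock-wait information similar to the dump above from inside the JVM.
class BlockedThreadReport {
    // One "blocked-thread -> lock-owner" entry per BLOCKED thread, similar to the
    // "waiting for ... to release lock" line shown for Thread-13.
    static List<String> blockedOnOwners() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        List<String> result = new ArrayList<>();
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            if (info != null && info.getThreadState() == Thread.State.BLOCKED) {
                result.add(info.getThreadName() + " -> " + info.getLockOwnerName());
            }
        }
        return result;
    }

    // findDeadlockedThreads() returns null when no cycle of waiting threads exists.
    // A thread that sleeps while holding a monitor does not form a cycle.
    static boolean hasDeadlockCycle() {
        return ManagementFactory.getThreadMXBean().findDeadlockedThreads() != null;
    }
}
```

Logging `blockedOnOwners()` from a watchdog during shutdown would, under these assumptions, show an entry like `Thread-13 -> ..._watcher_executor` while `hasDeadlockCycle()` stays false.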

Log snippet attached. 


> hang during shutdown of ZookeeperConsumerConnector
> --------------------------------------------------
>
>                 Key: KAFKA-1716
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1716
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.8.1.1
>            Reporter: Sean Fay
>            Assignee: Neha Narkhede
>
> It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to 
> wedge in the case that some consumer fetcher threads receive messages during 
> the shutdown process.
> Shutdown thread:
> {code}    -- Parking to wait for: java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0
>     at jrockit/vm/Locks.park0(J)V(Native Method)
>     at jrockit/vm/Locks.park(Locks.java:2230)
>     at sun/misc/Unsafe.park(ZJ)V(Native Method)
>     at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
>     at java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
>     at java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
>     at java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
>     at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207)
>     at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36)
>     at kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
>     at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
>     at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
>     at scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>     at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>     at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>     at scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226)
>     at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39)
>     at scala/collection/mutable/HashMap.foreach(HashMap.scala:98)
>     at scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>     at kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
>     ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock]
>     at kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
>     at kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171)
>     at kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167){code}
> ConsumerFetcherThread:
> {code}    -- Parking to wait for: java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568
>     at jrockit/vm/Locks.park0(J)V(Native Method)
>     at jrockit/vm/Locks.park(Locks.java:2230)
>     at sun/misc/Unsafe.park(ZJ)V(Native Method)
>     at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
>     at java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>     at java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
>     at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
>     at kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
>     at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224)
>     at scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
>     at kafka/utils/Utils$.inLock(Utils.scala:538)
>     at kafka/server/AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
>     at kafka/server/AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>     at kafka/utils/ShutdownableThread.run(ShutdownableThread.scala:51)
>     at jrockit/vm/RNI.c2java(JJJJJ)V(Native Method){code}
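The two quoted dumps from the original report suggest this interaction: the shutdown thread waits on a fetcher's {{CountDownLatch}} ({{ShutdownableThread.shutdown}}), while that fetcher is parked in {{LinkedBlockingQueue.put}} inside {{PartitionTopicInfo.enqueue}}, presumably because the bounded queue is full and nothing drains it once shutdown has started. The sketch below is a toy model of that wait, not Kafka code: `FetcherShutdownSketch`, the queue capacity of 1, and the "drain the queue first" variant are all assumptions for illustration, and the drain step is not claimed to be the fix Kafka adopted. It also does not model whether or how Kafka interrupts the fetcher thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Toy model: a fetcher blocked in put() on a full bounded queue never reaches the
// point where it counts down its shutdown latch. Names are hypothetical.
class FetcherShutdownSketch {
    // Returns true if the "shutdown thread" saw the fetcher finish within timeoutMs.
    static boolean shutdownCompletes(boolean drainQueueFirst, long timeoutMs) {
        BlockingQueue<String> chunkQueue = new LinkedBlockingQueue<>(1); // bounded, capacity is an assumption
        CountDownLatch shutdownLatch = new CountDownLatch(1);            // like ShutdownableThread's latch
        AtomicBoolean running = new AtomicBoolean(true);

        Thread fetcher = new Thread(() -> {
            try {
                while (running.get()) {
                    chunkQueue.put("chunk"); // like enqueue -> LinkedBlockingQueue.put
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                shutdownLatch.countDown(); // only reached once put() returns or is interrupted
            }
        }, "example-fetcher");
        fetcher.setDaemon(true);
        fetcher.start();
        try {
            // Wait until the fetcher is parked inside put() on the full queue.
            while (fetcher.getState() != Thread.State.WAITING) {
                Thread.sleep(1);
            }
            running.set(false);
            if (drainQueueFirst) {
                chunkQueue.clear(); // frees space so the blocked put() can return
            }
            boolean finished = shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
            fetcher.interrupt(); // clean up the toy thread if it is still stuck
            return finished;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

`shutdownCompletes(false, 200)` returns false because the latch is never counted down while `put()` is blocked, whereas `shutdownCompletes(true, 2000)` returns true because clearing the queue lets `put()` return and the loop observe `running == false`.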



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
