[ 
https://issues.apache.org/jira/browse/KAFKA-1764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14211781#comment-14211781
 ] 

Chris Cope commented on KAFKA-1764:
-----------------------------------

Why do builds always break just as I put the kids to sleep and grab a glass of 
wine?

> ZookeeperConsumerConnector could put multiple shutdownCommand to the same 
> data chunk queue.
> -------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-1764
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1764
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jiangjie Qin
>            Assignee: Jiangjie Qin
>         Attachments: KAFKA-1764.patch, KAFKA-1764_2014-11-12_14:05:35.patch
>
>
> In ZookeeperConsumerConnector shutdown(), we could potentially put multiple 
> shutdownCommand into the same data chunk queue, provided the topics are 
> sharing the same data chunk queue in topicThreadIdAndQueues.
> From email thread to document:
> In ZookeeperConsumerConnector shutdown(), we could potentially put
> multiple shutdownCommand into the same data chunk queue, provided the
> topics are sharing the same data chunk queue in topicThreadIdAndQueues.
> In our case, we only have 1 consumer stream for all the topics, the data
> chunk queue capacity is set to 1. The execution sequence causing problem is
> as below:
> 1. ZookeeperConsumerConnector shutdown() is called, it tries to put
> shutdownCommand for each queue in topicThreadIdAndQueues. Since we only
> have 1 queue, multiple shutdownCommand will be put into the queue.
> 2. In sendShutdownToAllQueues(), between queue.clean() and
> queue.put(shutdownCommand), consumer iterator receives the shutdownCommand
> and put it back into the data chunk queue. After that,
> ZookeeperConsumerConnector tries to put another shutdownCommand into the
> data chunk queue but will block forever.
> The thread stack trace is as below:
> {code}
> "Thread-23" #58 prio=5 os_prio=0 tid=0x00007ff440004800 nid=0x40a waiting
> on condition [0x00007ff4f0124000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x0000000680b96bf0> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>         at
> java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
>         at
> kafka.consumer.ZookeeperConsumerConnector$$anonfun$sendShutdownToAllQueues$1.apply(ZookeeperConsumerConnector.scala:262)
>         at
> kafka.consumer.ZookeeperConsumerConnector$$anonfun$sendShutdownToAllQueues$1.apply(ZookeeperConsumerConnector.scala:259)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>         at
> kafka.consumer.ZookeeperConsumerConnector.sendShutdownToAllQueues(ZookeeperConsumerConnector.scala:259)
>         at
> kafka.consumer.ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:199)
>         at
> kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:192)
>         - locked <0x0000000680dd5848> (a java.lang.Object)
>         at
> kafka.tools.MirrorMaker$$anonfun$cleanShutdown$1.apply(MirrorMaker.scala:185)
>         at
> kafka.tools.MirrorMaker$$anonfun$cleanShutdown$1.apply(MirrorMaker.scala:185)
>         at scala.collection.immutable.List.foreach(List.scala:318)
>         at kafka.tools.MirrorMaker$.cleanShutdown(MirrorMaker.scala:185)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to