Thanks Jun!

Jiangjie, could you file a JIRA? Thanks.

Guozhang

On Tue, Nov 11, 2014 at 9:27 AM, Jun Rao <jun...@gmail.com> wrote:

> Hi, Jiangjie,
>
> Thanks for the investigation. Yes, this seems like a real issue.
>
> 1. It doesn't seem that we need to put the shutdownCommand back into the
> queue. Once an iterator receives a shutdownCommand, it will be in a Done
> state and will remain in that state forever.
>
> 2. Yes, we just need to get the unique set of queues and put one
> shutdownCommand into each queue.
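A minimal Java sketch of Jun's two points, under stated assumptions. It is not Kafka code. The queue map is modeled as a `Map<String, BlockingQueue<Object>>` with hypothetical string keys, which simplifies the real `topicThreadIdAndQueues`. `SHUTDOWN_COMMAND`, `SketchIterator` and `sendShutdownToUniqueQueues` are hypothetical names. The iterator enters a terminal done state without re-enqueueing the sentinel. The sender deduplicates queues by identity and then puts one sentinel into each. The sketch also assumes nothing else writes to the queues during shutdown.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class ShutdownFixSketch {
    // Hypothetical stand-in for the consumer's shutdownCommand sentinel.
    static final Object SHUTDOWN_COMMAND = new Object();

    // Hypothetical iterator: on seeing the sentinel it switches to a terminal
    // done state and does NOT put the sentinel back (point 1).
    static final class SketchIterator {
        private final BlockingQueue<Object> channel;
        private boolean done = false;

        SketchIterator(BlockingQueue<Object> channel) {
            this.channel = channel;
        }

        // Returns the next chunk, or null once shut down; stays done forever.
        Object next() throws InterruptedException {
            if (done) {
                return null;
            }
            Object chunk = channel.take();
            if (chunk == SHUTDOWN_COMMAND) { // reference check, like Scala's `eq`
                done = true;                 // no re-enqueue
                return null;
            }
            return chunk;
        }

        boolean isDone() {
            return done;
        }
    }

    // Point 2: one sentinel per *unique* queue. The set is identity-based
    // because several map entries may reference the same queue object.
    // Returns the number of sentinels enqueued.
    static int sendShutdownToUniqueQueues(Map<String, BlockingQueue<Object>> topicThreadIdAndQueues)
            throws InterruptedException {
        Set<BlockingQueue<Object>> unique = Collections.newSetFromMap(new IdentityHashMap<>());
        unique.addAll(topicThreadIdAndQueues.values());
        int sent = 0;
        for (BlockingQueue<Object> queue : unique) {
            queue.clear();
            queue.put(SHUTDOWN_COMMAND); // assumes no other writer refills the queue here
            sent++;
        }
        return sent;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Object> shared = new LinkedBlockingQueue<>(1); // capacity 1, as in the report
        Map<String, BlockingQueue<Object>> queues = new LinkedHashMap<>();
        queues.put("topicA-0", shared); // hypothetical keys: two topics, one stream
        queues.put("topicB-0", shared);

        int sent = sendShutdownToUniqueQueues(queues);
        SketchIterator it = new SketchIterator(shared);
        it.next();
        System.out.println(sent + " " + it.isDone() + " " + shared.size()); // prints "1 true 0"
    }
}
```

With two map entries sharing one capacity-1 queue, only a single sentinel is sent. The iterator consumes it and stops without refilling the queue, so under these assumptions the sender's `put()` always finds room.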
>
> Jun
>
>
> On Mon, Nov 10, 2014 at 7:27 PM, Becket Qin <becket....@gmail.com> wrote:
>
> > Hi,
> >
> > We recently encountered a production issue in which Mirror Maker could
> > not shut down properly because ZookeeperConsumerConnector was blocked in
> > shutdown(). After looking into the code, we found two issues that caused
> > this problem.
> >
> > 1. After the consumer iterator receives the shutdownCommand, it puts the
> > shutdownCommand back into the data chunk queue. Is there any reason for
> > doing this?
> >
> > 2. In ZookeeperConsumerConnector.shutdown(), we can end up putting multiple
> > shutdownCommands into the same data chunk queue if the topics share the
> > same data chunk queue in topicThreadIdAndQueues. (KAFKA-1764 has been
> > opened for this.)
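To make issue 2 concrete, here is a small Java sketch. It counts how many shutdown sentinels a loop over the map's values would try to put into each distinct queue. The names are hypothetical, and the `Map<String, BlockingQueue<Object>>` shape is a simplification of `topicThreadIdAndQueues`, not its real type. With one consumer stream, every entry points at the same queue. That queue therefore receives one put per entry rather than one in total.

```java
import java.util.IdentityHashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class SharedQueueCount {
    // Counts how many sentinels a loop over the map's *values* would try to
    // put into each distinct queue. Queues are keyed by identity, not equals().
    static Map<BlockingQueue<Object>, Integer> sentinelsPerQueue(
            Map<String, BlockingQueue<Object>> topicThreadIdAndQueues) {
        Map<BlockingQueue<Object>, Integer> counts = new IdentityHashMap<>();
        for (BlockingQueue<Object> queue : topicThreadIdAndQueues.values()) {
            counts.merge(queue, 1, Integer::sum); // one put per map entry
        }
        return counts;
    }

    public static void main(String[] args) {
        // One consumer stream for all topics: every entry shares one queue.
        BlockingQueue<Object> shared = new LinkedBlockingQueue<>(1);
        Map<String, BlockingQueue<Object>> queues = new LinkedHashMap<>();
        queues.put("topicA-0", shared); // hypothetical keys
        queues.put("topicB-0", shared);
        queues.put("topicC-0", shared);
        System.out.println(sentinelsPerQueue(queues).get(shared)); // prints "3"
    }
}
```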
> >
> > In our case, we have only one consumer stream for all the topics, and the
> > data chunk queue capacity is set to 1. The execution sequence that causes
> > the problem is as follows:
> > 1. ZookeeperConsumerConnector.shutdown() is called and tries to put a
> > shutdownCommand into the queue of each entry in topicThreadIdAndQueues.
> > Since we have only one queue, multiple shutdownCommands will be put into it.
> > 2. In sendShutdownToAllQueues(), between queue.clear() and
> > queue.put(shutdownCommand), the consumer iterator receives the
> > shutdownCommand and puts it back into the data chunk queue. After that,
> > ZookeeperConsumerConnector tries to put another shutdownCommand into the
> > now-full data chunk queue and blocks forever.
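The interleaving in step 2 can be replayed deterministically on a single thread. This Java sketch is a simplified model, not Kafka code. `SHUTDOWN_COMMAND` stands in for the real sentinel, and the iterator's take and re-enqueue are inlined as plain queue calls. A timed `offer` replaces the connector's `put()`, so the demo reports the block instead of hanging. The `iteratorReEnqueues` flag is a hypothetical switch that compares the reported behavior with an iterator that does not put the sentinel back.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class DeadlockReplay {
    // Hypothetical stand-in for the consumer's shutdownCommand sentinel.
    static final Object SHUTDOWN_COMMAND = new Object();

    // Returns true if the connector's final put would complete, false if it
    // would block (detected with a bounded offer instead of put()).
    static boolean replay(boolean iteratorReEnqueues) throws InterruptedException {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>(1); // capacity 1, as in the report

        // Connector, first map entry: clear, then put the sentinel. Queue is now full.
        queue.clear();
        queue.put(SHUTDOWN_COMMAND);

        // Iterator takes the sentinel and moves to its done state.
        Object chunk = queue.take();

        // Connector, second map entry (same queue): clear() runs...
        queue.clear();

        // ...and before the connector's put(), the iterator re-enqueues the sentinel (issue 1).
        if (iteratorReEnqueues && chunk == SHUTDOWN_COMMAND) {
            queue.offer(chunk);
        }

        // Connector's put(): if the queue is full, nobody will ever drain it, so a
        // real put() would park forever. A timed offer shows this without hanging.
        return queue.offer(SHUTDOWN_COMMAND, 100, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(replay(true));  // prints "false": the reported hang
        System.out.println(replay(false)); // prints "true": no re-enqueue, no hang
    }
}
```

The `false` case corresponds to the `LinkedBlockingQueue.put` frame parked in the stack trace that follows.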
> >
> >
> > The thread stack trace is as follows:
> >
> > "Thread-23" #58 prio=5 os_prio=0 tid=0x00007ff440004800 nid=0x40a waiting on condition [0x00007ff4f0124000]
> >    java.lang.Thread.State: WAITING (parking)
> >         at sun.misc.Unsafe.park(Native Method)
> >         - parking to wait for  <0x0000000680b96bf0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> >         at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
> >         at kafka.consumer.ZookeeperConsumerConnector$$anonfun$sendShutdownToAllQueues$1.apply(ZookeeperConsumerConnector.scala:262)
> >         at kafka.consumer.ZookeeperConsumerConnector$$anonfun$sendShutdownToAllQueues$1.apply(ZookeeperConsumerConnector.scala:259)
> >         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >         at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> >         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> >         at kafka.consumer.ZookeeperConsumerConnector.sendShutdownToAllQueues(ZookeeperConsumerConnector.scala:259)
> >         at kafka.consumer.ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:199)
> >         at kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:192)
> >         - locked <0x0000000680dd5848> (a java.lang.Object)
> >         at kafka.tools.MirrorMaker$$anonfun$cleanShutdown$1.apply(MirrorMaker.scala:185)
> >         at kafka.tools.MirrorMaker$$anonfun$cleanShutdown$1.apply(MirrorMaker.scala:185)
> >         at scala.collection.immutable.List.foreach(List.scala:318)
> >         at kafka.tools.MirrorMaker$.cleanShutdown(MirrorMaker.scala:185)
> >         at kafka.tools.MirrorMaker$$anon$1.run(MirrorMaker.scala:169)
> >
> >
> > Thanks.
> >
> > Jiangjie (Becket) Qin
> >
>



-- 
-- Guozhang
