Ok. One of the issues is that when you have a consumer that consumes
multiple topics, a consumer thread that is slow in consuming messages from
one topic can block consumption in the other consumer threads. This is
because we use a shared fetcher to fetch all topics, and there is a bounded
in-memory queue per topic. If one of the queues fills up, the fetcher
blocks and can't put data into the other queues.
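
To illustrate (a toy sketch in Java, not Kafka's actual code; the class and
queue names are made up): a single fetcher thread filling one bounded queue
per topic parks in put() as soon as any one queue is full, so every other
topic starves too.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class SharedFetcherDemo {
    public static void main(String[] args) throws InterruptedException {
        // One bounded queue per topic, as in the 0.7 high-level consumer.
        final BlockingQueue<String> topicA = new LinkedBlockingQueue<String>(2);
        final BlockingQueue<String> topicB = new LinkedBlockingQueue<String>(2);

        // A single shared fetcher thread serving both topics.
        Thread fetcher = new Thread(new Runnable() {
            public void run() {
                try {
                    for (int i = 0; ; i++) {
                        topicA.put("a-" + i); // blocks once topicA is full...
                        topicB.put("b-" + i); // ...so topicB gets no more data
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        fetcher.start();

        // Only topicB is drained here. Once topicA fills up, the fetcher
        // parks in put() and this consumer starves too (the demo then
        // hangs deliberately, mirroring the behaviour in this thread).
        while (true) {
            System.out.println("consumed " + topicB.take());
        }
    }
}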

A workaround is to use different consumer connectors, each consuming a
single topic.
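
For example (a minimal sketch against the 0.7.2 high-level Java consumer
API; the class name is made up, and the ZK connect string, group, topics,
and stream count just mirror the setup described in this thread):

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;

public class PerTopicConnectors {
    // One connector per topic means one set of fetcher threads and
    // in-memory queues per topic, so a slow topic can only stall itself.
    static ConsumerConnector connectorFor(String topic, String group) {
        Properties props = new Properties();
        props.put("zk.connect",
                "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka");
        props.put("groupid", group);
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    }

    public static void main(String[] args) {
        for (String topic : new String[] { "event_queue", "another_topic" }) {
            ConsumerConnector connector = connectorFor(topic, topic);
            Map<String, List<KafkaStream<Message>>> streams =
                    connector.createMessageStreams(
                            Collections.singletonMap(topic, 150));
            // Hand streams.get(topic) to that topic's worker threads...
        }
    }
}

Each connector gets its own fetcher threads and its own queues, so a
backlog on one topic no longer stalls the others.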

Thanks,

Jun


On Tue, Jul 9, 2013 at 11:12 PM, Nihit Purwar <npur...@sprinklr.com> wrote:

> Hi Jun,
>
> Please see my comments inline again :)
>
> On 10-Jul-2013, at 9:13 AM, Jun Rao <jun...@gmail.com> wrote:
>
> > This indicates our in-memory queue is empty. So the consumer thread is
> > blocked.
>
> What should we do about this?
> As I mentioned in the previous mail, events are there to be consumed.
> Killing one consumer makes the other consumer consume events again.
>
>
> > What about the Kafka fetcher threads? Are they blocked on anything?
>
> One of the fetcher threads is blocked putting to a queue; the other is
> sleeping.
> Please look below:
>
> "FetchRunnable-1" prio=10 tid=0x00007fcbc902b800 nid=0x2064 waiting on
> condition [0x00007fcb833eb000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00000006809e8000> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
>         at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>         at
> java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
>         at
> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:61)
>         at
> kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:79)
>         at
> kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:65)
>         at
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
>         at scala.collection.immutable.List.foreach(List.scala:45)
>         at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
>
> "FetchRunnable-0" prio=10 tid=0x00007fcbc833b800 nid=0x2063 waiting on
> condition [0x00007fcb836ee000]
>    java.lang.Thread.State: TIMED_WAITING (sleeping)
>         at java.lang.Thread.sleep(Native Method)
>         at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:99)
>
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Tue, Jul 9, 2013 at 8:37 AM, Nihit Purwar <npur...@sprinklr.com> wrote:
> >
> >> Hello Jun,
> >>
> >> Please see my comments inline.
> >>
> >> On 09-Jul-2013, at 8:32 PM, Jun Rao <jun...@gmail.com> wrote:
> >>
> >>> I assume that each consumer instance consumes all 15 topics.
> >> No, we kept dedicated consumers listening only to the topic in question.
> >> We did this because this queue processes huge amounts of data.
> >>
> >>
> >>> Are all your
> >>> consumer threads alive? If one of your threads dies, it will eventually
> >>> block the consumption in other threads.
> >>
> >> Yes. We can see all the threads in the thread dump.
> >> We have ensured that the threads do not die due to an Exception.
> >>
> >> Please look at the stack trace below. We see all the threads waiting like this:
> >>
> >> "event_queue@150" prio=10 tid=0x00007eff28e41800 nid=0x31f9 waiting on
> >> condition [0x00007efedae6d000]
> >>   java.lang.Thread.State: WAITING (parking)
> >>        at sun.misc.Unsafe.park(Native Method)
> >>        - parking to wait for  <0x0000000640248618> (a
> >> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>        at
> >> java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> >>        at
> >>
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> >>        at
> >>
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
> >>        at
> >> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:60)
> >>        at
> >> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:32)
> >>        at
> >> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> >>        at
> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> >>        at
> >>
> com.spr.messageprocessor.KafkaStreamRunnable.run(KafkaStreamRunnable.java:49)
> >>        at
> >> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
> >>        at
> >> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> >>        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> >>        at
> >>
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> >>        at
> >>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> >>        at java.lang.Thread.run(Thread.java:662)
> >>
> >>>
> >>> Thanks,
> >>>
> >>> Jun
> >>>
> >>>
> >>> On Tue, Jul 9, 2013 at 4:18 AM, Nihit Purwar <npur...@sprinklr.com> wrote:
> >>>
> >>>> Hi,
> >>>>
> >>>> We are using kafka-0.7.2 with zookeeper (3.4.5)
> >>>>
> >>>> Our cluster configuration:
> >>>> 3 brokers on 3 different machines. Each broker machine has a zookeeper
> >>>> instance running as well.
> >>>> We have 15 topics defined. We are trying to use them as queues (JMS-like) by defining the same group across different Kafka consumers.
> >>>> On the consumer side, we are using the High Level Consumer.
> >>>>
> >>>> However, we are seeing some weird behaviour.
> >>>> One of our heavily used queues (event_queue) has 2 dedicated consumers listening to that queue only.
> >>>> This queue is defined with 150 partitions on each broker & the number of streams defined on the 2 dedicated consumers is 150.
> >>>> After a while we see that most of the consumer threads keep waiting for events and the lag keeps growing.
> >>>> If we kill one of the dedicated consumers, then the other consumer starts getting messages in a hurry.
> >>>>
> >>>> The consumer had no Full GCs.
> >>>>
> >>>> How we measure lag:
> >>>> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group event_queue --zkconnect zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka --topic event_queue
> >>>>
> >>>> Around the time the events stopped coming to the new consumer, this was printed in the logs:
> >>>>
> >>>> [INFO] zookeeper state changed (Disconnected)
> >>>> [INFO] zookeeper state changed (Disconnected)
> >>>> [INFO] zookeeper state changed (SyncConnected)
> >>>> [INFO] zookeeper state changed (SyncConnected)
> >>>>
> >>>> Config Overridden:
> >>>> Consumer:
> >>>> fetch.size=3MB
> >>>> autooffset.reset=largest
> >>>> autocommit.interval.ms=500
> >>>> Producer:
> >>>> maxMessageSize=3MB
> >>>>
> >>>> Please let us know if we are doing something wrong or facing a known issue here.
> >>>>
> >>>> Thanks,
> >>>> Nihit
> >>
> >>
>
>
