Also, just so that we are on the same page: I assume that you used the
following API. Did you put just one topic in the topicCountMap?
  def createMessageStreams(topicCountMap: Map[String,Int]): Map[String,
List[KafkaStream[Array[Byte],Array[Byte]]]]
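
For example, here is a rough sketch of what I have in mind (the topic name,
group, ZK connect string, and stream count below are just placeholders, and
the config property names may differ slightly across versions):

  import java.util.Properties
  import kafka.consumer.{Consumer, ConsumerConfig}

  val props = new Properties()
  props.put("zk.connect", "zk1:2181,zk2:2181,zk3:2181/kafka") // placeholder
  props.put("groupid", "event_queue")                         // placeholder
  val connector = Consumer.create(new ConsumerConfig(props))

  // a single topic mapped to the desired number of streams/threads
  val topicCountMap = Map("event_queue" -> 150)
  val streams = connector.createMessageStreams(topicCountMap)("event_queue")
  // each stream is then handed to its own consumer thread that iterates it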

Thanks,

Jun


On Wed, Jul 10, 2013 at 8:30 AM, Nihit Purwar <npur...@sprinklr.com> wrote:

> Hi Jun,
>
> Thanks for helping out so far.
>
> As per your explanation we are doing exactly as you have mentioned in your
> workaround below.
> > A workaround is to use different consumer connectors, each consuming a
> > single topic.
>
>
> Here is the problem...
>
> We have a topic that gets a lot of events (around a million a day), so
> this topic has a high number of partitions on the server, and we have
> dedicated consumers listening only to this topic. Processing time per
> event is on the order of 15-30 millis, so we are confident that our
> consumers are not slow at processing.
>
> Every now and then our consumer threads stall and do not receive any
> events (as shown in my previous email with the thread stack of the idle
> threads), even though we can see the offset lag increasing for the
> consumers.
>
> We also noticed that if we force a rebalance of the consumers (either by
> starting a new consumer or killing an existing one), data starts to flow
> into these consumer threads again. The consumers remain stable (processing
> events) for about 20-30 mins before the threads go idle again and the
> backlog starts growing. This happens in a cycle for us, and we have not
> been able to figure out why events stop flowing in.
>
> As a side note, we are also monitoring the GC cycles and there are hardly
> any.
>
> Please let us know if you need any additional details.
>
> Thanks
> Nihit.
>
>
> On 10-Jul-2013, at 8:30 PM, Jun Rao <jun...@gmail.com> wrote:
>
> > Ok. One of the issues is that when you have a consumer that consumes
> > multiple topics, if one of the consumer threads is slow in consuming
> > messages from one topic, it can block the consumption of other consumer
> > threads. This is because we use a shared fetcher to fetch all topics.
> > There is an in-memory queue per topic. If one of the queues is full, the
> > fetcher will block and can't put the data into other queues.
> >
> > A workaround is to use different consumer connectors, each consuming a
> > single topic.
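> >
> > Roughly something like this (only a sketch; the topic names, groups, and
> > ZK settings are made up, and the property names may vary by version):
> >
> >   import java.util.Properties
> >   import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector}
> >
> >   def connectorFor(group: String): ConsumerConnector = {
> >     val props = new Properties()
> >     props.put("zk.connect", "zk1:2181,zk2:2181,zk3:2181/kafka") // placeholder
> >     props.put("groupid", group)                                 // placeholder
> >     Consumer.create(new ConsumerConfig(props))
> >   }
> >
> >   // one connector per topic, so each topic gets its own fetcher and
> >   // in-memory queues, and a slow topic can't block the others
> >   val streamsA = connectorFor("groupA").createMessageStreams(Map("topicA" -> 1))("topicA")
> >   val streamsB = connectorFor("groupB").createMessageStreams(Map("topicB" -> 1))("topicB")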
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Tue, Jul 9, 2013 at 11:12 PM, Nihit Purwar <npur...@sprinklr.com>
> wrote:
> >
> >> Hi Jun,
> >>
> >> Please see my comments inline again :)
> >>
> >> On 10-Jul-2013, at 9:13 AM, Jun Rao <jun...@gmail.com> wrote:
> >>
> >>> This indicates our in-memory queue is empty. So the consumer thread is
> >>> blocked.
> >>
> >> What should we do about this?
> >> As I mentioned in the previous mail, events are there to be consumed.
> >> Killing one consumer makes the other consumer consume events again.
> >>
> >>
> >>> What about the Kafka fetcher threads? Are they blocked on anything?
> >>
> >> One of the fetcher threads is blocked on putting to a queue; the other
> >> is sleeping.
> >> Please look below:
> >>
> >> "FetchRunnable-1" prio=10 tid=0x00007fcbc902b800 nid=0x2064 waiting on
> >> condition [0x00007fcb833eb000]
> >>   java.lang.Thread.State: WAITING (parking)
> >>        at sun.misc.Unsafe.park(Native Method)
> >>        - parking to wait for  <0x00000006809e8000> (a
> >> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>        at
> >> java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> >>        at
> >>
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> >>        at
> >>
> java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
> >>        at
> >> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:61)
> >>        at
> >>
> kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:79)
> >>        at
> >>
> kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:65)
> >>        at
> >>
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
> >>        at scala.collection.immutable.List.foreach(List.scala:45)
> >>        at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
> >>
> >> "FetchRunnable-0" prio=10 tid=0x00007fcbc833b800 nid=0x2063 waiting on
> >> condition [0x00007fcb836ee000]
> >>   java.lang.Thread.State: TIMED_WAITING (sleeping)
> >>        at java.lang.Thread.sleep(Native Method)
> >>        at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:99)
> >>
> >>>
> >>> Thanks,
> >>>
> >>> Jun
> >>>
> >>>
> >>> On Tue, Jul 9, 2013 at 8:37 AM, Nihit Purwar <npur...@sprinklr.com>
> >> wrote:
> >>>
> >>>> Hello Jun,
> >>>>
> >>>> Please see my comments inline.
> >>>>
> >>>> On 09-Jul-2013, at 8:32 PM, Jun Rao <jun...@gmail.com> wrote:
> >>>>
> >>>>> I assume that each consumer instance consumes all 15 topics.
> >>>> No, we kept a dedicated consumer listening to the topic in question.
> >>>> We did this because this queue processes huge amounts of data.
> >>>>
> >>>>
> >>>>> Are all your consumer threads alive? If one of your threads dies, it
> >>>>> will eventually block the consumption in other threads.
> >>>>
> >>>> Yes. We can see all the threads in the thread dump.
> >>>> We have ensured that the threads do not die due to an Exception.
> >>>>
> >>>> Please look at the stack trace below. We see all the threads waiting
> >>>> like this:
> >>>>
> >>>> "event_queue@150" prio=10 tid=0x00007eff28e41800 nid=0x31f9 waiting
> on
> >>>> condition [0x00007efedae6d000]
> >>>>  java.lang.Thread.State: WAITING (parking)
> >>>>       at sun.misc.Unsafe.park(Native Method)
> >>>>       - parking to wait for  <0x0000000640248618> (a
> >>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>>>       at
> >>>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> >>>>       at
> >>>>
> >>
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> >>>>       at
> >>>>
> >>
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
> >>>>       at
> >>>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:60)
> >>>>       at
> >>>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:32)
> >>>>       at
> >>>>
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> >>>>       at
> >> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> >>>>       at
> >>>>
> >>
> com.spr.messageprocessor.KafkaStreamRunnable.run(KafkaStreamRunnable.java:49)
> >>>>       at
> >>>>
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
> >>>>       at
> >>>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> >>>>       at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> >>>>       at
> >>>>
> >>
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> >>>>       at
> >>>>
> >>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> >>>>       at java.lang.Thread.run(Thread.java:662)
> >>>>
> >>>>>
> >>>>> Thanks,
> >>>>>
> >>>>> Jun
> >>>>>
> >>>>>
> >>>>> On Tue, Jul 9, 2013 at 4:18 AM, Nihit Purwar <npur...@sprinklr.com>
> >>>> wrote:
> >>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> We are using kafka-0.7.2 with zookeeper (3.4.5)
> >>>>>>
> >>>>>> Our cluster configuration:
> >>>>>> 3 brokers on 3 different machines. Each broker machine also runs a
> >>>>>> zookeeper instance.
> >>>>>> We have 15 topics defined. We are trying to use them as queues (JMS
> >>>>>> like) by defining the same group across different kafka consumers.
> >>>>>> On the consumer side, we are using the High Level Consumer.
> >>>>>>
> >>>>>> However, we are seeing some weird behaviour.
> >>>>>> One of our heavily used queues (event_queue) has 2 dedicated
> >>>>>> consumers listening to that queue only.
> >>>>>> This queue is defined with 150 partitions on each broker, and the
> >>>>>> number of streams defined on the 2 dedicated consumers is 150.
> >>>>>> After a while we see that most of the consumer threads keep waiting
> >>>>>> for events and the lag keeps growing.
> >>>>>> If we kill one of the dedicated consumers, then the other consumer
> >>>>>> starts getting messages in a hurry.
> >>>>>>
> >>>>>> Consumer had no Full GCs.
> >>>>>>
> >>>>>> How we measure lag:
> >>>>>> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group
> >>>>>> event_queue --zkconnect
> >>>>>> zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka --topic
> >>>>>> event_queue
> >>>>>>
> >>>>>> Around the time the events stopped coming to the new consumer, this
> >>>>>> was printed in the logs:
> >>>>>>
> >>>>>> [INFO] zookeeper state changed (Disconnected)
> >>>>>> [INFO] zookeeper state changed (Disconnected)
> >>>>>> [INFO] zookeeper state changed (SyncConnected)
> >>>>>> [INFO] zookeeper state changed (SyncConnected)
> >>>>>>
> >>>>>> Config Overridden:
> >>>>>> Consumer:
> >>>>>> fetch.size=3MB
> >>>>>> autooffset.reset=largest
> >>>>>> autocommit.interval.ms=500
> >>>>>> Producer:
> >>>>>> maxMessageSize=3MB
> >>>>>>
> >>>>>> Please let us know if we are doing something wrong or facing a known
> >>>>>> issue here.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Nihit
> >>>>
> >>>>
> >>
> >>
>
>
