Also, just so that we are on the same page: I assume that you used the
following API. Did you put just one topic in the topicCountMap?

    def createMessageStreams(topicCountMap: Map[String,Int]): Map[String, List[KafkaStream[Array[Byte],Array[Byte]]]]
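For example, with a single topic the call would look roughly like the
sketch below (a minimal sketch only; the group and topic names are
placeholders, and the property keys are the 0.7-style names used further
down this thread):

    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig}

    object SingleTopicConsumer {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("zk.connect", "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka")
        props.put("groupid", "event_queue")

        val connector = Consumer.create(new ConsumerConfig(props))

        // One entry in the topicCountMap: 150 streams for event_queue only.
        val streams = connector.createMessageStreams(Map("event_queue" -> 150))

        // streams("event_queue") is a List of 150 KafkaStreams,
        // one to hand to each worker thread.
      }
    }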
Thanks,

Jun


On Wed, Jul 10, 2013 at 8:30 AM, Nihit Purwar <npur...@sprinklr.com> wrote:

> Hi Jun,
>
> Thanks for helping out so far.
>
> As per your explanation, we are doing exactly as you have mentioned in
> your workaround below:
>
>> A workaround is to use different consumer connectors, each consuming a
>> single topic.
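>
> Concretely, our setup per topic is roughly the following sketch (the
> helper name is ours, for illustration only): one connector, and
> therefore one fetcher, per topic.
>
>     import java.util.Properties
>     import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector}
>
>     def connectorFor(topic: String): ConsumerConnector = {
>       val props = new Properties()
>       props.put("zk.connect", "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka")
>       props.put("groupid", topic) // same group on every instance -> queue semantics
>       Consumer.create(new ConsumerConfig(props))
>     }
>
>     // Dedicated connector for the heavy topic only:
>     val eventQueueStreams =
>       connectorFor("event_queue").createMessageStreams(Map("event_queue" -> 150))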
>
> Here is the problem...
>
> We have a topic which gets a lot of events (around a million a day), so
> this topic has a high number of partitions on the server, and we have
> dedicated consumers listening only to this topic. Processing time is on
> the order of 15-30 millis, so we are sure that our consumers are not
> slow in processing.
>
> Every now and then our consumer threads stall and stop receiving events
> (as shown in my previous email with the thread stack of the idle
> threads), even though we can see the offset lag increasing for the
> consumers.
>
> We also noticed that if we force the consumers to rebalance (either by
> starting a new consumer or killing an existing one), data starts to
> flow in again to these consumer threads. The consumers remain stable
> (processing events) for about 20-30 minutes before the threads go idle
> again and the backlog starts growing. This happens in a cycle for us,
> and we are not able to figure out why the events stop flowing in.
>
> As a side note, we are also monitoring the GC cycles and there are
> hardly any.
>
> Please let us know if you need any additional details.
>
> Thanks,
> Nihit
>
>
> On 10-Jul-2013, at 8:30 PM, Jun Rao <jun...@gmail.com> wrote:
>
>> Ok. One of the issues is that when you have a consumer that consumes
>> multiple topics, if one of the consumer threads is slow in consuming
>> messages from one topic, it can block the consumption of other
>> consumer threads. This is because we use a shared fetcher to fetch all
>> topics. There is an in-memory queue per topic. If one of the queues is
>> full, the fetcher will block and can't put the data into other queues.
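>>
>> In simplified form, the behaviour is like the following sketch (plain
>> java.util.concurrent code for illustration, not the actual fetcher
>> implementation):
>>
>>     import java.util.concurrent.LinkedBlockingQueue
>>
>>     object SharedFetcherSketch {
>>       def main(args: Array[String]): Unit = {
>>         val queueA = new LinkedBlockingQueue[String](10) // topic A, fast consumer
>>         val queueB = new LinkedBlockingQueue[String](10) // topic B, slow consumer
>>
>>         // One shared fetcher thread feeds both queues in turn. Once queueB
>>         // fills up because its consumer is slow, put() blocks, and queueA
>>         // stops receiving data even though its consumer is idle.
>>         val fetcher = new Thread(new Runnable {
>>           def run(): Unit = {
>>             while (true) {
>>               queueA.put("message-for-A")
>>               queueB.put("message-for-B") // blocks when queueB is full
>>             }
>>           }
>>         })
>>         fetcher.start()
>>
>>         // Only topic A's consumer drains its queue; topic B's consumer
>>         // never takes, so topic A stalls as soon as queueB is full.
>>         new Thread(new Runnable {
>>           def run(): Unit = { while (true) println(queueA.take()) }
>>         }).start()
>>       }
>>     }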
>>
>> A workaround is to use different consumer connectors, each consuming a
>> single topic.
>>
>> Thanks,
>>
>> Jun
>>
>>
>> On Tue, Jul 9, 2013 at 11:12 PM, Nihit Purwar <npur...@sprinklr.com> wrote:
>>
>>> Hi Jun,
>>>
>>> Please see my comments inline again :)
>>>
>>> On 10-Jul-2013, at 9:13 AM, Jun Rao <jun...@gmail.com> wrote:
>>>
>>>> This indicates our in-memory queue is empty. So the consumer thread
>>>> is blocked.
>>>
>>> What should we do about this?
>>> As I mentioned in the previous mail, events are there to be consumed.
>>> Killing one consumer makes the other consumer consume events again.
>>>
>>>> What about the Kafka fetcher threads? Are they blocked on anything?
>>>
>>> One of the fetcher threads is blocked on putting to a queue; the
>>> other is sleeping. Please look below:
>>>
>>> "FetchRunnable-1" prio=10 tid=0x00007fcbc902b800 nid=0x2064 waiting on condition [0x00007fcb833eb000]
>>>    java.lang.Thread.State: WAITING (parking)
>>>         at sun.misc.Unsafe.park(Native Method)
>>>         - parking to wait for <0x00000006809e8000> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
>>>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>>         at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
>>>         at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:61)
>>>         at kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:79)
>>>         at kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:65)
>>>         at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
>>>         at scala.collection.immutable.List.foreach(List.scala:45)
>>>         at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
>>>
>>> "FetchRunnable-0" prio=10 tid=0x00007fcbc833b800 nid=0x2063 waiting on condition [0x00007fcb836ee000]
>>>    java.lang.Thread.State: TIMED_WAITING (sleeping)
>>>         at java.lang.Thread.sleep(Native Method)
>>>         at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:99)
>>>
>>>> Thanks,
>>>>
>>>> Jun
>>>>
>>>>
>>>> On Tue, Jul 9, 2013 at 8:37 AM, Nihit Purwar <npur...@sprinklr.com> wrote:
>>>>
>>>>> Hello Jun,
>>>>>
>>>>> Please see my comments inline.
>>>>>
>>>>> On 09-Jul-2013, at 8:32 PM, Jun Rao <jun...@gmail.com> wrote:
>>>>>
>>>>>> I assume that each consumer instance consumes all 15 topics.
>>>>>
>>>>> No, we kept a dedicated consumer listening to the topic in question.
>>>>> We did this because this queue processes huge amounts of data.
>>>>>
>>>>>> Are all your consumer threads alive? If one of your threads dies,
>>>>>> it will eventually block the consumption in other threads.
>>>>>
>>>>> Yes. We can see all the threads in the thread dump.
>>>>> We have ensured that the threads do not die due to an Exception.
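>>>>>
>>>>> For reference, each worker is roughly the following sketch (a Scala
>>>>> sketch of what our Java KafkaStreamRunnable does; the processing
>>>>> details are omitted):
>>>>>
>>>>>     import kafka.consumer.KafkaStream
>>>>>     import kafka.message.Message
>>>>>
>>>>>     class KafkaStreamRunnable(stream: KafkaStream[Message]) extends Runnable {
>>>>>       // Iterating the stream blocks on the in-memory queue while idle;
>>>>>       // the loop only ends if the connector is shut down.
>>>>>       override def run(): Unit =
>>>>>         for (msgAndMeta <- stream)
>>>>>           process(msgAndMeta.message) // application logic, 15-30 millis per event
>>>>>
>>>>>       // Placeholder; the real implementation catches exceptions so
>>>>>       // the thread never dies.
>>>>>       private def process(m: Message): Unit = ()
>>>>>     }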
>>>>> Please look at the stack trace below. We see all the threads
>>>>> waiting like this:
>>>>>
>>>>> "event_queue@150" prio=10 tid=0x00007eff28e41800 nid=0x31f9 waiting on condition [0x00007efedae6d000]
>>>>>    java.lang.Thread.State: WAITING (parking)
>>>>>         at sun.misc.Unsafe.park(Native Method)
>>>>>         - parking to wait for <0x0000000640248618> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>>>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
>>>>>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>>>>         at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
>>>>>         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:60)
>>>>>         at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:32)
>>>>>         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>>>>>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>>>>>         at com.spr.messageprocessor.KafkaStreamRunnable.run(KafkaStreamRunnable.java:49)
>>>>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>>>>>         at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>>>>>         at java.lang.Thread.run(Thread.java:662)
>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Jun
>>>>>>
>>>>>>
>>>>>> On Tue, Jul 9, 2013 at 4:18 AM, Nihit Purwar <npur...@sprinklr.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> We are using kafka-0.7.2 with zookeeper (3.4.5).
>>>>>>>
>>>>>>> Our cluster configuration:
>>>>>>> 3 brokers on 3 different machines. Each broker machine has a
>>>>>>> zookeeper instance running as well.
>>>>>>> We have 15 topics defined. We are trying to use them as queues
>>>>>>> (JMS like) by defining the same group across different kafka
>>>>>>> consumers.
>>>>>>> On the consumer side, we are using the High Level Consumer.
>>>>>>>
>>>>>>> However, we are seeing a weird behaviour.
>>>>>>> One of our heavily used queues (event_queue) has 2 dedicated
>>>>>>> consumers listening to that queue only.
>>>>>>> This queue is defined with 150 partitions on each broker, and the
>>>>>>> number of streams defined on the 2 dedicated consumers is 150.
>>>>>>> After a while we see that most of the consumer threads keep
>>>>>>> waiting for events while the lag keeps growing.
>>>>>>> If we kill one of the dedicated consumers, then the other consumer
>>>>>>> starts getting messages in a hurry.
>>>>>>>
>>>>>>> The consumer had no Full GCs.
>>>>>>>
>>>>>>> How we measure lag:
>>>>>>> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group event_queue --zkconnect zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka --topic event_queue
>>>>>>>
>>>>>>> Around the time the events stopped coming to the new consumer,
>>>>>>> this was printed in the logs:
>>>>>>>
>>>>>>> [INFO] zookeeper state changed (Disconnected)
>>>>>>> [INFO] zookeeper state changed (Disconnected)
>>>>>>> [INFO] zookeeper state changed (SyncConnected)
>>>>>>> [INFO] zookeeper state changed (SyncConnected)
>>>>>>>
>>>>>>> Config overridden:
>>>>>>> Consumer:
>>>>>>>   fetch.size=3MB
>>>>>>>   autooffset.reset=largest
>>>>>>>   autocommit.interval.ms=500
>>>>>>> Producer:
>>>>>>>   maxMessageSize=3MB
>>>>>>>
>>>>>>> Please let us know if we are doing something wrong or facing a
>>>>>>> known issue here.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Nihit