Hi Jun,

I put only one topic in the topicCountMap when starting the consumer, and I
used the same API, "createMessageStreams".
We will send you the trace-level logs of the Kafka consumer soon.

Thanks again for replying.

Nihit

On 10-Jul-2013, at 10:38 PM, Jun Rao <jun...@gmail.com> wrote:

> Also, just so that we are on the same page. I assume that you used the
> following api. Did you just put in one topic in the topicCountMap?
>  def createMessageStreams(topicCountMap: Map[String,Int]): Map[String, List[KafkaStream[Array[Byte],Array[Byte]]]]
> 
> Thanks,
> 
> Jun
> 
> 
> On Wed, Jul 10, 2013 at 8:30 AM, Nihit Purwar <npur...@sprinklr.com> wrote:
> 
>> Hi Jun,
>> 
>> Thanks for helping out so far.
>> 
>> As per your explanation we are doing exactly as you have mentioned in your
>> workaround below.
>>> A workaround is to use different consumer connectors, each consuming a
>>> single topic.
>> 
>> 
>> Here is the problem...
>> 
>> We have a topic which gets a lot of events (around a million a day), so
>> this topic on the server has a high number of partitions. We have
>> dedicated consumers listening only to this topic, and the processing time
>> per event is on the order of 15-30 ms. So we are confident that our
>> consumers are not slow in processing.
>> 
>> Every now and then, our consumer threads stall and do not receive any
>> events (as shown by the idle-thread stack traces in my previous email),
>> even though we can see the offset lag increasing for the consumers.
>> 
>> We also noticed that if we force a rebalance of the consumers (either by
>> starting a new consumer or killing an existing one), data starts to flow
>> in again to these consumer threads. The consumers remain stable
>> (processing events) for about 20-30 minutes before the threads go idle
>> again and the backlog starts growing. This happens in a cycle for us, and
>> we are not able to figure out why events stop flowing in.
>> 
>> As a side note, we are also monitoring the GC cycles and there are hardly
>> any.
>> 
>> Please let us know if you need any additional details.
>> 
>> Thanks
>> Nihit.
>> 
>> 
>> On 10-Jul-2013, at 8:30 PM, Jun Rao <jun...@gmail.com> wrote:
>> 
>>> Ok. One of the issues is that when you have a consumer that consumes
>>> multiple topics, if one of the consumer threads is slow in consuming
>>> messages from one topic, it can block the consumption of other consumer
>>> threads. This is because we use a shared fetcher to fetch all topics. There
>>> is an in-memory queue per topic. If one of the queues is full, the fetcher
>>> will block and can't put the data into other queues.
>>> 
>>> A workaround is to use different consumer connectors, each consuming a
>>> single topic.
>>> 
>>> Thanks,
>>> 
>>> Jun
>>> 
>>> 
>>> On Tue, Jul 9, 2013 at 11:12 PM, Nihit Purwar <npur...@sprinklr.com> wrote:
>>> 
>>>> Hi Jun,
>>>> 
>>>> Please see my comments inline again :)
>>>> 
>>>> On 10-Jul-2013, at 9:13 AM, Jun Rao <jun...@gmail.com> wrote:
>>>> 
>>>>> This indicates our in-memory queue is empty. So the consumer thread is
>>>>> blocked.
>>>> 
>>>> What should we do about this?
>>>> As I mentioned in the previous mail, events are there to be consumed.
>>>> Killing one consumer makes the other consumer consume events again.
>>>> 
>>>> 
>>>>> What about the Kafka fetcher threads? Are they blocked on anything?
>>>> 
>>>> One of the fetcher threads is blocked on putting to a queue; the other is
>>>> sleeping.
>>>> Please look below:
>>>> 
>>>> "FetchRunnable-1" prio=10 tid=0x00007fcbc902b800 nid=0x2064 waiting on condition [0x00007fcb833eb000]
>>>>  java.lang.Thread.State: WAITING (parking)
>>>>       at sun.misc.Unsafe.park(Native Method)
>>>>       - parking to wait for  <0x00000006809e8000> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
>>>>       at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>>>       at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
>>>>       at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:61)
>>>>       at kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:79)
>>>>       at kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:65)
>>>>       at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
>>>>       at scala.collection.immutable.List.foreach(List.scala:45)
>>>>       at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
>>>> 
>>>> "FetchRunnable-0" prio=10 tid=0x00007fcbc833b800 nid=0x2063 waiting on condition [0x00007fcb836ee000]
>>>>  java.lang.Thread.State: TIMED_WAITING (sleeping)
>>>>       at java.lang.Thread.sleep(Native Method)
>>>>       at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:99)
>>>> 
>>>>> 
>>>>> Thanks,
>>>>> 
>>>>> Jun
>>>>> 
>>>>> 
>>>>> On Tue, Jul 9, 2013 at 8:37 AM, Nihit Purwar <npur...@sprinklr.com> wrote:
>>>>> 
>>>>>> Hello Jun,
>>>>>> 
>>>>>> Please see my comments inline.
>>>>>> 
>>>>>> On 09-Jul-2013, at 8:32 PM, Jun Rao <jun...@gmail.com> wrote:
>>>>>> 
>>>>>>> I assume that each consumer instance consumes all 15 topics.
>>>>>> No, we kept dedicated consumers listening only to the topic in question.
>>>>>> We did this because this queue processes huge amounts of data.
>>>>>> 
>>>>>> 
>>>>>>> Are all your consumer threads alive? If one of your threads dies, it
>>>>>>> will eventually block the consumption in other threads.
>>>>>> 
>>>>>> Yes. We can see all the threads in the thread dump.
>>>>>> We have ensured that the threads do not die due to an Exception.
>>>>>> 
>>>>>> Please look at the stack trace below. We see all the threads waiting
>>>>>> like this:
>>>>>> 
>>>>>> "event_queue@150" prio=10 tid=0x00007eff28e41800 nid=0x31f9 waiting on condition [0x00007efedae6d000]
>>>>>> java.lang.Thread.State: WAITING (parking)
>>>>>>      at sun.misc.Unsafe.park(Native Method)
>>>>>>      - parking to wait for  <0x0000000640248618> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>>>>      at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
>>>>>>      at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>>>>>      at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
>>>>>>      at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:60)
>>>>>>      at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:32)
>>>>>>      at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>>>>>>      at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>>>>>>      at com.spr.messageprocessor.KafkaStreamRunnable.run(KafkaStreamRunnable.java:49)
>>>>>>      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>>>>>>      at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>>>>>      at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>>>>>      at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>>>>>      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>>>>>>      at java.lang.Thread.run(Thread.java:662)
>>>>>> 
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> 
>>>>>>> Jun
>>>>>>> 
>>>>>>> 
>>>>>>> On Tue, Jul 9, 2013 at 4:18 AM, Nihit Purwar <npur...@sprinklr.com> wrote:
>>>>>>> 
>>>>>>>> Hi,
>>>>>>>> 
>>>>>>>> We are using kafka-0.7.2 with zookeeper (3.4.5)
>>>>>>>> 
>>>>>>>> Our cluster configuration:
>>>>>>>> 3 brokers on 3 different machines. Each broker machine has a
>>>>>>>> zookeeper instance running as well.
>>>>>>>> We have 15 topics defined. We are trying to use them as queues
>>>>>>>> (JMS-like) by defining the same group across different kafka consumers.
>>>>>>>> On the consumer side, we are using High Level Consumer.
>>>>>>>> 
>>>>>>>> However, we are seeing a weird behaviour.
>>>>>>>> One of our heavily used queues (event_queue) has 2 dedicated
>>>>>>>> consumers listening to that queue only.
>>>>>>>> This queue is defined with 150 partitions on each broker & the
>>>>>>>> number of streams defined on the 2 dedicated consumers is 150.
>>>>>>>> After a while we see that most of the consumer threads keep waiting
>>>>>>>> for events and the lag keeps growing.
>>>>>>>> If we kill one of the dedicated consumers, then the other consumer
>>>>>>>> starts getting messages in a hurry.
>>>>>>>> 
>>>>>>>> Consumer had no Full GCs.
>>>>>>>> 
>>>>>>>> How we measure lag:
>>>>>>>> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group event_queue --zkconnect zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka --topic event_queue
>>>>>>>> 
>>>>>>>> Around the time the events stopped coming to the new consumer, this
>>>>>>>> was printed in the logs:
>>>>>>>> 
>>>>>>>> [INFO] zookeeper state changed (Disconnected)
>>>>>>>> [INFO] zookeeper state changed (Disconnected)
>>>>>>>> [INFO] zookeeper state changed (SyncConnected)
>>>>>>>> [INFO] zookeeper state changed (SyncConnected)
>>>>>>>> 
>>>>>>>> Config overridden:
>>>>>>>> Consumer:
>>>>>>>> fetch.size=3MB
>>>>>>>> autooffset.reset=largest
>>>>>>>> autocommit.interval.ms=500
>>>>>>>> Producer:
>>>>>>>> maxMessageSize=3MB
>>>>>>>> 
>>>>>>>> Please let us know if we are doing something wrong or facing a known
>>>>>>>> issue here.
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> Nihit
>>>>>> 
>>>>>> 
>>>> 
>>>> 
>> 
>> 
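
The blocking behaviour Jun describes in this thread (one shared fetcher feeding several bounded in-memory queues) can be modelled without Kafka at all. The sketch below is only an illustration under stated assumptions: SharedFetcherDemo, run, stalledQueue and activeQueue are hypothetical names, not Kafka classes, and the fetch loop is a simplification of what FetcherRunnable does. It shows how a single full queue can starve a consumer whose own queue is empty, which would match the two thread dumps above (one thread parked in LinkedBlockingQueue.put, the others parked in LinkedBlockingQueue.take).

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model, not Kafka code: one fetcher thread fills two bounded queues in turn.
class SharedFetcherDemo {

    // Returns how many messages the active consumer received before it starved.
    static int run(int capacity, int rounds) throws InterruptedException {
        final LinkedBlockingQueue<String> stalledQueue = new LinkedBlockingQueue<>(capacity);
        final LinkedBlockingQueue<String> activeQueue = new LinkedBlockingQueue<>(capacity);

        // The shared fetcher: put() blocks as soon as either queue is full.
        Thread fetcher = new Thread(() -> {
            try {
                for (int i = 0; i < rounds; i++) {
                    stalledQueue.put("stalled-" + i); // nobody drains this queue
                    activeQueue.put("active-" + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "fetcher");
        fetcher.setDaemon(true);
        fetcher.start();

        // The active consumer drains its queue until no data arrives for 500 ms.
        int received = 0;
        while (activeQueue.poll(500, TimeUnit.MILLISECONDS) != null) {
            received++;
        }
        fetcher.interrupt(); // release the fetcher parked in put()
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("active consumer received " + run(3, 10) + " of 10 messages");
    }
}
```

With a capacity of 3 and 10 rounds, the active consumer should receive only 3 messages: the fetcher parks on the fourth put to the undrained queue and never reaches the active queue again. Whether this is the actual cause of the stall reported here is not confirmed by the thread; the sketch only shows that the symptom is consistent with it.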
