Works for me. It's the least disruptive option. 
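For anyone landing on this thread later: the approach agreed on here -- catch the exception that the dying fetcher surfaces through the stream iterator, throw the old connector away, and build a fresh one -- can be sketched roughly as below. This is a language-agnostic sketch of the pattern only; `FetcherDiedError`, `make_connector`, `stream()`, and `shutdown()` are illustrative stand-ins, not the real Kafka 0.7 client API.

```python
class FetcherDiedError(Exception):
    """Stand-in for the exception the stream iterator raises when a
    fetcher thread inserts its special message and dies."""


def consume_until_drained(make_connector, handle):
    """Drain messages, rebuilding the connector whenever a fetcher dies.

    make_connector: factory that returns a fresh connector object
    handle:         callback invoked once per message
    """
    while True:
        connector = make_connector()
        try:
            for msg in connector.stream():
                handle(msg)
            return  # stream exhausted cleanly
        except FetcherDiedError:
            # The old connector's fetcher threads are gone and it is
            # unusable; shut it down and loop to create a new one.
            connector.shutdown()


# --- tiny stub demonstrating the recovery path ---
class StubConnector:
    """Fails mid-stream on its first incarnation, succeeds on the second."""
    instances = 0

    def __init__(self):
        type(self).instances += 1
        self.first = type(self).instances == 1

    def stream(self):
        yield "a"
        if self.first:
            raise FetcherDiedError("checksum validation failed")
        yield "b"

    def shutdown(self):
        pass


seen = []
consume_until_drained(StubConnector, seen.append)
print(seen)                      # -> ['a', 'a', 'b']
print(StubConnector.instances)   # -> 2
```

Note that "a" is delivered twice: recreating the connector restarts from the last committed offset, so with explicit commit() (as described below) this pattern gives at-least-once delivery, and the handler should tolerate redelivered messages.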

> On Feb 11, 2014, at 3:11 PM, Suyog Rao <su...@loggly.com> wrote:
> 
> Actually, looking at the code, the consumer client code can also catch this 
> exception while iterating for messages. The fetcher thread inserts a special 
> message before dying, which triggers an exception when the client calls for 
> the next message:
> 
> https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/FetcherRunnable.scala#L87
> 
> So, we could create a new ConsumerConnector object when this error happens, 
> which will re-initialize the fetcher threads. 
> 
> Thanks,
> Suyog
> 
>> On Feb 11, 2014, at 12:16 PM, vinh <v...@loggly.com> wrote:
>> 
>> In that case, is there a way to detect that a consumer instance is no longer 
>> usable, so that we can recreate the instance on the fly and have it 
>> reconnect, without having to restart our app?
>> 
>> Thanks,
>> -Vinh
>> 
>>> On Feb 11, 2014, at 7:45 AM, Jun Rao <jun...@gmail.com> wrote:
>>> 
>>> We do catch the exception. However, we don't know what to do with it.
>>> Retrying may not fix the problem. So, we just log it and let the thread die.
>>> 
>>> Thanks,
>>> 
>>> Jun
>>> 
>>> 
>>>> On Mon, Feb 10, 2014 at 8:42 PM, Philip O'Toole <phi...@loggly.com> wrote:
>>>> 
>>>> Yes, there might be - we experience link resets every so often, and
>>>> definitely did today.
>>>> 
>>>> Assuming it is this, are you surprised the thread went down? Perhaps we
>>>> need to catch this?
>>>> 
>>>> Philip
>>>> 
>>>>> On Feb 10, 2014, at 8:38 PM, Jun Rao <jun...@gmail.com> wrote:
>>>>> 
>>>>> This indicates that message checksum validation failed. Is there any
>>>>> issue with the network?
>>>>> 
>>>>> Thanks,
>>>>> 
>>>>> Jun
>>>>> 
>>>>> 
>>>>>> On Mon, Feb 10, 2014 at 5:00 PM, Philip O'Toole <phi...@loggly.com> wrote:
>>>>>> 
>>>>>> Saw this thrown today, which brought down a Consumer thread -- we're
>>>>>> using Consumers built on the high-level consumer framework. What may
>>>>>> have happened here? We are using a custom C++ Producer which does not
>>>>>> do compression, and which hasn't changed in months, but this error is
>>>>>> relatively new to us and is occurring occasionally. We are running the
>>>>>> Sun JDK:
>>>>>> 
>>>>>> java version "1.7.0_25"
>>>>>> Java(TM) SE Runtime Environment (build 1.7.0_25-b15)
>>>>>> Java HotSpot(TM) 64-Bit Server VM (build 23.25-b01, mixed mode)
>>>>>> 
>>>>>> Restarting the Consumer clears it up, so the message on the Broker
>>>>>> itself does not appear to be problematic. We are running 3 Consumers,
>>>>>> each of which has 48 ConsumerConnector objects. Our code explicitly
>>>>>> calls commit(); we do not auto-commit.
>>>>>> 
>>>>>> Thanks,
>>>>>> 
>>>>>> Philip
>>>>>> 
>>>>>> 2014-02-10 19:36:30,960 [ERROR] [FetcherRunnable.error] error in
>>>>>> FetcherRunnable for premapped:2-29: fetched offset = 120758878080:
>>>>>> consumed offset = 120758878080
>>>>>> kafka.message.InvalidMessageException: message is invalid, compression
>>>>>> codec: NoCompressionCodec size: 8058 curr offset: 120759424904 init
>>>>>> offset: 120758878080
>>>>>>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>>>>>>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
>>>>>>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>>>>>>     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>>>>>>     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>>>>>>     at kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:64)
>>>>>>     at kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:59)
>>>>>>     at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:57)
>>>>>>     at kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:79)
>>>>>>     at kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:65)
>>>>>>     at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>>>>>>     at scala.collection.immutable.List.foreach(List.scala:45)
>>>>>>     at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
>>>>>> 2014-02-10 19:36:30,962 [ERROR] [FetcherRunnable.error] error in
>>>>>> FetcherRunnable
>>>>>> kafka.message.InvalidMessageException: message is invalid, compression
>>>>>> codec: NoCompressionCodec size: 8058 curr offset: 120759424904 init
>>>>>> offset: 120758878080
>>>>>>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>>>>>>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
>>>>>>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>>>>>>     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>>>>>>     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>>>>>>     at kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:64)
>>>>>>     at kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:59)
>>>>>>     at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:57)
>>>>>>     at kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:79)
>>>>>>     at kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:65)
>>>>>>     at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>>>>>>     at scala.collection.immutable.List.foreach(List.scala:45)
>>>>>>     at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
>>>>>> 
>>>> 
>> 
> 
