We do catch the exception. However, we don't know what to do with it.
Retrying may not fix the problem. So, we just log it and let the thread die.

Thanks,

Jun


On Mon, Feb 10, 2014 at 8:42 PM, Philip O'Toole <phi...@loggly.com> wrote:

> Yes, there might be - we experience link resets every so often, and
> definitely did today.
>
> Assuming it is this, are you surprised the thread went down? Perhaps we need
> to catch this?
>
> Philip
>
> > On Feb 10, 2014, at 8:38 PM, Jun Rao <jun...@gmail.com> wrote:
> >
> > This indicates that message checksum validation failed. Is there any issue
> > with the network?
> >
> > Thanks,
> >
> > Jun
> >
> >
> >> On Mon, Feb 10, 2014 at 5:00 PM, Philip O'Toole <phi...@loggly.com> wrote:
> >>
> >> Saw this thrown today, which brought down a Consumer thread -- we're using
> >> Consumers built on the High-level consumer framework. What may have
> >> happened here? We are using a custom C++ Producer which does not do
> >> compression, and which hasn't changed in months, but this error is
> >> relatively new to us, and is occurring occasionally. We are running the Sun
> >> JDK:
> >>
> >>    java version "1.7.0_25"
> >>    Java(TM) SE Runtime Environment (build 1.7.0_25-b15)
> >>    Java HotSpot(TM) 64-Bit Server VM (build 23.25-b01, mixed mode)
> >>
> >> Restarting the Consumer clears it up, so the message on the Broker itself
> >> does not appear to be problematic. We are running 3 Consumers, each of
> >> which has 48 ConsumerConnector objects. Our code explicitly calls commit();
> >> we do not auto-commit.
> >>
> >> Thanks,
> >>
> >> Philip
> >>
> >> 2014-02-10 19:36:30,960 [ERROR] [FetcherRunnable.error] error in FetcherRunnable for premapped:2-29: fetched offset = 120758878080: consumed offset = 120758878080
> >> kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 8058 curr offset: 120759424904 init offset: 120758878080
> >>        at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> >>        at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
> >>        at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> >>        at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> >>        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> >>        at kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:64)
> >>        at kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:59)
> >>        at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:57)
> >>        at kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:79)
> >>        at kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:65)
> >>        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
> >>        at scala.collection.immutable.List.foreach(List.scala:45)
> >>        at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
> >> 2014-02-10 19:36:30,962 [ERROR] [FetcherRunnable.error] error in FetcherRunnable
> >> kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 8058 curr offset: 120759424904 init offset: 120758878080
> >>        at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> >>        at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
> >>        at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> >>        at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> >>        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> >>        at kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:64)
> >>        at kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:59)
> >>        at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:57)
> >>        at kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:79)
> >>        at kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:65)
> >>        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
> >>        at scala.collection.immutable.List.foreach(List.scala:45)
> >>        at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
> >>
>
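
For readers unfamiliar with what "message checksum validation failed" means in the thread above, here is a minimal sketch of the general idea in Java. It is not Kafka's code or wire format: the class name ChecksumSketch, the helper names, and the frame layout (a 4-byte CRC32 followed by the payload) are all assumptions made for illustration. The only real API used is java.util.zip.CRC32 from the JDK. The point is simply that a single bit flipped in transit makes the recomputed checksum disagree with the stored one, which is the kind of mismatch a consumer would report as an invalid message.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical illustration only; not Kafka's message format or classes.
final class ChecksumSketch {

    // Builds a frame using a layout assumed for this sketch:
    // [4-byte CRC32 of payload][payload bytes].
    static byte[] frame(byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload, 0, payload.length);
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
        buf.putInt((int) crc.getValue()); // CRC32 fits in the low 32 bits
        buf.put(payload);
        return buf.array();
    }

    // Recomputes the CRC32 over the payload and compares it with the stored value.
    static boolean isValid(byte[] framed) {
        if (framed.length < 4) {
            return false; // too short to even hold the checksum
        }
        long stored = ByteBuffer.wrap(framed).getInt() & 0xFFFFFFFFL;
        CRC32 crc = new CRC32();
        crc.update(framed, 4, framed.length - 4);
        return stored == crc.getValue();
    }

    public static void main(String[] args) {
        byte[] good = frame("hello".getBytes(StandardCharsets.UTF_8));

        // Simulate corruption in transit by flipping one bit of the payload.
        byte[] corrupted = good.clone();
        corrupted[corrupted.length - 1] ^= 0x01;

        System.out.println("intact frame valid:    " + isValid(good));
        System.out.println("corrupted frame valid: " + isValid(corrupted));
    }
}
```

A checksum like this only detects the damage. It cannot say whether the stored copy or just the fetched copy is bad, which is consistent with Jun's remark that retrying may or may not help.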
