I should we *think* this exception brought down the Consumer thread. The problematic partition on our system was 2-29, so this is definitely the related thread.
Philip On Mon, Feb 10, 2014 at 5:00 PM, Philip O'Toole <phi...@loggly.com> wrote: > Saw this thrown today, which brought down a Consumer thread -- we're using > Consumers built on the High-level consumer framework. What may have > happened here? We are using a custom C++ Producer which does not do > compression, and which hasn't changed in months, but this error is > relatively new to us, and is occurring occasionally. We are running the Sun > JDK: > > java version "1.7.0_25" > Java(TM) SE Runtime Environment (build 1.7.0_25-b15) > Java HotSpot(TM) 64-Bit Server VM (build 23.25-b01, mixed mode) > > Restarting the Consumer clears it up, so the message on the Broker itself > does not appear to be problematic. We are running 3 Consumers, each of > which has 48 ConsumerConnector objects. Our code explicitly calls commit(), > we do not auto-commit. > > Thanks, > > Philip > > 2014-02-10 19:36:30,960 [ERROR] [FetcherRunnable.error] error in > FetcherRunnable for premapped:2-29: fetched offset = 120758878080: consumed > offset = 120758878080 > kafka.message.InvalidMessageException: message is invalid, compression > codec: NoCompressionCodec size: 8058 curr offset: 120759424904 init offset: > 120758878080 > at > kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130) > at > kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160) > at > kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100) > at > kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59) > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51) > at > kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:64) > at > kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:59) > at > kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:57) > at > kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:79) > at > kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:65) > at > scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59) > at scala.collection.immutable.List.foreach(List.scala:45) > at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65) > 2014-02-10 19:36:30,962 [ERROR] [FetcherRunnable.error] error in > FetcherRunnable > kafka.message.InvalidMessageException: message is invalid, compression > codec: NoCompressionCodec size: 8058 curr offset: 120759424904 init offset: > 120758878080 > at > kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130) > at > kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160) > at > kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100) > at > kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59) > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51) > at > kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:64) > at > kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:59) > at > kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:57) > at > kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:79) > at > kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:65) > at > scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59) > at scala.collection.immutable.List.foreach(List.scala:45) > at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65) >