Re: 0.72 Consumer: message is invalid, compression codec: NoCompressionCodec
We do catch the exception. However, we don't know what to do with it. Retrying may not fix the problem. So, we just log it and let the thread die.

Thanks,
Jun

On Mon, Feb 10, 2014 at 8:42 PM, Philip O'Toole phi...@loggly.com wrote:

Yes, there might be - we experience link resets every so often, and definitely did today. Assume it is this, are you surprised the thread went down? Perhaps we need to catch this?

Philip

On Feb 10, 2014, at 8:38 PM, Jun Rao jun...@gmail.com wrote:

This indicates that message checksum validation failed. Is there any issue with the network?

Thanks,
Jun

On Mon, Feb 10, 2014 at 5:00 PM, Philip O'Toole phi...@loggly.com wrote:

Saw this thrown today, which brought down a Consumer thread -- we're using Consumers built on the High-level consumer framework. What may have happened here? We are using a custom C++ Producer which does not do compression, and which hasn't changed in months, but this error is relatively new to us, and is occurring occasionally.

We are running the Sun JDK:

java version 1.7.0_25
Java(TM) SE Runtime Environment (build 1.7.0_25-b15)
Java HotSpot(TM) 64-Bit Server VM (build 23.25-b01, mixed mode)

Restarting the Consumer clears it up, so the message on the Broker itself does not appear to be problematic. We are running 3 Consumers, each of which has 48 ConsumerConnector objects. Our code explicitly calls commit(), we do not auto-commit.

Thanks,
Philip
Re: 0.72 Consumer: message is invalid, compression codec: NoCompressionCodec
Works for me. It's the least disruptive option.

On Feb 11, 2014, at 3:11 PM, Suyog Rao su...@loggly.com wrote:

Actually, looking at the code, the consumer client code can also catch this exception while iterating over messages. The fetcher thread inserts a special message before dying, which triggers an exception when the client asks for the next message:

https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/FetcherRunnable.scala#L87

So, we could create a new ConsumerConnector object when this error happens, which will re-initialize the fetcher threads.

Thanks,
Suyog

On Feb 11, 2014, at 12:16 PM, vinh v...@loggly.com wrote:

In that case, is there a way to detect that a consumer instance is no longer usable, so that we can recreate the instance on the fly and have it reconnect, without having to restart our app?

Thanks,
-Vinh

On Feb 11, 2014, at 7:45 AM, Jun Rao jun...@gmail.com wrote:

We do catch the exception. However, we don't know what to do with it. Retrying may not fix the problem. So, we just log it and let the thread die.

Thanks,
Jun

On Mon, Feb 10, 2014 at 8:42 PM, Philip O'Toole phi...@loggly.com wrote:

Yes, there might be - we experience link resets every so often, and definitely did today. Assume it is this, are you surprised the thread went down? Perhaps we need to catch this?

Philip

On Feb 10, 2014, at 8:38 PM, Jun Rao jun...@gmail.com wrote:

This indicates that message checksum validation failed. Is there any issue with the network?

Thanks,
Jun

On Mon, Feb 10, 2014 at 5:00 PM, Philip O'Toole phi...@loggly.com wrote:

Saw this thrown today, which brought down a Consumer thread -- we're using Consumers built on the High-level consumer framework. What may have happened here? We are using a custom C++ Producer which does not do compression, and which hasn't changed in months, but this error is relatively new to us, and is occurring occasionally.

We are running the Sun JDK:

java version 1.7.0_25
Java(TM) SE Runtime Environment (build 1.7.0_25-b15)
Java HotSpot(TM) 64-Bit Server VM (build 23.25-b01, mixed mode)

Restarting the Consumer clears it up, so the message on the Broker itself does not appear to be problematic. We are running 3 Consumers, each of which has 48 ConsumerConnector objects. Our code explicitly calls commit(), we do not auto-commit.

Thanks,
Philip
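
A rough sketch of the recovery approach suggested in this message: iterate over messages, and when the iterator throws (as it would after the fetcher thread dies), shut the connector down and build a fresh one. Everything here is hypothetical and for illustration only: `Connector`, `consume`, `fake` and `maxRestarts` are stand-ins, not the Kafka 0.7.2 API, and the fake iterator simply ends where a real consumer stream would keep blocking for more data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

final class ReconnectSketch {

    // Hypothetical stand-in for a consumer connector; NOT the Kafka API.
    interface Connector {
        Iterator<String> messages(); // iteration may throw if the fetcher died
        void shutdown();
    }

    // Drain messages, recreating the connector on failure, at most maxRestarts times.
    static List<String> consume(Supplier<Connector> factory, int maxRestarts) {
        List<String> received = new ArrayList<>();
        int restarts = 0;
        Connector connector = factory.get();
        while (true) {
            try {
                Iterator<String> it = connector.messages();
                while (it.hasNext()) {
                    received.add(it.next());
                }
                connector.shutdown();
                return received; // stream ended normally (only happens in this sketch)
            } catch (RuntimeException e) {
                connector.shutdown();            // release the broken instance
                if (restarts++ >= maxRestarts) { // give up after too many failures
                    throw e;
                }
                connector = factory.get();       // fresh connector, fresh fetchers
            }
        }
    }

    // Demo connector: yields msgs, then throws from hasNext() if failAtEnd is set.
    static Connector fake(List<String> msgs, boolean failAtEnd) {
        return new Connector() {
            public Iterator<String> messages() {
                Iterator<String> inner = msgs.iterator();
                return new Iterator<String>() {
                    public boolean hasNext() {
                        if (inner.hasNext()) return true;
                        if (failAtEnd) throw new IllegalStateException("fetcher thread died");
                        return false;
                    }
                    public String next() { return inner.next(); }
                };
            }
            public void shutdown() { }
        };
    }

    public static void main(String[] args) {
        // First connector fails after one message; the second finishes cleanly.
        Iterator<Connector> connectors = Arrays.asList(
                fake(Arrays.asList("a"), true),
                fake(Arrays.asList("b", "c"), false)).iterator();
        System.out.println(consume(connectors::next, 3)); // prints [a, b, c]
    }
}
```

With explicit commit() calls, as described in the original report, a recreated consumer would presumably resume from the last committed offset, so anything consumed after that commit may be delivered again.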
0.72 Consumer: message is invalid, compression codec: NoCompressionCodec
Saw this thrown today, which brought down a Consumer thread -- we're using Consumers built on the High-level consumer framework. What may have happened here? We are using a custom C++ Producer which does not do compression, and which hasn't changed in months, but this error is relatively new to us, and is occurring occasionally.

We are running the Sun JDK:

java version 1.7.0_25
Java(TM) SE Runtime Environment (build 1.7.0_25-b15)
Java HotSpot(TM) 64-Bit Server VM (build 23.25-b01, mixed mode)

Restarting the Consumer clears it up, so the message on the Broker itself does not appear to be problematic. We are running 3 Consumers, each of which has 48 ConsumerConnector objects. Our code explicitly calls commit(), we do not auto-commit.

Thanks,
Philip

2014-02-10 19:36:30,960 [ERROR] [FetcherRunnable.error] error in FetcherRunnable for premapped:2-29: fetched offset = 120758878080: consumed offset = 120758878080
kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 8058 curr offset: 120759424904 init offset: 120758878080
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
    at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
    at kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:64)
    at kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:59)
    at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:57)
    at kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:79)
    at kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:65)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)

2014-02-10 19:36:30,962 [ERROR] [FetcherRunnable.error] error in FetcherRunnable
kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 8058 curr offset: 120759424904 init offset: 120758878080
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
    at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
    at kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:64)
    at kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:59)
    at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:57)
    at kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:79)
    at kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:65)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
Re: 0.72 Consumer: message is invalid, compression codec: NoCompressionCodec
I should say we *think* this exception brought down the Consumer thread. The problematic partition on our system was 2-29, so this is definitely the related thread.

Philip

On Mon, Feb 10, 2014 at 5:00 PM, Philip O'Toole phi...@loggly.com wrote:

Saw this thrown today, which brought down a Consumer thread -- we're using Consumers built on the High-level consumer framework. What may have happened here? We are using a custom C++ Producer which does not do compression, and which hasn't changed in months, but this error is relatively new to us, and is occurring occasionally.

We are running the Sun JDK:

java version 1.7.0_25
Java(TM) SE Runtime Environment (build 1.7.0_25-b15)
Java HotSpot(TM) 64-Bit Server VM (build 23.25-b01, mixed mode)

Restarting the Consumer clears it up, so the message on the Broker itself does not appear to be problematic. We are running 3 Consumers, each of which has 48 ConsumerConnector objects. Our code explicitly calls commit(), we do not auto-commit.

Thanks,
Philip
Re: 0.72 Consumer: message is invalid, compression codec: NoCompressionCodec
This indicates that message checksum validation failed. Is there any issue with the network?

Thanks,
Jun

On Mon, Feb 10, 2014 at 5:00 PM, Philip O'Toole phi...@loggly.com wrote:

Saw this thrown today, which brought down a Consumer thread -- we're using Consumers built on the High-level consumer framework. What may have happened here? We are using a custom C++ Producer which does not do compression, and which hasn't changed in months, but this error is relatively new to us, and is occurring occasionally.

We are running the Sun JDK:

java version 1.7.0_25
Java(TM) SE Runtime Environment (build 1.7.0_25-b15)
Java HotSpot(TM) 64-Bit Server VM (build 23.25-b01, mixed mode)

Restarting the Consumer clears it up, so the message on the Broker itself does not appear to be problematic. We are running 3 Consumers, each of which has 48 ConsumerConnector objects. Our code explicitly calls commit(), we do not auto-commit.

Thanks,
Philip
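
To illustrate the kind of check behind the diagnosis in this message: the reader recomputes a checksum over each message payload and compares it with the value stored alongside it, so bytes damaged in transit fail validation even when the sender did nothing wrong. The sketch below assumes a CRC32 checksum and a made-up framing (a 4-byte checksum followed by the payload); the class and method names are hypothetical, and this is not Kafka's actual wire format or code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical, simplified framing: [4-byte CRC32 of payload][payload bytes].
final class ChecksumSketch {

    // CRC32 of the payload, as an unsigned 32-bit value held in a long.
    static long crcOf(byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload, 0, payload.length);
        return crc.getValue();
    }

    // Writer side: prepend the checksum to the payload.
    static byte[] frame(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
        buf.putInt((int) crcOf(payload));
        buf.put(payload);
        return buf.array();
    }

    // Reader side: recompute the checksum and compare it with the stored one.
    static boolean isValid(byte[] framed) {
        if (framed.length < 4) {
            return false; // too short to even hold a checksum
        }
        ByteBuffer buf = ByteBuffer.wrap(framed);
        long stored = buf.getInt() & 0xFFFFFFFFL; // read as unsigned
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        return stored == crcOf(payload);
    }

    public static void main(String[] args) {
        byte[] framed = frame("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(isValid(framed)); // prints true: frame is intact
        framed[framed.length - 1] ^= 0x01;   // simulate one bit flipped in transit
        System.out.println(isValid(framed)); // prints false: checksum mismatch
    }
}
```

A check like this only detects the damage; it cannot tell whether the stored copy or the copy in transit was corrupted, which is why asking about the network is a natural first question.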
Re: 0.72 Consumer: message is invalid, compression codec: NoCompressionCodec
Yes, there might be - we experience link resets every so often, and definitely did today. Assume it is this, are you surprised the thread went down? Perhaps we need to catch this?

Philip

On Feb 10, 2014, at 8:38 PM, Jun Rao jun...@gmail.com wrote:

This indicates that message checksum validation failed. Is there any issue with the network?

Thanks,
Jun

On Mon, Feb 10, 2014 at 5:00 PM, Philip O'Toole phi...@loggly.com wrote:

Saw this thrown today, which brought down a Consumer thread -- we're using Consumers built on the High-level consumer framework. What may have happened here? We are using a custom C++ Producer which does not do compression, and which hasn't changed in months, but this error is relatively new to us, and is occurring occasionally.

We are running the Sun JDK:

java version 1.7.0_25
Java(TM) SE Runtime Environment (build 1.7.0_25-b15)
Java HotSpot(TM) 64-Bit Server VM (build 23.25-b01, mixed mode)

Restarting the Consumer clears it up, so the message on the Broker itself does not appear to be problematic. We are running 3 Consumers, each of which has 48 ConsumerConnector objects. Our code explicitly calls commit(), we do not auto-commit.

Thanks,
Philip