Works for me. It's the least disruptive option.
> On Feb 11, 2014, at 3:11 PM, Suyog Rao <su...@loggly.com> wrote:
>
> Actually, looking at the code, the consumer client code can also catch this
> exception while iterating for messages. The fetcher thread inserts a special
> message before dying, which triggers an exception while client calls next
> message
>
> https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/FetcherRunnable.scala#L87
>
> So, we could create a new ConsumerConnector object when this error happens,
> which will re-initialize the fetcher threads.
>
> Thanks,
> Suyog
>
>> On Feb 11, 2014, at 12:16 PM, vinh <v...@loggly.com> wrote:
>>
>> In that case, is there a way to detect that a consumer instance is no longer
>> usable, so that we can recreate the instance on the fly again to have it
>> reconnect? Without having to restart our app?
>>
>> Thanks,
>> -Vinh
>>
>>> On Feb 11, 2014, at 7:45 AM, Jun Rao <jun...@gmail.com> wrote:
>>>
>>> We do catch the exception. However, we don't know what to do with it.
>>> Retrying may not fix the problem. So, we just log it and let the thread die.
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>>> On Mon, Feb 10, 2014 at 8:42 PM, Philip O'Toole <phi...@loggly.com> wrote:
>>>>
>>>> Yes, there might be - we experience link resets every so often, and
>>>> definitely did today.
>>>>
>>>> Assume it is this, are you surprised the thread went down? Perhaps we need
>>>> to catch this?
>>>>
>>>> Philip
>>>>
>>>>> On Feb 10, 2014, at 8:38 PM, Jun Rao <jun...@gmail.com> wrote:
>>>>>
>>>>> This indicates that message checksum validation failed. Is there any issue
>>>>> with the network?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jun
>>>>>
>>>>>> On Mon, Feb 10, 2014 at 5:00 PM, Philip O'Toole <phi...@loggly.com> wrote:
>>>>>>
>>>>>> Saw this thrown today, which brought down a Consumer thread -- we're using
>>>>>> Consumers built on the High-level consumer framework. What may have
>>>>>> happened here? We are using a custom C++ Producer which does not do
>>>>>> compression, and which hasn't changed in months, but this error is
>>>>>> relatively new to us, and is occurring occasionally. We are running the Sun
>>>>>> JDK:
>>>>>>
>>>>>> java version "1.7.0_25"
>>>>>> Java(TM) SE Runtime Environment (build 1.7.0_25-b15)
>>>>>> Java HotSpot(TM) 64-Bit Server VM (build 23.25-b01, mixed mode)
>>>>>>
>>>>>> Restarting the Consumer clears it up, so the message on the Broker itself
>>>>>> does not appear to be problematic. We are running 3 Consumers, each of
>>>>>> which has 48 ConsumerConnector objects. Our code explicitly calls commit(),
>>>>>> we do not auto-commit.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Philip
>>>>>>
>>>>>> 2014-02-10 19:36:30,960 [ERROR] [FetcherRunnable.error] error in FetcherRunnable for premapped:2-29: fetched offset = 120758878080: consumed offset = 120758878080
>>>>>> kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 8058 curr offset: 120759424904 init offset: 120758878080
>>>>>>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>>>>>>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
>>>>>>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>>>>>>     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>>>>>>     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>>>>>>     at kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:64)
>>>>>>     at kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:59)
>>>>>>     at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:57)
>>>>>>     at kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:79)
>>>>>>     at kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:65)
>>>>>>     at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>>>>>>     at scala.collection.immutable.List.foreach(List.scala:45)
>>>>>>     at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
>>>>>> 2014-02-10 19:36:30,962 [ERROR] [FetcherRunnable.error] error in FetcherRunnable
>>>>>> kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 8058 curr offset: 120759424904 init offset: 120758878080
>>>>>>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>>>>>>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
>>>>>>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>>>>>>     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>>>>>>     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>>>>>>     at kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:64)
>>>>>>     at kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:59)
>>>>>>     at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:57)
>>>>>>     at kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:79)
>>>>>>     at kafka.consumer.FetcherRunnable$$anonfun$run$6.apply(FetcherRunnable.scala:65)
>>>>>>     at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>>>>>>     at scala.collection.immutable.List.foreach(List.scala:45)
>>>>>>     at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
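
For reference, the recovery approach quoted above (catch the exception raised while iterating, then build a fresh connector) might look roughly like the sketch below. This is not Kafka's API. Connector and RecoveringConsumer are hypothetical stand-ins for a ConsumerConnector and the application code that wraps it, the factory is whatever the application already uses to build a connector, and a plain RuntimeException stands in for whatever the real iterator throws. The rebuild cap is also an assumption, added so a persistent failure does not loop forever.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical stand-in for a consumer connector; NOT a Kafka type. */
interface Connector {
    /** Message iterator; hasNext()/next() may throw once the fetcher has died. */
    Iterator<String> messages();

    void shutdown();
}

/** Rebuilds the connector whenever iteration fails, up to a fixed number of times. */
final class RecoveringConsumer {
    private final Supplier<Connector> factory; // builds a fresh connector on demand
    private final int maxRebuilds;             // assumed cap, not from the thread
    private int rebuilds = 0;

    RecoveringConsumer(Supplier<Connector> factory, int maxRebuilds) {
        this.factory = factory;
        this.maxRebuilds = maxRebuilds;
    }

    int rebuilds() {
        return rebuilds;
    }

    /** Collects messages until the current iterator is exhausted. */
    List<String> drain() {
        List<String> out = new ArrayList<>();
        Connector connector = factory.get();
        Iterator<String> it = connector.messages();
        while (true) {
            try {
                if (!it.hasNext()) {
                    break;
                }
                out.add(it.next());
            } catch (RuntimeException e) {
                // The old connector is treated as unusable: shut it down,
                // then either give up or replace it with a fresh one.
                connector.shutdown();
                if (rebuilds >= maxRebuilds) {
                    throw e;
                }
                rebuilds++;
                connector = factory.get();
                it = connector.messages();
            }
        }
        connector.shutdown();
        return out;
    }
}
```

A real consumer stream normally blocks rather than ending, so drain() returning is an artifact of the sketch. Also, as far as I understand the high-level consumer, a rebuilt connector resumes from the last committed offset, so with explicit commit() calls any messages consumed since the last commit may be delivered again.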