[ 
https://issues.apache.org/jira/browse/KAFKA-4686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15847751#comment-15847751
 ] 

Jason Gustafson commented on KAFKA-4686:
----------------------------------------

[~rodrigo.saramago] Thanks for the updates. I spotted this error which actually 
tells us which partition has the corrupt message:
{code}
[2017-01-30 07:03:34,149] ERROR [ReplicaFetcherThread-0-1003], Error due to (kafka.server.ReplicaFetcherThread)
kafka.common.KafkaException: error processing data for partition [zupme-gateway,12] offset 106
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:170)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:141)
        at scala.Option.foreach(Option.scala:257)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:141)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:138)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:138)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:138)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:138)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:136)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: kafka.common.KafkaException: Message payload is null: Message(magic = 0, attributes = 1, crc = 3187236729, key = null, payload = null)
        at kafka.message.ByteBufferMessageSet$$anon$1.<init>(ByteBufferMessageSet.scala:90)
        at kafka.message.ByteBufferMessageSet$.deepIterator(ByteBufferMessageSet.scala:85)
        at kafka.message.ByteBufferMessageSet$$anon$2.makeNextOuter(ByteBufferMessageSet.scala:370)
        at kafka.message.ByteBufferMessageSet$$anon$2.makeNext(ByteBufferMessageSet.scala:383)
        at kafka.message.ByteBufferMessageSet$$anon$2.makeNext(ByteBufferMessageSet.scala:338)
        at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:64)
        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:56)
        at kafka.message.MessageSet.toString(MessageSet.scala:105)
        at java.lang.String.valueOf(String.java:2994)
        at scala.collection.mutable.StringBuilder.append(StringBuilder.scala:200)
        at kafka.log.Log.append(Log.scala:395)
        at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:130)
        at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:42)
        at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:153)
        ... 13 more
{code}
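
If other partitions are affected, the same kind of line should show up elsewhere in the broker logs. A minimal sketch for pulling the topic, partition, and offset out of such lines, assuming the message wording matches the trace above (other Kafka versions may phrase it differently); the function name `find_failing_partitions` is made up for illustration:

```python
import re

# Matches the wording seen in the trace above. The whitespace between
# "partition" and "[topic,N]" is allowed to include a line break, since
# mail clients may wrap the log line.
PARTITION_ERROR = re.compile(
    r"error processing data for partition\s+"
    r"\[(?P<topic>[^,\]]+),(?P<partition>\d+)\]\s+offset\s+(?P<offset>\d+)"
)


def find_failing_partitions(log_text):
    """Return a (topic, partition, offset) tuple for every matching error."""
    return [
        (m.group("topic"), int(m.group("partition")), int(m.group("offset")))
        for m in PARTITION_ERROR.finditer(log_text)
    ]


sample = (
    "kafka.common.KafkaException: error processing data for partition \n"
    "[zupme-gateway,12] offset 106"
)
print(find_failing_partitions(sample))  # → [('zupme-gateway', 12, 106)]
```
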
[zupme-gateway,12] may not be the only corrupt partition, but at least we can 
analyze it a bit more. A couple of questions:

1. Do you know if compaction has been enabled on the topic "zupme-gateway"? 
2. What version of Kafka were you on before upgrading to 0.10.1.1? Were you 
ever able to successfully start the brokers running 0.10.1.1?

Also, could you run the DumpLogSegments utility (bin/kafka-run-class.sh 
kafka.tools.DumpLogSegments) on all of the segments for partition 12 of 
"zupme-gateway"? Judging by the offset in that message, the first segment 
should be the corrupt one. If you run with the "--deep-iteration" option, 
you will probably see the same "Message payload is null" error, but the 
shallow iteration should still work if you leave that option out. Please 
upload the results if possible.
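
For reference, a rough sketch of how the DumpLogSegments invocation could be assembled for every segment of a partition. It assumes the usual on-disk layout (one directory per partition, segment files named after their zero-padded base offset with a ".log" extension); the helper names, the `kafka_home` parameter, and any directory passed in are hypothetical and need to be adapted to the actual installation:

```python
from pathlib import Path


def base_offsets(partition_dir):
    """Base offsets of all segments, read from the .log file names."""
    return sorted(int(p.stem) for p in Path(partition_dir).glob("*.log"))


def segment_for_offset(offsets, offset):
    """The segment holding `offset` has the largest base offset <= offset."""
    candidates = [b for b in offsets if b <= offset]
    return max(candidates) if candidates else None


def dump_command(partition_dir, deep=False, kafka_home="."):
    """Build the DumpLogSegments command line for all segments in a directory."""
    segments = sorted(Path(partition_dir).glob("*.log"))
    cmd = [
        f"{kafka_home}/bin/kafka-run-class.sh",
        "kafka.tools.DumpLogSegments",
        "--files",
        ",".join(str(s) for s in segments),
    ]
    if deep:
        # Deep iteration decompresses wrapped messages, so it is expected
        # to hit the same "Message payload is null" error.
        cmd.append("--deep-iteration")
    return cmd


# Offset 106 falls in the segment with base offset 0, i.e. the first one.
print(segment_for_offset([0, 5000, 12000], 106))  # → 0
```

Calling `dump_command` on the partition directory without `deep=True` gives the shallow-iteration command; the resulting list can be passed to `subprocess.run` or joined with spaces and pasted into a shell.
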

> Null Message payload is shutting down broker
> --------------------------------------------
>
>                 Key: KAFKA-4686
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4686
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.10.1.1
>         Environment: Amazon Linux AMI release 2016.03 kernel 
> 4.4.19-29.55.amzn1.x86_64
>            Reporter: Rodrigo Queiroz Saramago
>             Fix For: 0.10.3.0
>
>         Attachments: KAFKA-4686-NullMessagePayloadError.tar.xz, 
> kafkaServer.out
>
>
> Hello, I have a test environment with 3 brokers and 1 ZooKeeper node, in 
> which clients connect using two-way SSL authentication. I use Kafka version 
> 0.10.1.1. The system works as expected for a while, but if a broker goes 
> down and is then restarted, something gets corrupted and it is not possible 
> to start the broker again; it always fails with the same error. What does 
> this error mean? What can I do in this case? Is this the expected behavior?
> [2017-01-23 07:03:28,927] ERROR There was an error in one of the threads 
> during logs loading: kafka.common.KafkaException: Message payload is null: 
> Message(magic = 0, attributes = 1, crc = 4122289508, key = null, payload = 
> null) (kafka.log.LogManager)
> [2017-01-23 07:03:28,929] FATAL Fatal error during KafkaServer startup. 
> Prepare to shutdown (kafka.server.KafkaServer)
> kafka.common.KafkaException: Message payload is null: Message(magic = 0, 
> attributes = 1, crc = 4122289508, key = null, payload = null)
>     at 
> kafka.message.ByteBufferMessageSet$$anon$1.<init>(ByteBufferMessageSet.scala:90)
>     at 
> kafka.message.ByteBufferMessageSet$.deepIterator(ByteBufferMessageSet.scala:85)
>     at kafka.message.MessageAndOffset.firstOffset(MessageAndOffset.scala:33)
>     at kafka.log.LogSegment.recover(LogSegment.scala:223)
>     at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:218)
>     at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:179)
>     at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>     at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>     at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>     at kafka.log.Log.loadSegments(Log.scala:179)
>     at kafka.log.Log.<init>(Log.scala:108)
>     at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:151)
>     at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:58)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> [2017-01-23 07:03:28,946] INFO shutting down (kafka.server.KafkaServer)
> [2017-01-23 07:03:28,949] INFO Terminate ZkClient event thread. 
> (org.I0Itec.zkclient.ZkEventThread)
> [2017-01-23 07:03:28,954] INFO EventThread shut down for session: 
> 0x159bd458ae70008 (org.apache.zookeeper.ClientCnxn)
> [2017-01-23 07:03:28,954] INFO Session: 0x159bd458ae70008 closed 
> (org.apache.zookeeper.ZooKeeper)
> [2017-01-23 07:03:28,957] INFO shut down completed (kafka.server.KafkaServer)
> [2017-01-23 07:03:28,959] FATAL Fatal error during KafkaServerStartable 
> startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
> kafka.common.KafkaException: Message payload is null: Message(magic = 0, 
> attributes = 1, crc = 4122289508, key = null, payload = null)
>     at 
> kafka.message.ByteBufferMessageSet$$anon$1.<init>(ByteBufferMessageSet.scala:90)
>     at 
> kafka.message.ByteBufferMessageSet$.deepIterator(ByteBufferMessageSet.scala:85)
>     at kafka.message.MessageAndOffset.firstOffset(MessageAndOffset.scala:33)
>     at kafka.log.LogSegment.recover(LogSegment.scala:223)
>     at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:218)
>     at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:179)
>     at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>     at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>     at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>     at kafka.log.Log.loadSegments(Log.scala:179)
>     at kafka.log.Log.<init>(Log.scala:108)
>     at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:151)
>     at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:58)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> [2017-01-23 07:03:28,961] INFO shutting down (kafka.server.KafkaServer)



