[ https://issues.apache.org/jira/browse/KAFKA-3955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15960854#comment-15960854 ]
Dustin Cote commented on KAFKA-3955:
------------------------------------
Adding 0.10.1.1 as this kind of behavior has been observed on that release as
well.
> Kafka log recovery doesn't truncate logs on non-monotonic offsets, leading to
> failed broker boot
> ------------------------------------------------------------------------------------------------
>
> Key: KAFKA-3955
> URL: https://issues.apache.org/jira/browse/KAFKA-3955
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.8.0, 0.8.1, 0.8.1.1, 0.8.2.0, 0.8.2.1, 0.8.2.2,
> 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.1.1
> Reporter: Tom Crayford
>
> Hi,
> I've found a bug impacting Kafka brokers on startup after an unclean
> shutdown. If a log segment is corrupt and has non-monotonic offsets (see the
> appendix of this bug for sample output from {{DumpLogSegments}}), then
> {{LogSegment.recover}} throws an {{InvalidOffsetException}}:
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/OffsetIndex.scala#L218
> That code is called by {{LogSegment.recover}}:
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L191
> Which is called in several places in {{Log.scala}}. Notably, it's called four
> times during recovery:
> three times in {{Log.loadSegments}}
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L199
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L204
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L226
> and once in {{Log.recoverLog}}
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L268
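> For illustration, here's a minimal standalone sketch (toy class and
> hypothetical names, not the real {{kafka.log.OffsetIndex}}) of the
> strictly-increasing-offset guard that {{OffsetIndex.append}} enforces; feeding
> it the offsets from the dump in the appendix reproduces the same
> {{InvalidOffsetException}}:
> {code}
> // Toy stand-in for the index's monotonicity guard (hypothetical names; the
> // real code also takes a lock and writes the entry into a memory-mapped file).
> class InvalidOffsetException(msg: String) extends RuntimeException(msg)
>
> class ToyOffsetIndex(path: String) {
>   private var lastOffset: Long = -1L
>   private var entries: Int = 0
>
>   def append(offset: Long, position: Long): Unit = {
>     if (entries == 0 || offset > lastOffset) {
>       lastOffset = offset
>       entries += 1
>     } else {
>       throw new InvalidOffsetException(
>         s"Attempt to append an offset ($offset) to position $entries " +
>           s"no larger than the last offset appended ($lastOffset) to $path.")
>     }
>   }
> }
>
> object NonMonotonicDemo extends App {
>   val index = new ToyOffsetIndex("00000000000014008931.index")
>   index.append(15000361L, 485145166L)
>   index.append(15000362L, 485145166L)
>   index.append(15000337L, 485149591L) // offsets jump backwards: throws InvalidOffsetException
> }
> {code}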
> Of these, only the very last one has a {{catch}} for
> {{InvalidOffsetException}}. When that catch fires, it truncates the whole log
> (not just this segment):
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L274
> back to the start of the bad log segment.
> However, that truncation can't be hit on recovery: the code paths in
> {{loadSegments}} mean we always throw the exception first, and it propagates
> all the way to the top-level exception handler and crashes the JVM.
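> To make that asymmetry concrete, the call sites look roughly like this (a
> hedged sketch with toy types and hypothetical names, not the real {{kafka.log}}
> classes):
> {code}
> // Sketch of the asymmetry between the recovery call sites.
> object CallSiteSketch {
>   class InvalidOffsetException(msg: String) extends RuntimeException(msg)
>
>   case class ToySegment(baseOffset: Long) {
>     def recover(): Int = 0                 // may throw InvalidOffsetException on a corrupt segment
>     def truncateTo(offset: Long): Int = 0  // returns the number of bytes truncated
>   }
>
>   // What Log.recoverLog effectively does: catch and truncate back to the
>   // start of the bad segment.
>   def recoverLogStyle(segment: ToySegment): Int =
>     try segment.recover()
>     catch { case _: InvalidOffsetException => segment.truncateTo(segment.baseOffset) }
>
>   // What the three Log.loadSegments call sites do: no catch, so the exception
>   // propagates to the top-level handler and broker startup dies.
>   def loadSegmentsStyle(segment: ToySegment): Int =
>     segment.recover()
> }
> {code}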
> As {{Log.recoverLog}} is always called during recovery, I *think* a fix for
> this is to move this crash-recovery/truncation code into a new method in
> {{Log.scala}}, and call that instead of {{LogSegment.recover}} in each place.
> That method should catch the exception, truncate the log, and return the
> number of {{truncatedBytes}}, as {{Log.recoverLog}} does right now. A non-zero
> return value would also tell callers to stop iterating over the remaining
> files in the directory.
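> A rough sketch of the wrapper I have in mind (hypothetical name
> {{recoverSegment}}, toy types again; the real version would live in
> {{Log.scala}} and reuse the existing truncation plumbing):
> {code}
> // Sketch of the proposed fix (hypothetical): every place that currently calls
> // LogSegment.recover directly would call recoverSegment instead, so a corrupt
> // segment is truncated rather than crashing the JVM.
> object ProposedFixSketch {
>   class InvalidOffsetException(msg: String) extends RuntimeException(msg)
>
>   case class ToySegment(baseOffset: Long) {
>     def recover(maxMessageSize: Int): Int = 0  // may throw InvalidOffsetException
>     def truncateTo(offset: Long): Int = 0      // returns the number of bytes truncated
>   }
>
>   /** Recover one segment, truncating it back to its base offset on corruption.
>     * A return value > 0 tells the caller to stop iterating over the remaining
>     * segment files in the directory and drop them, as Log.recoverLog does today. */
>   def recoverSegment(segment: ToySegment, maxMessageSize: Int): Int =
>     try {
>       segment.recover(maxMessageSize)
>     } catch {
>       case _: InvalidOffsetException =>
>         segment.truncateTo(segment.baseOffset)
>     }
> }
> {code}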
> I'm happy to work on a patch for this; I'm aware this recovery code is tricky
> and important to get right.
> I'm also curious (and don't have a good theory yet) about how this log segment
> got into this state with non-monotonic offsets. This segment uses gzip
> compression and was written under 0.9.0.1. The same recovery bug exists in
> trunk, but I'm unsure whether the new handling of compressed messages (KIP-31)
> means the bug that appends non-monotonic offsets is still present in trunk.
> As a production workaround, you can manually truncate that log folder
> yourself (delete all .index/.log files from the one containing the bad offset
> onwards). However, Kafka should (and can) handle this case well - with
> replication we can truncate during broker startup.
> stacktrace and error message:
> {code}
> pri=WARN t=pool-3-thread-4 at=Log Found a corrupted index file, /$DIRECTORY/$TOPIC-22/00000000000014306536.index, deleting and rebuilding index...
> pri=ERROR t=main at=LogManager There was an error in one of the threads during logs loading: kafka.common.InvalidOffsetException: Attempt to append an offset (15000337) to position 111719 no larger than the last offset appended (15000337) to /$DIRECTORY/$TOPIC-8/00000000000014008931.index.
> pri=FATAL t=main at=KafkaServer Fatal error during KafkaServer startup. Prepare to shutdown kafka.common.InvalidOffsetException: Attempt to append an offset (15000337) to position 111719 no larger than the last offset appended (15000337) to /$DIRECTORY/$TOPIC-8/00000000000014008931.index.
>     at kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>     at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>     at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>     at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>     at kafka.log.LogSegment.recover(LogSegment.scala:188)
>     at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>     at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>     at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>     at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
>     at kafka.log.Log.loadSegments(Log.scala:160)
>     at kafka.log.Log.<init>(Log.scala:90)
>     at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>     at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
> Non-monotonic offsets from the DumpLogSegments tool (run with
> --deep-iteration); they jump backwards from 15000362 to 15000337:
> {code}
> offset: 15000361 position: 485145166 isvalid: true payloadsize: 1474 magic: 0 compresscodec: NoCompressionCodec crc: 3958745838 keysize: 36
> offset: 15000362 position: 485145166 isvalid: true payloadsize: 1474 magic: 0 compresscodec: NoCompressionCodec crc: 374463118 keysize: 36
> offset: 15000337 position: 485149591 isvalid: true payloadsize: 1474 magic: 0 compresscodec: NoCompressionCodec crc: 3955938191 keysize: 36
> offset: 15000338 position: 485149591 isvalid: true payloadsize: 1474 magic: 0 compresscodec: NoCompressionCodec crc: 4121377803 keysize: 36
> offset: 15000339 position: 485149591 isvalid: true payloadsize: 1474 magic: 0 compresscodec: NoCompressionCodec crc: 372883992 keysize: 36
> offset: 15000340 position: 485149591 isvalid: true payloadsize: 1474 magic: 0 compresscodec: NoCompressionCodec crc: 1294476491 keysize: 36
> {code}