[ https://issues.apache.org/jira/browse/KAFKA-3047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ismael Juma updated KAFKA-3047:
-------------------------------
    Reviewer: Jun Rao
      Status: Patch Available (was: Open)

> Explicit offset assignment in Log.append can corrupt the log
> ------------------------------------------------------------
>
>                 Key: KAFKA-3047
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3047
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>    Affects Versions: 0.9.0.0
>            Reporter: Maciek Makowski
>            Assignee: Ismael Juma
>             Fix For: 0.10.0.0
>
>
> {{Log.append()}} has an {{assignOffsets}} parameter which, when set to false,
> should cause Kafka to use the offsets specified in the
> {{ByteBufferMessageSet}} and not recalculate them based on
> {{nextOffsetMetadata}}. However, in that function, {{appendInfo.firstOffset}}
> is unconditionally set to {{nextOffsetMetadata.messageOffset}}. This can
> corrupt the log in the following scenario:
> * {{nextOffsetMetadata.messageOffset}} is 2001
> * {{append(messageSet, assignOffsets = false)}} is called, where
> {{messageSet}} contains offsets 1001...1500
> * after the {{val appendInfo = analyzeAndValidateMessageSet(messages)}} call,
> {{appendInfo.firstOffset}} is 1001 and {{appendInfo.lastOffset}} is 1500
> * after the {{appendInfo.firstOffset = nextOffsetMetadata.messageOffset}} assignment,
> {{appendInfo.firstOffset}} is 2001 and {{appendInfo.lastOffset}} is 1500
> * the consistency check
> {{if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)}}
> passes (the second condition can never be true because of the unconditional
> assignment), and writing proceeds
> * the message set is appended to the current log segment starting at offset 2001,
> but the offsets in the set are 1001...1500
> * the system shuts down abruptly
> * on restart, the following unrecoverable error is reported:
> {code}
> Exception in thread "main" kafka.common.InvalidOffsetException: Attempt to append an offset (1001) to position 12345 no larger than the last offset appended (1950) to xyz/00000000000000000000.index.
> 	at kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
> 	at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
> 	at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
> 	at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
> 	at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
> 	at kafka.log.LogSegment.recover(LogSegment.scala:188)
> 	at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
> 	at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
> 	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
> 	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> 	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> 	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
> 	at kafka.log.Log.loadSegments(Log.scala:160)
> 	at kafka.log.Log.<init>(Log.scala:90)
> 	at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
> 	at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> 	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:166)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
> 	at java.lang.Thread.run(Thread.java:722)
> {code}
> *Proposed fix:* the assignment
> {{appendInfo.firstOffset = nextOffsetMetadata.messageOffset}} should happen
> only in the {{if (assignOffsets)}} branch.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
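As a hedged illustration of the scenario and the proposed fix in KAFKA-3047, here is a minimal Scala model. `AppendInfo`, `analyze`, `appendBuggy` and `append` are hypothetical, simplified stand-ins for the `appendInfo` value, `analyzeAndValidateMessageSet` and `Log.append()`; they are not Kafka's actual code. The model tracks only offsets (no compression, segments or index), assumes the consistency check rejects an append by throwing, and assumes that when `assignOffsets` is true the log assigns consecutive offsets starting at `nextOffsetMetadata.messageOffset`.

```scala
import scala.util.Try

// Hypothetical, simplified model of the offset bookkeeping in Log.append().
// Not Kafka's code: only offsets are tracked (no compression, segments or index).
final case class AppendInfo(firstOffset: Long, lastOffset: Long, offsetsMonotonic: Boolean)

// Stand-in for analyzeAndValidateMessageSet: read first/last offsets from the
// message set and check that they strictly increase.
def analyze(offsets: Seq[Long]): AppendInfo = {
  require(offsets.nonEmpty, "message set must not be empty")
  val monotonic = offsets.zip(offsets.drop(1)).forall { case (a, b) => a < b }
  AppendInfo(offsets.head, offsets.last, monotonic)
}

// The behaviour described in the issue: firstOffset is overwritten unconditionally,
// so the `firstOffset < nextOffset` half of the check can never be true.
def appendBuggy(offsets: Seq[Long], nextOffset: Long): AppendInfo = {
  val info = analyze(offsets).copy(firstOffset = nextOffset)
  if (!info.offsetsMonotonic || info.firstOffset < nextOffset)
    throw new IllegalArgumentException(s"Out of order offsets; next expected offset is $nextOffset")
  info
}

// The proposed fix: overwrite firstOffset only when the log assigns offsets itself.
// Assumption: assigned offsets are consecutive, starting at nextOffset.
def append(offsets: Seq[Long], nextOffset: Long, assignOffsets: Boolean): AppendInfo = {
  val info = analyze(offsets)
  if (assignOffsets) {
    AppendInfo(nextOffset, nextOffset + offsets.size - 1, offsetsMonotonic = true)
  } else {
    // Caller-supplied offsets are kept as-is, so the check can now reject stale ones.
    if (!info.offsetsMonotonic || info.firstOffset < nextOffset)
      throw new IllegalArgumentException(s"Out of order offsets; next expected offset is $nextOffset")
    info
  }
}

// The issue's scenario: the log expects offset 2001, the caller supplies 1001...1500.
val stale = 1001L to 1500L
println(appendBuggy(stale, nextOffset = 2001L))                                  // AppendInfo(2001,1500,true)
println(Try(append(stale, nextOffset = 2001L, assignOffsets = false)).isFailure) // true
```

With the issue's numbers, `appendBuggy` produces a first offset (2001) above the last one (1500) and lets the write through, while `append` with `assignOffsets = false` rejects the stale message set before anything is written. The issue itself only proposes moving the assignment into the `if (assignOffsets)` branch; how the rejection is surfaced is an assumption of this sketch.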