[jira] [Commented] (KAFKA-3047) Explicit offset assignment in Log.append can corrupt the log
[ https://issues.apache.org/jira/browse/KAFKA-3047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15362262#comment-15362262 ]

Joseph Francis commented on KAFKA-3047:
---------------------------------------

Hi [~guozhang],

We were running Kafka 0.9 in production and encountered this issue while restarting a slow broker.

{code:java}
[2016-06-24 10:04:47,546] ERROR There was an error in one of the threads during logs loading: kafka.common.InvalidOffsetException: Attempt to append an offset (36392155) to position 32437 no larger than the last offset appended (36392171) to /xyz.index. (kafka.log.LogManager)
[2016-06-24 10:04:47,550] FATAL Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.InvalidOffsetException: Attempt to append an offset (36392155) to position 32437 no larger than the last offset appended (36392171) to /xyz.index.
{code}

Just mentioning this because it happened to us in a working production cluster.

> Explicit offset assignment in Log.append can corrupt the log
> ------------------------------------------------------------
>
>                 Key: KAFKA-3047
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3047
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>    Affects Versions: 0.9.0.0
>            Reporter: Maciek Makowski
>            Assignee: Ismael Juma
>             Fix For: 0.10.0.0
>
> {{Log.append()}} has an {{assignOffsets}} parameter which, when set to false, should cause Kafka to use the offsets specified in the {{ByteBufferMessageSet}} and not recalculate them based on {{nextOffsetMetadata}}. However, in that function, {{appendInfo.firstOffset}} is unconditionally set to {{nextOffsetMetadata.messageOffset}}.
> This can cause corruption of the log in the following scenario:
> * {{nextOffsetMetadata.messageOffset}} is 2001
> * {{append(messageSet, assignOffsets = false)}} is called, where {{messageSet}} contains offsets 1001...1500
> * after the {{val appendInfo = analyzeAndValidateMessageSet(messages)}} call, {{appendInfo.firstOffset}} is 1001 and {{appendInfo.lastOffset}} is 1500
> * after the {{appendInfo.firstOffset = nextOffsetMetadata.messageOffset}} assignment, {{appendInfo.firstOffset}} is 2001 and {{appendInfo.lastOffset}} is 1500
> * the consistency check {{if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)}} passes (the second condition can never fail, due to the unconditional assignment) and writing proceeds
> * the message set is appended to the current log segment starting at offset 2001, but the offsets in the set are 1001...1500
> * the system shuts down abruptly
> * on restart, the following unrecoverable error is reported:
> {code}
> Exception in thread "main" kafka.common.InvalidOffsetException: Attempt to append an offset (1001) to position 12345 no larger than the last offset appended (1950) to xyz/.index.
>         at kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>         at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>         at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>         at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>         at kafka.log.LogSegment.recover(LogSegment.scala:188)
>         at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>         at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>         at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
>         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>         at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>         at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
>         at kafka.log.Log.loadSegments(Log.scala:160)
>         at kafka.log.Log.<init>(Log.scala:90)
>         at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>         at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>         at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>         at java.lang.Thread.run(Thread.java:722)
> {code}
> *Proposed fix:* the assignment {{appendInfo.firstOffset = nextOffsetMetadata.messageOffset}} should only happen in the {{if (assignOffsets)}} branch of the code.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
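The scenario and fix above can be sketched with a minimal, self-contained model of the check in {{Log.append()}}. This is a hypothetical simplification, not Kafka's actual code: {{nextOffset}} stands in for {{nextOffsetMetadata.messageOffset}}, and the message set claims offsets 1001...1500 as in the description.

{code:java}
// Hypothetical simplified model of the Log.append consistency check (Scala).
case class AppendInfo(var firstOffset: Long, var lastOffset: Long,
                      offsetsMonotonic: Boolean = true)

// Buggy 0.9.0.0 behaviour: firstOffset is overwritten unconditionally, so the
// lower-bound part of the check can never fail even when assignOffsets = false.
def buggyCheckPasses(info: AppendInfo, nextOffset: Long): Boolean = {
  info.firstOffset = nextOffset // unconditional overwrite
  info.offsetsMonotonic && info.firstOffset >= nextOffset
}

// Fixed behaviour: the overwrite happens only when the broker assigns offsets,
// so stale caller-supplied offsets are rejected before reaching the segment.
def fixedCheckPasses(info: AppendInfo, nextOffset: Long,
                     assignOffsets: Boolean): Boolean = {
  if (assignOffsets) info.firstOffset = nextOffset
  info.offsetsMonotonic && info.firstOffset >= nextOffset
}

val stale = AppendInfo(firstOffset = 1001L, lastOffset = 1500L)
assert(buggyCheckPasses(stale.copy(), 2001L))                          // corruption written
assert(!fixedCheckPasses(stale.copy(), 2001L, assignOffsets = false))  // append rejected
{code}

Under the fix, the rejected append surfaces as an error to the caller instead of silently writing offsets 1001...1500 at log position 2001.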
[jira] [Commented] (KAFKA-3047) Explicit offset assignment in Log.append can corrupt the log
[ https://issues.apache.org/jira/browse/KAFKA-3047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15196012#comment-15196012 ]

ASF GitHub Bot commented on KAFKA-3047:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/kafka/pull/1071
[jira] [Commented] (KAFKA-3047) Explicit offset assignment in Log.append can corrupt the log
[ https://issues.apache.org/jira/browse/KAFKA-3047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15195108#comment-15195108 ]

ASF GitHub Bot commented on KAFKA-3047:
---------------------------------------

GitHub user ijuma opened a pull request:

    https://github.com/apache/kafka/pull/1071

MINOR: Add test that verifies fix for KAFKA-3047

Also clean-up `LogTest` a little.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ijuma/kafka kafka-3047-explicit-offset-assignment-corrupt-log-test

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/1071.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1071

commit aab8f78cee26b869f14b6f3652cbac2245362076
Author: Ismael Juma
Date:   2016-03-15T11:08:21Z

    Add test that verifies fix for KAFKA-3047
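A regression test for this fix would append with {{assignOffsets = false}} and offsets behind the log end, and assert that the append is rejected. A hypothetical sketch in the style of {{kafka.log.LogTest}} follows; the helper names ({{createLog}}, {{singleMessageSet}}, {{messageSetWithOffsets}}) are illustrative assumptions, not the contents of the actual patch in PR 1071.

{code:java}
// Hypothetical regression-test sketch (Scala/JUnit style); helpers are assumed.
@Test
def testAppendWithStaleExplicitOffsetsIsRejected() {
  val log = createLog() // hypothetical helper: builds a Log in a temp dir
  // Advance the log so that nextOffsetMetadata.messageOffset is 2001.
  (0 until 2001).foreach(i => log.append(singleMessageSet(i.toString.getBytes)))
  // A message set claiming offsets 1001..1500 is behind the log end offset;
  // with the fix, Log.append must reject it rather than write it at 2001.
  val stale = messageSetWithOffsets(1001L to 1500L) // hypothetical helper
  try {
    log.append(stale, assignOffsets = false)
    fail("expected append with stale explicit offsets to throw")
  } catch {
    case _: IllegalArgumentException => // expected: offsets below log end offset
  }
}
{code}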
[jira] [Commented] (KAFKA-3047) Explicit offset assignment in Log.append can corrupt the log
[ https://issues.apache.org/jira/browse/KAFKA-3047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15191656#comment-15191656 ]

Guozhang Wang commented on KAFKA-3047:
--------------------------------------

Good to know, thanks for the explanation!
[jira] [Commented] (KAFKA-3047) Explicit offset assignment in Log.append can corrupt the log
[ https://issues.apache.org/jira/browse/KAFKA-3047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15191589#comment-15191589 ]

Maciek Makowski commented on KAFKA-3047:
----------------------------------------

[~ijuma]: thanks for the fix!

[~guozhang]: I discovered it when I attempted to use the {{Log}} component on its own -- I wanted a library that would do reliable logging and housekeeping, but without the networking. So no, nothing in Kafka proper that I'm aware of would expose it.
[jira] [Commented] (KAFKA-3047) Explicit offset assignment in Log.append can corrupt the log
[ https://issues.apache.org/jira/browse/KAFKA-3047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15191427#comment-15191427 ]

Guozhang Wang commented on KAFKA-3047:
--------------------------------------

Hi [~mmakowski], thanks for reporting this. I'm curious how you encountered this issue, since currently {{append(messageSet, assignOffsets = false)}} is only called by the replica fetcher thread, in which {{nextOffsetMetadata.messageOffset}} and the {{messageSet}}'s offsets should usually be consistent. Just trying to see if there are other scenarios that could lead to this issue.
[jira] [Commented] (KAFKA-3047) Explicit offset assignment in Log.append can corrupt the log
[ https://issues.apache.org/jira/browse/KAFKA-3047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15184629#comment-15184629 ] Ismael Juma commented on KAFKA-3047:
---
Thanks for the report and suggested fix, Maciek.
[jira] [Commented] (KAFKA-3047) Explicit offset assignment in Log.append can corrupt the log
[ https://issues.apache.org/jira/browse/KAFKA-3047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15184628#comment-15184628 ] ASF GitHub Bot commented on KAFKA-3047:
---
GitHub user ijuma opened a pull request:
https://github.com/apache/kafka/pull/1029
KAFKA-3047: Explicit offset assignment in Log.append can corrupt the log
This fix was suggested by Maciek Makowski, who also reported the problem.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/ijuma/kafka KAFKA-3047-log-append-can-corrupt-the-log
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/kafka/pull/1029.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #1029
commit 58d098c589697259482207631f5cfe1d8271a186
Author: Ismael Juma
Date: 2016-03-08T08:10:17Z
Only set `firstOffset` again if `assignOffsets` is true
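The commit message above ("Only set `firstOffset` again if `assignOffsets` is true") can be sketched as follows; this is a simplified model of the guarded assignment, using the names from the issue description rather than the full Kafka implementation:

```scala
// Sketch of the fix: the firstOffset reassignment moves inside the
// assignOffsets branch, so explicitly assigned offsets are actually
// validated against the log end offset instead of being masked.
case class AppendInfo(var firstOffset: Long, var lastOffset: Long)

def checkWithFix(nextOffset: Long, setFirst: Long, setLast: Long,
                 assignOffsets: Boolean): Boolean = {
  val appendInfo = AppendInfo(setFirst, setLast)
  if (assignOffsets)
    appendInfo.firstOffset = nextOffset // only when the broker assigns offsets
  // the append is accepted only when this returns true
  !(appendInfo.firstOffset < nextOffset)
}
```

With the guard in place, {{checkWithFix(2001, 1001, 1500, assignOffsets = false)}} returns false, so the stale message set is rejected instead of corrupting the log, while broker-assigned appends still pass.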