[jira] [Commented] (KAFKA-3047) Explicit offset assignment in Log.append can corrupt the log

2016-07-05 Thread Joseph Francis (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15362262#comment-15362262
 ] 

Joseph Francis commented on KAFKA-3047:
---

Hi [~guozhang], 
We were running Kafka 0.9 in production and encountered this issue while 
restarting a slow broker.
{code:java}
[2016-06-24 10:04:47,546] ERROR There was an error in one of the threads during 
logs loading: kafka.common.InvalidOffsetException: Attempt to append an offset 
(36392155) to position 32437 no larger than the last offset appended (36392171) 
to /xyz.index. (kafka.log.LogManager)
[2016-06-24 10:04:47,550] FATAL Fatal error during KafkaServer startup. Prepare 
to shutdown (kafka.server.KafkaServer)
kafka.common.InvalidOffsetException: Attempt to append an offset (36392155) to 
position 32437 no larger than the last offset appended (36392171) to /xyz.index.
{code}
Just mentioning this as this happened to us in a working production cluster.
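
As a rough illustration of why recovery stops with this exception, the sketch below models an offset index that refuses any entry whose offset is not strictly larger than the previous one. It is a simplified stand-in, not Kafka's `OffsetIndex`: the class name `OffsetIndexSketch`, the nested exception class, and the use of the entry slot as the reported "position" are all assumptions made for the example. Only the two offsets come from the log above.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of an offset index; not Kafka's OffsetIndex.
class OffsetIndexSketch {

    // Stand-in for kafka.common.InvalidOffsetException (name reused for illustration only).
    static class InvalidOffsetException extends RuntimeException {
        InvalidOffsetException(String message) {
            super(message);
        }
    }

    // Offsets recorded so far, in insertion order.
    private final List<Long> offsets = new ArrayList<>();

    // Appends an offset, rejecting any value that does not exceed the last one appended.
    void append(long offset) {
        if (!offsets.isEmpty()) {
            long lastOffset = offsets.get(offsets.size() - 1);
            if (offset <= lastOffset) {
                // Assumption of this model: "position" is the slot the entry would occupy.
                throw new InvalidOffsetException(
                    "Attempt to append an offset (" + offset + ") to position " + offsets.size()
                    + " no larger than the last offset appended (" + lastOffset + ").");
            }
        }
        offsets.add(offset);
    }

    // Number of entries accepted so far.
    int size() {
        return offsets.size();
    }

    public static void main(String[] args) {
        OffsetIndexSketch index = new OffsetIndexSketch();
        index.append(36392171L); // last offset appended, from the log above
        try {
            index.append(36392155L); // smaller offset found later in the segment
        } catch (InvalidOffsetException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In this model a segment whose messages carry out-of-order offsets cannot be re-indexed, which is consistent with the broker failing during log loading.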

> Explicit offset assignment in Log.append can corrupt the log
> 
>
>                 Key: KAFKA-3047
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3047
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>    Affects Versions: 0.9.0.0
>            Reporter: Maciek Makowski
>            Assignee: Ismael Juma
>             Fix For: 0.10.0.0
>
>
> {{Log.append()}} has an {{assignOffsets}} parameter which, when set to false, 
> should cause Kafka to use the offsets specified in the 
> {{ByteBufferMessageSet}} and not recalculate them based on 
> {{nextOffsetMetadata}}. However, in that function, {{appendInfo.firstOffset}} 
> is unconditionally set to {{nextOffsetMetadata.messageOffset}}. This can 
> corrupt the log in the following scenario:
> * {{nextOffsetMetadata.messageOffset}} is 2001
> * {{append(messageSet, assignOffsets = false)}} is called, where 
> {{messageSet}} contains offsets 1001...1500
> * after the {{val appendInfo = analyzeAndValidateMessageSet(messages)}} call, 
> {{appendInfo.firstOffset}} is 1001 and {{appendInfo.lastOffset}} is 1500
> * after the {{appendInfo.firstOffset = nextOffsetMetadata.messageOffset}} 
> assignment, {{appendInfo.firstOffset}} is 2001 and {{appendInfo.lastOffset}} 
> is 1500
> * the consistency check {{if(!appendInfo.offsetsMonotonic || 
> appendInfo.firstOffset < nextOffsetMetadata.messageOffset)}} passes (its 
> second condition can never be true because of the unconditional assignment) 
> and writing proceeds
> * the message set is appended to the current log segment starting at offset 
> 2001, but the offsets in the set are 1001...1500
> * the system shuts down abruptly
> * on restart, the following unrecoverable error is reported: 
> {code}
> Exception in thread "main" kafka.common.InvalidOffsetException: Attempt to 
> append an offset (1001) to position 12345 no larger than the last offset 
> appended (1950) to xyz/.index.
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>   at kafka.log.LogSegment.recover(LogSegment.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>   at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
>   at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
>   at kafka.log.Log.loadSegments(Log.scala:160)
>   at kafka.log.Log.<init>(Log.scala:90)
>   at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>   at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>   at java.lang.Thread.run(Thread.java:722)
> {code} 
> *Proposed fix:* the assignment {{appendInfo.firstOffset = 
> nextOffsetMetadata.messageOffset}} should only happen in the {{if 
> (assignOffsets)}} branch of the code.
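
The scenario and the proposed fix quoted above can be sketched with a small model. This is not Kafka's Scala code: `LogAppendSketch`, `appendBuggy`, `appendFixed` and `checkOffsets` are hypothetical names, the `IllegalArgumentException` is an assumed exception type, and the model assumes the message set holds contiguous offsets. Only the three `appendInfo` fields, the consistency check, and the numbers 1001, 1500 and 2001 come from the report.

```java
// Simplified, hypothetical model of the offset handling described in the report;
// the class and method names are illustrative and are not Kafka's API.
class LogAppendSketch {

    // Mirrors the three appendInfo fields mentioned in the report.
    static class AppendInfo {
        long firstOffset;
        long lastOffset;
        final boolean offsetsMonotonic;

        AppendInfo(long firstOffset, long lastOffset, boolean offsetsMonotonic) {
            this.firstOffset = firstOffset;
            this.lastOffset = lastOffset;
            this.offsetsMonotonic = offsetsMonotonic;
        }
    }

    // The consistency check quoted in the report; the exception type is an assumption.
    static void checkOffsets(AppendInfo info, long nextOffset) {
        if (!info.offsetsMonotonic || info.firstOffset < nextOffset) {
            throw new IllegalArgumentException("Out of order offsets: first offset "
                + info.firstOffset + " is below the next offset " + nextOffset);
        }
    }

    // Behaviour described in the report: firstOffset is overwritten unconditionally.
    static void appendBuggy(AppendInfo info, long nextOffset, boolean assignOffsets) {
        long count = info.lastOffset - info.firstOffset + 1; // assumes contiguous offsets
        info.firstOffset = nextOffset;
        if (assignOffsets) {
            info.lastOffset = nextOffset + count - 1;
        } else {
            // firstOffset now equals nextOffset, so the second condition is never true.
            checkOffsets(info, nextOffset);
        }
    }

    // Proposed fix: overwrite firstOffset only when offsets are being assigned.
    static void appendFixed(AppendInfo info, long nextOffset, boolean assignOffsets) {
        if (assignOffsets) {
            long count = info.lastOffset - info.firstOffset + 1; // assumes contiguous offsets
            info.firstOffset = nextOffset;
            info.lastOffset = nextOffset + count - 1;
        } else {
            checkOffsets(info, nextOffset);
        }
    }

    public static void main(String[] args) {
        long nextOffset = 2001L;

        AppendInfo buggy = new AppendInfo(1001L, 1500L, true);
        appendBuggy(buggy, nextOffset, false);
        System.out.println("buggy: accepted with firstOffset=" + buggy.firstOffset
            + ", lastOffset=" + buggy.lastOffset);

        try {
            appendFixed(new AppendInfo(1001L, 1500L, true), nextOffset, false);
            System.out.println("fixed: accepted");
        } catch (IllegalArgumentException e) {
            System.out.println("fixed: rejected (" + e.getMessage() + ")");
        }
    }
}
```

In this model the buggy path accepts the stale message set and leaves `firstOffset` above `lastOffset`, while the fixed path rejects it before anything would be written.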



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3047) Explicit offset assignment in Log.append can corrupt the log

2016-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15196012#comment-15196012
 ] 

ASF GitHub Bot commented on KAFKA-3047:
---

GitHub user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1071




[jira] [Commented] (KAFKA-3047) Explicit offset assignment in Log.append can corrupt the log

2016-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15195108#comment-15195108
 ] 

ASF GitHub Bot commented on KAFKA-3047:
---

GitHub user ijuma opened a pull request:

https://github.com/apache/kafka/pull/1071

MINOR: Add test that verifies fix for KAFKA-3047

Also clean-up `LogTest` a little.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ijuma/kafka kafka-3047-explicit-offset-assignment-corrupt-log-test

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1071.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1071


commit aab8f78cee26b869f14b6f3652cbac2245362076
Author: Ismael Juma 
Date:   2016-03-15T11:08:21Z

Add test that verifies fix for KAFKA-3047






[jira] [Commented] (KAFKA-3047) Explicit offset assignment in Log.append can corrupt the log

2016-03-11 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15191656#comment-15191656
 ] 

Guozhang Wang commented on KAFKA-3047:
--

Good to know, thanks for the explanation!



[jira] [Commented] (KAFKA-3047) Explicit offset assignment in Log.append can corrupt the log

2016-03-11 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15191652#comment-15191652
 ] 

Guozhang Wang commented on KAFKA-3047:
--

Good to know, thanks for the explanation!



[jira] [Commented] (KAFKA-3047) Explicit offset assignment in Log.append can corrupt the log

2016-03-11 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15191642#comment-15191642
 ] 

Guozhang Wang commented on KAFKA-3047:
--

Good to know, thanks for the explanation!



[jira] [Commented] (KAFKA-3047) Explicit offset assignment in Log.append can corrupt the log

2016-03-11 Thread Maciek Makowski (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15191589#comment-15191589
 ] 

Maciek Makowski commented on KAFKA-3047:


[~ijuma]: thanks for the fix!

[~guozhang]: I discovered it when I attempted to use the {{Log}} component on 
its own -- I wanted a library that would do reliable logging and housekeeping, 
but without the networking. So no, nothing in Kafka proper that I'm aware of 
would expose it.



[jira] [Commented] (KAFKA-3047) Explicit offset assignment in Log.append can corrupt the log

2016-03-11 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15191427#comment-15191427
 ] 

Guozhang Wang commented on KAFKA-3047:
--

Hi [~mmakowski], thanks for reporting this. I'm curious how you encountered 
this issue, since currently {{append(messageSet, assignOffsets = false)}} is 
only called by the replica fetcher thread, in which 
{{nextOffsetMetadata.messageOffset}} and the offsets in {{messageSet}} should 
usually be consistent. I'm just trying to see whether some other scenario 
could lead to this issue.

> Explicit offset assignment in Log.append can corrupt the log
> 
>
>                 Key: KAFKA-3047
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3047
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>    Affects Versions: 0.9.0.0
>            Reporter: Maciek Makowski
>            Assignee: Ismael Juma
>            Priority: Blocker
>             Fix For: 0.10.0.0
>
>
> {{Log.append()}} has {{assignOffsets}} parameter, which, when set to false, 
> should cause Kafka to use the offsets specified in the 
> {{ByteBufferMessageSet}} and not recalculate them based on 
> {{nextOffsetMetadata}}. However, in that function, {{appendInfo.firstOffset}} 
> is unconditionally set to {{nextOffsetMetadata.messageOffset}}. This can 
> cause corruption of the log in the following scenario:
> * {{nextOffsetMetadata.messageOffset}} is 2001
> * {{append(messageSet, assignOffsets = false)}} is called, where 
> {{messageSet}} contains offsets 1001...1500 
> * after {{val appendInfo = analyzeAndValidateMessageSet(messages)}} call, 
> {{appendInfo.firstOffset}} is 1001 and {{appendInfo.lastOffset}} is 1500
> * after {{appendInfo.firstOffset = nextOffsetMetadata.messageOffset}} call, 
> {{appendInfo.firstOffset}} is 2001 and {{appendInfo.lastOffset}} is 1500
> * consistency check {{if(!appendInfo.offsetsMonotonic || 
> appendInfo.firstOffset < nextOffsetMetadata.messageOffset)}} passes (the 
> second condition can never be true because of the unconditional assignment), 
> so writing proceeds
> * the message set is appended to current log segment starting at offset 2001, 
> but the offsets in the set are 1001...1500
> * the system shuts down abruptly
> * on restart, the following unrecoverable error is reported: 
> {code}
> Exception in thread "main" kafka.common.InvalidOffsetException: Attempt to append an offset (1001) to position 12345 no larger than the last offset appended (1950) to xyz/.index.
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>   at kafka.log.LogSegment.recover(LogSegment.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>   at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
>   at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
>   at kafka.log.Log.loadSegments(Log.scala:160)
>   at kafka.log.Log.<init>(Log.scala:90)
>   at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>   at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>   at java.lang.Thread.run(Thread.java:722)
> {code} 
> *Proposed fix:* the assignment {{appendInfo.firstOffset = 
> nextOffsetMetadata.messageOffset}} should only happen in the {{if 
> (assignOffsets)}} branch of the code.
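
The restart failure in the trace above is raised while the offset index is rebuilt during recovery: the index refuses any offset that is not larger than the last one it holds. The following is a minimal sketch of that invariant only, not Kafka's {{OffsetIndex}}. The names {{SimpleOffsetIndex}} and the local {{InvalidOffsetException}} are hypothetical stand-ins, and the wording of the error message is simplified.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for kafka.common.InvalidOffsetException.
final class InvalidOffsetException(msg: String) extends RuntimeException(msg)

// Toy index (an assumption, not Kafka's class): stores (offset, position)
// pairs and only accepts strictly increasing offsets.
final class SimpleOffsetIndex {
  private val entries = ArrayBuffer.empty[(Long, Int)]

  // Offset of the most recently appended entry, if any.
  def lastOffset: Option[Long] = entries.lastOption.map(_._1)

  def append(offset: Long, position: Int): Unit = {
    // Reject any offset that does not advance past the last indexed one.
    if (lastOffset.exists(offset <= _))
      throw new InvalidOffsetException(
        s"Attempt to append an offset ($offset) no larger than " +
          s"the last offset appended (${lastOffset.get}).")
    entries += ((offset, position))
  }
}
```

With the numbers from the report, indexing offset 1950 and then offset 1001 throws, which is why a segment containing out-of-order offsets cannot be recovered on restart.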





[jira] [Commented] (KAFKA-3047) Explicit offset assignment in Log.append can corrupt the log

2016-03-08 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15184629#comment-15184629
 ] 

Ismael Juma commented on KAFKA-3047:


Thanks for the report and suggested fix, Maciek.






[jira] [Commented] (KAFKA-3047) Explicit offset assignment in Log.append can corrupt the log

2016-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15184628#comment-15184628
 ] 

ASF GitHub Bot commented on KAFKA-3047:
---

GitHub user ijuma opened a pull request:

https://github.com/apache/kafka/pull/1029

KAFKA-3047: Explicit offset assignment in Log.append can corrupt the log

This fix was suggested by Maciek Makowski, who also reported the problem.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ijuma/kafka KAFKA-3047-log-append-can-corrupt-the-log

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1029.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1029


commit 58d098c589697259482207631f5cfe1d8271a186
Author: Ismael Juma 
Date:   2016-03-08T08:10:17Z

Only set `firstOffset` again if `assignOffsets` is true
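
The effect of this change can be illustrated with a simplified model of the offset handling in {{Log.append}}. This is a sketch under assumptions, not the actual Kafka code: {{AppendInfo}} here is a reduced case class, {{AppendModel.validate}} and its {{fixed}} flag are hypothetical, and {{nextOffset}} stands in for {{nextOffsetMetadata.messageOffset}}.

```scala
// Reduced, hypothetical version of the append metadata discussed in the issue.
final case class AppendInfo(firstOffset: Long, lastOffset: Long, offsetsMonotonic: Boolean)

object AppendModel {
  /** Returns Right(info) if the append would proceed, Left(reason) if it is rejected.
    * `fixed = false` models the unconditional assignment; `fixed = true` models
    * assigning firstOffset only when assignOffsets is true.
    */
  def validate(analyzed: AppendInfo,
               nextOffset: Long,
               assignOffsets: Boolean,
               fixed: Boolean): Either[String, AppendInfo] = {
    // Pre-fix: firstOffset is overwritten regardless of assignOffsets.
    // Post-fix: it is overwritten only when the log assigns offsets itself.
    val info =
      if (assignOffsets || !fixed) analyzed.copy(firstOffset = nextOffset)
      else analyzed

    if (assignOffsets) {
      // The log renumbers the messages, so the range is shifted to start at nextOffset.
      val span = analyzed.lastOffset - analyzed.firstOffset
      Right(info.copy(lastOffset = nextOffset + span))
    } else if (!info.offsetsMonotonic || info.firstOffset < nextOffset) {
      // Consistency check for caller-supplied offsets.
      Left(s"out of order offsets: first=${info.firstOffset}, next=$nextOffset")
    } else {
      Right(info)
    }
  }
}
```

With the values from the issue (message offsets 1001...1500, next offset 2001, {{assignOffsets = false}}), the pre-fix variant returns {{Right}} with {{firstOffset}} 2001 and {{lastOffset}} 1500, so the inconsistent set is written. The fixed variant returns {{Left}}, because {{firstOffset}} stays at 1001 and the check can now reject it.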






