[jira] [Updated] (KAFKA-3919) Broker fails to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs

2016-11-30 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-3919:
---
Labels: reliability  (was: )

> Broker fails to start after ungraceful shutdown due to non-monotonically 
> incrementing offsets in logs
> --
>
> Key: KAFKA-3919
> URL: https://issues.apache.org/jira/browse/KAFKA-3919
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: Andy Coates
>  Labels: reliability
>
> Hi All,
> I encountered an issue with Kafka following a power outage that saw a 
> proportion of our cluster disappear. When the power came back on several 
> brokers halted on start up with the error:
> {noformat}
>   Fatal error during KafkaServerStartable startup. Prepare to shutdown”
>   kafka.common.InvalidOffsetException: Attempt to append an offset 
> (1239742691) to position 35728 no larger than the last offset appended 
> (1239742822) to 
> /data3/kafka/mt_xp_its_music_main_itsevent-20/001239444214.index.
>   at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>   at kafka.log.LogSegment.recover(LogSegment.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>   at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>   at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>   at kafka.log.Log.loadSegments(Log.scala:160)
>   at kafka.log.Log.<init>(Log.scala:90)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only way to recover the brokers was to delete the log files that 
> contained non-monotonically incrementing offsets.
> We've spent some time digging through the logs and I feel I may have worked 
> out the sequence of events leading to this issue (though this is based on 
> some assumptions I've made about the way Kafka works, which may be wrong).
> First off, we have unclean leadership elections disabled. (We did later 
> enable them to help get around some other issues we were having, but this 
> was several hours after this issue manifested.) We're producing to the topic 
> with gzip compression and acks=1.
> We looked through the data logs that were causing the brokers to not start. 
> What we found was that the initial part of the log has monotonically 
> increasing offsets, where each compressed batch normally contained one or 
> two records. Then there is a batch that contains many records, whose first 
> record has an offset below the previous batch and whose last record has an 
> offset above the previous batch. Following on from this there continues a 
> period of large batches, with monotonically increasing offsets, and then the 
> log returns to batches with one or two records.
> Our working assumption here is that the period before the offset dip, with 
> the small batches, is pre-outage normal operation. The period of larger 
> batches is from just after the outage, where producers have a backlog to 
> process when the partition becomes available, and then things return to 
> normal batch sizes again once the backlog clears.
> We also looked through Kafka's application logs to try and piece together 
> the series of events leading up to this. Here's what we know happened, with 
> regard to one partition that had issues, from the logs:
> Prior to outage:
> * Replicas for the partition are brokers 2011, 2012, 2024, with 2024 being 
> the preferred leader.
> * Producers using acks=1, compression=gzip
> * Brokers configured with unclean.elections=false, zk.session-timeout=36s
> Post outage:
> * 2011 comes up first (also as the Controller), recovers unflushed log 
> segment 1239444214, completes load with offset 1239740602, and becomes 
> leader of the partition.

[jira] [Updated] (KAFKA-3919) Broker fails to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs

2016-07-05 Thread Andy Coates (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Coates updated KAFKA-3919:
---
Description: 
Hi All,

I encountered an issue with Kafka following a power outage that saw a 
proportion of our cluster disappear. When the power came back on several 
brokers halted on start up with the error:

{noformat}
Fatal error during KafkaServerStartable startup. Prepare to shutdown”
kafka.common.InvalidOffsetException: Attempt to append an offset 
(1239742691) to position 35728 no larger than the last offset appended 
(1239742822) to 
/data3/kafka/mt_xp_its_music_main_itsevent-20/001239444214.index.
at 
kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
at kafka.log.LogSegment.recover(LogSegment.scala:188)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.log.Log.loadSegments(Log.scala:160)
at kafka.log.Log.<init>(Log.scala:90)
at 
kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{noformat}

The only way to recover the brokers was to delete the log files that contained 
non-monotonically incrementing offsets.
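
For context, the InvalidOffsetException above comes from the index append's 
sanity check: every entry appended to an offset index must carry an offset 
strictly larger than the last one appended, which is exactly what a segment 
with a dip in its offsets violates during recovery. A minimal sketch of that 
invariant (illustrative only, not Kafka's actual OffsetIndex code):

{noformat}
// Illustrative sketch (Scala) of the invariant the broker enforces when it
// rebuilds an offset index during log recovery; not the real kafka.log.OffsetIndex.
class ToyOffsetIndex {
  private var lastOffset: Long = -1L
  private val entries = scala.collection.mutable.ArrayBuffer.empty[(Long, Int)]

  def append(offset: Long, position: Int): Unit = {
    // The real broker throws InvalidOffsetException here, producing the
    // "Attempt to append an offset ... no larger than the last offset" error above.
    require(offset > lastOffset,
      s"Attempt to append an offset ($offset) no larger than the last offset appended ($lastOffset)")
    entries += ((offset, position))
    lastOffset = offset
  }
}
{noformat}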

We've spent some time digging through the logs and I feel I may have worked out 
the sequence of events leading to this issue (though this is based on some 
assumptions I've made about the way Kafka works, which may be wrong).

First off, we have unclean leadership elections disabled. (We did later enable 
them to help get around some other issues we were having, but this was several 
hours after this issue manifested.) We're producing to the topic with gzip 
compression and acks=1.

We looked through the data logs that were causing the brokers to not start. 
What we found was that the initial part of the log has monotonically increasing 
offsets, where each compressed batch normally contained one or two records. 
Then there is a batch that contains many records, whose first record has an 
offset below the previous batch and whose last record has an offset above the 
previous batch. Following on from this there continues a period of large 
batches, with monotonically increasing offsets, and then the log returns to 
batches with one or two records.

Our working assumption here is that the period before the offset dip, with the 
small batches, is pre-outage normal operation. The period of larger batches is 
from just after the outage, where producers have a backlog to process when the 
partition becomes available, and then things return to normal batch sizes again 
once the backlog clears.

We also looked through Kafka's application logs to try and piece together the 
series of events leading up to this. Here's what we know happened, with regard 
to one partition that had issues, from the logs:

Prior to outage:
* Replicas for the partition are brokers 2011, 2012, 2024, with 2024 being the 
preferred leader.
* Producers using acks=1, compression=gzip (an illustrative producer config is 
sketched below)
* Brokers configured with unclean.elections=false, zk.session-timeout=36s
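
For reference, the producer side of that setup looks roughly like the 
following. This is a minimal sketch: the bootstrap address is a placeholder, 
the topic name is taken from the segment path in the stack trace, and the 
serializers are just an example.

{noformat}
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object GzipAcks1Producer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "broker-2011:9092") // placeholder address
    props.put("acks", "1")                             // acks=1, as described above
    props.put("compression.type", "gzip")              // gzip-compressed batches
    props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

    val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
    producer.send(new ProducerRecord[Array[Byte], Array[Byte]](
      "mt_xp_its_music_main_itsevent", "example payload".getBytes("UTF-8")))
    producer.close()
  }
}
{noformat}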

Post outage:
* 2011 comes up first (also as the Controller), recovers unflushed log segment 
1239444214, completes load with offset 1239740602, and becomes leader of the 
partition.
* 2012 comes up next, recovers its log, including unflushed log segment 
1239444214, truncates to offset 1239742830 (that's 2,228 records ahead of the 
recovered offset of the current leader), and starts following.
* 2024 comes up quickly after 2012, recovers unflushed log segment 1239444214, 
truncates to offset 1239742250 (that's 1,648 records ahead of the recovered 
offset of the current leader), and starts following.
* The Controller adds 2024 to the replica set jus

[jira] [Updated] (KAFKA-3919) Broker fails to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs

2016-07-01 Thread Andy Coates (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Coates updated KAFKA-3919:
---
Description: 
Hi All,

I encountered an issue with Kafka following a power outage that saw a 
proportion of our cluster disappear. When the power came back on several 
brokers halted on start up with the error:

{noformat}
Fatal error during KafkaServerStartable startup. Prepare to shutdown”
kafka.common.InvalidOffsetException: Attempt to append an offset 
(1239742691) to position 35728 no larger than the last offset appended 
(1239742822) to 
/data3/kafka/mt_xp_its_music_main_itsevent-20/001239444214.index.
at 
kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
at kafka.log.LogSegment.recover(LogSegment.scala:188)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.log.Log.loadSegments(Log.scala:160)
at kafka.log.Log.<init>(Log.scala:90)
at 
kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{noformat}

The only way to recover the brokers was to delete the log files that contained 
non-monotonically incrementing offsets.

I've spent some time digging through the logs and I feel I may have worked out 
the sequence of events leading to this issue (though this is based on some 
assumptions I've made about the way Kafka works, which may be wrong):

Given:
* A topic that is produced to using acks = 1
* A topic that is produced to using gzip compression
* A topic that has min.isr set to less than the number of replicas (e.g. 
min.isr=2, #replicas=3)
* The followers in the ISR are lagging behind the leader by some small number 
of messages (which is normal with acks=1)
* Brokers are configured with a fairly large ZK session timeout, e.g. 30s.
* Brokers are configured so that unclean leader elections are disabled (these 
settings are sketched as configs below).
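
To make those preconditions concrete, here is a rough sketch of the 
corresponding settings, using the standard Kafka config names (the values are 
illustrative, not taken from our cluster):

{noformat}
import java.util.Properties

// Rough sketch of the "Given" preconditions as configs. Property names are the
// standard Kafka broker/topic/producer settings; the values are illustrative.
object GivenConfigs {
  // Broker side (server.properties equivalents).
  val brokerProps = new Properties()
  brokerProps.put("unclean.leader.election.enable", "false") // unclean elections disabled
  brokerProps.put("zookeeper.session.timeout.ms", "30000")   // fairly large ZK session timeout

  // Topic-level override: min ISR below the replica count (e.g. 2 of 3 replicas).
  val topicProps = new Properties()
  topicProps.put("min.insync.replicas", "2")

  // Producer side: acks=1 with gzip compression.
  val producerProps = new Properties()
  producerProps.put("acks", "1")
  producerProps.put("compression.type", "gzip")
}
{noformat}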

Then:
When something like a power outage takes out all three replicas, it's possible 
to get into a state such that the indexes won't rebuild on a restart and a 
broker fails to start. This can happen when:
* Enough brokers, but not the pre-outage leader, come online for the partition 
to be writable.
* Producers produce enough records to the partition that the head offset is now 
greater than the pre-outage leader's head offset.
* The pre-outage leader comes back online.

At this point the logs on the pre-outage leader have diverged from the other 
replicas. It has some messages that are not in the other replicas, and the 
other replicas have some records not in the pre-outage leader's log, at the 
same offsets.

I'm assuming that because the current leader has a higher offset than the 
pre-outage leader, the pre-outage leader just starts following the leader and 
requesting the records it thinks it's missing.

I'm also assuming that because the producers were using gzip, each record is 
actually a compressed message set, and that when the pre-outage leader requests 
records from the leader, the offset it requests could just happen to be in the 
middle of a compressed batch, but the leader returns the full batch. When the 
pre-outage leader appends this batch to its own log it thinks all is OK. But 
what has happened is that the offsets in the log are no longer monotonically 
incrementing. Instead they actually dip by the number of records in the 
compressed batch that were before the requested offset. If and when this broker 
restarts, this dip may be at the 4K boundary the indexer checks. If it is, the 
broker won't start.
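
A small worked sketch of that dip. The two offsets are taken from the exception 
above (they differ by 131); the batch end offset is made up for illustration:

{noformat}
// Sketch (under the assumptions above): a follower fetches from offset F, but F
// falls inside a compressed batch, so the leader returns the whole batch and the
// follower appends offsets that start below what it already has.
object OffsetDipSketch {
  def main(args: Array[String]): Unit = {
    val lastAppended = 1239742822L // last offset already appended (from the exception above)
    val batchFirst   = 1239742691L // lower offset that then gets appended (also from the exception)
    val batchLast    = 1239742900L // illustrative batch end, not a real value

    val appended = batchFirst to batchLast // the whole returned batch lands in the log
    println(s"offsets dip by ${lastAppended - appended.head} records " +
      s"before climbing past $lastAppended again")
  }
}
{noformat}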

Several of our brokers were unlucky enough to hit that 4K boundary, causing a 
protracted outage. We've written a little utility that shows several more 
brokers have a dip outside of the 4K boundary.
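
A rough sketch of the kind of check such a utility performs (this is not the 
exact tool; it assumes the "offset: N position: P ..." line format printed by 
kafka.tools.DumpLogSegments, so the parsing may need adjusting for your 
version):

{noformat}
import scala.io.Source

// Scans the output of `kafka.tools.DumpLogSegments --deep-iteration --files <segment>.log`
// (saved to a file and passed as args(0)) and reports every place the offsets go backwards.
object FindOffsetDips {
  private val OffsetRe = """offset:\s*(\d+)""".r

  def main(args: Array[String]): Unit = {
    var last = -1L
    for (line <- Source.fromFile(args(0)).getLines();
         m    <- OffsetRe.findFirstMatchIn(line)) {
      val off = m.group(1).toLong
      if (off <= last) println(s"dip: offset $off after $last ($line)")
      last = math.max(last, off)
    }
  }
}
{noformat}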

[jira] [Updated] (KAFKA-3919) Broker fails to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs

2016-06-29 Thread Andy Coates (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Coates updated KAFKA-3919:
---
Description: 
Hi All,

I encountered an issue with Kafka following a power outage that saw a 
proportion of our cluster disappear. When the power came back on several 
brokers halted on start up with the error:

{noformat}
Fatal error during KafkaServerStartable startup. Prepare to shutdown”
kafka.common.InvalidOffsetException: Attempt to append an offset 
(1239742691) to position 35728 no larger than the last offset appended 
(1239742822) to 
/data3/kafka/mt_xp_its_music_main_itsevent-20/001239444214.index.
at 
kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
at kafka.log.LogSegment.recover(LogSegment.scala:188)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.log.Log.loadSegments(Log.scala:160)
at kafka.log.Log.<init>(Log.scala:90)
at 
kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{noformat}

The only way to recover the brokers was to delete the log files that contained 
non-monotonically incrementing offsets.

I've spent some time digging through the logs and I feel I may have worked out 
the sequence of events leading to this issue (though this is based on some 
assumptions I've made about the way Kafka works, which may be wrong):

Given:
* A topic that is produced to using acks = 1
* A topic that is produced to using gzip compression
* A topic that has min.isr set to less than the number of replicas (e.g. 
min.isr=2, #replicas=3)
* The followers in the ISR are lagging behind the leader by some small number 
of messages (which is normal with acks=1)
* Brokers are configured with a fairly large ZK session timeout, e.g. 30s.

Then:
When something like a power outage takes out all three replicas, it's possible 
to get into a state such that the indexes won't rebuild on a restart and a 
broker fails to start. This can happen when:
* Enough brokers, but not the pre-outage leader, come online for the partition 
to be writable.
* Producers produce enough records to the partition that the head offset is now 
greater than the pre-outage leader's head offset.
* The pre-outage leader comes back online.

At this point the logs on the pre-outage leader have diverged from the other 
replicas. It has some messages that are not in the other replicas, and the 
other replicas have some records not in the pre-outage leader's log, at the 
same offsets.

I'm assuming that because the current leader has a higher offset than the 
pre-outage leader, the pre-outage leader just starts following the leader and 
requesting the records it thinks it's missing.

I'm also assuming that because the producers were using gzip, each record is 
actually a compressed message set, and that when the pre-outage leader requests 
records from the leader, the offset it requests could just happen to be in the 
middle of a compressed batch, but the leader returns the full batch. When the 
pre-outage leader appends this batch to its own log it thinks all is OK. But 
what has happened is that the offsets in the log are no longer monotonically 
incrementing. Instead they actually dip by the number of records in the 
compressed batch that were before the requested offset. If and when this broker 
restarts, this dip may be at the 4K boundary the indexer checks. If it is, the 
broker won't start.
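
To make the 4K boundary part concrete: during recovery the broker only adds an 
index entry roughly every index.interval.bytes (4096 by default) of messages, 
so a backwards-going offset only trips the InvalidOffsetException if one of 
those sparse index points happens to land inside the dip. A small sketch of 
that idea (illustrative, not the broker's actual recovery code):

{noformat}
// Sketch of why only some dips break recovery: the rebuilt index is sparse, with
// one entry per ~index.interval.bytes of messages, and only those entries are
// subject to the "offset must be larger than the last indexed offset" check.
object IndexRebuildSketch {
  final case class Msg(offset: Long, sizeBytes: Int)

  // Returns the offset that would raise InvalidOffsetException during recovery, if any.
  def rebuildIndex(messages: Seq[Msg], indexIntervalBytes: Int = 4096): Option[Long] = {
    var bytesSinceLastEntry = 0
    var lastIndexedOffset   = -1L
    var failedAt: Option[Long] = None
    for (m <- messages if failedAt.isEmpty) {
      if (bytesSinceLastEntry > indexIntervalBytes) {
        if (m.offset <= lastIndexedOffset) failedAt = Some(m.offset) // broker would halt here
        else { lastIndexedOffset = m.offset; bytesSinceLastEntry = 0 }
      }
      bytesSinceLastEntry += m.sizeBytes
    }
    failedAt
  }
}
{noformat}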

Several of our brokers were unlucky enough to hit that 4K boundary, causing a 
protracted outage. We've written a little utility that shows several more 
brokers have a dip outside of the 4K boundary.

There are some assumptions in