[ 
https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Coates updated KAFKA-3919:
-------------------------------
    Description: 
Hi All,

I encountered an issue with Kafka following a power outage that saw a 
proportion of our cluster disappear. When the power came back on several 
brokers halted on start up with the error:

{noformat}
        Fatal error during KafkaServerStartable startup. Prepare to shutdown
        kafka.common.InvalidOffsetException: Attempt to append an offset 
(1239742691) to position 35728 no larger than the last offset appended 
(1239742822) to 
/data3/kafka/mt_xp_its_music_main_itsevent-20/00000000001239444214.index.
        at 
kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
        at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
        at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
        at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
        at kafka.log.LogSegment.recover(LogSegment.scala:188)
        at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
        at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
        at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at kafka.log.Log.loadSegments(Log.scala:160)
        at kafka.log.Log.<init>(Log.scala:90)
        at 
kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
        at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
{noformat}

The only way to recover the brokers was to delete the log files that contained 
non-monotonically incrementing offsets.
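For anyone not familiar with that code path: during log recovery the broker 
rebuilds the offset index for each segment, and the index requires strictly 
increasing offsets, which is why the only fix was to remove the offending 
files. The following is a minimal sketch of that invariant (my own toy code, 
not the real kafka.log.OffsetIndex):

{noformat}
// Minimal sketch of the invariant the index append path enforces: every
// indexed offset must be strictly larger than the last one appended. This is
// toy code, not the real kafka.log.OffsetIndex.
class ToyOffsetIndex {
  private var lastOffset: Long = -1L

  def append(offset: Long, position: Int): Unit = {
    if (offset <= lastOffset)
      throw new IllegalStateException(
        s"Attempt to append an offset ($offset) to position $position " +
        s"no larger than the last offset appended ($lastOffset).")
    lastOffset = offset
  }
}

object ToyOffsetIndexDemo extends App {
  val index = new ToyOffsetIndex
  index.append(1239742822L, 35727)  // last entry written during recovery
  index.append(1239742691L, 35728)  // the dip: throws, and the broker halts
}
{noformat}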

I've spent some time digging through the logs and I think I've worked out the 
sequence of events leading to this issue (though this is based on some 
assumptions I've made about the way Kafka works, which may be wrong):

Given:
* A topic that is produced to using acks = 1
* A topic that is produced to using gzip compression
* A topic that has min.isr set to less than the number of replicas (e.g. 
min.isr=2, #replicas=3)
* Followers in the ISR are lagging behind the leader by some small number of 
messages (which is normal with acks=1)
* Brokers are configured with a fairly large ZK session timeout, e.g. 30s.
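
For concreteness, this is the sort of configuration I mean (the values are 
illustrative, not our exact production settings; "min.isr" above is the 
min.insync.replicas topic config):

{noformat}
# producer
acks=1
compression.type=gzip

# topic (created with a replication factor of 3)
min.insync.replicas=2

# broker
zookeeper.session.timeout.ms=30000
# An offset index entry is only written roughly every 4096 bytes of log
# (log.index.interval.bytes) - I believe this is the "4K boundary" the
# indexer checks that I mention below.
log.index.interval.bytes=4096
{noformat}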

Then:
When something like a power outage takes out all three replicas, it's possible 
to get into a state such that the indexes won't rebuild on a restart and a 
broker fails to start. This can happen when:
* Enough brokers, but not the pre-outage leader, come online for the partition 
to be writeable
* Producers produce enough records to the partition that the head offset is now 
greater than the pre-outage leader's head offset.
* The pre-outage leader comes back online.

At this point the logs on the pre-outage leader have diverged from the other 
replicas.  It has some messages that are not in the other replicas, and the 
other replicas have some records not in the pre-outage leader's log - at the 
same offsets.

I'm assuming that because the current leader has a higher offset than the 
pre-outage leader, the pre-outage leader just starts following the leader and 
requesting the records it thinks it's missing.

I'm also assuming that because the producers were using gzip, so each record is 
actually a compressed message set, when the pre-outage leader requests records 
from the leader, the offset it requests could just happen to be in the middle 
of a compressed batch, but the leader returns the full batch. When the 
pre-outage leader appends this batch to its own log it thinks all is OK. But 
what has actually happened is that the offsets in the log are no longer 
monotonically incrementing. Instead they dip by the number of records in the 
compressed batch that were before the requested offset. If and when this 
broker restarts, this dip may land on the 4K boundary the indexer checks. If 
it does, the broker won't start.
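
To make the suspected mechanism concrete, here's a toy simulation of that 
append with made-up offsets (this is just the arithmetic of the dip, not real 
broker code):

{noformat}
// Toy simulation of the suspected dip (made-up offsets, not broker code).
// The follower's log ends at offset 104. It fetches from offset 105, which
// sits in the middle of a compressed batch covering offsets 100..109, and
// the leader returns the whole batch. Appending the full batch makes the
// on-disk offsets dip from 104 back to 100.
object OffsetDipDemo extends App {
  val followerLog = (0L to 104L).toVector    // offsets already in the follower's log
  val leaderBatch = (100L to 109L).toVector  // full compressed batch returned by the leader

  val appended = followerLog ++ leaderBatch  // what ends up on disk
  val dips = appended.sliding(2).collect {
    case Seq(prev, next) if next <= prev => (prev, next)
  }.toList

  println(s"non-monotonic transitions: $dips")
  // prints: non-monotonic transitions: List((104,100))
  // If recovery happens to write an index entry at such a transition, the
  // broker halts with the InvalidOffsetException above.
}
{noformat}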

Several of our brokers were unlucky enough to hit that 4K boundary, causing a 
protracted outage. We've written a little utility that shows several more 
brokers have a dip that doesn't fall on a 4K boundary.
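
The utility is essentially just a scan for decreasing offsets. Conceptually 
it's something like the sketch below (not the actual tool we wrote), fed with 
one offset per line, e.g. grepped out of DumpLogSegments output:

{noformat}
// Sketch of the kind of check our utility performs: read one offset per line
// from stdin and report every place the offsets dip (i.e. fail to increase).
object FindOffsetDips extends App {
  val offsets = scala.io.Source.stdin.getLines()
    .map(_.trim).filter(_.nonEmpty).map(_.toLong)

  var entry = 0L
  var last  = Long.MinValue
  for (offset <- offsets) {
    if (offset <= last)
      println(s"dip at entry $entry: $last -> $offset")
    last = offset
    entry += 1
  }
}
{noformat}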

There are some assumptions in there which I've not got around to confirming or 
denying. (A quick attempt to recreate this failed and I've not found the time 
to invest more.)

Of course I'd really appreciate the community / experts stepping in and 
commenting on whether my assumptions are right or wrong, or if there is another 
explanation to the problem. 

But assuming I’m mostly right, then the fact the broker won’t start is 
obviously a bug, and one I’d like to fix.  A Kafka broker should not corrupt 
its own log during normal operation to the point that it can’t restart!

A secondary issue is whether we think the divergent logs are acceptable. This 
may be deemed acceptable given the producers chose availability over 
consistency when they produced with acks = 1. Though personally, a system 
having diverging replicas of an immutable commit log just doesn't sit right 
with me.

I see us having a few options here:
* Have the replicas detect the divergence of their logs, e.g. a follower 
compares the checksum of its last record with the record at the same offset on 
the leader. The follower can then work out that its log has diverged from the 
leader's. At that point it could either halt, stop replicating that partition, 
or search backwards to find the point of divergence, truncate and recover 
(possibly saving the truncated part somewhere) - see the rough sketch after 
this list. This would be a protocol change for Kafka. This solution trades 
availability (you've got fewer ISRs during the extended re-sync process) for 
consistency.
* Leave the logs as they are and have the indexing of offsets in the log on 
start-up handle such a situation gracefully. This leaves logs in a divergent 
state between replicas (meaning replays would yield different messages if the 
leader were to go down), but gives better availability (no time spent not 
being an ISR while a replica repairs any divergence).
* Support multiple options and allow it to be tuned, ideally per topic.
* Something else...
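
As a rough illustration of the first option, the follower-side search might 
look something like this. The two checksum lookups are hypothetical stand-ins 
for a local read and a fetch from the leader; this is only meant to show the 
walk-back, not a proposed implementation:

{noformat}
object DivergenceCheck {
  // Walk back from the follower's log end offset, comparing record checksums
  // with the leader at the same offsets, until we find the highest offset at
  // which the two logs still agree.
  def findDivergencePoint(
      logStartOffset: Long,
      logEndOffset: Long,
      localChecksumAt: Long => Long,   // hypothetical: CRC of the follower's record at an offset
      leaderChecksumAt: Long => Long   // hypothetical: CRC of the leader's record at that offset
  ): Option[Long] =
    ((logEndOffset - 1) to logStartOffset by -1L)
      .find(offset => localChecksumAt(offset) == leaderChecksumAt(offset))

  // The follower would then truncate everything after the returned offset and
  // re-fetch from the leader; None would mean a full re-replication is needed.
}
{noformat}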


I’m happy/keen to contribute here. But I’d like to first discuss which option 
should be investigated.

Andy



> Broker fails to start after ungraceful shutdown due to non-monotonically 
> incrementing offsets in logs
> ------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-3919
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3919
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.9.0.1
>            Reporter: Andy Coates
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
