[
https://issues.apache.org/jira/browse/KAFKA-3918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Andy Coates resolved KAFKA-3918.
--------------------------------
Resolution: Duplicate
> Broker fails to start after ungraceful shutdown due to non-monotonically
> incrementing offsets in logs
> ------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-3918
> URL: https://issues.apache.org/jira/browse/KAFKA-3918
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.9.0.1
> Reporter: Andy Coates
>
> Hi All,
> I encountered an issue with Kafka following a power outage that saw a
> proportion of our cluster disappear. When the power came back on several
> brokers halted on start up with the error:
> {noformat}
> Fatal error during KafkaServerStartable startup. Prepare to shutdown
> kafka.common.InvalidOffsetException: Attempt to append an offset (1239742691) to position 35728 no larger than the last offset appended (1239742822) to /data3/kafka/mt_xp_its_music_main_itsevent-20/00000000001239444214.index.
>     at kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>     at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>     at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>     at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>     at kafka.log.LogSegment.recover(LogSegment.scala:188)
>     at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>     at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>     at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>     at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>     at kafka.log.Log.loadSegments(Log.scala:160)
>     at kafka.log.Log.<init>(Log.scala:90)
>     at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>     at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {noformat}
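> For illustration, here is a minimal sketch, (not the actual Kafka code, and the
> names are mine), of the invariant the offset index enforces when entries are
> appended during recovery. It is this check that aborts start-up:
> {noformat}
> // Simplified sketch of an offset index's append-time check; Kafka's real
> // kafka.log.OffsetIndex is more involved, this only shows the invariant that
> // fails above.
> class SketchOffsetIndex(baseOffset: Long) {
>   private var lastOffset: Long = baseOffset
>   private val entries = scala.collection.mutable.ArrayBuffer.empty[(Long, Int)]
>
>   def append(offset: Long, position: Int): Unit = {
>     // Index entries must have strictly increasing offsets; a "dip" in the
>     // log's offsets that lands on an index point therefore aborts recovery.
>     require(offset > lastOffset,
>       s"Attempt to append an offset ($offset) no larger than the last offset appended ($lastOffset)")
>     entries += ((offset, position))
>     lastOffset = offset
>   }
> }
> {noformat}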
> The only way to recover the brokers was to delete the log files that
> contained the non-monotonically incrementing offsets.
> I’ve spent some time digging through the logs and I think I may have worked
> out the sequence of events leading to this issue, (though this is based on
> some assumptions I've made about the way Kafka works, which may be
> wrong):
> Given:
> * A topic that is produced to using acks = 1
> * A topic that is produced to using gzip compression
> * A topic that has min.isr set to less than the number of replicas, (i.e.
> min.isr=2, #replicas=3)
> * Follower replicas in the ISR are lagging behind the leader by some small
> number of messages, (which is normal with acks=1)
> * Brokers are configured with a fairly large ZK session timeout, e.g. 30s,
> (see the configuration sketch below).
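> For concreteness, a minimal sketch of the kind of producer and topic
> configuration described above, (broker/ZK addresses and exact command values
> are placeholders; the topic name is taken from the log path in the stack
> trace):
> {noformat}
> import java.util.Properties
> import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
>
> object ProducerSketch extends App {
>   val props = new Properties()
>   props.put("bootstrap.servers", "broker1:9092")   // placeholder address
>   props.put("acks", "1")                           // leader-only acknowledgement
>   props.put("compression.type", "gzip")            // each produced set is a compressed batch
>   props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
>   props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
>
>   val producer = new KafkaProducer[String, String](props)
>   producer.send(new ProducerRecord("mt_xp_its_music_main_itsevent", "some-key", "some-value"))
>   producer.close()
>
>   // Topic created with min.insync.replicas below the replication factor, e.g.:
>   //   kafka-topics.sh --zookeeper zk1:2181 --create --topic mt_xp_its_music_main_itsevent \
>   //     --partitions <N> --replication-factor 3 --config min.insync.replicas=2
> }
> {noformat}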
> Then:
> When something like a power outage takes out all three replicas, it’s possible
> to get into a state where the indexes won’t rebuild on a restart and a
> broker fails to start. This can happen when:
> * Enough brokers, but not the pre-outage leader, come on-line for the
> partition to be writeable
> * Producers produce enough records to the partition that the head offset is
> now greater than the pre-outage leader head offset.
> * The pre-outage leader comes back online.
> At this point the logs on the pre-outage leader have diverged from the other
> replicas. It has some messages that are not in the other replicas, and the
> other replicas have some records not in the pre-outage leader's log.
> I’m assuming that because the current leader has a higher offset than the
> pre-outage leader, the pre-outage leader just starts following the leader and
> requesting the records it thinks it’s missing.
> I’m also assuming that, because the producers were using gzip, (so each record
> is actually a compressed message set), when the pre-outage leader requested
> records from the leader, the offset it asked for happened to fall in the
> middle of a compressed batch, and the leader returned the full batch. When
> the pre-outage leader appends this batch to its own log it thinks all is OK.
> But what has happened is that the offsets in the log are no longer
> monotonically incrementing. Instead they dip by the number of records in the
> compressed batch that precede the requested offset. If and when this broker
> restarts, that dip may fall on one of the roughly-4K-byte boundaries at which
> the recovery code adds an index entry, (index.interval.bytes defaults to
> 4096). If it does, the index append fails and the broker won’t start.
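> Working that through with the offsets from the stack trace above, (the
> interpretation is mine):
> {noformat}
> // Offsets taken from the stack trace: the index had last appended offset
> // 1239742822, but recovery then tried to index offset 1239742691.
> val lastAppended = 1239742822L
> val nextIndexed  = 1239742691L
> val dip          = lastAppended - nextIndexed   // = 131 offsets
> // A dip of ~131 would fit a compressed batch whose first ~131 records sit
> // before the offset the recovering replica had actually requested.
> {noformat}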
> Several of our brokers were unlucky enough to hit that 4K boundary, causing a
> protracted outage. We’ve written a little utility that shows several more
> brokers have a dip outside of the 4K boundary.
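> A sketch of such a utility, (not the exact code we ran, and the parsing of
> kafka.tools.DumpLogSegments output is an assumption):
> {noformat}
> import scala.io.Source
>
> // Pipe the output of kafka.tools.DumpLogSegments for a segment file into
> // stdin; reports any place where the offsets stop monotonically increasing.
> object FindOffsetDips {
>   private val OffsetPattern = """offset:\s*(\d+)""".r.unanchored
>
>   def main(args: Array[String]): Unit = {
>     var last = -1L
>     for (line <- Source.stdin.getLines()) line match {
>       case OffsetPattern(o) =>
>         val offset = o.toLong
>         if (offset <= last)
>           println(s"Dip: offset $offset follows $last")
>         last = math.max(last, offset)
>       case _ => // not a message line, ignore
>     }
>   }
> }
> {noformat}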
> There are some assumptions in there which I’ve not got around to confirming
> or refuting, (a quick attempt to recreate this failed and I've not found the
> time to invest more).
> Of course I'd really appreciate the community / experts stepping in and
> commenting on whether my assumptions are right or wrong, or if there is
> another explanation to the problem.
> But assuming I’m mostly right, then the fact the broker won’t start is
> obviously a bug, and one I’d like to fix. A Kafka broker should not corrupt
> its own log during normal operation to the point that it can’t restart!
> A secondary question is whether we consider the divergent logs acceptable.
> This may be deemed acceptable given the producers chose availability over
> consistency when they produced with acks = 1? Though personally, a system
> having diverging replicas of an immutable commit log just doesn't sit right
> with me.
> I see us having a few options here:
> * Have the replicas detect the divergence of their logs, e.g. a follower
> compares the checksum of its last record with the record at the same offset on
> the leader. The follower can then work out that its log has diverged from the
> leader's. At that point it could either halt, stop replicating that partition,
> or search backwards to find the point of divergence, truncate and recover,
> (possibly saving the truncated part somewhere). This would be a protocol
> change for Kafka. This solution trades availability, (you’ve got fewer ISRs
> during the extended re-sync process), for consistency. (A sketch of such a
> divergence search follows this list.)
> * Leave the logs as they are and have the indexing of offsets in the log on
> start-up handle such a situation gracefully. This leaves logs in a divergent
> state between replicas, (meaning replays could yield different messages
> depending on which replica is the leader), but gives better availability, (no
> time spent not being an ISR while it repairs any divergence).
> * Support multiple options and allow it to be tuned, ideally per topic.
> * Something else...
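> To make the first option more concrete, here is a sketch of the kind of
> divergence search a follower could run, (the helper functions here are
> hypothetical, not existing Kafka APIs):
> {noformat}
> // localCrcAt and fetchCrcFromLeader are hypothetical helpers returning the
> // CRC of the record stored at a given offset locally / on the leader
> // (None if no record exists at that offset).
> def findTruncationPoint(logEndOffset: Long,
>                         localCrcAt: Long => Option[Long],
>                         fetchCrcFromLeader: Long => Option[Long]): Long = {
>   var offset = logEndOffset - 1
>   while (offset >= 0) {
>     (localCrcAt(offset), fetchCrcFromLeader(offset)) match {
>       case (Some(mine), Some(leaders)) if mine == leaders =>
>         return offset + 1      // logs agree up to and including this offset
>       case _ =>
>         offset -= 1            // diverged (or missing) here, keep walking back
>     }
>   }
>   0L                           // diverged from the start: truncate everything
> }
> // The follower would truncate its log to the returned offset, re-fetch from
> // the leader, and only then rejoin the ISR.
> {noformat}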
> I’m happy/keen to contribute here. But I’d like to first discuss which option
> should be investigated.
> Andy
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)