[ 
https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Coates updated KAFKA-3919:
-------------------------------
    Comment: was deleted

(was: Hi [~junrao], thanks for taking the time to look at this.

Note: I've incorporated some of what I say below into the problem description 
above, so that it doesn't get lost in the comments.

First off, we have unclean leader elections disabled. (We did later enable 
them to help get around some other issues we were having, but that was several 
hours after this issue manifested.)

We did look through the data logs that were causing the brokers to not start. 
Before the incident we see monotonically increasing offsets, where each 
compressed batch normally contains one or two records. Then there is a batch 
that contains many records, whose first record has an offset below the 
previous batch and whose last record has an offset above the previous batch. 
Following on from this there is a period of large batches, with monotonically 
increasing offsets, and then the log returns to batches with one or two 
records.

Our working assumption here is that the period before the offset dip is 
pre-outage normal operation. The period of larger batches is from just after 
the outage, when producers have a backlog to process once the partition 
becomes available, and things return to normal batch sizes again once the 
backlog clears.

We also looked through Kafka's application logs to try and piece together the 
series of events leading up to this.

Here's what I know happened, with regard to one partition that has issues, 
from the logs:

Prior to outage:
Replicas for the partition are brokers 2011, 2012, 2024, with 2024 being the 
preferred leader.
Producers using acks=1, compression=gzip
Brokers configured with unclean.elections=false, zk.session-timeout=36s

Post outage:
2011 comes up first, (also as the Controller), recovers unflushed log segment 
1239444214, completes the load of the log with end offset 1239740602, and 
becomes leader of the partition.
2012 comes up next, recovers unflushed log segment 1239444214, truncates to 
offset 1239742830, (that's 2,228 records ahead of the recovered offset of the 
current leader), and starts following.
2024 comes up quickly after 2012, recovers unflushed log segment 1239444214, 
truncates to offset 1239742250, (that's 1,648 records ahead of the recovered 
offset of the current leader), and starts following.
The Controller adds 2024 to the replica set just before 2024 halts due to 
another partition having an offset greater than the leader's.
The Controller adds 2012 to the replica set just before 2012 halts due to 
another partition having an offset greater than the leader's.
When 2012 is next restarted, it fails to fully start as it's complaining of 
invalid offsets in the log.

Our working hypothesis here is that the partition becomes writeable again, 
possibly because brokers 2012 & 2024 get added to the ISR set before halting, 
and maybe don't remove themselves when they halt - hence they remain in the 
ISR set for 36 seconds. Meanwhile our producers, which have a backlog, are 
happily sending large compressed batches to broker 2011, which is accepting 
them, (as there are enough replicas in the ISR set), and appending them to its 
log - moving its offset beyond brokers 2012 and 2024.

Log entries:

(Interleaved log entries from the three brokers - the broker id is in the [id] 
brackets)

Just as the power was going out I see this in the broker that was the 
controller:

2016-04-11 12:01:42 - [2026] - "[Partition state machine on Controller 2026]: 
Invoking state change to OnlinePartition for partitions 
[mt_xp_its_music_main_itsevent,20]”

2016-04-11 12:01:56 - [2026] - "[Replica state machine on controller 2026]: 
Invoking state change to OfflineReplica for replicas
        [Topic=mt_xp_its_music_main_itsevent,Partition=20,Replica=2024] 

2016-04-11 12:01:56 - [2026] - "[Controller 2026]: Cannot remove replica 2024 
from ISR of partition [mt_xp_its_music_main_itsevent,20] since it is not in the 
ISR. Leader = 2011 ; ISR = List(2011, 2012)”

2016-04-11 12:01:56 - [2026] - "[Channel manager on controller 2026]: Not 
sending request 
{controller_id=2026,controller_epoch=111,delete_partitions=0,partitions=[{topic=mt_xp_its_music_main_itsevent,partition=20}]}
 to broker 2024, since it is offline.”

2016-04-11 12:04:46 - [2026] - [Replica state machine on controller 2026]: 
Invoking state change to OnlineReplica for replicas
        [Topic=mt_xp_its_music_main_itsevent,Partition=20,Replica=2024]

2016-04-11 12:04:58 - [2026] - "[Controller 2026]: Starting preferred replica 
leader election for partitions [mt_xp_its_music_main_itsevent,20]”

2016-04-11 12:04:58 - [2026] - "[Partition state machine on Controller 2026]: 
Invoking state change to OnlinePartition for partitions 
[mt_xp_its_music_main_itsevent,20]”

2016-04-11 12:04:58 - [2026] - "[PreferredReplicaPartitionLeaderSelector]: 
Current leader 2011 for partition [mt_xp_its_music_main_itsevent,20] is not the 
preferred replica. Trigerring preferred replica leader election”

2016-04-11 12:04:58 - [2026] - "[Controller 2026]: Partition 
[mt_xp_its_music_main_itsevent,20] completed preferred replica leader election. 
New leader is 2024"

Then following on from the outage I see this:

2016-06-02 13:00:49 - [2011] - starts back up

2016-06-02 13:02:10 - [2011] - "Found a corrupted index file, 
/data3/kafka/mt_xp_its_music_main_itsevent-20/00000000001239444214.index, 
deleting and rebuilding index..."

2016-06-02 13:02:13 - [2011] - "Recovering unflushed segment 1239444214 in log 
mt_xp_its_music_main_itsevent-20.”

2016-06-02 13:02:15 - [2011] - "Completed load of log 
mt_xp_its_music_main_itsevent-20 with log end offset 1239740602”

2016-06-02 13:03:27 - [2011] -[Replica state machine on controller 2011]: 
Started replica state machine with initial state ->
        [Topic=mt_xp_its_music_main_itsevent,Partition=20,Replica=2011] -> 
ReplicaDeletionIneligible, …

2016-06-02 13:03:27,560 - [2011] - "Controller 2011 epoch 114 initiated state 
change for partition [mt_xp_its_music_main_itsevent,20] from OfflinePartition 
to OnlinePartition failed" kafka.common.NoReplicaOnlineException: No broker in 
ISR for partition [mt_xp_its_music_main_itsevent,20] is alive. Live brokers 
are: [Set()], ISR brokers are: [2012,2024,2011]

2016-06-02 13:03:32 - [2011] - [Partition state machine on Controller 2011]: 
Started partition state machine with initial state ->
        [mt_xp_its_music_main_itsevent,20] -> OfflinePartition, ...

2016-06-02 13:03:32 - [2011] - [Replica state machine on controller 2011]: 
Invoking state change to OnlineReplica for replicas …
       [Topic=mt_xp_its_music_main_itsevent,Partition=20,Replica=2011], …

2016-06-02 13:03:32 - [2011] - "[OfflinePartitionLeaderSelector]: Selected new 
leader and ISR {"leader":2011,"leader_epoch":12,"isr":[2011]} for offline 
partition [mt_xp_its_music_main_itsevent,20]”

2016-06-02 13:03:32 - [2011] - "Broker 2011 received LeaderAndIsrRequest with 
correlation id 1 from controller 2011 epoch 113 for partition 
[mt_xp_its_music_main_itsevent,20] but cannot become follower since the new 
leader 2024 is unavailable."

2016-06-02 13:03:36 - [2011] - "[ReplicaFetcherManager on broker 2011] Removed 
fetcher for partitions…
        [mt_xp_its_music_main_itsevent,20], …

2016-06-02 13:05:37 - [2012] - starts back up.

2016-06-02 13:05:39 - [2024] - starts back up.

2016-06-02 13:06:29 - [2012] - "Recovering unflushed segment 1239444214 in log 
mt_xp_its_music_main_itsevent-20.”

2016-06-02 13:06:31 - [2012] - "Completed load of log 
mt_xp_its_music_main_itsevent-20 with log end offset 1239742830”

2016-06-02 13:06:50 - [2024] - "Found a corrupted index file, 
/data2/kafka/mt_xp_its_music_main_itsevent-20/00000000001239444214.index, 
deleting and rebuilding index…"

2016-06-02 13:06:54 - [2024] - Recovering unflushed segment 1239444214 in log 
mt_xp_its_music_main_itsevent-20."

2016-06-02 13:06:56 - [2024] - Completed load of log 
mt_xp_its_music_main_itsevent-20 with log end offset 1239742250"

2016-06-02 13:07:59 - [2011] - [Replica state machine on controller 2011]: 
Invoking state change to OnlineReplica for replicas  
        [Topic=mt_xp_its_music_main_itsevent,Partition=20,Replica=2024], …

2016-06-02 13:08:00 - [2024] - "Truncating log mt_xp_its_music_main_itsevent-20 
to offset 1239742250.”

2016-06-02 13:08:00  - [2024] - "[ReplicaFetcherManager on broker 2024] Removed 
fetcher for partitions [… [mt_xp_its_music_main_itsevent,20]”

2016-06-02 13:08:00 - [2024] - "[ReplicaFetcherManager on broker 2024] Added 
fetcher for partitions List([... [[mt_xp_its_music_main_itsevent,20], 
initOffset 1239742250 to broker 
BrokerEndPoint(2011,st13p00it-bdkafka122.rock.apple.com,9092)] … “

2016-06-02 13:08:00 - [2024] - "[ReplicaFetcherThread-0-2017], Halting because 
log truncation is not allowed for topic mt_xp_its_main_itsevent, Current leader 
2017's latest offset 7128021741 is less than replica 2024's latest offset 
7128915601"

2016-06-02 13:08:12 - [2011] - [Replica state machine on controller 2011]: 
Invoking state change to OfflineReplica for replicas
        ...[Topic=mt_xp_its_music_main_itsevent,Partition=20,Replica=2024],…"

2016-06-02 13:08:13 - [2011] - [Controller 2011]: Cannot remove replica 2024 
from ISR of partition [mt_xp_its_music_main_itsevent,20] since it is not in the 
ISR. Leader = 2011 ; ISR = List(2011)”

2016-06-02 13:08:13 - [2011] - [Channel manager on controller 2011]: Not 
sending request 
{controller_id=2011,controller_epoch=114,partition_states=[...,{topic=mt_xp_its_music_main_itsevent,partition=20,controller_epoch=114,leader=2011,leader_epoch=12,isr=[2011],zk_version=25,replicas=[2024,2011,2012]},...}
 to broker 2012, since it is offline.”

2016-06-02 13:08:13 - [2011] - "Broker 2011 ignoring LeaderAndIsr request from 
controller 2011 with correlation id 30 epoch 114 for partition 
[mt_xp_its_music_main_itsevent,20] since its associated leader epoch 12 is old. 
Current leader epoch is 12”

2016-06-02 13:08:13 - [2011] - "[Channel manager on controller 2011]: Not 
sending request 
{controller_id=2011,controller_epoch=114,delete_partitions=0,partitions=[{topic=mt_xp_its_music_main_itsevent,partition=20}]}
 to broker 2024, since it is offline.”

2016-06-02 13:08:15 - [2011] - "[Replica state machine on controller 2011]: 
Invoking state change to OnlineReplica for replicas 
…,[Topic=mt_xp_its_music_main_itsevent,Partition=20,Replica=2012],...

2016-06-02 13:08:16 - [2012] - [ReplicaFetcherManager on broker 2012] Removed 
fetcher for partitions:
        [mt_xp_its_music_main_itsevent,20], …

2016-06-02 13:08:16 - [2012] - "Truncating log mt_xp_its_music_main_itsevent-20 
to offset 1239742830."

2016-06-02 13:08:16 - [2012] - [ReplicaFetcherManager on broker 2012] Added 
fetcher for partitions List(
        ..[[mt_xp_its_music_main_itsevent,20], initOffset 1239742830 to broker 
BrokerEndPoint(2011,st13p00it-bdkafka122.rock.apple.com,9092)]), ...

2016-06-02 13:08:17 - [2012] - [ReplicaFetcherThread-0-2011], Halting because 
log truncation is not allowed for topic OneDirection_output, Current leader 
2011's latest offset 44631162 is less than replica 2012's latest offset 44738961

2016-06-02 13:08:27 - [2011] - "[Replica state machine on controller 2011]: 
Invoking state change to OfflineReplica for replicas 
...,[Topic=mt_xp_its_music_main_itsevent,Partition=20,Replica=2012],…"

2016-06-02 13:08:27 - [2011] - "[Controller 2011]: Cannot remove replica 2012 
from ISR of partition [mt_xp_its_music_main_itsevent,20] since it is not in the 
ISR. Leader = 2011 ; ISR = List(2011)”

2016-06-02 13:08:28 - [2011] -"[Channel manager on controller 2011]: Not 
sending request 
{controller_id=2011,controller_epoch=114,partition_states=[..,{topic=mt_xp_its_music_main_itsevent,partition=20,controller_epoch=114,leader=2011,leader_epoch=12,isr=[2011],zk_version=25,replicas=[2024,2011,2012]},...]}
 to broker 2024, since it is offline."

2016-06-02 13:08:28 - [2011] - "Broker 2011 ignoring LeaderAndIsr request from 
controller 2011 with correlation id 42 epoch 114 for partition 
[mt_xp_its_music_main_itsevent,20] since its associated leader epoch 12 is old. 
Current leader epoch is 12”

2016-06-02 13:08:28 - [2011] - "[Channel manager on controller 2011]: Not 
sending request 
{controller_id=2011,controller_epoch=114,delete_partitions=0,partitions=[{topic=mt_xp_its_music_main_itsevent,partition=20}]}
 to broker 2012, since it is offline."

2016-06-02 13:10:07 - [2012] - Starts back up

2016-06-02 13:10:48 - [2012] - "Found a corrupted index file, 
/data3/kafka/OneDirection_output-14/00000000000044555803.index, deleting and 
rebuilding index…"

2016-06-02 13:10:53 - [2012] - "There was an error in one of the threads during 
logs loading: kafka.common.InvalidOffsetException: Attempt to append an offset 
(1239742691) to position 35728 no larger than the last offset appended 
(1239742822) to 
/data3/kafka/mt_xp_its_music_main_itsevent-20/00000000001239444214.index.”

Broker config:

timestamp="2016-06-02 13:05:39,440" thread="main" level=INFO 
logger=kafka.server.KafkaConfig message="KafkaConfig values:
        advertised.host.name = null
        metric.reporters = 
[com.apple.its.bigdata.kafka.metrics.HubbleMetricsReporter]
        quota.producer.default = 9223372036854775807
        offsets.topic.num.partitions = 50
        log.flush.interval.messages = 9223372036854775807
        auto.create.topics.enable = true
        controller.socket.timeout.ms = 30000
        log.flush.interval.ms = null
        principal.builder.class = class 
org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
        replica.socket.receive.buffer.bytes = 65536
        min.insync.replicas = 2
        replica.fetch.wait.max.ms = 500
        num.recovery.threads.per.data.dir = 2
        ssl.keystore.type = JKS
        default.replication.factor = 3
        ssl.truststore.password = null
        log.preallocate = false
        sasl.kerberos.principal.to.local.rules = [DEFAULT]
        fetch.purgatory.purge.interval.requests = 1000
        ssl.endpoint.identification.algorithm = null
        replica.socket.timeout.ms = 30000
        message.max.bytes = 10485760
        num.io.threads = 16
        offsets.commit.required.acks = -1
        log.flush.offset.checkpoint.interval.ms = 60000
        delete.topic.enable = true
        quota.window.size.seconds = 1
        ssl.truststore.type = JKS
        offsets.commit.timeout.ms = 5000
        quota.window.num = 11
        zookeeper.connect = 
st11p00it-volacct029:2181,st11p00it-volacct030:2181,st11p00it-volacct054:2181/kafka-three
        authorizer.class.name =
        num.replica.fetchers = 1
        log.retention.ms = null
        log.roll.jitter.hours = 0
        log.cleaner.enable = true
        offsets.load.buffer.size = 5242880
        log.cleaner.delete.retention.ms = 86400000
        ssl.client.auth = none
        controlled.shutdown.max.retries = 3
        queued.max.requests = 500
        offsets.topic.replication.factor = 3
        log.cleaner.threads = 1
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        socket.request.max.bytes = 104857600
        ssl.trustmanager.algorithm = PKIX
        zookeeper.session.timeout.ms = 6000
        log.retention.bytes = -1
        sasl.kerberos.min.time.before.relogin = 60000
        zookeeper.set.acl = false
        connections.max.idle.ms = 600000
        offsets.retention.minutes = 1440
        replica.fetch.backoff.ms = 1000
        inter.broker.protocol.version = 0.9.0.0
        log.retention.hours = 720
        num.partitions = 150
        broker.id.generation.enable = true
        listeners = null
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        log.roll.ms = null
        log.flush.scheduler.interval.ms = 9223372036854775807
        ssl.cipher.suites = null
        log.index.size.max.bytes = 10485760
        ssl.keymanager.algorithm = SunX509
        security.inter.broker.protocol = PLAINTEXT
        replica.fetch.max.bytes = 10485760
        advertised.port = null
        log.cleaner.dedupe.buffer.size = 134217728
        replica.high.watermark.checkpoint.interval.ms = 5000
        log.cleaner.io.buffer.size = 524288
        sasl.kerberos.ticket.renew.window.factor = 0.8
        zookeeper.connection.timeout.ms = 1000000
        controlled.shutdown.retry.backoff.ms = 5000
        log.roll.hours = 168
        log.cleanup.policy = delete
        host.name =
        log.roll.jitter.ms = null
        max.connections.per.ip = 2147483647
        offsets.topic.segment.bytes = 104857600
        background.threads = 10
        quota.consumer.default = 9223372036854775807
        request.timeout.ms = 30000
        log.index.interval.bytes = 4096
        log.dir = /tmp/kafka-logs
        log.segment.bytes = 1073741824
        log.cleaner.backoff.ms = 15000
        offset.metadata.max.bytes = 4096
        ssl.truststore.location = null
        group.max.session.timeout.ms = 600000
        ssl.keystore.password = null
        zookeeper.sync.time.ms = 2000
        port = 9092
        log.retention.minutes = null
        log.segment.delete.delay.ms = 60000
        log.dirs = /data1/kafka,/data2/kafka,/data3/kafka
        controlled.shutdown.enable = true
        compression.type = producer
        max.connections.per.ip.overrides =
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
        auto.leader.rebalance.enable = true
        leader.imbalance.check.interval.seconds = 300
        log.cleaner.min.cleanable.ratio = 0.5
        replica.lag.time.max.ms = 10000
        num.network.threads = 16
        ssl.key.password = null
        reserved.broker.max.id = 8000
        metrics.num.samples = 2
        socket.send.buffer.bytes = 1048576
        ssl.protocol = TLS
        socket.receive.buffer.bytes = 1048576
        ssl.keystore.location = null
        replica.fetch.min.bytes = 1
        unclean.leader.election.enable = false
        group.min.session.timeout.ms = 6000
        log.cleaner.io.buffer.load.factor = 0.9
        offsets.retention.check.interval.ms = 600000
        producer.purgatory.purge.interval.requests = 1000
        metrics.sample.window.ms = 60000
        broker.id = 2024
        offsets.topic.compression.codec = 0
        log.retention.check.interval.ms = 300000
        advertised.listeners = null
        leader.imbalance.per.broker.percentage = 10)

> Broker fails to start after ungraceful shutdown due to non-monotonically 
> incrementing offsets in logs
> ------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-3919
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3919
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.9.0.1
>            Reporter: Andy Coates
>
> Hi All,
> I encountered an issue with Kafka following a power outage that saw a 
> proportion of our cluster disappear. When the power came back on, several 
> brokers halted on startup with the error:
> {noformat}
>       Fatal error during KafkaServerStartable startup. Prepare to shutdown”
>       kafka.common.InvalidOffsetException: Attempt to append an offset 
> (1239742691) to position 35728 no larger than the last offset appended 
> (1239742822) to 
> /data3/kafka/mt_xp_its_music_main_itsevent-20/00000000001239444214.index.
>       at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>       at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>       at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>       at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>       at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>       at kafka.log.LogSegment.recover(LogSegment.scala:188)
>       at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>       at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>       at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>       at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>       at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>       at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>       at kafka.log.Log.loadSegments(Log.scala:160)
>       at kafka.log.Log.<init>(Log.scala:90)
>       at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>       at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only way to recover the brokers was to delete the log files that 
> contained non-monotonically incrementing offsets.
> I’ve spent some time digging through the logs and I feel I may have worked 
> out the sequence of events leading to this issue, (though this is based on 
> some assumptions I've made about the way Kafka is working, which may be 
> wrong).
> Given:
> * A topic that is produced to using acks = 1
> * A topic that is produced to using gzip compression
> * A topic that has min.isr set to less than the number of replicas, (i.e. 
> min.isr=2, #replicas=3)
> * Followers in the ISR are lagging behind the leader by some small number of 
> messages, (which is normal with acks=1)
> * Brokers are configured with a fairly large zk session timeout, e.g. 30s.
> * Brokers are configured so that unclean leader elections are disabled.
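> For concreteness, the producer side of this setup looks roughly like the 
> following, (a minimal sketch using the Java producer; the broker list and 
> serializers here are placeholders, and the broker-side settings above live in 
> the broker config, not in this code):
> {noformat}
> import java.util.Properties;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> public class AcksOneGzipProducer {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder
>         props.put("acks", "1");                // leader-only acknowledgement
>         props.put("compression.type", "gzip"); // each batch is a compressed message set
>         props.put("key.serializer",
>                 "org.apache.kafka.common.serialization.StringSerializer");
>         props.put("value.serializer",
>                 "org.apache.kafka.common.serialization.StringSerializer");
>
>         try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
>             producer.send(new ProducerRecord<>("mt_xp_its_music_main_itsevent",
>                     "key", "value"));
>         }
>     }
> }
> {noformat}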
> Then:
> When something like a power outage takes out all three replicas, it's 
> possible to get into a state such that the indexes won't rebuild on a restart 
> and a broker fails to start. This can happen when:
> * Enough brokers, but not the pre-outage leader, come online for the 
> partition to be writeable
> * Producers produce enough records to the partition that the head offset is 
> now greater than the pre-outage leader's head offset.
> * The pre-outage leader comes back online.
> At this point the logs on the pre-outage leader have diverged from the other 
> replicas.  It has some messages that are not in the other replicas, and the 
> other replicas have some records not in the pre-outage leader's log - at the 
> same offsets.
> I'm assuming that because the current leader has a higher offset than the 
> pre-outage leader, the pre-outage leader just starts following the leader and 
> requesting the records it thinks it's missing.
> I'm also assuming that because the producers were using gzip, each record is 
> actually a compressed message set, so when the pre-outage leader requests 
> records from the leader the offset it requests could just happen to be in the 
> middle of a compressed batch, and the leader returns the full batch. When the 
> pre-outage leader appends this batch to its own log it thinks all is OK. But 
> what has happened is that the offsets in the log are no longer monotonically 
> incrementing. Instead they actually dip by the number of records in the 
> compressed batch that were before the requested offset. If and when this 
> broker restarts, this dip may be at the 4K boundary the indexer checks. If it 
> is, the broker won't start.
> Several of our brokers were unlucky enough to hit that 4K boundary, causing a 
> protracted outage. We've written a little utility that shows several more 
> brokers have a dip outside of the 4K boundary, (a rough sketch of that kind of 
> check is shown below).
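> For reference, the check that utility performs is roughly the following, (a 
> minimal sketch, not the actual tool; it assumes you feed it the output of 
> kafka.tools.DumpLogSegments run with --deep-iteration --files on a segment 
> file, and that each record line contains an "offset: <n>" field):
> {noformat}
> import java.io.BufferedReader;
> import java.io.InputStreamReader;
> import java.util.regex.Matcher;
> import java.util.regex.Pattern;
>
> public class OffsetDipScanner {
>     public static void main(String[] args) throws Exception {
>         Pattern offsetPattern = Pattern.compile("offset: (\\d+)");
>         long lastOffset = -1L;
>         long lineNo = 0;
>         try (BufferedReader in = new BufferedReader(new InputStreamReader(System.in))) {
>             String line;
>             while ((line = in.readLine()) != null) {
>                 lineNo++;
>                 Matcher m = offsetPattern.matcher(line);
>                 if (!m.find()) continue; // skip lines that aren't record entries
>                 long offset = Long.parseLong(m.group(1));
>                 if (lastOffset >= 0 && offset <= lastOffset) {
>                     // A "dip": offsets are no longer monotonically incrementing.
>                     System.out.printf("Dip at line %d: offset %d follows %d%n",
>                             lineNo, offset, lastOffset);
>                 }
>                 lastOffset = offset;
>             }
>         }
>     }
> }
> {noformat}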
> There are some assumptions in there, which I’ve not got around to confirming 
> / denying. (A quick attempt to recreate this failed and I've not found the 
> time to invest more).
> Of course I'd really appreciate the community / experts stepping in and 
> commenting on whether my assumptions are right or wrong, or if there is 
> another explanation to the problem. 
> But assuming I’m mostly right, then the fact the broker won’t start is 
> obviously a bug, and one I’d like to fix.  A Kafka broker should not corrupt 
> its own log during normal operation to the point that it can’t restart!
> A secondary issue is whether we think the divergent logs are acceptable. This 
> may be deemed acceptable, given the producers have chosen availability over 
> consistency by producing with acks = 1. Though personally, the system having 
> diverging replicas of an immutable commit log just doesn't sit right.
> I see us having a few options here:
> * Have the replicas detect the divergence of their logs, e.g. a follower 
> compares the checksum of its last record with the record at the same offset on 
> the leader. The follower can then work out that its log has diverged from the 
> leader's. At that point it could either halt, stop replicating that partition, 
> or search backwards to find the point of divergence, truncate and recover, 
> (possibly saving the truncated part somewhere) - see the sketch after this 
> list. This would be a protocol change for Kafka. This solution trades 
> availability, (you've got fewer ISRs during the extended re-sync process), for 
> consistency.
> * Leave the logs as they are and have the indexing of offsets in the log on 
> startup handle such a situation gracefully. This leaves logs in a divergent 
> state between replicas, (meaning replays would yield different messages if the 
> leader was up or down), but gives better availability, (no time spent not 
> being an ISR while it repairs any divergence).
> * Support multiple options and allow it to be tuned, ideally per topic.
> * Something else...
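> To illustrate the first option, the "search backwards" part might look 
> something like this, (purely a sketch of the idea, not a real Kafka API; it 
> assumes the follower can obtain the (offset, CRC) pairs of its own log tail 
> and the corresponding entries from the leader):
> {noformat}
> import java.util.List;
>
> public class DivergenceFinder {
>
>     // Hypothetical view of one log entry: its offset and the CRC of its payload.
>     public static final class Entry {
>         final long offset;
>         final long crc;
>         Entry(long offset, long crc) { this.offset = offset; this.crc = crc; }
>     }
>
>     /**
>      * Returns the highest offset present in both tails with a matching CRC,
>      * or -1 if no common entry is found in the supplied ranges. Both lists
>      * are assumed to be sorted by offset, ascending. The follower could then
>      * truncate to that offset + 1 and re-fetch from the leader.
>      */
>     public static long lastCommonOffset(List<Entry> follower, List<Entry> leader) {
>         int f = follower.size() - 1;
>         int l = leader.size() - 1;
>         while (f >= 0 && l >= 0) {
>             Entry fe = follower.get(f);
>             Entry le = leader.get(l);
>             if (fe.offset == le.offset) {
>                 if (fe.crc == le.crc) {
>                     return fe.offset; // logs agree here; divergence starts above this point
>                 }
>                 f--; l--; // same offset but different content: keep walking back
>             } else if (fe.offset > le.offset) {
>                 f--; // follower's tail is ahead of the leader's at this point
>             } else {
>                 l--; // leader's tail is ahead of the follower's at this point
>             }
>         }
>         return -1L;
>     }
> }
> {noformat}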
> I’m happy/keen to contribute here. But I’d like to first discuss which option 
> should be investigated.
> Andy



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
