First off, we have unclean leadership elections disable. (We did later enable 
them to help get around some other issues we were having, but this was several 
hours after this issue manifested).

We did look through data logs that were causing the brokers to not start. What 
we found before the incident was a monotonically increasing offset, where each 
compressed batch normally contained one or two records. Then the is a batch 
that contains many records, whose first records has an offset below the 
previous batch and whose last record has an offset above the previous batch. 
Following on from this there continues a period of large batches, with 
monotonically increasing offsets, and then the log returns to batches with one 
or two records.

Our working assumption here is that the period before the offset dip is 
pre-outage normal operation. The period of larger batches is from just after 
the outage, where producers have a back log to processes when the partition 
becomes available, and then things return to normal batch sizes again once the 
back log clears.

We did also look through the Kafka's application logs to try and piece together 
the series of events leading up to this:

Here’s what I know happened, with regards to one partition that has issues, 
from the logs:

Prior to outage:
Replicas for the partition are brokers 2011, 2012,  2024, with 2024 being the 
preferred leader.
Producers using acks=1, compression=gzip
Brokers configured with unclean.elections=false, zk.session-timeout=36s

Post outage:
2011 comes up first, (also as the Controller), recovers unflushed log set 
1239444214, completes load with offset 1239740602, and becomes leader of the 
2012 comes up next, recovers its log,  recovers unflushed log set 1239444214, 
truncates to offset 1239742830, (thats 2,228 records ahead of the recovered 
offset of the current leader), and starts following.
2024 comes up quickly after 2012.  recovers unflushed log set 1239444214, 
truncates to offset  1239742250, (thats 1,648 records ahead of the recovered 
offset of the current leader), and starts following.
The Controller adds 2024 to the replica set just before 2024 halts due to 
another partition having an offset greater than the leader.
The Controller adds 2012 to the replica set just before 2012 halts due to 
another partition having an offset greater than the leader.
When 2012 is next restarted, it fails to fully start as its complaining of 
invalid offsets in the log.

Our working hypothesis here is that the partition becomes writeable again, 
possibly as brokers 2012 & 2024 get added to the ISR set before halting, and 
maybe don’t remove themselves when they halt? - hence remain in the ISR set for 
36 seconds. Mean while our producers are happily sending large compressed 
batches, as they have a backlog, to broker 2011, which is accepting them, (as 
there are enough replicas in the ISR set), and appending them to its log - 
moving its offset beyond brokers 2012 and 2024.

Log entries:

(Interleaved log entries from the three brokers - the broker id is in the [id] 

Just as the power was going out I see this in the broker that was the 

2016-04-11 12:01:42 - [2026] - "[Partition state machine on Controller 2026]: 
Invoking state change to OnlinePartition for partitions 

2016-04-11 12:01:56 - [2026] - "[Replica state machine on controller 2026]: 
Invoking state change to OfflineReplica for replicas

2016-04-11 12:01:56 - [2026] - "[Controller 2026]: Cannot remove replica 2024 
from ISR of partition [mt_xp_its_music_main_itsevent,20] since it is not in the 
ISR. Leader = 2011 ; ISR = List(2011, 2012)”

2016-04-11 12:01:56 - [2026] - "[Channel manager on controller 2026]: Not 
sending request 
 to broker 2024, since it is offline.”

2016-04-11 12:04:46 - [2026] - [Replica state machine on controller 2026]: 
Invoking state change to OnlineReplica for replicas

2016-04-11 12:04:58 - [2026] - "[Controller 2026]: Starting preferred replica 
leader election for partitions [mt_xp_its_music_main_itsevent,20]”

2016-04-11 12:04:58 - [2026] - "[Partition state machine on Controller 2026]: 
Invoking state change to OnlinePartition for partitions 

2016-04-11 12:04:58 - [2026] - "[PreferredReplicaPartitionLeaderSelector]: 
Current leader 2011 for partition [mt_xp_its_music_main_itsevent,20] is not the 
preferred replica. Trigerring preferred replica leader election”

2016-04-11 12:04:58 - [2026] - "[Controller 2026]: Partition 
[mt_xp_its_music_main_itsevent,20] completed preferred replica leader election. 
New leader is 2024"

Then following on from the outage I see this:

2016-06-02 13:00:49 - [2011] - starts back up

2016-06-02 13:02:10 - [2011] - "Found a corrupted index file, 
deleting and rebuilding index..."

2016-06-02 13:02:13 - [2011] - "Recovering unflushed segment 1239444214 in log 

2016-06-02 13:02:15 - [2011] - "Completed load of log 
mt_xp_its_music_main_itsevent-20 with log end offset 1239740602”

2016-06-02 13:03:27 - [2011] -[Replica state machine on controller 2011]: 
Started replica state machine with initial state ->
        [Topic=mt_xp_its_music_main_itsevent,Partition=20,Replica=2011] -> 
ReplicaDeletionIneligible, …

2016-06-02 13:03:27,560 - [2011] - "Controller 2011 epoch 114 initiated state 
change for partition [mt_xp_its_music_main_itsevent,20] from OfflinePartition 
to OnlinePartition failed" kafka.common.NoReplicaOnlineException: No broker in 
ISR for partition [mt_xp_its_music_main_itsevent,20] is alive. Live brokers 
are: [Set()], ISR brokers are: [2012,2024,2011]

2016-06-02 13:03:32 - [2011] - [Partition state machine on Controller 2011]: 
Started partition state machine with initial state ->
        [mt_xp_its_music_main_itsevent,20] -> OfflinePartition, ...

2016-06-02 13:03:32 - [2011] - [Replica state machine on controller 2011]: 
Invoking state change to OnlineReplica for replicas …
       [Topic=mt_xp_its_music_main_itsevent,Partition=20,Replica=2011], …

2016-06-02 13:03:32 - [2011] - "[OfflinePartitionLeaderSelector]: Selected new 
leader and ISR {"leader":2011,"leader_epoch":12,"isr":[2011]} for offline 
partition [mt_xp_its_music_main_itsevent,20]”

2016-06-02 13:03:32 - [2011] - "Broker 2011 received LeaderAndIsrRequest with 
correlation id 1 from controller 2011 epoch 113 for partition 
[mt_xp_its_music_main_itsevent,20] but cannot become follower since the new 
leader 2024 is unavailable."

2016-06-02 13:03:36 - [2011] - "[ReplicaFetcherManager on broker 2011] Removed 
fetcher for partitions…
        [mt_xp_its_music_main_itsevent,20], …

2016/06/02 13:05:37 - [2012] - starts back up.

2016-06-02 13:05:39 - [2024] - starts back up.

2016-06-02 13:06:29 - [2012] - "Recovering unflushed segment 1239444214 in log 

2016-06-02 13:06:31 - [2012] - "Completed load of log 
mt_xp_its_music_main_itsevent-20 with log end offset 1239742830”

2016-06-02 13:06:50- [2024] - "Found a corrupted index file, 
deleting and rebuilding index…"

2016-06-02 13:06:54 - [2024] - Recovering unflushed segment 1239444214 in log 

2016-06-02 13:06:56 - [2024] - Completed load of log 
mt_xp_its_music_main_itsevent-20 with log end offset 1239742250"

2016-06-02 13:07:59 - [2011] - [Replica state machine on controller 2011]: 
Invoking state change to OnlineReplica for replicas  
        [Topic=mt_xp_its_music_main_itsevent,Partition=20,Replica=2024], …

2016-06-02 13:08:00 - [2024] - "Truncating log mt_xp_its_music_main_itsevent-20 
to offset 1239742250.”

2016-06-02 13:08:00  - [2024] - "[ReplicaFetcherManager on broker 2024] Removed 
fetcher for partitions [… [mt_xp_its_music_main_itsevent,20]”

2016-06-02 13:08:00 - [2024] - "[ReplicaFetcherManager on broker 2024] Added 
fetcher for partitions List([... [[mt_xp_its_music_main_itsevent,20], 
initOffset 1239742250 to broker 
BrokerEndPoint(2011,st13p00it-bdkafka122.rock.apple.com,9092)] … “

2016-06-02 13:08:00 - [2024] - "[ReplicaFetcherThread-0-2017], Halting because 
log truncation is not allowed for topic mt_xp_its_main_itsevent, Current leader 
2017's latest offset 7128021741 is less than replica 2024's latest offset 

2016-06-02 13:08:12 - [2011] - [Replica state machine on controller 2011]: 
Invoking state change to OfflineReplica for replicas

2016-06-02 13:08:13 - [2011] - [Controller 2011]: Cannot remove replica 2024 
from ISR of partition [mt_xp_its_music_main_itsevent,20] since it is not in the 
ISR. Leader = 2011 ; ISR = List(2011)”

2016-06-02 13:08:13 - [2011] - [Channel manager on controller 2011]: Not 
sending request 
 to broker 2012, since it is offline.”

2016-06-02 13:08:13 - [2011] - "Broker 2011 ignoring LeaderAndIsr request from 
controller 2011 with correlation id 30 epoch 114 for partition 
[mt_xp_its_music_main_itsevent,20] since its associated leader epoch 12 is old. 
Current leader epoch is 12”

2016-06-02 13:08:13 - [2011] - "[Channel manager on controller 2011]: Not 
sending request 
 to broker 2024, since it is offline.”

2016-06-02 13:08:15 - [2011] - "[Replica state machine on controller 2011]: 
Invoking state change to OnlineReplica for replicas 

2016-06-02 13:08:16 - [2012] - [ReplicaFetcherManager on broker 2012] Removed 
fetcher for partitions:
        [mt_xp_its_music_main_itsevent,20], …

2016-06-02 13:08:16 - [2012] - "Truncating log mt_xp_its_music_main_itsevent-20 
to offset 1239742830."

2016-06-02 13:08:16 - [2012] - [ReplicaFetcherManager on broker 2012] Added 
fetcher for partitions List(
        ..[[mt_xp_its_music_main_itsevent,20], initOffset 1239742830 to broker 
BrokerEndPoint(2011,st13p00it-bdkafka122.rock.apple.com,9092)]), ...

2016-06-02 13:08:17 - [2012] - [ReplicaFetcherThread-0-2011], Halting because 
log truncation is not allowed for topic OneDirection_output, Current leader 
2011's latest offset 44631162 is less than replica 2012's latest offset 44738961

2016-06-02 13:08:27 - [2011] - "[Replica state machine on controller 2011]: 
Invoking state change to OfflineReplica for replicas 

2016-06-02 13:08:27 - [2011] - "[Controller 2011]: Cannot remove replica 2012 
from ISR of partition [mt_xp_its_music_main_itsevent,20] since it is not in the 
ISR. Leader = 2011 ; ISR = List(2011)”

2016-06-02 13:08:28 - [2011] -"[Channel manager on controller 2011]: Not 
sending request 
 to broker 2024, since it is offline."

2016-06-02 13:08:28 - [2011] - "Broker 2011 ignoring LeaderAndIsr request from 
controller 2011 with correlation id 42 epoch 114 for partition 
[mt_xp_its_music_main_itsevent,20] since its associated leader epoch 12 is old. 
Current leader epoch is 12”

2016-06-02 13:08:28 - [2011] - "[Channel manager on controller 2011]: Not 
sending request 
 to broker 2012, since it is offline."

2016-06-02 13:10:07 - [2012] - Starts back up

2016-06-02 13:10:48 - [2012] - "Found a corrupted index file, 
/data3/kafka/OneDirection_output-14/00000000000044555803.index, deleting and 
rebuilding index…"

2016-06-02 13:10:53 - [2012] - "There was an error in one of the threads during 
logs loading: kafka.common.InvalidOffsetException: Attempt to append an offset 
(1239742691) to position 35728 no larger than the last offset appended 
(1239742822) to 

Broker config:

timestamp="2016-06-02 13:05:39,440" thread="main" level=INFO 
logger=kafka.server.KafkaConfig message="KafkaConfig values:
        advertised.host.name = null
        metric.reporters = 
        quota.producer.default = 9223372036854775807
        offsets.topic.num.partitions = 50
        log.flush.interval.messages = 9223372036854775807
        auto.create.topics.enable = true
        controller.socket.timeout.ms = 30000
        log.flush.interval.ms = null
        principal.builder.class = class 
        replica.socket.receive.buffer.bytes = 65536
        min.insync.replicas = 2
        replica.fetch.wait.max.ms = 500
        num.recovery.threads.per.data.dir = 2
        ssl.keystore.type = JKS
        default.replication.factor = 3
        ssl.truststore.password = null
        log.preallocate = false
        sasl.kerberos.principal.to.local.rules = [DEFAULT]
        fetch.purgatory.purge.interval.requests = 1000
        ssl.endpoint.identification.algorithm = null
        replica.socket.timeout.ms = 30000
        message.max.bytes = 10485760
        num.io.threads = 16
        offsets.commit.required.acks = -1
        log.flush.offset.checkpoint.interval.ms = 60000
        delete.topic.enable = true
        quota.window.size.seconds = 1
        ssl.truststore.type = JKS
        offsets.commit.timeout.ms = 5000
        quota.window.num = 11
        zookeeper.connect = 
        authorizer.class.name =
        num.replica.fetchers = 1
        log.retention.ms = null
        log.roll.jitter.hours = 0
        log.cleaner.enable = true
        offsets.load.buffer.size = 5242880
        log.cleaner.delete.retention.ms = 86400000
        ssl.client.auth = none
        controlled.shutdown.max.retries = 3
        queued.max.requests = 500
        offsets.topic.replication.factor = 3
        log.cleaner.threads = 1
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        socket.request.max.bytes = 104857600
        ssl.trustmanager.algorithm = PKIX
        zookeeper.session.timeout.ms = 6000
        log.retention.bytes = -1
        sasl.kerberos.min.time.before.relogin = 60000
        zookeeper.set.acl = false
        connections.max.idle.ms = 600000
        offsets.retention.minutes = 1440
        replica.fetch.backoff.ms = 1000
        inter.broker.protocol.version =
        log.retention.hours = 720
        num.partitions = 150
        broker.id.generation.enable = true
        listeners = null
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        log.roll.ms = null
        log.flush.scheduler.interval.ms = 9223372036854775807
        ssl.cipher.suites = null
        log.index.size.max.bytes = 10485760
        ssl.keymanager.algorithm = SunX509
        security.inter.broker.protocol = PLAINTEXT
        replica.fetch.max.bytes = 10485760
        advertised.port = null
        log.cleaner.dedupe.buffer.size = 134217728
        replica.high.watermark.checkpoint.interval.ms = 5000
        log.cleaner.io.buffer.size = 524288
        sasl.kerberos.ticket.renew.window.factor = 0.8
        zookeeper.connection.timeout.ms = 1000000
        controlled.shutdown.retry.backoff.ms = 5000
        log.roll.hours = 168
        log.cleanup.policy = delete
        host.name =
        log.roll.jitter.ms = null
        max.connections.per.ip = 2147483647
        offsets.topic.segment.bytes = 104857600
        background.threads = 10
        quota.consumer.default = 9223372036854775807
        request.timeout.ms = 30000
        log.index.interval.bytes = 4096
        log.dir = /tmp/kafka-logs
        log.segment.bytes = 1073741824
        log.cleaner.backoff.ms = 15000
        offset.metadata.max.bytes = 4096
        ssl.truststore.location = null
        group.max.session.timeout.ms = 600000
        ssl.keystore.password = null
        zookeeper.sync.time.ms = 2000
        port = 9092
        log.retention.minutes = null
        log.segment.delete.delay.ms = 60000
        log.dirs = /data1/kafka,/data2/kafka,/data3/kafka
        controlled.shutdown.enable = true
        compression.type = producer
        max.connections.per.ip.overrides =
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
        auto.leader.rebalance.enable = true
        leader.imbalance.check.interval.seconds = 300
        log.cleaner.min.cleanable.ratio = 0.5
        replica.lag.time.max.ms = 10000
        num.network.threads = 16
        ssl.key.password = null
        reserved.broker.max.id = 8000
        metrics.num.samples = 2
        socket.send.buffer.bytes = 1048576
        ssl.protocol = TLS
        socket.receive.buffer.bytes = 1048576
        ssl.keystore.location = null
        replica.fetch.min.bytes = 1
        unclean.leader.election.enable = false
        group.min.session.timeout.ms = 6000
        log.cleaner.io.buffer.load.factor = 0.9
        offsets.retention.check.interval.ms = 600000
        producer.purgatory.purge.interval.requests = 1000
        metrics.sample.window.ms = 60000
        broker.id = 2024
        offsets.topic.compression.codec = 0
        log.retention.check.interval.ms = 300000
        advertised.listeners = null
        leader.imbalance.per.broker.percentage = 10)

