Imran Patel created KAFKA-3040:
----------------------------------
Summary: Broker didn't report new data after change in leader
Key: KAFKA-3040
URL: https://issues.apache.org/jira/browse/KAFKA-3040
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 0.9.0.0
Environment: Debian 3.2.54-2 x86_64 GNU/Linux
Reporter: Imran Patel
Priority: Critical
Recently we had an event that caused large Kafka backlogs to develop suddenly.
This happened across multiple partitions. We noticed that after a brief
connection loss to Zookeeper, Kafka brokers were not reporting new data to
our (SimpleConsumer) consumer, although the producers were enqueueing fine.
This went on until another zk blip led to a reconfiguration, which suddenly
caused the consumers to "see" the data. Neither our consumers nor our
monitoring tools saw the offsets move during the outage window. Here is the
sequence of events for a single partition (with logs attached below).
The brokers are running 0.9, the producer is using library version
kafka_2.10:0.8.2.1, and the consumer is using kafka_2.10:0.8.0 (both are Java
programs). Our monitoring tool uses kafka-python-9.0.
Can you tell us if this could be due to a consumer bug (e.g. the libraries
being too "old" to operate with a 0.9 broker)? Or does it look like a Kafka
core issue? Please note that we recently upgraded the brokers to 0.9 and
hadn't seen a similar issue prior to that.
- After a brief connection loss to Zookeeper, the partition leader (broker 9
for partition 29 in the logs below) came back and shrank the ISR to itself.
- Producers kept successfully sending data to Kafka, and the remaining
replicas (brokers 3 and 4) recorded this data. AFAICT, 3 was the new leader.
Broker 9 did NOT replicate this data; it just kept printing the ISR-shrinking
message repeatedly.
- The consumer, on the other hand, reported no new data, presumably because it
was talking to 9 and that broker was doing nothing.
- 6 hours later, another Zookeeper blip caused the brokers to reconfigure, and
the consumers started seeing new data.
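One way our monitoring could have caught this earlier (a hypothetical check, not something we currently run): sample each replica's log-end offset over time; a broker whose offset never advances while the other replicas advance matches broker 9's stuck state described above. A minimal sketch, with illustrative names:

```python
def stalled_brokers(offset_history):
    """offset_history maps broker id -> list of log-end offsets sampled
    over time (oldest first). Return the set of brokers whose offset
    never advanced while at least one other broker's did."""
    advanced = {b for b, offs in offset_history.items() if offs[-1] > offs[0]}
    if not advanced:
        # Nothing moved anywhere (e.g. no producer traffic): no basis
        # for flagging any single broker as stalled.
        return set()
    return {b for b, offs in offset_history.items() if offs[-1] == offs[0]}
```

With samples like ours (broker 9 flat at 14169556269 while 3 and 4 advance), this would flag broker 9 during the six-hour window.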
Broker 9:
[2015-12-16 19:46:01,523] INFO Partition [messages,29] on broker 9: Expanding
ISR for partition [messages,29] from 9,4 to 9,4,3 (kafka.cluster.Partition)
[2015-12-18 00:59:25,511] INFO New leader is 9
(kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2015-12-18 01:00:18,451] INFO Partition [messages,29] on broker 9: Shrinking
ISR for partition [messages,29] from 9,4,3 to 9 (kafka.cluster.Partition)
[2015-12-18 01:00:18,458] INFO Partition [messages,29] on broker 9: Cached
zkVersion [472] not equal to that in zookeeper, skip updating ISR
(kafka.cluster.Partition)
[2015-12-18 07:04:44,552] INFO Truncating log messages-29 to offset
14169556269. (kafka.log.Log)
[2015-12-18 07:04:44,649] INFO [ReplicaFetcherManager on broker 9] Added
fetcher for partitions List([[messages,61], initOffset 14178575900 to broker
BrokerEndPoint(6,kafka006-prod.c.foo.internal,9092)] , [[messages,13],
initOffset 14156091271 to broker
BrokerEndPoint(2,kafka002-prod.c.foo.internal,9092)] , [[messages,45],
initOffset 14135826155 to broker
BrokerEndPoint(4,kafka004-prod.c.foo.internal,9092)] , [[messages,41],
initOffset 14157926400 to broker
BrokerEndPoint(1,kafka001-prod.c.foo.internal,9092)] , [[messages,29],
initOffset 14169556269 to broker
BrokerEndPoint(3,kafka003-prod.c.foo.internal,9092)] , [[messages,57],
initOffset 14175218230 to broker
BrokerEndPoint(1,kafka001-prod.c.foo.internal,9092)] )
(kafka.server.ReplicaFetcherManager)
Broker 3:
[2015-12-18 01:00:01,763] INFO [ReplicaFetcherManager on broker 3] Removed
fetcher for partitions [messages,29] (kafka.server.ReplicaFetcherManager)
[2015-12-18 07:09:04,631] INFO Partition [messages,29] on broker 3: Expanding
ISR for partition [messages,29] from 4,3 to 4,3,9 (kafka.cluster.Partition)
[2015-12-18 07:09:49,693] INFO [ReplicaFetcherManager on broker 3] Removed
fetcher for partitions [messages,29] (kafka.server.ReplicaFetcherManager)
Broker 4:
[2015-12-18 01:00:01,783] INFO [ReplicaFetcherManager on broker 4] Removed
fetcher for partitions [messages,29] (kafka.server.ReplicaFetcherManager)
[2015-12-18 01:00:01,866] INFO [ReplicaFetcherManager on broker 4] Added
fetcher for partitions List([[messages,29], initOffset 14169556262 to broker
BrokerEndPoint(3,kafka003-prod.c.foo.internal,9092)] )
(kafka.server.ReplicaFetcherManager)
[2015-12-18 07:09:50,191] ERROR [ReplicaFetcherThread-0-3], Error for partition
[messages,29] to broker
3:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is
not the leader for that topic-partition. (kafka.server.ReplicaFetcherThread)
Consumer:
2015-12-18 01:00:01.791 [P29-Reader] INFO
com.example.utils.kafkalib.KafkaConsumer - 7 messages read from partition 29
starting from offset 14169556262
2015-12-18 01:00:01.791 [P29-Reader] INFO
com.example.utils.kafkalib.KafkaConsumer - 0 messages read from partition 29
starting from offset 14169556269
2015-12-18 07:04:44.293 [P29-Reader] INFO
com.example.utils.kafkalib.KafkaConsumer - 0 messages read from partition 29
starting from offset 14169556269
2015-12-18 07:04:44.303 [P29-Reader] WARN
com.example.project.consumer.kafka.PartitionReader - Error fetching data from
the Broker:kafka009-prod.c.foo.internal Reason: NotLeaderForPartitionCode
2015-12-18 07:04:44.304 [P29-Reader] INFO
com.example.project.consumer.kafka.PartitionData - Attempting to connectAndRead
leader for topic: messages partition: 29
2015-12-18 07:04:44.309 [P29-Reader] INFO
com.example.project.consumer.kafka.PartitionData - Leader for topic: messages
partition: 29 set to kafka003-prod.c.foo.internal
2015-12-18 07:04:44.749 [P29-Reader] INFO
com.example.utils.kafkalib.KafkaConsumer - 6514 messages read from partition 29
starting from offset 14169556269
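The recovery visible in the consumer log above (NotLeaderForPartitionCode at 07:04:44, then leader re-discovery and a successful fetch) can be sketched as a minimal failover loop. This is an illustrative simulation, not our actual com.example consumer code; the fetch/metadata callables stand in for SimpleConsumer fetch and topic-metadata requests:

```python
# Kafka protocol error code 6 = NotLeaderForPartition.
NOT_LEADER = 6

def fetch_with_leader_failover(cached_leader, metadata, fetch):
    """fetch(broker) returns (error_code, messages); metadata() returns
    the current leader for the partition. On a NotLeaderForPartition
    error, rediscover the leader and retry once, as the consumer did at
    07:04:44. Returns (leader_used, messages)."""
    err, msgs = fetch(cached_leader)
    if err == NOT_LEADER:
        leader = metadata()      # re-query cluster metadata for the leader
        err, msgs = fetch(leader)
        return leader, msgs
    return cached_leader, msgs
```

Note that this path only helps once the stale broker actually returns an error; during the outage window broker 9 answered fetches with no error and no data, so the consumer had no signal to refresh metadata.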
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)