Ron Dagostino created KAFKA-15495:
-------------------------------------
Summary: KRaft partition truncated when the only ISR member
restarts with an empty disk
Key: KAFKA-15495
URL: https://issues.apache.org/jira/browse/KAFKA-15495
Project: Kafka
Issue Type: Bug
Affects Versions: 3.5.1, 3.4.1, 3.3.2, 3.6.0
Reporter: Ron Dagostino
Assume a topic-partition in KRaft has just a single leader replica in the ISR.
Assume next that this replica goes offline. This replica's log will define the
contents of that partition when the replica restarts, which is correct
behavior. However, assume now that the replica has a disk failure, and we then
replace the failed disk with a new, empty disk that we also format with the
storage tool so it has the correct cluster ID. If we then restart the broker,
the topic-partition will have no data in it, and any other replicas that might
exist will truncate their logs to match. See below for a step-by-step demo of
how to reproduce this.
[KIP-858: Handle JBOD broker disk failure in
KRaft|https://cwiki.apache.org/confluence/display/KAFKA/KIP-858%3A+Handle+JBOD+broker+disk+failure+in+KRaft]
introduces the concept of a Disk UUID that we can use to solve this problem.
Specifically, when the leader restarts with an empty (but correctly-formatted)
disk, the actual UUID associated with the disk will be different. The
controller will notice upon broker re-registration that its disk UUID differs
from what was previously registered. Right now we have no way of detecting
this situation, but the disk UUID gives us that capability.
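To make the idea concrete, here is a minimal sketch of the comparison, written as a shell helper rather than as controller code. It is not how the controller would implement the check. It assumes the per-directory identifier is stored in the log directory's meta.properties under a key named directory.id (an assumption based on KIP-858; the releases listed under Affects Versions do not write it), and the function names and the saved-copy path are hypothetical.

```shell
# Hypothetical helper: print the value of one key from a properties file.
prop_value() {
  # $1 = key, $2 = properties file
  awk -F= -v k="$1" '$1 == k { print substr($0, length(k) + 2); exit }' "$2"
}

# Hypothetical check: succeed only when both files carry the same,
# non-empty value for the assumed "directory.id" key.
same_disk_id() {
  # $1 = copy of meta.properties saved before the failure
  # $2 = meta.properties found in the log directory at restart
  old=$(prop_value "directory.id" "$1")
  new=$(prop_value "directory.id" "$2")
  [ -n "$old" ] && [ "$old" = "$new" ]
}

# Example use (paths are illustrative):
#   cp /tmp/kraft-broker-logs11/meta.properties /tmp/meta11.before
#   ... disk is replaced and reformatted ...
#   same_disk_id /tmp/meta11.before /tmp/kraft-broker-logs11/meta.properties \
#     || echo "log directory was replaced"
```

On the affected releases the key is absent, so the helper reports a mismatch both before and after the disk swap. That matches the problem described above: nothing in the directory distinguishes the old disk from the new one.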
STEPS TO REPRODUCE:
Create a single-broker cluster with a single controller. The standard files
under config/kraft work well:
bin/kafka-storage.sh random-uuid
J8qXRwI-Qyi2G0guFTiuYw
# ensure we start clean
/bin/rm -rf /tmp/kraft-broker-logs /tmp/kraft-controller-logs
bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config config/kraft/controller.properties
bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config config/kraft/broker.properties
bin/kafka-server-start.sh config/kraft/controller.properties
bin/kafka-server-start.sh config/kraft/broker.properties
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic foo1 --partitions 1 --replication-factor 1
# consume from foo1 so that the __consumer_offsets topic gets created
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo1 --from-beginning
^C
# confirm that the __consumer_offsets topic partitions are all created and on
# the broker with node id 2
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
Now create 2 more brokers, with node IDs 11 and 12
cat config/kraft/broker.properties | sed 's/node.id=2/node.id=11/' | sed 's/localhost:9092/localhost:9011/g' | sed 's#log.dirs=/tmp/kraft-broker-logs#log.dirs=/tmp/kraft-broker-logs11#' > config/kraft/broker11.properties
cat config/kraft/broker.properties | sed 's/node.id=2/node.id=12/' | sed 's/localhost:9092/localhost:9012/g' | sed 's#log.dirs=/tmp/kraft-broker-logs#log.dirs=/tmp/kraft-broker-logs12#' > config/kraft/broker12.properties
# ensure we start clean
/bin/rm -rf /tmp/kraft-broker-logs11 /tmp/kraft-broker-logs12
bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config config/kraft/broker11.properties
bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config config/kraft/broker12.properties
bin/kafka-server-start.sh config/kraft/broker11.properties
bin/kafka-server-start.sh config/kraft/broker12.properties
# create a topic with a single partition replicated on two brokers
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic foo2 --partitions 1 --replication-factor 2
# reassign partitions onto brokers with Node IDs 11 and 12
cat > /tmp/reassign.json <<DONE
{"partitions":[{"topic": "foo2","partition": 0,"replicas": [11,12]}],
"version":1}
DONE
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file /tmp/reassign.json --execute
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file /tmp/reassign.json --verify
# make preferred leader 11 the actual leader if it is not already
bin/kafka-leader-election.sh --bootstrap-server localhost:9092 --all-topic-partitions --election-type preferred
# Confirm both brokers are in ISR and 11 is the leader
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
Topic: foo2 TopicId: pbbQZ23UQ5mQqmZpoSRCLQ PartitionCount: 1 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: foo2 Partition: 0 Leader: 11 Replicas: 11,12 Isr: 12,11
# Emit some messages to the topic
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic foo2
1
2
3
4
5
^C
# confirm we see the messages
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo2 --from-beginning
1
2
3
4
5
^C
# Again confirm both brokers are in ISR, leader is 11
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
Topic: foo2 TopicId: pbbQZ23UQ5mQqmZpoSRCLQ PartitionCount: 1 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: foo2 Partition: 0 Leader: 11 Replicas: 11,12 Isr: 12,11
# kill non-leader broker
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
Topic: foo2 TopicId: pbbQZ23UQ5mQqmZpoSRCLQ PartitionCount: 1 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: foo2 Partition: 0 Leader: 11 Replicas: 11,12 Isr: 11
# kill leader broker
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
Topic: foo2 TopicId: pbbQZ23UQ5mQqmZpoSRCLQ PartitionCount: 1 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: foo2 Partition: 0 Leader: none Replicas: 11,12 Isr: 11
# Note that bringing the non-leader broker 12 back up at this point has no
# effect: the partition stays offline with no leader, only node 11 is in the
# ISR, and the partition cannot come back until node 11 returns
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
Topic: foo2 TopicId: pbbQZ23UQ5mQqmZpoSRCLQ PartitionCount: 1 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: foo2 Partition: 0 Leader: none Replicas: 11,12 Isr: 11
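When scripting these checks instead of reading the output by eye, the leader and ISR can be pulled out of the describe output. This is a sketch only: describe_field is a hypothetical helper, and it assumes each partition line arrives unwrapped, with the labels exactly as they appear in the output shown here.

```shell
# Hypothetical helper: read `kafka-topics.sh --describe` output on stdin and
# print the token that follows the given label on the first partition line.
describe_field() {
  # $1 = label including the colon, e.g. "Leader:" or "Isr:"
  awk -v label="$1" '
    /Partition:/ {
      for (i = 1; i < NF; i++)
        if ($i == label) { print $(i + 1); exit }
    }'
}

# Example use:
#   bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2 \
#     | describe_field Leader:
```

The /Partition:/ match skips the topic summary line, because "PartitionCount:" does not contain the literal text "Partition:".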
# erase and reformat the leader broker’s disk, then restart the leader with
# that empty disk. (Note that the follower broker’s log is untouched either
# way: whether the follower was never restarted, or it was restarted and is
# waiting for 11 to come back)
/bin/rm -rf /tmp/kraft-broker-logs11
bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config config/kraft/broker11.properties
bin/kafka-server-start.sh config/kraft/broker11.properties
# if node 12 was running it will emit log messages indicating truncation
# INFO [ReplicaFetcher replicaId=12, leaderId=11, fetcherId=0] Truncating
# partition foo2-0 with TruncationState(offset=0, completed=true) due to leader
# epoch and offset EpochEndOffset(errorCode=0, partition=0, leaderEpoch=3,
# endOffset=0) (kafka.server.ReplicaFetcherThread)
# Leader broker is the leader again (the output below shows node 12 in the ISR
# only if node 12 had been running)
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
Topic: foo2 TopicId: pbbQZ23UQ5mQqmZpoSRCLQ PartitionCount: 1 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: foo2 Partition: 0 Leader: 11 Replicas: 11,12 Isr: 11
# read from topic-partition: it is now empty
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo2 --from-beginning
# produce a message to it, message will appear on console consumer
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic foo2
1
^C
# restart the follower broker if it had not already been restarted; it will
# emit a log message indicating the log was truncated:
bin/kafka-server-start.sh config/kraft/broker12.properties
# WARN [UnifiedLog partition=foo2-0, dir=/tmp/kraft-broker-logs12]
# Non-monotonic update of high watermark from (offset=5, segment=[0:165]) to
# (offset=0, segment=[0:0]) (kafka.log.UnifiedLog)
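The two log messages quoted in this reproduction can also be checked for mechanically. The following is a rough sketch: saw_truncation is a hypothetical helper, the patterns are taken only from the messages quoted in this report, and the path to the broker's log file is left to the caller because it depends on the local logging configuration.

```shell
# Hypothetical helper: succeed when the given broker log contains either the
# ReplicaFetcher truncation message or the non-monotonic high watermark
# warning for the given topic-partition.
saw_truncation() {
  # $1 = broker log file, $2 = topic-partition such as foo2-0
  grep -Eq "Truncating partition $2 with TruncationState|UnifiedLog partition=$2,.*Non-monotonic update of high watermark" "$1"
}

# Example use (log path is an assumption, adjust to the local setup):
#   saw_truncation logs/server.log foo2-0 && echo "follower log was truncated"
```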
# follower is back in the ISR
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
Topic: foo2 TopicId: pbbQZ23UQ5mQqmZpoSRCLQ PartitionCount: 1 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: foo2 Partition: 0 Leader: 11 Replicas: 11,12 Isr: 11,12
# can redo consumer to again show data is gone
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo2 --from-beginning