[ https://issues.apache.org/jira/browse/KAFKA-15495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ron Dagostino updated KAFKA-15495:
----------------------------------
    Affects Version/s: 3.6.1
                       3.5.2
                       3.5.0
                       3.3.1
                       3.2.3
                       3.2.2
                       3.4.0

Partition truncated when the only ISR member restarts with an empty disk
-------------------------------------------------------------------------

                 Key: KAFKA-15495
                 URL: https://issues.apache.org/jira/browse/KAFKA-15495
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 3.4.0, 3.2.2, 3.2.3, 3.3.1, 3.3.2, 3.5.0, 3.4.1, 3.6.0, 3.5.1, 3.5.2, 3.6.1
            Reporter: Ron Dagostino
            Priority: Critical

Assume a topic-partition in KRaft has just a single leader replica in the ISR. Assume next that this replica goes offline. This replica's log will define the contents of that partition when the replica restarts, which is correct behavior. However, assume now that the replica has a disk failure, and we then replace the failed disk with a new, empty disk that we also format with the storage tool so it has the correct cluster ID. If we then restart the broker, the topic-partition will have no data in it, and any other replicas that might exist will truncate their logs to match, which results in data loss. See below for a step-by-step demo of how to reproduce this.

[KIP-858: Handle JBOD broker disk failure in KRaft|https://cwiki.apache.org/confluence/display/KAFKA/KIP-858%3A+Handle+JBOD+broker+disk+failure+in+KRaft] introduces the concept of a Disk UUID that we can use to solve this problem. Specifically, when the leader restarts with an empty (but correctly-formatted) disk, the actual UUID associated with the disk will be different. The controller will notice upon broker re-registration that its disk UUID differs from what was previously registered. Right now we have no way of detecting this situation, but the disk UUID gives us that capability.
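A quick way to see why the controller currently cannot tell a reformatted disk apart from the original one is to look at what the format step records for a log directory (a minimal illustration, assuming the stock KRaft meta.properties layout; the exact fields and ordering vary by version):

cat /tmp/kraft-broker-logs/meta.properties
#expected fields (abbreviated):
#version=1
#cluster.id=J8qXRwI-Qyi2G0guFTiuYw
#node.id=2
#There is no per-directory identifier here in the affected versions, so two
#successive formats of the same directory look identical to the controller.
#The KIP-858 Disk UUID would add exactly that: a per-directory value that
#changes on every format, letting the controller spot the empty-disk restart
#at broker re-registration time.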
STEPS TO REPRODUCE:

Create a single-broker cluster with a single controller. The standard files under config/kraft work well:

bin/kafka-storage.sh random-uuid
J8qXRwI-Qyi2G0guFTiuYw

#ensure we start clean
/bin/rm -rf /tmp/kraft-broker-logs /tmp/kraft-controller-logs
bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config config/kraft/controller.properties
bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config config/kraft/broker.properties
bin/kafka-server-start.sh config/kraft/controller.properties
bin/kafka-server-start.sh config/kraft/broker.properties
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic foo1 --partitions 1 --replication-factor 1

#create the __consumer_offsets topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo1 --from-beginning
^C

#confirm that __consumer_offsets topic partitions are all created and on the broker with node id 2
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe

Now create 2 more brokers, with node IDs 11 and 12:

cat config/kraft/broker.properties | sed 's/node.id=2/node.id=11/' | sed 's/localhost:9092/localhost:9011/g' | sed 's#log.dirs=/tmp/kraft-broker-logs#log.dirs=/tmp/kraft-broker-logs11#' > config/kraft/broker11.properties
cat config/kraft/broker.properties | sed 's/node.id=2/node.id=12/' | sed 's/localhost:9092/localhost:9012/g' | sed 's#log.dirs=/tmp/kraft-broker-logs#log.dirs=/tmp/kraft-broker-logs12#' > config/kraft/broker12.properties

#ensure we start clean
/bin/rm -rf /tmp/kraft-broker-logs11 /tmp/kraft-broker-logs12
bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config config/kraft/broker11.properties
bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config config/kraft/broker12.properties
bin/kafka-server-start.sh config/kraft/broker11.properties
bin/kafka-server-start.sh config/kraft/broker12.properties

#create a topic with a single partition replicated on two brokers
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic foo2 --partitions 1 --replication-factor 2

#reassign the partition onto the brokers with node IDs 11 and 12
echo '{"partitions":[{"topic": "foo2","partition": 0,"replicas": [11,12]}], "version":1}' > /tmp/reassign.json
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file /tmp/reassign.json --execute
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file /tmp/reassign.json --verify

#make preferred leader 11 the actual leader if it is not already
bin/kafka-leader-election.sh --bootstrap-server localhost:9092 --all-topic-partitions --election-type preferred

#Confirm both brokers are in the ISR and 11 is the leader
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
Topic: foo2   TopicId: pbbQZ23UQ5mQqmZpoSRCLQ   PartitionCount: 1   ReplicationFactor: 2   Configs: segment.bytes=1073741824
    Topic: foo2   Partition: 0   Leader: 11   Replicas: 11,12   Isr: 12,11

#Emit some messages to the topic
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic foo2
1
2
3
4
5
^C

#confirm we see the messages
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo2 --from-beginning
1
2
3
4
5
^C

#Again confirm both brokers are in the ISR, leader is 11
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
Topic: foo2   TopicId: pbbQZ23UQ5mQqmZpoSRCLQ   PartitionCount: 1   ReplicationFactor: 2   Configs: segment.bytes=1073741824
    Topic: foo2   Partition: 0   Leader: 11   Replicas: 11,12   Isr: 12,11
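At this point it can be useful to confirm that the five records really are in broker 11's local log before the failure is simulated (an optional check, not required for the reproduction; the segment path assumes the default log layout for partition foo2-0):

bin/kafka-dump-log.sh --files /tmp/kraft-broker-logs11/foo2-0/00000000000000000000.log --print-data-log
#should list a batch containing the payloads 1 through 5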
#kill non-leader broker (node 12), then confirm only 11 remains in the ISR
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
Topic: foo2   TopicId: pbbQZ23UQ5mQqmZpoSRCLQ   PartitionCount: 1   ReplicationFactor: 2   Configs: segment.bytes=1073741824
    Topic: foo2   Partition: 0   Leader: 11   Replicas: 11,12   Isr: 11

#kill leader broker (node 11)
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
Topic: foo2   TopicId: pbbQZ23UQ5mQqmZpoSRCLQ   PartitionCount: 1   ReplicationFactor: 2   Configs: segment.bytes=1073741824
    Topic: foo2   Partition: 0   Leader: none   Replicas: 11,12   Isr: 11

#Note that bringing the non-leader broker 12 back up at this point has no effect: the partition is offline with no leader, only node 11 is in the ISR, and the partition cannot return until node 11 returns
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
Topic: foo2   TopicId: pbbQZ23UQ5mQqmZpoSRCLQ   PartitionCount: 1   ReplicationFactor: 2   Configs: segment.bytes=1073741824
    Topic: foo2   Partition: 0   Leader: none   Replicas: 11,12   Isr: 11

#erase and reformat the leader broker's disk, and then restart the leader with that empty disk.
#(Note that the follower broker remains untouched/unchanged: either it wasn't started, or it is started and is waiting for 11 to come back)
/bin/rm -rf /tmp/kraft-broker-logs11
bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config config/kraft/broker11.properties
bin/kafka-server-start.sh config/kraft/broker11.properties

#if node 12 was running it will emit log messages indicating truncation:
#INFO [ReplicaFetcher replicaId=12, leaderId=11, fetcherId=0] Truncating partition foo2-0 with TruncationState(offset=0, completed=true) due to leader epoch and offset EpochEndOffset(errorCode=0, partition=0, leaderEpoch=3, endOffset=0) (kafka.server.ReplicaFetcherThread)

#Leader broker is the leader again (the below output either will or won't show node 12 as being in the ISR, depending on whether it had been running or not, respectively)
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
Topic: foo2   TopicId: pbbQZ23UQ5mQqmZpoSRCLQ   PartitionCount: 1   ReplicationFactor: 2   Configs: segment.bytes=1073741824
    Topic: foo2   Partition: 0   Leader: 11   Replicas: 11,12   Isr: 11

#read from the topic-partition: it is now empty
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo2 --from-beginning

#produce a message to it; the message will appear on the console consumer
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic foo2
1
^C

#restart the follower broker if it had not already been restarted; it will emit a log message indicating the log was truncated:
bin/kafka-server-start.sh config/kraft/broker12.properties
#WARN [UnifiedLog partition=foo2-0, dir=/tmp/kraft-broker-logs12] Non-monotonic update of high watermark from (offset=5, segment=[0:165]) to (offset=0, segment=[0:0]) (kafka.log.UnifiedLog)

#follower is back in the ISR
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
Topic: foo2   TopicId: pbbQZ23UQ5mQqmZpoSRCLQ   PartitionCount: 1   ReplicationFactor: 2   Configs: segment.bytes=1073741824
    Topic: foo2   Partition: 0   Leader: 11   Replicas: 11,12   Isr: 11,12

#can redo the consumer to again show the data is gone
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo2 --from-beginning
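Another way to quantify the loss once the leader is back (an optional check with assumed tooling: kafka-get-offsets.sh ships with recent releases, and older installations can run kafka-run-class.sh kafka.tools.GetOffsetShell instead) is to ask for the latest offset of foo2-0, which is back at 1 (just the newly produced record) even though offsets 0 through 4 had previously been acknowledged:

bin/kafka-get-offsets.sh --bootstrap-server localhost:9092 --topic foo2 --time -1
#expected output of the form topic:partition:offset, e.g. foo2:0:1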
--
This message was sent by Atlassian Jira
(v8.20.10#820010)