[ 
https://issues.apache.org/jira/browse/KAFKA-15495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino updated KAFKA-15495:
----------------------------------
    Affects Version/s: 3.6.1
                       3.5.2
                       3.5.0
                       3.3.1
                       3.2.3
                       3.2.2
                       3.4.0

> Partition truncated when the only ISR member restarts with an empty disk
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-15495
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15495
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.4.0, 3.2.2, 3.2.3, 3.3.1, 3.3.2, 3.5.0, 3.4.1, 3.6.0, 
> 3.5.1, 3.5.2, 3.6.1
>            Reporter: Ron Dagostino
>            Priority: Critical
>
> Assume a topic-partition in KRaft has just a single leader replica in the 
> ISR.  Assume next that this replica goes offline.  This replica's log will 
> define the contents of that partition when the replica restarts, which is 
> correct behavior.  However, assume now that the replica has a disk failure, 
> and we then replace the failed disk with a new, empty disk that we also 
> format with the storage tool so it has the correct cluster ID.  If we then 
> restart the broker, the topic-partition will have no data in it, and any 
> other replicas that might exist will truncate their logs to match, which 
> results in data loss.  See below for a step-by-step demo of how to reproduce 
> this.
> [KIP-858: Handle JBOD broker disk failure in 
> KRaft|https://cwiki.apache.org/confluence/display/KAFKA/KIP-858%3A+Handle+JBOD+broker+disk+failure+in+KRaft]
>  introduces the concept of a Disk UUID that we can use to solve this problem. 
>  Specifically, when the leader restarts with an empty (but 
> correctly-formatted) disk, the actual UUID associated with the disk will be 
> different.  The controller will notice upon broker re-registration that its 
> disk UUID differs from what was previously registered.  Right now we have no 
> way of detecting this situation, but the disk UUID gives us that capability.
> STEPS TO REPRODUCE:
> Create a single broker cluster with single controller.  The standard files 
> under config/kraft work well:
> bin/kafka-storage.sh random-uuid
> J8qXRwI-Qyi2G0guFTiuYw
> #ensure we start clean
> /bin/rm -rf /tmp/kraft-broker-logs /tmp/kraft-controller-logs
> bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config 
> config/kraft/controller.properties
> bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config 
> config/kraft/broker.properties
> bin/kafka-server-start.sh config/kraft/controller.properties
> bin/kafka-server-start.sh config/kraft/broker.properties
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic foo1 
> --partitions 1 --replication-factor 1
> #create __consumer-offsets topics
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo1 
> --from-beginning
> ^C
> #confirm that __consumer_offsets topic partitions are all created and on 
> broker with node id 2
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
> Now create 2 more brokers, with node IDs 11 and 12
> cat config/kraft/broker.properties | sed 's/node.id=2/node.id=11/' | sed 
> 's/localhost:9092/localhost:9011/g' |  sed 
> 's#log.dirs=/tmp/kraft-broker-logs#log.dirs=/tmp/kraft-broker-logs11#' > 
> config/kraft/broker11.properties
> cat config/kraft/broker.properties | sed 's/node.id=2/node.id=12/' | sed 
> 's/localhost:9092/localhost:9012/g' |  sed 
> 's#log.dirs=/tmp/kraft-broker-logs#log.dirs=/tmp/kraft-broker-logs12#' > 
> config/kraft/broker12.properties
> #ensure we start clean
> /bin/rm -rf /tmp/kraft-broker-logs11 /tmp/kraft-broker-logs12
> bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config 
> config/kraft/broker11.properties
> bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config 
> config/kraft/broker12.properties
> bin/kafka-server-start.sh config/kraft/broker11.properties
> bin/kafka-server-start.sh config/kraft/broker12.properties
> #create a topic with a single partition replicated on two brokers
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic foo2 
> --partitions 1 --replication-factor 2
> #reassign partitions onto brokers with Node IDs 11 and 12
> echo '{"partitions":[{"topic": "foo2","partition": 0,"replicas": [11,12]}], 
> "version":1}' > /tmp/reassign.json
> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 
> --reassignment-json-file /tmp/reassign.json --execute
> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 
> --reassignment-json-file /tmp/reassign.json --verify
> #make preferred leader 11 the actual leader if it not
> bin/kafka-leader-election.sh --bootstrap-server localhost:9092 
> --all-topic-partitions --election-type preferred
> #Confirm both brokers are in ISR and 11 is the leader
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
> Topic: foo2   TopicId: pbbQZ23UQ5mQqmZpoSRCLQ PartitionCount: 1       
> ReplicationFactor: 2    Configs: segment.bytes=1073741824
>       Topic: foo2     Partition: 0    Leader: 11      Replicas: 11,12 Isr: 
> 12,11
> #Emit some messages to the topic
> bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic foo2
> 1
> 2
> 3
> 4
> 5
> ^C
> #confirm we see the messages
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo2 
> --from-beginning
> 1
> 2
> 3
> 4
> 5
> ^C
> #Again confirm both brokers are in ISR, leader is 11
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
> Topic: foo2   TopicId: pbbQZ23UQ5mQqmZpoSRCLQ PartitionCount: 1       
> ReplicationFactor: 2    Configs: segment.bytes=1073741824
>       Topic: foo2     Partition: 0    Leader: 11      Replicas: 11,12 Isr: 
> 12,11
> #kill non-leader broker
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
> Topic: foo2   TopicId: pbbQZ23UQ5mQqmZpoSRCLQ PartitionCount: 1       
> ReplicationFactor: 2    Configs: segment.bytes=1073741824
>       Topic: foo2     Partition: 0    Leader: 11      Replicas: 11,12 Isr: 11
> #kill leader broker
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
> Topic: foo2   TopicId: pbbQZ23UQ5mQqmZpoSRCLQ PartitionCount: 1       
> ReplicationFactor: 2    Configs: segment.bytes=1073741824
>       Topic: foo2     Partition: 0    Leader: none    Replicas: 11,12 Isr: 11
> #Note that bringing the non-leader broker 12 back up at this point has no 
> effect: it is offline with no leader, only node 11 is in the ISR, and the 
> partition cannot return until node 11 returns
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
> Topic: foo2   TopicId: pbbQZ23UQ5mQqmZpoSRCLQ PartitionCount: 1       
> ReplicationFactor: 2    Configs: segment.bytes=1073741824
>       Topic: foo2     Partition: 0    Leader: none    Replicas: 11,12 Isr: 11
> #erase and reformat leader broker’s disk, and then restart the leader with 
> that empty disk.  (Note that follower broker remains untouched/unchanged if 
> it wasn’t started, or it is started and is waiting for 11 to come back)
> /bin/rm -rf /tmp/kraft-broker-logs11
> bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config 
> config/kraft/broker11.properties
> bin/kafka-server-start.sh config/kraft/broker11.properties
> #if node 12 was running it will emit log messages indicating truncation
> #INFO [ReplicaFetcher replicaId=12, leaderId=11, fetcherId=0] Truncating 
> partition foo2-0 with TruncationState(offset=0, completed=true) due to leader 
> epoch and offset EpochEndOffset(errorCode=0, partition=0, leaderEpoch=3, 
> endOffset=0) (kafka.server.ReplicaFetcherThread)
> #Leader broker is the leader again (the below output either will or won’t 
> show node 12 as being in the ISR depending on whether it had been running or 
> not, respectively)
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
> Topic: foo2   TopicId: pbbQZ23UQ5mQqmZpoSRCLQ PartitionCount: 1       
> ReplicationFactor: 2    Configs: segment.bytes=1073741824
>       Topic: foo2     Partition: 0    Leader: 11      Replicas: 11,12 Isr: 11
> #read from topic-partition: it is now empty
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo2 
> --from-beginning
> #produce a message to it, message will appear on console consumer
> bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic foo2
> 1
> ^C
> #restart follower broker if it had not already been restarted, and will emit 
> a log message indicating the log was truncated:
> bin/kafka-server-start.sh config/kraft/broker12.properties
> # WARN [UnifiedLog partition=foo2-0, dir=/tmp/kraft-broker-logs12] 
> Non-monotonic update of high watermark from (offset=5, segment=[0:165]) to 
> (offset=0, segment=[0:0]) (kafka.log.UnifiedLog)
> #follower is back in the ISR
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
> Topic: foo2   TopicId: pbbQZ23UQ5mQqmZpoSRCLQ PartitionCount: 1       
> ReplicationFactor: 2    Configs: segment.bytes=1073741824
>       Topic: foo2     Partition: 0    Leader: 11      Replicas: 11,12 Isr: 
> 11,12
> #can redo consumer to again show data is gone
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo2 
> --from-beginning



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to