[ 
https://issues.apache.org/jira/browse/KAFKA-15495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17768674#comment-17768674
 ] 

Ron Dagostino commented on KAFKA-15495:
---------------------------------------

Thanks, Ismael.  Yes, I read [the 
KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-966%3A+Eligible+Leader+Replicas]
 and [Jack's blog post about 
it|https://jack-vanlightly.com/blog/2023/8/17/kafka-kip-966-fixing-the-last-replica-standing-issue|]
 last night after I posted this, and I have asked some folks who have been 
involved in that discussion if the broker epoch communicated in the broker 
registration request via the clean shutdown file might also serve to give the 
controller a signal that the disk is new.  I'll comment more here soon.

> KRaft partition truncated when the only ISR member restarts with an empty disk
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-15495
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15495
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.3.2, 3.4.1, 3.6.0, 3.5.1
>            Reporter: Ron Dagostino
>            Priority: Critical
>
> Assume a topic-partition in KRaft has just a single leader replica in the 
> ISR.  Assume next that this replica goes offline.  This replica's log will 
> define the contents of that partition when the replica restarts, which is 
> correct behavior.  However, assume now that the replica has a disk failure, 
> and we then replace the failed disk with a new, empty disk that we also 
> format with the storage tool so it has the correct cluster ID.  If we then 
> restart the broker, the topic-partition will have no data in it, and any 
> other replicas that might exist will truncate their logs to match, which 
> results in data loss.  See below for a step-by-step demo of how to reproduce 
> this.
> [KIP-858: Handle JBOD broker disk failure in 
> KRaft|https://cwiki.apache.org/confluence/display/KAFKA/KIP-858%3A+Handle+JBOD+broker+disk+failure+in+KRaft]
>  introduces the concept of a Disk UUID that we can use to solve this problem. 
>  Specifically, when the leader restarts with an empty (but 
> correctly-formatted) disk, the actual UUID associated with the disk will be 
> different.  The controller will notice upon broker re-registration that its 
> disk UUID differs from what was previously registered.  Right now we have no 
> way of detecting this situation, but the disk UUID gives us that capability.
> STEPS TO REPRODUCE:
> Create a single broker cluster with single controller.  The standard files 
> under config/kraft work well:
> bin/kafka-storage.sh random-uuid
> J8qXRwI-Qyi2G0guFTiuYw
> #ensure we start clean
> /bin/rm -rf /tmp/kraft-broker-logs /tmp/kraft-controller-logs
> bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config 
> config/kraft/controller.properties
> bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config 
> config/kraft/broker.properties
> bin/kafka-server-start.sh config/kraft/controller.properties
> bin/kafka-server-start.sh config/kraft/broker.properties
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic foo1 
> --partitions 1 --replication-factor 1
> #create __consumer-offsets topics
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo1 
> --from-beginning
> ^C
> #confirm that __consumer_offsets topic partitions are all created and on 
> broker with node id 2
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
> Now create 2 more brokers, with node IDs 11 and 12
> cat config/kraft/broker.properties | sed 's/node.id=2/node.id=11/' | sed 
> 's/localhost:9092/localhost:9011/g' |  sed 
> 's#log.dirs=/tmp/kraft-broker-logs#log.dirs=/tmp/kraft-broker-logs11#' > 
> config/kraft/broker11.properties
> cat config/kraft/broker.properties | sed 's/node.id=2/node.id=12/' | sed 
> 's/localhost:9092/localhost:9012/g' |  sed 
> 's#log.dirs=/tmp/kraft-broker-logs#log.dirs=/tmp/kraft-broker-logs12#' > 
> config/kraft/broker12.properties
> #ensure we start clean
> /bin/rm -rf /tmp/kraft-broker-logs11 /tmp/kraft-broker-logs12
> bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config 
> config/kraft/broker11.properties
> bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config 
> config/kraft/broker12.properties
> bin/kafka-server-start.sh config/kraft/broker11.properties
> bin/kafka-server-start.sh config/kraft/broker12.properties
> #create a topic with a single partition replicated on two brokers
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic foo2 
> --partitions 1 --replication-factor 2
> #reassign partitions onto brokers with Node IDs 11 and 12
> echo '{"partitions":[{"topic": "foo2","partition": 0,"replicas": [11,12]}], 
> "version":1}' > /tmp/reassign.json
> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 
> --reassignment-json-file /tmp/reassign.json --execute
> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 
> --reassignment-json-file /tmp/reassign.json --verify
> #make preferred leader 11 the actual leader if it not
> bin/kafka-leader-election.sh --bootstrap-server localhost:9092 
> --all-topic-partitions --election-type preferred
> #Confirm both brokers are in ISR and 11 is the leader
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
> Topic: foo2   TopicId: pbbQZ23UQ5mQqmZpoSRCLQ PartitionCount: 1       
> ReplicationFactor: 2    Configs: segment.bytes=1073741824
>       Topic: foo2     Partition: 0    Leader: 11      Replicas: 11,12 Isr: 
> 12,11
> #Emit some messages to the topic
> bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic foo2
> 1
> 2
> 3
> 4
> 5
> ^C
> #confirm we see the messages
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo2 
> --from-beginning
> 1
> 2
> 3
> 4
> 5
> ^C
> #Again confirm both brokers are in ISR, leader is 11
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
> Topic: foo2   TopicId: pbbQZ23UQ5mQqmZpoSRCLQ PartitionCount: 1       
> ReplicationFactor: 2    Configs: segment.bytes=1073741824
>       Topic: foo2     Partition: 0    Leader: 11      Replicas: 11,12 Isr: 
> 12,11
> #kill non-leader broker
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
> Topic: foo2   TopicId: pbbQZ23UQ5mQqmZpoSRCLQ PartitionCount: 1       
> ReplicationFactor: 2    Configs: segment.bytes=1073741824
>       Topic: foo2     Partition: 0    Leader: 11      Replicas: 11,12 Isr: 11
> #kill leader broker
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
> Topic: foo2   TopicId: pbbQZ23UQ5mQqmZpoSRCLQ PartitionCount: 1       
> ReplicationFactor: 2    Configs: segment.bytes=1073741824
>       Topic: foo2     Partition: 0    Leader: none    Replicas: 11,12 Isr: 11
> #Note that bringing the non-leader broker 12 back up at this point has no 
> effect: it is offline with no leader, only node 11 is in the ISR, and the 
> partition cannot return until node 11 returns
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
> Topic: foo2   TopicId: pbbQZ23UQ5mQqmZpoSRCLQ PartitionCount: 1       
> ReplicationFactor: 2    Configs: segment.bytes=1073741824
>       Topic: foo2     Partition: 0    Leader: none    Replicas: 11,12 Isr: 11
> #erase and reformat leader broker’s disk, and then restart the leader with 
> that empty disk.  (Note that follower broker remains untouched/unchanged if 
> it wasn’t started, or it is started and is waiting for 11 to come back)
> /bin/rm -rf /tmp/kraft-broker-logs11
> bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config 
> config/kraft/broker11.properties
> bin/kafka-server-start.sh config/kraft/broker11.properties
> #if node 12 was running it will emit log messages indicating truncation
> #INFO [ReplicaFetcher replicaId=12, leaderId=11, fetcherId=0] Truncating 
> partition foo2-0 with TruncationState(offset=0, completed=true) due to leader 
> epoch and offset EpochEndOffset(errorCode=0, partition=0, leaderEpoch=3, 
> endOffset=0) (kafka.server.ReplicaFetcherThread)
> #Leader broker is the leader again (the below output either will or won’t 
> show node 12 as being in the ISR depending on whether it had been running or 
> not, respectively)
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
> Topic: foo2   TopicId: pbbQZ23UQ5mQqmZpoSRCLQ PartitionCount: 1       
> ReplicationFactor: 2    Configs: segment.bytes=1073741824
>       Topic: foo2     Partition: 0    Leader: 11      Replicas: 11,12 Isr: 11
> #read from topic-partition: it is now empty
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo2 
> --from-beginning
> #produce a message to it, message will appear on console consumer
> bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic foo2
> 1
> ^C
> #restart follower broker if it had not already been restarted, and will emit 
> a log message indicating the log was truncated:
> bin/kafka-server-start.sh config/kraft/broker12.properties
> # WARN [UnifiedLog partition=foo2-0, dir=/tmp/kraft-broker-logs12] 
> Non-monotonic update of high watermark from (offset=5, segment=[0:165]) to 
> (offset=0, segment=[0:0]) (kafka.log.UnifiedLog)
> #follower is back in the ISR
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
> Topic: foo2   TopicId: pbbQZ23UQ5mQqmZpoSRCLQ PartitionCount: 1       
> ReplicationFactor: 2    Configs: segment.bytes=1073741824
>       Topic: foo2     Partition: 0    Leader: 11      Replicas: 11,12 Isr: 
> 11,12
> #can redo consumer to again show data is gone
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo2 
> --from-beginning



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to