[jira] [Updated] (KAFKA-15495) KRaft partition truncated when the only ISR member restarts with an empty disk
[ https://issues.apache.org/jira/browse/KAFKA-15495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ron Dagostino updated KAFKA-15495:
--
Description:

Assume a topic-partition in KRaft has just a single leader replica in the ISR. Assume next that this replica goes offline. This replica's log will define the contents of that partition when the replica restarts, which is correct behavior. However, assume now that the replica has a disk failure, and we then replace the failed disk with a new, empty disk that we also format with the storage tool so it has the correct cluster ID. If we then restart the broker, the topic-partition will have no data in it, and any other replicas that might exist will truncate their logs to match, which results in data loss. See below for a step-by-step demo of how to reproduce this.

[KIP-858: Handle JBOD broker disk failure in KRaft|https://cwiki.apache.org/confluence/display/KAFKA/KIP-858%3A+Handle+JBOD+broker+disk+failure+in+KRaft] introduces the concept of a disk UUID that we can use to solve this problem. Specifically, when the leader restarts with an empty (but correctly formatted) disk, the UUID associated with the disk will be different. The controller will notice upon broker re-registration that the broker's disk UUID differs from what was previously registered. Right now we have no way of detecting this situation; the disk UUID gives us that capability.

STEPS TO REPRODUCE:

Create a single-broker cluster with a single controller.
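The KIP-858 detection idea described above can be sketched as a toy model. Everything here (function names, data structures) is hypothetical illustration, not the actual Kafka controller code: the point is only that remembering each broker's previously registered directory UUIDs lets the controller notice a re-registration backed by a fresh, reformatted disk.

```python
# Hypothetical sketch of the KIP-858 idea: the controller remembers each
# broker's registered directory UUIDs and can flag a re-registration whose
# UUIDs it has never seen before. Names here are illustrative only.

registered_dir_uuids = {}  # broker_id -> set of previously seen dir UUIDs


def on_broker_registration(broker_id, dir_uuids):
    """Return True if the broker re-registered with an unknown (new) disk."""
    previous = registered_dir_uuids.get(broker_id)
    registered_dir_uuids[broker_id] = set(dir_uuids)
    if previous is None:
        return False  # first registration: nothing to compare against
    # A freshly formatted disk shows up as a UUID the controller has never
    # seen for this broker; without a disk UUID this is undetectable.
    return not set(dir_uuids) <= previous
```

With this in place, the empty-disk restart in the scenario above would surface as a flagged re-registration instead of silently rejoining as the partition leader.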
The standard files under config/kraft work well:

bin/kafka-storage.sh random-uuid
J8qXRwI-Qyi2G0guFTiuYw

# ensure we start clean
/bin/rm -rf /tmp/kraft-broker-logs /tmp/kraft-controller-logs
bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config config/kraft/controller.properties
bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config config/kraft/broker.properties
bin/kafka-server-start.sh config/kraft/controller.properties
bin/kafka-server-start.sh config/kraft/broker.properties
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic foo1 --partitions 1 --replication-factor 1

# create the __consumer_offsets topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo1 --from-beginning
^C

# confirm that the __consumer_offsets topic partitions are all created and on the broker with node id 2
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe

Now create 2 more brokers, with node IDs 11 and 12:

cat config/kraft/broker.properties | sed 's/node.id=2/node.id=11/' | sed 's/localhost:9092/localhost:9011/g' | sed 's#log.dirs=/tmp/kraft-broker-logs#log.dirs=/tmp/kraft-broker-logs11#' > config/kraft/broker11.properties
cat config/kraft/broker.properties | sed 's/node.id=2/node.id=12/' | sed 's/localhost:9092/localhost:9012/g' | sed 's#log.dirs=/tmp/kraft-broker-logs#log.dirs=/tmp/kraft-broker-logs12#' > config/kraft/broker12.properties

# ensure we start clean
/bin/rm -rf /tmp/kraft-broker-logs11 /tmp/kraft-broker-logs12
bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config config/kraft/broker11.properties
bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config config/kraft/broker12.properties
bin/kafka-server-start.sh config/kraft/broker11.properties
bin/kafka-server-start.sh config/kraft/broker12.properties

# create a topic with a single partition replicated on two brokers
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic foo2 --partitions 1 --replication-factor 2

# reassign partitions onto brokers with node IDs 11 and 12
echo '{"partitions":[{"topic": "foo2","partition": 0,"replicas": [11,12]}], "version":1}' > /tmp/reassign.json
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file /tmp/reassign.json --execute
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file /tmp/reassign.json --verify

# make preferred leader 11 the actual leader if it is not already
bin/kafka-leader-election.sh --bootstrap-server localhost:9092 --all-topic-partitions --election-type preferred

# confirm both brokers are in the ISR and 11 is the leader
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
Topic: foo2	TopicId: pbbQZ23UQ5mQqmZpoSRCLQ	PartitionCount: 1	ReplicationFactor: 2	Configs: segment.bytes=1073741824
	Topic: foo2	Partition: 0	Leader: 11	Replicas: 11,12	Isr: 12,11

# emit some messages to the topic
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic foo2
1
2
3
4
5
^C

# confirm we see the messages
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo2 --from-beginning
1
2
3
4
5
^C

# again confirm both brokers are in the ISR, leader is 11
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
Topic: foo2	TopicId: pbbQZ23UQ5mQqmZpoSRCLQ
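The data-loss mechanism at the end of this scenario can be illustrated with a toy model (not Kafka's actual replication code): a follower truncates its log to match the leader before it resumes fetching, so a leader that comes back with an empty disk drags the follower's log to empty as well.

```python
# Toy model of the failure mode described above: the follower discards any
# records beyond the leader's log end offset. This is a deliberately
# simplified stand-in for Kafka's truncation logic, for illustration only.

def truncate_to_leader(follower_log, leader_log_end_offset):
    """Follower drops any records past the leader's log end offset."""
    return follower_log[:leader_log_end_offset]


# Leader 11 restarts with a freshly formatted, empty disk: its log end
# offset is 0, so follower 12 truncates away all five records it held.
follower_log = ["1", "2", "3", "4", "5"]
assert truncate_to_leader(follower_log, 0) == []
```

This is why the scenario results in silent data loss: the truncation is the correct follower behavior given the leader's state, and only the disk UUID from KIP-858 lets the controller tell an empty-because-new disk apart from a legitimately empty log.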
[jira] [Updated] (KAFKA-15495) KRaft partition truncated when the only ISR member restarts with an empty disk
[ https://issues.apache.org/jira/browse/KAFKA-15495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ron Dagostino updated KAFKA-15495:
--
Summary: KRaft partition truncated when the only ISR member restarts with an empty disk (was: KRaft partition truncated when the only ISR member restarts with and empty disk)

> Key: KAFKA-15495
> URL: https://issues.apache.org/jira/browse/KAFKA-15495
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 3.3.2, 3.4.1, 3.6.0, 3.5.1
> Reporter: Ron Dagostino
> Priority: Critical