[jira] [Updated] (KAFKA-15495) KRaft partition truncated when the only ISR member restarts with an empty disk

2023-09-24 Thread Ron Dagostino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino updated KAFKA-15495:
--
Description: 
Assume a topic-partition in KRaft has just a single leader replica in the ISR.  
Assume next that this replica goes offline.  This replica's log will define the 
contents of that partition when the replica restarts, which is correct 
behavior.  However, assume now that the replica has a disk failure, and we then 
replace the failed disk with a new, empty disk that we also format with the 
storage tool so it has the correct cluster ID.  If we then restart the broker, 
the topic-partition will have no data in it, and any other replicas that might 
exist will truncate their logs to match, which results in data loss.  See below 
for a step-by-step demo of how to reproduce this.

[KIP-858: Handle JBOD broker disk failure in 
KRaft|https://cwiki.apache.org/confluence/display/KAFKA/KIP-858%3A+Handle+JBOD+broker+disk+failure+in+KRaft]
 introduces the concept of a Disk UUID that we can use to solve this problem.  
Specifically, when the leader restarts with an empty (but correctly-formatted) 
disk, the actual UUID associated with the disk will be different.  The 
controller will notice upon broker re-registration that its disk UUID differs 
from what was previously registered.  Right now we have no way of detecting 
this situation, but the disk UUID gives us that capability.


STEPS TO REPRODUCE:

Create a single broker cluster with single controller.  The standard files 
under config/kraft work well:

bin/kafka-storage.sh random-uuid
J8qXRwI-Qyi2G0guFTiuYw

#ensure we start clean
/bin/rm -rf /tmp/kraft-broker-logs /tmp/kraft-controller-logs

bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config 
config/kraft/controller.properties
bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config 
config/kraft/broker.properties

bin/kafka-server-start.sh config/kraft/controller.properties
bin/kafka-server-start.sh config/kraft/broker.properties

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic foo1 
--partitions 1 --replication-factor 1

#create __consumer-offsets topics
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo1 
--from-beginning
^C

#confirm that __consumer_offsets topic partitions are all created and on broker 
with node id 2
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe

Now create 2 more brokers, with node IDs 11 and 12

cat config/kraft/broker.properties | sed 's/node.id=2/node.id=11/' | sed 
's/localhost:9092/localhost:9011/g' |  sed 
's#log.dirs=/tmp/kraft-broker-logs#log.dirs=/tmp/kraft-broker-logs11#' > 
config/kraft/broker11.properties
cat config/kraft/broker.properties | sed 's/node.id=2/node.id=12/' | sed 
's/localhost:9092/localhost:9012/g' |  sed 
's#log.dirs=/tmp/kraft-broker-logs#log.dirs=/tmp/kraft-broker-logs12#' > 
config/kraft/broker12.properties

#ensure we start clean
/bin/rm -rf /tmp/kraft-broker-logs11 /tmp/kraft-broker-logs12

bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config 
config/kraft/broker11.properties
bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config 
config/kraft/broker12.properties

bin/kafka-server-start.sh config/kraft/broker11.properties
bin/kafka-server-start.sh config/kraft/broker12.properties

#create a topic with a single partition replicated on two brokers
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic foo2 
--partitions 1 --replication-factor 2

#reassign partitions onto brokers with Node IDs 11 and 12
echo '{"partitions":[{"topic": "foo2","partition": 0,"replicas": [11,12]}], 
"version":1}' > /tmp/reassign.json

bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 
--reassignment-json-file /tmp/reassign.json --execute
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 
--reassignment-json-file /tmp/reassign.json --verify

#make preferred leader 11 the actual leader if it not
bin/kafka-leader-election.sh --bootstrap-server localhost:9092 
--all-topic-partitions --election-type preferred

#Confirm both brokers are in ISR and 11 is the leader
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
Topic: foo2 TopicId: pbbQZ23UQ5mQqmZpoSRCLQ PartitionCount: 1   
ReplicationFactor: 2Configs: segment.bytes=1073741824
Topic: foo2 Partition: 0Leader: 11  Replicas: 11,12 Isr: 
12,11


#Emit some messages to the topic
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic foo2
1
2
3
4
5
^C

#confirm we see the messages
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo2 
--from-beginning
1
2
3
4
5
^C

#Again confirm both brokers are in ISR, leader is 11
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
Topic: foo2 TopicId: pbbQZ23UQ5mQqmZpoSRCL

[jira] [Updated] (KAFKA-15495) KRaft partition truncated when the only ISR member restarts with an empty disk

2023-09-24 Thread Ron Dagostino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino updated KAFKA-15495:
--
Description: 
Assume a topic-partition in KRaft has just a single leader replica in the ISR.  
Assume next that this replica goes offline.  This replica's log will define the 
contents of that partition when the replica restarts, which is correct 
behavior.  However, assume now that the replica has a disk failure, and we then 
replace the failed disk with a new, empty disk that we also format with the 
storage tool so it has the correct cluster ID.  If we then restart the broker, 
the topic-partition will have no data in it, and any other replicas that might 
exist will truncate their logs to match, which results in data loss.  See below 
for a step-by-step demo of how to reproduce this.

[KIP-858: Handle JBOD broker disk failure in 
KRaft|https://cwiki.apache.org/confluence/display/KAFKA/KIP-858%3A+Handle+JBOD+broker+disk+failure+in+KRaft]
 introduces the concept of a Disk UUID that we can use to solve this problem.  
Specifically, when the leader restarts with an empty (but correctly-formatted) 
disk, the actual UUID associated with the disk will be different.  The 
controller will notice upon broker re-registration that its disk UUID differs 
from what was previously registered.  Right now we have no way of detecting 
this situation, but the disk UUID gives us that capability.


STEPS TO REPRODUCE:

Create a single broker cluster with single controller.  The standard files 
under config/kraft work well:

bin/kafka-storage.sh random-uuid
J8qXRwI-Qyi2G0guFTiuYw

# ensure we start clean
/bin/rm -rf /tmp/kraft-broker-logs /tmp/kraft-controller-logs

bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config 
config/kraft/controller.properties
bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config 
config/kraft/broker.properties

bin/kafka-server-start.sh config/kraft/controller.properties
bin/kafka-server-start.sh config/kraft/broker.properties

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic foo1 
--partitions 1 --replication-factor 1

# create __consumer-offsets topics
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo1 
--from-beginning
^C

# confirm that __consumer_offsets topic partitions are all created and on 
broker with node id 2
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe

Now create 2 more brokers, with node IDs 11 and 12

cat config/kraft/broker.properties | sed 's/node.id=2/node.id=11/' | sed 
's/localhost:9092/localhost:9011/g' |  sed 
's#log.dirs=/tmp/kraft-broker-logs#log.dirs=/tmp/kraft-broker-logs11#' > 
config/kraft/broker11.properties
cat config/kraft/broker.properties | sed 's/node.id=2/node.id=12/' | sed 
's/localhost:9092/localhost:9012/g' |  sed 
's#log.dirs=/tmp/kraft-broker-logs#log.dirs=/tmp/kraft-broker-logs12#' > 
config/kraft/broker12.properties

# ensure we start clean
/bin/rm -rf /tmp/kraft-broker-logs11 /tmp/kraft-broker-logs12

bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config 
config/kraft/broker11.properties
bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config 
config/kraft/broker12.properties

bin/kafka-server-start.sh config/kraft/broker11.properties
bin/kafka-server-start.sh config/kraft/broker12.properties

# create a topic with a single partition replicated on two brokers
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic foo2 
--partitions 1 --replication-factor 2

# reassign partitions onto brokers with Node IDs 11 and 12
cat > /tmp/reassign.json 

[jira] [Updated] (KAFKA-15495) KRaft partition truncated when the only ISR member restarts with an empty disk

2023-09-24 Thread Ron Dagostino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino updated KAFKA-15495:
--
Description: 
Assume a topic-partition in KRaft has just a single leader replica in the ISR.  
Assume next that this replica goes offline.  This replica's log will define the 
contents of that partition when the replica restarts, which is correct 
behavior.  However, assume now that the replica has a disk failure, and we then 
replace the failed disk with a new, empty disk that we also format with the 
storage tool so it has the correct cluster ID.  If we then restart the broker, 
the topic-partition will have no data in it, and any other replicas that might 
exist will truncate their logs to match, which results in data loss.  See below 
for a step-by-step demo of how to reproduce this.

[KIP-858: Handle JBOD broker disk failure in 
KRaft|https://cwiki.apache.org/confluence/display/KAFKA/KIP-858%3A+Handle+JBOD+broker+disk+failure+in+KRaft]
 introduces the concept of a Disk UUID that we can use to solve this problem.  
Specifically, when the leader restarts with an empty (but correctly-formatted) 
disk, the actual UUID associated with the disk will be different.  The 
controller will notice upon broker re-registration that its disk UUID differs 
from what was previously registered.  Right now we have no way of detecting 
this situation, but the disk UUID gives us that capability.


STEPS TO REPRODUCE:

Create a single broker cluster with single controller.  The standard files 
under config/kraft work well:

bin/kafka-storage.sh random-uuid
J8qXRwI-Qyi2G0guFTiuYw

# ensure we start clean
/bin/rm -rf /tmp/kraft-broker-logs /tmp/kraft-controller-logs

bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config 
config/kraft/controller.properties
bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config 
config/kraft/broker.properties

bin/kafka-server-start.sh config/kraft/controller.properties
bin/kafka-server-start.sh config/kraft/broker.properties

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic foo1 
--partitions 1 --replication-factor 1

# create __consumer-offsets topics
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo1 
--from-beginning
^C

# confirm that __consumer_offsets topic partitions are all created and on 
broker with node id 2
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe

Now create 2 more brokers, with node IDs 3 and 4

cat config/kraft/broker.properties | sed 's/node.id=2/node.id=11/' | sed 
's/localhost:9092/localhost:9011/g' |  sed 
's#log.dirs=/tmp/kraft-broker-logs#log.dirs=/tmp/kraft-broker-logs11#' > 
config/kraft/broker11.properties
cat config/kraft/broker.properties | sed 's/node.id=2/node.id=12/' | sed 
's/localhost:9092/localhost:9012/g' |  sed 
's#log.dirs=/tmp/kraft-broker-logs#log.dirs=/tmp/kraft-broker-logs12#' > 
config/kraft/broker12.properties

# ensure we start clean
/bin/rm -rf /tmp/kraft-broker-logs11 /tmp/kraft-broker-logs12

bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config 
config/kraft/broker11.properties
bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config 
config/kraft/broker12.properties

bin/kafka-server-start.sh config/kraft/broker11.properties
bin/kafka-server-start.sh config/kraft/broker12.properties

# create a topic with a single partition replicated on two brokers
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic foo2 
--partitions 1 --replication-factor 2

# reassign partitions onto brokers with Node IDs 11 and 12
cat > /tmp/reassign.json 

[jira] [Updated] (KAFKA-15495) KRaft partition truncated when the only ISR member restarts with an empty disk

2023-09-24 Thread Ron Dagostino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino updated KAFKA-15495:
--
Summary: KRaft partition truncated when the only ISR member restarts with 
an empty disk  (was: KRaft partition truncated when the only ISR member 
restarts with and empty disk)

> KRaft partition truncated when the only ISR member restarts with an empty disk
> --
>
> Key: KAFKA-15495
> URL: https://issues.apache.org/jira/browse/KAFKA-15495
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.3.2, 3.4.1, 3.6.0, 3.5.1
>Reporter: Ron Dagostino
>Priority: Critical
>
> Assume a topic-partition in KRaft has just a single leader replica in the 
> ISR.  Assume next that this replica goes offline.  This replica's log will 
> define the contents of that partition when the replica restarts, which is 
> correct behavior.  However, assume now that the replica has a disk failure, 
> and we then replace the failed disk with a new, empty disk that we also 
> format with the storage tool so it has the correct cluster ID.  If we then 
> restart the broker, the topic-partition will have no data in it, and any 
> other replicas that might exist will truncate their logs to match.  See below 
> for a step-by-step demo of how to reproduce this.
> [KIP-858: Handle JBOD broker disk failure in 
> KRaft|https://cwiki.apache.org/confluence/display/KAFKA/KIP-858%3A+Handle+JBOD+broker+disk+failure+in+KRaft]
>  introduces the concept of a Disk UUID that we can use to solve this problem. 
>  Specifically, when the leader restarts with an empty (but 
> correctly-formatted) disk, the actual UUID associated with the disk will be 
> different.  The controller will notice upon broker re-registration that its 
> disk UUID differs from what was previously registered.  Right now we have no 
> way of detecting this situation, but the disk UUID gives us that capability.
> STEPS TO REPRODUCE:
> Create a single broker cluster with single controller.  The standard files 
> under config/kraft work well:
> bin/kafka-storage.sh random-uuid
> J8qXRwI-Qyi2G0guFTiuYw
> # ensure we start clean
> /bin/rm -rf /tmp/kraft-broker-logs /tmp/kraft-controller-logs
> bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config 
> config/kraft/controller.properties
> bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config 
> config/kraft/broker.properties
> bin/kafka-server-start.sh config/kraft/controller.properties
> bin/kafka-server-start.sh config/kraft/broker.properties
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic foo1 
> --partitions 1 --replication-factor 1
> # create __consumer-offsets topics
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo1 
> --from-beginning
> ^C
> # confirm that __consumer_offsets topic partitions are all created and on 
> broker with node id 2
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
> Now create 2 more brokers, with node IDs 3 and 4
> cat config/kraft/broker.properties | sed 's/node.id=2/node.id=11/' | sed 
> 's/localhost:9092/localhost:9011/g' |  sed 
> 's#log.dirs=/tmp/kraft-broker-logs#log.dirs=/tmp/kraft-broker-logs11#' > 
> config/kraft/broker11.properties
> cat config/kraft/broker.properties | sed 's/node.id=2/node.id=12/' | sed 
> 's/localhost:9092/localhost:9012/g' |  sed 
> 's#log.dirs=/tmp/kraft-broker-logs#log.dirs=/tmp/kraft-broker-logs12#' > 
> config/kraft/broker12.properties
> # ensure we start clean
> /bin/rm -rf /tmp/kraft-broker-logs11 /tmp/kraft-broker-logs12
> bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config 
> config/kraft/broker11.properties
> bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config 
> config/kraft/broker12.properties
> bin/kafka-server-start.sh config/kraft/broker11.properties
> bin/kafka-server-start.sh config/kraft/broker12.properties
> # create a topic with a single partition replicated on two brokers
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic foo2 
> --partitions 1 --replication-factor 2
> # reassign partitions onto brokers with Node IDs 11 and 12
> cat > /tmp/reassign.json < {"partitions":[{"topic": "foo2","partition": 0,"replicas": [11,12]}], 
> "version":1}
> DONE
> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 
> --reassignment-json-file /tmp/reassign.json --execute
> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 
> --reassignment-json-file /tmp/reassign.json --verify
> # make preferred leader 11 the actual leader if it not
> bin/kafka-leader-election.sh --bootstrap-server localhost:9092 
> --all-topic-partitions --election-type preferred
> # Confirm both brokers are in ISR and 11 is the leader
> bin/kafka-topic