kevin-wu24 commented on code in PR #20707:
URL: https://github.com/apache/kafka/pull/20707#discussion_r2504612769


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1022,7 +1020,12 @@ public void 
handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
                         Batch<ApiMessageAndVersion> batch = reader.next();
                         long offset = batch.lastOffset();
                         List<ApiMessageAndVersion> messages = batch.records();
-
+                        if 
(reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID)) {
+                            // For bootstrap snapshots, extract feature levels 
from all data records
+                            if (batch.controlRecords().isEmpty()) {

Review Comment:
   Please fix this if condition. We should be checking `if 
(!messages.isEmpty())` on L1025. We should not be doing anything else besides 
setting `bootstrapMetadata` when loading a `0-0.checkpoint` that contains 
metadata records. In the current code, you are still iterating through the 
`messages` after setting `bootstrapMetadata`. This is why you need 
`forceBootstrapWrite`, but that boolean should be removed.



##########
metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java:
##########
@@ -155,10 +155,11 @@ static ControllerResult<Void> generate(
         Consumer<String> activationMessageConsumer,
         long transactionStartOffset,
         BootstrapMetadata bootstrapMetadata,
+        boolean forceBootstrapWrite,
         Optional<MetadataVersion> curMetadataVersion,
         int defaultMinInSyncReplicas
     ) {
-        if (curMetadataVersion.isEmpty()) {
+        if (forceBootstrapWrite || curMetadataVersion.isEmpty()) {

Review Comment:
   Please remove this boolean. See my other comments.



##########
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java:
##########
@@ -73,6 +76,28 @@ public BootstrapMetadata read() throws Exception {
         }
     }
 
+    public BootstrapMetadata read() throws Exception {
+        Path path = Paths.get(directoryPath);
+        if (!Files.isDirectory(path)) {
+            if (Files.exists(path)) {
+                throw new RuntimeException("Path " + directoryPath + " exists, 
but is not " +
+                        "a directory.");
+            } else {
+                throw new RuntimeException("No such directory as " + 
directoryPath);
+            }
+        }
+        Path binaryBootstrapPath = Paths.get(directoryPath, 
String.format("%s-%d",
+            CLUSTER_METADATA_TOPIC_PARTITION.topic(),
+            CLUSTER_METADATA_TOPIC_PARTITION.partition()),
+            BINARY_BOOTSTRAP_CHECKPOINT_FILENAME);
+        if (!Files.exists(binaryBootstrapPath)) {
+            return readFromConfiguration();
+        } else {
+            return readFromBinaryFile(binaryBootstrapPath.toString());
+        }

Review Comment:
   We can remove this and just use the old `read()` implementation.



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1022,7 +1020,12 @@ public void 
handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
                         Batch<ApiMessageAndVersion> batch = reader.next();
                         long offset = batch.lastOffset();
                         List<ApiMessageAndVersion> messages = batch.records();
-
+                        if 
(reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID)) {
+                            // For bootstrap snapshots, extract feature levels 
from all data records
+                            if (batch.controlRecords().isEmpty()) {

Review Comment:
   ```suggestion
   // check if 0-0.checkpoint has data records
   if (!messages.isEmpty()) {
       bootstrapMetadata = BootstrapMetadata.fromRecords(messages, "bootstrap");
       return;
   }
   ```



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1139,10 +1142,14 @@ class CompleteActivationEvent implements 
ControllerWriteOperation<Void> {
         @Override
         public ControllerResult<Void> generateRecordsAndResult() {
             try {
+                boolean forceBootstrapWrite = 
featureControl.metadataVersion().isPresent() &&
+                    offsetControl.lastCommittedOffset() == 
Snapshots.BOOTSTRAP_SNAPSHOT_ID.offset() &&
+                    offsetControl.lastCommittedEpoch() == 
Snapshots.BOOTSTRAP_SNAPSHOT_ID.epoch();

Review Comment:
   Please remove `forceBootstrapWrite`. Instead, we should not be iterating 
through the data records for the `0-0.checkpoint` in `handleLoadSnapshot`. 



##########
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java:
##########
@@ -42,6 +43,8 @@
 public class BootstrapDirectory {

Review Comment:
   We should not need to materially change this file at all (at least for now), 
since this file only deals with the old `bootstrap.checkpoint` file that is 
deprecated going forward.
   
   It looks like it handles the `bootstrap.checkpoint` file not existing by 
defaulting to a bootstrap record set of just the latest MV. This means 
`bootstrapMetadata` is never `null`.
   
   Maybe we want to rename the file to `LegacyBootstrapDirectory`, mark this as 
deprecated, and log a message when we successfully read `bootstrap.checkpoint`.



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -384,8 +384,6 @@ public Builder setUncleanLeaderElectionCheckIntervalMs(long 
uncleanLeaderElectio
         public QuorumController build() throws Exception {
             if (raftClient == null) {
                 throw new IllegalStateException("You must set a raft client.");
-            } else if (bootstrapMetadata == null) {
-                throw new IllegalStateException("You must specify an initial 
metadata.version using the kafka-storage tool.");

Review Comment:
   We can keep this assertion because `bootstrapMetadata` is never null.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to