jsancio commented on code in PR #20707:
URL: https://github.com/apache/kafka/pull/20707#discussion_r2669295310


##########
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java:
##########
@@ -19,65 +19,33 @@
 
 import org.apache.kafka.metadata.util.BatchFileReader;
 import org.apache.kafka.metadata.util.BatchFileReader.BatchAndType;
-import org.apache.kafka.metadata.util.BatchFileWriter;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.server.common.MetadataVersion;
 
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Objects;
-
-import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
-import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
 
 /**
- * A read-only class that holds the controller bootstrap metadata. A file named "bootstrap.checkpoint" is used and the
- * format is the same as a KRaft snapshot.
+ * Abstraction for reading controller bootstrap metadata from disk.
  */
-public class BootstrapDirectory {
-    public static final String BINARY_BOOTSTRAP_FILENAME = "bootstrap.checkpoint";
-
-    private final String directoryPath;
+public interface BootstrapDirectory {
 
     /**
-     * Create a new BootstrapDirectory object.
+     * Read the bootstrap metadata from the configured location.
      *
-     * @param directoryPath     The path to the directory with the bootstrap file.
+     * @return the loaded {@link BootstrapMetadata}
+     * @throws Exception if the metadata cannot be read
      */
-    public BootstrapDirectory(
-        String directoryPath
-    ) {
-        this.directoryPath = Objects.requireNonNull(directoryPath);
-    }
-
-    public BootstrapMetadata read() throws Exception {
-        Path path = Paths.get(directoryPath);
-        if (!Files.isDirectory(path)) {
-            if (Files.exists(path)) {
-                throw new RuntimeException("Path " + directoryPath + " exists, but is not " +
-                        "a directory.");
-            } else {
-                throw new RuntimeException("No such directory as " + directoryPath);
-            }
-        }
-        Path binaryBootstrapPath = Paths.get(directoryPath, BINARY_BOOTSTRAP_FILENAME);
-        if (!Files.exists(binaryBootstrapPath)) {
-            return readFromConfiguration();
-        } else {
-            return readFromBinaryFile(binaryBootstrapPath.toString());
-        }
-    }
-
-    BootstrapMetadata readFromConfiguration() {
-        return BootstrapMetadata.fromVersion(MetadataVersion.latestProduction(), "the default bootstrap");
-    }
+    BootstrapMetadata read() throws Exception;

Review Comment:
   The type Exception is too generic. What are the actual exceptions that it can throw? Having said that, it is better to throw unchecked exceptions if the caller cannot handle them.
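   For example, the interface could surface the concrete failure instead. This is only a sketch; it assumes the underlying failures are IO-related, which still needs to be confirmed:
   
   ```java
   import java.io.UncheckedIOException;
   
   public interface BootstrapDirectory {
       /**
        * Read the bootstrap metadata from the configured location.
        *
        * @throws UncheckedIOException if the bootstrap metadata cannot be read from disk
        */
       BootstrapMetadata read();
   }
   ```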



##########
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/TestBootstrapDirectory.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.bootstrap;
+
+import java.io.FileNotFoundException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Objects;
+
+import static org.apache.kafka.common.internals.Topic.CLUSTER_METADATA_TOPIC_PARTITION;
+
+/**
+ * Test-only implementation that reads bootstrap metadata from the metadata partition snapshot.
+ */
+public class TestBootstrapDirectory implements BootstrapDirectory {

Review Comment:
   If this is used only for tests, then the implementation should live in the test configuration.
   
   Having said that, the name of the type should match the implementation, not the context in which it is used. Test is the context, but the implementation seems to be based on the filename 00..-00...checkpoint.
   
   Does it make sense to have a different implementation when the only difference is the file name? It seems like avoidable code duplication.



##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -123,6 +123,7 @@ object StorageTool extends Logging {
     val formatter = new Formatter().
       setPrintStream(printStream).
       setNodeId(config.nodeId).
+      setWriteBootstrapSnapshot(config.processRoles.contains(ProcessRole.ControllerRole)).

Review Comment:
   We need to mention this in the KIP. I don't think the KIP mentions this 
behavior.



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1436,7 +1446,7 @@ private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, lon
     /**
      * The bootstrap metadata to use for initialization if needed.
      */
-    private final BootstrapMetadata bootstrapMetadata;
+    private BootstrapMetadata bootstrapMetadata;

Review Comment:
   Don't you have multiple writers and readers for this field? For example, the writers are the thread that constructs the QuorumController and the KafkaEventQueue thread.
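   If both threads really do touch the field, one option is to make the hand-off explicit. A sketch only; confining all writes to the event queue thread would be another option:
   
   ```java
   // Sketch: make cross-thread visibility explicit if the constructor thread and
   // the KafkaEventQueue thread can both read or write this field.
   private volatile BootstrapMetadata bootstrapMetadata;
   ```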



##########
raft/src/test/java/org/apache/kafka/raft/ReplicatedCounter.java:
##########
@@ -144,33 +145,35 @@ public synchronized void handleLoadSnapshot(SnapshotReader<Integer> reader) {
             // Since the state machine is only one value, expect only one data record
             boolean foundDataRecord = false;
             while (reader.hasNext()) {
-                Batch<Integer> batch = reader.next();
-                if (!batch.records().isEmpty()) {
-                    if (foundDataRecord) {
-                        throw new AssertionError(
-                            String.format(
-                                "Expected the snapshot at %s to only one data batch %s",
-                                reader.snapshotId(),
-                                batch
-                            )
-                        );
-                    } else if (batch.records().size() != 1) {
-                        throw new AssertionError(
-                            String.format(
-                                "Expected the snapshot at %s to only contain one record %s",
-                                reader.snapshotId(),
-                                batch.records()
-                            )
-                        );
+                if (!reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID)) {

Review Comment:
   Same comment here. To me the issue seems to be that the kafka raft client can now return committed and uncommitted snapshots, and they need to be handled differently.



##########
metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java:
##########
@@ -492,27 +497,36 @@ static DirectoryType calculate(
         }
     }
 
-    static void writeDynamicQuorumSnapshot(
+    public static void writeBoostrapSnapshot(
         String writeLogDir,
-        DynamicVoters initialControllers,
+        BootstrapMetadata bootstrapMetadata,
+        Optional<DynamicVoters> initialControllers,
         short kraftVersion,
         String controllerListenerName
     ) {
-        File parentDir = new File(writeLogDir);
-        File clusterMetadataDirectory = new File(parentDir, String.format("%s-%d",
-                CLUSTER_METADATA_TOPIC_PARTITION.topic(),
-                CLUSTER_METADATA_TOPIC_PARTITION.partition()));
-        VoterSet voterSet = initialControllers.toVoterSet(controllerListenerName);
-        RecordsSnapshotWriter.Builder builder = new RecordsSnapshotWriter.Builder().
-            setLastContainedLogTimestamp(Time.SYSTEM.milliseconds()).
-            setMaxBatchSizeBytes(KafkaRaftClient.MAX_BATCH_SIZE_BYTES).
-            setRawSnapshotWriter(FileRawSnapshotWriter.create(
-                clusterMetadataDirectory.toPath(),
-                Snapshots.BOOTSTRAP_SNAPSHOT_ID)).
-            setKraftVersion(KRaftVersion.fromFeatureLevel(kraftVersion)).
-            setVoterSet(Optional.of(voterSet));
-        try (RecordsSnapshotWriter<ApiMessageAndVersion> writer = builder.build(new MetadataRecordSerde())) {
-            writer.freeze();
+        try {
+            File parentDir = new File(writeLogDir);
+            File clusterMetadataDirectory = new File(parentDir, String.format("%s-%d",
+                    CLUSTER_METADATA_TOPIC_PARTITION.topic(),
+                    CLUSTER_METADATA_TOPIC_PARTITION.partition()));
+            RecordsSnapshotWriter.Builder builder = new RecordsSnapshotWriter.Builder().
+                setLastContainedLogTimestamp(Time.SYSTEM.milliseconds()).
+                setMaxBatchSizeBytes(KafkaRaftClient.MAX_BATCH_SIZE_BYTES).
+                setRawSnapshotWriter(FileRawSnapshotWriter.create(
+                    clusterMetadataDirectory.toPath(),
+                    Snapshots.BOOTSTRAP_SNAPSHOT_ID)).
+                setKraftVersion(KRaftVersion.fromFeatureLevel(kraftVersion));
+            if (initialControllers.isPresent()) {
+                VoterSet voterSet = initialControllers.get().toVoterSet(controllerListenerName);
+                builder.setVoterSet(Optional.of(voterSet));
+            }
+            try (RecordsSnapshotWriter<ApiMessageAndVersion> writer = builder.build(new MetadataRecordSerde())) {
+                writer.append(bootstrapMetadata.records());
+                writer.freeze();
+            }
+        } catch (UncheckedIOException e) {

Review Comment:
   Why are you catching this exception specifically? The code above throws other exceptions.



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1022,24 +1023,33 @@ public void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
                         Batch<ApiMessageAndVersion> batch = reader.next();
                         long offset = batch.lastOffset();
                         List<ApiMessageAndVersion> messages = batch.records();
+                        // KIP-1170: The 0-0.checkpoint can contain metadata records. If it does, they should be considered the bootstrap metadata for the cluster.
+                        if (reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID) && !messages.isEmpty()) {

Review Comment:
   I see. I think the issue you are running into is that handleLoadSnapshot may now return committed snapshots (snapshots that were replicated to the majority of the replicas) and uncommitted snapshots (the 000..-00...checkpoint) that were not replicated to the majority of the replicas and need to be replicated by the active leader.
   
   I am thinking that instead of the controller deducing this implicitly based on the snapshot id, it could be a property of the snapshot reader.
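   Something along these lines, where the flag name is hypothetical and not an existing SnapshotReader API:
   
   ```java
   // Hypothetical sketch: let the reader say whether this snapshot is an uncommitted
   // bootstrap checkpoint instead of comparing against Snapshots.BOOTSTRAP_SNAPSHOT_ID.
   if (reader.isUncommittedBootstrapSnapshot() && !messages.isEmpty()) {
       bootstrapMetadata = BootstrapMetadata.fromRecords(messages, "bootstrap");
   }
   ```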



##########
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java:
##########
@@ -19,65 +19,33 @@
 
 import org.apache.kafka.metadata.util.BatchFileReader;
 import org.apache.kafka.metadata.util.BatchFileReader.BatchAndType;
-import org.apache.kafka.metadata.util.BatchFileWriter;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.server.common.MetadataVersion;
 
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Objects;
-
-import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
-import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
 
 /**
- * A read-only class that holds the controller bootstrap metadata. A file named "bootstrap.checkpoint" is used and the
- * format is the same as a KRaft snapshot.
+ * Abstraction for reading controller bootstrap metadata from disk.
  */
-public class BootstrapDirectory {
-    public static final String BINARY_BOOTSTRAP_FILENAME = "bootstrap.checkpoint";
-
-    private final String directoryPath;
+public interface BootstrapDirectory {
 
     /**
-     * Create a new BootstrapDirectory object.
+     * Read the bootstrap metadata from the configured location.
      *
-     * @param directoryPath     The path to the directory with the bootstrap file.
+     * @return the loaded {@link BootstrapMetadata}
+     * @throws Exception if the metadata cannot be read
      */
-    public BootstrapDirectory(
-        String directoryPath
-    ) {
-        this.directoryPath = Objects.requireNonNull(directoryPath);
-    }
-
-    public BootstrapMetadata read() throws Exception {
-        Path path = Paths.get(directoryPath);
-        if (!Files.isDirectory(path)) {
-            if (Files.exists(path)) {
-                throw new RuntimeException("Path " + directoryPath + " exists, but is not " +
-                        "a directory.");
-            } else {
-                throw new RuntimeException("No such directory as " + directoryPath);
-            }
-        }
-        Path binaryBootstrapPath = Paths.get(directoryPath, BINARY_BOOTSTRAP_FILENAME);
-        if (!Files.exists(binaryBootstrapPath)) {
-            return readFromConfiguration();
-        } else {
-            return readFromBinaryFile(binaryBootstrapPath.toString());
-        }
-    }
-
-    BootstrapMetadata readFromConfiguration() {
-        return BootstrapMetadata.fromVersion(MetadataVersion.latestProduction(), "the default bootstrap");
-    }
+    BootstrapMetadata read() throws Exception;
 
-    BootstrapMetadata readFromBinaryFile(String binaryPath) throws Exception {
+    /**
+     * Read bootstrap metadata from the given binary file path.
+     *
+     * @param binaryPath the path to the binary bootstrap file
+     * @return the loaded {@link BootstrapMetadata}
+     * @throws Exception if the metadata cannot be read
+     */
+    default BootstrapMetadata readFromBinaryFile(String binaryPath) throws Exception {

Review Comment:
   This is a confusing interface API. Why do you have BootstrapDirectory#readFromBinaryFile when its implementation is unrelated to the rest of the interface, specifically BootstrapDirectory#read()?



##########
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/LegacyBootstrapDirectory.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.bootstrap;
+
+import org.apache.kafka.server.common.MetadataVersion;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Objects;
+
+
+/**
+ * Reads bootstrap metadata from the legacy {@code bootstrap.checkpoint} file.
+ */
+public class LegacyBootstrapDirectory implements BootstrapDirectory {
+    public static final String BINARY_BOOTSTRAP_FILENAME = "bootstrap.checkpoint";
+
+    private final String directoryPath;
+
+    public LegacyBootstrapDirectory(String directoryPath) {
+        this.directoryPath = Objects.requireNonNull(directoryPath);
+    }
+
+    @Override
+    public BootstrapMetadata read() throws Exception {
+        Path path = Paths.get(directoryPath);
+        if (!Files.isDirectory(path)) {
+            if (Files.exists(path)) {
+                throw new RuntimeException("Path " + directoryPath + " exists, but is not " +
+                        "a directory.");
+            } else {
+                throw new RuntimeException("No such directory as " + directoryPath);

Review Comment:
   Let's avoid using the generic RuntimeException. These seem to be illegal state exceptions.
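   For example (a sketch; whether IllegalStateException is the right choice here is open for discussion):
   
   ```java
   if (!Files.isDirectory(path)) {
       if (Files.exists(path)) {
           // The path exists but is not a directory, so the on-disk state is invalid.
           throw new IllegalStateException("Path " + directoryPath + " exists, but is not a directory.");
       } else {
           throw new IllegalStateException("No such directory as " + directoryPath);
       }
   }
   ```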



##########
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/LegacyBootstrapDirectory.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.bootstrap;
+
+import org.apache.kafka.server.common.MetadataVersion;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Objects;
+
+
+/**
+ * Reads bootstrap metadata from the legacy {@code bootstrap.checkpoint} file.
+ */
+public class LegacyBootstrapDirectory implements BootstrapDirectory {
+    public static final String BINARY_BOOTSTRAP_FILENAME = "bootstrap.checkpoint";
+
+    private final String directoryPath;
+
+    public LegacyBootstrapDirectory(String directoryPath) {
+        this.directoryPath = Objects.requireNonNull(directoryPath);
+    }
+
+    @Override
+    public BootstrapMetadata read() throws Exception {
+        Path path = Paths.get(directoryPath);
+        if (!Files.isDirectory(path)) {
+            if (Files.exists(path)) {
+                throw new RuntimeException("Path " + directoryPath + " exists, but is not " +
+                        "a directory.");
+            } else {
+                throw new RuntimeException("No such directory as " + directoryPath);
+            }
+        }
+        Path binaryBootstrapPath = Paths.get(directoryPath, BINARY_BOOTSTRAP_FILENAME);
+        if (!Files.exists(binaryBootstrapPath)) {
+            return readFromConfiguration();
+        } else {
+            return readFromBinaryFile(binaryBootstrapPath.toString());
+        }
+    }
+
+    BootstrapMetadata readFromConfiguration() {

Review Comment:
   Make its visibility private.



##########
metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java:
##########
@@ -492,27 +497,36 @@ static DirectoryType calculate(
         }
     }
 
-    static void writeDynamicQuorumSnapshot(
+    public static void writeBoostrapSnapshot(
         String writeLogDir,
-        DynamicVoters initialControllers,
+        BootstrapMetadata bootstrapMetadata,
+        Optional<DynamicVoters> initialControllers,
         short kraftVersion,
         String controllerListenerName
     ) {
-        File parentDir = new File(writeLogDir);
-        File clusterMetadataDirectory = new File(parentDir, String.format("%s-%d",
-                CLUSTER_METADATA_TOPIC_PARTITION.topic(),
-                CLUSTER_METADATA_TOPIC_PARTITION.partition()));
-        VoterSet voterSet = initialControllers.toVoterSet(controllerListenerName);
-        RecordsSnapshotWriter.Builder builder = new RecordsSnapshotWriter.Builder().
-            setLastContainedLogTimestamp(Time.SYSTEM.milliseconds()).
-            setMaxBatchSizeBytes(KafkaRaftClient.MAX_BATCH_SIZE_BYTES).
-            setRawSnapshotWriter(FileRawSnapshotWriter.create(
-                clusterMetadataDirectory.toPath(),
-                Snapshots.BOOTSTRAP_SNAPSHOT_ID)).
-            setKraftVersion(KRaftVersion.fromFeatureLevel(kraftVersion)).
-            setVoterSet(Optional.of(voterSet));
-        try (RecordsSnapshotWriter<ApiMessageAndVersion> writer = builder.build(new MetadataRecordSerde())) {
-            writer.freeze();
+        try {
+            File parentDir = new File(writeLogDir);
+            File clusterMetadataDirectory = new File(parentDir, String.format("%s-%d",
+                    CLUSTER_METADATA_TOPIC_PARTITION.topic(),
+                    CLUSTER_METADATA_TOPIC_PARTITION.partition()));
+            RecordsSnapshotWriter.Builder builder = new RecordsSnapshotWriter.Builder().
+                setLastContainedLogTimestamp(Time.SYSTEM.milliseconds()).
+                setMaxBatchSizeBytes(KafkaRaftClient.MAX_BATCH_SIZE_BYTES).
+                setRawSnapshotWriter(FileRawSnapshotWriter.create(
+                    clusterMetadataDirectory.toPath(),
+                    Snapshots.BOOTSTRAP_SNAPSHOT_ID)).
+                setKraftVersion(KRaftVersion.fromFeatureLevel(kraftVersion));
+            if (initialControllers.isPresent()) {
+                VoterSet voterSet = initialControllers.get().toVoterSet(controllerListenerName);
+                builder.setVoterSet(Optional.of(voterSet));
+            }
+            try (RecordsSnapshotWriter<ApiMessageAndVersion> writer = builder.build(new MetadataRecordSerde())) {
+                writer.append(bootstrapMetadata.records());
+                writer.freeze();
+            }
+        } catch (UncheckedIOException e) {
+            throw new FormatterException("Error while writing 00000000000000000000-0000000000.checkpoint file " +

Review Comment:
   Let's print the entire path that failed, not just this hardcoded file name.
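   Something like this could work (a sketch; it assumes the directory is resolved before the try block so it is in scope for the error message):
   
   ```java
   // Sketch: resolve the target directory up front so the failure message can name the real path.
   File clusterMetadataDirectory = new File(new File(writeLogDir), String.format("%s-%d",
       CLUSTER_METADATA_TOPIC_PARTITION.topic(),
       CLUSTER_METADATA_TOPIC_PARTITION.partition()));
   try {
       // ... build the snapshot writer, append the bootstrap records, and freeze, as above ...
   } catch (UncheckedIOException e) {
       throw new FormatterException("Error while writing bootstrap snapshot under " +
           clusterMetadataDirectory.getAbsolutePath() + ": " + e.getMessage());
   }
   ```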



##########
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/LegacyBootstrapDirectory.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.bootstrap;
+
+import org.apache.kafka.server.common.MetadataVersion;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Objects;
+
+
+/**
+ * Reads bootstrap metadata from the legacy {@code bootstrap.checkpoint} file.
+ */
+public class LegacyBootstrapDirectory implements BootstrapDirectory {
+    public static final String BINARY_BOOTSTRAP_FILENAME = "bootstrap.checkpoint";
+
+    private final String directoryPath;
+
+    public LegacyBootstrapDirectory(String directoryPath) {
+        this.directoryPath = Objects.requireNonNull(directoryPath);
+    }

Review Comment:
   Document the constructor and its parameters.
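   For example (a sketch):
   
   ```java
   /**
    * Create a new LegacyBootstrapDirectory.
    *
    * @param directoryPath the path to the directory that contains the bootstrap.checkpoint file
    */
   public LegacyBootstrapDirectory(String directoryPath) {
       this.directoryPath = Objects.requireNonNull(directoryPath);
   }
   ```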



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1022,24 +1023,33 @@ public void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
                         Batch<ApiMessageAndVersion> batch = reader.next();
                         long offset = batch.lastOffset();
                         List<ApiMessageAndVersion> messages = batch.records();
+                        // KIP-1170: The 0-0.checkpoint can contain metadata records. If it does, they should be considered the bootstrap metadata for the cluster.
+                        if (reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID) && !messages.isEmpty()) {
+                            if (bootstrapMetadata.source().contains(LegacyBootstrapDirectory.BINARY_BOOTSTRAP_FILENAME)) {
+                                log.warn("{} with metadata records exists alongside {}",
+                                    Snapshots.filenameFromSnapshotId(reader.snapshotId()) + ".checkpoint",
+                                    LegacyBootstrapDirectory.BINARY_BOOTSTRAP_FILENAME);
+                            }
+                            bootstrapMetadata = BootstrapMetadata.fromRecords(messages, "bootstrap");

Review Comment:
   I suggest not leaking this level of implementation detail to the controller. 
The interfaces BootstrapDirectory and SnapshotReader exist to hide/abstract 
this level of detail.



##########
metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java:
##########
@@ -122,6 +124,17 @@ private BatchAndType nextControlBatch(FileChannelRecordBatch input) {
                         messages.add(new ApiMessageAndVersion(message, (short) 0));
                         break;
                     }
+                    case KRAFT_VERSION: {
+                        KRaftVersionRecord message = new KRaftVersionRecord();
+                        message.read(new ByteBufferAccessor(record.value()), (short) 0);
+                        messages.add(new ApiMessageAndVersion(message, (short) 0));
+                        break;
+                    }
+                    case KRAFT_VOTERS:
+                        VotersRecord message =  new VotersRecord();
+                        message.read(new ByteBufferAccessor(record.value()), (short) 0);
+                        messages.add(new ApiMessageAndVersion(message, (short) 0));
+                        break;

Review Comment:
   @mannoopj Please answer @kevin-wu24 's question.
   
   Where is this type used? We really need to delete this type and use the functionality provided by the raft module.



##########
metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java:
##########
@@ -444,14 +444,16 @@ SnapshotManifest loadSnapshot(
         while (reader.hasNext()) {
             Batch<ApiMessageAndVersion> batch = reader.next();
             loadControlRecords(batch);
-            for (ApiMessageAndVersion record : batch.records()) {
-                try {
-                    delta.replay(record.message());
-                } catch (Throwable e) {
-                    faultHandler.handleFault("Error loading metadata log record " + snapshotIndex +
-                            " in snapshot at offset " + reader.lastContainedLogOffset(), e);
+            if (!reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID)) {

Review Comment:
   Similar comment here. I get the impression that we are trying to handle the case where snapshots can now be either committed or uncommitted.



##########
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/TestBootstrapDirectory.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.bootstrap;
+
+import java.io.FileNotFoundException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Objects;
+
+import static org.apache.kafka.common.internals.Topic.CLUSTER_METADATA_TOPIC_PARTITION;
+
+/**
+ * Test-only implementation that reads bootstrap metadata from the metadata partition snapshot.
+ */
+public class TestBootstrapDirectory implements BootstrapDirectory {
+    private static final String BINARY_BOOTSTRAP_CHECKPOINT_FILENAME = "00000000000000000000-0000000000.checkpoint";

Review Comment:
   Why do you need this implementation in test when the src/main code would 
never do this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
