jsancio commented on code in PR #20707:
URL: https://github.com/apache/kafka/pull/20707#discussion_r2962731859


##########
metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java:
##########
@@ -32,52 +33,58 @@
 import java.util.List;
 import java.util.Objects;
 
-import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
-import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
-
 /**
- * A read-only class that holds the controller bootstrap metadata. A file 
named "bootstrap.checkpoint" is used and the
- * format is the same as a KRaft snapshot.
+ * Reads bootstrap metadata from a binary checkpoint file.
  */
 public class BootstrapDirectory {
     public static final String BINARY_BOOTSTRAP_FILENAME = 
"bootstrap.checkpoint";
 
-    private final String directoryPath;
+    private final Path binaryBootstrapPath;
+    private final boolean requireFile;
+
+    /**
+     * Reads from {directoryPath}/bootstrap.checkpoint, falling back to 
defaults if missing.
+     */
+    public BootstrapDirectory(String directoryPath) {
+        this(Paths.get(directoryPath, BINARY_BOOTSTRAP_FILENAME), false);
+    }
 
     /**
-     * Create a new BootstrapDirectory object.
-     *
-     * @param directoryPath     The path to the directory with the bootstrap 
file.
+     * Reads from the given path. If requireFile is true, throws when the file 
is missing.
      */
-    public BootstrapDirectory(
-        String directoryPath
-    ) {
-        this.directoryPath = Objects.requireNonNull(directoryPath);
+    public BootstrapDirectory(Path binaryBootstrapPath, boolean requireFile) {

Review Comment:
   The implementation and usage has changed substantially that it is not clear 
to me why you need this type.
   
   Why not add these static methods to BootstrapMetadata:
   ```java
   /**
    * Make sure to document that it returns the "default" BootstrapMetadata if 
the bootstrap.checkpoint doesn't exist.
    */
   public static BootstrapMetadata fromDirectory(Path directory) { ... }
   
   /**
    * Document this method
    */
   public static BootstrapMetadata fromCheckpointFile(Path file) { ... }
   ```



##########
server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java:
##########
@@ -1406,33 +1405,6 @@ public void 
testDirectToControllerCommunicationFailsOnOlderMetadataVersion() thr
         }
     }
 
-    @Test
-    public void testStartupWithNonDefaultKControllerDynamicConfiguration() 
throws Exception {
-        var bootstrapRecords = List.of(
-            new ApiMessageAndVersion(new FeatureLevelRecord()
-                .setName(MetadataVersion.FEATURE_NAME)
-                .setFeatureLevel(MetadataVersion.IBP_3_7_IV0.featureLevel()), 
(short) 0),
-            new ApiMessageAndVersion(new ConfigRecord()
-                .setResourceType(ConfigResource.Type.BROKER.id())
-                .setResourceName("")
-                .setName("num.io.threads")
-                .setValue("9"), (short) 0));
-        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
-            new 
TestKitNodes.Builder(BootstrapMetadata.fromRecords(bootstrapRecords, 
"testRecords"))

Review Comment:
   Again, why are you removing this test? Isn't the solution to fix the 
bootstrapping of the TestKitNodes so that you pass something else besides a 
BootstrapMetadata?



##########
test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java:
##########
@@ -477,6 +480,17 @@ private void formatNode(
             formatter.setIgnoreFormatted(false);
             formatter.setControllerListenerName(controllerListenerName);
             formatter.setMetadataLogDirectory(ensemble.metadataLogDir().get());
+
+            for (Feature feature : Feature.PRODUCTION_FEATURES) {
+                String featureName = feature.featureName();
+                if (featureName.equals(MetadataVersion.FEATURE_NAME) ||
+                    featureName.equals(KRaftVersion.FEATURE_NAME)) {
+                    continue;
+                }

Review Comment:
   In a comment, please explain why you are skipping these features.



##########
metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java:
##########
@@ -148,12 +149,24 @@ public void testDirectories(int numDirs) throws Exception 
{
             assertEquals(OptionalInt.of(DEFAULT_NODE_ID), ensemble.nodeId());
             assertEquals(Optional.of(DEFAULT_CLUSTER_ID.toString()), 
ensemble.clusterId());
             assertEquals(new HashSet<>(testEnv.directories), 
ensemble.logDirProps().keySet());
-            BootstrapMetadata bootstrapMetadata =
-                new BootstrapDirectory(testEnv.directory(0)).read();
+            BootstrapMetadata bootstrapMetadata = 
BootstrapTestUtils.readBootstrapMetadata(testEnv.directory(0));
             assertEquals(MetadataVersion.latestProduction(), 
bootstrapMetadata.metadataVersion());
         }
     }
 
+    @Test
+    public void testSkipsBootstrapSnapshotWhenDisabled() throws Exception {
+        try (TestEnv testEnv = new TestEnv(1)) {
+            FormatterContext context = testEnv.newFormatter();
+            context.formatter.setWriteBootstrapSnapshot(false);
+            context.formatter.run();
+            File clusterMetadataDir = new File(testEnv.directory(0), 
String.format("%s-%d",

Review Comment:
   Can we use LocalLog.logDirName?



##########
metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java:
##########
@@ -122,6 +124,17 @@ private BatchAndType 
nextControlBatch(FileChannelRecordBatch input) {
                         messages.add(new ApiMessageAndVersion(message, (short) 
0));
                         break;
                     }
+                    case KRAFT_VERSION: {
+                        KRaftVersionRecord message = new KRaftVersionRecord();
+                        message.read(new ByteBufferAccessor(record.value()), 
(short) 0);
+                        messages.add(new ApiMessageAndVersion(message, (short) 
0));
+                        break;
+                    }
+                    case KRAFT_VOTERS:
+                        VotersRecord message =  new VotersRecord();
+                        message.read(new ByteBufferAccessor(record.value()), 
(short) 0);
+                        messages.add(new ApiMessageAndVersion(message, (short) 
0));
+                        break;

Review Comment:
   Let's file an issue to clean this up. Ideally we remove this type and use 
the snapshot types define in the raft module.



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1436,7 +1446,7 @@ private void replay(ApiMessage message, 
Optional<OffsetAndEpoch> snapshotId, lon
     /**
      * The bootstrap metadata to use for initialization if needed.
      */
-    private final BootstrapMetadata bootstrapMetadata;
+    private volatile BootstrapMetadata bootstrapMetadata;

Review Comment:
   > but read by other threads
   
   What are these other threads that read bootstrapMetadata? 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to