jsancio commented on code in PR #16325:
URL: https://github.com/apache/kafka/pull/16325#discussion_r1642996134


##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -46,6 +51,8 @@ import scala.collection.mutable.ArrayBuffer
 
 object StorageTool extends Logging {
 
+  val clusterMetadataDir: String = "/__cluster_metadata-0"

Review Comment:
   This is defined in 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L31-L35
   
   And we resolve the actual directory like this: 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/raft/RaftManager.scala#L57-L62
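   
   For reference, a minimal sketch of deriving the name instead of hard-coding it (assuming `UnifiedLog.logDirName` and `Topic.CLUSTER_METADATA_TOPIC_PARTITION` are still as they are on trunk):
   ```scala
   import org.apache.kafka.common.internals.Topic
   import kafka.log.UnifiedLog

   // Resolves to "__cluster_metadata-0" without a hard-coded string
   val clusterMetadataDirName: String = UnifiedLog.logDirName(Topic.CLUSTER_METADATA_TOPIC_PARTITION)
   ```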



##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -226,7 +238,12 @@ object StorageTool extends Logging {
       help(s"A KRaft release version to use for the initial metadata.version. 
The minimum is ${MetadataVersion.IBP_3_0_IV0}, the default is 
${MetadataVersion.LATEST_PRODUCTION}")
     formatParser.addArgument("--feature", "-f").
       help("A feature upgrade we should perform, in feature=level format. For 
example: `metadata.version=5`.").
-      action(append());
+      action(append())
+    formatParser.addArgument("--standalone", "-s").
+      help("This command will 1) create a meta.properties file in 
metadata.log.dir with a randomly generated" +
+        " directory.id, 2) create a snapshot at 
00000000000000000000-0000000000.checkpoint with the necessary control" +
+        " records (KRaftVersionRecord and VotersRecord) to make this Kafka 
node the only voter for the quorum").

Review Comment:
   I think this is too low level. Most users won't understand what this means. How about:
   > This flag will bootstrap the controller in standalone mode as the only KRaft controller of the Kafka cluster. Use the --controller-quorum-voters flag instead to bootstrap a controller cluster with more than one controller.
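   
   Roughly, applied to the parser (the `storeTrue()` action here is an assumption about how the flag gets registered):
   ```scala
   formatParser.addArgument("--standalone", "-s").
     help("This flag will bootstrap the controller in standalone mode as the only KRaft controller of the Kafka cluster." +
       " Use the --controller-quorum-voters flag instead to bootstrap a controller cluster with more than one controller.").
     action(storeTrue())
   ```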



##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -548,6 +570,7 @@ object StorageTool extends Logging {
     if (metaPropertiesEnsemble.emptyLogDirs().isEmpty) {
       stream.println("All of the log directories are already formatted.")
     } else {
+      val directoryId = copier.generateValidDirectoryId()

Review Comment:
   This doesn't look correct. We need to use the directoryId of the metadata 
log directory. After the `copier` has generated all of the directory ids in 
`copier.writeLogDirChanges()`, you want to read the directory id in the 
`meta.properties` of the `metadataLogDir`.
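   
   Something like this sketch (the accessor names on `MetaPropertiesEnsemble.Copier` and `MetaProperties` are assumptions about the current trunk API):
   ```scala
   // After the copier has written every log dir, read back the id that was
   // assigned to the metadata log directory rather than generating a fresh one.
   copier.writeLogDirChanges()
   val metadataDirectoryId: Uuid = copier
     .logDirProps()                // per-directory MetaProperties written by the copier
     .get(config.metadataLogDir)   // the metadata log directory's entry
     .directoryId()                // Optional[Uuid] from meta.properties
     .get()
   ```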



##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -589,4 +616,56 @@ object StorageTool extends Logging {
       (nameAndLevel._1, nameAndLevel._2)
     }.toMap
   }
-}
+
+  def writeCheckpointFile(stream: PrintStream, logDir: String, directoryId: Uuid,
+                          advertisedListenerEndpoints: scala.collection.Seq[kafka.cluster.EndPoint]): Unit = {
+    val snapshotCheckpointDir = logDir + clusterMetadataDir
+
+    // Ensure the directory exists
+    val snapshotDir = Paths.get(snapshotCheckpointDir)
+    if (!Files.exists(snapshotDir)) {
+      Files.createDirectories(snapshotDir)
+    }
+
+    // Create the full path for the checkpoint file
+    val checkpointFilePath = snapshotDir.resolve(snapshotDir)
+
+    // Create the raw snapshot writer
+    val rawSnapshotWriter = FileRawSnapshotWriter.create(checkpointFilePath, new OffsetAndEpoch(0, 0))
+
+    if(advertisedListenerEndpoints.nonEmpty){
+      val voterSet: VoterSet = getVoterSet(directoryId, advertisedListenerEndpoints)
+
+      val builder = new RecordsSnapshotWriter.Builder()
+        .setKraftVersion(1)
+        .setVoterSet(Optional.of(voterSet))
+        .setRawSnapshotWriter(rawSnapshotWriter).build(new StringSerde)
+
+      // Close the builder to finalize the snapshot
+      builder.freeze()
+      builder.close()
+      stream.println(s"Snapshot written to $checkpointFilePath")
+    }
+  }
+
+  private def getVoterSet(directoryId: Uuid, advertisedListenerEndpoints: collection.Seq[EndPoint]) = {
+    // Create a VotersRecord endpoint collection
+    val endpointCollection = new VotersRecord.EndpointCollection()
+    advertisedListenerEndpoints.foreach(endpoint => {
+      endpointCollection.add(new VotersRecord.Endpoint().setName(endpoint.listenerName.value())
+        .setHost(endpoint.host).setPort(endpoint.port))
+    })
+
+    // Create voters
+    val voters: util.List[VotersRecord.Voter] = new util.ArrayList()
+    voters.add(new VotersRecord.Voter()
+      .setVoterId(1)
+      .setVoterDirectoryId(directoryId)
+      .setEndpoints(endpointCollection))
+
+    // Create Voter set
+    val voterNewRecord = new VotersRecord().setVersion(1).setVoters(voters)
+    val voterSet = VoterSet.fromVotersRecord(voterNewRecord)
+    voterSet
+  }
+}

Review Comment:
   Missing newline character.



##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -589,4 +616,56 @@ object StorageTool extends Logging {
       (nameAndLevel._1, nameAndLevel._2)
     }.toMap
   }
-}
+
+  def writeCheckpointFile(stream: PrintStream, logDir: String, directoryId: Uuid,
+                          advertisedListenerEndpoints: scala.collection.Seq[kafka.cluster.EndPoint]): Unit = {
+    val snapshotCheckpointDir = logDir + clusterMetadataDir
+
+    // Ensure the directory exists
+    val snapshotDir = Paths.get(snapshotCheckpointDir)
+    if (!Files.exists(snapshotDir)) {
+      Files.createDirectories(snapshotDir)
+    }
+
+    // Create the full path for the checkpoint file
+    val checkpointFilePath = snapshotDir.resolve(snapshotDir)
+
+    // Create the raw snapshot writer
+    val rawSnapshotWriter = FileRawSnapshotWriter.create(checkpointFilePath, new OffsetAndEpoch(0, 0))
+
+    if(advertisedListenerEndpoints.nonEmpty){
+      val voterSet: VoterSet = getVoterSet(directoryId, advertisedListenerEndpoints)
+
+      val builder = new RecordsSnapshotWriter.Builder()
+        .setKraftVersion(1)
+        .setVoterSet(Optional.of(voterSet))
+        .setRawSnapshotWriter(rawSnapshotWriter).build(new StringSerde)
+
+      // Close the builder to finalize the snapshot
+      builder.freeze()
+      builder.close()

Review Comment:
   Let's use `try ... finally ...` so that the builder is always closed.
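   
   For example (a sketch; note that `build` returns the snapshot writer, so `writer` is a better name than `builder` at this point):
   ```scala
   val writer = new RecordsSnapshotWriter.Builder()
     .setKraftVersion(1)
     .setVoterSet(Optional.of(voterSet))
     .setRawSnapshotWriter(rawSnapshotWriter)
     .build(new StringSerde)
   try {
     // freeze() finalizes the snapshot contents
     writer.freeze()
   } finally {
     // Always release the underlying file, even if freeze() throws
     writer.close()
   }
   ```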



##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -589,4 +616,56 @@ object StorageTool extends Logging {
       (nameAndLevel._1, nameAndLevel._2)
     }.toMap
   }
-}
+
+  def writeCheckpointFile(stream: PrintStream, logDir: String, directoryId: Uuid,
+                          advertisedListenerEndpoints: scala.collection.Seq[kafka.cluster.EndPoint]): Unit = {
+    val snapshotCheckpointDir = logDir + clusterMetadataDir
+
+    // Ensure the directory exists
+    val snapshotDir = Paths.get(snapshotCheckpointDir)
+    if (!Files.exists(snapshotDir)) {
+      Files.createDirectories(snapshotDir)
+    }
+
+    // Create the full path for the checkpoint file
+    val checkpointFilePath = snapshotDir.resolve(snapshotDir)
+
+    // Create the raw snapshot writer
+    val rawSnapshotWriter = FileRawSnapshotWriter.create(checkpointFilePath, new OffsetAndEpoch(0, 0))
+
+    if(advertisedListenerEndpoints.nonEmpty){
+      val voterSet: VoterSet = getVoterSet(directoryId, advertisedListenerEndpoints)
+
+      val builder = new RecordsSnapshotWriter.Builder()
+        .setKraftVersion(1)
+        .setVoterSet(Optional.of(voterSet))
+        .setRawSnapshotWriter(rawSnapshotWriter).build(new StringSerde)
+
+      // Close the builder to finalize the snapshot
+      builder.freeze()
+      builder.close()
+      stream.println(s"Snapshot written to $checkpointFilePath")
+    }
+  }
+
+  private def getVoterSet(directoryId: Uuid, advertisedListenerEndpoints: collection.Seq[EndPoint]) = {
+    // Create a VotersRecord endpoint collection
+    val endpointCollection = new VotersRecord.EndpointCollection()
+    advertisedListenerEndpoints.foreach(endpoint => {
+      endpointCollection.add(new VotersRecord.Endpoint().setName(endpoint.listenerName.value())
+        .setHost(endpoint.host).setPort(endpoint.port))
+    })
+
+    // Create voters
+    val voters: util.List[VotersRecord.Voter] = new util.ArrayList()
+    voters.add(new VotersRecord.Voter()
+      .setVoterId(1)
+      .setVoterDirectoryId(directoryId)
+      .setEndpoints(endpointCollection))
+
+    // Create Voter set
+    val voterNewRecord = new VotersRecord().setVersion(1).setVoters(voters)
+    val voterSet = VoterSet.fromVotersRecord(voterNewRecord)

Review Comment:
   See. Let's avoid creating a `VotersRecord` just to get a `VoterSet`. Feel 
free to add a static method to `VoterSet` like `VoterSet fromMap(Map<Integer, 
VoterNode> voters)`.
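   
   A sketch of the intended call site once such a factory exists (`VoterSet.fromMap` is the proposed addition, not existing API, and `nodeId`/`localVoterNode` stand in for values built from the node id, directory id, and endpoints):
   ```scala
   val voters = new util.HashMap[Integer, VoterSet.VoterNode]()
   voters.put(nodeId, localVoterNode) // the local node as the only voter
   val voterSet = VoterSet.fromMap(voters)
   ```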



##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -102,6 +109,11 @@ object StorageTool extends Logging {
       setClusterId(clusterId).
       setNodeId(config.nodeId).
       build()
+    val standaloneMode = namespace.getBoolean("standalone")
+    val advertisedListenerEndpoints: scala.collection.Seq[kafka.cluster.EndPoint] = config.effectiveAdvertisedListeners
+
+    // effectiveAdvertisedControllerListeners to be added

Review Comment:
   Okay. Are you going to wait for this PR to get merged and update this PR 
then?
   https://github.com/apache/kafka/pull/16235



##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -589,4 +616,56 @@ object StorageTool extends Logging {
       (nameAndLevel._1, nameAndLevel._2)
     }.toMap
   }
-}
+
+  def writeCheckpointFile(stream: PrintStream, logDir: String, directoryId: Uuid,
+                          advertisedListenerEndpoints: scala.collection.Seq[kafka.cluster.EndPoint]): Unit = {
+    val snapshotCheckpointDir = logDir + clusterMetadataDir
+
+    // Ensure the directory exists
+    val snapshotDir = Paths.get(snapshotCheckpointDir)
+    if (!Files.exists(snapshotDir)) {
+      Files.createDirectories(snapshotDir)
+    }
+
+    // Create the full path for the checkpoint file
+    val checkpointFilePath = snapshotDir.resolve(snapshotDir)
+
+    // Create the raw snapshot writer
+    val rawSnapshotWriter = FileRawSnapshotWriter.create(checkpointFilePath, new OffsetAndEpoch(0, 0))
+
+    if(advertisedListenerEndpoints.nonEmpty){
+      val voterSet: VoterSet = getVoterSet(directoryId, advertisedListenerEndpoints)
+
+      val builder = new RecordsSnapshotWriter.Builder()
+        .setKraftVersion(1)
+        .setVoterSet(Optional.of(voterSet))
+        .setRawSnapshotWriter(rawSnapshotWriter).build(new StringSerde)
+
+      // Close the builder to finalize the snapshot
+      builder.freeze()
+      builder.close()
+      stream.println(s"Snapshot written to $checkpointFilePath")
+    }
+  }
+
+  private def getVoterSet(directoryId: Uuid, advertisedListenerEndpoints: collection.Seq[EndPoint]) = {
+    // Create a VotersRecord endpoint collection
+    val endpointCollection = new VotersRecord.EndpointCollection()
+    advertisedListenerEndpoints.foreach(endpoint => {
+      endpointCollection.add(new VotersRecord.Endpoint().setName(endpoint.listenerName.value())
+        .setHost(endpoint.host).setPort(endpoint.port))
+    })
+
+    // Create voters
+    val voters: util.List[VotersRecord.Voter] = new util.ArrayList()
+    voters.add(new VotersRecord.Voter()
+      .setVoterId(1)

Review Comment:
   You can't assume that the voter id is 1.
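   
   i.e. something like the following, assuming the node id is available here (for example via the tool's `config` or `metaProperties`):
   ```scala
   voters.add(new VotersRecord.Voter()
     .setVoterId(config.nodeId)          // the node's configured id, not a hard-coded 1
     .setVoterDirectoryId(directoryId)
     .setEndpoints(endpointCollection))
   ```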



##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -589,4 +616,56 @@ object StorageTool extends Logging {
       (nameAndLevel._1, nameAndLevel._2)
     }.toMap
   }
-}
+
+  def writeCheckpointFile(stream: PrintStream, logDir: String, directoryId: Uuid,
+                          advertisedListenerEndpoints: scala.collection.Seq[kafka.cluster.EndPoint]): Unit = {
+    val snapshotCheckpointDir = logDir + clusterMetadataDir
+
+    // Ensure the directory exists
+    val snapshotDir = Paths.get(snapshotCheckpointDir)
+    if (!Files.exists(snapshotDir)) {
+      Files.createDirectories(snapshotDir)
+    }
+
+    // Create the full path for the checkpoint file
+    val checkpointFilePath = snapshotDir.resolve(snapshotDir)

Review Comment:
   What does this mean?
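   
   Since `snapshotDir` is an absolute path, `Path.resolve` just returns its argument here, so `checkpointFilePath` is `snapshotDir` itself. Presumably the intent was to resolve the checkpoint file name against the directory, e.g.:
   ```scala
   // Presumed intent (hedged): resolve the checkpoint file name, not the directory itself
   val checkpointFilePath = snapshotDir.resolve("00000000000000000000-0000000000.checkpoint")
   ```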



##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -589,4 +616,56 @@ object StorageTool extends Logging {
       (nameAndLevel._1, nameAndLevel._2)
     }.toMap
   }
-}
+
+  def writeCheckpointFile(stream: PrintStream, logDir: String, directoryId: Uuid,
+                          advertisedListenerEndpoints: scala.collection.Seq[kafka.cluster.EndPoint]): Unit = {
+    val snapshotCheckpointDir = logDir + clusterMetadataDir
+
+    // Ensure the directory exists
+    val snapshotDir = Paths.get(snapshotCheckpointDir)
+    if (!Files.exists(snapshotDir)) {
+      Files.createDirectories(snapshotDir)
+    }
+
+    // Create the full path for the checkpoint file
+    val checkpointFilePath = snapshotDir.resolve(snapshotDir)
+
+    // Create the raw snapshot writer
+    val rawSnapshotWriter = FileRawSnapshotWriter.create(checkpointFilePath, new OffsetAndEpoch(0, 0))
+
+    if(advertisedListenerEndpoints.nonEmpty){
+      val voterSet: VoterSet = getVoterSet(directoryId, advertisedListenerEndpoints)
+
+      val builder = new RecordsSnapshotWriter.Builder()
+        .setKraftVersion(1)
+        .setVoterSet(Optional.of(voterSet))
+        .setRawSnapshotWriter(rawSnapshotWriter).build(new StringSerde)

Review Comment:
   Add newline between the characters `)` and `.`. E.g.
   ```java
           .setRawSnapshotWriter(rawSnapshotWriter)
           .build(new StringSerde)
   ```



##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -507,10 +524,13 @@ object StorageTool extends Logging {
     directories: Seq[String],
     metaProperties: MetaProperties,
     metadataVersion: MetadataVersion,
-    ignoreFormatted: Boolean
+    ignoreFormatted: Boolean,
+    standaloneMode: Boolean,
+    advertisedListenerEndpoints: scala.collection.Seq[kafka.cluster.EndPoint]

Review Comment:
   This is not general enough. How are you planning to implement 
--controller-quorum-voters with this signature? Why do you need 
`standaloneMode` if you are passing the local endpoints?



##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -589,4 +616,56 @@ object StorageTool extends Logging {
       (nameAndLevel._1, nameAndLevel._2)
     }.toMap
   }
-}
+
+  def writeCheckpointFile(stream: PrintStream, logDir: String, directoryId: Uuid,
+                          advertisedListenerEndpoints: scala.collection.Seq[kafka.cluster.EndPoint]): Unit = {
+    val snapshotCheckpointDir = logDir + clusterMetadataDir
+
+    // Ensure the directory exists
+    val snapshotDir = Paths.get(snapshotCheckpointDir)
+    if (!Files.exists(snapshotDir)) {
+      Files.createDirectories(snapshotDir)
+    }
+
+    // Create the full path for the checkpoint file
+    val checkpointFilePath = snapshotDir.resolve(snapshotDir)
+
+    // Create the raw snapshot writer
+    val rawSnapshotWriter = FileRawSnapshotWriter.create(checkpointFilePath, new OffsetAndEpoch(0, 0))

Review Comment:
   Instead of hard-coding `OffsetAndEpoch(0, 0)` let's add a constant to 
`o.a.k.s.Snapshots` named `BOOTSTRAP_SNAPSHOT_ID`.
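   
   The call site would then read (sketch; `Snapshots.BOOTSTRAP_SNAPSHOT_ID` is the proposed constant, not existing API):
   ```scala
   val rawSnapshotWriter = FileRawSnapshotWriter.create(checkpointFilePath, Snapshots.BOOTSTRAP_SNAPSHOT_ID)
   ```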



##########
core/src/test/scala/unit/kafka/tools/StorageToolTest.scala:
##########
@@ -182,23 +188,60 @@ Found problem:
       val stream = new ByteArrayOutputStream()
       val bootstrapMetadata = StorageTool.buildBootstrapMetadata(MetadataVersion.latestTesting(), None, "test format command")
       assertEquals(0, StorageTool.
-        formatCommand(new PrintStream(stream), Seq(tempDir.toString), metaProperties, bootstrapMetadata, MetadataVersion.latestTesting(), ignoreFormatted = false))
+        formatCommand(new PrintStream(stream), Seq(tempDir.toString), metaProperties, bootstrapMetadata,
+          MetadataVersion.latestTesting(), ignoreFormatted = false, standaloneMode = false,
+          advertisedListenerEndpoints = scala.collection.Seq.empty[kafka.cluster.EndPoint]))
       assertTrue(stringAfterFirstLine(stream.toString()).startsWith("Formatting %s".format(tempDir)))
 
       try assertEquals(1, StorageTool.
-        formatCommand(new PrintStream(new ByteArrayOutputStream()), Seq(tempDir.toString), metaProperties, bootstrapMetadata, MetadataVersion.latestTesting(), ignoreFormatted = false)) catch {
+        formatCommand(new PrintStream(new ByteArrayOutputStream()), Seq(tempDir.toString), metaProperties,
+          bootstrapMetadata, MetadataVersion.latestTesting(), ignoreFormatted = false, standaloneMode = false,
+          advertisedListenerEndpoints = scala.collection.Seq.empty[kafka.cluster.EndPoint])) catch {
         case e: TerseFailure => assertEquals(s"Log directory ${tempDir} is already " +
           "formatted. Use --ignore-formatted to ignore this directory and format the " +
           "others.", e.getMessage)
       }
 
       val stream2 = new ByteArrayOutputStream()
       assertEquals(0, StorageTool.
-        formatCommand(new PrintStream(stream2), Seq(tempDir.toString), metaProperties, bootstrapMetadata, MetadataVersion.latestTesting(), ignoreFormatted = true))
+        formatCommand(new PrintStream(stream2), Seq(tempDir.toString), metaProperties, bootstrapMetadata,
+          MetadataVersion.latestTesting(), ignoreFormatted = true, standaloneMode = false,
+          advertisedListenerEndpoints = scala.collection.Seq.empty[kafka.cluster.EndPoint]))
       assertEquals("All of the log directories are already formatted.%n".format(), stringAfterFirstLine(stream2.toString()))
     } finally Utils.delete(tempDir)
   }
 
+  @Test
+  def testFormatEmptyDirectoryWithStandaloneMode(): Unit = {
+    val tempDir = TestUtils.tempDir()
+    try {
+      val metaProperties = new MetaProperties.Builder().
+        setVersion(MetaPropertiesVersion.V1).
+        setClusterId("XcZZOzUqS5yHOjhMQB2JLR").
+        setNodeId(2).
+        build()
+      val stream = new ByteArrayOutputStream()
+      val bootstrapMetadata = StorageTool.buildBootstrapMetadata(MetadataVersion.latestTesting(), None,
+        "test format command")
+      val advertisedListenerEndpoints: scala.collection.Seq[kafka.cluster.EndPoint] = Seq.empty[EndPoint]
+      val host = "PLAINTEXT"
+      val newEndPoint = new EndPoint(host = host, port = 9092, listenerName = new ListenerName("PLAINTEXT"),
+        SecurityProtocol.PLAINTEXT)
+      val updatedAdvertisedListenerEndpoints: scala.collection.Seq[EndPoint] = advertisedListenerEndpoints :+ newEndPoint
+
+      assertEquals(0, StorageTool.
+        formatCommand(new PrintStream(stream), Seq(tempDir.toString), metaProperties, bootstrapMetadata,
+          MetadataVersion.latestTesting(), ignoreFormatted = false, standaloneMode = true,
+          updatedAdvertisedListenerEndpoints))
+      val checkpointDir = tempDir + clusterMetadataDir
+      assertTrue(stringAfterFirstLine(stream.toString()).startsWith("Snapshot written to %s".format(checkpointDir) +
+        "\n" + "Formatting %s".format(tempDir)))
+      val checkpointFilePath = Paths.get(checkpointDir + "/"+ checkpointFileName)

Review Comment:
   Let's use `Snapshots.snapshotPath`.
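   
   e.g. a hedged sketch, assuming `Snapshots.snapshotPath(logDir, snapshotId)` keeps its current trunk signature; it builds the `<offset>-<epoch>.checkpoint` path for us:
   ```scala
   val checkpointFilePath = Snapshots.snapshotPath(Paths.get(checkpointDir), new OffsetAndEpoch(0, 0))
   assertTrue(Files.exists(checkpointFilePath))
   ```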



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
