jsancio commented on code in PR #16325: URL: https://github.com/apache/kafka/pull/16325#discussion_r1642996134
########## core/src/main/scala/kafka/tools/StorageTool.scala: ########## @@ -46,6 +51,8 @@ import scala.collection.mutable.ArrayBuffer object StorageTool extends Logging { + val clusterMetadataDir: String = "/__cluster_metadata-0" Review Comment: This is defined in https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L31-L35 And we resolve the actual directory like this: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/raft/RaftManager.scala#L57-L62 ########## core/src/main/scala/kafka/tools/StorageTool.scala: ########## @@ -226,7 +238,12 @@ object StorageTool extends Logging { help(s"A KRaft release version to use for the initial metadata.version. The minimum is ${MetadataVersion.IBP_3_0_IV0}, the default is ${MetadataVersion.LATEST_PRODUCTION}") formatParser.addArgument("--feature", "-f"). help("A feature upgrade we should perform, in feature=level format. For example: `metadata.version=5`."). - action(append()); + action(append()) + formatParser.addArgument("--standalone", "-s"). + help("This command will 1) create a meta.properties file in metadata.log.dir with a randomly generated" + + " directory.id, 2) create a snapshot at 00000000000000000000-0000000000.checkpoint with the necessary control" + + " records (KRaftVersionRecord and VotersRecord) to make this Kafka node the only voter for the quorum"). Review Comment: I think this is too low level. Most users won't understand what this means. How about? > This flag will bootstrap the controller in standalone mode as the only KRaft controller of the Kafka cluster. Use the --controller-quorum-voters flag instead to bootstrap a controller cluster with more than one controller. 
########## core/src/main/scala/kafka/tools/StorageTool.scala: ########## @@ -548,6 +570,7 @@ object StorageTool extends Logging { if (metaPropertiesEnsemble.emptyLogDirs().isEmpty) { stream.println("All of the log directories are already formatted.") } else { + val directoryId = copier.generateValidDirectoryId() Review Comment: This doesn't look correct. We need to use the directoryId of the metadata log directory. After the `copier` has generated all of the directory ids in `copier.writeLogDirChanges()`, you want to read the directory id in the `meta.properties` of the `metadataLogDir`. ########## core/src/main/scala/kafka/tools/StorageTool.scala: ########## @@ -589,4 +616,56 @@ object StorageTool extends Logging { (nameAndLevel._1, nameAndLevel._2) }.toMap } -} + + def writeCheckpointFile(stream: PrintStream, logDir: String, directoryId: Uuid, + advertisedListenerEndpoints: scala.collection.Seq[kafka.cluster.EndPoint]): Unit = { + val snapshotCheckpointDir = logDir + clusterMetadataDir + + // Ensure the directory exists + val snapshotDir = Paths.get(snapshotCheckpointDir) + if (!Files.exists(snapshotDir)) { + Files.createDirectories(snapshotDir) + } + + // Create the full path for the checkpoint file + val checkpointFilePath = snapshotDir.resolve(snapshotDir) + + // Create the raw snapshot writer + val rawSnapshotWriter = FileRawSnapshotWriter.create(checkpointFilePath, new OffsetAndEpoch(0, 0)) + + if(advertisedListenerEndpoints.nonEmpty){ + val voterSet: VoterSet = getVoterSet(directoryId, advertisedListenerEndpoints) + + val builder = new RecordsSnapshotWriter.Builder() + .setKraftVersion(1) + .setVoterSet(Optional.of(voterSet)) + .setRawSnapshotWriter(rawSnapshotWriter).build(new StringSerde) + + // Close the builder to finalize the snapshot + builder.freeze() + builder.close() + stream.println(s"Snapshot written to $checkpointFilePath") + } + } + + private def getVoterSet(directoryId: Uuid, advertisedListenerEndpoints: collection.Seq[EndPoint]) = { + // 
Create a VotersRecord endpoint collection + val endpointCollection = new VotersRecord.EndpointCollection() + advertisedListenerEndpoints.foreach(endpoint => { + endpointCollection.add(new VotersRecord.Endpoint().setName(endpoint.listenerName.value()) + .setHost(endpoint.host).setPort(endpoint.port)) + }) + + // Create voters + val voters: util.List[VotersRecord.Voter] = new util.ArrayList() + voters.add(new VotersRecord.Voter() + .setVoterId(1) + .setVoterDirectoryId(directoryId) + .setEndpoints(endpointCollection)) + + // Create Voter set + val voterNewRecord = new VotersRecord().setVersion(1).setVoters(voters) + val voterSet = VoterSet.fromVotersRecord(voterNewRecord) + voterSet + } +} Review Comment: Missing newline character. ########## core/src/main/scala/kafka/tools/StorageTool.scala: ########## @@ -589,4 +616,56 @@ object StorageTool extends Logging { (nameAndLevel._1, nameAndLevel._2) }.toMap } -} + + def writeCheckpointFile(stream: PrintStream, logDir: String, directoryId: Uuid, + advertisedListenerEndpoints: scala.collection.Seq[kafka.cluster.EndPoint]): Unit = { + val snapshotCheckpointDir = logDir + clusterMetadataDir + + // Ensure the directory exists + val snapshotDir = Paths.get(snapshotCheckpointDir) + if (!Files.exists(snapshotDir)) { + Files.createDirectories(snapshotDir) + } + + // Create the full path for the checkpoint file + val checkpointFilePath = snapshotDir.resolve(snapshotDir) + + // Create the raw snapshot writer + val rawSnapshotWriter = FileRawSnapshotWriter.create(checkpointFilePath, new OffsetAndEpoch(0, 0)) + + if(advertisedListenerEndpoints.nonEmpty){ + val voterSet: VoterSet = getVoterSet(directoryId, advertisedListenerEndpoints) + + val builder = new RecordsSnapshotWriter.Builder() + .setKraftVersion(1) + .setVoterSet(Optional.of(voterSet)) + .setRawSnapshotWriter(rawSnapshotWriter).build(new StringSerde) + + // Close the builder to finalize the snapshot + builder.freeze() + builder.close() Review Comment: Let's use `try ... 
finally ...` so that the builder is always closed. ########## core/src/main/scala/kafka/tools/StorageTool.scala: ########## @@ -589,4 +616,56 @@ object StorageTool extends Logging { (nameAndLevel._1, nameAndLevel._2) }.toMap } -} + + def writeCheckpointFile(stream: PrintStream, logDir: String, directoryId: Uuid, + advertisedListenerEndpoints: scala.collection.Seq[kafka.cluster.EndPoint]): Unit = { + val snapshotCheckpointDir = logDir + clusterMetadataDir + + // Ensure the directory exists + val snapshotDir = Paths.get(snapshotCheckpointDir) + if (!Files.exists(snapshotDir)) { + Files.createDirectories(snapshotDir) + } + + // Create the full path for the checkpoint file + val checkpointFilePath = snapshotDir.resolve(snapshotDir) + + // Create the raw snapshot writer + val rawSnapshotWriter = FileRawSnapshotWriter.create(checkpointFilePath, new OffsetAndEpoch(0, 0)) + + if(advertisedListenerEndpoints.nonEmpty){ + val voterSet: VoterSet = getVoterSet(directoryId, advertisedListenerEndpoints) + + val builder = new RecordsSnapshotWriter.Builder() + .setKraftVersion(1) + .setVoterSet(Optional.of(voterSet)) + .setRawSnapshotWriter(rawSnapshotWriter).build(new StringSerde) + + // Close the builder to finalize the snapshot + builder.freeze() + builder.close() + stream.println(s"Snapshot written to $checkpointFilePath") + } + } + + private def getVoterSet(directoryId: Uuid, advertisedListenerEndpoints: collection.Seq[EndPoint]) = { + // Create a VotersRecord endpoint collection + val endpointCollection = new VotersRecord.EndpointCollection() + advertisedListenerEndpoints.foreach(endpoint => { + endpointCollection.add(new VotersRecord.Endpoint().setName(endpoint.listenerName.value()) + .setHost(endpoint.host).setPort(endpoint.port)) + }) + + // Create voters + val voters: util.List[VotersRecord.Voter] = new util.ArrayList() + voters.add(new VotersRecord.Voter() + .setVoterId(1) + .setVoterDirectoryId(directoryId) + .setEndpoints(endpointCollection)) + + // Create Voter set + 
val voterNewRecord = new VotersRecord().setVersion(1).setVoters(voters) + val voterSet = VoterSet.fromVotersRecord(voterNewRecord) Review Comment: See. Let's avoid creating a `VotersRecord` just to get a `VoterSet`. Feel free to add a static method to `VoterSet` like `VoterSet fromMap(Map<Integer, VoterNode> voters)`. ########## core/src/main/scala/kafka/tools/StorageTool.scala: ########## @@ -102,6 +109,11 @@ object StorageTool extends Logging { setClusterId(clusterId). setNodeId(config.nodeId). build() + val standaloneMode = namespace.getBoolean("standalone") + val advertisedListenerEndpoints: scala.collection.Seq[kafka.cluster.EndPoint] = config.effectiveAdvertisedListeners + + // effectiveAdvertisedControllerListeners to be added Review Comment: Okay. Are you going to wait for this PR to get merged and update this PR then? https://github.com/apache/kafka/pull/16235 ########## core/src/main/scala/kafka/tools/StorageTool.scala: ########## @@ -589,4 +616,56 @@ object StorageTool extends Logging { (nameAndLevel._1, nameAndLevel._2) }.toMap } -} + + def writeCheckpointFile(stream: PrintStream, logDir: String, directoryId: Uuid, + advertisedListenerEndpoints: scala.collection.Seq[kafka.cluster.EndPoint]): Unit = { + val snapshotCheckpointDir = logDir + clusterMetadataDir + + // Ensure the directory exists + val snapshotDir = Paths.get(snapshotCheckpointDir) + if (!Files.exists(snapshotDir)) { + Files.createDirectories(snapshotDir) + } + + // Create the full path for the checkpoint file + val checkpointFilePath = snapshotDir.resolve(snapshotDir) + + // Create the raw snapshot writer + val rawSnapshotWriter = FileRawSnapshotWriter.create(checkpointFilePath, new OffsetAndEpoch(0, 0)) + + if(advertisedListenerEndpoints.nonEmpty){ + val voterSet: VoterSet = getVoterSet(directoryId, advertisedListenerEndpoints) + + val builder = new RecordsSnapshotWriter.Builder() + .setKraftVersion(1) + .setVoterSet(Optional.of(voterSet)) + 
.setRawSnapshotWriter(rawSnapshotWriter).build(new StringSerde) + + // Close the builder to finalize the snapshot + builder.freeze() + builder.close() + stream.println(s"Snapshot written to $checkpointFilePath") + } + } + + private def getVoterSet(directoryId: Uuid, advertisedListenerEndpoints: collection.Seq[EndPoint]) = { + // Create a VotersRecord endpoint collection + val endpointCollection = new VotersRecord.EndpointCollection() + advertisedListenerEndpoints.foreach(endpoint => { + endpointCollection.add(new VotersRecord.Endpoint().setName(endpoint.listenerName.value()) + .setHost(endpoint.host).setPort(endpoint.port)) + }) + + // Create voters + val voters: util.List[VotersRecord.Voter] = new util.ArrayList() + voters.add(new VotersRecord.Voter() + .setVoterId(1) Review Comment: You can't assume that the voter id is 1. ########## core/src/main/scala/kafka/tools/StorageTool.scala: ########## @@ -589,4 +616,56 @@ object StorageTool extends Logging { (nameAndLevel._1, nameAndLevel._2) }.toMap } -} + + def writeCheckpointFile(stream: PrintStream, logDir: String, directoryId: Uuid, + advertisedListenerEndpoints: scala.collection.Seq[kafka.cluster.EndPoint]): Unit = { + val snapshotCheckpointDir = logDir + clusterMetadataDir + + // Ensure the directory exists + val snapshotDir = Paths.get(snapshotCheckpointDir) + if (!Files.exists(snapshotDir)) { + Files.createDirectories(snapshotDir) + } + + // Create the full path for the checkpoint file + val checkpointFilePath = snapshotDir.resolve(snapshotDir) Review Comment: What does this mean? 
########## core/src/main/scala/kafka/tools/StorageTool.scala: ########## @@ -589,4 +616,56 @@ object StorageTool extends Logging { (nameAndLevel._1, nameAndLevel._2) }.toMap } -} + + def writeCheckpointFile(stream: PrintStream, logDir: String, directoryId: Uuid, + advertisedListenerEndpoints: scala.collection.Seq[kafka.cluster.EndPoint]): Unit = { + val snapshotCheckpointDir = logDir + clusterMetadataDir + + // Ensure the directory exists + val snapshotDir = Paths.get(snapshotCheckpointDir) + if (!Files.exists(snapshotDir)) { + Files.createDirectories(snapshotDir) + } + + // Create the full path for the checkpoint file + val checkpointFilePath = snapshotDir.resolve(snapshotDir) + + // Create the raw snapshot writer + val rawSnapshotWriter = FileRawSnapshotWriter.create(checkpointFilePath, new OffsetAndEpoch(0, 0)) + + if(advertisedListenerEndpoints.nonEmpty){ + val voterSet: VoterSet = getVoterSet(directoryId, advertisedListenerEndpoints) + + val builder = new RecordsSnapshotWriter.Builder() + .setKraftVersion(1) + .setVoterSet(Optional.of(voterSet)) + .setRawSnapshotWriter(rawSnapshotWriter).build(new StringSerde) Review Comment: Add newline between the characters `)` and `.`. E.g. ```java .setRawSnapshotWriter(rawSnapshotWriter) .build(new StringSerde) ``` ########## core/src/main/scala/kafka/tools/StorageTool.scala: ########## @@ -507,10 +524,13 @@ object StorageTool extends Logging { directories: Seq[String], metaProperties: MetaProperties, metadataVersion: MetadataVersion, - ignoreFormatted: Boolean + ignoreFormatted: Boolean, + standaloneMode: Boolean, + advertisedListenerEndpoints: scala.collection.Seq[kafka.cluster.EndPoint] Review Comment: This is not general enough. How are you planning to implement --controller-quorum-voters with this signature? Why do you need `standaloneMode` if you are passing the local endpoints? 
########## core/src/main/scala/kafka/tools/StorageTool.scala: ########## @@ -589,4 +616,56 @@ object StorageTool extends Logging { (nameAndLevel._1, nameAndLevel._2) }.toMap } -} + + def writeCheckpointFile(stream: PrintStream, logDir: String, directoryId: Uuid, + advertisedListenerEndpoints: scala.collection.Seq[kafka.cluster.EndPoint]): Unit = { + val snapshotCheckpointDir = logDir + clusterMetadataDir + + // Ensure the directory exists + val snapshotDir = Paths.get(snapshotCheckpointDir) + if (!Files.exists(snapshotDir)) { + Files.createDirectories(snapshotDir) + } + + // Create the full path for the checkpoint file + val checkpointFilePath = snapshotDir.resolve(snapshotDir) + + // Create the raw snapshot writer + val rawSnapshotWriter = FileRawSnapshotWriter.create(checkpointFilePath, new OffsetAndEpoch(0, 0)) Review Comment: Instead of hard-coding `OffsetAndEpoch(0, 0)` let's add a constant to `o.a.k.s.Snapshots` named `BOOTSTRAP_SNAPSHOT_ID`. ########## core/src/test/scala/unit/kafka/tools/StorageToolTest.scala: ########## @@ -182,23 +188,60 @@ Found problem: val stream = new ByteArrayOutputStream() val bootstrapMetadata = StorageTool.buildBootstrapMetadata(MetadataVersion.latestTesting(), None, "test format command") assertEquals(0, StorageTool. - formatCommand(new PrintStream(stream), Seq(tempDir.toString), metaProperties, bootstrapMetadata, MetadataVersion.latestTesting(), ignoreFormatted = false)) + formatCommand(new PrintStream(stream), Seq(tempDir.toString), metaProperties, bootstrapMetadata, + MetadataVersion.latestTesting(), ignoreFormatted = false, standaloneMode = false, + advertisedListenerEndpoints = scala.collection.Seq.empty[kafka.cluster.EndPoint])) assertTrue(stringAfterFirstLine(stream.toString()).startsWith("Formatting %s".format(tempDir))) try assertEquals(1, StorageTool. 
- formatCommand(new PrintStream(new ByteArrayOutputStream()), Seq(tempDir.toString), metaProperties, bootstrapMetadata, MetadataVersion.latestTesting(), ignoreFormatted = false)) catch { + formatCommand(new PrintStream(new ByteArrayOutputStream()), Seq(tempDir.toString), metaProperties, + bootstrapMetadata, MetadataVersion.latestTesting(), ignoreFormatted = false, standaloneMode = false, + advertisedListenerEndpoints = scala.collection.Seq.empty[kafka.cluster.EndPoint])) catch { case e: TerseFailure => assertEquals(s"Log directory ${tempDir} is already " + "formatted. Use --ignore-formatted to ignore this directory and format the " + "others.", e.getMessage) } val stream2 = new ByteArrayOutputStream() assertEquals(0, StorageTool. - formatCommand(new PrintStream(stream2), Seq(tempDir.toString), metaProperties, bootstrapMetadata, MetadataVersion.latestTesting(), ignoreFormatted = true)) + formatCommand(new PrintStream(stream2), Seq(tempDir.toString), metaProperties, bootstrapMetadata, + MetadataVersion.latestTesting(), ignoreFormatted = true, standaloneMode = false, + advertisedListenerEndpoints = scala.collection.Seq.empty[kafka.cluster.EndPoint])) assertEquals("All of the log directories are already formatted.%n".format(), stringAfterFirstLine(stream2.toString())) } finally Utils.delete(tempDir) } + @Test + def testFormatEmptyDirectoryWithStandaloneMode(): Unit = { + val tempDir = TestUtils.tempDir() + try { + val metaProperties = new MetaProperties.Builder(). + setVersion(MetaPropertiesVersion.V1). + setClusterId("XcZZOzUqS5yHOjhMQB2JLR"). + setNodeId(2). 
+ build() + val stream = new ByteArrayOutputStream() + val bootstrapMetadata = StorageTool.buildBootstrapMetadata(MetadataVersion.latestTesting(), None, + "test format command") + val advertisedListenerEndpoints: scala.collection.Seq[kafka.cluster.EndPoint] = Seq.empty[EndPoint] + val host = "PLAINTEXT" + val newEndPoint = new EndPoint(host = host, port = 9092, listenerName = new ListenerName("PLAINTEXT"), + SecurityProtocol.PLAINTEXT) + val updatedAdvertisedListenerEndpoints: scala.collection.Seq[EndPoint] = advertisedListenerEndpoints :+ newEndPoint + + assertEquals(0, StorageTool. + formatCommand(new PrintStream(stream), Seq(tempDir.toString), metaProperties, bootstrapMetadata, + MetadataVersion.latestTesting(), ignoreFormatted = false, standaloneMode = true, + updatedAdvertisedListenerEndpoints)) + val checkpointDir = tempDir + clusterMetadataDir + assertTrue(stringAfterFirstLine(stream.toString()).startsWith("Snapshot written to %s".format(checkpointDir) + + "\n" + "Formatting %s".format(tempDir))) + val checkpointFilePath = Paths.get(checkpointDir + "/"+ checkpointFileName) Review Comment: Let's use `Snapshots.snapshotPath`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org