Re: [PR] KAFKA-16518; Adding standalone argument for storage [kafka]
cmccabe commented on PR #16325: URL: https://github.com/apache/kafka/pull/16325#issuecomment-2266231744 We ended up doing this in: #16669 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16518; Adding standalone argument for storage [kafka]
muralibasani commented on PR #16325: URL: https://github.com/apache/kafka/pull/16325#issuecomment-2265007883 Closing this, as the StorageTool class will be refactored in a different PR.
Re: [PR] KAFKA-16518; Adding standalone argument for storage [kafka]
muralibasani closed pull request #16325: KAFKA-16518; Adding standalone argument for storage URL: https://github.com/apache/kafka/pull/16325
Re: [PR] KAFKA-16518; Adding standalone argument for storage [kafka]
muralibasani commented on code in PR #16325:
URL: https://github.com/apache/kafka/pull/16325#discussion_r1672845320

## core/src/main/scala/kafka/tools/StorageTool.scala: ##
@@ -698,6 +663,42 @@ object StorageTool extends Logging {
     voterSet
   }
+  def createStandaloneVoterMap(config: KafkaConfig): util.Map[ListenerName, InetSocketAddress] = {
+    val advertisedListenerEndpoints = config.effectiveAdvertisedControllerListeners
+    val listeners: util.Map[ListenerName, InetSocketAddress] = new util.HashMap()
+    advertisedListenerEndpoints.foreach(endpoint => {
+      val host: String = endpoint.host
+      listeners.put(endpoint.listenerName, new InetSocketAddress(host, endpoint.port))
+    })
+    listeners
+  }
+
+  private def parseControllerQuorumVotersMap(controllerQuorumVoterMap: util.Map[Integer, InetSocketAddress],
+                                             metaProperties: MetaProperties,
+                                             config: KafkaConfig): util.Map[ListenerName, InetSocketAddress] = {

Review Comment:
   From the KIP description: "When the format command is executed with this option it will read the node.id configured in the properties file specified by the --config option and compare it against the specified in --controller-quorum-voters. If there is a match, it will write the specified to the directory.id property in the meta.properties for the metadata.log.dir directory."

   I tried adding the condition `if (metaProperties.nodeId().getAsInt == replicaId)` in this method, but I may be wrong. Could you please suggest the code?
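The check quoted from the KIP above (compare the configured node.id with the ids listed in --controller-quorum-voters, and use the matching entry's directory id) can be sketched as follows. This is a minimal, self-contained illustration and not the StorageTool code: `Voter`, `VoterMatch`, `parseVoters` and `directoryIdFor` are hypothetical names, and the entry format `id[-directoryId]@host:port` is an assumption inferred from the `[-]@:` schema fragment and the `1@localhost:9093` example in this thread.

```scala
// Hypothetical model of one entry in --controller-quorum-voters.
final case class Voter(id: Int, directoryId: Option[String], host: String, port: Int)

object VoterMatch {
  // Assumed entry format: id[-directoryId]@host:port
  // The directory id may itself contain '-' or '_', so only the first '-' separates it from the id.
  private val Entry = """(\d+)(?:-([A-Za-z0-9_-]+))?@([^:@]+):(\d+)""".r

  // Parse a comma-separated voter list; the first malformed entry is reported as an error.
  def parseVoters(value: String): Either[String, List[Voter]] = {
    val parsed = value.split(",").toList.map(_.trim).map {
      case Entry(id, dir, host, port) => Right(Voter(id.toInt, Option(dir), host, port.toInt))
      case other                      => Left(s"Invalid voter entry: $other")
    }
    parsed.collectFirst { case Left(err) => err } match {
      case Some(err) => Left(err)
      case None      => Right(parsed.collect { case Right(v) => v })
    }
  }

  // Directory id to write for this node, if the node is listed and the entry carries one.
  def directoryIdFor(nodeId: Int, voters: List[Voter]): Option[String] =
    voters.find(_.id == nodeId).flatMap(_.directoryId)
}
```

In this sketch a node that is not listed, or that is listed without a directory id, yields `None`, which leaves the caller free to generate a directory id as the format command already does.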
Re: [PR] KAFKA-16518; Adding standalone argument for storage [kafka]
muralibasani commented on code in PR #16325:
URL: https://github.com/apache/kafka/pull/16325#discussion_r1672833818

## core/src/main/scala/kafka/tools/StorageTool.scala: ##
@@ -112,20 +112,21 @@ object StorageTool extends Logging {
       setNodeId(config.nodeId).
       build()
     val standaloneMode = namespace.getBoolean("standalone")
-    var advertisedListenerEndpoints: collection.Seq[kafka.cluster.EndPoint] = List()
     val controllersQuorumVoters = namespace.getString("controller_quorum_voters")
     if(standaloneMode && controllersQuorumVoters != null) {
       throw new TerseFailure("Both --standalone and --controller-quorum-voters were set. Only one of the two flags can be set.")
     }
+    var listeners: util.Map[ListenerName, InetSocketAddress] = new util.HashMap()
     if (standaloneMode) {
-      advertisedListenerEndpoints = config.effectiveAdvertisedBrokerListeners
+      listeners = createStandaloneVoterMap(config)
     } else if(controllersQuorumVoters != null) {
       if (!validateControllerQuorumVoters(controllersQuorumVoters)) {
         throw new TerseFailure("Expected schema for --controller-quorum-voters is [-]@:")
       }
-      advertisedListenerEndpoints = config.effectiveAdvertisedControllerListeners
+      val controllerQuorumVoterMap: util.Map[Integer, InetSocketAddress] = parseVoterConnections(Collections.singletonList(controllersQuorumVoters))
+      listeners = parseControllerQuorumVotersMap(controllerQuorumVoterMap, metaProperties, config)

Review Comment:
   Does this mean we would also have to update https://github.com/apache/kafka/blob/25d775b742406477a0ff678b9990ed149d2157cc/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java#L178 to return the Uuid?
Re: [PR] KAFKA-16518; Adding standalone argument for storage [kafka]
jsancio commented on code in PR #16325:
URL: https://github.com/apache/kafka/pull/16325#discussion_r1668893365

## core/src/main/scala/kafka/tools/StorageTool.scala: ##
@@ -300,19 +301,19 @@ object StorageTool extends Logging {
   }
   private def getUserScramCredentialRecord(
-    mechanism: String,
-    config: String
-  ) : UserScramCredentialRecord = {
+                                            mechanism: String,
+                                            config: String
+                                          ) : UserScramCredentialRecord = {
     /*
      * Remove '[' amd ']'
      * Split K->V pairs on ',' and no K or V should contain ','
      * Split K and V on '=' but V could contain '=' if inside ""
      * Create Map of K to V and replace all " in V
      */
     val argMap = config.substring(1, config.length - 1)
-      .split(",")
-      .map(_.split("=(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"))
-      .map(args => args(0) -> args(1).replaceAll("\"", "")).toMap
+                       .split(",")
+                       .map(_.split("=(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"))
+                       .map(args => args(0) -> args(1).replaceAll("\"", "")).toMap

Review Comment:
   Again, this is not the correct indentation.

## core/src/main/scala/kafka/tools/StorageTool.scala: ##
@@ -112,20 +112,21 @@ object StorageTool extends Logging {
       setNodeId(config.nodeId).
       build()
     val standaloneMode = namespace.getBoolean("standalone")
-    var advertisedListenerEndpoints: collection.Seq[kafka.cluster.EndPoint] = List()
     val controllersQuorumVoters = namespace.getString("controller_quorum_voters")
     if(standaloneMode && controllersQuorumVoters != null) {
       throw new TerseFailure("Both --standalone and --controller-quorum-voters were set. Only one of the two flags can be set.")
     }
+    var listeners: util.Map[ListenerName, InetSocketAddress] = new util.HashMap()
     if (standaloneMode) {
-      advertisedListenerEndpoints = config.effectiveAdvertisedBrokerListeners
+      listeners = createStandaloneVoterMap(config)
     } else if(controllersQuorumVoters != null) {
       if (!validateControllerQuorumVoters(controllersQuorumVoters)) {
         throw new TerseFailure("Expected schema for --controller-quorum-voters is [-]@:")
       }
-      advertisedListenerEndpoints = config.effectiveAdvertisedControllerListeners
+      val controllerQuorumVoterMap: util.Map[Integer, InetSocketAddress] = parseVoterConnections(Collections.singletonList(controllersQuorumVoters))
+      listeners = parseControllerQuorumVotersMap(controllerQuorumVoterMap, metaProperties, config)

Review Comment:
   This doesn't look correct. `VoterSet` is basically a `Map[Integer, (Uuid, Map[ListenerName, InetSocketAddress])]` where `Integer` is the replica id and `Uuid` is the replica directory id. The type used for `listeners` doesn't contain all of the information needed to generate a `VoterSet` for all the possible configuration cases. Note that the value for `--controller-quorum-voters` has the following schema: `-@:`

## core/src/main/scala/kafka/tools/StorageTool.scala: ##
@@ -300,19 +301,19 @@ object StorageTool extends Logging {
   }
   private def getUserScramCredentialRecord(
-    mechanism: String,
-    config: String
-  ) : UserScramCredentialRecord = {
+                                            mechanism: String,
+                                            config: String
+                                          ) : UserScramCredentialRecord = {

Review Comment:
   Extra space between `)` and `:`. It should be `): User... = {`

## core/src/main/scala/kafka/tools/StorageTool.scala: ##
@@ -335,10 +336,10 @@ object StorageTool extends Logging {
     if (argMap.contains("salt")) {
       val iterations = argMap("iterations").toInt
       if (iterations < scramMechanism.minIterations()) {
-        throw new TerseFailure(s"The 'iterations' value must be >= ${scramMechanism.minIterations()} for add-scram")
+          throw new TerseFailure(s"The 'iterations' value must be >= ${scramMechanism.minIterations()} for add-scram")
       }
       if (iterations > scramMechanism.maxIterations()) {
-        throw new TerseFailure(s"The 'iterations' value must be <= ${scramMechanism.maxIterations()} for add-scram")
+          throw new TerseFailure(s"The 'iterations' value must be <= ${scramMechanism.maxIterations()} for add-scram")

Review Comment:
   Looks like you have two extra spaces!

## core/src/main/scala/kafka/tools/StorageTool.scala: ##
@@ -506,8 +507,8 @@ object StorageTool extends Logging {
     val metadataRecords = new util.ArrayList[ApiMessageAndVersion]
     metadataRecords.add(new ApiMessageAndVersion(new FeatureLevelRecord().
-      setName(MetadataVersion.FEATURE_NAME).
-      setFeatureLevel(metadataVersion.featureLevel()), 0.toShort))
+s
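The shape the reviewer describes for `VoterSet`, a map from replica id to a directory id plus that replica's listener endpoints, can be illustrated with plain Scala types. This is only a sketch of the data shape: `DirectoryId`, `Listener`, `VoterInfo` and `VoterShape` are hypothetical stand-ins and not Kafka's `Uuid`, `ListenerName` or `VoterSet` classes.

```scala
import java.net.InetSocketAddress

// Hypothetical stand-ins for Kafka's Uuid and ListenerName types.
final case class DirectoryId(value: String)
final case class Listener(name: String)

// One voter: its replica directory id plus the endpoints it advertises.
final case class VoterInfo(directoryId: DirectoryId, endpoints: Map[Listener, InetSocketAddress])

object VoterShape {
  // Replica id -> voter information, mirroring the shape described in the review comment.
  type Voters = Map[Int, VoterInfo]

  // Flattening to listener -> address drops the replica ids and directory ids,
  // and voters that share a listener name overwrite each other.
  def flatten(voters: Voters): Map[Listener, InetSocketAddress] =
    voters.values.flatMap(_.endpoints).toMap
}
```

With three voters that all advertise a listener named `CONTROLLER`, `flatten` returns a single entry, which shows why a `Map[ListenerName, InetSocketAddress]` alone cannot describe a quorum with more than one voter.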
Re: [PR] KAFKA-16518; Adding standalone argument for storage [kafka]
muralibasani commented on code in PR #16325:
URL: https://github.com/apache/kafka/pull/16325#discussion_r1665647113

## core/src/main/scala/kafka/tools/StorageTool.scala: ##
@@ -503,24 +537,29 @@ object StorageTool extends Logging {
   }
   def formatCommand(
-    stream: PrintStream,
-    directories: Seq[String],
-    metaProperties: MetaProperties,
-    metadataVersion: MetadataVersion,
-    ignoreFormatted: Boolean
-  ): Int = {
+                     stream: PrintStream,
+                     directories: Seq[String],
+                     metaProperties: MetaProperties,
+                     metadataVersion: MetadataVersion,
+                     ignoreFormatted: Boolean,
+                     advertisedListenerEndpoints: scala.collection.Seq[kafka.cluster.EndPoint],
+                     controllersQuorumVoters: String
+                   ): Int = {
     val bootstrapMetadata = buildBootstrapMetadata(metadataVersion, None, "format command")
-    formatCommand(stream, directories, metaProperties, bootstrapMetadata, metadataVersion, ignoreFormatted)
+    formatCommand(stream, directories, metaProperties, bootstrapMetadata, metadataVersion, ignoreFormatted,
+      advertisedListenerEndpoints, controllersQuorumVoters)
   }
   def formatCommand(
-    stream: PrintStream,
-    directories: Seq[String],
-    metaProperties: MetaProperties,
-    bootstrapMetadata: BootstrapMetadata,
-    metadataVersion: MetadataVersion,
-    ignoreFormatted: Boolean
-  ): Int = {
+                     stream: PrintStream,
+                     directories: Seq[String],
+                     metaProperties: MetaProperties,
+                     bootstrapMetadata: BootstrapMetadata,
+                     metadataVersion: MetadataVersion,
+                     ignoreFormatted: Boolean,
+                     advertisedListenerEndpoints: scala.collection.Seq[kafka.cluster.EndPoint],
+                     controllersQuorumVoters: String

Review Comment:
   I considered that, but directoryId is derived only in the format method, so I updated the format method to take only the listeners parameter.
Re: [PR] KAFKA-16518; Adding standalone argument for storage [kafka]
muralibasani commented on code in PR #16325:
URL: https://github.com/apache/kafka/pull/16325#discussion_r1665644755

## core/src/main/scala/kafka/tools/StorageTool.scala: ##
@@ -102,6 +111,23 @@ object StorageTool extends Logging {
       setClusterId(clusterId).
       setNodeId(config.nodeId).
       build()
+    val standaloneMode = namespace.getBoolean("standalone")
+    var advertisedListenerEndpoints: collection.Seq[kafka.cluster.EndPoint] = List()
+
+    val controllersQuorumVoters = namespace.getString("controller_quorum_voters")
+    if(standaloneMode && controllersQuorumVoters != null) {
+      throw new TerseFailure("Both --standalone and --controller-quorum-voters were set. Only one of the two flags can be set.")
+    }
+
+    if (standaloneMode) {
+      advertisedListenerEndpoints = config.effectiveAdvertisedBrokerListeners
+    } else if(controllersQuorumVoters != null) {
+      if (!validateControllerQuorumVoters(controllersQuorumVoters)) {
+        throw new TerseFailure("Expected schema for --controller-quorum-voters is [-]@:")
+      }
+      advertisedListenerEndpoints = config.effectiveAdvertisedControllerListeners

Review Comment:
   Fixed.

## core/src/test/scala/unit/kafka/tools/StorageToolTest.scala: ##
@@ -285,6 +362,49 @@ Found problem:
       "Expected the default metadata.version to be 3.3-IV2")
   }
+  @Test
+  def testStandaloneModeWithArguments(): Unit = {
+    val namespace = StorageTool.parseArguments(Array("format", "-c", "config.props", "-t", "XcZZOzUqS4yPOjhMQB6JAT",
+      "-s"))
+    val config = Mockito.spy(new KafkaConfig(TestUtils.createBrokerConfig(1, null)))
+    val exitCode = StorageTool.runFormatCommand(namespace, config)
+    val tempDirs = config.logDirs
+    tempDirs.foreach(tempDir => {
+      val checkpointDir = tempDir + "/" + CLUSTER_METADATA_TOPIC_NAME
+      val checkpointFilePath = Snapshots.snapshotPath(Paths.get(checkpointDir), BOOTSTRAP_SNAPSHOT_ID)
+      assertTrue(checkpointFilePath.toFile.exists)
+      assertTrue(Utils.readFileAsString(checkpointFilePath.toFile.getPath).contains("localhost"))
+      Utils.delete(new File(tempDir))
+    })
+    assertEquals(0, exitCode)
+  }
+
+//  @Test TODO
+//  def testControllerQuorumVotersWithArguments(): Unit = {
+//    val namespace = StorageTool.parseArguments(Array("format", "-c", "config.props", "-t", "XcZZOzUqS4yQQjhMQB6JAT",
+//      "--controller-quorum-voters", "1@localhost:9092"))
+//    val config = Mockito.spy(new KafkaConfig(TestUtils.createBrokerConfig(1, null)))

Review Comment:
   Not sure how to set the controller listener here.
Re: [PR] KAFKA-16518; Adding standalone argument for storage [kafka]
muralibasani commented on code in PR #16325:
URL: https://github.com/apache/kafka/pull/16325#discussion_r1661035915

## core/src/test/scala/unit/kafka/tools/StorageToolTest.scala: ##
@@ -285,6 +362,49 @@ Found problem:
       "Expected the default metadata.version to be 3.3-IV2")
   }
+  @Test
+  def testStandaloneModeWithArguments(): Unit = {
+    val namespace = StorageTool.parseArguments(Array("format", "-c", "config.props", "-t", "XcZZOzUqS4yPOjhMQB6JAT",
+      "-s"))
+    val config = Mockito.spy(new KafkaConfig(TestUtils.createBrokerConfig(1, null)))
+    val exitCode = StorageTool.runFormatCommand(namespace, config)
+    val tempDirs = config.logDirs
+    tempDirs.foreach(tempDir => {
+      val checkpointDir = tempDir + "/" + CLUSTER_METADATA_TOPIC_NAME
+      val checkpointFilePath = Snapshots.snapshotPath(Paths.get(checkpointDir), BOOTSTRAP_SNAPSHOT_ID)
+      assertTrue(checkpointFilePath.toFile.exists)
+      assertTrue(Utils.readFileAsString(checkpointFilePath.toFile.getPath).contains("localhost"))
+      Utils.delete(new File(tempDir))
+    })
+    assertEquals(0, exitCode)
+  }
+
+//  @Test TODO
+//  def testControllerQuorumVotersWithArguments(): Unit = {
+//    val namespace = StorageTool.parseArguments(Array("format", "-c", "config.props", "-t", "XcZZOzUqS4yQQjhMQB6JAT",
+//      "--controller-quorum-voters", "1@localhost:9092"))
+//    val config = Mockito.spy(new KafkaConfig(TestUtils.createBrokerConfig(1, null)))

Review Comment:
   Not sure how to set the controller listener here.
Re: [PR] KAFKA-16518; Adding standalone argument for storage [kafka]
muralibasani commented on PR #16325:
URL: https://github.com/apache/kafka/pull/16325#issuecomment-2200108407

@jsancio, updated with the controller option:
```
./bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties -q 1@localhost:9093
metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, dirs={/tmp/kraft-combined-logs: EMPTY})
Formatting /tmp/kraft-combined-logs with metadata.version 3.8-IV0.
Snapshot written to /tmp/kraft-combined-logs/__cluster_metadata

muralidharbasani@Muralidhars-MacBook-Pro kafka % ./bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties -s
metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, dirs={/tmp/kraft-combined-logs: EMPTY})
Formatting /tmp/kraft-combined-logs with metadata.version 3.8-IV0.
Snapshot written to /tmp/kraft-combined-logs/__cluster_metadata

muralidharbasani@Muralidhars-MacBook-Pro kafka % ./bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties -q 1@localhost:9093 -s
Both --standalone and --controller-quorum-voters were set. Only one of the two flags can be set.
```
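The three runs above correspond to three outcomes: voters given, standalone given, and both given (rejected). A minimal sketch of that decision, written as a pure function so it can be tested without the argument parser, might look like this. `BootstrapMode`, `Standalone`, `StaticVoters`, `NoBootstrap` and `FormatFlags.resolve` are hypothetical names for illustration; only the error message is taken from the thread.

```scala
// Hypothetical result of interpreting the two mutually exclusive flags.
sealed trait BootstrapMode
case object Standalone extends BootstrapMode
final case class StaticVoters(voters: String) extends BootstrapMode
case object NoBootstrap extends BootstrapMode

object FormatFlags {
  // standalone: whether --standalone was passed
  // voters: the raw --controller-quorum-voters value, if any
  def resolve(standalone: Boolean, voters: Option[String]): Either[String, BootstrapMode] =
    (standalone, voters) match {
      case (true, Some(_)) =>
        Left("Both --standalone and --controller-quorum-voters were set. Only one of the two flags can be set.")
      case (true, None)     => Right(Standalone)
      case (false, Some(v)) => Right(StaticVoters(v))
      case (false, None)    => Right(NoBootstrap)
    }
}
```

Returning an `Either` keeps the validation separate from how the failure is reported; in the PR the same condition throws a `TerseFailure` instead.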
Re: [PR] KAFKA-16518; Adding standalone argument for storage [kafka]
jsancio commented on code in PR #16325:
URL: https://github.com/apache/kafka/pull/16325#discussion_r1651072253

## core/src/main/scala/kafka/tools/StorageTool.scala: ##
@@ -102,6 +109,11 @@ object StorageTool extends Logging {
       setClusterId(clusterId).
       setNodeId(config.nodeId).
       build()
+    val standaloneMode = namespace.getBoolean("standalone")
+    val advertisedListenerEndpoints: scala.collection.Seq[kafka.cluster.EndPoint] = config.effectiveAdvertisedListeners
+
+    // effectiveAdvertisedControllerListeners to be added

Review Comment:
   I think we should wait. The bootstrap checkpoint file is incorrect without https://github.com/apache/kafka/pull/16235
Re: [PR] KAFKA-16518 : Adding standalone argument for storage [kafka]
jsancio commented on PR #16325:
URL: https://github.com/apache/kafka/pull/16325#issuecomment-2173806713

> cat /tmp/kraft-combined-logs/__cluster_metadata-0/-00.checkpoint

@muralibasani You can use `bin/kafka-dump-log --cluster-metadata-decoder --files /tmp/kraft-combined-logs/__cluster_metadata-0/-00.checkpoint`.
Re: [PR] KAFKA-16518 : Adding standalone argument for storage [kafka]
jsancio commented on code in PR #16325:
URL: https://github.com/apache/kafka/pull/16325#discussion_r1642996134

## core/src/main/scala/kafka/tools/StorageTool.scala: ##
@@ -46,6 +51,8 @@ import scala.collection.mutable.ArrayBuffer
 object StorageTool extends Logging {
+  val clusterMetadataDir: String = "/__cluster_metadata-0"

Review Comment:
   This is defined in https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L31-L35

   And we resolve the actual directory like this: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/raft/RaftManager.scala#L57-L62

## core/src/main/scala/kafka/tools/StorageTool.scala: ##
@@ -226,7 +238,12 @@ object StorageTool extends Logging {
       help(s"A KRaft release version to use for the initial metadata.version. The minimum is ${MetadataVersion.IBP_3_0_IV0}, the default is ${MetadataVersion.LATEST_PRODUCTION}")
     formatParser.addArgument("--feature", "-f").
       help("A feature upgrade we should perform, in feature=level format. For example: `metadata.version=5`.").
-      action(append());
+      action(append())
+    formatParser.addArgument("--standalone", "-s").
+      help("This command will 1) create a meta.properties file in metadata.log.dir with a randomly generated" +
+        " directory.id, 2) create a snapshot at -00.checkpoint with the necessary control" +
+        " records (KRaftVersionRecord and VotersRecord) to make this Kafka node the only voter for the quorum").

Review Comment:
   I think this is too low level. Most users won't understand what this means. How about:
   > This flag will bootstrap the controller in standalone mode as the only KRaft controller of the Kafka cluster. Use the --controller-quorum-voters flag instead to bootstrap a controller cluster with more than one controller.

## core/src/main/scala/kafka/tools/StorageTool.scala: ##
@@ -548,6 +570,7 @@ object StorageTool extends Logging {
     if (metaPropertiesEnsemble.emptyLogDirs().isEmpty) {
       stream.println("All of the log directories are already formatted.")
     } else {
+      val directoryId = copier.generateValidDirectoryId()

Review Comment:
   This doesn't look correct. We need to use the directoryId of the metadata log directory. After the `copier` has generated all of the directory ids in `copier.writeLogDirChanges()`, you want to read the directory id in the `meta.properties` of the `metadataLogDir`.

## core/src/main/scala/kafka/tools/StorageTool.scala: ##
@@ -589,4 +616,56 @@ object StorageTool extends Logging {
       (nameAndLevel._1, nameAndLevel._2)
     }.toMap
   }
-}
+
+  def writeCheckpointFile(stream: PrintStream, logDir: String, directoryId: Uuid,
+                          advertisedListenerEndpoints: scala.collection.Seq[kafka.cluster.EndPoint]): Unit = {
+    val snapshotCheckpointDir = logDir + clusterMetadataDir
+
+    // Ensure the directory exists
+    val snapshotDir = Paths.get(snapshotCheckpointDir)
+    if (!Files.exists(snapshotDir)) {
+      Files.createDirectories(snapshotDir)
+    }
+
+    // Create the full path for the checkpoint file
+    val checkpointFilePath = snapshotDir.resolve(snapshotDir)
+
+    // Create the raw snapshot writer
+    val rawSnapshotWriter = FileRawSnapshotWriter.create(checkpointFilePath, new OffsetAndEpoch(0, 0))
+
+    if(advertisedListenerEndpoints.nonEmpty){
+      val voterSet: VoterSet = getVoterSet(directoryId, advertisedListenerEndpoints)
+
+      val builder = new RecordsSnapshotWriter.Builder()
+        .setKraftVersion(1)
+        .setVoterSet(Optional.of(voterSet))
+        .setRawSnapshotWriter(rawSnapshotWriter).build(new StringSerde)
+
+      // Close the builder to finalize the snapshot
+      builder.freeze()
+      builder.close()
+      stream.println(s"Snapshot written to $checkpointFilePath")
+    }
+  }
+
+  private def getVoterSet(directoryId: Uuid, advertisedListenerEndpoints: collection.Seq[EndPoint]) = {
+    // Create a VotersRecord endpoint collection
+    val endpointCollection = new VotersRecord.EndpointCollection()
+    advertisedListenerEndpoints.foreach(endpoint => {
+      endpointCollection.add(new VotersRecord.Endpoint().setName(endpoint.listenerName.value())
+        .setHost(endpoint.host).setPort(endpoint.port))
+    })
+
+    // Create voters
+    val voters: util.List[VotersRecord.Voter] = new util.ArrayList()
+    voters.add(new VotersRecord.Voter()
+      .setVoterId(1)
+      .setVoterDirectoryId(directoryId)
+      .setEndpoints(endpointCollection))
+
+    // Create Voter set
+    val voterNewRecord = new VotersRecord().setVersion(1).setVoters(voters)
+    val voterSet = VoterSet.fromVotersRecord(voterNewRecord)
+    voterSet
+  }
+}

Review Comment:
   Missing newline character.

## core/src/main/scala/kafka/tools/StorageTool.scala: ##
@@ -589,4 +616,56 @@
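The reviewer's point about reading the directory id back from the `meta.properties` of the metadata log directory, rather than generating a fresh one, can be sketched with the JDK alone. This is an illustration under assumptions, not the Kafka implementation: `MetaPropertiesSketch`, `readDirectoryId` and `metadataPartitionDir` are hypothetical names, the file is assumed to be a standard Java properties file, and the `directory.id` key and the `__cluster_metadata-0` directory name are taken from the text of this thread.

```scala
import java.nio.file.{Files, Path}
import java.util.Properties

object MetaPropertiesSketch {
  // Read directory.id from <metadataLogDir>/meta.properties, if the file and key exist.
  // Assumes meta.properties is a standard Java properties file.
  def readDirectoryId(metadataLogDir: Path): Option[String] = {
    val file = metadataLogDir.resolve("meta.properties")
    if (!Files.isRegularFile(file)) None
    else {
      val props = new Properties()
      val in = Files.newInputStream(file)
      try props.load(in) finally in.close()
      Option(props.getProperty("directory.id"))
    }
  }

  // Directory that holds the metadata partition, using the name that appears in this thread.
  def metadataPartitionDir(metadataLogDir: Path): Path =
    metadataLogDir.resolve("__cluster_metadata-0")
}
```

Calling `readDirectoryId` only after the directory ids have been written keeps the id in the checkpoint consistent with the one on disk, which is the ordering the review comment asks for.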
Re: [PR] KAFKA-16518 : Adding standalone argument for storage [kafka]
muralibasani commented on PR #16325:
URL: https://github.com/apache/kafka/pull/16325#issuecomment-2170896788

@jsancio The output looks like this:

./bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties -s
metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, dirs={/tmp/kraft-combined-logs: EMPTY})
Snapshot written to /tmp/kraft-combined-logs/__cluster_metadata-0
Formatting /tmp/kraft-combined-logs with metadata.version 3.7-IV4.

% cat /tmp/kraft-combined-logs/__cluster_metadata-0/-00.checkpoint
[binary output; the only legible strings are PLAINTEXT and localhost]
Re: [PR] KAFKA-16518; Adding standalone argument for storage [kafka]
muralibasani closed pull request #16094: KAFKA-16518; Adding standalone argument for storage URL: https://github.com/apache/kafka/pull/16094
Re: [PR] KAFKA-16518; Adding standalone argument for storage [kafka]
muralibasani commented on code in PR #16094:
URL: https://github.com/apache/kafka/pull/16094#discussion_r1619077543

## core/src/main/scala/kafka/tools/StorageTool.scala: ##
@@ -452,21 +460,68 @@ object StorageTool extends Logging {
       stream.println("All of the log directories are already formatted.")
     } else {
       metaPropertiesEnsemble.emptyLogDirs().forEach(logDir => {
+        System.out.println("logDir : "+ logDir)
         copier.setLogDirProps(logDir, new MetaProperties.Builder(metaProperties).
           setDirectoryId(copier.generateValidDirectoryId()).
           build())
         copier.setPreWriteHandler((logDir, _, _) => {
           stream.println(s"Formatting $logDir with metadata.version $metadataVersion.")
           Files.createDirectories(Paths.get(logDir))
           val bootstrapDirectory = new BootstrapDirectory(logDir, Optional.empty())
+          System.out.println("bootstrapDirectory : "+ bootstrapDirectory)
           bootstrapDirectory.writeBinaryFile(bootstrapMetadata)
         })
         copier.setWriteErrorHandler((logDir, e) => {
           throw new TerseFailure(s"Error while writing meta.properties file $logDir: ${e.getMessage}")
         })
         copier.writeLogDirChanges()
+        // Write new file checkpoint file if standalone mode
+        if(standaloneMode){
+          writeCheckpointFile(logDir)
+        }
       })
     }
     0
   }
+
+  def writeCheckpointFile(logDir: String): Unit = {
+    val snapshotCheckpointDir = logDir + "/__cluster_metadata-0"
+
+    // Ensure the directory exists
+    val snapshotDir = Paths.get(snapshotCheckpointDir)
+    if (!Files.exists(snapshotDir)) {
+      Files.createDirectories(snapshotDir)
+    }
+
+    // Create the full path for the checkpoint file
+    val checkpointFilePath = snapshotDir.resolve(snapshotDir)
+
+    // Create the raw snapshot writer
+    val rawSnapshotWriter = FileRawSnapshotWriter.create(checkpointFilePath, new OffsetAndEpoch(0, 0))
+
+    val builder = new RecordsSnapshotWriter.Builder()
+      .setKraftVersion(1)
+      .setVoterSet(Optional.empty())

Review Comment:
   @jsancio Thanks. When I try to create a VoterSet, the VoterNode constructor is not accessible from core\tools, because VoterNode is part of the raft package. Any suggestions? We would need this constructor to be public: https://github.com/apache/kafka/blob/8068a086a3f41b9f7e4d4a1dab3338f029089f23/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java#L49
Re: [PR] KAFKA-16518; Adding standalone argument for storage [kafka]
muralibasani commented on code in PR #16094: URL: https://github.com/apache/kafka/pull/16094#discussion_r1619109614 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -452,21 +460,68 @@ object StorageTool extends Logging { stream.println("All of the log directories are already formatted.") } else { metaPropertiesEnsemble.emptyLogDirs().forEach(logDir => { +System.out.println("logDir : "+ logDir) copier.setLogDirProps(logDir, new MetaProperties.Builder(metaProperties). setDirectoryId(copier.generateValidDirectoryId()). build()) copier.setPreWriteHandler((logDir, _, _) => { stream.println(s"Formatting $logDir with metadata.version $metadataVersion.") Files.createDirectories(Paths.get(logDir)) val bootstrapDirectory = new BootstrapDirectory(logDir, Optional.empty()) + System.out.println("bootstrapDirectory : "+ bootstrapDirectory) bootstrapDirectory.writeBinaryFile(bootstrapMetadata) }) copier.setWriteErrorHandler((logDir, e) => { throw new TerseFailure(s"Error while writing meta.properties file $logDir: ${e.getMessage}") }) copier.writeLogDirChanges() +// Write new file checkpoint file if standalone mode +if(standaloneMode){ + writeCheckpointFile(logDir) +} }) } 0 } + + def writeCheckpointFile(logDir: String): Unit = { +val snapshotCheckpointDir = logDir + "/__cluster_metadata-0" + +// Ensure the directory exists +val snapshotDir = Paths.get(snapshotCheckpointDir) +if (!Files.exists(snapshotDir)) { + Files.createDirectories(snapshotDir) +} + +// Create the full path for the checkpoint file +val checkpointFilePath = snapshotDir.resolve(snapshotDir) + +// Create the raw snapshot writer +val rawSnapshotWriter = FileRawSnapshotWriter.create(checkpointFilePath, new OffsetAndEpoch(0, 0)) + +val builder = new RecordsSnapshotWriter.Builder() + .setKraftVersion(1) + .setVoterSet(Optional.empty()) Review Comment: trying this way ``` val voterMap: java.util.Map[Integer, VoterSet.VoterNode] = new util.HashMap() voterMap.put(1, new VoterSet.VoterNode( 
ReplicaKey.of(nodeId, Optional.of(directoryId)), Collections.singletonMap(controllerListenerNames, new InetSocketAddress(host, port)), new SupportedVersionRange(0.toShort, 1.toShort) )) val voterSet = new VoterSet(voterMap) ```
Re: [PR] KAFKA-16518; Adding standalone argument for storage [kafka]
muralibasani commented on code in PR #16094: URL: https://github.com/apache/kafka/pull/16094#discussion_r1619077543 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -452,21 +460,68 @@ object StorageTool extends Logging { stream.println("All of the log directories are already formatted.") } else { metaPropertiesEnsemble.emptyLogDirs().forEach(logDir => { +System.out.println("logDir : "+ logDir) copier.setLogDirProps(logDir, new MetaProperties.Builder(metaProperties). setDirectoryId(copier.generateValidDirectoryId()). build()) copier.setPreWriteHandler((logDir, _, _) => { stream.println(s"Formatting $logDir with metadata.version $metadataVersion.") Files.createDirectories(Paths.get(logDir)) val bootstrapDirectory = new BootstrapDirectory(logDir, Optional.empty()) + System.out.println("bootstrapDirectory : "+ bootstrapDirectory) bootstrapDirectory.writeBinaryFile(bootstrapMetadata) }) copier.setWriteErrorHandler((logDir, e) => { throw new TerseFailure(s"Error while writing meta.properties file $logDir: ${e.getMessage}") }) copier.writeLogDirChanges() +// Write new file checkpoint file if standalone mode +if(standaloneMode){ + writeCheckpointFile(logDir) +} }) } 0 } + + def writeCheckpointFile(logDir: String): Unit = { +val snapshotCheckpointDir = logDir + "/__cluster_metadata-0" + +// Ensure the directory exists +val snapshotDir = Paths.get(snapshotCheckpointDir) +if (!Files.exists(snapshotDir)) { + Files.createDirectories(snapshotDir) +} + +// Create the full path for the checkpoint file +val checkpointFilePath = snapshotDir.resolve(snapshotDir) + +// Create the raw snapshot writer +val rawSnapshotWriter = FileRawSnapshotWriter.create(checkpointFilePath, new OffsetAndEpoch(0, 0)) + +val builder = new RecordsSnapshotWriter.Builder() + .setKraftVersion(1) + .setVoterSet(Optional.empty()) Review Comment: Thanks. When I try to create a VoterSet, the VoterNode constructor is not accessible from the tools package, because VoterNode is part of the raft package. Any suggestions?
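On the accessibility question above: one common way around a package-restricted constructor is a public factory method that lives next to the restricted class, so callers in other packages never invoke the constructor directly. The sketch below is only an illustration of that pattern. The `raft` and `tools` objects stand in for packages, and `VoterNode.of` and `standaloneVoters` are hypothetical names; none of this is Kafka's actual `VoterSet.VoterNode` API, and the thread does not say which approach the PR should take.

```scala
import java.net.InetSocketAddress

// Stand-in for the owning package (hypothetical, not Kafka's raft module).
object raft {
  // The constructor is visible only inside `raft`.
  final class VoterNode private[raft] (val nodeId: Int, val address: InetSocketAddress)

  object VoterNode {
    // Public factory: the only way for outside callers to obtain a VoterNode.
    def of(nodeId: Int, host: String, port: Int): VoterNode =
      new VoterNode(nodeId, InetSocketAddress.createUnresolved(host, port))
  }
}

// Stand-in for the calling package (hypothetical).
object tools {
  // Builds a single-entry voter map keyed by node id.
  // `new raft.VoterNode(...)` would not compile here, so the factory is used.
  def standaloneVoters(nodeId: Int, host: String, port: Int): Map[Int, raft.VoterNode] =
    Map(nodeId -> raft.VoterNode.of(nodeId, host, port))
}

object Demo {
  def main(args: Array[String]): Unit = {
    val voters = tools.standaloneVoters(1, "localhost", 9093)
    println(voters(1).address.getPort) // prints 9093
  }
}
```

The trade-off is that the owning package keeps control over how instances are constructed, at the cost of adding one public entry point there.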