Re: [PR] KAFKA-16518; Adding standalone argument for storage [kafka]

2024-07-10 Thread via GitHub


muralibasani commented on code in PR #16325:
URL: https://github.com/apache/kafka/pull/16325#discussion_r1672845320


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -698,6 +663,42 @@ object StorageTool extends Logging {
 voterSet
   }
 
+  def createStandaloneVoterMap(config: KafkaConfig): util.Map[ListenerName, InetSocketAddress] = {
+val advertisedListenerEndpoints = config.effectiveAdvertisedControllerListeners
+val listeners: util.Map[ListenerName, InetSocketAddress] = new util.HashMap()
+advertisedListenerEndpoints.foreach(endpoint => {
+  val host: String = endpoint.host
+  listeners.put(endpoint.listenerName, new InetSocketAddress(host, endpoint.port))
+})
+listeners
+  }
+
+  private def parseControllerQuorumVotersMap(controllerQuorumVoterMap: util.Map[Integer, InetSocketAddress],
+   metaProperties: MetaProperties,
+   config: KafkaConfig): util.Map[ListenerName, InetSocketAddress] = {

Review Comment:
   From the KIP description:
   "When the format command is executed with this option it will read the node.id configured in the properties file specified by the --config option and compare it against the <replica-id> specified in --controller-quorum-voters. If there is a match, it will write the <replica-directory-id> specified to the directory.id property in the meta.properties for the metadata.log.dir directory."
   
   I tried adding the condition
   if (metaProperties.nodeId().getAsInt == replicaId)
   
   in the method.
   
   Maybe I am wrong. Could you please suggest the code?
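
   For what it's worth, a minimal sketch of the node.id match being asked about; the helper name `endpointForLocalNode` is hypothetical and this is not the PR's actual code:
   ```scala
   import java.net.InetSocketAddress
   import java.util
   import scala.jdk.CollectionConverters._

   // Keep only the --controller-quorum-voters entry whose replica id matches the configured node.id.
   def endpointForLocalNode(
     nodeId: Int,                                      // e.g. metaProperties.nodeId().getAsInt
     votersById: util.Map[Integer, InetSocketAddress]  // e.g. the parsed controllerQuorumVoterMap
   ): Option[InetSocketAddress] = {
     votersById.asScala.collectFirst {
       case (replicaId, address) if replicaId.intValue() == nodeId => address
     }
   }
   ```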






Re: [PR] KAFKA-16518; Adding standalone argument for storage [kafka]

2024-07-10 Thread via GitHub


muralibasani commented on code in PR #16325:
URL: https://github.com/apache/kafka/pull/16325#discussion_r1672833818


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -112,20 +112,21 @@ object StorageTool extends Logging {
   setNodeId(config.nodeId).
   build()
 val standaloneMode = namespace.getBoolean("standalone")
-var advertisedListenerEndpoints: collection.Seq[kafka.cluster.EndPoint] = List()
 
 val controllersQuorumVoters = namespace.getString("controller_quorum_voters")
 if(standaloneMode && controllersQuorumVoters != null) {
   throw new TerseFailure("Both --standalone and --controller-quorum-voters were set. Only one of the two flags can be set.")
 }
 
+var listeners: util.Map[ListenerName, InetSocketAddress] = new util.HashMap()
 if (standaloneMode) {
-  advertisedListenerEndpoints = config.effectiveAdvertisedBrokerListeners
+  listeners = createStandaloneVoterMap(config)
 } else if(controllersQuorumVoters != null) {
   if (!validateControllerQuorumVoters(controllersQuorumVoters)) {
 throw new TerseFailure("Expected schema for --controller-quorum-voters is [-]@:")
   }
-  advertisedListenerEndpoints = config.effectiveAdvertisedControllerListeners
+  val controllerQuorumVoterMap: util.Map[Integer, InetSocketAddress] = parseVoterConnections(Collections.singletonList(controllersQuorumVoters))
+  listeners = parseControllerQuorumVotersMap(controllerQuorumVoterMap, metaProperties, config)

Review Comment:
   This means we would have to update https://github.com/apache/kafka/blob/25d775b742406477a0ff678b9990ed149d2157cc/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java#L178 to return Uuid too?
   






Re: [PR] KAFKA-16518; Adding standalone argument for storage [kafka]

2024-07-08 Thread via GitHub


jsancio commented on code in PR #16325:
URL: https://github.com/apache/kafka/pull/16325#discussion_r1668893365


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -300,19 +301,19 @@ object StorageTool extends Logging {
   }
 
   private def getUserScramCredentialRecord(
-mechanism: String,
-config: String
-  ) : UserScramCredentialRecord = {
+mechanism: String,
+config: String
+  ) : UserScramCredentialRecord = {
 /*
  * Remove  '[' amd ']'
  * Split K->V pairs on ',' and no K or V should contain ','
  * Split K and V on '=' but V could contain '=' if inside ""
  * Create Map of K to V and replace all " in V
  */
 val argMap = config.substring(1, config.length - 1)
-  .split(",")
-  .map(_.split("=(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"))
-  .map(args => args(0) -> args(1).replaceAll("\"", "")).toMap
+   .split(",")
+   .map(_.split("=(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)"))
+   .map(args => args(0) -> args(1).replaceAll("\"", "")).toMap

Review Comment:
   Again, this is not the correct indentation.



##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -112,20 +112,21 @@ object StorageTool extends Logging {
   setNodeId(config.nodeId).
   build()
 val standaloneMode = namespace.getBoolean("standalone")
-var advertisedListenerEndpoints: collection.Seq[kafka.cluster.EndPoint] = List()
 
 val controllersQuorumVoters = namespace.getString("controller_quorum_voters")
 if(standaloneMode && controllersQuorumVoters != null) {
   throw new TerseFailure("Both --standalone and --controller-quorum-voters were set. Only one of the two flags can be set.")
 }
 
+var listeners: util.Map[ListenerName, InetSocketAddress] = new util.HashMap()
 if (standaloneMode) {
-  advertisedListenerEndpoints = config.effectiveAdvertisedBrokerListeners
+  listeners = createStandaloneVoterMap(config)
 } else if(controllersQuorumVoters != null) {
   if (!validateControllerQuorumVoters(controllersQuorumVoters)) {
 throw new TerseFailure("Expected schema for --controller-quorum-voters is [-]@:")
   }
-  advertisedListenerEndpoints = config.effectiveAdvertisedControllerListeners
+  val controllerQuorumVoterMap: util.Map[Integer, InetSocketAddress] = parseVoterConnections(Collections.singletonList(controllersQuorumVoters))
+  listeners = parseControllerQuorumVotersMap(controllerQuorumVoterMap, metaProperties, config)

Review Comment:
   This doesn't look correct. `VoterSet` is basically a `Map[Integer, (Uuid, Map[ListenerName, InetSocketAddress])]`, where `Integer` is the replica id and `Uuid` is the replica directory id. The type for the listeners doesn't contain all of the information needed to generate a `VoterSet` for all the possible configuration cases.
   
   Note that the value for `--controller-quorum-voters` has the following schema: `<replica-id>-<replica-directory-id>@<host>:<port>`
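
   To illustrate the shape being described, a rough Scala sketch; this is not the actual `org.apache.kafka.raft.internals.VoterSet` API, and the parsing helper below is hypothetical:
   ```scala
   import java.net.InetSocketAddress
   import org.apache.kafka.common.Uuid
   import org.apache.kafka.common.network.ListenerName

   object VoterSetSketch {
     // Conceptual shape only: replica id -> (replica directory id, listener name -> endpoint)
     type ConceptualVoterSet = Map[Int, (Uuid, Map[ListenerName, InetSocketAddress])]

     // Hypothetical parser for a single --controller-quorum-voters entry of the form
     // <replica-id>-<replica-directory-id>@<host>:<port>
     def parseVoterEntry(entry: String): (Int, Uuid, InetSocketAddress) = {
       val Array(ids, hostPort) = entry.split("@", 2)
       val Array(replicaId, directoryId) = ids.split("-", 2)
       val Array(host, port) = hostPort.split(":", 2)
       (replicaId.toInt, Uuid.fromString(directoryId), new InetSocketAddress(host, port.toInt))
     }
   }
   ```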



##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -300,19 +301,19 @@ object StorageTool extends Logging {
   }
 
   private def getUserScramCredentialRecord(
-mechanism: String,
-config: String
-  ) : UserScramCredentialRecord = {
+mechanism: String,
+config: String
+  ) : UserScramCredentialRecord = {

Review Comment:
   Extra space between `)` and `:`. It should be `): User... = {`



##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -335,10 +336,10 @@ object StorageTool extends Logging {
   if (argMap.contains("salt")) {
 val iterations = argMap("iterations").toInt
 if (iterations < scramMechanism.minIterations()) {
-  throw new TerseFailure(s"The 'iterations' value must be >= 
${scramMechanism.minIterations()} for add-scram")
+throw new TerseFailure(s"The 'iterations' value must be >= 
${scramMechanism.minIterations()} for add-scram")
 }
 if (iterations > scramMechanism.maxIterations()) {
-  throw new TerseFailure(s"The 'iterations' value must be <= 
${scramMechanism.maxIterations()} for add-scram")
+throw new TerseFailure(s"The 'iterations' value must be <= 
${scramMechanism.maxIterations()} for add-scram")

Review Comment:
   Looks like you have two extra spaces!



##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -506,8 +507,8 @@ object StorageTool extends Logging {
 
 val metadataRecords = new util.ArrayList[ApiMessageAndVersion]
 metadataRecords.add(new ApiMessageAndVersion(new FeatureLevelRecord().
-  setName(MetadataVersion.FEATURE_NAME).
-  setFeatureLevel(metadataVersion.featureLevel()), 0.toShort))
+

Re: [PR] KAFKA-16518; Adding standalone argument for storage [kafka]

2024-07-04 Thread via GitHub


muralibasani commented on code in PR #16325:
URL: https://github.com/apache/kafka/pull/16325#discussion_r1665647113


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -503,24 +537,29 @@ object StorageTool extends Logging {
   }
 
   def formatCommand(
-stream: PrintStream,
-directories: Seq[String],
-metaProperties: MetaProperties,
-metadataVersion: MetadataVersion,
-ignoreFormatted: Boolean
-  ): Int = {
+ stream: PrintStream,
+ directories: Seq[String],
+ metaProperties: MetaProperties,
+ metadataVersion: MetadataVersion,
+ ignoreFormatted: Boolean,
+ advertisedListenerEndpoints: scala.collection.Seq[kafka.cluster.EndPoint],
+ controllersQuorumVoters: String
+   ): Int = {
 val bootstrapMetadata = buildBootstrapMetadata(metadataVersion, None, "format command")
-formatCommand(stream, directories, metaProperties, bootstrapMetadata, metadataVersion, ignoreFormatted)
+formatCommand(stream, directories, metaProperties, bootstrapMetadata, metadataVersion, ignoreFormatted,
+  advertisedListenerEndpoints, controllersQuorumVoters)
   }
 
   def formatCommand(
-stream: PrintStream,
-directories: Seq[String],
-metaProperties: MetaProperties,
-bootstrapMetadata: BootstrapMetadata,
-metadataVersion: MetadataVersion,
-ignoreFormatted: Boolean
-  ): Int = {
+ stream: PrintStream,
+ directories: Seq[String],
+ metaProperties: MetaProperties,
+ bootstrapMetadata: BootstrapMetadata,
+ metadataVersion: MetadataVersion,
+ ignoreFormatted: Boolean,
+ advertisedListenerEndpoints: scala.collection.Seq[kafka.cluster.EndPoint],
+ controllersQuorumVoters: String

Review Comment:
   I tried that, but the directoryId is derived only in the format method, so I updated the format method with only the listeners parameter.






Re: [PR] KAFKA-16518; Adding standalone argument for storage [kafka]

2024-07-04 Thread via GitHub


muralibasani commented on code in PR #16325:
URL: https://github.com/apache/kafka/pull/16325#discussion_r1665644755


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -102,6 +111,23 @@ object StorageTool extends Logging {
   setClusterId(clusterId).
   setNodeId(config.nodeId).
   build()
+val standaloneMode = namespace.getBoolean("standalone")
+var advertisedListenerEndpoints: collection.Seq[kafka.cluster.EndPoint] = List()
+
+val controllersQuorumVoters = namespace.getString("controller_quorum_voters")
+if(standaloneMode && controllersQuorumVoters != null) {
+  throw new TerseFailure("Both --standalone and --controller-quorum-voters were set. Only one of the two flags can be set.")
+}
+
+if (standaloneMode) {
+  advertisedListenerEndpoints = config.effectiveAdvertisedBrokerListeners
+} else if(controllersQuorumVoters != null) {
+  if (!validateControllerQuorumVoters(controllersQuorumVoters)) {
+throw new TerseFailure("Expected schema for --controller-quorum-voters is [-]@:")
+  }
+  advertisedListenerEndpoints = config.effectiveAdvertisedControllerListeners

Review Comment:
   Fixed.



##
core/src/test/scala/unit/kafka/tools/StorageToolTest.scala:
##
@@ -285,6 +362,49 @@ Found problem:
   "Expected the default metadata.version to be 3.3-IV2")
   }
 
+  @Test
+  def testStandaloneModeWithArguments(): Unit = {
+val namespace = StorageTool.parseArguments(Array("format", "-c", "config.props", "-t", "XcZZOzUqS4yPOjhMQB6JAT",
+"-s"))
+val config = Mockito.spy(new KafkaConfig(TestUtils.createBrokerConfig(1, null)))
+val exitCode = StorageTool.runFormatCommand(namespace, config)
+val tempDirs = config.logDirs
+tempDirs.foreach(tempDir => {
+  val checkpointDir = tempDir + "/" + CLUSTER_METADATA_TOPIC_NAME
+  val checkpointFilePath = Snapshots.snapshotPath(Paths.get(checkpointDir), BOOTSTRAP_SNAPSHOT_ID)
+  assertTrue(checkpointFilePath.toFile.exists)
+  assertTrue(Utils.readFileAsString(checkpointFilePath.toFile.getPath).contains("localhost"))
+  Utils.delete(new File(tempDir))
+})
+assertEquals(0, exitCode)
+  }
+
+//  @Test TODO
+//  def testControllerQuorumVotersWithArguments(): Unit = {
+//val namespace = StorageTool.parseArguments(Array("format", "-c", "config.props", "-t", "XcZZOzUqS4yQQjhMQB6JAT",
+//  "--controller-quorum-voters", "1@localhost:9092"))
+//val config = Mockito.spy(new KafkaConfig(TestUtils.createBrokerConfig(1, null)))

Review Comment:
   Not sure how to set the controller listener.
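
   In case it helps, a hedged sketch of overriding the controller listener settings on the Properties returned by TestUtils.createBrokerConfig before building the KafkaConfig; the property names are the standard KRaft configs, but whether these overrides are enough for this test is an assumption:
   ```scala
   import kafka.server.KafkaConfig
   import kafka.utils.TestUtils

   // Illustrative only: layer controller listener properties on top of the test broker config.
   val props = TestUtils.createBrokerConfig(1, null)
   props.setProperty("process.roles", "broker,controller")
   props.setProperty("controller.listener.names", "CONTROLLER")
   props.setProperty("listeners", "PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093")
   props.setProperty("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT")
   props.setProperty("controller.quorum.voters", "1@localhost:9093")
   val config = new KafkaConfig(props)
   ```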






Re: [PR] KAFKA-16518; Adding standalone argument for storage [kafka]

2024-07-02 Thread via GitHub


muralibasani commented on code in PR #16325:
URL: https://github.com/apache/kafka/pull/16325#discussion_r1661035915


##
core/src/test/scala/unit/kafka/tools/StorageToolTest.scala:
##
@@ -285,6 +362,49 @@ Found problem:
   "Expected the default metadata.version to be 3.3-IV2")
   }
 
+  @Test
+  def testStandaloneModeWithArguments(): Unit = {
+val namespace = StorageTool.parseArguments(Array("format", "-c", "config.props", "-t", "XcZZOzUqS4yPOjhMQB6JAT",
+"-s"))
+val config = Mockito.spy(new KafkaConfig(TestUtils.createBrokerConfig(1, null)))
+val exitCode = StorageTool.runFormatCommand(namespace, config)
+val tempDirs = config.logDirs
+tempDirs.foreach(tempDir => {
+  val checkpointDir = tempDir + "/" + CLUSTER_METADATA_TOPIC_NAME
+  val checkpointFilePath = Snapshots.snapshotPath(Paths.get(checkpointDir), BOOTSTRAP_SNAPSHOT_ID)
+  assertTrue(checkpointFilePath.toFile.exists)
+  assertTrue(Utils.readFileAsString(checkpointFilePath.toFile.getPath).contains("localhost"))
+  Utils.delete(new File(tempDir))
+})
+assertEquals(0, exitCode)
+  }
+
+//  @Test TODO
+//  def testControllerQuorumVotersWithArguments(): Unit = {
+//val namespace = StorageTool.parseArguments(Array("format", "-c", "config.props", "-t", "XcZZOzUqS4yQQjhMQB6JAT",
+//  "--controller-quorum-voters", "1@localhost:9092"))
+//val config = Mockito.spy(new KafkaConfig(TestUtils.createBrokerConfig(1, null)))

Review Comment:
   Not sure how to set the controller listener.






Re: [PR] KAFKA-16518; Adding standalone argument for storage [kafka]

2024-07-02 Thread via GitHub


muralibasani commented on PR #16325:
URL: https://github.com/apache/kafka/pull/16325#issuecomment-2200108407

   @jsancio, updated with the controller option.
   
   ```
   ./bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties -q 1@localhost:9093
   metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, dirs={/tmp/kraft-combined-logs: EMPTY})
   Formatting /tmp/kraft-combined-logs with metadata.version 3.8-IV0.
   Snapshot written to /tmp/kraft-combined-logs/__cluster_metadata


   muralidharbasani@Muralidhars-MacBook-Pro kafka % ./bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties -s

   metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, dirs={/tmp/kraft-combined-logs: EMPTY})
   Formatting /tmp/kraft-combined-logs with metadata.version 3.8-IV0.
   Snapshot written to /tmp/kraft-combined-logs/__cluster_metadata


   muralidharbasani@Muralidhars-MacBook-Pro kafka % ./bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties -q 1@localhost:9093 -s
   Both --standalone and --controller-quorum-voters were set. Only one of the two flags can be set.
   ```





Re: [PR] KAFKA-16518; Adding standalone argument for storage [kafka]

2024-06-24 Thread via GitHub


jsancio commented on code in PR #16325:
URL: https://github.com/apache/kafka/pull/16325#discussion_r1651072253


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -102,6 +109,11 @@ object StorageTool extends Logging {
   setClusterId(clusterId).
   setNodeId(config.nodeId).
   build()
+val standaloneMode = namespace.getBoolean("standalone")
+val advertisedListenerEndpoints: scala.collection.Seq[kafka.cluster.EndPoint] = config.effectiveAdvertisedListeners
+
+// effectiveAdvertisedControllerListeners to be added

Review Comment:
   I think we should wait. The bootstrap checkpoint file is incorrect without https://github.com/apache/kafka/pull/16235






Re: [PR] KAFKA-16518 : Adding standalone argument for storage [kafka]

2024-06-17 Thread via GitHub


jsancio commented on PR #16325:
URL: https://github.com/apache/kafka/pull/16325#issuecomment-2173806713

   > cat /tmp/kraft-combined-logs/__cluster_metadata-0/-00.checkpoint
   
   @muralibasani You can use `bin/kafka-dump-log --cluster-metadata-decoder --files /tmp/kraft-combined-logs/__cluster_metadata-0/-00.checkpoint`.





Re: [PR] KAFKA-16518 : Adding standalone argument for storage [kafka]

2024-06-17 Thread via GitHub


jsancio commented on code in PR #16325:
URL: https://github.com/apache/kafka/pull/16325#discussion_r1642996134


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -46,6 +51,8 @@ import scala.collection.mutable.ArrayBuffer
 
 object StorageTool extends Logging {
 
+  val clusterMetadataDir: String = "/__cluster_metadata-0"

Review Comment:
   This is defined in https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L31-L35
   
   And we resolve the actual directory like this: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/raft/RaftManager.scala#L57-L62
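
   For reference, a rough sketch of resolving that directory name from the shared constants rather than hard-coding it, assuming the `UnifiedLog.logDirName` helper referenced in RaftManager.scala; treat it as an illustration, not the PR's code:
   ```scala
   import java.nio.file.{Path, Paths}
   import kafka.log.UnifiedLog
   import org.apache.kafka.common.internals.Topic

   // Resolve <logDir>/__cluster_metadata-0 without hard-coding the name.
   def clusterMetadataDir(logDir: String): Path =
     Paths.get(logDir, UnifiedLog.logDirName(Topic.CLUSTER_METADATA_TOPIC_PARTITION))
   ```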



##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -226,7 +238,12 @@ object StorageTool extends Logging {
   help(s"A KRaft release version to use for the initial metadata.version. 
The minimum is ${MetadataVersion.IBP_3_0_IV0}, the default is 
${MetadataVersion.LATEST_PRODUCTION}")
 formatParser.addArgument("--feature", "-f").
   help("A feature upgrade we should perform, in feature=level format. For 
example: `metadata.version=5`.").
-  action(append());
+  action(append())
+formatParser.addArgument("--standalone", "-s").
+  help("This command will 1) create a meta.properties file in 
metadata.log.dir with a randomly generated" +
+" directory.id, 2) create a snapshot at 
-00.checkpoint with the necessary control" +
+" records (KRaftVersionRecord and VotersRecord) to make this Kafka 
node the only voter for the quorum").

Review Comment:
   I think this is too low level. Most users won't understand what this means. How about?
   > This flag will bootstrap the controller in standalone mode as the only KRaft controller of the Kafka cluster. Use the --controller-quorum-voters flag instead to bootstrap a controller cluster with more than one controller.



##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -548,6 +570,7 @@ object StorageTool extends Logging {
 if (metaPropertiesEnsemble.emptyLogDirs().isEmpty) {
   stream.println("All of the log directories are already formatted.")
 } else {
+  val directoryId = copier.generateValidDirectoryId()

Review Comment:
   This doesn't look correct. We need to use the directoryId of the metadata log directory. After the `copier` has generated all of the directory ids in `copier.writeLogDirChanges()`, you want to read the directory id in the `meta.properties` of the `metadataLogDir`.



##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -589,4 +616,56 @@ object StorageTool extends Logging {
   (nameAndLevel._1, nameAndLevel._2)
 }.toMap
   }
-}
+
+  def writeCheckpointFile(stream: PrintStream, logDir: String, directoryId: Uuid,
+  advertisedListenerEndpoints: scala.collection.Seq[kafka.cluster.EndPoint]): Unit = {
+val snapshotCheckpointDir = logDir + clusterMetadataDir
+
+// Ensure the directory exists
+val snapshotDir = Paths.get(snapshotCheckpointDir)
+if (!Files.exists(snapshotDir)) {
+  Files.createDirectories(snapshotDir)
+}
+
+// Create the full path for the checkpoint file
+val checkpointFilePath = snapshotDir.resolve(snapshotDir)
+
+// Create the raw snapshot writer
+val rawSnapshotWriter = FileRawSnapshotWriter.create(checkpointFilePath, new OffsetAndEpoch(0, 0))
+
+if(advertisedListenerEndpoints.nonEmpty){
+  val voterSet: VoterSet = getVoterSet(directoryId, advertisedListenerEndpoints)
+
+  val builder = new RecordsSnapshotWriter.Builder()
+.setKraftVersion(1)
+.setVoterSet(Optional.of(voterSet))
+.setRawSnapshotWriter(rawSnapshotWriter).build(new StringSerde)
+
+  // Close the builder to finalize the snapshot
+  builder.freeze()
+  builder.close()
+  stream.println(s"Snapshot written to $checkpointFilePath")
+}
+  }
+
+  private def getVoterSet(directoryId: Uuid, advertisedListenerEndpoints: collection.Seq[EndPoint]) = {
+// Create a VotersRecord endpoint collection
+val endpointCollection = new VotersRecord.EndpointCollection()
+advertisedListenerEndpoints.foreach(endpoint => {
+  endpointCollection.add(new VotersRecord.Endpoint().setName(endpoint.listenerName.value())
+.setHost(endpoint.host).setPort(endpoint.port))
+})
+
+// Create voters
+val voters: util.List[VotersRecord.Voter] = new util.ArrayList()
+voters.add(new VotersRecord.Voter()
+  .setVoterId(1)
+  .setVoterDirectoryId(directoryId)
+  .setEndpoints(endpointCollection))
+
+// Create Voter set
+val voterNewRecord = new VotersRecord().setVersion(1).setVoters(voters)
+val voterSet = VoterSet.fromVotersRecord(voterNewRecord)
+voterSet
+  }
+}

Review Comment:
   Missing newline character.




Re: [PR] KAFKA-16518 : Adding standalone argument for storage [kafka]

2024-06-15 Thread via GitHub


muralibasani commented on PR #16325:
URL: https://github.com/apache/kafka/pull/16325#issuecomment-2170896788

   @jsancio Output looks like below.
   
   ./bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties -s
   metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, dirs={/tmp/kraft-combined-logs: EMPTY})
   Snapshot written to /tmp/kraft-combined-logs/__cluster_metadata-0
   Formatting /tmp/kraft-combined-logs with metadata.version 3.7-IV4.
   
   % cat /tmp/kraft-combined-logs/__cluster_metadata-0/-00.checkpoint
   �[� �%�%��*
   l�#�� �ﱃS�=��7
   PLAINTEXT
   localhost#�?)��� �% �% �%





Re: [PR] KAFKA-16518; Adding standalone argument for storage [kafka]

2024-06-13 Thread via GitHub


muralibasani closed pull request #16094: KAFKA-16518; Adding standalone argument for storage
URL: https://github.com/apache/kafka/pull/16094





Re: [PR] KAFKA-16518; Adding standalone argument for storage [kafka]

2024-05-30 Thread via GitHub


muralibasani commented on code in PR #16094:
URL: https://github.com/apache/kafka/pull/16094#discussion_r1619077543


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -452,21 +460,68 @@ object StorageTool extends Logging {
   stream.println("All of the log directories are already formatted.")
 } else {
   metaPropertiesEnsemble.emptyLogDirs().forEach(logDir => {
+System.out.println("logDir : "+ logDir)
 copier.setLogDirProps(logDir, new MetaProperties.Builder(metaProperties).
   setDirectoryId(copier.generateValidDirectoryId()).
   build())
 copier.setPreWriteHandler((logDir, _, _) => {
   stream.println(s"Formatting $logDir with metadata.version $metadataVersion.")
   Files.createDirectories(Paths.get(logDir))
   val bootstrapDirectory = new BootstrapDirectory(logDir, Optional.empty())
+  System.out.println("bootstrapDirectory : "+ bootstrapDirectory)
   bootstrapDirectory.writeBinaryFile(bootstrapMetadata)
 })
 copier.setWriteErrorHandler((logDir, e) => {
   throw new TerseFailure(s"Error while writing meta.properties file $logDir: ${e.getMessage}")
 })
 copier.writeLogDirChanges()
+// Write new file checkpoint file if standalone mode
+if(standaloneMode){
+  writeCheckpointFile(logDir)
+}
   })
 }
 0
   }
+
+  def writeCheckpointFile(logDir: String): Unit = {
+val snapshotCheckpointDir = logDir + "/__cluster_metadata-0"
+
+// Ensure the directory exists
+val snapshotDir = Paths.get(snapshotCheckpointDir)
+if (!Files.exists(snapshotDir)) {
+  Files.createDirectories(snapshotDir)
+}
+
+// Create the full path for the checkpoint file
+val checkpointFilePath = snapshotDir.resolve(snapshotDir)
+
+// Create the raw snapshot writer
+val rawSnapshotWriter = FileRawSnapshotWriter.create(checkpointFilePath, new OffsetAndEpoch(0, 0))
+
+val builder = new RecordsSnapshotWriter.Builder()
+  .setKraftVersion(1)
+  .setVoterSet(Optional.empty())

Review Comment:
   @jsancio Thanks. When trying to create a VoterSet, the VoterNode constructor is not accessible from core/tools, as VoterNode is part of the raft package. Any suggestions?
   
   We need this public constructor: https://github.com/apache/kafka/blob/8068a086a3f41b9f7e4d4a1dab3338f029089f23/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java#L49
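
   One possible workaround, sketched from the APIs used elsewhere in this thread (build a VotersRecord and convert it with VoterSet.fromVotersRecord, so no VoterNode constructor is needed); the import paths and values here are assumptions, not code from the PR:
   ```scala
   import java.util.Collections
   import org.apache.kafka.common.Uuid
   import org.apache.kafka.common.message.VotersRecord
   import org.apache.kafka.raft.internals.VoterSet

   // Build the record directly and let VoterSet do the conversion.
   def singleVoterSet(voterId: Int, directoryId: Uuid, listener: String, host: String, port: Int): VoterSet = {
     val endpoints = new VotersRecord.EndpointCollection()
     endpoints.add(new VotersRecord.Endpoint().setName(listener).setHost(host).setPort(port))
     val voter = new VotersRecord.Voter()
       .setVoterId(voterId)
       .setVoterDirectoryId(directoryId)
       .setEndpoints(endpoints)
     VoterSet.fromVotersRecord(new VotersRecord().setVersion(1).setVoters(Collections.singletonList(voter)))
   }
   ```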






Re: [PR] KAFKA-16518; Adding standalone argument for storage [kafka]

2024-05-29 Thread via GitHub


muralibasani commented on code in PR #16094:
URL: https://github.com/apache/kafka/pull/16094#discussion_r1619109614


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -452,21 +460,68 @@ object StorageTool extends Logging {
   stream.println("All of the log directories are already formatted.")
 } else {
   metaPropertiesEnsemble.emptyLogDirs().forEach(logDir => {
+System.out.println("logDir : "+ logDir)
 copier.setLogDirProps(logDir, new 
MetaProperties.Builder(metaProperties).
   setDirectoryId(copier.generateValidDirectoryId()).
   build())
 copier.setPreWriteHandler((logDir, _, _) => {
   stream.println(s"Formatting $logDir with metadata.version 
$metadataVersion.")
   Files.createDirectories(Paths.get(logDir))
   val bootstrapDirectory = new BootstrapDirectory(logDir, 
Optional.empty())
+  System.out.println("bootstrapDirectory : "+ bootstrapDirectory)
   bootstrapDirectory.writeBinaryFile(bootstrapMetadata)
 })
 copier.setWriteErrorHandler((logDir, e) => {
   throw new TerseFailure(s"Error while writing meta.properties file 
$logDir: ${e.getMessage}")
 })
 copier.writeLogDirChanges()
+// Write new file checkpoint file if standalone mode
+if(standaloneMode){
+  writeCheckpointFile(logDir)
+}
   })
 }
 0
   }
+
+  def writeCheckpointFile(logDir: String): Unit = {
+val snapshotCheckpointDir = logDir + "/__cluster_metadata-0"
+
+// Ensure the directory exists
+val snapshotDir = Paths.get(snapshotCheckpointDir)
+if (!Files.exists(snapshotDir)) {
+  Files.createDirectories(snapshotDir)
+}
+
+// Create the full path for the checkpoint file
+val checkpointFilePath = snapshotDir.resolve(snapshotDir)
+
+// Create the raw snapshot writer
+val rawSnapshotWriter = FileRawSnapshotWriter.create(checkpointFilePath, 
new OffsetAndEpoch(0, 0))
+
+val builder = new RecordsSnapshotWriter.Builder()
+  .setKraftVersion(1)
+  .setVoterSet(Optional.empty())

Review Comment:
   Trying this way:
   ```
   val voterMap: java.util.Map[Integer, VoterSet.VoterNode] = new util.HashMap()
   voterMap.put(1, new VoterSet.VoterNode(
     ReplicaKey.of(nodeId, Optional.of(directoryId)),
     Collections.singletonMap(controllerListenerNames, new InetSocketAddress(host, port)),
     new SupportedVersionRange(0.toShort, 1.toShort)
   ))
   val voterSet = new VoterSet(voterMap)
   ```


