Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]

2024-06-08 Thread via GitHub


soarez commented on PR #15834:
URL: https://github.com/apache/kafka/pull/15834#issuecomment-2155838189

   @ableegoldman @mimaison please see #16251


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]

2024-06-07 Thread via GitHub


ableegoldman commented on PR #15834:
URL: https://github.com/apache/kafka/pull/15834#issuecomment-2155777043

   Hey @soarez / @mimaison -- I've noticed the 
ReassignPartitionsCommandTest.testReassignmentCompletionDuringPartialUpgrade 
test seems to be failing consistently since this was merged, with an error 
message that, to my unfamiliar eyes, seems to be related to this change. Can 
you take a quick look into this failing test?
   
   You can find the full logs on the [build for this 
PR](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15834/7/tests),
 but here is the fatal exception:
   
   ```
   java.lang.IllegalArgumentException: requirement failed: Multiple log 
directories (aka JBOD) are not supported with the configured 3.0-IV1 
inter.broker.protocol.version. Need 3.7-IV2 or higher
   ```





Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]

2024-06-07 Thread via GitHub


soarez commented on PR #15834:
URL: https://github.com/apache/kafka/pull/15834#issuecomment-2154323969

   Thanks for reviewing this @mimaison. Backporting to 3.8 and 3.7.





Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]

2024-06-07 Thread via GitHub


soarez merged PR #15834:
URL: https://github.com/apache/kafka/pull/15834





Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]

2024-06-06 Thread via GitHub


mimaison commented on PR #15834:
URL: https://github.com/apache/kafka/pull/15834#issuecomment-2152667234

   You can backport it to 3.7 too if you want.





Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]

2024-06-06 Thread via GitHub


mimaison commented on code in PR #15834:
URL: https://github.com/apache/kafka/pull/15834#discussion_r1629590599


##
core/src/test/scala/unit/kafka/tools/StorageToolTest.scala:
##
@@ -656,5 +657,47 @@ Found problem:
   assertEquals(1, exitStatus)
 }
   }
+
+  @Test
+  def testFormatValidatesConfigForMetadataVersion(): Unit = {
+    val config = Mockito.spy(new KafkaConfig(TestUtils.createBrokerConfig(10, null)))
+    val args = Array("format",
+      "-c", "dummy.properties",
+      "-t", "XcZZOzUqS4yHOjhMQB6JLQ",
+      "--release-version", MetadataVersion.LATEST_PRODUCTION.toString)
+    val exitCode = StorageTool.runFormatCommand(StorageTool.parseArguments(args), config)
+    Mockito.verify(config, Mockito.times(1)).validateWithMetadataVersion(MetadataVersion.LATEST_PRODUCTION)
+    assertEquals(0, exitCode)
+  }
+
+  private def createPropsFile(properties: Properties): String = {
+    val propsFile = TestUtils.tempFile()
+    val propsStream = Files.newOutputStream(propsFile.toPath)
+    try {
+      properties.store(propsStream, "config.props")
+    } finally {
+      propsStream.close()
+    }
+    propsFile.toPath.toString
+  }
+
+  @Test
+  def testJbodSupportValidation(): Unit = {
+    def formatWith(logDirCount: Int, metadataVersion: MetadataVersion): Integer = {
+      val properties = TestUtils.createBrokerConfig(10, null, logDirCount = logDirCount)
+      properties.remove(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)

Review Comment:
   Ah right, thanks for checking.






Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]

2024-06-05 Thread via GitHub


soarez commented on code in PR #15834:
URL: https://github.com/apache/kafka/pull/15834#discussion_r1628002868


##
core/src/main/java/kafka/server/MetadataVersionConfigValidator.java:
##
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.loader.LoaderManifest;
+import org.apache.kafka.image.publisher.MetadataPublisher;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.fault.FaultHandler;
+
+public class MetadataVersionConfigValidator implements MetadataPublisher {

Review Comment:
   I'm glad you asked.
   
   There are two ways the broker can assume a MetadataVersion:
   
   1. Via the `inter.broker.protocol.version` configuration property in ZK mode
   2. Via discovering a `FeatureLevelRecord` for `metadata.version` in the 
cluster metadata in KRaft mode. This is preset with `--release-version` in the 
format command or with `kafka-features.sh upgrade --metadata`.
   
   For (1) we have the version set at startup time, and we can catch the 
configuration error in `KafkaConfig` as the properties are loaded. But for (2) 
we can only validate the config later, when the version is discovered from 
metadata.
   
   Earlier in this PR I had the check in `BrokerMetadataPublisher` (this class 
streams metadata records in the broker and distributes them to various types of 
`MetadataPublisher` for reconciliation), but instead of adding the check to 
that class I think it's better to reuse the publisher pattern, similar to how 
the broker registration re-sending was extracted to 
`BrokerRegistrationTracker` in #15945.
   
   Perhaps we could implement `MetadataPublisher` directly in `KafkaConfig`, 
but I thought that class is big enough already.
   
   Let me know what you think.
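
For readers following the thread, here is a minimal sketch of the publisher idea described above, assuming simplified stand-ins rather than Kafka's real `MetadataPublisher`, `FaultHandler`, or `KafkaConfig` types. Every name in it (`VersionGateSketch`, `VersionPublisher`, `ConfigValidatorPublisher`, `replay`, and the integer version levels) is hypothetical; it only illustrates re-validating static configuration each time a new metadata version is discovered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins for illustration; these are not Kafka's real classes.
final class VersionGateSketch {

    /** Simplified publisher: told about each metadata.version level read from metadata. */
    interface VersionPublisher {
        void onVersionChange(int newLevel);
    }

    /** Re-validates static configuration whenever the discovered level changes. */
    static final class ConfigValidatorPublisher implements VersionPublisher {
        private final int logDirCount;
        private final int minJbodLevel;
        private final Consumer<String> faultHandler;
        private int lastLevel = -1; // no level seen yet

        ConfigValidatorPublisher(int logDirCount, int minJbodLevel, Consumer<String> faultHandler) {
            this.logDirCount = logDirCount;
            this.minJbodLevel = minJbodLevel;
            this.faultHandler = faultHandler;
        }

        @Override
        public void onVersionChange(int newLevel) {
            if (newLevel == lastLevel) {
                return; // unchanged level, nothing to re-validate
            }
            lastLevel = newLevel;
            if (logDirCount > 1 && newLevel < minJbodLevel) {
                faultHandler.accept("Multiple log directories (aka JBOD) need level "
                        + minJbodLevel + " or higher, but found level " + newLevel);
            }
        }
    }

    /** Feeds a sequence of levels to the publisher and returns the faults it reported. */
    static List<String> replay(int logDirCount, int minJbodLevel, int... levels) {
        List<String> faults = new ArrayList<>();
        VersionPublisher publisher =
                new ConfigValidatorPublisher(logDirCount, minJbodLevel, faults::add);
        for (int level : levels) {
            publisher.onVersionChange(level);
        }
        return faults;
    }
}
```

Keeping the check in its own small publisher means the config class does not grow, and the check runs at the only point where a KRaft broker actually knows its metadata version.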
   






Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]

2024-06-05 Thread via GitHub


soarez commented on code in PR #15834:
URL: https://github.com/apache/kafka/pull/15834#discussion_r1627976190


##
core/src/test/scala/unit/kafka/tools/StorageToolTest.scala:
##
@@ -656,5 +657,47 @@ Found problem:
   assertEquals(1, exitStatus)
 }
   }
+
+  @Test
+  def testFormatValidatesConfigForMetadataVersion(): Unit = {
+    val config = Mockito.spy(new KafkaConfig(TestUtils.createBrokerConfig(10, null)))
+    val args = Array("format",
+      "-c", "dummy.properties",
+      "-t", "XcZZOzUqS4yHOjhMQB6JLQ",
+      "--release-version", MetadataVersion.LATEST_PRODUCTION.toString)
+    val exitCode = StorageTool.runFormatCommand(StorageTool.parseArguments(args), config)
+    Mockito.verify(config, Mockito.times(1)).validateWithMetadataVersion(MetadataVersion.LATEST_PRODUCTION)
+    assertEquals(0, exitCode)
+  }
+
+  private def createPropsFile(properties: Properties): String = {
+    val propsFile = TestUtils.tempFile()
+    val propsStream = Files.newOutputStream(propsFile.toPath)
+    try {
+      properties.store(propsStream, "config.props")
+    } finally {
+      propsStream.close()
+    }
+    propsFile.toPath.toString
+  }
+
+  @Test
+  def testJbodSupportValidation(): Unit = {
+    def formatWith(logDirCount: Int, metadataVersion: MetadataVersion): Integer = {
+      val properties = TestUtils.createBrokerConfig(10, null, logDirCount = logDirCount)
+      properties.remove(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)

Review Comment:
   This property is ignored in KRaft. It assumes the value of 
`MINIMUM_KRAFT_VERSION` and cannot be set.
   
   
https://github.com/apache/kafka/blob/f2aafcc66faca4d07a27e4ef90158136cb317b44/core/src/main/scala/kafka/server/KafkaConfig.scala#L896-L914
   
   So I don't think we need to test this? What test did you have in mind?






Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]

2024-06-05 Thread via GitHub


soarez commented on code in PR #15834:
URL: https://github.com/apache/kafka/pull/15834#discussion_r1627855813


##
core/src/test/scala/unit/kafka/tools/StorageToolTest.scala:
##
@@ -656,5 +657,47 @@ Found problem:
   assertEquals(1, exitStatus)
 }
   }
+
+  @Test
+  def testFormatValidatesConfigForMetadataVersion(): Unit = {
+    val config = Mockito.spy(new KafkaConfig(TestUtils.createBrokerConfig(10, null)))
+    val args = Array("format",
+      "-c", "dummy.properties",
+      "-t", "XcZZOzUqS4yHOjhMQB6JLQ",
+      "--release-version", MetadataVersion.LATEST_PRODUCTION.toString)
+    val exitCode = StorageTool.runFormatCommand(StorageTool.parseArguments(args), config)
+    Mockito.verify(config, Mockito.times(1)).validateWithMetadataVersion(MetadataVersion.LATEST_PRODUCTION)
+    assertEquals(0, exitCode)
+  }
+
+  private def createPropsFile(properties: Properties): String = {
+    val propsFile = TestUtils.tempFile()
+    val propsStream = Files.newOutputStream(propsFile.toPath)
+    try {
+      properties.store(propsStream, "config.props")
+    } finally {
+      propsStream.close()
+    }
+    propsFile.toPath.toString
+  }
+
+  @Test
+  def testJbodSupportValidation(): Unit = {
+    def formatWith(logDirCount: Int, metadataVersion: MetadataVersion): Integer = {
+      val properties = TestUtils.createBrokerConfig(10, null, logDirCount = logDirCount)
+      properties.remove(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)
+      StorageTool.runMain(Array("format",

Review Comment:
   Makes sense






Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]

2024-06-05 Thread via GitHub


soarez commented on code in PR #15834:
URL: https://github.com/apache/kafka/pull/15834#discussion_r1627741886


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -45,89 +45,117 @@ import scala.jdk.CollectionConverters._
 import scala.collection.mutable.ArrayBuffer
 
 object StorageTool extends Logging {
-  def main(args: Array[String]): Unit = {
-try {
-  val namespace = parseArguments(args)
-  val command = namespace.getString("command")
-  val config = Option(namespace.getString("config")).flatMap(
-p => Some(new KafkaConfig(Utils.loadProps(p))))
-  command match {
-case "info" =>
-  val directories = configToLogDirectories(config.get)
-  val selfManagedMode = configToSelfManagedMode(config.get)
-  Exit.exit(infoCommand(System.out, selfManagedMode, directories))
-
-case "format" =>
-  val directories = configToLogDirectories(config.get)
-  val clusterId = namespace.getString("cluster_id")
-  val metaProperties = new MetaProperties.Builder().
-setVersion(MetaPropertiesVersion.V1).
-setClusterId(clusterId).
-setNodeId(config.get.nodeId).
-build()
-  val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = 
ArrayBuffer()
-  val specifiedFeatures: util.List[String] = 
namespace.getList("feature")
-  val releaseVersionFlagSpecified = 
namespace.getString("release_version") != null
-  if (releaseVersionFlagSpecified && specifiedFeatures != null) {
-throw new TerseFailure("Both --release-version and --feature were 
set. Only one of the two flags can be set.")
-  }
-  val featureNamesAndLevelsMap = 
featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList)
-  val metadataVersion = getMetadataVersion(namespace, 
featureNamesAndLevelsMap,
-
Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
-  validateMetadataVersion(metadataVersion, config)
-  // Get all other features, validate, and create records for them
-  // Use latest default for features if --release-version is not 
specified
-  generateFeatureRecords(
-metadataRecords,
-metadataVersion,
-featureNamesAndLevelsMap,
-Features.PRODUCTION_FEATURES.asScala.toList,
-config.get.unstableFeatureVersionsEnabled,
-releaseVersionFlagSpecified
-  )
-  
getUserScramCredentialRecords(namespace).foreach(userScramCredentialRecords => {
-if (!metadataVersion.isScramSupported) {
-  throw new TerseFailure(s"SCRAM is only supported in 
metadata.version ${MetadataVersion.IBP_3_5_IV2} or later.")
-}
-for (record <- userScramCredentialRecords) {
-  metadataRecords.append(new ApiMessageAndVersion(record, 
0.toShort))
-}
-  })
-
-  val bootstrapMetadata = buildBootstrapMetadata(metadataVersion, 
Some(metadataRecords), "format command")
-  val ignoreFormatted = namespace.getBoolean("ignore_formatted")
-  if (!configToSelfManagedMode(config.get)) {
-throw new TerseFailure("The kafka configuration file appears to be 
for " +
-  "a legacy cluster. Formatting is only supported for clusters in 
KRaft mode.")
-  }
-  Exit.exit(formatCommand(System.out, directories, metaProperties, 
bootstrapMetadata,
-  metadataVersion,ignoreFormatted))
 
-case "random-uuid" =>
-  System.out.println(Uuid.randomUuid)
-  Exit.exit(0)
+  /**
+   * Executes the command according to the given arguments and returns the 
appropriate exit code.
+   * @param args The command line arguments
+   * @return The exit code
+   */
+  def runMain(args: Array[String]): Int = {

Review Comment:
   I hadn't noticed this yet. Thanks for pointing this out.






Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]

2024-06-04 Thread via GitHub


mimaison commented on code in PR #15834:
URL: https://github.com/apache/kafka/pull/15834#discussion_r1626218724


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -45,89 +45,117 @@ import scala.jdk.CollectionConverters._
 import scala.collection.mutable.ArrayBuffer
 
 object StorageTool extends Logging {
-  def main(args: Array[String]): Unit = {
-try {
-  val namespace = parseArguments(args)
-  val command = namespace.getString("command")
-  val config = Option(namespace.getString("config")).flatMap(
-p => Some(new KafkaConfig(Utils.loadProps(p))))
-  command match {
-case "info" =>
-  val directories = configToLogDirectories(config.get)
-  val selfManagedMode = configToSelfManagedMode(config.get)
-  Exit.exit(infoCommand(System.out, selfManagedMode, directories))
-
-case "format" =>
-  val directories = configToLogDirectories(config.get)
-  val clusterId = namespace.getString("cluster_id")
-  val metaProperties = new MetaProperties.Builder().
-setVersion(MetaPropertiesVersion.V1).
-setClusterId(clusterId).
-setNodeId(config.get.nodeId).
-build()
-  val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = 
ArrayBuffer()
-  val specifiedFeatures: util.List[String] = 
namespace.getList("feature")
-  val releaseVersionFlagSpecified = 
namespace.getString("release_version") != null
-  if (releaseVersionFlagSpecified && specifiedFeatures != null) {
-throw new TerseFailure("Both --release-version and --feature were 
set. Only one of the two flags can be set.")
-  }
-  val featureNamesAndLevelsMap = 
featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList)
-  val metadataVersion = getMetadataVersion(namespace, 
featureNamesAndLevelsMap,
-
Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
-  validateMetadataVersion(metadataVersion, config)
-  // Get all other features, validate, and create records for them
-  // Use latest default for features if --release-version is not 
specified
-  generateFeatureRecords(
-metadataRecords,
-metadataVersion,
-featureNamesAndLevelsMap,
-Features.PRODUCTION_FEATURES.asScala.toList,
-config.get.unstableFeatureVersionsEnabled,
-releaseVersionFlagSpecified
-  )
-  
getUserScramCredentialRecords(namespace).foreach(userScramCredentialRecords => {
-if (!metadataVersion.isScramSupported) {
-  throw new TerseFailure(s"SCRAM is only supported in 
metadata.version ${MetadataVersion.IBP_3_5_IV2} or later.")
-}
-for (record <- userScramCredentialRecords) {
-  metadataRecords.append(new ApiMessageAndVersion(record, 
0.toShort))
-}
-  })
-
-  val bootstrapMetadata = buildBootstrapMetadata(metadataVersion, 
Some(metadataRecords), "format command")
-  val ignoreFormatted = namespace.getBoolean("ignore_formatted")
-  if (!configToSelfManagedMode(config.get)) {
-throw new TerseFailure("The kafka configuration file appears to be 
for " +
-  "a legacy cluster. Formatting is only supported for clusters in 
KRaft mode.")
-  }
-  Exit.exit(formatCommand(System.out, directories, metaProperties, 
bootstrapMetadata,
-  metadataVersion,ignoreFormatted))
 
-case "random-uuid" =>
-  System.out.println(Uuid.randomUuid)
-  Exit.exit(0)
+  /**
+   * Executes the command according to the given arguments and returns the 
appropriate exit code.
+   * @param args The command line arguments
+   * @return The exit code
+   */
+  def runMain(args: Array[String]): Int = {

Review Comment:
   In other tools, the method that is called by `main()` is typically called 
`execute()`. I think it would make sense to use that name here too.
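
As a rough illustration of the convention mentioned in this comment, the sketch below separates a testable `execute()` that returns an exit code from a thin `main()` that terminates the JVM. It assumes a hypothetical class `ToolExitCodeSketch` with a local `TerseFailure` stand-in and a helper `run()` that is not in the PR; it uses `System.exit` where the real tool goes through Kafka's `Exit` utility, and it is not the PR's actual code.

```java
import java.util.UUID;

// Hypothetical sketch of the execute()/main() split; not the real StorageTool.
final class ToolExitCodeSketch {

    /** Stand-in for a failure whose message is shown to the user without a stack trace. */
    static final class TerseFailure extends RuntimeException {
        private static final long serialVersionUID = 1L;

        TerseFailure(String message) {
            super(message);
        }
    }

    /** Runs the command and returns its exit code; never terminates the JVM itself. */
    static int execute(String[] args) {
        if (args.length == 0) {
            throw new TerseFailure("No command given");
        }
        switch (args[0]) {
            case "random-uuid":
                System.out.println(UUID.randomUUID());
                return 0;
            default:
                throw new TerseFailure("Unknown command " + args[0]);
        }
    }

    /** Maps terse failures to exit code 1 and prints their message to stderr. */
    static int run(String[] args) {
        try {
            return execute(args);
        } catch (TerseFailure e) {
            System.err.println(e.getMessage());
            return 1;
        }
    }

    public static void main(String[] args) {
        System.exit(run(args));
    }
}
```

Because only `main()` exits the process, a unit test can call `execute()` or `run()` directly and assert on the returned code.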



##
core/src/test/scala/unit/kafka/tools/StorageToolTest.scala:
##
@@ -656,5 +657,47 @@ Found problem:
   assertEquals(1, exitStatus)
 }
   }
+
+  @Test
+  def testFormatValidatesConfigForMetadataVersion(): Unit = {
+    val config = Mockito.spy(new KafkaConfig(TestUtils.createBrokerConfig(10, null)))
+    val args = Array("format",
+      "-c", "dummy.properties",
+      "-t", "XcZZOzUqS4yHOjhMQB6JLQ",
+      "--release-version", MetadataVersion.LATEST_PRODUCTION.toString)
+    val exitCode = StorageTool.runFormatCommand(StorageTool.parseArguments(args), config)
+    Mockito.verify(config, Mockito.times(1)).validateWithMetadataVersion(MetadataVersion.LATEST_PRODUCTION)
+    assertEquals(0, exitCode)
+  }
+
+  private def createPropsFile(properties: Properties): String = {
+    val propsFile = 

Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]

2024-06-04 Thread via GitHub


soarez commented on PR #15834:
URL: https://github.com/apache/kafka/pull/15834#issuecomment-2147620965

   @mimaison I rebased due to conflicts and changed how the MV change via 
metadata records is detected to use a publisher following #15945. Please take 
another look, thanks.





Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]

2024-06-04 Thread via GitHub


soarez commented on code in PR #15834:
URL: https://github.com/apache/kafka/pull/15834#discussion_r1626025676


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -45,89 +45,124 @@ import scala.jdk.CollectionConverters._
 import scala.collection.mutable.ArrayBuffer
 
 object StorageTool extends Logging {
+
+  /**
+   * Executes the command according to the given arguments and returns the 
appropriate exit code.
+   * @param args The command line arguments
+   * @return The exit code
+   */
+  def runMain(args: Array[String]): Int = {
+val namespace = parseArguments(args)
+val command = namespace.getString("command")
+val config = parseConfig(namespace.getString("config"))

Review Comment:
   You're right, thanks for pointing this out. I missed that when I simplified 
the `Option(...).flatMap()` config bit. I've brought that back now.






Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]

2024-06-04 Thread via GitHub


soarez commented on code in PR #15834:
URL: https://github.com/apache/kafka/pull/15834#discussion_r1626026413


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -45,89 +45,124 @@ import scala.jdk.CollectionConverters._
 import scala.collection.mutable.ArrayBuffer
 
 object StorageTool extends Logging {
+
+  /**
+   * Executes the command according to the given arguments and returns the 
appropriate exit code.
+   * @param args The command line arguments
+   * @return The exit code
+   */
+  def runMain(args: Array[String]): Int = {
+val namespace = parseArguments(args)
+val command = namespace.getString("command")
+val config = parseConfig(namespace.getString("config"))
+command match {
+  case "info" =>
+val directories = configToLogDirectories(config)
+val selfManagedMode = configToSelfManagedMode(config)
+infoCommand(System.out, selfManagedMode, directories)
+
+  case "format" =>
+runFormatCommand(namespace, config)
+
+  case "random-uuid" =>
+System.out.println(Uuid.randomUuid)
+0
+  case _ =>
+throw new RuntimeException(s"Unknown command $command")
+}
+  }
+
   def main(args: Array[String]): Unit = {
+var exitCode: Integer = 0
+var message: Option[String] = None
 try {
-  val namespace = parseArguments(args)
-  val command = namespace.getString("command")
-  val config = Option(namespace.getString("config")).flatMap(
-p => Some(new KafkaConfig(Utils.loadProps(p))))
-  command match {
-case "info" =>
-  val directories = configToLogDirectories(config.get)
-  val selfManagedMode = configToSelfManagedMode(config.get)
-  Exit.exit(infoCommand(System.out, selfManagedMode, directories))
-
-case "format" =>
-  val directories = configToLogDirectories(config.get)
-  val clusterId = namespace.getString("cluster_id")
-  val metaProperties = new MetaProperties.Builder().
-setVersion(MetaPropertiesVersion.V1).
-setClusterId(clusterId).
-setNodeId(config.get.nodeId).
-build()
-  val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = 
ArrayBuffer()
-  val specifiedFeatures: util.List[String] = 
namespace.getList("feature")
-  val releaseVersionFlagSpecified = 
namespace.getString("release_version") != null
-  if (releaseVersionFlagSpecified && specifiedFeatures != null) {
-throw new TerseFailure("Both --release-version and --feature were 
set. Only one of the two flags can be set.")
-  }
-  val featureNamesAndLevelsMap = 
featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList)
-  val metadataVersion = getMetadataVersion(namespace, 
featureNamesAndLevelsMap,
-
Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
-  validateMetadataVersion(metadataVersion, config)
-  // Get all other features, validate, and create records for them
-  // Use latest default for features if --release-version is not 
specified
-  generateFeatureRecords(
-metadataRecords,
-metadataVersion,
-featureNamesAndLevelsMap,
-Features.PRODUCTION_FEATURES.asScala.toList,
-config.get.unstableFeatureVersionsEnabled,
-releaseVersionFlagSpecified
-  )
-  
getUserScramCredentialRecords(namespace).foreach(userScramCredentialRecords => {
-if (!metadataVersion.isScramSupported) {
-  throw new TerseFailure(s"SCRAM is only supported in 
metadata.version ${MetadataVersion.IBP_3_5_IV2} or later.")
-}
-for (record <- userScramCredentialRecords) {
-  metadataRecords.append(new ApiMessageAndVersion(record, 
0.toShort))
-}
-  })
-
-  val bootstrapMetadata = buildBootstrapMetadata(metadataVersion, 
Some(metadataRecords), "format command")
-  val ignoreFormatted = namespace.getBoolean("ignore_formatted")
-  if (!configToSelfManagedMode(config.get)) {
-throw new TerseFailure("The kafka configuration file appears to be 
for " +
-  "a legacy cluster. Formatting is only supported for clusters in 
KRaft mode.")
-  }
-  Exit.exit(formatCommand(System.out, directories, metaProperties, 
bootstrapMetadata,
-  metadataVersion,ignoreFormatted))
+  exitCode = runMain(args)
+} catch {
+  case e: TerseFailure =>
+exitCode = 1
+message = Some(e.getMessage)
+}
+message.foreach(System.err.println)
+Exit.exit(exitCode, message)
+  }
 
-case "random-uuid" =>
-  System.out.println(Uuid.randomUuid)
-  Exit.exit(0)
+  private def parseConfig(configFilename: String): KafkaConfig = {
+try 

Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]

2024-06-03 Thread via GitHub


mimaison commented on code in PR #15834:
URL: https://github.com/apache/kafka/pull/15834#discussion_r1624370212


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -45,89 +45,124 @@ import scala.jdk.CollectionConverters._
 import scala.collection.mutable.ArrayBuffer
 
 object StorageTool extends Logging {
+
+  /**
+   * Executes the command according to the given arguments and returns the 
appropriate exit code.
+   * @param args The command line arguments
+   * @return The exit code
+   */
+  def runMain(args: Array[String]): Int = {
+val namespace = parseArguments(args)
+val command = namespace.getString("command")
+val config = parseConfig(namespace.getString("config"))
+command match {
+  case "info" =>
+val directories = configToLogDirectories(config)
+val selfManagedMode = configToSelfManagedMode(config)
+infoCommand(System.out, selfManagedMode, directories)
+
+  case "format" =>
+runFormatCommand(namespace, config)
+
+  case "random-uuid" =>
+System.out.println(Uuid.randomUuid)
+0
+  case _ =>
+throw new RuntimeException(s"Unknown command $command")
+}
+  }
+
   def main(args: Array[String]): Unit = {
+var exitCode: Integer = 0
+var message: Option[String] = None
 try {
-  val namespace = parseArguments(args)
-  val command = namespace.getString("command")
-  val config = Option(namespace.getString("config")).flatMap(
-p => Some(new KafkaConfig(Utils.loadProps(p))))
-  command match {
-case "info" =>
-  val directories = configToLogDirectories(config.get)
-  val selfManagedMode = configToSelfManagedMode(config.get)
-  Exit.exit(infoCommand(System.out, selfManagedMode, directories))
-
-case "format" =>
-  val directories = configToLogDirectories(config.get)
-  val clusterId = namespace.getString("cluster_id")
-  val metaProperties = new MetaProperties.Builder().
-setVersion(MetaPropertiesVersion.V1).
-setClusterId(clusterId).
-setNodeId(config.get.nodeId).
-build()
-  val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = 
ArrayBuffer()
-  val specifiedFeatures: util.List[String] = 
namespace.getList("feature")
-  val releaseVersionFlagSpecified = 
namespace.getString("release_version") != null
-  if (releaseVersionFlagSpecified && specifiedFeatures != null) {
-throw new TerseFailure("Both --release-version and --feature were 
set. Only one of the two flags can be set.")
-  }
-  val featureNamesAndLevelsMap = 
featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList)
-  val metadataVersion = getMetadataVersion(namespace, 
featureNamesAndLevelsMap,
-
Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
-  validateMetadataVersion(metadataVersion, config)
-  // Get all other features, validate, and create records for them
-  // Use latest default for features if --release-version is not 
specified
-  generateFeatureRecords(
-metadataRecords,
-metadataVersion,
-featureNamesAndLevelsMap,
-Features.PRODUCTION_FEATURES.asScala.toList,
-config.get.unstableFeatureVersionsEnabled,
-releaseVersionFlagSpecified
-  )
-  
getUserScramCredentialRecords(namespace).foreach(userScramCredentialRecords => {
-if (!metadataVersion.isScramSupported) {
-  throw new TerseFailure(s"SCRAM is only supported in 
metadata.version ${MetadataVersion.IBP_3_5_IV2} or later.")
-}
-for (record <- userScramCredentialRecords) {
-  metadataRecords.append(new ApiMessageAndVersion(record, 
0.toShort))
-}
-  })
-
-  val bootstrapMetadata = buildBootstrapMetadata(metadataVersion, 
Some(metadataRecords), "format command")
-  val ignoreFormatted = namespace.getBoolean("ignore_formatted")
-  if (!configToSelfManagedMode(config.get)) {
-throw new TerseFailure("The kafka configuration file appears to be 
for " +
-  "a legacy cluster. Formatting is only supported for clusters in 
KRaft mode.")
-  }
-  Exit.exit(formatCommand(System.out, directories, metaProperties, 
bootstrapMetadata,
-  metadataVersion,ignoreFormatted))
+  exitCode = runMain(args)
+} catch {
+  case e: TerseFailure =>
+exitCode = 1
+message = Some(e.getMessage)
+}
+message.foreach(System.err.println)
+Exit.exit(exitCode, message)
+  }
 
-case "random-uuid" =>
-  System.out.println(Uuid.randomUuid)
-  Exit.exit(0)
+  private def parseConfig(configFilename: String): KafkaConfig = {
+

Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]

2024-06-03 Thread via GitHub


mimaison commented on code in PR #15834:
URL: https://github.com/apache/kafka/pull/15834#discussion_r1624282626


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1457,6 +1465,18 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
 }
   }
 
+  /**
+   * Validate some configurations for new MetadataVersion. A new MetadataVersion can take place when
+   * a FeatureLevelRecord for "metadata.version" is read from the cluster metadata.
+   */
+  def validateWithMetadataVersion(metadataVersion: MetadataVersion): Unit = {

Review Comment:
   Ah right, thanks for the explanation. Then I guess it makes sense to keep 
both methods for now.






Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]

2024-06-01 Thread via GitHub


soarez commented on code in PR #15834:
URL: https://github.com/apache/kafka/pull/15834#discussion_r1623198189


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1457,6 +1465,18 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
 }
   }
 
+  /**
+   * Validate some configurations for new MetadataVersion. A new MetadataVersion can take place when
+   * a FeatureLevelRecord for "metadata.version" is read from the cluster metadata.
+   */
+  def validateWithMetadataVersion(metadataVersion: MetadataVersion): Unit = {

Review Comment:
   We can call this from `validateValues`, but then I won't be able to take 
your other suggestion to mention `inter.broker.protocol.version` in the error 
message. This one gets called when the KRaft broker discovers a new 
metadata.version via a new feature record. Would you prefer we consolidate the 
checks, or keep mention of `inter.broker.protocol.version` when it makes sense? 
   I lean towards leaving them both, so it's helpful for the operator, and the 
duplication cost should go away in 4.0.
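
To illustrate the trade-off discussed in this comment, here is a minimal, self-contained sketch of the two validation paths. It is not the Kafka code: the `Version` enum is a hypothetical stand-in for `MetadataVersion`, the broker-role check is omitted, and the wording of the `metadata.version` message is an assumption. Only the method names, the 3.7-IV2 threshold, and the `inter.broker.protocol.version` message come from this thread.

```java
import java.util.Arrays;
import java.util.List;

final class JbodGateSketch {
    // Hypothetical stand-in for Kafka's MetadataVersion; only the ordering matters here.
    enum Version {
        V3_0_IV1("3.0-IV1"), V3_6_IV2("3.6-IV2"), V3_7_IV2("3.7-IV2");

        final String label;

        Version(String label) { this.label = label; }

        // Per the PR title, JBOD is gated on 3.7-IV2.
        boolean isDirectoryAssignmentSupported() { return compareTo(V3_7_IV2) >= 0; }
    }

    // Startup path: only applies when inter.broker.protocol.version was set explicitly,
    // so the message can name that configuration for the operator.
    static void validateValues(List<String> logDirs, Version ibp, boolean ibpSetExplicitly) {
        if (ibpSetExplicitly && logDirs.size() > 1 && !ibp.isDirectoryAssignmentSupported()) {
            throw new IllegalArgumentException(
                "Multiple log directories (aka JBOD) are not supported with the configured "
                    + ibp.label + " inter.broker.protocol.version. Need 3.7-IV2 or higher");
        }
    }

    // Runtime path: called when a new metadata.version is read from the cluster metadata.
    // The message wording below is an assumption made for this sketch.
    static void validateWithMetadataVersion(List<String> logDirs, Version metadataVersion) {
        if (logDirs.size() > 1 && !metadataVersion.isDirectoryAssignmentSupported()) {
            throw new IllegalArgumentException(
                "Multiple log directories (aka JBOD) are not supported in the current "
                    + "metadata.version " + metadataVersion.label + ". Need 3.7-IV2 or higher");
        }
    }

    public static void main(String[] args) {
        List<String> dirs = Arrays.asList("/tmp/kraft-combined-logs", "/tmp/kraft-combined-logs2");
        try {
            validateValues(dirs, Version.V3_6_IV2, true);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Both methods perform the same comparison; the duplication buys an error message that names the setting the operator actually has to change.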






Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]

2024-05-31 Thread via GitHub


mimaison commented on code in PR #15834:
URL: https://github.com/apache/kafka/pull/15834#discussion_r1622351252


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1457,6 +1465,18 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
 }
   }
 
+  /**
+   * Validate some configurations for new MetadataVersion. A new MetadataVersion can take place when
+   * a FeatureLevelRecord for "metadata.version" is read from the cluster metadata.
+   */
+  def validateWithMetadataVersion(metadataVersion: MetadataVersion): Unit = {

Review Comment:
   This seems to be the exact same check we do in `validateValues()`. Can we 
consolidate both?



##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1360,6 +1360,14 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
   }
   validateAdvertisedListenersNonEmptyForBroker()
 }
+if (processRoles.contains(ProcessRole.BrokerRole)
+  && originals.containsKey(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)
+  && logDirs.size > 1) {
+require(interBrokerProtocolVersion.isDirectoryAssignmentSupported,
+  s"Multiple log directories (aka JBOD) are not supported in the configured " +
+s"${ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG} ${interBrokerProtocolVersion}. " +

Review Comment:
   Should we add `ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG` 
after the value? So the message looks like `Multiple log directories (aka JBOD) 
are not supported with the configured 3.6 inter.broker.protocol.version`






Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]

2024-05-29 Thread via GitHub


soarez commented on PR #15834:
URL: https://github.com/apache/kafka/pull/15834#issuecomment-2137732518

   @mimaison Sorry, thanks for pointing that out. That was an unintentional, 
bad rebase. Sorted now.





Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]

2024-05-29 Thread via GitHub


mimaison commented on PR #15834:
URL: https://github.com/apache/kafka/pull/15834#issuecomment-2137620473

   Thanks @soarez. 
   
   It seems you have reverted the changes to StorageTool. Is this intended?
   When using the `--release-version 3.6` flag and multiple log dirs, 
formatting does not fail anymore:
   ```
   bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties --release-version 3.6
   metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, dirs={/tmp/kraft-combined-logs: EMPTY, /tmp/kraft-combined-logs2: EMPTY})
   Formatting /tmp/kraft-combined-logs with metadata.version 3.6-IV2.
   Formatting /tmp/kraft-combined-logs with metadata.version 3.6-IV2.
   Formatting /tmp/kraft-combined-logs2 with metadata.version 3.6-IV2
   ```





Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]

2024-05-29 Thread via GitHub


soarez commented on PR #15834:
URL: https://github.com/apache/kafka/pull/15834#issuecomment-2137444011

   Rebased. 
   
   @mimaison please take another look.





Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]

2024-05-20 Thread via GitHub


soarez commented on PR #15834:
URL: https://github.com/apache/kafka/pull/15834#issuecomment-2120643465

   @mimaison thanks for having a look.
   
   Yes, I think we should check the MetadataVersion at the formatting stage 
too. Please have a look at my latest changes.
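
As a rough illustration of what checking at the formatting stage could look like, the sketch below validates the resolved metadata.version against the number of directories before any directory is touched. It is a standalone model, not the StorageTool code: the class, the version list, the exception type, and the error text are assumptions; only the 3.7-IV2 threshold and the "Formatting ..." log line format are taken from this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FormatGateSketch {
    // Hypothetical ordering of metadata versions, oldest first.
    static final List<String> KNOWN_VERSIONS = Arrays.asList("3.0-IV1", "3.6-IV2", "3.7-IV2");
    static final String MIN_JBOD_VERSION = "3.7-IV2";

    static boolean supportsJbod(String metadataVersion) {
        int index = KNOWN_VERSIONS.indexOf(metadataVersion);
        if (index < 0) {
            throw new IllegalArgumentException("Unknown metadata.version " + metadataVersion);
        }
        return index >= KNOWN_VERSIONS.indexOf(MIN_JBOD_VERSION);
    }

    // Validates first, then "formats". Returns the log lines instead of writing to disk,
    // so a rejected request leaves every directory untouched.
    static List<String> format(List<String> directories, String metadataVersion) {
        if (directories.size() > 1 && !supportsJbod(metadataVersion)) {
            // Error text is an assumption made for this sketch.
            throw new IllegalArgumentException(
                "Multiple log directories (aka JBOD) are not supported with metadata.version "
                    + metadataVersion + ". Need " + MIN_JBOD_VERSION + " or higher");
        }
        List<String> log = new ArrayList<>();
        for (String directory : directories) {
            log.add("Formatting " + directory + " with metadata.version " + metadataVersion + ".");
        }
        return log;
    }

    public static void main(String[] args) {
        List<String> dirs = Arrays.asList("/tmp/kraft-combined-logs", "/tmp/kraft-combined-logs2");
        format(dirs, "3.7-IV2").forEach(System.out::println);
    }
}
```

Running the check before the loop means the operator sees the failure at format time rather than at broker startup, which is the behaviour requested earlier in the thread.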





Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]

2024-05-15 Thread via GitHub


mimaison commented on PR #15834:
URL: https://github.com/apache/kafka/pull/15834#issuecomment-2112826872

   Thanks for the PR.
   
   If I set `inter.broker.protocol.version=3.6-IV2` and configure multiple log 
directories in my broker configuration, this change now prevents me from 
formatting the storage:
   ```
   bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
   [2024-05-15 17:05:31,009] WARN inter.broker.protocol.version is deprecated in KRaft mode as of 3.3 and will only be read when first upgrading from a KRaft prior to 3.3. See kafka-storage.sh help for details on setting the metadata.version for a new KRaft cluster. (kafka.server.KafkaConfig)
   Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Multiple log directories (aka JBOD) are not supported in the configured inter.broker.protocol.version 3.0-IV1. Need 3.7-IV2 or higher
   ```
   
   However, if I specify the metadata version using the `--release-version` 
flag of the storage tool, the formatting works and I only get the failure when 
I try to start the broker:
   ```
   bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties --release-version 3.6
   metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, dirs={/tmp/kraft-combined-logs: EMPTY, /tmp/kraft-combined-logs2: EMPTY})
   Formatting /tmp/kraft-combined-logs with metadata.version 3.6-IV2.
   Formatting /tmp/kraft-combined-logs with metadata.version 3.6-IV2.
   Formatting /tmp/kraft-combined-logs2 with metadata.version 3.6-IV2.
   ```
   
   Should we ensure the failure always happens at the formatting stage?
   
   I also created https://issues.apache.org/jira/browse/KAFKA-16771 to avoid 
printing the "Formatting <...>" log line twice for the first directory.

