mimaison commented on code in PR #15834:
URL: https://github.com/apache/kafka/pull/15834#discussion_r1624370212


##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -45,89 +45,124 @@ import scala.jdk.CollectionConverters._
 import scala.collection.mutable.ArrayBuffer
 
 object StorageTool extends Logging {
+
+  /**
+   * Executes the command according to the given arguments and returns the 
appropriate exit code.
+   * @param args The command line arguments
+   * @return     The exit code
+   */
+  def runMain(args: Array[String]): Int = {
+    val namespace = parseArguments(args)
+    val command = namespace.getString("command")
+    val config = parseConfig(namespace.getString("config"))
+    command match {
+      case "info" =>
+        val directories = configToLogDirectories(config)
+        val selfManagedMode = configToSelfManagedMode(config)
+        infoCommand(System.out, selfManagedMode, directories)
+
+      case "format" =>
+        runFormatCommand(namespace, config)
+
+      case "random-uuid" =>
+        System.out.println(Uuid.randomUuid)
+        0
+      case _ =>
+        throw new RuntimeException(s"Unknown command $command")
+    }
+  }
+
   def main(args: Array[String]): Unit = {
+    var exitCode: Integer = 0
+    var message: Option[String] = None
     try {
-      val namespace = parseArguments(args)
-      val command = namespace.getString("command")
-      val config = Option(namespace.getString("config")).flatMap(
-        p => Some(new KafkaConfig(Utils.loadProps(p))))
-      command match {
-        case "info" =>
-          val directories = configToLogDirectories(config.get)
-          val selfManagedMode = configToSelfManagedMode(config.get)
-          Exit.exit(infoCommand(System.out, selfManagedMode, directories))
-
-        case "format" =>
-          val directories = configToLogDirectories(config.get)
-          val clusterId = namespace.getString("cluster_id")
-          val metaProperties = new MetaProperties.Builder().
-            setVersion(MetaPropertiesVersion.V1).
-            setClusterId(clusterId).
-            setNodeId(config.get.nodeId).
-            build()
-          val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = 
ArrayBuffer()
-          val specifiedFeatures: util.List[String] = 
namespace.getList("feature")
-          val releaseVersionFlagSpecified = 
namespace.getString("release_version") != null
-          if (releaseVersionFlagSpecified && specifiedFeatures != null) {
-            throw new TerseFailure("Both --release-version and --feature were 
set. Only one of the two flags can be set.")
-          }
-          val featureNamesAndLevelsMap = 
featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList)
-          val metadataVersion = getMetadataVersion(namespace, 
featureNamesAndLevelsMap,
-            
Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
-          validateMetadataVersion(metadataVersion, config)
-          // Get all other features, validate, and create records for them
-          // Use latest default for features if --release-version is not 
specified
-          generateFeatureRecords(
-            metadataRecords,
-            metadataVersion,
-            featureNamesAndLevelsMap,
-            Features.PRODUCTION_FEATURES.asScala.toList,
-            config.get.unstableFeatureVersionsEnabled,
-            releaseVersionFlagSpecified
-          )
-          
getUserScramCredentialRecords(namespace).foreach(userScramCredentialRecords => {
-            if (!metadataVersion.isScramSupported) {
-              throw new TerseFailure(s"SCRAM is only supported in 
metadata.version ${MetadataVersion.IBP_3_5_IV2} or later.")
-            }
-            for (record <- userScramCredentialRecords) {
-              metadataRecords.append(new ApiMessageAndVersion(record, 
0.toShort))
-            }
-          })
-
-          val bootstrapMetadata = buildBootstrapMetadata(metadataVersion, 
Some(metadataRecords), "format command")
-          val ignoreFormatted = namespace.getBoolean("ignore_formatted")
-          if (!configToSelfManagedMode(config.get)) {
-            throw new TerseFailure("The kafka configuration file appears to be 
for " +
-              "a legacy cluster. Formatting is only supported for clusters in 
KRaft mode.")
-          }
-          Exit.exit(formatCommand(System.out, directories, metaProperties, 
bootstrapMetadata,
-                                  metadataVersion,ignoreFormatted))
+      exitCode = runMain(args)
+    } catch {
+      case e: TerseFailure =>
+        exitCode = 1
+        message = Some(e.getMessage)
+    }
+    message.foreach(System.err.println)
+    Exit.exit(exitCode, message)
+  }
 
-        case "random-uuid" =>
-          System.out.println(Uuid.randomUuid)
-          Exit.exit(0)
+  private def parseConfig(configFilename: String): KafkaConfig = {
+    try {
+      new KafkaConfig(Utils.loadProps(configFilename))
+    } catch {
+      case e: IllegalArgumentException => throw new TerseFailure(s"Invalid 
configuration: ${e.getMessage}")

Review Comment:
   Wrapping `IllegalArgumentException` slightly changes the behavior in case of 
bad configurations. For example, if I set `log.dirs=`, on trunk I get:
   ```
   Exception in thread "main" java.lang.IllegalArgumentException: requirement 
failed: At least one log directory must be defined via log.dirs or log.dir.
        at scala.Predef$.require(Predef.scala:337)
        at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:2316)
        at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:2285)
        at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1634)
        at kafka.tools.StorageTool$.$anonfun$main$1(StorageTool.scala:52)
        at scala.Option.flatMap(Option.scala:283)
        at kafka.tools.StorageTool$.main(StorageTool.scala:52)
        at kafka.tools.StorageTool.main(StorageTool.scala)
   ```
   
   With this PR I get:
   ```
   Invalid configuration: requirement failed: At least one log directory must 
be defined via log.dirs or log.dir.
   ```



##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -45,89 +45,124 @@ import scala.jdk.CollectionConverters._
 import scala.collection.mutable.ArrayBuffer
 
 object StorageTool extends Logging {
+
+  /**
+   * Executes the command according to the given arguments and returns the 
appropriate exit code.
+   * @param args The command line arguments
+   * @return     The exit code
+   */
+  def runMain(args: Array[String]): Int = {
+    val namespace = parseArguments(args)
+    val command = namespace.getString("command")
+    val config = parseConfig(namespace.getString("config"))

Review Comment:
   We can't run this when the command is `random-uuid`, as `KafkaConfig` 
requires some configs to be set and we don't provide any:
   ```
   KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
   Exception in thread "main" org.apache.kafka.common.config.ConfigException: 
Missing required configuration `zookeeper.connect` which has no default value.
        at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1223)
        at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1214)
        at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:544)
        at kafka.tools.StorageTool$.parseConfig(StorageTool.scala:91)
        at kafka.tools.StorageTool$.runMain(StorageTool.scala:57)
        at kafka.tools.StorageTool$.main(StorageTool.scala:79)
        at kafka.tools.StorageTool.main(StorageTool.scala)
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to