This is an automated email from the ASF dual-hosted git repository.

soarez pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.8 by this push:
     new 2ab6a3608e1 KAFKA-16606 Gate JBOD configuration on 3.7-IV2 (#15834)
2ab6a3608e1 is described below

commit 2ab6a3608e1bfde0ae90d74b490dea65ee366c42
Author: Igor Soarez <i...@soarez.me>
AuthorDate: Fri Jun 7 11:11:57 2024 +0300

    KAFKA-16606 Gate JBOD configuration on 3.7-IV2 (#15834)
    
    Support for multiple log directories in KRaft exists from
    MetadataVersion 3.7-IV2.
    
    When migrating a ZK broker to KRaft, we already check that
    the IBP is high enough before allowing the broker to start up.
    
    With KIP-584 and KIP-778, Brokers in KRaft mode do not require
    the IBP configuration - the configuration is deprecated.
    In KRaft mode inter.broker.protocol.version defaults to
    MetadataVersion.MINIMUM_KRAFT_VERSION (IBP_3_0_IV1).
    
    Instead KRaft brokers discover the MetadataVersion by reading
    the "metadata.version" FeatureLevelRecord from the cluster metadata.
    
    This change adds a new configuration validation step upon discovering
    the "metadata.version" from the cluster metadata.
    
    Reviewers: Mickael Maison <mickael.mai...@gmail.com>
---
 .../server/MetadataVersionConfigValidator.java     |  71 +++++++++
 .../src/main/scala/kafka/server/BrokerServer.scala |   2 +
 core/src/main/scala/kafka/server/KafkaConfig.scala |  20 +++
 core/src/main/scala/kafka/tools/StorageTool.scala  | 160 ++++++++++++---------
 .../server/MetadataVersionConfigValidatorTest.java | 100 +++++++++++++
 .../test/scala/unit/kafka/log/LogConfigTest.scala  |  18 +++
 .../scala/unit/kafka/tools/StorageToolTest.scala   |  35 ++++-
 7 files changed, 339 insertions(+), 67 deletions(-)

diff --git 
a/core/src/main/java/kafka/server/MetadataVersionConfigValidator.java 
b/core/src/main/java/kafka/server/MetadataVersionConfigValidator.java
new file mode 100644
index 00000000000..042ac09452f
--- /dev/null
+++ b/core/src/main/java/kafka/server/MetadataVersionConfigValidator.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.loader.LoaderManifest;
+import org.apache.kafka.image.publisher.MetadataPublisher;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.fault.FaultHandler;
+
+public class MetadataVersionConfigValidator implements MetadataPublisher {
+    private final String name;
+    private final KafkaConfig config;
+    private final FaultHandler faultHandler;
+
+    public MetadataVersionConfigValidator(
+            KafkaConfig config,
+            FaultHandler faultHandler
+    ) {
+        int id = config.brokerId();
+        this.name = "MetadataVersionPublisher(id=" + id + ")";
+        this.config = config;
+        this.faultHandler = faultHandler;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public void onMetadataUpdate(
+            MetadataDelta delta,
+            MetadataImage newImage,
+            LoaderManifest manifest
+    ) {
+        if (delta.featuresDelta() != null) {
+            if (delta.metadataVersionChanged().isPresent()) {
+                
onMetadataVersionChanged(newImage.features().metadataVersion());
+            }
+        }
+    }
+
+    private void onMetadataVersionChanged(MetadataVersion metadataVersion) {
+        try {
+            this.config.validateWithMetadataVersion(metadataVersion);
+        } catch (Throwable t) {
+            RuntimeException exception = this.faultHandler.handleFault(
+                    "Broker configuration does not support the cluster 
MetadataVersion", t);
+            if (exception != null) {
+                throw exception;
+            }
+        }
+    }
+}
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 64a4fd7474a..94bcf321420 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -447,6 +447,7 @@ class BrokerServer(
         rlm.startup()
       }
 
+      metadataPublishers.add(new MetadataVersionConfigValidator(config, 
sharedServer.metadataPublishingFaultHandler))
       brokerMetadataPublisher = new BrokerMetadataPublisher(config,
         metadataCache,
         logManager,
@@ -489,6 +490,7 @@ class BrokerServer(
         () => lifecycleManager.resendBrokerRegistrationUnlessZkMode())
       metadataPublishers.add(brokerRegistrationTracker)
 
+
       // Register parts of the broker that can be reconfigured via dynamic 
configs.  This needs to
       // be done before we publish the dynamic configs, so that we don't miss 
anything.
       config.dynamicConfig.addReconfigurables(this)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 95ad0bfb839..f23bf349181 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1365,6 +1365,14 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
       }
       validateAdvertisedListenersNonEmptyForBroker()
     }
+    if (processRoles.contains(ProcessRole.BrokerRole)
+      && 
originals.containsKey(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)
+      && logDirs.size > 1) {
+        require(interBrokerProtocolVersion.isDirectoryAssignmentSupported,
+          s"Multiple log directories (aka JBOD) are not supported with the 
configured " +
+            s"${interBrokerProtocolVersion} 
${ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG}. " +
+            s"Need ${MetadataVersion.IBP_3_7_IV2} or higher")
+    }
 
     val listenerNames = listeners.map(_.listenerName).toSet
     if (processRoles.isEmpty || processRoles.contains(ProcessRole.BrokerRole)) 
{
@@ -1462,6 +1470,18 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
     }
   }
 
+  /**
+   * Validate some configurations for new MetadataVersion. A new 
MetadataVersion can take place when
+   * a FeatureLevelRecord for "metadata.version" is read from the cluster 
metadata.
+   */
+  def validateWithMetadataVersion(metadataVersion: MetadataVersion): Unit = {
+    if (processRoles.contains(ProcessRole.BrokerRole) && logDirs.size > 1) {
+      require(metadataVersion.isDirectoryAssignmentSupported,
+        s"Multiple log directories (aka JBOD) are not supported in the current 
MetadataVersion ${metadataVersion}. " +
+          s"Need ${MetadataVersion.IBP_3_7_IV2} or higher")
+    }
+  }
+
   /**
    * Copy the subset of properties that are relevant to Logs. The individual 
properties
    * are listed here since the names are slightly different in each Config 
class...
diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala 
b/core/src/main/scala/kafka/tools/StorageTool.scala
index 8481f8468b9..c9f51395852 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -45,89 +45,117 @@ import scala.jdk.CollectionConverters._
 import scala.collection.mutable.ArrayBuffer
 
 object StorageTool extends Logging {
+
   def main(args: Array[String]): Unit = {
+    var exitCode: Integer = 0
+    var message: Option[String] = None
     try {
-      val namespace = parseArguments(args)
-      val command = namespace.getString("command")
-      val config = Option(namespace.getString("config")).flatMap(
-        p => Some(new KafkaConfig(Utils.loadProps(p))))
-      command match {
-        case "info" =>
-          val directories = configToLogDirectories(config.get)
-          val selfManagedMode = configToSelfManagedMode(config.get)
-          Exit.exit(infoCommand(System.out, selfManagedMode, directories))
-
-        case "format" =>
-          val directories = configToLogDirectories(config.get)
-          val clusterId = namespace.getString("cluster_id")
-          val metaProperties = new MetaProperties.Builder().
-            setVersion(MetaPropertiesVersion.V1).
-            setClusterId(clusterId).
-            setNodeId(config.get.nodeId).
-            build()
-          val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = 
ArrayBuffer()
-          val specifiedFeatures: util.List[String] = 
namespace.getList("feature")
-          val releaseVersionFlagSpecified = 
namespace.getString("release_version") != null
-          if (releaseVersionFlagSpecified && specifiedFeatures != null) {
-            throw new TerseFailure("Both --release-version and --feature were 
set. Only one of the two flags can be set.")
-          }
-          val featureNamesAndLevelsMap = 
featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList)
-          val metadataVersion = getMetadataVersion(namespace, 
featureNamesAndLevelsMap,
-            
Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
-          validateMetadataVersion(metadataVersion, config)
-          // Get all other features, validate, and create records for them
-          // Use latest default for features if --release-version is not 
specified
-          generateFeatureRecords(
-            metadataRecords,
-            metadataVersion,
-            featureNamesAndLevelsMap,
-            Features.PRODUCTION_FEATURES.asScala.toList,
-            config.get.unstableFeatureVersionsEnabled,
-            releaseVersionFlagSpecified
-          )
-          
getUserScramCredentialRecords(namespace).foreach(userScramCredentialRecords => {
-            if (!metadataVersion.isScramSupported) {
-              throw new TerseFailure(s"SCRAM is only supported in 
metadata.version ${MetadataVersion.IBP_3_5_IV2} or later.")
-            }
-            for (record <- userScramCredentialRecords) {
-              metadataRecords.append(new ApiMessageAndVersion(record, 
0.toShort))
-            }
-          })
-
-          val bootstrapMetadata = buildBootstrapMetadata(metadataVersion, 
Some(metadataRecords), "format command")
-          val ignoreFormatted = namespace.getBoolean("ignore_formatted")
-          if (!configToSelfManagedMode(config.get)) {
-            throw new TerseFailure("The kafka configuration file appears to be 
for " +
-              "a legacy cluster. Formatting is only supported for clusters in 
KRaft mode.")
-          }
-          Exit.exit(formatCommand(System.out, directories, metaProperties, 
bootstrapMetadata,
-                                  metadataVersion,ignoreFormatted))
+      exitCode = execute(args)
+    } catch {
+      case e: TerseFailure =>
+        exitCode = 1
+        message = Some(e.getMessage)
+    }
+    message.foreach(System.err.println)
+    Exit.exit(exitCode, message)
+  }
 
-        case "random-uuid" =>
-          System.out.println(Uuid.randomUuid)
-          Exit.exit(0)
+  /**
+   * Executes the command according to the given arguments and returns the 
appropriate exit code.
+   * @param args The command line arguments
+   * @return     The exit code
+   */
+  def execute(args: Array[String]): Int = {
+    val namespace = parseArguments(args)
+    val command = namespace.getString("command")
+    val config = Option(namespace.getString("config")).flatMap(
+      p => Some(new KafkaConfig(Utils.loadProps(p))))
+    command match {
+      case "info" =>
+        val directories = configToLogDirectories(config.get)
+        val selfManagedMode = configToSelfManagedMode(config.get)
+        infoCommand(System.out, selfManagedMode, directories)
+
+      case "format" =>
+        runFormatCommand(namespace, config.get)
+
+      case "random-uuid" =>
+        System.out.println(Uuid.randomUuid)
+        0
+      case _ =>
+        throw new RuntimeException(s"Unknown command $command")
+    }
+  }
 
-        case _ =>
-          throw new RuntimeException(s"Unknown command $command")
+  /**
+   * Validates arguments, configuration, prepares bootstrap metadata and 
delegates to [[formatCommand]].
+   * Visible for testing.
+   * @param namespace   Arguments
+   * @param config      The server configuration
+   * @return            The exit code
+   */
+  def runFormatCommand(namespace: Namespace, config: KafkaConfig) = {
+    val directories = configToLogDirectories(config)
+    val clusterId = namespace.getString("cluster_id")
+    val metaProperties = new MetaProperties.Builder().
+      setVersion(MetaPropertiesVersion.V1).
+      setClusterId(clusterId).
+      setNodeId(config.nodeId).
+      build()
+    val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer()
+    val specifiedFeatures: util.List[String] = namespace.getList("feature")
+    val releaseVersionFlagSpecified = namespace.getString("release_version") 
!= null
+    if (releaseVersionFlagSpecified && specifiedFeatures != null) {
+      throw new TerseFailure("Both --release-version and --feature were set. 
Only one of the two flags can be set.")
+    }
+    val featureNamesAndLevelsMap = 
featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList)
+    val metadataVersion = getMetadataVersion(namespace, 
featureNamesAndLevelsMap,
+      
Option(config.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
+    validateMetadataVersion(metadataVersion, config)
+    // Get all other features, validate, and create records for them
+    // Use latest default for features if --release-version is not specified
+    generateFeatureRecords(
+      metadataRecords,
+      metadataVersion,
+      featureNamesAndLevelsMap,
+      Features.PRODUCTION_FEATURES.asScala.toList,
+      config.unstableFeatureVersionsEnabled,
+      releaseVersionFlagSpecified
+    )
+    
getUserScramCredentialRecords(namespace).foreach(userScramCredentialRecords => {
+      if (!metadataVersion.isScramSupported) {
+        throw new TerseFailure(s"SCRAM is only supported in metadata.version 
${MetadataVersion.IBP_3_5_IV2} or later.")
       }
-    } catch {
-      case e: TerseFailure =>
-        System.err.println(e.getMessage)
-        Exit.exit(1, Some(e.getMessage))
+      for (record <- userScramCredentialRecords) {
+        metadataRecords.append(new ApiMessageAndVersion(record, 0.toShort))
+      }
+    })
+    val bootstrapMetadata = buildBootstrapMetadata(metadataVersion, 
Some(metadataRecords), "format command")
+    val ignoreFormatted = namespace.getBoolean("ignore_formatted")
+    if (!configToSelfManagedMode(config)) {
+      throw new TerseFailure("The kafka configuration file appears to be for " 
+
+        "a legacy cluster. Formatting is only supported for clusters in KRaft 
mode.")
     }
+    formatCommand(System.out, directories, metaProperties, bootstrapMetadata,
+      metadataVersion,ignoreFormatted)
   }
 
-  private def validateMetadataVersion(metadataVersion: MetadataVersion, 
config: Option[KafkaConfig]): Unit = {
+  private def validateMetadataVersion(metadataVersion: MetadataVersion, 
config: KafkaConfig): Unit = {
     if (!metadataVersion.isKRaftSupported) {
       throw new TerseFailure(s"Must specify a valid KRaft metadata.version of 
at least ${MetadataVersion.IBP_3_0_IV0}.")
     }
     if (!metadataVersion.isProduction) {
-      if (config.get.unstableFeatureVersionsEnabled) {
+      if (config.unstableFeatureVersionsEnabled) {
         System.out.println(s"WARNING: using pre-production metadata.version 
$metadataVersion.")
       } else {
         throw new TerseFailure(s"The metadata.version $metadataVersion is not 
ready for production use yet.")
       }
     }
+    try {
+      config.validateWithMetadataVersion(metadataVersion)
+    } catch {
+      case e: IllegalArgumentException => throw new TerseFailure(s"Invalid 
configuration for metadata version: ${e.getMessage}")
+    }
   }
 
   private[tools] def generateFeatureRecords(metadataRecords: 
ArrayBuffer[ApiMessageAndVersion],
diff --git 
a/core/src/test/java/kafka/server/MetadataVersionConfigValidatorTest.java 
b/core/src/test/java/kafka/server/MetadataVersionConfigValidatorTest.java
new file mode 100644
index 00000000000..a484d592e23
--- /dev/null
+++ b/core/src/test/java/kafka/server/MetadataVersionConfigValidatorTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.image.loader.LogDeltaManifest;
+import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.fault.FaultHandler;
+import org.junit.jupiter.api.Test;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.BDDMockito.willAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+public class MetadataVersionConfigValidatorTest {
+
+    private static final LogDeltaManifest TEST_MANIFEST = 
LogDeltaManifest.newBuilder()
+            .provenance(MetadataProvenance.EMPTY)
+            .leaderAndEpoch(LeaderAndEpoch.UNKNOWN)
+            .numBatches(1)
+            .elapsedNs(90)
+            .numBytes(88)
+            .build();
+    public static final MetadataProvenance TEST_PROVENANCE =
+            new MetadataProvenance(50, 3, 8000);
+
+    void testWith(MetadataVersion metadataVersion, KafkaConfig config, 
FaultHandler faultHandler) throws Exception {
+        try (MetadataVersionConfigValidator validator = new 
MetadataVersionConfigValidator(config, faultHandler)) {
+            MetadataDelta delta = new MetadataDelta.Builder()
+                    .setImage(MetadataImage.EMPTY)
+                    .build();
+            if (metadataVersion != null) {
+                delta.replay(new FeatureLevelRecord().
+                        setName(MetadataVersion.FEATURE_NAME).
+                        setFeatureLevel(metadataVersion.featureLevel()));
+            }
+            MetadataImage image = delta.apply(TEST_PROVENANCE);
+
+            validator.onMetadataUpdate(delta, image, TEST_MANIFEST);
+        }
+    }
+
+    @Test
+    void testValidatesConfigOnMetadataChange() throws Exception {
+        MetadataVersion metadataVersion = MetadataVersion.IBP_3_7_IV2;
+        KafkaConfig config = mock(KafkaConfig.class);
+        FaultHandler faultHandler = mock(FaultHandler.class);
+
+        when(config.brokerId()).thenReturn(8);
+
+        testWith(metadataVersion, config, faultHandler);
+
+        verify(config, 
times(1)).validateWithMetadataVersion(eq(metadataVersion));
+        verifyNoMoreInteractions(faultHandler);
+    }
+
+    @SuppressWarnings("ThrowableNotThrown")
+    @Test
+    void testInvokesFaultHandlerOnException() throws Exception {
+        MetadataVersion metadataVersion = MetadataVersion.IBP_3_7_IV2;
+        Exception exception = new Exception();
+        KafkaConfig config = mock(KafkaConfig.class);
+        FaultHandler faultHandler = mock(FaultHandler.class);
+
+        when(config.brokerId()).thenReturn(8);
+        willAnswer(invocation -> {
+            throw exception;
+        }).given(config).validateWithMetadataVersion(eq(metadataVersion));
+
+        testWith(metadataVersion, config, faultHandler);
+
+        verify(config, 
times(1)).validateWithMetadataVersion(eq(metadataVersion));
+        verify(faultHandler, times(1)).handleFault(
+                eq("Broker configuration does not support the cluster 
MetadataVersion"),
+                eq(exception));
+    }
+}
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala 
b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index ed91c936edc..7be9bcd0267 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -22,6 +22,7 @@ import kafka.utils.TestUtils
 import org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM
 import org.apache.kafka.common.config.ConfigDef.Type.INT
 import org.apache.kafka.common.config.{ConfigException, SslConfigs, 
TopicConfig}
+import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 
@@ -419,4 +420,21 @@ class LogConfigTest {
     assertEquals(oneDayInMillis, 
logProps.get(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG))
     assertEquals(oneDayInMillis, 
logProps.get(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG))
   }
+
+  @Test
+  def testValidateWithMetadataVersionJbodSupport(): Unit = {
+    def validate(metadataVersion: MetadataVersion, jbodConfig: Boolean): Unit =
+      KafkaConfig.fromProps(
+          TestUtils.createBrokerConfig(nodeId = 0, zkConnect = null, 
logDirCount = if (jbodConfig) 2 else 1)
+        ).validateWithMetadataVersion(metadataVersion)
+
+    validate(MetadataVersion.IBP_3_6_IV2, jbodConfig = false)
+    validate(MetadataVersion.IBP_3_7_IV0, jbodConfig = false)
+    validate(MetadataVersion.IBP_3_7_IV2, jbodConfig = false)
+    assertThrows(classOf[IllegalArgumentException], () =>
+      validate(MetadataVersion.IBP_3_6_IV2, jbodConfig = true))
+    assertThrows(classOf[IllegalArgumentException], () =>
+      validate(MetadataVersion.IBP_3_7_IV0, jbodConfig = true))
+    validate(MetadataVersion.IBP_3_7_IV2, jbodConfig = true)
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala 
b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index 60ddda5bc31..3f38708dee8 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -33,11 +33,12 @@ import 
org.apache.kafka.server.common.{ApiMessageAndVersion, Features, MetadataV
 import org.apache.kafka.common.metadata.{FeatureLevelRecord, 
UserScramCredentialRecord}
 import org.apache.kafka.metadata.properties.{MetaProperties, 
MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
 import org.apache.kafka.raft.QuorumConfig
-import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs, 
ServerLogConfigs}
+import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, 
ServerConfigs, ServerLogConfigs}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, 
assertThrows, assertTrue}
 import org.junit.jupiter.api.{Test, Timeout}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{EnumSource, ValueSource}
+import org.mockito.Mockito
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
@@ -655,5 +656,37 @@ Found problem:
       assertEquals(1, exitStatus)
     }
   }
+
+  @Test
+  def testFormatValidatesConfigForMetadataVersion(): Unit = {
+    val config = Mockito.spy(new KafkaConfig(TestUtils.createBrokerConfig(10, 
null)))
+    val args = Array("format",
+      "-c", "dummy.properties",
+      "-t", "XcZZOzUqS4yHOjhMQB6JLQ",
+      "--release-version", MetadataVersion.LATEST_PRODUCTION.toString)
+    val exitCode = 
StorageTool.runFormatCommand(StorageTool.parseArguments(args), config)
+    Mockito.verify(config, 
Mockito.times(1)).validateWithMetadataVersion(MetadataVersion.LATEST_PRODUCTION)
+    assertEquals(0, exitCode)
+  }
+
+  @Test
+  def testJbodSupportValidation(): Unit = {
+    def formatWith(logDirCount: Int, metadataVersion: MetadataVersion): 
Integer = {
+      val properties = TestUtils.createBrokerConfig(10, null, logDirCount = 
logDirCount)
+      
properties.remove(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)
+      val configFile = 
TestUtils.tempPropertiesFile(properties.asScala.toMap).toPath.toString
+      StorageTool.execute(Array("format",
+        "-c", configFile,
+        "-t", "XcZZOzUqS4yHOjhMQB6JLQ",
+        "--release-version", metadataVersion.toString))
+    }
+
+    assertEquals(0, formatWith(1, MetadataVersion.IBP_3_6_IV2))
+    assertEquals("Invalid configuration for metadata version: " +
+      "requirement failed: Multiple log directories (aka JBOD) are not 
supported in the current MetadataVersion 3.6-IV2. Need 3.7-IV2 or higher",
+      assertThrows(classOf[TerseFailure], () => formatWith(2, 
MetadataVersion.IBP_3_6_IV2)).getMessage)
+    assertEquals(0, formatWith(1, MetadataVersion.IBP_3_7_IV2))
+    assertEquals(0, formatWith(2, MetadataVersion.IBP_3_7_IV2))
+  }
 }
 

Reply via email to