This is an automated email from the ASF dual-hosted git repository. soarez pushed a commit to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.8 by this push: new 2ab6a3608e1 KAFKA-16606 Gate JBOD configuration on 3.7-IV2 (#15834) 2ab6a3608e1 is described below commit 2ab6a3608e1bfde0ae90d74b490dea65ee366c42 Author: Igor Soarez <i...@soarez.me> AuthorDate: Fri Jun 7 11:11:57 2024 +0300 KAFKA-16606 Gate JBOD configuration on 3.7-IV2 (#15834) Support for multiple log directories in KRaft exists from MetadataVersion 3.7-IV2. When migrating a ZK broker to KRaft, we already check that the IBP is high enough before allowing the broker to start up. With KIP-584 and KIP-778, Brokers in KRaft mode do not require the IBP configuration - the configuration is deprecated. In KRaft mode inter.broker.protocol.version defaults to MetadataVersion.MINIMUM_KRAFT_VERSION (IBP_3_0_IV1). Instead KRaft brokers discover the MetadataVersion by reading the "metadata.version" FeatureLevelRecord from the cluster metadata. This change adds a new configuration validation step upon discovering the "metadata.version" from the cluster metadata. 
Reviewers: Mickael Maison <mickael.mai...@gmail.com> --- .../server/MetadataVersionConfigValidator.java | 71 +++++++++ .../src/main/scala/kafka/server/BrokerServer.scala | 2 + core/src/main/scala/kafka/server/KafkaConfig.scala | 20 +++ core/src/main/scala/kafka/tools/StorageTool.scala | 160 ++++++++++++--------- .../server/MetadataVersionConfigValidatorTest.java | 100 +++++++++++++ .../test/scala/unit/kafka/log/LogConfigTest.scala | 18 +++ .../scala/unit/kafka/tools/StorageToolTest.scala | 35 ++++- 7 files changed, 339 insertions(+), 67 deletions(-) diff --git a/core/src/main/java/kafka/server/MetadataVersionConfigValidator.java b/core/src/main/java/kafka/server/MetadataVersionConfigValidator.java new file mode 100644 index 00000000000..042ac09452f --- /dev/null +++ b/core/src/main/java/kafka/server/MetadataVersionConfigValidator.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server; + +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.loader.LoaderManifest; +import org.apache.kafka.image.publisher.MetadataPublisher; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.fault.FaultHandler; + +public class MetadataVersionConfigValidator implements MetadataPublisher { + private final String name; + private final KafkaConfig config; + private final FaultHandler faultHandler; + + public MetadataVersionConfigValidator( + KafkaConfig config, + FaultHandler faultHandler + ) { + int id = config.brokerId(); + this.name = "MetadataVersionPublisher(id=" + id + ")"; + this.config = config; + this.faultHandler = faultHandler; + } + + @Override + public String name() { + return name; + } + + @Override + public void onMetadataUpdate( + MetadataDelta delta, + MetadataImage newImage, + LoaderManifest manifest + ) { + if (delta.featuresDelta() != null) { + if (delta.metadataVersionChanged().isPresent()) { + onMetadataVersionChanged(newImage.features().metadataVersion()); + } + } + } + + private void onMetadataVersionChanged(MetadataVersion metadataVersion) { + try { + this.config.validateWithMetadataVersion(metadataVersion); + } catch (Throwable t) { + RuntimeException exception = this.faultHandler.handleFault( + "Broker configuration does not support the cluster MetadataVersion", t); + if (exception != null) { + throw exception; + } + } + } +} diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 64a4fd7474a..94bcf321420 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -447,6 +447,7 @@ class BrokerServer( rlm.startup() } + metadataPublishers.add(new MetadataVersionConfigValidator(config, sharedServer.metadataPublishingFaultHandler)) brokerMetadataPublisher = new 
BrokerMetadataPublisher(config, metadataCache, logManager, @@ -489,6 +490,7 @@ class BrokerServer( () => lifecycleManager.resendBrokerRegistrationUnlessZkMode()) metadataPublishers.add(brokerRegistrationTracker) + // Register parts of the broker that can be reconfigured via dynamic configs. This needs to // be done before we publish the dynamic configs, so that we don't miss anything. config.dynamicConfig.addReconfigurables(this) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 95ad0bfb839..f23bf349181 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -1365,6 +1365,14 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami } validateAdvertisedListenersNonEmptyForBroker() } + if (processRoles.contains(ProcessRole.BrokerRole) + && originals.containsKey(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG) + && logDirs.size > 1) { + require(interBrokerProtocolVersion.isDirectoryAssignmentSupported, + s"Multiple log directories (aka JBOD) are not supported with the configured " + + s"${interBrokerProtocolVersion} ${ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG}. " + + s"Need ${MetadataVersion.IBP_3_7_IV2} or higher") + } val listenerNames = listeners.map(_.listenerName).toSet if (processRoles.isEmpty || processRoles.contains(ProcessRole.BrokerRole)) { @@ -1462,6 +1470,18 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami } } + /** + * Validate some configurations for new MetadataVersion. A new MetadataVersion can take place when + * a FeatureLevelRecord for "metadata.version" is read from the cluster metadata. 
+ */ + def validateWithMetadataVersion(metadataVersion: MetadataVersion): Unit = { + if (processRoles.contains(ProcessRole.BrokerRole) && logDirs.size > 1) { + require(metadataVersion.isDirectoryAssignmentSupported, + s"Multiple log directories (aka JBOD) are not supported in the current MetadataVersion ${metadataVersion}. " + + s"Need ${MetadataVersion.IBP_3_7_IV2} or higher") + } + } + /** * Copy the subset of properties that are relevant to Logs. The individual properties * are listed here since the names are slightly different in each Config class... diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala index 8481f8468b9..c9f51395852 100644 --- a/core/src/main/scala/kafka/tools/StorageTool.scala +++ b/core/src/main/scala/kafka/tools/StorageTool.scala @@ -45,89 +45,117 @@ import scala.jdk.CollectionConverters._ import scala.collection.mutable.ArrayBuffer object StorageTool extends Logging { + def main(args: Array[String]): Unit = { + var exitCode: Integer = 0 + var message: Option[String] = None try { - val namespace = parseArguments(args) - val command = namespace.getString("command") - val config = Option(namespace.getString("config")).flatMap( - p => Some(new KafkaConfig(Utils.loadProps(p)))) - command match { - case "info" => - val directories = configToLogDirectories(config.get) - val selfManagedMode = configToSelfManagedMode(config.get) - Exit.exit(infoCommand(System.out, selfManagedMode, directories)) - - case "format" => - val directories = configToLogDirectories(config.get) - val clusterId = namespace.getString("cluster_id") - val metaProperties = new MetaProperties.Builder(). - setVersion(MetaPropertiesVersion.V1). - setClusterId(clusterId). - setNodeId(config.get.nodeId). 
- build() - val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer() - val specifiedFeatures: util.List[String] = namespace.getList("feature") - val releaseVersionFlagSpecified = namespace.getString("release_version") != null - if (releaseVersionFlagSpecified && specifiedFeatures != null) { - throw new TerseFailure("Both --release-version and --feature were set. Only one of the two flags can be set.") - } - val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList) - val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap, - Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) - validateMetadataVersion(metadataVersion, config) - // Get all other features, validate, and create records for them - // Use latest default for features if --release-version is not specified - generateFeatureRecords( - metadataRecords, - metadataVersion, - featureNamesAndLevelsMap, - Features.PRODUCTION_FEATURES.asScala.toList, - config.get.unstableFeatureVersionsEnabled, - releaseVersionFlagSpecified - ) - getUserScramCredentialRecords(namespace).foreach(userScramCredentialRecords => { - if (!metadataVersion.isScramSupported) { - throw new TerseFailure(s"SCRAM is only supported in metadata.version ${MetadataVersion.IBP_3_5_IV2} or later.") - } - for (record <- userScramCredentialRecords) { - metadataRecords.append(new ApiMessageAndVersion(record, 0.toShort)) - } - }) - - val bootstrapMetadata = buildBootstrapMetadata(metadataVersion, Some(metadataRecords), "format command") - val ignoreFormatted = namespace.getBoolean("ignore_formatted") - if (!configToSelfManagedMode(config.get)) { - throw new TerseFailure("The kafka configuration file appears to be for " + - "a legacy cluster. 
Formatting is only supported for clusters in KRaft mode.") - } - Exit.exit(formatCommand(System.out, directories, metaProperties, bootstrapMetadata, - metadataVersion,ignoreFormatted)) + exitCode = execute(args) + } catch { + case e: TerseFailure => + exitCode = 1 + message = Some(e.getMessage) + } + message.foreach(System.err.println) + Exit.exit(exitCode, message) + } - case "random-uuid" => - System.out.println(Uuid.randomUuid) - Exit.exit(0) + /** + * Executes the command according to the given arguments and returns the appropriate exit code. + * @param args The command line arguments + * @return The exit code + */ + def execute(args: Array[String]): Int = { + val namespace = parseArguments(args) + val command = namespace.getString("command") + val config = Option(namespace.getString("config")).flatMap( + p => Some(new KafkaConfig(Utils.loadProps(p)))) + command match { + case "info" => + val directories = configToLogDirectories(config.get) + val selfManagedMode = configToSelfManagedMode(config.get) + infoCommand(System.out, selfManagedMode, directories) + + case "format" => + runFormatCommand(namespace, config.get) + + case "random-uuid" => + System.out.println(Uuid.randomUuid) + 0 + case _ => + throw new RuntimeException(s"Unknown command $command") + } + } - case _ => - throw new RuntimeException(s"Unknown command $command") + /** + * Validates arguments, configuration, prepares bootstrap metadata and delegates to {{@link formatCommand}}. + * Visible for testing. + * @param namespace Arguments + * @param config The server configuration + * @return The exit code + */ + def runFormatCommand(namespace: Namespace, config: KafkaConfig) = { + val directories = configToLogDirectories(config) + val clusterId = namespace.getString("cluster_id") + val metaProperties = new MetaProperties.Builder(). + setVersion(MetaPropertiesVersion.V1). + setClusterId(clusterId). + setNodeId(config.nodeId). 
+ build() + val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer() + val specifiedFeatures: util.List[String] = namespace.getList("feature") + val releaseVersionFlagSpecified = namespace.getString("release_version") != null + if (releaseVersionFlagSpecified && specifiedFeatures != null) { + throw new TerseFailure("Both --release-version and --feature were set. Only one of the two flags can be set.") + } + val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList) + val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap, + Option(config.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) + validateMetadataVersion(metadataVersion, config) + // Get all other features, validate, and create records for them + // Use latest default for features if --release-version is not specified + generateFeatureRecords( + metadataRecords, + metadataVersion, + featureNamesAndLevelsMap, + Features.PRODUCTION_FEATURES.asScala.toList, + config.unstableFeatureVersionsEnabled, + releaseVersionFlagSpecified + ) + getUserScramCredentialRecords(namespace).foreach(userScramCredentialRecords => { + if (!metadataVersion.isScramSupported) { + throw new TerseFailure(s"SCRAM is only supported in metadata.version ${MetadataVersion.IBP_3_5_IV2} or later.") } - } catch { - case e: TerseFailure => - System.err.println(e.getMessage) - Exit.exit(1, Some(e.getMessage)) + for (record <- userScramCredentialRecords) { + metadataRecords.append(new ApiMessageAndVersion(record, 0.toShort)) + } + }) + val bootstrapMetadata = buildBootstrapMetadata(metadataVersion, Some(metadataRecords), "format command") + val ignoreFormatted = namespace.getBoolean("ignore_formatted") + if (!configToSelfManagedMode(config)) { + throw new TerseFailure("The kafka configuration file appears to be for " + + "a legacy cluster. 
Formatting is only supported for clusters in KRaft mode.") } + formatCommand(System.out, directories, metaProperties, bootstrapMetadata, + metadataVersion,ignoreFormatted) } - private def validateMetadataVersion(metadataVersion: MetadataVersion, config: Option[KafkaConfig]): Unit = { + private def validateMetadataVersion(metadataVersion: MetadataVersion, config: KafkaConfig): Unit = { if (!metadataVersion.isKRaftSupported) { throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") } if (!metadataVersion.isProduction) { - if (config.get.unstableFeatureVersionsEnabled) { + if (config.unstableFeatureVersionsEnabled) { System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") } else { throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") } } + try { + config.validateWithMetadataVersion(metadataVersion) + } catch { + case e: IllegalArgumentException => throw new TerseFailure(s"Invalid configuration for metadata version: ${e.getMessage}") + } } private[tools] def generateFeatureRecords(metadataRecords: ArrayBuffer[ApiMessageAndVersion], diff --git a/core/src/test/java/kafka/server/MetadataVersionConfigValidatorTest.java b/core/src/test/java/kafka/server/MetadataVersionConfigValidatorTest.java new file mode 100644 index 00000000000..a484d592e23 --- /dev/null +++ b/core/src/test/java/kafka/server/MetadataVersionConfigValidatorTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server; + +import org.apache.kafka.common.metadata.FeatureLevelRecord; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.MetadataProvenance; +import org.apache.kafka.image.loader.LogDeltaManifest; +import org.apache.kafka.raft.LeaderAndEpoch; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.fault.FaultHandler; +import org.junit.jupiter.api.Test; + +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.willAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +public class MetadataVersionConfigValidatorTest { + + private static final LogDeltaManifest TEST_MANIFEST = LogDeltaManifest.newBuilder() + .provenance(MetadataProvenance.EMPTY) + .leaderAndEpoch(LeaderAndEpoch.UNKNOWN) + .numBatches(1) + .elapsedNs(90) + .numBytes(88) + .build(); + public static final MetadataProvenance TEST_PROVENANCE = + new MetadataProvenance(50, 3, 8000); + + void testWith(MetadataVersion metadataVersion, KafkaConfig config, FaultHandler faultHandler) throws Exception { + try (MetadataVersionConfigValidator validator = new MetadataVersionConfigValidator(config, faultHandler)) { + MetadataDelta delta = new MetadataDelta.Builder() + .setImage(MetadataImage.EMPTY) + .build(); + if (metadataVersion != null) { + delta.replay(new 
FeatureLevelRecord(). + setName(MetadataVersion.FEATURE_NAME). + setFeatureLevel(metadataVersion.featureLevel())); + } + MetadataImage image = delta.apply(TEST_PROVENANCE); + + validator.onMetadataUpdate(delta, image, TEST_MANIFEST); + } + } + + @Test + void testValidatesConfigOnMetadataChange() throws Exception { + MetadataVersion metadataVersion = MetadataVersion.IBP_3_7_IV2; + KafkaConfig config = mock(KafkaConfig.class); + FaultHandler faultHandler = mock(FaultHandler.class); + + when(config.brokerId()).thenReturn(8); + + testWith(metadataVersion, config, faultHandler); + + verify(config, times(1)).validateWithMetadataVersion(eq(metadataVersion)); + verifyNoMoreInteractions(faultHandler); + } + + @SuppressWarnings("ThrowableNotThrown") + @Test + void testInvokesFaultHandlerOnException() throws Exception { + MetadataVersion metadataVersion = MetadataVersion.IBP_3_7_IV2; + Exception exception = new Exception(); + KafkaConfig config = mock(KafkaConfig.class); + FaultHandler faultHandler = mock(FaultHandler.class); + + when(config.brokerId()).thenReturn(8); + willAnswer(invocation -> { + throw exception; + }).given(config).validateWithMetadataVersion(eq(metadataVersion)); + + testWith(metadataVersion, config, faultHandler); + + verify(config, times(1)).validateWithMetadataVersion(eq(metadataVersion)); + verify(faultHandler, times(1)).handleFault( + eq("Broker configuration does not support the cluster MetadataVersion"), + eq(exception)); + } +} diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index ed91c936edc..7be9bcd0267 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -22,6 +22,7 @@ import kafka.utils.TestUtils import org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM import org.apache.kafka.common.config.ConfigDef.Type.INT import org.apache.kafka.common.config.{ConfigException, SslConfigs, 
TopicConfig} +import org.apache.kafka.server.common.MetadataVersion import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test @@ -419,4 +420,21 @@ class LogConfigTest { assertEquals(oneDayInMillis, logProps.get(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG)) assertEquals(oneDayInMillis, logProps.get(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG)) } + + @Test + def testValidateWithMetadataVersionJbodSupport(): Unit = { + def validate(metadataVersion: MetadataVersion, jbodConfig: Boolean): Unit = + KafkaConfig.fromProps( + TestUtils.createBrokerConfig(nodeId = 0, zkConnect = null, logDirCount = if (jbodConfig) 2 else 1) + ).validateWithMetadataVersion(metadataVersion) + + validate(MetadataVersion.IBP_3_6_IV2, jbodConfig = false) + validate(MetadataVersion.IBP_3_7_IV0, jbodConfig = false) + validate(MetadataVersion.IBP_3_7_IV2, jbodConfig = false) + assertThrows(classOf[IllegalArgumentException], () => + validate(MetadataVersion.IBP_3_6_IV2, jbodConfig = true)) + assertThrows(classOf[IllegalArgumentException], () => + validate(MetadataVersion.IBP_3_7_IV0, jbodConfig = true)) + validate(MetadataVersion.IBP_3_7_IV2, jbodConfig = true) + } } diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala index 60ddda5bc31..3f38708dee8 100644 --- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala +++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala @@ -33,11 +33,12 @@ import org.apache.kafka.server.common.{ApiMessageAndVersion, Features, MetadataV import org.apache.kafka.common.metadata.{FeatureLevelRecord, UserScramCredentialRecord} import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils} import org.apache.kafka.raft.QuorumConfig -import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs, ServerLogConfigs} +import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, 
ServerConfigs, ServerLogConfigs} import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows, assertTrue} import org.junit.jupiter.api.{Test, Timeout} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{EnumSource, ValueSource} +import org.mockito.Mockito import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -655,5 +656,37 @@ Found problem: assertEquals(1, exitStatus) } } + + @Test + def testFormatValidatesConfigForMetadataVersion(): Unit = { + val config = Mockito.spy(new KafkaConfig(TestUtils.createBrokerConfig(10, null))) + val args = Array("format", + "-c", "dummy.properties", + "-t", "XcZZOzUqS4yHOjhMQB6JLQ", + "--release-version", MetadataVersion.LATEST_PRODUCTION.toString) + val exitCode = StorageTool.runFormatCommand(StorageTool.parseArguments(args), config) + Mockito.verify(config, Mockito.times(1)).validateWithMetadataVersion(MetadataVersion.LATEST_PRODUCTION) + assertEquals(0, exitCode) + } + + @Test + def testJbodSupportValidation(): Unit = { + def formatWith(logDirCount: Int, metadataVersion: MetadataVersion): Integer = { + val properties = TestUtils.createBrokerConfig(10, null, logDirCount = logDirCount) + properties.remove(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG) + val configFile = TestUtils.tempPropertiesFile(properties.asScala.toMap).toPath.toString + StorageTool.execute(Array("format", + "-c", configFile, + "-t", "XcZZOzUqS4yHOjhMQB6JLQ", + "--release-version", metadataVersion.toString)) + } + + assertEquals(0, formatWith(1, MetadataVersion.IBP_3_6_IV2)) + assertEquals("Invalid configuration for metadata version: " + + "requirement failed: Multiple log directories (aka JBOD) are not supported in the current MetadataVersion 3.6-IV2. 
Need 3.7-IV2 or higher", + assertThrows(classOf[TerseFailure], () => formatWith(2, MetadataVersion.IBP_3_6_IV2)).getMessage) + assertEquals(0, formatWith(1, MetadataVersion.IBP_3_7_IV2)) + assertEquals(0, formatWith(2, MetadataVersion.IBP_3_7_IV2)) + } }