Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]
soarez commented on PR #15834: URL: https://github.com/apache/kafka/pull/15834#issuecomment-2155838189 @ableegoldman @mimaison please see #16251 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]
ableegoldman commented on PR #15834: URL: https://github.com/apache/kafka/pull/15834#issuecomment-2155777043 Hey @soarez / @mimaison -- I've noticed the ReassignPartitionsCommandTest.testReassignmentCompletionDuringPartialUpgrade test seems to be failing consistently since this was merged, with an error message that, to my unfamiliar eyes, seems to be related. Can you take a quick look into this failing test? You can find the full logs on the [build for this PR](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15834/7/tests), but here is the fatal exception: ``` java.lang.IllegalArgumentException: requirement failed: Multiple log directories (aka JBOD) are not supported with the configured 3.0-IV1 inter.broker.protocol.version. Need 3.7-IV2 or higher ```
Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]
soarez commented on PR #15834: URL: https://github.com/apache/kafka/pull/15834#issuecomment-2154323969 Thanks for reviewing this @mimaison. Backporting to 3.8 and 3.7.
Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]
soarez merged PR #15834: URL: https://github.com/apache/kafka/pull/15834
Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]
mimaison commented on PR #15834: URL: https://github.com/apache/kafka/pull/15834#issuecomment-2152667234 You can backport it to 3.7 too if you want.
Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]
mimaison commented on code in PR #15834: URL: https://github.com/apache/kafka/pull/15834#discussion_r1629590599 ## core/src/test/scala/unit/kafka/tools/StorageToolTest.scala: ## @@ -656,5 +657,47 @@ Found problem: assertEquals(1, exitStatus) } } + + @Test + def testFormatValidatesConfigForMetadataVersion(): Unit = { +val config = Mockito.spy(new KafkaConfig(TestUtils.createBrokerConfig(10, null))) +val args = Array("format", + "-c", "dummy.properties", + "-t", "XcZZOzUqS4yHOjhMQB6JLQ", + "--release-version", MetadataVersion.LATEST_PRODUCTION.toString) +val exitCode = StorageTool.runFormatCommand(StorageTool.parseArguments(args), config) +Mockito.verify(config, Mockito.times(1)).validateWithMetadataVersion(MetadataVersion.LATEST_PRODUCTION) +assertEquals(0, exitCode) + } + + private def createPropsFile(properties: Properties): String = { +val propsFile = TestUtils.tempFile() +val propsStream = Files.newOutputStream(propsFile.toPath) +try { + properties.store(propsStream, "config.props") +} finally { + propsStream.close() +} +propsFile.toPath.toString + } + + @Test + def testJbodSupportValidation(): Unit = { +def formatWith(logDirCount: Int, metadataVersion: MetadataVersion): Integer = { + val properties = TestUtils.createBrokerConfig(10, null, logDirCount = logDirCount) + properties.remove(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG) Review Comment: Ah right, thanks for checking.
Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]
soarez commented on code in PR #15834: URL: https://github.com/apache/kafka/pull/15834#discussion_r1628002868 ## core/src/main/java/kafka/server/MetadataVersionConfigValidator.java: ## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server; + +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.loader.LoaderManifest; +import org.apache.kafka.image.publisher.MetadataPublisher; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.fault.FaultHandler; + +public class MetadataVersionConfigValidator implements MetadataPublisher { Review Comment: I'm glad you asked. There are two ways the broker can assume a MetadataVersion: 1. Via the `inter.broker.protocol.version` configuration property in ZK mode 2. Via discovering a `FeatureLevelRecord` for `metadata.version` in the cluster metadata in KRaft mode. This is preset with `--release-version` in the format command or with `kafka-features.sh upgrade --metadata`. For (1) we have the version set at startup time, and we can catch the configuration error in `KafkaConfig` as the properties are loaded. 
But for (2) we can only validate the config later, when the version is discovered from metadata. Earlier in this PR I had the check in `BrokerMetadataPublisher` (this class streams metadata records in the broker and distributes them to various types of `MetadataPublisher` for reconciliation), but instead of adding the check to that class I think it's better to reuse the publisher pattern, similar to how the broker registration re-sending was extracted to `BrokerRegistrationTracker` in #15945. Perhaps we could implement `MetadataPublisher` directly in `KafkaConfig`, but I thought that class is big enough already. Let me know what you think.
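As a rough illustration of case (2), the publisher approach described above can be sketched in plain Java. This is a minimal sketch, not the code from the PR: `SketchVersion`, `SketchPublisher`, `ConfigValidatorSketch` and `MetadataLoaderSketch` are hypothetical stand-ins, the callback is collapsed to a single version argument (the real `MetadataPublisher` receives a delta, an image and a manifest), and a plain list takes the place of a fault handler.

```java
import java.util.ArrayList;
import java.util.List;

// Small demo entry point: installs the validator and publishes two versions.
final class PublisherDemo {
    public static void main(String[] args) {
        MetadataLoaderSketch loader = new MetadataLoaderSketch();
        ConfigValidatorSketch validator = new ConfigValidatorSketch(2); // two log dirs, i.e. JBOD
        loader.install(validator);
        loader.publish(SketchVersion.IBP_3_8_IV0); // supported, no fault recorded
        loader.publish(SketchVersion.IBP_3_6_IV2); // too old for JBOD, one fault recorded
        validator.faults().forEach(System.out::println);
    }
}

// Hypothetical, trimmed stand-in for MetadataVersion (assumption: only three levels).
enum SketchVersion {
    IBP_3_6_IV2, IBP_3_7_IV2, IBP_3_8_IV0;

    // Directory assignment (needed for JBOD) is gated on 3.7-IV2, per the PR title.
    boolean isDirectoryAssignmentSupported() {
        return compareTo(IBP_3_7_IV2) >= 0;
    }
}

// Hypothetical, simplified stand-in for the MetadataPublisher callback.
interface SketchPublisher {
    void onMetadataVersionChange(SketchVersion newVersion);
}

// Re-validates static configuration whenever a new metadata.version is discovered.
final class ConfigValidatorSketch implements SketchPublisher {
    private final int logDirCount;
    private final List<String> faults = new ArrayList<>(); // stand-in for a fault handler

    ConfigValidatorSketch(int logDirCount) {
        this.logDirCount = logDirCount;
    }

    @Override
    public void onMetadataVersionChange(SketchVersion newVersion) {
        if (logDirCount > 1 && !newVersion.isDirectoryAssignmentSupported()) {
            faults.add("Multiple log directories (aka JBOD) are not supported in metadata.version " + newVersion);
        }
    }

    List<String> faults() {
        return faults;
    }
}

// Stand-in for the component that streams metadata and fans it out to publishers.
final class MetadataLoaderSketch {
    private final List<SketchPublisher> publishers = new ArrayList<>();

    void install(SketchPublisher publisher) {
        publishers.add(publisher);
    }

    void publish(SketchVersion version) {
        for (SketchPublisher publisher : publishers) {
            publisher.onMetadataVersionChange(version);
        }
    }
}
```

The point of the separate class is that the loader does not need to know about the config check: the validator is just one more publisher, which keeps both `BrokerMetadataPublisher` and `KafkaConfig` from growing.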
Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]
soarez commented on code in PR #15834: URL: https://github.com/apache/kafka/pull/15834#discussion_r1627976190 ## core/src/test/scala/unit/kafka/tools/StorageToolTest.scala: ## @@ -656,5 +657,47 @@ Found problem: assertEquals(1, exitStatus) } } + + @Test + def testFormatValidatesConfigForMetadataVersion(): Unit = { +val config = Mockito.spy(new KafkaConfig(TestUtils.createBrokerConfig(10, null))) +val args = Array("format", + "-c", "dummy.properties", + "-t", "XcZZOzUqS4yHOjhMQB6JLQ", + "--release-version", MetadataVersion.LATEST_PRODUCTION.toString) +val exitCode = StorageTool.runFormatCommand(StorageTool.parseArguments(args), config) +Mockito.verify(config, Mockito.times(1)).validateWithMetadataVersion(MetadataVersion.LATEST_PRODUCTION) +assertEquals(0, exitCode) + } + + private def createPropsFile(properties: Properties): String = { +val propsFile = TestUtils.tempFile() +val propsStream = Files.newOutputStream(propsFile.toPath) +try { + properties.store(propsStream, "config.props") +} finally { + propsStream.close() +} +propsFile.toPath.toString + } + + @Test + def testJbodSupportValidation(): Unit = { +def formatWith(logDirCount: Int, metadataVersion: MetadataVersion): Integer = { + val properties = TestUtils.createBrokerConfig(10, null, logDirCount = logDirCount) + properties.remove(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG) Review Comment: This property is ignored in KRaft. It assumes the value of `MINIMUM_KRAFT_VERSION` and cannot be set. https://github.com/apache/kafka/blob/f2aafcc66faca4d07a27e4ef90158136cb317b44/core/src/main/scala/kafka/server/KafkaConfig.scala#L896-L914 So I don't think we need to test this? What test did you have in mind?
Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]
soarez commented on code in PR #15834: URL: https://github.com/apache/kafka/pull/15834#discussion_r1627855813 ## core/src/test/scala/unit/kafka/tools/StorageToolTest.scala: ## @@ -656,5 +657,47 @@ Found problem: assertEquals(1, exitStatus) } } + + @Test + def testFormatValidatesConfigForMetadataVersion(): Unit = { +val config = Mockito.spy(new KafkaConfig(TestUtils.createBrokerConfig(10, null))) +val args = Array("format", + "-c", "dummy.properties", + "-t", "XcZZOzUqS4yHOjhMQB6JLQ", + "--release-version", MetadataVersion.LATEST_PRODUCTION.toString) +val exitCode = StorageTool.runFormatCommand(StorageTool.parseArguments(args), config) +Mockito.verify(config, Mockito.times(1)).validateWithMetadataVersion(MetadataVersion.LATEST_PRODUCTION) +assertEquals(0, exitCode) + } + + private def createPropsFile(properties: Properties): String = { +val propsFile = TestUtils.tempFile() +val propsStream = Files.newOutputStream(propsFile.toPath) +try { + properties.store(propsStream, "config.props") +} finally { + propsStream.close() +} +propsFile.toPath.toString + } + + @Test + def testJbodSupportValidation(): Unit = { +def formatWith(logDirCount: Int, metadataVersion: MetadataVersion): Integer = { + val properties = TestUtils.createBrokerConfig(10, null, logDirCount = logDirCount) + properties.remove(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG) + StorageTool.runMain(Array("format", Review Comment: Makes sense
Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]
soarez commented on code in PR #15834: URL: https://github.com/apache/kafka/pull/15834#discussion_r1627741886 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -45,89 +45,117 @@ import scala.jdk.CollectionConverters._ import scala.collection.mutable.ArrayBuffer object StorageTool extends Logging { - def main(args: Array[String]): Unit = { -try { - val namespace = parseArguments(args) - val command = namespace.getString("command") - val config = Option(namespace.getString("config")).flatMap( -p => Some(new KafkaConfig(Utils.loadProps(p - command match { -case "info" => - val directories = configToLogDirectories(config.get) - val selfManagedMode = configToSelfManagedMode(config.get) - Exit.exit(infoCommand(System.out, selfManagedMode, directories)) - -case "format" => - val directories = configToLogDirectories(config.get) - val clusterId = namespace.getString("cluster_id") - val metaProperties = new MetaProperties.Builder(). -setVersion(MetaPropertiesVersion.V1). -setClusterId(clusterId). -setNodeId(config.get.nodeId). -build() - val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer() - val specifiedFeatures: util.List[String] = namespace.getList("feature") - val releaseVersionFlagSpecified = namespace.getString("release_version") != null - if (releaseVersionFlagSpecified && specifiedFeatures != null) { -throw new TerseFailure("Both --release-version and --feature were set. 
Only one of the two flags can be set.") - } - val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList) - val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap, - Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) - validateMetadataVersion(metadataVersion, config) - // Get all other features, validate, and create records for them - // Use latest default for features if --release-version is not specified - generateFeatureRecords( -metadataRecords, -metadataVersion, -featureNamesAndLevelsMap, -Features.PRODUCTION_FEATURES.asScala.toList, -config.get.unstableFeatureVersionsEnabled, -releaseVersionFlagSpecified - ) - getUserScramCredentialRecords(namespace).foreach(userScramCredentialRecords => { -if (!metadataVersion.isScramSupported) { - throw new TerseFailure(s"SCRAM is only supported in metadata.version ${MetadataVersion.IBP_3_5_IV2} or later.") -} -for (record <- userScramCredentialRecords) { - metadataRecords.append(new ApiMessageAndVersion(record, 0.toShort)) -} - }) - - val bootstrapMetadata = buildBootstrapMetadata(metadataVersion, Some(metadataRecords), "format command") - val ignoreFormatted = namespace.getBoolean("ignore_formatted") - if (!configToSelfManagedMode(config.get)) { -throw new TerseFailure("The kafka configuration file appears to be for " + - "a legacy cluster. Formatting is only supported for clusters in KRaft mode.") - } - Exit.exit(formatCommand(System.out, directories, metaProperties, bootstrapMetadata, - metadataVersion,ignoreFormatted)) -case "random-uuid" => - System.out.println(Uuid.randomUuid) - Exit.exit(0) + /** + * Executes the command according to the given arguments and returns the appropriate exit code. + * @param args The command line arguments + * @return The exit code + */ + def runMain(args: Array[String]): Int = { Review Comment: I hadn't noticed this yet. 
Thanks for pointing this out.
Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]
mimaison commented on code in PR #15834: URL: https://github.com/apache/kafka/pull/15834#discussion_r1626218724 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -45,89 +45,117 @@ import scala.jdk.CollectionConverters._ import scala.collection.mutable.ArrayBuffer object StorageTool extends Logging { - def main(args: Array[String]): Unit = { -try { - val namespace = parseArguments(args) - val command = namespace.getString("command") - val config = Option(namespace.getString("config")).flatMap( -p => Some(new KafkaConfig(Utils.loadProps(p - command match { -case "info" => - val directories = configToLogDirectories(config.get) - val selfManagedMode = configToSelfManagedMode(config.get) - Exit.exit(infoCommand(System.out, selfManagedMode, directories)) - -case "format" => - val directories = configToLogDirectories(config.get) - val clusterId = namespace.getString("cluster_id") - val metaProperties = new MetaProperties.Builder(). -setVersion(MetaPropertiesVersion.V1). -setClusterId(clusterId). -setNodeId(config.get.nodeId). -build() - val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer() - val specifiedFeatures: util.List[String] = namespace.getList("feature") - val releaseVersionFlagSpecified = namespace.getString("release_version") != null - if (releaseVersionFlagSpecified && specifiedFeatures != null) { -throw new TerseFailure("Both --release-version and --feature were set. 
Only one of the two flags can be set.") - } - val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList) - val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap, - Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) - validateMetadataVersion(metadataVersion, config) - // Get all other features, validate, and create records for them - // Use latest default for features if --release-version is not specified - generateFeatureRecords( -metadataRecords, -metadataVersion, -featureNamesAndLevelsMap, -Features.PRODUCTION_FEATURES.asScala.toList, -config.get.unstableFeatureVersionsEnabled, -releaseVersionFlagSpecified - ) - getUserScramCredentialRecords(namespace).foreach(userScramCredentialRecords => { -if (!metadataVersion.isScramSupported) { - throw new TerseFailure(s"SCRAM is only supported in metadata.version ${MetadataVersion.IBP_3_5_IV2} or later.") -} -for (record <- userScramCredentialRecords) { - metadataRecords.append(new ApiMessageAndVersion(record, 0.toShort)) -} - }) - - val bootstrapMetadata = buildBootstrapMetadata(metadataVersion, Some(metadataRecords), "format command") - val ignoreFormatted = namespace.getBoolean("ignore_formatted") - if (!configToSelfManagedMode(config.get)) { -throw new TerseFailure("The kafka configuration file appears to be for " + - "a legacy cluster. Formatting is only supported for clusters in KRaft mode.") - } - Exit.exit(formatCommand(System.out, directories, metaProperties, bootstrapMetadata, - metadataVersion,ignoreFormatted)) -case "random-uuid" => - System.out.println(Uuid.randomUuid) - Exit.exit(0) + /** + * Executes the command according to the given arguments and returns the appropriate exit code. 
+ * @param args The command line arguments + * @return The exit code + */ + def runMain(args: Array[String]): Int = { Review Comment: In other tools, the method that is called by `main()` is typically called `execute()`. I think it would make sense to use that name here too. ## core/src/test/scala/unit/kafka/tools/StorageToolTest.scala: ## @@ -656,5 +657,47 @@ Found problem: assertEquals(1, exitStatus) } } + + @Test + def testFormatValidatesConfigForMetadataVersion(): Unit = { +val config = Mockito.spy(new KafkaConfig(TestUtils.createBrokerConfig(10, null))) +val args = Array("format", + "-c", "dummy.properties", + "-t", "XcZZOzUqS4yHOjhMQB6JLQ", + "--release-version", MetadataVersion.LATEST_PRODUCTION.toString) +val exitCode = StorageTool.runFormatCommand(StorageTool.parseArguments(args), config) +Mockito.verify(config, Mockito.times(1)).validateWithMetadataVersion(MetadataVersion.LATEST_PRODUCTION) +assertEquals(0, exitCode) + } + + private def createPropsFile(properties: Properties): String = { +val propsFile = TestUtils.t
Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]
soarez commented on PR #15834: URL: https://github.com/apache/kafka/pull/15834#issuecomment-2147620965 @mimaison I rebased due to conflicts and changed how the MV change via metadata records is detected to use a publisher following #15945. Please take another look, thanks.
Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]
soarez commented on code in PR #15834: URL: https://github.com/apache/kafka/pull/15834#discussion_r1626025676 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -45,89 +45,124 @@ import scala.jdk.CollectionConverters._ import scala.collection.mutable.ArrayBuffer object StorageTool extends Logging { + + /** + * Executes the command according to the given arguments and returns the appropriate exit code. + * @param args The command line arguments + * @return The exit code + */ + def runMain(args: Array[String]): Int = { +val namespace = parseArguments(args) +val command = namespace.getString("command") +val config = parseConfig(namespace.getString("config")) Review Comment: You're right, thanks for pointing this out. I missed that when I simplified the `Option(...).flatMap()` config bit. I've brought that back now.
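For readers following the `StorageTool` refactoring, here is a minimal Java sketch of the shape being discussed: a method that returns an exit code instead of calling exit itself, a thin `main()` that exits exactly once, and a config path that stays optional because a command such as `random-uuid` needs no config file. The class name `StorageToolSketch`, the nested `TerseFailure` stand-in, the positional argument handling and the error texts are all assumptions made for illustration; the real tool parses its arguments differently and, in the diff above, throws a `RuntimeException` for unknown commands.

```java
import java.util.Optional;
import java.util.UUID;

final class StorageToolSketch {

    // Hypothetical stand-in for a failure that should print a short message and exit 1.
    static final class TerseFailure extends RuntimeException {
        TerseFailure(String message) {
            super(message);
        }
    }

    // Runs the command and returns the exit code; never exits the JVM, so tests can call it.
    static int execute(String[] args) {
        if (args.length == 0) {
            throw new TerseFailure("No command given");
        }
        String command = args[0];
        // The config path is optional: only some commands need it.
        Optional<String> configPath = args.length > 1 ? Optional.of(args[1]) : Optional.empty();
        switch (command) {
            case "random-uuid":
                System.out.println(UUID.randomUUID());
                return 0;
            case "info":
            case "format":
                String path = configPath.orElseThrow(
                    () -> new TerseFailure("The " + command + " command requires a config file"));
                System.out.println(command + " would load " + path);
                return 0;
            default:
                throw new TerseFailure("Unknown command " + command);
        }
    }

    // Thin wrapper: translates failures into an exit code and exits in one place.
    public static void main(String[] args) {
        int exitCode;
        try {
            exitCode = execute(args);
        } catch (TerseFailure e) {
            System.err.println(e.getMessage());
            exitCode = 1;
        }
        System.exit(exitCode);
    }
}
```

Keeping the exit call out of `execute` is what lets a test assert on the returned code directly, as `testJbodSupportValidation` does with `StorageTool.runMain` in the diff quoted elsewhere in this thread.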
Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]
soarez commented on code in PR #15834: URL: https://github.com/apache/kafka/pull/15834#discussion_r1626026413 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -45,89 +45,124 @@ import scala.jdk.CollectionConverters._ import scala.collection.mutable.ArrayBuffer object StorageTool extends Logging { + + /** + * Executes the command according to the given arguments and returns the appropriate exit code. + * @param args The command line arguments + * @return The exit code + */ + def runMain(args: Array[String]): Int = { +val namespace = parseArguments(args) +val command = namespace.getString("command") +val config = parseConfig(namespace.getString("config")) +command match { + case "info" => +val directories = configToLogDirectories(config) +val selfManagedMode = configToSelfManagedMode(config) +infoCommand(System.out, selfManagedMode, directories) + + case "format" => +runFormatCommand(namespace, config) + + case "random-uuid" => +System.out.println(Uuid.randomUuid) +0 + case _ => +throw new RuntimeException(s"Unknown command $command") +} + } + def main(args: Array[String]): Unit = { +var exitCode: Integer = 0 +var message: Option[String] = None try { - val namespace = parseArguments(args) - val command = namespace.getString("command") - val config = Option(namespace.getString("config")).flatMap( -p => Some(new KafkaConfig(Utils.loadProps(p - command match { -case "info" => - val directories = configToLogDirectories(config.get) - val selfManagedMode = configToSelfManagedMode(config.get) - Exit.exit(infoCommand(System.out, selfManagedMode, directories)) - -case "format" => - val directories = configToLogDirectories(config.get) - val clusterId = namespace.getString("cluster_id") - val metaProperties = new MetaProperties.Builder(). -setVersion(MetaPropertiesVersion.V1). -setClusterId(clusterId). -setNodeId(config.get.nodeId). 
-build() - val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer() - val specifiedFeatures: util.List[String] = namespace.getList("feature") - val releaseVersionFlagSpecified = namespace.getString("release_version") != null - if (releaseVersionFlagSpecified && specifiedFeatures != null) { -throw new TerseFailure("Both --release-version and --feature were set. Only one of the two flags can be set.") - } - val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList) - val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap, - Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) - validateMetadataVersion(metadataVersion, config) - // Get all other features, validate, and create records for them - // Use latest default for features if --release-version is not specified - generateFeatureRecords( -metadataRecords, -metadataVersion, -featureNamesAndLevelsMap, -Features.PRODUCTION_FEATURES.asScala.toList, -config.get.unstableFeatureVersionsEnabled, -releaseVersionFlagSpecified - ) - getUserScramCredentialRecords(namespace).foreach(userScramCredentialRecords => { -if (!metadataVersion.isScramSupported) { - throw new TerseFailure(s"SCRAM is only supported in metadata.version ${MetadataVersion.IBP_3_5_IV2} or later.") -} -for (record <- userScramCredentialRecords) { - metadataRecords.append(new ApiMessageAndVersion(record, 0.toShort)) -} - }) - - val bootstrapMetadata = buildBootstrapMetadata(metadataVersion, Some(metadataRecords), "format command") - val ignoreFormatted = namespace.getBoolean("ignore_formatted") - if (!configToSelfManagedMode(config.get)) { -throw new TerseFailure("The kafka configuration file appears to be for " + - "a legacy cluster. 
Formatting is only supported for clusters in KRaft mode.") - } - Exit.exit(formatCommand(System.out, directories, metaProperties, bootstrapMetadata, - metadataVersion,ignoreFormatted)) + exitCode = runMain(args) +} catch { + case e: TerseFailure => +exitCode = 1 +message = Some(e.getMessage) +} +message.foreach(System.err.println) +Exit.exit(exitCode, message) + } -case "random-uuid" => - System.out.println(Uuid.randomUuid) - Exit.exit(0) + private def parseConfig(configFilename: String): KafkaConfig = { +try {
Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]
mimaison commented on code in PR #15834: URL: https://github.com/apache/kafka/pull/15834#discussion_r1624370212 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -45,89 +45,124 @@ import scala.jdk.CollectionConverters._ import scala.collection.mutable.ArrayBuffer object StorageTool extends Logging { + + /** + * Executes the command according to the given arguments and returns the appropriate exit code. + * @param args The command line arguments + * @return The exit code + */ + def runMain(args: Array[String]): Int = { +val namespace = parseArguments(args) +val command = namespace.getString("command") +val config = parseConfig(namespace.getString("config")) +command match { + case "info" => +val directories = configToLogDirectories(config) +val selfManagedMode = configToSelfManagedMode(config) +infoCommand(System.out, selfManagedMode, directories) + + case "format" => +runFormatCommand(namespace, config) + + case "random-uuid" => +System.out.println(Uuid.randomUuid) +0 + case _ => +throw new RuntimeException(s"Unknown command $command") +} + } + def main(args: Array[String]): Unit = { +var exitCode: Integer = 0 +var message: Option[String] = None try { - val namespace = parseArguments(args) - val command = namespace.getString("command") - val config = Option(namespace.getString("config")).flatMap( -p => Some(new KafkaConfig(Utils.loadProps(p - command match { -case "info" => - val directories = configToLogDirectories(config.get) - val selfManagedMode = configToSelfManagedMode(config.get) - Exit.exit(infoCommand(System.out, selfManagedMode, directories)) - -case "format" => - val directories = configToLogDirectories(config.get) - val clusterId = namespace.getString("cluster_id") - val metaProperties = new MetaProperties.Builder(). -setVersion(MetaPropertiesVersion.V1). -setClusterId(clusterId). -setNodeId(config.get.nodeId). 
-build() - val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer() - val specifiedFeatures: util.List[String] = namespace.getList("feature") - val releaseVersionFlagSpecified = namespace.getString("release_version") != null - if (releaseVersionFlagSpecified && specifiedFeatures != null) { -throw new TerseFailure("Both --release-version and --feature were set. Only one of the two flags can be set.") - } - val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList) - val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap, - Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) - validateMetadataVersion(metadataVersion, config) - // Get all other features, validate, and create records for them - // Use latest default for features if --release-version is not specified - generateFeatureRecords( -metadataRecords, -metadataVersion, -featureNamesAndLevelsMap, -Features.PRODUCTION_FEATURES.asScala.toList, -config.get.unstableFeatureVersionsEnabled, -releaseVersionFlagSpecified - ) - getUserScramCredentialRecords(namespace).foreach(userScramCredentialRecords => { -if (!metadataVersion.isScramSupported) { - throw new TerseFailure(s"SCRAM is only supported in metadata.version ${MetadataVersion.IBP_3_5_IV2} or later.") -} -for (record <- userScramCredentialRecords) { - metadataRecords.append(new ApiMessageAndVersion(record, 0.toShort)) -} - }) - - val bootstrapMetadata = buildBootstrapMetadata(metadataVersion, Some(metadataRecords), "format command") - val ignoreFormatted = namespace.getBoolean("ignore_formatted") - if (!configToSelfManagedMode(config.get)) { -throw new TerseFailure("The kafka configuration file appears to be for " + - "a legacy cluster. 
Formatting is only supported for clusters in KRaft mode.") - } - Exit.exit(formatCommand(System.out, directories, metaProperties, bootstrapMetadata, - metadataVersion,ignoreFormatted)) + exitCode = runMain(args) +} catch { + case e: TerseFailure => +exitCode = 1 +message = Some(e.getMessage) +} +message.foreach(System.err.println) +Exit.exit(exitCode, message) + } -case "random-uuid" => - System.out.println(Uuid.randomUuid) - Exit.exit(0) + private def parseConfig(configFilename: String): KafkaConfig = { +try
Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]
mimaison commented on code in PR #15834: URL: https://github.com/apache/kafka/pull/15834#discussion_r1624282626 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -1457,6 +1465,18 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami } } + /** + * Validate some configurations for new MetadataVersion. A new MetadataVersion can take place when + * a FeatureLevelRecord for "metadata.version" is read from the cluster metadata. + */ + def validateWithMetadataVersion(metadataVersion: MetadataVersion): Unit = { Review Comment: Ah right, thanks for the explanation. Then I guess it makes sense to keep both methods for now.
Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]
soarez commented on code in PR #15834: URL: https://github.com/apache/kafka/pull/15834#discussion_r1623198189 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -1457,6 +1465,18 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami } } + /** + * Validate some configurations for new MetadataVersion. A new MetadataVersion can take place when + * a FeatureLevelRecord for "metadata.version" is read from the cluster metadata. + */ + def validateWithMetadataVersion(metadataVersion: MetadataVersion): Unit = { Review Comment: We can call this from `validateValues`, but then I won't be able to take your other suggestion to mention `inter.broker.protocol.version` in the error message. This one gets called when the KRaft broker discovers a new metadata.version via a new feature record. Would you prefer we consolidate the checks, or keep mention of `inter.broker.protocol.version` when it makes sense? I lean towards leaving them both, so it's helpful for the operator, and the duplication cost should go away in 4.0.
Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]
mimaison commented on code in PR #15834: URL: https://github.com/apache/kafka/pull/15834#discussion_r1622351252

## core/src/main/scala/kafka/server/KafkaConfig.scala: ##
@@ -1457,6 +1465,18 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
     }
   }
+  /**
+   * Validate some configurations for new MetadataVersion. A new MetadataVersion can take place when
+   * a FeatureLevelRecord for "metadata.version" is read from the cluster metadata.
+   */
+  def validateWithMetadataVersion(metadataVersion: MetadataVersion): Unit = {

Review Comment:
This seems to be the exact same check we do in `validateValues()`. Can we consolidate both?

## core/src/main/scala/kafka/server/KafkaConfig.scala: ##
@@ -1360,6 +1360,14 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
       }
       validateAdvertisedListenersNonEmptyForBroker()
     }
+    if (processRoles.contains(ProcessRole.BrokerRole)
+      && originals.containsKey(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)
+      && logDirs.size > 1) {
+      require(interBrokerProtocolVersion.isDirectoryAssignmentSupported,
+        s"Multiple log directories (aka JBOD) are not supported in the configured " +
+          s"${ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG} ${interBrokerProtocolVersion}. " +

Review Comment:
Should we add `ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG` after the value? So the message looks like `Multiple log directories (aka JBOD) are not supported with the configured 3.6 inter.broker.protocol.version`
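For readers following along, here is a minimal, self-contained sketch of the check discussed in the review comment above, using the suggested wording (value first, then the config name). It is not the code from the PR: `SketchVersion`, its `rank` field, `JbodGate`, and `validateLogDirs` are hypothetical stand-ins for Kafka's `MetadataVersion` and the logic in `KafkaConfig`, and only the three versions mentioned in this thread are modelled.

```scala
// Hypothetical stand-in for Kafka's MetadataVersion; only what the check needs.
final case class SketchVersion(name: String, rank: Int) {
  // Assumption taken from the thread: JBOD in KRaft needs 3.7-IV2 or higher.
  def isDirectoryAssignmentSupported: Boolean = rank >= SketchVersion.V3_7_IV2.rank
  override def toString: String = name
}

object SketchVersion {
  // Ranks are illustrative; the real enum has many more versions.
  val V3_0_IV1 = SketchVersion("3.0-IV1", 1)
  val V3_6_IV2 = SketchVersion("3.6-IV2", 2)
  val V3_7_IV2 = SketchVersion("3.7-IV2", 3)
}

object JbodGate {
  val IbpConfig = "inter.broker.protocol.version"

  // Mirrors the three conditions in the diff: broker role, IBP explicitly
  // configured, and more than one log directory.
  def validateLogDirs(ibp: SketchVersion,
                      ibpExplicitlySet: Boolean,
                      isBroker: Boolean,
                      logDirs: Seq[String]): Unit = {
    if (isBroker && ibpExplicitlySet && logDirs.size > 1) {
      // require throws IllegalArgumentException("requirement failed: " + message)
      require(ibp.isDirectoryAssignmentSupported,
        s"Multiple log directories (aka JBOD) are not supported with the configured " +
          s"$ibp $IbpConfig. Need ${SketchVersion.V3_7_IV2} or higher")
    }
  }
}
```

Calling `JbodGate.validateLogDirs(SketchVersion.V3_0_IV1, true, true, Seq("/a", "/b"))` throws, while the same call with a single directory or with `V3_7_IV2` returns normally.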
Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]
soarez commented on PR #15834: URL: https://github.com/apache/kafka/pull/15834#issuecomment-2137732518 @mimaison Sorry, thanks for pointing that out. That was an unintentional, bad rebase. Sorted now.
Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]
mimaison commented on PR #15834: URL: https://github.com/apache/kafka/pull/15834#issuecomment-2137620473

Thanks @soarez. It seems you have reverted the changes to StorageTool. Is this intended? When using the `--release-version 3.6` flag and multiple log dirs, formatting does not fail anymore:

```
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties --release-version 3.6
metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, dirs={/tmp/kraft-combined-logs: EMPTY, /tmp/kraft-combined-logs2: EMPTY})
Formatting /tmp/kraft-combined-logs with metadata.version 3.6-IV2.
Formatting /tmp/kraft-combined-logs with metadata.version 3.6-IV2.
Formatting /tmp/kraft-combined-logs2 with metadata.version 3.6-IV2
```
Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]
soarez commented on PR #15834: URL: https://github.com/apache/kafka/pull/15834#issuecomment-2137444011 Rebased. @mimaison please take another look.
Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]
soarez commented on PR #15834: URL: https://github.com/apache/kafka/pull/15834#issuecomment-2120643465 @mimaison thanks for having a look. Yes, I think we should check the MetadataVersion at the formatting stage too. Please have a look at my latest changes.
Re: [PR] KAFKA-16606 Gate JBOD configuration on 3.7-IV2 [kafka]
mimaison commented on PR #15834: URL: https://github.com/apache/kafka/pull/15834#issuecomment-2112826872

Thanks for the PR. If I set `inter.broker.protocol.version=3.6-IV2` and multiple log directories in my broker configuration, it now prevents me from formatting the storage:

```
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
[2024-05-15 17:05:31,009] WARN inter.broker.protocol.version is deprecated in KRaft mode as of 3.3 and will only be read when first upgrading from a KRaft prior to 3.3. See kafka-storage.sh help for details on setting the metadata.version for a new KRaft cluster. (kafka.server.KafkaConfig)
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Multiple log directories (aka JBOD) are not supported in the configured inter.broker.protocol.version 3.0-IV1. Need 3.7-IV2 or higher
```

However, if I specify the metadata version using the `--release-version` flag of the storage tool, the formatting works and I only get the failure when I try to start the broker:

```
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties --release-version 3.6
metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, dirs={/tmp/kraft-combined-logs: EMPTY, /tmp/kraft-combined-logs2: EMPTY})
Formatting /tmp/kraft-combined-logs with metadata.version 3.6-IV2.
Formatting /tmp/kraft-combined-logs with metadata.version 3.6-IV2.
Formatting /tmp/kraft-combined-logs2 with metadata.version 3.6-IV2.
```

Should we ensure the failure always happens at the formatting stage?

I also created https://issues.apache.org/jira/browse/KAFKA-16771 to avoid printing the "Formatting <...>" log line twice for the first directory.
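
The question of failing at the formatting stage can be illustrated with a small sketch: validate the requested metadata version against the number of log directories before any directory is touched. This is only an illustration under assumptions, not StorageTool's actual code; `FormatVersion`, `FormatSketch`, and `format` are hypothetical names, the error wording is made up for the example, and the returned strings merely imitate the "Formatting ..." log lines shown above.

```scala
// Hypothetical stand-in for the metadata version chosen via --release-version.
final case class FormatVersion(name: String, rank: Int) {
  // Assumption taken from the thread: JBOD in KRaft needs 3.7-IV2 or higher.
  def isDirectoryAssignmentSupported: Boolean = rank >= FormatVersion.V3_7_IV2.rank
  override def toString: String = name
}

object FormatVersion {
  // Ranks are illustrative only.
  val V3_6_IV2 = FormatVersion("3.6-IV2", 1)
  val V3_7_IV2 = FormatVersion("3.7-IV2", 2)
}

object FormatSketch {
  // Checks first, then "formats": either every directory is processed or none is.
  // Returns one message per directory instead of writing anything to disk.
  def format(metadataVersion: FormatVersion, logDirs: Seq[String]): Seq[String] = {
    require(logDirs.size <= 1 || metadataVersion.isDirectoryAssignmentSupported,
      s"Multiple log directories (aka JBOD) are not supported with metadata.version " +
        s"$metadataVersion. Need ${FormatVersion.V3_7_IV2} or higher")
    logDirs.map(dir => s"Formatting $dir with metadata.version $metadataVersion.")
  }
}
```

With this ordering, `FormatSketch.format(FormatVersion.V3_6_IV2, Seq("/tmp/kraft-combined-logs", "/tmp/kraft-combined-logs2"))` throws an `IllegalArgumentException` up front, so the operator sees the problem when formatting and not later at broker startup.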