junrao commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1619224649
########## core/src/main/scala/kafka/tools/StorageTool.scala: ########## @@ -109,6 +114,52 @@ object StorageTool extends Logging { } } + private def validateMetadataVersion(metadataVersion: MetadataVersion, config: Option[KafkaConfig]): Unit = { + if (!metadataVersion.isKRaftSupported) { + throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") + } + if (!metadataVersion.isProduction) { + if (config.get.unstableMetadataVersionsEnabled) { + System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") + } else { + throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") + } + } + } + + private[tools] def generateFeatureRecords(metadataRecords: ArrayBuffer[ApiMessageAndVersion], + metadataVersion: MetadataVersion, + specifiedFeatures: Map[String, java.lang.Short], + allFeatures: List[Features], + usesVersionDefault: Boolean): Unit = { Review Comment: Should `usesVersionDefault` be renamed to `releaseVersionSpecified`? ########## server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java: ########## @@ -14,37 +14,101 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.kafka.server.common; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; -import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME; -import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_KRAFT_VERSION; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; -class FeaturesTest { - @Test - public void testKRaftModeFeatures() { - Features features = new Features(MINIMUM_KRAFT_VERSION, - Collections.singletonMap("foo", (short) 2), 123, true); - assertEquals(MINIMUM_KRAFT_VERSION.featureLevel(), - features.finalizedFeatures().get(FEATURE_NAME)); - assertEquals((short) 2, - features.finalizedFeatures().get("foo")); - assertEquals(2, features.finalizedFeatures().size()); +public class FeaturesTest { + + @ParameterizedTest + @EnumSource(Features.class) + public void testFromFeatureLevelAllFeatures(Features feature) { + FeatureVersion[] featureImplementations = feature.featureVersions(); + int numFeatures = featureImplementations.length; + for (short i = 1; i < numFeatures; i++) { + assertEquals(featureImplementations[i - 1], feature.fromFeatureLevel(i)); + } + } + + @ParameterizedTest + @EnumSource(Features.class) + public void testValidateVersionAllFeatures(Features feature) { + for (FeatureVersion featureImpl : feature.featureVersions()) { + // Ensure the minimum bootstrap metadata version is included if no metadata version dependency. + Map<String, Short> deps = new HashMap<>(); + deps.putAll(featureImpl.dependencies()); + if (!deps.containsKey(MetadataVersion.FEATURE_NAME)) { Review Comment: Should we require each feature to include a dependency on MV? 
Otherwise, we need to add this logic in all places where `Features.validateVersion` is called. ########## core/src/main/scala/kafka/tools/StorageTool.scala: ########## @@ -60,24 +60,28 @@ object StorageTool extends Logging { case "format" => val directories = configToLogDirectories(config.get) val clusterId = namespace.getString("cluster_id") - val metadataVersion = getMetadataVersion(namespace, - Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) - if (!metadataVersion.isKRaftSupported) { - throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") - } - if (!metadataVersion.isProduction) { - if (config.get.unstableMetadataVersionsEnabled) { - System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") - } else { - throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") - } - } val metaProperties = new MetaProperties.Builder(). setVersion(MetaPropertiesVersion.V1). setClusterId(clusterId). setNodeId(config.get.nodeId). build() val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer() + val specifiedFeatures: util.List[String] = namespace.getList("feature") + if (namespace.getString("release_version") != null && specifiedFeatures != null) { + throw new TerseFailure("Both --release-version and --feature were set. 
Only one of the two flags can be set.") + } + val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList) + val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap, + Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) + validateMetadataVersion(metadataVersion, config) + // Get all other features, validate, and create records for them + generateFeatureRecords( + metadataRecords, + metadataVersion, + featureNamesAndLevelsMap, + Features.PRODUCTION_FEATURES.asScala.toList, + !Option(namespace.getString("release_version")).isEmpty Review Comment: Got it. If --feature is used but doesn't include metadata.version (MV), we use MetadataVersion.LATEST_PRODUCTION. However, if neither --feature nor --release-version is specified, we first use ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG and fall back to MetadataVersion.LATEST_PRODUCTION. Should we keep these two cases consistent? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org