junrao commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1617946240
########## core/src/main/scala/kafka/tools/StorageTool.scala: ########## @@ -60,24 +60,28 @@ object StorageTool extends Logging { case "format" => val directories = configToLogDirectories(config.get) val clusterId = namespace.getString("cluster_id") - val metadataVersion = getMetadataVersion(namespace, - Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) - if (!metadataVersion.isKRaftSupported) { - throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") - } - if (!metadataVersion.isProduction) { - if (config.get.unstableMetadataVersionsEnabled) { - System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") - } else { - throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") - } - } val metaProperties = new MetaProperties.Builder(). setVersion(MetaPropertiesVersion.V1). setClusterId(clusterId). setNodeId(config.get.nodeId). build() val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer() + val specifiedFeatures: util.List[String] = namespace.getList("feature") + if (namespace.getString("release_version") != null && specifiedFeatures != null) { + throw new TerseFailure("Both --release-version and --feature were set. 
Only one of the two flags can be set.") + } + val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList) + val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap, + Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) + validateMetadataVersion(metadataVersion, config) + // Get all other features, validate, and create records for them + generateFeatureRecords( + metadataRecords, + metadataVersion, + featureNamesAndLevelsMap, + Features.PRODUCTION_FEATURES.asScala.toList, + !Option(namespace.getString("release_version")).isEmpty Review Comment: I am wondering why we need to pass in `usesVersionDefault`? Earlier in `getMetadataVersion`, we already resolve `metadataVersion` to `LATEST_PRODUCTION` if it's not explicitly specified. ########## server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java: ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.common; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME; + +public final class FinalizedFeatures { + private final MetadataVersion metadataVersion; + private final Map<String, Short> finalizedFeatures; + private final long finalizedFeaturesEpoch; + + public static FinalizedFeatures fromKRaftVersion(MetadataVersion version) { + return new FinalizedFeatures(version, Collections.emptyMap(), -1, true); + } + + public FinalizedFeatures( + MetadataVersion metadataVersion, + Map<String, Short> finalizedFeatures, + long finalizedFeaturesEpoch, + boolean kraftMode + ) { + this.metadataVersion = metadataVersion; + this.finalizedFeatures = new HashMap<>(finalizedFeatures); + this.finalizedFeaturesEpoch = finalizedFeaturesEpoch; + // In KRaft mode, we always include the metadata version in the features map. + // In ZK mode, we never include it. 
+ if (kraftMode) { + this.finalizedFeatures.put(FEATURE_NAME, metadataVersion.featureLevel()); + } else { + this.finalizedFeatures.remove(FEATURE_NAME); + } + } + + public MetadataVersion metadataVersion() { + return metadataVersion; + } + + public Map<String, Short> finalizedFeatures() { + return finalizedFeatures; + } + + public long finalizedFeaturesEpoch() { + return finalizedFeaturesEpoch; + } + + @Override + public boolean equals(Object o) { + if (o == null || !(o.getClass().equals(FinalizedFeatures.class))) return false; + FinalizedFeatures other = (FinalizedFeatures) o; + return metadataVersion == other.metadataVersion && + finalizedFeatures.equals(other.finalizedFeatures) && + finalizedFeaturesEpoch == other.finalizedFeaturesEpoch; + } + + @Override + public int hashCode() { + return Objects.hash(metadataVersion, finalizedFeatures, finalizedFeaturesEpoch); + } + + @Override + public String toString() { + return "Features" + + "(metadatVersion=" + metadataVersion + Review Comment: typo metadatVersion ########## server-common/src/main/java/org/apache/kafka/server/common/Features.java: ########## @@ -16,72 +16,134 @@ */ package org.apache.kafka.server.common; -import java.util.Collections; -import java.util.HashMap; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; import java.util.Map; -import java.util.Objects; +import java.util.stream.Collectors; -import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME; +/** + * This is enum for the various features implemented for Kafka clusters. + * KIP-584: Versioning Scheme for Features introduced the idea of various features, but only added one feature -- MetadataVersion. + * KIP-1022: Formatting and Updating Features allowed for more features to be added. In order to set and update features, + * they need to be specified via the StorageTool or FeatureCommand tools. 
+ * <br> + * Having a unified enum for the features that will use a shared type in the API used to set and update them + * makes it easier to process these features. + */ +public enum Features { + + /** + * Features defined. If a feature is included in this list, and marked to be used in production they will also be specified when + * formatting a cluster via the StorageTool. MetadataVersion is handled separately, so it is not included here. + * + * See {@link TestFeatureVersion} as an example. See {@link FeatureVersion} when implementing a new feature. + */ + TEST_VERSION("test.feature.version", TestFeatureVersion.values()); + + public static final Features[] FEATURES; + public static final List<Features> PRODUCTION_FEATURES; -public final class Features { - private final MetadataVersion version; - private final Map<String, Short> finalizedFeatures; - private final long finalizedFeaturesEpoch; + public static final List<String> PRODUCTION_FEATURE_NAMES; + private final String name; + private final FeatureVersion[] featureVersions; - public static Features fromKRaftVersion(MetadataVersion version) { - return new Features(version, Collections.emptyMap(), -1, true); + Features(String name, + FeatureVersion[] featureVersions) { + this.name = name; + this.featureVersions = featureVersions; } - public Features( - MetadataVersion version, - Map<String, Short> finalizedFeatures, - long finalizedFeaturesEpoch, - boolean kraftMode - ) { - this.version = version; - this.finalizedFeatures = new HashMap<>(finalizedFeatures); - this.finalizedFeaturesEpoch = finalizedFeaturesEpoch; - // In KRaft mode, we always include the metadata version in the features map. - // In ZK mode, we never include it. 
- if (kraftMode) { - this.finalizedFeatures.put(FEATURE_NAME, version.featureLevel()); - } else { - this.finalizedFeatures.remove(FEATURE_NAME); - } + static { + Features[] enumValues = Features.values(); + FEATURES = Arrays.copyOf(enumValues, enumValues.length); + + PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature -> + feature.name != TEST_VERSION.featureName()).collect(Collectors.toList()); + PRODUCTION_FEATURE_NAMES = PRODUCTION_FEATURES.stream().map(feature -> + feature.name).collect(Collectors.toList()); } - public MetadataVersion metadataVersion() { - return version; + public String featureName() { + return name; } - public Map<String, Short> finalizedFeatures() { - return finalizedFeatures; + public FeatureVersion[] featureVersions() { + return featureVersions; } - public long finalizedFeaturesEpoch() { - return finalizedFeaturesEpoch; + public short latestProduction() { + return defaultValue(MetadataVersion.LATEST_PRODUCTION); } - @Override - public boolean equals(Object o) { - if (o == null || !(o.getClass().equals(Features.class))) return false; - Features other = (Features) o; - return version == other.version && - finalizedFeatures.equals(other.finalizedFeatures) && - finalizedFeaturesEpoch == other.finalizedFeaturesEpoch; + /** + * Creates a FeatureVersion from a level. + * + * @param level the level of the feature + * @return the FeatureVersionUtils.FeatureVersion for the feature the enum is based on. + * @throws IllegalArgumentException if the feature is not known. + */ + public FeatureVersion fromFeatureLevel(short level) { + return Arrays.stream(featureVersions).filter(featureVersion -> + featureVersion.featureLevel() == level).findFirst().orElseThrow( + () -> new IllegalArgumentException("No feature:" + featureName() + " with feature level " + level)); } - @Override - public int hashCode() { - return Objects.hash(version, finalizedFeatures, finalizedFeaturesEpoch); + /** + * A method to validate the feature can be set. 
If a given feature relies on another feature, the dependencies should be + * captured in {@link FeatureVersion#dependencies()} + * <p> + * For example, say feature X level x relies on feature Y level y: + * if feature X >= x then throw an error if feature Y < y. + * + * All feature levels above 0 require metadata.version=4 (IBP_3_3_IV0) in order to write the feature records to the cluster. + * + * @param feature the feature we are validating + * @param metadataVersion the metadata version we have (or want to set) + * @param features the feature versions (besides MetadataVersion) we have (or want to set) + * @throws IllegalArgumentException if the feature is not valid + */ + public static void validateVersion(FeatureVersion feature, MetadataVersion metadataVersion, Map<String, Short> features) { + if (feature.featureLevel() >= 1 && metadataVersion.isLessThan(MetadataVersion.IBP_3_3_IV0)) + throw new IllegalArgumentException(feature.featureName() + " could not be set to " + feature.featureLevel() + + " because it depends on metadata.version=4 (" + MetadataVersion.IBP_3_3_IV0 + ")"); + + for (Map.Entry<String, Short> dependency: feature.dependencies().entrySet()) { + Short featureLevel = features.get(dependency.getKey()); + + if (featureLevel == null || featureLevel < dependency.getValue()) { + throw new IllegalArgumentException(feature.featureName() + " could not be set to " + feature.featureLevel() + + " because it depends on " + dependency.getKey() + " level " + dependency.getValue()); + } + } + } + + /** + * A method to return the default (latest production) level of a feature based on the metadata version provided. + * + * Every time a new feature is added, it should create a mapping from metadata version to feature version + * with {@link FeatureVersion#bootstrapMetadataVersion()}. When the feature version is production ready, the metadata + * version should be made production ready as well. 
+ * + * @param metadataVersion the metadata version we want to use to set the default. + * @return the default version level for the feature and potential metadata version Review Comment: We are not returning potential metadata version, right? ########## metadata/src/main/java/org/apache/kafka/metadata/FinalizedControllerFeatures.java: ########## @@ -40,6 +40,10 @@ public Optional<Short> get(String name) { return Optional.ofNullable(featureMap.get(name)); } + public short getOrDefault(String name, short defaultValue) { Review Comment: versionOrDefault or levelOrDefault? ########## metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java: ########## @@ -61,6 +62,12 @@ public static Map<String, VersionRange> defaultFeatureMap(boolean enableUnstable enableUnstable ? MetadataVersion.latestTesting().featureLevel() : MetadataVersion.latestProduction().featureLevel())); + for (Features feature : Features.PRODUCTION_FEATURES) { + features.put(feature.featureName(), VersionRange.of( + 0, + feature.latestProduction() Review Comment: Should this take `enableUnstable` into consideration? ########## server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java: ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME; + +public final class FinalizedFeatures { + private final MetadataVersion metadataVersion; + private final Map<String, Short> finalizedFeatures; + private final long finalizedFeaturesEpoch; + + public static FinalizedFeatures fromKRaftVersion(MetadataVersion version) { + return new FinalizedFeatures(version, Collections.emptyMap(), -1, true); + } + + public FinalizedFeatures( + MetadataVersion metadataVersion, + Map<String, Short> finalizedFeatures, + long finalizedFeaturesEpoch, + boolean kraftMode + ) { + this.metadataVersion = metadataVersion; + this.finalizedFeatures = new HashMap<>(finalizedFeatures); + this.finalizedFeaturesEpoch = finalizedFeaturesEpoch; + // In KRaft mode, we always include the metadata version in the features map. + // In ZK mode, we never include it. + if (kraftMode) { + this.finalizedFeatures.put(FEATURE_NAME, metadataVersion.featureLevel()); Review Comment: Could we use MetadataVersion.FEATURE_NAME so that it's clear this is for Metadata? 
########## core/src/test/scala/unit/kafka/tools/StorageToolTest.scala: ########## @@ -259,26 +263,66 @@ Found problem: @Test def testDefaultMetadataVersion(): Unit = { val namespace = StorageTool.parseArguments(Array("format", "-c", "config.props", "-t", "XcZZOzUqS4yHOjhMQB6JLQ")) - val mv = StorageTool.getMetadataVersion(namespace, defaultVersionString = None) + val mv = StorageTool.getMetadataVersion(namespace, Map.empty, defaultVersionString = None) assertEquals(MetadataVersion.LATEST_PRODUCTION.featureLevel(), mv.featureLevel(), "Expected the default metadata.version to be the latest production version") } @Test def testConfiguredMetadataVersion(): Unit = { val namespace = StorageTool.parseArguments(Array("format", "-c", "config.props", "-t", "XcZZOzUqS4yHOjhMQB6JLQ")) - val mv = StorageTool.getMetadataVersion(namespace, defaultVersionString = Some(MetadataVersion.IBP_3_3_IV2.toString)) + val mv = StorageTool.getMetadataVersion(namespace, Map.empty, defaultVersionString = Some(MetadataVersion.IBP_3_3_IV2.toString)) assertEquals(MetadataVersion.IBP_3_3_IV2.featureLevel(), mv.featureLevel(), "Expected the default metadata.version to be 3.3-IV2") } + @Test + def testSettingFeatureAndReleaseVersionFails(): Unit = { + val namespace = StorageTool.parseArguments(Array("format", "-c", "config.props", "-t", "XcZZOzUqS4yHOjhMQB6JLQ", + "--release-version", "3.0-IV1", "--feature", "metadata.version=4")) + assertThrows(classOf[IllegalArgumentException], () => StorageTool.getMetadataVersion(namespace, parseFeatures(namespace), defaultVersionString = None)) + } + + @Test + def testParseFeatures(): Unit = { + def parseAddFeatures(strings: String*): Map[String, java.lang.Short] = { + var args = mutable.Seq("format", "-c", "config.props", "-t", "XcZZOzUqS4yHOjhMQB6JLQ") + args ++= strings + val namespace = StorageTool.parseArguments(args.toArray) + parseFeatures(namespace) + } + + assertThrows(classOf[RuntimeException], () => parseAddFeatures("--feature", "blah")) + 
assertThrows(classOf[RuntimeException], () => parseAddFeatures("--feature", "blah=blah")) + + // Test with no features + assertEquals(Map(), parseAddFeatures()) + + // Test with one feature + val testFeatureLevel = 1 + val testArgument = TestFeatureVersion.FEATURE_NAME + "=" + testFeatureLevel.toString + val expectedMap = Map(TestFeatureVersion.FEATURE_NAME -> testFeatureLevel.toShort) + assertEquals(expectedMap, parseAddFeatures("--feature", testArgument)) + + // Test with two features + val metadataFeatureLevel = 5 + val metadataArgument = MetadataVersion.FEATURE_NAME + "=" + metadataFeatureLevel.toString + val expectedMap2 = expectedMap ++ Map (MetadataVersion.FEATURE_NAME -> metadataFeatureLevel.toShort) + assertEquals(expectedMap2, parseAddFeatures("--feature", testArgument, "--feature", metadataArgument)) + } + + def parseFeatures(namespace: Namespace): Map[String, java.lang.Short] = { Review Comment: Could this be private? ########## server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java: ########## @@ -14,37 +14,86 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.kafka.server.common; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import java.util.Collections; -import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME; -import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_KRAFT_VERSION; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; -class FeaturesTest { - @Test - public void testKRaftModeFeatures() { - Features features = new Features(MINIMUM_KRAFT_VERSION, - Collections.singletonMap("foo", (short) 2), 123, true); - assertEquals(MINIMUM_KRAFT_VERSION.featureLevel(), - features.finalizedFeatures().get(FEATURE_NAME)); - assertEquals((short) 2, - features.finalizedFeatures().get("foo")); - assertEquals(2, features.finalizedFeatures().size()); +public class FeaturesTest { + + @ParameterizedTest + @EnumSource(Features.class) + public void testFromFeatureLevelAllFeatures(Features feature) { + FeatureVersion[] featureImplementations = feature.featureVersions(); + int numFeatures = featureImplementations.length; + for (short i = 1; i < numFeatures; i++) { + assertEquals(featureImplementations[i - 1], feature.fromFeatureLevel(i)); + } + } + + @ParameterizedTest + @EnumSource(Features.class) + public void testValidateVersionAllFeatures(Features feature) { + for (FeatureVersion featureImpl : feature.featureVersions()) { + // Ensure that the feature is valid given the typical metadataVersionMapping and the dependencies. + // Note: Other metadata versions are valid, but this one should always be valid. 
+ Features.validateVersion(featureImpl, featureImpl.bootstrapMetadataVersion(), featureImpl.dependencies()); + } } @Test - public void testZkModeFeatures() { - Features features = new Features(MINIMUM_KRAFT_VERSION, - Collections.singletonMap("foo", (short) 2), 123, false); - assertNull(features.finalizedFeatures().get(FEATURE_NAME)); - assertEquals((short) 2, - features.finalizedFeatures().get("foo")); - assertEquals(1, features.finalizedFeatures().size()); + public void testInvalidValidateVersion() { + // Using too low of a MetadataVersion is invalid + assertThrows(IllegalArgumentException.class, + () -> Features.validateVersion( + TestFeatureVersion.TEST_1, + MetadataVersion.IBP_2_8_IV0, + Collections.emptyMap() + ) + ); + + // Using a version that is lower than the dependency will fail. + assertThrows(IllegalArgumentException.class, + () -> Features.validateVersion( + TestFeatureVersion.TEST_2, + MetadataVersion.MINIMUM_BOOTSTRAP_VERSION, Review Comment: Hmm, metadata version is provided here and in `features`. Should we simplify it so that it's only provided in one place? ########## core/src/test/scala/unit/kafka/tools/StorageToolTest.scala: ########## @@ -290,6 +334,101 @@ Found problem: assertThrows(classOf[IllegalArgumentException], () => parseMetadataVersion("--release-version", "0.0")) } + def generateRecord(featureName: String, level: Short): ApiMessageAndVersion = { Review Comment: Could this be private? ########## core/src/main/scala/kafka/tools/StorageTool.scala: ########## @@ -141,6 +189,9 @@ object StorageTool extends Logging { formatParser.addArgument("--release-version", "-r"). action(store()). help(s"A KRaft release version to use for the initial metadata.version. The minimum is ${MetadataVersion.IBP_3_0_IV0}, the default is ${MetadataVersion.LATEST_PRODUCTION}") + formatParser.addArgument("--feature"). + help("A feature upgrade we should perform, in feature=level format. For example: `metadata.version=5`."). 
Review Comment: > "A feature upgrade we should perform" — storage-tool is only used for initializing a cluster and never used for upgrading, right? So, perhaps change this to something like "A feature level to initialize"? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org