dongnuo123 commented on code in PR #17886:
URL: https://github.com/apache/kafka/pull/17886#discussion_r1869917197


##########
server-common/src/main/java/org/apache/kafka/server/common/Feature.java:
##########
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import org.apache.kafka.common.feature.SupportedVersionRange;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.server.common.TestFeatureVersion.UT_FV0_0;
+import static org.apache.kafka.server.common.TestFeatureVersion.UT_FV0_1;
+import static org.apache.kafka.server.common.TestFeatureVersion.UT_FV1_0;
+import static org.apache.kafka.server.common.TestFeatureVersion.UT_FV1_1;
+import static org.apache.kafka.server.common.TestFeatureVersion.UT_FV2_0;
+import static org.apache.kafka.server.common.TestFeatureVersion.UT_FV2_1;
+import static org.apache.kafka.server.common.TestFeatureVersion.UT_FV3_0;
+import static org.apache.kafka.server.common.TestFeatureVersion.UT_FV3_1;
+import static org.apache.kafka.server.common.TestFeatureVersion.UT_FV4_0;
+import static org.apache.kafka.server.common.TestFeatureVersion.UT_FV4_1;
+import static org.apache.kafka.server.common.TestFeatureVersion.UT_FV5_0;
+import static org.apache.kafka.server.common.TestFeatureVersion.UT_FV5_1;
+import static org.apache.kafka.server.common.TestFeatureVersion.UT_FV6_0;
+import static org.apache.kafka.server.common.TestFeatureVersion.UT_FV6_1;
+import static org.apache.kafka.server.common.TestFeatureVersion.UT_FV7_0;
+
+/**
+ * This is enum for the various features implemented for Kafka clusters.
+ * KIP-584: Versioning Scheme for Features introduced the idea of various 
features, but only added one feature -- MetadataVersion.
+ * KIP-1022: Formatting and Updating Features allowed for more features to be 
added. In order to set and update features,
+ * they need to be specified via the StorageTool or FeatureCommand tools.
+ * <br>
+ * Having a unified enum for the features that will use a shared type in the 
API used to set and update them
+ * makes it easier to process these features.
+ */
+public enum Feature {
+
+    /**
+     * Features defined. If a feature is included in this list, and marked to 
be used in production they will also be specified when
+     * formatting a cluster via the StorageTool. MetadataVersion is handled 
separately, so it is not included here.
+     *
+     * See {@link TestFeatureVersion} as an example. See {@link 
FeatureVersion} when implementing a new feature.
+     */
+    TEST_VERSION("test.feature.version", 
TestFeatureVersion.valuesWithoutUnitTestVersions(), 
TestFeatureVersion.LATEST_PRODUCTION),
+    KRAFT_VERSION("kraft.version", KRaftVersion.values(), 
KRaftVersion.LATEST_PRODUCTION),
+    TRANSACTION_VERSION("transaction.version", TransactionVersion.values(), 
TransactionVersion.LATEST_PRODUCTION),
+    GROUP_VERSION("group.version", GroupVersion.values(), 
GroupVersion.LATEST_PRODUCTION),
+    ELIGIBLE_LEADER_REPLICAS_VERSION("eligible.leader.replicas.version", 
EligibleLeaderReplicasVersion.values(), 
EligibleLeaderReplicasVersion.LATEST_PRODUCTION),
+
+    /**
+     * Features defined only for unit tests and are not used in production.
+     */
+    UNIT_TEST_VERSION_0("unit." + TestFeatureVersion.FEATURE_NAME + ".0", new 
FeatureVersion[]{UT_FV0_0}, UT_FV0_1),
+    UNIT_TEST_VERSION_1("unit." + TestFeatureVersion.FEATURE_NAME + ".1", new 
FeatureVersion[]{UT_FV1_0, UT_FV1_1}, UT_FV1_0),
+    UNIT_TEST_VERSION_2("unit." + TestFeatureVersion.FEATURE_NAME + ".2", new 
FeatureVersion[]{UT_FV2_0, UT_FV2_1}, UT_FV2_0),
+    UNIT_TEST_VERSION_3("unit." + TestFeatureVersion.FEATURE_NAME + ".3", new 
FeatureVersion[]{UT_FV3_0, UT_FV3_1}, UT_FV3_1),
+    UNIT_TEST_VERSION_4("unit." + TestFeatureVersion.FEATURE_NAME + ".4", new 
FeatureVersion[]{UT_FV4_0, UT_FV4_1}, UT_FV4_1),
+    UNIT_TEST_VERSION_5("unit." + TestFeatureVersion.FEATURE_NAME + ".5", new 
FeatureVersion[]{UT_FV5_0, UT_FV5_1}, UT_FV5_1),
+    UNIT_TEST_VERSION_6("unit." + TestFeatureVersion.FEATURE_NAME + ".6", new 
FeatureVersion[]{UT_FV6_0, UT_FV6_1}, UT_FV6_1),
+    UNIT_TEST_VERSION_7("unit." + TestFeatureVersion.FEATURE_NAME + ".7", new 
FeatureVersion[]{UT_FV7_0}, UT_FV7_0);
+
+    public static final Feature[] FEATURES;
+
+    // The list of features that are not unit test features.
+    public static final List<Feature> SUPPORTED_FEATURES;
+
+    public static final List<Feature> PRODUCTION_FEATURES;
+
+    public static final List<String> PRODUCTION_FEATURE_NAMES;
+    private final String name;
+    private final FeatureVersion[] featureVersions;
+
+    // The latest production version of the feature, owned and updated by the 
feature owner
+    // in the respective feature definition. The value should not be smaller 
than the default
+    // value calculated with {@link #defaultVersion(MetadataVersion)}.
+    public final FeatureVersion latestProduction;
+
+    Feature(String name,
+            FeatureVersion[] featureVersions,
+            FeatureVersion latestProduction) {
+        this.name = name;
+        this.featureVersions = featureVersions;
+        this.latestProduction = latestProduction;
+    }
+
+    static {
+        Feature[] enumValues = Feature.values();
+        FEATURES = Arrays.copyOf(enumValues, enumValues.length);
+
+        SUPPORTED_FEATURES = Arrays.stream(FEATURES).filter(feature ->
+            !feature.name.startsWith("unit." + TestFeatureVersion.FEATURE_NAME)
+        ).collect(Collectors.toList());
+
+        PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature ->
+            !feature.name.equals(TEST_VERSION.featureName()) &&
+            !feature.name.startsWith("unit." + TestFeatureVersion.FEATURE_NAME)
+        ).collect(Collectors.toList());
+        PRODUCTION_FEATURE_NAMES = PRODUCTION_FEATURES.stream().map(feature ->
+                feature.name).collect(Collectors.toList());
+
+        validateDefaultValueAndLatestProductionValue(TEST_VERSION);
+        for (Feature feature : PRODUCTION_FEATURES) {
+            validateDefaultValueAndLatestProductionValue(feature);
+        }
+    }
+
+    public String featureName() {
+        return name;
+    }
+
+    public FeatureVersion[] featureVersions() {
+        return featureVersions;
+    }
+
+    public short latestProduction() {
+        return latestProduction.featureLevel();
+    }
+
+    public short minimumProduction() {
+        return featureVersions[0].featureLevel();
+    }
+
+    public short latestTesting() {
+        return featureVersions[featureVersions.length - 1].featureLevel();
+    }
+
+    public SupportedVersionRange supportedVersionRange() {
+        return new SupportedVersionRange(
+            minimumProduction(),
+            latestTesting()
+        );
+    }
+
+    /**
+     * Creates a FeatureVersion from a level.
+     *
+     * @param level                        the level of the feature
+     * @param allowUnstableFeatureVersions whether unstable versions can be 
used
+     * @return the FeatureVersionUtils.FeatureVersion for the feature the enum 
is based on.
+     * @throws IllegalArgumentException    if the feature is not known.
+     */
+    public FeatureVersion fromFeatureLevel(short level,
+                                           boolean 
allowUnstableFeatureVersions) {
+        return Arrays.stream(featureVersions).filter(featureVersion ->
+            featureVersion.featureLevel() == level && 
(allowUnstableFeatureVersions || level <= 
latestProduction())).findFirst().orElseThrow(
+                () -> new IllegalArgumentException("No feature:" + 
featureName() + " with feature level " + level));
+    }
+
+    /**
+     * A method to validate the feature can be set. If a given feature relies 
on another feature, the dependencies should be
+     * captured in {@link FeatureVersion#dependencies()}
+     * <p>
+     * For example, say feature X level x relies on feature Y level y:
+     * if feature X >= x then throw an error if feature Y < y.
+     *
+     * All feature levels above 0 in kraft require metadata.version=4 
(IBP_3_3_IV0) in order to write the feature records to the cluster.
+     *
+     * @param feature                   the feature we are validating
+     * @param features                  the feature versions we have (or want 
to set)
+     * @throws IllegalArgumentException if the feature is not valid
+     */
+    public static void validateVersion(FeatureVersion feature, Map<String, 
Short> features) {
+        Short metadataVersion = features.get(MetadataVersion.FEATURE_NAME);
+
+        if (feature.featureLevel() >= 1 && (metadataVersion == null || 
metadataVersion < MetadataVersion.IBP_3_3_IV0.featureLevel()))
+            throw new IllegalArgumentException(feature.featureName() + " could 
not be set to " + feature.featureLevel() +
+                    " because it depends on metadata.version=4 (" + 
MetadataVersion.IBP_3_3_IV0 + ")");
+
+        for (Map.Entry<String, Short> dependency: 
feature.dependencies().entrySet()) {
+            Short featureLevel = features.get(dependency.getKey());
+
+            if (featureLevel == null || featureLevel < dependency.getValue()) {
+                throw new IllegalArgumentException(feature.featureName() + " 
could not be set to " + feature.featureLevel() +
+                        " because it depends on " + dependency.getKey() + " 
level " + dependency.getValue());
+            }
+        }
+    }
+
+    /**
+     * A method to return the default (latest production) version of a feature 
based on the metadata version provided.
+     *
+     * Every time a new feature is added, it should create a mapping from 
metadata version to feature version
+     * with {@link FeatureVersion#bootstrapMetadataVersion()}. The feature 
version should be marked as production ready
+     * before the metadata version is made production ready.
+     *
+     * @param metadataVersion the metadata version we want to use to set the 
default.
+     * @return the default version given the feature and provided metadata 
version
+     */
+    public FeatureVersion defaultVersion(MetadataVersion metadataVersion) {
+        FeatureVersion version = featureVersions[0];
+        for (Iterator<FeatureVersion> it = 
Arrays.stream(featureVersions).iterator(); it.hasNext(); ) {
+            FeatureVersion feature = it.next();
+            if (feature.bootstrapMetadataVersion().isLessThan(metadataVersion) 
|| feature.bootstrapMetadataVersion().equals(metadataVersion))
+                version = feature;
+            else
+                return version;
+        }
+        return version;
+    }
+
+    public short defaultLevel(MetadataVersion metadataVersion) {
+        return defaultVersion(metadataVersion).featureLevel();
+    }
+
+    public static Feature featureFromName(String featureName) {
+        for (Feature feature : FEATURES) {
+            if (feature.name.equals(featureName))
+                return feature;
+        }
+        throw new IllegalArgumentException("Feature " + featureName + " not 
found.");
+    }
+
+    /**
+     * Utility method to map a list of FeatureVersion to a map of feature name 
to feature level
+     */
+    public static Map<String, Short> featureImplsToMap(List<FeatureVersion> 
features) {
+        return 
features.stream().collect(Collectors.toMap(FeatureVersion::featureName, 
FeatureVersion::featureLevel));
+    }
+
+    public boolean isProductionReady(short featureVersion) {
+        return featureVersion <= latestProduction();
+    }
+
+    public boolean hasFeatureVersion(FeatureVersion featureVersion) {
+        for (FeatureVersion v : featureVersions()) {
+            if (v == featureVersion) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * The method ensures that the following statements are met:
+     * 1. The latest production value is one of the feature values.
+     * 2. The latest production value >= the default value.
+     * 3. The dependencies of the latest production value <= their latest 
production values.
+     * 4. The dependencies of all default values <= their default values.
+     * 5. If the latest production depends on MetadataVersion, the value 
should be <= MetadataVersion.LATEST_PRODUCTION.
+     * 6. If any default value depends on MetadataVersion, the value should be 
<= the default value bootstrap MV.
+     *
+     * Suppose we have feature X as the feature being validated.
+     * Invalid examples:
+     *     - The feature X has default version = XV_10 (dependency = {}), 
latest production = XV_5 (dependency = {})
+     *       (Violating rule 2. The latest production value XV_5 is smaller 
than the default value)
+     *     - The feature X has latest production = XV_11 (dependency = {Y: 
YV_4})
+     *       The feature Y has latest production = YV_3 (dependency = {})
+     *       (Violating rule 3. For latest production XV_11, Y's latest 
production YV_3 is smaller than the dependency value YV_4)
+     *     - The feature X has default version = XV_10 (dependency = {Y: YV_4})
+     *       The feature Y has default version = YV_3 (dependency = {})
+     *       (Violating rule 4. For default version XV_10, Y's default value 
YV_3 is smaller than the dependency value YV_4)
+     *     - The feature X has latest production = XV_11 (dependency = 
{MetadataVersion: IBP_4_0_IV1}), MetadataVersion.LATEST_PRODUCTION is 
IBP_4_0_IV0
+     *       (Violating rule 5. The dependency MV IBP_4_0_IV1 is ahead of MV 
latest production IBP_4_0_IV0)
+     *     - The feature X has default version = XV_10 (dependency = 
{MetadataVersion: IBP_4_0_IV1}) and bootstrap MV = IBP_4_0_IV0
+     *       (Violating rule 6. When MV latest production is IBP_4_0_IV0, 
feature X will be set to XV_10 by default whereas it depends on MV IBP_4_0_IV1)
+     * Valid examples:
+     *     - The feature X has default version = XV_10 (dependency = {}), 
latest production = XV_10 (dependency = {})
+     *     - The feature X has default version = XV_10 (dependency = {Y: 
YV_3}), latest production = XV_11 (dependency = {Y: YV_4})
+     *       The feature Y has default version = YV_3 (dependency = {}), 
latest production = YV_4 (dependency = {})
+     *     - The feature X has default version = XV_10 (dependency = 
{MetadataVersion: IBP_4_0_IV0}), bootstrap MV = IBP_4_0_IV0,
+     *                       latest production = XV_11 (dependency = 
{MetadataVersion: IBP_4_0_IV1}), MV latest production = IBP_4_0_IV1
+     *
+     * @param feature the feature to validate.
+     * @return true if the feature is valid, false otherwise.
+     * @throws IllegalArgumentException if the feature violates any of the 
rules thus is not valid.
+     */
+    public static void validateDefaultValueAndLatestProductionValue(
+        Feature feature
+    ) throws IllegalArgumentException {
+        FeatureVersion defaultVersion = 
feature.defaultVersion(MetadataVersion.LATEST_PRODUCTION);
+        FeatureVersion latestProduction = feature.latestProduction;
+
+        if (!feature.hasFeatureVersion(latestProduction)) {
+            throw new IllegalArgumentException(String.format("Feature %s has 
latest production version %s=%s " +
+                    "which is not one of its feature versions.",
+                feature.name(), latestProduction.featureName(), 
latestProduction.featureLevel()));
+        }
+
+        if (latestProduction.featureLevel() < defaultVersion.featureLevel()) {
+            throw new IllegalArgumentException(String.format("Feature %s has 
latest production value %s " +
+                    "smaller than its default value %s.",
+                feature.name(), latestProduction.featureLevel(), 
defaultVersion.featureLevel()));
+        }
+
+        for (Map.Entry<String, Short> dependency: 
latestProduction.dependencies().entrySet()) {
+            String dependencyFeatureName = dependency.getKey();
+            if (!dependencyFeatureName.equals(MetadataVersion.FEATURE_NAME)) {
+                Feature dependencyFeature = 
featureFromName(dependencyFeatureName);
+                if 
(!dependencyFeature.isProductionReady(dependency.getValue())) {
+                    throw new IllegalArgumentException(String.format("Latest 
production FeatureVersion %s=%s " +
+                            "has a dependency %s=%s that is not production 
ready. (%s latest production: %s)",
+                        feature.name(), latestProduction.featureLevel(), 
dependencyFeature.name(), dependency.getValue(),
+                        dependencyFeature.name(), 
dependencyFeature.latestProduction()));
+                }
+            } else {
+                if (dependency.getValue() > 
MetadataVersion.LATEST_PRODUCTION.featureLevel()) {
+                    throw new IllegalArgumentException(String.format("Latest 
production FeatureVersion %s=%s " +
+                            "has a dependency %s=%s that is not production 
ready. (%s latest production: %s)",
+                        feature.name(), latestProduction.featureLevel(), 
MetadataVersion.FEATURE_NAME, dependency.getValue(),
+                        MetadataVersion.FEATURE_NAME, 
MetadataVersion.LATEST_PRODUCTION.featureLevel()));
+                }
+            }
+        }
+
+        for (MetadataVersion metadataVersion: MetadataVersion.values()) {
+            // Only checking the kraft metadata versions.
+            if 
(metadataVersion.compareTo(MetadataVersion.MINIMUM_KRAFT_VERSION) < 0) {
+                continue;
+            }
+
+            defaultVersion = feature.defaultVersion(metadataVersion);
+            for (Map.Entry<String, Short> dependency: 
defaultVersion.dependencies().entrySet()) {
+                String dependencyFeatureName = dependency.getKey();
+                if 
(!dependencyFeatureName.equals(MetadataVersion.FEATURE_NAME)) {
+                    Feature dependencyFeature = 
featureFromName(dependencyFeatureName);
+                    if (dependency.getValue() > 
dependencyFeature.defaultLevel(MetadataVersion.LATEST_PRODUCTION)) {
+                        throw new 
IllegalArgumentException(String.format("Default FeatureVersion %s=%s has " +
+                                "a dependency %s=%s that is ahead of its 
default value %s.",
+                            feature.name(), defaultVersion.featureLevel(), 
dependencyFeature.name(), dependency.getValue(),
+                            
dependencyFeature.defaultLevel(MetadataVersion.LATEST_PRODUCTION)));
+                    }
+                } else {
+                    if (dependency.getValue() > 
defaultVersion.bootstrapMetadataVersion().featureLevel()) {

Review Comment:
   The two are not necessarily equal. For instance, suppose `metadataVersion=120` 
and the feature has these versions:
   - FV_1, bootstrap MV = 119
   - FV_2, bootstrap MV = 121
   
   Then `defaultVersion.bootstrapMetadataVersion()` will be 119, not 120. Using 
`metadataVersion` would also work, since we also check the case where the two 
are equal, but I feel comparing with the bootstrap MV makes more sense.



##########
server-common/src/main/java/org/apache/kafka/server/common/Feature.java:
##########
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import org.apache.kafka.common.feature.SupportedVersionRange;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.server.common.TestFeatureVersion.UT_FV0_0;
+import static org.apache.kafka.server.common.TestFeatureVersion.UT_FV0_1;
+import static org.apache.kafka.server.common.TestFeatureVersion.UT_FV1_0;
+import static org.apache.kafka.server.common.TestFeatureVersion.UT_FV1_1;
+import static org.apache.kafka.server.common.TestFeatureVersion.UT_FV2_0;
+import static org.apache.kafka.server.common.TestFeatureVersion.UT_FV2_1;
+import static org.apache.kafka.server.common.TestFeatureVersion.UT_FV3_0;
+import static org.apache.kafka.server.common.TestFeatureVersion.UT_FV3_1;
+import static org.apache.kafka.server.common.TestFeatureVersion.UT_FV4_0;
+import static org.apache.kafka.server.common.TestFeatureVersion.UT_FV4_1;
+import static org.apache.kafka.server.common.TestFeatureVersion.UT_FV5_0;
+import static org.apache.kafka.server.common.TestFeatureVersion.UT_FV5_1;
+import static org.apache.kafka.server.common.TestFeatureVersion.UT_FV6_0;
+import static org.apache.kafka.server.common.TestFeatureVersion.UT_FV6_1;
+import static org.apache.kafka.server.common.TestFeatureVersion.UT_FV7_0;
+
+/**
+ * This is enum for the various features implemented for Kafka clusters.
+ * KIP-584: Versioning Scheme for Features introduced the idea of various 
features, but only added one feature -- MetadataVersion.
+ * KIP-1022: Formatting and Updating Features allowed for more features to be 
added. In order to set and update features,
+ * they need to be specified via the StorageTool or FeatureCommand tools.
+ * <br>
+ * Having a unified enum for the features that will use a shared type in the 
API used to set and update them
+ * makes it easier to process these features.
+ */
+public enum Feature {
+
+    /**
+     * Features defined. If a feature is included in this list, and marked to 
be used in production they will also be specified when
+     * formatting a cluster via the StorageTool. MetadataVersion is handled 
separately, so it is not included here.
+     *
+     * See {@link TestFeatureVersion} as an example. See {@link 
FeatureVersion} when implementing a new feature.
+     */
+    TEST_VERSION("test.feature.version", 
TestFeatureVersion.valuesWithoutUnitTestVersions(), 
TestFeatureVersion.LATEST_PRODUCTION),
+    KRAFT_VERSION("kraft.version", KRaftVersion.values(), 
KRaftVersion.LATEST_PRODUCTION),
+    TRANSACTION_VERSION("transaction.version", TransactionVersion.values(), 
TransactionVersion.LATEST_PRODUCTION),
+    GROUP_VERSION("group.version", GroupVersion.values(), 
GroupVersion.LATEST_PRODUCTION),
+    ELIGIBLE_LEADER_REPLICAS_VERSION("eligible.leader.replicas.version", 
EligibleLeaderReplicasVersion.values(), 
EligibleLeaderReplicasVersion.LATEST_PRODUCTION),
+
+    /**
+     * Features defined only for unit tests and are not used in production.
+     */
+    UNIT_TEST_VERSION_0("unit." + TestFeatureVersion.FEATURE_NAME + ".0", new 
FeatureVersion[]{UT_FV0_0}, UT_FV0_1),
+    UNIT_TEST_VERSION_1("unit." + TestFeatureVersion.FEATURE_NAME + ".1", new 
FeatureVersion[]{UT_FV1_0, UT_FV1_1}, UT_FV1_0),
+    UNIT_TEST_VERSION_2("unit." + TestFeatureVersion.FEATURE_NAME + ".2", new 
FeatureVersion[]{UT_FV2_0, UT_FV2_1}, UT_FV2_0),
+    UNIT_TEST_VERSION_3("unit." + TestFeatureVersion.FEATURE_NAME + ".3", new 
FeatureVersion[]{UT_FV3_0, UT_FV3_1}, UT_FV3_1),
+    UNIT_TEST_VERSION_4("unit." + TestFeatureVersion.FEATURE_NAME + ".4", new 
FeatureVersion[]{UT_FV4_0, UT_FV4_1}, UT_FV4_1),
+    UNIT_TEST_VERSION_5("unit." + TestFeatureVersion.FEATURE_NAME + ".5", new 
FeatureVersion[]{UT_FV5_0, UT_FV5_1}, UT_FV5_1),
+    UNIT_TEST_VERSION_6("unit." + TestFeatureVersion.FEATURE_NAME + ".6", new 
FeatureVersion[]{UT_FV6_0, UT_FV6_1}, UT_FV6_1),
+    UNIT_TEST_VERSION_7("unit." + TestFeatureVersion.FEATURE_NAME + ".7", new 
FeatureVersion[]{UT_FV7_0}, UT_FV7_0);
+
+    public static final Feature[] FEATURES;
+
+    // The list of features that are not unit test features.
+    public static final List<Feature> SUPPORTED_FEATURES;
+
+    public static final List<Feature> PRODUCTION_FEATURES;
+
+    public static final List<String> PRODUCTION_FEATURE_NAMES;
+    private final String name;
+    private final FeatureVersion[] featureVersions;
+
+    // The latest production version of the feature, owned and updated by the 
feature owner
+    // in the respective feature definition. The value should not be smaller 
than the default
+    // value calculated with {@link #defaultVersion(MetadataVersion)}.
+    public final FeatureVersion latestProduction;
+
+    Feature(String name,
+            FeatureVersion[] featureVersions,
+            FeatureVersion latestProduction) {
+        this.name = name;
+        this.featureVersions = featureVersions;
+        this.latestProduction = latestProduction;
+    }
+
+    static {
+        Feature[] enumValues = Feature.values();
+        FEATURES = Arrays.copyOf(enumValues, enumValues.length);
+
+        SUPPORTED_FEATURES = Arrays.stream(FEATURES).filter(feature ->
+            !feature.name.startsWith("unit." + TestFeatureVersion.FEATURE_NAME)
+        ).collect(Collectors.toList());
+
+        PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature ->
+            !feature.name.equals(TEST_VERSION.featureName()) &&
+            !feature.name.startsWith("unit." + TestFeatureVersion.FEATURE_NAME)
+        ).collect(Collectors.toList());
+        PRODUCTION_FEATURE_NAMES = PRODUCTION_FEATURES.stream().map(feature ->
+                feature.name).collect(Collectors.toList());
+
+        validateDefaultValueAndLatestProductionValue(TEST_VERSION);
+        for (Feature feature : PRODUCTION_FEATURES) {
+            validateDefaultValueAndLatestProductionValue(feature);
+        }
+    }
+
+    public String featureName() {
+        return name;
+    }
+
+    public FeatureVersion[] featureVersions() {
+        return featureVersions;
+    }
+
+    public short latestProduction() {
+        return latestProduction.featureLevel();
+    }
+
+    public short minimumProduction() {
+        return featureVersions[0].featureLevel();
+    }
+
+    public short latestTesting() {
+        return featureVersions[featureVersions.length - 1].featureLevel();
+    }
+
+    public SupportedVersionRange supportedVersionRange() {
+        return new SupportedVersionRange(
+            minimumProduction(),
+            latestTesting()
+        );
+    }
+
+    /**
+     * Creates a FeatureVersion from a level.
+     *
+     * @param level                        the level of the feature
+     * @param allowUnstableFeatureVersions whether unstable versions can be 
used
+     * @return the FeatureVersionUtils.FeatureVersion for the feature the enum 
is based on.
+     * @throws IllegalArgumentException    if the feature is not known.
+     */
+    public FeatureVersion fromFeatureLevel(short level,
+                                           boolean 
allowUnstableFeatureVersions) {
+        return Arrays.stream(featureVersions).filter(featureVersion ->
+            featureVersion.featureLevel() == level && 
(allowUnstableFeatureVersions || level <= 
latestProduction())).findFirst().orElseThrow(
+                () -> new IllegalArgumentException("No feature:" + 
featureName() + " with feature level " + level));
+    }
+
+    /**
+     * A method to validate the feature can be set. If a given feature relies 
on another feature, the dependencies should be
+     * captured in {@link FeatureVersion#dependencies()}
+     * <p>
+     * For example, say feature X level x relies on feature Y level y:
+     * if feature X >= x then throw an error if feature Y < y.
+     *
+     * All feature levels above 0 in kraft require metadata.version=4 
(IBP_3_3_IV0) in order to write the feature records to the cluster.
+     *
+     * @param feature                   the feature we are validating
+     * @param features                  the feature versions we have (or want 
to set)
+     * @throws IllegalArgumentException if the feature is not valid
+     */
+    public static void validateVersion(FeatureVersion feature, Map<String, 
Short> features) {
+        Short metadataVersion = features.get(MetadataVersion.FEATURE_NAME);
+
+        if (feature.featureLevel() >= 1 && (metadataVersion == null || 
metadataVersion < MetadataVersion.IBP_3_3_IV0.featureLevel()))
+            throw new IllegalArgumentException(feature.featureName() + " could 
not be set to " + feature.featureLevel() +
+                    " because it depends on metadata.version=4 (" + 
MetadataVersion.IBP_3_3_IV0 + ")");
+
+        for (Map.Entry<String, Short> dependency: 
feature.dependencies().entrySet()) {
+            Short featureLevel = features.get(dependency.getKey());
+
+            if (featureLevel == null || featureLevel < dependency.getValue()) {
+                throw new IllegalArgumentException(feature.featureName() + " 
could not be set to " + feature.featureLevel() +
+                        " because it depends on " + dependency.getKey() + " 
level " + dependency.getValue());
+            }
+        }
+    }
+
+    /**
+     * A method to return the default (latest production) version of a feature 
based on the metadata version provided.
+     *
+     * Every time a new feature is added, it should create a mapping from 
metadata version to feature version
+     * with {@link FeatureVersion#bootstrapMetadataVersion()}. The feature 
version should be marked as production ready
+     * before the metadata version is made production ready.
+     *
+     * @param metadataVersion the metadata version we want to use to set the 
default.
+     * @return the default version given the feature and provided metadata 
version
+     */
+    public FeatureVersion defaultVersion(MetadataVersion metadataVersion) {
+        FeatureVersion version = featureVersions[0];
+        for (Iterator<FeatureVersion> it = 
Arrays.stream(featureVersions).iterator(); it.hasNext(); ) {
+            FeatureVersion feature = it.next();
+            if (feature.bootstrapMetadataVersion().isLessThan(metadataVersion) 
|| feature.bootstrapMetadataVersion().equals(metadataVersion))
+                version = feature;
+            else
+                return version;
+        }
+        return version;
+    }
+
+    public short defaultLevel(MetadataVersion metadataVersion) {
+        return defaultVersion(metadataVersion).featureLevel();
+    }
+
+    public static Feature featureFromName(String featureName) {
+        for (Feature feature : FEATURES) {
+            if (feature.name.equals(featureName))
+                return feature;
+        }
+        throw new IllegalArgumentException("Feature " + featureName + " not 
found.");
+    }
+
+    /**
+     * Utility method to map a list of FeatureVersion to a map of feature name 
to feature level
+     */
+    public static Map<String, Short> featureImplsToMap(List<FeatureVersion> 
features) {

Review Comment:
   Yeah, this doesn't seem to be used. I guess it's fine to remove it.



##########
server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java:
##########
@@ -16,22 +16,57 @@
  */
 package org.apache.kafka.server.common;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 
 public enum TestFeatureVersion implements FeatureVersion {
     TEST_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION, Collections.emptyMap()),
     // TEST_1 released right before MV 3.7-IV0 was released, and it has no 
dependencies
     TEST_1(1, MetadataVersion.IBP_3_7_IV0, Collections.emptyMap()),
-    // TEST_2 is not yet released and maps to the latest testing version, and 
it depends on this metadata version
-    TEST_2(2, MetadataVersion.latestTesting(), 
Collections.singletonMap(MetadataVersion.FEATURE_NAME, 
MetadataVersion.latestTesting().featureLevel()));
+    // TEST_2 is not yet set to be the default version and maps to the latest 
testing version, and it depends on this metadata version
+    TEST_2(2, MetadataVersion.latestTesting(), 
Collections.singletonMap(MetadataVersion.FEATURE_NAME, 
MetadataVersion.latestTesting().featureLevel())),
+
+    /**
+     * Test versions only used for unit test FeatureTest.java.
+     */
+    // For testing latest production is not one of the feature versions.
+    UT_FV0_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION, Collections.emptyMap()),
+    UT_FV0_1(1, MetadataVersion.IBP_3_7_IV0, Collections.emptyMap()),
+
+    // For testing latest production is lagged behind the default value.
+    UT_FV1_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION, Collections.emptyMap()),
+    UT_FV1_1(1, MetadataVersion.IBP_3_7_IV0, Collections.emptyMap()),
+
+    // For testing the dependency of the latest production that is not yet 
production ready.
+    UT_FV2_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION, Collections.emptyMap()),
+    UT_FV2_1(1, MetadataVersion.IBP_3_7_IV0, Collections.emptyMap()),
+    UT_FV3_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION, Collections.emptyMap()),
+    UT_FV3_1(1, MetadataVersion.IBP_3_7_IV0, 
Collections.singletonMap("unit.test.feature.version.2", (short) 1)),
+
+    // For testing the dependency of the default value that is not yet default 
ready.
+    UT_FV4_0(0, MetadataVersion.MINIMUM_KRAFT_VERSION, Collections.emptyMap()),
+    UT_FV4_1(1, MetadataVersion.latestTesting(), Collections.emptyMap()),

Review Comment:
   Yeah, if the two are equal then the test case would be valid. There is an 
`if` statement in the unit test that checks for this situation, e.g.:
   ```
           if 
(MetadataVersion.latestProduction().isLessThan(MetadataVersion.latestTesting()))
 {
               assertThrows(IllegalArgumentException.class, () ->
                       
validateDefaultValueAndLatestProductionValue(Feature.UNIT_TEST_VERSION_6),
                   "Feature UNIT_TEST_VERSION_6 has latest production 
FeatureVersion UT_FV6_1 with " +
                       "MV dependency 4.0-IV3 that is not production ready. (MV 
latest production: 4.0-IV0)");
           }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to