junrao commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1617946240


##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -60,24 +60,28 @@ object StorageTool extends Logging {
         case "format" =>
           val directories = configToLogDirectories(config.get)
           val clusterId = namespace.getString("cluster_id")
-          val metadataVersion = getMetadataVersion(namespace,
-            
Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
-          if (!metadataVersion.isKRaftSupported) {
-            throw new TerseFailure(s"Must specify a valid KRaft 
metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.")
-          }
-          if (!metadataVersion.isProduction) {
-            if (config.get.unstableMetadataVersionsEnabled) {
-              System.out.println(s"WARNING: using pre-production 
metadata.version $metadataVersion.")
-            } else {
-              throw new TerseFailure(s"The metadata.version $metadataVersion 
is not ready for production use yet.")
-            }
-          }
           val metaProperties = new MetaProperties.Builder().
             setVersion(MetaPropertiesVersion.V1).
             setClusterId(clusterId).
             setNodeId(config.get.nodeId).
             build()
           val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = 
ArrayBuffer()
+          val specifiedFeatures: util.List[String] = 
namespace.getList("feature")
+          if (namespace.getString("release_version") != null && 
specifiedFeatures != null) {
+            throw new TerseFailure("Both --release-version and --feature were 
set. Only one of the two flags can be set.")
+          }
+          val featureNamesAndLevelsMap = 
featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList)
+          val metadataVersion = getMetadataVersion(namespace, 
featureNamesAndLevelsMap,
+            
Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
+          validateMetadataVersion(metadataVersion, config)
+          // Get all other features, validate, and create records for them
+          generateFeatureRecords(
+            metadataRecords,
+            metadataVersion,
+            featureNamesAndLevelsMap,
+            Features.PRODUCTION_FEATURES.asScala.toList,
+            !Option(namespace.getString("release_version")).isEmpty

Review Comment:
   I am wondering why we need to pass in `usesVersionDefault`? Earlier in 
`getMetadataVersion`, we already resolve `metadataVersion` to 
`LATEST_PRODUCTION` if it's not explicitly specified.



##########
server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
+
+public final class FinalizedFeatures {
+    private final MetadataVersion metadataVersion;
+    private final Map<String, Short> finalizedFeatures;
+    private final long finalizedFeaturesEpoch;
+
+    public static FinalizedFeatures fromKRaftVersion(MetadataVersion version) {
+        return new FinalizedFeatures(version, Collections.emptyMap(), -1, 
true);
+    }
+
+    public FinalizedFeatures(
+        MetadataVersion metadataVersion,
+        Map<String, Short> finalizedFeatures,
+        long finalizedFeaturesEpoch,
+        boolean kraftMode
+    ) {
+        this.metadataVersion = metadataVersion;
+        this.finalizedFeatures = new HashMap<>(finalizedFeatures);
+        this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
+        // In KRaft mode, we always include the metadata version in the 
features map.
+        // In ZK mode, we never include it.
+        if (kraftMode) {
+            this.finalizedFeatures.put(FEATURE_NAME, 
metadataVersion.featureLevel());
+        } else {
+            this.finalizedFeatures.remove(FEATURE_NAME);
+        }
+    }
+
+    public MetadataVersion metadataVersion() {
+        return metadataVersion;
+    }
+
+    public Map<String, Short> finalizedFeatures() {
+        return finalizedFeatures;
+    }
+
+    public long finalizedFeaturesEpoch() {
+        return finalizedFeaturesEpoch;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || !(o.getClass().equals(FinalizedFeatures.class))) 
return false;
+        FinalizedFeatures other = (FinalizedFeatures) o;
+        return metadataVersion == other.metadataVersion &&
+            finalizedFeatures.equals(other.finalizedFeatures) &&
+                finalizedFeaturesEpoch == other.finalizedFeaturesEpoch;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(metadataVersion, finalizedFeatures, 
finalizedFeaturesEpoch);
+    }
+
+    @Override
+    public String toString() {
+        return "Features" +
+                "(metadatVersion=" + metadataVersion +

Review Comment:
   typo metadatVersion



##########
server-common/src/main/java/org/apache/kafka/server/common/Features.java:
##########
@@ -16,72 +16,134 @@
  */
 package org.apache.kafka.server.common;
 
-import java.util.Collections;
-import java.util.HashMap;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
-import java.util.Objects;
+import java.util.stream.Collectors;
 
-import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
+/**
+ * This is enum for the various features implemented for Kafka clusters.
+ * KIP-584: Versioning Scheme for Features introduced the idea of various 
features, but only added one feature -- MetadataVersion.
+ * KIP-1022: Formatting and Updating Features allowed for more features to be 
added. In order to set and update features,
+ * they need to be specified via the StorageTool or FeatureCommand tools.
+ * <br>
+ * Having a unified enum for the features that will use a shared type in the 
API used to set and update them
+ * makes it easier to process these features.
+ */
+public enum Features {
+
+    /**
+     * Features defined. If a feature is included in this list, and marked to 
be used in production they will also be specified when
+     * formatting a cluster via the StorageTool. MetadataVersion is handled 
separately, so it is not included here.
+     *
+     * See {@link TestFeatureVersion} as an example. See {@link 
FeatureVersion} when implementing a new feature.
+     */
+    TEST_VERSION("test.feature.version", TestFeatureVersion.values());
+
+    public static final Features[] FEATURES;
+    public static final List<Features> PRODUCTION_FEATURES;
 
-public final class Features {
-    private final MetadataVersion version;
-    private final Map<String, Short> finalizedFeatures;
-    private final long finalizedFeaturesEpoch;
+    public static final List<String> PRODUCTION_FEATURE_NAMES;
+    private final String name;
+    private final FeatureVersion[] featureVersions;
 
-    public static Features fromKRaftVersion(MetadataVersion version) {
-        return new Features(version, Collections.emptyMap(), -1, true);
+    Features(String name,
+             FeatureVersion[] featureVersions) {
+        this.name = name;
+        this.featureVersions = featureVersions;
     }
 
-    public Features(
-        MetadataVersion version,
-        Map<String, Short> finalizedFeatures,
-        long finalizedFeaturesEpoch,
-        boolean kraftMode
-    ) {
-        this.version = version;
-        this.finalizedFeatures = new HashMap<>(finalizedFeatures);
-        this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
-        // In KRaft mode, we always include the metadata version in the 
features map.
-        // In ZK mode, we never include it.
-        if (kraftMode) {
-            this.finalizedFeatures.put(FEATURE_NAME, version.featureLevel());
-        } else {
-            this.finalizedFeatures.remove(FEATURE_NAME);
-        }
+    static {
+        Features[] enumValues = Features.values();
+        FEATURES = Arrays.copyOf(enumValues, enumValues.length);
+
+        PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature ->
+                feature.name != 
TEST_VERSION.featureName()).collect(Collectors.toList());
+        PRODUCTION_FEATURE_NAMES = PRODUCTION_FEATURES.stream().map(feature ->
+                feature.name).collect(Collectors.toList());
     }
 
-    public MetadataVersion metadataVersion() {
-        return version;
+    public String featureName() {
+        return name;
     }
 
-    public Map<String, Short> finalizedFeatures() {
-        return finalizedFeatures;
+    public FeatureVersion[] featureVersions() {
+        return featureVersions;
     }
 
-    public long finalizedFeaturesEpoch() {
-        return finalizedFeaturesEpoch;
+    public short latestProduction() {
+        return defaultValue(MetadataVersion.LATEST_PRODUCTION);
     }
 
-    @Override
-    public boolean equals(Object o) {
-        if (o == null || !(o.getClass().equals(Features.class))) return false;
-        Features other = (Features) o;
-        return version == other.version &&
-            finalizedFeatures.equals(other.finalizedFeatures) &&
-                finalizedFeaturesEpoch == other.finalizedFeaturesEpoch;
+    /**
+     * Creates a FeatureVersion from a level.
+     *
+     * @param level   the level of the feature
+     * @return       the FeatureVersionUtils.FeatureVersion for the feature 
the enum is based on.
+     * @throws        IllegalArgumentException if the feature is not known.
+     */
+    public FeatureVersion fromFeatureLevel(short level) {
+        return Arrays.stream(featureVersions).filter(featureVersion ->
+            featureVersion.featureLevel() == level).findFirst().orElseThrow(
+                () -> new IllegalArgumentException("No feature:" + 
featureName() + " with feature level " + level));
     }
 
-    @Override
-    public int hashCode() {
-        return Objects.hash(version, finalizedFeatures, 
finalizedFeaturesEpoch);
+    /**
+     * A method to validate the feature can be set. If a given feature relies 
on another feature, the dependencies should be
+     * captured in {@link FeatureVersion#dependencies()}
+     * <p>
+     * For example, say feature X level x relies on feature Y level y:
+     * if feature X >= x then throw an error if feature Y < y.
+     *
+     * All feature levels above 0 require metadata.version=4 (IBP_3_3_IV0) in 
order to write the feature records to the cluster.
+     *
+     * @param feature                   the feature we are validating
+     * @param metadataVersion           the metadata version we have (or want 
to set)
+     * @param features                  the feature versions (besides 
MetadataVersion) we have (or want to set)
+     * @throws IllegalArgumentException if the feature is not valid
+     */
+    public static void validateVersion(FeatureVersion feature, MetadataVersion 
metadataVersion, Map<String, Short> features) {
+        if (feature.featureLevel() >= 1 && 
metadataVersion.isLessThan(MetadataVersion.IBP_3_3_IV0))
+            throw new IllegalArgumentException(feature.featureName() + " could 
not be set to " + feature.featureLevel() +
+                    " because it depends on metadata.version=4 (" + 
MetadataVersion.IBP_3_3_IV0 + ")");
+
+        for (Map.Entry<String, Short> dependency: 
feature.dependencies().entrySet()) {
+            Short featureLevel = features.get(dependency.getKey());
+
+            if (featureLevel == null || featureLevel < dependency.getValue()) {
+                throw new IllegalArgumentException(feature.featureName() + " 
could not be set to " + feature.featureLevel() +
+                        " because it depends on " + dependency.getKey() + " 
level " + dependency.getValue());
+            }
+        }
+    }
+
+    /**
+     * A method to return the default (latest production) level of a feature 
based on the metadata version provided.
+     *
+     * Every time a new feature is added, it should create a mapping from 
metadata version to feature version
+     * with {@link FeatureVersion#bootstrapMetadataVersion()}. When the 
feature version is production ready, the metadata
+     * version should be made production ready as well.
+     *
+     * @param metadataVersion the metadata version we want to use to set the 
default.
+     * @return the default version level for the feature and potential 
metadata version

Review Comment:
   We are not returning potential metadata version, right?



##########
metadata/src/main/java/org/apache/kafka/metadata/FinalizedControllerFeatures.java:
##########
@@ -40,6 +40,10 @@ public Optional<Short> get(String name) {
         return Optional.ofNullable(featureMap.get(name));
     }
 
+    public short getOrDefault(String name, short defaultValue) {

Review Comment:
   versionOrDefault or levelOrDefault?



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java:
##########
@@ -61,6 +62,12 @@ public static Map<String, VersionRange> 
defaultFeatureMap(boolean enableUnstable
                 enableUnstable ?
                     MetadataVersion.latestTesting().featureLevel() :
                     MetadataVersion.latestProduction().featureLevel()));
+        for (Features feature : Features.PRODUCTION_FEATURES) {
+            features.put(feature.featureName(), VersionRange.of(
+                0,
+                feature.latestProduction()

Review Comment:
   Should this take `enableUnstable` into consideration?



##########
server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
+
+public final class FinalizedFeatures {
+    private final MetadataVersion metadataVersion;
+    private final Map<String, Short> finalizedFeatures;
+    private final long finalizedFeaturesEpoch;
+
+    public static FinalizedFeatures fromKRaftVersion(MetadataVersion version) {
+        return new FinalizedFeatures(version, Collections.emptyMap(), -1, 
true);
+    }
+
+    public FinalizedFeatures(
+        MetadataVersion metadataVersion,
+        Map<String, Short> finalizedFeatures,
+        long finalizedFeaturesEpoch,
+        boolean kraftMode
+    ) {
+        this.metadataVersion = metadataVersion;
+        this.finalizedFeatures = new HashMap<>(finalizedFeatures);
+        this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
+        // In KRaft mode, we always include the metadata version in the 
features map.
+        // In ZK mode, we never include it.
+        if (kraftMode) {
+            this.finalizedFeatures.put(FEATURE_NAME, 
metadataVersion.featureLevel());

Review Comment:
   Could we use MetadataVersion.FEATURE_NAME so that it's clear this is for 
Metadata?



##########
core/src/test/scala/unit/kafka/tools/StorageToolTest.scala:
##########
@@ -259,26 +263,66 @@ Found problem:
   @Test
   def testDefaultMetadataVersion(): Unit = {
     val namespace = StorageTool.parseArguments(Array("format", "-c", 
"config.props", "-t", "XcZZOzUqS4yHOjhMQB6JLQ"))
-    val mv = StorageTool.getMetadataVersion(namespace, defaultVersionString = 
None)
+    val mv = StorageTool.getMetadataVersion(namespace, Map.empty, 
defaultVersionString = None)
     assertEquals(MetadataVersion.LATEST_PRODUCTION.featureLevel(), 
mv.featureLevel(),
       "Expected the default metadata.version to be the latest production 
version")
   }
 
   @Test
   def testConfiguredMetadataVersion(): Unit = {
     val namespace = StorageTool.parseArguments(Array("format", "-c", 
"config.props", "-t", "XcZZOzUqS4yHOjhMQB6JLQ"))
-    val mv = StorageTool.getMetadataVersion(namespace, defaultVersionString = 
Some(MetadataVersion.IBP_3_3_IV2.toString))
+    val mv = StorageTool.getMetadataVersion(namespace, Map.empty, 
defaultVersionString = Some(MetadataVersion.IBP_3_3_IV2.toString))
     assertEquals(MetadataVersion.IBP_3_3_IV2.featureLevel(), mv.featureLevel(),
       "Expected the default metadata.version to be 3.3-IV2")
   }
 
+  @Test
+  def testSettingFeatureAndReleaseVersionFails(): Unit = {
+    val namespace = StorageTool.parseArguments(Array("format", "-c", 
"config.props", "-t", "XcZZOzUqS4yHOjhMQB6JLQ",
+      "--release-version", "3.0-IV1", "--feature", "metadata.version=4"))
+    assertThrows(classOf[IllegalArgumentException], () => 
StorageTool.getMetadataVersion(namespace, parseFeatures(namespace), 
defaultVersionString = None))
+  }
+
+  @Test
+  def testParseFeatures(): Unit = {
+    def parseAddFeatures(strings: String*): Map[String, java.lang.Short] = {
+      var args = mutable.Seq("format", "-c", "config.props", "-t", 
"XcZZOzUqS4yHOjhMQB6JLQ")
+      args ++= strings
+      val namespace = StorageTool.parseArguments(args.toArray)
+      parseFeatures(namespace)
+    }
+
+    assertThrows(classOf[RuntimeException], () => 
parseAddFeatures("--feature", "blah"))
+    assertThrows(classOf[RuntimeException], () => 
parseAddFeatures("--feature", "blah=blah"))
+
+    // Test with no features
+    assertEquals(Map(), parseAddFeatures())
+
+    // Test with one feature
+    val testFeatureLevel = 1
+    val testArgument = TestFeatureVersion.FEATURE_NAME + "=" + 
testFeatureLevel.toString
+    val expectedMap = Map(TestFeatureVersion.FEATURE_NAME -> 
testFeatureLevel.toShort)
+    assertEquals(expectedMap, parseAddFeatures("--feature", testArgument))
+
+    // Test with two features
+    val metadataFeatureLevel = 5
+    val metadataArgument = MetadataVersion.FEATURE_NAME + "=" + 
metadataFeatureLevel.toString
+    val expectedMap2 = expectedMap ++ Map (MetadataVersion.FEATURE_NAME -> 
metadataFeatureLevel.toShort)
+    assertEquals(expectedMap2, parseAddFeatures("--feature", testArgument, 
"--feature", metadataArgument))
+  }
+
+  def parseFeatures(namespace: Namespace): Map[String, java.lang.Short] = {

Review Comment:
   Could this be private?



##########
server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java:
##########
@@ -14,37 +14,86 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.server.common;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.util.Collections;
 
-import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
-import static 
org.apache.kafka.server.common.MetadataVersion.MINIMUM_KRAFT_VERSION;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
-class FeaturesTest {
-    @Test
-    public void testKRaftModeFeatures() {
-        Features features = new Features(MINIMUM_KRAFT_VERSION,
-                Collections.singletonMap("foo", (short) 2), 123, true);
-        assertEquals(MINIMUM_KRAFT_VERSION.featureLevel(),
-                features.finalizedFeatures().get(FEATURE_NAME));
-        assertEquals((short) 2,
-                features.finalizedFeatures().get("foo"));
-        assertEquals(2, features.finalizedFeatures().size());
+public class FeaturesTest {
+
+    @ParameterizedTest
+    @EnumSource(Features.class)
+    public void testFromFeatureLevelAllFeatures(Features feature) {
+        FeatureVersion[] featureImplementations = feature.featureVersions();
+        int numFeatures = featureImplementations.length;
+        for (short i = 1; i < numFeatures; i++) {
+            assertEquals(featureImplementations[i - 1], 
feature.fromFeatureLevel(i));
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(Features.class)
+    public void testValidateVersionAllFeatures(Features feature) {
+        for (FeatureVersion featureImpl : feature.featureVersions()) {
+            // Ensure that the feature is valid given the typical 
metadataVersionMapping and the dependencies.
+            // Note: Other metadata versions are valid, but this one should 
always be valid.
+            Features.validateVersion(featureImpl, 
featureImpl.bootstrapMetadataVersion(), featureImpl.dependencies());
+        }
     }
 
     @Test
-    public void testZkModeFeatures() {
-        Features features = new Features(MINIMUM_KRAFT_VERSION,
-                Collections.singletonMap("foo", (short) 2), 123, false);
-        assertNull(features.finalizedFeatures().get(FEATURE_NAME));
-        assertEquals((short) 2,
-                features.finalizedFeatures().get("foo"));
-        assertEquals(1, features.finalizedFeatures().size());
+    public void testInvalidValidateVersion() {
+        // Using too low of a MetadataVersion is invalid
+        assertThrows(IllegalArgumentException.class,
+            () -> Features.validateVersion(
+                TestFeatureVersion.TEST_1,
+                MetadataVersion.IBP_2_8_IV0,
+                Collections.emptyMap()
+            )
+        );
+
+        // Using a version that is lower than the dependency will fail.
+        assertThrows(IllegalArgumentException.class,
+             () -> Features.validateVersion(
+                 TestFeatureVersion.TEST_2,
+                 MetadataVersion.MINIMUM_BOOTSTRAP_VERSION,

Review Comment:
   Hmm, metadata version is provided here and in `features`. Should we simplify 
it so that it's only provided in one place?



##########
core/src/test/scala/unit/kafka/tools/StorageToolTest.scala:
##########
@@ -290,6 +334,101 @@ Found problem:
     assertThrows(classOf[IllegalArgumentException], () => 
parseMetadataVersion("--release-version", "0.0"))
   }
 
+  def generateRecord(featureName: String, level: Short): ApiMessageAndVersion 
= {

Review Comment:
   Could this be private?



##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -141,6 +189,9 @@ object StorageTool extends Logging {
     formatParser.addArgument("--release-version", "-r").
       action(store()).
       help(s"A KRaft release version to use for the initial metadata.version. 
The minimum is ${MetadataVersion.IBP_3_0_IV0}, the default is 
${MetadataVersion.LATEST_PRODUCTION}")
+    formatParser.addArgument("--feature").
+      help("A feature upgrade we should perform, in feature=level format. For 
example: `metadata.version=5`.").

Review Comment:
   > A feature upgrade we should perform
   
   storage-tool is only used for initializing a cluster and never used for 
upgrading, right? So, perhaps changing to sth like "A feature level to 
initialize"?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to