junrao commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1619224649


##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -109,6 +114,52 @@ object StorageTool extends Logging {
     }
   }
 
+  private def validateMetadataVersion(metadataVersion: MetadataVersion, 
config: Option[KafkaConfig]): Unit = {
+    if (!metadataVersion.isKRaftSupported) {
+      throw new TerseFailure(s"Must specify a valid KRaft metadata.version of 
at least ${MetadataVersion.IBP_3_0_IV0}.")
+    }
+    if (!metadataVersion.isProduction) {
+      if (config.get.unstableMetadataVersionsEnabled) {
+        System.out.println(s"WARNING: using pre-production metadata.version 
$metadataVersion.")
+      } else {
+        throw new TerseFailure(s"The metadata.version $metadataVersion is not 
ready for production use yet.")
+      }
+    }
+  }
+
+  private[tools] def generateFeatureRecords(metadataRecords: 
ArrayBuffer[ApiMessageAndVersion],
+                                            metadataVersion: MetadataVersion,
+                                            specifiedFeatures: Map[String, 
java.lang.Short],
+                                            allFeatures: List[Features],
+                                            usesVersionDefault: Boolean): Unit 
= {

Review Comment:
   usesVersionDefault => releaseVersionSpecified ?



##########
server-common/src/test/java/org/apache/kafka/server/common/FeaturesTest.java:
##########
@@ -14,37 +14,101 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.server.common;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
-import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
-import static 
org.apache.kafka.server.common.MetadataVersion.MINIMUM_KRAFT_VERSION;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
-class FeaturesTest {
-    @Test
-    public void testKRaftModeFeatures() {
-        Features features = new Features(MINIMUM_KRAFT_VERSION,
-                Collections.singletonMap("foo", (short) 2), 123, true);
-        assertEquals(MINIMUM_KRAFT_VERSION.featureLevel(),
-                features.finalizedFeatures().get(FEATURE_NAME));
-        assertEquals((short) 2,
-                features.finalizedFeatures().get("foo"));
-        assertEquals(2, features.finalizedFeatures().size());
+public class FeaturesTest {
+
+    @ParameterizedTest
+    @EnumSource(Features.class)
+    public void testFromFeatureLevelAllFeatures(Features feature) {
+        FeatureVersion[] featureImplementations = feature.featureVersions();
+        int numFeatures = featureImplementations.length;
+        for (short i = 1; i < numFeatures; i++) {
+            assertEquals(featureImplementations[i - 1], 
feature.fromFeatureLevel(i));
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(Features.class)
+    public void testValidateVersionAllFeatures(Features feature) {
+        for (FeatureVersion featureImpl : feature.featureVersions()) {
+            // Ensure the minimum bootstrap metadata version is included if no 
metadata version dependency.
+            Map<String, Short> deps = new HashMap<>();
+            deps.putAll(featureImpl.dependencies());
+            if (!deps.containsKey(MetadataVersion.FEATURE_NAME)) {

Review Comment:
   Should we require each feature to include a dependency on MV? Otherwise, we 
need to add this logic in all places where `Features.validateVersion` is called.



##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -60,24 +60,28 @@ object StorageTool extends Logging {
         case "format" =>
           val directories = configToLogDirectories(config.get)
           val clusterId = namespace.getString("cluster_id")
-          val metadataVersion = getMetadataVersion(namespace,
-            
Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
-          if (!metadataVersion.isKRaftSupported) {
-            throw new TerseFailure(s"Must specify a valid KRaft 
metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.")
-          }
-          if (!metadataVersion.isProduction) {
-            if (config.get.unstableMetadataVersionsEnabled) {
-              System.out.println(s"WARNING: using pre-production 
metadata.version $metadataVersion.")
-            } else {
-              throw new TerseFailure(s"The metadata.version $metadataVersion 
is not ready for production use yet.")
-            }
-          }
           val metaProperties = new MetaProperties.Builder().
             setVersion(MetaPropertiesVersion.V1).
             setClusterId(clusterId).
             setNodeId(config.get.nodeId).
             build()
           val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = 
ArrayBuffer()
+          val specifiedFeatures: util.List[String] = 
namespace.getList("feature")
+          if (namespace.getString("release_version") != null && 
specifiedFeatures != null) {
+            throw new TerseFailure("Both --release-version and --feature were 
set. Only one of the two flags can be set.")
+          }
+          val featureNamesAndLevelsMap = 
featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList)
+          val metadataVersion = getMetadataVersion(namespace, 
featureNamesAndLevelsMap,
+            
Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
+          validateMetadataVersion(metadataVersion, config)
+          // Get all other features, validate, and create records for them
+          generateFeatureRecords(
+            metadataRecords,
+            metadataVersion,
+            featureNamesAndLevelsMap,
+            Features.PRODUCTION_FEATURES.asScala.toList,
+            !Option(namespace.getString("release_version")).isEmpty

Review Comment:
   Got it. If --feature is used, but doesn't include MV, we use 
MetadataVersion.LATEST_PRODUCTION. However, if neither --feature nor 
--release-version is specified, we use 
ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG first and use 
MetadataVersion.LATEST_PRODUCTION as the fallback. Should we keep them 
consistent?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to