junrao commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1612454534


##########
server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
+
+public final class FinalizedFeatures {
+    private final MetadataVersion metadataVersion;
+    private final Map<String, Short> finalizedFeatures;
+    private final long finalizedFeaturesEpoch;
+
+    public static FinalizedFeatures fromKRaftVersion(MetadataVersion version) {
+        return new FinalizedFeatures(version, Collections.emptyMap(), -1, 
true);
+    }
+
+    public FinalizedFeatures(
+        MetadataVersion metadataVersion,
+        Map<String, Short> finalizedFeatures,
+        long finalizedFeaturesEpoch,
+        boolean kraftMode
+    ) {
+        this.metadataVersion = metadataVersion;
+        this.finalizedFeatures = new HashMap<>(finalizedFeatures);
+        this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
+        // In KRaft mode, we always include the metadata version in the 
features map.
+        // In ZK mode, we never include it.
+        if (kraftMode) {
+            this.finalizedFeatures.put(FEATURE_NAME, 
metadataVersion.featureLevel());
+        } else {
+            this.finalizedFeatures.remove(FEATURE_NAME);
+        }
+    }
+
+    public MetadataVersion metadataVersion() {
+        return metadataVersion;
+    }
+
+    public Map<String, Short> finalizedFeatures() {
+        return finalizedFeatures;
+    }
+
+    public long finalizedFeaturesEpoch() {
+        return finalizedFeaturesEpoch;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || !(o.getClass().equals(FinalizedFeatures.class))) 
return false;
+        FinalizedFeatures other = (FinalizedFeatures) o;
+        return metadataVersion == other.metadataVersion &&
+            finalizedFeatures.equals(other.finalizedFeatures) &&
+                finalizedFeaturesEpoch == other.finalizedFeaturesEpoch;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(metadataVersion, finalizedFeatures, 
finalizedFeaturesEpoch);
+    }
+
+    @Override
+    public String toString() {
+        return "Features" +
+                "(version=" + metadataVersion +

Review Comment:
   version => metadataVersion ?



##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -109,6 +111,52 @@ object StorageTool extends Logging {
     }
   }
 
+  private def validateMetadataVersion(metadataVersion: MetadataVersion, 
config: Option[KafkaConfig]): Unit = {
+    if (!metadataVersion.isKRaftSupported) {
+      throw new TerseFailure(s"Must specify a valid KRaft metadata.version of 
at least ${MetadataVersion.IBP_3_0_IV0}.")
+    }
+    if (!metadataVersion.isProduction) {
+      if (config.get.unstableMetadataVersionsEnabled) {
+        System.out.println(s"WARNING: using pre-production metadata.version 
$metadataVersion.")
+      } else {
+        throw new TerseFailure(s"The metadata.version $metadataVersion is not 
ready for production use yet.")
+      }
+    }
+  }
+
+  private[tools] def generateFeatureRecords(metadataRecords: 
ArrayBuffer[ApiMessageAndVersion],
+                                            metadataVersion: MetadataVersion,
+                                            specifiedFeatures: Map[String, 
java.lang.Short],
+                                            allFeatures: List[Features],
+                                            usesVersionDefault: Boolean): Unit 
= {
+    // If we are using --version-default, the default is based on the metadata 
version.
+    val metadataVersionForDefault = if (usesVersionDefault) 
Optional.of(metadataVersion) else Optional.empty[MetadataVersion]()

Review Comment:
   > If we follow the protocol of creating a new MV for each new feature and 
making them production ready at the same time then the answer to your question 
is yes.
   
   I thought that's the model being implemented since each FeatureVersion needs 
to define `bootstrapMetadataVersion()`?
   
   Another thing is that if we follow this model, it would be inconvenient for 
each feature to maintain its own latestProductionVersion and make sure that 
it's consistent with the LATEST_PRODUCTION in MV. Perhaps it's simpler to just 
maintain the latestProductionVersion in MV? Then through 
`bootstrapMetadataVersion()`, we will know whether a feature level is 
production ready or not.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to