ahuang98 commented on code in PR #12050:
URL: https://github.com/apache/kafka/pull/12050#discussion_r870770149


##########
metadata/src/main/java/org/apache/kafka/controller/BootstrapMetadata.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.controller.util.SnapshotFileReader;
+import org.apache.kafka.controller.util.SnapshotFileWriter;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.BatchReader;
+import org.apache.kafka.raft.RaftClient;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.snapshot.SnapshotReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+
+/**
+ * A read-only class that holds the controller bootstrap metadata. A file 
named "bootstrap.snapshot" is used and the
+ * format is the same as a KRaft snapshot.
+ */
+public class BootstrapMetadata {
+    private static final Logger log = 
LoggerFactory.getLogger(BootstrapMetadata.class);
+
+    public static final String BOOTSTRAP_FILE = "bootstrap.snapshot";
+
+    private final MetadataVersion metadataVersion;
+
+    BootstrapMetadata(MetadataVersion metadataVersion) {
+        this.metadataVersion = metadataVersion;
+    }
+
+    public MetadataVersion metadataVersion() {
+        return this.metadataVersion;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        BootstrapMetadata metadata = (BootstrapMetadata) o;
+        return metadataVersion == metadata.metadataVersion;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(metadataVersion);
+    }
+
+    @Override
+    public String toString() {
+        return "BootstrapMetadata{" +
+            "metadataVersion=" + metadataVersion +
+            '}';
+    }
+
+    /**
+     * A raft client listener that simply collects all of the commits and 
snapshots into a mapping of
+     * metadata record type to list of records.
+     */
+    private static class BootstrapListener implements 
RaftClient.Listener<ApiMessageAndVersion> {
+        private final Map<MetadataRecordType, List<ApiMessage>> messages = new 
LinkedHashMap<>();
+
+        @Override
+        public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
+            try {
+                while (reader.hasNext()) {
+                    Batch<ApiMessageAndVersion> batch = reader.next();
+                    for (ApiMessageAndVersion messageAndVersion : 
batch.records()) {
+                        handleMessage(messageAndVersion.message());
+                    }
+                }
+            } finally {
+                reader.close();
+            }
+        }
+
+        @Override
+        public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> 
reader) {
+            try {
+                while (reader.hasNext()) {
+                    Batch<ApiMessageAndVersion> batch = reader.next();
+                    for (ApiMessageAndVersion messageAndVersion : batch) {
+                        handleMessage(messageAndVersion.message());
+                    }
+                }
+            } finally {
+                reader.close();
+            }
+        }
+
+        void handleMessage(ApiMessage message) {
+            try {
+                MetadataRecordType type = 
MetadataRecordType.fromId(message.apiKey());
+                messages.computeIfAbsent(type, __ -> new 
ArrayList<>()).add(message);

Review Comment:
   Should we be adding the message to the map even if its `type` is already a 
key in the map?



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -882,16 +897,55 @@ public void handleLeaderChange(LeaderAndEpoch newLeader) {
                             newEpoch + ", but we never renounced controller 
epoch " +
                             curEpoch);
                     }
-                    log.info(
-                        "Becoming the active controller at epoch {}, committed 
offset {} and committed epoch {}.",
-                        newEpoch, lastCommittedOffset, lastCommittedEpoch
-                    );
+
 
                     curClaimEpoch = newEpoch;
                     controllerMetrics.setActive(true);
                     writeOffset = lastCommittedOffset;
                     clusterControl.activate();
 
+                    // Check if we need to bootstrap a metadata.version into 
the log. This must happen before we can
+                    // write any records to the log since we need the 
metadata.version to determine the correct
+                    // record version
+                    final MetadataVersion metadataVersion;
+                    if 
(featureControl.metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
+                        final CompletableFuture<Map<String, ApiError>> future;
+                        if (!bootstrapMetadataVersion.isKRaftSupported()) {
+                            metadataVersion = MetadataVersion.UNINITIALIZED;
+                            future = new CompletableFuture<>();
+                            future.completeExceptionally(
+                                    new IllegalStateException("Cannot become 
leader without a valid initial metadata.version to use. Got " + 
bootstrapMetadataVersion.version()));

Review Comment:
   nit: Add the min viable MetadataVersion in the log.



##########
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java:
##########
@@ -93,84 +129,187 @@ boolean featureExists(String featureName) {
         return quorumFeatures.localSupportedFeature(featureName).isPresent();
     }
 
-    private ApiError updateFeature(String featureName,
-                                   short newVersion,
-                                   FeatureUpdate.UpgradeType upgradeType,
-                                   Map<Integer, Map<String, VersionRange>> 
brokersAndFeatures,
-                                   List<ApiMessageAndVersion> records) {
+
+    MetadataVersion metadataVersion() {
+        return metadataVersion.get();
+    }
+
+    private ApiError updateFeature(
+        String featureName,
+        short newVersion,
+        FeatureUpdate.UpgradeType upgradeType,
+        Map<Integer, Map<String, VersionRange>> brokersAndFeatures,
+        List<ApiMessageAndVersion> records
+    ) {
         if (!featureExists(featureName)) {
-            return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                    "The controller does not support the given feature.");
+            return invalidUpdateVersion(featureName, newVersion,
+                "The controller does not support the given feature.");
         }
 
         if (upgradeType.equals(FeatureUpdate.UpgradeType.UNKNOWN)) {
-            return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                    "The controller does not support the given upgrade type.");
+            return invalidUpdateVersion(featureName, newVersion,
+                "The controller does not support the given upgrade type.");
         }
 
         final Short currentVersion = finalizedVersions.get(featureName);
 
         if (newVersion <= 0) {
-            return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                "The upper value for the new range cannot be less than 1.");
+            return invalidUpdateVersion(featureName, newVersion,
+                "A feature version cannot be less than 1.");
         }
 
         if (!canSupportVersion(featureName, newVersion)) {
-            return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                "The controller does not support the given feature range.");
+            return invalidUpdateVersion(featureName, newVersion,
+                "The controller does not support the given feature version.");
         }
 
         for (Entry<Integer, Map<String, VersionRange>> brokerEntry : 
brokersAndFeatures.entrySet()) {
             VersionRange brokerRange = brokerEntry.getValue().get(featureName);
-            if (brokerRange == null || !brokerRange.contains(newVersion)) {
-                return new ApiError(Errors.INVALID_UPDATE_VERSION,
+            if (brokerRange == null) {
+                return invalidUpdateVersion(featureName, newVersion,
+                    "Broker " + brokerEntry.getKey() + " does not support this 
feature.");
+            } else if (!brokerRange.contains(newVersion)) {
+                return invalidUpdateVersion(featureName, newVersion,
                     "Broker " + brokerEntry.getKey() + " does not support the 
given " +
-                        "feature range.");
+                    "version. It supports " + brokerRange.min() + " to " + 
brokerRange.max() + ".");
             }
         }
 
         if (currentVersion != null && newVersion < currentVersion) {
             if (upgradeType.equals(FeatureUpdate.UpgradeType.UPGRADE)) {
-                return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                    "Can't downgrade the maximum version of this feature 
without setting the upgrade type to safe or unsafe downgrade.");
+                return invalidUpdateVersion(featureName, newVersion,
+                    "Can't downgrade the version of this feature without 
setting the " +
+                    "upgrade type to either safe or unsafe downgrade.");
+            }
+        }
+
+        if (featureName.equals(MetadataVersion.FEATURE_NAME)) {
+            // Perform additional checks if we're updating metadata.version
+            return updateMetadataVersion(newVersion, 
upgradeType.equals(FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE), records::add);
+        } else {
+            records.add(new ApiMessageAndVersion(
+                new FeatureLevelRecord()
+                    .setName(featureName)
+                    .setFeatureLevel(newVersion),
+                FEATURE_LEVEL_RECORD.highestSupportedVersion()));
+            return ApiError.NONE;
+        }
+    }
+
+    private ApiError invalidUpdateVersion(String feature, short version, 
String message) {
+        String errorMessage = String.format("Invalid update version %d for 
feature %s. %s", version, feature, message);
+        log.debug(errorMessage);
+        return new ApiError(Errors.INVALID_UPDATE_VERSION, errorMessage);
+    }
+
+    /**
+     * Perform some additional validation for metadata.version updates.
+     */
+    private ApiError updateMetadataVersion(
+        short newVersionLevel,
+        boolean allowUnsafeDowngrade,
+        Consumer<ApiMessageAndVersion> recordConsumer
+    ) {
+        Optional<VersionRange> quorumSupported = 
quorumFeatures.quorumSupportedFeature(MetadataVersion.FEATURE_NAME);
+        if (!quorumSupported.isPresent()) {
+            return invalidMetadataVersion(newVersionLevel, "The quorum does 
not support metadata.version.");
+        }
+
+        if (newVersionLevel <= 0) {
+            return invalidMetadataVersion(newVersionLevel, "metadata.version 
cannot be less than 1.");
+        }
+
+        if (!quorumSupported.get().contains(newVersionLevel)) {
+            return invalidMetadataVersion(newVersionLevel, "The controller 
quorum does support this version.");
+        }
+
+        MetadataVersion currentVersion = metadataVersion();
+        final MetadataVersion newVersion;
+        try {
+            newVersion = MetadataVersion.fromFeatureLevel(newVersionLevel);
+        } catch (IllegalArgumentException e) {
+            return invalidMetadataVersion(newVersionLevel, "Unknown 
metadata.version.");
+        }
+
+        if (!currentVersion.equals(MetadataVersion.UNINITIALIZED) && 
newVersion.isLessThan(currentVersion)) {
+            // This is a downgrade
+            boolean metadataChanged = 
MetadataVersion.checkIfMetadataChanged(currentVersion, newVersion);
+            if (!metadataChanged) {
+                log.info("Downgrading metadata.version from {} to {}.", 
currentVersion, newVersion);
+            } else {
+                return invalidMetadataVersion(newVersionLevel, "Unsafe 
metadata.version downgrades are not supported.");
             }
         }
 
-        records.add(new ApiMessageAndVersion(
+        recordConsumer.accept(new ApiMessageAndVersion(
             new FeatureLevelRecord()
-                .setName(featureName)
-                .setFeatureLevel(newVersion),
-            FEATURE_LEVEL_RECORD.highestSupportedVersion()));
+                .setName(MetadataVersion.FEATURE_NAME)
+                .setFeatureLevel(newVersionLevel), 
FEATURE_LEVEL_RECORD.lowestSupportedVersion()));
         return ApiError.NONE;
     }
 
-    FinalizedControllerFeatures finalizedFeatures(long lastCommittedOffset) {
+    private ApiError invalidMetadataVersion(short version, String message) {
+        String errorMessage = String.format("Invalid metadata.version %d. %s", 
version, message);
+        log.error(errorMessage);
+        return new ApiError(Errors.INVALID_UPDATE_VERSION, errorMessage);
+    }
+
+    FinalizedControllerFeatures finalizedFeatures(long epoch) {
         Map<String, Short> features = new HashMap<>();
-        for (Entry<String, Short> entry : 
finalizedVersions.entrySet(lastCommittedOffset)) {
+        if (!metadataVersion.get(epoch).equals(MetadataVersion.UNINITIALIZED)) 
{
+            features.put(MetadataVersion.FEATURE_NAME, 
metadataVersion.get(epoch).featureLevel());
+        }
+        for (Entry<String, Short> entry : finalizedVersions.entrySet(epoch)) {
             features.put(entry.getKey(), entry.getValue());
         }
-        return new FinalizedControllerFeatures(features, lastCommittedOffset);
+        return new FinalizedControllerFeatures(features, epoch);
     }
 
     public void replay(FeatureLevelRecord record) {
-        log.info("Setting feature {} to {}", record.name(), 
record.featureLevel());
-        finalizedVersions.put(record.name(), record.featureLevel());
+        if (record.name().equals(MetadataVersion.FEATURE_NAME)) {
+            log.info("Setting metadata.version to {}", record.featureLevel());
+            
metadataVersion.set(MetadataVersion.fromFeatureLevel(record.featureLevel()));
+        } else {
+            log.info("Setting feature {} to {}", record.name(), 
record.featureLevel());
+            finalizedVersions.put(record.name(), record.featureLevel());
+        }
+        for (Entry<String, FeatureLevelListener> listenerEntry : 
listeners.entrySet()) {

Review Comment:
   What other FeatureLevelListeners might we have (in addition to 
`QuorumFeatureListener`)? 



##########
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java:
##########
@@ -93,84 +129,187 @@ boolean featureExists(String featureName) {
         return quorumFeatures.localSupportedFeature(featureName).isPresent();
     }
 
-    private ApiError updateFeature(String featureName,
-                                   short newVersion,
-                                   FeatureUpdate.UpgradeType upgradeType,
-                                   Map<Integer, Map<String, VersionRange>> 
brokersAndFeatures,
-                                   List<ApiMessageAndVersion> records) {
+
+    MetadataVersion metadataVersion() {
+        return metadataVersion.get();
+    }
+
+    private ApiError updateFeature(
+        String featureName,
+        short newVersion,
+        FeatureUpdate.UpgradeType upgradeType,
+        Map<Integer, Map<String, VersionRange>> brokersAndFeatures,
+        List<ApiMessageAndVersion> records
+    ) {
         if (!featureExists(featureName)) {
-            return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                    "The controller does not support the given feature.");
+            return invalidUpdateVersion(featureName, newVersion,
+                "The controller does not support the given feature.");
         }
 
         if (upgradeType.equals(FeatureUpdate.UpgradeType.UNKNOWN)) {
-            return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                    "The controller does not support the given upgrade type.");
+            return invalidUpdateVersion(featureName, newVersion,
+                "The controller does not support the given upgrade type.");
         }
 
         final Short currentVersion = finalizedVersions.get(featureName);
 
         if (newVersion <= 0) {
-            return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                "The upper value for the new range cannot be less than 1.");
+            return invalidUpdateVersion(featureName, newVersion,
+                "A feature version cannot be less than 1.");
         }
 
         if (!canSupportVersion(featureName, newVersion)) {
-            return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                "The controller does not support the given feature range.");
+            return invalidUpdateVersion(featureName, newVersion,
+                "The controller does not support the given feature version.");
         }
 
         for (Entry<Integer, Map<String, VersionRange>> brokerEntry : 
brokersAndFeatures.entrySet()) {
             VersionRange brokerRange = brokerEntry.getValue().get(featureName);
-            if (brokerRange == null || !brokerRange.contains(newVersion)) {
-                return new ApiError(Errors.INVALID_UPDATE_VERSION,
+            if (brokerRange == null) {
+                return invalidUpdateVersion(featureName, newVersion,
+                    "Broker " + brokerEntry.getKey() + " does not support this 
feature.");
+            } else if (!brokerRange.contains(newVersion)) {
+                return invalidUpdateVersion(featureName, newVersion,
                     "Broker " + brokerEntry.getKey() + " does not support the 
given " +
-                        "feature range.");
+                    "version. It supports " + brokerRange.min() + " to " + 
brokerRange.max() + ".");
             }
         }
 
         if (currentVersion != null && newVersion < currentVersion) {
             if (upgradeType.equals(FeatureUpdate.UpgradeType.UPGRADE)) {
-                return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                    "Can't downgrade the maximum version of this feature 
without setting the upgrade type to safe or unsafe downgrade.");
+                return invalidUpdateVersion(featureName, newVersion,
+                    "Can't downgrade the version of this feature without 
setting the " +
+                    "upgrade type to either safe or unsafe downgrade.");
+            }
+        }
+
+        if (featureName.equals(MetadataVersion.FEATURE_NAME)) {
+            // Perform additional checks if we're updating metadata.version
+            return updateMetadataVersion(newVersion, 
upgradeType.equals(FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE), records::add);
+        } else {
+            records.add(new ApiMessageAndVersion(
+                new FeatureLevelRecord()
+                    .setName(featureName)
+                    .setFeatureLevel(newVersion),
+                FEATURE_LEVEL_RECORD.highestSupportedVersion()));
+            return ApiError.NONE;
+        }
+    }
+
+    private ApiError invalidUpdateVersion(String feature, short version, 
String message) {
+        String errorMessage = String.format("Invalid update version %d for 
feature %s. %s", version, feature, message);
+        log.debug(errorMessage);
+        return new ApiError(Errors.INVALID_UPDATE_VERSION, errorMessage);
+    }
+
+    /**
+     * Perform some additional validation for metadata.version updates.
+     */
+    private ApiError updateMetadataVersion(
+        short newVersionLevel,
+        boolean allowUnsafeDowngrade,

Review Comment:
   Are you planning on supporting `allowUnsafeDowngrade == true` in this PR?



##########
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java:
##########
@@ -93,84 +129,187 @@ boolean featureExists(String featureName) {
         return quorumFeatures.localSupportedFeature(featureName).isPresent();
     }
 
-    private ApiError updateFeature(String featureName,
-                                   short newVersion,
-                                   FeatureUpdate.UpgradeType upgradeType,
-                                   Map<Integer, Map<String, VersionRange>> 
brokersAndFeatures,
-                                   List<ApiMessageAndVersion> records) {
+
+    MetadataVersion metadataVersion() {
+        return metadataVersion.get();
+    }
+
+    private ApiError updateFeature(
+        String featureName,
+        short newVersion,
+        FeatureUpdate.UpgradeType upgradeType,
+        Map<Integer, Map<String, VersionRange>> brokersAndFeatures,
+        List<ApiMessageAndVersion> records
+    ) {
         if (!featureExists(featureName)) {
-            return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                    "The controller does not support the given feature.");
+            return invalidUpdateVersion(featureName, newVersion,
+                "The controller does not support the given feature.");
         }
 
         if (upgradeType.equals(FeatureUpdate.UpgradeType.UNKNOWN)) {
-            return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                    "The controller does not support the given upgrade type.");
+            return invalidUpdateVersion(featureName, newVersion,
+                "The controller does not support the given upgrade type.");
         }
 
         final Short currentVersion = finalizedVersions.get(featureName);
 
         if (newVersion <= 0) {
-            return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                "The upper value for the new range cannot be less than 1.");
+            return invalidUpdateVersion(featureName, newVersion,
+                "A feature version cannot be less than 1.");
         }
 
         if (!canSupportVersion(featureName, newVersion)) {
-            return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                "The controller does not support the given feature range.");
+            return invalidUpdateVersion(featureName, newVersion,
+                "The controller does not support the given feature version.");
         }
 
         for (Entry<Integer, Map<String, VersionRange>> brokerEntry : 
brokersAndFeatures.entrySet()) {
             VersionRange brokerRange = brokerEntry.getValue().get(featureName);
-            if (brokerRange == null || !brokerRange.contains(newVersion)) {
-                return new ApiError(Errors.INVALID_UPDATE_VERSION,
+            if (brokerRange == null) {
+                return invalidUpdateVersion(featureName, newVersion,
+                    "Broker " + brokerEntry.getKey() + " does not support this 
feature.");
+            } else if (!brokerRange.contains(newVersion)) {
+                return invalidUpdateVersion(featureName, newVersion,
                     "Broker " + brokerEntry.getKey() + " does not support the 
given " +
-                        "feature range.");
+                    "version. It supports " + brokerRange.min() + " to " + 
brokerRange.max() + ".");
             }
         }
 
         if (currentVersion != null && newVersion < currentVersion) {
             if (upgradeType.equals(FeatureUpdate.UpgradeType.UPGRADE)) {
-                return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                    "Can't downgrade the maximum version of this feature 
without setting the upgrade type to safe or unsafe downgrade.");
+                return invalidUpdateVersion(featureName, newVersion,
+                    "Can't downgrade the version of this feature without 
setting the " +
+                    "upgrade type to either safe or unsafe downgrade.");
+            }
+        }
+
+        if (featureName.equals(MetadataVersion.FEATURE_NAME)) {
+            // Perform additional checks if we're updating metadata.version
+            return updateMetadataVersion(newVersion, 
upgradeType.equals(FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE), records::add);
+        } else {
+            records.add(new ApiMessageAndVersion(
+                new FeatureLevelRecord()
+                    .setName(featureName)
+                    .setFeatureLevel(newVersion),
+                FEATURE_LEVEL_RECORD.highestSupportedVersion()));
+            return ApiError.NONE;
+        }
+    }
+
+    private ApiError invalidUpdateVersion(String feature, short version, 
String message) {
+        String errorMessage = String.format("Invalid update version %d for 
feature %s. %s", version, feature, message);
+        log.debug(errorMessage);
+        return new ApiError(Errors.INVALID_UPDATE_VERSION, errorMessage);
+    }
+
+    /**
+     * Perform some additional validation for metadata.version updates.
+     */
+    private ApiError updateMetadataVersion(
+        short newVersionLevel,
+        boolean allowUnsafeDowngrade,
+        Consumer<ApiMessageAndVersion> recordConsumer
+    ) {
+        Optional<VersionRange> quorumSupported = 
quorumFeatures.quorumSupportedFeature(MetadataVersion.FEATURE_NAME);
+        if (!quorumSupported.isPresent()) {
+            return invalidMetadataVersion(newVersionLevel, "The quorum does 
not support metadata.version.");
+        }
+
+        if (newVersionLevel <= 0) {
+            return invalidMetadataVersion(newVersionLevel, "metadata.version 
cannot be less than 1.");
+        }
+
+        if (!quorumSupported.get().contains(newVersionLevel)) {
+            return invalidMetadataVersion(newVersionLevel, "The controller 
quorum does support this version.");
+        }
+
+        MetadataVersion currentVersion = metadataVersion();
+        final MetadataVersion newVersion;
+        try {
+            newVersion = MetadataVersion.fromFeatureLevel(newVersionLevel);
+        } catch (IllegalArgumentException e) {
+            return invalidMetadataVersion(newVersionLevel, "Unknown 
metadata.version.");
+        }
+
+        if (!currentVersion.equals(MetadataVersion.UNINITIALIZED) && 
newVersion.isLessThan(currentVersion)) {
+            // This is a downgrade
+            boolean metadataChanged = 
MetadataVersion.checkIfMetadataChanged(currentVersion, newVersion);
+            if (!metadataChanged) {
+                log.info("Downgrading metadata.version from {} to {}.", 
currentVersion, newVersion);
+            } else {
+                return invalidMetadataVersion(newVersionLevel, "Unsafe 
metadata.version downgrades are not supported.");
             }
         }
 
-        records.add(new ApiMessageAndVersion(
+        recordConsumer.accept(new ApiMessageAndVersion(
             new FeatureLevelRecord()
-                .setName(featureName)
-                .setFeatureLevel(newVersion),
-            FEATURE_LEVEL_RECORD.highestSupportedVersion()));
+                .setName(MetadataVersion.FEATURE_NAME)
+                .setFeatureLevel(newVersionLevel), 
FEATURE_LEVEL_RECORD.lowestSupportedVersion()));
         return ApiError.NONE;
     }
 
-    FinalizedControllerFeatures finalizedFeatures(long lastCommittedOffset) {
+    private ApiError invalidMetadataVersion(short version, String message) {
+        String errorMessage = String.format("Invalid metadata.version %d. %s", 
version, message);
+        log.error(errorMessage);
+        return new ApiError(Errors.INVALID_UPDATE_VERSION, errorMessage);
+    }
+
+    FinalizedControllerFeatures finalizedFeatures(long epoch) {
         Map<String, Short> features = new HashMap<>();
-        for (Entry<String, Short> entry : 
finalizedVersions.entrySet(lastCommittedOffset)) {
+        if (!metadataVersion.get(epoch).equals(MetadataVersion.UNINITIALIZED)) 
{
+            features.put(MetadataVersion.FEATURE_NAME, 
metadataVersion.get(epoch).featureLevel());
+        }
+        for (Entry<String, Short> entry : finalizedVersions.entrySet(epoch)) {
             features.put(entry.getKey(), entry.getValue());
         }
-        return new FinalizedControllerFeatures(features, lastCommittedOffset);
+        return new FinalizedControllerFeatures(features, epoch);
     }
 
     public void replay(FeatureLevelRecord record) {
-        log.info("Setting feature {} to {}", record.name(), 
record.featureLevel());
-        finalizedVersions.put(record.name(), record.featureLevel());
+        if (record.name().equals(MetadataVersion.FEATURE_NAME)) {
+            log.info("Setting metadata.version to {}", record.featureLevel());
+            
metadataVersion.set(MetadataVersion.fromFeatureLevel(record.featureLevel()));
+        } else {
+            log.info("Setting feature {} to {}", record.name(), 
record.featureLevel());
+            finalizedVersions.put(record.name(), record.featureLevel());
+        }
+        for (Entry<String, FeatureLevelListener> listenerEntry : 
listeners.entrySet()) {
+            try {
+                listenerEntry.getValue().handle(record.name(), 
record.featureLevel());
+            } catch (Throwable t) {
+                log.error("Failure calling feature listener " + 
listenerEntry.getKey(), t);
+            }
+        }
     }
 
     class FeatureControlIterator implements 
Iterator<List<ApiMessageAndVersion>> {
         private final Iterator<Entry<String, Short>> iterator;
+        private final MetadataVersion metadataVersion;
+        private boolean wroteVersion = false;

Review Comment:
   Would it work to name this something like `isUninitializedVersion` or simply 
`uninitialized`? The point of this var is just to avoid doing the 
`metadataVersion.equals(UNINITIALIZED)` comparison every time we call 
`FeatureControlIterator.next()` right?



##########
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java:
##########
@@ -93,84 +129,187 @@ boolean featureExists(String featureName) {
         return quorumFeatures.localSupportedFeature(featureName).isPresent();
     }
 
-    private ApiError updateFeature(String featureName,
-                                   short newVersion,
-                                   FeatureUpdate.UpgradeType upgradeType,
-                                   Map<Integer, Map<String, VersionRange>> 
brokersAndFeatures,
-                                   List<ApiMessageAndVersion> records) {
+
+    MetadataVersion metadataVersion() {
+        return metadataVersion.get();
+    }
+
+    private ApiError updateFeature(
+        String featureName,
+        short newVersion,
+        FeatureUpdate.UpgradeType upgradeType,
+        Map<Integer, Map<String, VersionRange>> brokersAndFeatures,
+        List<ApiMessageAndVersion> records
+    ) {
         if (!featureExists(featureName)) {
-            return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                    "The controller does not support the given feature.");
+            return invalidUpdateVersion(featureName, newVersion,
+                "The controller does not support the given feature.");
         }
 
         if (upgradeType.equals(FeatureUpdate.UpgradeType.UNKNOWN)) {
-            return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                    "The controller does not support the given upgrade type.");
+            return invalidUpdateVersion(featureName, newVersion,
+                "The controller does not support the given upgrade type.");
         }
 
         final Short currentVersion = finalizedVersions.get(featureName);
 
         if (newVersion <= 0) {
-            return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                "The upper value for the new range cannot be less than 1.");
+            return invalidUpdateVersion(featureName, newVersion,
+                "A feature version cannot be less than 1.");
         }
 
         if (!canSupportVersion(featureName, newVersion)) {
-            return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                "The controller does not support the given feature range.");
+            return invalidUpdateVersion(featureName, newVersion,
+                "The controller does not support the given feature version.");
         }
 
         for (Entry<Integer, Map<String, VersionRange>> brokerEntry : 
brokersAndFeatures.entrySet()) {
             VersionRange brokerRange = brokerEntry.getValue().get(featureName);
-            if (brokerRange == null || !brokerRange.contains(newVersion)) {
-                return new ApiError(Errors.INVALID_UPDATE_VERSION,
+            if (brokerRange == null) {
+                return invalidUpdateVersion(featureName, newVersion,
+                    "Broker " + brokerEntry.getKey() + " does not support this 
feature.");
+            } else if (!brokerRange.contains(newVersion)) {
+                return invalidUpdateVersion(featureName, newVersion,
                     "Broker " + brokerEntry.getKey() + " does not support the 
given " +
-                        "feature range.");
+                    "version. It supports " + brokerRange.min() + " to " + 
brokerRange.max() + ".");
             }
         }
 
         if (currentVersion != null && newVersion < currentVersion) {
             if (upgradeType.equals(FeatureUpdate.UpgradeType.UPGRADE)) {
-                return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                    "Can't downgrade the maximum version of this feature 
without setting the upgrade type to safe or unsafe downgrade.");
+                return invalidUpdateVersion(featureName, newVersion,
+                    "Can't downgrade the version of this feature without 
setting the " +
+                    "upgrade type to either safe or unsafe downgrade.");
+            }
+        }
+
+        if (featureName.equals(MetadataVersion.FEATURE_NAME)) {
+            // Perform additional checks if we're updating metadata.version
+            return updateMetadataVersion(newVersion, 
upgradeType.equals(FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE), records::add);
+        } else {
+            records.add(new ApiMessageAndVersion(
+                new FeatureLevelRecord()
+                    .setName(featureName)
+                    .setFeatureLevel(newVersion),
+                FEATURE_LEVEL_RECORD.highestSupportedVersion()));
+            return ApiError.NONE;
+        }
+    }
+
+    private ApiError invalidUpdateVersion(String feature, short version, 
String message) {
+        String errorMessage = String.format("Invalid update version %d for 
feature %s. %s", version, feature, message);
+        log.debug(errorMessage);
+        return new ApiError(Errors.INVALID_UPDATE_VERSION, errorMessage);
+    }
+
+    /**
+     * Perform some additional validation for metadata.version updates.
+     */
+    private ApiError updateMetadataVersion(
+        short newVersionLevel,
+        boolean allowUnsafeDowngrade,
+        Consumer<ApiMessageAndVersion> recordConsumer
+    ) {
+        Optional<VersionRange> quorumSupported = 
quorumFeatures.quorumSupportedFeature(MetadataVersion.FEATURE_NAME);
+        if (!quorumSupported.isPresent()) {
+            return invalidMetadataVersion(newVersionLevel, "The quorum does 
not support metadata.version.");

Review Comment:
   This is making me realize it might be confusing that `metadata.version` 
refers to `featureLevel` when we also have `MetadataVersion` the enum. 



##########
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java:
##########
@@ -93,84 +129,187 @@ boolean featureExists(String featureName) {
         return quorumFeatures.localSupportedFeature(featureName).isPresent();
     }
 
-    private ApiError updateFeature(String featureName,
-                                   short newVersion,
-                                   FeatureUpdate.UpgradeType upgradeType,
-                                   Map<Integer, Map<String, VersionRange>> 
brokersAndFeatures,
-                                   List<ApiMessageAndVersion> records) {
+
+    MetadataVersion metadataVersion() {
+        return metadataVersion.get();
+    }
+
+    private ApiError updateFeature(
+        String featureName,
+        short newVersion,
+        FeatureUpdate.UpgradeType upgradeType,
+        Map<Integer, Map<String, VersionRange>> brokersAndFeatures,
+        List<ApiMessageAndVersion> records
+    ) {
         if (!featureExists(featureName)) {
-            return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                    "The controller does not support the given feature.");
+            return invalidUpdateVersion(featureName, newVersion,
+                "The controller does not support the given feature.");
         }
 
         if (upgradeType.equals(FeatureUpdate.UpgradeType.UNKNOWN)) {
-            return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                    "The controller does not support the given upgrade type.");
+            return invalidUpdateVersion(featureName, newVersion,
+                "The controller does not support the given upgrade type.");
         }
 
         final Short currentVersion = finalizedVersions.get(featureName);
 
         if (newVersion <= 0) {
-            return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                "The upper value for the new range cannot be less than 1.");
+            return invalidUpdateVersion(featureName, newVersion,
+                "A feature version cannot be less than 1.");
         }
 
         if (!canSupportVersion(featureName, newVersion)) {
-            return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                "The controller does not support the given feature range.");
+            return invalidUpdateVersion(featureName, newVersion,
+                "The controller does not support the given feature version.");
         }
 
         for (Entry<Integer, Map<String, VersionRange>> brokerEntry : 
brokersAndFeatures.entrySet()) {
             VersionRange brokerRange = brokerEntry.getValue().get(featureName);
-            if (brokerRange == null || !brokerRange.contains(newVersion)) {
-                return new ApiError(Errors.INVALID_UPDATE_VERSION,
+            if (brokerRange == null) {
+                return invalidUpdateVersion(featureName, newVersion,
+                    "Broker " + brokerEntry.getKey() + " does not support this 
feature.");
+            } else if (!brokerRange.contains(newVersion)) {
+                return invalidUpdateVersion(featureName, newVersion,
                     "Broker " + brokerEntry.getKey() + " does not support the 
given " +
-                        "feature range.");
+                    "version. It supports " + brokerRange.min() + " to " + 
brokerRange.max() + ".");
             }
         }
 
         if (currentVersion != null && newVersion < currentVersion) {
             if (upgradeType.equals(FeatureUpdate.UpgradeType.UPGRADE)) {
-                return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                    "Can't downgrade the maximum version of this feature 
without setting the upgrade type to safe or unsafe downgrade.");
+                return invalidUpdateVersion(featureName, newVersion,
+                    "Can't downgrade the version of this feature without 
setting the " +
+                    "upgrade type to either safe or unsafe downgrade.");
+            }
+        }
+
+        if (featureName.equals(MetadataVersion.FEATURE_NAME)) {
+            // Perform additional checks if we're updating metadata.version
+            return updateMetadataVersion(newVersion, 
upgradeType.equals(FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE), records::add);
+        } else {
+            records.add(new ApiMessageAndVersion(
+                new FeatureLevelRecord()
+                    .setName(featureName)
+                    .setFeatureLevel(newVersion),
+                FEATURE_LEVEL_RECORD.highestSupportedVersion()));
+            return ApiError.NONE;
+        }
+    }
+
+    private ApiError invalidUpdateVersion(String feature, short version, 
String message) {
+        String errorMessage = String.format("Invalid update version %d for 
feature %s. %s", version, feature, message);
+        log.debug(errorMessage);
+        return new ApiError(Errors.INVALID_UPDATE_VERSION, errorMessage);
+    }
+
+    /**
+     * Perform some additional validation for metadata.version updates.
+     */
+    private ApiError updateMetadataVersion(
+        short newVersionLevel,
+        boolean allowUnsafeDowngrade,
+        Consumer<ApiMessageAndVersion> recordConsumer
+    ) {
+        Optional<VersionRange> quorumSupported = 
quorumFeatures.quorumSupportedFeature(MetadataVersion.FEATURE_NAME);
+        if (!quorumSupported.isPresent()) {
+            return invalidMetadataVersion(newVersionLevel, "The quorum does 
not support metadata.version.");
+        }
+
+        if (newVersionLevel <= 0) {
+            return invalidMetadataVersion(newVersionLevel, "metadata.version 
cannot be less than 1.");

Review Comment:
   nit
   ```suggestion
               return invalidMetadataVersion(newVersionLevel, "KRaft mode/the 
quorum does not support metadata.version values less than 1.");
   ```



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -927,6 +981,31 @@ private void appendRaftEvent(String name, Runnable 
runnable) {
         }
     }
 
+    /**
+     * A callback for changes to feature levels including metadata.version. 
This is called synchronously from
+     * {@link FeatureControlManager#replay(FeatureLevelRecord)} which is part 
of a ControllerWriteEvent. It is safe
+     * to modify controller state here. By the time this listener is called, a 
FeatureLevelRecord has been committed and
+     * the in-memory state of FeatureControlManager has been updated.
+     */
+    class QuorumFeatureListener implements FeatureLevelListener {
+        @Override
+        public void handle(String featureName, short finalizedVersion) {
+            boolean isActiveController = curClaimEpoch != -1;
+            boolean isFeatureSupported = 
featureControl.canSupportVersion(featureName, finalizedVersion);
+            if (featureName.equals(MetadataVersion.FEATURE_NAME)) {
+                if (!isFeatureSupported) {
+                    if (isActiveController) {
+                        log.error("Active controller cannot support 
metadata.version {}", finalizedVersion);
+                    } else {
+                        log.error("Standby controller cannot support 
metadata.version {}, shutting down.", finalizedVersion);
+                    }
+                    beginShutdown();
+                }
+            }

Review Comment:
   Is the behavior dependent on the feature being metadata.version?
   ```suggestion
               if (!isFeatureSupported) {
                   if (isActiveController) {
                       log.error("Active controller cannot support {} {}", 
featureName, finalizedVersion);
                   } else {
                       log.error("Standby controller cannot support {} {}, 
shutting down.", featureName, finalizedVersion);
                   }
                   beginShutdown();
               }
   ```



##########
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java:
##########
@@ -83,6 +100,25 @@ ControllerResult<Map<String, ApiError>> updateFeatures(
         }
     }
 
+    ControllerResult<Map<String, ApiError>> initializeMetadataVersion(short 
initVersion) {
+        if (!metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
+            return ControllerResult.atomicOf(
+                Collections.emptyList(),
+                Collections.singletonMap(
+                    MetadataVersion.FEATURE_NAME,
+                    new ApiError(Errors.INVALID_UPDATE_VERSION,
+                        "Cannot initialize metadata.version since it has 
already been initialized.")

Review Comment:
   nit: Add the metadata.version in use to the error message



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to