mumrah commented on code in PR #12050:
URL: https://github.com/apache/kafka/pull/12050#discussion_r871491412


##########
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java:
##########
@@ -93,84 +129,187 @@ boolean featureExists(String featureName) {
         return quorumFeatures.localSupportedFeature(featureName).isPresent();
     }
 
-    private ApiError updateFeature(String featureName,
-                                   short newVersion,
-                                   FeatureUpdate.UpgradeType upgradeType,
-                                   Map<Integer, Map<String, VersionRange>> 
brokersAndFeatures,
-                                   List<ApiMessageAndVersion> records) {
+
+    MetadataVersion metadataVersion() {
+        return metadataVersion.get();
+    }
+
+    private ApiError updateFeature(
+        String featureName,
+        short newVersion,
+        FeatureUpdate.UpgradeType upgradeType,
+        Map<Integer, Map<String, VersionRange>> brokersAndFeatures,
+        List<ApiMessageAndVersion> records
+    ) {
         if (!featureExists(featureName)) {
-            return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                    "The controller does not support the given feature.");
+            return invalidUpdateVersion(featureName, newVersion,
+                "The controller does not support the given feature.");
         }
 
         if (upgradeType.equals(FeatureUpdate.UpgradeType.UNKNOWN)) {
-            return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                    "The controller does not support the given upgrade type.");
+            return invalidUpdateVersion(featureName, newVersion,
+                "The controller does not support the given upgrade type.");
         }
 
         final Short currentVersion = finalizedVersions.get(featureName);
 
         if (newVersion <= 0) {
-            return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                "The upper value for the new range cannot be less than 1.");
+            return invalidUpdateVersion(featureName, newVersion,
+                "A feature version cannot be less than 1.");
         }
 
         if (!canSupportVersion(featureName, newVersion)) {
-            return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                "The controller does not support the given feature range.");
+            return invalidUpdateVersion(featureName, newVersion,
+                "The controller does not support the given feature version.");
         }
 
         for (Entry<Integer, Map<String, VersionRange>> brokerEntry : 
brokersAndFeatures.entrySet()) {
             VersionRange brokerRange = brokerEntry.getValue().get(featureName);
-            if (brokerRange == null || !brokerRange.contains(newVersion)) {
-                return new ApiError(Errors.INVALID_UPDATE_VERSION,
+            if (brokerRange == null) {
+                return invalidUpdateVersion(featureName, newVersion,
+                    "Broker " + brokerEntry.getKey() + " does not support this 
feature.");
+            } else if (!brokerRange.contains(newVersion)) {
+                return invalidUpdateVersion(featureName, newVersion,
                     "Broker " + brokerEntry.getKey() + " does not support the 
given " +
-                        "feature range.");
+                    "version. It supports " + brokerRange.min() + " to " + 
brokerRange.max() + ".");
             }
         }
 
         if (currentVersion != null && newVersion < currentVersion) {
             if (upgradeType.equals(FeatureUpdate.UpgradeType.UPGRADE)) {
-                return new ApiError(Errors.INVALID_UPDATE_VERSION,
-                    "Can't downgrade the maximum version of this feature 
without setting the upgrade type to safe or unsafe downgrade.");
+                return invalidUpdateVersion(featureName, newVersion,
+                    "Can't downgrade the version of this feature without 
setting the " +
+                    "upgrade type to either safe or unsafe downgrade.");
+            }
+        }
+
+        if (featureName.equals(MetadataVersion.FEATURE_NAME)) {
+            // Perform additional checks if we're updating metadata.version
+            return updateMetadataVersion(newVersion, 
upgradeType.equals(FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE), records::add);
+        } else {
+            records.add(new ApiMessageAndVersion(
+                new FeatureLevelRecord()
+                    .setName(featureName)
+                    .setFeatureLevel(newVersion),
+                FEATURE_LEVEL_RECORD.highestSupportedVersion()));
+            return ApiError.NONE;
+        }
+    }
+
+    private ApiError invalidUpdateVersion(String feature, short version, 
String message) {
+        String errorMessage = String.format("Invalid update version %d for 
feature %s. %s", version, feature, message);
+        log.debug(errorMessage);
+        return new ApiError(Errors.INVALID_UPDATE_VERSION, errorMessage);
+    }
+
+    /**
+     * Perform some additional validation for metadata.version updates.
+     */
+    private ApiError updateMetadataVersion(
+        short newVersionLevel,
+        boolean allowUnsafeDowngrade,
+        Consumer<ApiMessageAndVersion> recordConsumer
+    ) {
+        Optional<VersionRange> quorumSupported = 
quorumFeatures.quorumSupportedFeature(MetadataVersion.FEATURE_NAME);
+        if (!quorumSupported.isPresent()) {
+            return invalidMetadataVersion(newVersionLevel, "The quorum does 
not support metadata.version.");
+        }
+
+        if (newVersionLevel <= 0) {
+            return invalidMetadataVersion(newVersionLevel, "metadata.version 
cannot be less than 1.");
+        }
+
+        if (!quorumSupported.get().contains(newVersionLevel)) {
+            return invalidMetadataVersion(newVersionLevel, "The controller 
quorum does support this version.");
+        }
+
+        MetadataVersion currentVersion = metadataVersion();
+        final MetadataVersion newVersion;
+        try {
+            newVersion = MetadataVersion.fromFeatureLevel(newVersionLevel);
+        } catch (IllegalArgumentException e) {
+            return invalidMetadataVersion(newVersionLevel, "Unknown 
metadata.version.");
+        }
+
+        if (!currentVersion.equals(MetadataVersion.UNINITIALIZED) && 
newVersion.isLessThan(currentVersion)) {
+            // This is a downgrade
+            boolean metadataChanged = 
MetadataVersion.checkIfMetadataChanged(currentVersion, newVersion);
+            if (!metadataChanged) {
+                log.info("Downgrading metadata.version from {} to {}.", 
currentVersion, newVersion);
+            } else {
+                return invalidMetadataVersion(newVersionLevel, "Unsafe 
metadata.version downgrades are not supported.");
             }
         }
 
-        records.add(new ApiMessageAndVersion(
+        recordConsumer.accept(new ApiMessageAndVersion(
             new FeatureLevelRecord()
-                .setName(featureName)
-                .setFeatureLevel(newVersion),
-            FEATURE_LEVEL_RECORD.highestSupportedVersion()));
+                .setName(MetadataVersion.FEATURE_NAME)
+                .setFeatureLevel(newVersionLevel), 
FEATURE_LEVEL_RECORD.lowestSupportedVersion()));
         return ApiError.NONE;
     }
 
-    FinalizedControllerFeatures finalizedFeatures(long lastCommittedOffset) {
+    private ApiError invalidMetadataVersion(short version, String message) {
+        String errorMessage = String.format("Invalid metadata.version %d. %s", 
version, message);
+        log.error(errorMessage);
+        return new ApiError(Errors.INVALID_UPDATE_VERSION, errorMessage);
+    }
+
+    FinalizedControllerFeatures finalizedFeatures(long epoch) {
         Map<String, Short> features = new HashMap<>();
-        for (Entry<String, Short> entry : 
finalizedVersions.entrySet(lastCommittedOffset)) {
+        if (!metadataVersion.get(epoch).equals(MetadataVersion.UNINITIALIZED)) 
{
+            features.put(MetadataVersion.FEATURE_NAME, 
metadataVersion.get(epoch).featureLevel());
+        }
+        for (Entry<String, Short> entry : finalizedVersions.entrySet(epoch)) {
             features.put(entry.getKey(), entry.getValue());
         }
-        return new FinalizedControllerFeatures(features, lastCommittedOffset);
+        return new FinalizedControllerFeatures(features, epoch);
     }
 
     public void replay(FeatureLevelRecord record) {
-        log.info("Setting feature {} to {}", record.name(), 
record.featureLevel());
-        finalizedVersions.put(record.name(), record.featureLevel());
+        if (record.name().equals(MetadataVersion.FEATURE_NAME)) {
+            log.info("Setting metadata.version to {}", record.featureLevel());
+            
metadataVersion.set(MetadataVersion.fromFeatureLevel(record.featureLevel()));
+        } else {
+            log.info("Setting feature {} to {}", record.name(), 
record.featureLevel());
+            finalizedVersions.put(record.name(), record.featureLevel());
+        }
+        for (Entry<String, FeatureLevelListener> listenerEntry : 
listeners.entrySet()) {
+            try {
+                listenerEntry.getValue().handle(record.name(), 
record.featureLevel());
+            } catch (Throwable t) {
+                log.error("Failure calling feature listener " + 
listenerEntry.getKey(), t);
+            }
+        }
     }
 
     class FeatureControlIterator implements 
Iterator<List<ApiMessageAndVersion>> {
         private final Iterator<Entry<String, Short>> iterator;
+        private final MetadataVersion metadataVersion;
+        private boolean wroteVersion = false;

Review Comment:
   This boolean is used to make sure we write out the metadata.version feature 
record before any others. If (somehow) a snapshot is generated before the 
metadata.version is initialized, we skip it (hence the check a few lines below 
this)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to