ahuang98 commented on code in PR #12050: URL: https://github.com/apache/kafka/pull/12050#discussion_r870770149
########## metadata/src/main/java/org/apache/kafka/controller/BootstrapMetadata.java: ########## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.metadata.FeatureLevelRecord; +import org.apache.kafka.common.metadata.MetadataRecordType; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.controller.util.SnapshotFileReader; +import org.apache.kafka.controller.util.SnapshotFileWriter; +import org.apache.kafka.raft.Batch; +import org.apache.kafka.raft.BatchReader; +import org.apache.kafka.raft.RaftClient; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.snapshot.SnapshotReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ExecutionException; + + +/** + * A read-only class that holds the controller bootstrap metadata. A file named "bootstrap.snapshot" is used and the + * format is the same as a KRaft snapshot. + */ +public class BootstrapMetadata { + private static final Logger log = LoggerFactory.getLogger(BootstrapMetadata.class); + + public static final String BOOTSTRAP_FILE = "bootstrap.snapshot"; + + private final MetadataVersion metadataVersion; + + BootstrapMetadata(MetadataVersion metadataVersion) { + this.metadataVersion = metadataVersion; + } + + public MetadataVersion metadataVersion() { + return this.metadataVersion; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + BootstrapMetadata metadata = (BootstrapMetadata) o; + return metadataVersion == metadata.metadataVersion; + } + + @Override + public int hashCode() { + return Objects.hash(metadataVersion); + } + + @Override + public String toString() { + return "BootstrapMetadata{" + + "metadataVersion=" + metadataVersion + + '}'; + } + + /** + * A raft client listener that simply collects all of the commits and snapshots into a mapping of + * metadata record type to list of records. + */ + private static class BootstrapListener implements RaftClient.Listener<ApiMessageAndVersion> { + private final Map<MetadataRecordType, List<ApiMessage>> messages = new LinkedHashMap<>(); + + @Override + public void handleCommit(BatchReader<ApiMessageAndVersion> reader) { + try { + while (reader.hasNext()) { + Batch<ApiMessageAndVersion> batch = reader.next(); + for (ApiMessageAndVersion messageAndVersion : batch.records()) { + handleMessage(messageAndVersion.message()); + } + } + } finally { + reader.close(); + } + } + + @Override + public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) { + try { + while (reader.hasNext()) { + Batch<ApiMessageAndVersion> batch = reader.next(); + for (ApiMessageAndVersion messageAndVersion : batch) { + handleMessage(messageAndVersion.message()); + } + } + } finally { + reader.close(); + } + } + + void handleMessage(ApiMessage message) { + try { + MetadataRecordType type = MetadataRecordType.fromId(message.apiKey()); + messages.computeIfAbsent(type, __ -> new ArrayList<>()).add(message); Review Comment: Should we be adding the message to the map even if its `type` is already a key in the map? ########## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ########## @@ -882,16 +897,55 @@ public void handleLeaderChange(LeaderAndEpoch newLeader) { newEpoch + ", but we never renounced controller epoch " + curEpoch); } - log.info( - "Becoming the active controller at epoch {}, committed offset {} and committed epoch {}.", - newEpoch, lastCommittedOffset, lastCommittedEpoch - ); + curClaimEpoch = newEpoch; controllerMetrics.setActive(true); writeOffset = lastCommittedOffset; clusterControl.activate(); + // Check if we need to bootstrap a metadata.version into the log. This must happen before we can + // write any records to the log since we need the metadata.version to determine the correct + // record version + final MetadataVersion metadataVersion; + if (featureControl.metadataVersion().equals(MetadataVersion.UNINITIALIZED)) { + final CompletableFuture<Map<String, ApiError>> future; + if (!bootstrapMetadataVersion.isKRaftSupported()) { + metadataVersion = MetadataVersion.UNINITIALIZED; + future = new CompletableFuture<>(); + future.completeExceptionally( + new IllegalStateException("Cannot become leader without a valid initial metadata.version to use. Got " + bootstrapMetadataVersion.version())); Review Comment: nit: Add the min viable MetadataVersion in the log. ########## metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java: ########## @@ -93,84 +129,187 @@ boolean featureExists(String featureName) { return quorumFeatures.localSupportedFeature(featureName).isPresent(); } - private ApiError updateFeature(String featureName, - short newVersion, - FeatureUpdate.UpgradeType upgradeType, - Map<Integer, Map<String, VersionRange>> brokersAndFeatures, - List<ApiMessageAndVersion> records) { + + MetadataVersion metadataVersion() { + return metadataVersion.get(); + } + + private ApiError updateFeature( + String featureName, + short newVersion, + FeatureUpdate.UpgradeType upgradeType, + Map<Integer, Map<String, VersionRange>> brokersAndFeatures, + List<ApiMessageAndVersion> records + ) { if (!featureExists(featureName)) { - return new ApiError(Errors.INVALID_UPDATE_VERSION, - "The controller does not support the given feature."); + return invalidUpdateVersion(featureName, newVersion, + "The controller does not support the given feature."); } if (upgradeType.equals(FeatureUpdate.UpgradeType.UNKNOWN)) { - return new ApiError(Errors.INVALID_UPDATE_VERSION, - "The controller does not support the given upgrade type."); + return invalidUpdateVersion(featureName, newVersion, + "The controller does not support the given upgrade type."); } final Short currentVersion = finalizedVersions.get(featureName); if (newVersion <= 0) { - return new ApiError(Errors.INVALID_UPDATE_VERSION, - "The upper value for the new range cannot be less than 1."); + return invalidUpdateVersion(featureName, newVersion, + "A feature version cannot be less than 1."); } if (!canSupportVersion(featureName, newVersion)) { - return new ApiError(Errors.INVALID_UPDATE_VERSION, - "The controller does not support the given feature range."); + return invalidUpdateVersion(featureName, newVersion, + "The controller does not support the given feature version."); } for (Entry<Integer, Map<String, VersionRange>> brokerEntry : brokersAndFeatures.entrySet()) { VersionRange brokerRange = brokerEntry.getValue().get(featureName); - if (brokerRange == null || !brokerRange.contains(newVersion)) { - return new ApiError(Errors.INVALID_UPDATE_VERSION, + if (brokerRange == null) { + return invalidUpdateVersion(featureName, newVersion, + "Broker " + brokerEntry.getKey() + " does not support this feature."); + } else if (!brokerRange.contains(newVersion)) { + return invalidUpdateVersion(featureName, newVersion, "Broker " + brokerEntry.getKey() + " does not support the given " + - "feature range."); + "version. It supports " + brokerRange.min() + " to " + brokerRange.max() + "."); } } if (currentVersion != null && newVersion < currentVersion) { if (upgradeType.equals(FeatureUpdate.UpgradeType.UPGRADE)) { - return new ApiError(Errors.INVALID_UPDATE_VERSION, - "Can't downgrade the maximum version of this feature without setting the upgrade type to safe or unsafe downgrade."); + return invalidUpdateVersion(featureName, newVersion, + "Can't downgrade the version of this feature without setting the " + + "upgrade type to either safe or unsafe downgrade."); + } + } + + if (featureName.equals(MetadataVersion.FEATURE_NAME)) { + // Perform additional checks if we're updating metadata.version + return updateMetadataVersion(newVersion, upgradeType.equals(FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE), records::add); + } else { + records.add(new ApiMessageAndVersion( + new FeatureLevelRecord() + .setName(featureName) + .setFeatureLevel(newVersion), + FEATURE_LEVEL_RECORD.highestSupportedVersion())); + return ApiError.NONE; + } + } + + private ApiError invalidUpdateVersion(String feature, short version, String message) { + String errorMessage = String.format("Invalid update version %d for feature %s. %s", version, feature, message); + log.debug(errorMessage); + return new ApiError(Errors.INVALID_UPDATE_VERSION, errorMessage); + } + + /** + * Perform some additional validation for metadata.version updates. + */ + private ApiError updateMetadataVersion( + short newVersionLevel, + boolean allowUnsafeDowngrade, + Consumer<ApiMessageAndVersion> recordConsumer + ) { + Optional<VersionRange> quorumSupported = quorumFeatures.quorumSupportedFeature(MetadataVersion.FEATURE_NAME); + if (!quorumSupported.isPresent()) { + return invalidMetadataVersion(newVersionLevel, "The quorum does not support metadata.version."); + } + + if (newVersionLevel <= 0) { + return invalidMetadataVersion(newVersionLevel, "metadata.version cannot be less than 1."); + } + + if (!quorumSupported.get().contains(newVersionLevel)) { + return invalidMetadataVersion(newVersionLevel, "The controller quorum does support this version."); + } + + MetadataVersion currentVersion = metadataVersion(); + final MetadataVersion newVersion; + try { + newVersion = MetadataVersion.fromFeatureLevel(newVersionLevel); + } catch (IllegalArgumentException e) { + return invalidMetadataVersion(newVersionLevel, "Unknown metadata.version."); + } + + if (!currentVersion.equals(MetadataVersion.UNINITIALIZED) && newVersion.isLessThan(currentVersion)) { + // This is a downgrade + boolean metadataChanged = MetadataVersion.checkIfMetadataChanged(currentVersion, newVersion); + if (!metadataChanged) { + log.info("Downgrading metadata.version from {} to {}.", currentVersion, newVersion); + } else { + return invalidMetadataVersion(newVersionLevel, "Unsafe metadata.version downgrades are not supported."); } } - records.add(new ApiMessageAndVersion( + recordConsumer.accept(new ApiMessageAndVersion( new FeatureLevelRecord() - .setName(featureName) - .setFeatureLevel(newVersion), - FEATURE_LEVEL_RECORD.highestSupportedVersion())); + .setName(MetadataVersion.FEATURE_NAME) + .setFeatureLevel(newVersionLevel), FEATURE_LEVEL_RECORD.lowestSupportedVersion())); return ApiError.NONE; } - FinalizedControllerFeatures finalizedFeatures(long lastCommittedOffset) { + private ApiError invalidMetadataVersion(short version, String message) { + String errorMessage = String.format("Invalid metadata.version %d. %s", version, message); + log.error(errorMessage); + return new ApiError(Errors.INVALID_UPDATE_VERSION, errorMessage); + } + + FinalizedControllerFeatures finalizedFeatures(long epoch) { Map<String, Short> features = new HashMap<>(); - for (Entry<String, Short> entry : finalizedVersions.entrySet(lastCommittedOffset)) { + if (!metadataVersion.get(epoch).equals(MetadataVersion.UNINITIALIZED)) { + features.put(MetadataVersion.FEATURE_NAME, metadataVersion.get(epoch).featureLevel()); + } + for (Entry<String, Short> entry : finalizedVersions.entrySet(epoch)) { features.put(entry.getKey(), entry.getValue()); } - return new FinalizedControllerFeatures(features, lastCommittedOffset); + return new FinalizedControllerFeatures(features, epoch); } public void replay(FeatureLevelRecord record) { - log.info("Setting feature {} to {}", record.name(), record.featureLevel()); - finalizedVersions.put(record.name(), record.featureLevel()); + if (record.name().equals(MetadataVersion.FEATURE_NAME)) { + log.info("Setting metadata.version to {}", record.featureLevel()); + metadataVersion.set(MetadataVersion.fromFeatureLevel(record.featureLevel())); + } else { + log.info("Setting feature {} to {}", record.name(), record.featureLevel()); + finalizedVersions.put(record.name(), record.featureLevel()); + } + for (Entry<String, FeatureLevelListener> listenerEntry : listeners.entrySet()) { Review Comment: What other FeatureLevelListeners might we have (in addition to `QuorumFeatureListener`)? ########## metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java: ########## @@ -93,84 +129,187 @@ boolean featureExists(String featureName) { return quorumFeatures.localSupportedFeature(featureName).isPresent(); } - private ApiError updateFeature(String featureName, - short newVersion, - FeatureUpdate.UpgradeType upgradeType, - Map<Integer, Map<String, VersionRange>> brokersAndFeatures, - List<ApiMessageAndVersion> records) { + + MetadataVersion metadataVersion() { + return metadataVersion.get(); + } + + private ApiError updateFeature( + String featureName, + short newVersion, + FeatureUpdate.UpgradeType upgradeType, + Map<Integer, Map<String, VersionRange>> brokersAndFeatures, + List<ApiMessageAndVersion> records + ) { if (!featureExists(featureName)) { - return new ApiError(Errors.INVALID_UPDATE_VERSION, - "The controller does not support the given feature."); + return invalidUpdateVersion(featureName, newVersion, + "The controller does not support the given feature."); } if (upgradeType.equals(FeatureUpdate.UpgradeType.UNKNOWN)) { - return new ApiError(Errors.INVALID_UPDATE_VERSION, - "The controller does not support the given upgrade type."); + return invalidUpdateVersion(featureName, newVersion, + "The controller does not support the given upgrade type."); } final Short currentVersion = finalizedVersions.get(featureName); if (newVersion <= 0) { - return new ApiError(Errors.INVALID_UPDATE_VERSION, - "The upper value for the new range cannot be less than 1."); + return invalidUpdateVersion(featureName, newVersion, + "A feature version cannot be less than 1."); } if (!canSupportVersion(featureName, newVersion)) { - return new ApiError(Errors.INVALID_UPDATE_VERSION, - "The controller does not support the given feature range."); + return invalidUpdateVersion(featureName, newVersion, + "The controller does not support the given feature version."); } for (Entry<Integer, Map<String, VersionRange>> brokerEntry : brokersAndFeatures.entrySet()) { VersionRange brokerRange = brokerEntry.getValue().get(featureName); - if (brokerRange == null || !brokerRange.contains(newVersion)) { - return new ApiError(Errors.INVALID_UPDATE_VERSION, + if (brokerRange == null) { + return invalidUpdateVersion(featureName, newVersion, + "Broker " + brokerEntry.getKey() + " does not support this feature."); + } else if (!brokerRange.contains(newVersion)) { + return invalidUpdateVersion(featureName, newVersion, "Broker " + brokerEntry.getKey() + " does not support the given " + - "feature range."); + "version. It supports " + brokerRange.min() + " to " + brokerRange.max() + "."); } } if (currentVersion != null && newVersion < currentVersion) { if (upgradeType.equals(FeatureUpdate.UpgradeType.UPGRADE)) { - return new ApiError(Errors.INVALID_UPDATE_VERSION, - "Can't downgrade the maximum version of this feature without setting the upgrade type to safe or unsafe downgrade."); + return invalidUpdateVersion(featureName, newVersion, + "Can't downgrade the version of this feature without setting the " + + "upgrade type to either safe or unsafe downgrade."); + } + } + + if (featureName.equals(MetadataVersion.FEATURE_NAME)) { + // Perform additional checks if we're updating metadata.version + return updateMetadataVersion(newVersion, upgradeType.equals(FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE), records::add); + } else { + records.add(new ApiMessageAndVersion( + new FeatureLevelRecord() + .setName(featureName) + .setFeatureLevel(newVersion), + FEATURE_LEVEL_RECORD.highestSupportedVersion())); + return ApiError.NONE; + } + } + + private ApiError invalidUpdateVersion(String feature, short version, String message) { + String errorMessage = String.format("Invalid update version %d for feature %s. %s", version, feature, message); + log.debug(errorMessage); + return new ApiError(Errors.INVALID_UPDATE_VERSION, errorMessage); + } + + /** + * Perform some additional validation for metadata.version updates. + */ + private ApiError updateMetadataVersion( + short newVersionLevel, + boolean allowUnsafeDowngrade, Review Comment: Are you planning on supporting `allowUnsafeDowngrade == true` in this PR? ########## metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java: ########## @@ -93,84 +129,187 @@ boolean featureExists(String featureName) { return quorumFeatures.localSupportedFeature(featureName).isPresent(); } - private ApiError updateFeature(String featureName, - short newVersion, - FeatureUpdate.UpgradeType upgradeType, - Map<Integer, Map<String, VersionRange>> brokersAndFeatures, - List<ApiMessageAndVersion> records) { + + MetadataVersion metadataVersion() { + return metadataVersion.get(); + } + + private ApiError updateFeature( + String featureName, + short newVersion, + FeatureUpdate.UpgradeType upgradeType, + Map<Integer, Map<String, VersionRange>> brokersAndFeatures, + List<ApiMessageAndVersion> records + ) { if (!featureExists(featureName)) { - return new ApiError(Errors.INVALID_UPDATE_VERSION, - "The controller does not support the given feature."); + return invalidUpdateVersion(featureName, newVersion, + "The controller does not support the given feature."); } if (upgradeType.equals(FeatureUpdate.UpgradeType.UNKNOWN)) { - return new ApiError(Errors.INVALID_UPDATE_VERSION, - "The controller does not support the given upgrade type."); + return invalidUpdateVersion(featureName, newVersion, + "The controller does not support the given upgrade type."); } final Short currentVersion = finalizedVersions.get(featureName); if (newVersion <= 0) { - return new ApiError(Errors.INVALID_UPDATE_VERSION, - "The upper value for the new range cannot be less than 1."); + return invalidUpdateVersion(featureName, newVersion, + "A feature version cannot be less than 1."); } if (!canSupportVersion(featureName, newVersion)) { - return new ApiError(Errors.INVALID_UPDATE_VERSION, - "The controller does not support the given feature range."); + return invalidUpdateVersion(featureName, newVersion, + "The controller does not support the given feature version."); } for (Entry<Integer, Map<String, VersionRange>> brokerEntry : brokersAndFeatures.entrySet()) { VersionRange brokerRange = brokerEntry.getValue().get(featureName); - if (brokerRange == null || !brokerRange.contains(newVersion)) { - return new ApiError(Errors.INVALID_UPDATE_VERSION, + if (brokerRange == null) { + return invalidUpdateVersion(featureName, newVersion, + "Broker " + brokerEntry.getKey() + " does not support this feature."); + } else if (!brokerRange.contains(newVersion)) { + return invalidUpdateVersion(featureName, newVersion, "Broker " + brokerEntry.getKey() + " does not support the given " + - "feature range."); + "version. It supports " + brokerRange.min() + " to " + brokerRange.max() + "."); } } if (currentVersion != null && newVersion < currentVersion) { if (upgradeType.equals(FeatureUpdate.UpgradeType.UPGRADE)) { - return new ApiError(Errors.INVALID_UPDATE_VERSION, - "Can't downgrade the maximum version of this feature without setting the upgrade type to safe or unsafe downgrade."); + return invalidUpdateVersion(featureName, newVersion, + "Can't downgrade the version of this feature without setting the " + + "upgrade type to either safe or unsafe downgrade."); + } + } + + if (featureName.equals(MetadataVersion.FEATURE_NAME)) { + // Perform additional checks if we're updating metadata.version + return updateMetadataVersion(newVersion, upgradeType.equals(FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE), records::add); + } else { + records.add(new ApiMessageAndVersion( + new FeatureLevelRecord() + .setName(featureName) + .setFeatureLevel(newVersion), + FEATURE_LEVEL_RECORD.highestSupportedVersion())); + return ApiError.NONE; + } + } + + private ApiError invalidUpdateVersion(String feature, short version, String message) { + String errorMessage = String.format("Invalid update version %d for feature %s. %s", version, feature, message); + log.debug(errorMessage); + return new ApiError(Errors.INVALID_UPDATE_VERSION, errorMessage); + } + + /** + * Perform some additional validation for metadata.version updates. + */ + private ApiError updateMetadataVersion( + short newVersionLevel, + boolean allowUnsafeDowngrade, + Consumer<ApiMessageAndVersion> recordConsumer + ) { + Optional<VersionRange> quorumSupported = quorumFeatures.quorumSupportedFeature(MetadataVersion.FEATURE_NAME); + if (!quorumSupported.isPresent()) { + return invalidMetadataVersion(newVersionLevel, "The quorum does not support metadata.version."); + } + + if (newVersionLevel <= 0) { + return invalidMetadataVersion(newVersionLevel, "metadata.version cannot be less than 1."); + } + + if (!quorumSupported.get().contains(newVersionLevel)) { + return invalidMetadataVersion(newVersionLevel, "The controller quorum does support this version."); + } + + MetadataVersion currentVersion = metadataVersion(); + final MetadataVersion newVersion; + try { + newVersion = MetadataVersion.fromFeatureLevel(newVersionLevel); + } catch (IllegalArgumentException e) { + return invalidMetadataVersion(newVersionLevel, "Unknown metadata.version."); + } + + if (!currentVersion.equals(MetadataVersion.UNINITIALIZED) && newVersion.isLessThan(currentVersion)) { + // This is a downgrade + boolean metadataChanged = MetadataVersion.checkIfMetadataChanged(currentVersion, newVersion); + if (!metadataChanged) { + log.info("Downgrading metadata.version from {} to {}.", currentVersion, newVersion); + } else { + return invalidMetadataVersion(newVersionLevel, "Unsafe metadata.version downgrades are not supported."); } } - records.add(new ApiMessageAndVersion( + recordConsumer.accept(new ApiMessageAndVersion( new FeatureLevelRecord() - .setName(featureName) - .setFeatureLevel(newVersion), - FEATURE_LEVEL_RECORD.highestSupportedVersion())); + .setName(MetadataVersion.FEATURE_NAME) + .setFeatureLevel(newVersionLevel), FEATURE_LEVEL_RECORD.lowestSupportedVersion())); return ApiError.NONE; } - FinalizedControllerFeatures finalizedFeatures(long lastCommittedOffset) { + private ApiError invalidMetadataVersion(short version, String message) { + String errorMessage = String.format("Invalid metadata.version %d. %s", version, message); + log.error(errorMessage); + return new ApiError(Errors.INVALID_UPDATE_VERSION, errorMessage); + } + + FinalizedControllerFeatures finalizedFeatures(long epoch) { Map<String, Short> features = new HashMap<>(); - for (Entry<String, Short> entry : finalizedVersions.entrySet(lastCommittedOffset)) { + if (!metadataVersion.get(epoch).equals(MetadataVersion.UNINITIALIZED)) { + features.put(MetadataVersion.FEATURE_NAME, metadataVersion.get(epoch).featureLevel()); + } + for (Entry<String, Short> entry : finalizedVersions.entrySet(epoch)) { features.put(entry.getKey(), entry.getValue()); } - return new FinalizedControllerFeatures(features, lastCommittedOffset); + return new FinalizedControllerFeatures(features, epoch); } public void replay(FeatureLevelRecord record) { - log.info("Setting feature {} to {}", record.name(), record.featureLevel()); - finalizedVersions.put(record.name(), record.featureLevel()); + if (record.name().equals(MetadataVersion.FEATURE_NAME)) { + log.info("Setting metadata.version to {}", record.featureLevel()); + metadataVersion.set(MetadataVersion.fromFeatureLevel(record.featureLevel())); + } else { + log.info("Setting feature {} to {}", record.name(), record.featureLevel()); + finalizedVersions.put(record.name(), record.featureLevel()); + } + for (Entry<String, FeatureLevelListener> listenerEntry : listeners.entrySet()) { + try { + listenerEntry.getValue().handle(record.name(), record.featureLevel()); + } catch (Throwable t) { + log.error("Failure calling feature listener " + listenerEntry.getKey(), t); + } + } } class FeatureControlIterator implements Iterator<List<ApiMessageAndVersion>> { private final Iterator<Entry<String, Short>> iterator; + private final MetadataVersion metadataVersion; + private boolean wroteVersion = false; Review Comment: Would it work to name this something like `isUninitializedVersion` or simply `uninitialized`? The point of this var is just to avoid doing the `metadataVersion.equals(UNINITIALIZED)` comparison every time we call `FeatureControlIterator.next()` right? ########## metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java: ########## @@ -93,84 +129,187 @@ boolean featureExists(String featureName) { return quorumFeatures.localSupportedFeature(featureName).isPresent(); } - private ApiError updateFeature(String featureName, - short newVersion, - FeatureUpdate.UpgradeType upgradeType, - Map<Integer, Map<String, VersionRange>> brokersAndFeatures, - List<ApiMessageAndVersion> records) { + + MetadataVersion metadataVersion() { + return metadataVersion.get(); + } + + private ApiError updateFeature( + String featureName, + short newVersion, + FeatureUpdate.UpgradeType upgradeType, + Map<Integer, Map<String, VersionRange>> brokersAndFeatures, + List<ApiMessageAndVersion> records + ) { if (!featureExists(featureName)) { - return new ApiError(Errors.INVALID_UPDATE_VERSION, - "The controller does not support the given feature."); + return invalidUpdateVersion(featureName, newVersion, + "The controller does not support the given feature."); } if (upgradeType.equals(FeatureUpdate.UpgradeType.UNKNOWN)) { - return new ApiError(Errors.INVALID_UPDATE_VERSION, - "The controller does not support the given upgrade type."); + return invalidUpdateVersion(featureName, newVersion, + "The controller does not support the given upgrade type."); } final Short currentVersion = finalizedVersions.get(featureName); if (newVersion <= 0) { - return new ApiError(Errors.INVALID_UPDATE_VERSION, - "The upper value for the new range cannot be less than 1."); + return invalidUpdateVersion(featureName, newVersion, + "A feature version cannot be less than 1."); } if (!canSupportVersion(featureName, newVersion)) { - return new ApiError(Errors.INVALID_UPDATE_VERSION, - "The controller does not support the given feature range."); + return invalidUpdateVersion(featureName, newVersion, + "The controller does not support the given feature version."); } for (Entry<Integer, Map<String, VersionRange>> brokerEntry : brokersAndFeatures.entrySet()) { VersionRange brokerRange = brokerEntry.getValue().get(featureName); - if (brokerRange == null || !brokerRange.contains(newVersion)) { - return new ApiError(Errors.INVALID_UPDATE_VERSION, + if (brokerRange == null) { + return invalidUpdateVersion(featureName, newVersion, + "Broker " + brokerEntry.getKey() + " does not support this feature."); + } else if (!brokerRange.contains(newVersion)) { + return invalidUpdateVersion(featureName, newVersion, "Broker " + brokerEntry.getKey() + " does not support the given " + - "feature range."); + "version. It supports " + brokerRange.min() + " to " + brokerRange.max() + "."); } } if (currentVersion != null && newVersion < currentVersion) { if (upgradeType.equals(FeatureUpdate.UpgradeType.UPGRADE)) { - return new ApiError(Errors.INVALID_UPDATE_VERSION, - "Can't downgrade the maximum version of this feature without setting the upgrade type to safe or unsafe downgrade."); + return invalidUpdateVersion(featureName, newVersion, + "Can't downgrade the version of this feature without setting the " + + "upgrade type to either safe or unsafe downgrade."); + } + } + + if (featureName.equals(MetadataVersion.FEATURE_NAME)) { + // Perform additional checks if we're updating metadata.version + return updateMetadataVersion(newVersion, upgradeType.equals(FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE), records::add); + } else { + records.add(new ApiMessageAndVersion( + new FeatureLevelRecord() + .setName(featureName) + .setFeatureLevel(newVersion), + FEATURE_LEVEL_RECORD.highestSupportedVersion())); + return ApiError.NONE; + } + } + + private ApiError invalidUpdateVersion(String feature, short version, String message) { + String errorMessage = String.format("Invalid update version %d for feature %s. %s", version, feature, message); + log.debug(errorMessage); + return new ApiError(Errors.INVALID_UPDATE_VERSION, errorMessage); + } + + /** + * Perform some additional validation for metadata.version updates. + */ + private ApiError updateMetadataVersion( + short newVersionLevel, + boolean allowUnsafeDowngrade, + Consumer<ApiMessageAndVersion> recordConsumer + ) { + Optional<VersionRange> quorumSupported = quorumFeatures.quorumSupportedFeature(MetadataVersion.FEATURE_NAME); + if (!quorumSupported.isPresent()) { + return invalidMetadataVersion(newVersionLevel, "The quorum does not support metadata.version."); Review Comment: This is making me realize it might be confusing that `metadata.version` refers to `featureLevel` when we also have `MetadataVersion` the enum. ########## metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java: ########## @@ -93,84 +129,187 @@ boolean featureExists(String featureName) { return quorumFeatures.localSupportedFeature(featureName).isPresent(); } - private ApiError updateFeature(String featureName, - short newVersion, - FeatureUpdate.UpgradeType upgradeType, - Map<Integer, Map<String, VersionRange>> brokersAndFeatures, - List<ApiMessageAndVersion> records) { + + MetadataVersion metadataVersion() { + return metadataVersion.get(); + } + + private ApiError updateFeature( + String featureName, + short newVersion, + FeatureUpdate.UpgradeType upgradeType, + Map<Integer, Map<String, VersionRange>> brokersAndFeatures, + List<ApiMessageAndVersion> records + ) { if (!featureExists(featureName)) { - return new ApiError(Errors.INVALID_UPDATE_VERSION, - "The controller does not support the given feature."); + return invalidUpdateVersion(featureName, newVersion, + "The controller does not support the given feature."); } if (upgradeType.equals(FeatureUpdate.UpgradeType.UNKNOWN)) { - return new ApiError(Errors.INVALID_UPDATE_VERSION, - "The controller does not support the given upgrade type."); + return invalidUpdateVersion(featureName, newVersion, + "The controller does not support the given upgrade type."); } final Short currentVersion = finalizedVersions.get(featureName); if (newVersion <= 0) { - return new ApiError(Errors.INVALID_UPDATE_VERSION, - "The upper value for the new range cannot be less than 1."); + return invalidUpdateVersion(featureName, newVersion, + "A feature version cannot be less than 1."); } if (!canSupportVersion(featureName, newVersion)) { - return new ApiError(Errors.INVALID_UPDATE_VERSION, - "The controller does not support the given feature range."); + return invalidUpdateVersion(featureName, newVersion, + "The controller does not support the given feature version."); } for (Entry<Integer, Map<String, VersionRange>> brokerEntry : brokersAndFeatures.entrySet()) { VersionRange brokerRange = brokerEntry.getValue().get(featureName); - if (brokerRange == null || !brokerRange.contains(newVersion)) { - return new ApiError(Errors.INVALID_UPDATE_VERSION, + if (brokerRange == null) { + return invalidUpdateVersion(featureName, newVersion, + "Broker " + brokerEntry.getKey() + " does not support this feature."); + } else if (!brokerRange.contains(newVersion)) { + return invalidUpdateVersion(featureName, newVersion, "Broker " + brokerEntry.getKey() + " does not support the given " + - "feature range."); + "version. It supports " + brokerRange.min() + " to " + brokerRange.max() + "."); } } if (currentVersion != null && newVersion < currentVersion) { if (upgradeType.equals(FeatureUpdate.UpgradeType.UPGRADE)) { - return new ApiError(Errors.INVALID_UPDATE_VERSION, - "Can't downgrade the maximum version of this feature without setting the upgrade type to safe or unsafe downgrade."); + return invalidUpdateVersion(featureName, newVersion, + "Can't downgrade the version of this feature without setting the " + + "upgrade type to either safe or unsafe downgrade."); + } + } + + if (featureName.equals(MetadataVersion.FEATURE_NAME)) { + // Perform additional checks if we're updating metadata.version + return updateMetadataVersion(newVersion, upgradeType.equals(FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE), records::add); + } else { + records.add(new ApiMessageAndVersion( + new FeatureLevelRecord() + .setName(featureName) + .setFeatureLevel(newVersion), + FEATURE_LEVEL_RECORD.highestSupportedVersion())); + return ApiError.NONE; + } + } + + private ApiError invalidUpdateVersion(String feature, short version, String message) { + String errorMessage = String.format("Invalid update version %d for feature %s. %s", version, feature, message); + log.debug(errorMessage); + return new ApiError(Errors.INVALID_UPDATE_VERSION, errorMessage); + } + + /** + * Perform some additional validation for metadata.version updates. + */ + private ApiError updateMetadataVersion( + short newVersionLevel, + boolean allowUnsafeDowngrade, + Consumer<ApiMessageAndVersion> recordConsumer + ) { + Optional<VersionRange> quorumSupported = quorumFeatures.quorumSupportedFeature(MetadataVersion.FEATURE_NAME); + if (!quorumSupported.isPresent()) { + return invalidMetadataVersion(newVersionLevel, "The quorum does not support metadata.version."); + } + + if (newVersionLevel <= 0) { + return invalidMetadataVersion(newVersionLevel, "metadata.version cannot be less than 1."); Review Comment: nit ```suggestion return invalidMetadataVersion(newVersionLevel, "KRaft mode/the quorum does not support metadata.version values less than 1."); ``` ########## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ########## @@ -927,6 +981,31 @@ private void appendRaftEvent(String name, Runnable runnable) { } } + /** + * A callback for changes to feature levels including metadata.version. This is called synchronously from + * {@link FeatureControlManager#replay(FeatureLevelRecord)} which is part of a ControllerWriteEvent. It is safe + * to modify controller state here. By the time this listener is called, a FeatureLevelRecord has been committed and + * the in-memory state of FeatureControlManager has been updated. + */ + class QuorumFeatureListener implements FeatureLevelListener { + @Override + public void handle(String featureName, short finalizedVersion) { + boolean isActiveController = curClaimEpoch != -1; + boolean isFeatureSupported = featureControl.canSupportVersion(featureName, finalizedVersion); + if (featureName.equals(MetadataVersion.FEATURE_NAME)) { + if (!isFeatureSupported) { + if (isActiveController) { + log.error("Active controller cannot support metadata.version {}", finalizedVersion); + } else { + log.error("Standby controller cannot support metadata.version {}, shutting down.", finalizedVersion); + } + beginShutdown(); + } + } Review Comment: Is the behavior dependent on the feature being metadata.version? ```suggestion if (!isFeatureSupported) { if (isActiveController) { log.error("Active controller cannot support {} {}", featureName, finalizedVersion); } else { log.error("Standby controller cannot support {} {}, shutting down.", featureName, finalizedVersion); } beginShutdown(); } ``` ########## metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java: ########## @@ -83,6 +100,25 @@ ControllerResult<Map<String, ApiError>> updateFeatures( } } + ControllerResult<Map<String, ApiError>> initializeMetadataVersion(short initVersion) { + if (!metadataVersion().equals(MetadataVersion.UNINITIALIZED)) { + return ControllerResult.atomicOf( + Collections.emptyList(), + Collections.singletonMap( + MetadataVersion.FEATURE_NAME, + new ApiError(Errors.INVALID_UPDATE_VERSION, + "Cannot initialize metadata.version since it has already been initialized.") Review Comment: nit: Add the metadata.version in use to the error message -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org