mumrah commented on code in PR #12050: URL: https://github.com/apache/kafka/pull/12050#discussion_r871495442
########## metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java: ########## @@ -93,84 +129,187 @@ boolean featureExists(String featureName) { return quorumFeatures.localSupportedFeature(featureName).isPresent(); } - private ApiError updateFeature(String featureName, - short newVersion, - FeatureUpdate.UpgradeType upgradeType, - Map<Integer, Map<String, VersionRange>> brokersAndFeatures, - List<ApiMessageAndVersion> records) { + + MetadataVersion metadataVersion() { + return metadataVersion.get(); + } + + private ApiError updateFeature( + String featureName, + short newVersion, + FeatureUpdate.UpgradeType upgradeType, + Map<Integer, Map<String, VersionRange>> brokersAndFeatures, + List<ApiMessageAndVersion> records + ) { if (!featureExists(featureName)) { - return new ApiError(Errors.INVALID_UPDATE_VERSION, - "The controller does not support the given feature."); + return invalidUpdateVersion(featureName, newVersion, + "The controller does not support the given feature."); } if (upgradeType.equals(FeatureUpdate.UpgradeType.UNKNOWN)) { - return new ApiError(Errors.INVALID_UPDATE_VERSION, - "The controller does not support the given upgrade type."); + return invalidUpdateVersion(featureName, newVersion, + "The controller does not support the given upgrade type."); } final Short currentVersion = finalizedVersions.get(featureName); if (newVersion <= 0) { - return new ApiError(Errors.INVALID_UPDATE_VERSION, - "The upper value for the new range cannot be less than 1."); + return invalidUpdateVersion(featureName, newVersion, + "A feature version cannot be less than 1."); } if (!canSupportVersion(featureName, newVersion)) { - return new ApiError(Errors.INVALID_UPDATE_VERSION, - "The controller does not support the given feature range."); + return invalidUpdateVersion(featureName, newVersion, + "The controller does not support the given feature version."); } for (Entry<Integer, Map<String, VersionRange>> brokerEntry : brokersAndFeatures.entrySet()) { VersionRange brokerRange = brokerEntry.getValue().get(featureName); - if (brokerRange == null || !brokerRange.contains(newVersion)) { - return new ApiError(Errors.INVALID_UPDATE_VERSION, + if (brokerRange == null) { + return invalidUpdateVersion(featureName, newVersion, + "Broker " + brokerEntry.getKey() + " does not support this feature."); + } else if (!brokerRange.contains(newVersion)) { + return invalidUpdateVersion(featureName, newVersion, "Broker " + brokerEntry.getKey() + " does not support the given " + - "feature range."); + "version. It supports " + brokerRange.min() + " to " + brokerRange.max() + "."); } } if (currentVersion != null && newVersion < currentVersion) { if (upgradeType.equals(FeatureUpdate.UpgradeType.UPGRADE)) { - return new ApiError(Errors.INVALID_UPDATE_VERSION, - "Can't downgrade the maximum version of this feature without setting the upgrade type to safe or unsafe downgrade."); + return invalidUpdateVersion(featureName, newVersion, + "Can't downgrade the version of this feature without setting the " + + "upgrade type to either safe or unsafe downgrade."); + } + } + + if (featureName.equals(MetadataVersion.FEATURE_NAME)) { + // Perform additional checks if we're updating metadata.version + return updateMetadataVersion(newVersion, upgradeType.equals(FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE), records::add); + } else { + records.add(new ApiMessageAndVersion( + new FeatureLevelRecord() + .setName(featureName) + .setFeatureLevel(newVersion), + FEATURE_LEVEL_RECORD.highestSupportedVersion())); + return ApiError.NONE; + } + } + + private ApiError invalidUpdateVersion(String feature, short version, String message) { + String errorMessage = String.format("Invalid update version %d for feature %s. %s", version, feature, message); + log.debug(errorMessage); + return new ApiError(Errors.INVALID_UPDATE_VERSION, errorMessage); + } + + /** + * Perform some additional validation for metadata.version updates. + */ + private ApiError updateMetadataVersion( + short newVersionLevel, + boolean allowUnsafeDowngrade, + Consumer<ApiMessageAndVersion> recordConsumer + ) { + Optional<VersionRange> quorumSupported = quorumFeatures.quorumSupportedFeature(MetadataVersion.FEATURE_NAME); + if (!quorumSupported.isPresent()) { + return invalidMetadataVersion(newVersionLevel, "The quorum does not support metadata.version."); + } + + if (newVersionLevel <= 0) { + return invalidMetadataVersion(newVersionLevel, "metadata.version cannot be less than 1."); + } + + if (!quorumSupported.get().contains(newVersionLevel)) { + return invalidMetadataVersion(newVersionLevel, "The controller quorum does support this version."); + } + + MetadataVersion currentVersion = metadataVersion(); + final MetadataVersion newVersion; + try { + newVersion = MetadataVersion.fromFeatureLevel(newVersionLevel); + } catch (IllegalArgumentException e) { + return invalidMetadataVersion(newVersionLevel, "Unknown metadata.version."); + } + + if (!currentVersion.equals(MetadataVersion.UNINITIALIZED) && newVersion.isLessThan(currentVersion)) { + // This is a downgrade + boolean metadataChanged = MetadataVersion.checkIfMetadataChanged(currentVersion, newVersion); + if (!metadataChanged) { + log.info("Downgrading metadata.version from {} to {}.", currentVersion, newVersion); + } else { + return invalidMetadataVersion(newVersionLevel, "Unsafe metadata.version downgrades are not supported."); } } - records.add(new ApiMessageAndVersion( + recordConsumer.accept(new ApiMessageAndVersion( new FeatureLevelRecord() - .setName(featureName) - .setFeatureLevel(newVersion), - FEATURE_LEVEL_RECORD.highestSupportedVersion())); + .setName(MetadataVersion.FEATURE_NAME) + .setFeatureLevel(newVersionLevel), FEATURE_LEVEL_RECORD.lowestSupportedVersion())); return ApiError.NONE; } - FinalizedControllerFeatures finalizedFeatures(long lastCommittedOffset) { + private ApiError invalidMetadataVersion(short version, String message) { + String errorMessage = String.format("Invalid metadata.version %d. %s", version, message); + log.error(errorMessage); + return new ApiError(Errors.INVALID_UPDATE_VERSION, errorMessage); + } + + FinalizedControllerFeatures finalizedFeatures(long epoch) { Map<String, Short> features = new HashMap<>(); - for (Entry<String, Short> entry : finalizedVersions.entrySet(lastCommittedOffset)) { + if (!metadataVersion.get(epoch).equals(MetadataVersion.UNINITIALIZED)) { + features.put(MetadataVersion.FEATURE_NAME, metadataVersion.get(epoch).featureLevel()); + } + for (Entry<String, Short> entry : finalizedVersions.entrySet(epoch)) { features.put(entry.getKey(), entry.getValue()); } - return new FinalizedControllerFeatures(features, lastCommittedOffset); + return new FinalizedControllerFeatures(features, epoch); } public void replay(FeatureLevelRecord record) { - log.info("Setting feature {} to {}", record.name(), record.featureLevel()); - finalizedVersions.put(record.name(), record.featureLevel()); + if (record.name().equals(MetadataVersion.FEATURE_NAME)) { + log.info("Setting metadata.version to {}", record.featureLevel()); + metadataVersion.set(MetadataVersion.fromFeatureLevel(record.featureLevel())); + } else { + log.info("Setting feature {} to {}", record.name(), record.featureLevel()); + finalizedVersions.put(record.name(), record.featureLevel()); + } + for (Entry<String, FeatureLevelListener> listenerEntry : listeners.entrySet()) { Review Comment: I can't really think of any immediate use cases. It's possible we'll just have a single listener on the controller side. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org