mumrah commented on code in PR #12050:
URL: https://github.com/apache/kafka/pull/12050#discussion_r871491412
##########
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java:
##########
@@ -93,84 +129,187 @@ boolean featureExists(String featureName) {
return quorumFeatures.localSupportedFeature(featureName).isPresent();
}
- private ApiError updateFeature(String featureName,
- short newVersion,
- FeatureUpdate.UpgradeType upgradeType,
- Map<Integer, Map<String, VersionRange>>
brokersAndFeatures,
- List<ApiMessageAndVersion> records) {
+
+ MetadataVersion metadataVersion() {
+ return metadataVersion.get();
+ }
+
+ private ApiError updateFeature(
+ String featureName,
+ short newVersion,
+ FeatureUpdate.UpgradeType upgradeType,
+ Map<Integer, Map<String, VersionRange>> brokersAndFeatures,
+ List<ApiMessageAndVersion> records
+ ) {
if (!featureExists(featureName)) {
- return new ApiError(Errors.INVALID_UPDATE_VERSION,
- "The controller does not support the given feature.");
+ return invalidUpdateVersion(featureName, newVersion,
+ "The controller does not support the given feature.");
}
if (upgradeType.equals(FeatureUpdate.UpgradeType.UNKNOWN)) {
- return new ApiError(Errors.INVALID_UPDATE_VERSION,
- "The controller does not support the given upgrade type.");
+ return invalidUpdateVersion(featureName, newVersion,
+ "The controller does not support the given upgrade type.");
}
final Short currentVersion = finalizedVersions.get(featureName);
if (newVersion <= 0) {
- return new ApiError(Errors.INVALID_UPDATE_VERSION,
- "The upper value for the new range cannot be less than 1.");
+ return invalidUpdateVersion(featureName, newVersion,
+ "A feature version cannot be less than 1.");
}
if (!canSupportVersion(featureName, newVersion)) {
- return new ApiError(Errors.INVALID_UPDATE_VERSION,
- "The controller does not support the given feature range.");
+ return invalidUpdateVersion(featureName, newVersion,
+ "The controller does not support the given feature version.");
}
for (Entry<Integer, Map<String, VersionRange>> brokerEntry :
brokersAndFeatures.entrySet()) {
VersionRange brokerRange = brokerEntry.getValue().get(featureName);
- if (brokerRange == null || !brokerRange.contains(newVersion)) {
- return new ApiError(Errors.INVALID_UPDATE_VERSION,
+ if (brokerRange == null) {
+ return invalidUpdateVersion(featureName, newVersion,
+ "Broker " + brokerEntry.getKey() + " does not support this
feature.");
+ } else if (!brokerRange.contains(newVersion)) {
+ return invalidUpdateVersion(featureName, newVersion,
"Broker " + brokerEntry.getKey() + " does not support the
given " +
- "feature range.");
+ "version. It supports " + brokerRange.min() + " to " +
brokerRange.max() + ".");
}
}
if (currentVersion != null && newVersion < currentVersion) {
if (upgradeType.equals(FeatureUpdate.UpgradeType.UPGRADE)) {
- return new ApiError(Errors.INVALID_UPDATE_VERSION,
- "Can't downgrade the maximum version of this feature
without setting the upgrade type to safe or unsafe downgrade.");
+ return invalidUpdateVersion(featureName, newVersion,
+ "Can't downgrade the version of this feature without
setting the " +
+ "upgrade type to either safe or unsafe downgrade.");
+ }
+ }
+
+ if (featureName.equals(MetadataVersion.FEATURE_NAME)) {
+ // Perform additional checks if we're updating metadata.version
+ return updateMetadataVersion(newVersion,
upgradeType.equals(FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE), records::add);
+ } else {
+ records.add(new ApiMessageAndVersion(
+ new FeatureLevelRecord()
+ .setName(featureName)
+ .setFeatureLevel(newVersion),
+ FEATURE_LEVEL_RECORD.highestSupportedVersion()));
+ return ApiError.NONE;
+ }
+ }
+
+ private ApiError invalidUpdateVersion(String feature, short version,
String message) {
+ String errorMessage = String.format("Invalid update version %d for
feature %s. %s", version, feature, message);
+ log.debug(errorMessage);
+ return new ApiError(Errors.INVALID_UPDATE_VERSION, errorMessage);
+ }
+
+ /**
+ * Perform some additional validation for metadata.version updates.
+ */
+ private ApiError updateMetadataVersion(
+ short newVersionLevel,
+ boolean allowUnsafeDowngrade,
+ Consumer<ApiMessageAndVersion> recordConsumer
+ ) {
+ Optional<VersionRange> quorumSupported =
quorumFeatures.quorumSupportedFeature(MetadataVersion.FEATURE_NAME);
+ if (!quorumSupported.isPresent()) {
+ return invalidMetadataVersion(newVersionLevel, "The quorum does
not support metadata.version.");
+ }
+
+ if (newVersionLevel <= 0) {
+ return invalidMetadataVersion(newVersionLevel, "metadata.version
cannot be less than 1.");
+ }
+
+ if (!quorumSupported.get().contains(newVersionLevel)) {
+ return invalidMetadataVersion(newVersionLevel, "The controller
quorum does support this version.");
+ }
+
+ MetadataVersion currentVersion = metadataVersion();
+ final MetadataVersion newVersion;
+ try {
+ newVersion = MetadataVersion.fromFeatureLevel(newVersionLevel);
+ } catch (IllegalArgumentException e) {
+ return invalidMetadataVersion(newVersionLevel, "Unknown
metadata.version.");
+ }
+
+ if (!currentVersion.equals(MetadataVersion.UNINITIALIZED) &&
newVersion.isLessThan(currentVersion)) {
+ // This is a downgrade
+ boolean metadataChanged =
MetadataVersion.checkIfMetadataChanged(currentVersion, newVersion);
+ if (!metadataChanged) {
+ log.info("Downgrading metadata.version from {} to {}.",
currentVersion, newVersion);
+ } else {
+ return invalidMetadataVersion(newVersionLevel, "Unsafe
metadata.version downgrades are not supported.");
}
}
- records.add(new ApiMessageAndVersion(
+ recordConsumer.accept(new ApiMessageAndVersion(
new FeatureLevelRecord()
- .setName(featureName)
- .setFeatureLevel(newVersion),
- FEATURE_LEVEL_RECORD.highestSupportedVersion()));
+ .setName(MetadataVersion.FEATURE_NAME)
+ .setFeatureLevel(newVersionLevel),
FEATURE_LEVEL_RECORD.lowestSupportedVersion()));
return ApiError.NONE;
}
- FinalizedControllerFeatures finalizedFeatures(long lastCommittedOffset) {
+ private ApiError invalidMetadataVersion(short version, String message) {
+ String errorMessage = String.format("Invalid metadata.version %d. %s",
version, message);
+ log.error(errorMessage);
+ return new ApiError(Errors.INVALID_UPDATE_VERSION, errorMessage);
+ }
+
+ FinalizedControllerFeatures finalizedFeatures(long epoch) {
Map<String, Short> features = new HashMap<>();
- for (Entry<String, Short> entry :
finalizedVersions.entrySet(lastCommittedOffset)) {
+ if (!metadataVersion.get(epoch).equals(MetadataVersion.UNINITIALIZED))
{
+ features.put(MetadataVersion.FEATURE_NAME,
metadataVersion.get(epoch).featureLevel());
+ }
+ for (Entry<String, Short> entry : finalizedVersions.entrySet(epoch)) {
features.put(entry.getKey(), entry.getValue());
}
- return new FinalizedControllerFeatures(features, lastCommittedOffset);
+ return new FinalizedControllerFeatures(features, epoch);
}
public void replay(FeatureLevelRecord record) {
- log.info("Setting feature {} to {}", record.name(),
record.featureLevel());
- finalizedVersions.put(record.name(), record.featureLevel());
+ if (record.name().equals(MetadataVersion.FEATURE_NAME)) {
+ log.info("Setting metadata.version to {}", record.featureLevel());
+
metadataVersion.set(MetadataVersion.fromFeatureLevel(record.featureLevel()));
+ } else {
+ log.info("Setting feature {} to {}", record.name(),
record.featureLevel());
+ finalizedVersions.put(record.name(), record.featureLevel());
+ }
+ for (Entry<String, FeatureLevelListener> listenerEntry :
listeners.entrySet()) {
+ try {
+ listenerEntry.getValue().handle(record.name(),
record.featureLevel());
+ } catch (Throwable t) {
+ log.error("Failure calling feature listener " +
listenerEntry.getKey(), t);
+ }
+ }
}
class FeatureControlIterator implements
Iterator<List<ApiMessageAndVersion>> {
private final Iterator<Entry<String, Short>> iterator;
+ private final MetadataVersion metadataVersion;
+ private boolean wroteVersion = false;
Review Comment:
This boolean is used to make sure we write out the metadata.version feature
record before any others. If (somehow) a snapshot is generated before
metadata.version is initialized, we skip writing that record (hence the check
a few lines below this).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]