This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0ca9cd4d2d MINOR: Several fixes and improvements for
FeatureControlManager (#12207)
0ca9cd4d2d is described below
commit 0ca9cd4d2d2a89510dbe357783a316f2f0789799
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Wed Jun 1 16:09:38 2022 -0700
MINOR: Several fixes and improvements for FeatureControlManager (#12207)
This PR fixes a bug where FeatureControlManager#replay(FeatureLevelRecord)
was throwing an exception if not all controllers in the quorum supported the
feature being applied. While we do want to validate this, it needs to be
validated earlier, before the record is committed to the log. Once the record
has been committed to the log, it should always be applied if the current
controller supports it.
Fix another bug where removing a feature was not supported once it had been
configured. Note that because we reserve feature level 0 for "feature not
enabled", we don't need to use Optional<VersionRange>; we can just return a
range of 0-0 when the feature is not supported.
Allow the metadata version to be downgraded when UpgradeType.UNSAFE_DOWNGRADE
has been set. Previously we were unconditionally denying the downgrade even
when this upgrade type was set.
Add a builder for FeatureControlManager, so that we can easily add new
parameters to the constructor in the future. This will also be useful for
creating FeatureControlManagers that are initialized to a specific
MetadataVersion.
Get rid of RemoveFeatureLevelRecord, since it's easier to just issue a
FeatureLevelRecord with the level set to 0.
Set metadata.max.idle.interval.ms to 0 in RaftClusterSnapshotTest for more
predictability.
Reviewers: David Arthur <[email protected]>, dengziming
<[email protected]>
---
.../kafka/server/RaftClusterSnapshotTest.scala | 8 +-
.../kafka/controller/FeatureControlManager.java | 134 ++++++++++----------
.../apache/kafka/controller/QuorumController.java | 6 +-
.../apache/kafka/controller/QuorumFeatures.java | 78 ++++++------
.../java/org/apache/kafka/image/FeaturesDelta.java | 15 +--
.../java/org/apache/kafka/image/MetadataDelta.java | 8 --
.../common/metadata/RemoveFeatureLevelRecord.json | 26 ----
.../controller/FeatureControlManagerTest.java | 139 ++++++++++++---------
.../kafka/controller/QuorumFeaturesTest.java | 108 +++++++---------
.../org/apache/kafka/image/FeaturesImageTest.java | 15 ++-
10 files changed, 257 insertions(+), 280 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
b/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
index e34a5ed6ed..503ce7d2be 100644
--- a/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
+++ b/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
@@ -21,6 +21,7 @@ import java.util.Collections
import kafka.testkit.KafkaClusterTestKit
import kafka.testkit.TestKitNodes
import kafka.utils.TestUtils
+import kafka.server.KafkaConfig.{MetadataMaxIdleIntervalMsProp,
MetadataSnapshotMaxNewRecordBytesProp}
import org.apache.kafka.common.utils.BufferSupplier
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.snapshot.RecordsSnapshotReader
@@ -38,7 +39,6 @@ class RaftClusterSnapshotTest {
def testSnapshotsGenerated(): Unit = {
val numberOfBrokers = 3
val numberOfControllers = 3
- val metadataSnapshotMaxNewRecordBytes = 100
TestUtils.resource(
new KafkaClusterTestKit
@@ -48,10 +48,8 @@ class RaftClusterSnapshotTest {
.setNumControllerNodes(numberOfControllers)
.build()
)
- .setConfigProp(
- KafkaConfig.MetadataSnapshotMaxNewRecordBytesProp,
- metadataSnapshotMaxNewRecordBytes.toString
- )
+ .setConfigProp(MetadataSnapshotMaxNewRecordBytesProp, "10")
+ .setConfigProp(MetadataMaxIdleIntervalMsProp, "0")
.build()
) { cluster =>
cluster.format()
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
index 52d3c5d521..19127afa72 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
@@ -29,6 +29,7 @@ import java.util.Optional;
import java.util.TreeMap;
import java.util.function.Consumer;
+import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.protocol.Errors;
@@ -47,6 +48,46 @@ import static
org.apache.kafka.common.metadata.MetadataRecordType.FEATURE_LEVEL_
public class FeatureControlManager {
+ public static class Builder {
+ private LogContext logContext = null;
+ private SnapshotRegistry snapshotRegistry = null;
+ private QuorumFeatures quorumFeatures = null;
+ private MetadataVersion metadataVersion =
MetadataVersion.UNINITIALIZED;
+
+ Builder setLogContext(LogContext logContext) {
+ this.logContext = logContext;
+ return this;
+ }
+
+ Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+ this.snapshotRegistry = snapshotRegistry;
+ return this;
+ }
+
+ Builder setQuorumFeatures(QuorumFeatures quorumFeatures) {
+ this.quorumFeatures = quorumFeatures;
+ return this;
+ }
+
+ Builder setMetadataVersion(MetadataVersion metadataVersion) {
+ this.metadataVersion = metadataVersion;
+ return this;
+ }
+
+ public FeatureControlManager build() {
+ if (logContext == null) logContext = new LogContext();
+ if (snapshotRegistry == null) snapshotRegistry = new
SnapshotRegistry(logContext);
+ if (quorumFeatures == null) {
+ quorumFeatures = new QuorumFeatures(0, new ApiVersions(),
QuorumFeatures.defaultFeatureMap(),
+ Collections.emptyList());
+ }
+ return new FeatureControlManager(logContext,
+ quorumFeatures,
+ snapshotRegistry,
+ metadataVersion);
+ }
+ }
+
private final Logger log;
/**
@@ -65,13 +106,16 @@ public class FeatureControlManager {
private final TimelineObject<MetadataVersion> metadataVersion;
- FeatureControlManager(LogContext logContext,
- QuorumFeatures quorumFeatures,
- SnapshotRegistry snapshotRegistry) {
+ private FeatureControlManager(
+ LogContext logContext,
+ QuorumFeatures quorumFeatures,
+ SnapshotRegistry snapshotRegistry,
+ MetadataVersion metadataVersion
+ ) {
this.log = logContext.logger(FeatureControlManager.class);
this.quorumFeatures = quorumFeatures;
this.finalizedVersions = new TimelineHashMap<>(snapshotRegistry, 0);
- this.metadataVersion = new TimelineObject<>(snapshotRegistry,
MetadataVersion.UNINITIALIZED);
+ this.metadataVersion = new TimelineObject<>(snapshotRegistry,
metadataVersion);
}
ControllerResult<Map<String, ApiError>> updateFeatures(
@@ -94,35 +138,6 @@ public class FeatureControlManager {
}
}
- ControllerResult<Map<String, ApiError>> initializeMetadataVersion(short
initVersion) {
- if (!metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
- return ControllerResult.atomicOf(
- Collections.emptyList(),
- Collections.singletonMap(
- MetadataVersion.FEATURE_NAME,
- new ApiError(Errors.INVALID_UPDATE_VERSION,
- "Cannot initialize metadata.version to " + initVersion
+ " since it has already been " +
- "initialized to " +
metadataVersion().featureLevel() + ".")
- ));
- }
- List<ApiMessageAndVersion> records = new ArrayList<>();
- ApiError result = updateMetadataVersion(initVersion, false,
records::add);
- return ControllerResult.atomicOf(records,
Collections.singletonMap(MetadataVersion.FEATURE_NAME, result));
- }
-
- /**
- * Test if the quorum can support this feature and version
- */
- boolean canSupportVersion(String featureName, short version) {
- return quorumFeatures.quorumSupportedFeature(featureName)
- .filter(versionRange -> versionRange.contains(version))
- .isPresent();
- }
-
- boolean featureExists(String featureName) {
- return quorumFeatures.localSupportedFeature(featureName).isPresent();
- }
-
MetadataVersion metadataVersion() {
return metadataVersion.get();
}
@@ -134,11 +149,6 @@ public class FeatureControlManager {
Map<Integer, Map<String, VersionRange>> brokersAndFeatures,
List<ApiMessageAndVersion> records
) {
- if (!featureExists(featureName)) {
- return invalidUpdateVersion(featureName, newVersion,
- "The controller does not support the given feature.");
- }
-
if (upgradeType.equals(FeatureUpdate.UpgradeType.UNKNOWN)) {
return invalidUpdateVersion(featureName, newVersion,
"The controller does not support the given upgrade type.");
@@ -151,14 +161,14 @@ public class FeatureControlManager {
currentVersion = finalizedVersions.get(featureName);
}
- if (newVersion <= 0) {
+ if (newVersion < 0) {
return invalidUpdateVersion(featureName, newVersion,
- "A feature version cannot be less than 1.");
+ "A feature version cannot be less than 0.");
}
- if (!canSupportVersion(featureName, newVersion)) {
- return invalidUpdateVersion(featureName, newVersion,
- "The quorum does not support the given feature version.");
+ Optional<String> reasonNotSupported =
quorumFeatures.reasonNotSupported(featureName, newVersion);
+ if (reasonNotSupported.isPresent()) {
+ return invalidUpdateVersion(featureName, newVersion,
reasonNotSupported.get());
}
for (Entry<Integer, Map<String, VersionRange>> brokerEntry :
brokersAndFeatures.entrySet()) {
@@ -208,19 +218,6 @@ public class FeatureControlManager {
boolean allowUnsafeDowngrade,
Consumer<ApiMessageAndVersion> recordConsumer
) {
- Optional<VersionRange> quorumSupported =
quorumFeatures.quorumSupportedFeature(MetadataVersion.FEATURE_NAME);
- if (!quorumSupported.isPresent()) {
- return invalidMetadataVersion(newVersionLevel, "The quorum does
not support metadata.version.");
- }
-
- if (newVersionLevel <= 0) {
- return invalidMetadataVersion(newVersionLevel, "KRaft mode/the
quorum does not support metadata.version values less than 1.");
- }
-
- if (!quorumSupported.get().contains(newVersionLevel)) {
- return invalidMetadataVersion(newVersionLevel, "The controller
quorum does support this version.");
- }
-
MetadataVersion currentVersion = metadataVersion();
final MetadataVersion newVersion;
try {
@@ -234,9 +231,15 @@ public class FeatureControlManager {
boolean metadataChanged =
MetadataVersion.checkIfMetadataChanged(currentVersion, newVersion);
if (!metadataChanged) {
log.info("Downgrading metadata.version from {} to {}.",
currentVersion, newVersion);
+ } else if (allowUnsafeDowngrade) {
+ log.info("Downgrading metadata.version unsafely from {} to
{}.", currentVersion, newVersion);
} else {
- return invalidMetadataVersion(newVersionLevel, "Unsafe
metadata.version downgrades are not supported.");
+ return invalidMetadataVersion(newVersionLevel, "Refusing to
perform the requested " +
+ "downgrade because it might delete metadata
information. Retry using " +
+ "UNSAFE_DOWNGRADE if you want to force the downgrade
to proceed.");
}
+ } else {
+ log.info("Upgrading metadata.version from {} to {}.",
currentVersion, newVersion);
}
recordConsumer.accept(new ApiMessageAndVersion(
@@ -264,17 +267,22 @@ public class FeatureControlManager {
}
public void replay(FeatureLevelRecord record) {
- if (!canSupportVersion(record.name(), record.featureLevel())) {
- throw new RuntimeException("Controller cannot support feature " +
record.name() +
- " at version " + record.featureLevel());
+ VersionRange range =
quorumFeatures.localSupportedFeature(record.name());
+ if (!range.contains(record.featureLevel())) {
+ throw new RuntimeException("Tried to apply FeatureLevelRecord " +
record + ", but this controller only " +
+ "supports versions " + range);
}
-
if (record.name().equals(MetadataVersion.FEATURE_NAME)) {
log.info("Setting metadata.version to {}", record.featureLevel());
metadataVersion.set(MetadataVersion.fromFeatureLevel(record.featureLevel()));
} else {
- log.info("Setting feature {} to {}", record.name(),
record.featureLevel());
- finalizedVersions.put(record.name(), record.featureLevel());
+ if (record.featureLevel() == 0) {
+ log.info("Removing feature {}", record.name());
+ finalizedVersions.remove(record.name());
+ } else {
+ log.info("Setting feature {} to {}", record.name(),
record.featureLevel());
+ finalizedVersions.put(record.name(), record.featureLevel());
+ }
}
}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 7ca557dada..97a2040a38 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -1563,7 +1563,11 @@ public final class QuorumController implements
Controller {
setReplicaPlacer(replicaPlacer).
setControllerMetrics(controllerMetrics).
build();
- this.featureControl = new FeatureControlManager(logContext,
quorumFeatures, snapshotRegistry);
+ this.featureControl = new FeatureControlManager.Builder().
+ setLogContext(logContext).
+ setQuorumFeatures(quorumFeatures).
+ setSnapshotRegistry(snapshotRegistry).
+ build();
this.producerIdControlManager = new
ProducerIdControlManager(clusterControl, snapshotRegistry);
this.snapshotMaxNewRecordBytes = snapshotMaxNewRecordBytes;
this.leaderImbalanceCheckIntervalNs = leaderImbalanceCheckIntervalNs;
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
index 80a5c0a0b1..9b723515bd 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
@@ -33,40 +33,41 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.OptionalInt;
import java.util.stream.Collectors;
/**
* A holder class of the local node's supported feature flags as well as the
ApiVersions of other nodes.
*/
public class QuorumFeatures {
+ private static final VersionRange DISABLED = VersionRange.of(0, 0);
+
private static final Logger log =
LoggerFactory.getLogger(QuorumFeatures.class);
private final int nodeId;
private final ApiVersions apiVersions;
- private final Map<String, VersionRange> supportedFeatures;
+ private final Map<String, VersionRange> localSupportedFeatures;
private final List<Integer> quorumNodeIds;
QuorumFeatures(
int nodeId,
ApiVersions apiVersions,
- Map<String, VersionRange> supportedFeatures,
+ Map<String, VersionRange> localSupportedFeatures,
List<Integer> quorumNodeIds
) {
this.nodeId = nodeId;
this.apiVersions = apiVersions;
- this.supportedFeatures =
Collections.unmodifiableMap(supportedFeatures);
+ this.localSupportedFeatures =
Collections.unmodifiableMap(localSupportedFeatures);
this.quorumNodeIds = Collections.unmodifiableList(quorumNodeIds);
}
public static QuorumFeatures create(
int nodeId,
ApiVersions apiVersions,
- Map<String, VersionRange> supportedFeatures,
+ Map<String, VersionRange> localSupportedFeatures,
Collection<Node> quorumNodes
) {
List<Integer> nodeIds =
quorumNodes.stream().map(Node::id).collect(Collectors.toList());
- return new QuorumFeatures(nodeId, apiVersions, supportedFeatures,
nodeIds);
+ return new QuorumFeatures(nodeId, apiVersions, localSupportedFeatures,
nodeIds);
}
public static Map<String, VersionRange> defaultFeatureMap() {
@@ -75,47 +76,50 @@ public class QuorumFeatures {
return features;
}
- Optional<VersionRange> quorumSupportedFeature(String featureName) {
- List<VersionRange> supportedVersions = new
ArrayList<>(quorumNodeIds.size());
- for (int nodeId : quorumNodeIds) {
- if (nodeId == this.nodeId) {
- // We get this node's features from "supportedFeatures"
- continue;
+ /**
+ * Return the reason a specific feature level is not supported, or
Optional.empty if it is supported.
+ *
+ * @param featureName The feature name.
+ * @param level The feature level.
+ * @return The reason why the feature level is not supported,
or Optional.empty if it is supported.
+ */
+ public Optional<String> reasonNotSupported(String featureName, short
level) {
+ VersionRange localRange =
localSupportedFeatures.getOrDefault(featureName, DISABLED);
+ if (!localRange.contains(level)) {
+ if (localRange.equals(DISABLED)) {
+ return Optional.of("Local controller " + nodeId + " does not
support this feature.");
+ } else {
+ return Optional.of("Local controller " + nodeId + " only
supports versions " + localRange);
}
- NodeApiVersions nodeVersions =
apiVersions.get(Integer.toString(nodeId));
+ }
+ List<String> missing = new ArrayList<>();
+ for (int id : quorumNodeIds) {
+ if (nodeId == id) {
+ continue; // We get the local node's features from
localSupportedFeatures.
+ }
+ NodeApiVersions nodeVersions =
apiVersions.get(Integer.toString(id));
if (nodeVersions == null) {
+ missing.add(Integer.toString(id));
continue;
}
SupportedVersionRange supportedRange =
nodeVersions.supportedFeatures().get(featureName);
- if (supportedRange == null) {
- supportedVersions.add(VersionRange.of(0, 0));
- } else {
- supportedVersions.add(VersionRange.of(supportedRange.min(),
supportedRange.max()));
- }
- }
- localSupportedFeature(featureName).ifPresent(supportedVersions::add);
-
- if (supportedVersions.isEmpty()) {
- return Optional.empty();
- } else {
- OptionalInt highestMinVersion =
supportedVersions.stream().mapToInt(VersionRange::min).max();
- OptionalInt lowestMaxVersion =
supportedVersions.stream().mapToInt(VersionRange::max).min();
- if (highestMinVersion.isPresent() && lowestMaxVersion.isPresent())
{
- if (highestMinVersion.getAsInt() <=
lowestMaxVersion.getAsInt()) {
- if (supportedVersions.size() < quorumNodeIds.size()) {
- log.info("Using incomplete set of quorum supported
features.");
- }
- return Optional.of(VersionRange.of((short)
highestMinVersion.getAsInt(), (short) lowestMaxVersion.getAsInt()));
+ VersionRange range = supportedRange == null ? DISABLED :
+ VersionRange.of(supportedRange.min(),
supportedRange.max());
+ if (!range.contains(level)) {
+ if (range.equals(DISABLED)) {
+ return Optional.of("Controller " + id + " does not support
this feature.");
} else {
- return Optional.empty();
+ return Optional.of("Controller " + id + " only supports
versions " + range);
}
- } else {
- return Optional.empty();
}
}
+ if (!missing.isEmpty()) {
+ log.info("Unable to get feature level information for
controller(s): " + String.join(", ", missing));
+ }
+ return Optional.empty();
}
- Optional<VersionRange> localSupportedFeature(String featureName) {
- return Optional.ofNullable(supportedFeatures.get(featureName));
+ VersionRange localSupportedFeature(String featureName) {
+ return localSupportedFeatures.getOrDefault(featureName, DISABLED);
}
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java
b/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java
index 28eb187bfb..7f431c2d06 100644
--- a/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java
@@ -18,7 +18,6 @@
package org.apache.kafka.image;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
-import org.apache.kafka.common.metadata.RemoveFeatureLevelRecord;
import org.apache.kafka.server.common.MetadataVersion;
import java.util.HashMap;
@@ -61,15 +60,11 @@ public final class FeaturesDelta {
if (record.name().equals(MetadataVersion.FEATURE_NAME)) {
metadataVersionChange =
MetadataVersion.fromFeatureLevel(record.featureLevel());
} else {
- changes.put(record.name(), Optional.of(record.featureLevel()));
- }
- }
-
- public void replay(RemoveFeatureLevelRecord record) {
- if (record.name().equals(MetadataVersion.FEATURE_NAME)) {
- metadataVersionChange = null;
- } else {
- changes.put(record.name(), Optional.empty());
+ if (record.featureLevel() == 0) {
+ changes.put(record.name(), Optional.empty());
+ } else {
+ changes.put(record.name(), Optional.of(record.featureLevel()));
+ }
}
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
index 01455e360c..25e141ea0d 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
@@ -29,7 +29,6 @@ import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.ProducerIdsRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
-import org.apache.kafka.common.metadata.RemoveFeatureLevelRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
@@ -204,9 +203,6 @@ public final class MetadataDelta {
case PRODUCER_IDS_RECORD:
replay((ProducerIdsRecord) record);
break;
- case REMOVE_FEATURE_LEVEL_RECORD:
- replay((RemoveFeatureLevelRecord) record);
- break;
case BROKER_REGISTRATION_CHANGE_RECORD:
replay((BrokerRegistrationChangeRecord) record);
break;
@@ -291,10 +287,6 @@ public final class MetadataDelta {
getOrCreateProducerIdsDelta().replay(record);
}
- public void replay(RemoveFeatureLevelRecord record) {
- getOrCreateFeaturesDelta().replay(record);
- }
-
public void replay(AccessControlEntryRecord record) {
getOrCreateAclsDelta().replay(record);
}
diff --git
a/metadata/src/main/resources/common/metadata/RemoveFeatureLevelRecord.json
b/metadata/src/main/resources/common/metadata/RemoveFeatureLevelRecord.json
deleted file mode 100644
index 6ed716192e..0000000000
--- a/metadata/src/main/resources/common/metadata/RemoveFeatureLevelRecord.json
+++ /dev/null
@@ -1,26 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-{
- "apiKey": 16,
- "type": "metadata",
- "name": "RemoveFeatureLevelRecord",
- "validVersions": "0",
- "flexibleVersions": "0+",
- "fields": [
- { "name": "Name", "type": "string", "versions": "0+",
- "about": "The feature name." }
- ]
-}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
index 02b1493548..3a8c4042df 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.admin.FeatureUpdate;
@@ -39,6 +40,7 @@ import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import static java.util.Collections.emptyList;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -70,7 +72,7 @@ public class FeatureControlManagerTest {
public static QuorumFeatures features(Object... args) {
Map<String, VersionRange> features =
QuorumFeatures.defaultFeatureMap();
features.putAll(rangeMap(args));
- return new QuorumFeatures(0, new ApiVersions(), features,
Collections.emptyList());
+ return new QuorumFeatures(0, new ApiVersions(), features, emptyList());
}
private static Map<String, Short> updateMap(Object... args) {
@@ -85,16 +87,17 @@ public class FeatureControlManagerTest {
@Test
public void testUpdateFeatures() {
- LogContext logContext = new LogContext();
- SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
+ SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new
LogContext());
+ FeatureControlManager manager = new FeatureControlManager.Builder().
+ setQuorumFeatures(features("foo", 1, 2)).
+ setSnapshotRegistry(snapshotRegistry).
+ build();
snapshotRegistry.getOrCreateSnapshot(-1);
- FeatureControlManager manager = new FeatureControlManager(logContext,
- features("foo", 1, 2), snapshotRegistry);
assertEquals(new FinalizedControllerFeatures(Collections.emptyMap(),
-1),
manager.finalizedFeatures(-1));
- assertEquals(ControllerResult.atomicOf(Collections.emptyList(),
Collections.
+ assertEquals(ControllerResult.atomicOf(emptyList(), Collections.
singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION,
- "Invalid update version 3 for feature foo. The quorum does
not support the given feature version."))),
+ "Invalid update version 3 for feature foo. Local
controller 0 only supports versions 1-2"))),
manager.updateFeatures(updateMap("foo", 3),
Collections.singletonMap("foo",
FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
Collections.emptyMap(), false));
@@ -104,7 +107,7 @@ public class FeatureControlManagerTest {
Map<String, ApiError> expectedMap = new HashMap<>();
expectedMap.put("foo", ApiError.NONE);
expectedMap.put("bar", new ApiError(Errors.INVALID_UPDATE_VERSION,
- "Invalid update version 1 for feature bar. The controller does
not support the given feature."));
+ "Invalid update version 1 for feature bar. Local controller 0
does not support this feature."));
assertEquals(expectedMap, result.response());
List<ApiMessageAndVersion> expectedMessages = new ArrayList<>();
expectedMessages.add(new ApiMessageAndVersion(new FeatureLevelRecord().
@@ -121,8 +124,11 @@ public class FeatureControlManagerTest {
setName("foo").setFeatureLevel((short) 2);
snapshotRegistry.getOrCreateSnapshot(-1);
- FeatureControlManager manager = new FeatureControlManager(logContext,
- features("foo", 1, 2), snapshotRegistry);
+ FeatureControlManager manager = new FeatureControlManager.Builder().
+ setLogContext(logContext).
+ setQuorumFeatures(features("foo", 1, 2)).
+ setSnapshotRegistry(snapshotRegistry).
+ build();
manager.replay(record);
snapshotRegistry.getOrCreateSnapshot(123);
assertEquals(new FinalizedControllerFeatures(versionMap("foo", 2),
123),
@@ -133,12 +139,15 @@ public class FeatureControlManagerTest {
public void testUpdateFeaturesErrorCases() {
LogContext logContext = new LogContext();
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
- FeatureControlManager manager = new FeatureControlManager(logContext,
- features("foo", 1, 5, "bar", 1, 2), snapshotRegistry);
+ FeatureControlManager manager = new FeatureControlManager.Builder().
+ setLogContext(logContext).
+ setQuorumFeatures(features("foo", 1, 5, "bar", 1, 2)).
+ setSnapshotRegistry(snapshotRegistry).
+ build();
assertEquals(
ControllerResult.atomicOf(
- Collections.emptyList(),
+ emptyList(),
Collections.singletonMap(
"foo",
new ApiError(
@@ -160,7 +169,7 @@ public class FeatureControlManagerTest {
manager.replay((FeatureLevelRecord) result.records().get(0).message());
snapshotRegistry.getOrCreateSnapshot(3);
- assertEquals(ControllerResult.atomicOf(Collections.emptyList(),
Collections.
+ assertEquals(ControllerResult.atomicOf(emptyList(), Collections.
singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION,
"Invalid update version 2 for feature foo. Can't downgrade
the version of this feature " +
"without setting the upgrade type to either safe or unsafe
downgrade."))),
@@ -191,8 +200,11 @@ public class FeatureControlManagerTest {
public void testFeatureControlIterator() throws Exception {
LogContext logContext = new LogContext();
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
- FeatureControlManager manager = new FeatureControlManager(logContext,
- features("foo", 1, 5, "bar", 1, 2), snapshotRegistry);
+ FeatureControlManager manager = new FeatureControlManager.Builder().
+ setLogContext(logContext).
+ setQuorumFeatures(features("foo", 1, 5, "bar", 1, 2)).
+ setSnapshotRegistry(snapshotRegistry).
+ build();
ControllerResult<Map<String, ApiError>> result = manager.
updateFeatures(updateMap("foo", 5, "bar", 1),
Collections.emptyMap(), Collections.emptyMap(), false);
@@ -208,57 +220,28 @@ public class FeatureControlManagerTest {
}
@Test
- public void testInitializeMetadataVersion() {
- // Default QuorumFeatures
- checkMetadataVersion(features(), MetadataVersion.IBP_3_0_IV0,
Errors.NONE);
- checkMetadataVersion(features(), MetadataVersion.latest(),
Errors.NONE);
- checkMetadataVersion(features(), MetadataVersion.UNINITIALIZED,
Errors.INVALID_UPDATE_VERSION);
- checkMetadataVersion(features(), MetadataVersion.IBP_2_7_IV1,
Errors.INVALID_UPDATE_VERSION);
-
- // Increased QuorumFeatures
- QuorumFeatures features = features(MetadataVersion.FEATURE_NAME,
MetadataVersion.IBP_3_2_IV0.featureLevel(),
MetadataVersion.IBP_3_3_IV0.featureLevel());
- checkMetadataVersion(features, MetadataVersion.IBP_3_0_IV0,
Errors.INVALID_UPDATE_VERSION);
-
- // Empty QuorumFeatures
- features = new QuorumFeatures(0, new ApiVersions(),
Collections.emptyMap(), Collections.emptyList());
- checkMetadataVersion(features, MetadataVersion.latest(),
Errors.INVALID_UPDATE_VERSION);
- checkMetadataVersion(features, MetadataVersion.IBP_3_0_IV0,
Errors.INVALID_UPDATE_VERSION);
- }
-
- @Test
- public void reInitializeMetadataVersion() {
- LogContext logContext = new LogContext();
- SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
- FeatureControlManager manager = new FeatureControlManager(logContext,
features(), snapshotRegistry);
- ControllerResult<Map<String, ApiError>> result =
manager.initializeMetadataVersion(MetadataVersion.IBP_3_0_IV0.featureLevel());
- Errors actual =
result.response().get(MetadataVersion.FEATURE_NAME).error();
- assertEquals(Errors.NONE, actual);
- RecordTestUtils.replayAll(manager, result.records());
-
- result =
manager.initializeMetadataVersion(MetadataVersion.latest().featureLevel());
- actual = result.response().get(MetadataVersion.FEATURE_NAME).error();
- assertEquals(Errors.INVALID_UPDATE_VERSION, actual);
- }
-
- public void checkMetadataVersion(QuorumFeatures features, MetadataVersion
version, Errors expected) {
- LogContext logContext = new LogContext();
- SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
- FeatureControlManager manager = new FeatureControlManager(logContext,
features, snapshotRegistry);
- ControllerResult<Map<String, ApiError>> result =
manager.initializeMetadataVersion(version.featureLevel());
- Errors actual =
result.response().get(MetadataVersion.FEATURE_NAME).error();
- assertEquals(expected, actual);
+ public void testApplyMetadataVersionChangeRecord() {
+ QuorumFeatures features = features(MetadataVersion.FEATURE_NAME,
+ MetadataVersion.IBP_3_0_IV0.featureLevel(),
MetadataVersion.IBP_3_3_IV0.featureLevel());
+ FeatureControlManager manager = new FeatureControlManager.Builder().
+ setQuorumFeatures(features).build();
+ manager.replay(new FeatureLevelRecord().
+ setName(MetadataVersion.FEATURE_NAME).
+ setFeatureLevel(MetadataVersion.IBP_3_0_IV0.featureLevel()));
+ assertEquals(MetadataVersion.IBP_3_0_IV0, manager.metadataVersion());
}
@Test
public void testDowngradeMetadataVersion() {
- LogContext logContext = new LogContext();
- SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
- QuorumFeatures features = features(MetadataVersion.FEATURE_NAME,
MetadataVersion.IBP_3_2_IV0.featureLevel(),
MetadataVersion.IBP_3_3_IV0.featureLevel());
- FeatureControlManager manager = new FeatureControlManager(logContext,
features, snapshotRegistry);
- ControllerResult<Map<String, ApiError>> result =
manager.initializeMetadataVersion(MetadataVersion.IBP_3_3_IV0.featureLevel());
- RecordTestUtils.replayAll(manager, result.records());
+ QuorumFeatures features = features(MetadataVersion.FEATURE_NAME,
+ MetadataVersion.IBP_3_2_IV0.featureLevel(),
MetadataVersion.IBP_3_3_IV0.featureLevel());
+ FeatureControlManager manager = new FeatureControlManager.Builder().
+ setQuorumFeatures(features).
+ setMetadataVersion(MetadataVersion.IBP_3_3_IV0).
+ build();
assertEquals(manager.metadataVersion(), MetadataVersion.IBP_3_3_IV0);
+ ControllerResult<Map<String, ApiError>> result;
result = manager.updateFeatures(
Collections.singletonMap(MetadataVersion.FEATURE_NAME,
MetadataVersion.IBP_3_2_IV0.featureLevel()),
Collections.singletonMap(MetadataVersion.FEATURE_NAME,
FeatureUpdate.UpgradeType.UPGRADE),
@@ -280,7 +263,39 @@ public class FeatureControlManagerTest {
Collections.emptyMap(),
true);
assertEquals(Errors.INVALID_UPDATE_VERSION,
result.response().get(MetadataVersion.FEATURE_NAME).error());
- assertEquals("Invalid update version 1 for feature metadata.version. The quorum does not support the given feature version.",
+ assertEquals("Invalid update version 1 for feature metadata.version. Local controller 0 only supports versions 4-5",
result.response().get(MetadataVersion.FEATURE_NAME).message());
}
+
+ @Test
+ public void testCreateFeatureLevelRecords() {
+ Map<String, VersionRange> localSupportedFeatures = new HashMap<>();
+ localSupportedFeatures.put(MetadataVersion.FEATURE_NAME,
VersionRange.of(
+ MetadataVersion.IBP_3_0_IV0.featureLevel(),
MetadataVersion.latest().featureLevel()));
+ localSupportedFeatures.put("foo", VersionRange.of(0, 2));
+ FeatureControlManager manager = new FeatureControlManager.Builder().
+ setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(),
localSupportedFeatures, emptyList())).
+ build();
+ ControllerResult<Map<String, ApiError>> result =
manager.updateFeatures(
+ Collections.singletonMap("foo", (short) 1),
+ Collections.singletonMap("foo",
FeatureUpdate.UpgradeType.UPGRADE),
+ Collections.singletonMap(1, Collections.singletonMap("foo",
VersionRange.of(0, 3))),
+ false);
+ assertEquals(ControllerResult.atomicOf(Arrays.asList(new
ApiMessageAndVersion(
+ new
FeatureLevelRecord().setName("foo").setFeatureLevel((short) 1), (short) 0)),
+ Collections.singletonMap("foo", ApiError.NONE)),
result);
+ RecordTestUtils.replayAll(manager, result.records());
+ assertEquals(Optional.of((short) 1),
manager.finalizedFeatures(Long.MAX_VALUE).get("foo"));
+
+ ControllerResult<Map<String, ApiError>> result2 =
manager.updateFeatures(
+ Collections.singletonMap("foo", (short) 0),
+ Collections.singletonMap("foo",
FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
+ Collections.singletonMap(1, Collections.singletonMap("foo",
VersionRange.of(0, 3))),
+ false);
+ assertEquals(ControllerResult.atomicOf(Arrays.asList(new
ApiMessageAndVersion(
+ new
FeatureLevelRecord().setName("foo").setFeatureLevel((short) 0), (short) 0)),
+ Collections.singletonMap("foo", ApiError.NONE)), result2);
+ RecordTestUtils.replayAll(manager, result2.records());
+ assertEquals(Optional.empty(),
manager.finalizedFeatures(Long.MAX_VALUE).get("foo"));
+ }
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java
b/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java
index 0194cd674e..7d8ba5bfec 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java
@@ -19,86 +19,74 @@ package org.apache.kafka.controller;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.NodeApiVersions;
-import org.apache.kafka.common.message.ApiVersionsResponseData;
+import
org.apache.kafka.common.message.ApiVersionsResponseData.SupportedFeatureKey;
import org.apache.kafka.metadata.VersionRange;
import org.junit.jupiter.api.Test;
+import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Optional;
+import static java.util.Collections.emptyMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
public class QuorumFeaturesTest {
- @Test
- public void testQuorumFeatures() {
- ApiVersions apiVersions = new ApiVersions();
- Map<String, VersionRange> featureMap = new HashMap<>(2);
- featureMap.put("foo", VersionRange.of(1, 2));
- featureMap.put("bar", VersionRange.of(3, 5));
-
- List<Integer> nodeIds = new ArrayList<>();
- nodeIds.add(0);
-
- QuorumFeatures quorumFeatures = new QuorumFeatures(0, apiVersions,
featureMap, nodeIds);
- assertLocalFeature(quorumFeatures, "foo", 1, 2);
- assertLocalFeature(quorumFeatures, "bar", 3, 5);
- assertQuorumFeature(quorumFeatures, "foo", 1, 2);
- assertQuorumFeature(quorumFeatures, "bar", 3, 5);
+ private final static Map<String, VersionRange> LOCAL;
- // Add a second node with identical features
- nodeIds.add(1);
- apiVersions.update("1", nodeApiVersions(featureMap));
- assertLocalFeature(quorumFeatures, "foo", 1, 2);
- assertLocalFeature(quorumFeatures, "bar", 3, 5);
- assertQuorumFeature(quorumFeatures, "foo", 1, 2);
- assertQuorumFeature(quorumFeatures, "bar", 3, 5);
-
- // Change the supported features of one node
- Map<String, VersionRange> node1Features = new HashMap<>(featureMap);
- node1Features.put("bar", VersionRange.of(3, 4));
- apiVersions.update("1", nodeApiVersions(node1Features));
- assertLocalFeature(quorumFeatures, "foo", 1, 2);
- assertLocalFeature(quorumFeatures, "bar", 3, 5);
- assertQuorumFeature(quorumFeatures, "foo", 1, 2);
- assertQuorumFeature(quorumFeatures, "bar", 3, 4);
-
- // Add a third node with no features
- nodeIds.add(2);
- apiVersions.update("1", NodeApiVersions.create());
- assertFalse(quorumFeatures.quorumSupportedFeature("foo").isPresent());
- assertFalse(quorumFeatures.quorumSupportedFeature("bar").isPresent());
+ static {
+ Map<String, VersionRange> local = new HashMap<>();
+ local.put("foo", VersionRange.of(0, 3));
+ local.put("bar", VersionRange.of(0, 4));
+ local.put("baz", VersionRange.of(2, 2));
+ LOCAL = Collections.unmodifiableMap(local);
}
-
- public static NodeApiVersions nodeApiVersions(Map<String, VersionRange>
featureMap) {
- List<ApiVersionsResponseData.SupportedFeatureKey> supportedFeatures =
new ArrayList<>(featureMap.size());
- featureMap.forEach((featureName, versionRange) -> {
- supportedFeatures.add(new
ApiVersionsResponseData.SupportedFeatureKey()
- .setName(featureName)
- .setMinVersion(versionRange.min())
- .setMaxVersion(versionRange.max()));
- });
- return new NodeApiVersions(Collections.emptyList(), supportedFeatures);
+ @Test
+ public void testDefaultSupportedLevels() {
+ QuorumFeatures quorumFeatures = new QuorumFeatures(0, new
ApiVersions(), emptyMap(), Arrays.asList(0, 1, 2));
+ assertEquals(Optional.empty(),
quorumFeatures.reasonNotSupported("foo", (short) 0));
+ assertEquals(Optional.of("Local controller 0 does not support this feature."),
+ quorumFeatures.reasonNotSupported("foo", (short) 1));
}
- private void assertLocalFeature(QuorumFeatures features, String name, int
expectedMin, int expectedMax) {
- Optional<VersionRange> featureRange =
features.localSupportedFeature(name);
- assertTrue(featureRange.isPresent());
- assertEquals(expectedMin, featureRange.get().min());
- assertEquals(expectedMax, featureRange.get().max());
+ @Test
+ public void testLocalSupportedFeature() {
+ QuorumFeatures quorumFeatures = new QuorumFeatures(0, new
ApiVersions(), LOCAL, Arrays.asList(0, 1, 2));
+ assertEquals(VersionRange.of(0, 3),
quorumFeatures.localSupportedFeature("foo"));
+ assertEquals(VersionRange.of(0, 4),
quorumFeatures.localSupportedFeature("bar"));
+ assertEquals(VersionRange.of(2, 2),
quorumFeatures.localSupportedFeature("baz"));
+ assertEquals(VersionRange.of(0, 0),
quorumFeatures.localSupportedFeature("quux"));
}
- private void assertQuorumFeature(QuorumFeatures features, String name, int
expectedMin, int expectedMax) {
- Optional<VersionRange> featureRange =
features.quorumSupportedFeature(name);
- assertTrue(featureRange.isPresent());
- assertEquals(expectedMin, featureRange.get().min());
- assertEquals(expectedMax, featureRange.get().max());
+ @Test
+ public void testReasonNotSupported() {
+ ApiVersions apiVersions = new ApiVersions();
+ QuorumFeatures quorumFeatures = new QuorumFeatures(0, apiVersions,
LOCAL, Arrays.asList(0, 1, 2));
+ assertEquals(Optional.of("Local controller 0 only supports versions 0-3"),
+ quorumFeatures.reasonNotSupported("foo", (short) 10));
+ apiVersions.update("1", nodeApiVersions(Arrays.asList(
+ new SimpleImmutableEntry<>("foo", VersionRange.of(1, 3)),
+ new SimpleImmutableEntry<>("bar", VersionRange.of(1, 3)),
+ new SimpleImmutableEntry<>("baz", VersionRange.of(1, 2)))));
+ assertEquals(Optional.empty(),
quorumFeatures.reasonNotSupported("bar", (short) 3));
+ assertEquals(Optional.of("Controller 1 only supports versions 1-3"),
+ quorumFeatures.reasonNotSupported("bar", (short) 4));
}
+ private static NodeApiVersions nodeApiVersions(List<Entry<String,
VersionRange>> entries) {
+ List<SupportedFeatureKey> features = new ArrayList<>();
+ entries.forEach(entry -> {
+ features.add(new SupportedFeatureKey().
+ setName(entry.getKey()).
+ setMinVersion(entry.getValue().min()).
+ setMaxVersion(entry.getValue().max()));
+ });
+ return new NodeApiVersions(Collections.emptyList(), features);
+ }
}
diff --git
a/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java
b/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java
index 0812048bb0..6ea31b080b 100644
--- a/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java
@@ -18,7 +18,6 @@
package org.apache.kafka.image;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
-import org.apache.kafka.common.metadata.RemoveFeatureLevelRecord;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
@@ -30,8 +29,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static
org.apache.kafka.common.metadata.MetadataRecordType.FEATURE_LEVEL_RECORD;
-import static
org.apache.kafka.common.metadata.MetadataRecordType.REMOVE_FEATURE_LEVEL_RECORD;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -52,11 +49,13 @@ public class FeaturesImageTest {
DELTA1_RECORDS = new ArrayList<>();
DELTA1_RECORDS.add(new ApiMessageAndVersion(new FeatureLevelRecord().
setName("foo").setFeatureLevel((short) 3),
- FEATURE_LEVEL_RECORD.highestSupportedVersion()));
- DELTA1_RECORDS.add(new ApiMessageAndVersion(new
RemoveFeatureLevelRecord().
- setName("bar"),
REMOVE_FEATURE_LEVEL_RECORD.highestSupportedVersion()));
- DELTA1_RECORDS.add(new ApiMessageAndVersion(new
RemoveFeatureLevelRecord().
- setName("baz"),
REMOVE_FEATURE_LEVEL_RECORD.highestSupportedVersion()));
+ (short) 0));
+ DELTA1_RECORDS.add(new ApiMessageAndVersion(new FeatureLevelRecord().
+ setName("bar").setFeatureLevel((short) 0),
+ (short) 0));
+ DELTA1_RECORDS.add(new ApiMessageAndVersion(new FeatureLevelRecord().
+ setName("baz").setFeatureLevel((short) 0),
+ (short) 0));
DELTA1 = new FeaturesDelta(IMAGE1);
RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS);