This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d171ff08a70 KAFKA-18858 Refactor FeatureControlManager to avoid using
uninitialized MV (#19040)
d171ff08a70 is described below
commit d171ff08a70f9fa8065e6661fcc1f3da092d7faf
Author: PoAn Yang <[email protected]>
AuthorDate: Thu Mar 13 23:37:41 2025 +0800
KAFKA-18858 Refactor FeatureControlManager to avoid using uninitialized MV
(#19040)
The `FeatureControlManager` used `MetadataVersion#LATEST_PRODUCTION` as the
uninitialized MV. As a result, other components may get a stale MV. In
production code, the `FeatureControlManager` sets the MV when replaying a
`FeatureLevelRecord`, so we can use `Optional.empty()` as the uninitialized MV.
If a component needs the MV while it is still empty,
`FeatureControlManager#metadataVersionOrThrow` throws an exception, like
`FeaturesImage`.
Unit test:
* FeatureControlManagerTest#testMetadataVersion: tests getting the
MetadataVersion before and after replaying a FeatureLevelRecord.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../controller/ActivationRecordsGenerator.java | 10 +--
.../kafka/controller/ClusterControlManager.java | 10 +--
.../kafka/controller/FeatureControlManager.java | 35 ++++++-----
.../apache/kafka/controller/LogReplayTracker.java | 73 ----------------------
.../apache/kafka/controller/QuorumController.java | 19 ++----
.../controller/ReplicationControlManager.java | 24 +++----
.../controller/ClusterControlManagerTest.java | 42 +++++++++----
.../ConfigurationControlManagerTest.java | 23 ++++++-
.../controller/FeatureControlManagerTest.java | 58 +++++++++++------
.../kafka/controller/LogReplayTrackerTest.java | 38 -----------
.../kafka/controller/QuorumControllerTest.java | 22 +++----
.../controller/ReplicationControlManagerTest.java | 8 ++-
12 files changed, 150 insertions(+), 212 deletions(-)
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java
b/metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java
index a535f9f19d7..60058c4e5a8 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java
@@ -29,6 +29,7 @@ import org.apache.kafka.server.common.MetadataVersion;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.function.Consumer;
import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
@@ -144,7 +145,7 @@ public class ActivationRecordsGenerator {
/**
* Generate the set of activation records.
* </p>
- * If the log is empty, write the bootstrap records. If the log is not
empty, do some validation and
+ * If the metadata version is empty, write the bootstrap records. If the
metadata version is not empty, do some validation and
* possibly write some records to put the log into a valid state. For
bootstrap records, if KIP-868
* metadata transactions are supported, use them. Otherwise, write the
bootstrap records as an
* atomic batch. The single atomic batch can be problematic if the
bootstrap records are too large
@@ -152,13 +153,12 @@ public class ActivationRecordsGenerator {
*/
static ControllerResult<Void> generate(
Consumer<String> activationMessageConsumer,
- boolean isEmpty,
long transactionStartOffset,
BootstrapMetadata bootstrapMetadata,
- MetadataVersion curMetadataVersion,
+ Optional<MetadataVersion> curMetadataVersion,
int defaultMinInSyncReplicas
) {
- if (isEmpty) {
+ if (curMetadataVersion.isEmpty()) {
return recordsForEmptyLog(activationMessageConsumer,
transactionStartOffset,
bootstrapMetadata,
@@ -167,7 +167,7 @@ public class ActivationRecordsGenerator {
} else {
return recordsForNonEmptyLog(activationMessageConsumer,
transactionStartOffset,
- curMetadataVersion);
+ curMetadataVersion.get());
}
}
}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 590eb703fff..dd50b45628a 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -365,7 +365,7 @@ public class ClusterControlManager {
throw new BrokerIdNotRegisteredException("Controller does not
support registering ZK brokers.");
}
- if (featureControl.metadataVersion().isDirectoryAssignmentSupported())
{
+ if
(featureControl.metadataVersionOrThrow().isDirectoryAssignmentSupported()) {
if (request.logDirs().isEmpty()) {
throw new InvalidRegistrationException("No directories
specified in request");
}
@@ -415,7 +415,7 @@ public class ClusterControlManager {
setMaxSupportedVersion((short) 0));
}
});
- if (featureControl.metadataVersion().isDirectoryAssignmentSupported())
{
+ if
(featureControl.metadataVersionOrThrow().isDirectoryAssignmentSupported()) {
record.setLogDirs(request.logDirs());
}
@@ -444,7 +444,7 @@ public class ClusterControlManager {
record.setInControlledShutdown(existing.inControlledShutdown());
record.setBrokerEpoch(existing.epoch());
}
- records.add(new ApiMessageAndVersion(record,
featureControl.metadataVersion().
+ records.add(new ApiMessageAndVersion(record,
featureControl.metadataVersionOrThrow().
registerBrokerRecordVersion()));
if (!request.incarnationId().equals(prevIncarnationId)) {
@@ -461,7 +461,7 @@ public class ClusterControlManager {
}
ControllerResult<Void>
registerController(ControllerRegistrationRequestData request) {
- if
(!featureControl.metadataVersion().isControllerRegistrationSupported()) {
+ if
(!featureControl.metadataVersionOrThrow().isControllerRegistrationSupported()) {
throw new UnsupportedVersionException("The current MetadataVersion
is too old to " +
"support controller registrations.");
}
@@ -830,7 +830,7 @@ public class ClusterControlManager {
}
Iterator<Entry<Integer, Map<String, VersionRange>>>
controllerSupportedFeatures() {
- if
(!featureControl.metadataVersion().isControllerRegistrationSupported()) {
+ if
(!featureControl.metadataVersionOrThrow().isControllerRegistrationSupported()) {
throw new UnsupportedVersionException("The current MetadataVersion
is too old to " +
"support controller registrations.");
}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
index 5bbe3b9f148..c9ea02a99e4 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
@@ -54,7 +54,6 @@ public class FeatureControlManager {
private LogContext logContext = null;
private SnapshotRegistry snapshotRegistry = null;
private QuorumFeatures quorumFeatures = null;
- private MetadataVersion metadataVersion =
MetadataVersion.latestProduction();
private ClusterFeatureSupportDescriber clusterSupportDescriber = new
ClusterFeatureSupportDescriber() {
@Override
public Iterator<Entry<Integer, Map<String, VersionRange>>>
brokerSupported() {
@@ -82,11 +81,6 @@ public class FeatureControlManager {
return this;
}
- Builder setMetadataVersion(MetadataVersion metadataVersion) {
- this.metadataVersion = metadataVersion;
- return this;
- }
-
Builder
setClusterFeatureSupportDescriber(ClusterFeatureSupportDescriber
clusterSupportDescriber) {
this.clusterSupportDescriber = clusterSupportDescriber;
return this;
@@ -106,7 +100,6 @@ public class FeatureControlManager {
logContext,
quorumFeatures,
snapshotRegistry,
- metadataVersion,
clusterSupportDescriber
);
}
@@ -127,7 +120,7 @@ public class FeatureControlManager {
/**
* The current metadata version
*/
- private final TimelineObject<MetadataVersion> metadataVersion;
+ private final TimelineObject<Optional<MetadataVersion>> metadataVersion;
/**
* Gives information about the supported versions in the cluster.
@@ -138,13 +131,12 @@ public class FeatureControlManager {
LogContext logContext,
QuorumFeatures quorumFeatures,
SnapshotRegistry snapshotRegistry,
- MetadataVersion metadataVersion,
ClusterFeatureSupportDescriber clusterSupportDescriber
) {
this.log = logContext.logger(FeatureControlManager.class);
this.quorumFeatures = quorumFeatures;
this.finalizedVersions = new TimelineHashMap<>(snapshotRegistry, 0);
- this.metadataVersion = new TimelineObject<>(snapshotRegistry,
metadataVersion);
+ this.metadataVersion = new TimelineObject<>(snapshotRegistry,
Optional.empty());
this.clusterSupportDescriber = clusterSupportDescriber;
}
@@ -157,7 +149,7 @@ public class FeatureControlManager {
BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
Map<String, Short> proposedUpdatedVersions = new
HashMap<>(finalizedVersions);
- proposedUpdatedVersions.put(MetadataVersion.FEATURE_NAME,
metadataVersion.get().featureLevel());
+ proposedUpdatedVersions.put(MetadataVersion.FEATURE_NAME,
metadataVersionOrThrow().featureLevel());
proposedUpdatedVersions.putAll(updates);
for (Entry<String, Short> entry : updates.entrySet()) {
@@ -175,10 +167,19 @@ public class FeatureControlManager {
}
}
- MetadataVersion metadataVersion() {
+ Optional<MetadataVersion> metadataVersion() {
return metadataVersion.get();
}
+ MetadataVersion metadataVersionOrThrow() {
+ return metadataVersionOrThrow(SnapshotRegistry.LATEST_EPOCH);
+ }
+
+ private MetadataVersion metadataVersionOrThrow(long epoch) {
+ return metadataVersion.get(epoch).orElseThrow(() ->
+ new IllegalStateException("Unknown metadata version for
FeatureControlManager"));
+ }
+
private ApiError updateFeature(
String featureName,
short newVersion,
@@ -193,7 +194,7 @@ public class FeatureControlManager {
final short currentVersion;
if (featureName.equals(MetadataVersion.FEATURE_NAME)) {
- currentVersion = metadataVersion.get().featureLevel();
+ currentVersion = metadataVersionOrThrow().featureLevel();
} else {
currentVersion = finalizedVersions.getOrDefault(featureName,
(short) 0);
}
@@ -262,7 +263,7 @@ public class FeatureControlManager {
String registrationSuffix = "";
HashSet<Integer> foundControllers = new HashSet<>();
foundControllers.add(quorumFeatures.nodeId());
- if (metadataVersion.get().isControllerRegistrationSupported()) {
+ if (metadataVersionOrThrow().isControllerRegistrationSupported()) {
for (Iterator<Entry<Integer, Map<String, VersionRange>>> iter =
clusterSupportDescriber.controllerSupported();
iter.hasNext(); ) {
@@ -309,7 +310,7 @@ public class FeatureControlManager {
boolean allowUnsafeDowngrade,
Consumer<ApiMessageAndVersion> recordConsumer
) {
- MetadataVersion currentVersion = metadataVersion();
+ MetadataVersion currentVersion = metadataVersionOrThrow();
final MetadataVersion newVersion;
try {
newVersion = MetadataVersion.fromFeatureLevel(newVersionLevel);
@@ -352,7 +353,7 @@ public class FeatureControlManager {
FinalizedControllerFeatures finalizedFeatures(long epoch) {
Map<String, Short> features = new HashMap<>();
- features.put(MetadataVersion.FEATURE_NAME,
metadataVersion.get(epoch).featureLevel());
+ features.put(MetadataVersion.FEATURE_NAME,
metadataVersionOrThrow(epoch).featureLevel());
for (Entry<String, Short> entry : finalizedVersions.entrySet(epoch)) {
features.put(entry.getKey(), entry.getValue());
}
@@ -367,7 +368,7 @@ public class FeatureControlManager {
}
if (record.name().equals(MetadataVersion.FEATURE_NAME)) {
MetadataVersion mv =
MetadataVersion.fromFeatureLevel(record.featureLevel());
- metadataVersion.set(mv);
+ metadataVersion.set(Optional.of(mv));
log.info("Replayed a FeatureLevelRecord setting metadata.version
to {}", mv);
} else {
if (record.featureLevel() == 0) {
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/LogReplayTracker.java
b/metadata/src/main/java/org/apache/kafka/controller/LogReplayTracker.java
deleted file mode 100644
index 87773060291..00000000000
--- a/metadata/src/main/java/org/apache/kafka/controller/LogReplayTracker.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.controller;
-
-import org.apache.kafka.common.protocol.ApiMessage;
-import org.apache.kafka.common.utils.LogContext;
-
-import org.slf4j.Logger;
-
-
-/**
- * The LogReplayTracker manages state associated with replaying the metadata
log, such as whether
- * we have seen any records. It is accessed solely from the quorum controller
thread.
- */
-public class LogReplayTracker {
- public static class Builder {
- private LogContext logContext = null;
-
- Builder setLogContext(LogContext logContext) {
- this.logContext = logContext;
- return this;
- }
-
- public LogReplayTracker build() {
- if (logContext == null) logContext = new LogContext();
- return new LogReplayTracker(logContext);
- }
- }
-
- /**
- * The slf4j logger.
- */
- private final Logger log;
-
- /**
- * True if we haven't replayed any records yet.
- */
- private boolean empty;
-
- private LogReplayTracker(
- LogContext logContext
- ) {
- this.log = logContext.logger(LogReplayTracker.class);
- resetToEmpty();
- }
-
- void resetToEmpty() {
- this.empty = true;
- }
-
- boolean empty() {
- return empty;
- }
-
- void replay(ApiMessage message) {
- empty = false;
- }
-}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index fc5f99358f2..c2fbc2ca1a2 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -1156,7 +1156,6 @@ public final class QuorumController implements Controller
{
try {
return ActivationRecordsGenerator.generate(
log::warn,
- logReplayTracker.empty(),
offsetControl.transactionStartOffset(),
bootstrapMetadata,
featureControl.metadataVersion(),
@@ -1213,7 +1212,6 @@ public final class QuorumController implements Controller
{
recordRedactor.toLoggableString(message), offset);
}
}
- logReplayTracker.replay(message);
MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
switch (type) {
case REGISTER_BROKER_RECORD:
@@ -1430,12 +1428,6 @@ public final class QuorumController implements
Controller {
*/
private final AclControlManager aclControlManager;
- /**
- * Tracks replaying the log.
- * This must be accessed only by the event queue thread.
- */
- private final LogReplayTracker logReplayTracker;
-
/**
* The interface that we use to mutate the Raft log.
*/
@@ -1592,9 +1584,6 @@ public final class QuorumController implements Controller
{
setLogContext(logContext).
setSnapshotRegistry(snapshotRegistry).
build();
- this.logReplayTracker = new LogReplayTracker.Builder().
- setLogContext(logContext).
- build();
this.raftClient = raftClient;
this.bootstrapMetadata = bootstrapMetadata;
this.maxRecordsPerBatch = maxRecordsPerBatch;
@@ -1755,7 +1744,7 @@ public final class QuorumController implements Controller
{
return CompletableFuture.completedFuture(new
AlterUserScramCredentialsResponseData());
}
return appendWriteEvent("alterUserScramCredentials",
context.deadlineNs(),
- () -> scramControlManager.alterCredentials(request,
featureControl.metadataVersion()));
+ () -> scramControlManager.alterCredentials(request,
featureControl.metadataVersionOrThrow()));
}
@Override
@@ -1764,7 +1753,7 @@ public final class QuorumController implements Controller
{
CreateDelegationTokenRequestData request
) {
return appendWriteEvent("createDelegationToken", context.deadlineNs(),
- () -> delegationTokenControlManager.createDelegationToken(context,
request, featureControl.metadataVersion()));
+ () -> delegationTokenControlManager.createDelegationToken(context,
request, featureControl.metadataVersionOrThrow()));
}
@Override
@@ -1773,7 +1762,7 @@ public final class QuorumController implements Controller
{
RenewDelegationTokenRequestData request
) {
return appendWriteEvent("renewDelegationToken", context.deadlineNs(),
- () -> delegationTokenControlManager.renewDelegationToken(context,
request, featureControl.metadataVersion()));
+ () -> delegationTokenControlManager.renewDelegationToken(context,
request, featureControl.metadataVersionOrThrow()));
}
@Override
@@ -1782,7 +1771,7 @@ public final class QuorumController implements Controller
{
ExpireDelegationTokenRequestData request
) {
return appendWriteEvent("expireDelegationToken", context.deadlineNs(),
- () -> delegationTokenControlManager.expireDelegationToken(context,
request, featureControl.metadataVersion()));
+ () -> delegationTokenControlManager.expireDelegationToken(context,
request, featureControl.metadataVersionOrThrow()));
}
@Override
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 12bbadb17a7..ee3fe6a9a96 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -240,7 +240,7 @@ public class ReplicationControlManager {
@Override
public Uuid defaultDir(int brokerId) {
- if
(featureControl.metadataVersion().isDirectoryAssignmentSupported()) {
+ if
(featureControl.metadataVersionOrThrow().isDirectoryAssignmentSupported()) {
return clusterControl.defaultDir(brokerId);
} else {
return DirectoryId.MIGRATING;
@@ -844,7 +844,7 @@ public class ReplicationControlManager {
for (Entry<Integer, PartitionRegistration> partEntry :
newParts.entrySet()) {
int partitionIndex = partEntry.getKey();
PartitionRegistration info = partEntry.getValue();
- records.add(info.toRecord(topicId, partitionIndex, new
ImageWriterOptions.Builder(featureControl.metadataVersion()).
+ records.add(info.toRecord(topicId, partitionIndex, new
ImageWriterOptions.Builder(featureControl.metadataVersionOrThrow()).
setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled()).
build()));
}
@@ -1121,7 +1121,7 @@ public class ReplicationControlManager {
topic.id,
partitionId,
new LeaderAcceptor(clusterControl, partition),
- featureControl.metadataVersion(),
+ featureControl.metadataVersionOrThrow(),
getTopicEffectiveMinIsr(topic.name)
)
.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled());
@@ -1583,7 +1583,7 @@ public class ReplicationControlManager {
topicId,
partitionId,
new LeaderAcceptor(clusterControl, partition),
- featureControl.metadataVersion(),
+ featureControl.metadataVersionOrThrow(),
getTopicEffectiveMinIsr(topic)
)
.setElection(election)
@@ -1629,7 +1629,7 @@ public class ReplicationControlManager {
heartbeatManager.touch(brokerId,
states.next().fenced(),
request.currentMetadataOffset());
- if (featureControl.metadataVersion().isDirectoryAssignmentSupported())
{
+ if
(featureControl.metadataVersionOrThrow().isDirectoryAssignmentSupported()) {
handleDirectoriesOffline(brokerId, brokerEpoch,
request.offlineLogDirs(), records);
}
boolean isCaughtUp = request.currentMetadataOffset() >=
registerBrokerRecordOffset;
@@ -1747,7 +1747,7 @@ public class ReplicationControlManager {
topicPartition.topicId(),
topicPartition.partitionId(),
new LeaderAcceptor(clusterControl, partition),
- featureControl.metadataVersion(),
+ featureControl.metadataVersionOrThrow(),
getTopicEffectiveMinIsr(topic.name)
)
.setElection(PartitionChangeBuilder.Election.PREFERRED)
@@ -1916,7 +1916,7 @@ public class ReplicationControlManager {
" time(s): All brokers are currently fenced or in
controlled shutdown.");
}
records.add(buildPartitionRegistration(partitionAssignment, isr)
- .toRecord(topicId, partitionId, new
ImageWriterOptions.Builder(featureControl.metadataVersion()).
+ .toRecord(topicId, partitionId, new
ImageWriterOptions.Builder(featureControl.metadataVersionOrThrow()).
setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled()).
build()));
partitionId++;
@@ -2017,7 +2017,7 @@ public class ReplicationControlManager {
topicIdPart.topicId(),
topicIdPart.partitionId(),
new LeaderAcceptor(clusterControl, partition,
isAcceptableLeader),
- featureControl.metadataVersion(),
+ featureControl.metadataVersionOrThrow(),
getTopicEffectiveMinIsr(topic.name)
);
builder.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled());
@@ -2138,7 +2138,7 @@ public class ReplicationControlManager {
tp.topicId(),
tp.partitionId(),
new LeaderAcceptor(clusterControl, part),
- featureControl.metadataVersion(),
+ featureControl.metadataVersionOrThrow(),
getTopicEffectiveMinIsr(topicName)
);
builder.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled());
@@ -2204,7 +2204,7 @@ public class ReplicationControlManager {
tp.topicId(),
tp.partitionId(),
new LeaderAcceptor(clusterControl, part),
- featureControl.metadataVersion(),
+ featureControl.metadataVersionOrThrow(),
getTopicEffectiveMinIsr(topics.get(tp.topicId()).name)
);
builder.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled());
@@ -2244,7 +2244,7 @@ public class ReplicationControlManager {
}
ControllerResult<AssignReplicasToDirsResponseData>
handleAssignReplicasToDirs(AssignReplicasToDirsRequestData request) {
- if
(!featureControl.metadataVersion().isDirectoryAssignmentSupported()) {
+ if
(!featureControl.metadataVersionOrThrow().isDirectoryAssignmentSupported()) {
throw new UnsupportedVersionException("Directory assignment is not
supported yet.");
}
int brokerId = request.brokerId();
@@ -2287,7 +2287,7 @@ public class ReplicationControlManager {
topicId,
partitionIndex,
new LeaderAcceptor(clusterControl,
partitionRegistration),
- featureControl.metadataVersion(),
+ featureControl.metadataVersionOrThrow(),
getTopicEffectiveMinIsr(topicName)
)
.setDirectory(brokerId, dirId)
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index 9298da8df3a..5ca71c042c1 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -289,8 +289,10 @@ public class ClusterControlManagerTest {
setQuorumFeatures(new QuorumFeatures(0,
QuorumFeatures.defaultSupportedFeatureMap(true),
Collections.singletonList(0))).
- setMetadataVersion(metadataVersion).
build();
+ featureControl.replay(new FeatureLevelRecord().
+ setName(MetadataVersion.FEATURE_NAME).
+ setFeatureLevel(metadataVersion.featureLevel()));
ClusterControlManager clusterControl = new
ClusterControlManager.Builder().
setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
setTime(new MockTime(0, 0, 0)).
@@ -460,8 +462,10 @@ public class ClusterControlManagerTest {
setQuorumFeatures(new QuorumFeatures(0,
QuorumFeatures.defaultSupportedFeatureMap(true),
Collections.singletonList(0))).
- setMetadataVersion(metadataVersion).
build();
+ featureControl.replay(new FeatureLevelRecord().
+ setName(MetadataVersion.FEATURE_NAME).
+ setFeatureLevel(metadataVersion.featureLevel()));
ClusterControlManager clusterControl = new
ClusterControlManager.Builder().
setTime(time).
setSnapshotRegistry(snapshotRegistry).
@@ -541,8 +545,10 @@ public class ClusterControlManagerTest {
FeatureControlManager featureControl = new
FeatureControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
setQuorumFeatures(new QuorumFeatures(0, supportedFeatures,
Collections.singletonList(0))).
- setMetadataVersion(MetadataVersion.IBP_3_7_IV0).
build();
+ featureControl.replay(new FeatureLevelRecord().
+ setName(MetadataVersion.FEATURE_NAME).
+ setFeatureLevel(MetadataVersion.IBP_3_7_IV0.featureLevel()));
ClusterControlManager clusterControl = new
ClusterControlManager.Builder().
setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
setTime(new MockTime(0, 0, 0)).
@@ -590,8 +596,10 @@ public class ClusterControlManagerTest {
FeatureControlManager featureControl = new
FeatureControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
setQuorumFeatures(new QuorumFeatures(0, supportedFeatures,
Collections.singletonList(0))).
- setMetadataVersion(MetadataVersion.IBP_3_9_IV0).
build();
+ featureControl.replay(new FeatureLevelRecord().
+ setName(MetadataVersion.FEATURE_NAME).
+ setFeatureLevel(MetadataVersion.IBP_3_9_IV0.featureLevel()));
ClusterControlManager clusterControl = new
ClusterControlManager.Builder().
setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
setTime(new MockTime(0, 0, 0)).
@@ -671,8 +679,10 @@ public class ClusterControlManagerTest {
MetadataVersion.IBP_3_5_IV0.featureLevel(),
MetadataVersion.IBP_3_6_IV0.featureLevel())),
Collections.singletonList(0))).
- setMetadataVersion(MetadataVersion.IBP_3_5_IV0).
build();
+ featureControl.replay(new FeatureLevelRecord().
+ setName(MetadataVersion.FEATURE_NAME).
+ setFeatureLevel(MetadataVersion.IBP_3_5_IV0.featureLevel()));
ClusterControlManager clusterControl = new
ClusterControlManager.Builder().
setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
setTime(new MockTime(0, 0, 0)).
@@ -722,8 +732,10 @@ public class ClusterControlManagerTest {
@Test
public void testRegisterControlWithUnsupportedMetadataVersion() {
FeatureControlManager featureControl = new
FeatureControlManager.Builder().
- setMetadataVersion(MetadataVersion.IBP_3_6_IV2).
build();
+ featureControl.replay(new FeatureLevelRecord().
+ setName(MetadataVersion.FEATURE_NAME).
+ setFeatureLevel(MetadataVersion.IBP_3_6_IV2.featureLevel()));
ClusterControlManager clusterControl = new
ClusterControlManager.Builder().
setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
setFeatureControlManager(featureControl).
@@ -739,7 +751,7 @@ public class ClusterControlManagerTest {
public void testRegisterWithDuplicateDirectoryId() {
ClusterControlManager clusterControl = new
ClusterControlManager.Builder().
setClusterId("QzZZEtC7SxucRM29Xdzijw").
- setFeatureControlManager(new
FeatureControlManager.Builder().build()).
+ setFeatureControlManager(createFeatureControlManager()).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records)
-> { }).
build();
RegisterBrokerRecord brokerRecord = new
RegisterBrokerRecord().setBrokerEpoch(100).setBrokerId(0).setLogDirs(asList(
@@ -788,7 +800,7 @@ public class ClusterControlManagerTest {
public void testHasOnlineDir() {
ClusterControlManager clusterControl = new
ClusterControlManager.Builder().
setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
- setFeatureControlManager(new
FeatureControlManager.Builder().build()).
+ setFeatureControlManager(createFeatureControlManager()).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records)
-> { }).
build();
clusterControl.activate();
@@ -807,7 +819,7 @@ public class ClusterControlManagerTest {
public void testDefaultDir() {
ClusterControlManager clusterControl = new
ClusterControlManager.Builder().
setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
- setFeatureControlManager(new
FeatureControlManager.Builder().build()).
+ setFeatureControlManager(createFeatureControlManager()).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records)
-> { }).
build();
clusterControl.activate();
@@ -827,7 +839,7 @@ public class ClusterControlManagerTest {
public void testReRegistrationAndBrokerEpoch(boolean newIncarnationId) {
ClusterControlManager clusterControl = new
ClusterControlManager.Builder().
setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
- setFeatureControlManager(new
FeatureControlManager.Builder().build()).
+ setFeatureControlManager(createFeatureControlManager()).
setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> {
}).
build();
clusterControl.activate();
@@ -882,7 +894,7 @@ public class ClusterControlManagerTest {
public void testReRegistrationWithCleanShutdownDetection(boolean
isCleanShutdown) {
ClusterControlManager clusterControl = new
ClusterControlManager.Builder().
setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
- setFeatureControlManager(new
FeatureControlManager.Builder().build()).
+ setFeatureControlManager(createFeatureControlManager()).
setBrokerShutdownHandler((brokerId, cleanShutdown, records) -> {
if (!cleanShutdown) {
records.add(new ApiMessageAndVersion(new
PartitionChangeRecord(), PartitionChangeRecord.HIGHEST_SUPPORTED_VERSION));
@@ -967,4 +979,12 @@ public class ClusterControlManagerTest {
assertEquals(OptionalLong.empty(),
clusterControl.heartbeatManager().tracker().
contactTime(new BrokerIdAndEpoch(2, 100)));
}
+
+ private FeatureControlManager createFeatureControlManager() {
+ FeatureControlManager featureControlManager = new
FeatureControlManager.Builder().build();
+ featureControlManager.replay(new FeatureLevelRecord().
+ setName(MetadataVersion.FEATURE_NAME).
+ setFeatureLevel(MetadataVersion.LATEST_PRODUCTION.featureLevel()));
+ return featureControlManager;
+ }
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
index 62a920bdc9b..0d1e465ab90 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.metadata.KafkaConfigSchema;
@@ -160,6 +161,7 @@ public class ConfigurationControlManagerTest {
@Test
public void testIncrementalAlterConfigs() {
ConfigurationControlManager manager = new
ConfigurationControlManager.Builder().
+ setFeatureControl(createFeatureControlManager()).
setKafkaConfigSchema(SCHEMA).
build();
@@ -191,6 +193,7 @@ public class ConfigurationControlManagerTest {
@Test
public void testIncrementalAlterConfig() {
ConfigurationControlManager manager = new
ConfigurationControlManager.Builder().
+ setFeatureControl(createFeatureControlManager()).
setKafkaConfigSchema(SCHEMA).
build();
Map<String, Entry<AlterConfigOp.OpType, String>> keyToOps =
toMap(entry("abc", entry(APPEND, "123")));
@@ -224,6 +227,7 @@ public class ConfigurationControlManagerTest {
@Test
public void testIncrementalAlterMultipleConfigValues() {
ConfigurationControlManager manager = new
ConfigurationControlManager.Builder().
+ setFeatureControl(createFeatureControlManager()).
setKafkaConfigSchema(SCHEMA).
build();
@@ -270,6 +274,7 @@ public class ConfigurationControlManagerTest {
@Test
public void testIncrementalAlterConfigsWithoutExistence() {
ConfigurationControlManager manager = new
ConfigurationControlManager.Builder().
+ setFeatureControl(createFeatureControlManager()).
setKafkaConfigSchema(SCHEMA).
setExistenceChecker(TestExistenceChecker.INSTANCE).
build();
@@ -331,6 +336,7 @@ public class ConfigurationControlManagerTest {
entry("quux", "456"),
entry("broker.config.to.remove", null)))));
ConfigurationControlManager manager = new
ConfigurationControlManager.Builder().
+ setFeatureControl(createFeatureControlManager()).
setKafkaConfigSchema(SCHEMA).
setAlterConfigPolicy(Optional.of(policy)).
build();
@@ -389,6 +395,7 @@ public class ConfigurationControlManagerTest {
@Test
public void testLegacyAlterConfigs() {
ConfigurationControlManager manager = new
ConfigurationControlManager.Builder().
+ setFeatureControl(createFeatureControlManager()).
setKafkaConfigSchema(SCHEMA).
setAlterConfigPolicy(Optional.of(new CheckForNullValuesPolicy())).
build();
@@ -424,6 +431,7 @@ public class ConfigurationControlManagerTest {
@ValueSource(booleans = {false, true})
public void testMaybeGenerateElrSafetyRecords(boolean setStaticConfig) {
ConfigurationControlManager.Builder builder = new
ConfigurationControlManager.Builder().
+ setFeatureControl(createFeatureControlManager()).
setKafkaConfigSchema(SCHEMA);
if (setStaticConfig) {
builder.setStaticConfig(Map.of(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2"));
@@ -473,6 +481,9 @@ public class ConfigurationControlManagerTest {
QuorumFeatures.defaultSupportedFeatureMap(true),
Collections.emptyList())).
build();
+ featureManager.replay(new FeatureLevelRecord().
+ setName(MetadataVersion.FEATURE_NAME).
+ setFeatureLevel(MetadataVersion.LATEST_PRODUCTION.featureLevel()));
ConfigurationControlManager manager = new
ConfigurationControlManager.Builder().
setStaticConfig(Map.of(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
"2")).
setFeatureControl(featureManager).
@@ -519,8 +530,10 @@ public class ConfigurationControlManagerTest {
setQuorumFeatures(new QuorumFeatures(0,
QuorumFeatures.defaultSupportedFeatureMap(true),
Collections.emptyList())).
- setMetadataVersion(isMetadataVersionElrEnabled ?
MetadataVersion.IBP_4_0_IV1 : MetadataVersion.IBP_4_0_IV0).
build();
+ featureManager.replay(new FeatureLevelRecord().
+ setName(MetadataVersion.FEATURE_NAME).
+ setFeatureLevel(isMetadataVersionElrEnabled ?
MetadataVersion.IBP_4_0_IV1.featureLevel() :
MetadataVersion.IBP_4_0_IV0.featureLevel()));
ConfigurationControlManager manager = new
ConfigurationControlManager.Builder().
setStaticConfig(Map.of(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
"2")).
setFeatureControl(featureManager).
@@ -543,4 +556,12 @@ public class ConfigurationControlManagerTest {
assertEquals(Errors.INVALID_UPDATE_VERSION,
result.response().error());
}
}
+
+ private FeatureControlManager createFeatureControlManager() {
+ FeatureControlManager featureControlManager = new
FeatureControlManager.Builder().build();
+ featureControlManager.replay(new FeatureLevelRecord().
+ setName(MetadataVersion.FEATURE_NAME).
+ setFeatureLevel(MetadataVersion.LATEST_PRODUCTION.featureLevel()));
+ return featureControlManager;
+ }
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
index 2d0bc4c860e..ee2f464ca5c 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
@@ -51,6 +51,7 @@ import java.util.Optional;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -100,8 +101,8 @@ public class FeatureControlManagerTest {
FeatureControlManager manager = new FeatureControlManager.Builder().
setQuorumFeatures(features(TestFeatureVersion.FEATURE_NAME, 0, 2)).
setSnapshotRegistry(snapshotRegistry).
- setMetadataVersion(MetadataVersion.MINIMUM_VERSION).
build();
+ manager.replay(new
FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel()));
snapshotRegistry.idempotentCreateSnapshot(-1);
assertEquals(new
FinalizedControllerFeatures(Collections.singletonMap("metadata.version",
MetadataVersion.MINIMUM_VERSION.featureLevel()), -1),
manager.finalizedFeatures(-1));
@@ -143,8 +144,8 @@ public class FeatureControlManagerTest {
setLogContext(logContext).
setQuorumFeatures(features("foo", 1, 2)).
setSnapshotRegistry(snapshotRegistry).
- setMetadataVersion(MetadataVersion.MINIMUM_VERSION).
build();
+ manager.replay(new
FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel()));
manager.replay(record);
snapshotRegistry.idempotentCreateSnapshot(123);
assertEquals(
@@ -181,6 +182,7 @@ public class FeatureControlManagerTest {
Collections.singletonList(new SimpleImmutableEntry<>(5,
Collections.singletonMap(TransactionVersion.FEATURE_NAME, VersionRange.of(0,
2)))),
emptyList())).
build();
+ manager.replay(new
FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel()));
assertEquals(ControllerResult.of(emptyList(), new
ApiError(Errors.INVALID_UPDATE_VERSION,
"Invalid update version 3 for feature foo. Broker 5 does not
support this feature.")),
@@ -226,12 +228,12 @@ public class FeatureControlManagerTest {
setLogContext(logContext).
setQuorumFeatures(features(TestFeatureVersion.FEATURE_NAME, 0, 5,
TransactionVersion.FEATURE_NAME, 0, 2)).
setSnapshotRegistry(snapshotRegistry).
- setMetadataVersion(MetadataVersion.MINIMUM_VERSION).
build();
+ manager.replay(new
FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel()));
ControllerResult<ApiError> result = manager.
updateFeatures(updateMap(TestFeatureVersion.FEATURE_NAME, 1,
TransactionVersion.FEATURE_NAME, 2), Collections.emptyMap(), false);
RecordTestUtils.replayAll(manager, result.records());
- assertEquals(MetadataVersion.MINIMUM_VERSION,
manager.metadataVersion());
+ assertEquals(MetadataVersion.MINIMUM_VERSION,
manager.metadataVersionOrThrow());
assertEquals(Optional.of((short) 1),
manager.finalizedFeatures(Long.MAX_VALUE).get(TestFeatureVersion.FEATURE_NAME));
assertEquals(Optional.of((short) 2),
manager.finalizedFeatures(Long.MAX_VALUE).get(TransactionVersion.FEATURE_NAME));
assertEquals(new HashSet<>(Arrays.asList(
@@ -239,25 +241,28 @@ public class FeatureControlManagerTest {
manager.finalizedFeatures(Long.MAX_VALUE).featureNames());
}
- private static final FeatureControlManager.Builder TEST_MANAGER_BUILDER1 =
- new FeatureControlManager.Builder().
+ private FeatureControlManager createTestManager() {
+ FeatureControlManager manager = new FeatureControlManager.Builder().
setQuorumFeatures(features(MetadataVersion.FEATURE_NAME,
MetadataVersion.MINIMUM_VERSION.featureLevel(),
MetadataVersion.IBP_3_6_IV0.featureLevel())).
- setMetadataVersion(MetadataVersion.IBP_3_4_IV0);
+ build();
+ manager.replay(new
FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.IBP_3_4_IV0.featureLevel()));
+ return manager;
+ }
@Test
public void testApplyMetadataVersionChangeRecord() {
- FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
- MetadataVersion initialMetadataVersion = manager.metadataVersion();
+ FeatureControlManager manager = createTestManager();
+ MetadataVersion initialMetadataVersion =
manager.metadataVersionOrThrow();
manager.replay(new FeatureLevelRecord().
setName(MetadataVersion.FEATURE_NAME).
setFeatureLevel((short) (initialMetadataVersion.featureLevel() +
1)));
- assertEquals(MetadataVersion.IBP_3_5_IV0, manager.metadataVersion());
+ assertEquals(MetadataVersion.IBP_3_5_IV0,
manager.metadataVersionOrThrow());
}
@Test
public void testCannotDowngradeToHigherVersion() {
- FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
+ FeatureControlManager manager = createTestManager();
assertEquals(ControllerResult.of(Collections.emptyList(), new
ApiError(Errors.INVALID_UPDATE_VERSION,
"Invalid update version 9 for feature metadata.version. Can't
downgrade to a " +
"newer version.")),
@@ -269,7 +274,7 @@ public class FeatureControlManagerTest {
@Test
public void testCannotUnsafeDowngradeToHigherVersion() {
- FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
+ FeatureControlManager manager = createTestManager();
assertEquals(ControllerResult.of(Collections.emptyList(), new
ApiError(Errors.INVALID_UPDATE_VERSION,
"Invalid update version 9 for feature metadata.version. Can't
downgrade to a " +
"newer version.")),
@@ -284,7 +289,8 @@ public class FeatureControlManagerTest {
FeatureControlManager manager = new FeatureControlManager.Builder().
setQuorumFeatures(features(MetadataVersion.FEATURE_NAME,
MetadataVersion.MINIMUM_VERSION.featureLevel(),
MetadataVersion.IBP_3_6_IV0.featureLevel())).
- setMetadataVersion(MetadataVersion.IBP_3_5_IV1).build();
+ build();
+ manager.replay(new
FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.IBP_3_5_IV1.featureLevel()));
assertEquals(ControllerResult.of(Collections.emptyList(), new
ApiError(Errors.INVALID_UPDATE_VERSION,
"Invalid update version 9 for feature metadata.version. Can't
downgrade the " +
"version of this feature without setting the upgrade type to
either safe or " +
@@ -297,7 +303,7 @@ public class FeatureControlManagerTest {
@Test
public void testCanUpgradeToHigherVersion() {
- FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
+ FeatureControlManager manager = createTestManager();
assertEquals(ControllerResult.of(Collections.emptyList(),
ApiError.NONE),
manager.updateFeatures(
singletonMap(MetadataVersion.FEATURE_NAME,
MetadataVersion.IBP_3_5_IV0.featureLevel()),
@@ -307,7 +313,7 @@ public class FeatureControlManagerTest {
@Test
public void testCannotUseSafeDowngradeIfMetadataChanged() {
- FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
+ FeatureControlManager manager = createTestManager();
assertEquals(ControllerResult.of(Collections.emptyList(), new
ApiError(Errors.INVALID_UPDATE_VERSION,
"Invalid metadata.version 7. Refusing to perform the requested
downgrade because " +
"it might delete metadata information.")),
@@ -319,7 +325,7 @@ public class FeatureControlManagerTest {
@Test
public void testUnsafeDowngradeIsTemporarilyDisabled() {
- FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
+ FeatureControlManager manager = createTestManager();
assertEquals(ControllerResult.of(Collections.emptyList(), new
ApiError(Errors.INVALID_UPDATE_VERSION,
"Invalid metadata.version 7. Unsafe metadata downgrade is not
supported in this version.")),
manager.updateFeatures(
@@ -331,7 +337,7 @@ public class FeatureControlManagerTest {
@Disabled
@Test
public void testCanUseUnsafeDowngradeIfMetadataChanged() {
- FeatureControlManager manager = TEST_MANAGER_BUILDER1.build();
+ FeatureControlManager manager = createTestManager();
assertEquals(ControllerResult.of(Collections.emptyList(),
ApiError.NONE),
manager.updateFeatures(
singletonMap(MetadataVersion.FEATURE_NAME,
MetadataVersion.IBP_3_3_IV3.featureLevel()),
@@ -344,8 +350,8 @@ public class FeatureControlManagerTest {
FeatureControlManager manager = new FeatureControlManager.Builder().
setQuorumFeatures(features(MetadataVersion.FEATURE_NAME,
MetadataVersion.MINIMUM_VERSION.featureLevel(),
MetadataVersion.IBP_3_6_IV0.featureLevel())).
- setMetadataVersion(MetadataVersion.IBP_3_5_IV0).
build();
+ manager.replay(new
FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.IBP_3_5_IV0.featureLevel()));
assertEquals(ControllerResult.of(Collections.emptyList(),
ApiError.NONE),
manager.updateFeatures(
singletonMap(MetadataVersion.FEATURE_NAME,
MetadataVersion.IBP_3_4_IV0.featureLevel()),
@@ -358,8 +364,8 @@ public class FeatureControlManagerTest {
FeatureControlManager manager = new FeatureControlManager.Builder().
setQuorumFeatures(features(MetadataVersion.FEATURE_NAME,
MetadataVersion.MINIMUM_VERSION.featureLevel(),
MetadataVersion.latestTesting().featureLevel())).
- setMetadataVersion(MetadataVersion.MINIMUM_VERSION).
build();
+ manager.replay(new
FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel()));
assertEquals(ControllerResult.of(Collections.emptyList(), new
ApiError(Errors.INVALID_UPDATE_VERSION,
"Invalid update version 6 for feature metadata.version. Local
controller 0 only supports versions 7-26")),
manager.updateFeatures(
@@ -380,6 +386,7 @@ public class FeatureControlManagerTest {
Collections.singletonList(new SimpleImmutableEntry<>(1,
Collections.singletonMap(Feature.TEST_VERSION.featureName(), VersionRange.of(0,
3)))),
emptyList())).
build();
+ manager.replay(new
FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel()));
ControllerResult<ApiError> result = manager.updateFeatures(
Collections.singletonMap(Feature.TEST_VERSION.featureName(),
(short) 1),
Collections.singletonMap(Feature.TEST_VERSION.featureName(),
FeatureUpdate.UpgradeType.UPGRADE),
@@ -412,8 +419,8 @@ public class FeatureControlManagerTest {
setClusterFeatureSupportDescriber(createFakeClusterFeatureSupportDescriber(
Collections.singletonList(new SimpleImmutableEntry<>(1,
Collections.singletonMap(Feature.ELIGIBLE_LEADER_REPLICAS_VERSION.featureName(),
VersionRange.of(0, 1)))),
emptyList())).
- setMetadataVersion(MetadataVersion.IBP_4_0_IV1).
build();
+ manager.replay(new
FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.IBP_4_0_IV1.featureLevel()));
ControllerResult<ApiError> result = manager.updateFeatures(
Collections.singletonMap(Feature.ELIGIBLE_LEADER_REPLICAS_VERSION.featureName(),
(short) 1),
Collections.singletonMap(Feature.ELIGIBLE_LEADER_REPLICAS_VERSION.featureName(),
FeatureUpdate.UpgradeType.UPGRADE),
@@ -429,4 +436,15 @@ public class FeatureControlManagerTest {
get(Feature.ELIGIBLE_LEADER_REPLICAS_VERSION.featureName()));
}
+
+ @Test
+ public void testMetadataVersion() {
+ FeatureControlManager manager = new
FeatureControlManager.Builder().build();
+ assertTrue(manager.metadataVersion().isEmpty());
+ assertThrows(IllegalStateException.class,
manager::metadataVersionOrThrow);
+ manager.replay(new
FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel()));
+ assertTrue(manager.metadataVersion().isPresent());
+ assertEquals(MetadataVersion.MINIMUM_VERSION,
manager.metadataVersion().get());
+ assertEquals(MetadataVersion.MINIMUM_VERSION,
manager.metadataVersionOrThrow());
+ }
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/LogReplayTrackerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/LogReplayTrackerTest.java
deleted file mode 100644
index 02bf9faa35a..00000000000
---
a/metadata/src/test/java/org/apache/kafka/controller/LogReplayTrackerTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.controller;
-
-import org.apache.kafka.common.metadata.NoOpRecord;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
-
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-
-@Timeout(value = 40)
-public class LogReplayTrackerTest {
- @Test
- public void testEmpty() {
- LogReplayTracker tracker = new LogReplayTracker.Builder().build();
- assertTrue(tracker.empty());
- tracker.replay(new NoOpRecord());
- assertFalse(tracker.empty());
- }
-}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 220627ed834..ceae7a511e1 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -1593,15 +1593,16 @@ public class QuorumControllerTest {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new
LogContext());
FeatureControlManager featureControlManager = new
FeatureControlManager.Builder()
.setSnapshotRegistry(snapshotRegistry)
- .setMetadataVersion(metadataVersion)
.build();
+ featureControlManager.replay(new FeatureLevelRecord()
+ .setName(MetadataVersion.FEATURE_NAME)
+ .setFeatureLevel(metadataVersion.featureLevel()));
ControllerResult<Void> result = ActivationRecordsGenerator.generate(
msg -> { },
- true,
-1L,
BootstrapMetadata.fromVersion(metadataVersion, "test"),
- metadataVersion,
+ Optional.empty(),
3);
RecordTestUtils.replayAll(featureControlManager, result.records());
return featureControlManager;
@@ -1612,7 +1613,7 @@ public class QuorumControllerTest {
FeatureControlManager featureControl;
featureControl = getActivationRecords(MetadataVersion.IBP_3_3_IV3);
- assertEquals(MetadataVersion.IBP_3_3_IV3,
featureControl.metadataVersion());
+ assertEquals(MetadataVersion.IBP_3_3_IV3,
featureControl.metadataVersionOrThrow());
}
@Test
@@ -1620,24 +1621,23 @@ public class QuorumControllerTest {
FeatureControlManager featureControl;
featureControl = getActivationRecords(MetadataVersion.IBP_3_4_IV0);
- assertEquals(MetadataVersion.IBP_3_4_IV0,
featureControl.metadataVersion());
+ assertEquals(MetadataVersion.IBP_3_4_IV0,
featureControl.metadataVersionOrThrow());
}
@Test
public void testActivationRecordsNonEmptyLog() {
FeatureControlManager featureControl = getActivationRecords(
MetadataVersion.IBP_3_9_IV0);
- assertEquals(MetadataVersion.IBP_3_9_IV0,
featureControl.metadataVersion());
+ assertEquals(MetadataVersion.IBP_3_9_IV0,
featureControl.metadataVersionOrThrow());
}
@Test
public void testActivationRecordsPartialBootstrap() {
ControllerResult<Void> result = ActivationRecordsGenerator.generate(
logMsg -> { },
- true,
0L,
BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV1, "test"),
- MetadataVersion.IBP_3_6_IV1,
+ Optional.empty(),
3);
assertFalse(result.isAtomic());
assertTrue(RecordTestUtils.recordAtIndexAs(
@@ -1683,10 +1683,9 @@ public class QuorumControllerTest {
ControllerResult<Void> result = ActivationRecordsGenerator.generate(
logMsg -> { },
- false,
offsetControlManager.transactionStartOffset(),
BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV1, "test"),
- MetadataVersion.IBP_3_6_IV1,
+ Optional.of(MetadataVersion.IBP_3_6_IV1),
3);
assertTrue(result.isAtomic());
@@ -1707,10 +1706,9 @@ public class QuorumControllerTest {
assertThrows(RuntimeException.class, () ->
ActivationRecordsGenerator.generate(
msg -> { },
- false,
offsetControlManager.transactionStartOffset(),
BootstrapMetadata.fromVersion(MetadataVersion.IBP_3_6_IV0,
"test"),
- MetadataVersion.IBP_3_6_IV0,
+ Optional.of(MetadataVersion.IBP_3_6_IV0),
3));
}
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index f272fc78ef1..7da1bd331b7 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -241,8 +241,10 @@ public class ReplicationControlManagerTest {
setQuorumFeatures(new QuorumFeatures(0,
QuorumFeatures.defaultSupportedFeatureMap(true),
Collections.singletonList(0))).
- setMetadataVersion(metadataVersion).
build();
+ this.featureControl.replay(new FeatureLevelRecord().
+ setName(MetadataVersion.FEATURE_NAME).
+ setFeatureLevel(metadataVersion.featureLevel()));
featureControl.replay(new FeatureLevelRecord()
.setName(EligibleLeaderReplicasVersion.FEATURE_NAME)
.setFeatureLevel(isElrEnabled ?
@@ -3272,7 +3274,7 @@ public class ReplicationControlManagerTest {
put(new TopicIdPartition(topicB, 1), NONE);
}});
}})), AssignmentsHelper.normalize(controllerResult.response()));
- short recordVersion =
ctx.featureControl.metadataVersion().partitionChangeRecordVersion();
+ short recordVersion =
ctx.featureControl.metadataVersionOrThrow().partitionChangeRecordVersion();
assertEquals(sortPartitionChangeRecords(asList(
new ApiMessageAndVersion(
new
PartitionChangeRecord().setTopicId(topicA).setPartitionId(0)
@@ -3351,7 +3353,7 @@ public class ReplicationControlManagerTest {
.setLogDirs(singletonList(dir2b1)), (short) 2)),
filter(records, BrokerRegistrationChangeRecord.class)
);
- short partitionChangeRecordVersion =
ctx.featureControl.metadataVersion().partitionChangeRecordVersion();
+ short partitionChangeRecordVersion =
ctx.featureControl.metadataVersionOrThrow().partitionChangeRecordVersion();
assertEquals(
sortPartitionChangeRecords(asList(
new ApiMessageAndVersion(new
PartitionChangeRecord().setTopicId(topicA).setPartitionId(0)