This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0ca9cd4d2d MINOR: Several fixes and improvements for FeatureControlManager (#12207)
0ca9cd4d2d is described below

commit 0ca9cd4d2d2a89510dbe357783a316f2f0789799
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Wed Jun 1 16:09:38 2022 -0700

    MINOR: Several fixes and improvements for FeatureControlManager (#12207)
    
    This PR fixes a bug where FeatureControlManager#replay(FeatureLevelRecord) was throwing an
    exception if not all controllers in the quorum supported the feature being applied. While we do
    want to validate this, it needs to be validated earlier, before the record is committed to the log.
    Once the record has been committed to the log it should always be applied if the current controller
    supports it.
    
    Fix another bug where removing a feature was not supported once it had been configured. Note that
    because we reserve feature level 0 for "feature not enabled", we don't need to use
    Optional<VersionRange>; we can just return a range of 0-0 when the feature is not supported.
    
    Allow the metadata version to be downgraded when UpgradeType.UNSAFE_DOWNGRADE has been set.
    Previously we were unconditionally denying this even when this was set.
    
    Add a builder for FeatureControlManager, so that we can easily add new parameters to the
    constructor in the future. This will also be useful for creating FeatureControlManagers that are
    initialized to a specific MetadataVersion.
    
    Get rid of RemoveFeatureLevelRecord, since it's easier to just issue a FeatureLevelRecord with
    the level set to 0.
    
    Set metadata.max.idle.interval.ms to 0 in RaftClusterSnapshotTest for more predictability.
    
    Reviewers: David Arthur <[email protected]>, dengziming <[email protected]>
---
 .../kafka/server/RaftClusterSnapshotTest.scala     |   8 +-
 .../kafka/controller/FeatureControlManager.java    | 134 ++++++++++----------
 .../apache/kafka/controller/QuorumController.java  |   6 +-
 .../apache/kafka/controller/QuorumFeatures.java    |  78 ++++++------
 .../java/org/apache/kafka/image/FeaturesDelta.java |  15 +--
 .../java/org/apache/kafka/image/MetadataDelta.java |   8 --
 .../common/metadata/RemoveFeatureLevelRecord.json  |  26 ----
 .../controller/FeatureControlManagerTest.java      | 139 ++++++++++++---------
 .../kafka/controller/QuorumFeaturesTest.java       | 108 +++++++---------
 .../org/apache/kafka/image/FeaturesImageTest.java  |  15 ++-
 10 files changed, 257 insertions(+), 280 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala 
b/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
index e34a5ed6ed..503ce7d2be 100644
--- a/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
+++ b/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
@@ -21,6 +21,7 @@ import java.util.Collections
 import kafka.testkit.KafkaClusterTestKit
 import kafka.testkit.TestKitNodes
 import kafka.utils.TestUtils
+import kafka.server.KafkaConfig.{MetadataMaxIdleIntervalMsProp, 
MetadataSnapshotMaxNewRecordBytesProp}
 import org.apache.kafka.common.utils.BufferSupplier
 import org.apache.kafka.metadata.MetadataRecordSerde
 import org.apache.kafka.snapshot.RecordsSnapshotReader
@@ -38,7 +39,6 @@ class RaftClusterSnapshotTest {
   def testSnapshotsGenerated(): Unit = {
     val numberOfBrokers = 3
     val numberOfControllers = 3
-    val metadataSnapshotMaxNewRecordBytes = 100
 
     TestUtils.resource(
       new KafkaClusterTestKit
@@ -48,10 +48,8 @@ class RaftClusterSnapshotTest {
             .setNumControllerNodes(numberOfControllers)
             .build()
         )
-        .setConfigProp(
-          KafkaConfig.MetadataSnapshotMaxNewRecordBytesProp,
-          metadataSnapshotMaxNewRecordBytes.toString
-        )
+        .setConfigProp(MetadataSnapshotMaxNewRecordBytesProp, "10")
+        .setConfigProp(MetadataMaxIdleIntervalMsProp, "0")
         .build()
     ) { cluster =>
       cluster.format()
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java 
b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
index 52d3c5d521..19127afa72 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
@@ -29,6 +29,7 @@ import java.util.Optional;
 import java.util.TreeMap;
 import java.util.function.Consumer;
 
+import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.admin.FeatureUpdate;
 import org.apache.kafka.common.metadata.FeatureLevelRecord;
 import org.apache.kafka.common.protocol.Errors;
@@ -47,6 +48,46 @@ import static 
org.apache.kafka.common.metadata.MetadataRecordType.FEATURE_LEVEL_
 
 
 public class FeatureControlManager {
+    public static class Builder {
+        private LogContext logContext = null;
+        private SnapshotRegistry snapshotRegistry = null;
+        private QuorumFeatures quorumFeatures = null;
+        private MetadataVersion metadataVersion = 
MetadataVersion.UNINITIALIZED;
+
+        Builder setLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+            this.snapshotRegistry = snapshotRegistry;
+            return this;
+        }
+
+        Builder setQuorumFeatures(QuorumFeatures quorumFeatures) {
+            this.quorumFeatures = quorumFeatures;
+            return this;
+        }
+
+        Builder setMetadataVersion(MetadataVersion metadataVersion) {
+            this.metadataVersion = metadataVersion;
+            return this;
+        }
+
+        public FeatureControlManager build() {
+            if (logContext == null) logContext = new LogContext();
+            if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
+            if (quorumFeatures == null) {
+                quorumFeatures = new QuorumFeatures(0, new ApiVersions(), 
QuorumFeatures.defaultFeatureMap(),
+                        Collections.emptyList());
+            }
+            return new FeatureControlManager(logContext,
+                quorumFeatures,
+                snapshotRegistry,
+                metadataVersion);
+        }
+    }
+
     private final Logger log;
 
     /**
@@ -65,13 +106,16 @@ public class FeatureControlManager {
     private final TimelineObject<MetadataVersion> metadataVersion;
 
 
-    FeatureControlManager(LogContext logContext,
-                          QuorumFeatures quorumFeatures,
-                          SnapshotRegistry snapshotRegistry) {
+    private FeatureControlManager(
+        LogContext logContext,
+        QuorumFeatures quorumFeatures,
+        SnapshotRegistry snapshotRegistry,
+        MetadataVersion metadataVersion
+    ) {
         this.log = logContext.logger(FeatureControlManager.class);
         this.quorumFeatures = quorumFeatures;
         this.finalizedVersions = new TimelineHashMap<>(snapshotRegistry, 0);
-        this.metadataVersion = new TimelineObject<>(snapshotRegistry, 
MetadataVersion.UNINITIALIZED);
+        this.metadataVersion = new TimelineObject<>(snapshotRegistry, 
metadataVersion);
     }
 
     ControllerResult<Map<String, ApiError>> updateFeatures(
@@ -94,35 +138,6 @@ public class FeatureControlManager {
         }
     }
 
-    ControllerResult<Map<String, ApiError>> initializeMetadataVersion(short 
initVersion) {
-        if (!metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
-            return ControllerResult.atomicOf(
-                Collections.emptyList(),
-                Collections.singletonMap(
-                    MetadataVersion.FEATURE_NAME,
-                    new ApiError(Errors.INVALID_UPDATE_VERSION,
-                        "Cannot initialize metadata.version to " + initVersion 
+ " since it has already been " +
-                            "initialized to " + 
metadataVersion().featureLevel() + ".")
-            ));
-        }
-        List<ApiMessageAndVersion> records = new ArrayList<>();
-        ApiError result = updateMetadataVersion(initVersion, false, 
records::add);
-        return ControllerResult.atomicOf(records, 
Collections.singletonMap(MetadataVersion.FEATURE_NAME, result));
-    }
-
-    /**
-     * Test if the quorum can support this feature and version
-     */
-    boolean canSupportVersion(String featureName, short version) {
-        return quorumFeatures.quorumSupportedFeature(featureName)
-            .filter(versionRange -> versionRange.contains(version))
-            .isPresent();
-    }
-
-    boolean featureExists(String featureName) {
-        return quorumFeatures.localSupportedFeature(featureName).isPresent();
-    }
-
     MetadataVersion metadataVersion() {
         return metadataVersion.get();
     }
@@ -134,11 +149,6 @@ public class FeatureControlManager {
         Map<Integer, Map<String, VersionRange>> brokersAndFeatures,
         List<ApiMessageAndVersion> records
     ) {
-        if (!featureExists(featureName)) {
-            return invalidUpdateVersion(featureName, newVersion,
-                "The controller does not support the given feature.");
-        }
-
         if (upgradeType.equals(FeatureUpdate.UpgradeType.UNKNOWN)) {
             return invalidUpdateVersion(featureName, newVersion,
                 "The controller does not support the given upgrade type.");
@@ -151,14 +161,14 @@ public class FeatureControlManager {
             currentVersion = finalizedVersions.get(featureName);
         }
 
-        if (newVersion <= 0) {
+        if (newVersion < 0) {
             return invalidUpdateVersion(featureName, newVersion,
-                "A feature version cannot be less than 1.");
+                "A feature version cannot be less than 0.");
         }
 
-        if (!canSupportVersion(featureName, newVersion)) {
-            return invalidUpdateVersion(featureName, newVersion,
-                "The quorum does not support the given feature version.");
+        Optional<String> reasonNotSupported = 
quorumFeatures.reasonNotSupported(featureName, newVersion);
+        if (reasonNotSupported.isPresent()) {
+            return invalidUpdateVersion(featureName, newVersion, 
reasonNotSupported.get());
         }
 
         for (Entry<Integer, Map<String, VersionRange>> brokerEntry : 
brokersAndFeatures.entrySet()) {
@@ -208,19 +218,6 @@ public class FeatureControlManager {
         boolean allowUnsafeDowngrade,
         Consumer<ApiMessageAndVersion> recordConsumer
     ) {
-        Optional<VersionRange> quorumSupported = 
quorumFeatures.quorumSupportedFeature(MetadataVersion.FEATURE_NAME);
-        if (!quorumSupported.isPresent()) {
-            return invalidMetadataVersion(newVersionLevel, "The quorum does 
not support metadata.version.");
-        }
-
-        if (newVersionLevel <= 0) {
-            return invalidMetadataVersion(newVersionLevel, "KRaft mode/the 
quorum does not support metadata.version values less than 1.");
-        }
-
-        if (!quorumSupported.get().contains(newVersionLevel)) {
-            return invalidMetadataVersion(newVersionLevel, "The controller 
quorum does support this version.");
-        }
-
         MetadataVersion currentVersion = metadataVersion();
         final MetadataVersion newVersion;
         try {
@@ -234,9 +231,15 @@ public class FeatureControlManager {
             boolean metadataChanged = 
MetadataVersion.checkIfMetadataChanged(currentVersion, newVersion);
             if (!metadataChanged) {
                 log.info("Downgrading metadata.version from {} to {}.", 
currentVersion, newVersion);
+            } else if (allowUnsafeDowngrade) {
+                log.info("Downgrading metadata.version unsafely from {} to 
{}.", currentVersion, newVersion);
             } else {
-                return invalidMetadataVersion(newVersionLevel, "Unsafe 
metadata.version downgrades are not supported.");
+                return invalidMetadataVersion(newVersionLevel, "Refusing to 
perform the requested " +
+                        "downgrade because it might delete metadata 
information. Retry using " +
+                        "UNSAFE_DOWNGRADE if you want to force the downgrade 
to proceed.");
             }
+        } else {
+            log.info("Upgrading metadata.version from {} to {}.", 
currentVersion, newVersion);
         }
 
         recordConsumer.accept(new ApiMessageAndVersion(
@@ -264,17 +267,22 @@ public class FeatureControlManager {
     }
 
     public void replay(FeatureLevelRecord record) {
-        if (!canSupportVersion(record.name(), record.featureLevel())) {
-            throw new RuntimeException("Controller cannot support feature " + 
record.name() +
-                                       " at version " + record.featureLevel());
+        VersionRange range = 
quorumFeatures.localSupportedFeature(record.name());
+        if (!range.contains(record.featureLevel())) {
+            throw new RuntimeException("Tried to apply FeatureLevelRecord " + 
record + ", but this controller only " +
+                "supports versions " + range);
         }
-
         if (record.name().equals(MetadataVersion.FEATURE_NAME)) {
             log.info("Setting metadata.version to {}", record.featureLevel());
             
metadataVersion.set(MetadataVersion.fromFeatureLevel(record.featureLevel()));
         } else {
-            log.info("Setting feature {} to {}", record.name(), 
record.featureLevel());
-            finalizedVersions.put(record.name(), record.featureLevel());
+            if (record.featureLevel() == 0) {
+                log.info("Removing feature {}", record.name());
+                finalizedVersions.remove(record.name());
+            } else {
+                log.info("Setting feature {} to {}", record.name(), 
record.featureLevel());
+                finalizedVersions.put(record.name(), record.featureLevel());
+            }
         }
     }
 
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 7ca557dada..97a2040a38 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -1563,7 +1563,11 @@ public final class QuorumController implements 
Controller {
             setReplicaPlacer(replicaPlacer).
             setControllerMetrics(controllerMetrics).
             build();
-        this.featureControl = new FeatureControlManager(logContext, 
quorumFeatures, snapshotRegistry);
+        this.featureControl = new FeatureControlManager.Builder().
+                setLogContext(logContext).
+                setQuorumFeatures(quorumFeatures).
+                setSnapshotRegistry(snapshotRegistry).
+                build();
         this.producerIdControlManager = new 
ProducerIdControlManager(clusterControl, snapshotRegistry);
         this.snapshotMaxNewRecordBytes = snapshotMaxNewRecordBytes;
         this.leaderImbalanceCheckIntervalNs = leaderImbalanceCheckIntervalNs;
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
index 80a5c0a0b1..9b723515bd 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
@@ -33,40 +33,41 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.OptionalInt;
 import java.util.stream.Collectors;
 
 /**
  * A holder class of the local node's supported feature flags as well as the 
ApiVersions of other nodes.
  */
 public class QuorumFeatures {
+    private static final VersionRange DISABLED = VersionRange.of(0, 0);
+
     private static final Logger log = 
LoggerFactory.getLogger(QuorumFeatures.class);
 
     private final int nodeId;
     private final ApiVersions apiVersions;
-    private final Map<String, VersionRange> supportedFeatures;
+    private final Map<String, VersionRange> localSupportedFeatures;
     private final List<Integer> quorumNodeIds;
 
     QuorumFeatures(
         int nodeId,
         ApiVersions apiVersions,
-        Map<String, VersionRange> supportedFeatures,
+        Map<String, VersionRange> localSupportedFeatures,
         List<Integer> quorumNodeIds
     ) {
         this.nodeId = nodeId;
         this.apiVersions = apiVersions;
-        this.supportedFeatures = 
Collections.unmodifiableMap(supportedFeatures);
+        this.localSupportedFeatures = 
Collections.unmodifiableMap(localSupportedFeatures);
         this.quorumNodeIds = Collections.unmodifiableList(quorumNodeIds);
     }
 
     public static QuorumFeatures create(
         int nodeId,
         ApiVersions apiVersions,
-        Map<String, VersionRange> supportedFeatures,
+        Map<String, VersionRange> localSupportedFeatures,
         Collection<Node> quorumNodes
     ) {
         List<Integer> nodeIds = 
quorumNodes.stream().map(Node::id).collect(Collectors.toList());
-        return new QuorumFeatures(nodeId, apiVersions, supportedFeatures, 
nodeIds);
+        return new QuorumFeatures(nodeId, apiVersions, localSupportedFeatures, 
nodeIds);
     }
 
     public static Map<String, VersionRange> defaultFeatureMap() {
@@ -75,47 +76,50 @@ public class QuorumFeatures {
         return features;
     }
 
-    Optional<VersionRange> quorumSupportedFeature(String featureName) {
-        List<VersionRange> supportedVersions = new 
ArrayList<>(quorumNodeIds.size());
-        for (int nodeId : quorumNodeIds) {
-            if (nodeId == this.nodeId) {
-                // We get this node's features from "supportedFeatures"
-                continue;
+    /**
+     * Return the reason a specific feature level is not supported, or 
Optional.empty if it is supported.
+     *
+     * @param featureName   The feature name.
+     * @param level         The feature level.
+     * @return              The reason why the feature level is not supported, 
or Optional.empty if it is supported.
+     */
+    public Optional<String> reasonNotSupported(String featureName, short 
level) {
+        VersionRange localRange = 
localSupportedFeatures.getOrDefault(featureName, DISABLED);
+        if (!localRange.contains(level)) {
+            if (localRange.equals(DISABLED)) {
+                return Optional.of("Local controller " + nodeId + " does not 
support this feature.");
+            } else {
+                return Optional.of("Local controller " + nodeId + " only 
supports versions " + localRange);
             }
-            NodeApiVersions nodeVersions = 
apiVersions.get(Integer.toString(nodeId));
+        }
+        List<String> missing = new ArrayList<>();
+        for (int id : quorumNodeIds) {
+            if (nodeId == id) {
+                continue; // We get the local node's features from 
localSupportedFeatures.
+            }
+            NodeApiVersions nodeVersions = 
apiVersions.get(Integer.toString(id));
             if (nodeVersions == null) {
+                missing.add(Integer.toString(id));
                 continue;
             }
             SupportedVersionRange supportedRange = 
nodeVersions.supportedFeatures().get(featureName);
-            if (supportedRange == null) {
-                supportedVersions.add(VersionRange.of(0, 0));
-            } else {
-                supportedVersions.add(VersionRange.of(supportedRange.min(), 
supportedRange.max()));
-            }
-        }
-        localSupportedFeature(featureName).ifPresent(supportedVersions::add);
-
-        if (supportedVersions.isEmpty()) {
-            return Optional.empty();
-        } else {
-            OptionalInt highestMinVersion = 
supportedVersions.stream().mapToInt(VersionRange::min).max();
-            OptionalInt lowestMaxVersion = 
supportedVersions.stream().mapToInt(VersionRange::max).min();
-            if (highestMinVersion.isPresent() && lowestMaxVersion.isPresent()) 
{
-                if (highestMinVersion.getAsInt() <= 
lowestMaxVersion.getAsInt()) {
-                    if (supportedVersions.size() < quorumNodeIds.size()) {
-                        log.info("Using incomplete set of quorum supported 
features.");
-                    }
-                    return Optional.of(VersionRange.of((short) 
highestMinVersion.getAsInt(), (short) lowestMaxVersion.getAsInt()));
+            VersionRange range = supportedRange == null ? DISABLED :
+                    VersionRange.of(supportedRange.min(), 
supportedRange.max());
+            if (!range.contains(level)) {
+                if (range.equals(DISABLED)) {
+                    return Optional.of("Controller " + id + " does not support 
this feature.");
                 } else {
-                    return Optional.empty();
+                    return Optional.of("Controller " + id + " only supports 
versions " + range);
                 }
-            } else {
-                return Optional.empty();
             }
         }
+        if (!missing.isEmpty()) {
+            log.info("Unable to get feature level information for 
controller(s): " + String.join(", ", missing));
+        }
+        return Optional.empty();
     }
 
-    Optional<VersionRange> localSupportedFeature(String featureName) {
-        return Optional.ofNullable(supportedFeatures.get(featureName));
+    VersionRange localSupportedFeature(String featureName) {
+        return localSupportedFeatures.getOrDefault(featureName, DISABLED);
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java 
b/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java
index 28eb187bfb..7f431c2d06 100644
--- a/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java
@@ -18,7 +18,6 @@
 package org.apache.kafka.image;
 
 import org.apache.kafka.common.metadata.FeatureLevelRecord;
-import org.apache.kafka.common.metadata.RemoveFeatureLevelRecord;
 import org.apache.kafka.server.common.MetadataVersion;
 
 import java.util.HashMap;
@@ -61,15 +60,11 @@ public final class FeaturesDelta {
         if (record.name().equals(MetadataVersion.FEATURE_NAME)) {
             metadataVersionChange = 
MetadataVersion.fromFeatureLevel(record.featureLevel());
         } else {
-            changes.put(record.name(), Optional.of(record.featureLevel()));
-        }
-    }
-
-    public void replay(RemoveFeatureLevelRecord record) {
-        if (record.name().equals(MetadataVersion.FEATURE_NAME)) {
-            metadataVersionChange = null;
-        } else {
-            changes.put(record.name(), Optional.empty());
+            if (record.featureLevel() == 0) {
+                changes.put(record.name(), Optional.empty());
+            } else {
+                changes.put(record.name(), Optional.of(record.featureLevel()));
+            }
         }
     }
 
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java 
b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
index 01455e360c..25e141ea0d 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
@@ -29,7 +29,6 @@ import org.apache.kafka.common.metadata.PartitionRecord;
 import org.apache.kafka.common.metadata.ProducerIdsRecord;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord;
 import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
-import org.apache.kafka.common.metadata.RemoveFeatureLevelRecord;
 import org.apache.kafka.common.metadata.RemoveTopicRecord;
 import org.apache.kafka.common.metadata.TopicRecord;
 import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
@@ -204,9 +203,6 @@ public final class MetadataDelta {
             case PRODUCER_IDS_RECORD:
                 replay((ProducerIdsRecord) record);
                 break;
-            case REMOVE_FEATURE_LEVEL_RECORD:
-                replay((RemoveFeatureLevelRecord) record);
-                break;
             case BROKER_REGISTRATION_CHANGE_RECORD:
                 replay((BrokerRegistrationChangeRecord) record);
                 break;
@@ -291,10 +287,6 @@ public final class MetadataDelta {
         getOrCreateProducerIdsDelta().replay(record);
     }
 
-    public void replay(RemoveFeatureLevelRecord record) {
-        getOrCreateFeaturesDelta().replay(record);
-    }
-
     public void replay(AccessControlEntryRecord record) {
         getOrCreateAclsDelta().replay(record);
     }
diff --git 
a/metadata/src/main/resources/common/metadata/RemoveFeatureLevelRecord.json 
b/metadata/src/main/resources/common/metadata/RemoveFeatureLevelRecord.json
deleted file mode 100644
index 6ed716192e..0000000000
--- a/metadata/src/main/resources/common/metadata/RemoveFeatureLevelRecord.json
+++ /dev/null
@@ -1,26 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-{
-  "apiKey": 16,
-  "type": "metadata",
-  "name": "RemoveFeatureLevelRecord",
-  "validVersions": "0",
-  "flexibleVersions": "0+",
-  "fields": [
-    { "name": "Name", "type": "string", "versions": "0+",
-      "about": "The feature name." }
-  ]
-}
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
index 02b1493548..3a8c4042df 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.admin.FeatureUpdate;
@@ -39,6 +40,7 @@ import org.apache.kafka.timeline.SnapshotRegistry;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
+import static java.util.Collections.emptyList;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 
@@ -70,7 +72,7 @@ public class FeatureControlManagerTest {
     public static QuorumFeatures features(Object... args) {
         Map<String, VersionRange> features = 
QuorumFeatures.defaultFeatureMap();
         features.putAll(rangeMap(args));
-        return new QuorumFeatures(0, new ApiVersions(), features, 
Collections.emptyList());
+        return new QuorumFeatures(0, new ApiVersions(), features, emptyList());
     }
 
     private static Map<String, Short> updateMap(Object... args) {
@@ -85,16 +87,17 @@ public class FeatureControlManagerTest {
 
     @Test
     public void testUpdateFeatures() {
-        LogContext logContext = new LogContext();
-        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
+        FeatureControlManager manager = new FeatureControlManager.Builder().
+            setQuorumFeatures(features("foo", 1, 2)).
+            setSnapshotRegistry(snapshotRegistry).
+            build();
         snapshotRegistry.getOrCreateSnapshot(-1);
-        FeatureControlManager manager = new FeatureControlManager(logContext,
-            features("foo", 1, 2), snapshotRegistry);
         assertEquals(new FinalizedControllerFeatures(Collections.emptyMap(), 
-1),
             manager.finalizedFeatures(-1));
-        assertEquals(ControllerResult.atomicOf(Collections.emptyList(), 
Collections.
+        assertEquals(ControllerResult.atomicOf(emptyList(), Collections.
                 singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION,
-                    "Invalid update version 3 for feature foo. The quorum does 
not support the given feature version."))),
+                    "Invalid update version 3 for feature foo. Local 
controller 0 only supports versions 1-2"))),
             manager.updateFeatures(updateMap("foo", 3),
                 Collections.singletonMap("foo", 
FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
                 Collections.emptyMap(), false));
@@ -104,7 +107,7 @@ public class FeatureControlManagerTest {
         Map<String, ApiError> expectedMap = new HashMap<>();
         expectedMap.put("foo", ApiError.NONE);
         expectedMap.put("bar", new ApiError(Errors.INVALID_UPDATE_VERSION,
-                "Invalid update version 1 for feature bar. The controller does 
not support the given feature."));
+                "Invalid update version 1 for feature bar. Local controller 0 
does not support this feature."));
         assertEquals(expectedMap, result.response());
         List<ApiMessageAndVersion> expectedMessages = new ArrayList<>();
         expectedMessages.add(new ApiMessageAndVersion(new FeatureLevelRecord().
@@ -121,8 +124,11 @@ public class FeatureControlManagerTest {
             setName("foo").setFeatureLevel((short) 2);
 
         snapshotRegistry.getOrCreateSnapshot(-1);
-        FeatureControlManager manager = new FeatureControlManager(logContext,
-            features("foo", 1, 2), snapshotRegistry);
+        FeatureControlManager manager = new FeatureControlManager.Builder().
+                setLogContext(logContext).
+                setQuorumFeatures(features("foo", 1, 2)).
+                setSnapshotRegistry(snapshotRegistry).
+                build();
         manager.replay(record);
         snapshotRegistry.getOrCreateSnapshot(123);
         assertEquals(new FinalizedControllerFeatures(versionMap("foo", 2), 
123),
@@ -133,12 +139,15 @@ public class FeatureControlManagerTest {
     public void testUpdateFeaturesErrorCases() {
         LogContext logContext = new LogContext();
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
-        FeatureControlManager manager = new FeatureControlManager(logContext,
-            features("foo", 1, 5, "bar", 1, 2), snapshotRegistry);
+        FeatureControlManager manager = new FeatureControlManager.Builder().
+            setLogContext(logContext).
+            setQuorumFeatures(features("foo", 1, 5, "bar", 1, 2)).
+            setSnapshotRegistry(snapshotRegistry).
+            build();
 
         assertEquals(
             ControllerResult.atomicOf(
-                Collections.emptyList(),
+                emptyList(),
                 Collections.singletonMap(
                     "foo",
                     new ApiError(
@@ -160,7 +169,7 @@ public class FeatureControlManagerTest {
         manager.replay((FeatureLevelRecord) result.records().get(0).message());
         snapshotRegistry.getOrCreateSnapshot(3);
 
-        assertEquals(ControllerResult.atomicOf(Collections.emptyList(), 
Collections.
+        assertEquals(ControllerResult.atomicOf(emptyList(), Collections.
                 singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION,
                     "Invalid update version 2 for feature foo. Can't downgrade the version of this feature " +
                     "without setting the upgrade type to either safe or unsafe downgrade."))),
@@ -191,8 +200,11 @@ public class FeatureControlManagerTest {
     public void testFeatureControlIterator() throws Exception {
         LogContext logContext = new LogContext();
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
-        FeatureControlManager manager = new FeatureControlManager(logContext,
-            features("foo", 1, 5, "bar", 1, 2), snapshotRegistry);
+        FeatureControlManager manager = new FeatureControlManager.Builder().
+            setLogContext(logContext).
+            setQuorumFeatures(features("foo", 1, 5, "bar", 1, 2)).
+            setSnapshotRegistry(snapshotRegistry).
+            build();
         ControllerResult<Map<String, ApiError>> result = manager.
             updateFeatures(updateMap("foo", 5, "bar", 1),
                 Collections.emptyMap(), Collections.emptyMap(), false);
@@ -208,57 +220,28 @@ public class FeatureControlManagerTest {
     }
 
     @Test
-    public void testInitializeMetadataVersion() {
-        // Default QuorumFeatures
-        checkMetadataVersion(features(), MetadataVersion.IBP_3_0_IV0, 
Errors.NONE);
-        checkMetadataVersion(features(), MetadataVersion.latest(), 
Errors.NONE);
-        checkMetadataVersion(features(), MetadataVersion.UNINITIALIZED, 
Errors.INVALID_UPDATE_VERSION);
-        checkMetadataVersion(features(), MetadataVersion.IBP_2_7_IV1, 
Errors.INVALID_UPDATE_VERSION);
-
-        // Increased QuorumFeatures
-        QuorumFeatures features = features(MetadataVersion.FEATURE_NAME, 
MetadataVersion.IBP_3_2_IV0.featureLevel(), 
MetadataVersion.IBP_3_3_IV0.featureLevel());
-        checkMetadataVersion(features, MetadataVersion.IBP_3_0_IV0, 
Errors.INVALID_UPDATE_VERSION);
-
-        // Empty QuorumFeatures
-        features = new QuorumFeatures(0, new ApiVersions(), 
Collections.emptyMap(), Collections.emptyList());
-        checkMetadataVersion(features, MetadataVersion.latest(), 
Errors.INVALID_UPDATE_VERSION);
-        checkMetadataVersion(features, MetadataVersion.IBP_3_0_IV0, 
Errors.INVALID_UPDATE_VERSION);
-    }
-
-    @Test
-    public void reInitializeMetadataVersion() {
-        LogContext logContext = new LogContext();
-        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
-        FeatureControlManager manager = new FeatureControlManager(logContext, 
features(), snapshotRegistry);
-        ControllerResult<Map<String, ApiError>> result = 
manager.initializeMetadataVersion(MetadataVersion.IBP_3_0_IV0.featureLevel());
-        Errors actual = 
result.response().get(MetadataVersion.FEATURE_NAME).error();
-        assertEquals(Errors.NONE, actual);
-        RecordTestUtils.replayAll(manager, result.records());
-
-        result = 
manager.initializeMetadataVersion(MetadataVersion.latest().featureLevel());
-        actual = result.response().get(MetadataVersion.FEATURE_NAME).error();
-        assertEquals(Errors.INVALID_UPDATE_VERSION, actual);
-    }
-
-    public void checkMetadataVersion(QuorumFeatures features, MetadataVersion 
version, Errors expected) {
-        LogContext logContext = new LogContext();
-        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
-        FeatureControlManager manager = new FeatureControlManager(logContext, 
features, snapshotRegistry);
-        ControllerResult<Map<String, ApiError>> result = 
manager.initializeMetadataVersion(version.featureLevel());
-        Errors actual = 
result.response().get(MetadataVersion.FEATURE_NAME).error();
-        assertEquals(expected, actual);
+    public void testApplyMetadataVersionChangeRecord() {
+        QuorumFeatures features = features(MetadataVersion.FEATURE_NAME,
+                MetadataVersion.IBP_3_0_IV0.featureLevel(), 
MetadataVersion.IBP_3_3_IV0.featureLevel());
+        FeatureControlManager manager = new FeatureControlManager.Builder().
+            setQuorumFeatures(features).build();
+        manager.replay(new FeatureLevelRecord().
+            setName(MetadataVersion.FEATURE_NAME).
+            setFeatureLevel(MetadataVersion.IBP_3_0_IV0.featureLevel()));
+        assertEquals(MetadataVersion.IBP_3_0_IV0, manager.metadataVersion());
     }
 
     @Test
     public void testDowngradeMetadataVersion() {
-        LogContext logContext = new LogContext();
-        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
-        QuorumFeatures features = features(MetadataVersion.FEATURE_NAME, 
MetadataVersion.IBP_3_2_IV0.featureLevel(), 
MetadataVersion.IBP_3_3_IV0.featureLevel());
-        FeatureControlManager manager = new FeatureControlManager(logContext, 
features, snapshotRegistry);
-        ControllerResult<Map<String, ApiError>> result = 
manager.initializeMetadataVersion(MetadataVersion.IBP_3_3_IV0.featureLevel());
-        RecordTestUtils.replayAll(manager, result.records());
+        QuorumFeatures features = features(MetadataVersion.FEATURE_NAME,
+                MetadataVersion.IBP_3_2_IV0.featureLevel(), 
MetadataVersion.IBP_3_3_IV0.featureLevel());
+        FeatureControlManager manager = new FeatureControlManager.Builder().
+            setQuorumFeatures(features).
+            setMetadataVersion(MetadataVersion.IBP_3_3_IV0).
+            build();
         assertEquals(manager.metadataVersion(), MetadataVersion.IBP_3_3_IV0);
 
+        ControllerResult<Map<String, ApiError>> result;
         result = manager.updateFeatures(
             Collections.singletonMap(MetadataVersion.FEATURE_NAME, 
MetadataVersion.IBP_3_2_IV0.featureLevel()),
             Collections.singletonMap(MetadataVersion.FEATURE_NAME, 
FeatureUpdate.UpgradeType.UPGRADE),
@@ -280,7 +263,39 @@ public class FeatureControlManagerTest {
                 Collections.emptyMap(),
                 true);
         assertEquals(Errors.INVALID_UPDATE_VERSION, 
result.response().get(MetadataVersion.FEATURE_NAME).error());
-        assertEquals("Invalid update version 1 for feature metadata.version. The quorum does not support the given feature version.",
+        assertEquals("Invalid update version 1 for feature metadata.version. Local controller 0 only supports versions 4-5",
             result.response().get(MetadataVersion.FEATURE_NAME).message());
     }
+
+    @Test
+    public void testCreateFeatureLevelRecords() {
+        Map<String, VersionRange> localSupportedFeatures = new HashMap<>();
+        localSupportedFeatures.put(MetadataVersion.FEATURE_NAME, 
VersionRange.of(
+                MetadataVersion.IBP_3_0_IV0.featureLevel(), 
MetadataVersion.latest().featureLevel()));
+        localSupportedFeatures.put("foo", VersionRange.of(0, 2));
+        FeatureControlManager manager = new FeatureControlManager.Builder().
+                setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(), 
localSupportedFeatures, emptyList())).
+                build();
+        ControllerResult<Map<String, ApiError>> result  = 
manager.updateFeatures(
+                Collections.singletonMap("foo", (short) 1),
+                Collections.singletonMap("foo", 
FeatureUpdate.UpgradeType.UPGRADE),
+                Collections.singletonMap(1, Collections.singletonMap("foo", 
VersionRange.of(0, 3))),
+                false);
+        assertEquals(ControllerResult.atomicOf(Arrays.asList(new 
ApiMessageAndVersion(
+                new 
FeatureLevelRecord().setName("foo").setFeatureLevel((short) 1), (short) 0)),
+                        Collections.singletonMap("foo", ApiError.NONE)), 
result);
+        RecordTestUtils.replayAll(manager, result.records());
+        assertEquals(Optional.of((short) 1), 
manager.finalizedFeatures(Long.MAX_VALUE).get("foo"));
+
+        ControllerResult<Map<String, ApiError>> result2  = 
manager.updateFeatures(
+                Collections.singletonMap("foo", (short) 0),
+                Collections.singletonMap("foo", 
FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
+                Collections.singletonMap(1, Collections.singletonMap("foo", 
VersionRange.of(0, 3))),
+                false);
+        assertEquals(ControllerResult.atomicOf(Arrays.asList(new 
ApiMessageAndVersion(
+                        new 
FeatureLevelRecord().setName("foo").setFeatureLevel((short) 0), (short) 0)),
+                Collections.singletonMap("foo", ApiError.NONE)), result2);
+        RecordTestUtils.replayAll(manager, result2.records());
+        assertEquals(Optional.empty(), 
manager.finalizedFeatures(Long.MAX_VALUE).get("foo"));
+    }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java
index 0194cd674e..7d8ba5bfec 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java
@@ -19,86 +19,74 @@ package org.apache.kafka.controller;
 
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.NodeApiVersions;
-import org.apache.kafka.common.message.ApiVersionsResponseData;
+import org.apache.kafka.common.message.ApiVersionsResponseData.SupportedFeatureKey;
 import org.apache.kafka.metadata.VersionRange;
 import org.junit.jupiter.api.Test;
 
+import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Optional;
 
+import static java.util.Collections.emptyMap;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class QuorumFeaturesTest {
-    @Test
-    public void testQuorumFeatures() {
-        ApiVersions apiVersions = new ApiVersions();
-        Map<String, VersionRange> featureMap = new HashMap<>(2);
-        featureMap.put("foo", VersionRange.of(1, 2));
-        featureMap.put("bar", VersionRange.of(3, 5));
-
-        List<Integer> nodeIds = new ArrayList<>();
-        nodeIds.add(0);
-
-        QuorumFeatures quorumFeatures = new QuorumFeatures(0, apiVersions, 
featureMap, nodeIds);
-        assertLocalFeature(quorumFeatures, "foo", 1, 2);
-        assertLocalFeature(quorumFeatures, "bar", 3, 5);
-        assertQuorumFeature(quorumFeatures, "foo", 1, 2);
-        assertQuorumFeature(quorumFeatures, "bar", 3, 5);
+    private final static Map<String, VersionRange> LOCAL;
 
-        // Add a second node with identical features
-        nodeIds.add(1);
-        apiVersions.update("1", nodeApiVersions(featureMap));
-        assertLocalFeature(quorumFeatures, "foo", 1, 2);
-        assertLocalFeature(quorumFeatures, "bar", 3, 5);
-        assertQuorumFeature(quorumFeatures, "foo", 1, 2);
-        assertQuorumFeature(quorumFeatures, "bar", 3, 5);
-
-        // Change the supported features of one node
-        Map<String, VersionRange> node1Features = new HashMap<>(featureMap);
-        node1Features.put("bar", VersionRange.of(3, 4));
-        apiVersions.update("1", nodeApiVersions(node1Features));
-        assertLocalFeature(quorumFeatures, "foo", 1, 2);
-        assertLocalFeature(quorumFeatures, "bar", 3, 5);
-        assertQuorumFeature(quorumFeatures, "foo", 1, 2);
-        assertQuorumFeature(quorumFeatures, "bar", 3, 4);
-
-        // Add a third node with no features
-        nodeIds.add(2);
-        apiVersions.update("1", NodeApiVersions.create());
-        assertFalse(quorumFeatures.quorumSupportedFeature("foo").isPresent());
-        assertFalse(quorumFeatures.quorumSupportedFeature("bar").isPresent());
+    static {
+        Map<String, VersionRange> local = new HashMap<>();
+        local.put("foo", VersionRange.of(0, 3));
+        local.put("bar", VersionRange.of(0, 4));
+        local.put("baz", VersionRange.of(2, 2));
+        LOCAL = Collections.unmodifiableMap(local);
     }
 
-
-    public static NodeApiVersions nodeApiVersions(Map<String, VersionRange> 
featureMap) {
-        List<ApiVersionsResponseData.SupportedFeatureKey> supportedFeatures = 
new ArrayList<>(featureMap.size());
-        featureMap.forEach((featureName, versionRange) -> {
-            supportedFeatures.add(new 
ApiVersionsResponseData.SupportedFeatureKey()
-                .setName(featureName)
-                .setMinVersion(versionRange.min())
-                .setMaxVersion(versionRange.max()));
-        });
-        return new NodeApiVersions(Collections.emptyList(), supportedFeatures);
+    @Test
+    public void testDefaultSupportedLevels() {
+        QuorumFeatures quorumFeatures = new QuorumFeatures(0, new 
ApiVersions(), emptyMap(), Arrays.asList(0, 1, 2));
+        assertEquals(Optional.empty(), 
quorumFeatures.reasonNotSupported("foo", (short) 0));
+        assertEquals(Optional.of("Local controller 0 does not support this feature."),
+            quorumFeatures.reasonNotSupported("foo", (short) 1));
     }
 
-    private void assertLocalFeature(QuorumFeatures features, String name, int 
expectedMin, int expectedMax) {
-        Optional<VersionRange> featureRange = 
features.localSupportedFeature(name);
-        assertTrue(featureRange.isPresent());
-        assertEquals(expectedMin, featureRange.get().min());
-        assertEquals(expectedMax, featureRange.get().max());
+    @Test
+    public void testLocalSupportedFeature() {
+        QuorumFeatures quorumFeatures = new QuorumFeatures(0, new 
ApiVersions(), LOCAL, Arrays.asList(0, 1, 2));
+        assertEquals(VersionRange.of(0, 3), 
quorumFeatures.localSupportedFeature("foo"));
+        assertEquals(VersionRange.of(0, 4), 
quorumFeatures.localSupportedFeature("bar"));
+        assertEquals(VersionRange.of(2, 2), 
quorumFeatures.localSupportedFeature("baz"));
+        assertEquals(VersionRange.of(0, 0), 
quorumFeatures.localSupportedFeature("quux"));
     }
 
-    private void assertQuorumFeature(QuorumFeatures features, String name, int 
expectedMin, int expectedMax) {
-        Optional<VersionRange> featureRange = 
features.quorumSupportedFeature(name);
-        assertTrue(featureRange.isPresent());
-        assertEquals(expectedMin, featureRange.get().min());
-        assertEquals(expectedMax, featureRange.get().max());
+    @Test
+    public void testReasonNotSupported() {
+        ApiVersions apiVersions = new ApiVersions();
+        QuorumFeatures quorumFeatures = new QuorumFeatures(0, apiVersions, 
LOCAL, Arrays.asList(0, 1, 2));
+        assertEquals(Optional.of("Local controller 0 only supports versions 0-3"),
+                quorumFeatures.reasonNotSupported("foo", (short) 10));
+        apiVersions.update("1", nodeApiVersions(Arrays.asList(
+                new SimpleImmutableEntry<>("foo", VersionRange.of(1, 3)),
+                new SimpleImmutableEntry<>("bar", VersionRange.of(1, 3)),
+                new SimpleImmutableEntry<>("baz", VersionRange.of(1, 2)))));
+        assertEquals(Optional.empty(), 
quorumFeatures.reasonNotSupported("bar", (short) 3));
+        assertEquals(Optional.of("Controller 1 only supports versions 1-3"),
+                quorumFeatures.reasonNotSupported("bar", (short) 4));
     }
 
+    private static NodeApiVersions nodeApiVersions(List<Entry<String, 
VersionRange>> entries) {
+        List<SupportedFeatureKey> features = new ArrayList<>();
+        entries.forEach(entry -> {
+            features.add(new SupportedFeatureKey().
+                    setName(entry.getKey()).
+                    setMinVersion(entry.getValue().min()).
+                    setMaxVersion(entry.getValue().max()));
+        });
+        return new NodeApiVersions(Collections.emptyList(), features);
+    }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java b/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java
index 0812048bb0..6ea31b080b 100644
--- a/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java
@@ -18,7 +18,6 @@
 package org.apache.kafka.image;
 
 import org.apache.kafka.common.metadata.FeatureLevelRecord;
-import org.apache.kafka.common.metadata.RemoveFeatureLevelRecord;
 import org.apache.kafka.metadata.RecordTestUtils;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.MetadataVersion;
@@ -30,8 +29,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.kafka.common.metadata.MetadataRecordType.FEATURE_LEVEL_RECORD;
-import static org.apache.kafka.common.metadata.MetadataRecordType.REMOVE_FEATURE_LEVEL_RECORD;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 
@@ -52,11 +49,13 @@ public class FeaturesImageTest {
         DELTA1_RECORDS = new ArrayList<>();
         DELTA1_RECORDS.add(new ApiMessageAndVersion(new FeatureLevelRecord().
             setName("foo").setFeatureLevel((short) 3),
-            FEATURE_LEVEL_RECORD.highestSupportedVersion()));
-        DELTA1_RECORDS.add(new ApiMessageAndVersion(new 
RemoveFeatureLevelRecord().
-            setName("bar"), 
REMOVE_FEATURE_LEVEL_RECORD.highestSupportedVersion()));
-        DELTA1_RECORDS.add(new ApiMessageAndVersion(new 
RemoveFeatureLevelRecord().
-            setName("baz"), 
REMOVE_FEATURE_LEVEL_RECORD.highestSupportedVersion()));
+            (short) 0));
+        DELTA1_RECORDS.add(new ApiMessageAndVersion(new FeatureLevelRecord().
+            setName("bar").setFeatureLevel((short) 0),
+            (short) 0));
+        DELTA1_RECORDS.add(new ApiMessageAndVersion(new FeatureLevelRecord().
+            setName("baz").setFeatureLevel((short) 0),
+            (short) 0));
 
         DELTA1 = new FeaturesDelta(IMAGE1);
         RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS);

Reply via email to