dengziming commented on code in PR #12072:
URL: https://github.com/apache/kafka/pull/12072#discussion_r862416055


##########
clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java:
##########
@@ -271,4 +273,30 @@ public static ApiVersion toApiVersion(ApiKeys apiKey) {
             .setMinVersion(apiKey.oldestVersion())
             .setMaxVersion(apiKey.latestVersion());
     }
+
+    public static ApiVersionsResponse apiVersionsResponse(
+        int throttleTimeMs,
+        RecordVersion minRecordVersion,
+        Features<SupportedVersionRange> latestSupportedFeatures,
+        Features<FinalizedVersionRange> finalizedFeatures,
+        long finalizedFeaturesEpoch,
+        NodeApiVersions controllerApiVersions,
+        ListenerType listenerType
+    ) {
+        ApiVersionCollection apiKeys;
+        if (controllerApiVersions != null) {
+            apiKeys = ApiVersionsResponse.intersectForwardableApis(
+                listenerType, minRecordVersion, 
controllerApiVersions.allSupportedApiVersions());
+        } else {
+            apiKeys = ApiVersionsResponse.filterApis(minRecordVersion, 
listenerType);
+        }
+
+        return ApiVersionsResponse.createApiVersionsResponse(

Review Comment:
   We can drop the `ApiVersionsResponse.` qualifier since we are already inside 
the `ApiVersionsResponse` class; the same applies to lines 288 and 291.



##########
clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java:
##########
@@ -102,6 +110,91 @@ public void 
shouldHaveCommonlyAgreedApiVersionResponseWithControllerOnForwardabl
             ApiKeys.JOIN_GROUP.latestVersion(), commonResponse);
     }
 
+    @Test
+    public void shouldCreateApiResponseOnlyWithKeysSupportedByMagicValue() {
+        ApiVersionsResponse response = ApiVersionsResponse.apiVersionsResponse(
+            10,
+            RecordVersion.V1,
+            Features.emptySupportedFeatures(),
+            Features.emptyFinalizedFeatures(),
+            ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
+            null,
+            ListenerType.ZK_BROKER
+        );
+        verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1);
+        assertEquals(10, response.throttleTimeMs());
+        assertTrue(response.data().supportedFeatures().isEmpty());
+        assertTrue(response.data().finalizedFeatures().isEmpty());
+        assertEquals(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, 
response.data().finalizedFeaturesEpoch());
+    }
+
+    @Test
+    public void 
shouldReturnFeatureKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThrottle() {
+        ApiVersionsResponse response = ApiVersionsResponse.apiVersionsResponse(
+            10,
+            RecordVersion.V1,
+            Features.supportedFeatures(
+                Utils.mkMap(Utils.mkEntry("feature", new 
SupportedVersionRange((short) 1, (short) 4)))),
+            Features.finalizedFeatures(
+                Utils.mkMap(Utils.mkEntry("feature", new 
FinalizedVersionRange((short) 2, (short) 3)))),
+            10L,
+            null,
+            ListenerType.ZK_BROKER
+        );
+
+        verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1);
+        assertEquals(10, response.throttleTimeMs());
+        assertEquals(1, response.data().supportedFeatures().size());
+        SupportedFeatureKey sKey = 
response.data().supportedFeatures().find("feature");
+        assertNotNull(sKey);
+        assertEquals(1, sKey.minVersion());
+        assertEquals(4, sKey.maxVersion());
+        assertEquals(1, response.data().finalizedFeatures().size());
+        FinalizedFeatureKey fKey = 
response.data().finalizedFeatures().find("feature");
+        assertNotNull(fKey);
+        assertEquals(2, fKey.minVersionLevel());
+        assertEquals(3, fKey.maxVersionLevel());
+        assertEquals(10, response.data().finalizedFeaturesEpoch());
+    }
+
+    public void 
shouldReturnAllKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThrottle() {

Review Comment:
   Is this method used anywhere? If not, we should add an `@Test` annotation to it.



##########
clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java:
##########
@@ -271,4 +273,30 @@ public static ApiVersion toApiVersion(ApiKeys apiKey) {
             .setMinVersion(apiKey.oldestVersion())
             .setMaxVersion(apiKey.latestVersion());
     }
+
+    public static ApiVersionsResponse apiVersionsResponse(

Review Comment:
   The naming here should be unified: we already have many overloads of the 
public static `createApiVersionsResponse` method, so maybe we can rename this 
method to `createApiVersionsResponse` as well and move it up next to the 
others.



##########
server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java:
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import org.apache.kafka.common.feature.Features;
+import org.apache.kafka.common.feature.FinalizedVersionRange;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+import org.apache.kafka.common.message.ApiMessageType.ListenerType;
+import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
+import 
org.apache.kafka.common.message.ApiVersionsResponseData.FinalizedFeatureKey;
+import 
org.apache.kafka.common.message.ApiVersionsResponseData.SupportedFeatureKey;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.RecordVersion;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.utils.Utils;
+
+import static org.apache.kafka.server.common.MetadataVersion.*;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import org.junit.jupiter.api.Test;
+
+class MetadataVersionTest {
+
+    @Test
+    public void testFeatureLevel() {
+        MetadataVersion[] metadataVersions = MetadataVersion.values();
+        int firstFeatureLevelIndex = 
Arrays.asList(metadataVersions).indexOf(IBP_3_0_IV0);
+        for (int i = 0; i < firstFeatureLevelIndex; i++) {
+            assertFalse(metadataVersions[i].featureLevel().isPresent());
+        }
+        short expectedFeatureLevel = 1;
+        for (int i = firstFeatureLevelIndex; i < metadataVersions.length; i++) 
{
+            MetadataVersion metadataVersion = metadataVersions[i];
+            short featureLevel = metadataVersion.featureLevel().orElseThrow(() 
->
+                new IllegalArgumentException(
+                    String.format("Metadata version %s must have a non-null 
feature level", metadataVersion.version())));
+            assertEquals(expectedFeatureLevel, featureLevel,
+                String.format("Metadata version %s should have feature level 
%s", metadataVersion.version(), expectedFeatureLevel));
+            expectedFeatureLevel += 1;
+        }
+    }
+
+    @Test
+    public void testFromVersionString() {
+        assertEquals(IBP_0_8_0, MetadataVersion.fromVersionString("0.8.0"));
+        assertEquals(IBP_0_8_0, MetadataVersion.fromVersionString("0.8.0.0"));
+        assertEquals(IBP_0_8_0, MetadataVersion.fromVersionString("0.8.0.1"));
+        // should throw an exception as long as IBP_8_0_IV0 is not defined
+        assertThrows(IllegalArgumentException.class, () -> 
MetadataVersion.fromVersionString("8.0"));
+
+        assertEquals(IBP_0_8_1, MetadataVersion.fromVersionString("0.8.1"));
+        assertEquals(IBP_0_8_1, MetadataVersion.fromVersionString("0.8.1.0"));
+        assertEquals(IBP_0_8_1, MetadataVersion.fromVersionString("0.8.1.1"));
+
+        assertEquals(IBP_0_8_2, MetadataVersion.fromVersionString("0.8.2"));
+        assertEquals(IBP_0_8_2, MetadataVersion.fromVersionString("0.8.2.0"));
+        assertEquals(IBP_0_8_2, MetadataVersion.fromVersionString("0.8.2.1"));
+
+        assertEquals(IBP_0_9_0, MetadataVersion.fromVersionString("0.9.0"));
+        assertEquals(IBP_0_9_0, MetadataVersion.fromVersionString("0.9.0.0"));
+        assertEquals(IBP_0_9_0, MetadataVersion.fromVersionString("0.9.0.1"));
+
+        assertEquals(IBP_0_10_0_IV0, 
MetadataVersion.fromVersionString("0.10.0-IV0"));
+
+        assertEquals(IBP_0_10_0_IV1, 
MetadataVersion.fromVersionString("0.10.0"));
+        assertEquals(IBP_0_10_0_IV1, 
MetadataVersion.fromVersionString("0.10.0.0"));
+        assertEquals(IBP_0_10_0_IV1, 
MetadataVersion.fromVersionString("0.10.0.0-IV0"));
+        assertEquals(IBP_0_10_0_IV1, 
MetadataVersion.fromVersionString("0.10.0.1"));
+
+        assertEquals(IBP_0_10_1_IV0, 
MetadataVersion.fromVersionString("0.10.1-IV0"));
+        assertEquals(IBP_0_10_1_IV1, 
MetadataVersion.fromVersionString("0.10.1-IV1"));
+
+        assertEquals(IBP_0_10_1_IV2, 
MetadataVersion.fromVersionString("0.10.1"));
+        assertEquals(IBP_0_10_1_IV2, 
MetadataVersion.fromVersionString("0.10.1.0"));
+        assertEquals(IBP_0_10_1_IV2, 
MetadataVersion.fromVersionString("0.10.1-IV2"));
+        assertEquals(IBP_0_10_1_IV2, 
MetadataVersion.fromVersionString("0.10.1.1"));
+
+        assertEquals(IBP_0_10_2_IV0, 
MetadataVersion.fromVersionString("0.10.2"));
+        assertEquals(IBP_0_10_2_IV0, 
MetadataVersion.fromVersionString("0.10.2.0"));
+        assertEquals(IBP_0_10_2_IV0, 
MetadataVersion.fromVersionString("0.10.2-IV0"));
+        assertEquals(IBP_0_10_2_IV0, 
MetadataVersion.fromVersionString("0.10.2.1"));
+
+        assertEquals(IBP_0_11_0_IV0, 
MetadataVersion.fromVersionString("0.11.0-IV0"));
+        assertEquals(IBP_0_11_0_IV1, 
MetadataVersion.fromVersionString("0.11.0-IV1"));
+
+        assertEquals(IBP_0_11_0_IV2, 
MetadataVersion.fromVersionString("0.11.0"));
+        assertEquals(IBP_0_11_0_IV2, 
MetadataVersion.fromVersionString("0.11.0.0"));
+        assertEquals(IBP_0_11_0_IV2, 
MetadataVersion.fromVersionString("0.11.0-IV2"));
+        assertEquals(IBP_0_11_0_IV2, 
MetadataVersion.fromVersionString("0.11.0.1"));
+
+        assertEquals(IBP_1_0_IV0, MetadataVersion.fromVersionString("1.0"));
+        assertEquals(IBP_1_0_IV0, MetadataVersion.fromVersionString("1.0.0"));
+        assertEquals(IBP_1_0_IV0, 
MetadataVersion.fromVersionString("1.0.0-IV0"));
+        assertEquals(IBP_1_0_IV0, MetadataVersion.fromVersionString("1.0.1"));
+        assertThrows(IllegalArgumentException.class, () -> 
MetadataVersion.fromVersionString("0.1.0"));
+        assertThrows(IllegalArgumentException.class, () -> 
MetadataVersion.fromVersionString("0.1.0.0"));
+        assertThrows(IllegalArgumentException.class, () -> 
MetadataVersion.fromVersionString("0.1.0-IV0"));
+        assertThrows(IllegalArgumentException.class, () -> 
MetadataVersion.fromVersionString("0.1.0.0-IV0"));
+
+        assertEquals(IBP_1_1_IV0, 
MetadataVersion.fromVersionString("1.1-IV0"));
+
+        assertEquals(IBP_2_0_IV1, MetadataVersion.fromVersionString("2.0"));
+        assertEquals(IBP_2_0_IV0, 
MetadataVersion.fromVersionString("2.0-IV0"));
+        assertEquals(IBP_2_0_IV1, 
MetadataVersion.fromVersionString("2.0-IV1"));
+
+        assertEquals(IBP_2_1_IV2, MetadataVersion.fromVersionString("2.1"));
+        assertEquals(IBP_2_1_IV0, 
MetadataVersion.fromVersionString("2.1-IV0"));
+        assertEquals(IBP_2_1_IV1, 
MetadataVersion.fromVersionString("2.1-IV1"));
+        assertEquals(IBP_2_1_IV2, 
MetadataVersion.fromVersionString("2.1-IV2"));
+
+        assertEquals(IBP_2_2_IV1, MetadataVersion.fromVersionString("2.2"));
+        assertEquals(IBP_2_2_IV0, 
MetadataVersion.fromVersionString("2.2-IV0"));
+        assertEquals(IBP_2_2_IV1, 
MetadataVersion.fromVersionString("2.2-IV1"));
+
+        assertEquals(IBP_2_3_IV1, MetadataVersion.fromVersionString("2.3"));
+        assertEquals(IBP_2_3_IV0, 
MetadataVersion.fromVersionString("2.3-IV0"));
+        assertEquals(IBP_2_3_IV1, 
MetadataVersion.fromVersionString("2.3-IV1"));
+
+        assertEquals(IBP_2_4_IV1, MetadataVersion.fromVersionString("2.4"));
+        assertEquals(IBP_2_4_IV0, 
MetadataVersion.fromVersionString("2.4-IV0"));
+        assertEquals(IBP_2_4_IV1, 
MetadataVersion.fromVersionString("2.4-IV1"));
+
+        assertEquals(IBP_2_5_IV0, MetadataVersion.fromVersionString("2.5"));
+        assertEquals(IBP_2_5_IV0, 
MetadataVersion.fromVersionString("2.5-IV0"));
+
+        assertEquals(IBP_2_6_IV0, MetadataVersion.fromVersionString("2.6"));
+        assertEquals(IBP_2_6_IV0, 
MetadataVersion.fromVersionString("2.6-IV0"));
+
+        assertEquals(IBP_2_7_IV0, 
MetadataVersion.fromVersionString("2.7-IV0"));
+        assertEquals(IBP_2_7_IV1, 
MetadataVersion.fromVersionString("2.7-IV1"));
+        assertEquals(IBP_2_7_IV2, 
MetadataVersion.fromVersionString("2.7-IV2"));
+
+        assertEquals(IBP_2_8_IV1, MetadataVersion.fromVersionString("2.8"));
+        assertEquals(IBP_2_8_IV0, 
MetadataVersion.fromVersionString("2.8-IV0"));
+        assertEquals(IBP_2_8_IV1, 
MetadataVersion.fromVersionString("2.8-IV1"));
+
+        assertEquals(IBP_3_0_IV1, MetadataVersion.fromVersionString("3.0"));
+        assertEquals(IBP_3_0_IV0, 
MetadataVersion.fromVersionString("3.0-IV0"));
+        assertEquals(IBP_3_0_IV1, 
MetadataVersion.fromVersionString("3.0-IV1"));
+
+        assertEquals(IBP_3_1_IV0, MetadataVersion.fromVersionString("3.1"));
+        assertEquals(IBP_3_1_IV0, 
MetadataVersion.fromVersionString("3.1-IV0"));
+
+        assertEquals(IBP_3_2_IV0, MetadataVersion.fromVersionString("3.2"));
+        assertEquals(IBP_3_2_IV0, 
MetadataVersion.fromVersionString("3.2-IV0"));
+    }
+
+    @Test
+    public void testMinSupportedVersionFor() {
+        assertEquals(IBP_0_8_0, 
MetadataVersion.minSupportedFor(RecordVersion.V0));
+        assertEquals(IBP_0_10_0_IV0, 
MetadataVersion.minSupportedFor(RecordVersion.V1));
+        assertEquals(IBP_0_11_0_IV0, 
MetadataVersion.minSupportedFor(RecordVersion.V2));
+
+        // Ensure that all record versions have a defined min version so that 
we remember to update the method
+        for (RecordVersion recordVersion : RecordVersion.values()) {
+            assertNotNull(MetadataVersion.minSupportedFor(recordVersion));
+        }
+    }
+
+    @Test
+    public void testShortVersion() {
+        assertEquals("0.8.0", IBP_0_8_0.shortVersion());
+        assertEquals("0.10.0", IBP_0_10_0_IV0.shortVersion());
+        assertEquals("0.10.0", IBP_0_10_0_IV1.shortVersion());
+        assertEquals("0.11.0", IBP_0_11_0_IV0.shortVersion());
+        assertEquals("0.11.0", IBP_0_11_0_IV1.shortVersion());
+        assertEquals("0.11.0", IBP_0_11_0_IV2.shortVersion());
+        assertEquals("1.0", IBP_1_0_IV0.shortVersion());
+        assertEquals("1.1", IBP_1_1_IV0.shortVersion());
+        assertEquals("2.0", IBP_2_0_IV0.shortVersion());
+        assertEquals("2.0", IBP_2_0_IV1.shortVersion());
+        assertEquals("2.1", IBP_2_1_IV0.shortVersion());
+        assertEquals("2.1", IBP_2_1_IV1.shortVersion());
+        assertEquals("2.1", IBP_2_1_IV2.shortVersion());
+        assertEquals("2.2", IBP_2_2_IV0.shortVersion());
+        assertEquals("2.2", IBP_2_2_IV1.shortVersion());
+        assertEquals("2.3", IBP_2_3_IV0.shortVersion());
+        assertEquals("2.3", IBP_2_3_IV1.shortVersion());
+        assertEquals("2.4", IBP_2_4_IV0.shortVersion());
+        assertEquals("2.5", IBP_2_5_IV0.shortVersion());
+        assertEquals("2.6", IBP_2_6_IV0.shortVersion());
+        assertEquals("2.7", IBP_2_7_IV2.shortVersion());
+        assertEquals("2.8", IBP_2_8_IV0.shortVersion());
+        assertEquals("2.8", IBP_2_8_IV1.shortVersion());
+        assertEquals("3.0", IBP_3_0_IV0.shortVersion());
+        assertEquals("3.0", IBP_3_0_IV1.shortVersion());
+        assertEquals("3.1", IBP_3_1_IV0.shortVersion());
+        assertEquals("3.2", IBP_3_2_IV0.shortVersion());
+    }
+
+    @Test
+    public void testVersion() {
+        assertEquals("0.8.0", IBP_0_8_0.version());
+        assertEquals("0.8.2", IBP_0_8_2.version());
+        assertEquals("0.10.0-IV0", IBP_0_10_0_IV0.version());
+        assertEquals("0.10.0-IV1", IBP_0_10_0_IV1.version());
+        assertEquals("0.11.0-IV0", IBP_0_11_0_IV0.version());
+        assertEquals("0.11.0-IV1", IBP_0_11_0_IV1.version());
+        assertEquals("0.11.0-IV2", IBP_0_11_0_IV2.version());
+        assertEquals("1.0-IV0", IBP_1_0_IV0.version());
+        assertEquals("1.1-IV0", IBP_1_1_IV0.version());
+        assertEquals("2.0-IV0", IBP_2_0_IV0.version());
+        assertEquals("2.0-IV1", IBP_2_0_IV1.version());
+        assertEquals("2.1-IV0", IBP_2_1_IV0.version());
+        assertEquals("2.1-IV1", IBP_2_1_IV1.version());
+        assertEquals("2.1-IV2", IBP_2_1_IV2.version());
+        assertEquals("2.2-IV0", IBP_2_2_IV0.version());
+        assertEquals("2.2-IV1", IBP_2_2_IV1.version());
+        assertEquals("2.3-IV0", IBP_2_3_IV0.version());
+        assertEquals("2.3-IV1", IBP_2_3_IV1.version());
+        assertEquals("2.4-IV0", IBP_2_4_IV0.version());
+        assertEquals("2.5-IV0", IBP_2_5_IV0.version());
+        assertEquals("2.6-IV0", IBP_2_6_IV0.version());
+        assertEquals("2.7-IV2", IBP_2_7_IV2.version());
+        assertEquals("2.8-IV0", IBP_2_8_IV0.version());
+        assertEquals("2.8-IV1", IBP_2_8_IV1.version());
+        assertEquals("3.0-IV0", IBP_3_0_IV0.version());
+        assertEquals("3.0-IV1", IBP_3_0_IV1.version());
+        assertEquals("3.1-IV0", IBP_3_1_IV0.version());
+        assertEquals("3.2-IV0", IBP_3_2_IV0.version());
+    }
+
+    @Test
+    public void testMetadataVersionValidator() {

Review Comment:
   nit: Add a MetadataVersionValidatorTest class?



##########
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Pattern;
+import org.apache.kafka.common.record.RecordVersion;
+
+/**
+ * This class contains the different Kafka versions.
+ * Right now, we use them for upgrades - users can configure the version of 
the API brokers will use to communicate between themselves.
+ * This is only for inter-broker communications - when communicating with 
clients, the client decides on the API version.
+ *
+ * Note that the ID we initialize for each version is important.
+ * We consider a version newer than another if it is lower in the enum list 
(to avoid depending on lexicographic order)
+ *
+ * Since the api protocol may change more than once within the same release 
and to facilitate people deploying code from
+ * trunk, we have the concept of internal versions (first introduced during 
the 0.10.0 development cycle). For example,
+ * the first time we introduce a version change in a release, say 0.10.0, we 
will add a config value "0.10.0-IV0" and a
+ * corresponding enum constant IBP_0_10_0-IV0. We will also add a config value 
"0.10.0" that will be mapped to the
+ * latest internal version object, which is IBP_0_10_0-IV0. When we change the 
protocol a second time while developing
+ * 0.10.0, we will add a new config value "0.10.0-IV1" and a corresponding 
enum constant IBP_0_10_0-IV1. We will change
+ * the config value "0.10.0" to map to the latest internal version 
IBP_0_10_0-IV1. The config value of
+ * "0.10.0-IV0" is still mapped to IBP_0_10_0-IV0. This way, if people are 
deploying from trunk, they can use
+ * "0.10.0-IV0" and "0.10.0-IV1" to upgrade one internal version at a time. 
For most people who just want to use
+ * released version, they can use "0.10.0" when upgrading to the 0.10.0 
release.
+ */
+public enum MetadataVersion {
+    IBP_0_8_0(-1, "0.8.0", ""),
+    IBP_0_8_1(-1, "0.8.1", ""),
+    IBP_0_8_2(-1, "0.8.2", ""),
+    IBP_0_9_0(-1, "0.9.0", ""),
+
+    // 0.10.0-IV0 is introduced for KIP-31/32 which changes the message format.
+    IBP_0_10_0_IV0(-1, "0.10.0", "IV0"),
+
+    // 0.10.0-IV1 is introduced for KIP-36(rack awareness) and KIP-43(SASL 
handshake).
+    IBP_0_10_0_IV1(-1, "0.10.0", "IV1"),
+
+    // introduced for JoinGroup protocol change in KIP-62
+    IBP_0_10_1_IV0(-1, "0.10.1", "IV0"),
+
+    // 0.10.1-IV1 is introduced for KIP-74(fetch response size limit).
+    IBP_0_10_1_IV1(-1, "0.10.1", "IV1"),
+
+    // introduced ListOffsetRequest v1 in KIP-79
+    IBP_0_10_1_IV2(-1, "0.10.1", "IV2"),
+
+    // introduced UpdateMetadataRequest v3 in KIP-103
+    IBP_0_10_2_IV0(-1, "0.10.2", "IV0"),
+
+    // KIP-98 (idempotent and transactional producer support)
+    IBP_0_11_0_IV0(-1, "0.11.0", "IV0"),
+
+    // introduced DeleteRecordsRequest v0 and FetchRequest v4 in KIP-107
+    IBP_0_11_0_IV1(-1, "0.11.0", "IV1"),
+
+    // Introduced leader epoch fetches to the replica fetcher via KIP-101
+    IBP_0_11_0_IV2(-1, "0.11.0", "IV2"),
+
+    // Introduced LeaderAndIsrRequest V1, UpdateMetadataRequest V4 and 
FetchRequest V6 via KIP-112
+    IBP_1_0_IV0(-1, "1.0", "IV0"),
+
+    // Introduced DeleteGroupsRequest V0 via KIP-229, plus KIP-227 incremental 
fetch requests,
+    // and KafkaStorageException for fetch requests.
+    IBP_1_1_IV0(-1, "1.1", "IV0"),
+
+    // Introduced OffsetsForLeaderEpochRequest V1 via KIP-279 (Fix log 
divergence between leader and follower after fast leader fail over)
+    IBP_2_0_IV0(-1, "2.0", "IV0"),
+
+    // Several request versions were bumped due to KIP-219 (Improve quota 
communication)
+    IBP_2_0_IV1(-1, "2.0", "IV1"),
+
+    // Introduced new schemas for group offset (v2) and group metadata (v2) 
(KIP-211)
+    IBP_2_1_IV0(-1, "2.1", "IV0"),
+
+    // New Fetch, OffsetsForLeaderEpoch, and ListOffsets schemas (KIP-320)
+    IBP_2_1_IV1(-1, "2.1", "IV1"),
+
+    // Support ZStandard Compression Codec (KIP-110)
+    IBP_2_1_IV2(-1, "2.1", "IV2"),
+
+    // Introduced broker generation (KIP-380), and
+    // LeaderAndIsrRequest V2, UpdateMetadataRequest V5, StopReplicaRequest V1
+    IBP_2_2_IV0(-1, "2.2", "IV0"),
+
+    // New error code for ListOffsets when a new leader is lagging behind 
former HW (KIP-207)
+    IBP_2_2_IV1(-1, "2.2", "IV1"),
+
+    // Introduced static membership.
+    IBP_2_3_IV0(-1, "2.3", "IV0"),
+
+    // Add rack_id to FetchRequest, preferred_read_replica to FetchResponse, 
and replica_id to OffsetsForLeaderRequest
+    IBP_2_3_IV1(-1, "2.3", "IV1"),
+
+    // Add adding_replicas and removing_replicas fields to LeaderAndIsrRequest
+    IBP_2_4_IV0(-1, "2.4", "IV0"),
+
+    // Flexible version support in inter-broker APIs
+    IBP_2_4_IV1(-1, "2.4", "IV1"),
+
+    // No new APIs, equivalent to 2.4-IV1
+    IBP_2_5_IV0(-1, "2.5", "IV0"),
+
+    // Introduced StopReplicaRequest V3 containing the leader epoch for each 
partition (KIP-570)
+    IBP_2_6_IV0(-1, "2.6", "IV0"),
+
+    // Introduced feature versioning support (KIP-584)
+    IBP_2_7_IV0(-1, "2.7", "IV0"),
+
+    // Bump Fetch protocol for Raft protocol (KIP-595)
+    IBP_2_7_IV1(-1, "2.7", "IV1"),
+
+    // Introduced AlterPartition (KIP-497)
+    IBP_2_7_IV2(-1, "2.7", "IV2"),
+
+    // Flexible versioning on ListOffsets, WriteTxnMarkers and 
OffsetsForLeaderEpoch. Also adds topic IDs (KIP-516)
+    IBP_2_8_IV0(-1, "2.8", "IV0"),
+
+    // Introduced topic IDs to LeaderAndIsr and UpdateMetadata 
requests/responses (KIP-516)
+    IBP_2_8_IV1(-1, "2.8", "IV1"),
+
+    // Introduce AllocateProducerIds (KIP-730)
+    IBP_3_0_IV0(1, "3.0", "IV0"),
+
+    // Introduce ListOffsets V7 which supports listing offsets by max 
timestamp (KIP-734)
+    // Assume message format version is 3.0 (KIP-724)
+    IBP_3_0_IV1(2, "3.0", "IV1"),
+
+    // Adds topic IDs to Fetch requests/responses (KIP-516)
+    IBP_3_1_IV0(3, "3.1", "IV0"),
+
+    // Support for leader recovery for unclean leader election (KIP-704)
+    IBP_3_2_IV0(4, "3.2", "IV0");
+
+    private static final MetadataVersion[] METADATA_VERSIONS = 
MetadataVersion.values();
+    private final Optional<Short> featureLevel;
+    private final String release;
+    private final String ibpVersion;
+
+    MetadataVersion(int featureLevel, String release, String subVersion) {
+        if (featureLevel > 0) {
+            this.featureLevel = Optional.of((short) featureLevel);
+        } else {
+            this.featureLevel = Optional.empty();
+        }
+        this.release = release;
+        if (subVersion.isEmpty()) {
+            this.ibpVersion = release;
+        } else {
+            this.ibpVersion = String.format("%s-%s", release, subVersion);
+        }
+    }
+
+    public Optional<Short> featureLevel() {
+        return featureLevel;
+    }
+
+    public boolean isSaslInterBrokerHandshakeRequestEnabled() {
+        return this.isAtLeast(IBP_0_10_0_IV1);
+    }
+
+    public boolean isOffsetForLeaderEpochSupported() {
+        return this.isAtLeast(IBP_0_11_0_IV2);
+    }
+
+    public boolean isFeatureVersioningSupported() {
+        return this.isAtLeast(IBP_2_7_IV0);
+    }
+
+    public boolean isTruncationOnFetchSupported() {
+        return this.isAtLeast(IBP_2_7_IV1);
+    }
+
+    public boolean isAlterIsrSupported() {
+        return this.isAtLeast(IBP_2_7_IV2);
+    }
+
+    public boolean isTopicIdsSupported() {
+        return this.isAtLeast(IBP_2_8_IV0);
+    }
+
+    public boolean isAllocateProducerIdsSupported() {
+        return this.isAtLeast(IBP_3_0_IV0);
+    }
+
+
+    public RecordVersion highestSupportedRecordVersion() {
+        if (this.isLessThan(IBP_0_10_0_IV0)) {
+            return RecordVersion.V0;
+        } else if (this.isLessThan(IBP_0_11_0_IV0)) {
+            return RecordVersion.V1;
+        } else {
+            return RecordVersion.V2;
+        }
+    }
+
+    private static final Map<String, MetadataVersion> IBP_VERSIONS;
+    static {
+        {
+            IBP_VERSIONS = new HashMap<>();
+            Map<String, MetadataVersion> maxInterVersion = new HashMap<>();
+            for (MetadataVersion metadataVersion : MetadataVersion.values()) {

Review Comment:
   Use `METADATA_VERSIONS` instead of `MetadataVersion.values()` here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to