mumrah commented on code in PR #12072:
URL: https://github.com/apache/kafka/pull/12072#discussion_r857970386


##########
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.kafka.common.record.RecordVersion;
+
+public enum MetadataVersion {
+    IBP_0_8_0(-1),
+    IBP_0_8_1(-1),
+    IBP_0_8_2(-1),
+    IBP_0_9_0(-1),
+    // 0.10.0-IV0 is introduced for KIP-31/32 which changes the message format.

Review Comment:
   I've always disliked how these comments were laid out in ApiVersion. Maybe we can add a blank line between entries so it's obvious which version each comment refers to?
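   A minimal sketch of the suggested layout, assuming a blank line separates each comment-plus-constant pair. `SpacedMetadataVersion` is a hypothetical, shortened stand-in for the enum in the PR; only the spacing is the point.

```java
// Hypothetical excerpt: a few constants copied from the PR, laid out so that
// every comment sits directly above the constant it describes.
enum SpacedMetadataVersion {
    IBP_0_9_0(-1),

    // 0.10.0-IV0 is introduced for KIP-31/32 which changes the message format.
    IBP_0_10_0_IV0(-1),

    // 0.10.0-IV1 is introduced for KIP-36 (rack awareness) and KIP-43 (SASL handshake).
    IBP_0_10_0_IV1(-1),

    // Introduced for the JoinGroup protocol change in KIP-62.
    IBP_0_10_1_IV0(-1);

    private final int metadataVersion;

    SpacedMetadataVersion(int metadataVersion) {
        this.metadataVersion = metadataVersion;
    }

    int metadataVersion() {
        return metadataVersion;
    }
}
```

   The blank lines do not change declaration order, so any ordinal-based comparison behaves as before.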



##########
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.kafka.common.record.RecordVersion;
+
+public enum MetadataVersion {
+    IBP_0_8_0(-1),
+    IBP_0_8_1(-1),
+    IBP_0_8_2(-1),
+    IBP_0_9_0(-1),
+    // 0.10.0-IV0 is introduced for KIP-31/32 which changes the message format.
+    IBP_0_10_0_IV0(-1),
+    // 0.10.0-IV1 is introduced for KIP-36(rack awareness) and KIP-43(SASL 
handshake).
+    IBP_0_10_0_IV1(-1),
+    // introduced for JoinGroup protocol change in KIP-62
+    IBP_0_10_1_IV0(-1),
+    // 0.10.1-IV1 is introduced for KIP-74(fetch response size limit).
+    IBP_0_10_1_IV1(-1),
+    // introduced ListOffsetRequest v1 in KIP-79
+    IBP_0_10_1_IV2(-1),
+    // introduced UpdateMetadataRequest v3 in KIP-103
+    IBP_0_10_2_IV0(-1),
+    // KIP-98 (idempotent and transactional producer support)
+    IBP_0_11_0_IV0(-1),
+    // introduced DeleteRecordsRequest v0 and FetchRequest v4 in KIP-107
+    IBP_0_11_0_IV1(-1),
+    // Introduced leader epoch fetches to the replica fetcher via KIP-101
+    IBP_0_11_0_IV2(-1),
+    // Introduced LeaderAndIsrRequest V1, UpdateMetadataRequest V4 and 
FetchRequest V6 via KIP-112
+    IBP_1_0_IV0(-1),
+    // Introduced DeleteGroupsRequest V0 via KIP-229, plus KIP-227 incremental 
fetch requests,
+    // and KafkaStorageException for fetch requests.
+    IBP_1_1_IV0(-1),
+    // Introduced OffsetsForLeaderEpochRequest V1 via KIP-279 (Fix log 
divergence between leader and follower after fast leader fail over)
+    IBP_2_0_IV0(-1),
+    // Several request versions were bumped due to KIP-219 (Improve quota 
communication)
+    IBP_2_0_IV1(-1),
+    // Introduced new schemas for group offset (v2) and group metadata (v2) 
(KIP-211)
+    IBP_2_1_IV0(-1),
+    // New Fetch, OffsetsForLeaderEpoch, and ListOffsets schemas (KIP-320)
+    IBP_2_1_IV1(-1),
+    // Support ZStandard Compression Codec (KIP-110)
+    IBP_2_1_IV2(-1),
+    // Introduced broker generation (KIP-380), and
+    // LeaderAdnIsrRequest V2, UpdateMetadataRequest V5, StopReplicaRequest V1
+    IBP_2_2_IV0(-1),
+    // New error code for ListOffsets when a new leader is lagging behind 
former HW (KIP-207)
+    IBP_2_2_IV1(-1),
+    // Introduced static membership.
+    IBP_2_3_IV0(-1),
+    // Add rack_id to FetchRequest, preferred_read_replica to FetchResponse, 
and replica_id to OffsetsForLeaderRequest
+    IBP_2_3_IV1(-1),
+    // Add adding_replicas and removing_replicas fields to LeaderAndIsrRequest
+    IBP_2_4_IV0(-1),
+    // Flexible version support in inter-broker APIs
+    IBP_2_4_IV1(-1),
+    // No new APIs, equivalent to 2.4-IV1
+    IBP_2_5_IV0(-1),
+    // Introduced StopReplicaRequest V3 containing the leader epoch for each 
partition (KIP-570)
+    IBP_2_6_IV0(-1),
+    // Introduced feature versioning support (KIP-584)
+    IBP_2_7_IV0(-1),
+    // Bup Fetch protocol for Raft protocol (KIP-595)
+    IBP_2_7_IV1(-1),
+    // Introduced AlterPartition (KIP-497)
+    IBP_2_7_IV2(-1),
+    // Flexible versioning on ListOffsets, WriteTxnMarkers and 
OffsetsForLeaderEpoch. Also adds topic IDs (KIP-516)
+    IBP_2_8_IV0(-1),
+    // Introduced topic IDs to LeaderAndIsr and UpdateMetadata 
requests/responses (KIP-516)
+    IBP_2_8_IV1(-1),
+    // Introduce AllocateProducerIds (KIP-730)
+    IBP_3_0_IV0(1),
+    // Introduce ListOffsets V7 which supports listing offsets by max 
timestamp (KIP-734)
+    // Assume message format version is 3.0 (KIP-724)
+    IBP_3_0_IV1(2),
+    // Adds topic IDs to Fetch requests/responses (KIP-516)
+    IBP_3_1_IV0(3),
+    // Support for leader recovery for unclean leader election (KIP-704)
+    IBP_3_2_IV0(4),
+    // KRaft GA
+    IBP_3_3_IV0(5);
+
+    private final Optional<Short> metadataVersion;
+    private final String shortVersion;
+    private final String version;
+
+    MetadataVersion(int metadataVersion) {
+        if (metadataVersion > 0) {
+            this.metadataVersion = Optional.of((short) metadataVersion);
+        } else {
+            this.metadataVersion = Optional.empty();
+        }
+
+        Pattern versionPattern = Pattern.compile("^IBP_([\\d_]+)(?:IV(\\d))?");
+        Matcher matcher = versionPattern.matcher(this.name());
+        if (matcher.find()) {
+            String withoutIV = matcher.group(1);
+            // remove any trailing underscores
+            if (withoutIV.endsWith("_")) {
+                withoutIV = withoutIV.substring(0, withoutIV.length() - 1);
+            }
+            shortVersion = withoutIV.replace("_", ".");
+
+            if (matcher.group(2) != null) { // versions less than 
IBP_0_10_0_IV0 do not have IVs
+                version = String.format("%s-IV%s", shortVersion, 
matcher.group(2));
+            } else {
+                version = shortVersion;
+            }
+        } else {
+            throw new IllegalArgumentException("Metadata version: " + 
this.name() + " does not fit "
+                + "the accepted pattern.");
+        }
+    }
+
+    public Optional<Short> metadataVersion() {
+        return metadataVersion;
+    }
+
+    public boolean isAlterIsrSupported() {
+        return this.isAtLeast(IBP_2_7_IV2);
+    }
+
+    public boolean isAllocateProducerIdsSupported() {
+        return this.isAtLeast(IBP_3_0_IV0);
+    }
+
+    public boolean isTruncationOnFetchSupported() {
+        return this.isAtLeast(IBP_2_7_IV1);
+    }
+
+    public boolean isOffsetForLeaderEpochSupported() {
+        return this.isAtLeast(IBP_0_11_0_IV2);
+    }
+
+    public boolean isTopicIdsSupported() {
+        return this.isAtLeast(IBP_2_8_IV0);
+    }
+
+    public boolean isFeatureVersioningSupported() {
+        return this.isAtLeast(IBP_2_7_IV1);
+    }
+
+    public boolean isSaslInterBrokerHandshakeRequestEnabled() {
+        return this.isAtLeast(IBP_0_10_0_IV1);
+    }
+
+    public RecordVersion recordVersion() {
+        if (this.compareTo(IBP_0_9_0) <= 0) { // IBPs up to IBP_0_9_0 use Record Version V0
+            return RecordVersion.V0;
+        } else if (this.compareTo(IBP_0_10_2_IV0) <= 0) { // IBPs up to IBP_0_10_2_IV0 use V1
+            return RecordVersion.V1;
+        } else return RecordVersion.V2; // all greater IBPs use V2
+    }
+
+    private static final Map<String, MetadataVersion> IBP_VERSIONS;
+    static {
+        {
+            IBP_VERSIONS = new HashMap<>();
+            Pattern versionPattern = Pattern.compile("^IBP_([\\d_]+)(?:IV(\\d))?");
+            Map<String, MetadataVersion> maxInterVersion = new HashMap<>();
+            for (MetadataVersion version : MetadataVersion.values()) {
+                Matcher matcher = versionPattern.matcher(version.name());
+                if (matcher.find()) {
+                    String withoutIV = matcher.group(1);
+                    // remove any trailing underscores
+                    if (withoutIV.endsWith("_")) {
+                        withoutIV = withoutIV.substring(0, withoutIV.length() - 1);
+                    }
+                    String shortVersion = withoutIV.replace("_", ".");
+
+                    String normalizedVersion;
+                    if (matcher.group(2) != null) {
+                        normalizedVersion = String.format("%s-IV%s", shortVersion, matcher.group(2));
+                    } else {
+                        normalizedVersion = shortVersion;
+                    }
+                    maxInterVersion.compute(shortVersion, (__, currentVersion) -> {
+                        if (currentVersion == null) {
+                            return version;
+                        } else if (version.compareTo(currentVersion) > 0) {
+                            return version;
+                        } else {
+                            return currentVersion;
+                        }
+                    });
+                    IBP_VERSIONS.put(normalizedVersion, version);
+                } else {
+                    throw new IllegalArgumentException("Metadata version: " + version.name() + " does not fit "
+                            + "any of the accepted patterns.");
+                }
+            }
+            IBP_VERSIONS.putAll(maxInterVersion);
+        }
+    }
+
+    public String shortVersion() {
+        return shortVersion;
+    }
+
+    public String version() {
+        return version;
+    }
+
+    /**
+     * Return an `ApiVersion` instance for `versionString`, which can be in a variety of formats (e.g. "0.8.0", "0.8.0.x",
+     * "0.10.0", "0.10.0-IV1"). `IllegalArgumentException` is thrown if `versionString` cannot be mapped to an `ApiVersion`.
+     */
+    public static MetadataVersion fromVersionString(String versionString) {
+        String[] versionSegments = versionString.split(Pattern.quote("."));
+        int numSegments = (versionString.startsWith("0.")) ? 3 : 2;
+        String key;
+        if (numSegments >= versionSegments.length) {
+            key = versionString;
+        } else {
+            key = String.join(".", Arrays.copyOfRange(versionSegments, 0, numSegments));
+        }
+        return Optional.ofNullable(IBP_VERSIONS.get(key)).orElseThrow(() ->
+                new IllegalArgumentException("Version " + versionString + " is not a valid version")
+        );
+    }
+
+    /**
+     * Return the minimum `MetadataVersion` that supports `RecordVersion`.
+     */
+    public static MetadataVersion minSupportedFor(RecordVersion recordVersion) {
+        switch (recordVersion) {
+            case V0:
+                return IBP_0_8_0;
+            case V1:
+                return IBP_0_10_0_IV0;
+            case V2:
+                return IBP_0_11_0_IV0;
+            default:
+                throw new IllegalArgumentException("Invalid message format version " + recordVersion);
+        }
+    }
+
+    public static MetadataVersion latest() {
+        MetadataVersion[] values = MetadataVersion.values();
+        return values[values.length - 1];
+    }
+
+    public static MetadataVersion stable() {

Review Comment:
   Let's drop this for now since we don't have any use cases (yet). We can add 
it back in the future.



##########
core/src/main/scala/kafka/log/LogValidator.scala:
##########
@@ -365,11 +367,11 @@ private[log] object LogValidator extends Logging {
                                                  timestampDiffMaxMs: Long,
                                                  partitionLeaderEpoch: Int,
                                                  origin: AppendOrigin,
-                                                 interBrokerProtocolVersion: ApiVersion,
+                                                 interBrokerProtocolVersion: MetadataVersion,
                                                  brokerTopicStats: BrokerTopicStats,
                                                  requestLocal: RequestLocal): ValidationAndOffsetAssignResult = {
 
-    if (targetCodec == ZStdCompressionCodec && interBrokerProtocolVersion < KAFKA_2_1_IV0)
+    if (targetCodec == ZStdCompressionCodec && interBrokerProtocolVersion.compareTo(IBP_2_1_IV0) < 0)

Review Comment:
   Use `isLessThan` here instead of `compareTo(...) < 0`.
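   A rough sketch of what such helpers could look like, assuming they simply wrap `compareTo` on the enum (which orders constants by declaration). `SketchVersion` and `ZstdGuardDemo` are hypothetical names, and the bodies are an assumption rather than the PR's actual code.

```java
// Hypothetical, shortened version enum; declaration order defines the ordering.
enum SketchVersion {
    IBP_2_0_IV1,
    IBP_2_1_IV0,
    IBP_2_1_IV1;

    // Assumed helper: true when this version was declared before `other`.
    boolean isLessThan(SketchVersion other) {
        return this.compareTo(other) < 0;
    }

    // Assumed helper: true when this version is `other` or any later one.
    boolean isAtLeast(SketchVersion other) {
        return this.compareTo(other) >= 0;
    }
}

final class ZstdGuardDemo {
    // Mirrors the shape of the check in the hunk above: zstd is rejected
    // when the inter-broker protocol version is older than 2.1-IV0.
    static boolean zstdRejected(boolean targetIsZstd, SketchVersion ibp) {
        return targetIsZstd && ibp.isLessThan(SketchVersion.IBP_2_1_IV0);
    }
}
```

   Call sites then read as a statement about versions rather than about the sign of a comparison result.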



##########
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.kafka.common.record.RecordVersion;
+
+public enum MetadataVersion {
+    IBP_0_8_0(-1),
+    IBP_0_8_1(-1),
+    IBP_0_8_2(-1),
+    IBP_0_9_0(-1),
+    // 0.10.0-IV0 is introduced for KIP-31/32 which changes the message format.
+    IBP_0_10_0_IV0(-1),
+    // 0.10.0-IV1 is introduced for KIP-36(rack awareness) and KIP-43(SASL 
handshake).
+    IBP_0_10_0_IV1(-1),
+    // introduced for JoinGroup protocol change in KIP-62
+    IBP_0_10_1_IV0(-1),
+    // 0.10.1-IV1 is introduced for KIP-74(fetch response size limit).
+    IBP_0_10_1_IV1(-1),
+    // introduced ListOffsetRequest v1 in KIP-79
+    IBP_0_10_1_IV2(-1),
+    // introduced UpdateMetadataRequest v3 in KIP-103
+    IBP_0_10_2_IV0(-1),
+    // KIP-98 (idempotent and transactional producer support)
+    IBP_0_11_0_IV0(-1),
+    // introduced DeleteRecordsRequest v0 and FetchRequest v4 in KIP-107
+    IBP_0_11_0_IV1(-1),
+    // Introduced leader epoch fetches to the replica fetcher via KIP-101
+    IBP_0_11_0_IV2(-1),
+    // Introduced LeaderAndIsrRequest V1, UpdateMetadataRequest V4 and 
FetchRequest V6 via KIP-112
+    IBP_1_0_IV0(-1),
+    // Introduced DeleteGroupsRequest V0 via KIP-229, plus KIP-227 incremental 
fetch requests,
+    // and KafkaStorageException for fetch requests.
+    IBP_1_1_IV0(-1),
+    // Introduced OffsetsForLeaderEpochRequest V1 via KIP-279 (Fix log 
divergence between leader and follower after fast leader fail over)
+    IBP_2_0_IV0(-1),
+    // Several request versions were bumped due to KIP-219 (Improve quota 
communication)
+    IBP_2_0_IV1(-1),
+    // Introduced new schemas for group offset (v2) and group metadata (v2) 
(KIP-211)
+    IBP_2_1_IV0(-1),
+    // New Fetch, OffsetsForLeaderEpoch, and ListOffsets schemas (KIP-320)
+    IBP_2_1_IV1(-1),
+    // Support ZStandard Compression Codec (KIP-110)
+    IBP_2_1_IV2(-1),
+    // Introduced broker generation (KIP-380), and
+    // LeaderAdnIsrRequest V2, UpdateMetadataRequest V5, StopReplicaRequest V1
+    IBP_2_2_IV0(-1),
+    // New error code for ListOffsets when a new leader is lagging behind 
former HW (KIP-207)
+    IBP_2_2_IV1(-1),
+    // Introduced static membership.
+    IBP_2_3_IV0(-1),
+    // Add rack_id to FetchRequest, preferred_read_replica to FetchResponse, 
and replica_id to OffsetsForLeaderRequest
+    IBP_2_3_IV1(-1),
+    // Add adding_replicas and removing_replicas fields to LeaderAndIsrRequest
+    IBP_2_4_IV0(-1),
+    // Flexible version support in inter-broker APIs
+    IBP_2_4_IV1(-1),
+    // No new APIs, equivalent to 2.4-IV1
+    IBP_2_5_IV0(-1),
+    // Introduced StopReplicaRequest V3 containing the leader epoch for each 
partition (KIP-570)
+    IBP_2_6_IV0(-1),
+    // Introduced feature versioning support (KIP-584)
+    IBP_2_7_IV0(-1),
+    // Bup Fetch protocol for Raft protocol (KIP-595)
+    IBP_2_7_IV1(-1),
+    // Introduced AlterPartition (KIP-497)
+    IBP_2_7_IV2(-1),
+    // Flexible versioning on ListOffsets, WriteTxnMarkers and 
OffsetsForLeaderEpoch. Also adds topic IDs (KIP-516)
+    IBP_2_8_IV0(-1),
+    // Introduced topic IDs to LeaderAndIsr and UpdateMetadata 
requests/responses (KIP-516)
+    IBP_2_8_IV1(-1),
+    // Introduce AllocateProducerIds (KIP-730)
+    IBP_3_0_IV0(1),
+    // Introduce ListOffsets V7 which supports listing offsets by max 
timestamp (KIP-734)
+    // Assume message format version is 3.0 (KIP-724)
+    IBP_3_0_IV1(2),
+    // Adds topic IDs to Fetch requests/responses (KIP-516)
+    IBP_3_1_IV0(3),
+    // Support for leader recovery for unclean leader election (KIP-704)
+    IBP_3_2_IV0(4),
+    // KRaft GA
+    IBP_3_3_IV0(5);
+
+    private final Optional<Short> metadataVersion;
+    private final String shortVersion;
+    private final String version;
+
+    MetadataVersion(int metadataVersion) {
+        if (metadataVersion > 0) {
+            this.metadataVersion = Optional.of((short) metadataVersion);
+        } else {
+            this.metadataVersion = Optional.empty();
+        }
+
+        Pattern versionPattern = Pattern.compile("^IBP_([\\d_]+)(?:IV(\\d))?");
+        Matcher matcher = versionPattern.matcher(this.name());
+        if (matcher.find()) {
+            String withoutIV = matcher.group(1);
+            // remove any trailing underscores
+            if (withoutIV.endsWith("_")) {
+                withoutIV = withoutIV.substring(0, withoutIV.length() - 1);
+            }
+            shortVersion = withoutIV.replace("_", ".");
+
+            if (matcher.group(2) != null) { // versions less than 
IBP_0_10_0_IV0 do not have IVs
+                version = String.format("%s-IV%s", shortVersion, 
matcher.group(2));
+            } else {
+                version = shortVersion;
+            }
+        } else {
+            throw new IllegalArgumentException("Metadata version: " + 
this.name() + " does not fit "
+                + "the accepted pattern.");
+        }
+    }
+
+    public Optional<Short> metadataVersion() {
+        return metadataVersion;
+    }
+
+    public boolean isAlterIsrSupported() {
+        return this.isAtLeast(IBP_2_7_IV2);
+    }
+
+    public boolean isAllocateProducerIdsSupported() {
+        return this.isAtLeast(IBP_3_0_IV0);
+    }
+
+    public boolean isTruncationOnFetchSupported() {
+        return this.isAtLeast(IBP_2_7_IV1);
+    }
+
+    public boolean isOffsetForLeaderEpochSupported() {
+        return this.isAtLeast(IBP_0_11_0_IV2);
+    }
+
+    public boolean isTopicIdsSupported() {
+        return this.isAtLeast(IBP_2_8_IV0);
+    }
+
+    public boolean isFeatureVersioningSupported() {
+        return this.isAtLeast(IBP_2_7_IV1);
+    }
+
+    public boolean isSaslInterBrokerHandshakeRequestEnabled() {
+        return this.isAtLeast(IBP_0_10_0_IV1);
+    }
+
+    public RecordVersion recordVersion() {
+        if (this.compareTo(IBP_0_9_0) <= 0) { // IBPs up to IBP_0_9_0 use Record Version V0
+            return RecordVersion.V0;
+        } else if (this.compareTo(IBP_0_10_2_IV0) <= 0) { // IBPs up to IBP_0_10_2_IV0 use V1
+            return RecordVersion.V1;
+        } else return RecordVersion.V2; // all greater IBPs use V2
+    }

Review Comment:
   nit: can you move the comments inside the if/else branches rather than leaving them trailing off the end of the line?
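   One possible shape for this, sketched with stand-in types so it compiles on its own: `RecordVersionSketch` and `IbpSketch` are hypothetical reductions of `RecordVersion` and `MetadataVersion`, and the thresholds are copied from the method above.

```java
// Stand-in for org.apache.kafka.common.record.RecordVersion (assumption).
enum RecordVersionSketch { V0, V1, V2 }

// Hypothetical, shortened version enum with only the constants needed here.
enum IbpSketch {
    IBP_0_8_0,
    IBP_0_9_0,
    IBP_0_10_0_IV0,
    IBP_0_10_2_IV0,
    IBP_0_11_0_IV0;

    RecordVersionSketch recordVersion() {
        if (this.compareTo(IBP_0_9_0) <= 0) {
            // IBPs up to IBP_0_9_0 use record version V0
            return RecordVersionSketch.V0;
        } else if (this.compareTo(IBP_0_10_2_IV0) <= 0) {
            // IBPs up to IBP_0_10_2_IV0 use V1
            return RecordVersionSketch.V1;
        } else {
            // all later IBPs use V2
            return RecordVersionSketch.V2;
        }
    }
}
```

   Giving the final branch its own braces also keeps the three cases visually parallel.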



##########
core/src/main/scala/kafka/server/AlterIsrManager.scala:
##########
@@ -232,7 +231,7 @@ class DefaultAlterIsrManager(
           .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
           .setPartitionEpoch(item.leaderAndIsr.partitionEpoch)
 
-        if (ibpVersion >= KAFKA_3_2_IV0) {
+        if (ibpVersion.compareTo(MetadataVersion.IBP_3_2_IV0) >= 0) {

Review Comment:
   Use the `isAtLeast` helper here.



##########
core/src/main/scala/kafka/server/KafkaServer.scala:
##########
@@ -627,9 +627,9 @@ class KafkaServer(
 
               // send the controlled shutdown request
               val controlledShutdownApiVersion: Short =
-                if (config.interBrokerProtocolVersion < KAFKA_0_9_0) 0
-                else if (config.interBrokerProtocolVersion < KAFKA_2_2_IV0) 1
-                else if (config.interBrokerProtocolVersion < KAFKA_2_4_IV1) 2
+                if (config.interBrokerProtocolVersion.compareTo(IBP_0_9_0) < 0) 0
+                else if (config.interBrokerProtocolVersion.compareTo(IBP_2_2_IV0) < 0) 1
+                else if (config.interBrokerProtocolVersion.compareTo(IBP_2_4_IV1) < 0) 2

Review Comment:
   Use `isLessThan`
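   For illustration, the same chain written in Java with an assumed `isLessThan` helper. `ShutdownIbpSketch` is a hypothetical, shortened enum. The quoted hunk ends before the final branch, so the value returned there (3) is a placeholder assumption, not something the hunk confirms.

```java
// Hypothetical, shortened version enum; only the thresholds from the hunk
// plus one older version are included.
enum ShutdownIbpSketch {
    IBP_0_8_2,
    IBP_0_9_0,
    IBP_2_2_IV0,
    IBP_2_4_IV1;

    // Assumed helper wrapping compareTo.
    boolean isLessThan(ShutdownIbpSketch other) {
        return this.compareTo(other) < 0;
    }
}

final class ControlledShutdownVersionDemo {
    // Picks the controlled shutdown request version for a given IBP.
    static short controlledShutdownApiVersion(ShutdownIbpSketch ibp) {
        if (ibp.isLessThan(ShutdownIbpSketch.IBP_0_9_0)) {
            return 0;
        } else if (ibp.isLessThan(ShutdownIbpSketch.IBP_2_2_IV0)) {
            return 1;
        } else if (ibp.isLessThan(ShutdownIbpSketch.IBP_2_4_IV1)) {
            return 2;
        } else {
            // Not shown in the quoted hunk; 3 is an assumed placeholder.
            return 3;
        }
    }
}
```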



##########
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.kafka.common.record.RecordVersion;
+
+public enum MetadataVersion {
+    IBP_0_8_0(-1),
+    IBP_0_8_1(-1),
+    IBP_0_8_2(-1),
+    IBP_0_9_0(-1),
+    // 0.10.0-IV0 is introduced for KIP-31/32 which changes the message format.
+    IBP_0_10_0_IV0(-1),
+    // 0.10.0-IV1 is introduced for KIP-36(rack awareness) and KIP-43(SASL 
handshake).
+    IBP_0_10_0_IV1(-1),
+    // introduced for JoinGroup protocol change in KIP-62
+    IBP_0_10_1_IV0(-1),
+    // 0.10.1-IV1 is introduced for KIP-74(fetch response size limit).
+    IBP_0_10_1_IV1(-1),
+    // introduced ListOffsetRequest v1 in KIP-79
+    IBP_0_10_1_IV2(-1),
+    // introduced UpdateMetadataRequest v3 in KIP-103
+    IBP_0_10_2_IV0(-1),
+    // KIP-98 (idempotent and transactional producer support)
+    IBP_0_11_0_IV0(-1),
+    // introduced DeleteRecordsRequest v0 and FetchRequest v4 in KIP-107
+    IBP_0_11_0_IV1(-1),
+    // Introduced leader epoch fetches to the replica fetcher via KIP-101
+    IBP_0_11_0_IV2(-1),
+    // Introduced LeaderAndIsrRequest V1, UpdateMetadataRequest V4 and 
FetchRequest V6 via KIP-112
+    IBP_1_0_IV0(-1),
+    // Introduced DeleteGroupsRequest V0 via KIP-229, plus KIP-227 incremental 
fetch requests,
+    // and KafkaStorageException for fetch requests.
+    IBP_1_1_IV0(-1),
+    // Introduced OffsetsForLeaderEpochRequest V1 via KIP-279 (Fix log 
divergence between leader and follower after fast leader fail over)
+    IBP_2_0_IV0(-1),
+    // Several request versions were bumped due to KIP-219 (Improve quota 
communication)
+    IBP_2_0_IV1(-1),
+    // Introduced new schemas for group offset (v2) and group metadata (v2) 
(KIP-211)
+    IBP_2_1_IV0(-1),
+    // New Fetch, OffsetsForLeaderEpoch, and ListOffsets schemas (KIP-320)
+    IBP_2_1_IV1(-1),
+    // Support ZStandard Compression Codec (KIP-110)
+    IBP_2_1_IV2(-1),
+    // Introduced broker generation (KIP-380), and
+    // LeaderAdnIsrRequest V2, UpdateMetadataRequest V5, StopReplicaRequest V1
+    IBP_2_2_IV0(-1),
+    // New error code for ListOffsets when a new leader is lagging behind 
former HW (KIP-207)
+    IBP_2_2_IV1(-1),
+    // Introduced static membership.
+    IBP_2_3_IV0(-1),
+    // Add rack_id to FetchRequest, preferred_read_replica to FetchResponse, 
and replica_id to OffsetsForLeaderRequest
+    IBP_2_3_IV1(-1),
+    // Add adding_replicas and removing_replicas fields to LeaderAndIsrRequest
+    IBP_2_4_IV0(-1),
+    // Flexible version support in inter-broker APIs
+    IBP_2_4_IV1(-1),
+    // No new APIs, equivalent to 2.4-IV1
+    IBP_2_5_IV0(-1),
+    // Introduced StopReplicaRequest V3 containing the leader epoch for each 
partition (KIP-570)
+    IBP_2_6_IV0(-1),
+    // Introduced feature versioning support (KIP-584)
+    IBP_2_7_IV0(-1),
+    // Bup Fetch protocol for Raft protocol (KIP-595)
+    IBP_2_7_IV1(-1),
+    // Introduced AlterPartition (KIP-497)
+    IBP_2_7_IV2(-1),
+    // Flexible versioning on ListOffsets, WriteTxnMarkers and 
OffsetsForLeaderEpoch. Also adds topic IDs (KIP-516)
+    IBP_2_8_IV0(-1),
+    // Introduced topic IDs to LeaderAndIsr and UpdateMetadata 
requests/responses (KIP-516)
+    IBP_2_8_IV1(-1),
+    // Introduce AllocateProducerIds (KIP-730)
+    IBP_3_0_IV0(1),
+    // Introduce ListOffsets V7 which supports listing offsets by max 
timestamp (KIP-734)
+    // Assume message format version is 3.0 (KIP-724)
+    IBP_3_0_IV1(2),
+    // Adds topic IDs to Fetch requests/responses (KIP-516)
+    IBP_3_1_IV0(3),
+    // Support for leader recovery for unclean leader election (KIP-704)
+    IBP_3_2_IV0(4),
+    // KRaft GA
+    IBP_3_3_IV0(5);
+
+    private final Optional<Short> metadataVersion;
+    private final String shortVersion;
+    private final String version;
+
+    MetadataVersion(int metadataVersion) {
+        if (metadataVersion > 0) {
+            this.metadataVersion = Optional.of((short) metadataVersion);
+        } else {
+            this.metadataVersion = Optional.empty();
+        }
+
+        Pattern versionPattern = Pattern.compile("^IBP_([\\d_]+)(?:IV(\\d))?");
+        Matcher matcher = versionPattern.matcher(this.name());
+        if (matcher.find()) {
+            String withoutIV = matcher.group(1);
+            // remove any trailing underscores
+            if (withoutIV.endsWith("_")) {
+                withoutIV = withoutIV.substring(0, withoutIV.length() - 1);
+            }
+            shortVersion = withoutIV.replace("_", ".");
+
+            if (matcher.group(2) != null) { // versions less than 
IBP_0_10_0_IV0 do not have IVs
+                version = String.format("%s-IV%s", shortVersion, 
matcher.group(2));
+            } else {
+                version = shortVersion;
+            }
+        } else {
+            throw new IllegalArgumentException("Metadata version: " + 
this.name() + " does not fit "
+                + "the accepted pattern.");
+        }
+    }
+
+    public Optional<Short> metadataVersion() {
+        return metadataVersion;
+    }
+
+    public boolean isAlterIsrSupported() {
+        return this.isAtLeast(IBP_2_7_IV2);
+    }
+
+    public boolean isAllocateProducerIdsSupported() {
+        return this.isAtLeast(IBP_3_0_IV0);
+    }
+
+    public boolean isTruncationOnFetchSupported() {
+        return this.isAtLeast(IBP_2_7_IV1);
+    }
+
+    public boolean isOffsetForLeaderEpochSupported() {
+        return this.isAtLeast(IBP_0_11_0_IV2);
+    }
+
+    public boolean isTopicIdsSupported() {
+        return this.isAtLeast(IBP_2_8_IV0);
+    }
+
+    public boolean isFeatureVersioningSupported() {
+        return this.isAtLeast(IBP_2_7_IV1);

Review Comment:
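   This should be `IBP_2_7_IV0`. Per the enum comments, feature versioning (KIP-584) was introduced in 2.7-IV0; 2.7-IV1 is the Fetch protocol bump for KIP-595.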
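   A small sketch of why the boundary matters, assuming `isAtLeast` wraps `compareTo`. `FeatureIbpSketch` is a hypothetical, shortened enum; the thresholds follow the enum comments quoted above (KIP-584 at 2.7-IV0, KIP-595 at 2.7-IV1).

```java
// Hypothetical, shortened version enum around the 2.7 boundary.
enum FeatureIbpSketch {
    IBP_2_6_IV0,
    IBP_2_7_IV0,
    IBP_2_7_IV1,
    IBP_2_7_IV2;

    // Assumed helper wrapping compareTo.
    boolean isAtLeast(FeatureIbpSketch other) {
        return this.compareTo(other) >= 0;
    }

    // Feature versioning (KIP-584) is tied to 2.7-IV0 in the enum comments.
    boolean isFeatureVersioningSupported() {
        return this.isAtLeast(IBP_2_7_IV0);
    }

    // Truncation on fetch stays at 2.7-IV1, as in the quoted code.
    boolean isTruncationOnFetchSupported() {
        return this.isAtLeast(IBP_2_7_IV1);
    }
}
```

   With the threshold at `IBP_2_7_IV1`, a broker running exactly 2.7-IV0 would report feature versioning as unsupported; a test pinned to that one version would catch the off-by-one.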



##########
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.kafka.common.record.RecordVersion;
+
+public enum MetadataVersion {
+    IBP_0_8_0(-1),
+    IBP_0_8_1(-1),
+    IBP_0_8_2(-1),
+    IBP_0_9_0(-1),
+    // 0.10.0-IV0 is introduced for KIP-31/32 which changes the message format.
+    IBP_0_10_0_IV0(-1),
+    // 0.10.0-IV1 is introduced for KIP-36(rack awareness) and KIP-43(SASL 
handshake).
+    IBP_0_10_0_IV1(-1),
+    // introduced for JoinGroup protocol change in KIP-62
+    IBP_0_10_1_IV0(-1),
+    // 0.10.1-IV1 is introduced for KIP-74(fetch response size limit).
+    IBP_0_10_1_IV1(-1),
+    // introduced ListOffsetRequest v1 in KIP-79
+    IBP_0_10_1_IV2(-1),
+    // introduced UpdateMetadataRequest v3 in KIP-103
+    IBP_0_10_2_IV0(-1),
+    // KIP-98 (idempotent and transactional producer support)
+    IBP_0_11_0_IV0(-1),
+    // introduced DeleteRecordsRequest v0 and FetchRequest v4 in KIP-107
+    IBP_0_11_0_IV1(-1),
+    // Introduced leader epoch fetches to the replica fetcher via KIP-101
+    IBP_0_11_0_IV2(-1),
+    // Introduced LeaderAndIsrRequest V1, UpdateMetadataRequest V4 and 
FetchRequest V6 via KIP-112
+    IBP_1_0_IV0(-1),
+    // Introduced DeleteGroupsRequest V0 via KIP-229, plus KIP-227 incremental 
fetch requests,
+    // and KafkaStorageException for fetch requests.
+    IBP_1_1_IV0(-1),
+    // Introduced OffsetsForLeaderEpochRequest V1 via KIP-279 (Fix log 
divergence between leader and follower after fast leader fail over)
+    IBP_2_0_IV0(-1),
+    // Several request versions were bumped due to KIP-219 (Improve quota 
communication)
+    IBP_2_0_IV1(-1),
+    // Introduced new schemas for group offset (v2) and group metadata (v2) 
(KIP-211)
+    IBP_2_1_IV0(-1),
+    // New Fetch, OffsetsForLeaderEpoch, and ListOffsets schemas (KIP-320)
+    IBP_2_1_IV1(-1),
+    // Support ZStandard Compression Codec (KIP-110)
+    IBP_2_1_IV2(-1),
+    // Introduced broker generation (KIP-380), and
+    // LeaderAdnIsrRequest V2, UpdateMetadataRequest V5, StopReplicaRequest V1
+    IBP_2_2_IV0(-1),
+    // New error code for ListOffsets when a new leader is lagging behind 
former HW (KIP-207)
+    IBP_2_2_IV1(-1),
+    // Introduced static membership.
+    IBP_2_3_IV0(-1),
+    // Add rack_id to FetchRequest, preferred_read_replica to FetchResponse, 
and replica_id to OffsetsForLeaderRequest
+    IBP_2_3_IV1(-1),
+    // Add adding_replicas and removing_replicas fields to LeaderAndIsrRequest
+    IBP_2_4_IV0(-1),
+    // Flexible version support in inter-broker APIs
+    IBP_2_4_IV1(-1),
+    // No new APIs, equivalent to 2.4-IV1
+    IBP_2_5_IV0(-1),
+    // Introduced StopReplicaRequest V3 containing the leader epoch for each 
partition (KIP-570)
+    IBP_2_6_IV0(-1),
+    // Introduced feature versioning support (KIP-584)
+    IBP_2_7_IV0(-1),
+    // Bup Fetch protocol for Raft protocol (KIP-595)
+    IBP_2_7_IV1(-1),
+    // Introduced AlterPartition (KIP-497)
+    IBP_2_7_IV2(-1),
+    // Flexible versioning on ListOffsets, WriteTxnMarkers and 
OffsetsForLeaderEpoch. Also adds topic IDs (KIP-516)
+    IBP_2_8_IV0(-1),
+    // Introduced topic IDs to LeaderAndIsr and UpdateMetadata 
requests/responses (KIP-516)
+    IBP_2_8_IV1(-1),
+    // Introduce AllocateProducerIds (KIP-730)
+    IBP_3_0_IV0(1),
+    // Introduce ListOffsets V7 which supports listing offsets by max 
timestamp (KIP-734)
+    // Assume message format version is 3.0 (KIP-724)
+    IBP_3_0_IV1(2),
+    // Adds topic IDs to Fetch requests/responses (KIP-516)
+    IBP_3_1_IV0(3),
+    // Support for leader recovery for unclean leader election (KIP-704)
+    IBP_3_2_IV0(4),
+    // KRaft GA
+    IBP_3_3_IV0(5);
+
+    private final Optional<Short> metadataVersion;
+    private final String shortVersion;
+    private final String version;
+
+    MetadataVersion(int metadataVersion) {
+        if (metadataVersion > 0) {
+            this.metadataVersion = Optional.of((short) metadataVersion);
+        } else {
+            this.metadataVersion = Optional.empty();
+        }
+
+        Pattern versionPattern = Pattern.compile("^IBP_([\\d_]+)(?:IV(\\d))?");
+        Matcher matcher = versionPattern.matcher(this.name());
+        if (matcher.find()) {
+            String withoutIV = matcher.group(1);
+            // remove any trailing underscores
+            if (withoutIV.endsWith("_")) {
+                withoutIV = withoutIV.substring(0, withoutIV.length() - 1);
+            }
+            shortVersion = withoutIV.replace("_", ".");
+
+            if (matcher.group(2) != null) { // versions less than 
IBP_0_10_0_IV0 do not have IVs
+                version = String.format("%s-IV%s", shortVersion, 
matcher.group(2));
+            } else {
+                version = shortVersion;
+            }
+        } else {
+            throw new IllegalArgumentException("Metadata version: " + 
this.name() + " does not fit "
+                + "the accepted pattern.");
+        }
+    }
+
+    public Optional<Short> metadataVersion() {
+        return metadataVersion;
+    }
+
+    public boolean isAlterIsrSupported() {
+        return this.isAtLeast(IBP_2_7_IV2);
+    }
+
+    public boolean isAllocateProducerIdsSupported() {
+        return this.isAtLeast(IBP_3_0_IV0);
+    }
+
+    public boolean isTruncationOnFetchSupported() {
+        return this.isAtLeast(IBP_2_7_IV1);
+    }
+
+    public boolean isOffsetForLeaderEpochSupported() {
+        return this.isAtLeast(IBP_0_11_0_IV2);
+    }
+
+    public boolean isTopicIdsSupported() {
+        return this.isAtLeast(IBP_2_8_IV0);
+    }
+
+    public boolean isFeatureVersioningSupported() {
+        return this.isAtLeast(IBP_2_7_IV1);
+    }
+
+    public boolean isSaslInterBrokerHandshakeRequestEnabled() {
+        return this.isAtLeast(IBP_0_10_0_IV1);
+    }
+
+    public RecordVersion recordVersion() {
+        if (this.compareTo(IBP_0_9_0) <= 0) { // IBPs up to IBP_0_9_0 use 
Record Version V0
+            return RecordVersion.V0;
+        } else if (this.compareTo(IBP_0_10_2_IV0) <= 0) { // IBPs up to 
IBP_0_10_2_IV0 use V1
+            return RecordVersion.V1;
+        } else return RecordVersion.V2; // all greater IBPs use V2
+    }
+
+    private static final Map<String, MetadataVersion> IBP_VERSIONS;
+    static {
+        {
+            IBP_VERSIONS = new HashMap<>();
+            Pattern versionPattern = 
Pattern.compile("^IBP_([\\d_]+)(?:IV(\\d))?");
+            Map<String, MetadataVersion> maxInterVersion = new HashMap<>();
+            for (MetadataVersion version : MetadataVersion.values()) {
+                Matcher matcher = versionPattern.matcher(version.name());
+                if (matcher.find()) {
+                    String withoutIV = matcher.group(1);
+                    // remove any trailing underscores
+                    if (withoutIV.endsWith("_")) {
+                        withoutIV = withoutIV.substring(0, withoutIV.length() 
- 1);
+                    }
+                    String shortVersion = withoutIV.replace("_", ".");
+
+                    String normalizedVersion;
+                    if (matcher.group(2) != null) {
+                        normalizedVersion = String.format("%s-IV%s", 
shortVersion, matcher.group(2));
+                    } else {
+                        normalizedVersion = shortVersion;
+                    }
+                    maxInterVersion.compute(shortVersion, (__, currentVersion) 
-> {
+                        if (currentVersion == null) {
+                            return version;
+                        } else if (version.compareTo(currentVersion) > 0) {
+                            return version;
+                        } else {
+                            return currentVersion;
+                        }
+                    });
+                    IBP_VERSIONS.put(normalizedVersion, version);
+                } else {
+                    throw new IllegalArgumentException("Metadata version: " + 
version.name() + " does not fit "
+                            + "any of the accepted patterns.");
+                }
+            }
+            IBP_VERSIONS.putAll(maxInterVersion);
+        }
+    }
+
+    public String shortVersion() {
+        return shortVersion;
+    }
+
+    public String version() {
+        return version;
+    }
+
+    /**
+     * Return an `ApiVersion` instance for `versionString`, which can be in a variety of formats (e.g. "0.8.0", "0.8.0.x",
+     * "0.10.0", "0.10.0-IV1"). `IllegalArgumentException` is thrown if `versionString` cannot be mapped to an `ApiVersion`.
+     */
+    public static MetadataVersion fromVersionString(String versionString) {
+        String[] versionSegments = versionString.split(Pattern.quote("."));
+        int numSegments = (versionString.startsWith("0.")) ? 3 : 2;
+        String key;
+        if (numSegments >= versionSegments.length) {
+            key = versionString;
+        } else {
+            key = String.join(".", Arrays.copyOfRange(versionSegments, 0, numSegments));
+        }
+        return Optional.ofNullable(IBP_VERSIONS.get(key)).orElseThrow(() ->
+                new IllegalArgumentException("Version " + versionString + " is not a valid version")

Review Comment:
   nit: too much indentation



##########
server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java:
##########
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import java.util.HashSet;
+import org.apache.kafka.common.feature.Features;
+import org.apache.kafka.common.feature.FinalizedVersionRange;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+import org.apache.kafka.common.message.ApiMessageType.ListenerType;
+import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
+import 
org.apache.kafka.common.message.ApiVersionsResponseData.FinalizedFeatureKey;
+import 
org.apache.kafka.common.message.ApiVersionsResponseData.SupportedFeatureKey;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.RecordVersion;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.utils.Utils;
+
+import static org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV1;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_0_10_1_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_0_10_1_IV1;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_0_10_1_IV2;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_0_10_2_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV1;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV2;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_0_8_0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_0_8_1;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_0_8_2;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_0_9_0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_1_0_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_1_1_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_0_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_0_IV1;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_1_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_1_IV1;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_1_IV2;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_2_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_2_IV1;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_3_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_3_IV1;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_4_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_4_IV1;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_5_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_7_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_7_IV1;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_7_IV2;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_8_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_8_IV1;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_1_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_2_IV0;
+
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV0;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import org.junit.jupiter.api.Test;
+
+class MetadataVersionTest {

Review Comment:
   Can we also include some negative test cases? Since we rewrote the parsing 
logic for version strings, it would be good to have some additional coverage 
there. E.g., `MetadataVersion.fromVersionString("0.3.2-IV1")` should throw an 
exception, etc.
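
A minimal sketch of what such negative cases could look like, assuming a trimmed-down stand-in enum (`MiniVersion`, hypothetical) and a hand-written lookup table instead of the real `IBP_VERSIONS` map. The lookup follows the segment-truncation logic of the quoted `fromVersionString`; the helper `throwsFor` and the sample inputs are illustrative assumptions, not the PR's test code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

class NegativeVersionParsingSketch {
    // Hypothetical stand-in for MetadataVersion with only a few constants.
    enum MiniVersion { IBP_0_8_0, IBP_0_10_0_IV0, IBP_0_10_0_IV1, IBP_3_0_IV0, IBP_3_0_IV1 }

    // Hand-written table (assumption); the real enum derives it from the constant names.
    static final Map<String, MiniVersion> VERSIONS = new HashMap<>();
    static {
        VERSIONS.put("0.8.0", MiniVersion.IBP_0_8_0);
        VERSIONS.put("0.10.0-IV0", MiniVersion.IBP_0_10_0_IV0);
        VERSIONS.put("0.10.0-IV1", MiniVersion.IBP_0_10_0_IV1);
        VERSIONS.put("0.10.0", MiniVersion.IBP_0_10_0_IV1); // short form -> latest IV
        VERSIONS.put("3.0-IV0", MiniVersion.IBP_3_0_IV0);
        VERSIONS.put("3.0-IV1", MiniVersion.IBP_3_0_IV1);
        VERSIONS.put("3.0", MiniVersion.IBP_3_0_IV1);       // short form -> latest IV
    }

    // Same truncation approach as the quoted fromVersionString: keep three
    // segments for "0.x.y" versions, two otherwise, then look up the key.
    static MiniVersion fromVersionString(String versionString) {
        String[] versionSegments = versionString.split(Pattern.quote("."));
        int numSegments = versionString.startsWith("0.") ? 3 : 2;
        String key = numSegments >= versionSegments.length
            ? versionString
            : String.join(".", Arrays.copyOfRange(versionSegments, 0, numSegments));
        return Optional.ofNullable(VERSIONS.get(key)).orElseThrow(() ->
            new IllegalArgumentException("Version " + versionString + " is not a valid version"));
    }

    // Returns true if parsing the input raises IllegalArgumentException.
    static boolean throwsFor(String input) {
        try {
            fromVersionString(input);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        for (String bad : new String[] {"0.3.2-IV1", "", "foo", "3.0-IV9", "9.9"}) {
            System.out.println("'" + bad + "' rejected: " + throwsFor(bad));
        }
    }
}
```

In the real test class these checks would more naturally be written with JUnit's `assertThrows(IllegalArgumentException.class, ...)`; the plain helper is used here only to keep the sketch free of dependencies.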



##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -1074,14 +1075,14 @@ object GroupMetadataManager {
   * Generates the payload for offset commit message from given offset and metadata
    *
    * @param offsetAndMetadata consumer's current offset and metadata
-   * @param apiVersion the api version
+   * @param metadataVersion the api version
    * @return payload for offset commit message
    */
   def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata,
-                        apiVersion: ApiVersion): Array[Byte] = {
+                        metadataVersion: MetadataVersion): Array[Byte] = {
     val version =
-      if (apiVersion < KAFKA_2_1_IV0 || offsetAndMetadata.expireTimestamp.nonEmpty) 1.toShort
-      else if (apiVersion < KAFKA_2_1_IV1) 2.toShort
+      if (metadataVersion.compareTo(IBP_2_1_IV0) < 0 || offsetAndMetadata.expireTimestamp.nonEmpty) 1.toShort
+      else if (metadataVersion.compareTo(IBP_2_1_IV1) < 0) 2.toShort

Review Comment:
   These comparisons can use the `isLessThan` helper instead of `compareTo(...) < 0`.
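
To illustrate the suggestion, here is a small sketch assuming `isLessThan` is a thin wrapper over `compareTo`, like the `isAtLeast` calls visible in the quoted enum. The stand-in enum `MiniVersion` is hypothetical, and the final branch value `3` is an assumption, since the quoted hunk ends before that branch.

```java
class VersionComparisonSketch {
    // Hypothetical stand-in for MetadataVersion; declaration order defines the ordering.
    enum MiniVersion {
        IBP_2_0_IV1, IBP_2_1_IV0, IBP_2_1_IV1, IBP_2_1_IV2;

        // Assumed shape of the helpers: thin wrappers over Enum.compareTo.
        boolean isAtLeast(MiniVersion other) {
            return this.compareTo(other) >= 0;
        }

        boolean isLessThan(MiniVersion other) {
            return this.compareTo(other) < 0;
        }
    }

    // Mirrors the version selection in offsetCommitValue from the hunk above,
    // written with isLessThan instead of compareTo(...) < 0.
    static short offsetCommitValueVersion(MiniVersion version, boolean hasExpireTimestamp) {
        if (version.isLessThan(MiniVersion.IBP_2_1_IV0) || hasExpireTimestamp) {
            return (short) 1;
        } else if (version.isLessThan(MiniVersion.IBP_2_1_IV1)) {
            return (short) 2;
        } else {
            return (short) 3; // assumed: this branch is not shown in the quoted hunk
        }
    }

    public static void main(String[] args) {
        for (MiniVersion v : MiniVersion.values()) {
            System.out.println(v + " -> schema version " + offsetCommitValueVersion(v, false));
        }
    }
}
```

The helper reads closer to the old Scala `apiVersion < KAFKA_2_1_IV0` form than a raw `compareTo` call, which is presumably the motivation for the comment.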



##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -1099,17 +1100,17 @@ object GroupMetadataManager {
    *
    * @param groupMetadata current group metadata
    * @param assignment the assignment for the rebalancing generation
-   * @param apiVersion the api version
+   * @param metadataVersion the api version
    * @return payload for offset commit message
    */
   def groupMetadataValue(groupMetadata: GroupMetadata,
                          assignment: Map[String, Array[Byte]],
-                         apiVersion: ApiVersion): Array[Byte] = {
+                         metadataVersion: MetadataVersion): Array[Byte] = {
 
     val version =
-      if (apiVersion < KAFKA_0_10_1_IV0) 0.toShort
-      else if (apiVersion < KAFKA_2_1_IV0) 1.toShort
-      else if (apiVersion < KAFKA_2_3_IV0) 2.toShort
+      if (metadataVersion.compareTo(IBP_0_10_1_IV0) < 0) 0.toShort
+      else if (metadataVersion.compareTo(IBP_2_1_IV0) < 0) 1.toShort
+      else if (metadataVersion.compareTo(IBP_2_3_IV0) < 0) 2.toShort

Review Comment:
   Use `isLessThan` here as well.



##########
server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java:
##########
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import java.util.HashSet;
+import org.apache.kafka.common.feature.Features;
+import org.apache.kafka.common.feature.FinalizedVersionRange;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+import org.apache.kafka.common.message.ApiMessageType.ListenerType;
+import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
+import 
org.apache.kafka.common.message.ApiVersionsResponseData.FinalizedFeatureKey;
+import 
org.apache.kafka.common.message.ApiVersionsResponseData.SupportedFeatureKey;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.RecordVersion;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.utils.Utils;
+
+import static org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV1;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_0_10_1_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_0_10_1_IV1;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_0_10_1_IV2;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_0_10_2_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV1;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV2;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_0_8_0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_0_8_1;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_0_8_2;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_0_9_0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_1_0_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_1_1_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_0_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_0_IV1;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_1_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_1_IV1;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_1_IV2;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_2_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_2_IV1;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_3_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_3_IV1;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_4_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_4_IV1;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_5_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_7_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_7_IV1;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_7_IV2;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_8_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_8_IV1;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_1_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_2_IV0;

Review Comment:
   Will our checkstyle configuration let us use a wildcard static import here? 
That might be nicer than 30+ individual imports.



##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -1749,18 +1751,18 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
   val leaderImbalanceCheckIntervalSeconds: Long = getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp)
   def uncleanLeaderElectionEnable: java.lang.Boolean = getBoolean(KafkaConfig.UncleanLeaderElectionEnableProp)
 
-  // We keep the user-provided String as `ApiVersion.apply` can choose a slightly different version (eg if `0.10.0`
+  // We keep the user-provided String as `MetadataVersion.apply` can choose a slightly different version (eg if `0.10.0`

Review Comment:
   Same `fromVersionString` comment as on the other `MetadataVersion.apply` 
reference in this file.



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -307,7 +309,7 @@ class ReplicaManager(val config: KafkaConfig,
     // If inter-broker protocol (IBP) < 1.0, the controller will send LeaderAndIsrRequest V0 which does not include isNew field.
     // In this case, the broker receiving the request cannot determine whether it is safe to create a partition if a log directory has failed.
     // Thus, we choose to halt the broker on any log directory failure if IBP < 1.0
-    val haltBrokerOnFailure = config.interBrokerProtocolVersion < KAFKA_1_0_IV0
+    val haltBrokerOnFailure = config.interBrokerProtocolVersion.compareTo(IBP_1_0_IV0) < 0

Review Comment:
   Use `isLessThan` here as well.



##########
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.kafka.common.record.RecordVersion;
+
+public enum MetadataVersion {
+    IBP_0_8_0(-1),
+    IBP_0_8_1(-1),
+    IBP_0_8_2(-1),
+    IBP_0_9_0(-1),
+    // 0.10.0-IV0 is introduced for KIP-31/32 which changes the message format.
+    IBP_0_10_0_IV0(-1),
+    // 0.10.0-IV1 is introduced for KIP-36(rack awareness) and KIP-43(SASL 
handshake).
+    IBP_0_10_0_IV1(-1),
+    // introduced for JoinGroup protocol change in KIP-62
+    IBP_0_10_1_IV0(-1),
+    // 0.10.1-IV1 is introduced for KIP-74(fetch response size limit).
+    IBP_0_10_1_IV1(-1),
+    // introduced ListOffsetRequest v1 in KIP-79
+    IBP_0_10_1_IV2(-1),
+    // introduced UpdateMetadataRequest v3 in KIP-103
+    IBP_0_10_2_IV0(-1),
+    // KIP-98 (idempotent and transactional producer support)
+    IBP_0_11_0_IV0(-1),
+    // introduced DeleteRecordsRequest v0 and FetchRequest v4 in KIP-107
+    IBP_0_11_0_IV1(-1),
+    // Introduced leader epoch fetches to the replica fetcher via KIP-101
+    IBP_0_11_0_IV2(-1),
+    // Introduced LeaderAndIsrRequest V1, UpdateMetadataRequest V4 and 
FetchRequest V6 via KIP-112
+    IBP_1_0_IV0(-1),
+    // Introduced DeleteGroupsRequest V0 via KIP-229, plus KIP-227 incremental 
fetch requests,
+    // and KafkaStorageException for fetch requests.
+    IBP_1_1_IV0(-1),
+    // Introduced OffsetsForLeaderEpochRequest V1 via KIP-279 (Fix log 
divergence between leader and follower after fast leader fail over)
+    IBP_2_0_IV0(-1),
+    // Several request versions were bumped due to KIP-219 (Improve quota 
communication)
+    IBP_2_0_IV1(-1),
+    // Introduced new schemas for group offset (v2) and group metadata (v2) 
(KIP-211)
+    IBP_2_1_IV0(-1),
+    // New Fetch, OffsetsForLeaderEpoch, and ListOffsets schemas (KIP-320)
+    IBP_2_1_IV1(-1),
+    // Support ZStandard Compression Codec (KIP-110)
+    IBP_2_1_IV2(-1),
+    // Introduced broker generation (KIP-380), and
+    // LeaderAdnIsrRequest V2, UpdateMetadataRequest V5, StopReplicaRequest V1
+    IBP_2_2_IV0(-1),
+    // New error code for ListOffsets when a new leader is lagging behind 
former HW (KIP-207)
+    IBP_2_2_IV1(-1),
+    // Introduced static membership.
+    IBP_2_3_IV0(-1),
+    // Add rack_id to FetchRequest, preferred_read_replica to FetchResponse, 
and replica_id to OffsetsForLeaderRequest
+    IBP_2_3_IV1(-1),
+    // Add adding_replicas and removing_replicas fields to LeaderAndIsrRequest
+    IBP_2_4_IV0(-1),
+    // Flexible version support in inter-broker APIs
+    IBP_2_4_IV1(-1),
+    // No new APIs, equivalent to 2.4-IV1
+    IBP_2_5_IV0(-1),
+    // Introduced StopReplicaRequest V3 containing the leader epoch for each 
partition (KIP-570)
+    IBP_2_6_IV0(-1),
+    // Introduced feature versioning support (KIP-584)
+    IBP_2_7_IV0(-1),
+    // Bup Fetch protocol for Raft protocol (KIP-595)
+    IBP_2_7_IV1(-1),
+    // Introduced AlterPartition (KIP-497)
+    IBP_2_7_IV2(-1),
+    // Flexible versioning on ListOffsets, WriteTxnMarkers and 
OffsetsForLeaderEpoch. Also adds topic IDs (KIP-516)
+    IBP_2_8_IV0(-1),
+    // Introduced topic IDs to LeaderAndIsr and UpdateMetadata 
requests/responses (KIP-516)
+    IBP_2_8_IV1(-1),
+    // Introduce AllocateProducerIds (KIP-730)
+    IBP_3_0_IV0(1),
+    // Introduce ListOffsets V7 which supports listing offsets by max 
timestamp (KIP-734)
+    // Assume message format version is 3.0 (KIP-724)
+    IBP_3_0_IV1(2),
+    // Adds topic IDs to Fetch requests/responses (KIP-516)
+    IBP_3_1_IV0(3),
+    // Support for leader recovery for unclean leader election (KIP-704)
+    IBP_3_2_IV0(4),
+    // KRaft GA
+    IBP_3_3_IV0(5);
+
+    private final Optional<Short> metadataVersion;
+    private final String shortVersion;
+    private final String version;
+
+    MetadataVersion(int metadataVersion) {
+        if (metadataVersion > 0) {
+            this.metadataVersion = Optional.of((short) metadataVersion);
+        } else {
+            this.metadataVersion = Optional.empty();
+        }
+
+        Pattern versionPattern = Pattern.compile("^IBP_([\\d_]+)(?:IV(\\d))?");
+        Matcher matcher = versionPattern.matcher(this.name());
+        if (matcher.find()) {
+            String withoutIV = matcher.group(1);
+            // remove any trailing underscores
+            if (withoutIV.endsWith("_")) {
+                withoutIV = withoutIV.substring(0, withoutIV.length() - 1);
+            }
+            shortVersion = withoutIV.replace("_", ".");
+
+            if (matcher.group(2) != null) { // versions less than 
IBP_0_10_0_IV0 do not have IVs
+                version = String.format("%s-IV%s", shortVersion, 
matcher.group(2));
+            } else {
+                version = shortVersion;
+            }
+        } else {
+            throw new IllegalArgumentException("Metadata version: " + 
this.name() + " does not fit "
+                + "the accepted pattern.");
+        }
+    }
+
+    public Optional<Short> metadataVersion() {
+        return metadataVersion;
+    }
+
+    public boolean isAlterIsrSupported() {
+        return this.isAtLeast(IBP_2_7_IV2);
+    }
+
+    public boolean isAllocateProducerIdsSupported() {
+        return this.isAtLeast(IBP_3_0_IV0);
+    }
+
+    public boolean isTruncationOnFetchSupported() {
+        return this.isAtLeast(IBP_2_7_IV1);
+    }
+
+    public boolean isOffsetForLeaderEpochSupported() {
+        return this.isAtLeast(IBP_0_11_0_IV2);
+    }
+
+    public boolean isTopicIdsSupported() {
+        return this.isAtLeast(IBP_2_8_IV0);
+    }
+
+    public boolean isFeatureVersioningSupported() {

Review Comment:
   Let's order these helper methods by the version they check.
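
One possible ordering, sketched on a hypothetical stand-in enum (`MiniVersion`) that keeps only the constants the predicates reference. The thresholds are copied from the quoted file as-is; only the order of the methods changes.

```java
class OrderedPredicatesSketch {
    // Hypothetical stand-in for MetadataVersion, reduced to the referenced constants.
    enum MiniVersion {
        IBP_0_10_0_IV1, IBP_0_11_0_IV2, IBP_2_7_IV0, IBP_2_7_IV1, IBP_2_7_IV2, IBP_2_8_IV0, IBP_3_0_IV0;

        boolean isAtLeast(MiniVersion other) {
            return this.compareTo(other) >= 0;
        }

        // Predicates listed in ascending order of the version they check.
        boolean isSaslInterBrokerHandshakeRequestEnabled() {
            return isAtLeast(IBP_0_10_0_IV1);
        }

        boolean isOffsetForLeaderEpochSupported() {
            return isAtLeast(IBP_0_11_0_IV2);
        }

        boolean isTruncationOnFetchSupported() {
            return isAtLeast(IBP_2_7_IV1);
        }

        boolean isFeatureVersioningSupported() {
            return isAtLeast(IBP_2_7_IV1);
        }

        boolean isAlterIsrSupported() {
            return isAtLeast(IBP_2_7_IV2);
        }

        boolean isTopicIdsSupported() {
            return isAtLeast(IBP_2_8_IV0);
        }

        boolean isAllocateProducerIdsSupported() {
            return isAtLeast(IBP_3_0_IV0);
        }
    }

    public static void main(String[] args) {
        MiniVersion v = MiniVersion.IBP_2_7_IV1;
        System.out.println(v + " truncationOnFetch=" + v.isTruncationOnFetchSupported()
            + " alterIsr=" + v.isAlterIsrSupported());
    }
}
```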



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -1649,31 +1651,31 @@ class KafkaApisTest {
   @Test
   def shouldThrowUnsupportedVersionExceptionOnHandleAddOffsetToTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = {
     assertThrows(classOf[UnsupportedVersionException],
-      () => createKafkaApis(KAFKA_0_10_2_IV0).handleAddOffsetsToTxnRequest(null, RequestLocal.withThreadConfinedCaching))
+      () => createKafkaApis(MetadataVersion.IBP_0_10_2_IV0).handleAddOffsetsToTxnRequest(null, RequestLocal.withThreadConfinedCaching))

Review Comment:
   nit: the `MetadataVersion.` qualifier isn't needed here since the enum value 
is imported directly.



##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -1712,7 +1714,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
   def minInSyncReplicas = getInt(KafkaConfig.MinInSyncReplicasProp)
   def logPreAllocateEnable: java.lang.Boolean = getBoolean(KafkaConfig.LogPreAllocateProp)
 
-  // We keep the user-provided String as `ApiVersion.apply` can choose a slightly different version (eg if `0.10.0`
+  // We keep the user-provided String as `MetadataVersion.apply` can choose a slightly different version (eg if `0.10.0`

Review Comment:
   We don't have an `apply` method anymore. This comment needs to reference the 
new `fromVersionString` method we added.
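
For context on why the config comment says the lookup "can choose a slightly different version", the sketch below rebuilds a small lookup table from constant names in the same spirit as the quoted static initializer. The stand-in enum `MiniVersion` and the use of `Map.merge` are assumptions made for brevity; the quoted code uses `compute` with an explicit comparison.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class ShortVersionResolutionSketch {
    // Hypothetical stand-in for MetadataVersion with a handful of constants.
    enum MiniVersion { IBP_0_9_0, IBP_0_10_0_IV0, IBP_0_10_0_IV1, IBP_0_10_1_IV0 }

    static final Map<String, MiniVersion> VERSIONS = new HashMap<>();
    static {
        // Same pattern as in the quoted enum: release digits, then an optional IV suffix.
        Pattern pattern = Pattern.compile("^IBP_([\\d_]+)(?:IV(\\d))?");
        Map<String, MiniVersion> latestPerRelease = new HashMap<>();
        for (MiniVersion version : MiniVersion.values()) {
            Matcher matcher = pattern.matcher(version.name());
            if (!matcher.find()) {
                throw new IllegalArgumentException("Unexpected constant name: " + version.name());
            }
            // "0_10_0_" -> "0_10_0" -> "0.10.0"
            String shortVersion = matcher.group(1).replaceAll("_$", "").replace("_", ".");
            String fullVersion = matcher.group(2) != null
                ? shortVersion + "-IV" + matcher.group(2)
                : shortVersion;
            VERSIONS.put(fullVersion, version);
            // Keep the highest constant seen so far for each release.
            latestPerRelease.merge(shortVersion, version,
                (current, candidate) -> candidate.compareTo(current) > 0 ? candidate : current);
        }
        // A bare release string resolves to the latest IV of that release.
        VERSIONS.putAll(latestPerRelease);
    }

    public static void main(String[] args) {
        System.out.println("0.10.0 resolves to " + VERSIONS.get("0.10.0"));
        System.out.println("0.10.0-IV0 resolves to " + VERSIONS.get("0.10.0-IV0"));
    }
}
```

Because a bare release string maps to a specific IV constant, the string the user typed and the resolved version can differ, which is why the config keeps the original string around.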



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
