This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 69659b70fca KAFKA-14499: [1/N] Introduce OffsetCommit API version 9 
and add new StaleMemberEpochException error (#14046)
69659b70fca is described below

commit 69659b70fca10349d8c799d29e9115bb5ced64db
Author: David Jacot <[email protected]>
AuthorDate: Fri Jul 21 20:08:06 2023 +0200

    KAFKA-14499: [1/N] Introduce OffsetCommit API version 9 and add new 
StaleMemberEpochException error (#14046)
    
    This patch does a few things:
    1) It introduces version 9 of the OffsetCommit API. This new version has no 
schema changes but it can return a StaleMemberEpochException if the new 
consumer group protocol is used. Note the use of `"latestVersionUnstable": 
true` in the request schema. This means that this new version is not available 
yet unless activated.
    2) It renames the `generationId` field in the request to 
`GenerationIdOrMemberEpoch`. This is a backward compatible change.
    3) It introduces the new StaleMemberEpochException error.
    4) It does a minor refactoring in OffsetCommitRequest class.
    
    Reviewers: Jeff Kim <[email protected]>, David Arthur 
<[email protected]>, Justine Olshan <[email protected]>
---
 .../consumer/internals/CommitRequestManager.java   |  2 +-
 .../consumer/internals/ConsumerCoordinator.java    |  2 +-
 .../common/errors/StaleMemberEpochException.java   | 32 ++++++++++++++
 .../org/apache/kafka/common/protocol/Errors.java   |  4 +-
 .../kafka/common/requests/AbstractRequest.java     |  9 +++-
 .../requests/ConsumerGroupHeartbeatRequest.java    |  6 ++-
 .../kafka/common/requests/OffsetCommitRequest.java | 49 +++++++++++-----------
 .../common/message/OffsetCommitRequest.json        | 12 ++++--
 .../common/message/OffsetCommitResponse.json       | 15 ++++++-
 .../internals/ConsumerCoordinatorTest.java         |  2 +-
 .../apache/kafka/common/message/MessageTest.java   |  4 +-
 .../common/requests/OffsetCommitRequestTest.java   | 42 ++++++++++---------
 .../kafka/common/requests/RequestResponseTest.java |  2 +-
 .../group/GroupCoordinatorAdapter.scala            |  2 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  2 +-
 .../kafka/api/AuthorizerIntegrationTest.scala      |  3 +-
 .../group/GroupCoordinatorAdapterTest.scala        |  4 +-
 .../server/ConsumerGroupHeartbeatRequestTest.scala | 15 ++++---
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 10 ++---
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  4 +-
 20 files changed, 146 insertions(+), 75 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index 962b1e6cba9..82959d26ce3 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -215,7 +215,7 @@ public class CommitRequestManager implements RequestManager 
{
             OffsetCommitRequest.Builder builder = new 
OffsetCommitRequest.Builder(
                     new OffsetCommitRequestData()
                             .setGroupId(this.groupId)
-                            .setGenerationId(generation.generationId)
+                            
.setGenerationIdOrMemberEpoch(generation.generationId)
                             .setMemberId(generation.memberId)
                             .setGroupInstanceId(groupInstanceId)
                             .setTopics(new 
ArrayList<>(requestTopicDataMap.values())));
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index bcd4b377881..82125be1705 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -1361,7 +1361,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
         OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
                 new OffsetCommitRequestData()
                         .setGroupId(this.rebalanceConfig.groupId)
-                        .setGenerationId(generation.generationId)
+                        .setGenerationIdOrMemberEpoch(generation.generationId)
                         .setMemberId(generation.memberId)
                         .setGroupInstanceId(groupInstanceId)
                         .setTopics(new 
ArrayList<>(requestTopicDataMap.values()))
diff --git 
a/clients/src/main/java/org/apache/kafka/common/errors/StaleMemberEpochException.java
 
b/clients/src/main/java/org/apache/kafka/common/errors/StaleMemberEpochException.java
new file mode 100644
index 00000000000..7b373c5b5c1
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/errors/StaleMemberEpochException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * The StaleMemberEpochException is used in the context of the new
+ * consumer group protocol (KIP-848). This error is returned in the
+ * OffsetCommit/Fetch APIs when the member epoch received does not
+ * match the current member epoch.
+ */
+@InterfaceStability.Evolving
+public class StaleMemberEpochException extends ApiException {
+    public StaleMemberEpochException(String message) {
+        super(message);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index b5ea650b166..1ccdcd0627c 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -108,6 +108,7 @@ import 
org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.SnapshotNotFoundException;
 import org.apache.kafka.common.errors.StaleBrokerEpochException;
+import org.apache.kafka.common.errors.StaleMemberEpochException;
 import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
@@ -378,7 +379,8 @@ public enum Errors {
     OFFSET_MOVED_TO_TIERED_STORAGE(109, "The requested offset is moved to 
tiered storage.", OffsetMovedToTieredStorageException::new),
     FENCED_MEMBER_EPOCH(110, "The member epoch is fenced by the group 
coordinator. The member must abandon all its partitions and rejoin.", 
FencedMemberEpochException::new),
     UNRELEASED_INSTANCE_ID(111, "The instance ID is still used by another 
member in the consumer group. That member must leave first.", 
UnreleasedInstanceIdException::new),
-    UNSUPPORTED_ASSIGNOR(112, "The assignor or its version range is not 
supported by the consumer group.", UnsupportedAssignorException::new);
+    UNSUPPORTED_ASSIGNOR(112, "The assignor or its version range is not 
supported by the consumer group.", UnsupportedAssignorException::new),
+    STALE_MEMBER_EPOCH(113, "The member epoch is stale. The member must retry 
after receiving its updated member epoch via the ConsumerGroupHeartbeat API.", 
StaleMemberEpochException::new);
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 1988347a974..64f1c2e4a2f 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -37,8 +37,15 @@ public abstract class AbstractRequest implements 
AbstractRequestResponse {
         /**
          * Construct a new builder which allows any supported version
          */
+        public Builder(ApiKeys apiKey, boolean enableUnstableLastVersion) {
+            this(apiKey, apiKey.oldestVersion(), 
apiKey.latestVersion(enableUnstableLastVersion));
+        }
+
+        /**
+         * Construct a new builder which allows any supported and released 
version
+         */
         public Builder(ApiKeys apiKey) {
-            this(apiKey, apiKey.oldestVersion(), apiKey.latestVersion());
+            this(apiKey, false);
         }
 
         /**
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java
index 215e18ea2de..6e42111670a 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java
@@ -30,7 +30,11 @@ public class ConsumerGroupHeartbeatRequest extends 
AbstractRequest {
         private final ConsumerGroupHeartbeatRequestData data;
 
         public Builder(ConsumerGroupHeartbeatRequestData data) {
-            super(ApiKeys.CONSUMER_GROUP_HEARTBEAT);
+            this(data, false);
+        }
+
+        public Builder(ConsumerGroupHeartbeatRequestData data, boolean 
enableUnstableLastVersion) {
+            super(ApiKeys.CONSUMER_GROUP_HEARTBEAT, enableUnstableLastVersion);
             this.data = data;
         }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index 8fc884e6614..0e33ceb7085 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -28,10 +28,9 @@ import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 public class OffsetCommitRequest extends AbstractRequest {
     // default values for the current version
@@ -89,33 +88,29 @@ public class OffsetCommitRequest extends AbstractRequest {
         return offsets;
     }
 
-    public static List<OffsetCommitResponseTopic> getErrorResponseTopics(
-            List<OffsetCommitRequestTopic> requestTopics,
-            Errors e) {
-        List<OffsetCommitResponseTopic> responseTopicData = new ArrayList<>();
-        for (OffsetCommitRequestTopic entry : requestTopics) {
-            List<OffsetCommitResponsePartition> responsePartitions =
-                    new ArrayList<>();
-            for (OffsetCommitRequestData.OffsetCommitRequestPartition 
requestPartition : entry.partitions()) {
-                responsePartitions.add(new OffsetCommitResponsePartition()
-                                           
.setPartitionIndex(requestPartition.partitionIndex())
-                                           .setErrorCode(e.code()));
-            }
-            responseTopicData.add(new OffsetCommitResponseTopic()
-                    .setName(entry.name())
-                    .setPartitions(responsePartitions)
-            );
-        }
-        return responseTopicData;
+    public static OffsetCommitResponseData getErrorResponse(
+        OffsetCommitRequestData request,
+        Errors error
+    ) {
+        OffsetCommitResponseData response = new OffsetCommitResponseData();
+        request.topics().forEach(topic -> {
+            OffsetCommitResponseTopic responseTopic = new 
OffsetCommitResponseTopic()
+                .setName(topic.name());
+            response.topics().add(responseTopic);
+
+            topic.partitions().forEach(partition -> {
+                responseTopic.partitions().add(new 
OffsetCommitResponsePartition()
+                    .setPartitionIndex(partition.partitionIndex())
+                    .setErrorCode(error.code()));
+            });
+        });
+        return response;
     }
 
     @Override
     public OffsetCommitResponse getErrorResponse(int throttleTimeMs, Throwable 
e) {
-        List<OffsetCommitResponseTopic>
-                responseTopicData = getErrorResponseTopics(data.topics(), 
Errors.forException(e));
-        return new OffsetCommitResponse(new OffsetCommitResponseData()
-                .setTopics(responseTopicData)
-                .setThrottleTimeMs(throttleTimeMs));
+        return new OffsetCommitResponse(getErrorResponse(data, 
Errors.forException(e))
+            .setThrottleTimeMs(throttleTimeMs));
     }
 
     @Override
@@ -126,4 +121,8 @@ public class OffsetCommitRequest extends AbstractRequest {
     public static OffsetCommitRequest parse(ByteBuffer buffer, short version) {
         return new OffsetCommitRequest(new OffsetCommitRequestData(new 
ByteBufferAccessor(buffer), version), version);
     }
+
+    public static Optional<String> groupInstanceId(OffsetCommitRequestData 
request) {
+        return Optional.ofNullable(request.groupInstanceId());
+    }
 }
diff --git a/clients/src/main/resources/common/message/OffsetCommitRequest.json 
b/clients/src/main/resources/common/message/OffsetCommitRequest.json
index cf112e1ed72..c11b56c9544 100644
--- a/clients/src/main/resources/common/message/OffsetCommitRequest.json
+++ b/clients/src/main/resources/common/message/OffsetCommitRequest.json
@@ -31,13 +31,19 @@
   // version 7 adds a new field called groupInstanceId to indicate member 
identity across restarts.
   //
   // Version 8 is the first flexible version.
-  "validVersions": "0-8",
+  //
+  // Version 9 is the first version that can be used with the new consumer 
group protocol (KIP-848). The
+  // request is the same as version 8.
+  // Version 9 is added as part of KIP-848 and is still under development. 
Hence, the last version of the
+  // API is not exposed by default by brokers unless explicitly enabled.
+  "latestVersionUnstable": true,
+  "validVersions": "0-9",
   "flexibleVersions": "8+",
   "fields": [
     { "name": "GroupId", "type": "string", "versions": "0+", "entityType": 
"groupId",
       "about": "The unique group identifier." },
-    { "name": "GenerationId", "type": "int32", "versions": "1+", "default": 
"-1", "ignorable": true,
-      "about": "The generation of the group." },
+    { "name": "GenerationIdOrMemberEpoch", "type": "int32", "versions": "1+", 
"default": "-1", "ignorable": true,
+      "about": "The generation of the group if using the generic group 
protocol or the member epoch if using the consumer protocol." },
     { "name": "MemberId", "type": "string", "versions": "1+", "ignorable": 
true,
       "about": "The member ID assigned by the group coordinator." },
     { "name": "GroupInstanceId", "type": "string", "versions": "7+",
diff --git 
a/clients/src/main/resources/common/message/OffsetCommitResponse.json 
b/clients/src/main/resources/common/message/OffsetCommitResponse.json
index 3d547794cfb..797c7bfe515 100644
--- a/clients/src/main/resources/common/message/OffsetCommitResponse.json
+++ b/clients/src/main/resources/common/message/OffsetCommitResponse.json
@@ -28,8 +28,21 @@
   // Version 7 offsetCommitRequest supports a new field called groupInstanceId 
to indicate member identity across restarts.
   //
   // Version 8 is the first flexible version.
-  "validVersions": "0-8",
+  //
+  // Version 9 is the first version that can be used with the new consumer 
group protocol (KIP-848). The response is
+  // the same as version 8 but can return STALE_MEMBER_EPOCH when the new 
consumer group protocol is used.
+  "validVersions": "0-9",
   "flexibleVersions": "8+",
+  // Supported errors:
+  // - GROUP_AUTHORIZATION_FAILED (version 0+)
+  // - NOT_COORDINATOR (version 0+)
+  // - COORDINATOR_NOT_AVAILABLE (version 0+)
+  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
+  // - ILLEGAL_GENERATION (version 1+)
+  // - UNKNOWN_MEMBER_ID (version 1+)
+  // - INVALID_COMMIT_OFFSET_SIZE (version 0+)
+  // - FENCED_MEMBER_EPOCH (version 7+)
+  // - STALE_MEMBER_EPOCH (version 9+)
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", 
"ignorable": true,
       "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 03493a12290..fdf6cf1d73b 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -2575,7 +2575,7 @@ public abstract class ConsumerCoordinatorTest {
         client.prepareResponse(body -> {
             OffsetCommitRequest commitRequest = (OffsetCommitRequest) body;
             return 
commitRequest.data().memberId().equals(OffsetCommitRequest.DEFAULT_MEMBER_ID) &&
-                    commitRequest.data().generationId() == 
OffsetCommitRequest.DEFAULT_GENERATION_ID;
+                    commitRequest.data().generationIdOrMemberEpoch() == 
OffsetCommitRequest.DEFAULT_GENERATION_ID;
         }, offsetCommitResponse(singletonMap(t1p, Errors.NONE)));
 
         AtomicBoolean success = new AtomicBoolean(false);
diff --git 
a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java 
b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
index 5762e84f60b..478bfa0668d 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
@@ -276,7 +276,7 @@ public final class MessageTest {
                 .setGroupId("groupId")
                 .setMemberId(memberId)
                 .setTopics(new ArrayList<>())
-                .setGenerationId(15);
+                .setGenerationIdOrMemberEpoch(15);
         testAllMessageRoundTripsFromVersion((short) 1, request.get());
         testAllMessageRoundTripsFromVersion((short) 1, 
request.get().setGroupInstanceId(null));
         testAllMessageRoundTripsFromVersion((short) 7, 
request.get().setGroupInstanceId(instanceId));
@@ -482,7 +482,7 @@ public final class MessageTest {
             OffsetCommitRequestData requestData = request.get();
             if (version < 1) {
                 requestData.setMemberId("");
-                requestData.setGenerationId(-1);
+                requestData.setGenerationIdOrMemberEpoch(-1);
             }
 
             if (version != 1) {
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java
index 08ae7a3fbd5..7da2271d97c 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java
@@ -21,6 +21,7 @@ import 
org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.OffsetCommitRequestData;
 import 
org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition;
 import 
org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
 import 
org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
 import 
org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -34,7 +35,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static 
org.apache.kafka.common.requests.OffsetCommitRequest.getErrorResponseTopics;
+import static 
org.apache.kafka.common.requests.OffsetCommitRequest.getErrorResponse;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
@@ -102,25 +103,6 @@ public class OffsetCommitRequestTest {
         }
     }
 
-    @Test
-    public void testGetErrorResponseTopics() {
-        List<OffsetCommitResponseTopic> expectedTopics = Arrays.asList(
-            new OffsetCommitResponseTopic()
-                .setName(topicOne)
-                .setPartitions(Collections.singletonList(
-                    new OffsetCommitResponsePartition()
-                        .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
-                        .setPartitionIndex(partitionOne))),
-            new OffsetCommitResponseTopic()
-                .setName(topicTwo)
-                .setPartitions(Collections.singletonList(
-                    new OffsetCommitResponsePartition()
-                        .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
-                        .setPartitionIndex(partitionTwo)))
-        );
-        assertEquals(expectedTopics, getErrorResponseTopics(topics, 
Errors.UNKNOWN_MEMBER_ID));
-    }
-
     @Test
     public void testVersionSupportForGroupInstanceId() {
         OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
@@ -139,4 +121,24 @@ public class OffsetCommitRequestTest {
             }
         }
     }
+
+    @Test
+    public void testGetErrorResponse() {
+        OffsetCommitResponseData expectedResponse = new 
OffsetCommitResponseData()
+            .setTopics(Arrays.asList(
+                new OffsetCommitResponseTopic()
+                    .setName(topicOne)
+                    .setPartitions(Collections.singletonList(
+                        new OffsetCommitResponsePartition()
+                            .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
+                            .setPartitionIndex(partitionOne))),
+                new OffsetCommitResponseTopic()
+                    .setName(topicTwo)
+                    .setPartitions(Collections.singletonList(
+                        new OffsetCommitResponsePartition()
+                            .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
+                            .setPartitionIndex(partitionTwo)))));
+
+        assertEquals(expectedResponse, getErrorResponse(data, 
Errors.UNKNOWN_MEMBER_ID));
+    }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 64a7300935b..be4cd2244b6 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -2038,7 +2038,7 @@ public class RequestResponseTest {
                 .setGroupId("group1")
                 .setMemberId("consumer1")
                 .setGroupInstanceId(null)
-                .setGenerationId(100)
+                .setGenerationIdOrMemberEpoch(100)
                 .setTopics(singletonList(
                         new OffsetCommitRequestData.OffsetCommitRequestTopic()
                                 .setName("test")
diff --git 
a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
index 87dba8b1019..86ef919c93e 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -407,7 +407,7 @@ private[group] class GroupCoordinatorAdapter(
       request.groupId,
       request.memberId,
       Option(request.groupInstanceId),
-      request.generationId,
+      request.generationIdOrMemberEpoch,
       partitions.toMap,
       callback,
       RequestLocal(bufferSupplier)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6a4971c44d3..5d6233d1100 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -534,7 +534,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val offsetCommitRequestData = new OffsetCommitRequestData()
       .setGroupId(offsetCommitRequest.data.groupId)
       .setMemberId(offsetCommitRequest.data.memberId)
-      .setGenerationId(offsetCommitRequest.data.generationId)
+      
.setGenerationIdOrMemberEpoch(offsetCommitRequest.data.generationIdOrMemberEpoch)
       .setRetentionTimeMs(offsetCommitRequest.data.retentionTimeMs)
       .setGroupInstanceId(offsetCommitRequest.data.groupInstanceId)
       .setTopics(authorizedTopicsRequest.asJava)
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 084d3f7be93..c7e7fb0ac0c 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -173,6 +173,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1")
     properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1")
     properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1")
+    properties.put(KafkaConfig.UnstableApiVersionsEnableProp, "true")
     properties.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, 
classOf[PrincipalBuilder].getName)
   }
 
@@ -493,7 +494,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
         new OffsetCommitRequestData()
           .setGroupId(group)
           .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
-          .setGenerationId(1)
+          .setGenerationIdOrMemberEpoch(1)
           .setTopics(Collections.singletonList(
             new OffsetCommitRequestData.OffsetCommitRequestTopic()
               .setName(topic)
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
index 0a7b6f2ba24..7b7e10b1c94 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
@@ -644,7 +644,7 @@ class GroupCoordinatorAdapterTest {
     val data = new OffsetCommitRequestData()
       .setGroupId("group")
       .setMemberId("member")
-      .setGenerationId(10)
+      .setGenerationIdOrMemberEpoch(10)
       .setRetentionTimeMs(1000)
       .setTopics(List(
         new OffsetCommitRequestData.OffsetCommitRequestTopic()
@@ -669,7 +669,7 @@ class GroupCoordinatorAdapterTest {
       ArgumentMatchers.eq(data.groupId),
       ArgumentMatchers.eq(data.memberId),
       ArgumentMatchers.eq(None),
-      ArgumentMatchers.eq(data.generationId),
+      ArgumentMatchers.eq(data.generationIdOrMemberEpoch),
       ArgumentMatchers.eq(Map(
         new TopicIdPartition(Uuid.ZERO_UUID, 0 , "foo") -> new 
OffsetAndMetadata(
           offset = 100,
diff --git 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index 7982fa00885..89cddfe97e7 100644
--- 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -42,7 +42,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
   @ClusterTest
   def testConsumerGroupHeartbeatIsDisabledByDefault(): Unit = {
     val consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
-      new ConsumerGroupHeartbeatRequestData()
+      new ConsumerGroupHeartbeatRequestData(),
+      true
     ).build()
     assertThrows(classOf[EOFException], () => 
connectAndReceive(consumerGroupHeartbeatRequest))
   }
@@ -50,7 +51,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
   @ClusterTest(serverProperties = Array(new ClusterConfigProperty(key = 
"unstable.api.versions.enable", value = "true")))
   def testConsumerGroupHeartbeatIsAccessibleWhenEnabled(): Unit = {
     val consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
-      new ConsumerGroupHeartbeatRequestData()
+      new ConsumerGroupHeartbeatRequestData(),
+      true
     ).build()
 
     val consumerGroupHeartbeatResponse = 
connectAndReceive(consumerGroupHeartbeatRequest)
@@ -83,7 +85,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
         .setMemberEpoch(0)
         .setRebalanceTimeoutMs(5 * 60 * 1000)
         .setSubscribedTopicNames(List("foo").asJava)
-        .setTopicPartitions(List.empty.asJava)
+        .setTopicPartitions(List.empty.asJava),
+      true
     ).build()
 
     // Send the request until receiving a successful response. There is a delay
@@ -111,7 +114,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
       new ConsumerGroupHeartbeatRequestData()
         .setGroupId("grp")
         .setMemberId(consumerGroupHeartbeatResponse.data.memberId)
-        .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch)
+        .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch),
+      true
     ).build()
 
     // This is the expected assignment.
@@ -137,7 +141,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
       new ConsumerGroupHeartbeatRequestData()
         .setGroupId("grp")
         .setMemberId(consumerGroupHeartbeatResponse.data.memberId)
-        .setMemberEpoch(-1)
+        .setMemberEpoch(-1),
+      true
     ).build()
 
     consumerGroupHeartbeatResponse = 
connectAndReceive(consumerGroupHeartbeatRequest)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 6f135af363a..9c6c2c73bff 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -4080,7 +4080,7 @@ class KafkaApisTest {
         .setGroupId("test")
         .setMemberId("test")
         .setGroupInstanceId("instanceId")
-        .setGenerationId(100)
+        .setGenerationIdOrMemberEpoch(100)
         .setTopics(Collections.singletonList(
           new OffsetCommitRequestData.OffsetCommitRequestTopic()
             .setName("test")
@@ -6118,7 +6118,7 @@ class KafkaApisTest {
   def testConsumerGroupHeartbeatReturnsUnsupportedVersion(): Unit = {
     val consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequestData().setGroupId("group")
 
-    val requestChannelRequest = buildRequest(new 
ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest).build())
+    val requestChannelRequest = buildRequest(new 
ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest, 
true).build())
 
     createKafkaApis().handle(requestChannelRequest, RequestLocal.NoCaching)
 
@@ -6132,7 +6132,7 @@ class KafkaApisTest {
   def testConsumerGroupHeartbeatRequest(): Unit = {
     val consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequestData().setGroupId("group")
 
-    val requestChannelRequest = buildRequest(new 
ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest).build())
+    val requestChannelRequest = buildRequest(new 
ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest, 
true).build())
 
     val future = new CompletableFuture[ConsumerGroupHeartbeatResponseData]()
     when(groupCoordinator.consumerGroupHeartbeat(
@@ -6156,7 +6156,7 @@ class KafkaApisTest {
   def testConsumerGroupHeartbeatRequestFutureFailed(): Unit = {
     val consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequestData().setGroupId("group")
 
-    val requestChannelRequest = buildRequest(new 
ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest).build())
+    val requestChannelRequest = buildRequest(new 
ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest, 
true).build())
 
     val future = new CompletableFuture[ConsumerGroupHeartbeatResponseData]()
     when(groupCoordinator.consumerGroupHeartbeat(
@@ -6177,7 +6177,7 @@ class KafkaApisTest {
   def testConsumerGroupHeartbeatRequestAuthorizationFailed(): Unit = {
     val consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequestData().setGroupId("group")
 
-    val requestChannelRequest = buildRequest(new 
ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest).build())
+    val requestChannelRequest = buildRequest(new 
ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest, 
true).build())
 
     val authorizer: Authorizer = mock(classOf[Authorizer])
     when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala 
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 9bb6e3cee9a..c06bde91f91 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -302,7 +302,7 @@ class RequestQuotaTest extends BaseRequestTest {
           new OffsetCommitRequest.Builder(
             new OffsetCommitRequestData()
               .setGroupId("test-group")
-              .setGenerationId(1)
+              .setGenerationIdOrMemberEpoch(1)
               .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
               .setTopics(
                 Collections.singletonList(
@@ -646,7 +646,7 @@ class RequestQuotaTest extends BaseRequestTest {
           new AllocateProducerIdsRequest.Builder(new 
AllocateProducerIdsRequestData())
 
         case ApiKeys.CONSUMER_GROUP_HEARTBEAT =>
-          new ConsumerGroupHeartbeatRequest.Builder(new 
ConsumerGroupHeartbeatRequestData())
+          new ConsumerGroupHeartbeatRequest.Builder(new 
ConsumerGroupHeartbeatRequestData(), true)
 
         case _ =>
           throw new IllegalArgumentException("Unsupported API key " + apiKey)


Reply via email to