This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 69659b70fca KAFKA-14499: [1/N] Introduce OffsetCommit API version 9
and add new StaleMemberEpochException error (#14046)
69659b70fca is described below
commit 69659b70fca10349d8c799d29e9115bb5ced64db
Author: David Jacot <[email protected]>
AuthorDate: Fri Jul 21 20:08:06 2023 +0200
KAFKA-14499: [1/N] Introduce OffsetCommit API version 9 and add new
StaleMemberEpochException error (#14046)
This patch does a few things:
1) It introduces version 9 of the OffsetCommit API. This new version has no
schema changes but it can return a StaleMemberEpochException if the new
consumer group protocol is used. Note the use of `"latestVersionUnstable":
true` in the request schema. This means that this new version is not available
yet unless activated.
2) It renames the `generationId` field in the request to
`GenerationIdOrMemberEpoch`. This is a backward-compatible change.
3) It introduces the new StaleMemberEpochException error.
4) It does a minor refactoring in OffsetCommitRequest class.
Reviewers: Jeff Kim <[email protected]>, David Arthur
<[email protected]>, Justine Olshan <[email protected]>
---
.../consumer/internals/CommitRequestManager.java | 2 +-
.../consumer/internals/ConsumerCoordinator.java | 2 +-
.../common/errors/StaleMemberEpochException.java | 32 ++++++++++++++
.../org/apache/kafka/common/protocol/Errors.java | 4 +-
.../kafka/common/requests/AbstractRequest.java | 9 +++-
.../requests/ConsumerGroupHeartbeatRequest.java | 6 ++-
.../kafka/common/requests/OffsetCommitRequest.java | 49 +++++++++++-----------
.../common/message/OffsetCommitRequest.json | 12 ++++--
.../common/message/OffsetCommitResponse.json | 15 ++++++-
.../internals/ConsumerCoordinatorTest.java | 2 +-
.../apache/kafka/common/message/MessageTest.java | 4 +-
.../common/requests/OffsetCommitRequestTest.java | 42 ++++++++++---------
.../kafka/common/requests/RequestResponseTest.java | 2 +-
.../group/GroupCoordinatorAdapter.scala | 2 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 2 +-
.../kafka/api/AuthorizerIntegrationTest.scala | 3 +-
.../group/GroupCoordinatorAdapterTest.scala | 4 +-
.../server/ConsumerGroupHeartbeatRequestTest.scala | 15 ++++---
.../scala/unit/kafka/server/KafkaApisTest.scala | 10 ++---
.../scala/unit/kafka/server/RequestQuotaTest.scala | 4 +-
20 files changed, 146 insertions(+), 75 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index 962b1e6cba9..82959d26ce3 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -215,7 +215,7 @@ public class CommitRequestManager implements RequestManager
{
OffsetCommitRequest.Builder builder = new
OffsetCommitRequest.Builder(
new OffsetCommitRequestData()
.setGroupId(this.groupId)
- .setGenerationId(generation.generationId)
+
.setGenerationIdOrMemberEpoch(generation.generationId)
.setMemberId(generation.memberId)
.setGroupInstanceId(groupInstanceId)
.setTopics(new
ArrayList<>(requestTopicDataMap.values())));
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index bcd4b377881..82125be1705 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -1361,7 +1361,7 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
new OffsetCommitRequestData()
.setGroupId(this.rebalanceConfig.groupId)
- .setGenerationId(generation.generationId)
+ .setGenerationIdOrMemberEpoch(generation.generationId)
.setMemberId(generation.memberId)
.setGroupInstanceId(groupInstanceId)
.setTopics(new
ArrayList<>(requestTopicDataMap.values()))
diff --git
a/clients/src/main/java/org/apache/kafka/common/errors/StaleMemberEpochException.java
b/clients/src/main/java/org/apache/kafka/common/errors/StaleMemberEpochException.java
new file mode 100644
index 00000000000..7b373c5b5c1
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/common/errors/StaleMemberEpochException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * The StaleMemberEpochException is used in the context of the new
+ * consumer group protocol (KIP-848). This error is returned in the
+ * OffsetCommit/Fetch APIs when the member epoch received does not
+ * match the current member epoch.
+ */
+@InterfaceStability.Evolving
+public class StaleMemberEpochException extends ApiException {
+ public StaleMemberEpochException(String message) {
+ super(message);
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index b5ea650b166..1ccdcd0627c 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -108,6 +108,7 @@ import
org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.SnapshotNotFoundException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
+import org.apache.kafka.common.errors.StaleMemberEpochException;
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
@@ -378,7 +379,8 @@ public enum Errors {
OFFSET_MOVED_TO_TIERED_STORAGE(109, "The requested offset is moved to
tiered storage.", OffsetMovedToTieredStorageException::new),
FENCED_MEMBER_EPOCH(110, "The member epoch is fenced by the group
coordinator. The member must abandon all its partitions and rejoin.",
FencedMemberEpochException::new),
UNRELEASED_INSTANCE_ID(111, "The instance ID is still used by another
member in the consumer group. That member must leave first.",
UnreleasedInstanceIdException::new),
- UNSUPPORTED_ASSIGNOR(112, "The assignor or its version range is not
supported by the consumer group.", UnsupportedAssignorException::new);
+ UNSUPPORTED_ASSIGNOR(112, "The assignor or its version range is not
supported by the consumer group.", UnsupportedAssignorException::new),
+ STALE_MEMBER_EPOCH(113, "The member epoch is stale. The member must retry
after receiving its updated member epoch via the ConsumerGroupHeartbeat API.",
StaleMemberEpochException::new);
private static final Logger log = LoggerFactory.getLogger(Errors.class);
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 1988347a974..64f1c2e4a2f 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -37,8 +37,15 @@ public abstract class AbstractRequest implements
AbstractRequestResponse {
/**
* Construct a new builder which allows any supported version
*/
+ public Builder(ApiKeys apiKey, boolean enableUnstableLastVersion) {
+ this(apiKey, apiKey.oldestVersion(),
apiKey.latestVersion(enableUnstableLastVersion));
+ }
+
+ /**
+ * Construct a new builder which allows any supported and released
version
+ */
public Builder(ApiKeys apiKey) {
- this(apiKey, apiKey.oldestVersion(), apiKey.latestVersion());
+ this(apiKey, false);
}
/**
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java
index 215e18ea2de..6e42111670a 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java
@@ -30,7 +30,11 @@ public class ConsumerGroupHeartbeatRequest extends
AbstractRequest {
private final ConsumerGroupHeartbeatRequestData data;
public Builder(ConsumerGroupHeartbeatRequestData data) {
- super(ApiKeys.CONSUMER_GROUP_HEARTBEAT);
+ this(data, false);
+ }
+
+ public Builder(ConsumerGroupHeartbeatRequestData data, boolean
enableUnstableLastVersion) {
+ super(ApiKeys.CONSUMER_GROUP_HEARTBEAT, enableUnstableLastVersion);
this.data = data;
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index 8fc884e6614..0e33ceb7085 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -28,10 +28,9 @@ import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
+import java.util.Optional;
public class OffsetCommitRequest extends AbstractRequest {
// default values for the current version
@@ -89,33 +88,29 @@ public class OffsetCommitRequest extends AbstractRequest {
return offsets;
}
- public static List<OffsetCommitResponseTopic> getErrorResponseTopics(
- List<OffsetCommitRequestTopic> requestTopics,
- Errors e) {
- List<OffsetCommitResponseTopic> responseTopicData = new ArrayList<>();
- for (OffsetCommitRequestTopic entry : requestTopics) {
- List<OffsetCommitResponsePartition> responsePartitions =
- new ArrayList<>();
- for (OffsetCommitRequestData.OffsetCommitRequestPartition
requestPartition : entry.partitions()) {
- responsePartitions.add(new OffsetCommitResponsePartition()
-
.setPartitionIndex(requestPartition.partitionIndex())
- .setErrorCode(e.code()));
- }
- responseTopicData.add(new OffsetCommitResponseTopic()
- .setName(entry.name())
- .setPartitions(responsePartitions)
- );
- }
- return responseTopicData;
+ public static OffsetCommitResponseData getErrorResponse(
+ OffsetCommitRequestData request,
+ Errors error
+ ) {
+ OffsetCommitResponseData response = new OffsetCommitResponseData();
+ request.topics().forEach(topic -> {
+ OffsetCommitResponseTopic responseTopic = new
OffsetCommitResponseTopic()
+ .setName(topic.name());
+ response.topics().add(responseTopic);
+
+ topic.partitions().forEach(partition -> {
+ responseTopic.partitions().add(new
OffsetCommitResponsePartition()
+ .setPartitionIndex(partition.partitionIndex())
+ .setErrorCode(error.code()));
+ });
+ });
+ return response;
}
@Override
public OffsetCommitResponse getErrorResponse(int throttleTimeMs, Throwable
e) {
- List<OffsetCommitResponseTopic>
- responseTopicData = getErrorResponseTopics(data.topics(),
Errors.forException(e));
- return new OffsetCommitResponse(new OffsetCommitResponseData()
- .setTopics(responseTopicData)
- .setThrottleTimeMs(throttleTimeMs));
+ return new OffsetCommitResponse(getErrorResponse(data,
Errors.forException(e))
+ .setThrottleTimeMs(throttleTimeMs));
}
@Override
@@ -126,4 +121,8 @@ public class OffsetCommitRequest extends AbstractRequest {
public static OffsetCommitRequest parse(ByteBuffer buffer, short version) {
return new OffsetCommitRequest(new OffsetCommitRequestData(new
ByteBufferAccessor(buffer), version), version);
}
+
+ public static Optional<String> groupInstanceId(OffsetCommitRequestData
request) {
+ return Optional.ofNullable(request.groupInstanceId());
+ }
}
diff --git a/clients/src/main/resources/common/message/OffsetCommitRequest.json
b/clients/src/main/resources/common/message/OffsetCommitRequest.json
index cf112e1ed72..c11b56c9544 100644
--- a/clients/src/main/resources/common/message/OffsetCommitRequest.json
+++ b/clients/src/main/resources/common/message/OffsetCommitRequest.json
@@ -31,13 +31,19 @@
// version 7 adds a new field called groupInstanceId to indicate member
identity across restarts.
//
// Version 8 is the first flexible version.
- "validVersions": "0-8",
+ //
+ // Version 9 is the first version that can be used with the new consumer
group protocol (KIP-848). The
+ // request is the same as version 8.
+ // Version 9 is added as part of KIP-848 and is still under development.
Hence, the last version of the
+ // API is not exposed by default by brokers unless explicitly enabled.
+ "latestVersionUnstable": true,
+ "validVersions": "0-9",
"flexibleVersions": "8+",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType":
"groupId",
"about": "The unique group identifier." },
- { "name": "GenerationId", "type": "int32", "versions": "1+", "default":
"-1", "ignorable": true,
- "about": "The generation of the group." },
+ { "name": "GenerationIdOrMemberEpoch", "type": "int32", "versions": "1+",
"default": "-1", "ignorable": true,
+ "about": "The generation of the group if using the generic group
protocol or the member epoch if using the consumer protocol." },
{ "name": "MemberId", "type": "string", "versions": "1+", "ignorable":
true,
"about": "The member ID assigned by the group coordinator." },
{ "name": "GroupInstanceId", "type": "string", "versions": "7+",
diff --git
a/clients/src/main/resources/common/message/OffsetCommitResponse.json
b/clients/src/main/resources/common/message/OffsetCommitResponse.json
index 3d547794cfb..797c7bfe515 100644
--- a/clients/src/main/resources/common/message/OffsetCommitResponse.json
+++ b/clients/src/main/resources/common/message/OffsetCommitResponse.json
@@ -28,8 +28,21 @@
// Version 7 offsetCommitRequest supports a new field called groupInstanceId
to indicate member identity across restarts.
//
// Version 8 is the first flexible version.
- "validVersions": "0-8",
+ //
+ // Version 9 is the first version that can be used with the new consumer
group protocol (KIP-848). The response is
+ // the same as version 8 but can return STALE_MEMBER_EPOCH when the new
consumer group protocol is used.
+ "validVersions": "0-9",
"flexibleVersions": "8+",
+ // Supported errors:
+ // - GROUP_AUTHORIZATION_FAILED (version 0+)
+ // - NOT_COORDINATOR (version 0+)
+ // - COORDINATOR_NOT_AVAILABLE (version 0+)
+ // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
+ // - ILLEGAL_GENERATION (version 1+)
+ // - UNKNOWN_MEMBER_ID (version 1+)
+ // - INVALID_COMMIT_OFFSET_SIZE (version 0+)
+ // - FENCED_MEMBER_EPOCH (version 7+)
+ // - STALE_MEMBER_EPOCH (version 9+)
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "3+",
"ignorable": true,
"about": "The duration in milliseconds for which the request was
throttled due to a quota violation, or zero if the request did not violate any
quota." },
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 03493a12290..fdf6cf1d73b 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -2575,7 +2575,7 @@ public abstract class ConsumerCoordinatorTest {
client.prepareResponse(body -> {
OffsetCommitRequest commitRequest = (OffsetCommitRequest) body;
return
commitRequest.data().memberId().equals(OffsetCommitRequest.DEFAULT_MEMBER_ID) &&
- commitRequest.data().generationId() ==
OffsetCommitRequest.DEFAULT_GENERATION_ID;
+ commitRequest.data().generationIdOrMemberEpoch() ==
OffsetCommitRequest.DEFAULT_GENERATION_ID;
}, offsetCommitResponse(singletonMap(t1p, Errors.NONE)));
AtomicBoolean success = new AtomicBoolean(false);
diff --git
a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
index 5762e84f60b..478bfa0668d 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
@@ -276,7 +276,7 @@ public final class MessageTest {
.setGroupId("groupId")
.setMemberId(memberId)
.setTopics(new ArrayList<>())
- .setGenerationId(15);
+ .setGenerationIdOrMemberEpoch(15);
testAllMessageRoundTripsFromVersion((short) 1, request.get());
testAllMessageRoundTripsFromVersion((short) 1,
request.get().setGroupInstanceId(null));
testAllMessageRoundTripsFromVersion((short) 7,
request.get().setGroupInstanceId(instanceId));
@@ -482,7 +482,7 @@ public final class MessageTest {
OffsetCommitRequestData requestData = request.get();
if (version < 1) {
requestData.setMemberId("");
- requestData.setGenerationId(-1);
+ requestData.setGenerationIdOrMemberEpoch(-1);
}
if (version != 1) {
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java
index 08ae7a3fbd5..7da2271d97c 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java
@@ -21,6 +21,7 @@ import
org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import
org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition;
import
org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic;
+import org.apache.kafka.common.message.OffsetCommitResponseData;
import
org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
import
org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
import org.apache.kafka.common.protocol.ApiKeys;
@@ -34,7 +35,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static
org.apache.kafka.common.requests.OffsetCommitRequest.getErrorResponseTopics;
+import static
org.apache.kafka.common.requests.OffsetCommitRequest.getErrorResponse;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -102,25 +103,6 @@ public class OffsetCommitRequestTest {
}
}
- @Test
- public void testGetErrorResponseTopics() {
- List<OffsetCommitResponseTopic> expectedTopics = Arrays.asList(
- new OffsetCommitResponseTopic()
- .setName(topicOne)
- .setPartitions(Collections.singletonList(
- new OffsetCommitResponsePartition()
- .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
- .setPartitionIndex(partitionOne))),
- new OffsetCommitResponseTopic()
- .setName(topicTwo)
- .setPartitions(Collections.singletonList(
- new OffsetCommitResponsePartition()
- .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
- .setPartitionIndex(partitionTwo)))
- );
- assertEquals(expectedTopics, getErrorResponseTopics(topics,
Errors.UNKNOWN_MEMBER_ID));
- }
-
@Test
public void testVersionSupportForGroupInstanceId() {
OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
@@ -139,4 +121,24 @@ public class OffsetCommitRequestTest {
}
}
}
+
+ @Test
+ public void testGetErrorResponse() {
+ OffsetCommitResponseData expectedResponse = new
OffsetCommitResponseData()
+ .setTopics(Arrays.asList(
+ new OffsetCommitResponseTopic()
+ .setName(topicOne)
+ .setPartitions(Collections.singletonList(
+ new OffsetCommitResponsePartition()
+ .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
+ .setPartitionIndex(partitionOne))),
+ new OffsetCommitResponseTopic()
+ .setName(topicTwo)
+ .setPartitions(Collections.singletonList(
+ new OffsetCommitResponsePartition()
+ .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
+ .setPartitionIndex(partitionTwo)))));
+
+ assertEquals(expectedResponse, getErrorResponse(data,
Errors.UNKNOWN_MEMBER_ID));
+ }
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 64a7300935b..be4cd2244b6 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -2038,7 +2038,7 @@ public class RequestResponseTest {
.setGroupId("group1")
.setMemberId("consumer1")
.setGroupInstanceId(null)
- .setGenerationId(100)
+ .setGenerationIdOrMemberEpoch(100)
.setTopics(singletonList(
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setName("test")
diff --git
a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
index 87dba8b1019..86ef919c93e 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -407,7 +407,7 @@ private[group] class GroupCoordinatorAdapter(
request.groupId,
request.memberId,
Option(request.groupInstanceId),
- request.generationId,
+ request.generationIdOrMemberEpoch,
partitions.toMap,
callback,
RequestLocal(bufferSupplier)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6a4971c44d3..5d6233d1100 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -534,7 +534,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val offsetCommitRequestData = new OffsetCommitRequestData()
.setGroupId(offsetCommitRequest.data.groupId)
.setMemberId(offsetCommitRequest.data.memberId)
- .setGenerationId(offsetCommitRequest.data.generationId)
+
.setGenerationIdOrMemberEpoch(offsetCommitRequest.data.generationIdOrMemberEpoch)
.setRetentionTimeMs(offsetCommitRequest.data.retentionTimeMs)
.setGroupInstanceId(offsetCommitRequest.data.groupInstanceId)
.setTopics(authorizedTopicsRequest.asJava)
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 084d3f7be93..c7e7fb0ac0c 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -173,6 +173,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1")
properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1")
properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1")
+ properties.put(KafkaConfig.UnstableApiVersionsEnableProp, "true")
properties.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG,
classOf[PrincipalBuilder].getName)
}
@@ -493,7 +494,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
new OffsetCommitRequestData()
.setGroupId(group)
.setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
- .setGenerationId(1)
+ .setGenerationIdOrMemberEpoch(1)
.setTopics(Collections.singletonList(
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setName(topic)
diff --git
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
index 0a7b6f2ba24..7b7e10b1c94 100644
---
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
@@ -644,7 +644,7 @@ class GroupCoordinatorAdapterTest {
val data = new OffsetCommitRequestData()
.setGroupId("group")
.setMemberId("member")
- .setGenerationId(10)
+ .setGenerationIdOrMemberEpoch(10)
.setRetentionTimeMs(1000)
.setTopics(List(
new OffsetCommitRequestData.OffsetCommitRequestTopic()
@@ -669,7 +669,7 @@ class GroupCoordinatorAdapterTest {
ArgumentMatchers.eq(data.groupId),
ArgumentMatchers.eq(data.memberId),
ArgumentMatchers.eq(None),
- ArgumentMatchers.eq(data.generationId),
+ ArgumentMatchers.eq(data.generationIdOrMemberEpoch),
ArgumentMatchers.eq(Map(
new TopicIdPartition(Uuid.ZERO_UUID, 0 , "foo") -> new
OffsetAndMetadata(
offset = 100,
diff --git
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index 7982fa00885..89cddfe97e7 100644
---
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -42,7 +42,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
@ClusterTest
def testConsumerGroupHeartbeatIsDisabledByDefault(): Unit = {
val consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
- new ConsumerGroupHeartbeatRequestData()
+ new ConsumerGroupHeartbeatRequestData(),
+ true
).build()
assertThrows(classOf[EOFException], () =>
connectAndReceive(consumerGroupHeartbeatRequest))
}
@@ -50,7 +51,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
@ClusterTest(serverProperties = Array(new ClusterConfigProperty(key =
"unstable.api.versions.enable", value = "true")))
def testConsumerGroupHeartbeatIsAccessibleWhenEnabled(): Unit = {
val consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequest.Builder(
- new ConsumerGroupHeartbeatRequestData()
+ new ConsumerGroupHeartbeatRequestData(),
+ true
).build()
val consumerGroupHeartbeatResponse =
connectAndReceive(consumerGroupHeartbeatRequest)
@@ -83,7 +85,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
.setMemberEpoch(0)
.setRebalanceTimeoutMs(5 * 60 * 1000)
.setSubscribedTopicNames(List("foo").asJava)
- .setTopicPartitions(List.empty.asJava)
+ .setTopicPartitions(List.empty.asJava),
+ true
).build()
// Send the request until receiving a successful response. There is a delay
@@ -111,7 +114,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(consumerGroupHeartbeatResponse.data.memberId)
- .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch)
+ .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch),
+ true
).build()
// This is the expected assignment.
@@ -137,7 +141,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(consumerGroupHeartbeatResponse.data.memberId)
- .setMemberEpoch(-1)
+ .setMemberEpoch(-1),
+ true
).build()
consumerGroupHeartbeatResponse =
connectAndReceive(consumerGroupHeartbeatRequest)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 6f135af363a..9c6c2c73bff 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -4080,7 +4080,7 @@ class KafkaApisTest {
.setGroupId("test")
.setMemberId("test")
.setGroupInstanceId("instanceId")
- .setGenerationId(100)
+ .setGenerationIdOrMemberEpoch(100)
.setTopics(Collections.singletonList(
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setName("test")
@@ -6118,7 +6118,7 @@ class KafkaApisTest {
def testConsumerGroupHeartbeatReturnsUnsupportedVersion(): Unit = {
val consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequestData().setGroupId("group")
- val requestChannelRequest = buildRequest(new
ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest).build())
+ val requestChannelRequest = buildRequest(new
ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest,
true).build())
createKafkaApis().handle(requestChannelRequest, RequestLocal.NoCaching)
@@ -6132,7 +6132,7 @@ class KafkaApisTest {
def testConsumerGroupHeartbeatRequest(): Unit = {
val consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequestData().setGroupId("group")
- val requestChannelRequest = buildRequest(new
ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest).build())
+ val requestChannelRequest = buildRequest(new
ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest,
true).build())
val future = new CompletableFuture[ConsumerGroupHeartbeatResponseData]()
when(groupCoordinator.consumerGroupHeartbeat(
@@ -6156,7 +6156,7 @@ class KafkaApisTest {
def testConsumerGroupHeartbeatRequestFutureFailed(): Unit = {
val consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequestData().setGroupId("group")
- val requestChannelRequest = buildRequest(new
ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest).build())
+ val requestChannelRequest = buildRequest(new
ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest,
true).build())
val future = new CompletableFuture[ConsumerGroupHeartbeatResponseData]()
when(groupCoordinator.consumerGroupHeartbeat(
@@ -6177,7 +6177,7 @@ class KafkaApisTest {
def testConsumerGroupHeartbeatRequestAuthorizationFailed(): Unit = {
val consumerGroupHeartbeatRequest = new
ConsumerGroupHeartbeatRequestData().setGroupId("group")
- val requestChannelRequest = buildRequest(new
ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest).build())
+ val requestChannelRequest = buildRequest(new
ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest,
true).build())
val authorizer: Authorizer = mock(classOf[Authorizer])
when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 9bb6e3cee9a..c06bde91f91 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -302,7 +302,7 @@ class RequestQuotaTest extends BaseRequestTest {
new OffsetCommitRequest.Builder(
new OffsetCommitRequestData()
.setGroupId("test-group")
- .setGenerationId(1)
+ .setGenerationIdOrMemberEpoch(1)
.setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
.setTopics(
Collections.singletonList(
@@ -646,7 +646,7 @@ class RequestQuotaTest extends BaseRequestTest {
new AllocateProducerIdsRequest.Builder(new
AllocateProducerIdsRequestData())
case ApiKeys.CONSUMER_GROUP_HEARTBEAT =>
- new ConsumerGroupHeartbeatRequest.Builder(new
ConsumerGroupHeartbeatRequestData())
+ new ConsumerGroupHeartbeatRequest.Builder(new
ConsumerGroupHeartbeatRequestData(), true)
case _ =>
throw new IllegalArgumentException("Unsupported API key " + apiKey)