This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7054625c45d KAFKA-14499: [6/N] Add MemberId and MemberEpoch to
OffsetFetchRequest (#14321)
7054625c45d is described below
commit 7054625c45dc6edb3c07271fe4a6c24b4638424f
Author: David Jacot <[email protected]>
AuthorDate: Tue Sep 5 23:36:38 2023 -0700
KAFKA-14499: [6/N] Add MemberId and MemberEpoch to OffsetFetchRequest
(#14321)
This patch adds the MemberId and the MemberEpoch fields to the
OffsetFetchRequest. Those fields will be populated when the new consumer group
protocol is used to ensure that the member fetching the offset has the correct
member id and epoch. If it does not, UNKNOWN_MEMBER_ID or STALE_MEMBER_EPOCH
is returned to the client.
Our initial idea was to implement the same for the old protocol. The field
is called GenerationIdOrMemberEpoch in KIP-848 to materialize this. On second
thought, I think that we should only do it for the new protocol. The effort to
implement it in the old protocol is not worth it in my opinion.
Reviewers: Ritika Reddy <[email protected]>, Calvin Liu
<[email protected]>, Justine Olshan <[email protected]>
---
.../common/message/OffsetFetchRequest.json | 21 ++-
.../common/message/OffsetFetchResponse.json | 17 ++-
.../kafka/common/requests/RequestResponseTest.java | 44 +++---
.../unit/kafka/server/OffsetFetchRequestTest.scala | 1 +
.../org/apache/kafka/coordinator/group/Group.java | 10 +-
.../coordinator/group/OffsetMetadataManager.java | 18 ++-
.../coordinator/group/consumer/ConsumerGroup.java | 41 +++++-
.../coordinator/group/generic/GenericGroup.java | 10 +-
.../group/OffsetMetadataManagerTest.java | 154 ++++++++++++++++++++-
.../group/consumer/ConsumerGroupTest.java | 28 ++++
10 files changed, 299 insertions(+), 45 deletions(-)
diff --git a/clients/src/main/resources/common/message/OffsetFetchRequest.json
b/clients/src/main/resources/common/message/OffsetFetchRequest.json
index 8f3c4144ab0..b0f564e7764 100644
--- a/clients/src/main/resources/common/message/OffsetFetchRequest.json
+++ b/clients/src/main/resources/common/message/OffsetFetchRequest.json
@@ -32,8 +32,15 @@
//
// Version 7 is adding the require stable flag.
//
- // Version 8 is adding support for fetching offsets for multiple groups at a
time
- "validVersions": "0-8",
+ // Version 8 is adding support for fetching offsets for multiple groups at a
time.
+ //
+ // Version 9 is the first version that can be used with the new consumer
group protocol (KIP-848). It adds
+ // the MemberId and MemberEpoch fields. Those are filled in and validated
when the new consumer protocol is used.
+ //
+ // Version 9 is added as part of KIP-848 and is still under development.
Hence, the last version of the
+ // API is not exposed by default by brokers unless explicitly enabled.
+ "latestVersionUnstable": true,
+ "validVersions": "0-9",
"flexibleVersions": "6+",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0-7", "entityType":
"groupId",
@@ -47,8 +54,12 @@
]},
{ "name": "Groups", "type": "[]OffsetFetchRequestGroup", "versions": "8+",
"about": "Each group we would like to fetch offsets for", "fields": [
- { "name": "groupId", "type": "string", "versions": "8+", "entityType":
"groupId",
+ { "name": "GroupId", "type": "string", "versions": "8+", "entityType":
"groupId",
"about": "The group ID."},
+ { "name": "MemberId", "type": "string", "versions": "9+",
"nullableVersions": "9+", "default": null, "ignorable": true,
+ "about": "The member ID assigned by the group coordinator if using the
new consumer protocol (KIP-848)." },
+ { "name": "MemberEpoch", "type": "int32", "versions": "9+", "default":
"-1", "ignorable": true,
+ "about": "The member epoch if using the new consumer protocol
(KIP-848)." },
{ "name": "Topics", "type": "[]OffsetFetchRequestTopics", "versions":
"8+", "nullableVersions": "8+",
"about": "Each topic we would like to fetch offsets for, or null to
fetch offsets for all topics.", "fields": [
{ "name": "Name", "type": "string", "versions": "8+", "entityType":
"topicName",
@@ -57,7 +68,7 @@
"about": "The partition indexes we would like to fetch offsets for."
}
]}
]},
- {"name": "RequireStable", "type": "bool", "versions": "7+", "default":
"false",
- "about": "Whether broker should hold on returning unstable offsets but
set a retriable error code for the partitions."}
+ { "name": "RequireStable", "type": "bool", "versions": "7+", "default":
"false",
+ "about": "Whether broker should hold on returning unstable offsets but
set a retriable error code for the partitions." }
]
}
diff --git a/clients/src/main/resources/common/message/OffsetFetchResponse.json
b/clients/src/main/resources/common/message/OffsetFetchResponse.json
index 71acf0b4d2e..0b4cc10c3b4 100644
--- a/clients/src/main/resources/common/message/OffsetFetchResponse.json
+++ b/clients/src/main/resources/common/message/OffsetFetchResponse.json
@@ -32,8 +32,21 @@
// Version 7 adds pending offset commit as new error response on partition
level.
//
// Version 8 is adding support for fetching offsets for multiple groups
- "validVersions": "0-8",
+ //
+ // Version 9 is the first version that can be used with the new consumer
group protocol (KIP-848). The response is
+ // the same as version 8 but can return STALE_MEMBER_EPOCH and
UNKNOWN_MEMBER_ID errors when the new consumer group
+ // protocol is used.
+ "validVersions": "0-9",
"flexibleVersions": "6+",
+ // Supported errors:
+ // - GROUP_AUTHORIZATION_FAILED (version 0+)
+ // - NOT_COORDINATOR (version 0+)
+ // - COORDINATOR_NOT_AVAILABLE (version 0+)
+ // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
+ // - GROUP_ID_NOT_FOUND (version 0+)
+ // - UNSTABLE_OFFSET_COMMIT (version 7+)
+ // - UNKNOWN_MEMBER_ID (version 9+)
+ // - STALE_MEMBER_EPOCH (version 9+)
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "3+",
"ignorable": true,
"about": "The duration in milliseconds for which the request was
throttled due to a quota violation, or zero if the request did not violate any
quota." },
@@ -59,7 +72,7 @@
"about": "The top-level error code, or 0 if there was no error." },
{ "name": "Groups", "type": "[]OffsetFetchResponseGroup", "versions": "8+",
"about": "The responses per group id.", "fields": [
- { "name": "groupId", "type": "string", "versions": "8+", "entityType":
"groupId",
+ { "name": "GroupId", "type": "string", "versions": "8+", "entityType":
"groupId",
"about": "The group ID." },
{ "name": "Topics", "type": "[]OffsetFetchResponseTopics", "versions":
"8+",
"about": "The responses per topic.", "fields": [
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index be4cd2244b6..c8bd3563b53 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -242,6 +242,8 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
@@ -777,29 +779,27 @@ public class RequestResponseTest {
}
}
- @Test
- public void testOffsetFetchRequestBuilderToStringV8AndAbove() {
- List<Boolean> stableFlags = asList(true, false);
- for (Boolean requireStable : stableFlags) {
- String allTopicPartitionsString = new OffsetFetchRequest.Builder(
- Collections.singletonMap("someGroup", null),
- requireStable,
- false)
- .toString();
-
assertTrue(allTopicPartitionsString.contains("groups=[OffsetFetchRequestGroup"
- + "(groupId='someGroup', topics=null)], requireStable=" +
requireStable));
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testOffsetFetchRequestBuilderToStringV8AndAbove(boolean
requireStable) {
+ String allTopicPartitionsString = new OffsetFetchRequest.Builder(
+ Collections.singletonMap("someGroup", null),
+ requireStable,
+ false
+ ).toString();
+
assertTrue(allTopicPartitionsString.contains("groups=[OffsetFetchRequestGroup"
+ + "(groupId='someGroup', memberId='', memberEpoch=-1,
topics=null)], requireStable=" + requireStable));
- String subsetTopicPartitionsString = new
OffsetFetchRequest.Builder(
- Collections.singletonMap(
- "group1",
- singletonList(new TopicPartition("test11", 1))),
- requireStable,
- false)
- .toString();
- assertTrue(subsetTopicPartitionsString.contains("test11"));
- assertTrue(subsetTopicPartitionsString.contains("group1"));
- assertTrue(subsetTopicPartitionsString.contains("requireStable=" +
requireStable));
- }
+ String subsetTopicPartitionsString = new OffsetFetchRequest.Builder(
+ Collections.singletonMap(
+ "group1",
+ singletonList(new TopicPartition("test11", 1))),
+ requireStable,
+ false
+ ).toString();
+ assertTrue(subsetTopicPartitionsString.contains("test11"));
+ assertTrue(subsetTopicPartitionsString.contains("group1"));
+ assertTrue(subsetTopicPartitionsString.contains("requireStable=" +
requireStable));
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
index cf1c52841e5..cfe550e2671 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
@@ -70,6 +70,7 @@ class OffsetFetchRequestTest extends BaseRequestTest {
properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1")
properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1")
properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1")
+ properties.setProperty(KafkaConfig.UnstableApiVersionsEnableProp, "true")
}
@BeforeEach
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
index 0ffc741dcdd..44290ae7fe7 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java
@@ -69,6 +69,14 @@ public interface Group {
/**
* Validates the OffsetFetch request.
+ *
+ * @param memberId The member id for consumer groups.
+ * @param memberEpoch The member epoch for consumer groups.
+ * @param lastCommittedOffset The last committed offsets in the timeline.
*/
- void validateOffsetFetch() throws KafkaException;
+ void validateOffsetFetch(
+ String memberId,
+ int memberEpoch,
+ long lastCommittedOffset
+ ) throws KafkaException;
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
index e551a61e849..c8bf388d714 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
@@ -225,17 +225,21 @@ public class OffsetMetadataManager {
}
/**
- * Validates an OffsetCommit request.
+ * Validates an OffsetFetch request.
*
- * @param groupId The group id.
+ * @param request The actual request.
* @param lastCommittedOffset The last committed offsets in the timeline.
*/
private void validateOffsetFetch(
- String groupId,
+ OffsetFetchRequestData.OffsetFetchRequestGroup request,
long lastCommittedOffset
) throws GroupIdNotFoundException {
- Group group = groupMetadataManager.group(groupId, lastCommittedOffset);
- group.validateOffsetFetch();
+ Group group = groupMetadataManager.group(request.groupId(),
lastCommittedOffset);
+ group.validateOffsetFetch(
+ request.memberId(),
+ request.memberEpoch(),
+ lastCommittedOffset
+ );
}
/**
@@ -343,7 +347,7 @@ public class OffsetMetadataManager {
) throws ApiException {
boolean failAllPartitions = false;
try {
- validateOffsetFetch(request.groupId(), lastCommittedOffset);
+ validateOffsetFetch(request, lastCommittedOffset);
} catch (GroupIdNotFoundException ex) {
failAllPartitions = true;
}
@@ -398,7 +402,7 @@ public class OffsetMetadataManager {
long lastCommittedOffset
) throws ApiException {
try {
- validateOffsetFetch(request.groupId(), lastCommittedOffset);
+ validateOffsetFetch(request, lastCommittedOffset);
} catch (GroupIdNotFoundException ex) {
return new OffsetFetchResponseData.OffsetFetchResponseGroup()
.setGroupId(request.groupId())
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
index f6eaed02fa3..5328020baac 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
@@ -19,7 +19,6 @@ package org.apache.kafka.coordinator.group.consumer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.StaleMemberEpochException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
-import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.image.TopicImage;
@@ -538,17 +537,47 @@ public class ConsumerGroup implements Group {
if (memberEpoch < 0 && members().isEmpty()) return;
final ConsumerGroupMember member = getOrMaybeCreateMember(memberId,
false);
- if (memberEpoch != member.memberEpoch()) {
- throw Errors.STALE_MEMBER_EPOCH.exception();
- }
+ validateMemberEpoch(memberEpoch, member.memberEpoch());
}
/**
* Validates the OffsetFetch request.
+ *
+ * @param memberId The member id for consumer groups.
+ * @param memberEpoch The member epoch for consumer groups.
+ * @param lastCommittedOffset The last committed offsets in the timeline.
*/
@Override
- public void validateOffsetFetch() {
- // Nothing.
+ public void validateOffsetFetch(
+ String memberId,
+ int memberEpoch,
+ long lastCommittedOffset
+ ) throws UnknownMemberIdException, StaleMemberEpochException {
+ // When the member id is null and the member epoch is -1, the request
either comes
+ // from the admin client or from a client which does not provide them.
In this case,
+ // the fetch request is accepted.
+ if (memberId == null && memberEpoch < 0) return;
+
+ final ConsumerGroupMember member = members.get(memberId,
lastCommittedOffset);
+ if (member == null) {
+ throw new UnknownMemberIdException(String.format("Member %s is not
a member of group %s.",
+ memberId, groupId));
+ }
+ validateMemberEpoch(memberEpoch, member.memberEpoch());
+ }
+
+ /**
+ * Throws a StaleMemberEpochException if the received member epoch does
not match
+ * the expected member epoch.
+ */
+ private void validateMemberEpoch(
+ int receivedMemberEpoch,
+ int expectedMemberEpoch
+ ) throws StaleMemberEpochException {
+ if (receivedMemberEpoch != expectedMemberEpoch) {
+ throw new StaleMemberEpochException(String.format("The received
member epoch %d does not match "
+ + "the expected member epoch %d.", receivedMemberEpoch,
expectedMemberEpoch));
+ }
}
/**
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
index edad170806b..3347970aaf0 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
@@ -823,9 +823,17 @@ public class GenericGroup implements Group {
/**
* Validates the OffsetFetch request.
+ *
+ * @param memberId The member id. This is not provided for
generic groups.
+ * @param memberEpoch The member epoch for consumer groups. This
is not provided for generic groups.
+ * @param lastCommittedOffset The last committed offsets in the timeline.
*/
@Override
- public void validateOffsetFetch() throws GroupIdNotFoundException {
+ public void validateOffsetFetch(
+ String memberId,
+ int memberEpoch,
+ long lastCommittedOffset
+ ) throws GroupIdNotFoundException {
if (isInState(DEAD)) {
throw new GroupIdNotFoundException(String.format("Group %s is in
dead state.", groupId));
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index e60ec1bc4e9..7f355da76aa 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -181,10 +181,28 @@ public class OffsetMetadataManagerTest {
String groupId,
List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics,
long committedOffset
+ ) {
+ return fetchOffsets(
+ groupId,
+ null,
+ -1,
+ topics,
+ committedOffset
+ );
+ }
+
+ public List<OffsetFetchResponseData.OffsetFetchResponseTopics>
fetchOffsets(
+ String groupId,
+ String memberId,
+ int memberEpoch,
+ List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics,
+ long committedOffset
) {
OffsetFetchResponseData.OffsetFetchResponseGroup response =
offsetMetadataManager.fetchOffsets(
new OffsetFetchRequestData.OffsetFetchRequestGroup()
.setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(memberEpoch)
.setTopics(topics),
committedOffset
);
@@ -195,9 +213,26 @@ public class OffsetMetadataManagerTest {
public List<OffsetFetchResponseData.OffsetFetchResponseTopics>
fetchAllOffsets(
String groupId,
long committedOffset
+ ) {
+ return fetchAllOffsets(
+ groupId,
+ null,
+ -1,
+ committedOffset
+ );
+ }
+
+ public List<OffsetFetchResponseData.OffsetFetchResponseTopics>
fetchAllOffsets(
+ String groupId,
+ String memberId,
+ int memberEpoch,
+ long committedOffset
) {
OffsetFetchResponseData.OffsetFetchResponseGroup response =
offsetMetadataManager.fetchAllOffsets(
- new
OffsetFetchRequestData.OffsetFetchRequestGroup().setGroupId(groupId),
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(memberEpoch),
committedOffset
);
assertEquals(groupId, response.groupId());
@@ -1409,6 +1444,123 @@ public class OffsetMetadataManagerTest {
), context.fetchAllOffsets("group", Long.MAX_VALUE));
}
+ @Test
+ public void testConsumerGroupOffsetFetchWithMemberIdAndEpoch() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+ // Create consumer group.
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
+ // Create member.
+ group.getOrMaybeCreateMember("member", true);
+ // Commit offset.
+ context.commitOffset("group", "foo", 0, 100L, 1);
+
+ // Fetch offsets case.
+ List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics =
Collections.singletonList(
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("foo")
+ .setPartitionIndexes(Collections.singletonList(0))
+ );
+
+ assertEquals(Collections.singletonList(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("foo")
+ .setPartitions(Collections.singletonList(
+ mkOffsetPartitionResponse(0, 100L, 1, "metadata")
+ ))
+ ), context.fetchOffsets("group", "member", 0, topics, Long.MAX_VALUE));
+
+ // Fetch all offsets case.
+ assertEquals(Collections.singletonList(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("foo")
+ .setPartitions(Collections.singletonList(
+ mkOffsetPartitionResponse(0, 100L, 1, "metadata")
+ ))
+ ), context.fetchAllOffsets("group", "member", 0, Long.MAX_VALUE));
+ }
+
+ @Test
+ public void testConsumerGroupOffsetFetchFromAdminClient() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+ // Create consumer group.
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
+ // Create member.
+ group.getOrMaybeCreateMember("member", true);
+ // Commit offset.
+ context.commitOffset("group", "foo", 0, 100L, 1);
+
+ // Fetch offsets case.
+ List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics =
Collections.singletonList(
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("foo")
+ .setPartitionIndexes(Collections.singletonList(0))
+ );
+
+ assertEquals(Collections.singletonList(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("foo")
+ .setPartitions(Collections.singletonList(
+ mkOffsetPartitionResponse(0, 100L, 1, "metadata")
+ ))
+ ), context.fetchOffsets("group", topics, Long.MAX_VALUE));
+
+ // Fetch all offsets case.
+ assertEquals(Collections.singletonList(
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
+ .setName("foo")
+ .setPartitions(Collections.singletonList(
+ mkOffsetPartitionResponse(0, 100L, 1, "metadata")
+ ))
+ ), context.fetchAllOffsets("group", Long.MAX_VALUE));
+ }
+
+ @Test
+ public void testConsumerGroupOffsetFetchWithUnknownMemberId() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+ context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group",
true);
+
+ // Fetch offsets case.
+ List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics =
Collections.singletonList(
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("foo")
+ .setPartitionIndexes(Collections.singletonList(0))
+ );
+
+ // Fetch offsets cases.
+ assertThrows(UnknownMemberIdException.class,
+ () -> context.fetchOffsets("group", "", 0, topics,
Long.MAX_VALUE));
+ assertThrows(UnknownMemberIdException.class,
+ () -> context.fetchOffsets("group", "member", 0, topics,
Long.MAX_VALUE));
+
+ // Fetch all offsets cases.
+ assertThrows(UnknownMemberIdException.class,
+ () -> context.fetchAllOffsets("group", "", 0, Long.MAX_VALUE));
+ assertThrows(UnknownMemberIdException.class,
+ () -> context.fetchAllOffsets("group", "member", 0,
Long.MAX_VALUE));
+ }
+
+ @Test
+ public void testConsumerGroupOffsetFetchWithStaleMemberEpoch() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder().build();
+ ConsumerGroup group =
context.groupMetadataManager.getOrMaybeCreateConsumerGroup("group", true);
+ group.getOrMaybeCreateMember("member", true);
+
+ // Fetch offsets case.
+ List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics =
Collections.singletonList(
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
+ .setName("foo")
+ .setPartitionIndexes(Collections.singletonList(0))
+ );
+
+ // Fetch offsets case.
+ assertThrows(StaleMemberEpochException.class,
+ () -> context.fetchOffsets("group", "member", 10, topics,
Long.MAX_VALUE));
+
+ // Fetch all offsets case.
+ assertThrows(StaleMemberEpochException.class,
+ () -> context.fetchAllOffsets("group", "member", 10,
Long.MAX_VALUE));
+ }
+
static private OffsetFetchResponseData.OffsetFetchResponsePartitions
mkOffsetPartitionResponse(
int partition,
long offset,
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
index a715d4187bb..aa848aa3f5d 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
@@ -631,4 +631,32 @@ public class ConsumerGroupTest {
// This should succeed.
group.validateOffsetCommit("member-id", "", 0);
}
+
+ @Test
+ public void testValidateOffsetFetch() {
+ SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new
LogContext());
+ ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo");
+
+ // Simulate a call from the admin client without member id and member
epoch.
+ group.validateOffsetFetch(null, -1, Long.MAX_VALUE);
+
+ // The member does not exist.
+ assertThrows(UnknownMemberIdException.class, () ->
+ group.validateOffsetFetch("member-id", 0, Long.MAX_VALUE));
+
+ // Create a member.
+ snapshotRegistry.getOrCreateSnapshot(0);
+ group.getOrMaybeCreateMember("member-id", true);
+
+ // The member does not exist at last committed offset 0.
+ assertThrows(UnknownMemberIdException.class, () ->
+ group.validateOffsetFetch("member-id", 0, 0));
+
+ // The member exists but the epoch is stale when the last committed
offset is not considered.
+ assertThrows(StaleMemberEpochException.class, () ->
+ group.validateOffsetFetch("member-id", 10, Long.MAX_VALUE));
+
+ // This should succeed.
+ group.validateOffsetFetch("member-id", 0, Long.MAX_VALUE);
+ }
}