This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d4d9f118165 KAFKA-18761: [2/N] List share group offsets with state and
auth (#19328)
d4d9f118165 is described below
commit d4d9f118165ea22494cd6128050ed733e33d5871
Author: Andrew Schofield <[email protected]>
AuthorDate: Fri Apr 4 13:25:19 2025 +0100
KAFKA-18761: [2/N] List share group offsets with state and auth (#19328)
This PR brings Admin.listShareGroupOffsets() and
kafka-share-groups.sh --describe --offsets close to completion.
Prior to this patch, kafka-share-groups.sh was only able to describe the
offsets for partitions which were assigned to active members. Now,
Admin.listShareGroupOffsets() uses the persister's knowledge of the
share-partitions which have initialised state, and then uses that list of
share-partitions to obtain a complete set of offset information.
The PR also implements topic-based authorisation checking. If
Admin.listShareGroupOffsets() is called with a list of topic-partitions
specified, the authorisation check is performed on the supplied list,
returning errors for any topics to which the client is not authorised.
If Admin.listShareGroupOffsets() is called without a list of
topic-partitions specified, the list of topics is discovered from the
persister as described above, and then the response is filtered down to
only show the topics to which the client is authorised. This is
consistent with other similar RPCs in the Kafka protocol, such as
OffsetFetch.
Reviewers: David Arthur <[email protected]>, Sushant Mahajan
<[email protected]>, Apoorv Mittal <[email protected]>
---
.../clients/admin/ListShareGroupOffsetsSpec.java | 4 +-
.../internals/ListShareGroupOffsetsHandler.java | 29 +-
.../message/DescribeShareGroupOffsetsRequest.json | 4 +-
.../kafka/clients/admin/KafkaAdminClientTest.java | 4 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 69 ++-
.../scala/unit/kafka/server/KafkaApisTest.scala | 477 ++++++++++++++++++++-
.../kafka/coordinator/group/GroupCoordinator.java | 14 +
.../coordinator/group/GroupCoordinatorService.java | 78 +++-
.../coordinator/group/GroupCoordinatorShard.java | 15 +-
.../coordinator/group/GroupMetadataManager.java | 31 +-
.../group/GroupCoordinatorServiceTest.java | 207 ++++++++-
.../group/GroupMetadataManagerTest.java | 6 +
.../tools/consumer/group/ShareGroupCommand.java | 54 +--
13 files changed, 915 insertions(+), 77 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsSpec.java
b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsSpec.java
index 18ba3227346..d8144a0ce43 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsSpec.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsSpec.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collection;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -47,9 +46,10 @@ public class ListShareGroupOffsetsSpec {
/**
* Returns the topic partitions whose offsets are to be listed for a share
group.
+ * {@code null} indicates that offsets of all partitions of the group are
to be listed.
*/
public Collection<TopicPartition> topicPartitions() {
- return topicPartitions == null ? List.of() : topicPartitions;
+ return topicPartitions;
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java
index be46b341333..3f523b93833 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java
@@ -37,7 +37,6 @@ import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -84,19 +83,23 @@ public class ListShareGroupOffsetsHandler extends
AdminApiHandler.Batched<Coordi
DescribeShareGroupOffsetsRequestGroup requestGroup = new
DescribeShareGroupOffsetsRequestGroup()
.setGroupId(groupId);
- Map<String, List<Integer>> topicPartitionMap = new HashMap<>();
- spec.topicPartitions().forEach(tp ->
topicPartitionMap.computeIfAbsent(tp.topic(), t -> new
LinkedList<>()).add(tp.partition()));
-
- Map<String, DescribeShareGroupOffsetsRequestTopic> requestTopics =
new HashMap<>();
- for (TopicPartition tp : spec.topicPartitions()) {
- requestTopics.computeIfAbsent(tp.topic(), t ->
- new DescribeShareGroupOffsetsRequestTopic()
- .setTopicName(tp.topic())
- .setPartitions(new LinkedList<>()))
- .partitions()
- .add(tp.partition());
+ if (spec.topicPartitions() != null) {
+ Map<String, List<Integer>> topicPartitionMap = new HashMap<>();
+ spec.topicPartitions().forEach(tp ->
topicPartitionMap.computeIfAbsent(tp.topic(), t -> new
ArrayList<>()).add(tp.partition()));
+
+ Map<String, DescribeShareGroupOffsetsRequestTopic>
requestTopics = new HashMap<>();
+ for (TopicPartition tp : spec.topicPartitions()) {
+ requestTopics.computeIfAbsent(tp.topic(), t ->
+ new DescribeShareGroupOffsetsRequestTopic()
+ .setTopicName(tp.topic())
+ .setPartitions(new ArrayList<>()))
+ .partitions()
+ .add(tp.partition());
+ }
+ requestGroup.setTopics(new
ArrayList<>(requestTopics.values()));
+ } else {
+ requestGroup.setTopics(null);
}
- requestGroup.setTopics(new ArrayList<>(requestTopics.values()));
groups.add(requestGroup);
});
DescribeShareGroupOffsetsRequestData data = new
DescribeShareGroupOffsetsRequestData()
diff --git
a/clients/src/main/resources/common/message/DescribeShareGroupOffsetsRequest.json
b/clients/src/main/resources/common/message/DescribeShareGroupOffsetsRequest.json
index a9b7a0200e6..e9093296bb6 100644
---
a/clients/src/main/resources/common/message/DescribeShareGroupOffsetsRequest.json
+++
b/clients/src/main/resources/common/message/DescribeShareGroupOffsetsRequest.json
@@ -26,8 +26,8 @@
"about": "The groups to describe offsets for.", "fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType":
"groupId",
"about": "The group identifier." },
- { "name": "Topics", "type": "[]DescribeShareGroupOffsetsRequestTopic",
"versions": "0+",
- "about": "The topics to describe offsets for.", "fields": [
+ { "name": "Topics", "type": "[]DescribeShareGroupOffsetsRequestTopic",
"versions": "0+", "nullableVersions": "0+",
+ "about": "The topics to describe offsets for, or null for all
topic-partitions.", "fields": [
{ "name": "TopicName", "type": "string", "versions": "0+",
"entityType": "topicName",
"about": "The topic name." },
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index ac309e1c4d9..a478092cfc6 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -10559,9 +10559,7 @@ public class KafkaAdminClientTest {
TopicPartition myTopicPartition4 = new
TopicPartition("my_topic_1", 4);
TopicPartition myTopicPartition5 = new
TopicPartition("my_topic_2", 6);
- ListShareGroupOffsetsSpec groupSpec = new
ListShareGroupOffsetsSpec().topicPartitions(
- List.of(myTopicPartition0, myTopicPartition1,
myTopicPartition2, myTopicPartition3, myTopicPartition4, myTopicPartition5)
- );
+ ListShareGroupOffsetsSpec groupSpec = new
ListShareGroupOffsetsSpec();
Map<String, ListShareGroupOffsetsSpec> groupSpecs = new
HashMap<>();
groupSpecs.put(GROUP_ID, groupSpec);
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 9f416aa0e88..966607f11e1 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -3508,6 +3508,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val futures = new
mutable.ArrayBuffer[CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]](groups.size)
groups.forEach { groupDescribeOffsets =>
+ val isAllPartitions = groupDescribeOffsets.topics == null
if (!isShareGroupProtocolEnabled) {
futures += CompletableFuture.completedFuture(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
.setGroupId(groupDescribeOffsets.groupId)
@@ -3516,6 +3517,11 @@ class KafkaApis(val requestChannel: RequestChannel,
futures += CompletableFuture.completedFuture(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
.setGroupId(groupDescribeOffsets.groupId)
.setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code))
+ } else if (isAllPartitions) {
+ futures += describeShareGroupAllOffsetsForGroup(
+ request.context,
+ groupDescribeOffsets
+ )
} else if (groupDescribeOffsets.topics.isEmpty) {
futures += CompletableFuture.completedFuture(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
.setGroupId(groupDescribeOffsets.groupId))
@@ -3535,19 +3541,76 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- private def describeShareGroupOffsetsForGroup(requestContext: RequestContext,
+ private def describeShareGroupAllOffsetsForGroup(requestContext:
RequestContext,
groupDescribeOffsetsRequest:
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup
):
CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]
= {
- groupCoordinator.describeShareGroupOffsets(
+ groupCoordinator.describeShareGroupAllOffsets(
requestContext,
groupDescribeOffsetsRequest
).handle[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]
{ (groupDescribeOffsetsResponse, exception) =>
if (exception != null) {
+ val error = Errors.forException(exception)
new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
.setGroupId(groupDescribeOffsetsRequest.groupId)
- .setErrorCode(Errors.forException(exception).code)
+ .setErrorCode(error.code)
+ .setErrorMessage(error.message)
} else {
+ // Clients are not allowed to see offsets for topics that are not
authorized for Describe.
+ val (authorizedOffsets, _) = authHelper.partitionSeqByAuthorized(
+ requestContext,
+ DESCRIBE,
+ TOPIC,
+ groupDescribeOffsetsResponse.topics.asScala
+ )(_.topicName)
+ groupDescribeOffsetsResponse.setTopics(authorizedOffsets.asJava)
+ }
+ }
+ }
+
+ private def describeShareGroupOffsetsForGroup(requestContext: RequestContext,
+ groupDescribeOffsetsRequest:
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup
+ ):
CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]
= {
+ // Clients are not allowed to see offsets for topics that are not
authorized for Describe.
+ val (authorizedTopics, unauthorizedTopics) =
authHelper.partitionSeqByAuthorized(
+ requestContext,
+ DESCRIBE,
+ TOPIC,
+ groupDescribeOffsetsRequest.topics.asScala
+ )(_.topicName)
+
+ groupCoordinator.describeShareGroupOffsets(
+ requestContext,
+ new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
+ .setGroupId(groupDescribeOffsetsRequest.groupId)
+ .setTopics(authorizedTopics.asJava)
+
).handle[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]
{ (groupDescribeOffsetsResponse, exception) =>
+ if (exception != null) {
+ val error = Errors.forException(exception)
+ new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
+ .setGroupId(groupDescribeOffsetsRequest.groupId)
+ .setErrorCode(error.code)
+ .setErrorMessage(error.message)
+ } else if (groupDescribeOffsetsResponse.errorCode() != Errors.NONE.code)
{
groupDescribeOffsetsResponse
+ } else {
+ val topics = new
util.ArrayList[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic](
+ groupDescribeOffsetsResponse.topics.size + unauthorizedTopics.size
+ )
+ topics.addAll(groupDescribeOffsetsResponse.topics)
+ unauthorizedTopics.foreach { topic =>
+ val topicResponse = new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+ .setTopicName(topic.topicName)
+ .setTopicId(Uuid.ZERO_UUID)
+ topic.partitions().forEach { partitionIndex =>
+ topicResponse.partitions.add(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partitionIndex)
+ .setStartOffset(-1)
+ .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+ .setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message))
+ }
+ topics.add(topicResponse)
+ }
+ groupDescribeOffsetsResponse.setTopics(topics)
}
}
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index a98b67fd912..cf400517c25 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -11134,7 +11134,7 @@ class KafkaApisTest extends Logging {
}
@Test
- def testDescribeShareGroupOffsetsRequestsAuthorizationFailed(): Unit = {
+ def testDescribeShareGroupOffsetsRequestGroupAuthorizationFailed(): Unit = {
val describeShareGroupOffsetsRequest = new
DescribeShareGroupOffsetsRequestData().setGroups(
util.List.of(new
DescribeShareGroupOffsetsRequestGroup().setGroupId("group").setTopics(
util.List.of(new
DescribeShareGroupOffsetsRequestTopic().setTopicName("topic-1").setPartitions(util.List.of(1)))
@@ -11163,6 +11163,34 @@ class KafkaApisTest extends Logging {
)
}
+ @Test
+ def testDescribeShareGroupAllOffsetsRequestGroupAuthorizationFailed(): Unit
= {
+ val describeShareGroupOffsetsRequest = new
DescribeShareGroupOffsetsRequestData().setGroups(
+ util.List.of(new
DescribeShareGroupOffsetsRequestGroup().setGroupId("group").setTopics(null))
+ )
+
+ val requestChannelRequest = buildRequest(new
DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest,
true).build)
+
+ val authorizer: Authorizer = mock(classOf[Authorizer])
+ when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+ .thenReturn(util.List.of(AuthorizationResult.DENIED))
+ metadataCache = new KRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
+ kafkaApis = createKafkaApis(
+ overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG ->
"true"),
+ authorizer = Some(authorizer),
+ )
+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+ val response =
verifyNoThrottling[DescribeShareGroupOffsetsResponse](requestChannelRequest)
+ response.data.groups.forEach(
+ group => group.topics.forEach(
+ topic => topic.partitions.forEach(
+ partition => assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code,
partition.errorCode)
+ )
+ )
+ )
+ }
+
@Test
def testDescribeShareGroupOffsetsRequestSuccess(): Unit = {
val topicName1 = "topic-1"
@@ -11279,6 +11307,453 @@ class KafkaApisTest extends Logging {
assertEquals(describeShareGroupOffsetsResponse, response.data)
}
+ @Test
+ def testDescribeShareGroupOffsetsRequestTopicAuthorizationFailed(): Unit = {
+ val topicName1 = "topic-1"
+ val topicId1 = Uuid.randomUuid
+ val topicName2 = "topic-2"
+ val topicId2 = Uuid.randomUuid
+ val topicName3 = "topic-3"
+ val topicId3 = Uuid.randomUuid
+ metadataCache = new KRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
+ addTopicToMetadataCache(topicName1, 1, topicId = topicId1)
+ addTopicToMetadataCache(topicName2, 1, topicId = topicId2)
+ addTopicToMetadataCache(topicName3, 1, topicId = topicId3)
+
+ val describeShareGroupOffsetsRequestGroup1 = new
DescribeShareGroupOffsetsRequestGroup().setGroupId("group1").setTopics(
+ util.List.of(
+ new
DescribeShareGroupOffsetsRequestTopic().setTopicName(topicName1).setPartitions(util.List.of(1,
2, 3)),
+ new
DescribeShareGroupOffsetsRequestTopic().setTopicName(topicName2).setPartitions(util.List.of(10,
20)),
+ )
+ )
+
+ val describeShareGroupOffsetsRequestGroup2 = new
DescribeShareGroupOffsetsRequestGroup().setGroupId("group2").setTopics(
+ util.List.of(
+ new
DescribeShareGroupOffsetsRequestTopic().setTopicName(topicName3).setPartitions(util.List.of(0)),
+ )
+ )
+
+ val describeShareGroupOffsetsRequest = new
DescribeShareGroupOffsetsRequestData()
+ .setGroups(util.List.of(describeShareGroupOffsetsRequestGroup1,
describeShareGroupOffsetsRequestGroup2))
+
+ val requestChannelRequest = buildRequest(new
DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest,
true).build)
+
+ // The group coordinator will only be asked for information about topics
which are authorized
+ val futureGroup1 = new
CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]
+ when(groupCoordinator.describeShareGroupOffsets(
+ requestChannelRequest.context,
+ new
DescribeShareGroupOffsetsRequestGroup().setGroupId("group1").setTopics(
+ util.List.of(
+ new
DescribeShareGroupOffsetsRequestTopic().setTopicName(topicName1).setPartitions(util.List.of(1,
2, 3)),
+ )
+ )
+ )).thenReturn(futureGroup1)
+
+ val futureGroup2 = new
CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]
+ when(groupCoordinator.describeShareGroupOffsets(
+ requestChannelRequest.context,
+ new
DescribeShareGroupOffsetsRequestGroup().setGroupId("group2").setTopics(
+ util.List.of(
+ )
+ )
+ )).thenReturn(futureGroup2)
+
+ val authorizer: Authorizer = mock(classOf[Authorizer])
+ val acls = Map(
+ "group1" -> AuthorizationResult.ALLOWED,
+ "group2" -> AuthorizationResult.ALLOWED,
+ topicName1 -> AuthorizationResult.ALLOWED,
+ topicName2 -> AuthorizationResult.DENIED,
+ topicName3 -> AuthorizationResult.DENIED
+ )
+ when(authorizer.authorize(
+ any[RequestContext],
+ any[util.List[Action]]
+ )).thenAnswer { invocation =>
+ val actions = invocation.getArgument(1, classOf[util.List[Action]])
+ actions.asScala.map { action =>
+ acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED)
+ }.asJava
+ }
+ kafkaApis = createKafkaApis(
+ overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG ->
"true"),
+ authorizer = Some(authorizer)
+ )
+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+ // These are the responses to the KafkaApis request, complete with
authorization errors
+ val describeShareGroupOffsetsResponseGroup1 = new
DescribeShareGroupOffsetsResponseGroup()
+ .setGroupId("group1")
+ .setTopics(util.List.of(
+ new DescribeShareGroupOffsetsResponseTopic()
+ .setTopicName(topicName1)
+ .setTopicId(topicId1)
+ .setPartitions(util.List.of(
+ new DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(1)
+ .setStartOffset(0)
+ .setLeaderEpoch(1)
+ .setErrorMessage(null)
+ .setErrorCode(0),
+ new DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(2)
+ .setStartOffset(0)
+ .setLeaderEpoch(1)
+ .setErrorMessage(null)
+ .setErrorCode(0),
+ new DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(3)
+ .setStartOffset(0)
+ .setLeaderEpoch(1)
+ .setErrorMessage(null)
+ .setErrorCode(0)
+ )),
+ new DescribeShareGroupOffsetsResponseTopic()
+ .setTopicName(topicName2)
+ .setTopicId(Uuid.ZERO_UUID)
+ .setPartitions(util.List.of(
+ new DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(10)
+ .setStartOffset(-1)
+ .setLeaderEpoch(0)
+ .setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message)
+ .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code),
+ new DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(20)
+ .setStartOffset(-1)
+ .setLeaderEpoch(0)
+ .setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message)
+ .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+ ))
+ ))
+
+ val describeShareGroupOffsetsResponseGroup2 = new
DescribeShareGroupOffsetsResponseGroup()
+ .setGroupId("group2")
+ .setTopics(util.List.of(
+ new DescribeShareGroupOffsetsResponseTopic()
+ .setTopicName(topicName3)
+ .setTopicId(Uuid.ZERO_UUID)
+ .setPartitions(util.List.of(
+ new DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(0)
+ .setStartOffset(-1)
+ .setLeaderEpoch(0)
+ .setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message)
+ .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+ ))
+ ))
+
+ val describeShareGroupOffsetsResponse = new
DescribeShareGroupOffsetsResponseData()
+ .setGroups(util.List.of(describeShareGroupOffsetsResponseGroup1,
describeShareGroupOffsetsResponseGroup2))
+
+ // And these are the responses to the topics which were authorized
+ val describeShareGroupOffsetsGroupCoordinatorResponseGroup1 = new
DescribeShareGroupOffsetsResponseGroup()
+ .setGroupId("group1")
+ .setTopics(util.List.of(
+ new DescribeShareGroupOffsetsResponseTopic()
+ .setTopicName(topicName1)
+ .setTopicId(topicId1)
+ .setPartitions(util.List.of(
+ new DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(1)
+ .setStartOffset(0)
+ .setLeaderEpoch(1)
+ .setErrorMessage(null)
+ .setErrorCode(0),
+ new DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(2)
+ .setStartOffset(0)
+ .setLeaderEpoch(1)
+ .setErrorMessage(null)
+ .setErrorCode(0),
+ new DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(3)
+ .setStartOffset(0)
+ .setLeaderEpoch(1)
+ .setErrorMessage(null)
+ .setErrorCode(0)
+ ))
+ ))
+
+ val describeShareGroupOffsetsGroupCoordinatorResponseGroup2 = new
DescribeShareGroupOffsetsResponseGroup()
+ .setGroupId("group2")
+ .setTopics(util.List.of())
+
+
futureGroup1.complete(describeShareGroupOffsetsGroupCoordinatorResponseGroup1)
+
futureGroup2.complete(describeShareGroupOffsetsGroupCoordinatorResponseGroup2)
+ val response =
verifyNoThrottling[DescribeShareGroupOffsetsResponse](requestChannelRequest)
+ assertEquals(describeShareGroupOffsetsResponse, response.data)
+ }
+
+ @Test
+ def testDescribeShareGroupAllOffsetsRequestTopicAuthorizationFailed(): Unit
= {
+ val topicName1 = "topic-1"
+ val topicId1 = Uuid.randomUuid
+ val topicName2 = "topic-2"
+ val topicId2 = Uuid.randomUuid
+ val topicName3 = "topic-3"
+ val topicId3 = Uuid.randomUuid
+ metadataCache = new KRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
+ addTopicToMetadataCache(topicName1, 1, topicId = topicId1)
+ addTopicToMetadataCache(topicName2, 1, topicId = topicId2)
+ addTopicToMetadataCache(topicName3, 1, topicId = topicId3)
+
+ val describeShareGroupOffsetsRequestGroup1 = new
DescribeShareGroupOffsetsRequestGroup().setGroupId("group1").setTopics(null)
+
+ val describeShareGroupOffsetsRequestGroup2 = new
DescribeShareGroupOffsetsRequestGroup().setGroupId("group2").setTopics(null)
+
+ val describeShareGroupOffsetsRequest = new
DescribeShareGroupOffsetsRequestData()
+ .setGroups(util.List.of(describeShareGroupOffsetsRequestGroup1,
describeShareGroupOffsetsRequestGroup2))
+
+ val requestChannelRequest = buildRequest(new
DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest,
true).build)
+
+ // The group coordinator is being asked for information about all topics,
not just those which are authorized
+ val futureGroup1 = new
CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]
+ when(groupCoordinator.describeShareGroupAllOffsets(
+ requestChannelRequest.context,
+ new
DescribeShareGroupOffsetsRequestGroup().setGroupId("group1").setTopics(null)
+ )).thenReturn(futureGroup1)
+
+ val futureGroup2 = new
CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]
+ when(groupCoordinator.describeShareGroupAllOffsets(
+ requestChannelRequest.context,
+ new
DescribeShareGroupOffsetsRequestGroup().setGroupId("group2").setTopics(null)
+ )).thenReturn(futureGroup2)
+
+ val authorizer: Authorizer = mock(classOf[Authorizer])
+ val acls = Map(
+ "group1" -> AuthorizationResult.ALLOWED,
+ "group2" -> AuthorizationResult.ALLOWED,
+ topicName1 -> AuthorizationResult.ALLOWED,
+ topicName2 -> AuthorizationResult.DENIED,
+ topicName3 -> AuthorizationResult.DENIED
+ )
+ when(authorizer.authorize(
+ any[RequestContext],
+ any[util.List[Action]]
+ )).thenAnswer { invocation =>
+ val actions = invocation.getArgument(1, classOf[util.List[Action]])
+ actions.asScala.map { action =>
+ acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED)
+ }.asJava
+ }
+ kafkaApis = createKafkaApis(
+ overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG ->
"true"),
+ authorizer = Some(authorizer)
+ )
+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+ // These are the responses to the KafkaApis request, with unauthorized
topics filtered out
+ val describeShareGroupOffsetsResponseGroup1 = new
DescribeShareGroupOffsetsResponseGroup()
+ .setGroupId("group1")
+ .setTopics(util.List.of(
+ new DescribeShareGroupOffsetsResponseTopic()
+ .setTopicName(topicName1)
+ .setTopicId(topicId1)
+ .setPartitions(util.List.of(
+ new DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(1)
+ .setStartOffset(0)
+ .setLeaderEpoch(1)
+ .setErrorMessage(null)
+ .setErrorCode(0),
+ new DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(2)
+ .setStartOffset(0)
+ .setLeaderEpoch(1)
+ .setErrorMessage(null)
+ .setErrorCode(0),
+ new DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(3)
+ .setStartOffset(0)
+ .setLeaderEpoch(1)
+ .setErrorMessage(null)
+ .setErrorCode(0)
+ ))
+ ))
+
+ val describeShareGroupOffsetsResponseGroup2 = new
DescribeShareGroupOffsetsResponseGroup()
+ .setGroupId("group2")
+ .setTopics(util.List.of())
+
+ // And these are the responses from the group coordinator for all topics,
even those which are not authorized
+ val describeShareGroupOffsetsGroupCoordinatorResponseGroup1 = new
DescribeShareGroupOffsetsResponseGroup()
+ .setGroupId("group1")
+ .setTopics(util.List.of(
+ new DescribeShareGroupOffsetsResponseTopic()
+ .setTopicName(topicName1)
+ .setTopicId(topicId1)
+ .setPartitions(util.List.of(
+ new DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(1)
+ .setStartOffset(0)
+ .setLeaderEpoch(1)
+ .setErrorMessage(null)
+ .setErrorCode(0),
+ new DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(2)
+ .setStartOffset(0)
+ .setLeaderEpoch(1)
+ .setErrorMessage(null)
+ .setErrorCode(0),
+ new DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(3)
+ .setStartOffset(0)
+ .setLeaderEpoch(1)
+ .setErrorMessage(null)
+ .setErrorCode(0)
+ )),
+ new DescribeShareGroupOffsetsResponseTopic()
+ .setTopicName(topicName2)
+ .setTopicId(topicId2)
+ .setPartitions(util.List.of(
+ new DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(10)
+ .setStartOffset(0)
+ .setLeaderEpoch(1)
+ .setErrorMessage(null)
+ .setErrorCode(0),
+ new DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(20)
+ .setStartOffset(0)
+ .setLeaderEpoch(1)
+ .setErrorMessage(null)
+ .setErrorCode(0)
+ ))
+ ))
+
+ val describeShareGroupOffsetsGroupCoordinatorResponseGroup2 = new
DescribeShareGroupOffsetsResponseGroup()
+ .setGroupId("group2")
+ .setTopics(util.List.of(
+ new DescribeShareGroupOffsetsResponseTopic()
+ .setTopicName(topicName3)
+ .setTopicId(topicId3)
+ .setPartitions(util.List.of(
+ new DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(0)
+ .setStartOffset(0)
+ .setLeaderEpoch(1)
+ .setErrorMessage(null)
+ .setErrorCode(0)
+ ))
+ ))
+
+ val describeShareGroupOffsetsResponse = new
DescribeShareGroupOffsetsResponseData()
+ .setGroups(util.List.of(describeShareGroupOffsetsResponseGroup1,
describeShareGroupOffsetsResponseGroup2))
+
+
futureGroup1.complete(describeShareGroupOffsetsGroupCoordinatorResponseGroup1)
+
futureGroup2.complete(describeShareGroupOffsetsGroupCoordinatorResponseGroup2)
+ val response =
verifyNoThrottling[DescribeShareGroupOffsetsResponse](requestChannelRequest)
+ assertEquals(describeShareGroupOffsetsResponse, response.data)
+ }
+
+ @Test
+ def testDescribeShareGroupAllOffsetsRequestSuccess(): Unit = {
+ val topicName1 = "topic-1"
+ val topicId1 = Uuid.randomUuid
+ val topicName2 = "topic-2"
+ val topicId2 = Uuid.randomUuid
+ val topicName3 = "topic-3"
+ val topicId3 = Uuid.randomUuid
+ metadataCache = new KRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
+ addTopicToMetadataCache(topicName1, 1, topicId = topicId1)
+ addTopicToMetadataCache(topicName2, 1, topicId = topicId2)
+ addTopicToMetadataCache(topicName3, 1, topicId = topicId3)
+
+ val describeShareGroupOffsetsRequestGroup1 = new
DescribeShareGroupOffsetsRequestGroup().setGroupId("group1").setTopics(null)
+
+ val describeShareGroupOffsetsRequestGroup2 = new
DescribeShareGroupOffsetsRequestGroup().setGroupId("group2").setTopics(null)
+
+ val describeShareGroupOffsetsRequest = new
DescribeShareGroupOffsetsRequestData()
+ .setGroups(util.List.of(describeShareGroupOffsetsRequestGroup1,
describeShareGroupOffsetsRequestGroup2))
+
+ val requestChannelRequest = buildRequest(new
DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest,
true).build)
+
+ val futureGroup1 = new
CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]
+ when(groupCoordinator.describeShareGroupAllOffsets(
+ requestChannelRequest.context,
+ describeShareGroupOffsetsRequestGroup1
+ )).thenReturn(futureGroup1)
+ val futureGroup2 = new
CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]
+ when(groupCoordinator.describeShareGroupAllOffsets(
+ requestChannelRequest.context,
+ describeShareGroupOffsetsRequestGroup2
+ )).thenReturn(futureGroup2)
+ kafkaApis = createKafkaApis(
+ overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG ->
"true"),
+ )
+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+ val describeShareGroupOffsetsResponseGroup1 = new
DescribeShareGroupOffsetsResponseGroup()
+ .setGroupId("group1")
+ .setTopics(util.List.of(
+ new DescribeShareGroupOffsetsResponseTopic()
+ .setTopicName(topicName1)
+ .setTopicId(topicId1)
+ .setPartitions(util.List.of(
+ new DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(1)
+ .setStartOffset(0)
+ .setLeaderEpoch(1)
+ .setErrorMessage(null)
+ .setErrorCode(0),
+ new DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(2)
+ .setStartOffset(0)
+ .setLeaderEpoch(1)
+ .setErrorMessage(null)
+ .setErrorCode(0),
+ new DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(3)
+ .setStartOffset(0)
+ .setLeaderEpoch(1)
+ .setErrorMessage(null)
+ .setErrorCode(0)
+ )),
+ new DescribeShareGroupOffsetsResponseTopic()
+ .setTopicName(topicName2)
+ .setTopicId(topicId2)
+ .setPartitions(util.List.of(
+ new DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(10)
+ .setStartOffset(0)
+ .setLeaderEpoch(1)
+ .setErrorMessage(null)
+ .setErrorCode(0),
+ new DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(20)
+ .setStartOffset(0)
+ .setLeaderEpoch(1)
+ .setErrorMessage(null)
+ .setErrorCode(0)
+ ))
+ ))
+
+ val describeShareGroupOffsetsResponseGroup2 = new
DescribeShareGroupOffsetsResponseGroup()
+ .setGroupId("group2")
+ .setTopics(util.List.of(
+ new DescribeShareGroupOffsetsResponseTopic()
+ .setTopicName(topicName3)
+ .setTopicId(topicId3)
+ .setPartitions(util.List.of(
+ new DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(0)
+ .setStartOffset(0)
+ .setLeaderEpoch(1)
+ .setErrorMessage(null)
+ .setErrorCode(0)
+ ))
+ ))
+
+ val describeShareGroupOffsetsResponse = new
DescribeShareGroupOffsetsResponseData()
+ .setGroups(util.List.of(describeShareGroupOffsetsResponseGroup1,
describeShareGroupOffsetsResponseGroup2))
+
+ futureGroup1.complete(describeShareGroupOffsetsResponseGroup1)
+ futureGroup2.complete(describeShareGroupOffsetsResponseGroup2)
+ val response =
verifyNoThrottling[DescribeShareGroupOffsetsResponse](requestChannelRequest)
+ assertEquals(describeShareGroupOffsetsResponse, response.data)
+ }
+
@Test
def testDescribeShareGroupOffsetsRequestEmptyGroupsSuccess(): Unit = {
metadataCache = new KRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index 061c5d6d4b1..c1072aa8d86 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -297,6 +297,20 @@ public interface GroupCoordinator {
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup
request
);
+ /**
+ * Describe all Share Group Offsets for a given group.
+ *
+ * @param context The request context
+ * @param request The DescribeShareGroupOffsetsRequestGroup
request.
+ *
+ * @return A future yielding the results.
+ * The error codes of the response are set to indicate the errors
that occurred during the execution.
+ */
+
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup>
describeShareGroupAllOffsets(
+ RequestContext context,
+
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup
request
+ );
+
/**
* Commit offsets for a given Group.
*
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 0096b21e398..310bb64bde8 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -1386,13 +1386,16 @@ public class GroupCoordinatorService implements
GroupCoordinator {
).toList()
));
} else {
+ // If the topic does not exist, the start offset is returned
as -1 (uninitialized offset).
+ // This is consistent with OffsetFetch for situations in which
there is no offset information to fetch.
+ // It's treated as absence of data, rather than an error,
unlike TOPIC_AUTHORIZATION_FAILED for example.
describeShareGroupOffsetsResponseTopicList.add(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
.setTopicName(topic.topicName())
+ .setTopicId(Uuid.ZERO_UUID)
.setPartitions(topic.partitions().stream().map(
partition -> new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
-
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
-
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
+
.setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET)
).toList()));
}
});
@@ -1408,6 +1411,67 @@ public class GroupCoordinatorService implements
GroupCoordinator {
ReadShareGroupStateSummaryRequestData readSummaryRequestData = new
ReadShareGroupStateSummaryRequestData()
.setGroupId(requestData.groupId())
.setTopics(readStateSummaryData);
+
+ return readShareGroupStateSummary(readSummaryRequestData,
requestTopicIdToNameMapping, describeShareGroupOffsetsResponseTopicList);
+ }
+
+ /**
+ * See {@link
GroupCoordinator#describeShareGroupAllOffsets(RequestContext,
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup)}.
+ */
+ @Override
+ public
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup>
describeShareGroupAllOffsets(
+ RequestContext context,
+
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup
requestData
+ ) {
+ if (!isActive.get()) {
+ return CompletableFuture.completedFuture(
+
DescribeShareGroupOffsetsRequest.getErrorDescribedGroup(requestData.groupId(),
Errors.COORDINATOR_NOT_AVAILABLE));
+ }
+
+ if (metadataImage == null) {
+ return CompletableFuture.completedFuture(
+
DescribeShareGroupOffsetsRequest.getErrorDescribedGroup(requestData.groupId(),
Errors.COORDINATOR_NOT_AVAILABLE));
+ }
+
+ return runtime.scheduleReadOperation(
+ "share-group-initialized-partitions",
+ topicPartitionFor(requestData.groupId()),
+ (coordinator, offset) ->
coordinator.initializedShareGroupPartitions(requestData.groupId())
+ ).thenCompose(topicPartitionMap -> {
+ Map<Uuid, String> requestTopicIdToNameMapping = new HashMap<>();
+
List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic>
describeShareGroupOffsetsResponseTopicList = new
ArrayList<>(topicPartitionMap.size());
+ ReadShareGroupStateSummaryRequestData readSummaryRequestData = new
ReadShareGroupStateSummaryRequestData()
+ .setGroupId(requestData.groupId());
+ topicPartitionMap.forEach((topicId, partitionSet) -> {
+ String topicName =
metadataImage.topics().topicIdToNameView().get(topicId);
+ if (topicName != null) {
+ requestTopicIdToNameMapping.put(topicId, topicName);
+ readSummaryRequestData.topics().add(new
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+ .setTopicId(topicId)
+ .setPartitions(
+ partitionSet.stream().map(
+ partitionIndex -> new
ReadShareGroupStateSummaryRequestData.PartitionData().setPartition(partitionIndex)
+ ).toList()
+ ));
+ }
+ });
+ return readShareGroupStateSummary(readSummaryRequestData,
requestTopicIdToNameMapping, describeShareGroupOffsetsResponseTopicList);
+ });
+ }
+
+ private
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup>
readShareGroupStateSummary(
+ ReadShareGroupStateSummaryRequestData readSummaryRequestData,
+ Map<Uuid, String> requestTopicIdToNameMapping,
+
List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic>
describeShareGroupOffsetsResponseTopicList
+ ) {
+ // If the request for the persister is empty, just complete the
operation right away.
+ if (readSummaryRequestData.topics().isEmpty()) {
+ return CompletableFuture.completedFuture(
+ new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
+ .setGroupId(readSummaryRequestData.groupId())
+ .setTopics(describeShareGroupOffsetsResponseTopicList));
+ }
+
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup>
future = new CompletableFuture<>();
persister.readSummary(ReadShareGroupStateSummaryParameters.from(readSummaryRequestData))
.whenComplete((result, error) -> {
@@ -1421,6 +1485,10 @@ public class GroupCoordinatorService implements
GroupCoordinator {
future.completeExceptionally(new
IllegalStateException("Result is null for the read state summary"));
return;
}
+
+ // Return -1 (uninitialized offset) for the situation where
the persister returned an error.
+ // This is consistent with OffsetFetch for situations in which
there is no offset information to fetch.
+ // It's treated as absence of data, rather than an error.
result.topicsData().forEach(topicData ->
describeShareGroupOffsetsResponseTopicList.add(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
.setTopicId(topicData.topicId())
@@ -1428,15 +1496,13 @@ public class GroupCoordinatorService implements
GroupCoordinator {
.setPartitions(topicData.partitions().stream().map(
partitionData -> new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(partitionData.partition())
- .setStartOffset(partitionData.startOffset())
-
.setErrorMessage(Errors.forCode(partitionData.errorCode()).message())
- .setErrorCode(partitionData.errorCode())
+ .setStartOffset(partitionData.errorCode() ==
Errors.NONE.code() ? partitionData.startOffset() :
PartitionFactory.UNINITIALIZED_START_OFFSET)
).toList())
));
future.complete(
new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
- .setGroupId(requestData.groupId())
+ .setGroupId(readSummaryRequestData.groupId())
.setTopics(describeShareGroupOffsetsResponseTopicList));
});
return future;
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 3224616f9a9..ded14600dab 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -400,7 +400,7 @@ public class GroupCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
* @param context The request context.
* @param request The actual StreamsGroupHeartbeat request.
*
- * @return A result containing the StreamsGroupHeartbeat response, a list
of internal topics to be created and
+ * @return A Result containing the StreamsGroupHeartbeat response, a list
of internal topics to be created and
* a list of records to update the state machine.
*/
public CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord>
streamsGroupHeartbeat(
@@ -466,6 +466,19 @@ public class GroupCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
return
groupMetadataManager.reconcileShareGroupStateInitializingState(offset);
}
+ /**
+ * Returns the set of share-partitions whose share-group state has been
initialized in the persister.
+ *
+ * @param groupId The group id corresponding to the share group whose
share partitions have been initialized.
+ *
+ * @return A map representing the initialized share-partitions for the
share group.
+ */
+ public Map<Uuid, Set<Integer>> initializedShareGroupPartitions(
+ String groupId
+ ) {
+ return groupMetadataManager.initializedShareGroupPartitions(groupId);
+ }
+
/**
* Handles a JoinGroup request.
*
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 62f227e50dc..8ec6348ed6b 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -3037,7 +3037,7 @@ public class GroupMetadataManager {
}
return member;
}
-
+
/**
* Gets or subscribes a static consumer group member. This method also
replaces the
* previous static member if allowed.
@@ -4115,7 +4115,8 @@ public class GroupMetadataManager {
return shareGroupFenceMember(group, member, response);
}
- /** Fences a member from a consumer group and maybe downgrade the consumer
group to a classic group.
+ /**
+ * Fences a member from a consumer group and maybe downgrades the consumer
group to a classic group.
*
* @param group The group.
* @param member The member.
@@ -5028,7 +5029,7 @@ public class GroupMetadataManager {
}
private Map<Uuid, Map.Entry<String, Set<Integer>>>
attachTopicName(Map<Uuid, Set<Integer>> initMap) {
- TopicsImage topicsImage = metadataImage.topics();
+ TopicsImage topicsImage = metadataImage.topics();
Map<Uuid, Map.Entry<String, Set<Integer>>> finalMap = new HashMap<>();
for (Map.Entry<Uuid, Set<Integer>> entry : initMap.entrySet()) {
Uuid topicId = entry.getKey();
@@ -5038,6 +5039,28 @@ public class GroupMetadataManager {
return Collections.unmodifiableMap(finalMap);
}
+ /**
+ * Returns the set of share partitions whose state has been initialized.
+ *
+ * @param groupId The group id corresponding to the share group whose
share partitions have been initialized.
+ *
+ * @return A map representing the initialized share-partitions for the
share group.
+ */
+ public Map<Uuid, Set<Integer>> initializedShareGroupPartitions(
+ String groupId
+ ) {
+ Map<Uuid, Set<Integer>> resultMap = new HashMap<>();
+
+ ShareGroupStatePartitionMetadataInfo currentMap =
shareGroupPartitionMetadata.get(groupId);
+ if (currentMap != null) {
+ currentMap.initializedTopics().forEach((topicId, partitions) -> {
+ resultMap.put(topicId, new HashSet<>(partitions));
+ });
+ }
+
+ return resultMap;
+ }
+
/**
* Replays ConsumerGroupMemberMetadataKey/Value to update the hard state of
* the consumer group. It updates the subscription part of the member or
@@ -8289,7 +8312,7 @@ public class GroupMetadataManager {
private Map<String, String> streamsGroupAssignmentConfigs(String groupId) {
return Map.of("group.streams.num.standby.replicas", "0");
}
-
+
/**
* Generate a classic group heartbeat key for the timer.
*
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 2096ca4bf5e..26e3cbd9469 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -122,6 +122,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
@@ -2845,9 +2846,7 @@ public class GroupCoordinatorServiceTest {
.setTopicId(TOPIC_ID)
.setPartitions(List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
-
.setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET)
- .setErrorCode(PartitionFactory.DEFAULT_ERROR_CODE)
-
.setErrorMessage(PartitionFactory.DEFAULT_ERR_MESSAGE))))
+
.setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET))))
);
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup>
future =
@@ -2890,9 +2889,7 @@ public class GroupCoordinatorServiceTest {
.setTopicId(TOPIC_ID)
.setPartitions(List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
- .setStartOffset(21)
- .setErrorCode(Errors.NONE.code())
- .setErrorMessage(Errors.NONE.message()))))
+ .setStartOffset(21))))
);
ReadShareGroupStateSummaryResponseData
readShareGroupStateSummaryResponseData = new
ReadShareGroupStateSummaryResponseData()
@@ -2902,9 +2899,7 @@ public class GroupCoordinatorServiceTest {
.setPartitions(List.of(new
ReadShareGroupStateSummaryResponseData.PartitionResult()
.setPartition(partition)
.setStartOffset(21)
- .setStateEpoch(1)
- .setErrorCode(Errors.NONE.code())
- .setErrorMessage(Errors.NONE.message())))
+ .setStateEpoch(1)))
)
);
@@ -2944,10 +2939,10 @@ public class GroupCoordinatorServiceTest {
.setTopics(
List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
.setTopicName("badtopic")
+ .setTopicId(Uuid.ZERO_UUID)
.setPartitions(List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
- .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
-
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()))))
+
.setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET))))
);
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup>
future =
@@ -3097,6 +3092,196 @@ public class GroupCoordinatorServiceTest {
assertEquals(responseData, future.get());
}
+ @Test
+ public void testDescribeShareGroupAllOffsets() throws
InterruptedException, ExecutionException {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(DefaultStatePersister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setPersister(persister)
+ .build(true);
+
+ MetadataImage image = new MetadataImageBuilder()
+ .addTopic(TOPIC_ID, TOPIC_NAME, 3)
+ .build();
+
+ service.onNewMetadataImage(image, null);
+
+ int partition = 1;
+
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("share-group-initialized-partitions"),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(Map.of(TOPIC_ID,
Set.of(partition))));
+
+
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup
requestData = new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
+ .setGroupId("share-group-id")
+ .setTopics(null);
+
+ ReadShareGroupStateSummaryRequestData
readShareGroupStateSummaryRequestData = new
ReadShareGroupStateSummaryRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
ReadShareGroupStateSummaryRequestData.PartitionData()
+ .setPartition(partition)))));
+
+
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup
responseData = new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
+ .setGroupId("share-group-id")
+ .setTopics(
+ List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+ .setTopicName(TOPIC_NAME)
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partition)
+ .setStartOffset(21))))
+ );
+
+ ReadShareGroupStateSummaryResponseData
readShareGroupStateSummaryResponseData = new
ReadShareGroupStateSummaryResponseData()
+ .setResults(
+ List.of(new
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
ReadShareGroupStateSummaryResponseData.PartitionResult()
+ .setPartition(partition)
+ .setStartOffset(21)
+ .setStateEpoch(1)))
+ )
+ );
+
+ ReadShareGroupStateSummaryParameters
readShareGroupStateSummaryParameters =
ReadShareGroupStateSummaryParameters.from(readShareGroupStateSummaryRequestData);
+ ReadShareGroupStateSummaryResult readShareGroupStateSummaryResult =
ReadShareGroupStateSummaryResult.from(readShareGroupStateSummaryResponseData);
+ when(persister.readSummary(
+ ArgumentMatchers.eq(readShareGroupStateSummaryParameters)
+
)).thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult));
+
+
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup>
future =
+
service.describeShareGroupAllOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
requestData);
+
+ assertEquals(responseData, future.get());
+ }
+
+ @Test
+ public void testDescribeShareGroupAllOffsetsThrowsError() {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(DefaultStatePersister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setPersister(persister)
+ .build(true);
+
+ MetadataImage image = new MetadataImageBuilder()
+ .addTopic(TOPIC_ID, TOPIC_NAME, 3)
+ .build();
+
+ service.onNewMetadataImage(image, null);
+
+ int partition = 1;
+
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("share-group-initialized-partitions"),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(Map.of(TOPIC_ID,
Set.of(partition))));
+
+
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup
requestData = new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
+ .setGroupId("share-group-id")
+ .setTopics(null);
+
+ when(persister.readSummary(ArgumentMatchers.any()))
+ .thenReturn(CompletableFuture.failedFuture(new Exception("Unable
to validate read state summary request")));
+
+
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup>
future =
+
service.describeShareGroupAllOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
requestData);
+ assertFutureThrows(Exception.class, future, "Unable to validate read
state summary request");
+ }
+
+ @Test
+ public void testDescribeShareGroupAllOffsetsNullResult() {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(DefaultStatePersister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setPersister(persister)
+ .build(true);
+
+ MetadataImage image = new MetadataImageBuilder()
+ .addTopic(TOPIC_ID, TOPIC_NAME, 3)
+ .build();
+
+ service.onNewMetadataImage(image, null);
+
+ int partition = 1;
+
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("share-group-initialized-partitions"),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(Map.of(TOPIC_ID,
Set.of(partition))));
+
+
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup
requestData = new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
+ .setGroupId("share-group-id")
+ .setTopics(null);
+
+ when(persister.readSummary(ArgumentMatchers.any()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+
+
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup>
future =
+
service.describeShareGroupAllOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
requestData);
+ assertFutureThrows(IllegalStateException.class, future, "Result is
null for the read state summary");
+ }
+
+ @Test
+ public void testDescribeShareGroupAllOffsetsCoordinatorNotActive() throws
ExecutionException, InterruptedException {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
+
+
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup
requestData = new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
+ .setGroupId("share-group-id")
+ .setTopics(null);
+
+
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup
responseData = new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
+ .setGroupId("share-group-id")
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+ .setErrorMessage(Errors.COORDINATOR_NOT_AVAILABLE.message());
+
+
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup>
future =
+
service.describeShareGroupAllOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
requestData);
+
+ assertEquals(responseData, future.get());
+ }
+
+ @Test
+ public void testDescribeShareGroupAllOffsetsMetadataImageNull() throws
ExecutionException, InterruptedException {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build(true);
+
+ // Forcing a null Metadata Image
+ service.onNewMetadataImage(null, null);
+
+
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup
requestData = new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
+ .setGroupId("share-group-id")
+ .setTopics(null);
+
+
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup
responseData = new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
+ .setGroupId("share-group-id")
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+ .setErrorMessage(Errors.COORDINATOR_NOT_AVAILABLE.message());
+
+
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup>
future =
+
service.describeShareGroupAllOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
requestData);
+
+ assertEquals(responseData, future.get());
+ }
+
@Test
public void testPersisterInitializeSuccess() {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 2a0df23c1ab..868511217f6 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -20579,6 +20579,8 @@ public class GroupMetadataManagerTest {
2,
true
);
+
+ assertEquals(Map.of(t1Uuid, Set.of(0, 1), t2Uuid, Set.of(0, 1)),
context.groupMetadataManager.initializedShareGroupPartitions(groupId));
}
@Test
@@ -20713,6 +20715,8 @@ public class GroupMetadataManagerTest {
assertNull(result.response());
assertEquals(List.of(), result.records());
+
+ assertEquals(Map.of(),
context.groupMetadataManager.initializedShareGroupPartitions(groupId));
}
@Test
@@ -20788,6 +20792,8 @@ public class GroupMetadataManagerTest {
t3Name, new TopicMetadata(t3Id, t3Name, 3)
))
);
+
+ assertEquals(Map.of(t2Id, Set.of(0, 1, 2)),
context.groupMetadataManager.initializedShareGroupPartitions(groupId));
}
@Test
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
index 075d0a5b282..dcebff0d3ea 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
@@ -315,12 +315,7 @@ public class ShareGroupCommand {
TreeMap<String, Entry<ShareGroupDescription,
Collection<SharePartitionOffsetInformation>>> groupOffsets = new TreeMap<>();
shareGroups.forEach((groupId, shareGroup) -> {
- Set<TopicPartition> allTp = new HashSet<>();
- for (ShareMemberDescription memberDescription :
shareGroup.members()) {
-
allTp.addAll(memberDescription.assignment().topicPartitions());
- }
-
- ListShareGroupOffsetsSpec offsetsSpec = new
ListShareGroupOffsetsSpec().topicPartitions(allTp);
+ ListShareGroupOffsetsSpec offsetsSpec = new
ListShareGroupOffsetsSpec();
Map<String, ListShareGroupOffsetsSpec> groupSpecs = new
HashMap<>();
groupSpecs.put(groupId, offsetsSpec);
@@ -349,37 +344,34 @@ public class ShareGroupCommand {
private void printOffsets(TreeMap<String, Entry<ShareGroupDescription,
Collection<SharePartitionOffsetInformation>>> offsets, boolean verbose) {
offsets.forEach((groupId, tuple) -> {
- ShareGroupDescription description = tuple.getKey();
Collection<SharePartitionOffsetInformation> offsetsInfo =
tuple.getValue();
- if (maybePrintEmptyGroupState(groupId,
description.groupState(), offsetsInfo.size())) {
- String fmt = printOffsetFormat(groupId, offsetsInfo,
verbose);
+ String fmt = printOffsetFormat(groupId, offsetsInfo, verbose);
+
+ if (verbose) {
+ System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION",
"LEADER-EPOCH", "START-OFFSET");
+ } else {
+ System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION",
"START-OFFSET");
+ }
+ for (SharePartitionOffsetInformation info : offsetsInfo) {
if (verbose) {
- System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION",
"LEADER-EPOCH", "START-OFFSET");
+ System.out.printf(fmt,
+ groupId,
+ info.topic,
+ info.partition,
+ MISSING_COLUMN_VALUE, // Temporary
+
info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE)
+ );
} else {
- System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION",
"START-OFFSET");
- }
-
- for (SharePartitionOffsetInformation info : offsetsInfo) {
- if (verbose) {
- System.out.printf(fmt,
- groupId,
- info.topic,
- info.partition,
- MISSING_COLUMN_VALUE, // Temporary
-
info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE)
- );
- } else {
- System.out.printf(fmt,
- groupId,
- info.topic,
- info.partition,
-
info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE)
- );
- }
+ System.out.printf(fmt,
+ groupId,
+ info.topic,
+ info.partition,
+
info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE)
+ );
}
- System.out.println();
}
+ System.out.println();
});
}