This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d4d9f118165 KAFKA-18761: [2/N] List share group offsets with state and auth (#19328)
d4d9f118165 is described below

commit d4d9f118165ea22494cd6128050ed733e33d5871
Author: Andrew Schofield <[email protected]>
AuthorDate: Fri Apr 4 13:25:19 2025 +0100

    KAFKA-18761: [2/N] List share group offsets with state and auth (#19328)
    
    This PR brings Admin.listShareGroupOffsets() and
    kafka-share-groups.sh --describe --offsets close to completion.
    
    Prior to this patch, kafka-share-groups.sh was only able to describe the
    offsets for partitions which were assigned to active members. Now,
    Admin.listShareGroupOffsets() uses the persister's knowledge of the
    share-partitions which have initialised state, and then uses this list
    to obtain a complete set of offset information.
    
    The PR also implements topic-based authorisation checking. If
    Admin.listShareGroupOffsets() is called with a list of topic-partitions
    specified, the authorisation check is performed on the supplied list,
    returning errors for any topics for which the client is not authorised.
    If Admin.listShareGroupOffsets() is called without a list of
    topic-partitions specified, the list of topics is discovered from the
    persister as described above, and the response is then filtered down to
    show only the topics for which the client is authorised. This is
    consistent with other similar RPCs in the Kafka protocol, such as
    OffsetFetch.
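    
    For illustration only (not part of this commit), a minimal sketch of how
    a client might exercise both paths described above. The group id,
    bootstrap address and topic name are placeholders, and the exact Admin
    method signatures should be checked against the Admin javadoc:
    
        import java.util.List;
        import java.util.Map;
        import java.util.Properties;
        import org.apache.kafka.clients.admin.Admin;
        import org.apache.kafka.clients.admin.AdminClientConfig;
        import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;
        import org.apache.kafka.common.TopicPartition;
    
        public class ListShareGroupOffsetsExample {
            public static void main(String[] args) {
                Properties props = new Properties();
                props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                try (Admin admin = Admin.create(props)) {
                    // No topic-partitions specified: the broker discovers the
                    // initialised share-partitions from the persister and the
                    // response is filtered to topics the caller may Describe.
                    ListShareGroupOffsetsSpec allPartitions = new ListShareGroupOffsetsSpec();
                    admin.listShareGroupOffsets(Map.of("my-share-group", allPartitions));
    
                    // Explicit topic-partitions: authorisation is checked against
                    // the supplied list, and unauthorised topics come back with
                    // TOPIC_AUTHORIZATION_FAILED errors.
                    ListShareGroupOffsetsSpec explicit = new ListShareGroupOffsetsSpec()
                        .topicPartitions(List.of(new TopicPartition("orders", 0)));
                    admin.listShareGroupOffsets(Map.of("my-share-group", explicit));
                }
            }
        }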
    
    Reviewers: David Arthur <[email protected]>, Sushant Mahajan <[email protected]>, Apoorv Mittal <[email protected]>
---
 .../clients/admin/ListShareGroupOffsetsSpec.java   |   4 +-
 .../internals/ListShareGroupOffsetsHandler.java    |  29 +-
 .../message/DescribeShareGroupOffsetsRequest.json  |   4 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  |   4 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  69 ++-
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 477 ++++++++++++++++++++-
 .../kafka/coordinator/group/GroupCoordinator.java  |  14 +
 .../coordinator/group/GroupCoordinatorService.java |  78 +++-
 .../coordinator/group/GroupCoordinatorShard.java   |  15 +-
 .../coordinator/group/GroupMetadataManager.java    |  31 +-
 .../group/GroupCoordinatorServiceTest.java         | 207 ++++++++-
 .../group/GroupMetadataManagerTest.java            |   6 +
 .../tools/consumer/group/ShareGroupCommand.java    |  54 +--
 13 files changed, 915 insertions(+), 77 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsSpec.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsSpec.java
index 18ba3227346..d8144a0ce43 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsSpec.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsSpec.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
@@ -47,9 +46,10 @@ public class ListShareGroupOffsetsSpec {
 
     /**
      * Returns the topic partitions whose offsets are to be listed for a share 
group.
+     * {@code null} indicates that offsets of all partitions of the group are 
to be listed.
      */
     public Collection<TopicPartition> topicPartitions() {
-        return topicPartitions == null ? List.of() : topicPartitions;
+        return topicPartitions;
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java
index be46b341333..3f523b93833 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java
@@ -37,7 +37,6 @@ import org.slf4j.Logger;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -84,19 +83,23 @@ public class ListShareGroupOffsetsHandler extends AdminApiHandler.Batched<Coordi
             DescribeShareGroupOffsetsRequestGroup requestGroup = new 
DescribeShareGroupOffsetsRequestGroup()
                 .setGroupId(groupId);
 
-            Map<String, List<Integer>> topicPartitionMap = new HashMap<>();
-            spec.topicPartitions().forEach(tp -> 
topicPartitionMap.computeIfAbsent(tp.topic(), t -> new 
LinkedList<>()).add(tp.partition()));
-
-            Map<String, DescribeShareGroupOffsetsRequestTopic> requestTopics = 
new HashMap<>();
-            for (TopicPartition tp : spec.topicPartitions()) {
-                requestTopics.computeIfAbsent(tp.topic(), t ->
-                        new DescribeShareGroupOffsetsRequestTopic()
-                            .setTopicName(tp.topic())
-                            .setPartitions(new LinkedList<>()))
-                    .partitions()
-                    .add(tp.partition());
+            if (spec.topicPartitions() != null) {
+                Map<String, List<Integer>> topicPartitionMap = new HashMap<>();
+                spec.topicPartitions().forEach(tp -> 
topicPartitionMap.computeIfAbsent(tp.topic(), t -> new 
ArrayList<>()).add(tp.partition()));
+
+                Map<String, DescribeShareGroupOffsetsRequestTopic> 
requestTopics = new HashMap<>();
+                for (TopicPartition tp : spec.topicPartitions()) {
+                    requestTopics.computeIfAbsent(tp.topic(), t ->
+                            new DescribeShareGroupOffsetsRequestTopic()
+                                .setTopicName(tp.topic())
+                                .setPartitions(new ArrayList<>()))
+                        .partitions()
+                        .add(tp.partition());
+                }
+                requestGroup.setTopics(new 
ArrayList<>(requestTopics.values()));
+            } else {
+                requestGroup.setTopics(null);
             }
-            requestGroup.setTopics(new ArrayList<>(requestTopics.values()));
             groups.add(requestGroup);
         });
         DescribeShareGroupOffsetsRequestData data = new 
DescribeShareGroupOffsetsRequestData()
diff --git a/clients/src/main/resources/common/message/DescribeShareGroupOffsetsRequest.json b/clients/src/main/resources/common/message/DescribeShareGroupOffsetsRequest.json
index a9b7a0200e6..e9093296bb6 100644
--- a/clients/src/main/resources/common/message/DescribeShareGroupOffsetsRequest.json
+++ b/clients/src/main/resources/common/message/DescribeShareGroupOffsetsRequest.json
@@ -26,8 +26,8 @@
       "about": "The groups to describe offsets for.", "fields": [
       { "name": "GroupId", "type": "string", "versions": "0+", "entityType": 
"groupId",
         "about": "The group identifier." },
-      { "name": "Topics", "type": "[]DescribeShareGroupOffsetsRequestTopic", 
"versions": "0+",
-        "about": "The topics to describe offsets for.", "fields": [
+      { "name": "Topics", "type": "[]DescribeShareGroupOffsetsRequestTopic", 
"versions": "0+", "nullableVersions": "0+",
+        "about": "The topics to describe offsets for, or null for all 
topic-partitions.", "fields": [
         { "name": "TopicName", "type": "string", "versions": "0+", 
"entityType": "topicName",
           "about": "The topic name." },
         { "name": "Partitions", "type": "[]int32", "versions": "0+",
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index ac309e1c4d9..a478092cfc6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -10559,9 +10559,7 @@ public class KafkaAdminClientTest {
             TopicPartition myTopicPartition4 = new 
TopicPartition("my_topic_1", 4);
             TopicPartition myTopicPartition5 = new 
TopicPartition("my_topic_2", 6);
 
-            ListShareGroupOffsetsSpec groupSpec = new 
ListShareGroupOffsetsSpec().topicPartitions(
-                List.of(myTopicPartition0, myTopicPartition1, 
myTopicPartition2, myTopicPartition3, myTopicPartition4, myTopicPartition5)
-            );
+            ListShareGroupOffsetsSpec groupSpec = new 
ListShareGroupOffsetsSpec();
             Map<String, ListShareGroupOffsetsSpec> groupSpecs = new 
HashMap<>();
             groupSpecs.put(GROUP_ID, groupSpec);
 
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 9f416aa0e88..966607f11e1 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -3508,6 +3508,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val futures = new 
mutable.ArrayBuffer[CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]](groups.size)
     groups.forEach { groupDescribeOffsets =>
+      val isAllPartitions = groupDescribeOffsets.topics == null
       if (!isShareGroupProtocolEnabled) {
         futures += CompletableFuture.completedFuture(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
           .setGroupId(groupDescribeOffsets.groupId)
@@ -3516,6 +3517,11 @@ class KafkaApis(val requestChannel: RequestChannel,
         futures += CompletableFuture.completedFuture(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
           .setGroupId(groupDescribeOffsets.groupId)
           .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code))
+      } else if (isAllPartitions) {
+        futures += describeShareGroupAllOffsetsForGroup(
+          request.context,
+          groupDescribeOffsets
+        )
       } else if (groupDescribeOffsets.topics.isEmpty) {
         futures += CompletableFuture.completedFuture(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
           .setGroupId(groupDescribeOffsets.groupId))
@@ -3535,19 +3541,76 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  private def describeShareGroupOffsetsForGroup(requestContext: RequestContext,
+  private def describeShareGroupAllOffsetsForGroup(requestContext: 
RequestContext,
     groupDescribeOffsetsRequest: 
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup
   ): 
CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]
 = {
-    groupCoordinator.describeShareGroupOffsets(
+    groupCoordinator.describeShareGroupAllOffsets(
       requestContext,
       groupDescribeOffsetsRequest
     
).handle[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]
 { (groupDescribeOffsetsResponse, exception) =>
       if (exception != null) {
+        val error = Errors.forException(exception)
         new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
           .setGroupId(groupDescribeOffsetsRequest.groupId)
-          .setErrorCode(Errors.forException(exception).code)
+          .setErrorCode(error.code)
+          .setErrorMessage(error.message)
       } else {
+        // Clients are not allowed to see offsets for topics that are not 
authorized for Describe.
+        val (authorizedOffsets, _) = authHelper.partitionSeqByAuthorized(
+          requestContext,
+          DESCRIBE,
+          TOPIC,
+          groupDescribeOffsetsResponse.topics.asScala
+        )(_.topicName)
+        groupDescribeOffsetsResponse.setTopics(authorizedOffsets.asJava)
+      }
+    }
+  }
+
+  private def describeShareGroupOffsetsForGroup(requestContext: RequestContext,
+    groupDescribeOffsetsRequest: 
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup
+  ): 
CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]
 = {
+    // Clients are not allowed to see offsets for topics that are not 
authorized for Describe.
+    val (authorizedTopics, unauthorizedTopics) = 
authHelper.partitionSeqByAuthorized(
+      requestContext,
+      DESCRIBE,
+      TOPIC,
+      groupDescribeOffsetsRequest.topics.asScala
+    )(_.topicName)
+
+    groupCoordinator.describeShareGroupOffsets(
+      requestContext,
+      new 
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
+        .setGroupId(groupDescribeOffsetsRequest.groupId)
+        .setTopics(authorizedTopics.asJava)
+    
).handle[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]
 { (groupDescribeOffsetsResponse, exception) =>
+      if (exception != null) {
+        val error = Errors.forException(exception)
+        new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
+          .setGroupId(groupDescribeOffsetsRequest.groupId)
+          .setErrorCode(error.code)
+          .setErrorMessage(error.message)
+      } else if (groupDescribeOffsetsResponse.errorCode() != Errors.NONE.code) 
{
         groupDescribeOffsetsResponse
+      } else {
+        val topics = new 
util.ArrayList[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic](
+          groupDescribeOffsetsResponse.topics.size + unauthorizedTopics.size
+        )
+        topics.addAll(groupDescribeOffsetsResponse.topics)
+        unauthorizedTopics.foreach { topic =>
+          val topicResponse = new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+            .setTopicName(topic.topicName)
+            .setTopicId(Uuid.ZERO_UUID)
+          topic.partitions().forEach { partitionIndex =>
+            topicResponse.partitions.add(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(partitionIndex)
+              .setStartOffset(-1)
+              .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+              .setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message))
+          }
+          topics.add(topicResponse)
+        }
+        groupDescribeOffsetsResponse.setTopics(topics)
       }
     }
   }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index a98b67fd912..cf400517c25 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -11134,7 +11134,7 @@ class KafkaApisTest extends Logging {
   }
 
   @Test
-  def testDescribeShareGroupOffsetsRequestsAuthorizationFailed(): Unit = {
+  def testDescribeShareGroupOffsetsRequestGroupAuthorizationFailed(): Unit = {
     val describeShareGroupOffsetsRequest = new 
DescribeShareGroupOffsetsRequestData().setGroups(
       util.List.of(new 
DescribeShareGroupOffsetsRequestGroup().setGroupId("group").setTopics(
         util.List.of(new 
DescribeShareGroupOffsetsRequestTopic().setTopicName("topic-1").setPartitions(util.List.of(1)))
@@ -11163,6 +11163,34 @@ class KafkaApisTest extends Logging {
     )
   }
 
+  @Test
+  def testDescribeShareGroupAllOffsetsRequestGroupAuthorizationFailed(): Unit 
= {
+    val describeShareGroupOffsetsRequest = new 
DescribeShareGroupOffsetsRequestData().setGroups(
+      util.List.of(new 
DescribeShareGroupOffsetsRequestGroup().setGroupId("group").setTopics(null))
+    )
+
+    val requestChannelRequest = buildRequest(new 
DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest, 
true).build)
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+      .thenReturn(util.List.of(AuthorizationResult.DENIED))
+    metadataCache = new KRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> 
"true"),
+      authorizer = Some(authorizer),
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    val response = 
verifyNoThrottling[DescribeShareGroupOffsetsResponse](requestChannelRequest)
+    response.data.groups.forEach(
+      group => group.topics.forEach(
+        topic => topic.partitions.forEach(
+          partition => assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, 
partition.errorCode)
+        )
+      )
+    )
+  }
+
   @Test
   def testDescribeShareGroupOffsetsRequestSuccess(): Unit = {
     val topicName1 = "topic-1"
@@ -11279,6 +11307,453 @@ class KafkaApisTest extends Logging {
     assertEquals(describeShareGroupOffsetsResponse, response.data)
   }
 
+  @Test
+  def testDescribeShareGroupOffsetsRequestTopicAuthorizationFailed(): Unit = {
+    val topicName1 = "topic-1"
+    val topicId1 = Uuid.randomUuid
+    val topicName2 = "topic-2"
+    val topicId2 = Uuid.randomUuid
+    val topicName3 = "topic-3"
+    val topicId3 = Uuid.randomUuid
+    metadataCache = new KRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName1, 1, topicId = topicId1)
+    addTopicToMetadataCache(topicName2, 1, topicId = topicId2)
+    addTopicToMetadataCache(topicName3, 1, topicId = topicId3)
+
+    val describeShareGroupOffsetsRequestGroup1 = new 
DescribeShareGroupOffsetsRequestGroup().setGroupId("group1").setTopics(
+      util.List.of(
+        new 
DescribeShareGroupOffsetsRequestTopic().setTopicName(topicName1).setPartitions(util.List.of(1,
 2, 3)),
+        new 
DescribeShareGroupOffsetsRequestTopic().setTopicName(topicName2).setPartitions(util.List.of(10,
 20)),
+      )
+    )
+
+    val describeShareGroupOffsetsRequestGroup2 = new 
DescribeShareGroupOffsetsRequestGroup().setGroupId("group2").setTopics(
+      util.List.of(
+        new 
DescribeShareGroupOffsetsRequestTopic().setTopicName(topicName3).setPartitions(util.List.of(0)),
+      )
+    )
+
+    val describeShareGroupOffsetsRequest = new 
DescribeShareGroupOffsetsRequestData()
+      .setGroups(util.List.of(describeShareGroupOffsetsRequestGroup1, 
describeShareGroupOffsetsRequestGroup2))
+
+    val requestChannelRequest = buildRequest(new 
DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest, 
true).build)
+
+    // The group coordinator will only be asked for information about topics 
which are authorized
+    val futureGroup1 = new 
CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]
+    when(groupCoordinator.describeShareGroupOffsets(
+      requestChannelRequest.context,
+      new 
DescribeShareGroupOffsetsRequestGroup().setGroupId("group1").setTopics(
+        util.List.of(
+          new 
DescribeShareGroupOffsetsRequestTopic().setTopicName(topicName1).setPartitions(util.List.of(1,
 2, 3)),
+        )
+      )
+    )).thenReturn(futureGroup1)
+
+    val futureGroup2 = new 
CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]
+    when(groupCoordinator.describeShareGroupOffsets(
+      requestChannelRequest.context,
+      new 
DescribeShareGroupOffsetsRequestGroup().setGroupId("group2").setTopics(
+        util.List.of(
+        )
+      )
+    )).thenReturn(futureGroup2)
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    val acls = Map(
+      "group1" -> AuthorizationResult.ALLOWED,
+      "group2" -> AuthorizationResult.ALLOWED,
+      topicName1 -> AuthorizationResult.ALLOWED,
+      topicName2 -> AuthorizationResult.DENIED,
+      topicName3 -> AuthorizationResult.DENIED
+    )
+    when(authorizer.authorize(
+      any[RequestContext],
+      any[util.List[Action]]
+    )).thenAnswer { invocation =>
+      val actions = invocation.getArgument(1, classOf[util.List[Action]])
+      actions.asScala.map { action =>
+        acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED)
+      }.asJava
+    }
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> 
"true"),
+      authorizer = Some(authorizer)
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    // These are the responses to the KafkaApis request, complete with 
authorization errors
+    val describeShareGroupOffsetsResponseGroup1 = new 
DescribeShareGroupOffsetsResponseGroup()
+      .setGroupId("group1")
+      .setTopics(util.List.of(
+        new DescribeShareGroupOffsetsResponseTopic()
+          .setTopicName(topicName1)
+          .setTopicId(topicId1)
+          .setPartitions(util.List.of(
+            new DescribeShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(1)
+              .setStartOffset(0)
+              .setLeaderEpoch(1)
+              .setErrorMessage(null)
+              .setErrorCode(0),
+            new DescribeShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(2)
+              .setStartOffset(0)
+              .setLeaderEpoch(1)
+              .setErrorMessage(null)
+              .setErrorCode(0),
+            new DescribeShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(3)
+              .setStartOffset(0)
+              .setLeaderEpoch(1)
+              .setErrorMessage(null)
+              .setErrorCode(0)
+          )),
+        new DescribeShareGroupOffsetsResponseTopic()
+          .setTopicName(topicName2)
+          .setTopicId(Uuid.ZERO_UUID)
+          .setPartitions(util.List.of(
+            new DescribeShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(10)
+              .setStartOffset(-1)
+              .setLeaderEpoch(0)
+              .setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message)
+              .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code),
+            new DescribeShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(20)
+              .setStartOffset(-1)
+              .setLeaderEpoch(0)
+              .setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message)
+              .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+          ))
+      ))
+
+    val describeShareGroupOffsetsResponseGroup2 = new 
DescribeShareGroupOffsetsResponseGroup()
+      .setGroupId("group2")
+      .setTopics(util.List.of(
+        new DescribeShareGroupOffsetsResponseTopic()
+          .setTopicName(topicName3)
+          .setTopicId(Uuid.ZERO_UUID)
+          .setPartitions(util.List.of(
+            new DescribeShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(0)
+              .setStartOffset(-1)
+              .setLeaderEpoch(0)
+              .setErrorMessage(Errors.TOPIC_AUTHORIZATION_FAILED.message)
+              .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+          ))
+      ))
+
+    val describeShareGroupOffsetsResponse = new 
DescribeShareGroupOffsetsResponseData()
+      .setGroups(util.List.of(describeShareGroupOffsetsResponseGroup1, 
describeShareGroupOffsetsResponseGroup2))
+
+    // And these are the responses to the topics which were authorized
+    val describeShareGroupOffsetsGroupCoordinatorResponseGroup1 = new 
DescribeShareGroupOffsetsResponseGroup()
+      .setGroupId("group1")
+      .setTopics(util.List.of(
+        new DescribeShareGroupOffsetsResponseTopic()
+          .setTopicName(topicName1)
+          .setTopicId(topicId1)
+          .setPartitions(util.List.of(
+            new DescribeShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(1)
+              .setStartOffset(0)
+              .setLeaderEpoch(1)
+              .setErrorMessage(null)
+              .setErrorCode(0),
+            new DescribeShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(2)
+              .setStartOffset(0)
+              .setLeaderEpoch(1)
+              .setErrorMessage(null)
+              .setErrorCode(0),
+            new DescribeShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(3)
+              .setStartOffset(0)
+              .setLeaderEpoch(1)
+              .setErrorMessage(null)
+              .setErrorCode(0)
+          ))
+      ))
+
+    val describeShareGroupOffsetsGroupCoordinatorResponseGroup2 = new 
DescribeShareGroupOffsetsResponseGroup()
+      .setGroupId("group2")
+      .setTopics(util.List.of())
+
+    
futureGroup1.complete(describeShareGroupOffsetsGroupCoordinatorResponseGroup1)
+    
futureGroup2.complete(describeShareGroupOffsetsGroupCoordinatorResponseGroup2)
+    val response = 
verifyNoThrottling[DescribeShareGroupOffsetsResponse](requestChannelRequest)
+    assertEquals(describeShareGroupOffsetsResponse, response.data)
+  }
+
+  @Test
+  def testDescribeShareGroupAllOffsetsRequestTopicAuthorizationFailed(): Unit 
= {
+    val topicName1 = "topic-1"
+    val topicId1 = Uuid.randomUuid
+    val topicName2 = "topic-2"
+    val topicId2 = Uuid.randomUuid
+    val topicName3 = "topic-3"
+    val topicId3 = Uuid.randomUuid
+    metadataCache = new KRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName1, 1, topicId = topicId1)
+    addTopicToMetadataCache(topicName2, 1, topicId = topicId2)
+    addTopicToMetadataCache(topicName3, 1, topicId = topicId3)
+
+    val describeShareGroupOffsetsRequestGroup1 = new 
DescribeShareGroupOffsetsRequestGroup().setGroupId("group1").setTopics(null)
+
+    val describeShareGroupOffsetsRequestGroup2 = new 
DescribeShareGroupOffsetsRequestGroup().setGroupId("group2").setTopics(null)
+
+    val describeShareGroupOffsetsRequest = new 
DescribeShareGroupOffsetsRequestData()
+      .setGroups(util.List.of(describeShareGroupOffsetsRequestGroup1, 
describeShareGroupOffsetsRequestGroup2))
+
+    val requestChannelRequest = buildRequest(new 
DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest, 
true).build)
+
+    // The group coordinator is being asked for information about all topics, 
not just those which are authorized
+    val futureGroup1 = new 
CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]
+    when(groupCoordinator.describeShareGroupAllOffsets(
+      requestChannelRequest.context,
+      new 
DescribeShareGroupOffsetsRequestGroup().setGroupId("group1").setTopics(null)
+    )).thenReturn(futureGroup1)
+
+    val futureGroup2 = new 
CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]
+    when(groupCoordinator.describeShareGroupAllOffsets(
+      requestChannelRequest.context,
+      new 
DescribeShareGroupOffsetsRequestGroup().setGroupId("group2").setTopics(null)
+    )).thenReturn(futureGroup2)
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    val acls = Map(
+      "group1" -> AuthorizationResult.ALLOWED,
+      "group2" -> AuthorizationResult.ALLOWED,
+      topicName1 -> AuthorizationResult.ALLOWED,
+      topicName2 -> AuthorizationResult.DENIED,
+      topicName3 -> AuthorizationResult.DENIED
+    )
+    when(authorizer.authorize(
+      any[RequestContext],
+      any[util.List[Action]]
+    )).thenAnswer { invocation =>
+      val actions = invocation.getArgument(1, classOf[util.List[Action]])
+      actions.asScala.map { action =>
+        acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED)
+      }.asJava
+    }
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> 
"true"),
+      authorizer = Some(authorizer)
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    // These are the responses to the KafkaApis request, with unauthorized 
topics filtered out
+    val describeShareGroupOffsetsResponseGroup1 = new 
DescribeShareGroupOffsetsResponseGroup()
+      .setGroupId("group1")
+      .setTopics(util.List.of(
+        new DescribeShareGroupOffsetsResponseTopic()
+          .setTopicName(topicName1)
+          .setTopicId(topicId1)
+          .setPartitions(util.List.of(
+            new DescribeShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(1)
+              .setStartOffset(0)
+              .setLeaderEpoch(1)
+              .setErrorMessage(null)
+              .setErrorCode(0),
+            new DescribeShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(2)
+              .setStartOffset(0)
+              .setLeaderEpoch(1)
+              .setErrorMessage(null)
+              .setErrorCode(0),
+            new DescribeShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(3)
+              .setStartOffset(0)
+              .setLeaderEpoch(1)
+              .setErrorMessage(null)
+              .setErrorCode(0)
+          ))
+      ))
+
+    val describeShareGroupOffsetsResponseGroup2 = new 
DescribeShareGroupOffsetsResponseGroup()
+      .setGroupId("group2")
+      .setTopics(util.List.of())
+
+    // And these are the responses from the group coordinator for all topics, 
even those which are not authorized
+    val describeShareGroupOffsetsGroupCoordinatorResponseGroup1 = new 
DescribeShareGroupOffsetsResponseGroup()
+      .setGroupId("group1")
+      .setTopics(util.List.of(
+        new DescribeShareGroupOffsetsResponseTopic()
+          .setTopicName(topicName1)
+          .setTopicId(topicId1)
+          .setPartitions(util.List.of(
+            new DescribeShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(1)
+              .setStartOffset(0)
+              .setLeaderEpoch(1)
+              .setErrorMessage(null)
+              .setErrorCode(0),
+            new DescribeShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(2)
+              .setStartOffset(0)
+              .setLeaderEpoch(1)
+              .setErrorMessage(null)
+              .setErrorCode(0),
+            new DescribeShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(3)
+              .setStartOffset(0)
+              .setLeaderEpoch(1)
+              .setErrorMessage(null)
+              .setErrorCode(0)
+          )),
+        new DescribeShareGroupOffsetsResponseTopic()
+          .setTopicName(topicName2)
+          .setTopicId(topicId2)
+          .setPartitions(util.List.of(
+            new DescribeShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(10)
+              .setStartOffset(0)
+              .setLeaderEpoch(1)
+              .setErrorMessage(null)
+              .setErrorCode(0),
+            new DescribeShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(20)
+              .setStartOffset(0)
+              .setLeaderEpoch(1)
+              .setErrorMessage(null)
+              .setErrorCode(0)
+          ))
+      ))
+
+    val describeShareGroupOffsetsGroupCoordinatorResponseGroup2 = new 
DescribeShareGroupOffsetsResponseGroup()
+      .setGroupId("group2")
+      .setTopics(util.List.of(
+        new DescribeShareGroupOffsetsResponseTopic()
+          .setTopicName(topicName3)
+          .setTopicId(topicId3)
+          .setPartitions(util.List.of(
+            new DescribeShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(0)
+              .setStartOffset(0)
+              .setLeaderEpoch(1)
+              .setErrorMessage(null)
+              .setErrorCode(0)
+          ))
+      ))
+
+    val describeShareGroupOffsetsResponse = new 
DescribeShareGroupOffsetsResponseData()
+      .setGroups(util.List.of(describeShareGroupOffsetsResponseGroup1, 
describeShareGroupOffsetsResponseGroup2))
+
+    
futureGroup1.complete(describeShareGroupOffsetsGroupCoordinatorResponseGroup1)
+    
futureGroup2.complete(describeShareGroupOffsetsGroupCoordinatorResponseGroup2)
+    val response = 
verifyNoThrottling[DescribeShareGroupOffsetsResponse](requestChannelRequest)
+    assertEquals(describeShareGroupOffsetsResponse, response.data)
+  }
+
+  @Test
+  def testDescribeShareGroupAllOffsetsRequestSuccess(): Unit = {
+    val topicName1 = "topic-1"
+    val topicId1 = Uuid.randomUuid
+    val topicName2 = "topic-2"
+    val topicId2 = Uuid.randomUuid
+    val topicName3 = "topic-3"
+    val topicId3 = Uuid.randomUuid
+    metadataCache = new KRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName1, 1, topicId = topicId1)
+    addTopicToMetadataCache(topicName2, 1, topicId = topicId2)
+    addTopicToMetadataCache(topicName3, 1, topicId = topicId3)
+
+    val describeShareGroupOffsetsRequestGroup1 = new 
DescribeShareGroupOffsetsRequestGroup().setGroupId("group1").setTopics(null)
+
+    val describeShareGroupOffsetsRequestGroup2 = new 
DescribeShareGroupOffsetsRequestGroup().setGroupId("group2").setTopics(null)
+
+    val describeShareGroupOffsetsRequest = new 
DescribeShareGroupOffsetsRequestData()
+      .setGroups(util.List.of(describeShareGroupOffsetsRequestGroup1, 
describeShareGroupOffsetsRequestGroup2))
+
+    val requestChannelRequest = buildRequest(new 
DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest, 
true).build)
+
+    val futureGroup1 = new 
CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]
+    when(groupCoordinator.describeShareGroupAllOffsets(
+      requestChannelRequest.context,
+      describeShareGroupOffsetsRequestGroup1
+    )).thenReturn(futureGroup1)
+    val futureGroup2 = new 
CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]
+    when(groupCoordinator.describeShareGroupAllOffsets(
+      requestChannelRequest.context,
+      describeShareGroupOffsetsRequestGroup2
+    )).thenReturn(futureGroup2)
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> 
"true"),
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    val describeShareGroupOffsetsResponseGroup1 = new 
DescribeShareGroupOffsetsResponseGroup()
+      .setGroupId("group1")
+      .setTopics(util.List.of(
+        new DescribeShareGroupOffsetsResponseTopic()
+          .setTopicName(topicName1)
+          .setTopicId(topicId1)
+          .setPartitions(util.List.of(
+            new DescribeShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(1)
+              .setStartOffset(0)
+              .setLeaderEpoch(1)
+              .setErrorMessage(null)
+              .setErrorCode(0),
+            new DescribeShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(2)
+              .setStartOffset(0)
+              .setLeaderEpoch(1)
+              .setErrorMessage(null)
+              .setErrorCode(0),
+            new DescribeShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(3)
+              .setStartOffset(0)
+              .setLeaderEpoch(1)
+              .setErrorMessage(null)
+              .setErrorCode(0)
+          )),
+        new DescribeShareGroupOffsetsResponseTopic()
+          .setTopicName(topicName2)
+          .setTopicId(topicId2)
+          .setPartitions(util.List.of(
+            new DescribeShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(10)
+              .setStartOffset(0)
+              .setLeaderEpoch(1)
+              .setErrorMessage(null)
+              .setErrorCode(0),
+            new DescribeShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(20)
+              .setStartOffset(0)
+              .setLeaderEpoch(1)
+              .setErrorMessage(null)
+              .setErrorCode(0)
+          ))
+      ))
+
+    val describeShareGroupOffsetsResponseGroup2 = new 
DescribeShareGroupOffsetsResponseGroup()
+      .setGroupId("group2")
+      .setTopics(util.List.of(
+        new DescribeShareGroupOffsetsResponseTopic()
+          .setTopicName(topicName3)
+          .setTopicId(topicId3)
+          .setPartitions(util.List.of(
+            new DescribeShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(0)
+              .setStartOffset(0)
+              .setLeaderEpoch(1)
+              .setErrorMessage(null)
+              .setErrorCode(0)
+          ))
+      ))
+
+    val describeShareGroupOffsetsResponse = new 
DescribeShareGroupOffsetsResponseData()
+      .setGroups(util.List.of(describeShareGroupOffsetsResponseGroup1, 
describeShareGroupOffsetsResponseGroup2))
+
+    futureGroup1.complete(describeShareGroupOffsetsResponseGroup1)
+    futureGroup2.complete(describeShareGroupOffsetsResponseGroup2)
+    val response = 
verifyNoThrottling[DescribeShareGroupOffsetsResponse](requestChannelRequest)
+    assertEquals(describeShareGroupOffsetsResponse, response.data)
+  }
+
   @Test
   def testDescribeShareGroupOffsetsRequestEmptyGroupsSuccess(): Unit = {
     metadataCache = new KRaftMetadataCache(brokerId, () => 
KRaftVersion.KRAFT_VERSION_0)
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index 061c5d6d4b1..c1072aa8d86 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -297,6 +297,20 @@ public interface GroupCoordinator {
         
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup 
request
     );
 
+    /**
+     * Describe all Share Group Offsets for a given group.
+     *
+     * @param context           The request context
+     * @param request           The DescribeShareGroupOffsetsRequestGroup 
request.
+     *
+     * @return  A future yielding the results.
+     *          The error codes of the response are set to indicate the errors 
occurred during the execution.
+     */
+    
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup>
 describeShareGroupAllOffsets(
+        RequestContext context,
+        
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup 
request
+    );
+
     /**
      * Commit offsets for a given Group.
      *
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 0096b21e398..310bb64bde8 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -1386,13 +1386,16 @@ public class GroupCoordinatorService implements GroupCoordinator {
                         ).toList()
                     ));
             } else {
+                // If the topic does not exist, the start offset is returned 
as -1 (uninitialized offset).
+                // This is consistent with OffsetFetch for situations in which 
there is no offset information to fetch.
+                // It's treated as absence of data, rather than an error, 
unlike TOPIC_AUTHORIZATION_ERROR for example.
                 describeShareGroupOffsetsResponseTopicList.add(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
                     .setTopicName(topic.topicName())
+                    .setTopicId(Uuid.ZERO_UUID)
                     .setPartitions(topic.partitions().stream().map(
                         partition -> new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
                             .setPartitionIndex(partition)
-                            
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
-                            
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
+                            
.setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET)
                     ).toList()));
             }
         });
@@ -1408,6 +1411,67 @@ public class GroupCoordinatorService implements GroupCoordinator {
         ReadShareGroupStateSummaryRequestData readSummaryRequestData = new 
ReadShareGroupStateSummaryRequestData()
             .setGroupId(requestData.groupId())
             .setTopics(readStateSummaryData);
+
+        return readShareGroupStateSummary(readSummaryRequestData, 
requestTopicIdToNameMapping, describeShareGroupOffsetsResponseTopicList);
+    }
+
+    /**
+     * See {@link 
GroupCoordinator#describeShareGroupAllOffsets(RequestContext, 
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup)}.
+     */
+    @Override
+    public 
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup>
 describeShareGroupAllOffsets(
+        RequestContext context,
+        
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup 
requestData
+    ) {
+        if (!isActive.get()) {
+            return CompletableFuture.completedFuture(
+                
DescribeShareGroupOffsetsRequest.getErrorDescribedGroup(requestData.groupId(), 
Errors.COORDINATOR_NOT_AVAILABLE));
+        }
+
+        if (metadataImage == null) {
+            return CompletableFuture.completedFuture(
+                
DescribeShareGroupOffsetsRequest.getErrorDescribedGroup(requestData.groupId(), 
Errors.COORDINATOR_NOT_AVAILABLE));
+        }
+
+        return runtime.scheduleReadOperation(
+            "share-group-initialized-partitions",
+            topicPartitionFor(requestData.groupId()),
+            (coordinator, offset) -> 
coordinator.initializedShareGroupPartitions(requestData.groupId())
+        ).thenCompose(topicPartitionMap -> {
+            Map<Uuid, String> requestTopicIdToNameMapping = new HashMap<>();
+            
List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic>
 describeShareGroupOffsetsResponseTopicList = new 
ArrayList<>(topicPartitionMap.size());
+            ReadShareGroupStateSummaryRequestData readSummaryRequestData = new 
ReadShareGroupStateSummaryRequestData()
+                .setGroupId(requestData.groupId());
+            topicPartitionMap.forEach((topicId, partitionSet) -> {
+                String topicName = 
metadataImage.topics().topicIdToNameView().get(topicId);
+                if (topicName != null) {
+                    requestTopicIdToNameMapping.put(topicId, topicName);
+                    readSummaryRequestData.topics().add(new 
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+                        .setTopicId(topicId)
+                        .setPartitions(
+                            partitionSet.stream().map(
+                                partitionIndex -> new 
ReadShareGroupStateSummaryRequestData.PartitionData().setPartition(partitionIndex)
+                            ).toList()
+                        ));
+                }
+            });
+            return readShareGroupStateSummary(readSummaryRequestData, 
requestTopicIdToNameMapping, describeShareGroupOffsetsResponseTopicList);
+        });
+    }
+
+    private 
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup>
 readShareGroupStateSummary(
+        ReadShareGroupStateSummaryRequestData readSummaryRequestData,
+        Map<Uuid, String> requestTopicIdToNameMapping,
+        
List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic>
 describeShareGroupOffsetsResponseTopicList
+    ) {
+        // If the request for the persister is empty, just complete the 
operation right away.
+        if (readSummaryRequestData.topics().isEmpty()) {
+            return CompletableFuture.completedFuture(
+                new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
+                    .setGroupId(readSummaryRequestData.groupId())
+                    .setTopics(describeShareGroupOffsetsResponseTopicList));
+        }
+
         
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup>
 future = new CompletableFuture<>();
         
persister.readSummary(ReadShareGroupStateSummaryParameters.from(readSummaryRequestData))
             .whenComplete((result, error) -> {
@@ -1421,6 +1485,10 @@ public class GroupCoordinatorService implements GroupCoordinator {
                     future.completeExceptionally(new 
IllegalStateException("Result is null for the read state summary"));
                     return;
                 }
+
+                // Return -1 (uninitialized offset) for the situation where 
the persister returned an error.
+                // This is consistent with OffsetFetch for situations in which 
there is no offset information to fetch.
+                // It's treated as absence of data, rather than an error.
                 result.topicsData().forEach(topicData ->
                     describeShareGroupOffsetsResponseTopicList.add(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
                         .setTopicId(topicData.topicId())
@@ -1428,15 +1496,13 @@ public class GroupCoordinatorService implements GroupCoordinator {
                         .setPartitions(topicData.partitions().stream().map(
                             partitionData -> new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
                                 .setPartitionIndex(partitionData.partition())
-                                .setStartOffset(partitionData.startOffset())
-                                
.setErrorMessage(Errors.forCode(partitionData.errorCode()).message())
-                                .setErrorCode(partitionData.errorCode())
+                                .setStartOffset(partitionData.errorCode() == 
Errors.NONE.code() ? partitionData.startOffset() : 
PartitionFactory.UNINITIALIZED_START_OFFSET)
                         ).toList())
                     ));
 
                 future.complete(
                     new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
-                        .setGroupId(requestData.groupId())
+                        .setGroupId(readSummaryRequestData.groupId())
                         
.setTopics(describeShareGroupOffsetsResponseTopicList));
             });
         return future;
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 3224616f9a9..ded14600dab 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -400,7 +400,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
      * @param context The request context.
      * @param request The actual StreamsGroupHeartbeat request.
      *
-     * @return A result containing the StreamsGroupHeartbeat response, a list 
of internal topics to be created and
+     * @return A Result containing the StreamsGroupHeartbeat response, a list 
of internal topics to be created and
      *         a list of records to update the state machine.
      */
     public CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> 
streamsGroupHeartbeat(
@@ -466,6 +466,19 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
         return 
groupMetadataManager.reconcileShareGroupStateInitializingState(offset);
     }
 
+    /**
+     * Returns the set of share-partitions whose share-group state has been 
initialized in the persister.
+     *
+     * @param groupId The group id corresponding to the share group whose 
share partitions have been initialized.
+     *
+     * @return A map representing the initialized share-partitions for the 
share group.
+     */
+    public Map<Uuid, Set<Integer>> initializedShareGroupPartitions(
+        String groupId
+    ) {
+        return groupMetadataManager.initializedShareGroupPartitions(groupId);
+    }
+
     /**
      * Handles a JoinGroup request.
      *
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 62f227e50dc..8ec6348ed6b 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -3037,7 +3037,7 @@ public class GroupMetadataManager {
         }
         return member;
     }
-    
+
     /**
      * Gets or subscribes a static consumer group member. This method also 
replaces the
      * previous static member if allowed.
@@ -4115,7 +4115,8 @@ public class GroupMetadataManager {
         return shareGroupFenceMember(group, member, response);
     }
 
-    /** Fences a member from a consumer group and maybe downgrade the consumer 
group to a classic group.
+    /**
+      * Fences a member from a consumer group and maybe downgrade the consumer 
group to a classic group.
      *
      * @param group     The group.
      * @param member    The member.
@@ -5028,7 +5029,7 @@ public class GroupMetadataManager {
     }
 
     private Map<Uuid, Map.Entry<String, Set<Integer>>> 
attachTopicName(Map<Uuid, Set<Integer>> initMap) {
-        TopicsImage topicsImage = metadataImage.topics(); 
+        TopicsImage topicsImage = metadataImage.topics();
         Map<Uuid, Map.Entry<String, Set<Integer>>> finalMap = new HashMap<>();
         for (Map.Entry<Uuid, Set<Integer>> entry : initMap.entrySet()) {
             Uuid topicId = entry.getKey();
@@ -5038,6 +5039,28 @@ public class GroupMetadataManager {
         return Collections.unmodifiableMap(finalMap);
     }
 
+    /**
+     * Returns the set of share partitions whose state has been initialized.
+     *
+     * @param groupId The group id corresponding to the share group whose 
share partitions have been initialized.
+     *
+     * @return A map representing the initialized share-partitions for the 
share group.
+     */
+    public Map<Uuid, Set<Integer>> initializedShareGroupPartitions(
+        String groupId
+    ) {
+        Map<Uuid, Set<Integer>> resultMap = new HashMap<>();
+
+        ShareGroupStatePartitionMetadataInfo currentMap = 
shareGroupPartitionMetadata.get(groupId);
+        if (currentMap != null) {
+            currentMap.initializedTopics().forEach((topicId, partitions) -> {
+                resultMap.put(topicId, new HashSet<>(partitions));
+            });
+        }
+
+        return resultMap;
+    }
+
     /**
      * Replays ConsumerGroupMemberMetadataKey/Value to update the hard state of
      * the consumer group. It updates the subscription part of the member or
@@ -8289,7 +8312,7 @@ public class GroupMetadataManager {
     private Map<String, String> streamsGroupAssignmentConfigs(String groupId) {
         return Map.of("group.streams.num.standby.replicas", "0");
     }
-    
+
     /**
      * Generate a classic group heartbeat key for the timer.
      *
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 2096ca4bf5e..26e3cbd9469 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -122,6 +122,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
@@ -2845,9 +2846,7 @@ public class GroupCoordinatorServiceTest {
                     .setTopicId(TOPIC_ID)
                     .setPartitions(List.of(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
                         .setPartitionIndex(partition)
-                        
.setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET)
-                        .setErrorCode(PartitionFactory.DEFAULT_ERROR_CODE)
-                        
.setErrorMessage(PartitionFactory.DEFAULT_ERR_MESSAGE))))
+                        
.setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET))))
             );
 
         
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup>
 future =
@@ -2890,9 +2889,7 @@ public class GroupCoordinatorServiceTest {
                     .setTopicId(TOPIC_ID)
                     .setPartitions(List.of(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
                         .setPartitionIndex(partition)
-                        .setStartOffset(21)
-                        .setErrorCode(Errors.NONE.code())
-                        .setErrorMessage(Errors.NONE.message()))))
+                        .setStartOffset(21))))
             );
 
         ReadShareGroupStateSummaryResponseData 
readShareGroupStateSummaryResponseData = new 
ReadShareGroupStateSummaryResponseData()
@@ -2902,9 +2899,7 @@ public class GroupCoordinatorServiceTest {
                     .setPartitions(List.of(new 
ReadShareGroupStateSummaryResponseData.PartitionResult()
                         .setPartition(partition)
                         .setStartOffset(21)
-                        .setStateEpoch(1)
-                        .setErrorCode(Errors.NONE.code())
-                        .setErrorMessage(Errors.NONE.message())))
+                        .setStateEpoch(1)))
                 )
             );
 
@@ -2944,10 +2939,10 @@ public class GroupCoordinatorServiceTest {
             .setTopics(
                 List.of(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
                     .setTopicName("badtopic")
+                    .setTopicId(Uuid.ZERO_UUID)
                     .setPartitions(List.of(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
                         .setPartitionIndex(partition)
-                        .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
-                        
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()))))
+                        
.setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET))))
             );
 
         
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup>
 future =
@@ -3097,6 +3092,196 @@ public class GroupCoordinatorServiceTest {
         assertEquals(responseData, future.get());
     }
 
+    @Test
+    public void testDescribeShareGroupAllOffsets() throws InterruptedException, ExecutionException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
+        Persister persister = mock(DefaultStatePersister.class);
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setPersister(persister)
+            .build(true);
+
+        MetadataImage image = new MetadataImageBuilder()
+            .addTopic(TOPIC_ID, TOPIC_NAME, 3)
+            .build();
+
+        service.onNewMetadataImage(image, null);
+
+        int partition = 1;
+
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("share-group-initialized-partitions"),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(Map.of(TOPIC_ID, Set.of(partition))));
+
+        DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData = new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
+            .setGroupId("share-group-id")
+            .setTopics(null);
+
+        ReadShareGroupStateSummaryRequestData readShareGroupStateSummaryRequestData = new ReadShareGroupStateSummaryRequestData()
+            .setGroupId("share-group-id")
+            .setTopics(List.of(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(List.of(new ReadShareGroupStateSummaryRequestData.PartitionData()
+                    .setPartition(partition)))));
+
+        DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup responseData = new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
+            .setGroupId("share-group-id")
+            .setTopics(
+                List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+                    .setTopicName(TOPIC_NAME)
+                    .setTopicId(TOPIC_ID)
+                    .setPartitions(List.of(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                        .setPartitionIndex(partition)
+                        .setStartOffset(21))))
+            );
+
+        ReadShareGroupStateSummaryResponseData readShareGroupStateSummaryResponseData = new ReadShareGroupStateSummaryResponseData()
+            .setResults(
+                List.of(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
+                    .setTopicId(TOPIC_ID)
+                    .setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult()
+                        .setPartition(partition)
+                        .setStartOffset(21)
+                        .setStateEpoch(1)))
+                )
+            );
+
+        ReadShareGroupStateSummaryParameters readShareGroupStateSummaryParameters = ReadShareGroupStateSummaryParameters.from(readShareGroupStateSummaryRequestData);
+        ReadShareGroupStateSummaryResult readShareGroupStateSummaryResult = ReadShareGroupStateSummaryResult.from(readShareGroupStateSummaryResponseData);
+        when(persister.readSummary(
+            ArgumentMatchers.eq(readShareGroupStateSummaryParameters)
+        )).thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult));
+
+        CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> future =
+            service.describeShareGroupAllOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData);
+
+        assertEquals(responseData, future.get());
+    }
+
+    @Test
+    public void testDescribeShareGroupAllOffsetsThrowsError() {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
+        Persister persister = mock(DefaultStatePersister.class);
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setPersister(persister)
+            .build(true);
+
+        MetadataImage image = new MetadataImageBuilder()
+            .addTopic(TOPIC_ID, TOPIC_NAME, 3)
+            .build();
+
+        service.onNewMetadataImage(image, null);
+
+        int partition = 1;
+
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("share-group-initialized-partitions"),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(Map.of(TOPIC_ID, Set.of(partition))));
+
+        DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData = new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
+            .setGroupId("share-group-id")
+            .setTopics(null);
+
+        when(persister.readSummary(ArgumentMatchers.any()))
+            .thenReturn(CompletableFuture.failedFuture(new Exception("Unable to validate read state summary request")));
+
+        CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> future =
+            service.describeShareGroupAllOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData);
+        assertFutureThrows(Exception.class, future, "Unable to validate read state summary request");
+    }
+
+    @Test
+    public void testDescribeShareGroupAllOffsetsNullResult() {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
+        Persister persister = mock(DefaultStatePersister.class);
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setPersister(persister)
+            .build(true);
+
+        MetadataImage image = new MetadataImageBuilder()
+            .addTopic(TOPIC_ID, TOPIC_NAME, 3)
+            .build();
+
+        service.onNewMetadataImage(image, null);
+
+        int partition = 1;
+
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("share-group-initialized-partitions"),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(Map.of(TOPIC_ID, Set.of(partition))));
+
+        DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData = new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
+            .setGroupId("share-group-id")
+            .setTopics(null);
+
+        when(persister.readSummary(ArgumentMatchers.any()))
+            .thenReturn(CompletableFuture.completedFuture(null));
+
+        CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> future =
+            service.describeShareGroupAllOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData);
+        assertFutureThrows(IllegalStateException.class, future, "Result is null for the read state summary");
+    }
+
+    @Test
+    public void testDescribeShareGroupAllOffsetsCoordinatorNotActive() throws ExecutionException, InterruptedException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .build();
+
+        DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData = new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
+            .setGroupId("share-group-id")
+            .setTopics(null);
+
+        DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup responseData = new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
+            .setGroupId("share-group-id")
+            .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+            .setErrorMessage(Errors.COORDINATOR_NOT_AVAILABLE.message());
+
+        CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> future =
+            service.describeShareGroupAllOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData);
+
+        assertEquals(responseData, future.get());
+    }
+
+    @Test
+    public void testDescribeShareGroupAllOffsetsMetadataImageNull() throws ExecutionException, InterruptedException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .build(true);
+
+        // Forcing a null Metadata Image
+        service.onNewMetadataImage(null, null);
+
+        DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData = new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
+            .setGroupId("share-group-id")
+            .setTopics(null);
+
+        DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup responseData = new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
+            .setGroupId("share-group-id")
+            .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+            .setErrorMessage(Errors.COORDINATOR_NOT_AVAILABLE.message());
+
+        CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> future =
+            service.describeShareGroupAllOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS), requestData);
+
+        assertEquals(responseData, future.get());
+    }
+
     @Test
     public void testPersisterInitializeSuccess() {
         CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 2a0df23c1ab..868511217f6 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -20579,6 +20579,8 @@ public class GroupMetadataManagerTest {
             2,
             true
         );
+
+        assertEquals(Map.of(t1Uuid, Set.of(0, 1), t2Uuid, Set.of(0, 1)), context.groupMetadataManager.initializedShareGroupPartitions(groupId));
     }
 
     @Test
@@ -20713,6 +20715,8 @@ public class GroupMetadataManagerTest {
 
         assertNull(result.response());
         assertEquals(List.of(), result.records());
+
+        assertEquals(Map.of(), context.groupMetadataManager.initializedShareGroupPartitions(groupId));
     }
 
     @Test
@@ -20788,6 +20792,8 @@ public class GroupMetadataManagerTest {
                 t3Name, new TopicMetadata(t3Id, t3Name, 3)
             ))
         );
+
+        assertEquals(Map.of(t2Id, Set.of(0, 1, 2)), context.groupMetadataManager.initializedShareGroupPartitions(groupId));
     }
 
     @Test
diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
index 075d0a5b282..dcebff0d3ea 100644
--- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
@@ -315,12 +315,7 @@ public class ShareGroupCommand {
             TreeMap<String, Entry<ShareGroupDescription, Collection<SharePartitionOffsetInformation>>> groupOffsets = new TreeMap<>();
 
             shareGroups.forEach((groupId, shareGroup) -> {
-                Set<TopicPartition> allTp = new HashSet<>();
-                for (ShareMemberDescription memberDescription : shareGroup.members()) {
-                    allTp.addAll(memberDescription.assignment().topicPartitions());
-                }
-
-                ListShareGroupOffsetsSpec offsetsSpec = new ListShareGroupOffsetsSpec().topicPartitions(allTp);
+                ListShareGroupOffsetsSpec offsetsSpec = new ListShareGroupOffsetsSpec();
                 Map<String, ListShareGroupOffsetsSpec> groupSpecs = new HashMap<>();
                 groupSpecs.put(groupId, offsetsSpec);
 
@@ -349,37 +344,34 @@ public class ShareGroupCommand {
 
         private void printOffsets(TreeMap<String, Entry<ShareGroupDescription, Collection<SharePartitionOffsetInformation>>> offsets, boolean verbose) {
             offsets.forEach((groupId, tuple) -> {
-                ShareGroupDescription description = tuple.getKey();
                 Collection<SharePartitionOffsetInformation> offsetsInfo = tuple.getValue();
-                if (maybePrintEmptyGroupState(groupId, description.groupState(), offsetsInfo.size())) {
-                    String fmt = printOffsetFormat(groupId, offsetsInfo, verbose);
+                String fmt = printOffsetFormat(groupId, offsetsInfo, verbose);
+
+                if (verbose) {
+                    System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "LEADER-EPOCH", "START-OFFSET");
+                } else {
+                    System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "START-OFFSET");
+                }
 
+                for (SharePartitionOffsetInformation info : offsetsInfo) {
                     if (verbose) {
-                        System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "LEADER-EPOCH", "START-OFFSET");
+                        System.out.printf(fmt,
+                            groupId,
+                            info.topic,
+                            info.partition,
+                            MISSING_COLUMN_VALUE, // Temporary
+                            info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE)
+                        );
                     } else {
-                        System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "START-OFFSET");
-                    }
-
-                    for (SharePartitionOffsetInformation info : offsetsInfo) {
-                        if (verbose) {
-                            System.out.printf(fmt,
-                                groupId,
-                                info.topic,
-                                info.partition,
-                                MISSING_COLUMN_VALUE, // Temporary
-                                info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE)
-                            );
-                        } else {
-                            System.out.printf(fmt,
-                                groupId,
-                                info.topic,
-                                info.partition,
-                                info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE)
-                            );
-                        }
+                        System.out.printf(fmt,
+                            groupId,
+                            info.topic,
+                            info.partition,
+                            info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE)
+                        );
                     }
-                    System.out.println();
                 }
+                System.out.println();
             });
         }
 

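Illustrative sketch (not part of the patch above): listing the offsets of every share-partition with initialised state in a share group by passing a ListShareGroupOffsetsSpec with no topic partitions, the same pattern the updated ShareGroupCommand uses now that it no longer assembles topic-partitions from member assignments. The bootstrap address, the group id and the ListShareGroupOffsetsResult accessor name are assumptions made for this example; consult the Admin javadoc for the exact result API.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.ListShareGroupOffsetsResult;
    import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;

    public class ListAllShareGroupOffsets {
        public static void main(String[] args) throws Exception {
            // A spec with no topic partitions set asks for offsets of all share-partitions of the group.
            Map<String, ListShareGroupOffsetsSpec> groupSpecs = new HashMap<>();
            groupSpecs.put("share-group-id", new ListShareGroupOffsetsSpec());

            try (Admin admin = Admin.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
                ListShareGroupOffsetsResult result = admin.listShareGroupOffsets(groupSpecs);
                // Accessor name assumed here; the result exposes a per-group future of partition offsets.
                result.partitionsToOffsetAndMetadata("share-group-id").get()
                    .forEach((tp, offset) -> System.out.printf("%s -> %s%n", tp, offset));
            }
        }
    }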