This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7d027a4d83a KAFKA-19218: Add missing leader epoch to share group state 
summary response (#19602)
7d027a4d83a is described below

commit 7d027a4d83af15428d4aa9bdb2144adcad73408c
Author: Andrew Schofield <[email protected]>
AuthorDate: Tue May 6 14:53:12 2025 +0100

    KAFKA-19218: Add missing leader epoch to share group state summary response 
(#19602)
    
    When the persister is responding to a read share-group state summary
    request, it has no way of including the leader epoch in its response,
    even though it has the information to hand. This means that the leader
    epoch information is not initialised in the admin client operation to
    list share group offsets, and this then means that the information
    cannot be displayed in kafka-share-groups.sh.
    
    Reviewers: Apoorv Mittal <[email protected]>, Sushant Mahajan
     <[email protected]>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |   2 +-
 .../clients/admin/ListShareGroupOffsetsResult.java |  11 ++-
 .../internals/ListShareGroupOffsetsHandler.java    |  20 ++--
 .../ReadShareGroupStateSummaryResponse.java        |   2 +
 .../ReadShareGroupStateSummaryResponse.json        |   2 +
 .../kafka/clients/admin/AdminClientTestUtils.java  |   4 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 110 ++++++++++++++-------
 .../coordinator/group/GroupCoordinatorService.java |   1 +
 .../share/persister/DefaultStatePersister.java     |   2 +
 .../server/share/persister/NoOpStatePersister.java |   3 +-
 .../server/share/persister/PartitionFactory.java   |   8 +-
 .../share/persister/PartitionStateSummaryData.java |   2 +
 .../ReadShareGroupStateSummaryResult.java          |   3 +-
 .../share/persister/DefaultStatePersisterTest.java |  18 ++--
 .../coordinator/share/ShareCoordinatorShard.java   |   2 +
 .../share/ShareCoordinatorShardTest.java           |   1 +
 .../tools/consumer/group/ShareGroupCommand.java    |  39 +++++---
 .../consumer/group/ShareGroupCommandTest.java      |  13 +--
 18 files changed, 155 insertions(+), 88 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 91698c813e7..0f37edb2dd8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -3846,7 +3846,7 @@ public class KafkaAdminClient extends AdminClient {
     @Override
     public ListShareGroupOffsetsResult listShareGroupOffsets(final Map<String, 
ListShareGroupOffsetsSpec> groupSpecs,
                                                              final 
ListShareGroupOffsetsOptions options) {
-        SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Long>> future 
= ListShareGroupOffsetsHandler.newFuture(groupSpecs.keySet());
+        SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, 
OffsetAndMetadata>> future = 
ListShareGroupOffsetsHandler.newFuture(groupSpecs.keySet());
         ListShareGroupOffsetsHandler handler = new 
ListShareGroupOffsetsHandler(groupSpecs, logContext);
         invokeDriver(handler, future, options.timeoutMs);
         return new ListShareGroupOffsetsResult(future.all());
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java
index d39f3711f4c..e1dcd932309 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.clients.admin;
 
 import org.apache.kafka.clients.admin.internals.CoordinatorKey;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
@@ -35,9 +36,9 @@ import java.util.stream.Collectors;
 @InterfaceStability.Evolving
 public class ListShareGroupOffsetsResult {
 
-    private final Map<String, KafkaFuture<Map<TopicPartition, Long>>> futures;
+    private final Map<String, KafkaFuture<Map<TopicPartition, 
OffsetAndMetadata>>> futures;
 
-    ListShareGroupOffsetsResult(final Map<CoordinatorKey, 
KafkaFuture<Map<TopicPartition, Long>>> futures) {
+    ListShareGroupOffsetsResult(final Map<CoordinatorKey, 
KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
         this.futures = futures.entrySet().stream()
             .collect(Collectors.toMap(e -> e.getKey().idValue, 
Map.Entry::getValue));
     }
@@ -47,10 +48,10 @@ public class ListShareGroupOffsetsResult {
      *
      * @return Future which yields all {@code Map<String, Map<TopicPartition, 
Long>>} objects, if requests for all the groups succeed.
      */
-    public KafkaFuture<Map<String, Map<TopicPartition, Long>>> all() {
+    public KafkaFuture<Map<String, Map<TopicPartition, OffsetAndMetadata>>> 
all() {
         return KafkaFuture.allOf(futures.values().toArray(new 
KafkaFuture<?>[0])).thenApply(
             nil -> {
-                Map<String, Map<TopicPartition, Long>> offsets = new 
HashMap<>(futures.size());
+                Map<String, Map<TopicPartition, OffsetAndMetadata>> offsets = 
new HashMap<>(futures.size());
                 futures.forEach((groupId, future) -> {
                     try {
                         offsets.put(groupId, future.get());
@@ -70,7 +71,7 @@ public class ListShareGroupOffsetsResult {
      * @param groupId The group ID.
      * @return Future which yields a map of topic partitions to offsets for 
the specified group.
      */
-    public KafkaFuture<Map<TopicPartition, Long>> partitionsToOffset(String 
groupId) {
+    public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> 
partitionsToOffsetAndMetadata(String groupId) {
         if (!futures.containsKey(groupId)) {
             throw new IllegalArgumentException("Group ID not found: " + 
groupId);
         }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java
index fcaba5a67e6..f9b9e987930 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.admin.internals;
 import org.apache.kafka.clients.admin.KafkaAdminClient;
 import org.apache.kafka.clients.admin.ListShareGroupOffsetsOptions;
 import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData;
@@ -39,13 +40,14 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
  * This class is the handler for {@link 
KafkaAdminClient#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)} call
  */
-public class ListShareGroupOffsetsHandler extends 
AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, Long>> {
+public class ListShareGroupOffsetsHandler extends 
AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> 
{
 
     private final Map<String, ListShareGroupOffsetsSpec> groupSpecs;
     private final Logger log;
@@ -58,7 +60,7 @@ public class ListShareGroupOffsetsHandler extends 
AdminApiHandler.Batched<Coordi
         this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, 
logContext);
     }
 
-    public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, 
Map<TopicPartition, Long>> newFuture(Collection<String> groupIds) {
+    public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, 
Map<TopicPartition, OffsetAndMetadata>> newFuture(Collection<String> groupIds) {
         return AdminApiFuture.forKeys(coordinatorKeys(groupIds));
     }
 
@@ -108,13 +110,13 @@ public class ListShareGroupOffsetsHandler extends 
AdminApiHandler.Batched<Coordi
     }
 
     @Override
-    public ApiResult<CoordinatorKey, Map<TopicPartition, Long>> 
handleResponse(Node coordinator,
-                                                                               
Set<CoordinatorKey> groupIds,
-                                                                               
AbstractResponse abstractResponse) {
+    public ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> 
handleResponse(Node coordinator,
+                                                                               
             Set<CoordinatorKey> groupIds,
+                                                                               
             AbstractResponse abstractResponse) {
         validateKeys(groupIds);
 
         final DescribeShareGroupOffsetsResponse response = 
(DescribeShareGroupOffsetsResponse) abstractResponse;
-        final Map<CoordinatorKey, Map<TopicPartition, Long>> completed = new 
HashMap<>();
+        final Map<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> 
completed = new HashMap<>();
         final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
         final List<CoordinatorKey> unmapped = new ArrayList<>();
 
@@ -123,17 +125,19 @@ public class ListShareGroupOffsetsHandler extends 
AdminApiHandler.Batched<Coordi
             if (response.hasGroupError(groupId)) {
                 handleGroupError(coordinatorKey, response.groupError(groupId), 
failed, unmapped);
             } else {
-                Map<TopicPartition, Long> groupOffsetsListing = new 
HashMap<>();
+                Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = 
new HashMap<>();
                 response.data().groups().stream().filter(g -> 
g.groupId().equals(groupId)).forEach(groupResponse -> {
                     for 
(DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic 
topicResponse : groupResponse.topics()) {
                         for 
(DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition
 partitionResponse : topicResponse.partitions()) {
                             TopicPartition tp = new 
TopicPartition(topicResponse.topicName(), partitionResponse.partitionIndex());
                             if (partitionResponse.errorCode() == 
Errors.NONE.code()) {
+                                final long startOffset = 
partitionResponse.startOffset();
+                                final Optional<Integer> leaderEpoch = 
partitionResponse.leaderEpoch() < 0 ? Optional.empty() : 
Optional.of(partitionResponse.leaderEpoch());
                                 // Negative offset indicates there is no start 
offset for this partition
                                 if (partitionResponse.startOffset() < 0) {
                                     groupOffsetsListing.put(tp, null);
                                 } else {
-                                    groupOffsetsListing.put(tp, 
partitionResponse.startOffset());
+                                    groupOffsetsListing.put(tp, new 
OffsetAndMetadata(startOffset, leaderEpoch, ""));
                                 }
                             } else {
                                 log.warn("Skipping return offset for {} due to 
error {}: {}.", tp, partitionResponse.errorCode(), 
partitionResponse.errorMessage());
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryResponse.java
index 86363add1e1..a2787ff82c9 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryResponse.java
@@ -99,6 +99,7 @@ public class ReadShareGroupStateSummaryResponse extends 
AbstractResponse {
         Uuid topicId,
         int partition,
         long startOffset,
+        int leaderEpoch,
         int stateEpoch
     ) {
         return new ReadShareGroupStateSummaryResponseData()
@@ -109,6 +110,7 @@ public class ReadShareGroupStateSummaryResponse extends 
AbstractResponse {
                         new 
ReadShareGroupStateSummaryResponseData.PartitionResult()
                             .setPartition(partition)
                             .setStartOffset(startOffset)
+                            .setLeaderEpoch(leaderEpoch)
                             .setStateEpoch(stateEpoch)
                     ))
             ));
diff --git 
a/clients/src/main/resources/common/message/ReadShareGroupStateSummaryResponse.json
 
b/clients/src/main/resources/common/message/ReadShareGroupStateSummaryResponse.json
index ddf9d7044a6..81e3edc554e 100644
--- 
a/clients/src/main/resources/common/message/ReadShareGroupStateSummaryResponse.json
+++ 
b/clients/src/main/resources/common/message/ReadShareGroupStateSummaryResponse.json
@@ -41,6 +41,8 @@
           "about": "The error message, or null if there was no error." },
         { "name": "StateEpoch", "type": "int32", "versions": "0+",
           "about": "The state epoch of the share-partition." },
+        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
+          "about": "The leader epoch of the share-partition." },
         { "name": "StartOffset", "type": "int64", "versions": "0+",
           "about": "The share-partition start offset." }
       ]}
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
index 061982e34d3..c98ffb9483f 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
@@ -176,8 +176,8 @@ public class AdminClientTestUtils {
         return new ListClientMetricsResourcesResult(future);
     }
 
-    public static ListShareGroupOffsetsResult 
createListShareGroupOffsetsResult(Map<String, KafkaFuture<Map<TopicPartition, 
Long>>> groupOffsets) {
-        Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, Long>>> 
coordinatorFutures = groupOffsets.entrySet().stream()
+    public static ListShareGroupOffsetsResult 
createListShareGroupOffsetsResult(Map<String, KafkaFuture<Map<TopicPartition, 
OffsetAndMetadata>>> groupOffsets) {
+        Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, 
OffsetAndMetadata>>> coordinatorFutures = groupOffsets.entrySet().stream()
             .collect(Collectors.toMap(
                 entry -> CoordinatorKey.byGroupId(entry.getKey()),
                 Map.Entry::getValue
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 339a431aa1c..74c381be17e 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -10577,12 +10577,24 @@ public class KafkaAdminClientTest {
                 List.of(
                     new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup().setGroupId(GROUP_ID).setTopics(
                         List.of(
-                            new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new
 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10))),
-                            new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new
 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11))),
-                            new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new
 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(2).setStartOffset(40))),
-                            new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new
 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(3).setStartOffset(50))),
-                            new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions(List.of(new
 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setStartOffset(100))),
-                            new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions(List.of(new
 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500)))
+                            new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(
+                                List.of(
+                                    new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10).setLeaderEpoch(0),
+                                    new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11).setLeaderEpoch(0),
+                                    new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(2).setStartOffset(40).setLeaderEpoch(0),
+                                    new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(3).setStartOffset(50).setLeaderEpoch(1)
+                                )
+                            ),
+                            new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions(
+                                List.of(
+                                    new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setStartOffset(100).setLeaderEpoch(2)
+                                )
+                            ),
+                            new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions(
+                                List.of(
+                                    new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500).setLeaderEpoch(3)
+                                )
+                            )
                         )
                     )
                 )
@@ -10590,15 +10602,15 @@ public class KafkaAdminClientTest {
             env.kafkaClient().prepareResponse(new 
DescribeShareGroupOffsetsResponse(data));
 
             final ListShareGroupOffsetsResult result = 
env.adminClient().listShareGroupOffsets(groupSpecs);
-            final Map<TopicPartition, Long> partitionToOffsetAndMetadata = 
result.partitionsToOffset(GROUP_ID).get();
+            final Map<TopicPartition, OffsetAndMetadata> 
partitionToOffsetAndMetadata = 
result.partitionsToOffsetAndMetadata(GROUP_ID).get();
 
             assertEquals(6, partitionToOffsetAndMetadata.size());
-            assertEquals(10, 
partitionToOffsetAndMetadata.get(myTopicPartition0));
-            assertEquals(11, 
partitionToOffsetAndMetadata.get(myTopicPartition1));
-            assertEquals(40, 
partitionToOffsetAndMetadata.get(myTopicPartition2));
-            assertEquals(50, 
partitionToOffsetAndMetadata.get(myTopicPartition3));
-            assertEquals(100, 
partitionToOffsetAndMetadata.get(myTopicPartition4));
-            assertEquals(500, 
partitionToOffsetAndMetadata.get(myTopicPartition5));
+            assertEquals(new OffsetAndMetadata(10, Optional.of(0), ""), 
partitionToOffsetAndMetadata.get(myTopicPartition0));
+            assertEquals(new OffsetAndMetadata(11, Optional.of(0), ""), 
partitionToOffsetAndMetadata.get(myTopicPartition1));
+            assertEquals(new OffsetAndMetadata(40, Optional.of(0), ""), 
partitionToOffsetAndMetadata.get(myTopicPartition2));
+            assertEquals(new OffsetAndMetadata(50, Optional.of(1), ""), 
partitionToOffsetAndMetadata.get(myTopicPartition3));
+            assertEquals(new OffsetAndMetadata(100, Optional.of(2), ""), 
partitionToOffsetAndMetadata.get(myTopicPartition4));
+            assertEquals(new OffsetAndMetadata(500, Optional.of(3), ""), 
partitionToOffsetAndMetadata.get(myTopicPartition5));
         }
     }
 
@@ -10630,16 +10642,28 @@ public class KafkaAdminClientTest {
                 List.of(
                     new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup().setGroupId(GROUP_ID).setTopics(
                         List.of(
-                            new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new
 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10))),
-                            new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new
 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11))),
-                            new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new
 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(2).setStartOffset(40))),
-                            new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new
 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(3).setStartOffset(50)))
+                            new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(
+                                List.of(
+                                    new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10).setLeaderEpoch(0),
+                                    new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11).setLeaderEpoch(0),
+                                    new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(2).setStartOffset(40).setLeaderEpoch(0),
+                                    new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(3).setStartOffset(50).setLeaderEpoch(1)
+                                )
+                            )
                         )
                     ),
                     new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup().setGroupId("group-1").setTopics(
                         List.of(
-                            new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions(List.of(new
 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setStartOffset(100))),
-                            new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions(List.of(new
 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500)))
+                            new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions(
+                                List.of(
+                                    new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setStartOffset(100).setLeaderEpoch(2)
+                                )
+                            ),
+                            new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions(
+                                List.of(
+                                    new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500).setLeaderEpoch(2)
+                                )
+                            )
                         )
                     )
                 )
@@ -10649,17 +10673,17 @@ public class KafkaAdminClientTest {
             final ListShareGroupOffsetsResult result = 
env.adminClient().listShareGroupOffsets(groupSpecs);
             assertEquals(2, result.all().get().size());
 
-            final Map<TopicPartition, Long> partitionToOffsetAndMetadataGroup0 
= result.partitionsToOffset(GROUP_ID).get();
+            final Map<TopicPartition, OffsetAndMetadata> 
partitionToOffsetAndMetadataGroup0 = 
result.partitionsToOffsetAndMetadata(GROUP_ID).get();
             assertEquals(4, partitionToOffsetAndMetadataGroup0.size());
-            assertEquals(10, 
partitionToOffsetAndMetadataGroup0.get(myTopicPartition0));
-            assertEquals(11, 
partitionToOffsetAndMetadataGroup0.get(myTopicPartition1));
-            assertEquals(40, 
partitionToOffsetAndMetadataGroup0.get(myTopicPartition2));
-            assertEquals(50, 
partitionToOffsetAndMetadataGroup0.get(myTopicPartition3));
+            assertEquals(new OffsetAndMetadata(10, Optional.of(0), ""), 
partitionToOffsetAndMetadataGroup0.get(myTopicPartition0));
+            assertEquals(new OffsetAndMetadata(11, Optional.of(0), ""), 
partitionToOffsetAndMetadataGroup0.get(myTopicPartition1));
+            assertEquals(new OffsetAndMetadata(40, Optional.of(0), ""), 
partitionToOffsetAndMetadataGroup0.get(myTopicPartition2));
+            assertEquals(new OffsetAndMetadata(50, Optional.of(1), ""), 
partitionToOffsetAndMetadataGroup0.get(myTopicPartition3));
 
-            final Map<TopicPartition, Long> partitionToOffsetAndMetadataGroup1 
= result.partitionsToOffset("group-1").get();
+            final Map<TopicPartition, OffsetAndMetadata> 
partitionToOffsetAndMetadataGroup1 = 
result.partitionsToOffsetAndMetadata("group-1").get();
             assertEquals(2, partitionToOffsetAndMetadataGroup1.size());
-            assertEquals(100, 
partitionToOffsetAndMetadataGroup1.get(myTopicPartition4));
-            assertEquals(500, 
partitionToOffsetAndMetadataGroup1.get(myTopicPartition5));
+            assertEquals(new OffsetAndMetadata(100, Optional.of(2), ""), 
partitionToOffsetAndMetadataGroup1.get(myTopicPartition4));
+            assertEquals(new OffsetAndMetadata(500, Optional.of(2), ""), 
partitionToOffsetAndMetadataGroup1.get(myTopicPartition5));
         }
     }
 
@@ -10682,7 +10706,7 @@ public class KafkaAdminClientTest {
             env.kafkaClient().prepareResponse(new 
DescribeShareGroupOffsetsResponse(data));
 
             final ListShareGroupOffsetsResult result = 
env.adminClient().listShareGroupOffsets(groupSpecs);
-            final Map<TopicPartition, Long> partitionToOffsetAndMetadata = 
result.partitionsToOffset(GROUP_ID).get();
+            final Map<TopicPartition, OffsetAndMetadata> 
partitionToOffsetAndMetadata = 
result.partitionsToOffsetAndMetadata(GROUP_ID).get();
 
             assertEquals(0, partitionToOffsetAndMetadata.size());
         }
@@ -10711,12 +10735,22 @@ public class KafkaAdminClientTest {
                 List.of(
                     new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup().setGroupId(GROUP_ID).setTopics(
                         List.of(
-                            new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(
-                                new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10),
-                                new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11)
-                            )),
-                            new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions(List.of(new
 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setErrorCode(Errors.NOT_COORDINATOR.code()).setErrorMessage("Not
 a Coordinator"))),
-                            new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions(List.of(new
 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500)))
+                            new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(
+                                List.of(
+                                    new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10).setLeaderEpoch(0),
+                                    new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11).setLeaderEpoch(1)
+                                )
+                            ),
+                            new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions(
+                                List.of(
+                                    new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setErrorCode(Errors.NOT_COORDINATOR.code()).setErrorMessage("Not
 a Coordinator")
+                                )
+                            ),
+                            new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions(
+                                List.of(
+                                    new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500).setLeaderEpoch(2)
+                                )
+                            )
                         )
                     )
                 )
@@ -10724,13 +10758,13 @@ public class KafkaAdminClientTest {
             env.kafkaClient().prepareResponse(new 
DescribeShareGroupOffsetsResponse(data));
 
             final ListShareGroupOffsetsResult result = 
env.adminClient().listShareGroupOffsets(groupSpecs);
-            final Map<TopicPartition, Long> partitionToOffsetAndMetadata = 
result.partitionsToOffset(GROUP_ID).get();
+            final Map<TopicPartition, OffsetAndMetadata> 
partitionToOffsetAndMetadata = 
result.partitionsToOffsetAndMetadata(GROUP_ID).get();
 
             // For myTopicPartition2 we have set an error as the response. 
Thus, it should be skipped from the final result
             assertEquals(3, partitionToOffsetAndMetadata.size());
-            assertEquals(10, 
partitionToOffsetAndMetadata.get(myTopicPartition0));
-            assertEquals(11, 
partitionToOffsetAndMetadata.get(myTopicPartition1));
-            assertEquals(500, 
partitionToOffsetAndMetadata.get(myTopicPartition3));
+            assertEquals(new OffsetAndMetadata(10, Optional.of(0), ""), 
partitionToOffsetAndMetadata.get(myTopicPartition0));
+            assertEquals(new OffsetAndMetadata(11, Optional.of(1), ""), 
partitionToOffsetAndMetadata.get(myTopicPartition1));
+            assertEquals(new OffsetAndMetadata(500, Optional.of(2), ""), 
partitionToOffsetAndMetadata.get(myTopicPartition3));
         }
     }
 
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index ebccd66359b..cc2566b492e 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -1702,6 +1702,7 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
                             partitionData -> new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
                                 .setPartitionIndex(partitionData.partition())
                                 .setStartOffset(partitionData.errorCode() == 
Errors.NONE.code() ? partitionData.startOffset() : 
PartitionFactory.UNINITIALIZED_START_OFFSET)
+                                .setLeaderEpoch(partitionData.errorCode() == 
Errors.NONE.code() ? partitionData.leaderEpoch() : 
PartitionFactory.DEFAULT_LEADER_EPOCH)
                         ).toList())
                     ));
 
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
index 1b4a0756515..ae8b8c317c3 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
@@ -486,6 +486,7 @@ public class DefaultStatePersister implements Persister {
                                     partitionResult.partition(),
                                     partitionResult.stateEpoch(),
                                     partitionResult.startOffset(),
+                                    partitionResult.leaderEpoch(),
                                     partitionResult.errorCode(),
                                     partitionResult.errorMessage()))
                                 .toList();
@@ -495,6 +496,7 @@ public class DefaultStatePersister implements Persister {
                                 partition,
                                 -1,
                                 -1,
+                                -1,
                                 Errors.UNKNOWN_SERVER_ERROR.code(),   // No 
specific public error code exists for InterruptedException / ExecutionException
                                 "Error reading state from share coordinator: " 
+ e.getMessage()));
                         }
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/NoOpStatePersister.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/NoOpStatePersister.java
index 2814fc3aa82..908891dd463 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/NoOpStatePersister.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/NoOpStatePersister.java
@@ -92,7 +92,8 @@ public class NoOpStatePersister implements Persister {
         for (TopicData<PartitionIdLeaderEpochData> topicData : 
reqData.topicsData()) {
             resultArgs.add(new TopicData<>(topicData.topicId(), 
topicData.partitions().stream().
                 map(partitionIdData -> 
PartitionFactory.newPartitionStateSummaryData(
-                    partitionIdData.partition(), 
PartitionFactory.DEFAULT_STATE_EPOCH, 
PartitionFactory.UNINITIALIZED_START_OFFSET, 
PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE))
+                    partitionIdData.partition(), 
PartitionFactory.DEFAULT_STATE_EPOCH, 
PartitionFactory.UNINITIALIZED_START_OFFSET,
+                    PartitionFactory.DEFAULT_LEADER_EPOCH, 
PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE))
                 .collect(Collectors.toList())));
         }
         return CompletableFuture.completedFuture(new 
ReadShareGroupStateSummaryResult.Builder().setTopicsData(resultArgs).build());
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
index 009eb9cccc1..78a6902a170 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
@@ -47,12 +47,8 @@ public class PartitionFactory {
         return new PartitionData(partition, DEFAULT_STATE_EPOCH, 
UNINITIALIZED_START_OFFSET, errorCode, errorMessage, DEFAULT_LEADER_EPOCH, 
null);
     }
 
-    public static PartitionStateErrorData newPartitionStateErrorData(int 
partition, int stateEpoch, long startOffset, short errorCode, String 
errorMessage) {
-        return new PartitionData(partition, stateEpoch, startOffset, 
errorCode, errorMessage, DEFAULT_LEADER_EPOCH, null);
-    }
-
-    public static PartitionStateSummaryData newPartitionStateSummaryData(int 
partition, int stateEpoch, long startOffset, short errorCode, String 
errorMessage) {
-        return new PartitionData(partition, stateEpoch, startOffset, 
errorCode, errorMessage, DEFAULT_LEADER_EPOCH, null);
+    public static PartitionStateSummaryData newPartitionStateSummaryData(int 
partition, int stateEpoch, long startOffset, int leaderEpoch, short errorCode, 
String errorMessage) {
+        return new PartitionData(partition, stateEpoch, startOffset, 
errorCode, errorMessage, leaderEpoch, null);
     }
 
     public static PartitionStateBatchData newPartitionStateBatchData(int 
partition, int stateEpoch, long startOffset, int leaderEpoch, 
List<PersisterStateBatch> stateBatches) {
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionStateSummaryData.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionStateSummaryData.java
index dc4732a79ae..58a9dc10615 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionStateSummaryData.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionStateSummaryData.java
@@ -24,6 +24,8 @@ package org.apache.kafka.server.share.persister;
 public interface PartitionStateSummaryData extends PartitionInfoData, 
PartitionIdData {
     int stateEpoch();
 
+    int leaderEpoch();
+
     long startOffset();
 
     short errorCode();
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryResult.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryResult.java
index 7e0bee13c38..249eb20ed94 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryResult.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryResult.java
@@ -38,7 +38,8 @@ public class ReadShareGroupStateSummaryResult implements 
PersisterResult {
                         .map(readStateSummaryResult -> new 
TopicData<>(readStateSummaryResult.topicId(),
                                 readStateSummaryResult.partitions().stream()
                                         .map(partitionResult -> 
PartitionFactory.newPartitionStateSummaryData(
-                                                partitionResult.partition(), 
partitionResult.stateEpoch(), partitionResult.startOffset(), 
partitionResult.errorCode(), partitionResult.errorMessage()))
+                                                partitionResult.partition(), 
partitionResult.stateEpoch(), partitionResult.startOffset(),
+                                                partitionResult.leaderEpoch(), 
partitionResult.errorCode(), partitionResult.errorMessage()))
                                         .collect(Collectors.toList())))
                         .collect(Collectors.toList()))
                 .build();
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
 
b/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
index 1a668b5dc32..697d958723a 100644
--- 
a/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
+++ 
b/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
@@ -868,7 +868,7 @@ class DefaultStatePersisterTest {
 
                 return requestGroupId.equals(groupId) && requestTopicId == 
topicId1 && requestPartition == partition1;
             },
-            new 
ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId1,
 partition1, 0, 1)),
+            new 
ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId1,
 partition1, 0, 1, 1)),
             coordinatorNode1);
 
         client.prepareResponseFrom(
@@ -880,7 +880,7 @@ class DefaultStatePersisterTest {
 
                 return requestGroupId.equals(groupId) && requestTopicId == 
topicId2 && requestPartition == partition2;
             },
-            new 
ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId2,
 partition2, 0, 1)),
+            new 
ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId2,
 partition2, 0, 1, 1)),
             coordinatorNode2);
 
         ShareCoordinatorMetadataCacheHelper cacheHelper = 
getDefaultCacheHelper(suppliedNode);
@@ -930,12 +930,12 @@ class DefaultStatePersisterTest {
 
         HashSet<PartitionData> expectedResultMap = new HashSet<>();
         expectedResultMap.add(
-            (PartitionData) 
PartitionFactory.newPartitionStateSummaryData(partition1, 1, 0, 
Errors.NONE.code(),
+            (PartitionData) 
PartitionFactory.newPartitionStateSummaryData(partition1, 1, 0, 1, 
Errors.NONE.code(),
                 null
             ));
 
         expectedResultMap.add(
-            (PartitionData) 
PartitionFactory.newPartitionStateSummaryData(partition2, 1, 0, 
Errors.NONE.code(),
+            (PartitionData) 
PartitionFactory.newPartitionStateSummaryData(partition2, 1, 0, 1, 
Errors.NONE.code(),
                 null
             ));
 
@@ -1437,6 +1437,7 @@ class DefaultStatePersisterTest {
                             tp1.topicId(),
                             tp1.partition(),
                             1L,
+                            1,
                             2
                         )
                     )
@@ -1468,7 +1469,7 @@ class DefaultStatePersisterTest {
             results.topicsData().contains(
                 new TopicData<>(
                     tp1.topicId(),
-                    
List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L, 
Errors.NONE.code(), null))
+                    
List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L, 
1, Errors.NONE.code(), null))
                 )
             )
         );
@@ -1476,7 +1477,7 @@ class DefaultStatePersisterTest {
             results.topicsData().contains(
                 new TopicData<>(
                     tp2.topicId(),
-                    
List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), 0, 0, 
Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "unknown tp"))
+                    
List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), 0, 0, 0, 
Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "unknown tp"))
                 )
             )
         );
@@ -1496,6 +1497,7 @@ class DefaultStatePersisterTest {
                             tp1.topicId(),
                             tp1.partition(),
                             1L,
+                            1,
                             2
                         )
                     )
@@ -1517,7 +1519,7 @@ class DefaultStatePersisterTest {
             results.topicsData().contains(
                 new TopicData<>(
                     tp1.topicId(),
-                    
List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L, 
Errors.NONE.code(), null))
+                    
List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L, 
1, Errors.NONE.code(), null))
                 )
             )
         );
@@ -1525,7 +1527,7 @@ class DefaultStatePersisterTest {
             results.topicsData().contains(
                 new TopicData<>(
                     tp2.topicId(),
-                    
List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), -1, -1L, 
Errors.UNKNOWN_SERVER_ERROR.code(), "Error reading state from share 
coordinator: java.lang.Exception: scary stuff"))
+                    
List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), -1, -1L, 
-1, Errors.UNKNOWN_SERVER_ERROR.code(), "Error reading state from share 
coordinator: java.lang.Exception: scary stuff"))
                 )
             )
         );
diff --git 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
index d38564fd6f8..7f03f9254b1 100644
--- 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
+++ 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
@@ -453,6 +453,7 @@ public class ShareCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
                 topicId,
                 partitionId,
                 PartitionFactory.UNINITIALIZED_START_OFFSET,
+                PartitionFactory.DEFAULT_LEADER_EPOCH,
                 PartitionFactory.DEFAULT_STATE_EPOCH
             );
         } else {
@@ -470,6 +471,7 @@ public class ShareCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
                     topicId,
                     partitionId,
                     offsetValue.startOffset(),
+                    offsetValue.leaderEpoch(),
                     offsetValue.stateEpoch()
                 );
             }
diff --git 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
index e4b95d1e439..4ff5a2ad92e 100644
--- 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
+++ 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
@@ -605,6 +605,7 @@ class ShareCoordinatorShardTest {
             TOPIC_ID,
             PARTITION,
             0,
+            0,
             0
         ), result.response());
 
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
index 7d53514a250..11a9a020e5c 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
@@ -30,6 +30,7 @@ import 
org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;
 import org.apache.kafka.clients.admin.ShareGroupDescription;
 import org.apache.kafka.clients.admin.ShareMemberAssignment;
 import org.apache.kafka.clients.admin.ShareMemberDescription;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.GroupState;
 import org.apache.kafka.common.GroupType;
 import org.apache.kafka.common.KafkaFuture;
@@ -403,19 +404,30 @@ public class ShareGroupCommand {
                 groupSpecs.put(groupId, offsetsSpec);
 
                 try {
-                    Map<TopicPartition, Long> earliestResult = 
adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId);
+                    Map<TopicPartition, OffsetAndMetadata> startOffsets = 
adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId);
 
                     Set<SharePartitionOffsetInformation> partitionOffsets = 
new HashSet<>();
 
-                    for (Entry<TopicPartition, Long> tp : 
earliestResult.entrySet()) {
-                        SharePartitionOffsetInformation partitionOffsetInfo = 
new SharePartitionOffsetInformation(
-                            groupId,
-                            tp.getKey().topic(),
-                            tp.getKey().partition(),
-                            
Optional.ofNullable(earliestResult.get(tp.getKey()))
-                        );
-                        partitionOffsets.add(partitionOffsetInfo);
-                    }
+                    startOffsets.forEach((tp, offsetAndMetadata) -> {
+                        if (offsetAndMetadata != null) {
+                            partitionOffsets.add(new 
SharePartitionOffsetInformation(
+                                groupId,
+                                tp.topic(),
+                                tp.partition(),
+                                Optional.of(offsetAndMetadata.offset()),
+                                offsetAndMetadata.leaderEpoch()
+                            ));
+                        } else {
+                            partitionOffsets.add(new 
SharePartitionOffsetInformation(
+                                groupId,
+                                tp.topic(),
+                                tp.partition(),
+                                Optional.empty(),
+                                Optional.empty()
+                            ));
+                        }
+                    });
+
                     groupOffsets.put(groupId, new 
SimpleImmutableEntry<>(shareGroup, partitionOffsets));
                 } catch (InterruptedException | ExecutionException e) {
                     throw new RuntimeException(e);
@@ -447,7 +459,7 @@ public class ShareGroupCommand {
                             groupId,
                             info.topic,
                             info.partition,
-                            MISSING_COLUMN_VALUE, // Temporary
+                            
info.leaderEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
                             
info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE)
                         );
                     } else {
@@ -569,17 +581,20 @@ public class ShareGroupCommand {
         final String topic;
         final int partition;
         final Optional<Long> offset;
+        final Optional<Integer> leaderEpoch;
 
         SharePartitionOffsetInformation(
             String group,
             String topic,
             int partition,
-            Optional<Long> offset
+            Optional<Long> offset,
+            Optional<Integer> leaderEpoch
         ) {
             this.group = group;
             this.topic = topic;
             this.partition = partition;
             this.offset = offset;
+            this.leaderEpoch = leaderEpoch;
         }
     }
 }
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
index f1f91217511..546cab50e0d 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.clients.admin.ShareGroupDescription;
 import org.apache.kafka.clients.admin.ShareMemberAssignment;
 import org.apache.kafka.clients.admin.ShareMemberDescription;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.GroupState;
 import org.apache.kafka.common.GroupType;
 import org.apache.kafka.common.KafkaFuture;
@@ -191,7 +192,7 @@ public class ShareGroupCommandTest {
             ListShareGroupOffsetsResult listShareGroupOffsetsResult = 
AdminClientTestUtils.createListShareGroupOffsetsResult(
                 Map.of(
                     firstGroup,
-                    KafkaFuture.completedFuture(Map.of(new 
TopicPartition("topic1", 0), 0L))
+                    KafkaFuture.completedFuture(Map.of(new 
TopicPartition("topic1", 0), new OffsetAndMetadata(0L, Optional.of(1), "")))
                 )
             );
 
@@ -208,7 +209,7 @@ public class ShareGroupCommandTest {
 
                     List<String> expectedValues;
                     if (describeType.contains("--verbose")) {
-                        expectedValues = List.of(firstGroup, "topic1", "0", 
"-", "0");
+                        expectedValues = List.of(firstGroup, "topic1", "0", 
"1", "0");
                     } else {
                         expectedValues = List.of(firstGroup, "topic1", "0", 
"0");
                     }
@@ -299,13 +300,13 @@ public class ShareGroupCommandTest {
             ListShareGroupOffsetsResult listShareGroupOffsetsResult1 = 
AdminClientTestUtils.createListShareGroupOffsetsResult(
                 Map.of(
                     firstGroup,
-                    KafkaFuture.completedFuture(Map.of(new 
TopicPartition("topic1", 0), 0L))
+                    KafkaFuture.completedFuture(Map.of(new 
TopicPartition("topic1", 0), new OffsetAndMetadata(0, Optional.of(1), "")))
                 )
             );
             ListShareGroupOffsetsResult listShareGroupOffsetsResult2 = 
AdminClientTestUtils.createListShareGroupOffsetsResult(
                 Map.of(
                     secondGroup,
-                    KafkaFuture.completedFuture(Map.of(new 
TopicPartition("topic1", 0), 0L))
+                    KafkaFuture.completedFuture(Map.of(new 
TopicPartition("topic1", 0), new OffsetAndMetadata(0, Optional.of(1), "")))
                 )
             );
 
@@ -333,8 +334,8 @@ public class ShareGroupCommandTest {
 
                     List<String> expectedValues1, expectedValues2;
                     if (describeType.contains("--verbose")) {
-                        expectedValues1 = List.of(firstGroup, "topic1", "0", 
"-", "0");
-                        expectedValues2 = List.of(secondGroup, "topic1", "0", 
"-", "0");
+                        expectedValues1 = List.of(firstGroup, "topic1", "0", 
"1", "0");
+                        expectedValues2 = List.of(secondGroup, "topic1", "0", 
"1", "0");
                     } else {
                         expectedValues1 = List.of(firstGroup, "topic1", "0", 
"0");
                         expectedValues2 = List.of(secondGroup, "topic1", "0", 
"0");

Reply via email to