This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cfe85c2a816 KAFKA-19802: Admin client changes for KIP-1226 (#20771)
cfe85c2a816 is described below

commit cfe85c2a8162476643c19888b1ade15286207f9f
Author: Andrew Schofield <[email protected]>
AuthorDate: Tue Nov 4 14:11:36 2025 +0000

    KAFKA-19802: Admin client changes for KIP-1226 (#20771)
    
    Admin client changes for KIP-1226 which adds lag information for share
    groups.
    
    Reviewers: Lianet Magrans <[email protected]>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |  2 +-
 .../clients/admin/ListShareGroupOffsetsResult.java | 15 ++--
 .../clients/admin/SharePartitionOffsetInfo.java    | 82 ++++++++++++++++++++++
 .../internals/ListShareGroupOffsetsHandler.java    | 18 ++---
 .../kafka/clients/admin/AdminClientTestUtils.java  |  4 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 50 ++++++-------
 .../kafka/api/PlaintextAdminIntegrationTest.scala  | 10 +--
 .../tools/consumer/group/ShareGroupCommand.java    | 35 +++++----
 .../consumer/group/ShareGroupCommandTest.java      | 53 ++++++++------
 9 files changed, 182 insertions(+), 87 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 78a7f905319..9a50f52fb44 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -3814,7 +3814,7 @@ public class KafkaAdminClient extends AdminClient {
     @Override
     public ListShareGroupOffsetsResult listShareGroupOffsets(final Map<String, 
ListShareGroupOffsetsSpec> groupSpecs,
                                                              final 
ListShareGroupOffsetsOptions options) {
-        SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, 
OffsetAndMetadata>> future = 
ListShareGroupOffsetsHandler.newFuture(groupSpecs.keySet());
+        SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, 
SharePartitionOffsetInfo>> future = 
ListShareGroupOffsetsHandler.newFuture(groupSpecs.keySet());
         ListShareGroupOffsetsHandler handler = new 
ListShareGroupOffsetsHandler(groupSpecs, logContext);
         invokeDriver(handler, future, options.timeoutMs);
         return new ListShareGroupOffsetsResult(future.all());
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java
index 1a2c8869c6c..e75735ba260 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java
@@ -18,7 +18,6 @@
 package org.apache.kafka.clients.admin;
 
 import org.apache.kafka.clients.admin.internals.CoordinatorKey;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
@@ -36,9 +35,9 @@ import java.util.stream.Collectors;
 @InterfaceStability.Evolving
 public class ListShareGroupOffsetsResult {
 
-    private final Map<String, KafkaFuture<Map<TopicPartition, 
OffsetAndMetadata>>> futures;
+    private final Map<String, KafkaFuture<Map<TopicPartition, 
SharePartitionOffsetInfo>>> futures;
 
-    ListShareGroupOffsetsResult(final Map<CoordinatorKey, 
KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
+    ListShareGroupOffsetsResult(final Map<CoordinatorKey, 
KafkaFuture<Map<TopicPartition, SharePartitionOffsetInfo>>> futures) {
         this.futures = futures.entrySet().stream()
             .collect(Collectors.toMap(e -> e.getKey().idValue, 
Map.Entry::getValue));
     }
@@ -46,12 +45,12 @@ public class ListShareGroupOffsetsResult {
     /**
      * Return the future when the requests for all groups succeed.
      *
-     * @return Future which yields all {@code Map<String, Map<TopicPartition, 
OffsetAndMetadata>>} objects, if requests for all the groups succeed.
+     * @return Future which yields all {@code Map<String, Map<TopicPartition, 
SharePartitionOffsetInfo>>} objects, if requests for all the groups succeed.
      */
-    public KafkaFuture<Map<String, Map<TopicPartition, OffsetAndMetadata>>> 
all() {
+    public KafkaFuture<Map<String, Map<TopicPartition, 
SharePartitionOffsetInfo>>> all() {
         return KafkaFuture.allOf(futures.values().toArray(new 
KafkaFuture<?>[0])).thenApply(
             nil -> {
-                Map<String, Map<TopicPartition, OffsetAndMetadata>> offsets = 
new HashMap<>(futures.size());
+                Map<String, Map<TopicPartition, SharePartitionOffsetInfo>> 
offsets = new HashMap<>(futures.size());
                 futures.forEach((groupId, future) -> {
                     try {
                         offsets.put(groupId, future.get());
@@ -67,9 +66,9 @@ public class ListShareGroupOffsetsResult {
 
     /**
      * Return a future which yields a map of topic partitions to offsets for 
the specified group. If the group doesn't
-     * have a committed offset for a specific partition, the corresponding 
value in the returned map will be null.
+     * have offset information for a specific partition, the corresponding 
value in the returned map will be null.
      */
-    public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> 
partitionsToOffsetAndMetadata(String groupId) {
+    public KafkaFuture<Map<TopicPartition, SharePartitionOffsetInfo>> 
partitionsToOffsetInfo(String groupId) {
         if (!futures.containsKey(groupId)) {
             throw new IllegalArgumentException("Group ID not found: " + 
groupId);
         }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/SharePartitionOffsetInfo.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/SharePartitionOffsetInfo.java
new file mode 100644
index 00000000000..5948103dc17
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/SharePartitionOffsetInfo.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This class is used to contain the offset and lag information for a 
share-partition.
+ */
+@InterfaceStability.Evolving
+public class SharePartitionOffsetInfo {
+    private final long startOffset;
+    private final Optional<Integer> leaderEpoch;
+    private final Optional<Long> lag;
+
+    /**
+     * Construct a new SharePartitionOffsetInfo.
+     *
+     * @param startOffset The share-partition start offset
+     * @param leaderEpoch The optional leader epoch of the share-partition
+     * @param lag         The optional lag for the share-partition
+     */
+    public SharePartitionOffsetInfo(long startOffset, Optional<Integer> 
leaderEpoch, Optional<Long> lag) {
+        this.startOffset = startOffset;
+        this.leaderEpoch = leaderEpoch;
+        this.lag = lag;
+    }
+
+    public long startOffset() {
+        return startOffset;
+    }
+
+    public Optional<Integer> leaderEpoch() {
+        return leaderEpoch;
+    }
+
+    public Optional<Long> lag() {
+        return lag;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        SharePartitionOffsetInfo that = (SharePartitionOffsetInfo) o;
+        return startOffset == that.startOffset &&
+            Objects.equals(leaderEpoch, that.leaderEpoch) &&
+            Objects.equals(lag, that.lag);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(startOffset, leaderEpoch, lag);
+    }
+
+    @Override
+    public String toString() {
+        return "SharePartitionOffsetInfo{" +
+            "startOffset=" + startOffset +
+            ", leaderEpoch=" + leaderEpoch.orElse(null) +
+            ", lag=" + lag.orElse(null) +
+            '}';
+    }
+}
\ No newline at end of file
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java
index f9b9e987930..5b75a482aae 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java
@@ -19,7 +19,7 @@ package org.apache.kafka.clients.admin.internals;
 import org.apache.kafka.clients.admin.KafkaAdminClient;
 import org.apache.kafka.clients.admin.ListShareGroupOffsetsOptions;
 import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.admin.SharePartitionOffsetInfo;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData;
@@ -47,7 +47,7 @@ import java.util.stream.Collectors;
 /**
  * This class is the handler for {@link 
KafkaAdminClient#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)} call
  */
-public class ListShareGroupOffsetsHandler extends 
AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> 
{
+public class ListShareGroupOffsetsHandler extends 
AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, 
SharePartitionOffsetInfo>> {
 
     private final Map<String, ListShareGroupOffsetsSpec> groupSpecs;
     private final Logger log;
@@ -60,7 +60,7 @@ public class ListShareGroupOffsetsHandler extends 
AdminApiHandler.Batched<Coordi
         this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, 
logContext);
     }
 
-    public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, 
Map<TopicPartition, OffsetAndMetadata>> newFuture(Collection<String> groupIds) {
+    public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, 
Map<TopicPartition, SharePartitionOffsetInfo>> newFuture(Collection<String> 
groupIds) {
         return AdminApiFuture.forKeys(coordinatorKeys(groupIds));
     }
 
@@ -110,13 +110,13 @@ public class ListShareGroupOffsetsHandler extends 
AdminApiHandler.Batched<Coordi
     }
 
     @Override
-    public ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> 
handleResponse(Node coordinator,
-                                                                               
             Set<CoordinatorKey> groupIds,
-                                                                               
             AbstractResponse abstractResponse) {
+    public ApiResult<CoordinatorKey, Map<TopicPartition, 
SharePartitionOffsetInfo>> handleResponse(Node coordinator,
+                                                                               
                    Set<CoordinatorKey> groupIds,
+                                                                               
                    AbstractResponse abstractResponse) {
         validateKeys(groupIds);
 
         final DescribeShareGroupOffsetsResponse response = 
(DescribeShareGroupOffsetsResponse) abstractResponse;
-        final Map<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> 
completed = new HashMap<>();
+        final Map<CoordinatorKey, Map<TopicPartition, 
SharePartitionOffsetInfo>> completed = new HashMap<>();
         final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
         final List<CoordinatorKey> unmapped = new ArrayList<>();
 
@@ -125,7 +125,7 @@ public class ListShareGroupOffsetsHandler extends 
AdminApiHandler.Batched<Coordi
             if (response.hasGroupError(groupId)) {
                 handleGroupError(coordinatorKey, response.groupError(groupId), 
failed, unmapped);
             } else {
-                Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = 
new HashMap<>();
+                Map<TopicPartition, SharePartitionOffsetInfo> 
groupOffsetsListing = new HashMap<>();
                 response.data().groups().stream().filter(g -> 
g.groupId().equals(groupId)).forEach(groupResponse -> {
                     for 
(DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic 
topicResponse : groupResponse.topics()) {
                         for 
(DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition
 partitionResponse : topicResponse.partitions()) {
@@ -137,7 +137,7 @@ public class ListShareGroupOffsetsHandler extends 
AdminApiHandler.Batched<Coordi
                                 if (partitionResponse.startOffset() < 0) {
                                     groupOffsetsListing.put(tp, null);
                                 } else {
-                                    groupOffsetsListing.put(tp, new 
OffsetAndMetadata(startOffset, leaderEpoch, ""));
+                                    groupOffsetsListing.put(tp, new 
SharePartitionOffsetInfo(startOffset, leaderEpoch, Optional.empty()));
                                 }
                             } else {
                                 log.warn("Skipping return offset for {} due to 
error {}: {}.", tp, partitionResponse.errorCode(), 
partitionResponse.errorMessage());
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
index 02c09443370..a272bce672e 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
@@ -186,8 +186,8 @@ public class AdminClientTestUtils {
         return new ListConfigResourcesResult(future);
     }
 
-    public static ListShareGroupOffsetsResult 
createListShareGroupOffsetsResult(Map<String, KafkaFuture<Map<TopicPartition, 
OffsetAndMetadata>>> groupOffsets) {
-        Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, 
OffsetAndMetadata>>> coordinatorFutures = groupOffsets.entrySet().stream()
+    public static ListShareGroupOffsetsResult 
createListShareGroupOffsetsResult(Map<String, KafkaFuture<Map<TopicPartition, 
SharePartitionOffsetInfo>>> groupOffsets) {
+        Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, 
SharePartitionOffsetInfo>>> coordinatorFutures = 
groupOffsets.entrySet().stream()
             .collect(Collectors.toMap(
                 entry -> CoordinatorKey.byGroupId(entry.getKey()),
                 Map.Entry::getValue
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index d01cde83bb3..55b4119f0ce 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -11186,15 +11186,15 @@ public class KafkaAdminClientTest {
             env.kafkaClient().prepareResponse(new 
DescribeShareGroupOffsetsResponse(data));
 
             final ListShareGroupOffsetsResult result = 
env.adminClient().listShareGroupOffsets(groupSpecs);
-            final Map<TopicPartition, OffsetAndMetadata> 
partitionToOffsetAndMetadata = 
result.partitionsToOffsetAndMetadata(GROUP_ID).get();
+            final Map<TopicPartition, SharePartitionOffsetInfo> 
partitionToOffsetInfo = result.partitionsToOffsetInfo(GROUP_ID).get();
 
-            assertEquals(6, partitionToOffsetAndMetadata.size());
-            assertEquals(new OffsetAndMetadata(10, Optional.of(0), ""), 
partitionToOffsetAndMetadata.get(myTopicPartition0));
-            assertEquals(new OffsetAndMetadata(11, Optional.of(0), ""), 
partitionToOffsetAndMetadata.get(myTopicPartition1));
-            assertEquals(new OffsetAndMetadata(40, Optional.of(0), ""), 
partitionToOffsetAndMetadata.get(myTopicPartition2));
-            assertEquals(new OffsetAndMetadata(50, Optional.of(1), ""), 
partitionToOffsetAndMetadata.get(myTopicPartition3));
-            assertEquals(new OffsetAndMetadata(100, Optional.of(2), ""), 
partitionToOffsetAndMetadata.get(myTopicPartition4));
-            assertEquals(new OffsetAndMetadata(500, Optional.of(3), ""), 
partitionToOffsetAndMetadata.get(myTopicPartition5));
+            assertEquals(6, partitionToOffsetInfo.size());
+            assertEquals(new SharePartitionOffsetInfo(10, Optional.of(0), 
Optional.empty()), partitionToOffsetInfo.get(myTopicPartition0));
+            assertEquals(new SharePartitionOffsetInfo(11, Optional.of(0), 
Optional.empty()), partitionToOffsetInfo.get(myTopicPartition1));
+            assertEquals(new SharePartitionOffsetInfo(40, Optional.of(0), 
Optional.empty()), partitionToOffsetInfo.get(myTopicPartition2));
+            assertEquals(new SharePartitionOffsetInfo(50, Optional.of(1), 
Optional.empty()), partitionToOffsetInfo.get(myTopicPartition3));
+            assertEquals(new SharePartitionOffsetInfo(100, Optional.of(2), 
Optional.empty()), partitionToOffsetInfo.get(myTopicPartition4));
+            assertEquals(new SharePartitionOffsetInfo(500, Optional.of(3), 
Optional.empty()), partitionToOffsetInfo.get(myTopicPartition5));
         }
     }
 
@@ -11257,17 +11257,17 @@ public class KafkaAdminClientTest {
             final ListShareGroupOffsetsResult result = 
env.adminClient().listShareGroupOffsets(groupSpecs);
             assertEquals(2, result.all().get().size());
 
-            final Map<TopicPartition, OffsetAndMetadata> 
partitionToOffsetAndMetadataGroup0 = 
result.partitionsToOffsetAndMetadata(GROUP_ID).get();
-            assertEquals(4, partitionToOffsetAndMetadataGroup0.size());
-            assertEquals(new OffsetAndMetadata(10, Optional.of(0), ""), 
partitionToOffsetAndMetadataGroup0.get(myTopicPartition0));
-            assertEquals(new OffsetAndMetadata(11, Optional.of(0), ""), 
partitionToOffsetAndMetadataGroup0.get(myTopicPartition1));
-            assertEquals(new OffsetAndMetadata(40, Optional.of(0), ""), 
partitionToOffsetAndMetadataGroup0.get(myTopicPartition2));
-            assertEquals(new OffsetAndMetadata(50, Optional.of(1), ""), 
partitionToOffsetAndMetadataGroup0.get(myTopicPartition3));
+            final Map<TopicPartition, SharePartitionOffsetInfo> 
partitionToOffsetInfoGroup0 = result.partitionsToOffsetInfo(GROUP_ID).get();
+            assertEquals(4, partitionToOffsetInfoGroup0.size());
+            assertEquals(new SharePartitionOffsetInfo(10, Optional.of(0), 
Optional.empty()), partitionToOffsetInfoGroup0.get(myTopicPartition0));
+            assertEquals(new SharePartitionOffsetInfo(11, Optional.of(0), 
Optional.empty()), partitionToOffsetInfoGroup0.get(myTopicPartition1));
+            assertEquals(new SharePartitionOffsetInfo(40, Optional.of(0), 
Optional.empty()), partitionToOffsetInfoGroup0.get(myTopicPartition2));
+            assertEquals(new SharePartitionOffsetInfo(50, Optional.of(1), 
Optional.empty()), partitionToOffsetInfoGroup0.get(myTopicPartition3));
 
-            final Map<TopicPartition, OffsetAndMetadata> 
partitionToOffsetAndMetadataGroup1 = 
result.partitionsToOffsetAndMetadata("group-1").get();
-            assertEquals(2, partitionToOffsetAndMetadataGroup1.size());
-            assertEquals(new OffsetAndMetadata(100, Optional.of(2), ""), 
partitionToOffsetAndMetadataGroup1.get(myTopicPartition4));
-            assertEquals(new OffsetAndMetadata(500, Optional.of(2), ""), 
partitionToOffsetAndMetadataGroup1.get(myTopicPartition5));
+            final Map<TopicPartition, SharePartitionOffsetInfo> 
partitionToOffsetInfoGroup1 = result.partitionsToOffsetInfo("group-1").get();
+            assertEquals(2, partitionToOffsetInfoGroup1.size());
+            assertEquals(new SharePartitionOffsetInfo(100, Optional.of(2), 
Optional.empty()), partitionToOffsetInfoGroup1.get(myTopicPartition4));
+            assertEquals(new SharePartitionOffsetInfo(500, Optional.of(2), 
Optional.empty()), partitionToOffsetInfoGroup1.get(myTopicPartition5));
         }
     }
 
@@ -11290,9 +11290,9 @@ public class KafkaAdminClientTest {
             env.kafkaClient().prepareResponse(new 
DescribeShareGroupOffsetsResponse(data));
 
             final ListShareGroupOffsetsResult result = 
env.adminClient().listShareGroupOffsets(groupSpecs);
-            final Map<TopicPartition, OffsetAndMetadata> 
partitionToOffsetAndMetadata = 
result.partitionsToOffsetAndMetadata(GROUP_ID).get();
+            final Map<TopicPartition, SharePartitionOffsetInfo> 
partitionToOffsetInfo = result.partitionsToOffsetInfo(GROUP_ID).get();
 
-            assertEquals(0, partitionToOffsetAndMetadata.size());
+            assertEquals(0, partitionToOffsetInfo.size());
         }
     }
 
@@ -11342,13 +11342,13 @@ public class KafkaAdminClientTest {
             env.kafkaClient().prepareResponse(new 
DescribeShareGroupOffsetsResponse(data));
 
             final ListShareGroupOffsetsResult result = 
env.adminClient().listShareGroupOffsets(groupSpecs);
-            final Map<TopicPartition, OffsetAndMetadata> 
partitionToOffsetAndMetadata = 
result.partitionsToOffsetAndMetadata(GROUP_ID).get();
+            final Map<TopicPartition, SharePartitionOffsetInfo> 
partitionToOffsetInfo = result.partitionsToOffsetInfo(GROUP_ID).get();
 
             // For myTopicPartition2 we have set an error as the response. 
Thus, it should be skipped from the final result
-            assertEquals(3, partitionToOffsetAndMetadata.size());
-            assertEquals(new OffsetAndMetadata(10, Optional.of(0), ""), 
partitionToOffsetAndMetadata.get(myTopicPartition0));
-            assertEquals(new OffsetAndMetadata(11, Optional.of(1), ""), 
partitionToOffsetAndMetadata.get(myTopicPartition1));
-            assertEquals(new OffsetAndMetadata(500, Optional.of(2), ""), 
partitionToOffsetAndMetadata.get(myTopicPartition3));
+            assertEquals(3, partitionToOffsetInfo.size());
+            assertEquals(new SharePartitionOffsetInfo(10, Optional.of(0), 
Optional.empty()), partitionToOffsetInfo.get(myTopicPartition0));
+            assertEquals(new SharePartitionOffsetInfo(11, Optional.of(1), 
Optional.empty()), partitionToOffsetInfo.get(myTopicPartition1));
+            assertEquals(new SharePartitionOffsetInfo(500, Optional.of(2), 
Optional.empty()), partitionToOffsetInfo.get(myTopicPartition3));
         }
     }
 
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index c25ed9e9bfb..d057ce99d71 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -2925,7 +2925,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
 
       val tp1 = new TopicPartition(testTopicName, 0)
       val parts = client.listShareGroupOffsets(util.Map.of(testGroupId, new 
ListShareGroupOffsetsSpec().topicPartitions(util.List.of(tp1))))
-        .partitionsToOffsetAndMetadata(testGroupId)
+        .partitionsToOffsetInfo(testGroupId)
         .get()
       assertTrue(parts.containsKey(tp1))
       assertNull(parts.get(tp1))
@@ -2991,10 +2991,10 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       assertFutureThrows(classOf[UnknownTopicOrPartitionException], 
offsetAlterResult.partitionResult(tp2))
 
       val parts = client.listShareGroupOffsets(util.Map.of(testGroupId, new 
ListShareGroupOffsetsSpec().topicPartitions(util.List.of(tp1))))
-        .partitionsToOffsetAndMetadata(testGroupId)
+        .partitionsToOffsetInfo(testGroupId)
         .get()
       assertTrue(parts.containsKey(tp1))
-      assertEquals(0, parts.get(tp1).offset())
+      assertEquals(0, parts.get(tp1).startOffset())
     } finally {
       Utils.closeQuietly(client, "adminClient")
     }
@@ -3031,14 +3031,14 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
         // Test listShareGroupOffsets
         TestUtils.waitUntilTrue(() => {
           val parts = client.listShareGroupOffsets(util.Map.of(testGroupId, 
new ListShareGroupOffsetsSpec()))
-            .partitionsToOffsetAndMetadata(testGroupId)
+            .partitionsToOffsetInfo(testGroupId)
             .get()
           parts.containsKey(tp1) && parts.containsKey(tp2)
         }, "Expected the result contains all partitions.")
 
         // Test listShareGroupOffsets with listShareGroupOffsetsSpec
         val groupSpecs = util.Map.of(testGroupId, new 
ListShareGroupOffsetsSpec().topicPartitions(util.List.of(tp1)))
-        val parts = 
client.listShareGroupOffsets(groupSpecs).partitionsToOffsetAndMetadata(testGroupId).get()
+        val parts = 
client.listShareGroupOffsets(groupSpecs).partitionsToOffsetInfo(testGroupId).get()
         assertTrue(parts.containsKey(tp1))
         assertFalse(parts.containsKey(tp2))
       } finally {
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
index 2c3e21fedb1..6d36e49dcfd 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
@@ -32,6 +32,7 @@ import 
org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;
 import org.apache.kafka.clients.admin.ShareGroupDescription;
 import org.apache.kafka.clients.admin.ShareMemberAssignment;
 import org.apache.kafka.clients.admin.ShareMemberDescription;
+import org.apache.kafka.clients.admin.SharePartitionOffsetInfo;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.GroupState;
 import org.apache.kafka.common.GroupType;
@@ -437,7 +438,7 @@ public class ShareGroupCommand {
                 partitionsToReset = 
offsetsUtils.parseTopicPartitionsToReset(opts.options.valuesOf(opts.topicOpt));
             } else {
                 Map<String, ListShareGroupOffsetsSpec> groupSpecs = 
Map.of(groupId, new ListShareGroupOffsetsSpec());
-                Map<TopicPartition, OffsetAndMetadata> 
offsetsByTopicPartitions = adminClient.listShareGroupOffsets(
+                Map<TopicPartition, SharePartitionOffsetInfo> 
offsetsByTopicPartitions = adminClient.listShareGroupOffsets(
                     groupSpecs,
                     withTimeoutMs(new ListShareGroupOffsetsOptions())
                 ).all().get().get(groupId);
@@ -494,11 +495,11 @@ public class ShareGroupCommand {
                 Map<String, ListShareGroupOffsetsSpec> groupSpecs = 
Map.of(groupId, new ListShareGroupOffsetsSpec());
 
                 try {
-                    Map<TopicPartition, OffsetAndMetadata> startOffsets = 
adminClient.listShareGroupOffsets(
+                    Map<TopicPartition, SharePartitionOffsetInfo> 
offsetInfoMap = adminClient.listShareGroupOffsets(
                         groupSpecs,
                         withTimeoutMs(new ListShareGroupOffsetsOptions())
                     ).all().get().get(groupId);
-                    Set<SharePartitionOffsetInformation> partitionOffsets = 
mapOffsetsToSharePartitionInformation(groupId, startOffsets);
+                    Set<SharePartitionOffsetInformation> partitionOffsets = 
mapOffsetInfoToSharePartitionInformation(groupId, offsetInfoMap);
 
                     groupOffsets.put(groupId, new 
SimpleImmutableEntry<>(shareGroup, partitionOffsets));
                 } catch (InterruptedException | ExecutionException e) {
@@ -509,17 +510,18 @@ public class ShareGroupCommand {
             return groupOffsets;
         }
 
-        private static Set<SharePartitionOffsetInformation> 
mapOffsetsToSharePartitionInformation(String groupId, Map<TopicPartition, 
OffsetAndMetadata> startOffsets) {
+        private static Set<SharePartitionOffsetInformation> 
mapOffsetInfoToSharePartitionInformation(String groupId, Map<TopicPartition, 
SharePartitionOffsetInfo> offsetInfoMap) {
             Set<SharePartitionOffsetInformation> partitionOffsets = new 
HashSet<>();
 
-            startOffsets.forEach((tp, offsetAndMetadata) -> {
-                if (offsetAndMetadata != null) {
+            offsetInfoMap.forEach((tp, offsetInfo) -> {
+                if (offsetInfo != null) {
                     partitionOffsets.add(new SharePartitionOffsetInformation(
                         groupId,
                         tp.topic(),
                         tp.partition(),
-                        Optional.of(offsetAndMetadata.offset()),
-                        offsetAndMetadata.leaderEpoch()
+                        Optional.of(offsetInfo.startOffset()),
+                        offsetInfo.leaderEpoch(),
+                        offsetInfo.lag()
                     ));
                 } else {
                     partitionOffsets.add(new SharePartitionOffsetInformation(
@@ -527,6 +529,7 @@ public class ShareGroupCommand {
                         tp.topic(),
                         tp.partition(),
                         Optional.empty(),
+                        Optional.empty(),
                         Optional.empty()
                     ));
                 }
@@ -545,9 +548,9 @@ public class ShareGroupCommand {
                 String fmt = printOffsetFormat(groupId, offsetsInfo, verbose);
 
                 if (verbose) {
-                    System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", 
"LEADER-EPOCH", "START-OFFSET");
+                    System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", 
"LEADER-EPOCH", "START-OFFSET", "LAG");
                 } else {
-                    System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", 
"START-OFFSET");
+                    System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", 
"START-OFFSET", "LAG");
                 }
 
                 for (SharePartitionOffsetInformation info : offsetsInfo) {
@@ -557,14 +560,16 @@ public class ShareGroupCommand {
                             info.topic,
                             info.partition,
                             
info.leaderEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
-                            
info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE)
+                            
info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
+                            
info.lag.map(Object::toString).orElse(MISSING_COLUMN_VALUE)
                         );
                     } else {
                         System.out.printf(fmt,
                             groupId,
                             info.topic,
                             info.partition,
-                            
info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE)
+                            
info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
+                            
info.lag.map(Object::toString).orElse(MISSING_COLUMN_VALUE)
                         );
                     }
                 }
@@ -579,9 +584,9 @@ public class ShareGroupCommand {
                 maxTopicLen = Math.max(maxTopicLen, info.topic.length());
             }
             if (verbose) {
-                return "\n%" + (-groupLen) + "s %" + (-maxTopicLen) + "s %-10s 
%-13s %s";
+                return "\n%" + (-groupLen) + "s %" + (-maxTopicLen) + "s %-10s 
%-13s %-13s %s";
             } else {
-                return "\n%" + (-groupLen) + "s %" + (-maxTopicLen) + "s %-10s 
%s";
+                return "\n%" + (-groupLen) + "s %" + (-maxTopicLen) + "s %-10s 
%-13s %s";
             }
         }
 
@@ -674,6 +679,6 @@ public class ShareGroupCommand {
     }
 
     record SharePartitionOffsetInformation(String group, String topic, int 
partition, Optional<Long> offset,
-                                           Optional<Integer> leaderEpoch) {
+                                           Optional<Integer> leaderEpoch, 
Optional<Long> lag) {
     }
 }
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
index 28e51e8c530..fb5d459febb 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
@@ -40,6 +40,7 @@ import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.admin.ShareGroupDescription;
 import org.apache.kafka.clients.admin.ShareMemberAssignment;
 import org.apache.kafka.clients.admin.ShareMemberDescription;
+import org.apache.kafka.clients.admin.SharePartitionOffsetInfo;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.GroupState;
@@ -207,7 +208,8 @@ public class ShareGroupCommandTest {
             ListShareGroupOffsetsResult listShareGroupOffsetsResult = 
AdminClientTestUtils.createListShareGroupOffsetsResult(
                 Map.of(
                     firstGroup,
-                    KafkaFuture.completedFuture(Map.of(new 
TopicPartition("topic1", 0), new OffsetAndMetadata(0L, Optional.of(1), "")))
+                    KafkaFuture.completedFuture(
+                            Map.of(new TopicPartition("topic1", 0), new 
SharePartitionOffsetInfo(0L, Optional.of(1), Optional.of(0L))))
                 )
             );
 
@@ -224,9 +226,9 @@ public class ShareGroupCommandTest {
 
                     List<String> expectedValues;
                     if (describeType.contains("--verbose")) {
-                        expectedValues = List.of(firstGroup, "topic1", "0", 
"1", "0");
+                        expectedValues = List.of(firstGroup, "topic1", "0", 
"1", "0", "0");
                     } else {
-                        expectedValues = List.of(firstGroup, "topic1", "0", 
"0");
+                        expectedValues = List.of(firstGroup, "topic1", "0", 
"0", "0");
                     }
                     return checkArgsHeaderOutput(cgcArgs, lines[0]) &&
                         
Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues);
@@ -273,9 +275,9 @@ public class ShareGroupCommandTest {
 
                     List<String> expectedValues;
                     if (describeType.contains("--verbose")) {
-                        expectedValues = List.of(firstGroup, "topic1", "0", 
"-", "-");
+                        expectedValues = List.of(firstGroup, "topic1", "0", 
"-", "-", "-");
                     } else {
-                        expectedValues = List.of(firstGroup, "topic1", "0", 
"-");
+                        expectedValues = List.of(firstGroup, "topic1", "0", 
"-", "-");
                     }
                     return checkArgsHeaderOutput(cgcArgs, lines[0]) &&
                         
Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues);
@@ -315,13 +317,13 @@ public class ShareGroupCommandTest {
             ListShareGroupOffsetsResult listShareGroupOffsetsResult1 = 
AdminClientTestUtils.createListShareGroupOffsetsResult(
                 Map.of(
                     firstGroup,
-                    KafkaFuture.completedFuture(Map.of(new 
TopicPartition("topic1", 0), new OffsetAndMetadata(0, Optional.of(1), "")))
+                    KafkaFuture.completedFuture(Map.of(new 
TopicPartition("topic1", 0), new SharePartitionOffsetInfo(0, Optional.of(1), 
Optional.empty())))
                 )
             );
             ListShareGroupOffsetsResult listShareGroupOffsetsResult2 = 
AdminClientTestUtils.createListShareGroupOffsetsResult(
                 Map.of(
                     secondGroup,
-                    KafkaFuture.completedFuture(Map.of(new 
TopicPartition("topic1", 0), new OffsetAndMetadata(0, Optional.of(1), "")))
+                    KafkaFuture.completedFuture(Map.of(new 
TopicPartition("topic1", 0), new SharePartitionOffsetInfo(0, Optional.of(1), 
Optional.of(0L))))
                 )
             );
 
@@ -349,11 +351,11 @@ public class ShareGroupCommandTest {
 
                     List<String> expectedValues1, expectedValues2;
                     if (describeType.contains("--verbose")) {
-                        expectedValues1 = List.of(firstGroup, "topic1", "0", 
"1", "0");
-                        expectedValues2 = List.of(secondGroup, "topic1", "0", 
"1", "0");
+                        expectedValues1 = List.of(firstGroup, "topic1", "0", 
"1", "0", "-");
+                        expectedValues2 = List.of(secondGroup, "topic1", "0", 
"1", "0", "0");
                     } else {
-                        expectedValues1 = List.of(firstGroup, "topic1", "0", 
"0");
-                        expectedValues2 = List.of(secondGroup, "topic1", "0", 
"0");
+                        expectedValues1 = List.of(firstGroup, "topic1", "0", 
"0", "-");
+                        expectedValues2 = List.of(secondGroup, "topic1", "0", 
"0", "0");
                     }
                     return checkArgsHeaderOutput(cgcArgs, lines[0]) && 
checkArgsHeaderOutput(cgcArgs, lines[3]) &&
                         
Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues1) &&
@@ -1080,8 +1082,10 @@ public class ShareGroupCommandTest {
         ListShareGroupOffsetsResult listShareGroupOffsetsResult = 
AdminClientTestUtils.createListShareGroupOffsetsResult(
             Map.of(
                 group,
-                KafkaFuture.completedFuture(Map.of(new TopicPartition(topic1, 
0), new OffsetAndMetadata(10L), new TopicPartition(topic1, 1), new 
OffsetAndMetadata(10L), 
-                    new TopicPartition(topic2, 0), new OffsetAndMetadata(0L)))
+                KafkaFuture.completedFuture(Map.of(
+                    new TopicPartition(topic1, 0), new 
SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty()),
+                    new TopicPartition(topic1, 1), new 
SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty()),
+                    new TopicPartition(topic2, 0), new 
SharePartitionOffsetInfo(0L, Optional.empty(), Optional.empty())))
             )
         );
         when(adminClient.listShareGroupOffsets(any(), 
any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult);
@@ -1136,7 +1140,9 @@ public class ShareGroupCommandTest {
         ListShareGroupOffsetsResult listShareGroupOffsetsResult = 
AdminClientTestUtils.createListShareGroupOffsetsResult(
             Map.of(
                 group,
-                KafkaFuture.completedFuture(Map.of(t1, new 
OffsetAndMetadata(10L), t2, new OffsetAndMetadata(10L)))
+                KafkaFuture.completedFuture(Map.of(
+                    t1, new SharePartitionOffsetInfo(10L, Optional.empty(), 
Optional.empty()),
+                    t2, new SharePartitionOffsetInfo(10L, Optional.empty(), 
Optional.empty())))
             )
         );
         Map<String, TopicDescription> descriptions = Map.of(
@@ -1188,8 +1194,11 @@ public class ShareGroupCommandTest {
         ListShareGroupOffsetsResult listShareGroupOffsetsResult = 
AdminClientTestUtils.createListShareGroupOffsetsResult(
             Map.of(
                 group,
-                KafkaFuture.completedFuture(Map.of(new TopicPartition(topic1, 
0), new OffsetAndMetadata(5L), new TopicPartition(topic1, 1), new 
OffsetAndMetadata(10L),
-                    new TopicPartition(topic2, 0), new OffsetAndMetadata(10L), 
new TopicPartition(topic3, 0), new OffsetAndMetadata(10L)))
+                KafkaFuture.completedFuture(Map.of(
+                    new TopicPartition(topic1, 0), new 
SharePartitionOffsetInfo(5L, Optional.empty(), Optional.empty()),
+                    new TopicPartition(topic1, 1), new 
SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty()),
+                    new TopicPartition(topic2, 0), new 
SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty()),
+                    new TopicPartition(topic3, 0), new 
SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty())))
             )
         );
         when(adminClient.listShareGroupOffsets(any(), 
any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult);
@@ -1255,7 +1264,7 @@ public class ShareGroupCommandTest {
         ListShareGroupOffsetsResult listShareGroupOffsetsResult = 
AdminClientTestUtils.createListShareGroupOffsetsResult(
             Map.of(
                 group,
-                KafkaFuture.completedFuture(Map.of(new TopicPartition(topic, 
0), new OffsetAndMetadata(10L)))
+                KafkaFuture.completedFuture(Map.of(new TopicPartition(topic, 
0), new SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty())))
             )
         );
         when(adminClient.listShareGroupOffsets(any(), 
any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult);
@@ -1322,7 +1331,7 @@ public class ShareGroupCommandTest {
         ListShareGroupOffsetsResult listShareGroupOffsetsResult = 
AdminClientTestUtils.createListShareGroupOffsetsResult(
             Map.of(
                 group,
-                KafkaFuture.completedFuture(Map.of(new TopicPartition("topic", 
0), new OffsetAndMetadata(10L)))
+                KafkaFuture.completedFuture(Map.of(new TopicPartition("topic", 
0), new SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty())))
             )
         );
         when(adminClient.listShareGroupOffsets(any(), 
any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult);
@@ -1381,7 +1390,7 @@ public class ShareGroupCommandTest {
         ListShareGroupOffsetsResult listShareGroupOffsetsResult = 
AdminClientTestUtils.createListShareGroupOffsetsResult(
             Map.of(
                 group,
-                KafkaFuture.completedFuture(Map.of(new TopicPartition("topic", 
0), new OffsetAndMetadata(10L)))
+                KafkaFuture.completedFuture(Map.of(new TopicPartition("topic", 
0), new SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty())))
             )
         );
         when(adminClient.listShareGroupOffsets(any(), 
any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult);
@@ -1428,7 +1437,7 @@ public class ShareGroupCommandTest {
         ListShareGroupOffsetsResult listShareGroupOffsetsResult = 
AdminClientTestUtils.createListShareGroupOffsetsResult(
             Map.of(
                 group,
-                KafkaFuture.completedFuture(Map.of(new TopicPartition("topic", 
0), new OffsetAndMetadata(10L)))
+                KafkaFuture.completedFuture(Map.of(new TopicPartition("topic", 
0), new SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty())))
             )
         );
         when(adminClient.listShareGroupOffsets(any(), 
any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult);
@@ -1515,8 +1524,8 @@ public class ShareGroupCommandTest {
 
     private boolean checkOffsetsArgsHeaderOutput(String output, boolean 
verbose) {
         List<String> expectedKeys = verbose ?
-            List.of("GROUP", "TOPIC", "PARTITION", "LEADER-EPOCH", 
"START-OFFSET") :
-            List.of("GROUP", "TOPIC", "PARTITION", "START-OFFSET");
+            List.of("GROUP", "TOPIC", "PARTITION", "LEADER-EPOCH", 
"START-OFFSET", "LAG") :
+            List.of("GROUP", "TOPIC", "PARTITION", "START-OFFSET", "LAG");
         return 
Arrays.stream(output.trim().split("\\s+")).toList().equals(expectedKeys);
     }
 


Reply via email to