This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cfe85c2a816 KAFKA-19802: Admin client changes for KIP-1226 (#20771)
cfe85c2a816 is described below
commit cfe85c2a8162476643c19888b1ade15286207f9f
Author: Andrew Schofield <[email protected]>
AuthorDate: Tue Nov 4 14:11:36 2025 +0000
KAFKA-19802: Admin client changes for KIP-1226 (#20771)
Admin client changes for KIP-1226 which adds lag information for share
groups.
Reviewers: Lianet Magrans <[email protected]>
---
.../kafka/clients/admin/KafkaAdminClient.java | 2 +-
.../clients/admin/ListShareGroupOffsetsResult.java | 15 ++--
.../clients/admin/SharePartitionOffsetInfo.java | 82 ++++++++++++++++++++++
.../internals/ListShareGroupOffsetsHandler.java | 18 ++---
.../kafka/clients/admin/AdminClientTestUtils.java | 4 +-
.../kafka/clients/admin/KafkaAdminClientTest.java | 50 ++++++-------
.../kafka/api/PlaintextAdminIntegrationTest.scala | 10 +--
.../tools/consumer/group/ShareGroupCommand.java | 35 +++++----
.../consumer/group/ShareGroupCommandTest.java | 53 ++++++++------
9 files changed, 182 insertions(+), 87 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 78a7f905319..9a50f52fb44 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -3814,7 +3814,7 @@ public class KafkaAdminClient extends AdminClient {
@Override
public ListShareGroupOffsetsResult listShareGroupOffsets(final Map<String,
ListShareGroupOffsetsSpec> groupSpecs,
final
ListShareGroupOffsetsOptions options) {
- SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition,
OffsetAndMetadata>> future =
ListShareGroupOffsetsHandler.newFuture(groupSpecs.keySet());
+ SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition,
SharePartitionOffsetInfo>> future =
ListShareGroupOffsetsHandler.newFuture(groupSpecs.keySet());
ListShareGroupOffsetsHandler handler = new
ListShareGroupOffsetsHandler(groupSpecs, logContext);
invokeDriver(handler, future, options.timeoutMs);
return new ListShareGroupOffsetsResult(future.all());
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java
b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java
index 1a2c8869c6c..e75735ba260 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java
@@ -18,7 +18,6 @@
package org.apache.kafka.clients.admin;
import org.apache.kafka.clients.admin.internals.CoordinatorKey;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
@@ -36,9 +35,9 @@ import java.util.stream.Collectors;
@InterfaceStability.Evolving
public class ListShareGroupOffsetsResult {
- private final Map<String, KafkaFuture<Map<TopicPartition,
OffsetAndMetadata>>> futures;
+ private final Map<String, KafkaFuture<Map<TopicPartition,
SharePartitionOffsetInfo>>> futures;
- ListShareGroupOffsetsResult(final Map<CoordinatorKey,
KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
+ ListShareGroupOffsetsResult(final Map<CoordinatorKey,
KafkaFuture<Map<TopicPartition, SharePartitionOffsetInfo>>> futures) {
this.futures = futures.entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey().idValue,
Map.Entry::getValue));
}
@@ -46,12 +45,12 @@ public class ListShareGroupOffsetsResult {
/**
* Return the future when the requests for all groups succeed.
*
- * @return Future which yields all {@code Map<String, Map<TopicPartition,
OffsetAndMetadata>>} objects, if requests for all the groups succeed.
+ * @return Future which yields all {@code Map<String, Map<TopicPartition,
SharePartitionOffsetInfo>>} objects, if requests for all the groups succeed.
*/
- public KafkaFuture<Map<String, Map<TopicPartition, OffsetAndMetadata>>>
all() {
+ public KafkaFuture<Map<String, Map<TopicPartition,
SharePartitionOffsetInfo>>> all() {
return KafkaFuture.allOf(futures.values().toArray(new
KafkaFuture<?>[0])).thenApply(
nil -> {
- Map<String, Map<TopicPartition, OffsetAndMetadata>> offsets =
new HashMap<>(futures.size());
+ Map<String, Map<TopicPartition, SharePartitionOffsetInfo>>
offsets = new HashMap<>(futures.size());
futures.forEach((groupId, future) -> {
try {
offsets.put(groupId, future.get());
@@ -67,9 +66,9 @@ public class ListShareGroupOffsetsResult {
/**
* Return a future which yields a map of topic partitions to offsets for
the specified group. If the group doesn't
- * have a committed offset for a specific partition, the corresponding
value in the returned map will be null.
+ * have offset information for a specific partition, the corresponding
value in the returned map will be null.
*/
- public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>
partitionsToOffsetAndMetadata(String groupId) {
+ public KafkaFuture<Map<TopicPartition, SharePartitionOffsetInfo>>
partitionsToOffsetInfo(String groupId) {
if (!futures.containsKey(groupId)) {
throw new IllegalArgumentException("Group ID not found: " +
groupId);
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/SharePartitionOffsetInfo.java
b/clients/src/main/java/org/apache/kafka/clients/admin/SharePartitionOffsetInfo.java
new file mode 100644
index 00000000000..5948103dc17
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/SharePartitionOffsetInfo.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This class is used to contain the offset and lag information for a
share-partition.
+ */
[email protected]
+public class SharePartitionOffsetInfo {
+ private final long startOffset;
+ private final Optional<Integer> leaderEpoch;
+ private final Optional<Long> lag;
+
+ /**
+ * Construct a new SharePartitionOffsetInfo.
+ *
+ * @param startOffset The share-partition start offset
+ * @param leaderEpoch The optional leader epoch of the share-partition
+ * @param lag The optional lag for the share-partition
+ */
+ public SharePartitionOffsetInfo(long startOffset, Optional<Integer>
leaderEpoch, Optional<Long> lag) {
+ this.startOffset = startOffset;
+ this.leaderEpoch = leaderEpoch;
+ this.lag = lag;
+ }
+
+ public long startOffset() {
+ return startOffset;
+ }
+
+ public Optional<Integer> leaderEpoch() {
+ return leaderEpoch;
+ }
+
+ public Optional<Long> lag() {
+ return lag;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ SharePartitionOffsetInfo that = (SharePartitionOffsetInfo) o;
+ return startOffset == that.startOffset &&
+ Objects.equals(leaderEpoch, that.leaderEpoch) &&
+ Objects.equals(lag, that.lag);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(startOffset, leaderEpoch, lag);
+ }
+
+ @Override
+ public String toString() {
+ return "SharePartitionOffsetInfo{" +
+ "startOffset=" + startOffset +
+ ", leaderEpoch=" + leaderEpoch.orElse(null) +
+ ", lag=" + lag.orElse(null) +
+ '}';
+ }
+}
\ No newline at end of file
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java
index f9b9e987930..5b75a482aae 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java
@@ -19,7 +19,7 @@ package org.apache.kafka.clients.admin.internals;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.ListShareGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.admin.SharePartitionOffsetInfo;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData;
@@ -47,7 +47,7 @@ import java.util.stream.Collectors;
/**
* This class is the handler for {@link
KafkaAdminClient#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)} call
*/
-public class ListShareGroupOffsetsHandler extends
AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>>
{
+public class ListShareGroupOffsetsHandler extends
AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition,
SharePartitionOffsetInfo>> {
private final Map<String, ListShareGroupOffsetsSpec> groupSpecs;
private final Logger log;
@@ -60,7 +60,7 @@ public class ListShareGroupOffsetsHandler extends
AdminApiHandler.Batched<Coordi
this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP,
logContext);
}
- public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey,
Map<TopicPartition, OffsetAndMetadata>> newFuture(Collection<String> groupIds) {
+ public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey,
Map<TopicPartition, SharePartitionOffsetInfo>> newFuture(Collection<String>
groupIds) {
return AdminApiFuture.forKeys(coordinatorKeys(groupIds));
}
@@ -110,13 +110,13 @@ public class ListShareGroupOffsetsHandler extends
AdminApiHandler.Batched<Coordi
}
@Override
- public ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>>
handleResponse(Node coordinator,
-
Set<CoordinatorKey> groupIds,
-
AbstractResponse abstractResponse) {
+ public ApiResult<CoordinatorKey, Map<TopicPartition,
SharePartitionOffsetInfo>> handleResponse(Node coordinator,
+
Set<CoordinatorKey> groupIds,
+
AbstractResponse abstractResponse) {
validateKeys(groupIds);
final DescribeShareGroupOffsetsResponse response =
(DescribeShareGroupOffsetsResponse) abstractResponse;
- final Map<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>>
completed = new HashMap<>();
+ final Map<CoordinatorKey, Map<TopicPartition,
SharePartitionOffsetInfo>> completed = new HashMap<>();
final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
final List<CoordinatorKey> unmapped = new ArrayList<>();
@@ -125,7 +125,7 @@ public class ListShareGroupOffsetsHandler extends
AdminApiHandler.Batched<Coordi
if (response.hasGroupError(groupId)) {
handleGroupError(coordinatorKey, response.groupError(groupId),
failed, unmapped);
} else {
- Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing =
new HashMap<>();
+ Map<TopicPartition, SharePartitionOffsetInfo>
groupOffsetsListing = new HashMap<>();
response.data().groups().stream().filter(g ->
g.groupId().equals(groupId)).forEach(groupResponse -> {
for
(DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic
topicResponse : groupResponse.topics()) {
for
(DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition
partitionResponse : topicResponse.partitions()) {
@@ -137,7 +137,7 @@ public class ListShareGroupOffsetsHandler extends
AdminApiHandler.Batched<Coordi
if (partitionResponse.startOffset() < 0) {
groupOffsetsListing.put(tp, null);
} else {
- groupOffsetsListing.put(tp, new
OffsetAndMetadata(startOffset, leaderEpoch, ""));
+ groupOffsetsListing.put(tp, new
SharePartitionOffsetInfo(startOffset, leaderEpoch, Optional.empty()));
}
} else {
log.warn("Skipping return offset for {} due to
error {}: {}.", tp, partitionResponse.errorCode(),
partitionResponse.errorMessage());
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
index 02c09443370..a272bce672e 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
@@ -186,8 +186,8 @@ public class AdminClientTestUtils {
return new ListConfigResourcesResult(future);
}
- public static ListShareGroupOffsetsResult
createListShareGroupOffsetsResult(Map<String, KafkaFuture<Map<TopicPartition,
OffsetAndMetadata>>> groupOffsets) {
- Map<CoordinatorKey, KafkaFuture<Map<TopicPartition,
OffsetAndMetadata>>> coordinatorFutures = groupOffsets.entrySet().stream()
+ public static ListShareGroupOffsetsResult
createListShareGroupOffsetsResult(Map<String, KafkaFuture<Map<TopicPartition,
SharePartitionOffsetInfo>>> groupOffsets) {
+ Map<CoordinatorKey, KafkaFuture<Map<TopicPartition,
SharePartitionOffsetInfo>>> coordinatorFutures =
groupOffsets.entrySet().stream()
.collect(Collectors.toMap(
entry -> CoordinatorKey.byGroupId(entry.getKey()),
Map.Entry::getValue
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index d01cde83bb3..55b4119f0ce 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -11186,15 +11186,15 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(new
DescribeShareGroupOffsetsResponse(data));
final ListShareGroupOffsetsResult result =
env.adminClient().listShareGroupOffsets(groupSpecs);
- final Map<TopicPartition, OffsetAndMetadata>
partitionToOffsetAndMetadata =
result.partitionsToOffsetAndMetadata(GROUP_ID).get();
+ final Map<TopicPartition, SharePartitionOffsetInfo>
partitionToOffsetInfo = result.partitionsToOffsetInfo(GROUP_ID).get();
- assertEquals(6, partitionToOffsetAndMetadata.size());
- assertEquals(new OffsetAndMetadata(10, Optional.of(0), ""),
partitionToOffsetAndMetadata.get(myTopicPartition0));
- assertEquals(new OffsetAndMetadata(11, Optional.of(0), ""),
partitionToOffsetAndMetadata.get(myTopicPartition1));
- assertEquals(new OffsetAndMetadata(40, Optional.of(0), ""),
partitionToOffsetAndMetadata.get(myTopicPartition2));
- assertEquals(new OffsetAndMetadata(50, Optional.of(1), ""),
partitionToOffsetAndMetadata.get(myTopicPartition3));
- assertEquals(new OffsetAndMetadata(100, Optional.of(2), ""),
partitionToOffsetAndMetadata.get(myTopicPartition4));
- assertEquals(new OffsetAndMetadata(500, Optional.of(3), ""),
partitionToOffsetAndMetadata.get(myTopicPartition5));
+ assertEquals(6, partitionToOffsetInfo.size());
+ assertEquals(new SharePartitionOffsetInfo(10, Optional.of(0),
Optional.empty()), partitionToOffsetInfo.get(myTopicPartition0));
+ assertEquals(new SharePartitionOffsetInfo(11, Optional.of(0),
Optional.empty()), partitionToOffsetInfo.get(myTopicPartition1));
+ assertEquals(new SharePartitionOffsetInfo(40, Optional.of(0),
Optional.empty()), partitionToOffsetInfo.get(myTopicPartition2));
+ assertEquals(new SharePartitionOffsetInfo(50, Optional.of(1),
Optional.empty()), partitionToOffsetInfo.get(myTopicPartition3));
+ assertEquals(new SharePartitionOffsetInfo(100, Optional.of(2),
Optional.empty()), partitionToOffsetInfo.get(myTopicPartition4));
+ assertEquals(new SharePartitionOffsetInfo(500, Optional.of(3),
Optional.empty()), partitionToOffsetInfo.get(myTopicPartition5));
}
}
@@ -11257,17 +11257,17 @@ public class KafkaAdminClientTest {
final ListShareGroupOffsetsResult result =
env.adminClient().listShareGroupOffsets(groupSpecs);
assertEquals(2, result.all().get().size());
- final Map<TopicPartition, OffsetAndMetadata>
partitionToOffsetAndMetadataGroup0 =
result.partitionsToOffsetAndMetadata(GROUP_ID).get();
- assertEquals(4, partitionToOffsetAndMetadataGroup0.size());
- assertEquals(new OffsetAndMetadata(10, Optional.of(0), ""),
partitionToOffsetAndMetadataGroup0.get(myTopicPartition0));
- assertEquals(new OffsetAndMetadata(11, Optional.of(0), ""),
partitionToOffsetAndMetadataGroup0.get(myTopicPartition1));
- assertEquals(new OffsetAndMetadata(40, Optional.of(0), ""),
partitionToOffsetAndMetadataGroup0.get(myTopicPartition2));
- assertEquals(new OffsetAndMetadata(50, Optional.of(1), ""),
partitionToOffsetAndMetadataGroup0.get(myTopicPartition3));
+ final Map<TopicPartition, SharePartitionOffsetInfo>
partitionToOffsetInfoGroup0 = result.partitionsToOffsetInfo(GROUP_ID).get();
+ assertEquals(4, partitionToOffsetInfoGroup0.size());
+ assertEquals(new SharePartitionOffsetInfo(10, Optional.of(0),
Optional.empty()), partitionToOffsetInfoGroup0.get(myTopicPartition0));
+ assertEquals(new SharePartitionOffsetInfo(11, Optional.of(0),
Optional.empty()), partitionToOffsetInfoGroup0.get(myTopicPartition1));
+ assertEquals(new SharePartitionOffsetInfo(40, Optional.of(0),
Optional.empty()), partitionToOffsetInfoGroup0.get(myTopicPartition2));
+ assertEquals(new SharePartitionOffsetInfo(50, Optional.of(1),
Optional.empty()), partitionToOffsetInfoGroup0.get(myTopicPartition3));
- final Map<TopicPartition, OffsetAndMetadata>
partitionToOffsetAndMetadataGroup1 =
result.partitionsToOffsetAndMetadata("group-1").get();
- assertEquals(2, partitionToOffsetAndMetadataGroup1.size());
- assertEquals(new OffsetAndMetadata(100, Optional.of(2), ""),
partitionToOffsetAndMetadataGroup1.get(myTopicPartition4));
- assertEquals(new OffsetAndMetadata(500, Optional.of(2), ""),
partitionToOffsetAndMetadataGroup1.get(myTopicPartition5));
+ final Map<TopicPartition, SharePartitionOffsetInfo>
partitionToOffsetInfoGroup1 = result.partitionsToOffsetInfo("group-1").get();
+ assertEquals(2, partitionToOffsetInfoGroup1.size());
+ assertEquals(new SharePartitionOffsetInfo(100, Optional.of(2),
Optional.empty()), partitionToOffsetInfoGroup1.get(myTopicPartition4));
+ assertEquals(new SharePartitionOffsetInfo(500, Optional.of(2),
Optional.empty()), partitionToOffsetInfoGroup1.get(myTopicPartition5));
}
}
@@ -11290,9 +11290,9 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(new
DescribeShareGroupOffsetsResponse(data));
final ListShareGroupOffsetsResult result =
env.adminClient().listShareGroupOffsets(groupSpecs);
- final Map<TopicPartition, OffsetAndMetadata>
partitionToOffsetAndMetadata =
result.partitionsToOffsetAndMetadata(GROUP_ID).get();
+ final Map<TopicPartition, SharePartitionOffsetInfo>
partitionToOffsetInfo = result.partitionsToOffsetInfo(GROUP_ID).get();
- assertEquals(0, partitionToOffsetAndMetadata.size());
+ assertEquals(0, partitionToOffsetInfo.size());
}
}
@@ -11342,13 +11342,13 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(new
DescribeShareGroupOffsetsResponse(data));
final ListShareGroupOffsetsResult result =
env.adminClient().listShareGroupOffsets(groupSpecs);
- final Map<TopicPartition, OffsetAndMetadata>
partitionToOffsetAndMetadata =
result.partitionsToOffsetAndMetadata(GROUP_ID).get();
+ final Map<TopicPartition, SharePartitionOffsetInfo>
partitionToOffsetInfo = result.partitionsToOffsetInfo(GROUP_ID).get();
// For myTopicPartition2 we have set an error as the response.
Thus, it should be skipped from the final result
- assertEquals(3, partitionToOffsetAndMetadata.size());
- assertEquals(new OffsetAndMetadata(10, Optional.of(0), ""),
partitionToOffsetAndMetadata.get(myTopicPartition0));
- assertEquals(new OffsetAndMetadata(11, Optional.of(1), ""),
partitionToOffsetAndMetadata.get(myTopicPartition1));
- assertEquals(new OffsetAndMetadata(500, Optional.of(2), ""),
partitionToOffsetAndMetadata.get(myTopicPartition3));
+ assertEquals(3, partitionToOffsetInfo.size());
+ assertEquals(new SharePartitionOffsetInfo(10, Optional.of(0),
Optional.empty()), partitionToOffsetInfo.get(myTopicPartition0));
+ assertEquals(new SharePartitionOffsetInfo(11, Optional.of(1),
Optional.empty()), partitionToOffsetInfo.get(myTopicPartition1));
+ assertEquals(new SharePartitionOffsetInfo(500, Optional.of(2),
Optional.empty()), partitionToOffsetInfo.get(myTopicPartition3));
}
}
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index c25ed9e9bfb..d057ce99d71 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -2925,7 +2925,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val tp1 = new TopicPartition(testTopicName, 0)
val parts = client.listShareGroupOffsets(util.Map.of(testGroupId, new
ListShareGroupOffsetsSpec().topicPartitions(util.List.of(tp1))))
- .partitionsToOffsetAndMetadata(testGroupId)
+ .partitionsToOffsetInfo(testGroupId)
.get()
assertTrue(parts.containsKey(tp1))
assertNull(parts.get(tp1))
@@ -2991,10 +2991,10 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertFutureThrows(classOf[UnknownTopicOrPartitionException],
offsetAlterResult.partitionResult(tp2))
val parts = client.listShareGroupOffsets(util.Map.of(testGroupId, new
ListShareGroupOffsetsSpec().topicPartitions(util.List.of(tp1))))
- .partitionsToOffsetAndMetadata(testGroupId)
+ .partitionsToOffsetInfo(testGroupId)
.get()
assertTrue(parts.containsKey(tp1))
- assertEquals(0, parts.get(tp1).offset())
+ assertEquals(0, parts.get(tp1).startOffset())
} finally {
Utils.closeQuietly(client, "adminClient")
}
@@ -3031,14 +3031,14 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
// Test listShareGroupOffsets
TestUtils.waitUntilTrue(() => {
val parts = client.listShareGroupOffsets(util.Map.of(testGroupId,
new ListShareGroupOffsetsSpec()))
- .partitionsToOffsetAndMetadata(testGroupId)
+ .partitionsToOffsetInfo(testGroupId)
.get()
parts.containsKey(tp1) && parts.containsKey(tp2)
}, "Expected the result contains all partitions.")
// Test listShareGroupOffsets with listShareGroupOffsetsSpec
val groupSpecs = util.Map.of(testGroupId, new
ListShareGroupOffsetsSpec().topicPartitions(util.List.of(tp1)))
- val parts =
client.listShareGroupOffsets(groupSpecs).partitionsToOffsetAndMetadata(testGroupId).get()
+ val parts =
client.listShareGroupOffsets(groupSpecs).partitionsToOffsetInfo(testGroupId).get()
assertTrue(parts.containsKey(tp1))
assertFalse(parts.containsKey(tp2))
} finally {
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
index 2c3e21fedb1..6d36e49dcfd 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
@@ -32,6 +32,7 @@ import
org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ShareGroupDescription;
import org.apache.kafka.clients.admin.ShareMemberAssignment;
import org.apache.kafka.clients.admin.ShareMemberDescription;
+import org.apache.kafka.clients.admin.SharePartitionOffsetInfo;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
@@ -437,7 +438,7 @@ public class ShareGroupCommand {
partitionsToReset =
offsetsUtils.parseTopicPartitionsToReset(opts.options.valuesOf(opts.topicOpt));
} else {
Map<String, ListShareGroupOffsetsSpec> groupSpecs =
Map.of(groupId, new ListShareGroupOffsetsSpec());
- Map<TopicPartition, OffsetAndMetadata>
offsetsByTopicPartitions = adminClient.listShareGroupOffsets(
+ Map<TopicPartition, SharePartitionOffsetInfo>
offsetsByTopicPartitions = adminClient.listShareGroupOffsets(
groupSpecs,
withTimeoutMs(new ListShareGroupOffsetsOptions())
).all().get().get(groupId);
@@ -494,11 +495,11 @@ public class ShareGroupCommand {
Map<String, ListShareGroupOffsetsSpec> groupSpecs =
Map.of(groupId, new ListShareGroupOffsetsSpec());
try {
- Map<TopicPartition, OffsetAndMetadata> startOffsets =
adminClient.listShareGroupOffsets(
+ Map<TopicPartition, SharePartitionOffsetInfo>
offsetInfoMap = adminClient.listShareGroupOffsets(
groupSpecs,
withTimeoutMs(new ListShareGroupOffsetsOptions())
).all().get().get(groupId);
- Set<SharePartitionOffsetInformation> partitionOffsets =
mapOffsetsToSharePartitionInformation(groupId, startOffsets);
+ Set<SharePartitionOffsetInformation> partitionOffsets =
mapOffsetInfoToSharePartitionInformation(groupId, offsetInfoMap);
groupOffsets.put(groupId, new
SimpleImmutableEntry<>(shareGroup, partitionOffsets));
} catch (InterruptedException | ExecutionException e) {
@@ -509,17 +510,18 @@ public class ShareGroupCommand {
return groupOffsets;
}
- private static Set<SharePartitionOffsetInformation>
mapOffsetsToSharePartitionInformation(String groupId, Map<TopicPartition,
OffsetAndMetadata> startOffsets) {
+ private static Set<SharePartitionOffsetInformation>
mapOffsetInfoToSharePartitionInformation(String groupId, Map<TopicPartition,
SharePartitionOffsetInfo> offsetInfoMap) {
Set<SharePartitionOffsetInformation> partitionOffsets = new
HashSet<>();
- startOffsets.forEach((tp, offsetAndMetadata) -> {
- if (offsetAndMetadata != null) {
+ offsetInfoMap.forEach((tp, offsetInfo) -> {
+ if (offsetInfo != null) {
partitionOffsets.add(new SharePartitionOffsetInformation(
groupId,
tp.topic(),
tp.partition(),
- Optional.of(offsetAndMetadata.offset()),
- offsetAndMetadata.leaderEpoch()
+ Optional.of(offsetInfo.startOffset()),
+ offsetInfo.leaderEpoch(),
+ offsetInfo.lag()
));
} else {
partitionOffsets.add(new SharePartitionOffsetInformation(
@@ -527,6 +529,7 @@ public class ShareGroupCommand {
tp.topic(),
tp.partition(),
Optional.empty(),
+ Optional.empty(),
Optional.empty()
));
}
@@ -545,9 +548,9 @@ public class ShareGroupCommand {
String fmt = printOffsetFormat(groupId, offsetsInfo, verbose);
if (verbose) {
- System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION",
"LEADER-EPOCH", "START-OFFSET");
+ System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION",
"LEADER-EPOCH", "START-OFFSET", "LAG");
} else {
- System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION",
"START-OFFSET");
+ System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION",
"START-OFFSET", "LAG");
}
for (SharePartitionOffsetInformation info : offsetsInfo) {
@@ -557,14 +560,16 @@ public class ShareGroupCommand {
info.topic,
info.partition,
info.leaderEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
-
info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE)
+
info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
+
info.lag.map(Object::toString).orElse(MISSING_COLUMN_VALUE)
);
} else {
System.out.printf(fmt,
groupId,
info.topic,
info.partition,
-
info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE)
+
info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
+
info.lag.map(Object::toString).orElse(MISSING_COLUMN_VALUE)
);
}
}
@@ -579,9 +584,9 @@ public class ShareGroupCommand {
maxTopicLen = Math.max(maxTopicLen, info.topic.length());
}
if (verbose) {
- return "\n%" + (-groupLen) + "s %" + (-maxTopicLen) + "s %-10s
%-13s %s";
+ return "\n%" + (-groupLen) + "s %" + (-maxTopicLen) + "s %-10s
%-13s %-13s %s";
} else {
- return "\n%" + (-groupLen) + "s %" + (-maxTopicLen) + "s %-10s
%s";
+ return "\n%" + (-groupLen) + "s %" + (-maxTopicLen) + "s %-10s
%-13s %s";
}
}
@@ -674,6 +679,6 @@ public class ShareGroupCommand {
}
record SharePartitionOffsetInformation(String group, String topic, int
partition, Optional<Long> offset,
- Optional<Integer> leaderEpoch) {
+ Optional<Integer> leaderEpoch,
Optional<Long> lag) {
}
}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
index 28e51e8c530..fb5d459febb 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
@@ -40,6 +40,7 @@ import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.ShareGroupDescription;
import org.apache.kafka.clients.admin.ShareMemberAssignment;
import org.apache.kafka.clients.admin.ShareMemberDescription;
+import org.apache.kafka.clients.admin.SharePartitionOffsetInfo;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.GroupState;
@@ -207,7 +208,8 @@ public class ShareGroupCommandTest {
ListShareGroupOffsetsResult listShareGroupOffsetsResult =
AdminClientTestUtils.createListShareGroupOffsetsResult(
Map.of(
firstGroup,
- KafkaFuture.completedFuture(Map.of(new
TopicPartition("topic1", 0), new OffsetAndMetadata(0L, Optional.of(1), "")))
+ KafkaFuture.completedFuture(
+ Map.of(new TopicPartition("topic1", 0), new
SharePartitionOffsetInfo(0L, Optional.of(1), Optional.of(0L))))
)
);
@@ -224,9 +226,9 @@ public class ShareGroupCommandTest {
List<String> expectedValues;
if (describeType.contains("--verbose")) {
- expectedValues = List.of(firstGroup, "topic1", "0",
"1", "0");
+ expectedValues = List.of(firstGroup, "topic1", "0",
"1", "0", "0");
} else {
- expectedValues = List.of(firstGroup, "topic1", "0",
"0");
+ expectedValues = List.of(firstGroup, "topic1", "0",
"0", "0");
}
return checkArgsHeaderOutput(cgcArgs, lines[0]) &&
Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues);
@@ -273,9 +275,9 @@ public class ShareGroupCommandTest {
List<String> expectedValues;
if (describeType.contains("--verbose")) {
- expectedValues = List.of(firstGroup, "topic1", "0",
"-", "-");
+ expectedValues = List.of(firstGroup, "topic1", "0",
"-", "-", "-");
} else {
- expectedValues = List.of(firstGroup, "topic1", "0",
"-");
+ expectedValues = List.of(firstGroup, "topic1", "0",
"-", "-");
}
return checkArgsHeaderOutput(cgcArgs, lines[0]) &&
Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues);
@@ -315,13 +317,13 @@ public class ShareGroupCommandTest {
ListShareGroupOffsetsResult listShareGroupOffsetsResult1 =
AdminClientTestUtils.createListShareGroupOffsetsResult(
Map.of(
firstGroup,
- KafkaFuture.completedFuture(Map.of(new
TopicPartition("topic1", 0), new OffsetAndMetadata(0, Optional.of(1), "")))
+ KafkaFuture.completedFuture(Map.of(new
TopicPartition("topic1", 0), new SharePartitionOffsetInfo(0, Optional.of(1),
Optional.empty())))
)
);
ListShareGroupOffsetsResult listShareGroupOffsetsResult2 =
AdminClientTestUtils.createListShareGroupOffsetsResult(
Map.of(
secondGroup,
- KafkaFuture.completedFuture(Map.of(new
TopicPartition("topic1", 0), new OffsetAndMetadata(0, Optional.of(1), "")))
+ KafkaFuture.completedFuture(Map.of(new
TopicPartition("topic1", 0), new SharePartitionOffsetInfo(0, Optional.of(1),
Optional.of(0L))))
)
);
@@ -349,11 +351,11 @@ public class ShareGroupCommandTest {
List<String> expectedValues1, expectedValues2;
if (describeType.contains("--verbose")) {
- expectedValues1 = List.of(firstGroup, "topic1", "0",
"1", "0");
- expectedValues2 = List.of(secondGroup, "topic1", "0",
"1", "0");
+ expectedValues1 = List.of(firstGroup, "topic1", "0",
"1", "0", "-");
+ expectedValues2 = List.of(secondGroup, "topic1", "0",
"1", "0", "0");
} else {
- expectedValues1 = List.of(firstGroup, "topic1", "0",
"0");
- expectedValues2 = List.of(secondGroup, "topic1", "0",
"0");
+ expectedValues1 = List.of(firstGroup, "topic1", "0",
"0", "-");
+ expectedValues2 = List.of(secondGroup, "topic1", "0",
"0", "0");
}
return checkArgsHeaderOutput(cgcArgs, lines[0]) &&
checkArgsHeaderOutput(cgcArgs, lines[3]) &&
Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues1) &&
@@ -1080,8 +1082,10 @@ public class ShareGroupCommandTest {
ListShareGroupOffsetsResult listShareGroupOffsetsResult =
AdminClientTestUtils.createListShareGroupOffsetsResult(
Map.of(
group,
- KafkaFuture.completedFuture(Map.of(new TopicPartition(topic1,
0), new OffsetAndMetadata(10L), new TopicPartition(topic1, 1), new
OffsetAndMetadata(10L),
- new TopicPartition(topic2, 0), new OffsetAndMetadata(0L)))
+ KafkaFuture.completedFuture(Map.of(
+ new TopicPartition(topic1, 0), new
SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty()),
+ new TopicPartition(topic1, 1), new
SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty()),
+ new TopicPartition(topic2, 0), new
SharePartitionOffsetInfo(0L, Optional.empty(), Optional.empty())))
)
);
when(adminClient.listShareGroupOffsets(any(),
any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult);
@@ -1136,7 +1140,9 @@ public class ShareGroupCommandTest {
ListShareGroupOffsetsResult listShareGroupOffsetsResult =
AdminClientTestUtils.createListShareGroupOffsetsResult(
Map.of(
group,
- KafkaFuture.completedFuture(Map.of(t1, new
OffsetAndMetadata(10L), t2, new OffsetAndMetadata(10L)))
+ KafkaFuture.completedFuture(Map.of(
+ t1, new SharePartitionOffsetInfo(10L, Optional.empty(),
Optional.empty()),
+ t2, new SharePartitionOffsetInfo(10L, Optional.empty(),
Optional.empty())))
)
);
Map<String, TopicDescription> descriptions = Map.of(
@@ -1188,8 +1194,11 @@ public class ShareGroupCommandTest {
ListShareGroupOffsetsResult listShareGroupOffsetsResult =
AdminClientTestUtils.createListShareGroupOffsetsResult(
Map.of(
group,
- KafkaFuture.completedFuture(Map.of(new TopicPartition(topic1,
0), new OffsetAndMetadata(5L), new TopicPartition(topic1, 1), new
OffsetAndMetadata(10L),
- new TopicPartition(topic2, 0), new OffsetAndMetadata(10L),
new TopicPartition(topic3, 0), new OffsetAndMetadata(10L)))
+ KafkaFuture.completedFuture(Map.of(
+ new TopicPartition(topic1, 0), new
SharePartitionOffsetInfo(5L, Optional.empty(), Optional.empty()),
+ new TopicPartition(topic1, 1), new
SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty()),
+ new TopicPartition(topic2, 0), new
SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty()),
+ new TopicPartition(topic3, 0), new
SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty())))
)
);
when(adminClient.listShareGroupOffsets(any(),
any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult);
@@ -1255,7 +1264,7 @@ public class ShareGroupCommandTest {
ListShareGroupOffsetsResult listShareGroupOffsetsResult =
AdminClientTestUtils.createListShareGroupOffsetsResult(
Map.of(
group,
- KafkaFuture.completedFuture(Map.of(new TopicPartition(topic,
0), new OffsetAndMetadata(10L)))
+ KafkaFuture.completedFuture(Map.of(new TopicPartition(topic,
0), new SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty())))
)
);
when(adminClient.listShareGroupOffsets(any(),
any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult);
@@ -1322,7 +1331,7 @@ public class ShareGroupCommandTest {
ListShareGroupOffsetsResult listShareGroupOffsetsResult =
AdminClientTestUtils.createListShareGroupOffsetsResult(
Map.of(
group,
- KafkaFuture.completedFuture(Map.of(new TopicPartition("topic",
0), new OffsetAndMetadata(10L)))
+ KafkaFuture.completedFuture(Map.of(new TopicPartition("topic",
0), new SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty())))
)
);
when(adminClient.listShareGroupOffsets(any(),
any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult);
@@ -1381,7 +1390,7 @@ public class ShareGroupCommandTest {
ListShareGroupOffsetsResult listShareGroupOffsetsResult =
AdminClientTestUtils.createListShareGroupOffsetsResult(
Map.of(
group,
- KafkaFuture.completedFuture(Map.of(new TopicPartition("topic",
0), new OffsetAndMetadata(10L)))
+ KafkaFuture.completedFuture(Map.of(new TopicPartition("topic",
0), new SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty())))
)
);
when(adminClient.listShareGroupOffsets(any(),
any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult);
@@ -1428,7 +1437,7 @@ public class ShareGroupCommandTest {
ListShareGroupOffsetsResult listShareGroupOffsetsResult =
AdminClientTestUtils.createListShareGroupOffsetsResult(
Map.of(
group,
- KafkaFuture.completedFuture(Map.of(new TopicPartition("topic",
0), new OffsetAndMetadata(10L)))
+ KafkaFuture.completedFuture(Map.of(new TopicPartition("topic",
0), new SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty())))
)
);
when(adminClient.listShareGroupOffsets(any(),
any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult);
@@ -1515,8 +1524,8 @@ public class ShareGroupCommandTest {
private boolean checkOffsetsArgsHeaderOutput(String output, boolean
verbose) {
List<String> expectedKeys = verbose ?
- List.of("GROUP", "TOPIC", "PARTITION", "LEADER-EPOCH",
"START-OFFSET") :
- List.of("GROUP", "TOPIC", "PARTITION", "START-OFFSET");
+ List.of("GROUP", "TOPIC", "PARTITION", "LEADER-EPOCH",
"START-OFFSET", "LAG") :
+ List.of("GROUP", "TOPIC", "PARTITION", "START-OFFSET", "LAG");
return
Arrays.stream(output.trim().split("\\s+")).toList().equals(expectedKeys);
}