This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7d027a4d83a KAFKA-19218: Add missing leader epoch to share group state
summary response (#19602)
7d027a4d83a is described below
commit 7d027a4d83af15428d4aa9bdb2144adcad73408c
Author: Andrew Schofield <[email protected]>
AuthorDate: Tue May 6 14:53:12 2025 +0100
KAFKA-19218: Add missing leader epoch to share group state summary response
(#19602)
When the persister is responding to a read share-group state summary
request, it has no way of including the leader epoch in its response,
even though it has the information to hand. This means that the leader
epoch information is not initialised in the admin client operation to
list share group offsets, and this then means that the information
cannot be displayed in kafka-share-groups.sh.
Reviewers: Apoorv Mittal <[email protected]>, Sushant Mahajan
<[email protected]>
---
.../kafka/clients/admin/KafkaAdminClient.java | 2 +-
.../clients/admin/ListShareGroupOffsetsResult.java | 11 ++-
.../internals/ListShareGroupOffsetsHandler.java | 20 ++--
.../ReadShareGroupStateSummaryResponse.java | 2 +
.../ReadShareGroupStateSummaryResponse.json | 2 +
.../kafka/clients/admin/AdminClientTestUtils.java | 4 +-
.../kafka/clients/admin/KafkaAdminClientTest.java | 110 ++++++++++++++-------
.../coordinator/group/GroupCoordinatorService.java | 1 +
.../share/persister/DefaultStatePersister.java | 2 +
.../server/share/persister/NoOpStatePersister.java | 3 +-
.../server/share/persister/PartitionFactory.java | 8 +-
.../share/persister/PartitionStateSummaryData.java | 2 +
.../ReadShareGroupStateSummaryResult.java | 3 +-
.../share/persister/DefaultStatePersisterTest.java | 18 ++--
.../coordinator/share/ShareCoordinatorShard.java | 2 +
.../share/ShareCoordinatorShardTest.java | 1 +
.../tools/consumer/group/ShareGroupCommand.java | 39 +++++---
.../consumer/group/ShareGroupCommandTest.java | 13 +--
18 files changed, 155 insertions(+), 88 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 91698c813e7..0f37edb2dd8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -3846,7 +3846,7 @@ public class KafkaAdminClient extends AdminClient {
@Override
public ListShareGroupOffsetsResult listShareGroupOffsets(final Map<String,
ListShareGroupOffsetsSpec> groupSpecs,
final
ListShareGroupOffsetsOptions options) {
- SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, Long>> future
= ListShareGroupOffsetsHandler.newFuture(groupSpecs.keySet());
+ SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition,
OffsetAndMetadata>> future =
ListShareGroupOffsetsHandler.newFuture(groupSpecs.keySet());
ListShareGroupOffsetsHandler handler = new
ListShareGroupOffsetsHandler(groupSpecs, logContext);
invokeDriver(handler, future, options.timeoutMs);
return new ListShareGroupOffsetsResult(future.all());
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java
b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java
index d39f3711f4c..e1dcd932309 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java
@@ -18,6 +18,7 @@
package org.apache.kafka.clients.admin;
import org.apache.kafka.clients.admin.internals.CoordinatorKey;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
@@ -35,9 +36,9 @@ import java.util.stream.Collectors;
@InterfaceStability.Evolving
public class ListShareGroupOffsetsResult {
- private final Map<String, KafkaFuture<Map<TopicPartition, Long>>> futures;
+ private final Map<String, KafkaFuture<Map<TopicPartition,
OffsetAndMetadata>>> futures;
- ListShareGroupOffsetsResult(final Map<CoordinatorKey,
KafkaFuture<Map<TopicPartition, Long>>> futures) {
+ ListShareGroupOffsetsResult(final Map<CoordinatorKey,
KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
this.futures = futures.entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey().idValue,
Map.Entry::getValue));
}
@@ -47,10 +48,10 @@ public class ListShareGroupOffsetsResult {
*
* @return Future which yields all {@code Map<String, Map<TopicPartition,
Long>>} objects, if requests for all the groups succeed.
*/
- public KafkaFuture<Map<String, Map<TopicPartition, Long>>> all() {
+ public KafkaFuture<Map<String, Map<TopicPartition, OffsetAndMetadata>>>
all() {
return KafkaFuture.allOf(futures.values().toArray(new
KafkaFuture<?>[0])).thenApply(
nil -> {
- Map<String, Map<TopicPartition, Long>> offsets = new
HashMap<>(futures.size());
+ Map<String, Map<TopicPartition, OffsetAndMetadata>> offsets =
new HashMap<>(futures.size());
futures.forEach((groupId, future) -> {
try {
offsets.put(groupId, future.get());
@@ -70,7 +71,7 @@ public class ListShareGroupOffsetsResult {
* @param groupId The group ID.
* @return Future which yields a map of topic partitions to offsets for
the specified group.
*/
- public KafkaFuture<Map<TopicPartition, Long>> partitionsToOffset(String
groupId) {
+ public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>
partitionsToOffsetAndMetadata(String groupId) {
if (!futures.containsKey(groupId)) {
throw new IllegalArgumentException("Group ID not found: " +
groupId);
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java
index fcaba5a67e6..f9b9e987930 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.admin.internals;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.ListShareGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData;
@@ -39,13 +40,14 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
/**
* This class is the handler for {@link
KafkaAdminClient#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)} call
*/
-public class ListShareGroupOffsetsHandler extends
AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, Long>> {
+public class ListShareGroupOffsetsHandler extends
AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>>
{
private final Map<String, ListShareGroupOffsetsSpec> groupSpecs;
private final Logger log;
@@ -58,7 +60,7 @@ public class ListShareGroupOffsetsHandler extends
AdminApiHandler.Batched<Coordi
this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP,
logContext);
}
- public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey,
Map<TopicPartition, Long>> newFuture(Collection<String> groupIds) {
+ public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey,
Map<TopicPartition, OffsetAndMetadata>> newFuture(Collection<String> groupIds) {
return AdminApiFuture.forKeys(coordinatorKeys(groupIds));
}
@@ -108,13 +110,13 @@ public class ListShareGroupOffsetsHandler extends
AdminApiHandler.Batched<Coordi
}
@Override
- public ApiResult<CoordinatorKey, Map<TopicPartition, Long>>
handleResponse(Node coordinator,
-
Set<CoordinatorKey> groupIds,
-
AbstractResponse abstractResponse) {
+ public ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>>
handleResponse(Node coordinator,
+
Set<CoordinatorKey> groupIds,
+
AbstractResponse abstractResponse) {
validateKeys(groupIds);
final DescribeShareGroupOffsetsResponse response =
(DescribeShareGroupOffsetsResponse) abstractResponse;
- final Map<CoordinatorKey, Map<TopicPartition, Long>> completed = new
HashMap<>();
+ final Map<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>>
completed = new HashMap<>();
final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
final List<CoordinatorKey> unmapped = new ArrayList<>();
@@ -123,17 +125,19 @@ public class ListShareGroupOffsetsHandler extends
AdminApiHandler.Batched<Coordi
if (response.hasGroupError(groupId)) {
handleGroupError(coordinatorKey, response.groupError(groupId),
failed, unmapped);
} else {
- Map<TopicPartition, Long> groupOffsetsListing = new
HashMap<>();
+ Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing =
new HashMap<>();
response.data().groups().stream().filter(g ->
g.groupId().equals(groupId)).forEach(groupResponse -> {
for
(DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic
topicResponse : groupResponse.topics()) {
for
(DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition
partitionResponse : topicResponse.partitions()) {
TopicPartition tp = new
TopicPartition(topicResponse.topicName(), partitionResponse.partitionIndex());
if (partitionResponse.errorCode() ==
Errors.NONE.code()) {
+ final long startOffset =
partitionResponse.startOffset();
+ final Optional<Integer> leaderEpoch =
partitionResponse.leaderEpoch() < 0 ? Optional.empty() :
Optional.of(partitionResponse.leaderEpoch());
// Negative offset indicates there is no start
offset for this partition
if (partitionResponse.startOffset() < 0) {
groupOffsetsListing.put(tp, null);
} else {
- groupOffsetsListing.put(tp,
partitionResponse.startOffset());
+ groupOffsetsListing.put(tp, new
OffsetAndMetadata(startOffset, leaderEpoch, ""));
}
} else {
log.warn("Skipping return offset for {} due to
error {}: {}.", tp, partitionResponse.errorCode(),
partitionResponse.errorMessage());
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryResponse.java
index 86363add1e1..a2787ff82c9 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryResponse.java
@@ -99,6 +99,7 @@ public class ReadShareGroupStateSummaryResponse extends
AbstractResponse {
Uuid topicId,
int partition,
long startOffset,
+ int leaderEpoch,
int stateEpoch
) {
return new ReadShareGroupStateSummaryResponseData()
@@ -109,6 +110,7 @@ public class ReadShareGroupStateSummaryResponse extends
AbstractResponse {
new
ReadShareGroupStateSummaryResponseData.PartitionResult()
.setPartition(partition)
.setStartOffset(startOffset)
+ .setLeaderEpoch(leaderEpoch)
.setStateEpoch(stateEpoch)
))
));
diff --git
a/clients/src/main/resources/common/message/ReadShareGroupStateSummaryResponse.json
b/clients/src/main/resources/common/message/ReadShareGroupStateSummaryResponse.json
index ddf9d7044a6..81e3edc554e 100644
---
a/clients/src/main/resources/common/message/ReadShareGroupStateSummaryResponse.json
+++
b/clients/src/main/resources/common/message/ReadShareGroupStateSummaryResponse.json
@@ -41,6 +41,8 @@
"about": "The error message, or null if there was no error." },
{ "name": "StateEpoch", "type": "int32", "versions": "0+",
"about": "The state epoch of the share-partition." },
+ { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
+ "about": "The leader epoch of the share-partition." },
{ "name": "StartOffset", "type": "int64", "versions": "0+",
"about": "The share-partition start offset." }
]}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
index 061982e34d3..c98ffb9483f 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
@@ -176,8 +176,8 @@ public class AdminClientTestUtils {
return new ListClientMetricsResourcesResult(future);
}
- public static ListShareGroupOffsetsResult
createListShareGroupOffsetsResult(Map<String, KafkaFuture<Map<TopicPartition,
Long>>> groupOffsets) {
- Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, Long>>>
coordinatorFutures = groupOffsets.entrySet().stream()
+ public static ListShareGroupOffsetsResult
createListShareGroupOffsetsResult(Map<String, KafkaFuture<Map<TopicPartition,
OffsetAndMetadata>>> groupOffsets) {
+ Map<CoordinatorKey, KafkaFuture<Map<TopicPartition,
OffsetAndMetadata>>> coordinatorFutures = groupOffsets.entrySet().stream()
.collect(Collectors.toMap(
entry -> CoordinatorKey.byGroupId(entry.getKey()),
Map.Entry::getValue
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 339a431aa1c..74c381be17e 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -10577,12 +10577,24 @@ public class KafkaAdminClientTest {
List.of(
new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup().setGroupId(GROUP_ID).setTopics(
List.of(
- new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10))),
- new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11))),
- new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(2).setStartOffset(40))),
- new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(3).setStartOffset(50))),
- new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions(List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setStartOffset(100))),
- new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions(List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500)))
+ new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(
+ List.of(
+ new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10).setLeaderEpoch(0),
+ new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11).setLeaderEpoch(0),
+ new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(2).setStartOffset(40).setLeaderEpoch(0),
+ new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(3).setStartOffset(50).setLeaderEpoch(1)
+ )
+ ),
+ new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions(
+ List.of(
+ new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setStartOffset(100).setLeaderEpoch(2)
+ )
+ ),
+ new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions(
+ List.of(
+ new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500).setLeaderEpoch(3)
+ )
+ )
)
)
)
@@ -10590,15 +10602,15 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(new
DescribeShareGroupOffsetsResponse(data));
final ListShareGroupOffsetsResult result =
env.adminClient().listShareGroupOffsets(groupSpecs);
- final Map<TopicPartition, Long> partitionToOffsetAndMetadata =
result.partitionsToOffset(GROUP_ID).get();
+ final Map<TopicPartition, OffsetAndMetadata>
partitionToOffsetAndMetadata =
result.partitionsToOffsetAndMetadata(GROUP_ID).get();
assertEquals(6, partitionToOffsetAndMetadata.size());
- assertEquals(10,
partitionToOffsetAndMetadata.get(myTopicPartition0));
- assertEquals(11,
partitionToOffsetAndMetadata.get(myTopicPartition1));
- assertEquals(40,
partitionToOffsetAndMetadata.get(myTopicPartition2));
- assertEquals(50,
partitionToOffsetAndMetadata.get(myTopicPartition3));
- assertEquals(100,
partitionToOffsetAndMetadata.get(myTopicPartition4));
- assertEquals(500,
partitionToOffsetAndMetadata.get(myTopicPartition5));
+ assertEquals(new OffsetAndMetadata(10, Optional.of(0), ""),
partitionToOffsetAndMetadata.get(myTopicPartition0));
+ assertEquals(new OffsetAndMetadata(11, Optional.of(0), ""),
partitionToOffsetAndMetadata.get(myTopicPartition1));
+ assertEquals(new OffsetAndMetadata(40, Optional.of(0), ""),
partitionToOffsetAndMetadata.get(myTopicPartition2));
+ assertEquals(new OffsetAndMetadata(50, Optional.of(1), ""),
partitionToOffsetAndMetadata.get(myTopicPartition3));
+ assertEquals(new OffsetAndMetadata(100, Optional.of(2), ""),
partitionToOffsetAndMetadata.get(myTopicPartition4));
+ assertEquals(new OffsetAndMetadata(500, Optional.of(3), ""),
partitionToOffsetAndMetadata.get(myTopicPartition5));
}
}
@@ -10630,16 +10642,28 @@ public class KafkaAdminClientTest {
List.of(
new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup().setGroupId(GROUP_ID).setTopics(
List.of(
- new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10))),
- new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11))),
- new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(2).setStartOffset(40))),
- new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(3).setStartOffset(50)))
+ new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(
+ List.of(
+ new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10).setLeaderEpoch(0),
+ new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11).setLeaderEpoch(0),
+ new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(2).setStartOffset(40).setLeaderEpoch(0),
+ new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(3).setStartOffset(50).setLeaderEpoch(1)
+ )
+ )
)
),
new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup().setGroupId("group-1").setTopics(
List.of(
- new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions(List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setStartOffset(100))),
- new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions(List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500)))
+ new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions(
+ List.of(
+ new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setStartOffset(100).setLeaderEpoch(2)
+ )
+ ),
+ new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions(
+ List.of(
+ new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500).setLeaderEpoch(2)
+ )
+ )
)
)
)
@@ -10649,17 +10673,17 @@ public class KafkaAdminClientTest {
final ListShareGroupOffsetsResult result =
env.adminClient().listShareGroupOffsets(groupSpecs);
assertEquals(2, result.all().get().size());
- final Map<TopicPartition, Long> partitionToOffsetAndMetadataGroup0
= result.partitionsToOffset(GROUP_ID).get();
+ final Map<TopicPartition, OffsetAndMetadata>
partitionToOffsetAndMetadataGroup0 =
result.partitionsToOffsetAndMetadata(GROUP_ID).get();
assertEquals(4, partitionToOffsetAndMetadataGroup0.size());
- assertEquals(10,
partitionToOffsetAndMetadataGroup0.get(myTopicPartition0));
- assertEquals(11,
partitionToOffsetAndMetadataGroup0.get(myTopicPartition1));
- assertEquals(40,
partitionToOffsetAndMetadataGroup0.get(myTopicPartition2));
- assertEquals(50,
partitionToOffsetAndMetadataGroup0.get(myTopicPartition3));
+ assertEquals(new OffsetAndMetadata(10, Optional.of(0), ""),
partitionToOffsetAndMetadataGroup0.get(myTopicPartition0));
+ assertEquals(new OffsetAndMetadata(11, Optional.of(0), ""),
partitionToOffsetAndMetadataGroup0.get(myTopicPartition1));
+ assertEquals(new OffsetAndMetadata(40, Optional.of(0), ""),
partitionToOffsetAndMetadataGroup0.get(myTopicPartition2));
+ assertEquals(new OffsetAndMetadata(50, Optional.of(1), ""),
partitionToOffsetAndMetadataGroup0.get(myTopicPartition3));
- final Map<TopicPartition, Long> partitionToOffsetAndMetadataGroup1
= result.partitionsToOffset("group-1").get();
+ final Map<TopicPartition, OffsetAndMetadata>
partitionToOffsetAndMetadataGroup1 =
result.partitionsToOffsetAndMetadata("group-1").get();
assertEquals(2, partitionToOffsetAndMetadataGroup1.size());
- assertEquals(100,
partitionToOffsetAndMetadataGroup1.get(myTopicPartition4));
- assertEquals(500,
partitionToOffsetAndMetadataGroup1.get(myTopicPartition5));
+ assertEquals(new OffsetAndMetadata(100, Optional.of(2), ""),
partitionToOffsetAndMetadataGroup1.get(myTopicPartition4));
+ assertEquals(new OffsetAndMetadata(500, Optional.of(2), ""),
partitionToOffsetAndMetadataGroup1.get(myTopicPartition5));
}
}
@@ -10682,7 +10706,7 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(new
DescribeShareGroupOffsetsResponse(data));
final ListShareGroupOffsetsResult result =
env.adminClient().listShareGroupOffsets(groupSpecs);
- final Map<TopicPartition, Long> partitionToOffsetAndMetadata =
result.partitionsToOffset(GROUP_ID).get();
+ final Map<TopicPartition, OffsetAndMetadata>
partitionToOffsetAndMetadata =
result.partitionsToOffsetAndMetadata(GROUP_ID).get();
assertEquals(0, partitionToOffsetAndMetadata.size());
}
@@ -10711,12 +10735,22 @@ public class KafkaAdminClientTest {
List.of(
new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup().setGroupId(GROUP_ID).setTopics(
List.of(
- new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(List.of(
- new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10),
- new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11)
- )),
- new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions(List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setErrorCode(Errors.NOT_COORDINATOR.code()).setErrorMessage("Not
a Coordinator"))),
- new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions(List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500)))
+ new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic").setPartitions(
+ List.of(
+ new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(0).setStartOffset(10).setLeaderEpoch(0),
+ new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(1).setStartOffset(11).setLeaderEpoch(1)
+ )
+ ),
+ new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_1").setPartitions(
+ List.of(
+ new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(4).setErrorCode(Errors.NOT_COORDINATOR.code()).setErrorMessage("Not
a Coordinator")
+ )
+ ),
+ new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic().setTopicName("my_topic_2").setPartitions(
+ List.of(
+ new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition().setPartitionIndex(6).setStartOffset(500).setLeaderEpoch(2)
+ )
+ )
)
)
)
@@ -10724,13 +10758,13 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(new
DescribeShareGroupOffsetsResponse(data));
final ListShareGroupOffsetsResult result =
env.adminClient().listShareGroupOffsets(groupSpecs);
- final Map<TopicPartition, Long> partitionToOffsetAndMetadata =
result.partitionsToOffset(GROUP_ID).get();
+ final Map<TopicPartition, OffsetAndMetadata>
partitionToOffsetAndMetadata =
result.partitionsToOffsetAndMetadata(GROUP_ID).get();
// For myTopicPartition2 we have set an error as the response.
Thus, it should be skipped from the final result
assertEquals(3, partitionToOffsetAndMetadata.size());
- assertEquals(10,
partitionToOffsetAndMetadata.get(myTopicPartition0));
- assertEquals(11,
partitionToOffsetAndMetadata.get(myTopicPartition1));
- assertEquals(500,
partitionToOffsetAndMetadata.get(myTopicPartition3));
+ assertEquals(new OffsetAndMetadata(10, Optional.of(0), ""),
partitionToOffsetAndMetadata.get(myTopicPartition0));
+ assertEquals(new OffsetAndMetadata(11, Optional.of(1), ""),
partitionToOffsetAndMetadata.get(myTopicPartition1));
+ assertEquals(new OffsetAndMetadata(500, Optional.of(2), ""),
partitionToOffsetAndMetadata.get(myTopicPartition3));
}
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index ebccd66359b..cc2566b492e 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -1702,6 +1702,7 @@ public class GroupCoordinatorService implements
GroupCoordinator {
partitionData -> new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
.setPartitionIndex(partitionData.partition())
.setStartOffset(partitionData.errorCode() ==
Errors.NONE.code() ? partitionData.startOffset() :
PartitionFactory.UNINITIALIZED_START_OFFSET)
+ .setLeaderEpoch(partitionData.errorCode() ==
Errors.NONE.code() ? partitionData.leaderEpoch() :
PartitionFactory.DEFAULT_LEADER_EPOCH)
).toList())
));
diff --git
a/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
b/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
index 1b4a0756515..ae8b8c317c3 100644
---
a/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
+++
b/server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java
@@ -486,6 +486,7 @@ public class DefaultStatePersister implements Persister {
partitionResult.partition(),
partitionResult.stateEpoch(),
partitionResult.startOffset(),
+ partitionResult.leaderEpoch(),
partitionResult.errorCode(),
partitionResult.errorMessage()))
.toList();
@@ -495,6 +496,7 @@ public class DefaultStatePersister implements Persister {
partition,
-1,
-1,
+ -1,
Errors.UNKNOWN_SERVER_ERROR.code(), // No
specific public error code exists for InterruptedException / ExecutionException
"Error reading state from share coordinator: "
+ e.getMessage()));
}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/share/persister/NoOpStatePersister.java
b/server-common/src/main/java/org/apache/kafka/server/share/persister/NoOpStatePersister.java
index 2814fc3aa82..908891dd463 100644
---
a/server-common/src/main/java/org/apache/kafka/server/share/persister/NoOpStatePersister.java
+++
b/server-common/src/main/java/org/apache/kafka/server/share/persister/NoOpStatePersister.java
@@ -92,7 +92,8 @@ public class NoOpStatePersister implements Persister {
for (TopicData<PartitionIdLeaderEpochData> topicData :
reqData.topicsData()) {
resultArgs.add(new TopicData<>(topicData.topicId(),
topicData.partitions().stream().
map(partitionIdData ->
PartitionFactory.newPartitionStateSummaryData(
- partitionIdData.partition(),
PartitionFactory.DEFAULT_STATE_EPOCH,
PartitionFactory.UNINITIALIZED_START_OFFSET,
PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE))
+ partitionIdData.partition(),
PartitionFactory.DEFAULT_STATE_EPOCH,
PartitionFactory.UNINITIALIZED_START_OFFSET,
+ PartitionFactory.DEFAULT_LEADER_EPOCH,
PartitionFactory.DEFAULT_ERROR_CODE, PartitionFactory.DEFAULT_ERR_MESSAGE))
.collect(Collectors.toList())));
}
return CompletableFuture.completedFuture(new
ReadShareGroupStateSummaryResult.Builder().setTopicsData(resultArgs).build());
diff --git
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
index 009eb9cccc1..78a6902a170 100644
---
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
+++
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionFactory.java
@@ -47,12 +47,8 @@ public class PartitionFactory {
return new PartitionData(partition, DEFAULT_STATE_EPOCH,
UNINITIALIZED_START_OFFSET, errorCode, errorMessage, DEFAULT_LEADER_EPOCH,
null);
}
- public static PartitionStateErrorData newPartitionStateErrorData(int
partition, int stateEpoch, long startOffset, short errorCode, String
errorMessage) {
- return new PartitionData(partition, stateEpoch, startOffset,
errorCode, errorMessage, DEFAULT_LEADER_EPOCH, null);
- }
-
- public static PartitionStateSummaryData newPartitionStateSummaryData(int
partition, int stateEpoch, long startOffset, short errorCode, String
errorMessage) {
- return new PartitionData(partition, stateEpoch, startOffset,
errorCode, errorMessage, DEFAULT_LEADER_EPOCH, null);
+ public static PartitionStateSummaryData newPartitionStateSummaryData(int
partition, int stateEpoch, long startOffset, int leaderEpoch, short errorCode,
String errorMessage) {
+ return new PartitionData(partition, stateEpoch, startOffset,
errorCode, errorMessage, leaderEpoch, null);
}
public static PartitionStateBatchData newPartitionStateBatchData(int
partition, int stateEpoch, long startOffset, int leaderEpoch,
List<PersisterStateBatch> stateBatches) {
diff --git
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionStateSummaryData.java
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionStateSummaryData.java
index dc4732a79ae..58a9dc10615 100644
---
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionStateSummaryData.java
+++
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PartitionStateSummaryData.java
@@ -24,6 +24,8 @@ package org.apache.kafka.server.share.persister;
public interface PartitionStateSummaryData extends PartitionInfoData,
PartitionIdData {
int stateEpoch();
+ int leaderEpoch();
+
long startOffset();
short errorCode();
diff --git
a/server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryResult.java
b/server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryResult.java
index 7e0bee13c38..249eb20ed94 100644
---
a/server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryResult.java
+++
b/server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryResult.java
@@ -38,7 +38,8 @@ public class ReadShareGroupStateSummaryResult implements
PersisterResult {
.map(readStateSummaryResult -> new
TopicData<>(readStateSummaryResult.topicId(),
readStateSummaryResult.partitions().stream()
.map(partitionResult ->
PartitionFactory.newPartitionStateSummaryData(
- partitionResult.partition(),
partitionResult.stateEpoch(), partitionResult.startOffset(),
partitionResult.errorCode(), partitionResult.errorMessage()))
+ partitionResult.partition(),
partitionResult.stateEpoch(), partitionResult.startOffset(),
+ partitionResult.leaderEpoch(),
partitionResult.errorCode(), partitionResult.errorMessage()))
.collect(Collectors.toList())))
.collect(Collectors.toList()))
.build();
diff --git
a/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
b/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
index 1a668b5dc32..697d958723a 100644
---
a/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
+++
b/server-common/src/test/java/org/apache/kafka/server/share/persister/DefaultStatePersisterTest.java
@@ -868,7 +868,7 @@ class DefaultStatePersisterTest {
return requestGroupId.equals(groupId) && requestTopicId ==
topicId1 && requestPartition == partition1;
},
- new
ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId1,
partition1, 0, 1)),
+ new
ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId1,
partition1, 0, 1, 1)),
coordinatorNode1);
client.prepareResponseFrom(
@@ -880,7 +880,7 @@ class DefaultStatePersisterTest {
return requestGroupId.equals(groupId) && requestTopicId ==
topicId2 && requestPartition == partition2;
},
- new
ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId2,
partition2, 0, 1)),
+ new
ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponse.toResponseData(topicId2,
partition2, 0, 1, 1)),
coordinatorNode2);
ShareCoordinatorMetadataCacheHelper cacheHelper =
getDefaultCacheHelper(suppliedNode);
@@ -930,12 +930,12 @@ class DefaultStatePersisterTest {
HashSet<PartitionData> expectedResultMap = new HashSet<>();
expectedResultMap.add(
- (PartitionData)
PartitionFactory.newPartitionStateSummaryData(partition1, 1, 0,
Errors.NONE.code(),
+ (PartitionData)
PartitionFactory.newPartitionStateSummaryData(partition1, 1, 0, 1,
Errors.NONE.code(),
null
));
expectedResultMap.add(
- (PartitionData)
PartitionFactory.newPartitionStateSummaryData(partition2, 1, 0,
Errors.NONE.code(),
+ (PartitionData)
PartitionFactory.newPartitionStateSummaryData(partition2, 1, 0, 1,
Errors.NONE.code(),
null
));
@@ -1437,6 +1437,7 @@ class DefaultStatePersisterTest {
tp1.topicId(),
tp1.partition(),
1L,
+ 1,
2
)
)
@@ -1468,7 +1469,7 @@ class DefaultStatePersisterTest {
results.topicsData().contains(
new TopicData<>(
tp1.topicId(),
-
List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L,
Errors.NONE.code(), null))
+
List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L,
1, Errors.NONE.code(), null))
)
)
);
@@ -1476,7 +1477,7 @@ class DefaultStatePersisterTest {
results.topicsData().contains(
new TopicData<>(
tp2.topicId(),
-
List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), 0, 0,
Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "unknown tp"))
+
List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), 0, 0, 0,
Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "unknown tp"))
)
)
);
@@ -1496,6 +1497,7 @@ class DefaultStatePersisterTest {
tp1.topicId(),
tp1.partition(),
1L,
+ 1,
2
)
)
@@ -1517,7 +1519,7 @@ class DefaultStatePersisterTest {
results.topicsData().contains(
new TopicData<>(
tp1.topicId(),
-
List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L,
Errors.NONE.code(), null))
+
List.of(PartitionFactory.newPartitionStateSummaryData(tp1.partition(), 2, 1L,
1, Errors.NONE.code(), null))
)
)
);
@@ -1525,7 +1527,7 @@ class DefaultStatePersisterTest {
results.topicsData().contains(
new TopicData<>(
tp2.topicId(),
-
List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), -1, -1L,
Errors.UNKNOWN_SERVER_ERROR.code(), "Error reading state from share
coordinator: java.lang.Exception: scary stuff"))
+
List.of(PartitionFactory.newPartitionStateSummaryData(tp2.partition(), -1, -1L,
-1, Errors.UNKNOWN_SERVER_ERROR.code(), "Error reading state from share
coordinator: java.lang.Exception: scary stuff"))
)
)
);
diff --git
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
index d38564fd6f8..7f03f9254b1 100644
---
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
+++
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
@@ -453,6 +453,7 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
topicId,
partitionId,
PartitionFactory.UNINITIALIZED_START_OFFSET,
+ PartitionFactory.DEFAULT_LEADER_EPOCH,
PartitionFactory.DEFAULT_STATE_EPOCH
);
} else {
@@ -470,6 +471,7 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
topicId,
partitionId,
offsetValue.startOffset(),
+ offsetValue.leaderEpoch(),
offsetValue.stateEpoch()
);
}
diff --git
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
index e4b95d1e439..4ff5a2ad92e 100644
---
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
+++
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
@@ -605,6 +605,7 @@ class ShareCoordinatorShardTest {
TOPIC_ID,
PARTITION,
0,
+ 0,
0
), result.response());
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
index 7d53514a250..11a9a020e5c 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
@@ -30,6 +30,7 @@ import
org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ShareGroupDescription;
import org.apache.kafka.clients.admin.ShareMemberAssignment;
import org.apache.kafka.clients.admin.ShareMemberDescription;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.KafkaFuture;
@@ -403,19 +404,30 @@ public class ShareGroupCommand {
groupSpecs.put(groupId, offsetsSpec);
try {
- Map<TopicPartition, Long> earliestResult =
adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId);
+ Map<TopicPartition, OffsetAndMetadata> startOffsets =
adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId);
Set<SharePartitionOffsetInformation> partitionOffsets =
new HashSet<>();
- for (Entry<TopicPartition, Long> tp :
earliestResult.entrySet()) {
- SharePartitionOffsetInformation partitionOffsetInfo =
new SharePartitionOffsetInformation(
- groupId,
- tp.getKey().topic(),
- tp.getKey().partition(),
-
Optional.ofNullable(earliestResult.get(tp.getKey()))
- );
- partitionOffsets.add(partitionOffsetInfo);
- }
+ startOffsets.forEach((tp, offsetAndMetadata) -> {
+ if (offsetAndMetadata != null) {
+ partitionOffsets.add(new
SharePartitionOffsetInformation(
+ groupId,
+ tp.topic(),
+ tp.partition(),
+ Optional.of(offsetAndMetadata.offset()),
+ offsetAndMetadata.leaderEpoch()
+ ));
+ } else {
+ partitionOffsets.add(new
SharePartitionOffsetInformation(
+ groupId,
+ tp.topic(),
+ tp.partition(),
+ Optional.empty(),
+ Optional.empty()
+ ));
+ }
+ });
+
groupOffsets.put(groupId, new
SimpleImmutableEntry<>(shareGroup, partitionOffsets));
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
@@ -447,7 +459,7 @@ public class ShareGroupCommand {
groupId,
info.topic,
info.partition,
- MISSING_COLUMN_VALUE, // Temporary
+
info.leaderEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE)
);
} else {
@@ -569,17 +581,20 @@ public class ShareGroupCommand {
final String topic;
final int partition;
final Optional<Long> offset;
+ final Optional<Integer> leaderEpoch;
SharePartitionOffsetInformation(
String group,
String topic,
int partition,
- Optional<Long> offset
+ Optional<Long> offset,
+ Optional<Integer> leaderEpoch
) {
this.group = group;
this.topic = topic;
this.partition = partition;
this.offset = offset;
+ this.leaderEpoch = leaderEpoch;
}
}
}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
index f1f91217511..546cab50e0d 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.admin.ShareGroupDescription;
import org.apache.kafka.clients.admin.ShareMemberAssignment;
import org.apache.kafka.clients.admin.ShareMemberDescription;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.KafkaFuture;
@@ -191,7 +192,7 @@ public class ShareGroupCommandTest {
ListShareGroupOffsetsResult listShareGroupOffsetsResult =
AdminClientTestUtils.createListShareGroupOffsetsResult(
Map.of(
firstGroup,
- KafkaFuture.completedFuture(Map.of(new
TopicPartition("topic1", 0), 0L))
+ KafkaFuture.completedFuture(Map.of(new
TopicPartition("topic1", 0), new OffsetAndMetadata(0L, Optional.of(1), "")))
)
);
@@ -208,7 +209,7 @@ public class ShareGroupCommandTest {
List<String> expectedValues;
if (describeType.contains("--verbose")) {
- expectedValues = List.of(firstGroup, "topic1", "0",
"-", "0");
+ expectedValues = List.of(firstGroup, "topic1", "0",
"1", "0");
} else {
expectedValues = List.of(firstGroup, "topic1", "0",
"0");
}
@@ -299,13 +300,13 @@ public class ShareGroupCommandTest {
ListShareGroupOffsetsResult listShareGroupOffsetsResult1 =
AdminClientTestUtils.createListShareGroupOffsetsResult(
Map.of(
firstGroup,
- KafkaFuture.completedFuture(Map.of(new
TopicPartition("topic1", 0), 0L))
+ KafkaFuture.completedFuture(Map.of(new
TopicPartition("topic1", 0), new OffsetAndMetadata(0, Optional.of(1), "")))
)
);
ListShareGroupOffsetsResult listShareGroupOffsetsResult2 =
AdminClientTestUtils.createListShareGroupOffsetsResult(
Map.of(
secondGroup,
- KafkaFuture.completedFuture(Map.of(new
TopicPartition("topic1", 0), 0L))
+ KafkaFuture.completedFuture(Map.of(new
TopicPartition("topic1", 0), new OffsetAndMetadata(0, Optional.of(1), "")))
)
);
@@ -333,8 +334,8 @@ public class ShareGroupCommandTest {
List<String> expectedValues1, expectedValues2;
if (describeType.contains("--verbose")) {
- expectedValues1 = List.of(firstGroup, "topic1", "0",
"-", "0");
- expectedValues2 = List.of(secondGroup, "topic1", "0",
"-", "0");
+ expectedValues1 = List.of(firstGroup, "topic1", "0",
"1", "0");
+ expectedValues2 = List.of(secondGroup, "topic1", "0",
"1", "0");
} else {
expectedValues1 = List.of(firstGroup, "topic1", "0",
"0");
expectedValues2 = List.of(secondGroup, "topic1", "0",
"0");