This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c9fa8c5b3fb KAFKA-19944: kafka-share-groups.sh describe offsets
message improvement (#21024)
c9fa8c5b3fb is described below
commit c9fa8c5b3fb8973b97683b53e3eee97c00476557
Author: Andrew Schofield <[email protected]>
AuthorDate: Wed Dec 3 09:55:30 2025 +0000
KAFKA-19944: kafka-share-groups.sh describe offsets message improvement
(#21024)
kafka-share-groups.sh --describe --offsets prints an empty table
consisting only of headers when there is no offset information to
display for a share group. This most likely happens only when a topic
that was formerly being consumed has been deleted. This PR displays a
message instead of an empty table, similar to how the tool behaves when
the list of members is requested and there are no active members.
Reviewers: Apoorv Mittal <[email protected]>
---
.../tools/consumer/group/ShareGroupCommand.java | 50 ++++++++++++----------
.../consumer/group/ShareGroupCommandTest.java | 41 ++++++++++++++++++
2 files changed, 68 insertions(+), 23 deletions(-)
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
index 6d36e49dcfd..d39eb9396e5 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
@@ -545,35 +545,39 @@ public class ShareGroupCommand {
.thenComparingInt(info -> info.partition))
.toList();
- String fmt = printOffsetFormat(groupId, offsetsInfo, verbose);
-
- if (verbose) {
- System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION",
"LEADER-EPOCH", "START-OFFSET", "LAG");
+ if (offsetsInfo.isEmpty()) {
+ System.out.println("\nShare group '" + groupId + "' has no
offset information.");
} else {
- System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION",
"START-OFFSET", "LAG");
- }
+ String fmt = printOffsetFormat(groupId, offsetsInfo,
verbose);
- for (SharePartitionOffsetInformation info : offsetsInfo) {
if (verbose) {
- System.out.printf(fmt,
- groupId,
- info.topic,
- info.partition,
-
info.leaderEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
-
info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
-
info.lag.map(Object::toString).orElse(MISSING_COLUMN_VALUE)
- );
+ System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION",
"LEADER-EPOCH", "START-OFFSET", "LAG");
} else {
- System.out.printf(fmt,
- groupId,
- info.topic,
- info.partition,
-
info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
-
info.lag.map(Object::toString).orElse(MISSING_COLUMN_VALUE)
- );
+ System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION",
"START-OFFSET", "LAG");
}
+
+ for (SharePartitionOffsetInformation info : offsetsInfo) {
+ if (verbose) {
+ System.out.printf(fmt,
+ groupId,
+ info.topic,
+ info.partition,
+
info.leaderEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
+
info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
+
info.lag.map(Object::toString).orElse(MISSING_COLUMN_VALUE)
+ );
+ } else {
+ System.out.printf(fmt,
+ groupId,
+ info.topic,
+ info.partition,
+
info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
+
info.lag.map(Object::toString).orElse(MISSING_COLUMN_VALUE)
+ );
+ }
+ }
+ System.out.println();
}
- System.out.println();
});
}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
index fb5d459febb..edc678d650a 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
@@ -286,6 +286,47 @@ public class ShareGroupCommandTest {
}
}
+ @Test
+ public void testDescribeOffsetsOfExistingGroupWithNoOffsetInfo() throws
Exception {
+ String firstGroup = "group1";
+ String bootstrapServer = "localhost:9092";
+
+ for (List<String> describeType : DESCRIBE_TYPE_OFFSETS) {
+ List<String> cgcArgs = new
ArrayList<>(List.of("--bootstrap-server", bootstrapServer, "--describe",
"--group", firstGroup));
+ cgcArgs.addAll(describeType);
+ Admin adminClient = mock(KafkaAdminClient.class);
+ DescribeShareGroupsResult describeShareGroupsResult =
mock(DescribeShareGroupsResult.class);
+ ShareGroupDescription exp = new ShareGroupDescription(
+ firstGroup,
+ List.of(),
+ GroupState.EMPTY,
+ new Node(0, "host1", 9090), 0, 0);
+ // When there is no offset information at all, an empty map will
be returned
+ ListShareGroupOffsetsResult listShareGroupOffsetsResult =
AdminClientTestUtils.createListShareGroupOffsetsResult(
+ Map.of(
+ firstGroup,
+ KafkaFuture.completedFuture(Map.of())
+ )
+ );
+
+
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(firstGroup,
KafkaFuture.completedFuture(exp)));
+
when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(),
any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
+ when(adminClient.listShareGroupOffsets(ArgumentMatchers.anyMap(),
any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult);
+ try (ShareGroupService service =
getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) {
+ TestUtils.waitForCondition(() -> {
+ Entry<String, String> res =
ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service));
+ String[] lines = res.getKey().trim().split("\n");
+ if (lines.length != 1 && !res.getValue().isEmpty()) {
+ return false;
+ }
+
+ String expectedValue = "Share group '" + firstGroup + "'
has no offset information.";
+ return expectedValue.equals(lines[0]);
+ }, "Expected just an informational message with describe type
" + String.join(" ", describeType) + ".");
+ }
+ }
+ }
+
@Test
public void testDescribeOffsetsOfAllExistingGroups() throws Exception {
String firstGroup = "group1";