This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 873379873e6 KAFKA-19435 Optimize `kafka-consumer-groups.sh` to return
the offset info when some partitions without leaders (#20064)
873379873e6 is described below
commit 873379873e6cc7c4f0abb4e2fc3eb4bd5cc91c29
Author: xijiu <[email protected]>
AuthorDate: Mon Jul 14 22:13:01 2025 +0800
KAFKA-19435 Optimize `kafka-consumer-groups.sh` to return the offset info
when some partitions without leaders (#20064)
1. Optimize the corresponding logic in the `ConsumerGroupCommand` by
first checking if a leader exists for the partition before invoking the
`admin.listOffsets`. Finally, concatenate the data and return
2. Add integration test, create a cluster with 3 brokers, then shutdown
a broker and observe whether the output meets the expectations
Reviewers: Ken Huang <[email protected]>, PoAn Yang
<[email protected]>, TaiJuWu <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../tools/consumer/group/ConsumerGroupCommand.java | 44 ++++++++++++----
.../consumer/group/ConsumerGroupServiceTest.java | 15 ++++++
.../consumer/group/DescribeConsumerGroupTest.java | 60 ++++++++++++++++++++++
3 files changed, 110 insertions(+), 9 deletions(-)
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
index bd0f2dfb7f0..da3ccff9260 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
@@ -51,9 +51,6 @@ import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.re2j.Pattern;
import com.google.re2j.PatternSyntaxException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
@@ -82,7 +79,6 @@ import joptsimple.OptionException;
import joptsimple.OptionSpec;
public class ConsumerGroupCommand {
- private static final Logger LOGGER =
LoggerFactory.getLogger(ConsumerGroupCommand.class);
static final String MISSING_COLUMN_VALUE = "-";
@@ -592,8 +588,7 @@ public class ConsumerGroupCommand {
getLag(Optional.empty(), Optional.empty()),
consumerIdOpt, hostOpt, clientIdOpt, Optional.empty(), Optional.empty())
);
} else {
- List<TopicPartition> topicPartitionsSorted =
topicPartitions.stream().sorted(Comparator.comparingInt(TopicPartition::partition)).collect(Collectors.toList());
- return describePartitions(group, coordinator,
topicPartitionsSorted, committedOffsets, consumerIdOpt, hostOpt, clientIdOpt);
+ return describePartitions(group, coordinator, topicPartitions,
committedOffsets, consumerIdOpt, hostOpt, clientIdOpt);
}
}
@@ -604,7 +599,7 @@ public class ConsumerGroupCommand {
private Collection<PartitionAssignmentState> describePartitions(
String group,
Optional<Node> coordinator,
- List<TopicPartition> topicPartitions,
+ Collection<TopicPartition> topicPartitions,
Map<TopicPartition, OffsetAndMetadata> committedOffsets,
Optional<String> consumerIdOpt,
Optional<String> hostOpt,
@@ -619,7 +614,11 @@ public class ConsumerGroupCommand {
consumerIdOpt, hostOpt, clientIdOpt, logEndOffsetOpt,
leaderEpoch);
};
- return
offsetsUtils.getLogEndOffsets(topicPartitions).entrySet().stream().map(logEndOffsetResult
-> {
+ List<TopicPartition> topicPartitionsWithoutLeader =
filterNoneLeaderPartitions(topicPartitions);
+ List<TopicPartition> topicPartitionsWithLeader =
topicPartitions.stream().filter(tp ->
!topicPartitionsWithoutLeader.contains(tp)).toList();
+
+ // prepare data for partitions with leaders
+ List<PartitionAssignmentState> existLeaderAssignments =
offsetsUtils.getLogEndOffsets(topicPartitionsWithLeader).entrySet().stream().map(logEndOffsetResult
-> {
if (logEndOffsetResult.getValue() instanceof
OffsetsUtils.LogOffset)
return getDescribePartitionResult.apply(
logEndOffsetResult.getKey(),
@@ -631,7 +630,34 @@ public class ConsumerGroupCommand {
return null;
throw new IllegalStateException("Unknown LogOffset subclass: "
+ logEndOffsetResult.getValue());
- }).collect(Collectors.toList());
+ }).toList();
+
+ // prepare data for partitions without leaders
+ List<PartitionAssignmentState> noneLeaderAssignments =
topicPartitionsWithoutLeader.stream()
+ .map(tp -> getDescribePartitionResult.apply(tp,
Optional.empty())).toList();
+
+ // concat the data and then sort them
+ return Stream.concat(existLeaderAssignments.stream(),
noneLeaderAssignments.stream())
+ .sorted(Comparator.<PartitionAssignmentState,
String>comparing(
+ state -> state.topic.orElse(""), String::compareTo)
+ .thenComparingInt(state ->
state.partition.orElse(-1)))
+ .collect(Collectors.toList());
+ }
+
+ private List<TopicPartition>
filterNoneLeaderPartitions(Collection<TopicPartition> topicPartitions) {
+ // collect all topics
+ Set<String> topics =
topicPartitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
+
+ try {
+ return
adminClient.describeTopics(topics).allTopicNames().get().entrySet()
+ .stream()
+ .flatMap(entry ->
entry.getValue().partitions().stream()
+ .filter(partitionInfo ->
partitionInfo.leader() == null)
+ .map(partitionInfo -> new
TopicPartition(entry.getKey(), partitionInfo.partition())))
+ .toList();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
Map<String, Map<TopicPartition, OffsetAndMetadata>> resetOffsets() {
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
index bb4ae726789..0ddba4346ba 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
@@ -91,6 +91,8 @@ public class ConsumerGroupServiceTest {
.thenReturn(listGroupOffsetsResult(GROUP));
when(admin.listOffsets(offsetsArgMatcher(), any()))
.thenReturn(listOffsetsResult());
+ when(admin.describeTopics(ArgumentMatchers.anySet()))
+ .thenReturn(describeTopicsResult());
Entry<Optional<GroupState>,
Optional<Collection<PartitionAssignmentState>>> statesAndAssignments =
groupService.collectGroupOffsets(GROUP);
assertEquals(Optional.of(GroupState.STABLE),
statesAndAssignments.getKey());
@@ -174,6 +176,7 @@ public class ConsumerGroupServiceTest {
any()
)).thenReturn(new
ListOffsetsResult(endOffsets.entrySet().stream().filter(e ->
unassignedTopicPartitions.contains(e.getKey()))
.collect(Collectors.toMap(Entry::getKey, Entry::getValue))));
+
when(admin.describeTopics(ArgumentMatchers.anySet())).thenReturn(describeTopicsResult());
Entry<Optional<GroupState>,
Optional<Collection<PartitionAssignmentState>>> statesAndAssignments =
groupService.collectGroupOffsets(GROUP);
Optional<GroupState> state = statesAndAssignments.getKey();
@@ -289,6 +292,18 @@ public class ConsumerGroupServiceTest {
);
}
+ private DescribeTopicsResult describeTopicsResult() {
+ Map<String, TopicDescription> topicDescriptionMap =
TOPICS.stream().collect(Collectors.toMap(
+ Function.identity(),
+ topic -> new TopicDescription(
+ topic,
+ false,
+ IntStream.range(0, NUM_PARTITIONS)
+ .mapToObj(i -> new TopicPartitionInfo(i,
Node.noNode(), List.of(), List.of()))
+ .toList())));
+ return AdminClientTestUtils.describeTopicsResult(topicDescriptionMap);
+ }
+
private ListOffsetsResult listOffsetsResult() {
ListOffsetsResultInfo resultInfo = new ListOffsetsResultInfo(100,
System.currentTimeMillis(), Optional.of(1));
Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>> futures =
TOPIC_PARTITIONS.stream().collect(Collectors.toMap(
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
index 3e023267f9b..a3f1009e884 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
@@ -47,6 +47,7 @@ import org.apache.kafka.tools.ToolsTestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import java.util.ArrayList;
import java.util.Arrays;
@@ -59,6 +60,7 @@ import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -1053,6 +1055,64 @@ public class DescribeConsumerGroupTest {
}
}
+ /**
+ * The config `OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG` needs to be set to
a value greater than 1 to ensure the
+ * normal invocation of APIs such as `FIND_COORDINATOR` when a broker has
shutdown
+ */
+ @Timeout(60)
+ @ClusterTest(brokers = 3, serverProperties = {@ClusterConfigProperty(key =
OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2")})
+ public void testDescribeConsumerGroupWithoutLeaders(ClusterInstance
clusterInstance) throws Exception {
+ int brokerNum = 3;
+ this.clusterInstance = clusterInstance;
+
+ // define topic and group, then send 5 records to each partition
+ String topic = TOPIC_PREFIX + UUID.randomUUID();
+ String group = GROUP_PREFIX + UUID.randomUUID();
+ clusterInstance.createTopic(topic, brokerNum, (short) 1);
+ for (int i = 0; i < brokerNum; i++) {
+ sendRecords(topic, i, 5);
+ }
+
+ // append the command
+ List<String> cgcArgs = List.of("--bootstrap-server",
clusterInstance.bootstrapServers(), "--describe", "--group", group,
"--all-topics");
+
+ try (AutoCloseable protocolConsumerGroupExecutor =
consumerGroupClosable(GroupProtocol.CLASSIC, group, topic, Map.of());
+ ConsumerGroupCommand.ConsumerGroupService service =
consumerGroupService(cgcArgs.toArray(new String[0]));
+ Admin admin = clusterInstance.admin()
+ ) {
+ // shutdown the target broker
+ int noneLeaderPartition = 2;
+ int shutdownBrokerId = clusterInstance.getLeaderBrokerId(new
TopicPartition(topic, noneLeaderPartition));
+ clusterInstance.shutdownBroker(shutdownBrokerId);
+
+ TestUtils.waitForCondition(() -> {
+ Entry<String, String> res =
ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service));
+ String[] lines = res.getKey().trim().split("\n");
+ if (lines.length != 4 || !res.getValue().isEmpty()) {
+ return false;
+ }
+
+ // get the client data, such as `consumerId,host,clientId`, to
append the expected output
+ ConsumerGroupDescription consumerGroupDescription =
admin.describeConsumerGroups(Set.of(group)).describedGroups().get(group).get();
+ MemberDescription memberDescription =
consumerGroupDescription.members().iterator().next();
+ String consumerId = memberDescription.consumerId();
+ String host = memberDescription.host();
+ String clientId = memberDescription.clientId();
+
+ // the expected output
+ List<String> partition0content = List.of(group, topic, "0",
"5", "5", "0", consumerId, host, clientId);
+ List<String> partition1content = List.of(group, topic, "1",
"5", "5", "0", consumerId, host, clientId);
+ List<String> partition2content = List.of(group, topic, "2",
"-", "-", "-", consumerId, host, clientId);
+
+ return checkArgsHeaderOutput(cgcArgs, lines[0])
+ &&
Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(partition0content)
+ &&
Arrays.stream(lines[2].trim().split("\\s+")).toList().equals(partition1content)
+ &&
Arrays.stream(lines[3].trim().split("\\s+")).toList().equals(partition2content);
+ }, "Expected 3 data rows excluding the header and no error in
describe groups when a broker shutdown.");
+ }
+ }
+
+
@Test
public void testDescribeWithUnrecognizedNewConsumerOption() {
String group = GROUP_PREFIX + "unrecognized";