This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 873379873e6 KAFKA-19435 Optimize `kafka-consumer-groups.sh` to return 
the offset info when some partitions without leaders (#20064)
873379873e6 is described below

commit 873379873e6cc7c4f0abb4e2fc3eb4bd5cc91c29
Author: xijiu <[email protected]>
AuthorDate: Mon Jul 14 22:13:01 2025 +0800

    KAFKA-19435 Optimize `kafka-consumer-groups.sh` to return the offset info 
when some partitions without leaders (#20064)
    
    1. Optimize the corresponding logic in the `ConsumerGroupCommand` by
    first checking if a leader exists for the partition before invoking the
    `admin.listOffsets`. Finally, concatenate the data and return
    2. Add integration test, create a cluster with 3 brokers, then shutdown
    a broker and observe whether the output meets the expectations
    
    Reviewers: Ken Huang <[email protected]>, PoAn Yang
     <[email protected]>, TaiJuWu <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 .../tools/consumer/group/ConsumerGroupCommand.java | 44 ++++++++++++----
 .../consumer/group/ConsumerGroupServiceTest.java   | 15 ++++++
 .../consumer/group/DescribeConsumerGroupTest.java  | 60 ++++++++++++++++++++++
 3 files changed, 110 insertions(+), 9 deletions(-)

diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
index bd0f2dfb7f0..da3ccff9260 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
@@ -51,9 +51,6 @@ import com.fasterxml.jackson.databind.ObjectWriter;
 import com.google.re2j.Pattern;
 import com.google.re2j.PatternSyntaxException;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.ArrayList;
@@ -82,7 +79,6 @@ import joptsimple.OptionException;
 import joptsimple.OptionSpec;
 
 public class ConsumerGroupCommand {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(ConsumerGroupCommand.class);
 
     static final String MISSING_COLUMN_VALUE = "-";
 
@@ -592,8 +588,7 @@ public class ConsumerGroupCommand {
                         getLag(Optional.empty(), Optional.empty()), 
consumerIdOpt, hostOpt, clientIdOpt, Optional.empty(), Optional.empty())
                 );
             } else {
-                List<TopicPartition> topicPartitionsSorted = 
topicPartitions.stream().sorted(Comparator.comparingInt(TopicPartition::partition)).collect(Collectors.toList());
-                return describePartitions(group, coordinator, 
topicPartitionsSorted, committedOffsets, consumerIdOpt, hostOpt, clientIdOpt);
+                return describePartitions(group, coordinator, topicPartitions, 
committedOffsets, consumerIdOpt, hostOpt, clientIdOpt);
             }
         }
 
@@ -604,7 +599,7 @@ public class ConsumerGroupCommand {
         private Collection<PartitionAssignmentState> describePartitions(
             String group,
             Optional<Node> coordinator,
-            List<TopicPartition> topicPartitions,
+            Collection<TopicPartition> topicPartitions,
             Map<TopicPartition, OffsetAndMetadata> committedOffsets,
             Optional<String> consumerIdOpt,
             Optional<String> hostOpt,
@@ -619,7 +614,11 @@ public class ConsumerGroupCommand {
                     consumerIdOpt, hostOpt, clientIdOpt, logEndOffsetOpt, 
leaderEpoch);
             };
 
-            return 
offsetsUtils.getLogEndOffsets(topicPartitions).entrySet().stream().map(logEndOffsetResult
 -> {
+            List<TopicPartition> topicPartitionsWithoutLeader = 
filterNoneLeaderPartitions(topicPartitions);
+            List<TopicPartition> topicPartitionsWithLeader = 
topicPartitions.stream().filter(tp -> 
!topicPartitionsWithoutLeader.contains(tp)).toList();
+
+            // prepare data for partitions with leaders
+            List<PartitionAssignmentState> existLeaderAssignments = 
offsetsUtils.getLogEndOffsets(topicPartitionsWithLeader).entrySet().stream().map(logEndOffsetResult
 -> {
                 if (logEndOffsetResult.getValue() instanceof 
OffsetsUtils.LogOffset)
                     return getDescribePartitionResult.apply(
                         logEndOffsetResult.getKey(),
@@ -631,7 +630,34 @@ public class ConsumerGroupCommand {
                     return null;
 
                 throw new IllegalStateException("Unknown LogOffset subclass: " 
+ logEndOffsetResult.getValue());
-            }).collect(Collectors.toList());
+            }).toList();
+
+            // prepare data for partitions without leaders
+            List<PartitionAssignmentState> noneLeaderAssignments = 
topicPartitionsWithoutLeader.stream()
+                    .map(tp -> getDescribePartitionResult.apply(tp, 
Optional.empty())).toList();
+
+            // concat the data and then sort them
+            return Stream.concat(existLeaderAssignments.stream(), 
noneLeaderAssignments.stream())
+                    .sorted(Comparator.<PartitionAssignmentState, 
String>comparing(
+                            state -> state.topic.orElse(""), String::compareTo)
+                            .thenComparingInt(state -> 
state.partition.orElse(-1)))
+                    .collect(Collectors.toList());
+        }
+
+        private List<TopicPartition> 
filterNoneLeaderPartitions(Collection<TopicPartition> topicPartitions) {
+            // collect all topics
+            Set<String> topics = 
topicPartitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
+
+            try {
+                return 
adminClient.describeTopics(topics).allTopicNames().get().entrySet()
+                        .stream()
+                        .flatMap(entry -> 
entry.getValue().partitions().stream()
+                                .filter(partitionInfo -> 
partitionInfo.leader() == null)
+                                .map(partitionInfo -> new 
TopicPartition(entry.getKey(), partitionInfo.partition())))
+                        .toList();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
         }
 
         Map<String, Map<TopicPartition, OffsetAndMetadata>> resetOffsets() {
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
index bb4ae726789..0ddba4346ba 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
@@ -91,6 +91,8 @@ public class ConsumerGroupServiceTest {
                 .thenReturn(listGroupOffsetsResult(GROUP));
         when(admin.listOffsets(offsetsArgMatcher(), any()))
                 .thenReturn(listOffsetsResult());
+        when(admin.describeTopics(ArgumentMatchers.anySet()))
+                .thenReturn(describeTopicsResult());
 
         Entry<Optional<GroupState>, 
Optional<Collection<PartitionAssignmentState>>> statesAndAssignments = 
groupService.collectGroupOffsets(GROUP);
         assertEquals(Optional.of(GroupState.STABLE), 
statesAndAssignments.getKey());
@@ -174,6 +176,7 @@ public class ConsumerGroupServiceTest {
                 any()
         )).thenReturn(new 
ListOffsetsResult(endOffsets.entrySet().stream().filter(e -> 
unassignedTopicPartitions.contains(e.getKey()))
                 .collect(Collectors.toMap(Entry::getKey, Entry::getValue))));
+        
when(admin.describeTopics(ArgumentMatchers.anySet())).thenReturn(describeTopicsResult());
 
         Entry<Optional<GroupState>, 
Optional<Collection<PartitionAssignmentState>>> statesAndAssignments = 
groupService.collectGroupOffsets(GROUP);
         Optional<GroupState> state = statesAndAssignments.getKey();
@@ -289,6 +292,18 @@ public class ConsumerGroupServiceTest {
         );
     }
 
+    private DescribeTopicsResult describeTopicsResult() {
+        Map<String, TopicDescription> topicDescriptionMap = 
TOPICS.stream().collect(Collectors.toMap(
+                Function.identity(),
+                topic -> new TopicDescription(
+                        topic,
+                        false,
+                        IntStream.range(0, NUM_PARTITIONS)
+                                .mapToObj(i -> new TopicPartitionInfo(i, 
Node.noNode(), List.of(), List.of()))
+                                .toList())));
+        return AdminClientTestUtils.describeTopicsResult(topicDescriptionMap);
+    }
+
     private ListOffsetsResult listOffsetsResult() {
         ListOffsetsResultInfo resultInfo = new ListOffsetsResultInfo(100, 
System.currentTimeMillis(), Optional.of(1));
         Map<TopicPartition, KafkaFuture<ListOffsetsResultInfo>> futures = 
TOPIC_PARTITIONS.stream().collect(Collectors.toMap(
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
index 3e023267f9b..a3f1009e884 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
@@ -47,6 +47,7 @@ import org.apache.kafka.tools.ToolsTestUtils;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -59,6 +60,7 @@ import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -1053,6 +1055,64 @@ public class DescribeConsumerGroupTest {
         }
     }
 
+    /**
+     * The config `OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG` needs to be set to 
a value greater than 1 to ensure the
+     * normal invocation of APIs such as `FIND_COORDINATOR` when a broker has 
shutdown
+     */
+    @Timeout(60)
+    @ClusterTest(brokers = 3, serverProperties = {@ClusterConfigProperty(key = 
OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2")})
+    public void testDescribeConsumerGroupWithoutLeaders(ClusterInstance 
clusterInstance) throws Exception {
+        int brokerNum = 3;
+        this.clusterInstance = clusterInstance;
+
+        // define topic and group, then send 5 records to each partition
+        String topic = TOPIC_PREFIX + UUID.randomUUID();
+        String group = GROUP_PREFIX + UUID.randomUUID();
+        clusterInstance.createTopic(topic, brokerNum, (short) 1);
+        for (int i = 0; i < brokerNum; i++) {
+            sendRecords(topic, i, 5);
+        }
+
+        // append the command
+        List<String> cgcArgs = List.of("--bootstrap-server", 
clusterInstance.bootstrapServers(), "--describe", "--group", group, 
"--all-topics");
+
+        try (AutoCloseable protocolConsumerGroupExecutor = 
consumerGroupClosable(GroupProtocol.CLASSIC, group, topic, Map.of());
+             ConsumerGroupCommand.ConsumerGroupService service = 
consumerGroupService(cgcArgs.toArray(new String[0]));
+             Admin admin = clusterInstance.admin()
+        ) {
+            // shutdown the target broker
+            int noneLeaderPartition = 2;
+            int shutdownBrokerId = clusterInstance.getLeaderBrokerId(new 
TopicPartition(topic, noneLeaderPartition));
+            clusterInstance.shutdownBroker(shutdownBrokerId);
+
+            TestUtils.waitForCondition(() -> {
+                Entry<String, String> res = 
ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service));
+                String[] lines = res.getKey().trim().split("\n");
+                if (lines.length != 4 || !res.getValue().isEmpty()) {
+                    return false;
+                }
+
+                // get the client data, such as `consumerId,host,clientId`, to 
append the expected output
+                ConsumerGroupDescription consumerGroupDescription = 
admin.describeConsumerGroups(Set.of(group)).describedGroups().get(group).get();
+                MemberDescription memberDescription = 
consumerGroupDescription.members().iterator().next();
+                String consumerId = memberDescription.consumerId();
+                String host = memberDescription.host();
+                String clientId = memberDescription.clientId();
+
+                // the expected output
+                List<String> partition0content = List.of(group, topic, "0", 
"5", "5", "0", consumerId, host, clientId);
+                List<String> partition1content = List.of(group, topic, "1", 
"5", "5", "0", consumerId, host, clientId);
+                List<String> partition2content = List.of(group, topic, "2", 
"-", "-", "-", consumerId, host, clientId);
+
+                return checkArgsHeaderOutput(cgcArgs, lines[0])
+                        && 
Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(partition0content)
+                        && 
Arrays.stream(lines[2].trim().split("\\s+")).toList().equals(partition1content)
+                        && 
Arrays.stream(lines[3].trim().split("\\s+")).toList().equals(partition2content);
+            }, "Expected 3 data rows excluding the header and no error in 
describe groups when a broker shutdown.");
+        }
+    }
+
+
     @Test
     public void testDescribeWithUnrecognizedNewConsumerOption() {
         String group = GROUP_PREFIX +  "unrecognized";

Reply via email to