This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new dfaf9f9cf71 MINOR: add test for `kafka-consumer-groups.sh` should not
fail when partition offline (#20235)
dfaf9f9cf71 is described below
commit dfaf9f9cf71fd47cd74ff5e9950dbd1c1f2bda9a
Author: TaiJuWu <[email protected]>
AuthorDate: Thu Jul 31 22:54:27 2025 +0800
MINOR: add test for `kafka-consumer-groups.sh` should not fail when
partition offline (#20235)
See: https://github.com/apache/kafka/pull/20168#discussion_r2227310093
Add the following test case:
Given a topic with three partitions where partition `t-2` is offline:
if `partitionsToReset` contains only `t-1`, the method
`filterNoneLeaderPartitions` incorrectly includes `t-2` in the result,
which causes the tool to fail.
Reviewers: Chia-Ping Tsai <[email protected]>, Jhen-Yung Hsu
<[email protected]>, Ken Huang <[email protected]>, Andrew
Schofield <[email protected]>
---
.../group/ResetConsumerGroupOffsetTest.java | 23 ++++++++++++++++++++++
1 file changed, 23 insertions(+)
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java
index 5fb704cf53d..5bf9da0c370 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java
@@ -52,6 +52,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -153,6 +154,28 @@ public class ResetConsumerGroupOffsetTest {
}
}
+ @ClusterTest(
+ brokers = 2,
+ serverProperties = {
+ @ClusterConfigProperty(key =
OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2"),
+ }
+ )
+ public void
testResetOffsetsWithOfflinePartitionNotInResetTarget(ClusterInstance cluster)
throws Exception {
+ String topic = generateRandomTopic();
+ String group = "new.group";
+ String[] args = buildArgsForGroup(cluster, group, "--to-earliest",
"--execute", "--topic", topic + ":0");
+
+ try (Admin admin = cluster.admin();
ConsumerGroupCommand.ConsumerGroupService service =
getConsumerGroupService(args)) {
+ admin.createTopics(List.of(new NewTopic(topic, Map.of(0,
List.of(0), 1, List.of(1)))));
+ cluster.waitTopicCreation(topic, 2);
+
+ cluster.shutdownBroker(1);
+
+ Map<TopicPartition, OffsetAndMetadata> resetOffsets =
service.resetOffsets().get(group);
+ assertEquals(Set.of(new TopicPartition(topic, 0)),
resetOffsets.keySet());
+ }
+ }
+
@ClusterTest
public void testResetOffsetsExistingTopic(ClusterInstance cluster) {
String topic = generateRandomTopic();