m1a2st commented on code in PR #15779:
URL: https://github.com/apache/kafka/pull/15779#discussion_r1597596564


##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java:
##########
@@ -62,506 +91,787 @@
  * - scope=topics+partitions, scenario=to-earliest
  * - export/import
  */
-public class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
-    private String[] basicArgs() {
+@ExtendWith(value = ClusterTestExtensions.class)
+public class ResetConsumerGroupOffsetTest {
+
+    private static final String TOPIC_PREFIX = "foo-";
+    private static final String GROUP_PREFIX = "test.group-";
+
+    private static void generator(ClusterGenerator clusterGenerator) {
+        ConsumerGroupCommandTestUtils.generator(clusterGenerator);
+    }
+
+    private String[] basicArgs(ClusterInstance cluster) {
         return new String[]{"--reset-offsets",
-            "--bootstrap-server", bootstrapServers(listenerName()),
+            "--bootstrap-server", cluster.bootstrapServers(),
             "--timeout", Long.toString(DEFAULT_MAX_WAIT_MS)};
     }
 
-    private String[] buildArgsForGroups(List<String> groups, String...args) {
-        List<String> res = new ArrayList<>(Arrays.asList(basicArgs()));
+    private String[] buildArgsForGroups(ClusterInstance cluster, List<String> 
groups, String... args) {
+        List<String> res = new ArrayList<>(asList(basicArgs(cluster)));
         for (String group : groups) {
             res.add("--group");
             res.add(group);
         }
-        res.addAll(Arrays.asList(args));
+        res.addAll(asList(args));
         return res.toArray(new String[0]);
     }
 
-    private String[] buildArgsForGroup(String group, String...args) {
-        return buildArgsForGroups(Collections.singletonList(group), args);
+    private String[] buildArgsForGroup(ClusterInstance cluster, String group, 
String... args) {
+        return buildArgsForGroups(cluster, singletonList(group), args);
     }
 
-    private String[] buildArgsForAllGroups(String...args) {
-        List<String> res = new ArrayList<>(Arrays.asList(basicArgs()));
+    private String[] buildArgsForAllGroups(ClusterInstance cluster, String... 
args) {
+        List<String> res = new ArrayList<>(asList(basicArgs(cluster)));
         res.add("--all-groups");
-        res.addAll(Arrays.asList(args));
+        res.addAll(asList(args));
         return res.toArray(new String[0]);
     }
 
-    @Test
-    public void testResetOffsetsNotExistingGroup() throws Exception {
-        String group = "missing.group";
-        String[] args = buildArgsForGroup(group, "--all-topics", 
"--to-current", "--execute");
-        ConsumerGroupCommand.ConsumerGroupService consumerGroupCommand = 
getConsumerGroupService(args);
-        // Make sure we got a coordinator
-        TestUtils.waitForCondition(
-            () -> 
Objects.equals(consumerGroupCommand.collectGroupState(group).coordinator.host(),
 "localhost"),
-            "Can't find a coordinator");
-        Map<TopicPartition, OffsetAndMetadata> resetOffsets = 
consumerGroupCommand.resetOffsets().get(group);
-        assertTrue(resetOffsets.isEmpty());
-        assertTrue(committedOffsets(TOPIC, group).isEmpty());
-    }
-
-    @Test
-    public void testResetOffsetsExistingTopic() {
-        String group = "new.group";
-        String[] args = buildArgsForGroup(group, "--topic", TOPIC, 
"--to-offset", "50");
-        produceMessages(TOPIC, 100);
-        resetAndAssertOffsets(args, 50, true, 
Collections.singletonList(TOPIC));
-        resetAndAssertOffsets(addTo(args, "--dry-run"), 50, true, 
Collections.singletonList(TOPIC));
-        resetAndAssertOffsets(addTo(args, "--execute"), 50, false, 
Collections.singletonList(TOPIC));
-    }
-
-    @Test
-    public void testResetOffsetsExistingTopicSelectedGroups() throws Exception 
{
-        produceMessages(TOPIC, 100);
-        List<String> groups = IntStream.rangeClosed(1, 3).mapToObj(id -> GROUP 
+ id).collect(Collectors.toList());
-        for (String group : groups) {
-            ConsumerGroupExecutor executor = addConsumerGroupExecutor(1, 
TOPIC, group, GroupProtocol.CLASSIC.name);
-            awaitConsumerProgress(TOPIC, group, 100L);
-            executor.shutdown();
+    @ClusterTemplate("generator")
+    public void testResetOffsetsNotExistingGroup(ClusterInstance cluster) 
throws Exception {
+        for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+            String topic = generateRandomTopic();
+            String group = "missing.group";
+            String[] args = buildArgsForGroup(cluster, group, "--all-topics", 
"--to-current", "--execute");
+
+            try (ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(args)) {
+                // Make sure we got a coordinator
+                TestUtils.waitForCondition(
+                        () -> 
"localhost".equals(service.collectGroupState(group).coordinator.host()),
+                        "Can't find a coordinator");
+                Map<TopicPartition, OffsetAndMetadata> resetOffsets = 
service.resetOffsets().get(group);
+                assertTrue(resetOffsets.isEmpty());
+                assertTrue(committedOffsets(cluster, topic, group, 
groupProtocol).isEmpty());
+            }
         }
-        String[] args = buildArgsForGroups(groups, "--topic", TOPIC, 
"--to-offset", "50");
-        resetAndAssertOffsets(args, 50, true, 
Collections.singletonList(TOPIC));
-        resetAndAssertOffsets(addTo(args, "--dry-run"), 50, true, 
Collections.singletonList(TOPIC));
-        resetAndAssertOffsets(addTo(args, "--execute"), 50, false, 
Collections.singletonList(TOPIC));
-    }
-
-    @Test
-    public void testResetOffsetsExistingTopicAllGroups() throws Exception {
-        String[] args = buildArgsForAllGroups("--topic", TOPIC, "--to-offset", 
"50");
-        produceMessages(TOPIC, 100);
-        for (int i = 1; i <= 3; i++) {
-            String group = GROUP + i;
-            ConsumerGroupExecutor executor = addConsumerGroupExecutor(1, 
TOPIC, group, GroupProtocol.CLASSIC.name);
-            awaitConsumerProgress(TOPIC, group, 100L);
-            executor.shutdown();
+    }
+
+    @ClusterTemplate("generator")
+    public void testResetOffsetsExistingTopic(ClusterInstance cluster) {
+        for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+            String topic = generateRandomTopic();
+            String group = "new.group";
+            String[] args = buildArgsForGroup(cluster, group, "--topic", 
topic, "--to-offset", "50");
+
+            produceMessages(cluster, topic, 100);
+            resetAndAssertOffsets(cluster, args, 50, true, 
singletonList(topic), groupProtocol);
+            resetAndAssertOffsets(cluster, addTo(args, "--dry-run"),
+                    50, true, singletonList(topic), groupProtocol);
+            resetAndAssertOffsets(cluster, addTo(args, "--execute"),
+                    50, false, singletonList(topic), groupProtocol);
         }
-        resetAndAssertOffsets(args, 50, true, 
Collections.singletonList(TOPIC));
-        resetAndAssertOffsets(addTo(args, "--dry-run"), 50, true, 
Collections.singletonList(TOPIC));
-        resetAndAssertOffsets(addTo(args, "--execute"), 50, false, 
Collections.singletonList(TOPIC));
     }
 
-    @Test
-    public void testResetOffsetsAllTopicsAllGroups() throws Exception {
-        String[] args = buildArgsForAllGroups("--all-topics", "--to-offset", 
"50");
-        List<String> topics = IntStream.rangeClosed(1, 3).mapToObj(i -> TOPIC 
+ i).collect(Collectors.toList());
-        List<String> groups = IntStream.rangeClosed(1, 3).mapToObj(i -> GROUP 
+ i).collect(Collectors.toList());
-        topics.forEach(topic -> produceMessages(topic, 100));
+    @ClusterTemplate("generator")
+    public void testResetOffsetsExistingTopicSelectedGroups(ClusterInstance 
cluster) throws Exception {
+        for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+            String topic = generateRandomTopic();
 
-        for (String topic : topics) {
+            produceMessages(cluster, topic, 100);
+            List<String> groups = generateIds(topic);
             for (String group : groups) {
-                ConsumerGroupExecutor executor = addConsumerGroupExecutor(3, 
topic, group, GroupProtocol.CLASSIC.name);
-                awaitConsumerProgress(topic, group, 100);
-                executor.shutdown();
+                try (AutoCloseable consumerGroupCloseable =
+                             consumerGroupClosable(cluster, 1, topic, group, 
groupProtocol)) {
+                    awaitConsumerProgress(cluster, topic, group, 100L, 
groupProtocol);
+                }
             }
+
+            String[] args = buildArgsForGroups(cluster, groups, "--topic", 
topic, "--to-offset", "50");
+            resetAndAssertOffsets(cluster, args, 50, true, 
singletonList(topic), groupProtocol);
+            resetAndAssertOffsets(cluster, addTo(args, "--dry-run"),
+                    50, true, singletonList(topic), groupProtocol);
+            resetAndAssertOffsets(cluster, addTo(args, "--execute"),
+                    50, false, singletonList(topic), groupProtocol);
         }
-        resetAndAssertOffsets(args, 50, true, topics);
-        resetAndAssertOffsets(addTo(args, "--dry-run"), 50, true, topics);
-        resetAndAssertOffsets(addTo(args, "--execute"), 50, false, topics);
     }
 
-    @Test
-    public void testResetOffsetsToLocalDateTime() throws Exception {
-        SimpleDateFormat format = new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
-        Calendar calendar = Calendar.getInstance();
-        calendar.add(Calendar.DATE, -1);
-
-        produceMessages(TOPIC, 100);
+    @ClusterTemplate("generator")
+    public void testResetOffsetsExistingTopicAllGroups(ClusterInstance 
cluster) throws Exception {
+        for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+            String topic = generateRandomTopic();
+            String[] args = buildArgsForAllGroups(cluster, "--topic", topic, 
"--to-offset", "50");
+
+            produceMessages(cluster, topic, 100);
+            for (int i = 1; i <= 3; i++) {
+                String group = GROUP_PREFIX + i;
+                try (AutoCloseable consumerGroupCloseable =

Review Comment:
   @chia7712, there is a weird problem. Because I use try-with-resources with 
an AutoCloseable resource, I should not need to call 
`consumerGroupCloseable.close()` explicitly to close the resource. But if I 
don't call `close()` explicitly, the test fails when I run it in KRaft and 
co-KRaft mode with multiple group protocols. Please take a look, thank you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to