chia7712 commented on code in PR #15779:
URL: https://github.com/apache/kafka/pull/15779#discussion_r1596279690


##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java:
##########
@@ -415,153 +474,246 @@ public void testResetOffsetsExportImportPlan() throws 
Exception {
         TopicPartition t1p1 = new TopicPartition(topic1, 1);
         TopicPartition t2p0 = new TopicPartition(topic2, 0);
         TopicPartition t2p1 = new TopicPartition(topic2, 1);
-        createTopic(topic1, 2, 1, new Properties(), listenerName(), new 
Properties());
-        createTopic(topic2, 2, 1, new Properties(), listenerName(), new 
Properties());
+        String[] cgcArgs = buildArgsForGroups(cluster, asList(group1, group2),
+            "--all-topics", "--to-offset", "2", "--export");
+        File file = TestUtils.tempFile("reset", ".csv");
+        // Multiple --group's offset import
+        String[] cgcArgsExec = buildArgsForGroups(cluster, asList(group1, 
group2),
+            "--all-topics",
+            "--from-file", file.getCanonicalPath(), "--dry-run");
+        // Single --group offset import using "group,topic,partition,offset" 
csv format
+        String[] cgcArgsExec2 = buildArgsForGroup(cluster, group1, 
"--all-topics",
+            "--from-file", file.getCanonicalPath(), "--dry-run");
 
-        String[] cgcArgs = buildArgsForGroups(Arrays.asList(group1, group2), 
"--all-topics", "--to-offset", "2", "--export");
-        ConsumerGroupCommand.ConsumerGroupService consumerGroupCommand = 
getConsumerGroupService(cgcArgs);
+        try (Admin admin = cluster.createAdminClient();
+             ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
+             BufferedWriter bw = new BufferedWriter(new FileWriter(file));
+             ConsumerGroupCommand.ConsumerGroupService serviceExec = 
getConsumerGroupService(cgcArgsExec);
+             ConsumerGroupCommand.ConsumerGroupService serviceExec2 = 
getConsumerGroupService(cgcArgsExec2)) {
 
-        produceConsumeAndShutdown(topic1, group1, 100, 1);
-        produceConsumeAndShutdown(topic2, group2, 100, 1);
+            admin.createTopics(asList(new NewTopic(topic1, 2, (short) 1),
+                    new NewTopic(topic2, 2, (short) 1))).all().get();
 
-        awaitConsumerGroupInactive(consumerGroupCommand, group1);
-        awaitConsumerGroupInactive(consumerGroupCommand, group2);
+            produceConsumeAndShutdown(cluster, topic1, group1, 1);
+            produceConsumeAndShutdown(cluster, topic2, group2, 1);
 
-        File file = TestUtils.tempFile("reset", ".csv");
+            awaitConsumerGroupInactive(service, group1);
+            awaitConsumerGroupInactive(service, group2);
 
-        Map<String, Map<TopicPartition, OffsetAndMetadata>> exportedOffsets = 
consumerGroupCommand.resetOffsets();
-        BufferedWriter bw = new BufferedWriter(new FileWriter(file));
-        bw.write(consumerGroupCommand.exportOffsetsToCsv(exportedOffsets));
-        bw.close();
-        Map<TopicPartition, Long> exp1 = new HashMap<>();
-        exp1.put(t1p0, 2L);
-        exp1.put(t1p1, 2L);
-        Map<TopicPartition, Long> exp2 = new HashMap<>();
-        exp2.put(t2p0, 2L);
-        exp2.put(t2p1, 2L);
+            Map<String, Map<TopicPartition, OffsetAndMetadata>> 
exportedOffsets = service.resetOffsets();
+            bw.write(service.exportOffsetsToCsv(exportedOffsets));
+            Map<TopicPartition, Long> exp1 = new HashMap<>();
+            exp1.put(t1p0, 2L);
+            exp1.put(t1p1, 2L);
+            Map<TopicPartition, Long> exp2 = new HashMap<>();
+            exp2.put(t2p0, 2L);
+            exp2.put(t2p1, 2L);
 
-        assertEquals(exp1, toOffsetMap(exportedOffsets.get(group1)));
-        assertEquals(exp2, toOffsetMap(exportedOffsets.get(group2)));
+            assertEquals(exp1, toOffsetMap(exportedOffsets.get(group1)));
+            assertEquals(exp2, toOffsetMap(exportedOffsets.get(group2)));
 
-        // Multiple --group's offset import
-        String[] cgcArgsExec = buildArgsForGroups(Arrays.asList(group1, 
group2), "--all-topics", "--from-file", file.getCanonicalPath(), "--dry-run");
-        ConsumerGroupCommand.ConsumerGroupService consumerGroupCommandExec = 
getConsumerGroupService(cgcArgsExec);
-        Map<String, Map<TopicPartition, OffsetAndMetadata>> importedOffsets = 
consumerGroupCommandExec.resetOffsets();
-        assertEquals(exp1, toOffsetMap(importedOffsets.get(group1)));
-        assertEquals(exp2, toOffsetMap(importedOffsets.get(group2)));
+            Map<String, Map<TopicPartition, OffsetAndMetadata>> 
importedOffsets = serviceExec.resetOffsets();
+            assertEquals(exp1, toOffsetMap(importedOffsets.get(group1)));
+            assertEquals(exp2, toOffsetMap(importedOffsets.get(group2)));
 
-        // Single --group offset import using "group,topic,partition,offset" 
csv format
-        String[] cgcArgsExec2 = buildArgsForGroup(group1, "--all-topics", 
"--from-file", file.getCanonicalPath(), "--dry-run");
-        ConsumerGroupCommand.ConsumerGroupService consumerGroupCommandExec2 = 
getConsumerGroupService(cgcArgsExec2);
-        Map<String, Map<TopicPartition, OffsetAndMetadata>> importedOffsets2 = 
consumerGroupCommandExec2.resetOffsets();
-        assertEquals(exp1, toOffsetMap(importedOffsets2.get(group1)));
+            Map<String, Map<TopicPartition, OffsetAndMetadata>> 
importedOffsets2 = serviceExec2.resetOffsets();
+            assertEquals(exp1, toOffsetMap(importedOffsets2.get(group1)));
 
-        adminZkClient().deleteTopic(TOPIC);
+            admin.deleteTopics(asList(topic1, topic2));
+        }
     }
 
-    @Test
-    public void testResetWithUnrecognizedNewConsumerOption() {
-        String[] cgcArgs = new String[]{"--new-consumer", 
"--bootstrap-server", bootstrapServers(listenerName()), "--reset-offsets",
-            "--group", GROUP, "--all-topics", "--to-offset", "2", "--export"};
+    @ClusterTemplate("generator")
+    public void testResetWithUnrecognizedNewConsumerOption(ClusterInstance 
cluster) {
+        String[] cgcArgs = new String[]{"--new-consumer",
+            "--bootstrap-server", cluster.bootstrapServers(),
+            "--reset-offsets", "--group", GROUP, "--all-topics",
+            "--to-offset", "2", "--export"};
         assertThrows(OptionException.class, () -> 
getConsumerGroupService(cgcArgs));
     }
 
-    private void produceMessages(String topic, int numMessages) {
-        List<ProducerRecord<byte[], byte[]>> records = IntStream.range(0, 
numMessages)
-            .mapToObj(i -> new ProducerRecord<byte[], byte[]>(topic, new 
byte[100 * 1000]))
-            .collect(Collectors.toList());
-        kafka.utils.TestUtils.produceMessages(servers(), seq(records), 1);
+    private Map<TopicPartition, Long> committedOffsets(ClusterInstance 
cluster, String topic, String group) {
+        try (Consumer<String, String> consumer = 
createNoAutoCommitConsumer(cluster, group)) {
+            Set<TopicPartition> partitions = consumer.partitionsFor(topic)
+                    .stream()
+                    .map(partitionInfo -> new 
TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
+                    .collect(Collectors.toSet());
+            return consumer.committed(partitions)
+                    .entrySet()
+                    .stream()
+                    .filter(e -> e.getValue() != null)
+                    .collect(toMap(Map.Entry::getKey, e -> 
e.getValue().offset()));
+        }
     }
 
-    private void produceConsumeAndShutdown(String topic, String group, int 
totalMessages, int numConsumers) throws Exception {
-        produceMessages(topic, totalMessages);
-        ConsumerGroupExecutor executor = 
addConsumerGroupExecutor(numConsumers, topic, group, 
GroupProtocol.CLASSIC.name);
-        awaitConsumerProgress(topic, group, totalMessages);
-        executor.shutdown();
+    private Consumer<String, String> 
createNoAutoCommitConsumer(ClusterInstance cluster, String group) {

Review Comment:
   Could you also test the new consumer? 
   
   
https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/test/ClusterInstance.java#L157
   
   `ClusterInstance` can return the supported protocol, and you can leverage it 
to create the specific consumer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to