chia7712 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1576262016


##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##########
@@ -170,30 +227,23 @@ private void produceRecord() {
         }
     }
 
-    private void withStableConsumerGroup(Runnable body) {
-        Consumer<byte[], byte[]> consumer = createConsumer(new Properties());
-        try {
-            TestUtils.subscribeAndWaitForRecords(TOPIC, consumer, 
DEFAULT_MAX_WAIT_MS);
+    private void withConsumerGroup(Runnable body, boolean isStable, Properties 
consumerProperties) {
+        try (Consumer<byte[], byte[]> consumer = 
createConsumer(consumerProperties)) {
+            consumer.subscribe(Collections.singletonList(TOPIC));
+            ConsumerRecords<byte[], byte[]> records = 
consumer.poll(Duration.ofMillis(DEFAULT_MAX_WAIT_MS));
+            Assertions.assertNotEquals(0, records.count());
             consumer.commitSync();
-            body.run();
-        } finally {
-            Utils.closeQuietly(consumer, "consumer");
+            if (isStable) {
+                body.run();
+            }
         }
-    }
-
-    private void withEmptyConsumerGroup(Runnable body) {
-        Consumer<byte[], byte[]> consumer = createConsumer(new Properties());
-        try {
-            TestUtils.subscribeAndWaitForRecords(TOPIC, consumer, 
DEFAULT_MAX_WAIT_MS);
-            consumer.commitSync();
-        } finally {
-            Utils.closeQuietly(consumer, "consumer");
+        if (!isStable) {
+            body.run();
         }
-        body.run();
     }
 
     private KafkaProducer<byte[], byte[]> createProducer(Properties config) {

Review Comment:
   It seems `config` is always empty, so please remove the parameter.



##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##########
@@ -42,109 +56,152 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
-public class DeleteOffsetsConsumerGroupCommandIntegrationTest extends 
ConsumerGroupCommandTest {
-    String[] getArgs(String group, String topic) {
-        return new String[] {
-            "--bootstrap-server", bootstrapServers(listenerName()),
-            "--delete-offsets",
-            "--group", group,
-            "--topic", topic
-        };
+@Tag("integration")
+@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
+        @ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+        @ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+        @ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true")
+})
+@ExtendWith(ClusterTestExtensions.class)
+public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
+    private final ClusterInstance clusterInstance;
+    private ConsumerGroupCommand.ConsumerGroupService consumerGroupService;
+    public static final String TOPIC = "foo";
+    public static final String GROUP = "test.group";
+
+    DeleteOffsetsConsumerGroupCommandIntegrationTest(ClusterInstance 
clusterInstance) {
+        this.clusterInstance = clusterInstance;
+    }
+
+    @AfterEach
+    public void tearDown() {
+        if (consumerGroupService != null) {
+            consumerGroupService.close();
+        }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDeleteOffsetsNonExistingGroup(String quorum) {
+    @ClusterTest
+    public void testDeleteOffsetsNonExistingGroup() {
         String group = "missing.group";
         String topic = "foo:1";
-        ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(getArgs(group, topic));
+        setupConsumerGroupService(getArgs(group, topic));
 
-        Entry<Errors, Map<TopicPartition, Throwable>> res = 
service.deleteOffsets(group, Collections.singletonList(topic));
+        Entry<Errors, Map<TopicPartition, Throwable>> res = 
consumerGroupService.deleteOffsets(group, Collections.singletonList(topic));
         assertEquals(Errors.GROUP_ID_NOT_FOUND, res.getKey());
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void 
testDeleteOffsetsOfStableConsumerGroupWithTopicPartition(String quorum) {
-        testWithStableConsumerGroup(TOPIC, 0, 0, 
Errors.GROUP_SUBSCRIBED_TO_TOPIC);
+    @ClusterTest
+    public void testDeleteOffsetsOfStableConsumerGroupWithTopicPartition() {
+        createTopic(TOPIC);
+        Properties consumerProperties = new Properties();
+        testWithConsumerGroup(TOPIC, 0, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC, 
true, consumerProperties);
+        if (clusterInstance.isKRaftTest()) {
+            consumerProperties.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name());
+            testWithConsumerGroup(TOPIC, 0, 0, 
Errors.GROUP_SUBSCRIBED_TO_TOPIC, true, consumerProperties);
+        }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDeleteOffsetsOfStableConsumerGroupWithTopicOnly(String 
quorum) {
-        testWithStableConsumerGroup(TOPIC, -1, 0, 
Errors.GROUP_SUBSCRIBED_TO_TOPIC);
+    @ClusterTest
+    public void testDeleteOffsetsOfStableConsumerGroupWithTopicOnly() {
+        createTopic(TOPIC);
+        Properties consumerProperties = new Properties();
+        testWithConsumerGroup(TOPIC, -1, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC, 
true, consumerProperties);
+        if (clusterInstance.isKRaftTest()) {
+            consumerProperties.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name());
+            testWithConsumerGroup(TOPIC, -1, 0, 
Errors.GROUP_SUBSCRIBED_TO_TOPIC, true, consumerProperties);
+        }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void 
testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicPartition(String quorum) {
-        testWithStableConsumerGroup("foobar", 0, 0, 
Errors.UNKNOWN_TOPIC_OR_PARTITION);
+    @ClusterTest
+    public void 
testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicPartition() {
+        Properties consumerProperties = new Properties();
+        testWithConsumerGroup("foobar", 0, 0, 
Errors.UNKNOWN_TOPIC_OR_PARTITION, true, consumerProperties);
+        if (clusterInstance.isKRaftTest()) {
+            consumerProperties.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name());
+            testWithConsumerGroup("foobar", 0, 0, 
Errors.UNKNOWN_TOPIC_OR_PARTITION, true, consumerProperties);
+        }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void 
testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicOnly(String quorum) {
-        testWithStableConsumerGroup("foobar", -1, -1, 
Errors.UNKNOWN_TOPIC_OR_PARTITION);
+    @ClusterTest
+    public void testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicOnly() {
+        Properties consumerProperties = new Properties();
+        testWithConsumerGroup("foobar", -1, -1, 
Errors.UNKNOWN_TOPIC_OR_PARTITION, true, consumerProperties);
+        if (clusterInstance.isKRaftTest()) {
+            consumerProperties.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name());
+            testWithConsumerGroup("foobar", -1, -1, 
Errors.UNKNOWN_TOPIC_OR_PARTITION, true, consumerProperties);
+        }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicPartition(String 
quorum) {
-        testWithEmptyConsumerGroup(TOPIC, 0, 0, Errors.NONE);
+    @ClusterTest
+    public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicPartition() {
+        createTopic(TOPIC);
+        Properties consumerProperties = new Properties();
+        testWithConsumerGroup(TOPIC, 0, 0, Errors.NONE, false, 
consumerProperties);
+        if (clusterInstance.isKRaftTest()) {
+            consumerProperties.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name());
+            testWithConsumerGroup(TOPIC, 0, 0, Errors.NONE, false, 
consumerProperties);
+        }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicOnly(String 
quorum) {
-        testWithEmptyConsumerGroup(TOPIC, -1, 0, Errors.NONE);
+    @ClusterTest
+    public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicOnly() {
+        createTopic(TOPIC);
+        Properties consumerProperties = new Properties();
+        testWithConsumerGroup(TOPIC, -1, 0, Errors.NONE, false, 
consumerProperties);
+        if (clusterInstance.isKRaftTest()) {
+            consumerProperties.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name());
+            testWithConsumerGroup(TOPIC, -1, 0, Errors.NONE, false, 
consumerProperties);
+        }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void 
testDeleteOffsetsOfEmptyConsumerGroupWithUnknownTopicPartition(String quorum) {
-        testWithEmptyConsumerGroup("foobar", 0, 0, 
Errors.UNKNOWN_TOPIC_OR_PARTITION);
+    @ClusterTest
+    public void 
testDeleteOffsetsOfEmptyConsumerGroupWithUnknownTopicPartition() {
+        Properties consumerProperties = new Properties();
+        testWithConsumerGroup("foobar", 0, 0, 
Errors.UNKNOWN_TOPIC_OR_PARTITION, false, consumerProperties);
+        if (clusterInstance.isKRaftTest()) {
+            consumerProperties.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name());

Review Comment:
   We should test both `CONSUMER` and `CLASSIC` for KRaft. Also, could you 
initialize the consumer configs in the constructor? For example:
   ```java
       DeleteOffsetsConsumerGroupCommandIntegrationTest(ClusterInstance 
clusterInstance) {
           this.clusterInstance = clusterInstance;
           this.consumerConfigs = clusterInstance.isKRaftTest()
               ? 
Arrays.asList(Collections.singletonMap(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name()),
                   
Collections.singletonMap(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name()))
               : Collections.emptyList();
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to