chia7712 commented on code in PR #15679: URL: https://github.com/apache/kafka/pull/15679#discussion_r1576262016
########## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ########## @@ -170,30 +227,23 @@ private void produceRecord() { } } - private void withStableConsumerGroup(Runnable body) { - Consumer<byte[], byte[]> consumer = createConsumer(new Properties()); - try { - TestUtils.subscribeAndWaitForRecords(TOPIC, consumer, DEFAULT_MAX_WAIT_MS); + private void withConsumerGroup(Runnable body, boolean isStable, Properties consumerProperties) { + try (Consumer<byte[], byte[]> consumer = createConsumer(consumerProperties)) { + consumer.subscribe(Collections.singletonList(TOPIC)); + ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(DEFAULT_MAX_WAIT_MS)); + Assertions.assertNotEquals(0, records.count()); consumer.commitSync(); - body.run(); - } finally { - Utils.closeQuietly(consumer, "consumer"); + if (isStable) { + body.run(); + } } - } - - private void withEmptyConsumerGroup(Runnable body) { - Consumer<byte[], byte[]> consumer = createConsumer(new Properties()); - try { - TestUtils.subscribeAndWaitForRecords(TOPIC, consumer, DEFAULT_MAX_WAIT_MS); - consumer.commitSync(); - } finally { - Utils.closeQuietly(consumer, "consumer"); + if (!isStable) { + body.run(); } - body.run(); } private KafkaProducer<byte[], byte[]> createProducer(Properties config) { Review Comment: It seems `config` is always empty, so please remove it. ########## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ########## @@ -42,109 +56,152 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; -public class DeleteOffsetsConsumerGroupCommandIntegrationTest extends ConsumerGroupCommandTest { - String[] getArgs(String group, String topic) { - return new String[] { - "--bootstrap-server", bootstrapServers(listenerName()), - "--delete-offsets", - "--group", group, - "--topic", topic - }; +@Tag("integration") +@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = { + @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), + @ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true") +}) +@ExtendWith(ClusterTestExtensions.class) +public class DeleteOffsetsConsumerGroupCommandIntegrationTest { + private final ClusterInstance clusterInstance; + private ConsumerGroupCommand.ConsumerGroupService consumerGroupService; + public static final String TOPIC = "foo"; + public static final String GROUP = "test.group"; + + DeleteOffsetsConsumerGroupCommandIntegrationTest(ClusterInstance clusterInstance) { + this.clusterInstance = clusterInstance; + } + + @AfterEach + public void tearDown() { + if (consumerGroupService != null) { + consumerGroupService.close(); + } } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDeleteOffsetsNonExistingGroup(String quorum) { + @ClusterTest + public void testDeleteOffsetsNonExistingGroup() { String group = "missing.group"; String topic = "foo:1"; - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(getArgs(group, topic)); + setupConsumerGroupService(getArgs(group, topic)); - Entry<Errors, Map<TopicPartition, Throwable>> res = service.deleteOffsets(group, Collections.singletonList(topic)); + Entry<Errors, Map<TopicPartition, Throwable>> res = consumerGroupService.deleteOffsets(group, Collections.singletonList(topic)); assertEquals(Errors.GROUP_ID_NOT_FOUND, res.getKey()); } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDeleteOffsetsOfStableConsumerGroupWithTopicPartition(String quorum) { - testWithStableConsumerGroup(TOPIC, 0, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC); + @ClusterTest + public void testDeleteOffsetsOfStableConsumerGroupWithTopicPartition() { + createTopic(TOPIC); + Properties consumerProperties = new Properties(); + testWithConsumerGroup(TOPIC, 0, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC, true, consumerProperties); + if (clusterInstance.isKRaftTest()) { + consumerProperties.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name()); + testWithConsumerGroup(TOPIC, 0, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC, true, consumerProperties); + } } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDeleteOffsetsOfStableConsumerGroupWithTopicOnly(String quorum) { - testWithStableConsumerGroup(TOPIC, -1, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC); + @ClusterTest + public void testDeleteOffsetsOfStableConsumerGroupWithTopicOnly() { + createTopic(TOPIC); + Properties consumerProperties = new Properties(); + testWithConsumerGroup(TOPIC, -1, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC, true, consumerProperties); + if (clusterInstance.isKRaftTest()) { + consumerProperties.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name()); + testWithConsumerGroup(TOPIC, -1, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC, true, consumerProperties); + } } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicPartition(String quorum) { - testWithStableConsumerGroup("foobar", 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION); + @ClusterTest + public void testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicPartition() { + Properties consumerProperties = new Properties(); + testWithConsumerGroup("foobar", 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION, true, consumerProperties); + if (clusterInstance.isKRaftTest()) { + consumerProperties.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name()); + testWithConsumerGroup("foobar", 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION, true, consumerProperties); + } } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicOnly(String quorum) { - testWithStableConsumerGroup("foobar", -1, -1, Errors.UNKNOWN_TOPIC_OR_PARTITION); + @ClusterTest + public void testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicOnly() { + Properties consumerProperties = new Properties(); + testWithConsumerGroup("foobar", -1, -1, Errors.UNKNOWN_TOPIC_OR_PARTITION, true, consumerProperties); + if (clusterInstance.isKRaftTest()) { + consumerProperties.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name()); + testWithConsumerGroup("foobar", -1, -1, Errors.UNKNOWN_TOPIC_OR_PARTITION, true, consumerProperties); + } } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicPartition(String quorum) { - testWithEmptyConsumerGroup(TOPIC, 0, 0, Errors.NONE); + @ClusterTest + public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicPartition() { + createTopic(TOPIC); + Properties consumerProperties = new Properties(); + testWithConsumerGroup(TOPIC, 0, 0, Errors.NONE, false, consumerProperties); + if (clusterInstance.isKRaftTest()) { + consumerProperties.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name()); + testWithConsumerGroup(TOPIC, 0, 0, Errors.NONE, false, consumerProperties); + } } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicOnly(String quorum) { - testWithEmptyConsumerGroup(TOPIC, -1, 0, Errors.NONE); + @ClusterTest + public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicOnly() { + createTopic(TOPIC); + Properties consumerProperties = new Properties(); + testWithConsumerGroup(TOPIC, -1, 0, Errors.NONE, false, consumerProperties); + if (clusterInstance.isKRaftTest()) { + consumerProperties.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name()); + testWithConsumerGroup(TOPIC, -1, 0, Errors.NONE, false, consumerProperties); + } } - @ParameterizedTest - @ValueSource(strings = {"zk", "kraft"}) - public void testDeleteOffsetsOfEmptyConsumerGroupWithUnknownTopicPartition(String quorum) { - testWithEmptyConsumerGroup("foobar", 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION); + @ClusterTest + public void testDeleteOffsetsOfEmptyConsumerGroupWithUnknownTopicPartition() { + Properties consumerProperties = new Properties(); + testWithConsumerGroup("foobar", 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION, false, consumerProperties); + if (clusterInstance.isKRaftTest()) { + consumerProperties.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name()); Review Comment: we should test both `CONSUMER` and `CLASSIC` for kraft. Also, could you initialize the consumer configs in construction. for example; ```java DeleteOffsetsConsumerGroupCommandIntegrationTest(ClusterInstance clusterInstance) { this.clusterInstance = clusterInstance; this.consumerConfigs = clusterInstance.isKRaftTest() ? Arrays.asList(Collections.singletonMap(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name()), Collections.singletonMap(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name())) : Collections.emptyList(); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org