Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
chia7712 merged PR #15679: URL: https://github.com/apache/kafka/pull/15679 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on code in PR #15679: URL: https://github.com/apache/kafka/pull/15679#discussion_r1577056133 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -42,109 +58,141 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; -public class DeleteOffsetsConsumerGroupCommandIntegrationTest extends ConsumerGroupCommandTest { -String[] getArgs(String group, String topic) { -return new String[] { -"--bootstrap-server", bootstrapServers(listenerName()), -"--delete-offsets", -"--group", group, -"--topic", topic -}; +@Tag("integration") +@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = { +@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), +@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), +@ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true") +}) +@ExtendWith(ClusterTestExtensions.class) +public class DeleteOffsetsConsumerGroupCommandIntegrationTest { +public static final String TOPIC = "foo"; +public static final String GROUP = "test.group"; +private final ClusterInstance clusterInstance; + +private ConsumerGroupCommand.ConsumerGroupService consumerGroupService; +private final Iterable> consumerConfigs; + +DeleteOffsetsConsumerGroupCommandIntegrationTest(ClusterInstance clusterInstance) { +this.clusterInstance = clusterInstance; +this.consumerConfigs = clusterInstance.isKRaftTest() +? Arrays.asList( +new HashMap() {{ Review Comment: Updated it. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15679: URL: https://github.com/apache/kafka/pull/15679#discussion_r1576508031 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -42,109 +58,141 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; -public class DeleteOffsetsConsumerGroupCommandIntegrationTest extends ConsumerGroupCommandTest { -String[] getArgs(String group, String topic) { -return new String[] { -"--bootstrap-server", bootstrapServers(listenerName()), -"--delete-offsets", -"--group", group, -"--topic", topic -}; +@Tag("integration") +@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = { +@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), +@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), +@ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true") +}) +@ExtendWith(ClusterTestExtensions.class) +public class DeleteOffsetsConsumerGroupCommandIntegrationTest { +public static final String TOPIC = "foo"; +public static final String GROUP = "test.group"; +private final ClusterInstance clusterInstance; + +private ConsumerGroupCommand.ConsumerGroupService consumerGroupService; +private final Iterable> consumerConfigs; + +DeleteOffsetsConsumerGroupCommandIntegrationTest(ClusterInstance clusterInstance) { +this.clusterInstance = clusterInstance; +this.consumerConfigs = clusterInstance.isKRaftTest() +? Arrays.asList( +new HashMap() {{ Review Comment: We can use immutable map and make `createConsumer` create a inner mutable map to collect all configs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on code in PR #15679: URL: https://github.com/apache/kafka/pull/15679#discussion_r1576432135 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -202,7 +256,7 @@ private KafkaProducer createProducer(Properties config) { } private Consumer createConsumer(Properties config) { Review Comment: Updated it and also added empty map for zk, or zk will not be tested. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15679: URL: https://github.com/apache/kafka/pull/15679#discussion_r1576386813 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -202,7 +256,7 @@ private KafkaProducer createProducer(Properties config) { } private Consumer createConsumer(Properties config) { Review Comment: We can change the type from `Properties` to `Map`. With that change we don't need to create a lot of `Properties` in each test case -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on PR #15679: URL: https://github.com/apache/kafka/pull/15679#issuecomment-2072421039 > @FrankYang0529 thanks for updated PR. please take a look at two comments. Hi @chia7712, thanks for the review. Updated it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15679: URL: https://github.com/apache/kafka/pull/15679#discussion_r1576262016 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -170,30 +227,23 @@ private void produceRecord() { } } -private void withStableConsumerGroup(Runnable body) { -Consumer consumer = createConsumer(new Properties()); -try { -TestUtils.subscribeAndWaitForRecords(TOPIC, consumer, DEFAULT_MAX_WAIT_MS); +private void withConsumerGroup(Runnable body, boolean isStable, Properties consumerProperties) { +try (Consumer consumer = createConsumer(consumerProperties)) { +consumer.subscribe(Collections.singletonList(TOPIC)); +ConsumerRecords records = consumer.poll(Duration.ofMillis(DEFAULT_MAX_WAIT_MS)); +Assertions.assertNotEquals(0, records.count()); consumer.commitSync(); -body.run(); -} finally { -Utils.closeQuietly(consumer, "consumer"); +if (isStable) { +body.run(); +} } -} - -private void withEmptyConsumerGroup(Runnable body) { -Consumer consumer = createConsumer(new Properties()); -try { -TestUtils.subscribeAndWaitForRecords(TOPIC, consumer, DEFAULT_MAX_WAIT_MS); -consumer.commitSync(); -} finally { -Utils.closeQuietly(consumer, "consumer"); +if (!isStable) { +body.run(); } -body.run(); } private KafkaProducer createProducer(Properties config) { Review Comment: It seems `config` is always empty, so please remove it. ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -42,109 +56,152 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; -public class DeleteOffsetsConsumerGroupCommandIntegrationTest extends ConsumerGroupCommandTest { -String[] getArgs(String group, String topic) { -return new String[] { -"--bootstrap-server", bootstrapServers(listenerName()), -"--delete-offsets", -"--group", group, -"--topic", topic -}; +@Tag("integration") +@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = { +@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), +@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), +@ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true") +}) +@ExtendWith(ClusterTestExtensions.class) +public class DeleteOffsetsConsumerGroupCommandIntegrationTest { +private final ClusterInstance clusterInstance; +private ConsumerGroupCommand.ConsumerGroupService consumerGroupService; +public static final String TOPIC = "foo"; +public static final String GROUP = "test.group"; + +DeleteOffsetsConsumerGroupCommandIntegrationTest(ClusterInstance clusterInstance) { +this.clusterInstance = clusterInstance; +} + +@AfterEach +public void tearDown() { +if (consumerGroupService != null) { +consumerGroupService.close(); +} } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteOffsetsNonExistingGroup(String quorum) { +@ClusterTest +public void testDeleteOffsetsNonExistingGroup() { String group = "missing.group"; String topic = "foo:1"; -ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(getArgs(group, topic)); +setupConsumerGroupService(getArgs(group, topic)); -Entry> res = service.deleteOffsets(group, Collections.singletonList(topic)); +Entry> res = consumerGroupService.deleteOffsets(group, Collections.singletonList(topic)); assertEquals(Errors.GROUP_ID_NOT_FOUND, res.getKey()); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteOffsetsOfStableConsumerGroupWithTopicPartition(String quorum) { -testWithStableConsumerGroup(TOPIC, 0, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC); +@ClusterTest +public void testDeleteOffsetsOfStableConsumerGroupWithTopicPartition() { +createTopic(TOPIC); +Properties consumerProperties = new Properties(); +testWithConsumerGroup(TOPIC, 0, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC, true, consumerProperties); +if (clusterInstance.isKRaftTest()) { +consumerProperties.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name()); +testWithConsumerGroup(TOPIC, 0, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC, true, consumerProperties); +} } -@ParameterizedTest -
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on PR #15679: URL: https://github.com/apache/kafka/pull/15679#issuecomment-2072251579 > > I think we can add new test case in next PR. We can more focus on migrate to ClusterTestExtensions in this PR. > > Please take a look at [#15766 (comment)](https://github.com/apache/kafka/pull/15766#discussion_r1576009920), and we need a bit refactor but it should not a hard work. So please try to address it in this PR. Update it. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
chia7712 commented on PR #15679: URL: https://github.com/apache/kafka/pull/15679#issuecomment-2072132702 > I think we can add new test case in next PR. We can more focus on migrate to ClusterTestExtensions in this PR. Please take a look at https://github.com/apache/kafka/pull/15766#discussion_r1576009920, and we need a bit refactor but it should not a hard work. So please try to address it in this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on PR #15679: URL: https://github.com/apache/kafka/pull/15679#issuecomment-2072051627 > @FrankYang0529 sorry that I check the PR again, and more comments are left. PTAL Hi @chia7712, I addressed last comments. I think we can add new test case in next PR. We can more focus on migrate to ClusterTestExtensions in this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on code in PR #15679: URL: https://github.com/apache/kafka/pull/15679#discussion_r1576082586 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -202,7 +238,7 @@ private KafkaProducer createProducer(Properties config) { } private Consumer createConsumer(Properties config) { Review Comment: Can we do it in another PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15679: URL: https://github.com/apache/kafka/pull/15679#discussion_r1576013151 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -171,29 +209,27 @@ private void produceRecord() { } private void withStableConsumerGroup(Runnable body) { -Consumer consumer = createConsumer(new Properties()); -try { -TestUtils.subscribeAndWaitForRecords(TOPIC, consumer, DEFAULT_MAX_WAIT_MS); +try (Consumer consumer = createConsumer(new Properties());) { Review Comment: please remove `;` ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -202,7 +238,7 @@ private KafkaProducer createProducer(Properties config) { } private Consumer createConsumer(Properties config) { Review Comment: Could you add tests for `GroupProtocol.CONSUMER`? see comment: https://github.com/apache/kafka/pull/15766#discussion_r1576009920 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -171,29 +209,27 @@ private void produceRecord() { } private void withStableConsumerGroup(Runnable body) { -Consumer consumer = createConsumer(new Properties()); -try { -TestUtils.subscribeAndWaitForRecords(TOPIC, consumer, DEFAULT_MAX_WAIT_MS); +try (Consumer consumer = createConsumer(new Properties());) { +consumer.subscribe(Collections.singletonList(TOPIC)); +ConsumerRecords records = consumer.poll(Duration.ofMillis(DEFAULT_MAX_WAIT_MS)); +Assertions.assertNotEquals(0, records.count()); consumer.commitSync(); body.run(); -} finally { -Utils.closeQuietly(consumer, "consumer"); } } private void withEmptyConsumerGroup(Runnable body) { Review Comment: Can we merge `withStableConsumerGroup` and `withEmptyConsumerGroup`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on PR #15679: URL: https://github.com/apache/kafka/pull/15679#issuecomment-2069788585 > @FrankYang0529 thanks for updated PR. two minor comments left. PTAL Hi @chia7712, I addressed last comments. Thanks for the review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15679: URL: https://github.com/apache/kafka/pull/15679#discussion_r1574810225 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -42,75 +53,98 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; -public class DeleteOffsetsConsumerGroupCommandIntegrationTest extends ConsumerGroupCommandTest { -String[] getArgs(String group, String topic) { -return new String[] { -"--bootstrap-server", bootstrapServers(listenerName()), -"--delete-offsets", -"--group", group, -"--topic", topic -}; +@Tag("integration") +@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3, serverProperties = { Review Comment: Could we reduce the replica number instead of increasing number of brokers? I try to avoid our CI get burned :) ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -213,4 +246,9 @@ private Consumer createConsumer(Properties config) { return new KafkaConsumer<>(config); } + +private void createTopic(String topic) { +Assertions.assertDoesNotThrow(() -> + clusterInstance.createAdminClient().createTopics(Collections.singletonList(new NewTopic(topic, 1, (short) 1))).topicId(topic).get()); Review Comment: Could we use try-with-resource to close `Admin`? We don't guarantee that `createAdminClient` will auto-release the admin currently. I have filed https://issues.apache.org/jira/browse/KAFKA-16589 to discuss it. However, we should always close it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on code in PR #15679: URL: https://github.com/apache/kafka/pull/15679#discussion_r1574007862 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -173,7 +208,11 @@ private void produceRecord() { private void withStableConsumerGroup(Runnable body) { Consumer consumer = createConsumer(new Properties()); try { -TestUtils.subscribeAndWaitForRecords(TOPIC, consumer, DEFAULT_MAX_WAIT_MS); +consumer.subscribe(Collections.singletonList(TOPIC)); +ConsumerRecords records = consumer.poll(Duration.ofMillis(DEFAULT_MAX_WAIT_MS)); +if (records.isEmpty()) { Review Comment: Hi @chia7712, thanks for the review. I have addressed all following comments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15679: URL: https://github.com/apache/kafka/pull/15679#discussion_r1573834240 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -173,7 +208,11 @@ private void produceRecord() { private void withStableConsumerGroup(Runnable body) { Consumer consumer = createConsumer(new Properties()); try { -TestUtils.subscribeAndWaitForRecords(TOPIC, consumer, DEFAULT_MAX_WAIT_MS); +consumer.subscribe(Collections.singletonList(TOPIC)); +ConsumerRecords records = consumer.poll(Duration.ofMillis(DEFAULT_MAX_WAIT_MS)); +if (records.isEmpty()) { Review Comment: Could we rewrite it by `assertNotEquals(0, records.count())`? ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -184,7 +223,11 @@ private void withStableConsumerGroup(Runnable body) { private void withEmptyConsumerGroup(Runnable body) { Consumer consumer = createConsumer(new Properties()); Review Comment: please use try-with-resources ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -173,7 +208,11 @@ private void produceRecord() { private void withStableConsumerGroup(Runnable body) { Consumer consumer = createConsumer(new Properties()); Review Comment: please use try-with-resources -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15679: URL: https://github.com/apache/kafka/pull/15679#discussion_r1573832025 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -16,7 +16,15 @@ */ package org.apache.kafka.tools.consumer.group; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterConfigProperty; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; import kafka.utils.TestUtils; Review Comment: > I will create another minor PR to refactor it. Thanks. nice!!! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on code in PR #15679: URL: https://github.com/apache/kafka/pull/15679#discussion_r1573795818 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -16,7 +16,15 @@ */ package org.apache.kafka.tools.consumer.group; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterConfigProperty; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; import kafka.utils.TestUtils; Review Comment: It looks like the only place using `subscribeAndWaitForRecords` is `PlaintextAdminIntegrationTest` after this PR is merge. I will create another minor PR to refactor it. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on code in PR #15679: URL: https://github.com/apache/kafka/pull/15679#discussion_r1573623165 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -16,7 +16,15 @@ */ package org.apache.kafka.tools.consumer.group; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterConfigProperty; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; import kafka.utils.TestUtils; Review Comment: Hi @chia7712, I removed TestUtils in `DeleteOffsetsConsumerGroupCommandIntegrationTest`. Thank you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15679: URL: https://github.com/apache/kafka/pull/15679#discussion_r1572912365 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -16,7 +16,15 @@ */ package org.apache.kafka.tools.consumer.group; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterConfigProperty; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; import kafka.utils.TestUtils; Review Comment: Could you remove the usage of `TestUtils`? the method `subscribeAndWaitForRecords` can be rewrite easily as it seems to poll records only. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
chia7712 commented on PR #15679: URL: https://github.com/apache/kafka/pull/15679#issuecomment-2065947030 > Do you think that we should revert unstable.api.versions.enable change and try again? Thanks. Yep -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on PR #15679: URL: https://github.com/apache/kafka/pull/15679#issuecomment-2065616140 > > @FrankYang0529 Could you reduce the partition number of offsets topic? It seems the timeout is caused by that coordinator is waiting for the offset partition, and our CI could be too busy to complete the assignments. > > Hi @chia7712, thanks for the suggestion. I have set `offsets.topic.num.partitions` as `1` on `ClusterTestDefaults`. Hope it works fine. Hi @chia7712, setting `offsets.topic.num.partitions` as `1` works! Do you think that we should revert `unstable.api.versions.enable` change and try again? Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on PR #15679: URL: https://github.com/apache/kafka/pull/15679#issuecomment-2064035886 > @FrankYang0529 Could you reduce the partition number of offsets topic? It seems the timeout is caused by that coordinator is waiting for the offset partition, and our CI could be too busy to complete the assignments. Hi @chia7712, thanks for the suggestion. I have set `offsets.topic.num.partitions` as `1` on `ClusterTestDefaults`. Hope it works fine. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
chia7712 commented on PR #15679: URL: https://github.com/apache/kafka/pull/15679#issuecomment-2061751961 @FrankYang0529 Could you reduce the partition number of offsets topic? It seems the timeout is caused by that coordinator is waiting for the offset partition, and our CI could be too busy to complete the assignments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15679: URL: https://github.com/apache/kafka/pull/15679#discussion_r1557557116 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -213,4 +234,8 @@ private Consumer createConsumer(Properties config) { return new KafkaConsumer<>(config); } + +private void createTopic(String topic) { +TestUtils.createTopicWithAdminRaw(clusterInstance.createAdminClient(), topic, 1, 1, scala.collection.immutable.Map$.MODULE$.empty(), new Properties()); Review Comment: Can we avoid using scala code here? ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -42,19 +48,38 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; -public class DeleteOffsetsConsumerGroupCommandIntegrationTest extends ConsumerGroupCommandTest { +@Tag("integration") +@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3) +@ExtendWith(ClusterTestExtensions.class) +public class DeleteOffsetsConsumerGroupCommandIntegrationTest { +private final ClusterInstance clusterInstance; +public static final String TOPIC = "foo"; +public static final String GROUP = "test.group"; + +DeleteOffsetsConsumerGroupCommandIntegrationTest(ClusterInstance clusterInstance) { // Constructor injections +this.clusterInstance = clusterInstance; +} + String[] getArgs(String group, String topic) { return new String[] { -"--bootstrap-server", bootstrapServers(listenerName()), +"--bootstrap-server", clusterInstance.bootstrapServers(), "--delete-offsets", "--group", group, "--topic", topic }; } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteOffsetsNonExistingGroup(String quorum) { +ConsumerGroupCommand.ConsumerGroupService getConsumerGroupService(String[] args) { +ConsumerGroupCommandOptions opts = ConsumerGroupCommandOptions.fromArgs(args); + +return new ConsumerGroupCommand.ConsumerGroupService( +opts, +Collections.singletonMap(AdminClientConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE)) +); +} + +@ClusterTest +public void testDeleteOffsetsNonExistingGroup() { String group = "missing.group"; String topic = "foo:1"; ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(getArgs(group, topic)); Review Comment: could you make sure all services get closed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on code in PR #15679: URL: https://github.com/apache/kafka/pull/15679#discussion_r1557490373 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -42,19 +48,40 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; -public class DeleteOffsetsConsumerGroupCommandIntegrationTest extends ConsumerGroupCommandTest { +@Tag("integration") +@ExtendWith(ClusterTestExtensions.class) +public class DeleteOffsetsConsumerGroupCommandIntegrationTest { +private final ClusterInstance clusterInstance; +public static final String TOPIC = "foo"; +public static final String GROUP = "test.group"; + +DeleteOffsetsConsumerGroupCommandIntegrationTest(ClusterInstance clusterInstance) { // Constructor injections +this.clusterInstance = clusterInstance; +} + String[] getArgs(String group, String topic) { return new String[] { -"--bootstrap-server", bootstrapServers(listenerName()), +"--bootstrap-server", clusterInstance.bootstrapServers(), "--delete-offsets", "--group", group, "--topic", topic }; } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteOffsetsNonExistingGroup(String quorum) { +ConsumerGroupCommand.ConsumerGroupService getConsumerGroupService(String[] args) { +ConsumerGroupCommandOptions opts = ConsumerGroupCommandOptions.fromArgs(args); + +return new ConsumerGroupCommand.ConsumerGroupService( +opts, +Collections.singletonMap(AdminClientConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE)) +); +} + +@ClusterTests({ +@ClusterTest(clusterType = Type.ZK), Review Comment: Yes, updated it. Thank you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15679: URL: https://github.com/apache/kafka/pull/15679#discussion_r1556382498 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -42,19 +48,40 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; -public class DeleteOffsetsConsumerGroupCommandIntegrationTest extends ConsumerGroupCommandTest { +@Tag("integration") +@ExtendWith(ClusterTestExtensions.class) +public class DeleteOffsetsConsumerGroupCommandIntegrationTest { +private final ClusterInstance clusterInstance; +public static final String TOPIC = "foo"; +public static final String GROUP = "test.group"; + +DeleteOffsetsConsumerGroupCommandIntegrationTest(ClusterInstance clusterInstance) { // Constructor injections +this.clusterInstance = clusterInstance; +} + String[] getArgs(String group, String topic) { return new String[] { -"--bootstrap-server", bootstrapServers(listenerName()), +"--bootstrap-server", clusterInstance.bootstrapServers(), "--delete-offsets", "--group", group, "--topic", topic }; } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteOffsetsNonExistingGroup(String quorum) { +ConsumerGroupCommand.ConsumerGroupService getConsumerGroupService(String[] args) { +ConsumerGroupCommandOptions opts = ConsumerGroupCommandOptions.fromArgs(args); + +return new ConsumerGroupCommand.ConsumerGroupService( +opts, +Collections.singletonMap(AdminClientConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE)) +); +} + +@ClusterTests({ +@ClusterTest(clusterType = Type.ZK), Review Comment: Could we define `ClusterTestDefaults` at class-level? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
FrankYang0529 opened a new pull request, #15679: URL: https://github.com/apache/kafka/pull/15679 By using ClusterTestExtensions, `DeleteOffsetsConsumerGroupCommandIntegrationTest` get away from `KafkaServerTestHarness` dependency. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org