Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-24 Thread via GitHub


chia7712 merged PR #15679:
URL: https://github.com/apache/kafka/pull/15679


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


FrankYang0529 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1577056133


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -42,109 +58,141 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
-public class DeleteOffsetsConsumerGroupCommandIntegrationTest extends 
ConsumerGroupCommandTest {
-String[] getArgs(String group, String topic) {
-return new String[] {
-"--bootstrap-server", bootstrapServers(listenerName()),
-"--delete-offsets",
-"--group", group,
-"--topic", topic
-};
+@Tag("integration")
+@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
+@ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+@ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+@ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true")
+})
+@ExtendWith(ClusterTestExtensions.class)
+public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
+public static final String TOPIC = "foo";
+public static final String GROUP = "test.group";
+private final ClusterInstance clusterInstance;
+
+private ConsumerGroupCommand.ConsumerGroupService consumerGroupService;
+private final Iterable> consumerConfigs;
+
+DeleteOffsetsConsumerGroupCommandIntegrationTest(ClusterInstance 
clusterInstance) {
+this.clusterInstance = clusterInstance;
+this.consumerConfigs = clusterInstance.isKRaftTest()
+? Arrays.asList(
+new HashMap() {{

Review Comment:
   Updated it. Thanks.






Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1576508031


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -42,109 +58,141 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
-public class DeleteOffsetsConsumerGroupCommandIntegrationTest extends 
ConsumerGroupCommandTest {
-String[] getArgs(String group, String topic) {
-return new String[] {
-"--bootstrap-server", bootstrapServers(listenerName()),
-"--delete-offsets",
-"--group", group,
-"--topic", topic
-};
+@Tag("integration")
+@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
+@ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+@ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+@ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true")
+})
+@ExtendWith(ClusterTestExtensions.class)
+public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
+public static final String TOPIC = "foo";
+public static final String GROUP = "test.group";
+private final ClusterInstance clusterInstance;
+
+private ConsumerGroupCommand.ConsumerGroupService consumerGroupService;
+private final Iterable> consumerConfigs;
+
+DeleteOffsetsConsumerGroupCommandIntegrationTest(ClusterInstance 
clusterInstance) {
+this.clusterInstance = clusterInstance;
+this.consumerConfigs = clusterInstance.isKRaftTest()
+? Arrays.asList(
+new HashMap() {{

Review Comment:
   We can use an immutable map and have `createConsumer` create an inner mutable 
map to collect all configs. 
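
A minimal sketch of this suggestion, assuming plain `java.util` maps stand in for the test's real consumer configs. `buildConsumerConfig` and `BASE_OVERRIDES` are hypothetical names, not code from the PR, and `localhost:9092` is only an illustrative address. The shared overrides stay read-only, and the helper collects everything into its own mutable copy:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ConsumerConfigSketch {
    // Shared, read-only overrides: callers cannot mutate this map by accident.
    static final Map<String, Object> BASE_OVERRIDES =
        Collections.<String, Object>singletonMap("group.protocol", "CONSUMER");

    // Hypothetical stand-in for the config-building part of createConsumer.
    static Map<String, Object> buildConsumerConfig(String bootstrapServers,
                                                   String groupId,
                                                   Map<String, Object> overrides) {
        // Inner mutable copy that collects all configs.
        Map<String, Object> configs = new HashMap<>(overrides);
        configs.putIfAbsent("bootstrap.servers", bootstrapServers);
        configs.putIfAbsent("group.id", groupId);
        return configs;
    }

    public static void main(String[] args) {
        Map<String, Object> configs =
            buildConsumerConfig("localhost:9092", "test.group", BASE_OVERRIDES);
        System.out.println(configs.keySet());
    }
}
```

Because the copy is made inside the helper, the same immutable map can be reused across test cases without one case leaking settings into another.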






Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


FrankYang0529 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1576432135


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -202,7 +256,7 @@ private KafkaProducer 
createProducer(Properties config) {
 }
 
 private Consumer createConsumer(Properties config) {

Review Comment:
   Updated it, and also added an empty map for zk; otherwise zk would not be tested.
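
A hedged sketch of why the empty map matters, assuming the test body loops over a list of per-consumer config maps. `consumerConfigs` and `countRuns` are hypothetical names, and the KRaft branch (default protocol plus `CONSUMER`) is an assumption based on the earlier revision of this test. An empty list for zk would make the loop body run zero times, while a list holding one empty map runs it once with default settings:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class ConsumerConfigsPerClusterSketch {
    // Hypothetical helper: one map per consumer variant that should be exercised.
    static List<Map<String, Object>> consumerConfigs(boolean isKRaft) {
        if (isKRaft) {
            return Arrays.asList(
                Collections.<String, Object>emptyMap(),
                Collections.<String, Object>singletonMap("group.protocol", "CONSUMER"));
        }
        // A single empty map (not an empty list), so the loop still runs once for zk.
        return Collections.singletonList(Collections.<String, Object>emptyMap());
    }

    // Counts how many times a test body would run for the given cluster type.
    static int countRuns(boolean isKRaft) {
        int runs = 0;
        for (Map<String, Object> config : consumerConfigs(isKRaft)) {
            // A real test would create a consumer from config here.
            runs++;
        }
        return runs;
    }
}
```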






Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1576386813


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -202,7 +256,7 @@ private KafkaProducer 
createProducer(Properties config) {
 }
 
 private Consumer createConsumer(Properties config) {

Review Comment:
   We can change the type from `Properties` to `Map`. With that 
change, we don't need to create a lot of `Properties` objects in each test case.






Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


FrankYang0529 commented on PR #15679:
URL: https://github.com/apache/kafka/pull/15679#issuecomment-2072421039

   > @FrankYang0529 thanks for the updated PR. Please take a look at the two comments.
   
   Hi @chia7712, thanks for the review. Updated it.





Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1576262016


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -170,30 +227,23 @@ private void produceRecord() {
 }
 }
 
-private void withStableConsumerGroup(Runnable body) {
-Consumer consumer = createConsumer(new Properties());
-try {
-TestUtils.subscribeAndWaitForRecords(TOPIC, consumer, 
DEFAULT_MAX_WAIT_MS);
+private void withConsumerGroup(Runnable body, boolean isStable, Properties 
consumerProperties) {
+try (Consumer consumer = 
createConsumer(consumerProperties)) {
+consumer.subscribe(Collections.singletonList(TOPIC));
+ConsumerRecords records = 
consumer.poll(Duration.ofMillis(DEFAULT_MAX_WAIT_MS));
+Assertions.assertNotEquals(0, records.count());
 consumer.commitSync();
-body.run();
-} finally {
-Utils.closeQuietly(consumer, "consumer");
+if (isStable) {
+body.run();
+}
 }
-}
-
-private void withEmptyConsumerGroup(Runnable body) {
-Consumer consumer = createConsumer(new Properties());
-try {
-TestUtils.subscribeAndWaitForRecords(TOPIC, consumer, 
DEFAULT_MAX_WAIT_MS);
-consumer.commitSync();
-} finally {
-Utils.closeQuietly(consumer, "consumer");
+if (!isStable) {
+body.run();
 }
-body.run();
 }
 
 private KafkaProducer createProducer(Properties config) {

Review Comment:
   It seems `config` is always empty, so please remove it.



##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -42,109 +56,152 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
-public class DeleteOffsetsConsumerGroupCommandIntegrationTest extends 
ConsumerGroupCommandTest {
-String[] getArgs(String group, String topic) {
-return new String[] {
-"--bootstrap-server", bootstrapServers(listenerName()),
-"--delete-offsets",
-"--group", group,
-"--topic", topic
-};
+@Tag("integration")
+@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
+@ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+@ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+@ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true")
+})
+@ExtendWith(ClusterTestExtensions.class)
+public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
+private final ClusterInstance clusterInstance;
+private ConsumerGroupCommand.ConsumerGroupService consumerGroupService;
+public static final String TOPIC = "foo";
+public static final String GROUP = "test.group";
+
+DeleteOffsetsConsumerGroupCommandIntegrationTest(ClusterInstance 
clusterInstance) {
+this.clusterInstance = clusterInstance;
+}
+
+@AfterEach
+public void tearDown() {
+if (consumerGroupService != null) {
+consumerGroupService.close();
+}
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteOffsetsNonExistingGroup(String quorum) {
+@ClusterTest
+public void testDeleteOffsetsNonExistingGroup() {
 String group = "missing.group";
 String topic = "foo:1";
-ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(getArgs(group, topic));
+setupConsumerGroupService(getArgs(group, topic));
 
-Entry> res = 
service.deleteOffsets(group, Collections.singletonList(topic));
+Entry> res = 
consumerGroupService.deleteOffsets(group, Collections.singletonList(topic));
 assertEquals(Errors.GROUP_ID_NOT_FOUND, res.getKey());
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void 
testDeleteOffsetsOfStableConsumerGroupWithTopicPartition(String quorum) {
-testWithStableConsumerGroup(TOPIC, 0, 0, 
Errors.GROUP_SUBSCRIBED_TO_TOPIC);
+@ClusterTest
+public void testDeleteOffsetsOfStableConsumerGroupWithTopicPartition() {
+createTopic(TOPIC);
+Properties consumerProperties = new Properties();
+testWithConsumerGroup(TOPIC, 0, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC, 
true, consumerProperties);
+if (clusterInstance.isKRaftTest()) {
+consumerProperties.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name());
+testWithConsumerGroup(TOPIC, 0, 0, 
Errors.GROUP_SUBSCRIBED_TO_TOPIC, true, consumerProperties);
+}
 }
 
-@ParameterizedTest
-   

Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


FrankYang0529 commented on PR #15679:
URL: https://github.com/apache/kafka/pull/15679#issuecomment-2072251579

   > > I think we can add the new test case in the next PR, and focus on 
migrating to ClusterTestExtensions in this PR.
   > 
   > Please take a look at [#15766 
(comment)](https://github.com/apache/kafka/pull/15766#discussion_r1576009920). 
It needs a bit of refactoring, but it should not be hard work, so please try to 
address it in this PR.
   
   Updated it. Thanks.





Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on PR #15679:
URL: https://github.com/apache/kafka/pull/15679#issuecomment-2072132702

   > I think we can add the new test case in the next PR, and focus on migrating 
to ClusterTestExtensions in this PR.
   
   Please take a look at 
https://github.com/apache/kafka/pull/15766#discussion_r1576009920. It needs 
a bit of refactoring, but it should not be hard work, so please try to address it in 
this PR. 





Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


FrankYang0529 commented on PR #15679:
URL: https://github.com/apache/kafka/pull/15679#issuecomment-2072051627

   > @FrankYang0529 sorry that I checked the PR again and left more comments. 
PTAL
   
   Hi @chia7712, I addressed the last comments. I think we can add the new test case in 
the next PR, and focus on migrating to ClusterTestExtensions in this PR.





Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


FrankYang0529 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1576082586


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -202,7 +238,7 @@ private KafkaProducer 
createProducer(Properties config) {
 }
 
 private Consumer createConsumer(Properties config) {

Review Comment:
   Can we do it in another PR?






Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1576013151


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -171,29 +209,27 @@ private void produceRecord() {
 }
 
 private void withStableConsumerGroup(Runnable body) {
-Consumer consumer = createConsumer(new Properties());
-try {
-TestUtils.subscribeAndWaitForRecords(TOPIC, consumer, 
DEFAULT_MAX_WAIT_MS);
+try (Consumer consumer = createConsumer(new 
Properties());) {

Review Comment:
   please remove `;` 



##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -202,7 +238,7 @@ private KafkaProducer 
createProducer(Properties config) {
 }
 
 private Consumer createConsumer(Properties config) {

Review Comment:
   Could you add tests for `GroupProtocol.CONSUMER`? see comment: 
https://github.com/apache/kafka/pull/15766#discussion_r1576009920



##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -171,29 +209,27 @@ private void produceRecord() {
 }
 
 private void withStableConsumerGroup(Runnable body) {
-Consumer consumer = createConsumer(new Properties());
-try {
-TestUtils.subscribeAndWaitForRecords(TOPIC, consumer, 
DEFAULT_MAX_WAIT_MS);
+try (Consumer consumer = createConsumer(new 
Properties());) {
+consumer.subscribe(Collections.singletonList(TOPIC));
+ConsumerRecords records = 
consumer.poll(Duration.ofMillis(DEFAULT_MAX_WAIT_MS));
+Assertions.assertNotEquals(0, records.count());
 consumer.commitSync();
 body.run();
-} finally {
-Utils.closeQuietly(consumer, "consumer");
 }
 }
 
 private void withEmptyConsumerGroup(Runnable body) {

Review Comment:
   Can we merge `withStableConsumerGroup` and `withEmptyConsumerGroup`?
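
One way the merge could look, sketched with a stand-in `FakeConsumer` instead of a real Kafka consumer so the ordering is easy to see. All names here are hypothetical, and the `isStable` flag is an assumption about how the two helpers differ. For a stable group the body runs while the member is still open; for an empty group it runs only after the consumer has closed:

```java
import java.util.ArrayList;
import java.util.List;

public class WithConsumerGroupSketch {
    // Records the order of events so the two modes can be compared.
    static final List<String> EVENTS = new ArrayList<>();

    // Stand-in for a Kafka consumer; it only records when it is closed.
    static class FakeConsumer implements AutoCloseable {
        @Override
        public void close() {
            EVENTS.add("close");
        }
    }

    // isStable = true: body runs while the member is still in the group.
    // isStable = false: body runs after the consumer has closed, so the group is empty.
    static void withConsumerGroup(Runnable body, boolean isStable) {
        try (FakeConsumer consumer = new FakeConsumer()) {
            EVENTS.add("commit"); // placeholder for subscribe, poll and commitSync
            if (isStable) {
                body.run();
            }
        }
        if (!isStable) {
            body.run();
        }
    }
}
```

The single helper keeps the subscribe-and-commit steps in one place, and the flag only decides on which side of the `close()` call the body lands.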






Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-22 Thread via GitHub


FrankYang0529 commented on PR #15679:
URL: https://github.com/apache/kafka/pull/15679#issuecomment-2069788585

   > @FrankYang0529 thanks for the updated PR. Two minor comments left. PTAL
   
   Hi @chia7712, I addressed the last comments. Thanks for the review.





Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-22 Thread via GitHub


chia7712 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1574810225


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -42,75 +53,98 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
-public class DeleteOffsetsConsumerGroupCommandIntegrationTest extends 
ConsumerGroupCommandTest {
-String[] getArgs(String group, String topic) {
-return new String[] {
-"--bootstrap-server", bootstrapServers(listenerName()),
-"--delete-offsets",
-"--group", group,
-"--topic", topic
-};
+@Tag("integration")
+@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3, serverProperties = {

Review Comment:
   Could we reduce the number of replicas instead of increasing the number of brokers? 
I am trying to keep our CI from getting burned :)



##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -213,4 +246,9 @@ private Consumer createConsumer(Properties 
config) {
 
 return new KafkaConsumer<>(config);
 }
+
+private void createTopic(String topic) {
+Assertions.assertDoesNotThrow(() ->
+
clusterInstance.createAdminClient().createTopics(Collections.singletonList(new 
NewTopic(topic, 1, (short) 1))).topicId(topic).get());

Review Comment:
   Could we use try-with-resources to close `Admin`? Currently we don't guarantee that 
`createAdminClient` will auto-release the admin. I have filed 
https://issues.apache.org/jira/browse/KAFKA-16589 to discuss it. In any case, we 
should always close it.
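
A small sketch of the pattern being requested, using a stand-in `FakeAdmin` rather than the real client (Kafka's `Admin` interface is also `AutoCloseable`, so the same `try` shape applies). The class and method names are hypothetical. The point is that `close()` runs whether or not the topic creation throws:

```java
public class CloseAdminSketch {
    // Stand-in for an admin client; it only tracks whether close() was called.
    static class FakeAdmin implements AutoCloseable {
        boolean closed = false;

        void createTopic(String topic) {
            if (topic.isEmpty()) {
                throw new IllegalArgumentException("topic name must not be empty");
            }
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // Returns the admin only so the caller can inspect its state afterwards.
    static FakeAdmin createTopic(String topic) {
        FakeAdmin created = new FakeAdmin();
        try (FakeAdmin admin = created) {
            admin.createTopic(topic);
        } catch (IllegalArgumentException e) {
            // Swallowed only to keep the sketch short; a test would let it fail.
        }
        return created;
    }
}
```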






Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-21 Thread via GitHub


FrankYang0529 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1574007862


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -173,7 +208,11 @@ private void produceRecord() {
 private void withStableConsumerGroup(Runnable body) {
 Consumer consumer = createConsumer(new Properties());
 try {
-TestUtils.subscribeAndWaitForRecords(TOPIC, consumer, 
DEFAULT_MAX_WAIT_MS);
+consumer.subscribe(Collections.singletonList(TOPIC));
+ConsumerRecords records = 
consumer.poll(Duration.ofMillis(DEFAULT_MAX_WAIT_MS));
+if (records.isEmpty()) {

Review Comment:
   Hi @chia7712, thanks for the review. I have addressed all of the following comments.






Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-21 Thread via GitHub


chia7712 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1573834240


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -173,7 +208,11 @@ private void produceRecord() {
 private void withStableConsumerGroup(Runnable body) {
 Consumer consumer = createConsumer(new Properties());
 try {
-TestUtils.subscribeAndWaitForRecords(TOPIC, consumer, 
DEFAULT_MAX_WAIT_MS);
+consumer.subscribe(Collections.singletonList(TOPIC));
+ConsumerRecords records = 
consumer.poll(Duration.ofMillis(DEFAULT_MAX_WAIT_MS));
+if (records.isEmpty()) {

Review Comment:
   Could we rewrite it as `assertNotEquals(0, records.count())`?



##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -184,7 +223,11 @@ private void withStableConsumerGroup(Runnable body) {
 private void withEmptyConsumerGroup(Runnable body) {
 Consumer consumer = createConsumer(new Properties());

Review Comment:
   Please use try-with-resources.



##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -173,7 +208,11 @@ private void produceRecord() {
 private void withStableConsumerGroup(Runnable body) {
 Consumer consumer = createConsumer(new Properties());

Review Comment:
   Please use try-with-resources.






Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-21 Thread via GitHub


chia7712 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1573832025


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -16,7 +16,15 @@
  */
 package org.apache.kafka.tools.consumer.group;
 
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
 import kafka.utils.TestUtils;

Review Comment:
   > I will create another minor PR to refactor it. Thanks.
   
   nice!!!






Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-21 Thread via GitHub


FrankYang0529 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1573795818


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -16,7 +16,15 @@
  */
 package org.apache.kafka.tools.consumer.group;
 
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
 import kafka.utils.TestUtils;

Review Comment:
   It looks like the only place using `subscribeAndWaitForRecords` will be 
`PlaintextAdminIntegrationTest` after this PR is merged. I will create another 
minor PR to refactor it. Thanks. 






Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-20 Thread via GitHub


FrankYang0529 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1573623165


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -16,7 +16,15 @@
  */
 package org.apache.kafka.tools.consumer.group;
 
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
 import kafka.utils.TestUtils;

Review Comment:
   Hi @chia7712, I removed TestUtils in 
`DeleteOffsetsConsumerGroupCommandIntegrationTest`. Thank you.






Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-19 Thread via GitHub


chia7712 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1572912365


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -16,7 +16,15 @@
  */
 package org.apache.kafka.tools.consumer.group;
 
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
 import kafka.utils.TestUtils;

Review Comment:
   Could you remove the usage of `TestUtils`? The method 
`subscribeAndWaitForRecords` can be rewritten easily, as it seems to only poll 
records.
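
For illustration, a polling helper of this kind can be sketched without any Kafka types. `pollUntilNonEmpty` is a hypothetical name, and the `Supplier` stands in for a call such as `consumer.poll(...)`; this is not the actual `TestUtils` implementation. It keeps polling until something arrives or the wait budget is spent:

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

public class PollUntilRecordsSketch {
    // Hypothetical helper: calls poll repeatedly until it returns a non-empty
    // batch or maxWaitMs has elapsed. poll is always called at least once.
    static <T> List<T> pollUntilNonEmpty(Supplier<List<T>> poll, long maxWaitMs) {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        do {
            List<T> records = poll.get();
            if (!records.isEmpty()) {
                return records;
            }
        } while (System.currentTimeMillis() < deadline);
        // Timed out without receiving anything.
        return Collections.emptyList();
    }
}
```

A test would then assert that the returned list is not empty, which is the check the later revisions of this PR express with `assertNotEquals(0, records.count())`.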






Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-19 Thread via GitHub


chia7712 commented on PR #15679:
URL: https://github.com/apache/kafka/pull/15679#issuecomment-2065947030

   > Do you think that we should revert unstable.api.versions.enable change and 
try again? Thanks.
   
   Yep 





Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-18 Thread via GitHub


FrankYang0529 commented on PR #15679:
URL: https://github.com/apache/kafka/pull/15679#issuecomment-2065616140

   > > @FrankYang0529 Could you reduce the partition number of the offsets topic? 
It seems the timeout is caused by the coordinator waiting for the offset 
partition, and our CI could be too busy to complete the assignments.
   > 
   > Hi @chia7712, thanks for the suggestion. I have set 
`offsets.topic.num.partitions` to `1` in `ClusterTestDefaults`. Hope it works 
fine.
   
   Hi @chia7712, setting `offsets.topic.num.partitions` to `1` works! Do you 
think that we should revert the `unstable.api.versions.enable` change and try 
again? Thanks.





Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-18 Thread via GitHub


FrankYang0529 commented on PR #15679:
URL: https://github.com/apache/kafka/pull/15679#issuecomment-2064035886

   > @FrankYang0529 Could you reduce the partition number of the offsets topic? It 
seems the timeout is caused by the coordinator waiting for the offset 
partition, and our CI could be too busy to complete the assignments.
   
   Hi @chia7712, thanks for the suggestion. I have set 
`offsets.topic.num.partitions` to `1` in `ClusterTestDefaults`. Hope it works 
fine.





Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-17 Thread via GitHub


chia7712 commented on PR #15679:
URL: https://github.com/apache/kafka/pull/15679#issuecomment-2061751961

   @FrankYang0529 Could you reduce the partition number of the offsets topic? It 
seems the timeout is caused by the coordinator waiting for the offset 
partition, and our CI could be too busy to complete the assignments.





Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-09 Thread via GitHub


chia7712 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1557557116


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -213,4 +234,8 @@ private Consumer createConsumer(Properties config) {
 
         return new KafkaConsumer<>(config);
     }
+
+    private void createTopic(String topic) {
+        TestUtils.createTopicWithAdminRaw(clusterInstance.createAdminClient(), topic, 1, 1, scala.collection.immutable.Map$.MODULE$.empty(), new Properties());

Review Comment:
   Can we avoid using Scala code here?



##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -42,19 +48,38 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
-public class DeleteOffsetsConsumerGroupCommandIntegrationTest extends ConsumerGroupCommandTest {
+@Tag("integration")
+@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3)
+@ExtendWith(ClusterTestExtensions.class)
+public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
+    private final ClusterInstance clusterInstance;
+    public static final String TOPIC = "foo";
+    public static final String GROUP = "test.group";
+
+    DeleteOffsetsConsumerGroupCommandIntegrationTest(ClusterInstance clusterInstance) { // Constructor injections
+        this.clusterInstance = clusterInstance;
+    }
+
     String[] getArgs(String group, String topic) {
         return new String[] {
-            "--bootstrap-server", bootstrapServers(listenerName()),
+            "--bootstrap-server", clusterInstance.bootstrapServers(),
             "--delete-offsets",
             "--group", group,
             "--topic", topic
         };
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDeleteOffsetsNonExistingGroup(String quorum) {
+    ConsumerGroupCommand.ConsumerGroupService getConsumerGroupService(String[] args) {
+        ConsumerGroupCommandOptions opts = ConsumerGroupCommandOptions.fromArgs(args);
+
+        return new ConsumerGroupCommand.ConsumerGroupService(
+            opts,
+            Collections.singletonMap(AdminClientConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE))
+        );
+    }
+
+    @ClusterTest
+    public void testDeleteOffsetsNonExistingGroup() {
         String group = "missing.group";
         String topic = "foo:1";
         ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(getArgs(group, topic));

Review Comment:
   Could you make sure all services get closed?
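
   One common way to guarantee this in Java is try-with-resources. The sketch below is only an illustration: `StubService` is a hypothetical stand-in for `ConsumerGroupCommand.ConsumerGroupService`, and it assumes the real service can be used as an `AutoCloseable`. If it cannot, the same guarantee can be had with an explicit `close()` in a `finally` block.

```java
import java.util.ArrayList;
import java.util.List;

public class CloseServicesSketch {
    // Hypothetical stand-in for the real service; records which instances were closed.
    static final class StubService implements AutoCloseable {
        static final List<String> closed = new ArrayList<>();
        private final String name;

        StubService(String name) {
            this.name = name;
        }

        String describe() {
            return "service:" + name;
        }

        @Override
        public void close() {
            closed.add(name);
        }
    }

    // The service is closed when the try block exits, whether it returns normally or throws.
    static String runWithService(String name) {
        try (StubService service = new StubService(name)) {
            return service.describe();
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithService("missing.group"));
        System.out.println(StubService.closed);
    }
}
```

   Scoping each service to a try block inside the test method keeps the cleanup next to the creation, so an assertion failure in the middle of a test does not leak an admin client.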






Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-09 Thread via GitHub


FrankYang0529 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1557490373


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -42,19 +48,40 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
-public class DeleteOffsetsConsumerGroupCommandIntegrationTest extends ConsumerGroupCommandTest {
+@Tag("integration")
+@ExtendWith(ClusterTestExtensions.class)
+public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
+    private final ClusterInstance clusterInstance;
+    public static final String TOPIC = "foo";
+    public static final String GROUP = "test.group";
+
+    DeleteOffsetsConsumerGroupCommandIntegrationTest(ClusterInstance clusterInstance) { // Constructor injections
+        this.clusterInstance = clusterInstance;
+    }
+
     String[] getArgs(String group, String topic) {
         return new String[] {
-            "--bootstrap-server", bootstrapServers(listenerName()),
+            "--bootstrap-server", clusterInstance.bootstrapServers(),
             "--delete-offsets",
             "--group", group,
             "--topic", topic
         };
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDeleteOffsetsNonExistingGroup(String quorum) {
+    ConsumerGroupCommand.ConsumerGroupService getConsumerGroupService(String[] args) {
+        ConsumerGroupCommandOptions opts = ConsumerGroupCommandOptions.fromArgs(args);
+
+        return new ConsumerGroupCommand.ConsumerGroupService(
+            opts,
+            Collections.singletonMap(AdminClientConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE))
+        );
+    }
+
+    @ClusterTests({
+        @ClusterTest(clusterType = Type.ZK),

Review Comment:
   Yes, updated it. Thank you.






Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-08 Thread via GitHub


chia7712 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1556382498


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -42,19 +48,40 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
-public class DeleteOffsetsConsumerGroupCommandIntegrationTest extends ConsumerGroupCommandTest {
+@Tag("integration")
+@ExtendWith(ClusterTestExtensions.class)
+public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
+    private final ClusterInstance clusterInstance;
+    public static final String TOPIC = "foo";
+    public static final String GROUP = "test.group";
+
+    DeleteOffsetsConsumerGroupCommandIntegrationTest(ClusterInstance clusterInstance) { // Constructor injections
+        this.clusterInstance = clusterInstance;
+    }
+
     String[] getArgs(String group, String topic) {
         return new String[] {
-            "--bootstrap-server", bootstrapServers(listenerName()),
+            "--bootstrap-server", clusterInstance.bootstrapServers(),
             "--delete-offsets",
             "--group", group,
             "--topic", topic
         };
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDeleteOffsetsNonExistingGroup(String quorum) {
+    ConsumerGroupCommand.ConsumerGroupService getConsumerGroupService(String[] args) {
+        ConsumerGroupCommandOptions opts = ConsumerGroupCommandOptions.fromArgs(args);
+
+        return new ConsumerGroupCommand.ConsumerGroupService(
+            opts,
+            Collections.singletonMap(AdminClientConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE))
+        );
+    }
+
+    @ClusterTests({
+        @ClusterTest(clusterType = Type.ZK),

Review Comment:
   Could we define `ClusterTestDefaults` at class-level?






[PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-08 Thread via GitHub


FrankYang0529 opened a new pull request, #15679:
URL: https://github.com/apache/kafka/pull/15679

   By using `ClusterTestExtensions`, 
`DeleteOffsetsConsumerGroupCommandIntegrationTest` no longer depends on 
`KafkaServerTestHarness`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

