Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-05-30 Thread via GitHub


chia7712 merged PR #15779:
URL: https://github.com/apache/kafka/pull/15779


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-05-23 Thread via GitHub


chia7712 commented on PR #15779:
URL: https://github.com/apache/kafka/pull/15779#issuecomment-2127033316

   It seems something has failed. Let's wait for 
https://issues.apache.org/jira/browse/KAFKA-16828





Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-05-22 Thread via GitHub


m1a2st commented on code in PR #15779:
URL: https://github.com/apache/kafka/pull/15779#discussion_r1610809626


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java:
##
@@ -62,506 +86,756 @@
  * - scope=topics+partitions, scenario=to-earliest
  * - export/import
  */
-public class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
-private String[] basicArgs() {
+@ExtendWith(value = ClusterTestExtensions.class)
+public class ResetConsumerGroupOffsetTest {
+
+private static final String TOPIC_PREFIX = "foo-";
+private static final String GROUP_PREFIX = "test.group-";
+
+private static void generator(ClusterGenerator clusterGenerator) {

Review Comment:
   OK, I have rebased it.






Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-05-22 Thread via GitHub


chia7712 commented on code in PR #15779:
URL: https://github.com/apache/kafka/pull/15779#discussion_r1609776281


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java:
##
@@ -62,506 +86,756 @@
  * - scope=topics+partitions, scenario=to-earliest
  * - export/import
  */
-public class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
-private String[] basicArgs() {
+@ExtendWith(value = ClusterTestExtensions.class)
+public class ResetConsumerGroupOffsetTest {
+
+private static final String TOPIC_PREFIX = "foo-";
+private static final String GROUP_PREFIX = "test.group-";
+
+private static void generator(ClusterGenerator clusterGenerator) {

Review Comment:
   Please rebase the code to fix this.






Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-05-20 Thread via GitHub


m1a2st commented on PR #15779:
URL: https://github.com/apache/kafka/pull/15779#issuecomment-2120776989

   @chia7712, please review. Thanks for your comments.





Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-05-19 Thread via GitHub


chia7712 commented on code in PR #15779:
URL: https://github.com/apache/kafka/pull/15779#discussion_r1606053454


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java:
##
@@ -62,506 +86,764 @@
  * - scope=topics+partitions, scenario=to-earliest
  * - export/import
  */
-public class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
-private String[] basicArgs() {
+@ExtendWith(value = ClusterTestExtensions.class)
+public class ResetConsumerGroupOffsetTest {
+
+private static final String TOPIC_PREFIX = "foo-";
+private static final String GROUP_PREFIX = "test.group-";
+
+private static void generator(ClusterGenerator clusterGenerator) {
+ConsumerGroupCommandTestUtils.generator(clusterGenerator);
+}
+
+private String[] basicArgs(ClusterInstance cluster) {
 return new String[]{"--reset-offsets",
-"--bootstrap-server", bootstrapServers(listenerName()),
+"--bootstrap-server", cluster.bootstrapServers(),
 "--timeout", Long.toString(DEFAULT_MAX_WAIT_MS)};
 }
 
-private String[] buildArgsForGroups(List groups, String...args) {
-List res = new ArrayList<>(Arrays.asList(basicArgs()));
+private String[] buildArgsForGroups(ClusterInstance cluster, List 
groups, String... args) {
+List res = new ArrayList<>(asList(basicArgs(cluster)));
 for (String group : groups) {
 res.add("--group");
 res.add(group);
 }
-res.addAll(Arrays.asList(args));
+res.addAll(asList(args));
 return res.toArray(new String[0]);
 }
 
-private String[] buildArgsForGroup(String group, String...args) {
-return buildArgsForGroups(Collections.singletonList(group), args);
+private String[] buildArgsForGroup(ClusterInstance cluster, String group, 
String... args) {
+return buildArgsForGroups(cluster, singletonList(group), args);
 }
 
-private String[] buildArgsForAllGroups(String...args) {
-List res = new ArrayList<>(Arrays.asList(basicArgs()));
+private String[] buildArgsForAllGroups(ClusterInstance cluster, String... 
args) {
+List res = new ArrayList<>(asList(basicArgs(cluster)));
 res.add("--all-groups");
-res.addAll(Arrays.asList(args));
+res.addAll(asList(args));
 return res.toArray(new String[0]);
 }
 
-@Test
-public void testResetOffsetsNotExistingGroup() throws Exception {
+@ClusterTemplate("generator")
+public void testResetOffsetsNotExistingGroup(ClusterInstance cluster) 
throws Exception {
+String topic = generateRandomTopic();
 String group = "missing.group";
-String[] args = buildArgsForGroup(group, "--all-topics", 
"--to-current", "--execute");
-ConsumerGroupCommand.ConsumerGroupService consumerGroupCommand = 
getConsumerGroupService(args);
-// Make sure we got a coordinator
-TestUtils.waitForCondition(
-() -> 
Objects.equals(consumerGroupCommand.collectGroupState(group).coordinator.host(),
 "localhost"),
-"Can't find a coordinator");
-Map resetOffsets = 
consumerGroupCommand.resetOffsets().get(group);
-assertTrue(resetOffsets.isEmpty());
-assertTrue(committedOffsets(TOPIC, group).isEmpty());
-}
-
-@Test
-public void testResetOffsetsExistingTopic() {
-String group = "new.group";
-String[] args = buildArgsForGroup(group, "--topic", TOPIC, 
"--to-offset", "50");
-produceMessages(TOPIC, 100);
-resetAndAssertOffsets(args, 50, true, 
Collections.singletonList(TOPIC));
-resetAndAssertOffsets(addTo(args, "--dry-run"), 50, true, 
Collections.singletonList(TOPIC));
-resetAndAssertOffsets(addTo(args, "--execute"), 50, false, 
Collections.singletonList(TOPIC));
+String[] args = buildArgsForGroup(cluster, group, "--all-topics", 
"--to-current", "--execute");
+
+try (ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(args)) {
+// Make sure we got a coordinator
+TestUtils.waitForCondition(
+() -> 
"localhost".equals(service.collectGroupState(group).coordinator.host()),
+"Can't find a coordinator");
+Map resetOffsets = 
service.resetOffsets().get(group);
+assertTrue(resetOffsets.isEmpty());
+assertTrue(committedOffsets(cluster, topic, group).isEmpty());
+}
 }
 
-@Test
-public void testResetOffsetsExistingTopicSelectedGroups() throws Exception 
{
-produceMessages(TOPIC, 100);
-List groups = IntStream.rangeClosed(1, 3).mapToObj(id -> GROUP 
+ id).collect(Collectors.toList());
-for (String group : groups) {
-ConsumerGroupExecutor executor = addConsumerGroupExecutor(1, 
TOPIC, group, GroupProtocol.CLASSIC.name);
-awaitConsumerProgress(TOPIC, group, 

Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-05-16 Thread via GitHub


m1a2st commented on PR #15779:
URL: https://github.com/apache/kafka/pull/15779#issuecomment-2114598318

   @chia7712, thanks for your comments. I have already rebased this code.





Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-05-15 Thread via GitHub


chia7712 commented on PR #15779:
URL: https://github.com/apache/kafka/pull/15779#issuecomment-2113049965

   @m1a2st could you please rebase the code to trigger QA again?





Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-05-15 Thread via GitHub


m1a2st commented on PR #15779:
URL: https://github.com/apache/kafka/pull/15779#issuecomment-2112434755

   @chia7712, thanks for your review. These tests passed.





Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-05-14 Thread via GitHub


chia7712 commented on PR #15779:
URL: https://github.com/apache/kafka/pull/15779#issuecomment-2110457406

   ```
   Build / JDK 17 and Scala 2.13 / testResetOffsetsAllTopicsAllGroups [4] Type=Raft-Combined, Name=consumerGroupCoordinator, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest (1m 13s)
   Build / JDK 17 and Scala 2.13 / testResetOffsetsAllTopicsAllGroups [5] Type=Raft-Isolated, Name=consumerGroupCoordinator, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest (1m 14s)
   Build / JDK 17 and Scala 2.13 / testResetOffsetsAllTopicsAllGroups [4] Type=Raft-Isolated, Name=consumerGroupCoordinator, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest (1m 14s)
   Build / JDK 17 and Scala 2.13 / testResetOffsetsAllTopicsAllGroups [5] Type=Raft-Combined, Name=consumerGroupCoordinator, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest (1m 8s)
   Build / JDK 8 and Scala 2.12 / testResetOffsetsAllTopicsAllGroups [4] Type=Raft-Combined, Name=consumerGroupCoordinator, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest (1m 25s)
   Build / JDK 8 and Scala 2.12 / testResetOffsetsAllTopicsAllGroups [5] Type=Raft-Isolated, Name=consumerGroupCoordinator, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest (1m 30s)
   Build / JDK 8 and Scala 2.12 / testResetOffsetsAllTopicsAllGroups [4] Type=Raft-Isolated, Name=consumerGroupCoordinator, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest (1m 27s)
   Build / JDK 8 and Scala 2.12 / testResetOffsetsAllTopicsAllGroups [5] Type=Raft-Combined, Name=consumerGroupCoordinator, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest (1m 31s)
   Build / JDK 21 and Scala 2.13 / testResetOffsetsAllTopicsAllGroups [4] Type=Raft-Combined, Name=consumerGroupCoordinator, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest (1m 24s)
   Build / JDK 21 and Scala 2.13 / testResetOffsetsAllTopicsAllGroups [5] Type=Raft-Isolated, Name=consumerGroupCoordinator, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest (1m 29s)
   Build / JDK 21 and Scala 2.13 / testResetOffsetsAllTopicsAllGroups [4] Type=Raft-Combined, Name=consumerGroupCoordinator, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest (1m 22s)
   Build / JDK 21 and Scala 2.13 / testResetOffsetsAllTopicsAllGroups [5] Type=Raft-Isolated, Name=consumerGroupCoordinator, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest (1m 29s)
   Build / JDK 11 and Scala 2.13 / testResetOffsetsAllTopicsAllGroups [4] Type=Raft-Combined, Name=consumerGroupCoordinator, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest (1m 29s)
   Build / JDK 11 and Scala 2.13 / testResetOffsetsAllTopicsAllGroups [5] Type=Raft-Isolated, Name=consumerGroupCoordinator, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest (1m 40s)
   Build / JDK 11 and Scala 2.13 / testResetOffsetsAllTopicsAllGroups [4] Type=Raft-Combined, Name=consumerGroupCoordinator, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest (1m 29s)
   Build / JDK 11 and Scala 2.13 / testResetOffsetsAllTopicsAllGroups [5] Type=Raft-Isolated, Name=consumerGroupCoordinator, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest (1m 35s)
   ```
   
   @m1a2st Those tests verify all groups, so looping over `CLASSIC` + `CONSUMER` 
in the same cluster makes them fail. Maybe we ought to remove all groups 
after each test. For instance:
   ```java
   try (Admin admin = cluster.createAdminClient()) {
       admin.deleteConsumerGroups(groups).all().get();
   }
   ```
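
As a rough illustration of the failure mode described above, here is a toy sketch in plain Java. It does not use the Kafka API: `FakeCluster`, `unexpectedGroupsPerIteration`, and the group names are hypothetical stand-ins invented for this example. It only models why an "all groups" check sees leftovers when two protocol iterations share one cluster, and how deleting the groups after each iteration avoids that.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class GroupCleanupSketch {

    /** Toy stand-in for a shared test cluster. This is NOT the Kafka API. */
    static final class FakeCluster {
        private final Set<String> groups = new TreeSet<>();

        void createGroup(String name) {
            groups.add(name);
        }

        void deleteGroups(List<String> names) {
            groups.removeAll(names);
        }

        Set<String> listAllGroups() {
            return new TreeSet<>(groups);
        }
    }

    /**
     * Runs one "all groups" scenario per protocol against the same cluster.
     * For each iteration, records the groups that an all-groups check would
     * see but that the iteration did not create itself.
     */
    static List<Set<String>> unexpectedGroupsPerIteration(boolean cleanUp) {
        FakeCluster cluster = new FakeCluster();
        List<Set<String>> unexpected = new ArrayList<>();
        for (String protocol : Arrays.asList("classic", "consumer")) {
            // Hypothetical group name; one group per protocol iteration.
            List<String> created = Collections.singletonList("test.group-" + protocol);
            created.forEach(cluster::createGroup);

            // Everything visible minus what this iteration created.
            Set<String> seen = cluster.listAllGroups();
            seen.removeAll(created);
            unexpected.add(seen);

            if (cleanUp) {
                // Mirrors the idea of deleting groups at the end of the test.
                cluster.deleteGroups(created);
            }
        }
        return unexpected;
    }

    public static void main(String[] args) {
        System.out.println("without cleanup: " + unexpectedGroupsPerIteration(false));
        System.out.println("with cleanup:    " + unexpectedGroupsPerIteration(true));
    }
}
```

Without cleanup, the second iteration still sees the group left behind by the first one; with cleanup, each iteration only sees its own groups. In the real test, the deletion would go through the admin client as in the snippet above.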





Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-05-13 Thread via GitHub


m1a2st commented on PR #15779:
URL: https://github.com/apache/kafka/pull/15779#issuecomment-2108026784

   @chia7712, thanks for your review. I have made the changes according to your 
comments.





Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-05-12 Thread via GitHub


m1a2st commented on code in PR #15779:
URL: https://github.com/apache/kafka/pull/15779#discussion_r1597629743


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java:
##
@@ -62,506 +91,787 @@
  * - scope=topics+partitions, scenario=to-earliest
  * - export/import
  */
-public class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
-private String[] basicArgs() {
+@ExtendWith(value = ClusterTestExtensions.class)
+public class ResetConsumerGroupOffsetTest {
+
+private static final String TOPIC_PREFIX = "foo-";
+private static final String GROUP_PREFIX = "test.group-";
+
+private static void generator(ClusterGenerator clusterGenerator) {
+ConsumerGroupCommandTestUtils.generator(clusterGenerator);
+}
+
+private String[] basicArgs(ClusterInstance cluster) {
 return new String[]{"--reset-offsets",
-"--bootstrap-server", bootstrapServers(listenerName()),
+"--bootstrap-server", cluster.bootstrapServers(),
 "--timeout", Long.toString(DEFAULT_MAX_WAIT_MS)};
 }
 
-private String[] buildArgsForGroups(List groups, String...args) {
-List res = new ArrayList<>(Arrays.asList(basicArgs()));
+private String[] buildArgsForGroups(ClusterInstance cluster, List 
groups, String... args) {
+List res = new ArrayList<>(asList(basicArgs(cluster)));
 for (String group : groups) {
 res.add("--group");
 res.add(group);
 }
-res.addAll(Arrays.asList(args));
+res.addAll(asList(args));
 return res.toArray(new String[0]);
 }
 
-private String[] buildArgsForGroup(String group, String...args) {
-return buildArgsForGroups(Collections.singletonList(group), args);
+private String[] buildArgsForGroup(ClusterInstance cluster, String group, 
String... args) {
+return buildArgsForGroups(cluster, singletonList(group), args);
 }
 
-private String[] buildArgsForAllGroups(String...args) {
-List res = new ArrayList<>(Arrays.asList(basicArgs()));
+private String[] buildArgsForAllGroups(ClusterInstance cluster, String... 
args) {
+List res = new ArrayList<>(asList(basicArgs(cluster)));
 res.add("--all-groups");
-res.addAll(Arrays.asList(args));
+res.addAll(asList(args));
 return res.toArray(new String[0]);
 }
 
-@Test
-public void testResetOffsetsNotExistingGroup() throws Exception {
-String group = "missing.group";
-String[] args = buildArgsForGroup(group, "--all-topics", 
"--to-current", "--execute");
-ConsumerGroupCommand.ConsumerGroupService consumerGroupCommand = 
getConsumerGroupService(args);
-// Make sure we got a coordinator
-TestUtils.waitForCondition(
-() -> 
Objects.equals(consumerGroupCommand.collectGroupState(group).coordinator.host(),
 "localhost"),
-"Can't find a coordinator");
-Map resetOffsets = 
consumerGroupCommand.resetOffsets().get(group);
-assertTrue(resetOffsets.isEmpty());
-assertTrue(committedOffsets(TOPIC, group).isEmpty());
-}
-
-@Test
-public void testResetOffsetsExistingTopic() {
-String group = "new.group";
-String[] args = buildArgsForGroup(group, "--topic", TOPIC, 
"--to-offset", "50");
-produceMessages(TOPIC, 100);
-resetAndAssertOffsets(args, 50, true, 
Collections.singletonList(TOPIC));
-resetAndAssertOffsets(addTo(args, "--dry-run"), 50, true, 
Collections.singletonList(TOPIC));
-resetAndAssertOffsets(addTo(args, "--execute"), 50, false, 
Collections.singletonList(TOPIC));
-}
-
-@Test
-public void testResetOffsetsExistingTopicSelectedGroups() throws Exception 
{
-produceMessages(TOPIC, 100);
-List groups = IntStream.rangeClosed(1, 3).mapToObj(id -> GROUP 
+ id).collect(Collectors.toList());
-for (String group : groups) {
-ConsumerGroupExecutor executor = addConsumerGroupExecutor(1, 
TOPIC, group, GroupProtocol.CLASSIC.name);
-awaitConsumerProgress(TOPIC, group, 100L);
-executor.shutdown();
+@ClusterTemplate("generator")
+public void testResetOffsetsNotExistingGroup(ClusterInstance cluster) 
throws Exception {
+for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+String topic = generateRandomTopic();
+String group = "missing.group";
+String[] args = buildArgsForGroup(cluster, group, "--all-topics", 
"--to-current", "--execute");
+
+try (ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(args)) {
+// Make sure we got a coordinator
+TestUtils.waitForCondition(
+() -> 
"localhost".equals(service.collectGroupState(group).coordinator.host()),
+"Can't find a coordinator");
+

Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-05-12 Thread via GitHub


FrankYang0529 commented on code in PR #15779:
URL: https://github.com/apache/kafka/pull/15779#discussion_r1597626926


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java:
##
@@ -62,506 +91,787 @@
  * - scope=topics+partitions, scenario=to-earliest
  * - export/import
  */
-public class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
-private String[] basicArgs() {
+@ExtendWith(value = ClusterTestExtensions.class)
+public class ResetConsumerGroupOffsetTest {
+
+private static final String TOPIC_PREFIX = "foo-";
+private static final String GROUP_PREFIX = "test.group-";
+
+private static void generator(ClusterGenerator clusterGenerator) {
+ConsumerGroupCommandTestUtils.generator(clusterGenerator);
+}
+
+private String[] basicArgs(ClusterInstance cluster) {
 return new String[]{"--reset-offsets",
-"--bootstrap-server", bootstrapServers(listenerName()),
+"--bootstrap-server", cluster.bootstrapServers(),
 "--timeout", Long.toString(DEFAULT_MAX_WAIT_MS)};
 }
 
-private String[] buildArgsForGroups(List groups, String...args) {
-List res = new ArrayList<>(Arrays.asList(basicArgs()));
+private String[] buildArgsForGroups(ClusterInstance cluster, List 
groups, String... args) {
+List res = new ArrayList<>(asList(basicArgs(cluster)));
 for (String group : groups) {
 res.add("--group");
 res.add(group);
 }
-res.addAll(Arrays.asList(args));
+res.addAll(asList(args));
 return res.toArray(new String[0]);
 }
 
-private String[] buildArgsForGroup(String group, String...args) {
-return buildArgsForGroups(Collections.singletonList(group), args);
+private String[] buildArgsForGroup(ClusterInstance cluster, String group, 
String... args) {
+return buildArgsForGroups(cluster, singletonList(group), args);
 }
 
-private String[] buildArgsForAllGroups(String...args) {
-List res = new ArrayList<>(Arrays.asList(basicArgs()));
+private String[] buildArgsForAllGroups(ClusterInstance cluster, String... 
args) {
+List res = new ArrayList<>(asList(basicArgs(cluster)));
 res.add("--all-groups");
-res.addAll(Arrays.asList(args));
+res.addAll(asList(args));
 return res.toArray(new String[0]);
 }
 
-@Test
-public void testResetOffsetsNotExistingGroup() throws Exception {
-String group = "missing.group";
-String[] args = buildArgsForGroup(group, "--all-topics", 
"--to-current", "--execute");
-ConsumerGroupCommand.ConsumerGroupService consumerGroupCommand = 
getConsumerGroupService(args);
-// Make sure we got a coordinator
-TestUtils.waitForCondition(
-() -> 
Objects.equals(consumerGroupCommand.collectGroupState(group).coordinator.host(),
 "localhost"),
-"Can't find a coordinator");
-Map resetOffsets = 
consumerGroupCommand.resetOffsets().get(group);
-assertTrue(resetOffsets.isEmpty());
-assertTrue(committedOffsets(TOPIC, group).isEmpty());
-}
-
-@Test
-public void testResetOffsetsExistingTopic() {
-String group = "new.group";
-String[] args = buildArgsForGroup(group, "--topic", TOPIC, 
"--to-offset", "50");
-produceMessages(TOPIC, 100);
-resetAndAssertOffsets(args, 50, true, 
Collections.singletonList(TOPIC));
-resetAndAssertOffsets(addTo(args, "--dry-run"), 50, true, 
Collections.singletonList(TOPIC));
-resetAndAssertOffsets(addTo(args, "--execute"), 50, false, 
Collections.singletonList(TOPIC));
-}
-
-@Test
-public void testResetOffsetsExistingTopicSelectedGroups() throws Exception 
{
-produceMessages(TOPIC, 100);
-List groups = IntStream.rangeClosed(1, 3).mapToObj(id -> GROUP 
+ id).collect(Collectors.toList());
-for (String group : groups) {
-ConsumerGroupExecutor executor = addConsumerGroupExecutor(1, 
TOPIC, group, GroupProtocol.CLASSIC.name);
-awaitConsumerProgress(TOPIC, group, 100L);
-executor.shutdown();
+@ClusterTemplate("generator")
+public void testResetOffsetsNotExistingGroup(ClusterInstance cluster) 
throws Exception {
+for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+String topic = generateRandomTopic();
+String group = "missing.group";
+String[] args = buildArgsForGroup(cluster, group, "--all-topics", 
"--to-current", "--execute");
+
+try (ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(args)) {
+// Make sure we got a coordinator
+TestUtils.waitForCondition(
+() -> 
"localhost".equals(service.collectGroupState(group).coordinator.host()),
+"Can't find a coordinator");
+ 

Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-05-12 Thread via GitHub


m1a2st commented on code in PR #15779:
URL: https://github.com/apache/kafka/pull/15779#discussion_r1597596564


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java:
##
@@ -62,506 +91,787 @@
  * - scope=topics+partitions, scenario=to-earliest
  * - export/import
  */
-public class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
-private String[] basicArgs() {
+@ExtendWith(value = ClusterTestExtensions.class)
+public class ResetConsumerGroupOffsetTest {
+
+private static final String TOPIC_PREFIX = "foo-";
+private static final String GROUP_PREFIX = "test.group-";
+
+private static void generator(ClusterGenerator clusterGenerator) {
+ConsumerGroupCommandTestUtils.generator(clusterGenerator);
+}
+
+private String[] basicArgs(ClusterInstance cluster) {
 return new String[]{"--reset-offsets",
-"--bootstrap-server", bootstrapServers(listenerName()),
+"--bootstrap-server", cluster.bootstrapServers(),
 "--timeout", Long.toString(DEFAULT_MAX_WAIT_MS)};
 }
 
-private String[] buildArgsForGroups(List groups, String...args) {
-List res = new ArrayList<>(Arrays.asList(basicArgs()));
+private String[] buildArgsForGroups(ClusterInstance cluster, List 
groups, String... args) {
+List res = new ArrayList<>(asList(basicArgs(cluster)));
 for (String group : groups) {
 res.add("--group");
 res.add(group);
 }
-res.addAll(Arrays.asList(args));
+res.addAll(asList(args));
 return res.toArray(new String[0]);
 }
 
-private String[] buildArgsForGroup(String group, String...args) {
-return buildArgsForGroups(Collections.singletonList(group), args);
+private String[] buildArgsForGroup(ClusterInstance cluster, String group, 
String... args) {
+return buildArgsForGroups(cluster, singletonList(group), args);
 }
 
-private String[] buildArgsForAllGroups(String...args) {
-List res = new ArrayList<>(Arrays.asList(basicArgs()));
+private String[] buildArgsForAllGroups(ClusterInstance cluster, String... 
args) {
+List res = new ArrayList<>(asList(basicArgs(cluster)));
 res.add("--all-groups");
-res.addAll(Arrays.asList(args));
+res.addAll(asList(args));
 return res.toArray(new String[0]);
 }
 
-@Test
-public void testResetOffsetsNotExistingGroup() throws Exception {
-String group = "missing.group";
-String[] args = buildArgsForGroup(group, "--all-topics", 
"--to-current", "--execute");
-ConsumerGroupCommand.ConsumerGroupService consumerGroupCommand = 
getConsumerGroupService(args);
-// Make sure we got a coordinator
-TestUtils.waitForCondition(
-() -> 
Objects.equals(consumerGroupCommand.collectGroupState(group).coordinator.host(),
 "localhost"),
-"Can't find a coordinator");
-Map resetOffsets = 
consumerGroupCommand.resetOffsets().get(group);
-assertTrue(resetOffsets.isEmpty());
-assertTrue(committedOffsets(TOPIC, group).isEmpty());
-}
-
-@Test
-public void testResetOffsetsExistingTopic() {
-String group = "new.group";
-String[] args = buildArgsForGroup(group, "--topic", TOPIC, 
"--to-offset", "50");
-produceMessages(TOPIC, 100);
-resetAndAssertOffsets(args, 50, true, 
Collections.singletonList(TOPIC));
-resetAndAssertOffsets(addTo(args, "--dry-run"), 50, true, 
Collections.singletonList(TOPIC));
-resetAndAssertOffsets(addTo(args, "--execute"), 50, false, 
Collections.singletonList(TOPIC));
-}
-
-@Test
-public void testResetOffsetsExistingTopicSelectedGroups() throws Exception 
{
-produceMessages(TOPIC, 100);
-List groups = IntStream.rangeClosed(1, 3).mapToObj(id -> GROUP 
+ id).collect(Collectors.toList());
-for (String group : groups) {
-ConsumerGroupExecutor executor = addConsumerGroupExecutor(1, 
TOPIC, group, GroupProtocol.CLASSIC.name);
-awaitConsumerProgress(TOPIC, group, 100L);
-executor.shutdown();
+@ClusterTemplate("generator")
+public void testResetOffsetsNotExistingGroup(ClusterInstance cluster) 
throws Exception {
+for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+String topic = generateRandomTopic();
+String group = "missing.group";
+String[] args = buildArgsForGroup(cluster, group, "--all-topics", 
"--to-current", "--execute");
+
+try (ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(args)) {
+// Make sure we got a coordinator
+TestUtils.waitForCondition(
+() -> 
"localhost".equals(service.collectGroupState(group).coordinator.host()),
+"Can't find a coordinator");
+

Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-05-12 Thread via GitHub


m1a2st commented on code in PR #15779:
URL: https://github.com/apache/kafka/pull/15779#discussion_r1597596564


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java:
##
@@ -62,506 +91,787 @@
  * - scope=topics+partitions, scenario=to-earliest
  * - export/import
  */
-public class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
-private String[] basicArgs() {
+@ExtendWith(value = ClusterTestExtensions.class)
+public class ResetConsumerGroupOffsetTest {
+
+private static final String TOPIC_PREFIX = "foo-";
+private static final String GROUP_PREFIX = "test.group-";
+
+private static void generator(ClusterGenerator clusterGenerator) {
+ConsumerGroupCommandTestUtils.generator(clusterGenerator);
+}
+
+private String[] basicArgs(ClusterInstance cluster) {
 return new String[]{"--reset-offsets",
-"--bootstrap-server", bootstrapServers(listenerName()),
+"--bootstrap-server", cluster.bootstrapServers(),
 "--timeout", Long.toString(DEFAULT_MAX_WAIT_MS)};
 }
 
-private String[] buildArgsForGroups(List groups, String...args) {
-List res = new ArrayList<>(Arrays.asList(basicArgs()));
+private String[] buildArgsForGroups(ClusterInstance cluster, List 
groups, String... args) {
+List res = new ArrayList<>(asList(basicArgs(cluster)));
 for (String group : groups) {
 res.add("--group");
 res.add(group);
 }
-res.addAll(Arrays.asList(args));
+res.addAll(asList(args));
 return res.toArray(new String[0]);
 }
 
-private String[] buildArgsForGroup(String group, String...args) {
-return buildArgsForGroups(Collections.singletonList(group), args);
+private String[] buildArgsForGroup(ClusterInstance cluster, String group, 
String... args) {
+return buildArgsForGroups(cluster, singletonList(group), args);
 }
 
-private String[] buildArgsForAllGroups(String...args) {
-List res = new ArrayList<>(Arrays.asList(basicArgs()));
+private String[] buildArgsForAllGroups(ClusterInstance cluster, String... 
args) {
+List res = new ArrayList<>(asList(basicArgs(cluster)));
 res.add("--all-groups");
-res.addAll(Arrays.asList(args));
+res.addAll(asList(args));
 return res.toArray(new String[0]);
 }
 
-@Test
-public void testResetOffsetsNotExistingGroup() throws Exception {
-String group = "missing.group";
-String[] args = buildArgsForGroup(group, "--all-topics", "--to-current", "--execute");
-ConsumerGroupCommand.ConsumerGroupService consumerGroupCommand = getConsumerGroupService(args);
-// Make sure we got a coordinator
-TestUtils.waitForCondition(
-() -> Objects.equals(consumerGroupCommand.collectGroupState(group).coordinator.host(), "localhost"),
-"Can't find a coordinator");
-Map<TopicPartition, OffsetAndMetadata> resetOffsets = consumerGroupCommand.resetOffsets().get(group);
-assertTrue(resetOffsets.isEmpty());
-assertTrue(committedOffsets(TOPIC, group).isEmpty());
-}
-
-@Test
-public void testResetOffsetsExistingTopic() {
-String group = "new.group";
-String[] args = buildArgsForGroup(group, "--topic", TOPIC, "--to-offset", "50");
-produceMessages(TOPIC, 100);
-resetAndAssertOffsets(args, 50, true, Collections.singletonList(TOPIC));
-resetAndAssertOffsets(addTo(args, "--dry-run"), 50, true, Collections.singletonList(TOPIC));
-resetAndAssertOffsets(addTo(args, "--execute"), 50, false, Collections.singletonList(TOPIC));
-}
-
-@Test
-public void testResetOffsetsExistingTopicSelectedGroups() throws Exception {
-produceMessages(TOPIC, 100);
-List<String> groups = IntStream.rangeClosed(1, 3).mapToObj(id -> GROUP + id).collect(Collectors.toList());
-for (String group : groups) {
-ConsumerGroupExecutor executor = addConsumerGroupExecutor(1, TOPIC, group, GroupProtocol.CLASSIC.name);
-awaitConsumerProgress(TOPIC, group, 100L);
-executor.shutdown();
+@ClusterTemplate("generator")
+public void testResetOffsetsNotExistingGroup(ClusterInstance cluster) throws Exception {
+for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+String topic = generateRandomTopic();
+String group = "missing.group";
+String[] args = buildArgsForGroup(cluster, group, "--all-topics", "--to-current", "--execute");
+
+try (ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(args)) {
+// Make sure we got a coordinator
+TestUtils.waitForCondition(
+() -> "localhost".equals(service.collectGroupState(group).coordinator.host()),
+"Can't find a coordinator");
+
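
For readers following the hunk above, the argument helpers can be tried without a cluster. The sketch below is a standalone approximation, not the PR's code: `bootstrap` stands in for `cluster.bootstrapServers()`, the `15000` timeout is an assumed placeholder for `DEFAULT_MAX_WAIT_MS`, and the class name `ResetArgsSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Standalone approximation of the argument helpers in the hunk above.
final class ResetArgsSketch {
    // Assumed placeholder for DEFAULT_MAX_WAIT_MS.
    static final long ASSUMED_TIMEOUT_MS = 15_000L;

    // bootstrap stands in for cluster.bootstrapServers().
    static String[] basicArgs(String bootstrap) {
        return new String[]{"--reset-offsets",
            "--bootstrap-server", bootstrap,
            "--timeout", Long.toString(ASSUMED_TIMEOUT_MS)};
    }

    // Appends one "--group <name>" pair per group, then the caller's extra flags.
    static String[] buildArgsForGroups(String bootstrap, List<String> groups, String... args) {
        List<String> res = new ArrayList<>(Arrays.asList(basicArgs(bootstrap)));
        for (String group : groups) {
            res.add("--group");
            res.add(group);
        }
        res.addAll(Arrays.asList(args));
        return res.toArray(new String[0]);
    }

    // Single-group convenience wrapper, mirroring buildArgsForGroup in the diff.
    static String[] buildArgsForGroup(String bootstrap, String group, String... args) {
        return buildArgsForGroups(bootstrap, Collections.singletonList(group), args);
    }
}
```

With these assumptions, `buildArgsForGroup("localhost:9092", "missing.group", "--all-topics", "--to-current", "--execute")` yields `--reset-offsets --bootstrap-server localhost:9092 --timeout 15000 --group missing.group --all-topics --to-current --execute`.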

Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-05-12 Thread via GitHub


m1a2st commented on code in PR #15779:
URL: https://github.com/apache/kafka/pull/15779#discussion_r1597596564


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java:
##
@@ -62,506 +91,787 @@

Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-05-11 Thread via GitHub


m1a2st commented on PR #15779:
URL: https://github.com/apache/kafka/pull/15779#issuecomment-2106119689

   @chia7712, thanks for your review. I have modified all tests to cover the 
different `GroupProtocol` types. Please review again.





Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-05-11 Thread via GitHub


chia7712 commented on PR #15779:
URL: https://github.com/apache/kafka/pull/15779#issuecomment-2105943653

   @m1a2st Could you please make sure all available consumer group protocols 
are covered by this test? For example, `produceConsumeAndShutdown` creates a 
consumer only for `classic`.
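
One way to read this request, sketched without any Kafka dependency: build one consumer configuration per group protocol and run the same scenario against each, so that no protocol is silently skipped. The `group.protocol` key and its `classic` / `consumer` values are Kafka consumer settings; the local `Protocol` enum and the `consumerConfigs` helper are hypothetical stand-ins for `GroupProtocol` and the test's own wiring.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

final class GroupProtocolSketch {
    // Hypothetical stand-in for Kafka's GroupProtocol enum.
    enum Protocol { CLASSIC, CONSUMER }

    // Builds one consumer config per protocol instead of hard-coding "classic".
    static List<Map<String, String>> consumerConfigs(String bootstrap, String group, List<Protocol> protocols) {
        List<Map<String, String>> configs = new ArrayList<>();
        for (Protocol protocol : protocols) {
            Map<String, String> config = new LinkedHashMap<>();
            config.put("bootstrap.servers", bootstrap);
            config.put("group.id", group);
            config.put("group.protocol", protocol.name().toLowerCase(Locale.ROOT));
            configs.add(config);
        }
        return configs;
    }
}
```

In the real test the list of protocols would come from the cluster under test rather than from a local enum.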
   





Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-05-09 Thread via GitHub


chia7712 commented on code in PR #15779:
URL: https://github.com/apache/kafka/pull/15779#discussion_r1596279690


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java:
##
@@ -415,153 +474,246 @@ public void testResetOffsetsExportImportPlan() throws Exception {
 TopicPartition t1p1 = new TopicPartition(topic1, 1);
 TopicPartition t2p0 = new TopicPartition(topic2, 0);
 TopicPartition t2p1 = new TopicPartition(topic2, 1);
-createTopic(topic1, 2, 1, new Properties(), listenerName(), new Properties());
-createTopic(topic2, 2, 1, new Properties(), listenerName(), new Properties());
+String[] cgcArgs = buildArgsForGroups(cluster, asList(group1, group2),
+"--all-topics", "--to-offset", "2", "--export");
+File file = TestUtils.tempFile("reset", ".csv");
+// Multiple --group's offset import
+String[] cgcArgsExec = buildArgsForGroups(cluster, asList(group1, group2),
+"--all-topics",
+"--from-file", file.getCanonicalPath(), "--dry-run");
+// Single --group offset import using "group,topic,partition,offset" csv format
+String[] cgcArgsExec2 = buildArgsForGroup(cluster, group1, "--all-topics",
+"--from-file", file.getCanonicalPath(), "--dry-run");
 
-String[] cgcArgs = buildArgsForGroups(Arrays.asList(group1, group2), "--all-topics", "--to-offset", "2", "--export");
-ConsumerGroupCommand.ConsumerGroupService consumerGroupCommand = getConsumerGroupService(cgcArgs);
+try (Admin admin = cluster.createAdminClient();
+ ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
+ BufferedWriter bw = new BufferedWriter(new FileWriter(file));
+ ConsumerGroupCommand.ConsumerGroupService serviceExec = getConsumerGroupService(cgcArgsExec);
+ ConsumerGroupCommand.ConsumerGroupService serviceExec2 = getConsumerGroupService(cgcArgsExec2)) {
 
-produceConsumeAndShutdown(topic1, group1, 100, 1);
-produceConsumeAndShutdown(topic2, group2, 100, 1);
+admin.createTopics(asList(new NewTopic(topic1, 2, (short) 1),
+new NewTopic(topic2, 2, (short) 1))).all().get();
 
-awaitConsumerGroupInactive(consumerGroupCommand, group1);
-awaitConsumerGroupInactive(consumerGroupCommand, group2);
+produceConsumeAndShutdown(cluster, topic1, group1, 1);
+produceConsumeAndShutdown(cluster, topic2, group2, 1);
 
-File file = TestUtils.tempFile("reset", ".csv");
+awaitConsumerGroupInactive(service, group1);
+awaitConsumerGroupInactive(service, group2);
 
-Map<String, Map<TopicPartition, OffsetAndMetadata>> exportedOffsets = consumerGroupCommand.resetOffsets();
-BufferedWriter bw = new BufferedWriter(new FileWriter(file));
-bw.write(consumerGroupCommand.exportOffsetsToCsv(exportedOffsets));
-bw.close();
-Map<TopicPartition, Long> exp1 = new HashMap<>();
-exp1.put(t1p0, 2L);
-exp1.put(t1p1, 2L);
-Map<TopicPartition, Long> exp2 = new HashMap<>();
-exp2.put(t2p0, 2L);
-exp2.put(t2p1, 2L);
+Map<String, Map<TopicPartition, OffsetAndMetadata>> exportedOffsets = service.resetOffsets();
+bw.write(service.exportOffsetsToCsv(exportedOffsets));
+Map<TopicPartition, Long> exp1 = new HashMap<>();
+exp1.put(t1p0, 2L);
+exp1.put(t1p1, 2L);
+Map<TopicPartition, Long> exp2 = new HashMap<>();
+exp2.put(t2p0, 2L);
+exp2.put(t2p1, 2L);
 
-assertEquals(exp1, toOffsetMap(exportedOffsets.get(group1)));
-assertEquals(exp2, toOffsetMap(exportedOffsets.get(group2)));
+assertEquals(exp1, toOffsetMap(exportedOffsets.get(group1)));
+assertEquals(exp2, toOffsetMap(exportedOffsets.get(group2)));
 
-// Multiple --group's offset import
-String[] cgcArgsExec = buildArgsForGroups(Arrays.asList(group1, group2), "--all-topics", "--from-file", file.getCanonicalPath(), "--dry-run");
-ConsumerGroupCommand.ConsumerGroupService consumerGroupCommandExec = getConsumerGroupService(cgcArgsExec);
-Map<String, Map<TopicPartition, OffsetAndMetadata>> importedOffsets = consumerGroupCommandExec.resetOffsets();
-assertEquals(exp1, toOffsetMap(importedOffsets.get(group1)));
-assertEquals(exp2, toOffsetMap(importedOffsets.get(group2)));
+Map<String, Map<TopicPartition, OffsetAndMetadata>> importedOffsets = serviceExec.resetOffsets();
+assertEquals(exp1, toOffsetMap(importedOffsets.get(group1)));
+assertEquals(exp2, toOffsetMap(importedOffsets.get(group2)));
 
-// Single --group offset import using "group,topic,partition,offset" csv format
-String[] cgcArgsExec2 = buildArgsForGroup(group1, "--all-topics", "--from-file", file.getCanonicalPath(), "--dry-run");
-ConsumerGroupCommand.ConsumerGroupService consumerGroupCommandExec2 = getConsumerGroupService(cgcArgsExec2);
-Map<String, Map<TopicPartition, OffsetAndMetadata>> importedOffsets2 = consumerGroupCommandExec2.resetOffsets();
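
A detail that may be worth double-checking in the new side of this hunk, as far as the visible lines show: the old code called `bw.close()` before the import services read the CSV, while the new code keeps `bw` open in the same try-with-resources and no flush is visible. `BufferedWriter` holds small writes in memory until it is flushed or closed, so a reader of the file can see nothing in the meantime. The sketch below demonstrates only that JDK behaviour in isolation. The class and method names are hypothetical, it uses `Files.newBufferedWriter` with UTF-8 rather than `FileWriter` to keep the byte count deterministic, and it makes no claim about how the PR finally handled this.

```java
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

final class BufferedWriterFlushSketch {
    private static File tempCsv() throws IOException {
        File file = File.createTempFile("reset", ".csv");
        file.deleteOnExit();
        return file;
    }

    // File size seen by a reader while the writer is still open and unflushed.
    static long lengthWhileOpen(String csv) {
        try {
            File file = tempCsv();
            try (BufferedWriter bw = Files.newBufferedWriter(file.toPath(), StandardCharsets.UTF_8)) {
                bw.write(csv);        // a small payload stays in the in-memory buffer
                return file.length(); // evaluated before the implicit close()
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // File size once the writer has been closed, which flushes the buffer.
    static long lengthAfterClose(String csv) {
        try {
            File file = tempCsv();
            try (BufferedWriter bw = Files.newBufferedWriter(file.toPath(), StandardCharsets.UTF_8)) {
                bw.write(csv);
            }
            return file.length();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If the same applies to the test, closing or flushing the writer before `serviceExec.resetOffsets()` would restore the ordering the old code had.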

Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-05-07 Thread via GitHub


chia7712 commented on PR #15779:
URL: https://github.com/apache/kafka/pull/15779#issuecomment-2099697982

   @m1a2st Could you please use `ConsumerGroupCommandTestUtils#generator` to 
rewrite this test? thanks!
   
   
https://github.com/apache/kafka/blob/trunk/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java#L50





Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-05-07 Thread via GitHub


m1a2st commented on PR #15779:
URL: https://github.com/apache/kafka/pull/15779#issuecomment-2098864822

   @chia7712, please take a look at this PR. Thank you.





Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-04-29 Thread via GitHub


m1a2st commented on PR #15779:
URL: https://github.com/apache/kafka/pull/15779#issuecomment-2083099739

   This PR relies on #15766.





Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-04-23 Thread via GitHub


m1a2st commented on PR #15779:
URL: https://github.com/apache/kafka/pull/15779#issuecomment-2073680590

   @lianetm @chia7712  Thanks for your comment.





Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-04-22 Thread via GitHub


chia7712 commented on PR #15779:
URL: https://github.com/apache/kafka/pull/15779#issuecomment-2070801312

   @m1a2st Please add the following server properties:
   ```java
   @ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
       @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
       @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
   })
   ```
   The default replication factor of the offsets topic is 3, but the new test 
infra creates a single broker. Creating the offsets topic therefore fails, and 
you can't find a coordinator because the offsets partition never becomes ready.
   
   Also, please note @lianetm's comment: the bootstrap servers need to be 
updated manually if you kill any broker.
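
On the replication-factor point above, the failure comes down to a simple constraint: a topic cannot be created with more replicas than there are live brokers to host them. A minimal sketch of that check follows, with hypothetical names and no Kafka dependency; the broker's real validation is more involved.

```java
final class OffsetsTopicSketch {
    // Kafka's default for offsets.topic.replication.factor, as stated in the comment above.
    static final int DEFAULT_OFFSETS_REPLICATION_FACTOR = 3;

    // Each replica needs its own broker, so the factor cannot exceed the live broker count.
    static boolean canPlaceReplicas(int replicationFactor, int liveBrokers) {
        return replicationFactor >= 1 && replicationFactor <= liveBrokers;
    }
}
```

With one broker, `canPlaceReplicas(DEFAULT_OFFSETS_REPLICATION_FACTOR, 1)` is false and `canPlaceReplicas(1, 1)` is true, which is why the suggested properties set the factor to 1.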
   





Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-04-22 Thread via GitHub


lianetm commented on PR #15779:
URL: https://github.com/apache/kafka/pull/15779#issuecomment-2070678792

   Hey @m1a2st, `cluster.bootstrapServers()` is the right way to get the 
servers' addresses, so you got that part right. However, I see you're no longer 
inheriting from `ConsumerGroupCommandTest`, which removes the creation of the 
brokers the test runs against (that happens in the base setup 
[here](https://github.com/apache/kafka/blob/59c781415fc37c89aa087d7c2999cec7f82f6188/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala#L121)).
Could that be what you're missing?





Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-04-22 Thread via GitHub


m1a2st commented on PR #15779:
URL: https://github.com/apache/kafka/pull/15779#issuecomment-2069841257

   @chia7712 

