Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
chia7712 merged PR #15779: URL: https://github.com/apache/kafka/pull/15779 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
chia7712 commented on PR #15779: URL: https://github.com/apache/kafka/pull/15779#issuecomment-2127033316 It seems there are something is failed. Let's wait for https://issues.apache.org/jira/browse/KAFKA-16828 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
m1a2st commented on code in PR #15779: URL: https://github.com/apache/kafka/pull/15779#discussion_r1610809626 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java: ## @@ -62,506 +86,756 @@ * - scope=topics+partitions, scenario=to-earliest * - export/import */ -public class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest { -private String[] basicArgs() { +@ExtendWith(value = ClusterTestExtensions.class) +public class ResetConsumerGroupOffsetTest { + +private static final String TOPIC_PREFIX = "foo-"; +private static final String GROUP_PREFIX = "test.group-"; + +private static void generator(ClusterGenerator clusterGenerator) { Review Comment: Ok, I rebase it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
chia7712 commented on code in PR #15779: URL: https://github.com/apache/kafka/pull/15779#discussion_r1609776281 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java: ## @@ -62,506 +86,756 @@ * - scope=topics+partitions, scenario=to-earliest * - export/import */ -public class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest { -private String[] basicArgs() { +@ExtendWith(value = ClusterTestExtensions.class) +public class ResetConsumerGroupOffsetTest { + +private static final String TOPIC_PREFIX = "foo-"; +private static final String GROUP_PREFIX = "test.group-"; + +private static void generator(ClusterGenerator clusterGenerator) { Review Comment: Please rebase code to fix this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
m1a2st commented on PR #15779: URL: https://github.com/apache/kafka/pull/15779#issuecomment-2120776989 @chia7712, Please review, Thanks for your comments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
chia7712 commented on code in PR #15779: URL: https://github.com/apache/kafka/pull/15779#discussion_r1606053454 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java: ## @@ -62,506 +86,764 @@ * - scope=topics+partitions, scenario=to-earliest * - export/import */ -public class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest { -private String[] basicArgs() { +@ExtendWith(value = ClusterTestExtensions.class) +public class ResetConsumerGroupOffsetTest { + +private static final String TOPIC_PREFIX = "foo-"; +private static final String GROUP_PREFIX = "test.group-"; + +private static void generator(ClusterGenerator clusterGenerator) { +ConsumerGroupCommandTestUtils.generator(clusterGenerator); +} + +private String[] basicArgs(ClusterInstance cluster) { return new String[]{"--reset-offsets", -"--bootstrap-server", bootstrapServers(listenerName()), +"--bootstrap-server", cluster.bootstrapServers(), "--timeout", Long.toString(DEFAULT_MAX_WAIT_MS)}; } -private String[] buildArgsForGroups(List groups, String...args) { -List res = new ArrayList<>(Arrays.asList(basicArgs())); +private String[] buildArgsForGroups(ClusterInstance cluster, List groups, String... args) { +List res = new ArrayList<>(asList(basicArgs(cluster))); for (String group : groups) { res.add("--group"); res.add(group); } -res.addAll(Arrays.asList(args)); +res.addAll(asList(args)); return res.toArray(new String[0]); } -private String[] buildArgsForGroup(String group, String...args) { -return buildArgsForGroups(Collections.singletonList(group), args); +private String[] buildArgsForGroup(ClusterInstance cluster, String group, String... args) { +return buildArgsForGroups(cluster, singletonList(group), args); } -private String[] buildArgsForAllGroups(String...args) { -List res = new ArrayList<>(Arrays.asList(basicArgs())); +private String[] buildArgsForAllGroups(ClusterInstance cluster, String... args) { +List res = new ArrayList<>(asList(basicArgs(cluster))); res.add("--all-groups"); -res.addAll(Arrays.asList(args)); +res.addAll(asList(args)); return res.toArray(new String[0]); } -@Test -public void testResetOffsetsNotExistingGroup() throws Exception { +@ClusterTemplate("generator") +public void testResetOffsetsNotExistingGroup(ClusterInstance cluster) throws Exception { +String topic = generateRandomTopic(); String group = "missing.group"; -String[] args = buildArgsForGroup(group, "--all-topics", "--to-current", "--execute"); -ConsumerGroupCommand.ConsumerGroupService consumerGroupCommand = getConsumerGroupService(args); -// Make sure we got a coordinator -TestUtils.waitForCondition( -() -> Objects.equals(consumerGroupCommand.collectGroupState(group).coordinator.host(), "localhost"), -"Can't find a coordinator"); -Map resetOffsets = consumerGroupCommand.resetOffsets().get(group); -assertTrue(resetOffsets.isEmpty()); -assertTrue(committedOffsets(TOPIC, group).isEmpty()); -} - -@Test -public void testResetOffsetsExistingTopic() { -String group = "new.group"; -String[] args = buildArgsForGroup(group, "--topic", TOPIC, "--to-offset", "50"); -produceMessages(TOPIC, 100); -resetAndAssertOffsets(args, 50, true, Collections.singletonList(TOPIC)); -resetAndAssertOffsets(addTo(args, "--dry-run"), 50, true, Collections.singletonList(TOPIC)); -resetAndAssertOffsets(addTo(args, "--execute"), 50, false, Collections.singletonList(TOPIC)); +String[] args = buildArgsForGroup(cluster, group, "--all-topics", "--to-current", "--execute"); + +try (ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(args)) { +// Make sure we got a coordinator +TestUtils.waitForCondition( +() -> "localhost".equals(service.collectGroupState(group).coordinator.host()), +"Can't find a coordinator"); +Map resetOffsets = service.resetOffsets().get(group); +assertTrue(resetOffsets.isEmpty()); +assertTrue(committedOffsets(cluster, topic, group).isEmpty()); +} } -@Test -public void testResetOffsetsExistingTopicSelectedGroups() throws Exception { -produceMessages(TOPIC, 100); -List groups = IntStream.rangeClosed(1, 3).mapToObj(id -> GROUP + id).collect(Collectors.toList()); -for (String group : groups) { -ConsumerGroupExecutor executor = addConsumerGroupExecutor(1, TOPIC, group, GroupProtocol.CLASSIC.name); -awaitConsumerProgress(TOPIC, group, 100L);
Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
m1a2st commented on PR #15779: URL: https://github.com/apache/kafka/pull/15779#issuecomment-2114598318 @chia7712, Thanks for your conversations, I already rebase this code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
chia7712 commented on PR #15779: URL: https://github.com/apache/kafka/pull/15779#issuecomment-2113049965 @m1a2st could you please rebase code to trigger QA again? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
m1a2st commented on PR #15779: URL: https://github.com/apache/kafka/pull/15779#issuecomment-2112434755 @chia7712 , Thanks for your review, these test passed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
chia7712 commented on PR #15779: URL: https://github.com/apache/kafka/pull/15779#issuecomment-2110457406 ``` Build / JDK 17 and Scala 2.13 / testResetOffsetsAllTopicsAllGroups [4] Type=Raft-Combined, Name=consumerGroupCoordinator, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest1m 13s Build / JDK 17 and Scala 2.13 / testResetOffsetsAllTopicsAllGroups [5] Type=Raft-Isolated, Name=consumerGroupCoordinator, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest1m 14s Build / JDK 17 and Scala 2.13 / testResetOffsetsAllTopicsAllGroups [4] Type=Raft-Isolated, Name=consumerGroupCoordinator, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest1m 14s Build / JDK 17 and Scala 2.13 / testResetOffsetsAllTopicsAllGroups [5] Type=Raft-Combined, Name=consumerGroupCoordinator, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest1m 8s Build / JDK 8 and Scala 2.12 / testResetOffsetsAllTopicsAllGroups [4] Type=Raft-Combined, Name=consumerGroupCoordinator, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest1m 25s Build / JDK 8 and Scala 2.12 / testResetOffsetsAllTopicsAllGroups [5] Type=Raft-Isolated, Name=consumerGroupCoordinator, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest1m 30s Build / JDK 8 and Scala 2.12 / testResetOffsetsAllTopicsAllGroups [4] Type=Raft-Isolated, Name=consumerGroupCoordinator, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest1m 27s Build / JDK 8 and Scala 2.12 / testResetOffsetsAllTopicsAllGroups [5] Type=Raft-Combined, Name=consumerGroupCoordinator, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest1m 31s Build / JDK 21 and Scala 2.13 / testResetOffsetsAllTopicsAllGroups [4] Type=Raft-Combined, Name=consumerGroupCoordinator, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest1m 24s Build / JDK 21 and Scala 2.13 / testResetOffsetsAllTopicsAllGroups [5] Type=Raft-Isolated, Name=consumerGroupCoordinator, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest1m 29s Build / JDK 21 and Scala 2.13 / testResetOffsetsAllTopicsAllGroups [4] Type=Raft-Combined, Name=consumerGroupCoordinator, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest1m 22s Build / JDK 21 and Scala 2.13 / testResetOffsetsAllTopicsAllGroups [5] Type=Raft-Isolated, Name=consumerGroupCoordinator, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest1m 29s Build / JDK 11 and Scala 2.13 / testResetOffsetsAllTopicsAllGroups [4] Type=Raft-Combined, Name=consumerGroupCoordinator, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest1m 29s Build / JDK 11 and Scala 2.13 / testResetOffsetsAllTopicsAllGroups [5] Type=Raft-Isolated, Name=consumerGroupCoordinator, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest1m 40s Build / JDK 11 and Scala 2.13 / testResetOffsetsAllTopicsAllGroups [4] Type=Raft-Combined, Name=consumerGroupCoordinator, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest1m 29s Build / JDK 11 and Scala 2.13 / testResetOffsetsAllTopicsAllGroups [5] Type=Raft-Isolated, Name=consumerGroupCoordinator, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.group.ResetConsumerGroupOffsetTest1m 35s ``` @m1a2st Those tests verify all groups so the loop of `CLASSIC` + `CONSUMER` in the same cluster makes they failed. Maybe we ought to remove all groups after test. For instance, ```java try (Admin admin = cluster.createAdminClient()) { admin.deleteConsumerGroups(groups).all().get(); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
m1a2st commented on PR #15779: URL: https://github.com/apache/kafka/pull/15779#issuecomment-2108026784 @chia7712, Thanks for your review, I have been changed according to your conversations. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
m1a2st commented on code in PR #15779: URL: https://github.com/apache/kafka/pull/15779#discussion_r1597629743 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java: ## @@ -62,506 +91,787 @@ * - scope=topics+partitions, scenario=to-earliest * - export/import */ -public class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest { -private String[] basicArgs() { +@ExtendWith(value = ClusterTestExtensions.class) +public class ResetConsumerGroupOffsetTest { + +private static final String TOPIC_PREFIX = "foo-"; +private static final String GROUP_PREFIX = "test.group-"; + +private static void generator(ClusterGenerator clusterGenerator) { +ConsumerGroupCommandTestUtils.generator(clusterGenerator); +} + +private String[] basicArgs(ClusterInstance cluster) { return new String[]{"--reset-offsets", -"--bootstrap-server", bootstrapServers(listenerName()), +"--bootstrap-server", cluster.bootstrapServers(), "--timeout", Long.toString(DEFAULT_MAX_WAIT_MS)}; } -private String[] buildArgsForGroups(List groups, String...args) { -List res = new ArrayList<>(Arrays.asList(basicArgs())); +private String[] buildArgsForGroups(ClusterInstance cluster, List groups, String... args) { +List res = new ArrayList<>(asList(basicArgs(cluster))); for (String group : groups) { res.add("--group"); res.add(group); } -res.addAll(Arrays.asList(args)); +res.addAll(asList(args)); return res.toArray(new String[0]); } -private String[] buildArgsForGroup(String group, String...args) { -return buildArgsForGroups(Collections.singletonList(group), args); +private String[] buildArgsForGroup(ClusterInstance cluster, String group, String... args) { +return buildArgsForGroups(cluster, singletonList(group), args); } -private String[] buildArgsForAllGroups(String...args) { -List res = new ArrayList<>(Arrays.asList(basicArgs())); +private String[] buildArgsForAllGroups(ClusterInstance cluster, String... args) { +List res = new ArrayList<>(asList(basicArgs(cluster))); res.add("--all-groups"); -res.addAll(Arrays.asList(args)); +res.addAll(asList(args)); return res.toArray(new String[0]); } -@Test -public void testResetOffsetsNotExistingGroup() throws Exception { -String group = "missing.group"; -String[] args = buildArgsForGroup(group, "--all-topics", "--to-current", "--execute"); -ConsumerGroupCommand.ConsumerGroupService consumerGroupCommand = getConsumerGroupService(args); -// Make sure we got a coordinator -TestUtils.waitForCondition( -() -> Objects.equals(consumerGroupCommand.collectGroupState(group).coordinator.host(), "localhost"), -"Can't find a coordinator"); -Map resetOffsets = consumerGroupCommand.resetOffsets().get(group); -assertTrue(resetOffsets.isEmpty()); -assertTrue(committedOffsets(TOPIC, group).isEmpty()); -} - -@Test -public void testResetOffsetsExistingTopic() { -String group = "new.group"; -String[] args = buildArgsForGroup(group, "--topic", TOPIC, "--to-offset", "50"); -produceMessages(TOPIC, 100); -resetAndAssertOffsets(args, 50, true, Collections.singletonList(TOPIC)); -resetAndAssertOffsets(addTo(args, "--dry-run"), 50, true, Collections.singletonList(TOPIC)); -resetAndAssertOffsets(addTo(args, "--execute"), 50, false, Collections.singletonList(TOPIC)); -} - -@Test -public void testResetOffsetsExistingTopicSelectedGroups() throws Exception { -produceMessages(TOPIC, 100); -List groups = IntStream.rangeClosed(1, 3).mapToObj(id -> GROUP + id).collect(Collectors.toList()); -for (String group : groups) { -ConsumerGroupExecutor executor = addConsumerGroupExecutor(1, TOPIC, group, GroupProtocol.CLASSIC.name); -awaitConsumerProgress(TOPIC, group, 100L); -executor.shutdown(); +@ClusterTemplate("generator") +public void testResetOffsetsNotExistingGroup(ClusterInstance cluster) throws Exception { +for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) { +String topic = generateRandomTopic(); +String group = "missing.group"; +String[] args = buildArgsForGroup(cluster, group, "--all-topics", "--to-current", "--execute"); + +try (ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(args)) { +// Make sure we got a coordinator +TestUtils.waitForCondition( +() -> "localhost".equals(service.collectGroupState(group).coordinator.host()), +"Can't find a coordinator"); +
Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
FrankYang0529 commented on code in PR #15779: URL: https://github.com/apache/kafka/pull/15779#discussion_r1597626926 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java: ## @@ -62,506 +91,787 @@ * - scope=topics+partitions, scenario=to-earliest * - export/import */ -public class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest { -private String[] basicArgs() { +@ExtendWith(value = ClusterTestExtensions.class) +public class ResetConsumerGroupOffsetTest { + +private static final String TOPIC_PREFIX = "foo-"; +private static final String GROUP_PREFIX = "test.group-"; + +private static void generator(ClusterGenerator clusterGenerator) { +ConsumerGroupCommandTestUtils.generator(clusterGenerator); +} + +private String[] basicArgs(ClusterInstance cluster) { return new String[]{"--reset-offsets", -"--bootstrap-server", bootstrapServers(listenerName()), +"--bootstrap-server", cluster.bootstrapServers(), "--timeout", Long.toString(DEFAULT_MAX_WAIT_MS)}; } -private String[] buildArgsForGroups(List groups, String...args) { -List res = new ArrayList<>(Arrays.asList(basicArgs())); +private String[] buildArgsForGroups(ClusterInstance cluster, List groups, String... args) { +List res = new ArrayList<>(asList(basicArgs(cluster))); for (String group : groups) { res.add("--group"); res.add(group); } -res.addAll(Arrays.asList(args)); +res.addAll(asList(args)); return res.toArray(new String[0]); } -private String[] buildArgsForGroup(String group, String...args) { -return buildArgsForGroups(Collections.singletonList(group), args); +private String[] buildArgsForGroup(ClusterInstance cluster, String group, String... args) { +return buildArgsForGroups(cluster, singletonList(group), args); } -private String[] buildArgsForAllGroups(String...args) { -List res = new ArrayList<>(Arrays.asList(basicArgs())); +private String[] buildArgsForAllGroups(ClusterInstance cluster, String... args) { +List res = new ArrayList<>(asList(basicArgs(cluster))); res.add("--all-groups"); -res.addAll(Arrays.asList(args)); +res.addAll(asList(args)); return res.toArray(new String[0]); } -@Test -public void testResetOffsetsNotExistingGroup() throws Exception { -String group = "missing.group"; -String[] args = buildArgsForGroup(group, "--all-topics", "--to-current", "--execute"); -ConsumerGroupCommand.ConsumerGroupService consumerGroupCommand = getConsumerGroupService(args); -// Make sure we got a coordinator -TestUtils.waitForCondition( -() -> Objects.equals(consumerGroupCommand.collectGroupState(group).coordinator.host(), "localhost"), -"Can't find a coordinator"); -Map resetOffsets = consumerGroupCommand.resetOffsets().get(group); -assertTrue(resetOffsets.isEmpty()); -assertTrue(committedOffsets(TOPIC, group).isEmpty()); -} - -@Test -public void testResetOffsetsExistingTopic() { -String group = "new.group"; -String[] args = buildArgsForGroup(group, "--topic", TOPIC, "--to-offset", "50"); -produceMessages(TOPIC, 100); -resetAndAssertOffsets(args, 50, true, Collections.singletonList(TOPIC)); -resetAndAssertOffsets(addTo(args, "--dry-run"), 50, true, Collections.singletonList(TOPIC)); -resetAndAssertOffsets(addTo(args, "--execute"), 50, false, Collections.singletonList(TOPIC)); -} - -@Test -public void testResetOffsetsExistingTopicSelectedGroups() throws Exception { -produceMessages(TOPIC, 100); -List groups = IntStream.rangeClosed(1, 3).mapToObj(id -> GROUP + id).collect(Collectors.toList()); -for (String group : groups) { -ConsumerGroupExecutor executor = addConsumerGroupExecutor(1, TOPIC, group, GroupProtocol.CLASSIC.name); -awaitConsumerProgress(TOPIC, group, 100L); -executor.shutdown(); +@ClusterTemplate("generator") +public void testResetOffsetsNotExistingGroup(ClusterInstance cluster) throws Exception { +for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) { +String topic = generateRandomTopic(); +String group = "missing.group"; +String[] args = buildArgsForGroup(cluster, group, "--all-topics", "--to-current", "--execute"); + +try (ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(args)) { +// Make sure we got a coordinator +TestUtils.waitForCondition( +() -> "localhost".equals(service.collectGroupState(group).coordinator.host()), +"Can't find a coordinator"); +
Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
m1a2st commented on code in PR #15779: URL: https://github.com/apache/kafka/pull/15779#discussion_r1597596564 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java: ## @@ -62,506 +91,787 @@ * - scope=topics+partitions, scenario=to-earliest * - export/import */ -public class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest { -private String[] basicArgs() { +@ExtendWith(value = ClusterTestExtensions.class) +public class ResetConsumerGroupOffsetTest { + +private static final String TOPIC_PREFIX = "foo-"; +private static final String GROUP_PREFIX = "test.group-"; + +private static void generator(ClusterGenerator clusterGenerator) { +ConsumerGroupCommandTestUtils.generator(clusterGenerator); +} + +private String[] basicArgs(ClusterInstance cluster) { return new String[]{"--reset-offsets", -"--bootstrap-server", bootstrapServers(listenerName()), +"--bootstrap-server", cluster.bootstrapServers(), "--timeout", Long.toString(DEFAULT_MAX_WAIT_MS)}; } -private String[] buildArgsForGroups(List groups, String...args) { -List res = new ArrayList<>(Arrays.asList(basicArgs())); +private String[] buildArgsForGroups(ClusterInstance cluster, List groups, String... args) { +List res = new ArrayList<>(asList(basicArgs(cluster))); for (String group : groups) { res.add("--group"); res.add(group); } -res.addAll(Arrays.asList(args)); +res.addAll(asList(args)); return res.toArray(new String[0]); } -private String[] buildArgsForGroup(String group, String...args) { -return buildArgsForGroups(Collections.singletonList(group), args); +private String[] buildArgsForGroup(ClusterInstance cluster, String group, String... args) { +return buildArgsForGroups(cluster, singletonList(group), args); } -private String[] buildArgsForAllGroups(String...args) { -List res = new ArrayList<>(Arrays.asList(basicArgs())); +private String[] buildArgsForAllGroups(ClusterInstance cluster, String... args) { +List res = new ArrayList<>(asList(basicArgs(cluster))); res.add("--all-groups"); -res.addAll(Arrays.asList(args)); +res.addAll(asList(args)); return res.toArray(new String[0]); } -@Test -public void testResetOffsetsNotExistingGroup() throws Exception { -String group = "missing.group"; -String[] args = buildArgsForGroup(group, "--all-topics", "--to-current", "--execute"); -ConsumerGroupCommand.ConsumerGroupService consumerGroupCommand = getConsumerGroupService(args); -// Make sure we got a coordinator -TestUtils.waitForCondition( -() -> Objects.equals(consumerGroupCommand.collectGroupState(group).coordinator.host(), "localhost"), -"Can't find a coordinator"); -Map resetOffsets = consumerGroupCommand.resetOffsets().get(group); -assertTrue(resetOffsets.isEmpty()); -assertTrue(committedOffsets(TOPIC, group).isEmpty()); -} - -@Test -public void testResetOffsetsExistingTopic() { -String group = "new.group"; -String[] args = buildArgsForGroup(group, "--topic", TOPIC, "--to-offset", "50"); -produceMessages(TOPIC, 100); -resetAndAssertOffsets(args, 50, true, Collections.singletonList(TOPIC)); -resetAndAssertOffsets(addTo(args, "--dry-run"), 50, true, Collections.singletonList(TOPIC)); -resetAndAssertOffsets(addTo(args, "--execute"), 50, false, Collections.singletonList(TOPIC)); -} - -@Test -public void testResetOffsetsExistingTopicSelectedGroups() throws Exception { -produceMessages(TOPIC, 100); -List groups = IntStream.rangeClosed(1, 3).mapToObj(id -> GROUP + id).collect(Collectors.toList()); -for (String group : groups) { -ConsumerGroupExecutor executor = addConsumerGroupExecutor(1, TOPIC, group, GroupProtocol.CLASSIC.name); -awaitConsumerProgress(TOPIC, group, 100L); -executor.shutdown(); +@ClusterTemplate("generator") +public void testResetOffsetsNotExistingGroup(ClusterInstance cluster) throws Exception { +for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) { +String topic = generateRandomTopic(); +String group = "missing.group"; +String[] args = buildArgsForGroup(cluster, group, "--all-topics", "--to-current", "--execute"); + +try (ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(args)) { +// Make sure we got a coordinator +TestUtils.waitForCondition( +() -> "localhost".equals(service.collectGroupState(group).coordinator.host()), +"Can't find a coordinator"); +
Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
m1a2st commented on code in PR #15779: URL: https://github.com/apache/kafka/pull/15779#discussion_r1597596564 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java: ## @@ -62,506 +91,787 @@ * - scope=topics+partitions, scenario=to-earliest * - export/import */ -public class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest { -private String[] basicArgs() { +@ExtendWith(value = ClusterTestExtensions.class) +public class ResetConsumerGroupOffsetTest { + +private static final String TOPIC_PREFIX = "foo-"; +private static final String GROUP_PREFIX = "test.group-"; + +private static void generator(ClusterGenerator clusterGenerator) { +ConsumerGroupCommandTestUtils.generator(clusterGenerator); +} + +private String[] basicArgs(ClusterInstance cluster) { return new String[]{"--reset-offsets", -"--bootstrap-server", bootstrapServers(listenerName()), +"--bootstrap-server", cluster.bootstrapServers(), "--timeout", Long.toString(DEFAULT_MAX_WAIT_MS)}; } -private String[] buildArgsForGroups(List groups, String...args) { -List res = new ArrayList<>(Arrays.asList(basicArgs())); +private String[] buildArgsForGroups(ClusterInstance cluster, List groups, String... args) { +List res = new ArrayList<>(asList(basicArgs(cluster))); for (String group : groups) { res.add("--group"); res.add(group); } -res.addAll(Arrays.asList(args)); +res.addAll(asList(args)); return res.toArray(new String[0]); } -private String[] buildArgsForGroup(String group, String...args) { -return buildArgsForGroups(Collections.singletonList(group), args); +private String[] buildArgsForGroup(ClusterInstance cluster, String group, String... args) { +return buildArgsForGroups(cluster, singletonList(group), args); } -private String[] buildArgsForAllGroups(String...args) { -List res = new ArrayList<>(Arrays.asList(basicArgs())); +private String[] buildArgsForAllGroups(ClusterInstance cluster, String... args) { +List res = new ArrayList<>(asList(basicArgs(cluster))); res.add("--all-groups"); -res.addAll(Arrays.asList(args)); +res.addAll(asList(args)); return res.toArray(new String[0]); } -@Test -public void testResetOffsetsNotExistingGroup() throws Exception { -String group = "missing.group"; -String[] args = buildArgsForGroup(group, "--all-topics", "--to-current", "--execute"); -ConsumerGroupCommand.ConsumerGroupService consumerGroupCommand = getConsumerGroupService(args); -// Make sure we got a coordinator -TestUtils.waitForCondition( -() -> Objects.equals(consumerGroupCommand.collectGroupState(group).coordinator.host(), "localhost"), -"Can't find a coordinator"); -Map resetOffsets = consumerGroupCommand.resetOffsets().get(group); -assertTrue(resetOffsets.isEmpty()); -assertTrue(committedOffsets(TOPIC, group).isEmpty()); -} - -@Test -public void testResetOffsetsExistingTopic() { -String group = "new.group"; -String[] args = buildArgsForGroup(group, "--topic", TOPIC, "--to-offset", "50"); -produceMessages(TOPIC, 100); -resetAndAssertOffsets(args, 50, true, Collections.singletonList(TOPIC)); -resetAndAssertOffsets(addTo(args, "--dry-run"), 50, true, Collections.singletonList(TOPIC)); -resetAndAssertOffsets(addTo(args, "--execute"), 50, false, Collections.singletonList(TOPIC)); -} - -@Test -public void testResetOffsetsExistingTopicSelectedGroups() throws Exception { -produceMessages(TOPIC, 100); -List groups = IntStream.rangeClosed(1, 3).mapToObj(id -> GROUP + id).collect(Collectors.toList()); -for (String group : groups) { -ConsumerGroupExecutor executor = addConsumerGroupExecutor(1, TOPIC, group, GroupProtocol.CLASSIC.name); -awaitConsumerProgress(TOPIC, group, 100L); -executor.shutdown(); +@ClusterTemplate("generator") +public void testResetOffsetsNotExistingGroup(ClusterInstance cluster) throws Exception { +for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) { +String topic = generateRandomTopic(); +String group = "missing.group"; +String[] args = buildArgsForGroup(cluster, group, "--all-topics", "--to-current", "--execute"); + +try (ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(args)) { +// Make sure we got a coordinator +TestUtils.waitForCondition( +() -> "localhost".equals(service.collectGroupState(group).coordinator.host()), +"Can't find a coordinator"); +
Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
m1a2st commented on code in PR #15779: URL: https://github.com/apache/kafka/pull/15779#discussion_r1597596564 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java: ## @@ -62,506 +91,787 @@ * - scope=topics+partitions, scenario=to-earliest * - export/import */ -public class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest { -private String[] basicArgs() { +@ExtendWith(value = ClusterTestExtensions.class) +public class ResetConsumerGroupOffsetTest { + +private static final String TOPIC_PREFIX = "foo-"; +private static final String GROUP_PREFIX = "test.group-"; + +private static void generator(ClusterGenerator clusterGenerator) { +ConsumerGroupCommandTestUtils.generator(clusterGenerator); +} + +private String[] basicArgs(ClusterInstance cluster) { return new String[]{"--reset-offsets", -"--bootstrap-server", bootstrapServers(listenerName()), +"--bootstrap-server", cluster.bootstrapServers(), "--timeout", Long.toString(DEFAULT_MAX_WAIT_MS)}; } -private String[] buildArgsForGroups(List groups, String...args) { -List res = new ArrayList<>(Arrays.asList(basicArgs())); +private String[] buildArgsForGroups(ClusterInstance cluster, List groups, String... args) { +List res = new ArrayList<>(asList(basicArgs(cluster))); for (String group : groups) { res.add("--group"); res.add(group); } -res.addAll(Arrays.asList(args)); +res.addAll(asList(args)); return res.toArray(new String[0]); } -private String[] buildArgsForGroup(String group, String...args) { -return buildArgsForGroups(Collections.singletonList(group), args); +private String[] buildArgsForGroup(ClusterInstance cluster, String group, String... args) { +return buildArgsForGroups(cluster, singletonList(group), args); } -private String[] buildArgsForAllGroups(String...args) { -List res = new ArrayList<>(Arrays.asList(basicArgs())); +private String[] buildArgsForAllGroups(ClusterInstance cluster, String... args) { +List res = new ArrayList<>(asList(basicArgs(cluster))); res.add("--all-groups"); -res.addAll(Arrays.asList(args)); +res.addAll(asList(args)); return res.toArray(new String[0]); } -@Test -public void testResetOffsetsNotExistingGroup() throws Exception { -String group = "missing.group"; -String[] args = buildArgsForGroup(group, "--all-topics", "--to-current", "--execute"); -ConsumerGroupCommand.ConsumerGroupService consumerGroupCommand = getConsumerGroupService(args); -// Make sure we got a coordinator -TestUtils.waitForCondition( -() -> Objects.equals(consumerGroupCommand.collectGroupState(group).coordinator.host(), "localhost"), -"Can't find a coordinator"); -Map resetOffsets = consumerGroupCommand.resetOffsets().get(group); -assertTrue(resetOffsets.isEmpty()); -assertTrue(committedOffsets(TOPIC, group).isEmpty()); -} - -@Test -public void testResetOffsetsExistingTopic() { -String group = "new.group"; -String[] args = buildArgsForGroup(group, "--topic", TOPIC, "--to-offset", "50"); -produceMessages(TOPIC, 100); -resetAndAssertOffsets(args, 50, true, Collections.singletonList(TOPIC)); -resetAndAssertOffsets(addTo(args, "--dry-run"), 50, true, Collections.singletonList(TOPIC)); -resetAndAssertOffsets(addTo(args, "--execute"), 50, false, Collections.singletonList(TOPIC)); -} - -@Test -public void testResetOffsetsExistingTopicSelectedGroups() throws Exception { -produceMessages(TOPIC, 100); -List groups = IntStream.rangeClosed(1, 3).mapToObj(id -> GROUP + id).collect(Collectors.toList()); -for (String group : groups) { -ConsumerGroupExecutor executor = addConsumerGroupExecutor(1, TOPIC, group, GroupProtocol.CLASSIC.name); -awaitConsumerProgress(TOPIC, group, 100L); -executor.shutdown(); +@ClusterTemplate("generator") +public void testResetOffsetsNotExistingGroup(ClusterInstance cluster) throws Exception { +for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) { +String topic = generateRandomTopic(); +String group = "missing.group"; +String[] args = buildArgsForGroup(cluster, group, "--all-topics", "--to-current", "--execute"); + +try (ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(args)) { +// Make sure we got a coordinator +TestUtils.waitForCondition( +() -> "localhost".equals(service.collectGroupState(group).coordinator.host()), +"Can't find a coordinator"); +
Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
m1a2st commented on PR #15779: URL: https://github.com/apache/kafka/pull/15779#issuecomment-2106119689 @chia7712 , Thanks for your review, I have been modify all test for testing different groupProtocol type. Please review again -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
chia7712 commented on PR #15779: URL: https://github.com/apache/kafka/pull/15779#issuecomment-2105943653 @m1a2st Could you please make sure "all" available consumer groups are included in this test? For example: `produceConsumeAndShutdown` creates consumer only for `classic`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
chia7712 commented on code in PR #15779: URL: https://github.com/apache/kafka/pull/15779#discussion_r1596279690 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java: ## @@ -415,153 +474,246 @@ public void testResetOffsetsExportImportPlan() throws Exception { TopicPartition t1p1 = new TopicPartition(topic1, 1); TopicPartition t2p0 = new TopicPartition(topic2, 0); TopicPartition t2p1 = new TopicPartition(topic2, 1); -createTopic(topic1, 2, 1, new Properties(), listenerName(), new Properties()); -createTopic(topic2, 2, 1, new Properties(), listenerName(), new Properties()); +String[] cgcArgs = buildArgsForGroups(cluster, asList(group1, group2), +"--all-topics", "--to-offset", "2", "--export"); +File file = TestUtils.tempFile("reset", ".csv"); +// Multiple --group's offset import +String[] cgcArgsExec = buildArgsForGroups(cluster, asList(group1, group2), +"--all-topics", +"--from-file", file.getCanonicalPath(), "--dry-run"); +// Single --group offset import using "group,topic,partition,offset" csv format +String[] cgcArgsExec2 = buildArgsForGroup(cluster, group1, "--all-topics", +"--from-file", file.getCanonicalPath(), "--dry-run"); -String[] cgcArgs = buildArgsForGroups(Arrays.asList(group1, group2), "--all-topics", "--to-offset", "2", "--export"); -ConsumerGroupCommand.ConsumerGroupService consumerGroupCommand = getConsumerGroupService(cgcArgs); +try (Admin admin = cluster.createAdminClient(); + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); + BufferedWriter bw = new BufferedWriter(new FileWriter(file)); + ConsumerGroupCommand.ConsumerGroupService serviceExec = getConsumerGroupService(cgcArgsExec); + ConsumerGroupCommand.ConsumerGroupService serviceExec2 = getConsumerGroupService(cgcArgsExec2)) { -produceConsumeAndShutdown(topic1, group1, 100, 1); -produceConsumeAndShutdown(topic2, group2, 100, 1); +admin.createTopics(asList(new NewTopic(topic1, 2, (short) 1), +new NewTopic(topic2, 2, (short) 1))).all().get(); -awaitConsumerGroupInactive(consumerGroupCommand, group1); -awaitConsumerGroupInactive(consumerGroupCommand, group2); +produceConsumeAndShutdown(cluster, topic1, group1, 1); +produceConsumeAndShutdown(cluster, topic2, group2, 1); -File file = TestUtils.tempFile("reset", ".csv"); +awaitConsumerGroupInactive(service, group1); +awaitConsumerGroupInactive(service, group2); -Map> exportedOffsets = consumerGroupCommand.resetOffsets(); -BufferedWriter bw = new BufferedWriter(new FileWriter(file)); -bw.write(consumerGroupCommand.exportOffsetsToCsv(exportedOffsets)); -bw.close(); -Map exp1 = new HashMap<>(); -exp1.put(t1p0, 2L); -exp1.put(t1p1, 2L); -Map exp2 = new HashMap<>(); -exp2.put(t2p0, 2L); -exp2.put(t2p1, 2L); +Map> exportedOffsets = service.resetOffsets(); +bw.write(service.exportOffsetsToCsv(exportedOffsets)); +Map exp1 = new HashMap<>(); +exp1.put(t1p0, 2L); +exp1.put(t1p1, 2L); +Map exp2 = new HashMap<>(); +exp2.put(t2p0, 2L); +exp2.put(t2p1, 2L); -assertEquals(exp1, toOffsetMap(exportedOffsets.get(group1))); -assertEquals(exp2, toOffsetMap(exportedOffsets.get(group2))); +assertEquals(exp1, toOffsetMap(exportedOffsets.get(group1))); +assertEquals(exp2, toOffsetMap(exportedOffsets.get(group2))); -// Multiple --group's offset import -String[] cgcArgsExec = buildArgsForGroups(Arrays.asList(group1, group2), "--all-topics", "--from-file", file.getCanonicalPath(), "--dry-run"); -ConsumerGroupCommand.ConsumerGroupService consumerGroupCommandExec = getConsumerGroupService(cgcArgsExec); -Map> importedOffsets = consumerGroupCommandExec.resetOffsets(); -assertEquals(exp1, toOffsetMap(importedOffsets.get(group1))); -assertEquals(exp2, toOffsetMap(importedOffsets.get(group2))); +Map> importedOffsets = serviceExec.resetOffsets(); +assertEquals(exp1, toOffsetMap(importedOffsets.get(group1))); +assertEquals(exp2, toOffsetMap(importedOffsets.get(group2))); -// Single --group offset import using "group,topic,partition,offset" csv format -String[] cgcArgsExec2 = buildArgsForGroup(group1, "--all-topics", "--from-file", file.getCanonicalPath(), "--dry-run"); -ConsumerGroupCommand.ConsumerGroupService consumerGroupCommandExec2 = getConsumerGroupService(cgcArgsExec2); -Map> importedOffsets2 = consumerGroupCommandExec2.resetOffsets(); -
Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
chia7712 commented on PR #15779: URL: https://github.com/apache/kafka/pull/15779#issuecomment-2099697982 @m1a2st Could you please use `ConsumerGroupCommandTestUtils#generator` to rewrite this test? thanks! https://github.com/apache/kafka/blob/trunk/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java#L50 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
m1a2st commented on PR #15779: URL: https://github.com/apache/kafka/pull/15779#issuecomment-2098864822 @chia7712 , please take a look for this PR, thank you -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
m1a2st commented on PR #15779: URL: https://github.com/apache/kafka/pull/15779#issuecomment-2083099739 Rely on #15766 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
m1a2st commented on PR #15779: URL: https://github.com/apache/kafka/pull/15779#issuecomment-2073680590 @lianetm @chia7712 Thanks for your comment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
chia7712 commented on PR #15779: URL: https://github.com/apache/kafka/pull/15779#issuecomment-2070801312 @m1a2st Please add following server property: ```java @ClusterTestDefaults(clusterType = Type.ALL, serverProperties = { @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") }) ``` The default replica of offset topic is 3, and the new infra create single broker. Hence, creating offset topic get failed, and hence you can't find a coordinator as the offset partition can't get be ready. Also, please notice @lianetm comments that bootstrap servers need to be updated manually if you kill any broker. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
lianetm commented on PR #15779: URL: https://github.com/apache/kafka/pull/15779#issuecomment-2070678792 Hey @m1a2st , the way to get the servers' addresses is `cluster.bootstrapServers()`, you got it right, but I see you're not inheriting from `ConsumerGroupCommandTest` anymore, so that removes the creation of the brokers to run the test against (that happens in the base setup [here](https://github.com/apache/kafka/blob/59c781415fc37c89aa087d7c2999cec7f82f6188/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala#L121)). Could that be what you're missing? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
m1a2st commented on PR #15779: URL: https://github.com/apache/kafka/pull/15779#issuecomment-2069841257 @chia7712 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
m1a2st opened a new pull request, #15779: URL: https://github.com/apache/kafka/pull/15779 change the ResetConsumerGroupOffsetTest first to new test framework -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org