Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]
chia7712 commented on PR #15821: URL: https://github.com/apache/kafka/pull/15821#issuecomment-2146925116 @FrankYang0529 Could you please merge `ListConsumerGroupUnitTest` into `ListConsumerGroupTest`? Multi classes in single class file is not common in code base -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]
chia7712 merged PR #15821: URL: https://github.com/apache/kafka/pull/15821 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on code in PR #15821: URL: https://github.com/apache/kafka/pull/15821#discussion_r1619707209 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java: ## @@ -50,14 +51,13 @@ class ConsumerGroupCommandTestUtils { private ConsumerGroupCommandTestUtils() { } -static List generator() { +static List generator(boolean onlyConsumerGroupCoordinator) { Review Comment: Updated it. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15821: URL: https://github.com/apache/kafka/pull/15821#discussion_r1619254777 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java: ## @@ -50,14 +51,13 @@ class ConsumerGroupCommandTestUtils { private ConsumerGroupCommandTestUtils() { } -static List generator() { +static List generator(boolean onlyConsumerGroupCoordinator) { Review Comment: Could we avoid using single `boolean` argument? That is not readable. We can do a bit refactor: ```java static List generator() { return Stream.concat(forConsumerGroupCoordinator().stream(), forClassicGroupCoordinator().stream()) .collect(Collectors.toList()); } static List forConsumerGroupCoordinator() { Map serverProperties = new HashMap<>(); serverProperties.put(OFFSETS_TOPIC_PARTITIONS_CONFIG, "1"); serverProperties.put(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1"); serverProperties.put(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "true"); return Collections.singletonList(ClusterConfig.defaultBuilder() .setTypes(Stream.of(KRAFT, CO_KRAFT).collect(Collectors.toSet())) .setServerProperties(serverProperties) .setTags(Collections.singletonList("consumerGroupCoordinator")) .build()); } static List forClassicGroupCoordinator() { Map serverProperties = new HashMap<>(); serverProperties.put(OFFSETS_TOPIC_PARTITIONS_CONFIG, "1"); serverProperties.put(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1"); serverProperties.put(NEW_GROUP_COORDINATOR_ENABLE_CONFIG, "false"); return Collections.singletonList(ClusterConfig.defaultBuilder() .setServerProperties(serverProperties) .setTags(Collections.singletonList("classicGroupCoordinator")) .build()); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on PR #15821: URL: https://github.com/apache/kafka/pull/15821#issuecomment-2135260400 The CI is finished and failed test cases are not related to this PR. MirrorConnectorsIntegrationSSLTest.testReplicateSourceDefault: https://issues.apache.org/jira/browse/KAFKA-15927 ReplicationQuotasTest.shouldThrottleOldSegments: https://issues.apache.org/jira/browse/KAFKA-16635 ConsumerBounceTest.testConsumptionWithBrokerFailures: https://issues.apache.org/jira/browse/KAFKA-15146 DelegationTokenEndToEndAuthorizationWithOwnerTest: https://issues.apache.org/jira/browse/KAFKA-15411 QuorumControllerMetricsIntegrationTest.testTimeoutMetrics: https://issues.apache.org/jira/browse/KAFKA-16173 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on code in PR #15821: URL: https://github.com/apache/kafka/pull/15821#discussion_r1616179624 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java: ## @@ -17,449 +17,502 @@ package org.apache.kafka.tools.consumer.group; import joptsimple.OptionException; +import kafka.test.ClusterConfig; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterTemplate; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.ConsumerGroupListing; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.RangeAssignor; import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.GroupType; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; + +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.apache.kafka.tools.ToolsTestUtils; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.api.extension.ExtendWith; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.Properties; import java.util.List; import java.util.Set; +import java.util.stream.Stream; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import static org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES; +import static java.util.Collections.emptyMap; import static org.apache.kafka.common.utils.Utils.mkSet; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -public class ListConsumerGroupTest extends ConsumerGroupCommandTest { -@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) -@MethodSource("getTestQuorumAndGroupProtocolParametersAll") -public void testListConsumerGroupsWithoutFilters(String quorum, String groupProtocol) throws Exception { -String simpleGroup = "simple-group"; - -createOffsetsTopic(listenerName(), new Properties()); - -addSimpleGroupExecutor(simpleGroup); -addConsumerGroupExecutor(1); -addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol); +@Tag("integration") +@ExtendWith(ClusterTestExtensions.class) +public class ListConsumerGroupTest { +private final static String TOPIC = "foo"; +private final static String SIMPLE_GROUP = "test.simple.group"; +private final static String DEFAULT_GROUP = "test.default.group"; +private final static String PROTOCOL_GROUP = "test.protocol.group"; +private final ClusterInstance clusterInstance; + +ListConsumerGroupTest(ClusterInstance clusterInstance) { +this.clusterInstance = clusterInstance; +} -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list"}; -ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); +private static List defaultGenerator() { +return ConsumerGroupCommandTestUtils.generator(); +} -Set expectedGroups = set(Arrays.asList(GROUP, simpleGroup, PROTOCOL_GROUP)); -final AtomicReference foundGroups = new AtomicReference<>(); +private static List consumerProtocolOnlyGenerator() { Review Comment: Hi @chia7712, thanks for the suggestion. I found that I didn't follow old cases to test `group.coordinator.new.enable=true` with classic and consumer group protocols. I have updated the PR to follow it. BTW, do we want to follow this comment to reduce test cases like old framework? Thanks. https://github.com/apache/kafka/p
Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15821: URL: https://github.com/apache/kafka/pull/15821#discussion_r1616213225 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java: ## @@ -17,449 +17,502 @@ package org.apache.kafka.tools.consumer.group; import joptsimple.OptionException; +import kafka.test.ClusterConfig; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterTemplate; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.ConsumerGroupListing; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.RangeAssignor; import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.GroupType; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; + +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.apache.kafka.tools.ToolsTestUtils; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.api.extension.ExtendWith; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.Properties; import java.util.List; import java.util.Set; +import java.util.stream.Stream; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import static org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES; +import static java.util.Collections.emptyMap; import static org.apache.kafka.common.utils.Utils.mkSet; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -public class ListConsumerGroupTest extends ConsumerGroupCommandTest { -@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) -@MethodSource("getTestQuorumAndGroupProtocolParametersAll") -public void testListConsumerGroupsWithoutFilters(String quorum, String groupProtocol) throws Exception { -String simpleGroup = "simple-group"; - -createOffsetsTopic(listenerName(), new Properties()); - -addSimpleGroupExecutor(simpleGroup); -addConsumerGroupExecutor(1); -addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol); +@Tag("integration") +@ExtendWith(ClusterTestExtensions.class) +public class ListConsumerGroupTest { +private final static String TOPIC = "foo"; +private final static String SIMPLE_GROUP = "test.simple.group"; +private final static String DEFAULT_GROUP = "test.default.group"; +private final static String PROTOCOL_GROUP = "test.protocol.group"; +private final ClusterInstance clusterInstance; + +ListConsumerGroupTest(ClusterInstance clusterInstance) { +this.clusterInstance = clusterInstance; +} -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list"}; -ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); +private static List defaultGenerator() { +return ConsumerGroupCommandTestUtils.generator(); +} -Set expectedGroups = set(Arrays.asList(GROUP, simpleGroup, PROTOCOL_GROUP)); -final AtomicReference foundGroups = new AtomicReference<>(); +private static List consumerProtocolOnlyGenerator() { Review Comment: > BTW, do we want to follow this comment to reduce test cases like old framework? Thanks. That is a good suggestion. Could you please file a ticket for it? We should have a separate PR for it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go
Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on code in PR #15821: URL: https://github.com/apache/kafka/pull/15821#discussion_r1616179624 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java: ## @@ -17,449 +17,502 @@ package org.apache.kafka.tools.consumer.group; import joptsimple.OptionException; +import kafka.test.ClusterConfig; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterTemplate; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.ConsumerGroupListing; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.RangeAssignor; import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.GroupType; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; + +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.apache.kafka.tools.ToolsTestUtils; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.api.extension.ExtendWith; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.Properties; import java.util.List; import java.util.Set; +import java.util.stream.Stream; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import static org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES; +import static java.util.Collections.emptyMap; import static org.apache.kafka.common.utils.Utils.mkSet; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -public class ListConsumerGroupTest extends ConsumerGroupCommandTest { -@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) -@MethodSource("getTestQuorumAndGroupProtocolParametersAll") -public void testListConsumerGroupsWithoutFilters(String quorum, String groupProtocol) throws Exception { -String simpleGroup = "simple-group"; - -createOffsetsTopic(listenerName(), new Properties()); - -addSimpleGroupExecutor(simpleGroup); -addConsumerGroupExecutor(1); -addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol); +@Tag("integration") +@ExtendWith(ClusterTestExtensions.class) +public class ListConsumerGroupTest { +private final static String TOPIC = "foo"; +private final static String SIMPLE_GROUP = "test.simple.group"; +private final static String DEFAULT_GROUP = "test.default.group"; +private final static String PROTOCOL_GROUP = "test.protocol.group"; +private final ClusterInstance clusterInstance; + +ListConsumerGroupTest(ClusterInstance clusterInstance) { +this.clusterInstance = clusterInstance; +} -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list"}; -ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); +private static List defaultGenerator() { +return ConsumerGroupCommandTestUtils.generator(); +} -Set expectedGroups = set(Arrays.asList(GROUP, simpleGroup, PROTOCOL_GROUP)); -final AtomicReference foundGroups = new AtomicReference<>(); +private static List consumerProtocolOnlyGenerator() { Review Comment: Hi @chia7712, thanks for the suggestion. I found that I didn't follow old cases to test `group. coordinator. new. enable=true` with classic and consumer group protocols. I have updated the PR to follow it. BTW, do we want to follow this comment to reduce test cases like old framework? Thanks. https://github.com/apache/kafk
Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15821: URL: https://github.com/apache/kafka/pull/15821#discussion_r1615797305 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java: ## @@ -17,449 +17,502 @@ package org.apache.kafka.tools.consumer.group; import joptsimple.OptionException; +import kafka.test.ClusterConfig; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterTemplate; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.ConsumerGroupListing; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.RangeAssignor; import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.GroupType; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; + +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.apache.kafka.tools.ToolsTestUtils; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.api.extension.ExtendWith; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.Properties; import java.util.List; import java.util.Set; +import java.util.stream.Stream; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import static org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES; +import static java.util.Collections.emptyMap; import static org.apache.kafka.common.utils.Utils.mkSet; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -public class ListConsumerGroupTest extends ConsumerGroupCommandTest { -@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) -@MethodSource("getTestQuorumAndGroupProtocolParametersAll") -public void testListConsumerGroupsWithoutFilters(String quorum, String groupProtocol) throws Exception { -String simpleGroup = "simple-group"; - -createOffsetsTopic(listenerName(), new Properties()); - -addSimpleGroupExecutor(simpleGroup); -addConsumerGroupExecutor(1); -addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol); +@Tag("integration") +@ExtendWith(ClusterTestExtensions.class) +public class ListConsumerGroupTest { +private final static String TOPIC = "foo"; +private final static String SIMPLE_GROUP = "test.simple.group"; +private final static String DEFAULT_GROUP = "test.default.group"; +private final static String PROTOCOL_GROUP = "test.protocol.group"; +private final ClusterInstance clusterInstance; + +ListConsumerGroupTest(ClusterInstance clusterInstance) { +this.clusterInstance = clusterInstance; +} -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--list"}; -ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); +private static List defaultGenerator() { +return ConsumerGroupCommandTestUtils.generator(); +} -Set expectedGroups = set(Arrays.asList(GROUP, simpleGroup, PROTOCOL_GROUP)); -final AtomicReference foundGroups = new AtomicReference<>(); +private static List consumerProtocolOnlyGenerator() { Review Comment: How about moving this helper to `ConsumerGroupCommandTestUtils` and reuse it to rewrite `ConsumerGroupCommandTestUtils.generator`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kaf
Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15821: URL: https://github.com/apache/kafka/pull/15821#discussion_r1612965039 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java: ## @@ -68,15 +68,31 @@ static List generator() { ClusterConfig consumerGroupCoordinator = ClusterConfig.defaultBuilder() .setTypes(Stream.of(KRAFT, CO_KRAFT).collect(Collectors.toSet())) .setServerProperties(serverProperties) -.setTags(Collections.singletonList("newGroupCoordinator")) +.setTags(Collections.singletonList("consumerGroupCoordinator")) .build(); return Arrays.asList(classicGroupCoordinator, consumerGroupCoordinator); } +static AutoCloseable buildConsumers(int numberOfConsumers, +boolean syncCommit, Review Comment: In this case the consumers don't use consumer offsets, so `syncCommit` must be "false" to avoid error ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java: ## @@ -17,449 +17,503 @@ package org.apache.kafka.tools.consumer.group; import joptsimple.OptionException; +import kafka.test.ClusterConfig; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterTemplate; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.ConsumerGroupListing; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.RangeAssignor; import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.GroupType; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; + +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.apache.kafka.tools.ToolsTestUtils; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.api.extension.ExtendWith; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.Properties; import java.util.List; import java.util.Set; +import java.util.stream.Stream; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import static org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES; +import static java.util.Collections.emptyMap; import static org.apache.kafka.common.utils.Utils.mkSet; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -public class ListConsumerGroupTest extends ConsumerGroupCommandTest { -@ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES) -@MethodSource("getTestQuorumAndGroupProtocolParametersAll") -public void testListConsumerGroupsWithoutFilters(String quorum, String groupProtocol) throws Exception { -String simpleGroup = "simple-group"; - -createOffsetsTopic(listenerName(), new Properties()); - -addSimpleGroupExecutor(simpleGroup); -addConsumerGroupExecutor(1); -addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol); +@Tag("integration") +@ExtendWith(ClusterTestExtensions.class) +public class ListConsumerGroupTest { +private final static String TOPIC = "foo"; +private final static String SIMPLE_GROUP = "test.simple.group"; +private final static String DEFAULT_GROUP = "test.default.group"; +private final static String PROTOCOL_GROUP = "test.protocol.group"; +private final ClusterInstance clusterInstance; + +ListConsumerGroupTest(ClusterInstance clusterInstance) { +this.clusterInstance = clusterInsta
Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on code in PR #15821: URL: https://github.com/apache/kafka/pull/15821#discussion_r1601924534 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java: ## @@ -93,6 +95,27 @@ static AutoCloseable buildConsumers(int numberOfConsumers, } } +static AutoCloseable buildConsumers(int numberOfConsumers, Review Comment: Thanks for the great suggestion. Updated it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15821: URL: https://github.com/apache/kafka/pull/15821#discussion_r1600210615 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java: ## @@ -93,6 +95,27 @@ static AutoCloseable buildConsumers(int numberOfConsumers, } } +static AutoCloseable buildConsumers(int numberOfConsumers, Review Comment: Please DON'T generate a bunch of duplicate code... ```java static AutoCloseable buildConsumers(int numberOfConsumers, boolean syncCommit, Set partitions, Supplier> consumerSupplier) { return buildConsumers(numberOfConsumers, syncCommit, consumerSupplier, consumer -> consumer.assign(partitions)); } static AutoCloseable buildConsumers(int numberOfConsumers, boolean syncCommit, String topic, Supplier> consumerSupplier) { return buildConsumers(numberOfConsumers, syncCommit, consumerSupplier, consumer -> consumer.subscribe(Collections.singleton(topic))); } private static AutoCloseable buildConsumers(int numberOfConsumers, boolean syncCommit, Supplier> consumerSupplier, Consumer> setPartitions) { List> consumers = new ArrayList<>(numberOfConsumers); ExecutorService executor = Executors.newFixedThreadPool(numberOfConsumers); AtomicBoolean closed = new AtomicBoolean(false); final AutoCloseable closeable = () -> releaseConsumers(closed, consumers, executor); try { for (int i = 0; i < numberOfConsumers; i++) { KafkaConsumer consumer = consumerSupplier.get(); consumers.add(consumer); executor.execute(() -> initConsumer(syncCommit, () -> { setPartitions.accept(consumer); return consumer; }, closed)); } return closeable; } catch (Throwable e) { Utils.closeQuietly(closeable, "Release Consumer"); throw e; } } private static void releaseConsumers(AtomicBoolean closed, List> consumers, ExecutorService executor) throws InterruptedException { closed.set(true); consumers.forEach(KafkaConsumer::wakeup); executor.shutdown(); executor.awaitTermination(1, TimeUnit.MINUTES); } private static void initConsumer(boolean syncCommit, Supplier> consumerSupplier, AtomicBoolean closed) { try (KafkaConsumer kafkaConsumer = consumerSupplier.get()) { while (!closed.get()) { kafkaConsumer.poll(Duration.ofMillis(Long.MAX_VALUE)); if (syncCommit) kafkaConsumer.commitSync(); } } catch (WakeupException e) { // OK } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]
chia7712 commented on PR #15821: URL: https://github.com/apache/kafka/pull/15821#issuecomment-2107364174 @FrankYang0529 please rebase code to include https://github.com/apache/kafka/commit/334d5d58bb73ca04d04be90dec8c4e49000577ec -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on code in PR #15821: URL: https://github.com/apache/kafka/pull/15821#discussion_r1597470470 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java: ## @@ -92,6 +94,7 @@ static void generator(ClusterGenerator clusterGenerator) { static AutoCloseable buildConsumers(int numberOfConsumers, Review Comment: Thank you. I update to define two `buildConsumers`. May you help me take a look? If we're good with current design. I will update to https://github.com/apache/kafka/pull/15908. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15821: URL: https://github.com/apache/kafka/pull/15821#discussion_r1596196298 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java: ## @@ -92,6 +94,7 @@ static void generator(ClusterGenerator clusterGenerator) { static AutoCloseable buildConsumers(int numberOfConsumers, Review Comment: Could we have two `buildConsumers` to deal with "assign"/"subscribe" individually? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on PR #15821: URL: https://github.com/apache/kafka/pull/15821#issuecomment-2102879454 > > This is used by ListConsumerGroupTest and DescribeConsumerGroupTest. I think we can create a new class SimpleConsumerGroupExecutorTestUtils for it. WDYT? Thank you. > > That is addressed already. see https://github.com/apache/kafka/blob/trunk/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java#L92 Thanks. The `SimpleConsumerGroupExecutor ` subscribe topic partitions. I have updated `ConsumerGroupCommandTestUtils` to support it. https://github.com/apache/kafka/blob/f4fdaa702a2e718bdb44b9c5fec254f32a33f0d8/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTest.java#L274-L287 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]
chia7712 commented on PR #15821: URL: https://github.com/apache/kafka/pull/15821#issuecomment-2099700974 > This is used by ListConsumerGroupTest and DescribeConsumerGroupTest. I think we can create a new class SimpleConsumerGroupExecutorTestUtils for it. WDYT? Thank you. That is addressed already. see https://github.com/apache/kafka/blob/trunk/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java#L92 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on PR #15821: URL: https://github.com/apache/kafka/pull/15821#issuecomment-2098714976 Hi @chia7712, I rebase latest trunk branch, so we have `ConsumerGroupCommandTestUtils` now. The only remaining part is `SimpleConsumerGroupExecutor`. This is used by `ListConsumerGroupTest` and `DescribeConsumerGroupTest`. I think we can create a new class `SimpleConsumerGroupExecutorTestUtils` for it. WDYT? Thank you. https://github.com/apache/kafka/blob/21bf715622e9d05984fa8a2a1f9f12d54b76ce41/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTest.java#L327-L335 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on PR #15821: URL: https://github.com/apache/kafka/pull/15821#issuecomment-2080374723 Rely on https://github.com/apache/kafka/pull/15766. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: migrate ListConsumerGroupTest to use ClusterTestExtensions [kafka]
FrankYang0529 opened a new pull request, #15821: URL: https://github.com/apache/kafka/pull/15821 By using `ClusterTestExtensions`, `ListConsumerGroupTest` get get away from `KafkaServerTestHarness` dependency. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org