This is an automated email from the ASF dual-hosted git repository. lucasbru pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 7945d322f6b KAFKA-16267: Update consumer_group_command_test.py to support KIP-848’s group protocol config (#15537) 7945d322f6b is described below commit 7945d322f6b1a61e477e992768668d28f594a072 Author: Kirk True <k...@kirktrue.pro> AuthorDate: Fri Mar 15 06:19:01 2024 -0700 KAFKA-16267: Update consumer_group_command_test.py to support KIP-848’s group protocol config (#15537) * KAFKA-16267: Update consumer_group_command_test.py to support KIP-848’s group protocol config Added a new optional group_protocol parameter to the test methods, then passed that down to the setup_consumer method. Unfortunately, because the new consumer can only be used with the new coordinator, this required a new @matrix block instead of adding the group_protocol=["classic", "consumer"] to the existing blocks 😢 Note: this requires #15330. * Update consumer_group_command_test.py Reviewers: Lucas Brutschy <lbruts...@confluent.io> --- .../tests/core/consumer_group_command_test.py | 29 ++++++++++++---------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/tests/kafkatest/tests/core/consumer_group_command_test.py b/tests/kafkatest/tests/core/consumer_group_command_test.py index 23406770548..7f1d79574d3 100644 --- a/tests/kafkatest/tests/core/consumer_group_command_test.py +++ b/tests/kafkatest/tests/core/consumer_group_command_test.py @@ -20,7 +20,7 @@ from ducktape.mark import matrix from ducktape.mark.resource import cluster from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.services.kafka import KafkaService, quorum +from kafkatest.services.kafka import KafkaService, quorum, consumer_group from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.services.security.security_config import SecurityConfig @@ -59,14 +59,15 @@ class ConsumerGroupCommandTest(Test): controller_num_nodes_override=self.num_zk) self.kafka.start() - def start_consumer(self): + def start_consumer(self, group_protocol=None): + consumer_properties = consumer_group.maybe_set_group_protocol(group_protocol) self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC, - consumer_timeout_ms=None) + consumer_timeout_ms=None, consumer_properties=consumer_properties) self.consumer.start() - def setup_and_verify(self, security_protocol, group=None): + def setup_and_verify(self, security_protocol, group=None, group_protocol=None): self.start_kafka(security_protocol, security_protocol) - self.start_consumer() + self.start_consumer(group_protocol=group_protocol) consumer_node = self.consumer.nodes[0] wait_until(lambda: self.consumer.alive(consumer_node), timeout_sec=20, backoff_sec=.2, err_msg="Consumer was too slow to start") @@ -92,35 +93,37 @@ class ConsumerGroupCommandTest(Test): @cluster(num_nodes=3) @matrix( security_protocol=['PLAINTEXT', 'SSL'], - metadata_quorum=[quorum.zk], + metadata_quorum=[quorum.zk, quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( security_protocol=['PLAINTEXT', 'SSL'], metadata_quorum=[quorum.isolated_kraft], - use_new_coordinator=[True, False] + use_new_coordinator=[True], + group_protocol=consumer_group.all_group_protocols ) - def test_list_consumer_groups(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.zk, use_new_coordinator=False): + def test_list_consumer_groups(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): """ Tests if ConsumerGroupCommand is listing correct consumer groups :return: None """ - self.setup_and_verify(security_protocol) + self.setup_and_verify(security_protocol, group_protocol=group_protocol) @cluster(num_nodes=3) @matrix( security_protocol=['PLAINTEXT', 'SSL'], - metadata_quorum=[quorum.zk], + metadata_quorum=[quorum.zk, quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( security_protocol=['PLAINTEXT', 'SSL'], metadata_quorum=[quorum.isolated_kraft], - use_new_coordinator=[True, False] + use_new_coordinator=[True], + group_protocol=consumer_group.all_group_protocols ) - def test_describe_consumer_group(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.zk, use_new_coordinator=False): + def test_describe_consumer_group(self, security_protocol='PLAINTEXT', metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): """ Tests if ConsumerGroupCommand is describing a consumer group correctly :return: None """ - self.setup_and_verify(security_protocol, group="test-consumer-group") + self.setup_and_verify(security_protocol, group="test-consumer-group", group_protocol=group_protocol)