This is an automated email from the ASF dual-hosted git repository. wcarlson pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new e95e91a0623 KAFKA-16275: Update kraft_upgrade_test.py to support KIP-848’s group protocol config (#15626) e95e91a0623 is described below commit e95e91a0623b3f84438e4ebc8e77d1e37979ef62 Author: Kirk True <k...@kirktrue.pro> AuthorDate: Wed Apr 3 10:12:51 2024 -0700 KAFKA-16275: Update kraft_upgrade_test.py to support KIP-848’s group protocol config (#15626) Added a new optional group_protocol parameter to the test methods, then passed that down to the methods involved. Unfortunately, because the new consumer can only be used with the new coordinator, this required a new @matrix block instead of adding the group_protocol=["classic", "consumer"] to the existing blocks 😢 Reviewers: Walker Carlson <wcarl...@apache.org> --- tests/kafkatest/tests/core/kraft_upgrade_test.py | 29 ++++++++++++++++-------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/tests/kafkatest/tests/core/kraft_upgrade_test.py b/tests/kafkatest/tests/core/kraft_upgrade_test.py index 3f3c4a81b10..2d271b9e061 100644 --- a/tests/kafkatest/tests/core/kraft_upgrade_test.py +++ b/tests/kafkatest/tests/core/kraft_upgrade_test.py @@ -17,12 +17,12 @@ from ducktape.mark import parametrize, matrix from ducktape.mark.resource import cluster from ducktape.utils.util import wait_until from kafkatest.services.console_consumer import ConsoleConsumer -from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka import KafkaService, consumer_group from kafkatest.services.kafka.quorum import isolated_kraft, combined_kraft from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest from kafkatest.utils import is_int -from kafkatest.version import LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, \ +from kafkatest.version import LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_7, \ DEV_BRANCH, KafkaVersion, LATEST_STABLE_METADATA_VERSION # @@ -74,7 +74,7 @@ class TestKRaftUpgrade(ProduceConsumeValidateTest): self.logger.info("Changing metadata.version to %s" % LATEST_STABLE_METADATA_VERSION) self.kafka.upgrade_metadata_version(LATEST_STABLE_METADATA_VERSION) - def run_upgrade(self, from_kafka_version): + def run_upgrade(self, from_kafka_version, group_protocol): """Test upgrade of Kafka broker cluster from various versions to the current version from_kafka_version is a Kafka version to upgrade from. @@ -101,7 +101,8 @@ class TestKRaftUpgrade(ProduceConsumeValidateTest): version=KafkaVersion(from_kafka_version)) self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=True, consumer_timeout_ms=30000, - message_validator=is_int, version=KafkaVersion(from_kafka_version)) + message_validator=is_int, version=KafkaVersion(from_kafka_version), + consumer_properties=consumer_group.maybe_set_group_protocol(group_protocol)) self.run_produce_consume_validate(core_test_action=lambda: self.perform_version_change(from_kafka_version)) cluster_id = self.kafka.cluster_id() assert cluster_id is not None @@ -112,13 +113,21 @@ class TestKRaftUpgrade(ProduceConsumeValidateTest): @matrix(from_kafka_version=[str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(DEV_BRANCH)], use_new_coordinator=[True, False], metadata_quorum=[combined_kraft]) - def test_combined_mode_upgrade(self, from_kafka_version, metadata_quorum, use_new_coordinator=False): - self.run_upgrade(from_kafka_version) + @matrix(from_kafka_version=[str(LATEST_3_7), str(DEV_BRANCH)], + use_new_coordinator=[True], + metadata_quorum=[combined_kraft], + group_protocol=consumer_group.all_group_protocols) + def test_combined_mode_upgrade(self, from_kafka_version, metadata_quorum, use_new_coordinator=False, group_protocol=None): + self.run_upgrade(from_kafka_version, group_protocol) @cluster(num_nodes=8) - @matrix(from_kafka_version=[str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(DEV_BRANCH)], - use_new_coordinator=[True, False], + @matrix(from_kafka_version=[str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(DEV_BRANCH)], + use_new_coordinator=[True, False], metadata_quorum=[isolated_kraft]) - def test_isolated_mode_upgrade(self, from_kafka_version, metadata_quorum, use_new_coordinator=False): - self.run_upgrade(from_kafka_version) + @matrix(from_kafka_version=[str(LATEST_3_7), str(DEV_BRANCH)], + use_new_coordinator=[True], + metadata_quorum=[isolated_kraft], + group_protocol=consumer_group.all_group_protocols) + def test_isolated_mode_upgrade(self, from_kafka_version, metadata_quorum, use_new_coordinator=False, group_protocol=None): + self.run_upgrade(from_kafka_version, group_protocol)