This is an automated email from the ASF dual-hosted git repository. lucasbru pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new af0ec247ccf KAFKA-16231: Update consumer_test.py to support KIP-848’s group protocol config (#15330) af0ec247ccf is described below commit af0ec247ccf7b1d97ca684109854bf1881fd11bb Author: Kirk True <k...@kirktrue.pro> AuthorDate: Fri Mar 15 06:17:22 2024 -0700 KAFKA-16231: Update consumer_test.py to support KIP-848’s group protocol config (#15330) Added a new optional group_protocol parameter to the test methods, then passed that down to the setup_consumer method. Unfortunately, because the new consumer can only be used with the new coordinator, this required a new @matrix block instead of adding the group_protocol=["classic", "consumer"] to the existing blocks 😢 Reviewers: Lucas Brutschy <lbruts...@confluent.io> --- tests/kafkatest/services/kafka/consumer_group.py | 42 +++++++++++++ tests/kafkatest/services/verifiable_consumer.py | 2 + tests/kafkatest/tests/client/consumer_test.py | 73 +++++++++++++++-------- tests/kafkatest/tests/verifiable_consumer_test.py | 3 +- 4 files changed, 93 insertions(+), 27 deletions(-) diff --git a/tests/kafkatest/services/kafka/consumer_group.py b/tests/kafkatest/services/kafka/consumer_group.py new file mode 100644 index 00000000000..e94bd6382d2 --- /dev/null +++ b/tests/kafkatest/services/kafka/consumer_group.py @@ -0,0 +1,42 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# These are the group protocols we support. Most tests that use the new group coordinator will +# (eventually) be upgraded to test both of these consumer groups. +classic_group_protocol = 'classic' +consumer_group_protocol = 'consumer' +all_group_protocols = [classic_group_protocol, consumer_group_protocol] + +# These are the remote assignors used by the new group coordinator. +range_remote_assignor = 'range' +uniform_remote_assignor = 'uniform' +all_remote_assignors = [range_remote_assignor, uniform_remote_assignor] + + +def is_consumer_group_protocol_enabled(group_protocol): + """Check if the KIP-848 consumer group protocol is enabled.""" + return group_protocol is not None and group_protocol.lower() == consumer_group_protocol + + +def maybe_set_group_protocol(group_protocol, config=None): + """Maybe include the KIP-848 group.protocol configuration if it's not None.""" + if config is None: + config = {} + + if group_protocol is not None: + config["group.protocol"] = group_protocol + + return config diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py index e1155c16aae..7ef5f75e22b 100644 --- a/tests/kafkatest/services/verifiable_consumer.py +++ b/tests/kafkatest/services/verifiable_consumer.py @@ -177,6 +177,8 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou self.log_level = log_level self.kafka = kafka self.topic = topic + self.group_protocol = group_protocol + self.group_remote_assignor = group_remote_assignor self.group_id = group_id self.reset_policy = reset_policy self.static_membership = static_membership diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py index 6dc1fb897b0..d4d6af9f2aa 100644 --- a/tests/kafkatest/tests/client/consumer_test.py +++ b/tests/kafkatest/tests/client/consumer_test.py @@ -18,7 +18,7 @@ from ducktape.utils.util import wait_until from ducktape.mark.resource import cluster from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest -from kafkatest.services.kafka import TopicPartition, quorum +from kafkatest.services.kafka import TopicPartition, quorum, consumer_group import signal @@ -76,14 +76,15 @@ class OffsetValidationTest(VerifiableConsumerTest): @cluster(num_nodes=7) @matrix( - metadata_quorum=[quorum.zk], + metadata_quorum=[quorum.zk, quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( metadata_quorum=[quorum.isolated_kraft], - use_new_coordinator=[True, False] + use_new_coordinator=[True], + group_protocol=consumer_group.all_group_protocols ) - def test_broker_rolling_bounce(self, metadata_quorum=quorum.zk, use_new_coordinator=False): + def test_broker_rolling_bounce(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): """ Verify correct consumer behavior when the brokers are consecutively restarted. @@ -108,7 +109,7 @@ class OffsetValidationTest(VerifiableConsumerTest): # broker, and that coordinator will fail the consumer and trigger a group rebalance if its session times out. # This test is asserting that no rebalances occur, so we increase the session timeout for this to be the case. self.session_timeout_sec = 30 - consumer = self.setup_consumer(self.TOPIC) + consumer = self.setup_consumer(self.TOPIC, group_protocol=group_protocol) producer.start() self.await_produced_messages(producer) @@ -136,16 +137,17 @@ class OffsetValidationTest(VerifiableConsumerTest): @matrix( clean_shutdown=[True], bounce_mode=["all", "rolling"], - metadata_quorum=[quorum.zk], + metadata_quorum=[quorum.zk, quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( clean_shutdown=[True], bounce_mode=["all", "rolling"], metadata_quorum=[quorum.isolated_kraft], - use_new_coordinator=[True, False] + use_new_coordinator=[True], + group_protocol=consumer_group.all_group_protocols ) - def test_consumer_bounce(self, clean_shutdown, bounce_mode, metadata_quorum=quorum.zk, use_new_coordinator=False): + def test_consumer_bounce(self, clean_shutdown, bounce_mode, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): """ Verify correct consumer behavior when the consumers in the group are consecutively restarted. @@ -160,7 +162,7 @@ class OffsetValidationTest(VerifiableConsumerTest): partition = TopicPartition(self.TOPIC, 0) producer = self.setup_producer(self.TOPIC) - consumer = self.setup_consumer(self.TOPIC) + consumer = self.setup_consumer(self.TOPIC, group_protocol=group_protocol) producer.start() self.await_produced_messages(producer) @@ -371,19 +373,20 @@ class OffsetValidationTest(VerifiableConsumerTest): @matrix( clean_shutdown=[True], enable_autocommit=[True, False], - metadata_quorum=[quorum.zk], + metadata_quorum=[quorum.zk, quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( clean_shutdown=[True], enable_autocommit=[True, False], metadata_quorum=[quorum.isolated_kraft], - use_new_coordinator=[True, False] + use_new_coordinator=[True], + group_protocol=consumer_group.all_group_protocols ) - def test_consumer_failure(self, clean_shutdown, enable_autocommit, metadata_quorum=quorum.zk, use_new_coordinator=False): + def test_consumer_failure(self, clean_shutdown, enable_autocommit, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): partition = TopicPartition(self.TOPIC, 0) - consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit) + consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit, group_protocol=group_protocol) producer = self.setup_producer(self.TOPIC) consumer.start() @@ -436,12 +439,19 @@ class OffsetValidationTest(VerifiableConsumerTest): clean_shutdown=[True, False], enable_autocommit=[True, False], metadata_quorum=[quorum.isolated_kraft], - use_new_coordinator=[True, False] + use_new_coordinator=[False] ) - def test_broker_failure(self, clean_shutdown, enable_autocommit, metadata_quorum=quorum.zk, use_new_coordinator=False): + @matrix( + clean_shutdown=[True, False], + enable_autocommit=[True, False], + metadata_quorum=[quorum.isolated_kraft], + use_new_coordinator=[True], + group_protocol=consumer_group.all_group_protocols + ) + def test_broker_failure(self, clean_shutdown, enable_autocommit, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): partition = TopicPartition(self.TOPIC, 0) - consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit) + consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit, group_protocol=group_protocol) producer = self.setup_producer(self.TOPIC) producer.start() @@ -475,14 +485,15 @@ class OffsetValidationTest(VerifiableConsumerTest): @cluster(num_nodes=7) @matrix( - metadata_quorum=[quorum.zk], + metadata_quorum=[quorum.zk, quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( metadata_quorum=[quorum.isolated_kraft], - use_new_coordinator=[True, False] + use_new_coordinator=[True], + group_protocol=consumer_group.all_group_protocols ) - def test_group_consumption(self, metadata_quorum=quorum.zk, use_new_coordinator=False): + def test_group_consumption(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): """ Verifies correct group rebalance behavior as consumers are started and stopped. In particular, this test verifies that the partition is readable after every @@ -494,7 +505,7 @@ class OffsetValidationTest(VerifiableConsumerTest): - Start the consumers one by one, verifying consumption after each rebalance - Shutdown the consumers one by one, verifying consumption after each rebalance """ - consumer = self.setup_consumer(self.TOPIC) + consumer = self.setup_consumer(self.TOPIC, group_protocol=group_protocol) producer = self.setup_producer(self.TOPIC) partition = TopicPartition(self.TOPIC, 0) @@ -535,18 +546,25 @@ class AssignmentValidationTest(VerifiableConsumerTest): @matrix( assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor", "org.apache.kafka.clients.consumer.RoundRobinAssignor", - "org.apache.kafka.clients.consumer.StickyAssignor"], - metadata_quorum=[quorum.zk], + "org.apache.kafka.clients.consumer.StickyAssignor"], + metadata_quorum=[quorum.zk, quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor", "org.apache.kafka.clients.consumer.RoundRobinAssignor", - "org.apache.kafka.clients.consumer.StickyAssignor"], + "org.apache.kafka.clients.consumer.StickyAssignor"], metadata_quorum=[quorum.isolated_kraft], - use_new_coordinator=[True, False] + use_new_coordinator=[True], + group_protocol=[consumer_group.classic_group_protocol], + ) + @matrix( + metadata_quorum=[quorum.isolated_kraft], + use_new_coordinator=[True], + group_protocol=[consumer_group.consumer_group_protocol], + group_remote_assignor=consumer_group.all_remote_assignors ) - def test_valid_assignment(self, assignment_strategy, metadata_quorum=quorum.zk, use_new_coordinator=False): + def test_valid_assignment(self, assignment_strategy=None, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None, group_remote_assignor=None): """ Verify assignment strategy correctness: each partition is assigned to exactly one consumer instance. @@ -556,7 +574,10 @@ class AssignmentValidationTest(VerifiableConsumerTest): - Start the consumers one by one - Validate assignment after every expected rebalance """ - consumer = self.setup_consumer(self.TOPIC, assignment_strategy=assignment_strategy) + consumer = self.setup_consumer(self.TOPIC, + assignment_strategy=assignment_strategy, + group_protocol=group_protocol, + group_remote_assignor=group_remote_assignor) for num_started, node in enumerate(consumer.nodes, 1): consumer.start_node(node) self.await_members(consumer, num_started) diff --git a/tests/kafkatest/tests/verifiable_consumer_test.py b/tests/kafkatest/tests/verifiable_consumer_test.py index e416690bfc4..38bb6cf3bd9 100644 --- a/tests/kafkatest/tests/verifiable_consumer_test.py +++ b/tests/kafkatest/tests/verifiable_consumer_test.py @@ -54,10 +54,11 @@ class VerifiableConsumerTest(KafkaTest): return super(VerifiableConsumerTest, self).min_cluster_size() + self.num_consumers + self.num_producers def setup_consumer(self, topic, static_membership=False, enable_autocommit=False, - assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor", **kwargs): + assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor", group_remote_assignor="range", **kwargs): return VerifiableConsumer(self.test_context, self.num_consumers, self.kafka, topic, self.group_id, static_membership=static_membership, session_timeout_sec=self.session_timeout_sec, assignment_strategy=assignment_strategy, enable_autocommit=enable_autocommit, + group_remote_assignor=group_remote_assignor, log_level="TRACE", **kwargs) def setup_producer(self, topic, max_messages=-1, throughput=500):