This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new af0ec247ccf KAFKA-16231: Update consumer_test.py to support KIP-848’s 
group protocol config (#15330)
af0ec247ccf is described below

commit af0ec247ccf7b1d97ca684109854bf1881fd11bb
Author: Kirk True <k...@kirktrue.pro>
AuthorDate: Fri Mar 15 06:17:22 2024 -0700

    KAFKA-16231: Update consumer_test.py to support KIP-848’s group protocol 
config (#15330)
    
    Added a new optional group_protocol parameter to the test methods, then 
passed that down to the setup_consumer method.
    
    Unfortunately, because the new consumer can only be used with the new 
coordinator, this required a new @matrix block instead of adding the 
group_protocol=["classic", "consumer"] to the existing blocks 😢
    
    Reviewers: Lucas Brutschy <lbruts...@confluent.io>
---
 tests/kafkatest/services/kafka/consumer_group.py  | 42 +++++++++++++
 tests/kafkatest/services/verifiable_consumer.py   |  2 +
 tests/kafkatest/tests/client/consumer_test.py     | 73 +++++++++++++++--------
 tests/kafkatest/tests/verifiable_consumer_test.py |  3 +-
 4 files changed, 93 insertions(+), 27 deletions(-)

diff --git a/tests/kafkatest/services/kafka/consumer_group.py 
b/tests/kafkatest/services/kafka/consumer_group.py
new file mode 100644
index 00000000000..e94bd6382d2
--- /dev/null
+++ b/tests/kafkatest/services/kafka/consumer_group.py
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# These are the group protocols we support. Most tests that use the new group 
coordinator will
+# (eventually) be upgraded to test both of these consumer groups.
+classic_group_protocol = 'classic'
+consumer_group_protocol = 'consumer'
+all_group_protocols = [classic_group_protocol, consumer_group_protocol]
+
+# These are the remote assignors used by the new group coordinator.
+range_remote_assignor = 'range'
+uniform_remote_assignor = 'uniform'
+all_remote_assignors = [range_remote_assignor, uniform_remote_assignor]
+
+
+def is_consumer_group_protocol_enabled(group_protocol):
+    """Check if the KIP-848 consumer group protocol is enabled."""
+    return group_protocol is not None and group_protocol.lower() == 
consumer_group_protocol
+
+
+def maybe_set_group_protocol(group_protocol, config=None):
+    """Maybe include the KIP-848 group.protocol configuration if it's not 
None."""
+    if config is None:
+        config = {}
+
+    if group_protocol is not None:
+        config["group.protocol"] = group_protocol
+
+    return config
diff --git a/tests/kafkatest/services/verifiable_consumer.py 
b/tests/kafkatest/services/verifiable_consumer.py
index e1155c16aae..7ef5f75e22b 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -177,6 +177,8 @@ class VerifiableConsumer(KafkaPathResolverMixin, 
VerifiableClientMixin, Backgrou
         self.log_level = log_level
         self.kafka = kafka
         self.topic = topic
+        self.group_protocol = group_protocol
+        self.group_remote_assignor = group_remote_assignor
         self.group_id = group_id
         self.reset_policy = reset_policy
         self.static_membership = static_membership
diff --git a/tests/kafkatest/tests/client/consumer_test.py 
b/tests/kafkatest/tests/client/consumer_test.py
index 6dc1fb897b0..d4d6af9f2aa 100644
--- a/tests/kafkatest/tests/client/consumer_test.py
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -18,7 +18,7 @@ from ducktape.utils.util import wait_until
 from ducktape.mark.resource import cluster
 
 from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
-from kafkatest.services.kafka import TopicPartition, quorum
+from kafkatest.services.kafka import TopicPartition, quorum, consumer_group
 
 import signal
 
@@ -76,14 +76,15 @@ class OffsetValidationTest(VerifiableConsumerTest):
 
     @cluster(num_nodes=7)
     @matrix(
-        metadata_quorum=[quorum.zk],
+        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(
         metadata_quorum=[quorum.isolated_kraft],
-        use_new_coordinator=[True, False]
+        use_new_coordinator=[True],
+        group_protocol=consumer_group.all_group_protocols
     )
-    def test_broker_rolling_bounce(self, metadata_quorum=quorum.zk, 
use_new_coordinator=False):
+    def test_broker_rolling_bounce(self, metadata_quorum=quorum.zk, 
use_new_coordinator=False, group_protocol=None):
         """
         Verify correct consumer behavior when the brokers are consecutively 
restarted.
 
@@ -108,7 +109,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
         # broker, and that coordinator will fail the consumer and trigger a 
group rebalance if its session times out.
         # This test is asserting that no rebalances occur, so we increase the 
session timeout for this to be the case.
         self.session_timeout_sec = 30
-        consumer = self.setup_consumer(self.TOPIC)
+        consumer = self.setup_consumer(self.TOPIC, 
group_protocol=group_protocol)
 
         producer.start()
         self.await_produced_messages(producer)
@@ -136,16 +137,17 @@ class OffsetValidationTest(VerifiableConsumerTest):
     @matrix(
         clean_shutdown=[True],
         bounce_mode=["all", "rolling"],
-        metadata_quorum=[quorum.zk],
+        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(
         clean_shutdown=[True],
         bounce_mode=["all", "rolling"],
         metadata_quorum=[quorum.isolated_kraft],
-        use_new_coordinator=[True, False]   
+        use_new_coordinator=[True],
+        group_protocol=consumer_group.all_group_protocols
     )
-    def test_consumer_bounce(self, clean_shutdown, bounce_mode, 
metadata_quorum=quorum.zk, use_new_coordinator=False):
+    def test_consumer_bounce(self, clean_shutdown, bounce_mode, 
metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
         """
         Verify correct consumer behavior when the consumers in the group are 
consecutively restarted.
 
@@ -160,7 +162,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
         partition = TopicPartition(self.TOPIC, 0)
 
         producer = self.setup_producer(self.TOPIC)
-        consumer = self.setup_consumer(self.TOPIC)
+        consumer = self.setup_consumer(self.TOPIC, 
group_protocol=group_protocol)
 
         producer.start()
         self.await_produced_messages(producer)
@@ -371,19 +373,20 @@ class OffsetValidationTest(VerifiableConsumerTest):
     @matrix(
         clean_shutdown=[True],
         enable_autocommit=[True, False],
-        metadata_quorum=[quorum.zk],
+        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(
         clean_shutdown=[True],
         enable_autocommit=[True, False],
         metadata_quorum=[quorum.isolated_kraft],
-        use_new_coordinator=[True, False]
+        use_new_coordinator=[True],
+        group_protocol=consumer_group.all_group_protocols
     )
-    def test_consumer_failure(self, clean_shutdown, enable_autocommit, 
metadata_quorum=quorum.zk, use_new_coordinator=False):
+    def test_consumer_failure(self, clean_shutdown, enable_autocommit, 
metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
         partition = TopicPartition(self.TOPIC, 0)
 
-        consumer = self.setup_consumer(self.TOPIC, 
enable_autocommit=enable_autocommit)
+        consumer = self.setup_consumer(self.TOPIC, 
enable_autocommit=enable_autocommit, group_protocol=group_protocol)
         producer = self.setup_producer(self.TOPIC)
 
         consumer.start()
@@ -436,12 +439,19 @@ class OffsetValidationTest(VerifiableConsumerTest):
         clean_shutdown=[True, False],
         enable_autocommit=[True, False],
         metadata_quorum=[quorum.isolated_kraft],
-        use_new_coordinator=[True, False]
+        use_new_coordinator=[False]
     )
-    def test_broker_failure(self, clean_shutdown, enable_autocommit, 
metadata_quorum=quorum.zk, use_new_coordinator=False):
+    @matrix(
+        clean_shutdown=[True, False],
+        enable_autocommit=[True, False],
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True],
+        group_protocol=consumer_group.all_group_protocols
+    )
+    def test_broker_failure(self, clean_shutdown, enable_autocommit, 
metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
         partition = TopicPartition(self.TOPIC, 0)
 
-        consumer = self.setup_consumer(self.TOPIC, 
enable_autocommit=enable_autocommit)
+        consumer = self.setup_consumer(self.TOPIC, 
enable_autocommit=enable_autocommit, group_protocol=group_protocol)
         producer = self.setup_producer(self.TOPIC)
 
         producer.start()
@@ -475,14 +485,15 @@ class OffsetValidationTest(VerifiableConsumerTest):
 
     @cluster(num_nodes=7)
     @matrix(
-        metadata_quorum=[quorum.zk],
+        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(
         metadata_quorum=[quorum.isolated_kraft],
-        use_new_coordinator=[True, False]
+        use_new_coordinator=[True],
+        group_protocol=consumer_group.all_group_protocols
     )
-    def test_group_consumption(self, metadata_quorum=quorum.zk, 
use_new_coordinator=False):
+    def test_group_consumption(self, metadata_quorum=quorum.zk, 
use_new_coordinator=False, group_protocol=None):
         """
         Verifies correct group rebalance behavior as consumers are started and 
stopped.
         In particular, this test verifies that the partition is readable after 
every
@@ -494,7 +505,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
         - Start the consumers one by one, verifying consumption after each 
rebalance
         - Shutdown the consumers one by one, verifying consumption after each 
rebalance
         """
-        consumer = self.setup_consumer(self.TOPIC)
+        consumer = self.setup_consumer(self.TOPIC, 
group_protocol=group_protocol)
         producer = self.setup_producer(self.TOPIC)
 
         partition = TopicPartition(self.TOPIC, 0)
@@ -535,18 +546,25 @@ class AssignmentValidationTest(VerifiableConsumerTest):
     @matrix(
         assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor",
                              
"org.apache.kafka.clients.consumer.RoundRobinAssignor",
-                             
"org.apache.kafka.clients.consumer.StickyAssignor"], 
-        metadata_quorum=[quorum.zk],
+                             
"org.apache.kafka.clients.consumer.StickyAssignor"],
+        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
         use_new_coordinator=[False]
     )
     @matrix(
         assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor",
                              
"org.apache.kafka.clients.consumer.RoundRobinAssignor",
-                             
"org.apache.kafka.clients.consumer.StickyAssignor"], 
+                             
"org.apache.kafka.clients.consumer.StickyAssignor"],
         metadata_quorum=[quorum.isolated_kraft],
-        use_new_coordinator=[True, False]
+        use_new_coordinator=[True],
+        group_protocol=[consumer_group.classic_group_protocol],
+    )
+    @matrix(
+        metadata_quorum=[quorum.isolated_kraft],
+        use_new_coordinator=[True],
+        group_protocol=[consumer_group.consumer_group_protocol],
+        group_remote_assignor=consumer_group.all_remote_assignors
     )
-    def test_valid_assignment(self, assignment_strategy, 
metadata_quorum=quorum.zk, use_new_coordinator=False):
+    def test_valid_assignment(self, assignment_strategy=None, 
metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None, 
group_remote_assignor=None):
         """
         Verify assignment strategy correctness: each partition is assigned to 
exactly
         one consumer instance.
@@ -556,7 +574,10 @@ class AssignmentValidationTest(VerifiableConsumerTest):
         - Start the consumers one by one
         - Validate assignment after every expected rebalance
         """
-        consumer = self.setup_consumer(self.TOPIC, 
assignment_strategy=assignment_strategy)
+        consumer = self.setup_consumer(self.TOPIC,
+                                       assignment_strategy=assignment_strategy,
+                                       group_protocol=group_protocol,
+                                       
group_remote_assignor=group_remote_assignor)
         for num_started, node in enumerate(consumer.nodes, 1):
             consumer.start_node(node)
             self.await_members(consumer, num_started)
diff --git a/tests/kafkatest/tests/verifiable_consumer_test.py 
b/tests/kafkatest/tests/verifiable_consumer_test.py
index e416690bfc4..38bb6cf3bd9 100644
--- a/tests/kafkatest/tests/verifiable_consumer_test.py
+++ b/tests/kafkatest/tests/verifiable_consumer_test.py
@@ -54,10 +54,11 @@ class VerifiableConsumerTest(KafkaTest):
         return super(VerifiableConsumerTest, self).min_cluster_size() + 
self.num_consumers + self.num_producers
 
     def setup_consumer(self, topic, static_membership=False, 
enable_autocommit=False,
-                       
assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor", 
**kwargs):
+                       
assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor", 
group_remote_assignor="range", **kwargs):
         return VerifiableConsumer(self.test_context, self.num_consumers, 
self.kafka,
                                   topic, self.group_id, 
static_membership=static_membership, 
session_timeout_sec=self.session_timeout_sec,
                                   assignment_strategy=assignment_strategy, 
enable_autocommit=enable_autocommit,
+                                  group_remote_assignor=group_remote_assignor,
                                   log_level="TRACE", **kwargs)
 
     def setup_producer(self, topic, max_messages=-1, throughput=500):

Reply via email to