This is an automated email from the ASF dual-hosted git repository. lucasbru pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new f66610095c6 KAFKA-16274: Update replica_scale_test.py to support KIP-848’s group protocol config (#15577) f66610095c6 is described below commit f66610095c6342260ea21f994db0570b1a3c5e51 Author: Kirk True <k...@kirktrue.pro> AuthorDate: Fri Mar 22 06:57:37 2024 -0700 KAFKA-16274: Update replica_scale_test.py to support KIP-848’s group protocol config (#15577) Added a new optional group_protocol parameter to the test methods, then passed that down to the methods involved. Unfortunately, because the new consumer can only be used with the new coordinator, this required a new @matrix block instead of adding the group_protocol=["classic", "consumer"] to the existing blocks 😢 Reviewers: Lucas Brutschy <lbruts...@confluent.io> --- tests/kafkatest/tests/core/replica_scale_test.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/tests/kafkatest/tests/core/replica_scale_test.py b/tests/kafkatest/tests/core/replica_scale_test.py index 22555d202d3..8500c0bf302 100644 --- a/tests/kafkatest/tests/core/replica_scale_test.py +++ b/tests/kafkatest/tests/core/replica_scale_test.py @@ -20,7 +20,7 @@ from ducktape.tests.test import Test from kafkatest.services.trogdor.produce_bench_workload import ProduceBenchWorkloadService, ProduceBenchWorkloadSpec from kafkatest.services.trogdor.consume_bench_workload import ConsumeBenchWorkloadService, ConsumeBenchWorkloadSpec from kafkatest.services.trogdor.task_spec import TaskSpec -from kafkatest.services.kafka import KafkaService, quorum +from kafkatest.services.kafka import KafkaService, quorum, consumer_group from kafkatest.services.trogdor.trogdor import TrogdorService from kafkatest.services.zookeeper import ZookeeperService @@ -52,7 +52,7 @@ class ReplicaScaleTest(Test): topic_count=[50], partition_count=[34], replication_factor=[3], - metadata_quorum=[quorum.zk], + metadata_quorum=[quorum.zk, quorum.isolated_kraft], use_new_coordinator=[False] ) @matrix( @@ -60,10 +60,11 @@ class ReplicaScaleTest(Test): partition_count=[34], replication_factor=[3], metadata_quorum=[quorum.isolated_kraft], - use_new_coordinator=[True, False] + use_new_coordinator=[True], + group_protocol=consumer_group.all_group_protocols ) def test_produce_consume(self, topic_count, partition_count, replication_factor, - metadata_quorum=quorum.zk, use_new_coordinator=False): + metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None): topics_create_start_time = time.time() for i in range(topic_count): topic = "replicas_produce_consume_%d" % i @@ -101,12 +102,13 @@ class ReplicaScaleTest(Test): produce_workload.wait_for_done(timeout_sec=600) print("Completed produce bench", flush=True) # Force some stdout for Travis + consumer_conf = consumer_group.maybe_set_group_protocol(group_protocol) consume_spec = ConsumeBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS, consumer_workload_service.consumer_node, consumer_workload_service.bootstrap_servers, target_messages_per_sec=150000, max_messages=3400000, - consumer_conf={}, + consumer_conf=consumer_conf, admin_client_conf={}, common_client_conf={}, active_topics=["replicas_produce_consume_[0-2]"])