imcdo commented on code in PR #14524: URL: https://github.com/apache/kafka/pull/14524#discussion_r1355442947
########## tests/kafkatest/services/kafka/kafka.py: ########## @@ -277,6 +278,9 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI self.isolated_controller_quorum = None # will define below if necessary self.configured_for_zk_migration = False + if use_new_coordinator is None : + self.use_new_coordinator = self.context.globals.get["use_new_coordinator", False] Review Comment: `get` is a method, so use () ```suggestion self.use_new_coordinator = self.context.globals.get("use_new_coordinator", False) ``` ########## tests/kafkatest/services/kafka/config.py: ########## @@ -28,7 +28,8 @@ class KafkaConfig(dict): config_property.METADATA_LOG_SEGMENT_BYTES: str(9*1024*1024), # 9 MB config_property.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS: str(10*1024*1024), # 10 MB config_property.METADATA_LOG_RETENTION_BYTES: str(10*1024*1024), # 10 MB - config_property.METADATA_LOG_SEGMENT_MS: str(1*60*1000) # one minute + config_property.METADATA_LOG_SEGMENT_MS: str(1*60*1000), # one minute + config_property.NEW_GROUP_COORDINATOR_ENABLE: False Review Comment: Nit: is there a better name than "new group coordinator"? It is not very descriptive and could easily become the old group coordinator if we aren't careful. ########## tests/kafkatest/services/kafka/kafka.py: ########## @@ -407,6 +411,7 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI kraft_broker_configs = { config_property.PORT: config_property.FIRST_BROKER_PORT, config_property.NODE_ID: self.idx(node), + config_property.NEW_GROUP_COORDINATOR_ENABLE: use_new_coordinator } Review Comment: Don't we want to conditionally add this when `use_new_coordinator` is true? ```suggestion kraft_broker_configs = { config_property.PORT: config_property.FIRST_BROKER_PORT, config_property.NODE_ID: self.idx(node) } if self.use_new_coordinator: kraft_broker_configs[config_property.NEW_GROUP_COORDINATOR_ENABLE] = use_new_coordinator } ``` That way we stick with the default value? Or do we care? 
Either way is fine. ########## tests/kafkatest/tests/core/consume_bench_test.py: ########## @@ -91,11 +99,17 @@ def test_consume_bench(self, topics, metadata_quorum=quorum.zk): self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2)) @cluster(num_nodes=10) - @matrix(metadata_quorum=quorum.all_non_upgrade) - def test_single_partition(self, metadata_quorum=quorum.zk): + @matrix( + metadata_quorum=quorum.all_non_upgrade, + use_new_coordinator=[True, False] + ) Review Comment: The same applies here: ```suggestion @matrix( metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[True, False] ) @matrix( metadata_quorum=[quorum.zk], use_new_coordinator=[False] ) ``` There are some alternatives: one is that your `skip_if_new_coordinator_and_zk` wrapper could go into the method metadata and remove the test cases with use_new_coordinator=True and metadata_quorum=zk, though I'd advise against it as it's not that simple. ########## tests/kafkatest/services/kafka/kafka.py: ########## @@ -277,6 +278,9 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI self.isolated_controller_quorum = None # will define below if necessary Review Comment: Make sure you add a description of the param you are adding to the docstring for the __init__ method. ########## tests/kafkatest/tests/core/consume_bench_test.py: ########## @@ -68,12 +67,21 @@ def produce_messages(self, topics, max_messages=10000): self.logger.debug("Produce workload finished") @cluster(num_nodes=10) - @matrix(topics=[["consume_bench_topic[0-5]"]], metadata_quorum=quorum.all_non_upgrade) # topic subscription - @matrix(topics=[["consume_bench_topic[0-5]:[0-4]"]], metadata_quorum=quorum.all_non_upgrade) # manual topic assignment - def test_consume_bench(self, topics, metadata_quorum=quorum.zk): + @matrix( + topics=[ + ["consume_bench_topic[0-5]"], + ["consume_bench_topic[0-5]:[0-4]"] + ], + metadata_quorum=quorum.all_non_upgrade, + use_new_coordinator=[True, False] + ) # topic 
subscription Review Comment: You probably shouldn't skip the zk + new coordinator test; just don't run it in the first place. We don't really want to pollute the report with a bunch of skips (which historically are reserved for broken tests that we intend to fix later). ```suggestion @matrix( topics=[ ["consume_bench_topic[0-5]"], ["consume_bench_topic[0-5]:[0-4]"] ], metadata_quorum=metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[True, False] ) @matrix( topics=[ ["consume_bench_topic[0-5]"], ["consume_bench_topic[0-5]:[0-4]"] ], metadata_quorum=[quorum.zk], use_new_coordinator=[False] ) # topic subscription ``` As annoying as that is, it's probably for the best. ########## tests/kafkatest/services/kafka/kafka.py: ########## @@ -764,6 +769,9 @@ def prop_file(self, node): else: override_configs[config_property.ZOOKEEPER_SSL_CLIENT_ENABLE] = 'false' + if self.use_new_coordinator: + override_configs[config_property.NEW_GROUP_COORDINATOR_ENABLE] = 'true' Review Comment: Should we limit this to only kraft brokers instead of all brokers, or does it not matter? ########## tests/kafkatest/tests/core/consume_bench_test.py: ########## @@ -188,12 +220,18 @@ def test_multiple_consumers_random_group_partitions(self, metadata_quorum=quorum self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2)) @cluster(num_nodes=10) - @matrix(metadata_quorum=quorum.all_non_upgrade) - def test_multiple_consumers_specified_group_partitions_should_raise(self, metadata_quorum=quorum.zk): + @matrix( + metadata_quorum=quorum.all_non_upgrade, + use_new_coordinator=[True, False] + ) + @skip_if_new_coordinator_and_zk + def test_multiple_consumers_specified_group_partitions_should_raise(self, metadata_quorum=quorum.zk, use_new_coordinator=False): """ Runs multiple consumers in the same group to read messages from specific partitions. It is an invalid configuration to provide a consumer group and specific partitions. 
""" + self.kafka.use_new_coordinator = use_new_coordinator Review Comment: Make sure in your testing that this actually works by SSHing onto all the brokers to see if it's set properly. The way it's configured now, at `kafka.py` construction, the kraft broker configs set this to False; then at runtime, the override sets it to `use_new_coordinator`, so it should work. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org