This is an automated email from the ASF dual-hosted git repository. jsancio pushed a commit to branch 3.9 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 5c95a5da31f87becebab223d0a80d0469c3cecf5 Author: Alyssa Huang <ahu...@confluent.io> AuthorDate: Mon Sep 30 05:51:49 2024 -0700 MINOR: Fix kafkatest advertised listeners (#17294) Followup for #17146 Reviewers: Bill Bejeck <bbej...@apache.org> --- tests/kafkatest/services/kafka/kafka.py | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index 6987bbd05de..51eab79db96 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -280,7 +280,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): self.controller_quorum = None # will define below if necessary self.isolated_controller_quorum = None # will define below if necessary self.configured_for_zk_migration = False - + self.dynamicRaftQuorum = False + # Set use_new_coordinator based on context and arguments. default_use_new_coordinator = False @@ -761,7 +762,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): for port in self.port_mappings.values(): if port.open: listeners.append(port.listener()) - advertised_listeners.append(port.advertised_listener(node)) + if (self.dynamicRaftQuorum and quorum.NodeQuorumInfo(self.quorum_info, node).has_controller_role) or \ + port.name not in controller_listener_names: + advertised_listeners.append(port.advertised_listener(node)) protocol_map.append(port.listener_security_protocol()) controller_sec_protocol = self.isolated_controller_quorum.controller_security_protocol if self.isolated_controller_quorum \ else self.controller_security_protocol if self.quorum_info.has_brokers_and_controllers and not quorum.NodeQuorumInfo(self.quorum_info, node).has_controller_role \ @@ -881,16 +884,18 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): # define controller.quorum.bootstrap.servers or controller.quorum.voters text security_protocol_to_use = self.controller_quorum.controller_security_protocol first_node_id = 1 if self.quorum_info.has_brokers_and_controllers else config_property.FIRST_CONTROLLER_ID - controller_quorum_bootstrap_servers = ','.join(["{}:{}".format(node.account.hostname, - config_property.FIRST_CONTROLLER_PORT + - KafkaService.SECURITY_PROTOCOLS.index(security_protocol_to_use)) - for node in self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]]) if self.dynamicRaftQuorum: - self.controller_quorum_bootstrap_servers = controller_quorum_bootstrap_servers + self.controller_quorum_bootstrap_servers = ','.join(["{}:{}".format(node.account.hostname, + config_property.FIRST_CONTROLLER_PORT + + KafkaService.SECURITY_PROTOCOLS.index(security_protocol_to_use)) + for node in self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]]) else: - self.controller_quorum_voters = ','.join(["%s@%s" % (self.controller_quorum.idx(node) + first_node_id - 1, - bootstrap_server) - for bootstrap_server in controller_quorum_bootstrap_servers.split(',')]) + self.controller_quorum_voters = ','.join(["{}@{}:{}".format(self.controller_quorum.idx(node) + + first_node_id - 1, + node.account.hostname, + config_property.FIRST_CONTROLLER_PORT + + KafkaService.SECURITY_PROTOCOLS.index(security_protocol_to_use)) + for node in self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]]) # define controller.listener.names self.controller_listener_names = ','.join(self.controller_listener_name_list(node)) # define sasl.mechanism.controller.protocol to match the isolated quorum if one exists