This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 5c95a5da31f87becebab223d0a80d0469c3cecf5
Author: Alyssa Huang <ahu...@confluent.io>
AuthorDate: Mon Sep 30 05:51:49 2024 -0700

    MINOR: Fix kafkatest advertised listeners  (#17294)
    
    Followup for #17146
    
    Reviewers: Bill Bejeck <bbej...@apache.org>
---
 tests/kafkatest/services/kafka/kafka.py | 25 +++++++++++++++----------
 1 file changed, 15 insertions(+), 10 deletions(-)

diff --git a/tests/kafkatest/services/kafka/kafka.py 
b/tests/kafkatest/services/kafka/kafka.py
index 6987bbd05de..51eab79db96 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -280,7 +280,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
         self.controller_quorum = None # will define below if necessary
         self.isolated_controller_quorum = None # will define below if necessary
         self.configured_for_zk_migration = False
-        
+        self.dynamicRaftQuorum = False
+
         # Set use_new_coordinator based on context and arguments.
         default_use_new_coordinator = False
        
@@ -761,7 +762,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
         for port in self.port_mappings.values():
             if port.open:
                 listeners.append(port.listener())
-                advertised_listeners.append(port.advertised_listener(node))
+                if (self.dynamicRaftQuorum and 
quorum.NodeQuorumInfo(self.quorum_info, node).has_controller_role) or \
+                        port.name not in controller_listener_names:
+                    advertised_listeners.append(port.advertised_listener(node))
                 protocol_map.append(port.listener_security_protocol())
         controller_sec_protocol = 
self.isolated_controller_quorum.controller_security_protocol if 
self.isolated_controller_quorum \
             else self.controller_security_protocol if 
self.quorum_info.has_brokers_and_controllers and not 
quorum.NodeQuorumInfo(self.quorum_info, node).has_controller_role \
@@ -881,16 +884,18 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
             # define controller.quorum.bootstrap.servers or 
controller.quorum.voters text
             security_protocol_to_use = 
self.controller_quorum.controller_security_protocol
             first_node_id = 1 if self.quorum_info.has_brokers_and_controllers 
else config_property.FIRST_CONTROLLER_ID
-            controller_quorum_bootstrap_servers = 
','.join(["{}:{}".format(node.account.hostname,
-                                                                           
config_property.FIRST_CONTROLLER_PORT +
-                                                                           
KafkaService.SECURITY_PROTOCOLS.index(security_protocol_to_use))
-                                                            for node in 
self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]])
             if self.dynamicRaftQuorum:
-                self.controller_quorum_bootstrap_servers = 
controller_quorum_bootstrap_servers
+                self.controller_quorum_bootstrap_servers = 
','.join(["{}:{}".format(node.account.hostname,
+                                                                               
     config_property.FIRST_CONTROLLER_PORT +
+                                                                               
     KafkaService.SECURITY_PROTOCOLS.index(security_protocol_to_use))
+                                                                     for node 
in 
self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]])
             else:
-                self.controller_quorum_voters = ','.join(["%s@%s" % 
(self.controller_quorum.idx(node) + first_node_id - 1,
-                                                                     
bootstrap_server)
-                                                          for bootstrap_server 
in controller_quorum_bootstrap_servers.split(',')])
+                self.controller_quorum_voters = 
','.join(["{}@{}:{}".format(self.controller_quorum.idx(node) +
+                                                                            
first_node_id - 1,
+                                                                            
node.account.hostname,
+                                                                            
config_property.FIRST_CONTROLLER_PORT +
+                                                                            
KafkaService.SECURITY_PROTOCOLS.index(security_protocol_to_use))
+                                                          for node in 
self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]])
             # define controller.listener.names
             self.controller_listener_names = 
','.join(self.controller_listener_name_list(node))
             # define sasl.mechanism.controller.protocol to match the isolated 
quorum if one exists

Reply via email to