This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.1 by this push:
new ac6ae23 KAFKA-13582:
TestVerifiableProducer.test_multiple_kraft_security_protocols fails (#11664)
ac6ae23 is described below
commit ac6ae231a4750be40cb2eb72d8d78d5081470391
Author: Ron Dagostino <[email protected]>
AuthorDate: Mon Jan 10 14:54:26 2022 -0500
KAFKA-13582: TestVerifiableProducer.test_multiple_kraft_security_protocols
fails (#11664)
KRaft brokers always use the first controller listener, so if there is not
also a colocated KRaft controller on the node be sure to only publish one
controller listener in `controller.listener.names` even when the
inter-controller listener name differs. System tests were failing due to
unnecessarily publishing a second entry in `controller.listener.names` for a
broker-only config and not also publishing a mapping for it in
`listener.security.protocol.map`. Removing the unnecessary e [...]
Reviewers: David Jacot <[email protected]>
---
tests/kafkatest/services/kafka/kafka.py | 17 +++++++++++------
1 file changed, 11 insertions(+), 6 deletions(-)
diff --git a/tests/kafkatest/services/kafka/kafka.py
b/tests/kafkatest/services/kafka/kafka.py
index f275853..55b5b7b 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -677,7 +677,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
advertised_listeners = []
protocol_map = []
- controller_listener_names = self.controller_listener_name_list()
+ controller_listener_names = self.controller_listener_name_list(node)
for port in self.port_mappings.values():
if port.open:
@@ -758,12 +758,17 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
KafkaService.STDOUT_STDERR_CAPTURE)
return cmd
- def controller_listener_name_list(self):
+ def controller_listener_name_list(self, node):
if self.quorum_info.using_zk:
return []
broker_to_controller_listener_name =
self.controller_listener_name(self.controller_quorum.controller_security_protocol)
- return [broker_to_controller_listener_name] if
(self.controller_quorum.intercontroller_security_protocol ==
self.controller_quorum.controller_security_protocol) \
- else [broker_to_controller_listener_name,
self.controller_listener_name(self.controller_quorum.intercontroller_security_protocol)]
+ # Brokers always use the first controller listener, so include a
second, inter-controller listener if and only if:
+ # 1) the node is a controller node
+ # 2) the inter-controller listener name differs from the
broker-to-controller listener name
+ return [broker_to_controller_listener_name,
self.controller_listener_name(self.controller_quorum.intercontroller_security_protocol)]
\
+ if (quorum.NodeQuorumInfo(self.quorum_info,
node).has_controller_role and
+ self.controller_quorum.intercontroller_security_protocol !=
self.controller_quorum.controller_security_protocol) \
+ else [broker_to_controller_listener_name]
def start_node(self, node, timeout_sec=60):
if node not in self.nodes_to_start:
@@ -772,7 +777,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
self.node_quorum_info = quorum.NodeQuorumInfo(self.quorum_info, node)
if self.quorum_info.has_controllers:
- for controller_listener in self.controller_listener_name_list():
+ for controller_listener in
self.controller_listener_name_list(node):
if self.node_quorum_info.has_controller_role:
self.open_port(controller_listener)
else: # co-located case where node doesn't have a controller
@@ -793,7 +798,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
KafkaService.SECURITY_PROTOCOLS.index(security_protocol_to_use))
for node in
self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]])
# define controller.listener.names
- self.controller_listener_names =
','.join(self.controller_listener_name_list())
+ self.controller_listener_names =
','.join(self.controller_listener_name_list(node))
# define sasl.mechanism.controller.protocol to match remote quorum
if one exists
if self.remote_controller_quorum:
self.controller_sasl_mechanism =
self.remote_controller_quorum.controller_sasl_mechanism