This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ff4d9510277 KAFKA-17715 Remove argument force_use_zk_connection from
kafka_acls_cmd_with_optional_security_settings (#19209)
ff4d9510277 is described below
commit ff4d9510277c474a4be17f95fade9c0c479ec5ab
Author: Ming-Yen Chung <[email protected]>
AuthorDate: Wed Jul 9 17:07:56 2025 +0800
KAFKA-17715 Remove argument force_use_zk_connection from
kafka_acls_cmd_with_optional_security_settings (#19209)
The e2e tests currently cover only version 2.1.0 and above, and every such
version supports `--bootstrap-server` in `kafka-acls.sh`. Thus, we can
remove `force_use_zk_connection` from
`kafka_acls_cmd_with_optional_security_settings`.
In contrast, the `force_use_zk_connection` arguments in
`kafka_topics_cmd_with_optional_security_settings` and
`kafka_configs_cmd_with_optional_security_settings` still need to be
kept, as `kafka-topics.sh` does not support `--bootstrap-server` in 2.1
and 2.2.
e2e test result:
```
===========================================
SESSION REPORT (ALL TESTS)
ducktape version: 0.12.0
session_id: 2025-07-02--001
run time: 200 minutes 28.399 seconds
tests run: 90
passed: 90
flaky: 0
failed: 0
ignored: 0
===========================================
```
Reviewers: Ken Huang <[email protected]>, TengYao Chi
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
tests/kafkatest/services/kafka/kafka.py | 37 +++++++++----------------
tests/kafkatest/services/security/kafka_acls.py | 8 ++----
tests/kafkatest/tests/client/quota_test.py | 2 +-
tests/kafkatest/tests/core/authorizer_test.py | 5 ++--
tests/kafkatest/version.py | 3 --
5 files changed, 20 insertions(+), 35 deletions(-)
diff --git a/tests/kafkatest/services/kafka/kafka.py
b/tests/kafkatest/services/kafka/kafka.py
index ff106425005..c80197c3a5a 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -1188,12 +1188,6 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
return False
return True
- def all_nodes_acl_command_supports_bootstrap_server(self):
- for node in self.nodes:
- if not node.version.acl_command_supports_bootstrap_server():
- return False
- return True
-
def all_nodes_reassign_partitions_command_supports_bootstrap_server(self):
for node in self.nodes:
if not
node.version.reassign_partitions_command_supports_bootstrap_server():
@@ -1350,30 +1344,25 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
self.logger.info("Running alter message format command...\n%s" % cmd)
node.account.ssh(cmd)
- def kafka_acls_cmd_with_optional_security_settings(self, node,
force_use_zk_connection, kafka_security_protocol = None,
override_command_config = None):
+ def kafka_acls_cmd_with_optional_security_settings(self, node,
kafka_security_protocol = None, override_command_config = None):
if self.quorum_info.using_kraft and not self.quorum_info.has_brokers:
raise Exception("Must invoke kafka-acls against a broker, not a
KRaft controller")
- force_use_zk_connection = force_use_zk_connection or not
self.all_nodes_acl_command_supports_bootstrap_server
- if force_use_zk_connection:
- bootstrap_server_or_authorizer_zk_props = "--authorizer-properties
zookeeper.connect=%s" % (self.zk_connect_setting())
- skip_optional_security_settings = True
- else:
- if kafka_security_protocol is None:
- # it wasn't specified, so use the inter-broker security
protocol if it is PLAINTEXT,
- # otherwise use the client security protocol
- if self.interbroker_security_protocol ==
SecurityConfig.PLAINTEXT:
- security_protocol_to_use = SecurityConfig.PLAINTEXT
- else:
- security_protocol_to_use = self.security_protocol
+ if kafka_security_protocol is None:
+ # it wasn't specified, so use the inter-broker security protocol
if it is PLAINTEXT,
+ # otherwise use the client security protocol
+ if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT:
+ security_protocol_to_use = SecurityConfig.PLAINTEXT
else:
- security_protocol_to_use = kafka_security_protocol
- bootstrap_server_or_authorizer_zk_props = "--bootstrap-server %s"
% (self.bootstrap_servers(security_protocol_to_use))
- skip_optional_security_settings = security_protocol_to_use ==
SecurityConfig.PLAINTEXT
+ security_protocol_to_use = self.security_protocol
+ else:
+ security_protocol_to_use = kafka_security_protocol
+ bootstrap_server = "--bootstrap-server %s" %
(self.bootstrap_servers(security_protocol_to_use))
+ skip_optional_security_settings = security_protocol_to_use ==
SecurityConfig.PLAINTEXT
if skip_optional_security_settings:
optional_jass_krb_system_props_prefix = ""
optional_command_config_suffix = ""
else:
- # we need security configs because aren't going to ZooKeeper and
we aren't using PLAINTEXT
+ # we need security configs because we aren't using PLAINTEXT
if (security_protocol_to_use ==
self.interbroker_security_protocol):
# configure JAAS to provide the broker's credentials
# since this is an authenticating cluster and we are going to
use the inter-broker security protocol
@@ -1393,7 +1382,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
kafka_acls_script = self.path.script("kafka-acls.sh", node)
return "%s%s %s%s" % \
(optional_jass_krb_system_props_prefix, kafka_acls_script,
- bootstrap_server_or_authorizer_zk_props,
optional_command_config_suffix)
+ bootstrap_server, optional_command_config_suffix)
def run_cli_tool(self, node, cmd):
output = ""
diff --git a/tests/kafkatest/services/security/kafka_acls.py
b/tests/kafkatest/services/security/kafka_acls.py
index 5bd1a46f597..8c6f946b109 100644
--- a/tests/kafkatest/services/security/kafka_acls.py
+++ b/tests/kafkatest/services/security/kafka_acls.py
@@ -19,13 +19,11 @@ class ACLs:
def __init__(self, context):
self.context = context
- def add_cluster_acl(self, kafka, principal, force_use_zk_connection=False,
additional_cluster_operations_to_grant = [], security_protocol=None):
+ def add_cluster_acl(self, kafka, principal,
additional_cluster_operations_to_grant = [], security_protocol=None):
"""
:param kafka: Kafka cluster upon which ClusterAction ACL is created
:param principal: principal for which ClusterAction ACL is created
:param node: Node to use when determining connection settings
- :param force_use_zk_connection: forces the use of ZooKeeper when true,
otherwise AdminClient is used when available.
- This is necessary for the case where we are bootstrapping ACLs
before Kafka is started or before authorizer is enabled
:param additional_cluster_operations_to_grant may be set to ['Alter',
'Create'] if the cluster is secured since these are required
to create SCRAM credentials and topics, respectively
:param security_protocol set it to explicitly determine whether we use
client or broker credentials, otherwise
@@ -37,7 +35,7 @@ class ACLs:
for operation in ['ClusterAction'] +
additional_cluster_operations_to_grant:
cmd = "%(cmd_prefix)s --add --cluster --operation=%(operation)s
--allow-principal=%(principal)s" % {
- 'cmd_prefix':
kafka.kafka_acls_cmd_with_optional_security_settings(node,
force_use_zk_connection, security_protocol),
+ 'cmd_prefix':
kafka.kafka_acls_cmd_with_optional_security_settings(node, security_protocol),
'operation': operation,
'principal': principal
}
@@ -59,7 +57,7 @@ class ACLs:
for operation in ['ClusterAction'] +
additional_cluster_operations_to_remove:
cmd = "%(cmd_prefix)s --remove --force --cluster
--operation=%(operation)s --allow-principal=%(principal)s" % {
- 'cmd_prefix':
kafka.kafka_acls_cmd_with_optional_security_settings(node, False,
security_protocol),
+ 'cmd_prefix':
kafka.kafka_acls_cmd_with_optional_security_settings(node, security_protocol),
'operation': operation,
'principal': principal
}
diff --git a/tests/kafkatest/tests/client/quota_test.py
b/tests/kafkatest/tests/client/quota_test.py
index e89fea80eee..36d4eca08fd 100644
--- a/tests/kafkatest/tests/client/quota_test.py
+++ b/tests/kafkatest/tests/client/quota_test.py
@@ -78,7 +78,7 @@ class QuotaConfig(object):
def configure_quota(self, kafka, producer_byte_rate, consumer_byte_rate,
entity_args):
node = kafka.nodes[0]
cmd = "%s --alter --add-config
producer_byte_rate=%d,consumer_byte_rate=%d" % \
- (kafka.kafka_configs_cmd_with_optional_security_settings(node,
False), producer_byte_rate, consumer_byte_rate)
+ (kafka.kafka_configs_cmd_with_optional_security_settings(node,
force_use_zk_connection=False), producer_byte_rate, consumer_byte_rate)
cmd += " --entity-type " + entity_args[0] +
self.entity_name_opt(entity_args[1])
if len(entity_args) > 2:
cmd += " --entity-type " + entity_args[2] +
self.entity_name_opt(entity_args[3])
diff --git a/tests/kafkatest/tests/core/authorizer_test.py
b/tests/kafkatest/tests/core/authorizer_test.py
index 60c0612f356..260b16b1f6f 100644
--- a/tests/kafkatest/tests/core/authorizer_test.py
+++ b/tests/kafkatest/tests/core/authorizer_test.py
@@ -98,8 +98,9 @@ class AuthorizerTest(Test):
# add ACLs
self.logger.info("Adding ACLs with broker credentials so that alter
client quotas command will succeed")
- self.acls.add_cluster_acl(self.kafka, client_principal,
force_use_zk_connection=False,
-
additional_cluster_operations_to_grant=['AlterConfigs'],
security_protocol=broker_security_protocol)
+ self.acls.add_cluster_acl(self.kafka, client_principal,
+
additional_cluster_operations_to_grant=['AlterConfigs'],
+ security_protocol=broker_security_protocol)
# the alter client quotas command should now succeed again
self.logger.info(alter_client_quotas_cmd_log_msg)
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index cc442bc4bbd..16f3169d500 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -62,9 +62,6 @@ class KafkaVersion(LooseVersion):
return LooseVersion._cmp(self, other)
- def acl_command_supports_bootstrap_server(self):
- return self >= V_2_1_0
-
def topic_command_supports_bootstrap_server(self):
return self >= V_2_3_0