[
https://issues.apache.org/jira/browse/KAFKA-4544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16706703#comment-16706703
]
ASF GitHub Bot commented on KAFKA-4544:
---------------------------------------
omkreddy closed pull request #5660: KAFKA-4544: Add system tests for delegation token based authentication
URL: https://github.com/apache/kafka/pull/5660
This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 65c9fa589c0..dfbec9f83da 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -60,8 +60,9 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
     def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-group", new_consumer=True,
                  message_validator=None, from_beginning=True, consumer_timeout_ms=None, version=DEV_BRANCH,
                  client_id="console-consumer", print_key=False, jmx_object_names=None, jmx_attributes=None,
-                 enable_systest_events=False, stop_timeout_sec=30, print_timestamp=False,
-                 isolation_level="read_uncommitted", jaas_override_variables=None):
+                 enable_systest_events=False, stop_timeout_sec=35, print_timestamp=False,
+                 isolation_level="read_uncommitted", jaas_override_variables=None,
+                 kafka_opts_override="", client_prop_file_override=""):
         """
         Args:
             context:                    standard context
@@ -83,7 +84,8 @@ def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-gro
             print_timestamp             if True, print each message's timestamp as well
             isolation_level             How to handle transactional messages.
             jaas_override_variables     A dict of variables to be used in the jaas.conf template file
-
+            kafka_opts_override         Override parameters of the KAFKA_OPTS environment variable
+            client_prop_file_override   Override client.properties file used by the consumer
         """
         JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names,
                           jmx_attributes=(jmx_attributes or []), root=ConsoleConsumer.PERSISTENT_ROOT)
@@ -116,6 +118,9 @@ def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-gro
         self.print_timestamp = print_timestamp
         self.jaas_override_variables = jaas_override_variables or {}
+        self.kafka_opts_override = kafka_opts_override
+        self.client_prop_file_override = client_prop_file_override
+
     def prop_file(self, node):
         """Return a string which can be used to create a configuration file appropriate for the given node."""
@@ -134,6 +139,7 @@ def prop_file(self, node):
         prop_file += str(self.security_config)
         return prop_file
+
     def start_cmd(self, node):
         """Return the start command appropriate for the given node."""
         args = self.args.copy()
@@ -147,14 +153,19 @@ def start_cmd(self, node):
         args['jmx_port'] = self.jmx_port
         args['console_consumer'] = self.path.script("kafka-console-consumer.sh", node)
         args['broker_list'] = self.kafka.bootstrap_servers(self.security_config.security_protocol)
-        args['kafka_opts'] = self.security_config.kafka_opts
+
+        if self.kafka_opts_override:
+            args['kafka_opts'] = "\"%s\"" % self.kafka_opts_override
+        else:
+            args['kafka_opts'] = self.security_config.kafka_opts
         cmd = "export JMX_PORT=%(jmx_port)s; " \
               "export LOG_DIR=%(log_dir)s; " \
               "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j_config)s\"; " \
               "export KAFKA_OPTS=%(kafka_opts)s; " \
               "%(console_consumer)s " \
-              "--topic %(topic)s --consumer.config %(config_file)s" % args
+              "--topic %(topic)s " \
+              "--consumer.config %(config_file)s " % args
         if self.new_consumer:
             assert node.version >= V_0_9_0_0, \
@@ -209,7 +220,15 @@ def _worker(self, idx, node):
         # Create and upload config file
         self.logger.info("console_consumer.properties:")
-        prop_file = self.prop_file(node)
+        self.security_config = self.kafka.security_config.client_config(node=node,
+                                                                        jaas_override_variables=self.jaas_override_variables)
+        self.security_config.setup_node(node)
+
+        if self.client_prop_file_override:
+            prop_file = self.client_prop_file_override
+        else:
+            prop_file = self.prop_file(node)
+
         self.logger.info(prop_file)
         node.account.create_file(ConsoleConsumer.CONFIG_FILE, prop_file)
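
As an aside, a minimal sketch (not part of the PR) of how the two new
ConsoleConsumer arguments might be used from a ducktape test; the JAAS path
and property contents below are hypothetical placeholders:

    from kafkatest.services.console_consumer import ConsoleConsumer

    def make_consumer(test_context, kafka, topic):
        # kafka_opts_override replaces the KAFKA_OPTS value derived from the
        # security config; client_prop_file_override replaces the generated
        # client.properties content wholesale.
        client_props = "security.protocol=SASL_PLAINTEXT\nsasl.mechanism=SCRAM-SHA-256\n"
        return ConsoleConsumer(test_context, num_nodes=1, kafka=kafka, topic=topic,
                               kafka_opts_override="-Djava.security.auth.login.config=/tmp/jaas_deleg.conf",
                               client_prop_file_override=client_props)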
diff --git a/tests/kafkatest/services/delegation_tokens.py b/tests/kafkatest/services/delegation_tokens.py
new file mode 100644
index 00000000000..34da16bcab2
--- /dev/null
+++ b/tests/kafkatest/services/delegation_tokens.py
@@ -0,0 +1,102 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os.path
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
+
+"""
+DelegationTokens is a tool to manage the lifecycle of delegation tokens.
+All commands are executed on a secured Kafka node, reusing its generated jaas.conf and krb5.conf.
+"""
+
+class DelegationTokens(KafkaPathResolverMixin):
+ def __init__(self, kafka, context):
+ self.client_properties_content = """
+security.protocol=SASL_PLAINTEXT
+sasl.kerberos.service.name=kafka
+"""
+ self.context = context
+ self.command_path = self.path.script("kafka-delegation-tokens.sh")
+        self.kafka_opts = "KAFKA_OPTS=\"-Djava.security.auth.login.config=/mnt/security/jaas.conf " \
+                          "-Djava.security.krb5.conf=/mnt/security/krb5.conf\" "
+ self.kafka = kafka
+        self.bootstrap_server = " --bootstrap-server " + self.kafka.bootstrap_servers('SASL_PLAINTEXT')
+        self.base_cmd = self.kafka_opts + self.command_path + self.bootstrap_server
+        self.client_prop_path = os.path.join(self.kafka.PERSISTENT_ROOT, "client.properties")
+        self.jaas_deleg_conf_path = os.path.join(self.kafka.PERSISTENT_ROOT, "jaas_deleg.conf")
+        self.token_hmac_path = os.path.join(self.kafka.PERSISTENT_ROOT, "deleg_token_hmac.out")
+        self.delegation_token_out = os.path.join(self.kafka.PERSISTENT_ROOT, "delegation_token.out")
+        self.expire_delegation_token_out = os.path.join(self.kafka.PERSISTENT_ROOT, "expire_delegation_token.out")
+        self.renew_delegation_token_out = os.path.join(self.kafka.PERSISTENT_ROOT, "renew_delegation_token.out")
+
+ self.node = self.kafka.nodes[0]
+
+ def generate_delegation_token(self, maxlifetimeperiod=-1):
+        self.node.account.create_file(self.client_prop_path, self.client_properties_content)
+
+        cmd = self.base_cmd + " --create" \
+              " --max-life-time-period %s" \
+              " --command-config %s > %s" % (maxlifetimeperiod, self.client_prop_path, self.delegation_token_out)
+        self.node.account.ssh(cmd, allow_fail=False)
+
+ def expire_delegation_token(self, hmac):
+ cmd = self.base_cmd + " --expire" \
+ " --expiry-time-period -1" \
+ " --hmac %s" \
+ " --command-config %s > %s" % (hmac,
self.client_prop_path, self.expire_delegation_token_out)
+ self.node.account.ssh(cmd, allow_fail=False)
+
+ def renew_delegation_token(self, hmac, renew_time_period=-1):
+ cmd = self.base_cmd + " --renew" \
+ " --renew-time-period %s" \
+ " --hmac %s" \
+ " --command-config %s > %s" \
+ % (renew_time_period, hmac, self.client_prop_path,
self.renew_delegation_token_out)
+ return self.node.account.ssh_capture(cmd, allow_fail=False)
+
+ def create_jaas_conf_with_delegation_token(self):
+ dt = self.parse_delegation_token_out()
+ jaas_deleg_content = """
+KafkaClient {
+ org.apache.kafka.common.security.scram.ScramLoginModule required
+ username="%s"
+ password="%s"
+ tokenauth=true;
+};
+""" % (dt["tokenid"], dt["hmac"])
+        self.node.account.create_file(self.jaas_deleg_conf_path, jaas_deleg_content)
+
+ return jaas_deleg_content
+
+ def token_hmac(self):
+ dt = self.parse_delegation_token_out()
+ return dt["hmac"]
+
+ def parse_delegation_token_out(self):
+ cmd = "tail -1 %s" % self.delegation_token_out
+
+ output_iter = self.node.account.ssh_capture(cmd, allow_fail=False)
+ output = ""
+ for line in output_iter:
+ output += line
+
+        tokenid, hmac, owner, renewers, issuedate, expirydate, maxdate = output.split()
+        return {"tokenid"    : tokenid,
+                "hmac"       : hmac,
+                "owner"      : owner,
+                "renewers"   : renewers,
+                "issuedate"  : issuedate,
+                "expirydate" : expirydate,
+                "maxdate"    : maxdate}
\ No newline at end of file
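
For orientation, a usage sketch (not part of the PR) of how the helper above
drives the token lifecycle; it assumes a started, SASL-secured KafkaService
and a ducktape test context:

    from kafkatest.services.delegation_tokens import DelegationTokens

    def token_lifecycle(kafka, test_context):
        tokens = DelegationTokens(kafka, test_context)
        tokens.generate_delegation_token()       # kafka-delegation-tokens.sh --create
        jaas_conf = tokens.create_jaas_conf_with_delegation_token()  # tokenID/HMAC as SCRAM credentials
        hmac = tokens.token_hmac()
        tokens.renew_delegation_token(hmac)      # --renew, default --renew-time-period -1
        tokens.expire_delegation_token(hmac)     # --expire with --expiry-time-period -1
        return jaas_conf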
diff --git a/tests/kafkatest/services/kafka/config_property.py b/tests/kafkatest/services/kafka/config_property.py
index 621b8e5b55b..853bd83896f 100644
--- a/tests/kafkatest/services/kafka/config_property.py
+++ b/tests/kafkatest/services/kafka/config_property.py
@@ -50,6 +50,12 @@
LOG_ROLL_TIME_MS = "log.roll.ms"
OFFSETS_TOPIC_NUM_PARTITIONS = "offsets.topic.num.partitions"
+DELEGATION_TOKEN_MAX_LIFETIME_MS="delegation.token.max.lifetime.ms"
+DELEGATION_TOKEN_EXPIRY_TIME_MS="delegation.token.expiry.time.ms"
+DELEGATION_TOKEN_MASTER_KEY="delegation.token.master.key"
+SASL_ENABLED_MECHANISMS="sasl.enabled.mechanisms"
+
+
"""
From KafkaConfig.scala
diff --git a/tests/kafkatest/services/security/templates/jaas.conf b/tests/kafkatest/services/security/templates/jaas.conf
index e2511451e32..3d6c93e09be 100644
--- a/tests/kafkatest/services/security/templates/jaas.conf
+++ b/tests/kafkatest/services/security/templates/jaas.conf
@@ -15,7 +15,7 @@
{% if static_jaas_conf %}
KafkaClient {
{% endif %}
-{% if client_sasl_mechanism == "GSSAPI" %}
+{% if "GSSAPI" in client_sasl_mechanism %}
{% if is_ibm_jdk %}
com.ibm.security.auth.module.Krb5LoginModule required debug=false
credsType=both
@@ -33,7 +33,7 @@ KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="client"
password="client-secret";
-{% elif client_sasl_mechanism == "SCRAM-SHA-256" or client_sasl_mechanism == "SCRAM-SHA-512" %}
+{% elif "SCRAM-SHA-256" in client_sasl_mechanism or "SCRAM-SHA-512" in client_sasl_mechanism %}
org.apache.kafka.common.security.scram.ScramLoginModule required
username="{{ SecurityConfig.SCRAM_CLIENT_USER }}"
password="{{ SecurityConfig.SCRAM_CLIENT_PASSWORD }}";
@@ -65,7 +65,7 @@ KafkaServer {
user_client="client-secret"
user_kafka="kafka-secret";
{% endif %}
-{% if client_sasl_mechanism == "SCRAM-SHA-256" or client_sasl_mechanism == "SCRAM-SHA-512" %}
+{% if "SCRAM-SHA-256" in client_sasl_mechanism or "SCRAM-SHA-512" in client_sasl_mechanism %}
org.apache.kafka.common.security.scram.ScramLoginModule required
username="{{ SecurityConfig.SCRAM_BROKER_USER }}"
password="{{ SecurityConfig.SCRAM_BROKER_PASSWORD }}";
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index 7fa2654db49..f339a62313f 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -58,18 +58,21 @@ def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput
                  message_validator=is_int, compression_types=None, version=DEV_BRANCH, acks=None,
                  stop_timeout_sec=150, request_timeout_sec=30, log_level="INFO",
                  enable_idempotence=False, offline_nodes=[], create_time=-1, repeating_keys=None,
-                 jaas_override_variables=None):
+                 jaas_override_variables=None, kafka_opts_override="", client_prop_file_override=""):
"""
- :param max_messages is a number of messages to be produced per producer
- :param message_validator checks for an expected format of messages
produced. There are
- currently two:
- * is_int is an integer format; this is default and expected to
be used if
- num_nodes = 1
- * is_int_with_prefix recommended if num_nodes > 1, because
otherwise each producer
- will produce exactly same messages, and validation may miss
missing messages.
- :param compression_types: If None, all producers will not use
compression; or a list of
- compression types, one per producer (could be "none").
- :param jaas_override_variables: A dict of variables to be used in the
jaas.conf template file
+ Args:
+ :param max_messages number of messages to be
produced per producer
+ :param message_validator checks for an expected format
of messages produced. There are
+ currently two:
+ * is_int is an integer format;
this is default and expected to be used if
+ num_nodes = 1
+ * is_int_with_prefix
recommended if num_nodes > 1, because otherwise each producer
+ will produce exactly same
messages, and validation may miss missing messages.
+ :param compression_types If None, all producers will not
use compression; or a list of compression types,
+ one per producer (could be
"none").
+ :param jaas_override_variables A dict of variables to be used
in the jaas.conf template file
+ :param kafka_opts_override Override parameters of the
KAFKA_OPTS environment variable
+ :param client_prop_file_override Override client.properties file
used by the consumer
"""
super(VerifiableProducer, self).__init__(context, num_nodes)
self.log_level = log_level
@@ -97,6 +100,9 @@ def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput
         self.create_time = create_time
         self.repeating_keys = repeating_keys
         self.jaas_override_variables = jaas_override_variables or {}
+        self.kafka_opts_override = kafka_opts_override
+        self.client_prop_file_override = client_prop_file_override
+
def java_class_name(self):
return "VerifiableProducer"
@@ -125,7 +131,11 @@ def _worker(self, idx, node):
         self.security_config.setup_node(node)
         # Create and upload config file
-        producer_prop_file = self.prop_file(node)
+        if self.client_prop_file_override:
+            producer_prop_file = self.client_prop_file_override
+        else:
+            producer_prop_file = self.prop_file(node)
+
         if self.acks is not None:
             self.logger.info("VerifiableProducer (index = %d) will use acks = %s", idx, self.acks)
             producer_prop_file += "\nacks=%s\n" % self.acks
@@ -189,7 +199,11 @@ def _has_output(self, node):
     def start_cmd(self, node, idx):
         cmd = "export LOG_DIR=%s;" % VerifiableProducer.LOG_DIR
-        cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
+        if self.kafka_opts_override:
+            cmd += " export KAFKA_OPTS=\"%s\";" % self.kafka_opts_override
+        else:
+            cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
+
         cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableProducer.LOG4J_CONFIG
         cmd += self.impl.exec_cmd(node)
         cmd += " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers(self.security_config.security_protocol, True, self.offline_nodes))
@@ -207,6 +221,7 @@ def start_cmd(self, node, idx):
         cmd += " --repeating-keys %s " % str(self.repeating_keys)
         cmd += " --producer.config %s" % VerifiableProducer.CONFIG_FILE
+
         cmd += " 2>> %s | tee -a %s &" % (VerifiableProducer.STDOUT_CAPTURE, VerifiableProducer.STDOUT_CAPTURE)
         return cmd
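
These overrides mirror the ConsoleConsumer changes above; a corresponding
sketch (not part of the PR, values hypothetical) for the producer side:

    from kafkatest.services.verifiable_producer import VerifiableProducer

    def make_producer(test_context, kafka, topic):
        # Same override semantics as the consumer: KAFKA_OPTS and the
        # client.properties content are replaced wholesale.
        return VerifiableProducer(test_context, num_nodes=1, kafka=kafka, topic=topic,
                                  max_messages=1, throughput=1,
                                  kafka_opts_override="-Djava.security.auth.login.config=/tmp/jaas_deleg.conf",
                                  client_prop_file_override="security.protocol=SASL_PLAINTEXT\nsasl.mechanism=SCRAM-SHA-256\n")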
diff --git a/tests/kafkatest/tests/core/delegation_token_test.py b/tests/kafkatest/tests/core/delegation_token_test.py
new file mode 100644
index 00000000000..0b2b6eb63fc
--- /dev/null
+++ b/tests/kafkatest/tests/core/delegation_token_test.py
@@ -0,0 +1,130 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+from kafkatest.services.kafka import config_property, KafkaService
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.delegation_tokens import DelegationTokens
+from kafkatest.services.verifiable_producer import VerifiableProducer
+
+from datetime import datetime
+import time
+
+"""
+Basic tests to validate delegation token support
+"""
+class DelegationTokenTest(Test):
+ def __init__(self, test_context):
+ super(DelegationTokenTest, self).__init__(test_context)
+
+ self.test_context = test_context
+ self.topic = "topic"
+ self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, zk_chroot="/kafka",
+                                  topics={self.topic: {"partitions": 1, "replication-factor": 1}},
+                                  server_prop_overides=[
+                                      [config_property.DELEGATION_TOKEN_MAX_LIFETIME_MS, "604800000"],
+                                      [config_property.DELEGATION_TOKEN_EXPIRY_TIME_MS, "86400000"],
+                                      [config_property.DELEGATION_TOKEN_MASTER_KEY, "test12345"],
+                                      [config_property.SASL_ENABLED_MECHANISMS, "GSSAPI,SCRAM-SHA-256"]
+                                  ])
+ self.jaas_deleg_conf_path = "/tmp/jaas_deleg.conf"
+ self.jaas_deleg_conf = ""
+ self.client_properties_content = """
+security.protocol=SASL_PLAINTEXT
+sasl.mechanism=SCRAM-SHA-256
+sasl.kerberos.service.name=kafka
+client.id=console-consumer
+"""
+        self.client_kafka_opts = ' -Djava.security.auth.login.config=' + self.jaas_deleg_conf_path
+
+        self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka,
+                                           topic=self.topic, max_messages=1, throughput=1,
+                                           kafka_opts_override=self.client_kafka_opts,
+                                           client_prop_file_override=self.client_properties_content)
+
+        self.consumer = ConsoleConsumer(self.test_context, num_nodes=1, kafka=self.kafka,
+                                        topic=self.topic,
+                                        kafka_opts_override=self.client_kafka_opts,
+                                        client_prop_file_override=self.client_properties_content)
+
+ self.kafka.security_protocol = 'SASL_PLAINTEXT'
+ self.kafka.client_sasl_mechanism = 'GSSAPI,SCRAM-SHA-256'
+ self.kafka.interbroker_sasl_mechanism = 'GSSAPI'
+
+
+ def setUp(self):
+ self.zk.start()
+
+ def tearDown(self):
+ self.producer.nodes[0].account.remove(self.jaas_deleg_conf_path)
+ self.consumer.nodes[0].account.remove(self.jaas_deleg_conf_path)
+
+ def generate_delegation_token(self):
+ self.logger.debug("Request delegation token")
+ self.delegation_tokens.generate_delegation_token()
+        self.jaas_deleg_conf = self.delegation_tokens.create_jaas_conf_with_delegation_token()
+
+ def expire_delegation_token(self):
+ self.kafka.client_sasl_mechanism = 'GSSAPI,SCRAM-SHA-256'
+ token_hmac = self.delegation_tokens.token_hmac()
+ self.delegation_tokens.expire_delegation_token(token_hmac)
+
+
+ def produce_with_delegation_token(self):
+ self.producer.acked_values = []
+        self.producer.nodes[0].account.create_file(self.jaas_deleg_conf_path, self.jaas_deleg_conf)
+ self.logger.debug(self.jaas_deleg_conf)
+ self.producer.start()
+
+ def consume_with_delegation_token(self):
+ self.logger.debug("Consume messages with delegation token")
+
+        self.consumer.nodes[0].account.create_file(self.jaas_deleg_conf_path, self.jaas_deleg_conf)
+ self.logger.debug(self.jaas_deleg_conf)
+ self.consumer.consumer_timeout_ms = 5000
+
+ self.consumer.start()
+ self.consumer.wait()
+
+    def get_datetime_ms(self, input_date):
+        return int(time.mktime(datetime.strptime(input_date, "%Y-%m-%dT%H:%M").timetuple()) * 1000)
+
+ def renew_delegation_token(self):
+ dt = self.delegation_tokens.parse_delegation_token_out()
+ orig_expiry_date_ms = self.get_datetime_ms(dt["expirydate"])
+ new_expirydate_ms = orig_expiry_date_ms + 1000
+
+        self.delegation_tokens.renew_delegation_token(dt["hmac"], new_expirydate_ms)
+
+ def test_delegation_token_lifecycle(self):
+ self.kafka.start()
+        self.delegation_tokens = DelegationTokens(self.kafka, self.test_context)
+
+ self.generate_delegation_token()
+ self.renew_delegation_token()
+ self.produce_with_delegation_token()
+        wait_until(lambda: self.producer.num_acked > 0, timeout_sec=30,
+                   err_msg="Expected producer to still be producing.")
+        assert 1 == self.producer.num_acked, "number of acked messages: %d" % self.producer.num_acked
+
+ self.consume_with_delegation_token()
+ num_consumed = len(self.consumer.messages_consumed[1])
+        assert 1 == num_consumed, "number of consumed messages: %d" % num_consumed
+
+ self.expire_delegation_token()
+
+ self.produce_with_delegation_token()
+        assert 0 == self.producer.num_acked, "number of acked messages: %d" % self.producer.num_acked
\ No newline at end of file
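
One detail worth noting (not stated in the PR itself): parse_delegation_token_out
assumes the last line of the kafka-delegation-tokens.sh --create output holds
exactly seven whitespace-separated fields, with dates in the "%Y-%m-%dT%H:%M"
format that get_datetime_ms parses. A self-contained sketch of that contract,
with hypothetical values:

    sample = "dG9rZW4x aG1hYw== User:client User:client 2018-11-26T10:00 2018-11-27T10:00 2018-12-03T10:00"
    tokenid, hmac, owner, renewers, issuedate, expirydate, maxdate = sample.split()
    assert expirydate == "2018-11-27T10:00"  # feeds get_datetime_ms / renew_delegation_token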
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Add system tests for delegation token based authentication
> ----------------------------------------------------------
>
> Key: KAFKA-4544
> URL: https://issues.apache.org/jira/browse/KAFKA-4544
> Project: Kafka
> Issue Type: Sub-task
> Components: security
> Reporter: Ashish Singh
> Assignee: Attila Sasvari
> Priority: Major
> Fix For: 2.2.0
>
>
> Add system tests for delegation token based authentication.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)