This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e0bb6be71a Rewrite kerberos security integration and unit tests
(#28092)
e0bb6be71a is described below
commit e0bb6be71a3fe2ba75126b601ea11cd2be9465b5
Author: Andrey Anshin <[email protected]>
AuthorDate: Mon Dec 5 05:58:32 2022 +0300
Rewrite kerberos security integration and unit tests (#28092)
---
tests/security/test_kerberos.py | 228 ++++++++++++++++++----------------------
1 file changed, 101 insertions(+), 127 deletions(-)
diff --git a/tests/security/test_kerberos.py b/tests/security/test_kerberos.py
index fa8be0fa21..81f139c782 100644
--- a/tests/security/test_kerberos.py
+++ b/tests/security/test_kerberos.py
@@ -17,98 +17,68 @@
# under the License.
from __future__ import annotations
+import logging
import os
import shlex
-import unittest
-from argparse import Namespace
+from contextlib import nullcontext
from unittest import mock
import pytest
-from parameterized import parameterized
from airflow.security import kerberos
from airflow.security.kerberos import renew_from_kt
from tests.test_utils.config import conf_vars
-KRB5_KTNAME = os.environ.get("KRB5_KTNAME")
[email protected]("kerberos")
+class TestKerberosIntegration:
+ @classmethod
+ def setup_class(cls):
+ assert "KRB5_KTNAME" in os.environ, "Missing KRB5_KTNAME environment
variable"
+ cls.keytab = os.environ["KRB5_KTNAME"]
[email protected](KRB5_KTNAME is None, "Skipping Kerberos API tests due to
missing KRB5_KTNAME")
-class TestKerberos(unittest.TestCase):
- def setUp(self):
- self.args = Namespace(
- keytab=KRB5_KTNAME, principal=None, pid=None, daemon=None,
stdout=None, stderr=None, log_file=None
- )
-
- @conf_vars({("kerberos", "keytab"): KRB5_KTNAME})
- def test_renew_from_kt(self):
- """
- We expect no result, but a successful run. No more TypeError
- """
- assert renew_from_kt(principal=self.args.principal,
keytab=self.args.keytab) is None
-
- @conf_vars({("kerberos", "keytab"): KRB5_KTNAME, ("kerberos",
"include_ip"): ""})
- def test_renew_from_kt_include_ip_empty(self):
- """
- We expect no result, but a successful run.
- """
- assert renew_from_kt(principal=self.args.principal,
keytab=self.args.keytab) is None
-
- @conf_vars({("kerberos", "keytab"): KRB5_KTNAME, ("kerberos",
"include_ip"): "False"})
- def test_renew_from_kt_include_ip_false(self):
- """
- We expect no result, but a successful run.
- """
- assert renew_from_kt(principal=self.args.principal,
keytab=self.args.keytab) is None
-
- @conf_vars({("kerberos", "keytab"): KRB5_KTNAME, ("kerberos",
"include_ip"): "True"})
- def test_renew_from_kt_include_ip_true(self):
- """
- We expect no result, but a successful run.
- """
- assert renew_from_kt(principal=self.args.principal,
keytab=self.args.keytab) is None
-
- # Validate forwardable kerberos option
- @conf_vars({("kerberos", "keytab"): KRB5_KTNAME, ("kerberos",
"forwardable"): ""})
- def test_renew_from_kt_forwardable_empty(self):
- """
- We expect no result, but a successful run.
- """
- assert renew_from_kt(principal=self.args.principal,
keytab=self.args.keytab) is None
-
- @conf_vars({("kerberos", "keytab"): KRB5_KTNAME, ("kerberos",
"forwardable"): "False"})
- def test_renew_from_kt_forwardable_false(self):
- """
- We expect no result, but a successful run.
- """
- assert renew_from_kt(principal=self.args.principal,
keytab=self.args.keytab) is None
-
- @conf_vars({("kerberos", "keytab"): KRB5_KTNAME, ("kerberos",
"forwardable"): "True"})
- def test_renew_from_kt_forwardable_true(self):
- """
- We expect no result, but a successful run.
- """
- assert renew_from_kt(principal=self.args.principal,
keytab=self.args.keytab) is None
-
- @conf_vars({("kerberos", "keytab"): ""})
- def test_args_from_cli(self):
- """
- We expect no result, but a run with sys.exit(1) because keytab not
exist.
- """
- with pytest.raises(SystemExit) as ctx:
- renew_from_kt(principal=self.args.principal,
keytab=self.args.keytab)
-
- with self.assertLogs(kerberos.log) as log:
- assert (
- f"kinit: krb5_init_creds_set_keytab: Failed to find
[email protected] in "
- f"keytab FILE:{self.args.keytab} (unknown enctype)" in
log.output
- )
-
- assert ctx.value.code == 1
-
+ @pytest.mark.parametrize(
+ "kerberos_config",
+ [
+ pytest.param({}, id="default-config"),
+ pytest.param({("kerberos", "include_ip"): "True"},
id="explicit-include-ip"),
+ pytest.param({("kerberos", "include_ip"): "False"},
id="explicit-not-include-ip"),
+ pytest.param({("kerberos", "forwardable"): "True"},
id="explicit-forwardable"),
+ pytest.param({("kerberos", "forwardable"): "False"},
id="explicit-not-forwardable"),
+ ],
+ )
+ def test_renew_from_kt(self, kerberos_config):
+ """We expect return 0 (exit code) and successful run."""
+ with conf_vars(kerberos_config):
+ assert renew_from_kt(principal=None, keytab=self.keytab) == 0
-class TestKerberosUnit(unittest.TestCase):
- @parameterized.expand(
+ @pytest.mark.parametrize(
+ "exit_on_fail, expected_context",
+ [
+ pytest.param(True, pytest.raises(SystemExit), id="exit-on-fail"),
+ pytest.param(False, nullcontext(), id="return-code-of-fail"),
+ ],
+ )
+ def test_args_from_cli(self, exit_on_fail, expected_context, caplog):
+ """Test exit code if keytab not exist."""
+ keytab = "/not/exists/keytab"
+ result = None
+
+ with mock.patch.dict(os.environ, KRB5_KTNAME=keytab),
conf_vars({("kerberos", "keytab"): keytab}):
+ with expected_context as ctx:
+ with caplog.at_level(logging.ERROR, logger=kerberos.log.name):
+ caplog.clear()
+ result = renew_from_kt(principal=None, keytab=keytab,
exit_on_fail=exit_on_fail)
+
+ # If `exit_on_fail` set to True than exit code in exception, otherwise
in function return
+ exit_code = ctx.value.code if exit_on_fail else result
+ assert exit_code == 1
+ assert caplog.record_tuples
+
+
+class TestKerberos:
+ @pytest.mark.parametrize(
+ "kerberos_config, expected_cmd",
[
(
{("kerberos", "reinit_frequency"): "42"},
@@ -158,31 +128,27 @@ class TestKerberosUnit(unittest.TestCase):
"test-principal",
],
),
- ]
+ ],
)
- def test_renew_from_kt(self, kerberos_config, expected_cmd):
- with self.assertLogs(kerberos.log) as log_ctx,
conf_vars(kerberos_config), mock.patch(
- "airflow.security.kerberos.subprocess"
- ) as mock_subprocess, mock.patch(
- "airflow.security.kerberos.NEED_KRB181_WORKAROUND", None
- ), mock.patch(
- "airflow.security.kerberos.open",
mock.mock_open(read_data=b"X-CACHECONF:")
- ), mock.patch(
- "time.sleep", return_value=None
- ):
+ @mock.patch("time.sleep", return_value=None)
+ @mock.patch("airflow.security.kerberos.open",
mock.mock_open(read_data=b"X-CACHECONF:"))
+ @mock.patch("airflow.security.kerberos.NEED_KRB181_WORKAROUND", None)
+ @mock.patch("airflow.security.kerberos.subprocess")
+ def test_renew_from_kt(self, mock_subprocess, mock_sleep, kerberos_config,
expected_cmd, caplog):
+ expected_cmd_text = " ".join(shlex.quote(f) for f in expected_cmd)
+
+ with conf_vars(kerberos_config), caplog.at_level(logging.INFO,
logger=kerberos.log.name):
+ caplog.clear()
mock_subprocess.Popen.return_value.__enter__.return_value.returncode = 0
mock_subprocess.call.return_value = 0
renew_from_kt(principal="test-principal", keytab="keytab")
- assert mock_subprocess.Popen.call_args[0][0] == expected_cmd
-
- expected_cmd_text = " ".join(shlex.quote(f) for f in expected_cmd)
- assert log_ctx.output == [
- f"INFO:airflow.security.kerberos:Re-initialising kerberos from
keytab: {expected_cmd_text}",
- "INFO:airflow.security.kerberos:Renewing kerberos ticket to work
around kerberos 1.8.1: "
- "kinit -c /tmp/airflow_krb5_ccache -R",
+ assert caplog.messages == [
+ f"Re-initialising kerberos from keytab: {expected_cmd_text}",
+ "Renewing kerberos ticket to work around kerberos 1.8.1: kinit -c
/tmp/airflow_krb5_ccache -R",
]
+ assert mock_subprocess.Popen.call_args[0][0] == expected_cmd
assert mock_subprocess.mock_calls == [
mock.call.Popen(
expected_cmd,
@@ -201,17 +167,17 @@ class TestKerberosUnit(unittest.TestCase):
@mock.patch("airflow.security.kerberos.subprocess")
@mock.patch("airflow.security.kerberos.NEED_KRB181_WORKAROUND", None)
@mock.patch("airflow.security.kerberos.open",
mock.mock_open(read_data=b""))
- def test_renew_from_kt_without_workaround(self, mock_subprocess):
+ def test_renew_from_kt_without_workaround(self, mock_subprocess, caplog):
mock_subprocess.Popen.return_value.__enter__.return_value.returncode = 0
mock_subprocess.call.return_value = 0
- with self.assertLogs(kerberos.log) as log_ctx:
+ with caplog.at_level(logging.INFO, logger=kerberos.log.name):
+ caplog.clear()
renew_from_kt(principal="test-principal", keytab="keytab")
-
- assert log_ctx.output == [
- "INFO:airflow.security.kerberos:Re-initialising kerberos from
keytab: "
- "kinit -f -a -r 3600m -k -t keytab -c /tmp/airflow_krb5_ccache
test-principal"
- ]
+ assert caplog.messages == [
+ "Re-initialising kerberos from keytab: "
+ "kinit -f -a -r 3600m -k -t keytab -c /tmp/airflow_krb5_ccache
test-principal"
+ ]
assert mock_subprocess.mock_calls == [
mock.call.Popen(
@@ -241,21 +207,24 @@ class TestKerberosUnit(unittest.TestCase):
@mock.patch("airflow.security.kerberos.subprocess")
@mock.patch("airflow.security.kerberos.NEED_KRB181_WORKAROUND", None)
- def test_renew_from_kt_failed(self, mock_subprocess):
+ def test_renew_from_kt_failed(self, mock_subprocess, caplog):
mock_subp = mock_subprocess.Popen.return_value.__enter__.return_value
mock_subp.returncode = 1
mock_subp.stdout = mock.MagicMock(name="stdout",
**{"readlines.return_value": ["STDOUT"]})
mock_subp.stderr = mock.MagicMock(name="stderr",
**{"readlines.return_value": ["STDERR"]})
- with self.assertLogs(kerberos.log) as log_ctx,
self.assertRaises(SystemExit):
+ with pytest.raises(SystemExit) as ctx:
+ caplog.clear()
renew_from_kt(principal="test-principal", keytab="keytab")
+ assert ctx.value.code == 1
- assert log_ctx.output == [
- "INFO:airflow.security.kerberos:Re-initialising kerberos from
keytab: "
+ log_records = [record for record in caplog.record_tuples if record[0]
== kerberos.log.name]
+ assert len(log_records) == 2, log_records
+ assert [lr[1] for lr in log_records] == [logging.INFO, logging.ERROR]
+ assert [lr[2] for lr in log_records] == [
+ "Re-initialising kerberos from keytab: "
"kinit -f -a -r 3600m -k -t keytab -c /tmp/airflow_krb5_ccache
test-principal",
- "ERROR:airflow.security.kerberos:Couldn't reinit from keytab!
`kinit' exited with 1.\n"
- "STDOUT\n"
- "STDERR",
+ "Couldn't reinit from keytab! `kinit' exited with
1.\nSTDOUT\nSTDERR",
]
assert mock_subprocess.mock_calls == [
@@ -289,22 +258,25 @@ class TestKerberosUnit(unittest.TestCase):
@mock.patch("airflow.security.kerberos.open",
mock.mock_open(read_data=b"X-CACHECONF:"))
@mock.patch("airflow.security.kerberos.get_hostname", return_value="HOST")
@mock.patch("time.sleep", return_value=None)
- def test_renew_from_kt_failed_workaround(self, mock_sleep, mock_getfqdn,
mock_subprocess):
+ def test_renew_from_kt_failed_workaround(self, mock_sleep, mock_getfqdn,
mock_subprocess, caplog):
mock_subprocess.Popen.return_value.__enter__.return_value.returncode = 0
mock_subprocess.call.return_value = 1
- with self.assertLogs(kerberos.log) as log_ctx,
self.assertRaises(SystemExit):
+ with pytest.raises(SystemExit) as ctx:
+ caplog.clear()
renew_from_kt(principal="test-principal", keytab="keytab")
+ assert ctx.value.code == 1
- assert log_ctx.output == [
- "INFO:airflow.security.kerberos:Re-initialising kerberos from
keytab: "
+ log_records = [record for record in caplog.record_tuples if record[0]
== kerberos.log.name]
+ assert len(log_records) == 3, log_records
+ assert [lr[1] for lr in log_records] == [logging.INFO, logging.INFO,
logging.ERROR]
+ assert [lr[2] for lr in log_records] == [
+ "Re-initialising kerberos from keytab: "
"kinit -f -a -r 3600m -k -t keytab -c /tmp/airflow_krb5_ccache
test-principal",
- "INFO:airflow.security.kerberos:Renewing kerberos ticket to work
around kerberos 1.8.1: "
- "kinit -c /tmp/airflow_krb5_ccache -R",
- "ERROR:airflow.security.kerberos:Couldn't renew kerberos ticket in
order to work around "
+ "Renewing kerberos ticket to work around kerberos 1.8.1: kinit -c
/tmp/airflow_krb5_ccache -R",
+ "Couldn't renew kerberos ticket in order to work around "
"Kerberos 1.8.1 issue. Please check that the ticket for
'test-principal/HOST' is still "
- "renewable:\n"
- " $ kinit -f -c /tmp/airflow_krb5_ccache\n"
+ "renewable:\n $ kinit -f -c /tmp/airflow_krb5_ccache\n"
"If the 'renew until' date is the same as the 'valid starting'
date, the ticket cannot be "
"renewed. Please check your KDC configuration, and the ticket
renewal policy (maxrenewlife) for "
"the 'test-principal/HOST' and `krbtgt' principals.",
@@ -337,19 +309,21 @@ class TestKerberosUnit(unittest.TestCase):
mock.call.call(["kinit", "-c", "/tmp/airflow_krb5_ccache", "-R"],
close_fds=True),
]
- def test_run_without_keytab(self):
- with self.assertLogs(kerberos.log) as log_ctx,
self.assertRaises(SystemExit):
- kerberos.run(principal="test-principal", keytab=None)
- assert log_ctx.output == [
- "WARNING:airflow.security.kerberos:Keytab renewer not starting, no
keytab configured"
- ]
+ def test_run_without_keytab(self, caplog):
+ with pytest.raises(SystemExit) as ctx:
+ with caplog.at_level(logging.WARNING, logger=kerberos.log.name):
+ caplog.clear()
+ kerberos.run(principal="test-principal", keytab=None)
+ assert ctx.value.code == 0
+ assert caplog.messages == ["Keytab renewer not starting, no keytab
configured"]
@mock.patch("airflow.security.kerberos.renew_from_kt")
@mock.patch("time.sleep", return_value=None)
def test_run(self, mock_sleep, mock_renew_from_kt):
mock_renew_from_kt.side_effect = [1, 1, SystemExit(42)]
- with self.assertRaises(SystemExit):
+ with pytest.raises(SystemExit) as ctx:
kerberos.run(principal="test-principal", keytab="/tmp/keytab")
+ assert ctx.value.code == 42
assert mock_renew_from_kt.mock_calls == [
mock.call("test-principal", "/tmp/keytab"),
mock.call("test-principal", "/tmp/keytab"),