This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e0bb6be71a Rewrite kerberos security integration and unit tests (#28092)
e0bb6be71a is described below

commit e0bb6be71a3fe2ba75126b601ea11cd2be9465b5
Author: Andrey Anshin <[email protected]>
AuthorDate: Mon Dec 5 05:58:32 2022 +0300

    Rewrite kerberos security integration and unit tests (#28092)
---
 tests/security/test_kerberos.py | 228 ++++++++++++++++++----------------------
 1 file changed, 101 insertions(+), 127 deletions(-)

diff --git a/tests/security/test_kerberos.py b/tests/security/test_kerberos.py
index fa8be0fa21..81f139c782 100644
--- a/tests/security/test_kerberos.py
+++ b/tests/security/test_kerberos.py
@@ -17,98 +17,68 @@
 # under the License.
 from __future__ import annotations
 
+import logging
 import os
 import shlex
-import unittest
-from argparse import Namespace
+from contextlib import nullcontext
 from unittest import mock
 
 import pytest
-from parameterized import parameterized
 
 from airflow.security import kerberos
 from airflow.security.kerberos import renew_from_kt
 from tests.test_utils.config import conf_vars
 
-KRB5_KTNAME = os.environ.get("KRB5_KTNAME")
 
+@pytest.mark.integration("kerberos")
+class TestKerberosIntegration:
+    @classmethod
+    def setup_class(cls):
+        assert "KRB5_KTNAME" in os.environ, "Missing KRB5_KTNAME environment variable"
+        cls.keytab = os.environ["KRB5_KTNAME"]
 
-@unittest.skipIf(KRB5_KTNAME is None, "Skipping Kerberos API tests due to missing KRB5_KTNAME")
-class TestKerberos(unittest.TestCase):
-    def setUp(self):
-        self.args = Namespace(
-            keytab=KRB5_KTNAME, principal=None, pid=None, daemon=None, stdout=None, stderr=None, log_file=None
-        )
-
-    @conf_vars({("kerberos", "keytab"): KRB5_KTNAME})
-    def test_renew_from_kt(self):
-        """
-        We expect no result, but a successful run. No more TypeError
-        """
-        assert renew_from_kt(principal=self.args.principal, keytab=self.args.keytab) is None
-
-    @conf_vars({("kerberos", "keytab"): KRB5_KTNAME, ("kerberos", "include_ip"): ""})
-    def test_renew_from_kt_include_ip_empty(self):
-        """
-        We expect no result, but a successful run.
-        """
-        assert renew_from_kt(principal=self.args.principal, keytab=self.args.keytab) is None
-
-    @conf_vars({("kerberos", "keytab"): KRB5_KTNAME, ("kerberos", "include_ip"): "False"})
-    def test_renew_from_kt_include_ip_false(self):
-        """
-        We expect no result, but a successful run.
-        """
-        assert renew_from_kt(principal=self.args.principal, keytab=self.args.keytab) is None
-
-    @conf_vars({("kerberos", "keytab"): KRB5_KTNAME, ("kerberos", "include_ip"): "True"})
-    def test_renew_from_kt_include_ip_true(self):
-        """
-        We expect no result, but a successful run.
-        """
-        assert renew_from_kt(principal=self.args.principal, keytab=self.args.keytab) is None
-
-    # Validate forwardable kerberos option
-    @conf_vars({("kerberos", "keytab"): KRB5_KTNAME, ("kerberos", "forwardable"): ""})
-    def test_renew_from_kt_forwardable_empty(self):
-        """
-        We expect no result, but a successful run.
-        """
-        assert renew_from_kt(principal=self.args.principal, keytab=self.args.keytab) is None
-
-    @conf_vars({("kerberos", "keytab"): KRB5_KTNAME, ("kerberos", "forwardable"): "False"})
-    def test_renew_from_kt_forwardable_false(self):
-        """
-        We expect no result, but a successful run.
-        """
-        assert renew_from_kt(principal=self.args.principal, keytab=self.args.keytab) is None
-
-    @conf_vars({("kerberos", "keytab"): KRB5_KTNAME, ("kerberos", "forwardable"): "True"})
-    def test_renew_from_kt_forwardable_true(self):
-        """
-        We expect no result, but a successful run.
-        """
-        assert renew_from_kt(principal=self.args.principal, keytab=self.args.keytab) is None
-
-    @conf_vars({("kerberos", "keytab"): ""})
-    def test_args_from_cli(self):
-        """
-        We expect no result, but a run with sys.exit(1) because keytab not exist.
-        """
-        with pytest.raises(SystemExit) as ctx:
-            renew_from_kt(principal=self.args.principal, keytab=self.args.keytab)
-
-            with self.assertLogs(kerberos.log) as log:
-                assert (
-                    f"kinit: krb5_init_creds_set_keytab: Failed to find [email protected] in "
-                    f"keytab FILE:{self.args.keytab} (unknown enctype)" in log.output
-                )
-
-            assert ctx.value.code == 1
-
+    @pytest.mark.parametrize(
+        "kerberos_config",
+        [
+            pytest.param({}, id="default-config"),
+            pytest.param({("kerberos", "include_ip"): "True"}, id="explicit-include-ip"),
+            pytest.param({("kerberos", "include_ip"): "False"}, id="explicit-not-include-ip"),
+            pytest.param({("kerberos", "forwardable"): "True"}, id="explicit-forwardable"),
+            pytest.param({("kerberos", "forwardable"): "False"}, id="explicit-not-forwardable"),
+        ],
+    )
+    def test_renew_from_kt(self, kerberos_config):
+        """We expect return 0 (exit code) and successful run."""
+        with conf_vars(kerberos_config):
+            assert renew_from_kt(principal=None, keytab=self.keytab) == 0
 
-class TestKerberosUnit(unittest.TestCase):
-    @parameterized.expand(
+    @pytest.mark.parametrize(
+        "exit_on_fail, expected_context",
+        [
+            pytest.param(True, pytest.raises(SystemExit), id="exit-on-fail"),
+            pytest.param(False, nullcontext(), id="return-code-of-fail"),
+        ],
+    )
+    def test_args_from_cli(self, exit_on_fail, expected_context, caplog):
+        """Test exit code if keytab not exist."""
+        keytab = "/not/exists/keytab"
+        result = None
+
+        with mock.patch.dict(os.environ, KRB5_KTNAME=keytab), conf_vars({("kerberos", "keytab"): keytab}):
+            with expected_context as ctx:
+                with caplog.at_level(logging.ERROR, logger=kerberos.log.name):
+                    caplog.clear()
+                    result = renew_from_kt(principal=None, keytab=keytab, exit_on_fail=exit_on_fail)
+
+        # If `exit_on_fail` set to True than exit code in exception, otherwise in function return
+        exit_code = ctx.value.code if exit_on_fail else result
+        assert exit_code == 1
+        assert caplog.record_tuples
+
+
+class TestKerberos:
+    @pytest.mark.parametrize(
+        "kerberos_config, expected_cmd",
         [
             (
                 {("kerberos", "reinit_frequency"): "42"},
@@ -158,31 +128,27 @@ class TestKerberosUnit(unittest.TestCase):
                     "test-principal",
                 ],
             ),
-        ]
+        ],
     )
-    def test_renew_from_kt(self, kerberos_config, expected_cmd):
-        with self.assertLogs(kerberos.log) as log_ctx, conf_vars(kerberos_config), mock.patch(
-            "airflow.security.kerberos.subprocess"
-        ) as mock_subprocess, mock.patch(
-            "airflow.security.kerberos.NEED_KRB181_WORKAROUND", None
-        ), mock.patch(
-            "airflow.security.kerberos.open", mock.mock_open(read_data=b"X-CACHECONF:")
-        ), mock.patch(
-            "time.sleep", return_value=None
-        ):
+    @mock.patch("time.sleep", return_value=None)
+    @mock.patch("airflow.security.kerberos.open", mock.mock_open(read_data=b"X-CACHECONF:"))
+    @mock.patch("airflow.security.kerberos.NEED_KRB181_WORKAROUND", None)
+    @mock.patch("airflow.security.kerberos.subprocess")
+    def test_renew_from_kt(self, mock_subprocess, mock_sleep, kerberos_config, expected_cmd, caplog):
+        expected_cmd_text = " ".join(shlex.quote(f) for f in expected_cmd)
+
+        with conf_vars(kerberos_config), caplog.at_level(logging.INFO, logger=kerberos.log.name):
+            caplog.clear()
             mock_subprocess.Popen.return_value.__enter__.return_value.returncode = 0
             mock_subprocess.call.return_value = 0
             renew_from_kt(principal="test-principal", keytab="keytab")
 
-        assert mock_subprocess.Popen.call_args[0][0] == expected_cmd
-
-        expected_cmd_text = " ".join(shlex.quote(f) for f in expected_cmd)
-        assert log_ctx.output == [
-            f"INFO:airflow.security.kerberos:Re-initialising kerberos from keytab: {expected_cmd_text}",
-            "INFO:airflow.security.kerberos:Renewing kerberos ticket to work around kerberos 1.8.1: "
-            "kinit -c /tmp/airflow_krb5_ccache -R",
+        assert caplog.messages == [
+            f"Re-initialising kerberos from keytab: {expected_cmd_text}",
+            "Renewing kerberos ticket to work around kerberos 1.8.1: kinit -c /tmp/airflow_krb5_ccache -R",
         ]
 
+        assert mock_subprocess.Popen.call_args[0][0] == expected_cmd
         assert mock_subprocess.mock_calls == [
             mock.call.Popen(
                 expected_cmd,
@@ -201,17 +167,17 @@ class TestKerberosUnit(unittest.TestCase):
     @mock.patch("airflow.security.kerberos.subprocess")
     @mock.patch("airflow.security.kerberos.NEED_KRB181_WORKAROUND", None)
     @mock.patch("airflow.security.kerberos.open", mock.mock_open(read_data=b""))
-    def test_renew_from_kt_without_workaround(self, mock_subprocess):
+    def test_renew_from_kt_without_workaround(self, mock_subprocess, caplog):
         mock_subprocess.Popen.return_value.__enter__.return_value.returncode = 0
         mock_subprocess.call.return_value = 0
 
-        with self.assertLogs(kerberos.log) as log_ctx:
+        with caplog.at_level(logging.INFO, logger=kerberos.log.name):
+            caplog.clear()
             renew_from_kt(principal="test-principal", keytab="keytab")
-
-        assert log_ctx.output == [
-            "INFO:airflow.security.kerberos:Re-initialising kerberos from keytab: "
-            "kinit -f -a -r 3600m -k -t keytab -c /tmp/airflow_krb5_ccache test-principal"
-        ]
+            assert caplog.messages == [
+                "Re-initialising kerberos from keytab: "
+                "kinit -f -a -r 3600m -k -t keytab -c /tmp/airflow_krb5_ccache test-principal"
+            ]
 
         assert mock_subprocess.mock_calls == [
             mock.call.Popen(
@@ -241,21 +207,24 @@ class TestKerberosUnit(unittest.TestCase):
 
     @mock.patch("airflow.security.kerberos.subprocess")
     @mock.patch("airflow.security.kerberos.NEED_KRB181_WORKAROUND", None)
-    def test_renew_from_kt_failed(self, mock_subprocess):
+    def test_renew_from_kt_failed(self, mock_subprocess, caplog):
         mock_subp = mock_subprocess.Popen.return_value.__enter__.return_value
         mock_subp.returncode = 1
         mock_subp.stdout = mock.MagicMock(name="stdout", **{"readlines.return_value": ["STDOUT"]})
         mock_subp.stderr = mock.MagicMock(name="stderr", **{"readlines.return_value": ["STDERR"]})
 
-        with self.assertLogs(kerberos.log) as log_ctx, self.assertRaises(SystemExit):
+        with pytest.raises(SystemExit) as ctx:
+            caplog.clear()
             renew_from_kt(principal="test-principal", keytab="keytab")
+        assert ctx.value.code == 1
 
-        assert log_ctx.output == [
-            "INFO:airflow.security.kerberos:Re-initialising kerberos from keytab: "
+        log_records = [record for record in caplog.record_tuples if record[0] == kerberos.log.name]
+        assert len(log_records) == 2, log_records
+        assert [lr[1] for lr in log_records] == [logging.INFO, logging.ERROR]
+        assert [lr[2] for lr in log_records] == [
+            "Re-initialising kerberos from keytab: "
             "kinit -f -a -r 3600m -k -t keytab -c /tmp/airflow_krb5_ccache test-principal",
-            "ERROR:airflow.security.kerberos:Couldn't reinit from keytab! `kinit' exited with 1.\n"
-            "STDOUT\n"
-            "STDERR",
+            "Couldn't reinit from keytab! `kinit' exited with 1.\nSTDOUT\nSTDERR",
         ]
 
         assert mock_subprocess.mock_calls == [
@@ -289,22 +258,25 @@ class TestKerberosUnit(unittest.TestCase):
     @mock.patch("airflow.security.kerberos.open", mock.mock_open(read_data=b"X-CACHECONF:"))
     @mock.patch("airflow.security.kerberos.get_hostname", return_value="HOST")
     @mock.patch("time.sleep", return_value=None)
-    def test_renew_from_kt_failed_workaround(self, mock_sleep, mock_getfqdn, mock_subprocess):
+    def test_renew_from_kt_failed_workaround(self, mock_sleep, mock_getfqdn, mock_subprocess, caplog):
         mock_subprocess.Popen.return_value.__enter__.return_value.returncode = 0
         mock_subprocess.call.return_value = 1
 
-        with self.assertLogs(kerberos.log) as log_ctx, self.assertRaises(SystemExit):
+        with pytest.raises(SystemExit) as ctx:
+            caplog.clear()
             renew_from_kt(principal="test-principal", keytab="keytab")
+        assert ctx.value.code == 1
 
-        assert log_ctx.output == [
-            "INFO:airflow.security.kerberos:Re-initialising kerberos from keytab: "
+        log_records = [record for record in caplog.record_tuples if record[0] == kerberos.log.name]
+        assert len(log_records) == 3, log_records
+        assert [lr[1] for lr in log_records] == [logging.INFO, logging.INFO, logging.ERROR]
+        assert [lr[2] for lr in log_records] == [
+            "Re-initialising kerberos from keytab: "
             "kinit -f -a -r 3600m -k -t keytab -c /tmp/airflow_krb5_ccache test-principal",
-            "INFO:airflow.security.kerberos:Renewing kerberos ticket to work around kerberos 1.8.1: "
-            "kinit -c /tmp/airflow_krb5_ccache -R",
-            "ERROR:airflow.security.kerberos:Couldn't renew kerberos ticket in order to work around "
+            "Renewing kerberos ticket to work around kerberos 1.8.1: kinit -c /tmp/airflow_krb5_ccache -R",
+            "Couldn't renew kerberos ticket in order to work around "
             "Kerberos 1.8.1 issue. Please check that the ticket for 'test-principal/HOST' is still "
-            "renewable:\n"
-            "  $ kinit -f -c /tmp/airflow_krb5_ccache\n"
+            "renewable:\n  $ kinit -f -c /tmp/airflow_krb5_ccache\n"
             "If the 'renew until' date is the same as the 'valid starting' date, the ticket cannot be "
             "renewed. Please check your KDC configuration, and the ticket renewal policy (maxrenewlife) for "
             "the 'test-principal/HOST' and `krbtgt' principals.",
@@ -337,19 +309,21 @@ class TestKerberosUnit(unittest.TestCase):
             mock.call.call(["kinit", "-c", "/tmp/airflow_krb5_ccache", "-R"], close_fds=True),
         ]
 
-    def test_run_without_keytab(self):
-        with self.assertLogs(kerberos.log) as log_ctx, self.assertRaises(SystemExit):
-            kerberos.run(principal="test-principal", keytab=None)
-        assert log_ctx.output == [
-            "WARNING:airflow.security.kerberos:Keytab renewer not starting, no keytab configured"
-        ]
+    def test_run_without_keytab(self, caplog):
+        with pytest.raises(SystemExit) as ctx:
+            with caplog.at_level(logging.WARNING, logger=kerberos.log.name):
+                caplog.clear()
+                kerberos.run(principal="test-principal", keytab=None)
+        assert ctx.value.code == 0
+        assert caplog.messages == ["Keytab renewer not starting, no keytab configured"]
 
     @mock.patch("airflow.security.kerberos.renew_from_kt")
     @mock.patch("time.sleep", return_value=None)
     def test_run(self, mock_sleep, mock_renew_from_kt):
         mock_renew_from_kt.side_effect = [1, 1, SystemExit(42)]
-        with self.assertRaises(SystemExit):
+        with pytest.raises(SystemExit) as ctx:
             kerberos.run(principal="test-principal", keytab="/tmp/keytab")
+        assert ctx.value.code == 42
         assert mock_renew_from_kt.mock_calls == [
             mock.call("test-principal", "/tmp/keytab"),
             mock.call("test-principal", "/tmp/keytab"),

Reply via email to