This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 36c5c111ec Permitting airflow kerberos to run in different modes (#35146) 36c5c111ec is described below commit 36c5c111ec00075db30fab7c67ac1b6900e144dc Author: Amogh Desai <amoghrajesh1...@gmail.com> AuthorDate: Wed Oct 25 19:14:18 2023 +0530 Permitting airflow kerberos to run in different modes (#35146) --- airflow/cli/cli_config.py | 4 ++++ airflow/cli/commands/kerberos_command.py | 7 ++++++- airflow/security/kerberos.py | 25 ++++++++++++++++++++--- docs/apache-airflow/security/kerberos.rst | 23 +++++++++++++++++++++ tests/cli/commands/test_kerberos_command.py | 31 +++++++++++++++++++++++++++-- 5 files changed, 84 insertions(+), 6 deletions(-) diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py index 11d83debb0..f828a63ee1 100644 --- a/airflow/cli/cli_config.py +++ b/airflow/cli/cli_config.py @@ -561,6 +561,9 @@ ARG_VAR_ACTION_ON_EXISTING_KEY = Arg( # kerberos ARG_PRINCIPAL = Arg(("principal",), help="kerberos principal", nargs="?") ARG_KEYTAB = Arg(("-k", "--keytab"), help="keytab", nargs="?", default=conf.get("kerberos", "keytab")) +ARG_KERBEROS_ONE_TIME_MODE = Arg( + ("-o", "--one-time"), help="Run airflow kerberos one time instead of forever", action="store_true" +) # run ARG_INTERACTIVE = Arg( ("-N", "--interactive"), @@ -1889,6 +1892,7 @@ core_commands: list[CLICommand] = [ ARG_KEYTAB, ARG_PID, ARG_DAEMON, + ARG_KERBEROS_ONE_TIME_MODE, ARG_STDOUT, ARG_STDERR, ARG_LOG_FILE, diff --git a/airflow/cli/commands/kerberos_command.py b/airflow/cli/commands/kerberos_command.py index 8d33e7f8ef..dd7bb1aee3 100644 --- a/airflow/cli/commands/kerberos_command.py +++ b/airflow/cli/commands/kerberos_command.py @@ -20,6 +20,7 @@ from __future__ import annotations from airflow import settings from airflow.cli.commands.daemon_utils import run_command_with_daemon_option from airflow.security import kerberos as krb +from airflow.security.kerberos import KerberosMode from airflow.utils import 
cli as cli_utils from airflow.utils.providers_configuration_loader import providers_configuration_loaded @@ -30,8 +31,12 @@ def kerberos(args): """Start a kerberos ticket renewer.""" print(settings.HEADER) + mode = KerberosMode.STANDARD + if args.one_time: + mode = KerberosMode.ONE_TIME + run_command_with_daemon_option( args=args, process_name="kerberos", - callback=lambda: krb.run(principal=args.principal, keytab=args.keytab), + callback=lambda: krb.run(principal=args.principal, keytab=args.keytab, mode=mode), ) diff --git a/airflow/security/kerberos.py b/airflow/security/kerberos.py index 69afc5d793..2729a07f1a 100644 --- a/airflow/security/kerberos.py +++ b/airflow/security/kerberos.py @@ -17,6 +17,8 @@ # under the License. from __future__ import annotations +from enum import Enum + # Licensed to Cloudera, Inc. under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -47,6 +49,17 @@ NEED_KRB181_WORKAROUND: bool | None = None log = logging.getLogger(__name__) +class KerberosMode(Enum): + """ + Defines modes for running airflow kerberos. + + :return: None. + """ + + STANDARD = "standard" + ONE_TIME = "one-time" + + def get_kerberos_principle(principal: str | None) -> str: """Retrieve Kerberos principal. Fallback to principal from Airflow configuration if not provided.""" return principal or conf.get_mandatory_value("kerberos", "principal").replace("_HOST", get_hostname()) @@ -176,18 +189,24 @@ def detect_conf_var() -> bool: return b"X-CACHECONF:" in file.read() -def run(principal: str | None, keytab: str): +def run(principal: str | None, keytab: str, mode: KerberosMode = KerberosMode.STANDARD): """ Run the kerberos renewer. 
:param principal: principal name :param keytab: keytab file + :param mode: mode to run the airflow kerberos in :return: None """ if not keytab: log.warning("Keytab renewer not starting, no keytab configured") sys.exit(0) - while True: + log.info("Using airflow kerberos with mode: %s", mode.value) + + if mode == KerberosMode.STANDARD: + while True: + renew_from_kt(principal, keytab) + time.sleep(conf.getint("kerberos", "reinit_frequency")) + elif mode == KerberosMode.ONE_TIME: renew_from_kt(principal, keytab) - time.sleep(conf.getint("kerberos", "reinit_frequency")) diff --git a/docs/apache-airflow/security/kerberos.rst b/docs/apache-airflow/security/kerberos.rst index 345a8c8484..c51997d752 100644 --- a/docs/apache-airflow/security/kerberos.rst +++ b/docs/apache-airflow/security/kerberos.rst @@ -100,6 +100,29 @@ Launch the ticket renewer by # run ticket renewer airflow kerberos +To support more advanced deployment models for using kerberos in standard or one-time fashion, +you can specify the mode while running the ``airflow kerberos`` by using the ``--one-time`` flag. + +a) standard: The airflow kerberos command will run endlessly. The ticket renewer process runs continuously every few seconds +and refreshes the ticket if it has expired. +b) one-time: The airflow kerberos will run once and exit. In case of failure the main task won't spin up. + +The default mode is standard. + +Example usages: + +For standard mode: + +.. code-block:: bash + + airflow kerberos + +For one time mode: + +.. 
code-block:: bash + + airflow kerberos --one-time + Hadoop ^^^^^^ diff --git a/tests/cli/commands/test_kerberos_command.py b/tests/cli/commands/test_kerberos_command.py index 14eb1676bd..8835d9d287 100644 --- a/tests/cli/commands/test_kerberos_command.py +++ b/tests/cli/commands/test_kerberos_command.py @@ -20,6 +20,7 @@ from unittest import mock from airflow.cli import cli_parser from airflow.cli.commands import kerberos_command +from airflow.security.kerberos import KerberosMode from tests.test_utils.config import conf_vars @@ -34,7 +35,9 @@ class TestKerberosCommand: args = self.parser.parse_args(["kerberos", "PRINCIPAL", "--keytab", "/tmp/airflow.keytab"]) kerberos_command.kerberos(args) - mock_krb.run.assert_called_once_with(keytab="/tmp/airflow.keytab", principal="PRINCIPAL") + mock_krb.run.assert_called_once_with( + keytab="/tmp/airflow.keytab", principal="PRINCIPAL", mode=KerberosMode.STANDARD + ) @mock.patch("airflow.cli.commands.daemon_utils.TimeoutPIDLockFile") @mock.patch("airflow.cli.commands.daemon_utils.setup_locations") @@ -69,7 +72,9 @@ class TestKerberosCommand: with mock.patch("airflow.cli.commands.daemon_utils.open", mock_open): kerberos_command.kerberos(args) - mock_krb.run.assert_called_once_with(keytab="/tmp/airflow.keytab", principal="PRINCIPAL") + mock_krb.run.assert_called_once_with( + keytab="/tmp/airflow.keytab", principal="PRINCIPAL", mode=KerberosMode.STANDARD + ) assert mock_daemon.mock_calls[:3] == [ mock.call.DaemonContext( pidfile=mock_pid_file.return_value, @@ -100,3 +105,25 @@ class TestKerberosCommand: mock.call().__exit__(None, None, None), mock.call().__exit__(None, None, None), ] + + @mock.patch("airflow.cli.commands.kerberos_command.krb") + @conf_vars({("core", "executor"): "CeleryExecutor"}) + def test_run_command_with_mode_standard(self, mock_krb): + args = self.parser.parse_args(["kerberos", "PRINCIPAL", "--keytab", "/tmp/airflow.keytab"]) + + kerberos_command.kerberos(args) + mock_krb.run.assert_called_once_with( + 
keytab="/tmp/airflow.keytab", principal="PRINCIPAL", mode=KerberosMode.STANDARD + ) + + @mock.patch("airflow.cli.commands.kerberos_command.krb") + @conf_vars({("core", "executor"): "CeleryExecutor"}) + def test_run_command_with_mode_one_time(self, mock_krb): + args = self.parser.parse_args( + ["kerberos", "PRINCIPAL", "--keytab", "/tmp/airflow.keytab", "--one-time"] + ) + + kerberos_command.kerberos(args) + mock_krb.run.assert_called_once_with( + keytab="/tmp/airflow.keytab", principal="PRINCIPAL", mode=KerberosMode.ONE_TIME + )