This is an automated email from the ASF dual-hosted git repository. xddeng pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 497c2c243d Log FileTaskHandler to work with KubernetesExecutor's multi_namespace_mode (#28436) 497c2c243d is described below commit 497c2c243dd168639d34ff35e02e62d5177de338 Author: Xiaodong DENG <xdd...@apache.org> AuthorDate: Mon Dec 19 04:29:09 2022 +0100 Log FileTaskHandler to work with KubernetesExecutor's multi_namespace_mode (#28436) --- airflow/utils/log/file_task_handler.py | 8 +++-- tests/utils/test_log_handlers.py | 61 +++++++++++++++++++++++++++++++++- 2 files changed, 66 insertions(+), 3 deletions(-) diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index 6aee75ee33..b8feb2997b 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -191,16 +191,20 @@ class FileTaskHandler(logging.Handler): log += f"*** {str(e)}\n" return log, {"end_of_log": True} elif self._should_check_k8s(ti.queue): + pod_override = ti.executor_config.get("pod_override") + if pod_override and pod_override.metadata and pod_override.metadata.namespace: + namespace = pod_override.metadata.namespace + else: + namespace = conf.get("kubernetes_executor", "namespace") try: from airflow.kubernetes.kube_client import get_kube_client kube_client = get_kube_client() log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n" - res = kube_client.read_namespaced_pod_log( name=ti.hostname, - namespace=conf.get("kubernetes_executor", "namespace"), + namespace=namespace, container="base", follow=False, tail_lines=100, diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py index ee2ff2d9ce..8b7f0145de 100644 --- a/tests/utils/test_log_handlers.py +++ b/tests/utils/test_log_handlers.py @@ -21,9 +21,10 @@ import logging import logging.config import os import re -from unittest.mock import patch +from unittest.mock import MagicMock, patch import pytest +from kubernetes.client import models as k8s from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG from airflow.models import DAG, DagRun, TaskInstance @@ -219,6 +220,64 @@ class TestFileTaskLogHandler: # Remove the generated tmp log file. os.remove(log_filename) + @pytest.mark.parametrize( + "pod_override, namespace_to_call", + [ + pytest.param(k8s.V1Pod(metadata=k8s.V1ObjectMeta(namespace="namespace-A")), "namespace-A"), + pytest.param(k8s.V1Pod(metadata=k8s.V1ObjectMeta(namespace="namespace-B")), "namespace-B"), + pytest.param(k8s.V1Pod(), "default"), + pytest.param(None, "default"), + pytest.param(k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="pod-name-xxx")), "default"), + ], + ) + @patch.dict("os.environ", AIRFLOW__CORE__EXECUTOR="KubernetesExecutor") + @patch("airflow.kubernetes.kube_client.get_kube_client") + def test_read_from_k8s_under_multi_namespace_mode( + self, mock_kube_client, pod_override, namespace_to_call + ): + mock_read_namespaced_pod_log = MagicMock() + mock_kube_client.return_value.read_namespaced_pod_log = mock_read_namespaced_pod_log + + def task_callable(ti): + ti.log.info("test") + + dag = DAG("dag_for_testing_file_task_handler", start_date=DEFAULT_DATE) + dagrun = dag.create_dagrun( + run_type=DagRunType.MANUAL, + state=State.RUNNING, + execution_date=DEFAULT_DATE, + ) + executor_config_pod = pod_override + task = PythonOperator( + task_id="task_for_testing_file_log_handler", + dag=dag, + python_callable=task_callable, + executor_config={"pod_override": executor_config_pod}, + ) + ti = TaskInstance(task=task, run_id=dagrun.run_id) + ti.try_number = 3 + + logger = ti.log + ti.log.disabled = False + + file_handler = next( + (handler for handler in logger.handlers if handler.name == FILE_TASK_HANDLER), None + ) + set_context(logger, ti) + ti.run(ignore_ti_state=True) + + file_handler.read(ti, 3) + + # Check if kube_client.read_namespaced_pod_log() is called with the namespace we expect + mock_read_namespaced_pod_log.assert_called_once_with( + name=ti.hostname, + namespace=namespace_to_call, + container="base", + follow=False, + tail_lines=100, + _preload_content=False, + ) + class TestFilenameRendering: def test_python_formatting(self, create_log_template, create_task_instance):