This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 1958bcc05d810beff0fa715b55d5d07aa3bfdd8c
Author: Daniel Standish <[email protected]>
AuthorDate: Sat Sep 13 19:56:06 2025 -0700

    Be a little clearer when task likely OOM error (#55602)

    * Be a little clearer when task likely OOM error

    When a task process is terminated with exit code -9 (sigkill) then
    there's a good chance it's due to out of memory error (OOM). We can
    be a little more direct about indicating that in the logs. And I
    reorganize the way we present that in the docs a bit.

    * fix test

    (cherry picked from commit e3f42b7f656fec43dcd51c76f700724df14920b3)
---
 airflow-core/docs/troubleshooting.rst            | 22 ++++++++++++++++------
 .../src/airflow/sdk/execution_time/supervisor.py |  5 +++--
 .../task_sdk/execution_time/test_supervisor.py   | 16 ++++++++++++----
 3 files changed, 31 insertions(+), 12 deletions(-)

diff --git a/airflow-core/docs/troubleshooting.rst b/airflow-core/docs/troubleshooting.rst
index 6756ab87e10..b5c904cac96 100644
--- a/airflow-core/docs/troubleshooting.rst
+++ b/airflow-core/docs/troubleshooting.rst
@@ -36,16 +36,26 @@ Below are some example scenarios that could cause a task's state to change by a
 - A user marked the task as successful or failed in the Airflow UI.
 - An external script or process used the :doc:`Airflow REST API <stable-rest-api-ref>` to change the state of a task.
 
-TaskRunner killed
------------------
+Process terminated by signal
+----------------------------
 
 Sometimes, Airflow or some adjacent system will kill a task instance's ``TaskRunner``, causing the task instance to fail.
-Here are some examples that could cause such an event:
+Below we discuss a few common cases.
 
-- A Dag run timeout, specified by ``dagrun_timeout`` in the Dag's definition.
-- An Airflow worker running out of memory
-  - Usually, Airflow workers that run out of memory receive a SIGKILL, and the scheduler will fail the corresponding task instance for not having a heartbeat. However, in some scenarios, Airflow kills the task before that happens.
+Dag run timeout
+"""""""""""""""
+
+A dag run timeout can be specified by ``dagrun_timeout`` in the dag's definition.
+The task process would likely be killed with SIGTERM (exit code -15).
+
+Out of memory error (OOM)
+"""""""""""""""""""""""""
+
+When a task process consumes too much memory for a worker, the best case scenario is it is killed
+with SIGKILL (exit code -9). Depending on configuration and infrastructure, it is also
+possible that the whole worker will be killed due to OOM and then the tasks would be marked as
+failed after failing to heartbeat.
 
 Lingering task supervisor processes
 -----------------------------------
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index e3e4a1cbe83..76a52acd98f 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -803,11 +803,12 @@ class WatchedSubprocess:
             self.process_log.critical(SIGSEGV_MESSAGE)
         # psutil turns signal exit codes into an enum for us. Handy. (Otherwise it's a plain integer)
         elif name := getattr(self._exit_code, "name", None):
-            message = "Process terminated by signal"
+            message = "Process terminated by signal."
             level = logging.ERROR
             if self._exit_code == -signal.SIGKILL:
-                message += ". For more information, see https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#TaskRunner-killed"
+                message += " Likely out of memory error (OOM)."
                 level = logging.CRITICAL
+            message += " For more information, see https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#process-terminated-by-signal."
             self.process_log.log(level, message, signal=int(self._exit_code), signal_name=name)
         elif self._exit_code:
             # Run of the mill exit code (1, 42, etc).
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index e6b0a3dccd9..ce60753bf6a 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -930,21 +930,29 @@ class TestWatchedSubprocess:
         mock_logger.warning.assert_not_called()
 
     @pytest.mark.parametrize(
-        ["signal_to_raise", "log_pattern"],
+        ["signal_to_raise", "log_pattern", "level"],
         (
             pytest.param(
                 signal.SIGKILL,
-                re.compile(r"Process terminated by signal. For more information, see"),
+                re.compile(r"Process terminated by signal. Likely out of memory error"),
+                "critical",
                 id="kill",
             ),
+            pytest.param(
+                signal.SIGTERM,
+                re.compile(r"Process terminated by signal. For more information"),
+                "error",
+                id="term",
+            ),
             pytest.param(
                 signal.SIGSEGV,
                 re.compile(r".*SIGSEGV \(Segmentation Violation\) signal indicates", re.DOTALL),
+                "critical",
                 id="segv",
             ),
         ),
     )
-    def test_exit_by_signal(self, signal_to_raise, log_pattern, cap_structlog, client_with_ti_start):
+    def test_exit_by_signal(self, signal_to_raise, log_pattern, level, cap_structlog, client_with_ti_start):
         def subprocess_main():
             import faulthandler
             import os
@@ -976,7 +984,7 @@ class TestWatchedSubprocess:
         rc = proc.wait()
 
         assert {
-            "log_level": "critical",
+            "log_level": level,
             "event": log_pattern,
         } in cap_structlog
         assert rc == -signal_to_raise
