This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 377e8cdcc5 Swallow exception when mini-scheduling raises an exception (#41260) 377e8cdcc5 is described below commit 377e8cdcc54c6bcc7ec277f458da37ef8a0fdddf Author: Jarek Potiuk <ja...@potiuk.com> AuthorDate: Tue Aug 6 19:00:12 2024 +0200 Swallow exception when mini-scheduling raises an exception (#41260) When the mini-scheduler raises an exception, it has an odd side effect: the task succeeds, but it is seen as failed and the scheduler gets confused. In this case, the Celery worker shown in Flower also reports an error. This happens, for example, when a DAG contains non-serializable tasks. This PR swallows any exceptions raised in the mini-scheduler and simply logs them as errors rather than failing the process. The mini-scheduler is an optional optimisation, and we already skip it in some cases, so occasionally skipping it is not a big problem. Fixes: #39717 --- airflow/models/taskinstance.py | 16 +++++++++++++--- tests/models/test_taskinstance.py | 16 ++++++++++++++++ 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index ed38782bea..f53fb5e274 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -3892,9 +3892,19 @@ class TaskInstance(Base, LoggingMixin): :meta: private """ - return TaskInstance._schedule_downstream_tasks( - ti=self, session=session, max_tis_per_query=max_tis_per_query - ) + try: + return TaskInstance._schedule_downstream_tasks( + ti=self, session=session, max_tis_per_query=max_tis_per_query + ) + except Exception: + self.log.exception( + "Error scheduling downstream tasks. Skipping it as this is entirely optional optimisation. " + "There might be various reasons for it, please take a look at the stack trace to figure " + "out if the root cause can be diagnosed and fixed. See the issue " + "https://github.com/apache/airflow/issues/39717 for details and an example problem. 
If you " + "would like to get help in solving root cause, open discussion with all details with your " + "managed service support or in Airflow repository." + ) def get_relevant_upstream_map_indexes( self, diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 08a2ae9019..32158dc00d 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -19,6 +19,7 @@ from __future__ import annotations import contextlib import datetime +import logging import operator import os import pathlib @@ -5140,3 +5141,18 @@ def test__refresh_from_db_should_not_increment_try_number(dag_maker, session): assert ti.try_number == 1 # stays 1 ti.refresh_from_db() assert ti.try_number == 1 # stays 1 + + +@mock.patch("airflow.models.taskinstance.TaskInstance._schedule_downstream_tasks") +def test_swallow_mini_scheduler_exceptions(_schedule_downstream_mock, create_task_instance, caplog): + _schedule_downstream_mock.side_effect = Exception("To be swallowed") + caplog.set_level(logging.ERROR) + ti = create_task_instance( + dag_id="dag_for_testing_swallowing_exception", + task_id="task_for_testing_swallowing_exception", + run_type=DagRunType.SCHEDULED, + execution_date=DEFAULT_DATE, + ) + ti.schedule_downstream_tasks() + assert "Error scheduling downstream tasks." in caplog.text + assert "To be swallowed" in caplog.text