This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 377e8cdcc5 Swallow exception when mini-scheduling raises an exception 
(#41260)
377e8cdcc5 is described below

commit 377e8cdcc54c6bcc7ec277f458da37ef8a0fdddf
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Tue Aug 6 19:00:12 2024 +0200

    Swallow exception when mini-scheduling raises an exception (#41260)
    
    When the mini-scheduler raises an exception, it has a rather odd side
    effect: the task succeeds, but it is reported as failed and the scheduler
    gets confused. The Celery worker also shows an error in Flower in this case.
    
    This happens, for example, when a DAG contains non-serializable tasks.
    
    This PR swallows any exception raised in the mini-scheduler and simply
    logs it as an error rather than failing the process. The mini-scheduler
    is generally optional, and we already skip it in some cases, so
    occasionally skipping it is not a big problem.
    
    Fixes: #39717
---
 airflow/models/taskinstance.py    | 16 +++++++++++++---
 tests/models/test_taskinstance.py | 16 ++++++++++++++++
 2 files changed, 29 insertions(+), 3 deletions(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index ed38782bea..f53fb5e274 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -3892,9 +3892,19 @@ class TaskInstance(Base, LoggingMixin):
 
         :meta: private
         """
-        return TaskInstance._schedule_downstream_tasks(
-            ti=self, session=session, max_tis_per_query=max_tis_per_query
-        )
+        try:
+            return TaskInstance._schedule_downstream_tasks(
+                ti=self, session=session, max_tis_per_query=max_tis_per_query
+            )
+        except Exception:
+            self.log.exception(
+                "Error scheduling downstream tasks. Skipping it as this is 
entirely optional optimisation. "
+                "There might be various reasons for it, please take a look at 
the stack trace to figure "
+                "out if the root cause can be diagnosed and fixed. See the 
issue "
+                "https://github.com/apache/airflow/issues/39717 for details 
and an example problem. If you "
+                "would like to get help in solving root cause, open discussion 
with all details with your "
+                "managed service support or in Airflow repository."
+            )
 
     def get_relevant_upstream_map_indexes(
         self,
diff --git a/tests/models/test_taskinstance.py 
b/tests/models/test_taskinstance.py
index 08a2ae9019..32158dc00d 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -19,6 +19,7 @@ from __future__ import annotations
 
 import contextlib
 import datetime
+import logging
 import operator
 import os
 import pathlib
@@ -5140,3 +5141,18 @@ def 
test__refresh_from_db_should_not_increment_try_number(dag_maker, session):
     assert ti.try_number == 1  # stays 1
     ti.refresh_from_db()
     assert ti.try_number == 1  # stays 1
+
+
+@mock.patch("airflow.models.taskinstance.TaskInstance._schedule_downstream_tasks")
+def test_swallow_mini_scheduler_exceptions(_schedule_downstream_mock, 
create_task_instance, caplog):
+    _schedule_downstream_mock.side_effect = Exception("To be swallowed")
+    caplog.set_level(logging.ERROR)
+    ti = create_task_instance(
+        dag_id="dag_for_testing_swallowing_exception",
+        task_id="task_for_testing_swallowing_exception",
+        run_type=DagRunType.SCHEDULED,
+        execution_date=DEFAULT_DATE,
+    )
+    ti.schedule_downstream_tasks()
+    assert "Error scheduling downstream tasks." in caplog.text
+    assert "To be swallowed" in caplog.text

Reply via email to