This is an automated email from the ASF dual-hosted git repository.

kaxil pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new eca91dc5c2c Restore fail_fast handling when reschedule exceeds MySQL TIMESTAMP limit (#67353)
eca91dc5c2c is described below

commit eca91dc5c2cab36b3a8e6d53772cf13716d98a8d
Author: Kaxil Naik <[email protected]>
AuthorDate: Fri May 22 20:20:02 2026 +0100

    Restore fail_fast handling when reschedule exceeds MySQL TIMESTAMP limit (#67353)
    
    PR #59686 dropped the _handle_fail_fast_for_dag call in the MySQL-TIMESTAMP-limit
    branch of the reschedule path based on an incorrect SQLA2 deadlock concern. As a
    result, DAGs with fail_fast=True silently fail to stop sibling tasks when a
    reschedule date exceeds 2038-01-19 on MySQL.
    
    The actual deadlock that motivated #59686 came from a different path (FOR UPDATE
    expanding to the lazy-joined dag_run row), fixed in #67246 by scoping the lock
    with with_for_update={"of": TI}. With that scope in place, the fail-fast call is
    safe and matches the file's two existing fail-fast sites.
    
    Also drops a second misleading comment in the same function claiming session.get
    was avoided to "avoid SQLA2 lock contention issues" -- the code itself is fine;
    the rationale was wrong.
---
 .../execution_api/routes/task_instances.py         |  8 ++--
 .../versions/head/test_task_instances.py           | 55 ++++++++++++++++++++++
 2 files changed, 58 insertions(+), 5 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index 18b5842f77b..c2d67926a21 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -644,13 +644,11 @@ def _create_ti_state_update_query_and_update_state(
                 if session.bind is not None:
                     query = TI.duration_expression_update(timezone.utcnow(), 
query, session.bind)
                 query = query.values(state=TaskInstanceState.FAILED)
-                # We skip fail_fast handling in this error case to avoid 
fetching the TI object while the row
-                # is still locked from the earlier with_for_update() query, 
which might cause deadlock issues
-                # in SQLA2. The task is marked as FAILED regardless.
+                ti = session.get(TI, task_instance_id, with_for_update={"of": 
TI})
+                if ti is not None:
+                    _handle_fail_fast_for_dag(ti=ti, dag_id=dag_id, 
session=session, dag_bag=dag_bag)
                 return query, TaskInstanceState.FAILED
 
-        # We can directly use task_instance_id instead of fetching the 
TaskInstance object to avoid SQLA2
-        #  lock contention issues when the TaskInstance row is already locked 
from before.
         actual_start_date = timezone.utcnow()
         session.add(
             TaskReschedule(
diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
index 6f19cff9389..2d15e548a13 100644
--- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
+++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
@@ -1894,6 +1894,61 @@ class TestTIUpdateState:
         ti1 = session.get(TaskInstance, ti1.id)
         assert ti1.state == State.FAILED
 
+    def test_ti_update_state_reschedule_mysql_limit_triggers_fail_fast(
+        self, client, session, dag_maker, time_machine
+    ):
+        """
+        When a reschedule date exceeds MySQL's TIMESTAMP limit and the DAG has 
fail_fast=True,
+        sibling tasks must still be stopped. The MySQL-limit branch routes 
through a different
+        FAILED transition than the regular fail path -- both must honor 
fail_fast.
+        """
+        instant = timezone.datetime(2024, 10, 30)
+        time_machine.move_to(instant, tick=False)
+
+        with dag_maker(dag_id="test_dag_with_fail_fast_mysql_reschedule", 
fail_fast=True, serialized=True):
+            EmptyOperator(task_id="task1")
+            EmptyOperator(task_id="task2")
+
+        dr = dag_maker.create_dagrun()
+        ti1 = dr.get_task_instance(task_id="task1", session=session)
+        ti1.state = State.RUNNING
+        ti1.start_date = instant
+
+        ti2 = dr.get_task_instance(task_id="task2", session=session)
+        ti2.state = State.QUEUED
+        session.commit()
+        session.refresh(ti1)
+        session.refresh(ti2)
+
+        # Date beyond MySQL's TIMESTAMP limit (2038-01-19 03:14:07).
+        future_date = timezone.datetime(2038, 1, 19, 3, 14, 8)
+
+        with (
+            mock.patch(
+                
"airflow.api_fastapi.execution_api.routes.task_instances.get_dialect_name",
+                return_value="mysql",
+            ),
+            mock.patch(
+                
"airflow.api_fastapi.execution_api.routes.task_instances._stop_remaining_tasks",
+                autospec=True,
+            ) as mock_stop,
+        ):
+            response = client.patch(
+                f"/execution/task-instances/{ti1.id}/state",
+                json={
+                    "state": TaskInstanceState.UP_FOR_RESCHEDULE,
+                    "reschedule_date": future_date.isoformat(),
+                    "end_date": DEFAULT_END_DATE.isoformat(),
+                },
+            )
+
+            assert response.status_code == 204
+            mock_stop.assert_called_once()
+
+        session.expire_all()
+        ti1 = session.get(TaskInstance, ti1.id)
+        assert ti1.state == State.FAILED
+
     @pytest.mark.db_test
     @conf_vars({("state_store", "clear_on_success"): "True"})
     def test_ti_update_state_to_success_clears_task_state(self, client, 
session, create_task_instance):

Reply via email to