This is an automated email from the ASF dual-hosted git repository.

ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d7769a29951 When a dagrun is cleared, recalculate related deadlines if applicable. (#61372)
d7769a29951 is described below

commit d7769a299514c4609f9331a7f68a0a65e3ed5b40
Author: D. Ferruzzi <[email protected]>
AuthorDate: Mon Feb 9 10:25:11 2026 -0800

    When a dagrun is cleared, recalculate related deadlines if applicable. (#61372)
    
    * When a dagrun is cleared, recalculate related deadlines if applicable.
    
    If the run has deadlines that reference the queued_at time, they should be recalculated when that timestamp changes.
---
 airflow-core/src/airflow/models/taskinstance.py    | 48 +++++++++++
 .../tests/unit/models/test_taskinstance.py         | 92 ++++++++++++++++++++++
 2 files changed, 140 insertions(+)
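
The core rule of the change: a queued_at-referenced deadline is recomputed as
the run's new queued_at plus the alert's interval, which is stored in seconds.
A minimal stand-alone sketch of that arithmetic (the function name here is
illustrative, not part of the patch):

    from datetime import datetime, timedelta

    def recalculated_deadline(new_queued_at: datetime, interval_seconds: float) -> datetime:
        # Mirrors the patch below: add the alert's interval (in seconds)
        # to the run's new queued_at timestamp.
        return new_queued_at + timedelta(seconds=interval_seconds)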

diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py
index 838506a1bbe..a0676cfc0ac 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -74,6 +74,8 @@ from airflow.listeners.listener import get_listener_manager
 from airflow.models.asset import AssetModel
 from airflow.models.base import Base, StringID, TaskInstanceDependencies
 from airflow.models.dag_version import DagVersion
+from airflow.models.deadline import Deadline, ReferenceModels
+from airflow.models.deadline_alert import DeadlineAlert as DeadlineAlertModel
 
 # Import HITLDetail at runtime so SQLAlchemy can resolve the relationship
 from airflow.models.hitl import HITLDetail  # noqa: F401
@@ -181,6 +183,50 @@ def _stop_remaining_tasks(*, task_instance: TaskInstance, task_teardown_map=None
             log.info("Not skipping teardown task '%s'", ti.task_id)
 
 
+def _recalculate_dagrun_queued_at_deadlines(
+    dagrun: DagRun, new_queued_at: datetime, session: Session
+) -> None:
+    """
+    Recalculate deadline times for deadlines that reference dagrun.queued_at.
+
+    :param dagrun: The DagRun whose deadlines should be recalculated
+    :param new_queued_at: The new queued_at timestamp to use for calculation
+    :param session: Database session
+
+    :meta private:
+    """
+    results = session.execute(
+        select(Deadline, DeadlineAlertModel)
+        .join(DeadlineAlertModel, Deadline.deadline_alert_id == DeadlineAlertModel.id)
+        .where(
+            Deadline.dagrun_id == dagrun.id,
+            Deadline.missed == false(),
+            DeadlineAlertModel.reference[ReferenceModels.REFERENCE_TYPE_FIELD].as_string()
+            == ReferenceModels.DagRunQueuedAtDeadline.__name__,
+        )
+    ).all()
+
+    if not results:
+        return
+
+    for deadline, deadline_alert in results:
+        # We can't use evaluate_with() since the new queued_at is not written to the DB yet.
+        deadline_interval = timedelta(seconds=deadline_alert.interval)
+        new_deadline_time = new_queued_at + deadline_interval
+
+        log.debug(
+            "Recalculating deadline %s for DagRun %s.%s: old=%s, new=%s",
+            deadline.id,
+            dagrun.dag_id,
+            dagrun.run_id,
+            deadline.deadline_time,
+            new_deadline_time,
+        )
+        deadline.deadline_time = new_deadline_time
+    # Do not flush/commit here in order to keep the scheduler loop atomic.
+    # These changes are committed by the calling function.
+
+
 def clear_task_instances(
     tis: list[TaskInstance],
     session: Session,
@@ -274,6 +320,8 @@ def clear_task_instances(
             dr.clear_number += 1
             dr.queued_at = timezone.utcnow()
 
+            _recalculate_dagrun_queued_at_deadlines(dr, dr.queued_at, session)
+
             if dr.state in State.finished_dr_states:
                 dr.state = dag_run_state
                 dr.start_date = timezone.utcnow()
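
In plain terms, the query in _recalculate_dagrun_queued_at_deadlines selects
only deadlines that have not already been missed and whose alert's serialized
reference is of type DagRunQueuedAtDeadline; anything else (for example a
FIXED_DATETIME reference) is left alone. A plain-Python restatement of that
predicate (the dict shapes and the "reference_type" key are assumptions made
for illustration, not the actual ORM types):

    def needs_recalculation(deadline: dict, alert_reference: dict) -> bool:
        # Assumed key name; the patch reads it from ReferenceModels.REFERENCE_TYPE_FIELD.
        return (
            not deadline["missed"]
            and alert_reference.get("reference_type") == "DagRunQueuedAtDeadline"
        )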
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py
index 1b6b3a01d1b..20faa789466 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -50,6 +50,8 @@ from airflow.models.asset import (
 )
 from airflow.models.dag_version import DagVersion
 from airflow.models.dagrun import DagRun
+from airflow.models.deadline import Deadline
+from airflow.models.deadline_alert import DeadlineAlert as DeadlineAlertModel
 from airflow.models.pool import Pool
 from airflow.models.renderedtifields import RenderedTaskInstanceFields
 from airflow.models.serialized_dag import SerializedDagModel
@@ -57,6 +59,7 @@ from airflow.models.taskinstance import (
     TaskInstance,
     TaskInstance as TI,
     TaskInstanceNote,
+    clear_task_instances,
     find_relevant_relatives,
 )
 from airflow.models.taskinstancehistory import TaskInstanceHistory
@@ -80,6 +83,8 @@ from airflow.sdk import (
     task_group,
 )
 from airflow.sdk.api.datamodels._generated import AssetEventResponse, AssetResponse
+from airflow.sdk.definitions.callback import AsyncCallback
+from airflow.sdk.definitions.deadline import DeadlineReference
 from airflow.sdk.definitions.param import process_params
 from airflow.sdk.definitions.taskgroup import TaskGroup
 from airflow.sdk.execution_time.comms import AssetEventsResult
@@ -3090,3 +3095,90 @@ def test_when_dag_run_has_partition_and_downstreams_listening_then_tables_popula
     assert pakl.asset_partition_dag_run_id == apdr.id
     assert pakl.source_partition_key == "abc123"
     assert pakl.target_dag_id == "asset_event_listener"
+
+
+async def empty_callback_for_deadline():
+    """Used in deadline tests to confirm that Deadlines and DeadlineAlerts 
function correctly."""
+    pass
+
+
+def test_clear_task_instances_recalculates_dagrun_queued_deadlines(dag_maker, session):
+    """Test that clearing tasks recalculates all (and only) DAGRUN_QUEUED_AT 
deadlines."""
+    with dag_maker(
+        dag_id="test_recalculate_deadlines",
+        schedule=datetime.timedelta(days=1),
+    ) as dag:
+        EmptyOperator(task_id="task_1")
+
+    dag_run = dag_maker.create_dagrun()
+    # Set the task to SUCCESS state
+    ti = dag_run.get_task_instance("task_1", session=session)
+    ti.set_state(TaskInstanceState.SUCCESS, session=session)
+
+    original_queued_at = timezone.utcnow() - datetime.timedelta(hours=2)
+    dag_run.queued_at = original_queued_at
+    session.flush()
+
+    serialized_dag_id = session.scalar(
+        select(SerializedDagModel.id).where(SerializedDagModel.dag_id == dag.dag_id)
+    )
+
+    deadline_configs = [
+        (DeadlineReference.DAGRUN_QUEUED_AT, datetime.timedelta(hours=1)),
+        (DeadlineReference.DAGRUN_QUEUED_AT, datetime.timedelta(hours=2)),
+        (DeadlineReference.FIXED_DATETIME, datetime.timedelta(hours=1)),
+    ]
+
+    for deadline_type, interval in deadline_configs:
+        if deadline_type == DeadlineReference.DAGRUN_QUEUED_AT:
+            reference = DeadlineReference.DAGRUN_QUEUED_AT.serialize_reference()
+            deadline_time = dag_run.queued_at + interval
+        else:  # FIXED_DATETIME
+            future_date = timezone.utcnow() + datetime.timedelta(days=7)
+            reference = DeadlineReference.FIXED_DATETIME(future_date).serialize_reference()
+            deadline_time = future_date + interval
+
+        deadline_alert = DeadlineAlertModel(
+            serialized_dag_id=serialized_dag_id,
+            reference=reference,
+            interval=interval.total_seconds(),
+            callback_def={"path": f"{__name__}.empty_callback_for_deadline", "kwargs": {}},
+        )
+        session.add(deadline_alert)
+        session.flush()
+
+        deadline = Deadline(
+            dagrun_id=dag_run.id,
+            deadline_alert_id=deadline_alert.id,
+            deadline_time=deadline_time,
+            callback=AsyncCallback(empty_callback_for_deadline),
+            dag_id=dag_run.dag_id,
+        )
+        session.add(deadline)
+
+    session.flush()
+
+    deadlines_before = session.scalars(select(Deadline).where(Deadline.dagrun_id == dag_run.id)).all()
+    deadline_times_by_alert = {
+        deadline.deadline_alert_id: deadline.deadline_time for deadline in deadlines_before
+    }
+
+    tis = session.scalars(select(TI).where(TI.dag_id == dag.dag_id, TI.run_id == dag_run.run_id)).all()
+    clear_task_instances(tis, session)
+
+    dag_run = session.scalar(select(DagRun).where(DagRun.id == dag_run.id))
+    assert dag_run.queued_at > original_queued_at
+
+    deadlines_after = session.scalars(select(Deadline).where(Deadline.dagrun_id == dag_run.id)).all()
+    assert len(deadlines_after) == 3
+
+    # Verify exactly 2 DAGRUN_QUEUED_AT deadlines were recalculated, FIXED_DATETIME was not
+    recalculated_count = 0
+    for deadline in deadlines_after:
+        if deadline.deadline_time != deadline_times_by_alert[deadline.deadline_alert_id]:
+            recalculated_count += 1
+            deadline_alert = session.get(DeadlineAlertModel, deadline.deadline_alert_id)
+            expected_time = dag_run.queued_at + datetime.timedelta(seconds=deadline_alert.interval)
+            assert deadline.deadline_time == expected_time
+
+    assert recalculated_count == 2
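
As a worked example of what the test asserts: if clearing the run moves
queued_at forward by two hours, both DAGRUN_QUEUED_AT deadlines shift by the
same two hours, while the FIXED_DATETIME deadline keeps its original time.
A quick self-contained check (timestamps invented for illustration):

    from datetime import datetime, timedelta, timezone

    old_queued_at = datetime(2026, 2, 9, 8, 0, tzinfo=timezone.utc)
    new_queued_at = old_queued_at + timedelta(hours=2)  # set when the run is cleared

    for interval in (timedelta(hours=1), timedelta(hours=2)):  # the two DAGRUN_QUEUED_AT alerts
        assert new_queued_at + interval == (old_queued_at + interval) + timedelta(hours=2)

    # A FIXED_DATETIME deadline does not depend on queued_at, so it is unchanged.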
