This is an automated email from the ASF dual-hosted git repository.
ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 811d1e91c5a Allow attachment of multiple deadlines to a DAG (#55086)
811d1e91c5a is described below
commit 811d1e91c5a387a208eb3570b43363ecc137f4ba
Author: Sean Ghaeli <[email protected]>
AuthorDate: Tue Sep 9 10:36:02 2025 -0700
Allow attachment of multiple deadlines to a DAG (#55086)
---
 airflow-core/src/airflow/models/dag.py             | 17 ++++++-
 airflow-core/src/airflow/models/dagrun.py          | 10 ++--
 airflow-core/src/airflow/serialization/schema.json |  4 ++
 .../airflow/serialization/serialized_objects.py    | 43 +++++++++++------
 airflow-core/tests/unit/models/test_dag.py         | 55 ++++++++++++++++++++++
 task-sdk/src/airflow/sdk/definitions/dag.py        | 21 +++++++--
 6 files changed, 127 insertions(+), 23 deletions(-)
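
For orientation before the diffs: a minimal sketch of what this change lets a DAG author write, modeled on the new test further below. The import paths here are assumptions and may differ from the real modules; the DeadlineAlert, DeadlineReference, and AsyncCallback names come from the diff itself.

    # Sketch only: a DAG declaring several deadlines at once. Import paths
    # below are assumptions; the class names appear in the diffs that follow.
    import datetime

    from airflow.sdk import DAG
    from airflow.sdk.definitions.deadline import (  # assumed module path
        AsyncCallback,
        DeadlineAlert,
        DeadlineReference,
    )


    def notify(**kwargs): ...  # placeholder callback for illustration


    with DAG(
        dag_id="example_multiple_deadlines",
        schedule=datetime.timedelta(days=1),
        deadline=[
            DeadlineAlert(
                reference=DeadlineReference.DAGRUN_QUEUED_AT,
                interval=datetime.timedelta(minutes=5),
                callback=AsyncCallback(notify),
            ),
            DeadlineAlert(
                reference=DeadlineReference.DAGRUN_LOGICAL_DATE,
                interval=datetime.timedelta(hours=1),
                callback=AsyncCallback(notify),
            ),
        ],
    ):
        ...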
diff --git a/airflow-core/src/airflow/models/dag.py b/airflow-core/src/airflow/models/dag.py
index 2cbba4f1c2a..844c28b724c 100644
--- a/airflow-core/src/airflow/models/dag.py
+++ b/airflow-core/src/airflow/models/dag.py
@@ -452,12 +452,25 @@ class DagModel(Base):
     @property
     def deadline(self):
         """Get the deserialized deadline alert."""
-        return DeadlineAlert.deserialize_deadline_alert(self._deadline) if self._deadline else None
+        if self._deadline is None:
+            return None
+        if isinstance(self._deadline, list):
+            return [DeadlineAlert.deserialize_deadline_alert(item) for item in self._deadline]
+        return DeadlineAlert.deserialize_deadline_alert(self._deadline)
 
     @deadline.setter
     def deadline(self, value):
         """Set and serialize the deadline alert."""
-        self._deadline = value if isinstance(value, dict) else value.serialize_deadline_alert()
+        if value is None:
+            self._deadline = None
+        elif isinstance(value, list):
+            self._deadline = [
+                item if isinstance(item, dict) else item.serialize_deadline_alert() for item in value
+            ]
+        elif isinstance(value, dict):
+            self._deadline = value
+        else:
+            self._deadline = value.serialize_deadline_alert()
 
     @property
     def timezone(self):
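
The setter above normalizes three input shapes: None, a list of dicts and/or alerts, or a single dict or alert. A standalone sketch of that normalization, with a stub in place of the real DeadlineAlert (the stub and sample payloads are illustrative assumptions):

    # Stub standing in for Airflow's DeadlineAlert; illustration only.
    class StubAlert:
        def __init__(self, data: dict):
            self.data = data

        def serialize_deadline_alert(self) -> dict:
            return self.data


    def set_deadline(value):
        """Mirrors DagModel.deadline.setter: dicts pass through, alerts serialize."""
        if value is None:
            return None
        if isinstance(value, list):
            return [v if isinstance(v, dict) else v.serialize_deadline_alert() for v in value]
        if isinstance(value, dict):
            return value
        return value.serialize_deadline_alert()


    assert set_deadline(None) is None
    assert set_deadline({"interval": 300}) == {"interval": 300}
    assert set_deadline([StubAlert({"interval": 300}), {"interval": 600}]) == [
        {"interval": 300},
        {"interval": 600},
    ]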
diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py
index 26d1d42e524..516ef64b30c 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -1222,9 +1222,13 @@ class DagRun(Base, LoggingMixin):
                 msg="success",
             )
 
-            if (deadline := dag.deadline) and isinstance(deadline.reference, DeadlineReference.TYPES.DAGRUN):
-                # The dagrun has succeeded. If there wre any Deadlines for it which were not breached, they are no longer needed.
-                Deadline.prune_deadlines(session=session, conditions={DagRun.run_id: self.run_id})
+            if dag.deadline:
+                # The dagrun has succeeded. If there were any Deadlines for it which were not breached, they are no longer needed.
+                if any(
+                    isinstance(d.reference, DeadlineReference.TYPES.DAGRUN)
+                    for d in cast("list", dag.deadline)
+                ):
+                    Deadline.prune_deadlines(session=session, conditions={DagRun.run_id: self.run_id})
 
         # if *all tasks* are deadlocked, the run failed
         elif unfinished.should_schedule and not are_runnable_tasks:
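
The new guard prunes at most once when any attached alert uses a DAGRUN-type reference. A standalone sketch of that pattern with stand-in types (every name below is an illustrative stub, not the Airflow API):

    from typing import cast


    class DagrunQueuedAt: ...               # stand-in for one DAGRUN-type reference
    DAGRUN_TYPES = (DagrunQueuedAt,)        # stand-in for DeadlineReference.TYPES.DAGRUN


    class Alert:
        def __init__(self, reference):
            self.reference = reference


    deadlines = [Alert(object()), Alert(DagrunQueuedAt()), Alert(DagrunQueuedAt())]
    # any() short-circuits, so the prune runs once even if several alerts match.
    if any(isinstance(d.reference, DAGRUN_TYPES) for d in cast("list", deadlines)):
        print("prune_deadlines(...) would be called once for this run")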
diff --git a/airflow-core/src/airflow/serialization/schema.json b/airflow-core/src/airflow/serialization/schema.json
index 0ce253e2262..c4740f346f3 100644
--- a/airflow-core/src/airflow/serialization/schema.json
+++ b/airflow-core/src/airflow/serialization/schema.json
@@ -190,6 +190,10 @@
     "deadline": {
       "anyOf": [
         { "$ref": "#/definitions/dict" },
+        {
+          "type": "array",
+          "items": { "$ref": "#/definitions/dict" }
+        },
+        { "type": "null" }
       ]
     },
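
The schema now accepts a dict, an array of dicts, or null for "deadline". A quick check of those three shapes against an equivalent standalone schema (using a plain "object" type in place of the internal #/definitions/dict ref; requires the jsonschema package):

    from jsonschema import validate

    # Equivalent standalone schema; "object" substitutes for "#/definitions/dict".
    deadline_schema = {
        "anyOf": [
            {"type": "object"},
            {"type": "array", "items": {"type": "object"}},
            {"type": "null"},
        ]
    }

    for candidate in ({"interval": 300}, [{"interval": 300}, {"interval": 600}], None):
        validate(instance=candidate, schema=deadline_schema)  # raises ValidationError on mismatch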
diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py
index 3e75993b2e2..280b5907a89 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -2385,7 +2385,11 @@ class SerializedDAG(DAG, BaseSerialization):
         serialized_dag["dag_dependencies"] = [x.__dict__ for x in sorted(dag_deps)]
         serialized_dag["task_group"] = TaskGroupSerialization.serialize_task_group(dag.task_group)
-        serialized_dag["deadline"] = dag.deadline.serialize_deadline_alert() if dag.deadline else None
+        serialized_dag["deadline"] = (
+            [deadline.serialize_deadline_alert() for deadline in dag.deadline]
+            if isinstance(dag.deadline, list)
+            else None
+        )
 
         # Edge info in the JSON exactly matches our internal structure
         serialized_dag["edge_info"] = dag.edge_info
@@ -2501,7 +2505,14 @@ class SerializedDAG(DAG, BaseSerialization):
             dag.has_on_failure_callback = True
 
         if "deadline" in encoded_dag and encoded_dag["deadline"] is not None:
-            dag.deadline = DeadlineAlert.deserialize_deadline_alert(encoded_dag["deadline"])
+            dag.deadline = (
+                [
+                    DeadlineAlert.deserialize_deadline_alert(deadline_data)
+                    for deadline_data in encoded_dag["deadline"]
+                ]
+                if encoded_dag["deadline"]
+                else None
+            )
 
         keys_to_set_none = dag.get_serialized_fields() - encoded_dag.keys() - cls._CONSTRUCTOR_PARAMS.keys()
         for k in keys_to_set_none:
@@ -3096,19 +3107,21 @@ class SerializedDAG(DAG, BaseSerialization):
             session=session,
         )
 
-        if self.deadline and isinstance(self.deadline.reference, DeadlineReference.TYPES.DAGRUN):
-            session.add(
-                Deadline(
-                    deadline_time=self.deadline.reference.evaluate_with(
-                        session=session,
-                        interval=self.deadline.interval,
-                        dag_id=self.dag_id,
-                        run_id=run_id,
-                    ),
-                    callback=self.deadline.callback,
-                    dagrun_id=orm_dagrun.id,
-                )
-            )
+        if self.deadline:
+            for deadline in cast("list", self.deadline):
+                if isinstance(deadline.reference, DeadlineReference.TYPES.DAGRUN):
+                    session.add(
+                        Deadline(
+                            deadline_time=deadline.reference.evaluate_with(
+                                session=session,
+                                interval=deadline.interval,
+                                dag_id=self.dag_id,
+                                run_id=run_id,
+                            ),
+                            callback=deadline.callback,
+                            dagrun_id=orm_dagrun.id,
+                        )
+                    )
 
         return orm_dagrun
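
Two details worth noting in the hunks above: serialization only emits a list (the task-sdk converter further below guarantees the attribute is a list or None, so the isinstance check suffices), and deserialization collapses an empty list back to None. A round-trip sketch with a stub alert class (illustration only):

    # Stub standing in for DeadlineAlert; illustration only.
    class Alert:
        def __init__(self, data: dict):
            self.data = data

        def serialize_deadline_alert(self) -> dict:
            return self.data

        @staticmethod
        def deserialize_deadline_alert(data: dict) -> "Alert":
            return Alert(data)


    def serialize(deadline):
        # Mirrors serialize_dag: only a list is emitted; anything else becomes None.
        return [d.serialize_deadline_alert() for d in deadline] if isinstance(deadline, list) else None


    def deserialize(encoded):
        # Mirrors deserialize_dag: an empty list collapses to None.
        return [Alert.deserialize_deadline_alert(d) for d in encoded] if encoded else None


    alerts = [Alert({"interval": 300}), Alert({"interval": 600})]
    encoded = serialize(alerts)
    assert encoded == [{"interval": 300}, {"interval": 600}]
    assert [a.data for a in deserialize(encoded)] == encoded
    assert serialize(None) is None and deserialize([]) is None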
diff --git a/airflow-core/tests/unit/models/test_dag.py b/airflow-core/tests/unit/models/test_dag.py
index 12017478317..ba22ed2aadd 100644
--- a/airflow-core/tests/unit/models/test_dag.py
+++ b/airflow-core/tests/unit/models/test_dag.py
@@ -1811,6 +1811,61 @@ my_postgres_conn:
         assert len(dr.deadlines) == 1
         assert dr.deadlines[0].deadline_time == getattr(dr, reference_column, DEFAULT_DATE) + interval
 
+    def test_dag_with_multiple_deadlines(self, dag_maker, session):
+        """Test that a DAG with multiple deadlines stores all deadlines in the database."""
+        deadlines = [
+            DeadlineAlert(
+                reference=DeadlineReference.DAGRUN_QUEUED_AT,
+                interval=datetime.timedelta(minutes=5),
+                callback=AsyncCallback(empty_callback_for_deadline),
+            ),
+            DeadlineAlert(
+                reference=DeadlineReference.DAGRUN_QUEUED_AT,
+                interval=datetime.timedelta(minutes=10),
+                callback=AsyncCallback(empty_callback_for_deadline),
+            ),
+            DeadlineAlert(
+                reference=DeadlineReference.DAGRUN_LOGICAL_DATE,
+                interval=datetime.timedelta(hours=1),
+                callback=AsyncCallback(empty_callback_for_deadline),
+            ),
+        ]
+
+        with dag_maker(
+            dag_id="test_multiple_deadlines",
+            schedule=datetime.timedelta(days=1),
+            deadline=deadlines,
+        ) as dag:
+            ...
+
+        scheduler_dag = sync_dag_to_db(dag)
+        dr = scheduler_dag.create_dagrun(
+            run_id="test_multiple_deadlines",
+            run_type=DagRunType.SCHEDULED,
+            state=State.QUEUED,
+            logical_date=TEST_DATE,
+            run_after=TEST_DATE,
+            triggered_by=DagRunTriggeredByType.TEST,
+        )
+        session.flush()
+        dr = session.merge(dr)
+
+        # Check that all 3 deadlines were created
+        assert len(dr.deadlines) == 3
+
+        # Verify each deadline has correct properties
+        deadline_times = [d.deadline_time for d in dr.deadlines]
+        expected_times = [
+            dr.queued_at + datetime.timedelta(minutes=5),
+            dr.queued_at + datetime.timedelta(minutes=10),
+            dr.logical_date + datetime.timedelta(hours=1),
+        ]
+
+        # Sort both lists to compare regardless of order
+        deadline_times.sort()
+        expected_times.sort()
+        assert deadline_times == expected_times
+
 
 class TestDagModel:
     def _clean(self):
diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py b/task-sdk/src/airflow/sdk/definitions/dag.py
index bab5a6db01b..ed14b7dd587 100644
--- a/task-sdk/src/airflow/sdk/definitions/dag.py
+++ b/task-sdk/src/airflow/sdk/definitions/dag.py
@@ -184,6 +184,15 @@ def _convert_access_control(access_control):
     return updated_access_control
 
 
+def _convert_deadline(deadline: list[DeadlineAlert] | DeadlineAlert | None) -> list[DeadlineAlert] | None:
+    """Convert deadline parameter to a list of DeadlineAlert objects."""
+    if deadline is None:
+        return None
+    if isinstance(deadline, DeadlineAlert):
+        return [deadline]
+    return list(deadline)
+
+
 def _convert_doc_md(doc_md: str | None) -> str | None:
     if doc_md is None:
         return doc_md
@@ -437,9 +446,15 @@ class DAG:
         default=None,
         validator=attrs.validators.optional(attrs.validators.instance_of(timedelta)),
     )
-    deadline: DeadlineAlert | None = attrs.field(
+    deadline: list[DeadlineAlert] | DeadlineAlert | None = attrs.field(
         default=None,
-        validator=attrs.validators.optional(attrs.validators.instance_of(DeadlineAlert)),
+        converter=_convert_deadline,
+        validator=attrs.validators.optional(
+            attrs.validators.deep_iterable(
+                member_validator=attrs.validators.instance_of(DeadlineAlert),
+                iterable_validator=attrs.validators.instance_of(list),
+            )
+        ),
     )
 
     catchup: bool = attrs.field(
@@ -1415,7 +1430,7 @@ if TYPE_CHECKING:
         catchup: bool = ...,
         on_success_callback: None | DagStateChangeCallback | list[DagStateChangeCallback] = None,
         on_failure_callback: None | DagStateChangeCallback | list[DagStateChangeCallback] = None,
-        deadline: DeadlineAlert | None = None,
+        deadline: list[DeadlineAlert] | DeadlineAlert | None = None,
         doc_md: str | None = None,
         params: ParamsDict | dict[str, Any] | None = None,
         access_control: dict[str, dict[str, Collection[str]]] | dict[str, Collection[str]] | None = None,
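
Since the attrs converter runs before the validator, the field always holds a list or None by the time deep_iterable sees it. A quick check of the converter's contract, with a stub class in place of the real DeadlineAlert (illustration only):

    # Stub in place of the real attrs class; illustration only.
    class DeadlineAlert: ...


    def _convert_deadline(deadline):
        if deadline is None:
            return None
        if isinstance(deadline, DeadlineAlert):
            return [deadline]
        return list(deadline)


    single = DeadlineAlert()
    assert _convert_deadline(None) is None
    assert _convert_deadline(single) == [single]       # a bare alert is wrapped
    assert _convert_deadline((single,)) == [single]    # any iterable becomes a list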