This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3f744ad8c5b Update iter_dagrun_infos_between to use
next_dagrun_info_v2 (#61465)
3f744ad8c5b is described below
commit 3f744ad8c5badacd8ee1ce045cc92d0dbf855228
Author: Daniel Standish <[email protected]>
AuthorDate: Thu Feb 5 08:19:05 2026 -0800
Update iter_dagrun_infos_between to use next_dagrun_info_v2 (#61465)
This is so that it will work for new timetables which implement
next_dagrun_info_v2 (which receives DagRunInfo objects instead of data
intervals.
I also remove logic that is no longer needed and simplify the function.
Some of the logic is leftover from when the align parameter was there (removed
in #61420). Without it, the logic can be simplified and condensed, e.g. by not
getting an initial info before starting the loop.
---
.../src/airflow/serialization/definitions/dag.py | 48 +++++++---------------
airflow-core/tests/unit/models/test_dag.py | 2 +-
2 files changed, 15 insertions(+), 35 deletions(-)
diff --git a/airflow-core/src/airflow/serialization/definitions/dag.py
b/airflow-core/src/airflow/serialization/definitions/dag.py
index bfdf02bd3ac..b1a2dc8da59 100644
--- a/airflow-core/src/airflow/serialization/definitions/dag.py
+++ b/airflow-core/src/airflow/serialization/definitions/dag.py
@@ -46,7 +46,6 @@ from airflow.serialization.definitions.deadline import
DeadlineAlertFields, Seri
from airflow.serialization.definitions.param import SerializedParamsDict
from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction
-from airflow.timetables.trigger import CronPartitionTimetable
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType
@@ -441,10 +440,9 @@ class SerializedDAG:
DagRunInfo instances yielded if their ``logical_date`` is not earlier
than ``earliest``, nor later than ``latest``. The instances are ordered
by their ``logical_date`` from earliest to latest.
+
+ # TODO: AIP-76 see issue https://github.com/apache/airflow/issues/60455
"""
- if isinstance(self.timetable, CronPartitionTimetable):
- # todo: AIP-76 need to update this so that it handles partitions
- raise ValueError("Partition-driven timetables not supported yet")
if earliest is None:
earliest = self._time_restriction.earliest
if earliest is None:
@@ -454,41 +452,23 @@ class SerializedDAG:
restriction = TimeRestriction(earliest, latest, catchup=True)
+ info = None
try:
- info = self.timetable.next_dagrun_info(
- last_automated_data_interval=None,
- restriction=restriction,
- )
+ while True:
+ info = self.timetable.next_dagrun_info_v2(
+ last_dagrun_info=info,
+ restriction=restriction,
+ )
+ if info:
+ yield info
+ else:
+ break
except Exception:
log.exception(
- "Failed to fetch run info after data interval %s for DAG %r",
- None,
+ "Failed to fetch run info for Dag '%s'",
self.dag_id,
+ last_dagrun_info=info,
)
- info = None
-
- if info is None:
- return
-
- if TYPE_CHECKING:
- # todo: AIP-76 after updating this function for partitions, this
may not be true
- assert info.data_interval is not None
-
- # Generate naturally according to schedule.
- while info is not None:
- yield info
- try:
- info = self.timetable.next_dagrun_info(
- last_automated_data_interval=info.data_interval,
- restriction=restriction,
- )
- except Exception:
- log.exception(
- "Failed to fetch run info after data interval %s for DAG
%r",
- info.data_interval if info else "<NONE>",
- self.dag_id,
- )
- break
@provide_session
def get_concurrency_reached(self, session=NEW_SESSION) -> bool:
diff --git a/airflow-core/tests/unit/models/test_dag.py
b/airflow-core/tests/unit/models/test_dag.py
index 59a9e153f0a..5471fdc0f17 100644
--- a/airflow-core/tests/unit/models/test_dag.py
+++ b/airflow-core/tests/unit/models/test_dag.py
@@ -2794,7 +2794,7 @@ def test_iter_dagrun_infos_between_error(caplog):
(
"airflow.serialization.definitions.dag",
logging.ERROR,
- f"Failed to fetch run info after data interval
{DataInterval(start, end)} for DAG {dag.dag_id!r}",
+ f"Failed to fetch run info for Dag {dag.dag_id!r}",
),
]
assert caplog.entries[0].get("exception"), "should contain exception
context"