Lee-W commented on code in PR #67537:
URL: https://github.com/apache/airflow/pull/67537#discussion_r3388698397
##########
airflow-core/src/airflow/models/backfill.py:
##########
@@ -533,11 +546,27 @@ def _get_info_list(
reverse: bool,
dag: SerializedDAG,
) -> list[DagRunInfo]:
- infos = dag.iter_dagrun_infos_between(from_date, to_date)
- now = timezone.utcnow()
- dagrun_info_list = [
- x for x in infos if x.partition_key or (x.data_interval and
x.data_interval.end < now)
- ]
+ if dag.timetable.partitioned:
+ # Selection axis is partition_date. Enumerate one run per partition
tick in
+ # the requested calendar-date window; run_after is set to the
partition_date
+ # on each yielded DagRunInfo.
+ # iter_partition_dagrun_infos walks the half-open interval [earliest,
latest), so the
+ # upper bound is exclusive. to_date is inclusive (the user expects its
partitions
+ # backfilled), so advance to the start of the next day: ticks on
to_date itself then
+ # fall strictly below the bound and are included. Date granularity is
intentional --
+ # the time component of to_date does not narrow the final day's
partitions.
+ earliest_partition_date =
dag.timetable.resolve_day_bound(from_date.date())
+ latest_partition_date = dag.timetable.resolve_day_bound(to_date.date()
+ timedelta(days=1))
+ dagrun_info_list = list(
+ dag.timetable.iter_partition_dagrun_infos(
+ earliest_partition_date=earliest_partition_date,
+ latest_partition_date=latest_partition_date,
+ )
+ )
Review Comment:
I moved the partitioned dispatch into `iter_dagrun_infos_between`: for a
partitioned timetable it now resolves the day-bound window and delegates to
`iter_partition_dagrun_infos`, so `_get_info_list` calls one method uniformly
instead of branching at the call site.
##########
airflow-core/src/airflow/models/backfill.py:
##########
@@ -533,11 +546,27 @@ def _get_info_list(
reverse: bool,
dag: SerializedDAG,
) -> list[DagRunInfo]:
- infos = dag.iter_dagrun_infos_between(from_date, to_date)
- now = timezone.utcnow()
- dagrun_info_list = [
- x for x in infos if x.partition_key or (x.data_interval and
x.data_interval.end < now)
- ]
+ if dag.timetable.partitioned:
+ # Selection axis is partition_date. Enumerate one run per partition
tick in
+ # the requested calendar-date window; run_after is set to the
partition_date
+ # on each yielded DagRunInfo.
+ # iter_partition_dagrun_infos walks the half-open interval [earliest,
latest), so the
+ # upper bound is exclusive. to_date is inclusive (the user expects its
partitions
+ # backfilled), so advance to the start of the next day: ticks on
to_date itself then
+ # fall strictly below the bound and are included. Date granularity is
intentional --
+ # the time component of to_date does not narrow the final day's
partitions.
+ earliest_partition_date =
dag.timetable.resolve_day_bound(from_date.date())
+ latest_partition_date = dag.timetable.resolve_day_bound(to_date.date()
+ timedelta(days=1))
+ dagrun_info_list = list(
+ dag.timetable.iter_partition_dagrun_infos(
+ earliest_partition_date=earliest_partition_date,
+ latest_partition_date=latest_partition_date,
+ )
+ )
Review Comment:
Sounds good! I moved the partitioned dispatch into
`iter_dagrun_infos_between`: for a partitioned timetable, it now resolves the
day-bound window and delegates to `iter_partition_dagrun_infos`, so
`_get_info_list` calls a single method uniformly instead of branching at the call
site.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]