phanikumv commented on code in PR #67537:
URL: https://github.com/apache/airflow/pull/67537#discussion_r3385776313
##########
airflow-core/src/airflow/timetables/trigger.py:
##########
@@ -464,8 +466,54 @@ def _get_partition_info(self, run_date: DateTime) -> tuple[DateTime, str]:
partition_key = self._format_key(partition_date)
return partition_date, partition_key
+ def iter_partition_dagrun_infos(
+ self,
+ *,
+ earliest_partition_date: DateTime,
+ latest_partition_date: DateTime,
+ ) -> Iterable[DagRunInfo]:
+ """
+ Yield one DagRunInfo per cron tick in the half-open interval ``[earliest_partition_date, latest_partition_date)``.
+
+ Iteration walks directly along the partition_date axis — one cron tick per
+ partition — without any reverse mapping from run_after. Each tick yields:
+
+ - ``partition_date = current`` (the cron tick itself, as a UTC instant)
+ - ``partition_key`` formatted by :meth:`_format_key` (local-tz label)
+ - ``run_after = partition_date`` (identical to the tick)
+ - ``data_interval = None``
+
+ **Design note — ``run_after := partition_date``.**
+ For ``run_offset != 0`` this differs from the cron run-time a scheduled
+ run would carry; this is intentional. ``run_after`` is not load-bearing
+ for backfill execution: deduplication is keyed on ``partition_key``, scheduling gates
+ on ``run_after <= now()`` (always satisfied for past partitions), and
+ ordering by ``BackfillDagRun.sort_ordinal`` (``run_after`` is only the
+ final tiebreaker). Setting ``run_after = partition_date`` is the simplest
+ correct choice and avoids the need for a reverse mapping.
+
+ :param earliest_partition_date: inclusive lower bound (UTC instant); first tick is
+ ``_align_to_next(earliest_partition_date)``, which returns ``earliest_partition_date``
+ itself when it falls exactly on a cron tick.
+ :param latest_partition_date: exclusive upper bound (UTC instant); ticks strictly less
+ than ``latest_partition_date`` are yielded.
+ """
+ current = self._align_to_next(earliest_partition_date)
+ while current < latest_partition_date:
+ partition_key = self._format_key(current)
+ yield DagRunInfo(
+ run_after=current,
+ data_interval=None,
+ partition_date=current,
+ partition_key=partition_key,
+ )
+ current = self._get_next(current)
+
def _format_key(self, partition_date: DateTime) -> str:
- return partition_date.strftime(self._key_format)
+ # partition_date is a UTC instant; format the key in the timetable timezone so the
+ # key reflects the local partition date the user reasons about (e.g. an Asia/Taipei
+ # midnight partition keys as "...T00:00:00", not the prior UTC day's "...T16:00:00").
+ return partition_date.in_timezone(self._timezone).strftime(self._key_format)
Review Comment:
This changes `partition_key` for scheduled runs too, not just backfill.
`_format_key` feeds the value that gets stamped onto
`DagModel.next_dagrun_partition_key`, which the scheduler then reads back and
persists on the run (
https://github.com/apache/airflow/blob/9c7ce841fc8ad058a4182bd4a2cfd5f822c0fb82/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L2327-L2344).

Since `CronPartitionTimetable` has already shipped in 3.2.0/3.2.1 with the
UTC label, a non-UTC Dag that upgrades to this change will have its old runs keyed
in UTC and its new ones keyed in local time. Because `partition_key` is the dedup
key, a backfill over a historical window could create duplicate partition runs.

Was the scheduler-side impact intended?
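
To make the concern concrete, here is a minimal stdlib sketch of the collision. It assumes a daily midnight cron in Asia/Taipei (modelled as a fixed UTC+8 offset, since that zone has no DST) and a key format of `%Y-%m-%dT%H:%M:%S`, both illustrative rather than taken from the PR. `daily_ticks`, `key_utc` and `key_local` are hypothetical stand-ins for `_align_to_next`/`_get_next` and the old and new `_format_key`. Dedup is simplified to set membership on the key, which is not Airflow's actual code path.

```python
from datetime import datetime, timedelta, timezone

# Illustrative assumptions (not from the PR): Asia/Taipei is UTC+8 with no DST,
# so a fixed offset stands in for the real tz; key format is hypothetical too.
TZ = timezone(timedelta(hours=8), "Asia/Taipei")
KEY_FORMAT = "%Y-%m-%dT%H:%M:%S"


def daily_ticks(earliest: datetime, latest: datetime):
    """Yield local-midnight ticks, as UTC instants, in [earliest, latest).

    Hypothetical stand-in for _align_to_next/_get_next on a daily cron.
    """
    local = earliest.astimezone(TZ)
    tick = local.replace(hour=0, minute=0, second=0, microsecond=0)
    if tick < local:  # align forward unless earliest sits exactly on a tick
        tick += timedelta(days=1)
    current = tick.astimezone(timezone.utc)
    while current < latest:
        yield current
        # No DST in this zone, so +24h always lands on the next local midnight.
        current += timedelta(days=1)


def key_utc(partition_date: datetime) -> str:
    """Pre-PR behaviour: format the UTC instant as-is."""
    return partition_date.strftime(KEY_FORMAT)


def key_local(partition_date: datetime) -> str:
    """Post-PR behaviour: convert to the timetable timezone first."""
    return partition_date.astimezone(TZ).strftime(KEY_FORMAT)


start = datetime(2025, 1, 1, tzinfo=timezone.utc)
end = datetime(2025, 1, 4, tzinfo=timezone.utc)
ticks = list(daily_ticks(start, end))

# Runs created by the scheduler before the upgrade carry UTC-labelled keys.
existing_keys = {key_utc(t) for t in ticks}
# A backfill after the upgrade computes local-labelled keys for the same ticks.
backfill_keys = [key_local(t) for t in ticks]
# Simplified dedup: any key not already present would become a new run.
duplicates = [k for k in backfill_keys if k not in existing_keys]

for t in ticks:
    print(key_utc(t), "->", key_local(t))
print(f"{len(duplicates)} of {len(ticks)} partitions would be re-created")
```

The same three instants produce keys `2025-01-01T16:00:00` through `2025-01-03T16:00:00` before the change and `2025-01-02T00:00:00` through `2025-01-04T00:00:00` after it. None of the new keys match the old ones, so in this simplified model every historical partition in the window would be scheduled again. With a UTC timetable the two functions agree and nothing is duplicated.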