Lee-W commented on code in PR #67537:
URL: https://github.com/apache/airflow/pull/67537#discussion_r3388110098


##########
airflow-core/src/airflow/timetables/trigger.py:
##########
@@ -464,8 +466,54 @@ def _get_partition_info(self, run_date: DateTime) -> tuple[DateTime, str]:
         partition_key = self._format_key(partition_date)
         return partition_date, partition_key
 
+    def iter_partition_dagrun_infos(
+        self,
+        *,
+        earliest_partition_date: DateTime,
+        latest_partition_date: DateTime,
+    ) -> Iterable[DagRunInfo]:
+        """
+        Yield one DagRunInfo per cron tick in the half-open interval ``[earliest_partition_date, latest_partition_date)``.
+
+        Iteration walks directly along the partition_date axis — one cron tick per
+        partition — without any reverse mapping from run_after.  Each tick yields:
+
+        - ``partition_date = current`` (the cron tick itself, as a UTC instant)
+        - ``partition_key`` formatted by :meth:`_format_key` (local-tz label)
+        - ``run_after = partition_date`` (identical to the tick)
+        - ``data_interval = None``
+
+        **Design note — ``run_after := partition_date``.**
+        For ``run_offset != 0`` this differs from the cron run-time a scheduled
+        run would carry; this is intentional.  ``run_after`` is not load-bearing
+        for backfill execution: deduplication is keyed on ``partition_key``, scheduling gates
+        on ``run_after <= now()`` (always satisfied for past partitions), and
+        ordering by ``BackfillDagRun.sort_ordinal`` (``run_after`` is only the
+        final tiebreaker).  Setting ``run_after = partition_date`` is the simplest
+        correct choice and avoids the need for a reverse mapping.
+
+        :param earliest_partition_date: inclusive lower bound (UTC instant); first tick is
+            ``_align_to_next(earliest_partition_date)``, which returns ``earliest_partition_date``
+            itself when it falls exactly on a cron tick.
+        :param latest_partition_date: exclusive upper bound (UTC instant); ticks strictly less
+            than ``latest_partition_date`` are yielded.
+        """
+        current = self._align_to_next(earliest_partition_date)
+        while current < latest_partition_date:
+            partition_key = self._format_key(current)
+            yield DagRunInfo(
+                run_after=current,
+                data_interval=None,
+                partition_date=current,
+                partition_key=partition_key,
+            )
+            current = self._get_next(current)
+
     def _format_key(self, partition_date: DateTime) -> str:
-        return partition_date.strftime(self._key_format)
+        # partition_date is a UTC instant; format the key in the timetable timezone so the
+        # key reflects the local partition date the user reasons about (e.g. an Asia/Taipei
+        # midnight partition keys as "...T00:00:00", not the prior UTC day's "...T16:00:00").
+        return partition_date.in_timezone(self._timezone).strftime(self._key_format)
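A minimal, stdlib-only sketch of the half-open iteration the docstring describes, useful for checking the edge cases. It is not the Airflow implementation. `FixedStepSchedule`, `PartitionRunInfo`, `iter_partition_infos` and `KEY_FORMAT` are hypothetical stand-ins. A fixed time step replaces the real cron expression and its `_align_to_next`/`_get_next` helpers. A fixed UTC+8 offset stands in for Asia/Taipei, so no tz database is needed.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone, tzinfo
from typing import Iterator

KEY_FORMAT = "%Y-%m-%dT%H:%M:%S"  # assumed key format, for illustration only


@dataclass(frozen=True)
class FixedStepSchedule:
    """Hypothetical stand-in for a cron schedule: ticks at origin + n * step."""

    origin: datetime
    step: timedelta

    def align_to_next(self, when: datetime) -> datetime:
        # Smallest tick >= `when`; returns `when` unchanged if it is a tick.
        # divmod on timedeltas floors, so add one step only if there is a remainder.
        n, remainder = divmod(when - self.origin, self.step)
        return self.origin + (n + (1 if remainder else 0)) * self.step

    def get_next(self, tick: datetime) -> datetime:
        return tick + self.step


@dataclass(frozen=True)
class PartitionRunInfo:
    """Hypothetical, simplified stand-in for DagRunInfo."""

    run_after: datetime
    partition_date: datetime
    partition_key: str
    data_interval: None = None


def iter_partition_infos(
    schedule: FixedStepSchedule,
    earliest: datetime,
    latest: datetime,
    tz: tzinfo,
) -> Iterator[PartitionRunInfo]:
    """Yield one info per tick in the half-open interval [earliest, latest)."""
    current = schedule.align_to_next(earliest)
    while current < latest:
        yield PartitionRunInfo(
            run_after=current,  # run_after := partition_date
            partition_date=current,  # the tick itself, a UTC instant
            # Label the instant in the schedule's timezone, not in UTC.
            partition_key=current.astimezone(tz).strftime(KEY_FORMAT),
        )
        current = schedule.get_next(current)


# Usage: daily ticks at Taipei midnight, i.e. 16:00 UTC.
UTC = timezone.utc
TAIPEI = timezone(timedelta(hours=8))  # fixed-offset stand-in for Asia/Taipei
daily_at_taipei_midnight = FixedStepSchedule(
    origin=datetime(2024, 1, 1, 16, tzinfo=UTC), step=timedelta(days=1)
)
infos = list(
    iter_partition_infos(
        daily_at_taipei_midnight,
        earliest=datetime(2024, 1, 2, 16, tzinfo=UTC),
        latest=datetime(2024, 1, 5, 16, tzinfo=UTC),
        tz=TAIPEI,
    )
)
```

With this window the sketch yields three partitions, keyed `2024-01-03T00:00:00` through `2024-01-05T00:00:00`. The tick at the exclusive upper bound (2024-01-05 16:00 UTC) is not emitted. An `earliest` that falls between ticks is rounded forward to the next tick.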

Review Comment:
   It's intended: the key reads in the timetable timezone so that a non-UTC partition isn't mislabelled.
   
   One fix was added in `airflow-core/src/airflow/models/backfill.py`, though: deduplication is now keyed on `partition_date` rather than on the `partition_key` string. An old UTC-labelled run and a backfill run for the same tick share the same `partition_date` even when their `partition_key` labels differ, so they are still recognised as duplicates.
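A small sketch of why keying deduplication on `partition_date` tolerates the label change, under the same assumptions: a fixed UTC+8 offset stands in for Asia/Taipei, and the key format is assumed. `format_key` and `dedupe_by_partition_date` are hypothetical helpers, not the code in `backfill.py`. Runs are modelled as `(partition_date, partition_key)` pairs.

```python
from datetime import datetime, timedelta, timezone

UTC = timezone.utc
TAIPEI = timezone(timedelta(hours=8))  # fixed-offset stand-in for Asia/Taipei (assumption)
KEY_FORMAT = "%Y-%m-%dT%H:%M:%S"  # assumed key format


def format_key(partition_date: datetime, tz: timezone) -> str:
    """Label a UTC instant in the given timezone (hypothetical helper)."""
    return partition_date.astimezone(tz).strftime(KEY_FORMAT)


def dedupe_by_partition_date(
    candidates: list[tuple[datetime, str]],
    existing: list[tuple[datetime, str]],
) -> list[tuple[datetime, str]]:
    """Keep only candidates whose partition_date no existing run already covers.

    Aware datetimes compare and hash by instant, so the key label plays no part.
    """
    covered = {partition_date for partition_date, _key in existing}
    return [(d, k) for d, k in candidates if d not in covered]


tick = datetime(2024, 1, 2, 16, tzinfo=UTC)  # Taipei midnight on 2024-01-03
next_tick = tick + timedelta(days=1)
old_run = (tick, format_key(tick, UTC))  # run created before the change: UTC-labelled key
new_runs = [
    (tick, format_key(tick, TAIPEI)),  # same instant, local-tz label
    (next_tick, format_key(next_tick, TAIPEI)),
]
remaining = dedupe_by_partition_date(new_runs, [old_run])
```

Here `old_run` carries the key `2024-01-02T16:00:00` and the first backfill candidate carries `2024-01-03T00:00:00`. A string comparison would therefore treat them as different partitions. Comparing on `partition_date` instead drops that candidate and keeps only the next tick.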



-- 
This is an automated message from the Apache Git Service.