This is an automated email from the ASF dual-hosted git repository.

Lee-W pushed a commit to branch v3-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-3-test by this push:
     new a647d068fd6 [v3-3-test] Remove obsolete AIP-76 partition-date storage 
TODOs (#68703) (#68866)
a647d068fd6 is described below

commit a647d068fd66670986132b3f5170923a3b013a84
Author: Wei Lee <[email protected]>
AuthorDate: Tue Jun 23 10:06:01 2026 +0800

    [v3-3-test] Remove obsolete AIP-76 partition-date storage TODOs (#68703) 
(#68866)
---
 airflow-core/src/airflow/api/common/mark_tasks.py  |  2 +-
 .../src/airflow/jobs/scheduler_job_runner.py       |  5 ---
 airflow-core/src/airflow/models/dag.py             |  3 --
 airflow-core/src/airflow/timetables/trigger.py     | 10 ++----
 .../unit/timetables/test_trigger_timetable.py      | 41 ++++++++++++++++++++++
 .../airflow/sdk/definitions/timetables/trigger.py  |  7 ----
 6 files changed, 44 insertions(+), 24 deletions(-)

diff --git a/airflow-core/src/airflow/api/common/mark_tasks.py 
b/airflow-core/src/airflow/api/common/mark_tasks.py
index 120a31a67b7..7004156ef24 100644
--- a/airflow-core/src/airflow/api/common/mark_tasks.py
+++ b/airflow-core/src/airflow/api/common/mark_tasks.py
@@ -193,7 +193,7 @@ def get_run_ids(
         dates.update(
             info.logical_date
             for info in dag.iter_dagrun_infos_between(start_date, end_date)
-            if info.logical_date  # todo: AIP-76 this will not find anything 
where logical date is null
+            if info.logical_date  # runs with a null logical_date are not 
matched here
         )
         run_ids = [dr.run_id for dr in DagRun.find(dag_id=dag.dag_id, 
logical_date=dates, session=session)]
     return run_ids
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py 
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 3c5d71e9a3d..d7f8006317a 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -2395,11 +2395,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         )
         existing_dagruns = {(x.dag_id, x.logical_date): x for x in 
existing_dagrun_objects}
 
-        # todo: AIP-76 we may want to update check existing to also check 
partitioned dag runs,
-        #  but the thing is, there is not actually a restriction that
-        #  we don't create new runs with the same partition key
-        #  so it's unclear whether we should / need to.
-
         # backfill runs are not created by scheduler and their concurrency is 
separate
         # so we exclude them here
         active_runs_of_dags = Counter(
diff --git a/airflow-core/src/airflow/models/dag.py 
b/airflow-core/src/airflow/models/dag.py
index 1b20a92f353..f8277856104 100644
--- a/airflow-core/src/airflow/models/dag.py
+++ b/airflow-core/src/airflow/models/dag.py
@@ -825,9 +825,6 @@ class DagModel(Base):
             scheduler processing order and concurrent run creation in a 
distributed
             system mean a newer run may already exist.
         """
-        # TODO: AIP-76 perhaps we need to add validation for manual runs 
ensure consistency between
-        #   partition_key / partition_date and run_after
-
         if isinstance(reference_run, datetime):
             raise ValueError(
                 "Passing a datetime to `DagModel.calculate_dagrun_date_fields` 
is not supported. "
diff --git a/airflow-core/src/airflow/timetables/trigger.py 
b/airflow-core/src/airflow/timetables/trigger.py
index a1de05ac76e..fdc9cbde91a 100644
--- a/airflow-core/src/airflow/timetables/trigger.py
+++ b/airflow-core/src/airflow/timetables/trigger.py
@@ -390,9 +390,6 @@ class CronPartitionTimetable(CronTriggerTimetable):
       running every hour, this would run the previous time if less than 6
       minutes had past since the previous run time, otherwise it would wait
       until the next hour.
-
-    # todo: AIP-76 talk about how we can have auto-reprocessing of partitions
-    # todo: AIP-76 we could allow a tuple of integer + time-based
     """
 
     partitioned = True
@@ -404,12 +401,10 @@ class CronPartitionTimetable(CronTriggerTimetable):
         timezone: str | Timezone | FixedTimezone,
         run_offset: int | datetime.timedelta | relativedelta | None = None,
         run_immediately: bool | datetime.timedelta = False,
-        # todo: AIP-76 we can't infer partition date from this, so we need to 
store it separately.
         key_format: str = r"%Y-%m-%dT%H:%M:%S",
     ) -> None:
         super().__init__(cron, timezone=timezone, 
run_immediately=run_immediately)
         if not isinstance(run_offset, (int, NoneType)):
-            # todo: AIP-76 implement timedelta / relative delta?
             raise ValueError("Run offset other than integer not supported 
yet.")
         self._run_offset = run_offset or 0
         self._key_format = key_format
@@ -462,8 +457,8 @@ class CronPartitionTimetable(CronTriggerTimetable):
         return partition_date
 
     def _get_partition_info(self, run_date: DateTime) -> tuple[DateTime, str]:
-        # todo: AIP-76 it does not make sense that we would infer partition 
info from run date
-        #  in general, because they might not be 1-1
+        # Partition info is inferred from the run date here; this is only 
correct when run date and
+        # partition are 1-1, which is not guaranteed for every offset.
         partition_date = self._get_partition_date(run_date=run_date)
         partition_key = self._format_key(partition_date)
         return partition_date, partition_key
@@ -545,7 +540,6 @@ class CronPartitionTimetable(CronTriggerTimetable):
         last_dagrun_info: DagRunInfo | None,
         restriction: TimeRestriction,
     ) -> DagRunInfo | None:
-        # todo: AIP-76 add test for this logic
         # Scheduler scheduling path: uses next_dagrun_info_v2 to advance 
run_after one tick
         # at a time. Backfill iterates partitions directly via 
timetable.iter_partition_dagrun_infos.
 
diff --git a/airflow-core/tests/unit/timetables/test_trigger_timetable.py 
b/airflow-core/tests/unit/timetables/test_trigger_timetable.py
index 8df3b40a6d5..1806cc875e8 100644
--- a/airflow-core/tests/unit/timetables/test_trigger_timetable.py
+++ b/airflow-core/tests/unit/timetables/test_trigger_timetable.py
@@ -795,6 +795,47 @@ def test_next_dagrun_info_v2(schedule, expected, 
dag_maker, session):
     assert info == expected
 
 
+NEXT_DAGRUN_NOW = pendulum.DateTime(2021, 9, 7, 12, tzinfo=utc)
+SEP_5 = pendulum.DateTime(2021, 9, 5, tzinfo=utc)
+SEP_6 = pendulum.DateTime(2021, 9, 6, tzinfo=utc)
+SEP_8 = pendulum.DateTime(2021, 9, 8, tzinfo=utc)
+SEP_9 = pendulum.DateTime(2021, 9, 9, tzinfo=utc)
+SEP_10 = pendulum.DateTime(2021, 9, 10, tzinfo=utc)
+
+
+@pytest.mark.parametrize(
+    ("last_run_after", "earliest", "latest", "catchup", "expected_run_after"),
+    [
+        pytest.param(SEP_5, None, None, True, SEP_6, 
id="catchup-advances-from-last-run"),
+        pytest.param(None, None, None, True, SEP_8, 
id="catchup-no-last-uses-first-run"),
+        pytest.param(SEP_8, None, None, False, SEP_9, 
id="no-catchup-advances-from-last-run"),
+        pytest.param(None, None, None, False, SEP_8, 
id="no-catchup-no-last-uses-first-run"),
+        pytest.param(None, SEP_10, None, False, SEP_10, 
id="no-catchup-honors-earliest"),
+        pytest.param(None, SEP_10, SEP_9, True, None, 
id="returns-none-past-latest"),
+    ],
+)
+@time_machine.travel(NEXT_DAGRUN_NOW)
+def test_cron_partition_next_dagrun_info_v2_branches(
+    last_run_after, earliest, latest, catchup, expected_run_after
+):
+    timetable = CoreCronPartitionTimetable("0 0 * * *", timezone=utc)
+    # only run_after is read off the previous run; the branch logic decides 
run_after
+    last_dagrun_info = (
+        DagRunInfo(run_after=last_run_after, data_interval=None, 
partition_date=None, partition_key=None)
+        if last_run_after is not None
+        else None
+    )
+    info = timetable.next_dagrun_info_v2(
+        last_dagrun_info=last_dagrun_info,
+        restriction=TimeRestriction(earliest=earliest, latest=latest, 
catchup=catchup),
+    )
+    if expected_run_after is None:
+        assert info is None
+    else:
+        assert info is not None
+        assert info.run_after == expected_run_after
+
+
 @pytest.mark.db_test
 @pytest.mark.need_serialized_dag
 @pytest.mark.parametrize(
diff --git a/task-sdk/src/airflow/sdk/definitions/timetables/trigger.py 
b/task-sdk/src/airflow/sdk/definitions/timetables/trigger.py
index 608f4c0a57f..c2d86ffa854 100644
--- a/task-sdk/src/airflow/sdk/definitions/timetables/trigger.py
+++ b/task-sdk/src/airflow/sdk/definitions/timetables/trigger.py
@@ -150,14 +150,9 @@ class CronPartitionTimetable(CronTriggerTimetable):
       running every hour, this would run the previous time if less than 6
       minutes had past since the previous run time, otherwise it would wait
       until the next hour.
-
-    # todo: AIP-76 talk about how we can have auto-reprocessing of partitions
-    # todo: AIP-76 we could allow a tuple of integer + time-based
-
     """
 
     run_offset: int | datetime.timedelta | relativedelta | None = None
-    # todo: AIP-76 we can't infer partition date from this, so we need to 
store it separately
     key_format: str = r"%Y-%m-%dT%H:%M:%S"
 
     def __init__(
@@ -167,11 +162,9 @@ class CronPartitionTimetable(CronTriggerTimetable):
         timezone: str | Timezone | FixedTimezone,
         run_offset: int | datetime.timedelta | relativedelta | None = None,
         run_immediately: bool | datetime.timedelta = False,
-        # todo: AIP-76 we can't infer partition date from this, so we need to 
store it separately
         key_format: str = r"%Y-%m-%dT%H:%M:%S",
     ) -> None:
         if not isinstance(run_offset, (int, NoneType)):
-            # todo: AIP-76 implement timedelta / relative delta?
             raise ValueError("Run offset other than integer not supported 
yet.")
         self.__attrs_init__(  # type: ignore[attr-defined]
             cron,

Reply via email to