This is an automated email from the ASF dual-hosted git repository.
Lee-W pushed a commit to branch v3-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-3-test by this push:
new a647d068fd6 [v3-3-test] Remove obsolete AIP-76 partition-date storage
TODOs (#68703) (#68866)
a647d068fd6 is described below
commit a647d068fd66670986132b3f5170923a3b013a84
Author: Wei Lee <[email protected]>
AuthorDate: Tue Jun 23 10:06:01 2026 +0800
[v3-3-test] Remove obsolete AIP-76 partition-date storage TODOs (#68703)
(#68866)
---
airflow-core/src/airflow/api/common/mark_tasks.py | 2 +-
.../src/airflow/jobs/scheduler_job_runner.py | 5 ---
airflow-core/src/airflow/models/dag.py | 3 --
airflow-core/src/airflow/timetables/trigger.py | 10 ++----
.../unit/timetables/test_trigger_timetable.py | 41 ++++++++++++++++++++++
.../airflow/sdk/definitions/timetables/trigger.py | 7 ----
6 files changed, 44 insertions(+), 24 deletions(-)
diff --git a/airflow-core/src/airflow/api/common/mark_tasks.py
b/airflow-core/src/airflow/api/common/mark_tasks.py
index 120a31a67b7..7004156ef24 100644
--- a/airflow-core/src/airflow/api/common/mark_tasks.py
+++ b/airflow-core/src/airflow/api/common/mark_tasks.py
@@ -193,7 +193,7 @@ def get_run_ids(
dates.update(
info.logical_date
for info in dag.iter_dagrun_infos_between(start_date, end_date)
- if info.logical_date # todo: AIP-76 this will not find anything
where logical date is null
+ if info.logical_date # runs with a null logical_date are not
matched here
)
run_ids = [dr.run_id for dr in DagRun.find(dag_id=dag.dag_id,
logical_date=dates, session=session)]
return run_ids
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 3c5d71e9a3d..d7f8006317a 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -2395,11 +2395,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
)
existing_dagruns = {(x.dag_id, x.logical_date): x for x in
existing_dagrun_objects}
- # todo: AIP-76 we may want to update check existing to also check
partitioned dag runs,
- # but the thing is, there is not actually a restriction that
- # we don't create new runs with the same partition key
- # so it's unclear whether we should / need to.
-
# backfill runs are not created by scheduler and their concurrency is
separate
# so we exclude them here
active_runs_of_dags = Counter(
diff --git a/airflow-core/src/airflow/models/dag.py
b/airflow-core/src/airflow/models/dag.py
index 1b20a92f353..f8277856104 100644
--- a/airflow-core/src/airflow/models/dag.py
+++ b/airflow-core/src/airflow/models/dag.py
@@ -825,9 +825,6 @@ class DagModel(Base):
scheduler processing order and concurrent run creation in a
distributed
system mean a newer run may already exist.
"""
- # TODO: AIP-76 perhaps we need to add validation for manual runs
ensure consistency between
- # partition_key / partition_date and run_after
-
if isinstance(reference_run, datetime):
raise ValueError(
"Passing a datetime to `DagModel.calculate_dagrun_date_fields`
is not supported. "
diff --git a/airflow-core/src/airflow/timetables/trigger.py
b/airflow-core/src/airflow/timetables/trigger.py
index a1de05ac76e..fdc9cbde91a 100644
--- a/airflow-core/src/airflow/timetables/trigger.py
+++ b/airflow-core/src/airflow/timetables/trigger.py
@@ -390,9 +390,6 @@ class CronPartitionTimetable(CronTriggerTimetable):
running every hour, this would run the previous time if less than 6
minutes had past since the previous run time, otherwise it would wait
until the next hour.
-
- # todo: AIP-76 talk about how we can have auto-reprocessing of partitions
- # todo: AIP-76 we could allow a tuple of integer + time-based
"""
partitioned = True
@@ -404,12 +401,10 @@ class CronPartitionTimetable(CronTriggerTimetable):
timezone: str | Timezone | FixedTimezone,
run_offset: int | datetime.timedelta | relativedelta | None = None,
run_immediately: bool | datetime.timedelta = False,
- # todo: AIP-76 we can't infer partition date from this, so we need to
store it separately.
key_format: str = r"%Y-%m-%dT%H:%M:%S",
) -> None:
super().__init__(cron, timezone=timezone,
run_immediately=run_immediately)
if not isinstance(run_offset, (int, NoneType)):
- # todo: AIP-76 implement timedelta / relative delta?
raise ValueError("Run offset other than integer not supported
yet.")
self._run_offset = run_offset or 0
self._key_format = key_format
@@ -462,8 +457,8 @@ class CronPartitionTimetable(CronTriggerTimetable):
return partition_date
def _get_partition_info(self, run_date: DateTime) -> tuple[DateTime, str]:
- # todo: AIP-76 it does not make sense that we would infer partition
info from run date
- # in general, because they might not be 1-1
+ # Partition info is inferred from the run date here; this is only
correct when run date and
+ # partition are 1-1, which is not guaranteed for every offset.
partition_date = self._get_partition_date(run_date=run_date)
partition_key = self._format_key(partition_date)
return partition_date, partition_key
@@ -545,7 +540,6 @@ class CronPartitionTimetable(CronTriggerTimetable):
last_dagrun_info: DagRunInfo | None,
restriction: TimeRestriction,
) -> DagRunInfo | None:
- # todo: AIP-76 add test for this logic
# Scheduler scheduling path: uses next_dagrun_info_v2 to advance
run_after one tick
# at a time. Backfill iterates partitions directly via
timetable.iter_partition_dagrun_infos.
diff --git a/airflow-core/tests/unit/timetables/test_trigger_timetable.py
b/airflow-core/tests/unit/timetables/test_trigger_timetable.py
index 8df3b40a6d5..1806cc875e8 100644
--- a/airflow-core/tests/unit/timetables/test_trigger_timetable.py
+++ b/airflow-core/tests/unit/timetables/test_trigger_timetable.py
@@ -795,6 +795,47 @@ def test_next_dagrun_info_v2(schedule, expected,
dag_maker, session):
assert info == expected
+NEXT_DAGRUN_NOW = pendulum.DateTime(2021, 9, 7, 12, tzinfo=utc)
+SEP_5 = pendulum.DateTime(2021, 9, 5, tzinfo=utc)
+SEP_6 = pendulum.DateTime(2021, 9, 6, tzinfo=utc)
+SEP_8 = pendulum.DateTime(2021, 9, 8, tzinfo=utc)
+SEP_9 = pendulum.DateTime(2021, 9, 9, tzinfo=utc)
+SEP_10 = pendulum.DateTime(2021, 9, 10, tzinfo=utc)
+
+
+@pytest.mark.parametrize(
+ ("last_run_after", "earliest", "latest", "catchup", "expected_run_after"),
+ [
+ pytest.param(SEP_5, None, None, True, SEP_6,
id="catchup-advances-from-last-run"),
+ pytest.param(None, None, None, True, SEP_8,
id="catchup-no-last-uses-first-run"),
+ pytest.param(SEP_8, None, None, False, SEP_9,
id="no-catchup-advances-from-last-run"),
+ pytest.param(None, None, None, False, SEP_8,
id="no-catchup-no-last-uses-first-run"),
+ pytest.param(None, SEP_10, None, False, SEP_10,
id="no-catchup-honors-earliest"),
+ pytest.param(None, SEP_10, SEP_9, True, None,
id="returns-none-past-latest"),
+ ],
+)
+@time_machine.travel(NEXT_DAGRUN_NOW)
+def test_cron_partition_next_dagrun_info_v2_branches(
+ last_run_after, earliest, latest, catchup, expected_run_after
+):
+ timetable = CoreCronPartitionTimetable("0 0 * * *", timezone=utc)
+ # only run_after is read from the previous run, so the other DagRunInfo fields can be None;
+ # the branch under test computes the next run_after
+ last_dagrun_info = (
+ DagRunInfo(run_after=last_run_after, data_interval=None,
partition_date=None, partition_key=None)
+ if last_run_after is not None
+ else None
+ )
+ info = timetable.next_dagrun_info_v2(
+ last_dagrun_info=last_dagrun_info,
+ restriction=TimeRestriction(earliest=earliest, latest=latest,
catchup=catchup),
+ )
+ if expected_run_after is None:
+ assert info is None
+ else:
+ assert info is not None
+ assert info.run_after == expected_run_after
+
+
@pytest.mark.db_test
@pytest.mark.need_serialized_dag
@pytest.mark.parametrize(
diff --git a/task-sdk/src/airflow/sdk/definitions/timetables/trigger.py
b/task-sdk/src/airflow/sdk/definitions/timetables/trigger.py
index 608f4c0a57f..c2d86ffa854 100644
--- a/task-sdk/src/airflow/sdk/definitions/timetables/trigger.py
+++ b/task-sdk/src/airflow/sdk/definitions/timetables/trigger.py
@@ -150,14 +150,9 @@ class CronPartitionTimetable(CronTriggerTimetable):
running every hour, this would run the previous time if less than 6
minutes had past since the previous run time, otherwise it would wait
until the next hour.
-
- # todo: AIP-76 talk about how we can have auto-reprocessing of partitions
- # todo: AIP-76 we could allow a tuple of integer + time-based
-
"""
run_offset: int | datetime.timedelta | relativedelta | None = None
- # todo: AIP-76 we can't infer partition date from this, so we need to
store it separately
key_format: str = r"%Y-%m-%dT%H:%M:%S"
def __init__(
@@ -167,11 +162,9 @@ class CronPartitionTimetable(CronTriggerTimetable):
timezone: str | Timezone | FixedTimezone,
run_offset: int | datetime.timedelta | relativedelta | None = None,
run_immediately: bool | datetime.timedelta = False,
- # todo: AIP-76 we can't infer partition date from this, so we need to
store it separately
key_format: str = r"%Y-%m-%dT%H:%M:%S",
) -> None:
if not isinstance(run_offset, (int, NoneType)):
- # todo: AIP-76 implement timedelta / relative delta?
raise ValueError("Run offset other than integer not supported
yet.")
self.__attrs_init__( # type: ignore[attr-defined]
cron,