This is an automated email from the ASF dual-hosted git repository.
Lee-W pushed a commit to branch v3-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-3-test by this push:
new b74cbe28135 [v3-3-test] Propagate `partition_date` to consumers of
partitioned assets (#67285) (#68892)
b74cbe28135 is described below
commit b74cbe2813526e4d97be9af11914d0171b557e53
Author: Wei Lee <[email protected]>
AuthorDate: Tue Jun 23 22:21:48 2026 +0800
[v3-3-test] Propagate `partition_date` to consumers of partitioned assets
(#67285) (#68892)
Co-authored-by: Nathan Hadfield <[email protected]>
---
.../docs/authoring-and-scheduling/assets.rst | 14 +-
airflow-core/docs/migrations-ref.rst | 4 +-
airflow-core/docs/templates-ref.rst | 3 +
airflow-core/newsfragments/67285.feature.rst | 1 +
.../api_fastapi/core_api/datamodels/dag_run.py | 1 +
.../core_api/openapi/v2-rest-api-generated.yaml | 7 +
.../execution_api/datamodels/taskinstance.py | 1 +
.../api_fastapi/execution_api/versions/__init__.py | 2 +
.../execution_api/versions/v2026_06_30.py | 14 +
airflow-core/src/airflow/assets/manager.py | 71 ++++-
.../example_dags/example_asset_partition.py | 2 +-
.../src/airflow/jobs/scheduler_job_runner.py | 41 ++-
airflow-core/src/airflow/listeners/types.py | 3 +
...dd_partition_date_to_asset_partition_dag_run.py | 54 ++++
airflow-core/src/airflow/models/asset.py | 1 +
airflow-core/src/airflow/models/dagrun.py | 4 +
airflow-core/src/airflow/models/taskinstance.py | 19 ++
airflow-core/src/airflow/partition_mappers/base.py | 15 +
.../src/airflow/partition_mappers/identity.py | 11 +
.../airflow/ui/openapi-gen/requests/schemas.gen.ts | 14 +-
.../airflow/ui/openapi-gen/requests/types.gen.ts | 1 +
airflow-core/src/airflow/utils/db.py | 2 +-
.../core_api/routes/public/test_assets.py | 1 +
.../core_api/routes/public/test_dag_run.py | 9 +
.../execution_api/versions/head/test_dag_runs.py | 1 +
.../versions/head/test_task_instances.py | 1 +
.../versions/v2026_06_30/test_task_instances.py | 96 ++++++
airflow-core/tests/unit/assets/test_manager.py | 161 ++++++++++
airflow-core/tests/unit/jobs/test_scheduler_job.py | 328 ++++++++++++++++++++-
.../tests/unit/partition_mappers/test_base.py | 12 +
.../tests/unit/partition_mappers/test_identity.py | 9 +
.../src/airflowctl/api/datamodels/generated.py | 1 +
.../airflow/providers/standard/operators/python.py | 2 +
.../src/airflow/sdk/api/datamodels/_generated.py | 1 +
task-sdk/src/airflow/sdk/definitions/context.py | 1 +
.../airflow/sdk/execution_time/schema/schema.json | 26 ++
.../src/airflow/sdk/execution_time/task_runner.py | 1 +
task-sdk/src/airflow/sdk/types.py | 1 +
.../task_sdk/execution_time/test_supervisor.py | 2 +
.../task_sdk/execution_time/test_task_runner.py | 32 ++
40 files changed, 940 insertions(+), 30 deletions(-)
diff --git a/airflow-core/docs/authoring-and-scheduling/assets.rst
b/airflow-core/docs/authoring-and-scheduling/assets.rst
index 994cae815d3..d039fbfd1ff 100644
--- a/airflow-core/docs/authoring-and-scheduling/assets.rst
+++ b/airflow-core/docs/authoring-and-scheduling/assets.rst
@@ -631,7 +631,19 @@ partition match can be produced, so the downstream Dag is
not triggered for
that key.
Inside partitioned Dag runs, access the resolved partition through
-``dag_run.partition_key``.
+``dag_run.partition_key``. When the consumer's partition mapper can
+resolve the key to a ``datetime``, that value is also available as
+``dag_run.partition_date``, so templates can use
+``{{ partition_date | ds }}``. This covers the ``StartOf*Mapper`` family
+(which decode the key directly), ``IdentityMapper`` (which carries the
+producer's ``partition_date`` through), and composite mappers —
+``RollupMapper``, ``ChainMapper`` and ``FanOutMapper`` — whose effective
+child mapper is temporal (they delegate the anchor to that child).
+Mappers whose key carries no temporal meaning (``ProductMapper``,
+``AllowedKeyMapper`` and custom mappers that do not implement
+``to_partition_date``) leave ``partition_date`` as ``None`` even when the
+resulting key is date-shaped, so those consumers should keep parsing
+``partition_key``.
You can also trigger a DagRun manually with a partition key (for example,
through the Trigger Dag window in the UI, or through the REST API by
diff --git a/airflow-core/docs/migrations-ref.rst
b/airflow-core/docs/migrations-ref.rst
index a69a8aa2058..d2cfde62352 100644
--- a/airflow-core/docs/migrations-ref.rst
+++ b/airflow-core/docs/migrations-ref.rst
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are
executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description
|
+=========================+==================+===================+==============================================================+
-| ``9ff64e1c35d3`` (head) | ``dd5f3a8e2b91`` | ``3.3.0`` | Add indexes
on dag_run.created_dag_version_id and |
+| ``d2f4e1b3c5a7`` (head) | ``9ff64e1c35d3`` | ``3.3.0`` | Add
partition_date to asset_partition_dag_run. |
++-------------------------+------------------+-------------------+--------------------------------------------------------------+
+| ``9ff64e1c35d3`` | ``dd5f3a8e2b91`` | ``3.3.0`` | Add indexes
on dag_run.created_dag_version_id and |
| | | |
task_instance.dag_version_id. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``dd5f3a8e2b91`` | ``c20871fbf23a`` | ``3.3.0`` | Add
rollup_fingerprint to AssetPartitionDagRun and index |
diff --git a/airflow-core/docs/templates-ref.rst
b/airflow-core/docs/templates-ref.rst
index b1609ff0c94..aeb907dbe47 100644
--- a/airflow-core/docs/templates-ref.rst
+++ b/airflow-core/docs/templates-ref.rst
@@ -87,6 +87,9 @@ Variable Type
Description
| is enabled
in ``airflow.cfg``.
``{{ partition_key }}`` str | None | The
partition key from the current :class:`~airflow.models.dagrun.DagRun`.
| Returns
``None`` if no partition key was set. Added in version 3.3.0.
+``{{ partition_date }}`` datetime | None | The
partition datetime from the current :class:`~airflow.models.dagrun.DagRun`.
+ | Use ``{{
partition_date | ds }}`` and related filters for formatting.
+ | Returns
``None`` if no partition date was set. Added in version 3.3.0.
``{{ var.value }}`` Airflow
variables. See `Airflow Variables in Templates`_ below.
``{{ var.json }}`` Airflow
variables. See `Airflow Variables in Templates`_ below.
``{{ conn }}`` Airflow
connections. See `Airflow Connections in Templates`_ below.
diff --git a/airflow-core/newsfragments/67285.feature.rst
b/airflow-core/newsfragments/67285.feature.rst
new file mode 100644
index 00000000000..2c4370d5ff6
--- /dev/null
+++ b/airflow-core/newsfragments/67285.feature.rst
@@ -0,0 +1 @@
+Propagate ``partition_date`` from producer DagRuns to consumers of partitioned
assets, so date-shaped partitions are available in consumer task templates.
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
index 033437e2ad9..0a530578644 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
@@ -119,6 +119,7 @@ class DAGRunResponse(BaseModel):
bundle_version: str | None
dag_display_name: str = Field(validation_alias=AliasPath("dag_model",
"dag_display_name"))
partition_key: str | None
+ partition_date: datetime | None
class DAGRunCollectionResponse(BaseModel):
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index f82e665c9be..639f6fb9904 100644
---
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -13704,6 +13704,12 @@ components:
- type: string
- type: 'null'
title: Partition Key
+ partition_date:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Partition Date
type: object
required:
- dag_run_id
@@ -13727,6 +13733,7 @@ components:
- bundle_version
- dag_display_name
- partition_key
+ - partition_date
title: DAGRunResponse
description: Dag Run serializer for responses.
DAGRunsBatchBody:
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
index fe7f5a5ce05..bf20bcbc30c 100644
---
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
+++
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
@@ -341,6 +341,7 @@ class DagRun(StrictBaseModel):
triggering_user_name: str | None = None
consumed_asset_events: list[AssetEventDagRunReference]
partition_key: str | None
+ partition_date: UtcDateTime | None = None
note: str | None = None
team_name: str | None = None
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
index 332ddb28704..dc7035d31e3 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
@@ -44,6 +44,7 @@ from airflow.api_fastapi.execution_api.versions.v2026_06_30
import (
AddAssetsByAliasEndpoint,
AddAwaitingInputStatePayload,
AddConnectionTestEndpoint,
+ AddPartitionDateField,
AddRetryPolicyFields,
AddTaskAndAssetStateStoreEndpoints,
AddTaskInstanceQueueField,
@@ -63,6 +64,7 @@ bundle = VersionBundle(
AddTeamNameField,
AddTaskAndAssetStateStoreEndpoints,
AddAssetsByAliasEndpoint,
+ AddPartitionDateField,
),
Version(
"2026-04-06",
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py
b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py
index cbd801c0a9b..e89e2ed04cc 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py
@@ -127,3 +127,17 @@ class AddTaskAndAssetStateStoreEndpoints(VersionChange):
endpoint("/store/asset/by-uri/value", ["DELETE"]).didnt_exist,
endpoint("/store/asset/by-uri/clear", ["DELETE"]).didnt_exist,
)
+
+
+class AddPartitionDateField(VersionChange):
+ """Expose the consumer DagRun's partition datetime on the execution API so
consumer tasks can template it."""
+
+ description = __doc__
+
+ instructions_to_migrate_to_previous_version =
(schema(DagRun).field("partition_date").didnt_exist,)
+
+ @convert_response_to_previous_version_for(TIRunContext) # type:
ignore[arg-type]
+ def remove_partition_date_from_dag_run(response: ResponseInfo) -> None: #
type: ignore[misc]
+ """Strip ``partition_date`` from the nested ``dag_run`` payload for
older clients."""
+ if "dag_run" in response.body and isinstance(response.body["dag_run"],
dict):
+ response.body["dag_run"].pop("partition_date", None)
diff --git a/airflow-core/src/airflow/assets/manager.py
b/airflow-core/src/airflow/assets/manager.py
index c8d2edef4f1..d3399cc6e00 100644
--- a/airflow-core/src/airflow/assets/manager.py
+++ b/airflow-core/src/airflow/assets/manager.py
@@ -49,6 +49,8 @@ from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.sqlalchemy import get_dialect_name, with_row_locks
if TYPE_CHECKING:
+ from datetime import datetime
+
from sqlalchemy.orm.session import Session
from airflow.models.dag import DagModel
@@ -274,6 +276,7 @@ class AssetManager(LoggingMixin):
source_alias_names: Collection[str] = (),
session: Session,
partition_key: str | None = None,
+ partition_date: datetime | None = None,
source_is_api: bool = False,
api_user_teams: set[str] | None = None,
api_allow_consumer_teams: list[str] | None = None,
@@ -394,6 +397,7 @@ class AssetManager(LoggingMixin):
source_map_index=asset_event.source_map_index,
source_aliases=[aam.to_serialized() for aam in
asset_alias_models],
partition_key=partition_key,
+ partition_date=partition_date,
)
)
@@ -440,6 +444,7 @@ class AssetManager(LoggingMixin):
asset_id=asset_model.id,
dags_to_queue=dags_to_queue,
partition_key=partition_key,
+ partition_date=partition_date,
event=asset_event,
task_instance=task_instance,
session=session,
@@ -485,6 +490,7 @@ class AssetManager(LoggingMixin):
asset_id: int,
dags_to_queue: set[DagModel],
partition_key: str | None,
+ partition_date: datetime | None,
event: AssetEvent,
task_instance: TaskInstance | None,
session: Session,
@@ -499,6 +505,7 @@ class AssetManager(LoggingMixin):
partition_dags=partition_dags,
event=event,
partition_key=partition_key,
+ partition_date=partition_date,
task_instance=task_instance,
session=session,
)
@@ -527,6 +534,7 @@ class AssetManager(LoggingMixin):
partition_dags: Iterable[DagModel],
event: AssetEvent,
partition_key: str | None,
+ partition_date: datetime | None,
task_instance: TaskInstance | None,
session: Session,
) -> None:
@@ -574,9 +582,9 @@ class AssetManager(LoggingMixin):
if (asset_model :=
session.scalar(select(AssetModel).where(AssetModel.id == asset_id))) is None:
raise RuntimeError(f"Could not find asset for
asset_id={asset_id}")
+ mapper = timetable.get_partition_mapper(name=asset_model.name,
uri=asset_model.uri)
try:
# We'll need to catch every possible exception happen when
mapping partition_key.
- mapper = timetable.get_partition_mapper(name=asset_model.name,
uri=asset_model.uri)
target_key = mapper.to_downstream(partition_key)
except Exception as err:
log.exception(
@@ -643,9 +651,31 @@ class AssetManager(LoggingMixin):
)
continue
+ # The producer's partition_date (threaded in from its DagRun via
+ # register_asset_change) is carried onto the APDR only by mappers
that
+ # opt in. IdentityMapper does, since its key carries no temporal
meaning
+ # for the scheduler to re-derive at run creation; temporal and
composite
+ # mappers return None here and are resolved from the key by the
scheduler
+ # via PartitionMapper.to_partition_date.
+ target_partition_date: datetime | None
+ try:
+ target_partition_date =
mapper.carry_partition_date(partition_date)
+ except Exception:
+ # A custom mapper override may raise. Mirror the to_downstream
handling
+ # above and degrade rather than abort the whole write: the
consumer is
+ # still queued via partition_key, just without a carried
partition_date.
+ log.exception(
+ "Partition mapper carry_partition_date failed; consumer
partition_date will be None.",
+ partition_key=partition_key,
+ asset=asset_model,
+ target_dag=target_dag,
+ )
+ target_partition_date = None
+
for target_key in target_keys:
apdr = cls._get_or_create_apdr(
target_key=target_key,
+ target_partition_date=target_partition_date,
target_dag=target_dag,
rollup_fingerprint=fingerprint,
asset_id=asset_id,
@@ -666,6 +696,7 @@ class AssetManager(LoggingMixin):
cls,
*,
target_key: str,
+ target_partition_date: datetime | None,
target_dag: DagModel,
rollup_fingerprint: dict,
asset_id: int,
@@ -683,6 +714,20 @@ class AssetManager(LoggingMixin):
``rollup_fingerprint`` is the serialized mapper / window definition
for all partitioned
assets in the timetable at creation time; the scheduler discards APDRs
whose stamp no
longer matches the current timetable's fingerprint (mapper / window
may have changed).
+
+ Reconciling the carried ``partition_date`` on an existing pending APDR
is best-effort:
+ a partitioned consumer's feeding assets are expected to agree on the
partition's
+ datetime. The carry only matters for ``IdentityMapper`` (whose key the
scheduler
+ cannot decode); temporal/composite feeds re-derive the date from the
key at run
+ creation regardless of what is stored here. Within that contract:
+
+ - If the APDR carries no date yet (``None`` — created by an event that
carried none),
+ adopt the incoming date when this event carries one. There is
nothing to conflict
+ with, so a later identity event's date is not dropped.
+ - If the APDR already carries a date and this event carries a
**different** non-null
+ one, the producing assets disagree; picking one would be
order-dependent, so the
+ carried date is suppressed to ``None`` (a later event that carries a date
may set it again).
+ - Otherwise (the dates agree, or this event carries none) the existing
value is kept.
"""
with _lock_asset_model(session=session, asset_id=asset_id):
latest_apdr: AssetPartitionDagRun | None = session.scalar(
@@ -695,6 +740,29 @@ class AssetManager(LoggingMixin):
.limit(1)
)
if latest_apdr and latest_apdr.created_dag_run_id is None:
+ existing_partition_date = latest_apdr.partition_date
+ if existing_partition_date is None:
+ # No carried date yet; adopt the incoming one if present
(no conflict
+ # to resolve). Keeps a later identity event's date from
being dropped.
+ if target_partition_date is not None:
+ latest_apdr.partition_date = target_partition_date
+ session.flush()
+ elif target_partition_date is not None and
existing_partition_date != target_partition_date:
+ # Two contributing events carry conflicting
partition_dates for the same
+ # (target_key, target_dag). Choosing one would be
order-dependent, so
+ # suppress: the consumer DagRun gets partition_date=None
rather than a
+ # wrong, unstable value.
+ log.warning(
+ "Conflicting partition_date carried for the same
target key; "
+ "suppressing it so the consumer DagRun's
partition_date is None. "
+ "The producing assets likely disagree on the
partition's datetime.",
+ target_dag_id=target_dag.dag_id,
+ target_key=target_key,
+ existing_partition_date=existing_partition_date,
+ incoming_partition_date=target_partition_date,
+ )
+ latest_apdr.partition_date = None
+ session.flush()
cls.logger().debug(
"Existing APDR found for key %s dag_id %s",
target_key,
@@ -707,6 +775,7 @@ class AssetManager(LoggingMixin):
target_dag_id=target_dag.dag_id,
created_dag_run_id=None,
partition_key=target_key,
+ partition_date=target_partition_date,
rollup_fingerprint=rollup_fingerprint,
)
session.add(apdr)
diff --git a/airflow-core/src/airflow/example_dags/example_asset_partition.py
b/airflow-core/src/airflow/example_dags/example_asset_partition.py
index 549db8cef96..a7929ed9258 100644
--- a/airflow-core/src/airflow/example_dags/example_asset_partition.py
+++ b/airflow-core/src/airflow/example_dags/example_asset_partition.py
@@ -106,7 +106,7 @@ with DAG(
"""Merge the aligned hourly partitions into a combined dataset."""
if TYPE_CHECKING:
assert dag_run
- print(dag_run.partition_key)
+ print(dag_run.partition_key, dag_run.partition_date)
combine_player_stats()
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index d7f8006317a..2f94d480eb6 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -2044,13 +2044,15 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
asset_infos: Iterable[tuple[str, str]],
partition_key: str,
dag_id: str,
+ carried_partition_date: datetime | None,
) -> datetime | None:
"""
- Return the temporal anchor (period-start datetime) for *partition_key*.
+ Return the ``partition_date`` the consumer Dag run should be created
with.
- Resolves the temporal anchor (period-start datetime) for
*partition_key*
- across *asset_infos* — the ``(name, uri)`` pairs of the upstream assets
- that contributed to it. Each upstream mapper resolves the key via
+ The temporal anchor (period-start datetime) is resolved for
+ *partition_key* across *asset_infos* — the ``(name, uri)`` pairs of the
+ upstream assets that contributed to it. Each upstream mapper resolves
the
+ key via
:meth:`~airflow.partition_mappers.base.PartitionMapper.to_partition_date`:
temporal mappers decode the key, composite mappers delegate to their
child, and non-temporal mappers (e.g.
@@ -2059,16 +2061,19 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
A partitioned consumer has a single partition identity, so every
temporal
mapper feeding it must resolve the same key to the same instant.
Anchors
are compared by instant (timezone-aware), so equivalent moments
collapse
- to one. When the temporal mappers agree, that anchor is returned; when
- they disagree — a misconfiguration, e.g. assets mapping the same key
under
- different timezones — ``partition_date`` is left unset and a warning is
- logged rather than silently picking one by scan order. Returns
``None`` if
- no mapper is temporal.
-
- A failure in any mapper aborts the whole resolution and returns
``None``
- (logged) — anchors accumulated from earlier mappers are discarded
rather
- than used as a partial result, since a partial set could hide a
conflict.
- A broken mapper must not crash the scheduler tick.
+ to one. When the temporal mappers agree, that anchor is returned.
+
+ When no temporal mapper contributes at all — an identity key carries no
+ temporal meaning and cannot be decoded back into a date — the
producer's
+ source date carried on the APDR at queue time
(*carried_partition_date*,
+ set only for ``IdentityMapper``) is returned instead.
+
+ When temporal mappers were present but produced no usable anchor — they
+ disagreed (a misconfiguration, e.g. assets mapping the same key under
+ different timezones) or one raised — the conflict/error is logged and
+ ``None`` is returned. The carried date is deliberately *not*
substituted
+ here: stamping it would mask the logged suppression. A broken mapper
must
+ not crash the scheduler tick.
"""
anchors: set[datetime] = set()
try:
@@ -2086,7 +2091,12 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
return None
if not anchors:
- return None
+ # No temporal mapper contributed an anchor (e.g. an
all-IdentityMapper feed),
+ # so fall back to the date carried on the APDR. A partitioned
consumer's feeding
+ # assets are expected to agree on the partition's datetime; when a
temporal mapper
+ # *does* resolve an anchor it takes precedence over the carried
identity date,
+ # since the key is the authoritative source the scheduler can
re-derive.
+ return carried_partition_date
if len(anchors) > 1:
self.log.warning(
"Upstream partition mappers resolved conflicting
partition_date values for the same "
@@ -2288,6 +2298,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
asset_infos=asset_info_per_apdr[apdr.id].values(),
partition_key=apdr.partition_key,
dag_id=apdr.target_dag_id,
+ carried_partition_date=apdr.partition_date,
)
dag_run = dag.create_dagrun(
run_id=DagRun.generate_run_id(
diff --git a/airflow-core/src/airflow/listeners/types.py
b/airflow-core/src/airflow/listeners/types.py
index 120b8ef503a..fa9ebdafaca 100644
--- a/airflow-core/src/airflow/listeners/types.py
+++ b/airflow-core/src/airflow/listeners/types.py
@@ -23,6 +23,8 @@ from typing import TYPE_CHECKING
import attrs
if TYPE_CHECKING:
+ from datetime import datetime
+
from pydantic import JsonValue
from airflow.serialization.definitions.assets import SerializedAsset,
SerializedAssetAlias
@@ -40,3 +42,4 @@ class AssetEvent:
source_map_index: int | None
source_aliases: list[SerializedAssetAlias]
partition_key: str | None
+ partition_date: datetime | None = None
diff --git
a/airflow-core/src/airflow/migrations/versions/0123_3_3_0_add_partition_date_to_asset_partition_dag_run.py
b/airflow-core/src/airflow/migrations/versions/0123_3_3_0_add_partition_date_to_asset_partition_dag_run.py
new file mode 100644
index 00000000000..ba483eb88e0
--- /dev/null
+++
b/airflow-core/src/airflow/migrations/versions/0123_3_3_0_add_partition_date_to_asset_partition_dag_run.py
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Add partition_date to asset_partition_dag_run.
+
+The producer's partition datetime is stored on the pending APDR at queue time
+(and reconciled as further events arrive) so the scheduler can stamp it on the
+consumer DagRun when ``partition_key`` itself cannot be decoded into a date.
+
+Revision ID: d2f4e1b3c5a7
+Revises: 9ff64e1c35d3
+Create Date: 2026-05-21 09:00:00.000000
+"""
+
+from __future__ import annotations
+
+import sqlalchemy as sa
+from alembic import op
+
+from airflow.utils.sqlalchemy import UtcDateTime
+
+revision = "d2f4e1b3c5a7"
+down_revision = "9ff64e1c35d3"
+branch_labels = None
+depends_on = None
+airflow_version = "3.3.0"
+
+
+def upgrade():
+ """Add partition_date column to asset_partition_dag_run."""
+ with op.batch_alter_table("asset_partition_dag_run", schema=None) as
batch_op:
+ batch_op.add_column(sa.Column("partition_date", UtcDateTime,
nullable=True))
+
+
+def downgrade():
+ """Remove partition_date column from asset_partition_dag_run."""
+ with op.batch_alter_table("asset_partition_dag_run", schema=None) as
batch_op:
+ batch_op.drop_column("partition_date")
diff --git a/airflow-core/src/airflow/models/asset.py
b/airflow-core/src/airflow/models/asset.py
index 34e4ffaec3e..60256f9cd88 100644
--- a/airflow-core/src/airflow/models/asset.py
+++ b/airflow-core/src/airflow/models/asset.py
@@ -921,6 +921,7 @@ class AssetPartitionDagRun(Base):
target_dag_id: Mapped[str] = mapped_column(StringID(), nullable=False)
created_dag_run_id: Mapped[int | None] = mapped_column(Integer(),
nullable=True)
partition_key: Mapped[str] = mapped_column(StringID(), nullable=False)
+ partition_date: Mapped[datetime | None] = mapped_column(UtcDateTime,
nullable=True)
# Serialized snapshot of the rollup definition (mapper + window for every
# partitioned asset in the timetable) at the time this APDR was created.
# The scheduler discards APDRs whose stored fingerprint no longer matches
diff --git a/airflow-core/src/airflow/models/dagrun.py
b/airflow-core/src/airflow/models/dagrun.py
index ab024142918..3f1a493476e 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -433,6 +433,7 @@ class DagRun(Base, LoggingMixin):
conf=self.conf,
consumed_asset_events=[],
partition_key=self.partition_key,
+ partition_date=self.partition_date,
)
@property
@@ -1089,6 +1090,9 @@ class DagRun(Base, LoggingMixin):
attributes["airflow.dag_run.logical_date"] =
str(self.logical_date)
if self.partition_key:
attributes["airflow.dag_run.partition_key"] =
str(self.partition_key)
+ if self.partition_date:
+ attributes["airflow.dag_run.partition_date"] =
self.partition_date.isoformat()
+
# TODO: make the empty parent context optional. Default should be
to
# nest the dag run span under the currently active parent span (by
# omitting `context` here); only use the empty `context.Context()`
to
diff --git a/airflow-core/src/airflow/models/taskinstance.py
b/airflow-core/src/airflow/models/taskinstance.py
index 740596f9d69..011fc9933f7 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -1546,6 +1546,7 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload):
OutletEventPayload(extra=outlet_event["extra"],
partition_key=partition_key)
)
dag_run_partition_key = ti.dag_run.partition_key
+ dag_run_partition_date = ti.dag_run.partition_date
asset_keys = {
SerializedAssetUniqueKey(o.name, o.uri)
@@ -1581,6 +1582,7 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload):
asset=am,
extra=None,
partition_key=dag_run_partition_key,
+ partition_date=dag_run_partition_date,
session=session,
)
return
@@ -1588,11 +1590,26 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload):
effective_pk = (
payload.partition_key if payload.partition_key is not None
else dag_run_partition_key
)
+ # Carry partition_date only when the effective key matches the
+ # DagRun's — the run-level date refers to the run-level key and
+ # would mis-label an event emitted for a different partition.
+ if effective_pk == dag_run_partition_key:
+ payload_partition_date = dag_run_partition_date
+ else:
+ payload_partition_date = None
+ if dag_run_partition_date is not None:
+ ti.log.debug(
+ "Task-emitted partition_key %r differs from DagRun
partition_key %r; "
+ "consumer partition_date will be None.",
+ payload.partition_key,
+ dag_run_partition_key,
+ )
asset_manager.register_asset_change(
task_instance=ti,
asset=am,
extra=payload.extra,
partition_key=effective_pk,
+ partition_date=payload_partition_date,
session=session,
)
@@ -1676,6 +1693,7 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload):
source_alias_names=event_aliase_names,
extra=asset_event_extra,
partition_key=dag_run_partition_key,
+ partition_date=dag_run_partition_date,
session=session,
)
if event is None:
@@ -1688,6 +1706,7 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload):
source_alias_names=event_aliase_names,
extra=asset_event_extra,
partition_key=dag_run_partition_key,
+ partition_date=dag_run_partition_date,
session=session,
)
diff --git a/airflow-core/src/airflow/partition_mappers/base.py
b/airflow-core/src/airflow/partition_mappers/base.py
index 0be82d8f6c5..f9d05b6d8d3 100644
--- a/airflow-core/src/airflow/partition_mappers/base.py
+++ b/airflow-core/src/airflow/partition_mappers/base.py
@@ -117,6 +117,21 @@ class PartitionMapper(ABC):
"""
return None
+ def carry_partition_date(self, source_partition_date: datetime | None) ->
datetime | None:
+ """
+ Return the producer's ``partition_date`` to carry onto the consumer
APDR.
+
+ Captured at queue time as an asset event arrives,
*source_partition_date*
+ is the producing run's ``partition_date``. The base implementation
returns
+ ``None``: for most mappers the consumer's date is derived from its own
+ downstream key by :meth:`to_partition_date` at run creation, not
carried
+ from the producer.
+ :class:`~airflow.partition_mappers.identity.IdentityMapper` overrides
to
+ pass it through, since the consumer's key equals the producer's and the
+ key carries no temporal meaning to decode.
+ """
+ return None
+
def serialize(self) -> dict[str, Any]:
if self.max_downstream_keys is None:
return {}
diff --git a/airflow-core/src/airflow/partition_mappers/identity.py
b/airflow-core/src/airflow/partition_mappers/identity.py
index e0b2e25b1ae..c538747e183 100644
--- a/airflow-core/src/airflow/partition_mappers/identity.py
+++ b/airflow-core/src/airflow/partition_mappers/identity.py
@@ -16,11 +16,22 @@
# under the License.
from __future__ import annotations
+from typing import TYPE_CHECKING
+
from airflow.partition_mappers.base import PartitionMapper
+if TYPE_CHECKING:
+ from datetime import datetime
+
class IdentityMapper(PartitionMapper):
"""Partition mapper that does not change the key."""
def to_downstream(self, key: str) -> str:
return key
+
+ def carry_partition_date(self, source_partition_date: datetime | None) ->
datetime | None:
+ # Identity passthrough: the consumer's key equals the producer's, so
the
+ # producer's partition_date is the consumer's. to_partition_date cannot
+ # recover it (the key carries no temporal meaning), so it is carried
here.
+ return source_partition_date
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 413dbef3d7e..1d5cfc5d86b 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -3473,10 +3473,22 @@ export const $DAGRunResponse = {
}
],
title: 'Partition Key'
+ },
+ partition_date: {
+ anyOf: [
+ {
+ type: 'string',
+ format: 'date-time'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Partition Date'
}
},
type: 'object',
- required: ['dag_run_id', 'dag_id', 'logical_date', 'queued_at',
'start_date', 'end_date', 'duration', 'data_interval_start',
'data_interval_end', 'run_after', 'last_scheduling_decision', 'run_type',
'state', 'triggered_by', 'triggering_user_name', 'conf', 'note',
'dag_versions', 'bundle_version', 'dag_display_name', 'partition_key'],
+ required: ['dag_run_id', 'dag_id', 'logical_date', 'queued_at',
'start_date', 'end_date', 'duration', 'data_interval_start',
'data_interval_end', 'run_after', 'last_scheduling_decision', 'run_type',
'state', 'triggered_by', 'triggering_user_name', 'conf', 'note',
'dag_versions', 'bundle_version', 'dag_display_name', 'partition_key',
'partition_date'],
title: 'DAGRunResponse',
description: 'Dag Run serializer for responses.'
} as const;
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 31f716d85f8..f8cbb1fbc18 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -917,6 +917,7 @@ export type DAGRunResponse = {
bundle_version: string | null;
dag_display_name: string;
partition_key: string | null;
+ partition_date: string | null;
};
/**
diff --git a/airflow-core/src/airflow/utils/db.py
b/airflow-core/src/airflow/utils/db.py
index 2155ca50d33..f24f3636132 100644
--- a/airflow-core/src/airflow/utils/db.py
+++ b/airflow-core/src/airflow/utils/db.py
@@ -116,7 +116,7 @@ _REVISION_HEADS_MAP: dict[str, str] = {
"3.1.0": "cc92b33c6709",
"3.1.8": "509b94a1042d",
"3.2.0": "1d6611b6ab7c",
- "3.3.0": "9ff64e1c35d3",
+ "3.3.0": "d2f4e1b3c5a7",
}
# Prefix used to identify tables holding data moved during migration.
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
index 89957e179c2..b65036b72d9 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
@@ -1543,6 +1543,7 @@ class TestPostAssetMaterialize(TestAssets):
"dag_versions": mock.ANY,
"logical_date": None,
"partition_key": None,
+ "partition_date": None,
"queued_at": mock.ANY,
"run_after": mock.ANY,
"start_date": None,
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
index ad11f764d48..b798e67f71e 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -167,6 +167,9 @@ def setup(request, dag_maker, *, session=None):
# The value uses the ProductMapper default delimiter (|) to form a
composite key so we can
# verify that the filter treats | as a literal character, not an OR
separator.
dag_run1.partition_key = "2026-01-01|us"
+ # Set a real partition_date so the GET/list responses exercise the
serialized
+ # (non-None) partition_date path, not just the None case.
+ dag_run1.partition_date = datetime(2026, 1, 1, tzinfo=timezone.utc)
dag_run1.note = (DAG1_RUN1_NOTE, "not_test")
# Set end_date for testing duration filter
dag_run1.end_date = dag_run1.start_date + timedelta(seconds=101)
@@ -317,6 +320,9 @@ def get_dag_run_dict(run: DagRun):
"note": run.note,
"dag_versions": get_dag_versions_dict(run.dag_versions),
"partition_key": run.partition_key,
+ "partition_date": from_datetime_to_zulu_without_ms(run.partition_date)
+ if run.partition_date
+ else None,
}
@@ -2371,6 +2377,7 @@ class TestTriggerDagRun:
"triggered_by": "rest_api",
"triggering_user_name": "test",
"partition_key": None,
+ "partition_date": None,
}
assert response.json() == expected_response_json
@@ -2601,6 +2608,7 @@ class TestTriggerDagRun:
"conf": {},
"note": note,
"partition_key": None,
+ "partition_date": None,
}
assert response_2.status_code == 409
@@ -2690,6 +2698,7 @@ class TestTriggerDagRun:
"conf": {},
"note": None,
"partition_key": None,
+ "partition_date": None,
}
@pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
diff --git
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
index 5d3c37c9b52..1f448193046 100644
---
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
+++
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
@@ -421,6 +421,7 @@ class TestDagRunDetail:
"end_date": "2025-12-13T00:00:00Z",
"logical_date": None,
"partition_key": None,
+ "partition_date": None,
"run_after": "2025-12-13T00:00:00Z",
"run_id": "previous",
"run_type": "manual",
diff --git
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
index 395837c0e61..6463efd068e 100644
---
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
+++
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
@@ -236,6 +236,7 @@ class TestTIRunState:
"triggering_user_name": None,
"consumed_asset_events": [],
"partition_key": None,
+ "partition_date": None,
"note": None,
"team_name": None,
},
diff --git
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_06_30/test_task_instances.py
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_06_30/test_task_instances.py
new file mode 100644
index 00000000000..b2cc016be84
--- /dev/null
+++
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_06_30/test_task_instances.py
@@ -0,0 +1,96 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import pytest
+
+from airflow._shared.timezones import timezone
+from airflow.utils.state import DagRunState, State
+
+from tests_common.test_utils.db import clear_db_runs
+
+pytestmark = pytest.mark.db_test
+
+TIMESTAMP_STR = "2024-09-30T12:00:00Z"
+TIMESTAMP = timezone.parse(TIMESTAMP_STR)
+PARTITION_DATE = timezone.parse("2026-05-20T01:00:00")
+
+RUN_PATCH_BODY = {
+ "state": "running",
+ "hostname": "h",
+ "unixname": "u",
+ "pid": 1,
+ "start_date": TIMESTAMP_STR,
+}
+
+
[email protected]
+def old_ver_client(client):
+ """Execution API version immediately before ``partition_date`` was
added."""
+ client.headers["Airflow-API-Version"] = "2026-06-16"
+ return client
+
+
+class TestPartitionDateFieldBackwardCompat:
+ @pytest.fixture(autouse=True)
+ def _freeze_time(self, time_machine):
+ time_machine.move_to(TIMESTAMP_STR, tick=False)
+
+ def setup_method(self):
+ clear_db_runs()
+
+ def teardown_method(self):
+ clear_db_runs()
+
+ def test_old_version_strips_partition_date_from_dag_run(
+ self, old_ver_client, session, create_task_instance
+ ):
+ ti = create_task_instance(
+ task_id="test_partition_date_downgrade",
+ state=State.QUEUED,
+ dagrun_state=DagRunState.RUNNING,
+ session=session,
+ start_date=TIMESTAMP,
+ )
+ ti.dag_run.partition_key = "2026-05-20"
+ ti.dag_run.partition_date = PARTITION_DATE
+ session.commit()
+
+ response =
old_ver_client.patch(f"/execution/task-instances/{ti.id}/run",
json=RUN_PATCH_BODY)
+ assert response.status_code == 200
+ dag_run = response.json()["dag_run"]
+ assert dag_run["partition_key"] == "2026-05-20"
+ assert "partition_date" not in dag_run
+
+ def test_head_version_includes_partition_date_field(self, client, session,
create_task_instance):
+ ti = create_task_instance(
+ task_id="test_partition_date_head",
+ state=State.QUEUED,
+ dagrun_state=DagRunState.RUNNING,
+ session=session,
+ start_date=TIMESTAMP,
+ )
+ ti.dag_run.partition_key = "2026-05-20"
+ ti.dag_run.partition_date = PARTITION_DATE
+ session.commit()
+
+ response = client.patch(f"/execution/task-instances/{ti.id}/run",
json=RUN_PATCH_BODY)
+ assert response.status_code == 200
+ dag_run = response.json()["dag_run"]
+ assert dag_run["partition_key"] == "2026-05-20"
+ assert dag_run["partition_date"] ==
PARTITION_DATE.isoformat().replace("+00:00", "Z")
diff --git a/airflow-core/tests/unit/assets/test_manager.py
b/airflow-core/tests/unit/assets/test_manager.py
index 40d9401b9ff..b8d47fb4d3c 100644
--- a/airflow-core/tests/unit/assets/test_manager.py
+++ b/airflow-core/tests/unit/assets/test_manager.py
@@ -30,6 +30,7 @@ from sqlalchemy.orm import Session
from airflow import settings
from airflow._shared.observability.metrics.base_stats_logger import StatsLogger
+from airflow._shared.timezones import timezone
from airflow.assets.manager import AssetManager
from airflow.models.asset import (
AssetAliasModel,
@@ -44,6 +45,7 @@ from airflow.models.dag import DAG, DagModel
from airflow.models.dagbundle import DagBundleModel
from airflow.models.log import Log
from airflow.models.team import Team
+from airflow.partition_mappers.identity import IdentityMapper
from airflow.partition_mappers.temporal import FanOutMapper, StartOfWeekMapper
from airflow.partition_mappers.window import WeekWindow
from airflow.providers.standard.operators.empty import EmptyOperator
@@ -260,6 +262,7 @@ class TestAssetManager:
try:
return AssetManager._get_or_create_apdr(
target_key="test_partition_key",
+ target_partition_date=None,
target_dag=testing_dag,
rollup_fingerprint=rollup_fingerprint,
asset_id=asm.id,
@@ -282,6 +285,164 @@ class TestAssetManager:
assert len(set(ids)) == 1
assert
session.scalar(select(func.count()).select_from(AssetPartitionDagRun)) == 1
+ @pytest.mark.usefixtures("clear_assets", "testing_dag_bundle")
+ def test_get_or_create_apdr_suppresses_conflicting_partition_date(self,
session):
+ """Two events resolving the same target key to different dates →
suppress to None.
+
+ Rather than an order-dependent first-event-wins, conflicting carried
dates produce a
+ deterministic ``None`` so the consumer DagRun is not stamped with a
wrong, unstable date.
+ """
+ asm = AssetModel(uri="test://asset1/", name="partition_asset",
group="asset")
+ testing_dag = DagModel(dag_id="testing_dag_pd_conflict",
is_stale=False, bundle_name="testing")
+ session.add_all([asm, testing_dag])
+ session.commit()
+ fp = {"asset-1|test://asset1/": {"__type": "IdentityMapper", "__var":
{}}}
+
+ first = AssetManager._get_or_create_apdr(
+ target_key="2026-05-20",
+ target_partition_date=timezone.parse("2026-05-20T00:00:00"),
+ target_dag=testing_dag,
+ rollup_fingerprint=fp,
+ asset_id=asm.id,
+ session=session,
+ )
+ assert first.partition_date == timezone.parse("2026-05-20T00:00:00")
+
+ # A second contributing event resolves the same key to a DIFFERENT
date.
+ second = AssetManager._get_or_create_apdr(
+ target_key="2026-05-20",
+ target_partition_date=timezone.parse("2026-05-21T00:00:00"),
+ target_dag=testing_dag,
+ rollup_fingerprint=fp,
+ asset_id=asm.id,
+ session=session,
+ )
+ assert second.id == first.id # same pending APDR
+ assert second.partition_date is None # conflict suppressed,
deterministic
+
+ @pytest.mark.usefixtures("clear_assets", "testing_dag_bundle")
+ def test_get_or_create_apdr_keeps_agreeing_partition_date(self, session):
+ """A later event carrying the same (or no) date does not trip the
conflict suppression."""
+ asm = AssetModel(uri="test://asset1/", name="partition_asset",
group="asset")
+ testing_dag = DagModel(dag_id="testing_dag_pd_agree", is_stale=False,
bundle_name="testing")
+ session.add_all([asm, testing_dag])
+ session.commit()
+ fp = {"asset-1|test://asset1/": {"__type": "IdentityMapper", "__var":
{}}}
+ source_date = timezone.parse("2026-05-20T00:00:00")
+
+ kwargs = dict(
+ target_key="2026-05-20",
+ target_dag=testing_dag,
+ rollup_fingerprint=fp,
+ asset_id=asm.id,
+ session=session,
+ )
+ first =
AssetManager._get_or_create_apdr(target_partition_date=source_date, **kwargs)
+ # Same date agrees → kept.
+ same =
AssetManager._get_or_create_apdr(target_partition_date=source_date, **kwargs)
+ assert same.id == first.id
+ assert same.partition_date == source_date
+ # A None-carrying event (e.g. a temporal mapper, resolved by the
scheduler) is not a
+ # conflict → the existing date is kept.
+ with_none =
AssetManager._get_or_create_apdr(target_partition_date=None, **kwargs)
+ assert with_none.id == first.id
+ assert with_none.partition_date == source_date
+
+ @pytest.mark.usefixtures("clear_assets", "testing_dag_bundle")
+ def test_get_or_create_apdr_adopts_date_when_existing_is_none(self,
session):
+ """An APDR created with no date adopts a later event's carried date
(not dropped)."""
+ asm = AssetModel(uri="test://asset1/", name="partition_asset",
group="asset")
+ testing_dag = DagModel(dag_id="testing_dag_pd_adopt", is_stale=False,
bundle_name="testing")
+ session.add_all([asm, testing_dag])
+ session.commit()
+ fp = {"asset-1|test://asset1/": {"__type": "IdentityMapper", "__var":
{}}}
+ source_date = timezone.parse("2026-05-20T00:00:00")
+
+ kwargs = dict(
+ target_key="2026-05-20",
+ target_dag=testing_dag,
+ rollup_fingerprint=fp,
+ asset_id=asm.id,
+ session=session,
+ )
+ # First event carries no date (e.g. producer had no partition_date).
+ first = AssetManager._get_or_create_apdr(target_partition_date=None,
**kwargs)
+ assert first.partition_date is None
+ # A later identity event carries a real date → adopted, not silently
dropped.
+ adopted =
AssetManager._get_or_create_apdr(target_partition_date=source_date, **kwargs)
+ assert adopted.id == first.id
+ assert adopted.partition_date == source_date
+
+ @pytest.mark.usefixtures("clear_assets", "testing_dag_bundle")
+ def test_get_or_create_apdr_recovers_after_conflict(self, session):
+ """Once a conflict has suppressed the date to None, a later event
re-adopts a date."""
+ asm = AssetModel(uri="test://asset1/", name="partition_asset",
group="asset")
+ testing_dag = DagModel(dag_id="testing_dag_pd_recover",
is_stale=False, bundle_name="testing")
+ session.add_all([asm, testing_dag])
+ session.commit()
+ fp = {"asset-1|test://asset1/": {"__type": "IdentityMapper", "__var":
{}}}
+ date_1 = timezone.parse("2026-05-20T00:00:00")
+ date_2 = timezone.parse("2026-05-21T00:00:00")
+
+ kwargs = dict(
+ target_key="2026-05-20",
+ target_dag=testing_dag,
+ rollup_fingerprint=fp,
+ asset_id=asm.id,
+ session=session,
+ )
+ first = AssetManager._get_or_create_apdr(target_partition_date=date_1,
**kwargs)
+ assert first.partition_date == date_1
+ # Conflicting date suppresses to None.
+ conflicted =
AssetManager._get_or_create_apdr(target_partition_date=date_2, **kwargs)
+ assert conflicted.partition_date is None
+ # A subsequent event re-adopts (suppression is not permanently sticky).
+ recovered =
AssetManager._get_or_create_apdr(target_partition_date=date_2, **kwargs)
+ assert recovered.id == first.id
+ assert recovered.partition_date == date_2
+
+ @pytest.mark.usefixtures("clear_assets", "testing_dag_bundle")
+ def test_carry_partition_date_failure_degrades_to_none(self, session,
dag_maker, mock_task_instance):
+ """A mapper whose carry_partition_date raises must not abort the write.
+
+ The consumer is still queued via partition_key; only the carried
partition_date is lost
+ (set to None), mirroring how a to_downstream failure is caught and
handled in the loop.
+ """
+ _clear_partition_db()
+
+ asset_def = Asset(uri="s3://bucket/carry_raise", name="carry_raise")
+ with dag_maker(
+ dag_id="carry_raise_consumer",
+ schedule=PartitionedAssetTimetable(
+ assets=asset_def,
+ partition_mapper_config={asset_def: IdentityMapper()},
+ ),
+ serialized=True,
+ ):
+ EmptyOperator(task_id="t")
+ dag_maker.create_dagrun()
+ dag_maker.sync_dagbag_to_db()
+
+ with (
+ mock.patch.object(IdentityMapper, "carry_partition_date",
side_effect=RuntimeError("boom")),
+ mock.patch("airflow.assets.manager.log") as mock_log,
+ ):
+ AssetManager.register_asset_change(
+ task_instance=mock_task_instance,
+ asset=asset_def,
+ session=session,
+ partition_key="2026-05-20",
+ partition_date=timezone.parse("2026-05-20T00:00:00"),
+ )
+ session.flush()
+
+ # Write not aborted: the consumer is still queued...
+ apdr = session.scalar(select(AssetPartitionDagRun))
+ assert apdr is not None
+ # ...but the failed carry degraded to None instead of propagating.
+ assert apdr.partition_date is None
+ mock_log.exception.assert_called_once()
+
@pytest.mark.need_serialized_dag
@pytest.mark.usefixtures("testing_dag_bundle")
def test_queue_partitioned_dags_stamps_rollup_fingerprint(self, session,
dag_maker):
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index f905fccb734..3b950f71d5b 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -10250,6 +10250,7 @@ def _produce_and_register_asset_event(
session: Session,
dag_maker: DagMaker,
expected_partition_key: str | None = None,
+ partition_date: datetime.datetime | None = None,
) -> AssetPartitionDagRun:
if expected_partition_key is None:
expected_partition_key = partition_key
@@ -10257,7 +10258,11 @@ def _produce_and_register_asset_event(
with dag_maker(dag_id=dag_id, schedule=PartitionAtRuntime(),
session=session) as dag:
EmptyOperator(task_id="hi", outlets=[asset])
- dr = dag_maker.create_dagrun(partition_key=partition_key, session=session)
+ dr = dag_maker.create_dagrun(
+ partition_key=partition_key,
+ partition_date=partition_date,
+ session=session,
+ )
[ti] = dr.get_task_instances(session=session)
session.commit()
@@ -10479,6 +10484,289 @@ def test_partitioned_dag_run_with_customized_mapper(
assert asset_event.source_run_id == "test"
[email protected]_serialized_dag
[email protected]("clear_asset_partition_rows")
+def test_consumer_dag_run_partition_date_identity_passthrough(dag_maker:
DagMaker, session: Session):
+ """IdentityMapper can't reconstruct a date from its key, so the scheduler
resolver falls
+ back to the producer's source date carried on the APDR and stamps it on
the consumer DagRun.
+
+ Temporal and composite mappers are resolved by the scheduler via
to_partition_date (covered
+ by the partition_mapper and resolver tests); this exercises the
IdentityMapper carry, which
+ is the one case the scheduler cannot resolve from the key alone.
+ """
+ asset_1 = Asset(name="asset-1")
+ source_partition_date = pendulum.datetime(2026, 5, 20, 1, 0, 0, tz="UTC")
+
+ with dag_maker(
+ dag_id="asset-event-consumer",
+ schedule=PartitionedAssetTimetable(
+ assets=asset_1,
+ default_partition_mapper=IdentityMapper(),
+ ),
+ session=session,
+ ):
+ EmptyOperator(task_id="hi")
+ session.commit()
+
+ runner = SchedulerJobRunner(
+ job=Job(job_type=SchedulerJobRunner.job_type),
executors=[MockExecutor(do_update=False)]
+ )
+
+ apdr = _produce_and_register_asset_event(
+ dag_id="asset-event-producer",
+ asset=asset_1,
+ partition_key="2026-05-20T01:00:00",
+ partition_date=source_partition_date,
+ session=session,
+ dag_maker=dag_maker,
+ expected_partition_key="2026-05-20T01:00:00",
+ )
+ partition_dags =
runner._create_dagruns_for_partitioned_asset_dags(session=session)
+
+ session.refresh(apdr)
+ assert apdr.created_dag_run_id is not None
+ assert partition_dags == {"asset-event-consumer"}
+
+ dag_run = session.scalar(select(DagRun).where(DagRun.id ==
apdr.created_dag_run_id))
+ assert dag_run is not None
+ assert dag_run.partition_key == "2026-05-20T01:00:00"
+ assert dag_run.partition_date == source_partition_date
+
+
[email protected]_serialized_dag
[email protected]("clear_asset_partition_rows")
[email protected](SchedulerJobRunner, "_resolve_partition_date",
autospec=True, return_value=None)
+def test_consumer_dag_run_partition_date_not_masked_when_resolver_suppresses(
+ mock_resolve, dag_maker: DagMaker, session: Session
+):
+ """A carried IdentityMapper date must not mask a resolver suppression.
+
+ When temporal mappers feeding the same APDR conflict (or one raises), the
resolver
+ deliberately returns None and logs it; the carried date is only ever
applied inside the
+ resolver, never at the call site. Here the APDR carries a date
(IdentityMapper) but the
+ resolver returns None, and the consumer DagRun's partition_date must stay
None — a
+ regression re-adding a call-site fallback to ``apdr.partition_date`` would
fail this.
+ """
+ asset_1 = Asset(name="asset-1")
+ source_partition_date = pendulum.datetime(2026, 5, 20, 1, 0, 0, tz="UTC")
+
+ with dag_maker(
+ dag_id="asset-event-consumer",
+ schedule=PartitionedAssetTimetable(
+ assets=asset_1,
+ default_partition_mapper=IdentityMapper(),
+ ),
+ session=session,
+ ):
+ EmptyOperator(task_id="hi")
+ session.commit()
+
+ runner = SchedulerJobRunner(
+ job=Job(job_type=SchedulerJobRunner.job_type),
executors=[MockExecutor(do_update=False)]
+ )
+
+ apdr = _produce_and_register_asset_event(
+ dag_id="asset-event-producer",
+ asset=asset_1,
+ partition_key="2026-05-20T01:00:00",
+ partition_date=source_partition_date,
+ session=session,
+ dag_maker=dag_maker,
+ expected_partition_key="2026-05-20T01:00:00",
+ )
+ session.refresh(apdr)
+ # The IdentityMapper carry is stored on the APDR...
+ assert apdr.partition_date == source_partition_date
+
+ runner._create_dagruns_for_partitioned_asset_dags(session=session)
+
+ session.refresh(apdr)
+ assert apdr.created_dag_run_id is not None
+ dag_run = session.scalar(select(DagRun).where(DagRun.id ==
apdr.created_dag_run_id))
+ assert dag_run is not None
+ assert dag_run.partition_key == "2026-05-20T01:00:00"
+ # ...but the resolver returned None, so the call site must NOT substitute the carried date.
+ assert dag_run.partition_date is None
+
+
[email protected]_serialized_dag
[email protected]("clear_asset_partition_rows")
+def test_consumer_dag_run_partition_date_none_for_non_temporal_mapper(
+ dag_maker: DagMaker,
+ session: Session,
+ custom_partition_mapper_patch: Callable[[], ExitStack],
+):
+ """For mappers that aren't temporal/identity, the consumer DagRun's
partition_date stays None."""
+ asset_1 = Asset(name="asset-1")
+
+ with custom_partition_mapper_patch():
+ with dag_maker(
+ dag_id="asset-event-consumer",
+ schedule=PartitionedAssetTimetable(
+ assets=asset_1,
+ default_partition_mapper=Key1Mapper(), # type:
ignore[arg-type]
+ ),
+ session=session,
+ ):
+ EmptyOperator(task_id="hi")
+ session.commit()
+
+ runner = SchedulerJobRunner(
+ job=Job(job_type=SchedulerJobRunner.job_type),
executors=[MockExecutor(do_update=False)]
+ )
+ with custom_partition_mapper_patch():
+ apdr = _produce_and_register_asset_event(
+ dag_id="asset-event-producer",
+ asset=asset_1,
+ partition_key="this-is-not-key-1-before-mapped",
+ partition_date=pendulum.datetime(2026, 5, 20, 1, 0, 0, tz="UTC"),
+ session=session,
+ dag_maker=dag_maker,
+ expected_partition_key="key-1",
+ )
+ runner._create_dagruns_for_partitioned_asset_dags(session=session)
+
+ session.refresh(apdr)
+ assert apdr.created_dag_run_id is not None
+ dag_run = session.scalar(select(DagRun).where(DagRun.id ==
apdr.created_dag_run_id))
+ assert dag_run is not None
+ assert dag_run.partition_key == "key-1"
+ assert dag_run.partition_date is None
+
+
[email protected]_serialized_dag
[email protected]("clear_asset_partition_rows")
+def test_consumer_dag_run_partition_date_is_none_when_source_has_no_date(
+ dag_maker: DagMaker, session: Session
+):
+ """When the producer DagRun has no partition_date, IdentityMapper passes
None through."""
+ asset_1 = Asset(name="asset-1")
+
+ with dag_maker(
+ dag_id="asset-event-consumer",
+ schedule=PartitionedAssetTimetable(
+ assets=asset_1,
+ default_partition_mapper=IdentityMapper(),
+ ),
+ session=session,
+ ):
+ EmptyOperator(task_id="hi")
+ session.commit()
+
+ runner = SchedulerJobRunner(
+ job=Job(job_type=SchedulerJobRunner.job_type),
executors=[MockExecutor(do_update=False)]
+ )
+
+ apdr = _produce_and_register_asset_event(
+ dag_id="asset-event-producer",
+ asset=asset_1,
+ partition_key="2026-05-20T01:00:00",
+ partition_date=None,
+ session=session,
+ dag_maker=dag_maker,
+ expected_partition_key="2026-05-20T01:00:00",
+ )
+ runner._create_dagruns_for_partitioned_asset_dags(session=session)
+
+ session.refresh(apdr)
+ assert apdr.created_dag_run_id is not None
+ dag_run = session.scalar(select(DagRun).where(DagRun.id ==
apdr.created_dag_run_id))
+ assert dag_run is not None
+ assert dag_run.partition_key == "2026-05-20T01:00:00"
+ assert dag_run.partition_date is None
+
+
[email protected]_serialized_dag
[email protected]("clear_asset_partition_rows")
+def test_consumer_dag_run_partition_date_is_none_when_task_key_diverges(
+ dag_maker: DagMaker, session: Session
+):
+ """A task-emitted partition_key differing from the DagRun's drops the
source date.
+
+ The producer DagRun carries a partition_date, but the task emits an outlet
event with a
+ different partition_key. The run-level date refers to the run-level key,
so it must not be
+ carried onto the divergent partition: the APDR (and the consumer DagRun
created from it) keep
+ partition_date None even though the producer run had one.
+ """
+ asset_1 = Asset(name="asset-1")
+
+ with dag_maker(
+ dag_id="asset-event-consumer",
+ schedule=PartitionedAssetTimetable(
+ assets=asset_1,
+ default_partition_mapper=IdentityMapper(),
+ ),
+ session=session,
+ ):
+ EmptyOperator(task_id="hi")
+ session.commit()
+
+ runner = SchedulerJobRunner(
+ job=Job(job_type=SchedulerJobRunner.job_type),
executors=[MockExecutor(do_update=False)]
+ )
+
+ with dag_maker(
+ dag_id="asset-event-producer",
+ schedule=PartitionAtRuntime(),
+ session=session,
+ ) as dag:
+ EmptyOperator(task_id="hi", outlets=[asset_1])
+
+ dr = dag_maker.create_dagrun(
+ partition_key="scheduler-key",
+ partition_date=pendulum.datetime(2026, 5, 20, 1, 0, 0, tz="UTC"),
+ session=session,
+ )
+ [ti] = dr.get_task_instances(session=session)
+ session.commit()
+
+ serialized_outlets = dag.get_task("hi").outlets
+ TaskInstance.register_asset_changes_in_db(
+ ti=ti,
+ task_outlets=[o.asprofile() for o in serialized_outlets],
+ outlet_events=[
+ {
+ "dest_asset_key": {"name": "asset-1", "uri": "asset-1"},
+ "extra": {},
+ "partition_key": "task-key",
+ },
+ ],
+ session=session,
+ )
+ session.commit()
+
+ event = session.scalar(
+ select(AssetEvent).where(
+ AssetEvent.source_dag_id == dag.dag_id,
+ AssetEvent.source_run_id == dr.run_id,
+ )
+ )
+ assert event is not None
+ assert event.partition_key == "task-key"
+
+ apdr = session.scalar(
+ select(AssetPartitionDagRun)
+ .join(
+ PartitionedAssetKeyLog,
+ PartitionedAssetKeyLog.asset_partition_dag_run_id ==
AssetPartitionDagRun.id,
+ )
+ .where(PartitionedAssetKeyLog.asset_event_id == event.id)
+ )
+ assert apdr is not None
+ # Divergent key → the threaded source date is dropped to None at APDR
creation.
+ assert apdr.partition_key == "task-key"
+ assert apdr.partition_date is None
+
+ runner._create_dagruns_for_partitioned_asset_dags(session=session)
+
+ session.refresh(apdr)
+ assert apdr.created_dag_run_id is not None
+ dag_run = session.scalar(select(DagRun).where(DagRun.id ==
apdr.created_dag_run_id))
+ assert dag_run is not None
+ assert dag_run.partition_key == "task-key"
+ assert dag_run.partition_date is None
+
+
@pytest.mark.need_serialized_dag
@pytest.mark.usefixtures("clear_asset_partition_rows")
def test_consumer_dag_listen_to_two_partitioned_asset(
@@ -12265,25 +12553,39 @@ def _make_runner() -> SchedulerJobRunner:
)
+_CARRIED_DATE = datetime.datetime(2026, 5, 20, 1, 0, 0,
tzinfo=datetime.timezone.utc)
+
+
@pytest.mark.parametrize(
- ("mappers", "partition_key", "expected"),
+ ("mappers", "partition_key", "carried_partition_date", "expected"),
[
- # Non-temporal mapper → no anchor.
- pytest.param([CoreIdentityMapper()], "some-key", None,
id="non-temporal-none"),
+ # Non-temporal mapper, nothing carried → None.
+ pytest.param([CoreIdentityMapper()], "some-key", None, None,
id="non-temporal-none"),
+ # Non-temporal mapper with a carried producer date (IdentityMapper) →
the carry.
+ pytest.param(
+ [CoreIdentityMapper()],
+ "some-key",
+ _CARRIED_DATE,
+ _CARRIED_DATE,
+ id="non-temporal-returns-carried-date",
+ ),
# StartOfDayMapper(NY): "2024-03-15" → NY midnight = 04:00 UTC (EDT,
DST since 2024-03-10),
# localised with the mapper's own timezone rather than the global
default.
pytest.param(
[CoreStartOfDayMapper(timezone="America/New_York")],
"2024-03-15",
+ None,
datetime.datetime(2024, 3, 15, 4, 0, 0,
tzinfo=datetime.timezone.utc),
id="non-utc-uses-mapper-timezone",
),
- # Key cannot be decoded by the mapper's format → caught → None (no
raise).
- pytest.param([CoreStartOfDayMapper()], "not-a-date", None,
id="decode-failure-none"),
+ # Key cannot be decoded by the mapper's format → caught → None, and
the carried
+ # date is NOT substituted (the error is logged; masking it would hide
that).
+ pytest.param([CoreStartOfDayMapper()], "not-a-date", _CARRIED_DATE,
None, id="decode-failure-none"),
# FanOutMapper unwraps to its downstream_mapper (daily), which owns
the per-day key.
pytest.param(
[CoreFanOutMapper(upstream_mapper=CoreStartOfWeekMapper(),
window=CoreWeekWindow())],
"2024-01-16",
+ None,
datetime.datetime(2024, 1, 16, 0, 0, 0,
tzinfo=datetime.timezone.utc),
id="fanout-uses-downstream-mapper",
),
@@ -12291,13 +12593,16 @@ def _make_runner() -> SchedulerJobRunner:
pytest.param(
[CoreStartOfDayMapper(), CoreStartOfDayMapper()],
"2024-03-15",
+ None,
datetime.datetime(2024, 3, 15, 0, 0, 0,
tzinfo=datetime.timezone.utc),
id="agreeing-mappers-anchor",
),
- # Same key, UTC midnight (00:00Z) vs NY midnight (04:00Z) — distinct
instants → None.
+ # Same key, UTC midnight (00:00Z) vs NY midnight (04:00Z) — distinct
instants → None,
+ # and the carried date is NOT substituted (it would mask the logged
conflict).
pytest.param(
[CoreStartOfDayMapper(timezone="UTC"),
CoreStartOfDayMapper(timezone="America/New_York")],
"2024-03-15",
+ _CARRIED_DATE,
None,
id="conflicting-mappers-none",
),
@@ -12306,15 +12611,19 @@ def _make_runner() -> SchedulerJobRunner:
pytest.param(
[CoreStartOfDayMapper(), CoreStartOfHourMapper()],
"2024-03-15",
+ _CARRIED_DATE,
None,
id="one-failing-mapper-aborts",
),
],
)
-def test_resolve_partition_date(mappers, partition_key, expected):
+def test_resolve_partition_date(mappers, partition_key,
carried_partition_date, expected):
"""_resolve_partition_date over mapper compositions: temporal / fan-out /
agree / conflict / failure.
- The mappers are consumed one per upstream asset, so ``asset_infos`` is
sized to ``mappers``.
+ The carried date (the producer's source date stamped on the APDR, set only
for IdentityMapper)
+ is returned only when no temporal mapper contributes an anchor. On a
conflict or a mapper
+ error the result is None — the carry must not mask the logged suppression.
The mappers are
+ consumed one per upstream asset, so ``asset_infos`` is sized to
``mappers``.
"""
runner = _make_runner()
timetable = mock.MagicMock()
@@ -12326,5 +12635,6 @@ def test_resolve_partition_date(mappers, partition_key,
expected):
asset_infos=asset_infos,
partition_key=partition_key,
dag_id="test-dag",
+ carried_partition_date=carried_partition_date,
)
assert result == expected
diff --git a/airflow-core/tests/unit/partition_mappers/test_base.py
b/airflow-core/tests/unit/partition_mappers/test_base.py
index e7b0ba02f07..ba48057cc12 100644
--- a/airflow-core/tests/unit/partition_mappers/test_base.py
+++ b/airflow-core/tests/unit/partition_mappers/test_base.py
@@ -17,6 +17,7 @@
from __future__ import annotations
import re
+from datetime import datetime, timezone
import pytest
@@ -29,6 +30,17 @@ from airflow.serialization.encoders import
encode_partition_mapper
from airflow.serialization.enums import Encoding
+class TestCarryPartitionDate:
+ def test_base_returns_none_by_default(self):
+ """Non-identity mappers don't carry the producer's date; it's derived
from the key instead."""
+ dt = datetime(2026, 5, 20, 1, 0, 0, tzinfo=timezone.utc)
+ assert StartOfDayMapper().carry_partition_date(dt) is None
+ assert (
+ RollupMapper(upstream_mapper=StartOfDayMapper(),
window=DayWindow()).carry_partition_date(dt)
+ is None
+ )
+
+
class TestPartitionMapperInitSubclass:
"""Verify that __init_subclass__ enforces the decode/encode pair
contract."""
diff --git a/airflow-core/tests/unit/partition_mappers/test_identity.py
b/airflow-core/tests/unit/partition_mappers/test_identity.py
index 9a97e583db5..b992e76430e 100644
--- a/airflow-core/tests/unit/partition_mappers/test_identity.py
+++ b/airflow-core/tests/unit/partition_mappers/test_identity.py
@@ -16,6 +16,8 @@
# under the License.
from __future__ import annotations
+from datetime import datetime, timezone
+
from airflow.partition_mappers.identity import IdentityMapper
from airflow.serialization.decoders import decode_partition_mapper
from airflow.serialization.encoders import encode_partition_mapper
@@ -27,6 +29,13 @@ class TestIdentityMapper:
pm = IdentityMapper()
assert pm.to_downstream("key") == "key"
+ def test_carry_partition_date_passes_source_through(self):
+ """IdentityMapper carries the producer's date through (its key can't
reconstruct one)."""
+ pm = IdentityMapper()
+ dt = datetime(2026, 5, 20, 1, 0, 0, tzinfo=timezone.utc)
+ assert pm.carry_partition_date(dt) == dt
+ assert pm.carry_partition_date(None) is None
+
def test_serialize(self):
pm = IdentityMapper()
assert pm.serialize() == {}
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index 779bb423191..cb9a1ca15a3 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -1774,6 +1774,7 @@ class DAGRunResponse(BaseModel):
bundle_version: Annotated[str | None, Field(title="Bundle Version")] = None
dag_display_name: Annotated[str, Field(title="Dag Display Name")]
partition_key: Annotated[str | None, Field(title="Partition Key")] = None
+ partition_date: Annotated[datetime | None, Field(title="Partition Date")]
= None
class DAGRunsBatchBody(BaseModel):
diff --git
a/providers/standard/src/airflow/providers/standard/operators/python.py
b/providers/standard/src/airflow/providers/standard/operators/python.py
index fb058b924b6..8698ddc403b 100644
--- a/providers/standard/src/airflow/providers/standard/operators/python.py
+++ b/providers/standard/src/airflow/providers/standard/operators/python.py
@@ -470,6 +470,8 @@ class _BasePythonVirtualenvOperator(PythonOperator,
metaclass=ABCMeta):
"prev_execution_date",
"prev_execution_date_success",
}
+ if AIRFLOW_V_3_3_PLUS:
+ PENDULUM_SERIALIZABLE_CONTEXT_KEYS.add("partition_date")
AIRFLOW_SERIALIZABLE_CONTEXT_KEYS = {
"macros",
diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
index 812ca4d0418..bc03569386d 100644
--- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
+++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
@@ -775,6 +775,7 @@ class DagRun(BaseModel):
triggering_user_name: Annotated[str | None, Field(title="Triggering User
Name")] = None
consumed_asset_events: Annotated[list[AssetEventDagRunReference],
Field(title="Consumed Asset Events")]
partition_key: Annotated[str | None, Field(title="Partition Key")] = None
+ partition_date: Annotated[AwareDatetime | None, Field(title="Partition
Date")] = None
note: Annotated[str | None, Field(title="Note")] = None
team_name: Annotated[str | None, Field(title="Team Name")] = None
diff --git a/task-sdk/src/airflow/sdk/definitions/context.py
b/task-sdk/src/airflow/sdk/definitions/context.py
index 462af80aeca..9d36203c2eb 100644
--- a/task-sdk/src/airflow/sdk/definitions/context.py
+++ b/task-sdk/src/airflow/sdk/definitions/context.py
@@ -64,6 +64,7 @@ class Context(TypedDict, total=False):
outlets: list
params: dict[str, Any]
partition_key: NotRequired[str | None]
+ partition_date: NotRequired[DateTime | None]
prev_data_interval_start_success: NotRequired[DateTime | None]
prev_data_interval_end_success: NotRequired[DateTime | None]
prev_start_date_success: NotRequired[DateTime | None]
diff --git a/task-sdk/src/airflow/sdk/execution_time/schema/schema.json
b/task-sdk/src/airflow/sdk/execution_time/schema/schema.json
index 4807d9f53b3..b75957870d3 100644
--- a/task-sdk/src/airflow/sdk/execution_time/schema/schema.json
+++ b/task-sdk/src/airflow/sdk/execution_time/schema/schema.json
@@ -1277,6 +1277,19 @@
"default": null,
"title": "Partition Key"
},
+ "partition_date": {
+ "anyOf": [
+ {
+ "format": "date-time",
+ "type": "string"
+ },
+ {
+ "type": "null"
+ }
+ ],
+ "default": null,
+ "title": "Partition Date"
+ },
"note": {
"anyOf": [
{
@@ -4640,6 +4653,19 @@
],
"title": "Partition Key"
},
+ "partition_date": {
+ "anyOf": [
+ {
+ "format": "date-time",
+ "type": "string"
+ },
+ {
+ "type": "null"
+ }
+ ],
+ "default": null,
+ "title": "Partition Date"
+ },
"note": {
"anyOf": [
{
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 7eb52038ce9..a39e2f91b53 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -334,6 +334,7 @@ class RuntimeTaskInstance(TaskInstance):
# TODO: Assess if we need to pass these through
timezone.coerce_datetime
"dag_run": dag_run, # type: ignore[typeddict-item] #
Removable after #46522
"partition_key": dag_run.partition_key,
+ "partition_date": coerce_datetime(dag_run.partition_date),
"triggering_asset_events": TriggeringAssetEventsAccessor.build(
AssetEventDagRunReferenceResult.from_asset_event_dag_run_reference(event)
for event in dag_run.consumed_asset_events
diff --git a/task-sdk/src/airflow/sdk/types.py
b/task-sdk/src/airflow/sdk/types.py
index 711dbe71ca4..bdaa030cdbe 100644
--- a/task-sdk/src/airflow/sdk/types.py
+++ b/task-sdk/src/airflow/sdk/types.py
@@ -123,6 +123,7 @@ class DagRunProtocol(Protocol):
triggering_user_name: str | None
consumed_asset_events: list[AssetEventDagRunReference]
partition_key: str | None
+ partition_date: AwareDatetime | None
note: str | None
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 48932581954..bd50711ecea 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -2397,6 +2397,7 @@ REQUEST_TEST_CASES = [
"run_id": "prev_run",
"logical_date": timezone.parse("2024-01-14T12:00:00Z"),
"partition_key": None,
+ "partition_date": None,
"run_type": "scheduled",
"start_date": timezone.parse("2024-01-15T12:00:00Z"),
"run_after": timezone.parse("2024-01-15T12:00:00Z"),
@@ -2450,6 +2451,7 @@ REQUEST_TEST_CASES = [
"run_id": "prev_run",
"logical_date": timezone.parse("2024-01-14T12:00:00Z"),
"partition_key": None,
+ "partition_date": None,
"run_type": "scheduled",
"start_date": timezone.parse("2024-01-15T12:00:00Z"),
"run_after": timezone.parse("2024-01-15T12:00:00Z"),
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index 8c1312833e8..bcbf83e1249 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -2001,6 +2001,7 @@ class TestRuntimeTaskInstance:
"ts_nodash": "20241201T010000",
"ts_nodash_with_tz": "20241201T010000+0000",
"partition_key": dr.partition_key,
+ "partition_date": dr.partition_date,
}
def test_partition_key_in_context(self, create_runtime_ti,
mock_supervisor_comms):
@@ -2027,6 +2028,37 @@ class TestRuntimeTaskInstance:
context = runtime_ti.get_template_context()
assert context["partition_key"] == "some-partition"
+ def test_partition_date_in_context(self, create_runtime_ti,
mock_supervisor_comms):
+ """Test that partition_date from dag_run is exposed in the template
context."""
+ task = BaseOperator(task_id="hello")
+ runtime_ti = create_runtime_ti(task=task, dag_id="basic_task")
+
+ dr = runtime_ti._ti_context_from_server.dag_run
+
+ mock_supervisor_comms.send.return_value = PrevSuccessfulDagRunResult(
+ data_interval_end=dr.logical_date - timedelta(hours=1),
+ data_interval_start=dr.logical_date - timedelta(hours=2),
+ start_date=dr.start_date - timedelta(hours=1),
+ end_date=dr.start_date,
+ )
+
+ context = runtime_ti.get_template_context()
+
+ # Default: partition_date is None
+ assert context["partition_date"] is None
+
+ # Set partition_date on dag_run and verify it surfaces in context
+ partition_date = timezone.datetime(2026, 5, 20, 1, 0, 0)
+ dr.partition_date = partition_date
+ context = runtime_ti.get_template_context()
+ assert context["partition_date"] == partition_date
+
+ # Naive datetime is coerced to tz-aware so Jinja `| ds` / `| ts`
filters
+ # operate on a real awareness boundary.
+ dr.partition_date = datetime(2026, 5, 20, 1, 0, 0)
+ context = runtime_ti.get_template_context()
+ assert context["partition_date"].tzinfo is not None
+
def test_lazy_loading_not_triggered_until_accessed(self,
create_runtime_ti, mock_supervisor_comms):
"""Ensure lazy-loaded attributes are not resolved until accessed."""
task = BaseOperator(task_id="hello")