This is an automated email from the ASF dual-hosted git repository.

Lee-W pushed a commit to branch v3-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-3-test by this push:
     new b74cbe28135 [v3-3-test] Propagate `partition_date` to consumers of 
partitioned assets (#67285) (#68892)
b74cbe28135 is described below

commit b74cbe2813526e4d97be9af11914d0171b557e53
Author: Wei Lee <[email protected]>
AuthorDate: Tue Jun 23 22:21:48 2026 +0800

    [v3-3-test] Propagate `partition_date` to consumers of partitioned assets 
(#67285) (#68892)
    
    Co-authored-by: Nathan Hadfield <[email protected]>
---
 .../docs/authoring-and-scheduling/assets.rst       |  14 +-
 airflow-core/docs/migrations-ref.rst               |   4 +-
 airflow-core/docs/templates-ref.rst                |   3 +
 airflow-core/newsfragments/67285.feature.rst       |   1 +
 .../api_fastapi/core_api/datamodels/dag_run.py     |   1 +
 .../core_api/openapi/v2-rest-api-generated.yaml    |   7 +
 .../execution_api/datamodels/taskinstance.py       |   1 +
 .../api_fastapi/execution_api/versions/__init__.py |   2 +
 .../execution_api/versions/v2026_06_30.py          |  14 +
 airflow-core/src/airflow/assets/manager.py         |  71 ++++-
 .../example_dags/example_asset_partition.py        |   2 +-
 .../src/airflow/jobs/scheduler_job_runner.py       |  41 ++-
 airflow-core/src/airflow/listeners/types.py        |   3 +
 ...dd_partition_date_to_asset_partition_dag_run.py |  54 ++++
 airflow-core/src/airflow/models/asset.py           |   1 +
 airflow-core/src/airflow/models/dagrun.py          |   4 +
 airflow-core/src/airflow/models/taskinstance.py    |  19 ++
 airflow-core/src/airflow/partition_mappers/base.py |  15 +
 .../src/airflow/partition_mappers/identity.py      |  11 +
 .../airflow/ui/openapi-gen/requests/schemas.gen.ts |  14 +-
 .../airflow/ui/openapi-gen/requests/types.gen.ts   |   1 +
 airflow-core/src/airflow/utils/db.py               |   2 +-
 .../core_api/routes/public/test_assets.py          |   1 +
 .../core_api/routes/public/test_dag_run.py         |   9 +
 .../execution_api/versions/head/test_dag_runs.py   |   1 +
 .../versions/head/test_task_instances.py           |   1 +
 .../versions/v2026_06_30/test_task_instances.py    |  96 ++++++
 airflow-core/tests/unit/assets/test_manager.py     | 161 ++++++++++
 airflow-core/tests/unit/jobs/test_scheduler_job.py | 328 ++++++++++++++++++++-
 .../tests/unit/partition_mappers/test_base.py      |  12 +
 .../tests/unit/partition_mappers/test_identity.py  |   9 +
 .../src/airflowctl/api/datamodels/generated.py     |   1 +
 .../airflow/providers/standard/operators/python.py |   2 +
 .../src/airflow/sdk/api/datamodels/_generated.py   |   1 +
 task-sdk/src/airflow/sdk/definitions/context.py    |   1 +
 .../airflow/sdk/execution_time/schema/schema.json  |  26 ++
 .../src/airflow/sdk/execution_time/task_runner.py  |   1 +
 task-sdk/src/airflow/sdk/types.py                  |   1 +
 .../task_sdk/execution_time/test_supervisor.py     |   2 +
 .../task_sdk/execution_time/test_task_runner.py    |  32 ++
 40 files changed, 940 insertions(+), 30 deletions(-)

diff --git a/airflow-core/docs/authoring-and-scheduling/assets.rst 
b/airflow-core/docs/authoring-and-scheduling/assets.rst
index 994cae815d3..d039fbfd1ff 100644
--- a/airflow-core/docs/authoring-and-scheduling/assets.rst
+++ b/airflow-core/docs/authoring-and-scheduling/assets.rst
@@ -631,7 +631,19 @@ partition match can be produced, so the downstream Dag is 
not triggered for
 that key.
 
 Inside partitioned Dag runs, access the resolved partition through
-``dag_run.partition_key``.
+``dag_run.partition_key``. When the consumer's partition mapper can
+resolve the key to a ``datetime``, that value is also available as
+``dag_run.partition_date``, so templates can use
+``{{ partition_date | ds }}``. This covers the ``StartOf*Mapper`` family
+(which decode the key directly), ``IdentityMapper`` (which carries the
+producer's ``partition_date`` through), and composite mappers —
+``RollupMapper``, ``ChainMapper`` and ``FanOutMapper`` — whose effective
+child mapper is temporal (they delegate the anchor to that child).
+Mappers whose key carries no temporal meaning (``ProductMapper``,
+``AllowedKeyMapper`` and custom mappers that do not implement
+``to_partition_date``) leave ``partition_date`` ``None`` even when the
+resulting key is date-shaped, so those consumers should keep parsing
+``partition_key``.
 
 You can also trigger a DagRun manually with a partition key (for example,
 through the Trigger Dag window in the UI, or through the REST API by
diff --git a/airflow-core/docs/migrations-ref.rst 
b/airflow-core/docs/migrations-ref.rst
index a69a8aa2058..d2cfde62352 100644
--- a/airflow-core/docs/migrations-ref.rst
+++ b/airflow-core/docs/migrations-ref.rst
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are 
executed via when you ru
 
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
 | Revision ID             | Revises ID       | Airflow Version   | Description 
                                                 |
 
+=========================+==================+===================+==============================================================+
-| ``9ff64e1c35d3`` (head) | ``dd5f3a8e2b91`` | ``3.3.0``         | Add indexes 
on dag_run.created_dag_version_id and            |
+| ``d2f4e1b3c5a7`` (head) | ``9ff64e1c35d3`` | ``3.3.0``         | Add 
partition_date to asset_partition_dag_run.               |
++-------------------------+------------------+-------------------+--------------------------------------------------------------+
+| ``9ff64e1c35d3``        | ``dd5f3a8e2b91`` | ``3.3.0``         | Add indexes 
on dag_run.created_dag_version_id and            |
 |                         |                  |                   | 
task_instance.dag_version_id.                                |
 
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
 | ``dd5f3a8e2b91``        | ``c20871fbf23a`` | ``3.3.0``         | Add 
rollup_fingerprint to AssetPartitionDagRun and index     |
diff --git a/airflow-core/docs/templates-ref.rst 
b/airflow-core/docs/templates-ref.rst
index b1609ff0c94..aeb907dbe47 100644
--- a/airflow-core/docs/templates-ref.rst
+++ b/airflow-core/docs/templates-ref.rst
@@ -87,6 +87,9 @@ Variable                                    Type              
    Description
                                                                   | is enabled 
in ``airflow.cfg``.
 ``{{ partition_key }}``                     str | None            | The 
partition key from the current :class:`~airflow.models.dagrun.DagRun`.
                                                                   | Returns 
``None`` if no partition key was set. Added in version 3.3.0.
+``{{ partition_date }}``                    datetime | None       | The 
partition datetime from the current :class:`~airflow.models.dagrun.DagRun`.
+                                                                  | Use ``{{ 
partition_date | ds }}`` and related filters for formatting.
+                                                                  | Returns 
``None`` if no partition date was set. Added in version 3.3.0.
 ``{{ var.value }}``                                               Airflow 
variables. See `Airflow Variables in Templates`_ below.
 ``{{ var.json }}``                                                Airflow 
variables. See `Airflow Variables in Templates`_ below.
 ``{{ conn }}``                                                    Airflow 
connections. See `Airflow Connections in Templates`_ below.
diff --git a/airflow-core/newsfragments/67285.feature.rst 
b/airflow-core/newsfragments/67285.feature.rst
new file mode 100644
index 00000000000..2c4370d5ff6
--- /dev/null
+++ b/airflow-core/newsfragments/67285.feature.rst
@@ -0,0 +1 @@
+Propagate ``partition_date`` from producer DagRuns to consumers of partitioned 
assets, so date-shaped partitions are available in consumer task templates.
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py 
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
index 033437e2ad9..0a530578644 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
@@ -119,6 +119,7 @@ class DAGRunResponse(BaseModel):
     bundle_version: str | None
     dag_display_name: str = Field(validation_alias=AliasPath("dag_model", 
"dag_display_name"))
     partition_key: str | None
+    partition_date: datetime | None
 
 
 class DAGRunCollectionResponse(BaseModel):
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index f82e665c9be..639f6fb9904 100644
--- 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++ 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -13704,6 +13704,12 @@ components:
           - type: string
           - type: 'null'
           title: Partition Key
+        partition_date:
+          anyOf:
+          - type: string
+            format: date-time
+          - type: 'null'
+          title: Partition Date
       type: object
       required:
       - dag_run_id
@@ -13727,6 +13733,7 @@ components:
       - bundle_version
       - dag_display_name
       - partition_key
+      - partition_date
       title: DAGRunResponse
       description: Dag Run serializer for responses.
     DAGRunsBatchBody:
diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
index fe7f5a5ce05..bf20bcbc30c 100644
--- 
a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
+++ 
b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
@@ -341,6 +341,7 @@ class DagRun(StrictBaseModel):
     triggering_user_name: str | None = None
     consumed_asset_events: list[AssetEventDagRunReference]
     partition_key: str | None
+    partition_date: UtcDateTime | None = None
     note: str | None = None
     team_name: str | None = None
 
diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
index 332ddb28704..dc7035d31e3 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py
@@ -44,6 +44,7 @@ from airflow.api_fastapi.execution_api.versions.v2026_06_30 
import (
     AddAssetsByAliasEndpoint,
     AddAwaitingInputStatePayload,
     AddConnectionTestEndpoint,
+    AddPartitionDateField,
     AddRetryPolicyFields,
     AddTaskAndAssetStateStoreEndpoints,
     AddTaskInstanceQueueField,
@@ -63,6 +64,7 @@ bundle = VersionBundle(
         AddTeamNameField,
         AddTaskAndAssetStateStoreEndpoints,
         AddAssetsByAliasEndpoint,
+        AddPartitionDateField,
     ),
     Version(
         "2026-04-06",
diff --git 
a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py 
b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py
index cbd801c0a9b..e89e2ed04cc 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_06_30.py
@@ -127,3 +127,17 @@ class AddTaskAndAssetStateStoreEndpoints(VersionChange):
         endpoint("/store/asset/by-uri/value", ["DELETE"]).didnt_exist,
         endpoint("/store/asset/by-uri/clear", ["DELETE"]).didnt_exist,
     )
+
+
+class AddPartitionDateField(VersionChange):
+    """Expose the consumer DagRun's partition datetime on the execution API so 
consumer tasks can template it."""
+
+    description = __doc__
+
+    instructions_to_migrate_to_previous_version = 
(schema(DagRun).field("partition_date").didnt_exist,)
+
+    @convert_response_to_previous_version_for(TIRunContext)  # type: 
ignore[arg-type]
+    def remove_partition_date_from_dag_run(response: ResponseInfo) -> None:  # 
type: ignore[misc]
+        """Strip ``partition_date`` from the nested ``dag_run`` payload for 
older clients."""
+        if "dag_run" in response.body and isinstance(response.body["dag_run"], 
dict):
+            response.body["dag_run"].pop("partition_date", None)
diff --git a/airflow-core/src/airflow/assets/manager.py 
b/airflow-core/src/airflow/assets/manager.py
index c8d2edef4f1..d3399cc6e00 100644
--- a/airflow-core/src/airflow/assets/manager.py
+++ b/airflow-core/src/airflow/assets/manager.py
@@ -49,6 +49,8 @@ from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.sqlalchemy import get_dialect_name, with_row_locks
 
 if TYPE_CHECKING:
+    from datetime import datetime
+
     from sqlalchemy.orm.session import Session
 
     from airflow.models.dag import DagModel
@@ -274,6 +276,7 @@ class AssetManager(LoggingMixin):
         source_alias_names: Collection[str] = (),
         session: Session,
         partition_key: str | None = None,
+        partition_date: datetime | None = None,
         source_is_api: bool = False,
         api_user_teams: set[str] | None = None,
         api_allow_consumer_teams: list[str] | None = None,
@@ -394,6 +397,7 @@ class AssetManager(LoggingMixin):
                 source_map_index=asset_event.source_map_index,
                 source_aliases=[aam.to_serialized() for aam in 
asset_alias_models],
                 partition_key=partition_key,
+                partition_date=partition_date,
             )
         )
 
@@ -440,6 +444,7 @@ class AssetManager(LoggingMixin):
             asset_id=asset_model.id,
             dags_to_queue=dags_to_queue,
             partition_key=partition_key,
+            partition_date=partition_date,
             event=asset_event,
             task_instance=task_instance,
             session=session,
@@ -485,6 +490,7 @@ class AssetManager(LoggingMixin):
         asset_id: int,
         dags_to_queue: set[DagModel],
         partition_key: str | None,
+        partition_date: datetime | None,
         event: AssetEvent,
         task_instance: TaskInstance | None,
         session: Session,
@@ -499,6 +505,7 @@ class AssetManager(LoggingMixin):
             partition_dags=partition_dags,
             event=event,
             partition_key=partition_key,
+            partition_date=partition_date,
             task_instance=task_instance,
             session=session,
         )
@@ -527,6 +534,7 @@ class AssetManager(LoggingMixin):
         partition_dags: Iterable[DagModel],
         event: AssetEvent,
         partition_key: str | None,
+        partition_date: datetime | None,
         task_instance: TaskInstance | None,
         session: Session,
     ) -> None:
@@ -574,9 +582,9 @@ class AssetManager(LoggingMixin):
             if (asset_model := 
session.scalar(select(AssetModel).where(AssetModel.id == asset_id))) is None:
                 raise RuntimeError(f"Could not find asset for 
asset_id={asset_id}")
 
+            mapper = timetable.get_partition_mapper(name=asset_model.name, 
uri=asset_model.uri)
             try:
                 # We'll need to catch every possible exception happen when 
mapping partition_key.
-                mapper = timetable.get_partition_mapper(name=asset_model.name, 
uri=asset_model.uri)
                 target_key = mapper.to_downstream(partition_key)
             except Exception as err:
                 log.exception(
@@ -643,9 +651,31 @@ class AssetManager(LoggingMixin):
                 )
                 continue
 
+            # The producer's partition_date (threaded in from its DagRun via
+            # register_asset_change) is carried onto the APDR only by mappers 
that
+            # opt in. IdentityMapper does, since its key carries no temporal 
meaning
+            # for the scheduler to re-derive at run creation; temporal and 
composite
+            # mappers return None here and are resolved from the key by the 
scheduler
+            # via PartitionMapper.to_partition_date.
+            target_partition_date: datetime | None
+            try:
+                target_partition_date = 
mapper.carry_partition_date(partition_date)
+            except Exception:
+                # A custom mapper override may raise. Mirror the to_downstream 
handling
+                # above and degrade rather than abort the whole write: the 
consumer is
+                # still queued via partition_key, just without a carried 
partition_date.
+                log.exception(
+                    "Partition mapper carry_partition_date failed; consumer 
partition_date will be None.",
+                    partition_key=partition_key,
+                    asset=asset_model,
+                    target_dag=target_dag,
+                )
+                target_partition_date = None
+
             for target_key in target_keys:
                 apdr = cls._get_or_create_apdr(
                     target_key=target_key,
+                    target_partition_date=target_partition_date,
                     target_dag=target_dag,
                     rollup_fingerprint=fingerprint,
                     asset_id=asset_id,
@@ -666,6 +696,7 @@ class AssetManager(LoggingMixin):
         cls,
         *,
         target_key: str,
+        target_partition_date: datetime | None,
         target_dag: DagModel,
         rollup_fingerprint: dict,
         asset_id: int,
@@ -683,6 +714,20 @@ class AssetManager(LoggingMixin):
         ``rollup_fingerprint`` is the serialized mapper / window definition 
for all partitioned
         assets in the timetable at creation time; the scheduler discards APDRs 
whose stamp no
         longer matches the current timetable's fingerprint (mapper / window 
may have changed).
+
+        Reconciling the carried ``partition_date`` on an existing pending APDR 
is best-effort:
+        a partitioned consumer's feeding assets are expected to agree on the 
partition's
+        datetime. The carry only matters for ``IdentityMapper`` (whose key the 
scheduler
+        cannot decode); temporal/composite feeds re-derive the date from the 
key at run
+        creation regardless of what is stored here. Within that contract:
+
+        - If the APDR carries no date yet (``None`` — created by an event that 
carried none),
+          adopt the incoming date when this event carries one. There is 
nothing to conflict
+          with, so a later identity event's date is not dropped.
+        - If the APDR already carries a date and this event carries a 
**different** non-null
+          one, the producing assets disagree; picking one would be 
order-dependent, so the
+          carried date is suppressed to ``None`` (and re-adoptable by a later 
event).
+        - Otherwise (the dates agree, or this event carries none) the existing 
value is kept.
         """
         with _lock_asset_model(session=session, asset_id=asset_id):
             latest_apdr: AssetPartitionDagRun | None = session.scalar(
@@ -695,6 +740,29 @@ class AssetManager(LoggingMixin):
                 .limit(1)
             )
             if latest_apdr and latest_apdr.created_dag_run_id is None:
+                existing_partition_date = latest_apdr.partition_date
+                if existing_partition_date is None:
+                    # No carried date yet; adopt the incoming one if present 
(no conflict
+                    # to resolve). Keeps a later identity event's date from 
being dropped.
+                    if target_partition_date is not None:
+                        latest_apdr.partition_date = target_partition_date
+                        session.flush()
+                elif target_partition_date is not None and 
existing_partition_date != target_partition_date:
+                    # Two contributing events carry conflicting 
partition_dates for the same
+                    # (target_key, target_dag). Choosing one would be 
order-dependent, so
+                    # suppress: the consumer DagRun gets partition_date=None 
rather than a
+                    # wrong, unstable value.
+                    log.warning(
+                        "Conflicting partition_date carried for the same 
target key; "
+                        "suppressing it so the consumer DagRun's 
partition_date is None. "
+                        "The producing assets likely disagree on the 
partition's datetime.",
+                        target_dag_id=target_dag.dag_id,
+                        target_key=target_key,
+                        existing_partition_date=existing_partition_date,
+                        incoming_partition_date=target_partition_date,
+                    )
+                    latest_apdr.partition_date = None
+                    session.flush()
                 cls.logger().debug(
                     "Existing APDR found for key %s dag_id %s",
                     target_key,
@@ -707,6 +775,7 @@ class AssetManager(LoggingMixin):
                 target_dag_id=target_dag.dag_id,
                 created_dag_run_id=None,
                 partition_key=target_key,
+                partition_date=target_partition_date,
                 rollup_fingerprint=rollup_fingerprint,
             )
             session.add(apdr)
diff --git a/airflow-core/src/airflow/example_dags/example_asset_partition.py 
b/airflow-core/src/airflow/example_dags/example_asset_partition.py
index 549db8cef96..a7929ed9258 100644
--- a/airflow-core/src/airflow/example_dags/example_asset_partition.py
+++ b/airflow-core/src/airflow/example_dags/example_asset_partition.py
@@ -106,7 +106,7 @@ with DAG(
         """Merge the aligned hourly partitions into a combined dataset."""
         if TYPE_CHECKING:
             assert dag_run
-        print(dag_run.partition_key)
+        print(dag_run.partition_key, dag_run.partition_date)
 
     combine_player_stats()
 
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py 
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index d7f8006317a..2f94d480eb6 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -2044,13 +2044,15 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         asset_infos: Iterable[tuple[str, str]],
         partition_key: str,
         dag_id: str,
+        carried_partition_date: datetime | None,
     ) -> datetime | None:
         """
-        Return the temporal anchor (period-start datetime) for *partition_key*.
+        Return the ``partition_date`` the consumer Dag run should be created 
with.
 
-        Resolves the temporal anchor (period-start datetime) for 
*partition_key*
-        across *asset_infos* — the ``(name, uri)`` pairs of the upstream assets
-        that contributed to it. Each upstream mapper resolves the key via
+        The temporal anchor (period-start datetime) is resolved for
+        *partition_key* across *asset_infos* — the ``(name, uri)`` pairs of the
+        upstream assets that contributed to it. Each upstream mapper resolves 
the
+        key via
         
:meth:`~airflow.partition_mappers.base.PartitionMapper.to_partition_date`:
         temporal mappers decode the key, composite mappers delegate to their
         child, and non-temporal mappers (e.g.
@@ -2059,16 +2061,19 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         A partitioned consumer has a single partition identity, so every 
temporal
         mapper feeding it must resolve the same key to the same instant. 
Anchors
         are compared by instant (timezone-aware), so equivalent moments 
collapse
-        to one. When the temporal mappers agree, that anchor is returned; when
-        they disagree — a misconfiguration, e.g. assets mapping the same key 
under
-        different timezones — ``partition_date`` is left unset and a warning is
-        logged rather than silently picking one by scan order. Returns 
``None`` if
-        no mapper is temporal.
-
-        A failure in any mapper aborts the whole resolution and returns 
``None``
-        (logged) — anchors accumulated from earlier mappers are discarded 
rather
-        than used as a partial result, since a partial set could hide a 
conflict.
-        A broken mapper must not crash the scheduler tick.
+        to one. When the temporal mappers agree, that anchor is returned.
+
+        When no temporal mapper contributes at all — an identity key carries no
+        temporal meaning and cannot be decoded back into a date — the 
producer's
+        source date carried on the APDR at queue time 
(*carried_partition_date*,
+        set only for ``IdentityMapper``) is returned instead.
+
+        When temporal mappers were present but produced no usable anchor — they
+        disagreed (a misconfiguration, e.g. assets mapping the same key under
+        different timezones) or one raised — the conflict/error is logged and
+        ``None`` is returned. The carried date is deliberately *not* 
substituted
+        here: stamping it would mask the logged suppression. A broken mapper 
must
+        not crash the scheduler tick.
         """
         anchors: set[datetime] = set()
         try:
@@ -2086,7 +2091,12 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             return None
 
         if not anchors:
-            return None
+            # No temporal mapper contributed an anchor (e.g. an 
all-IdentityMapper feed),
+            # so fall back to the date carried on the APDR. A partitioned 
consumer's feeding
+            # assets are expected to agree on the partition's datetime; when a 
temporal mapper
+            # *does* resolve an anchor it takes precedence over the carried 
identity date,
+            # since the key is the authoritative source the scheduler can 
re-derive.
+            return carried_partition_date
         if len(anchors) > 1:
             self.log.warning(
                 "Upstream partition mappers resolved conflicting 
partition_date values for the same "
@@ -2288,6 +2298,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                     asset_infos=asset_info_per_apdr[apdr.id].values(),
                     partition_key=apdr.partition_key,
                     dag_id=apdr.target_dag_id,
+                    carried_partition_date=apdr.partition_date,
                 )
             dag_run = dag.create_dagrun(
                 run_id=DagRun.generate_run_id(
diff --git a/airflow-core/src/airflow/listeners/types.py 
b/airflow-core/src/airflow/listeners/types.py
index 120b8ef503a..fa9ebdafaca 100644
--- a/airflow-core/src/airflow/listeners/types.py
+++ b/airflow-core/src/airflow/listeners/types.py
@@ -23,6 +23,8 @@ from typing import TYPE_CHECKING
 import attrs
 
 if TYPE_CHECKING:
+    from datetime import datetime
+
     from pydantic import JsonValue
 
     from airflow.serialization.definitions.assets import SerializedAsset, 
SerializedAssetAlias
@@ -40,3 +42,4 @@ class AssetEvent:
     source_map_index: int | None
     source_aliases: list[SerializedAssetAlias]
     partition_key: str | None
+    partition_date: datetime | None = None
diff --git 
a/airflow-core/src/airflow/migrations/versions/0123_3_3_0_add_partition_date_to_asset_partition_dag_run.py
 
b/airflow-core/src/airflow/migrations/versions/0123_3_3_0_add_partition_date_to_asset_partition_dag_run.py
new file mode 100644
index 00000000000..ba483eb88e0
--- /dev/null
+++ 
b/airflow-core/src/airflow/migrations/versions/0123_3_3_0_add_partition_date_to_asset_partition_dag_run.py
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Add partition_date to asset_partition_dag_run.
+
+The datetime carried by the partition mapper is stored on the pending APDR,
+and reconciled as later events arrive, so the scheduler can use it as the
+consumer DagRun's ``partition_date`` when the key itself cannot be decoded.
+
+Revision ID: d2f4e1b3c5a7
+Revises: 9ff64e1c35d3
+Create Date: 2026-05-21 09:00:00.000000
+"""
+
+from __future__ import annotations
+
+import sqlalchemy as sa
+from alembic import op
+
+from airflow.utils.sqlalchemy import UtcDateTime
+
+revision = "d2f4e1b3c5a7"
+down_revision = "9ff64e1c35d3"
+branch_labels = None
+depends_on = None
+airflow_version = "3.3.0"
+
+
+def upgrade():
+    """Add partition_date column to asset_partition_dag_run."""
+    with op.batch_alter_table("asset_partition_dag_run", schema=None) as 
batch_op:
+        batch_op.add_column(sa.Column("partition_date", UtcDateTime, 
nullable=True))
+
+
+def downgrade():
+    """Remove partition_date column from asset_partition_dag_run."""
+    with op.batch_alter_table("asset_partition_dag_run", schema=None) as 
batch_op:
+        batch_op.drop_column("partition_date")
diff --git a/airflow-core/src/airflow/models/asset.py 
b/airflow-core/src/airflow/models/asset.py
index 34e4ffaec3e..60256f9cd88 100644
--- a/airflow-core/src/airflow/models/asset.py
+++ b/airflow-core/src/airflow/models/asset.py
@@ -921,6 +921,7 @@ class AssetPartitionDagRun(Base):
     target_dag_id: Mapped[str] = mapped_column(StringID(), nullable=False)
     created_dag_run_id: Mapped[int | None] = mapped_column(Integer(), 
nullable=True)
     partition_key: Mapped[str] = mapped_column(StringID(), nullable=False)
+    partition_date: Mapped[datetime | None] = mapped_column(UtcDateTime, 
nullable=True)
     # Serialized snapshot of the rollup definition (mapper + window for every
     # partitioned asset in the timetable) at the time this APDR was created.
     # The scheduler discards APDRs whose stored fingerprint no longer matches
diff --git a/airflow-core/src/airflow/models/dagrun.py 
b/airflow-core/src/airflow/models/dagrun.py
index ab024142918..3f1a493476e 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -433,6 +433,7 @@ class DagRun(Base, LoggingMixin):
             conf=self.conf,
             consumed_asset_events=[],
             partition_key=self.partition_key,
+            partition_date=self.partition_date,
         )
 
     @property
@@ -1089,6 +1090,9 @@ class DagRun(Base, LoggingMixin):
                 attributes["airflow.dag_run.logical_date"] = 
str(self.logical_date)
             if self.partition_key:
                 attributes["airflow.dag_run.partition_key"] = 
str(self.partition_key)
+            if self.partition_date:
+                attributes["airflow.dag_run.partition_date"] = 
self.partition_date.isoformat()
+
             # TODO: make the empty parent context optional. Default should be 
to
             # nest the dag run span under the currently active parent span (by
             # omitting `context` here); only use the empty `context.Context()` 
to
diff --git a/airflow-core/src/airflow/models/taskinstance.py 
b/airflow-core/src/airflow/models/taskinstance.py
index 740596f9d69..011fc9933f7 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -1546,6 +1546,7 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload):
                 OutletEventPayload(extra=outlet_event["extra"], 
partition_key=partition_key)
             )
         dag_run_partition_key = ti.dag_run.partition_key
+        dag_run_partition_date = ti.dag_run.partition_date
 
         asset_keys = {
             SerializedAssetUniqueKey(o.name, o.uri)
@@ -1581,6 +1582,7 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload):
                     asset=am,
                     extra=None,
                     partition_key=dag_run_partition_key,
+                    partition_date=dag_run_partition_date,
                     session=session,
                 )
                 return
@@ -1588,11 +1590,26 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload):
                 effective_pk = (
                     payload.partition_key if payload.partition_key is not None 
else dag_run_partition_key
                 )
+                # Carry partition_date only when the effective key matches the
+                # DagRun's — the run-level date refers to the run-level key and
+                # would mis-label an event emitted for a different partition.
+                if effective_pk == dag_run_partition_key:
+                    payload_partition_date = dag_run_partition_date
+                else:
+                    payload_partition_date = None
+                    if dag_run_partition_date is not None:
+                        ti.log.debug(
+                            "Task-emitted partition_key %r differs from DagRun 
partition_key %r; "
+                            "consumer partition_date will be None.",
+                            payload.partition_key,
+                            dag_run_partition_key,
+                        )
                 asset_manager.register_asset_change(
                     task_instance=ti,
                     asset=am,
                     extra=payload.extra,
                     partition_key=effective_pk,
+                    partition_date=payload_partition_date,
                     session=session,
                 )
 
@@ -1676,6 +1693,7 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload):
                     source_alias_names=event_aliase_names,
                     extra=asset_event_extra,
                     partition_key=dag_run_partition_key,
+                    partition_date=dag_run_partition_date,
                     session=session,
                 )
                 if event is None:
@@ -1688,6 +1706,7 @@ class TaskInstance(Base, LoggingMixin, BaseWorkload):
                         source_alias_names=event_aliase_names,
                         extra=asset_event_extra,
                         partition_key=dag_run_partition_key,
+                        partition_date=dag_run_partition_date,
                         session=session,
                     )
 
diff --git a/airflow-core/src/airflow/partition_mappers/base.py 
b/airflow-core/src/airflow/partition_mappers/base.py
index 0be82d8f6c5..f9d05b6d8d3 100644
--- a/airflow-core/src/airflow/partition_mappers/base.py
+++ b/airflow-core/src/airflow/partition_mappers/base.py
@@ -117,6 +117,21 @@ class PartitionMapper(ABC):
         """
         return None
 
+    def carry_partition_date(self, source_partition_date: datetime | None) -> 
datetime | None:
+        """
+        Return the producer's ``partition_date`` to carry onto the consumer 
APDR.
+
+        Captured at queue time as an asset event arrives, 
*source_partition_date*
+        is the producing run's ``partition_date``. The base implementation 
returns
+        ``None``: for most mappers the consumer's date is derived from its own
+        downstream key by :meth:`to_partition_date` at run creation, not 
carried
+        from the producer.
+        :class:`~airflow.partition_mappers.identity.IdentityMapper` overrides 
to
+        pass it through, since the consumer's key equals the producer's and the
+        key carries no temporal meaning to decode.
+        """
+        return None
+
     def serialize(self) -> dict[str, Any]:
         if self.max_downstream_keys is None:
             return {}
diff --git a/airflow-core/src/airflow/partition_mappers/identity.py 
b/airflow-core/src/airflow/partition_mappers/identity.py
index e0b2e25b1ae..c538747e183 100644
--- a/airflow-core/src/airflow/partition_mappers/identity.py
+++ b/airflow-core/src/airflow/partition_mappers/identity.py
@@ -16,11 +16,22 @@
 # under the License.
 from __future__ import annotations
 
+from typing import TYPE_CHECKING
+
 from airflow.partition_mappers.base import PartitionMapper
 
+if TYPE_CHECKING:
+    from datetime import datetime
+
 
 class IdentityMapper(PartitionMapper):
     """Partition mapper that does not change the key."""
 
     def to_downstream(self, key: str) -> str:
         return key
+
+    def carry_partition_date(self, source_partition_date: datetime | None) -> 
datetime | None:
+        # Identity passthrough: the consumer's key equals the producer's, so 
the
+        # producer's partition_date is the consumer's. to_partition_date cannot
+        # recover it (the key carries no temporal meaning), so it is carried 
here.
+        return source_partition_date
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 413dbef3d7e..1d5cfc5d86b 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -3473,10 +3473,22 @@ export const $DAGRunResponse = {
                 }
             ],
             title: 'Partition Key'
+        },
+        partition_date: {
+            anyOf: [
+                {
+                    type: 'string',
+                    format: 'date-time'
+                },
+                {
+                    type: 'null'
+                }
+            ],
+            title: 'Partition Date'
         }
     },
     type: 'object',
-    required: ['dag_run_id', 'dag_id', 'logical_date', 'queued_at', 
'start_date', 'end_date', 'duration', 'data_interval_start', 
'data_interval_end', 'run_after', 'last_scheduling_decision', 'run_type', 
'state', 'triggered_by', 'triggering_user_name', 'conf', 'note', 
'dag_versions', 'bundle_version', 'dag_display_name', 'partition_key'],
+    required: ['dag_run_id', 'dag_id', 'logical_date', 'queued_at', 
'start_date', 'end_date', 'duration', 'data_interval_start', 
'data_interval_end', 'run_after', 'last_scheduling_decision', 'run_type', 
'state', 'triggered_by', 'triggering_user_name', 'conf', 'note', 
'dag_versions', 'bundle_version', 'dag_display_name', 'partition_key', 
'partition_date'],
     title: 'DAGRunResponse',
     description: 'Dag Run serializer for responses.'
 } as const;
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 31f716d85f8..f8cbb1fbc18 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -917,6 +917,7 @@ export type DAGRunResponse = {
     bundle_version: string | null;
     dag_display_name: string;
     partition_key: string | null;
+    partition_date: string | null;
 };
 
 /**
diff --git a/airflow-core/src/airflow/utils/db.py 
b/airflow-core/src/airflow/utils/db.py
index 2155ca50d33..f24f3636132 100644
--- a/airflow-core/src/airflow/utils/db.py
+++ b/airflow-core/src/airflow/utils/db.py
@@ -116,7 +116,7 @@ _REVISION_HEADS_MAP: dict[str, str] = {
     "3.1.0": "cc92b33c6709",
     "3.1.8": "509b94a1042d",
     "3.2.0": "1d6611b6ab7c",
-    "3.3.0": "9ff64e1c35d3",
+    "3.3.0": "d2f4e1b3c5a7",
 }
 
 # Prefix used to identify tables holding data moved during migration.
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
index 89957e179c2..b65036b72d9 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py
@@ -1543,6 +1543,7 @@ class TestPostAssetMaterialize(TestAssets):
             "dag_versions": mock.ANY,
             "logical_date": None,
             "partition_key": None,
+            "partition_date": None,
             "queued_at": mock.ANY,
             "run_after": mock.ANY,
             "start_date": None,
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
index ad11f764d48..b798e67f71e 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -167,6 +167,9 @@ def setup(request, dag_maker, *, session=None):
     # The value uses the ProductMapper default delimiter (|) to form a 
composite key so we can
     # verify that the filter treats | as a literal character, not an OR 
separator.
     dag_run1.partition_key = "2026-01-01|us"
+    # Set a real partition_date so the GET/list responses exercise the 
serialized
+    # (non-None) partition_date path, not just the None case.
+    dag_run1.partition_date = datetime(2026, 1, 1, tzinfo=timezone.utc)
     dag_run1.note = (DAG1_RUN1_NOTE, "not_test")
     # Set end_date for testing duration filter
     dag_run1.end_date = dag_run1.start_date + timedelta(seconds=101)
@@ -317,6 +320,9 @@ def get_dag_run_dict(run: DagRun):
         "note": run.note,
         "dag_versions": get_dag_versions_dict(run.dag_versions),
         "partition_key": run.partition_key,
+        "partition_date": from_datetime_to_zulu_without_ms(run.partition_date)
+        if run.partition_date
+        else None,
     }
 
 
@@ -2371,6 +2377,7 @@ class TestTriggerDagRun:
             "triggered_by": "rest_api",
             "triggering_user_name": "test",
             "partition_key": None,
+            "partition_date": None,
         }
 
         assert response.json() == expected_response_json
@@ -2601,6 +2608,7 @@ class TestTriggerDagRun:
             "conf": {},
             "note": note,
             "partition_key": None,
+            "partition_date": None,
         }
 
         assert response_2.status_code == 409
@@ -2690,6 +2698,7 @@ class TestTriggerDagRun:
             "conf": {},
             "note": None,
             "partition_key": None,
+            "partition_date": None,
         }
 
     @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
diff --git 
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
index 5d3c37c9b52..1f448193046 100644
--- 
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
+++ 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
@@ -421,6 +421,7 @@ class TestDagRunDetail:
             "end_date": "2025-12-13T00:00:00Z",
             "logical_date": None,
             "partition_key": None,
+            "partition_date": None,
             "run_after": "2025-12-13T00:00:00Z",
             "run_id": "previous",
             "run_type": "manual",
diff --git 
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
index 395837c0e61..6463efd068e 100644
--- 
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
+++ 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
@@ -236,6 +236,7 @@ class TestTIRunState:
                 "triggering_user_name": None,
                 "consumed_asset_events": [],
                 "partition_key": None,
+                "partition_date": None,
                 "note": None,
                 "team_name": None,
             },
diff --git 
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_06_30/test_task_instances.py
 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_06_30/test_task_instances.py
new file mode 100644
index 00000000000..b2cc016be84
--- /dev/null
+++ 
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_06_30/test_task_instances.py
@@ -0,0 +1,96 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import pytest
+
+from airflow._shared.timezones import timezone
+from airflow.utils.state import DagRunState, State
+
+from tests_common.test_utils.db import clear_db_runs
+
+pytestmark = pytest.mark.db_test
+
+TIMESTAMP_STR = "2024-09-30T12:00:00Z"
+TIMESTAMP = timezone.parse(TIMESTAMP_STR)
+PARTITION_DATE = timezone.parse("2026-05-20T01:00:00")
+
+RUN_PATCH_BODY = {
+    "state": "running",
+    "hostname": "h",
+    "unixname": "u",
+    "pid": 1,
+    "start_date": TIMESTAMP_STR,
+}
+
+
+@pytest.fixture
+def old_ver_client(client):
+    """Execution API version immediately before ``partition_date`` was 
added."""
+    client.headers["Airflow-API-Version"] = "2026-06-16"
+    return client
+
+
+class TestPartitionDateFieldBackwardCompat:
+    @pytest.fixture(autouse=True)
+    def _freeze_time(self, time_machine):
+        time_machine.move_to(TIMESTAMP_STR, tick=False)
+
+    def setup_method(self):
+        clear_db_runs()
+
+    def teardown_method(self):
+        clear_db_runs()
+
+    def test_old_version_strips_partition_date_from_dag_run(
+        self, old_ver_client, session, create_task_instance
+    ):
+        ti = create_task_instance(
+            task_id="test_partition_date_downgrade",
+            state=State.QUEUED,
+            dagrun_state=DagRunState.RUNNING,
+            session=session,
+            start_date=TIMESTAMP,
+        )
+        ti.dag_run.partition_key = "2026-05-20"
+        ti.dag_run.partition_date = PARTITION_DATE
+        session.commit()
+
+        response = 
old_ver_client.patch(f"/execution/task-instances/{ti.id}/run", 
json=RUN_PATCH_BODY)
+        assert response.status_code == 200
+        dag_run = response.json()["dag_run"]
+        assert dag_run["partition_key"] == "2026-05-20"
+        assert "partition_date" not in dag_run
+
+    def test_head_version_includes_partition_date_field(self, client, session, 
create_task_instance):
+        ti = create_task_instance(
+            task_id="test_partition_date_head",
+            state=State.QUEUED,
+            dagrun_state=DagRunState.RUNNING,
+            session=session,
+            start_date=TIMESTAMP,
+        )
+        ti.dag_run.partition_key = "2026-05-20"
+        ti.dag_run.partition_date = PARTITION_DATE
+        session.commit()
+
+        response = client.patch(f"/execution/task-instances/{ti.id}/run", 
json=RUN_PATCH_BODY)
+        assert response.status_code == 200
+        dag_run = response.json()["dag_run"]
+        assert dag_run["partition_key"] == "2026-05-20"
+        assert dag_run["partition_date"] == 
PARTITION_DATE.isoformat().replace("+00:00", "Z")
diff --git a/airflow-core/tests/unit/assets/test_manager.py 
b/airflow-core/tests/unit/assets/test_manager.py
index 40d9401b9ff..b8d47fb4d3c 100644
--- a/airflow-core/tests/unit/assets/test_manager.py
+++ b/airflow-core/tests/unit/assets/test_manager.py
@@ -30,6 +30,7 @@ from sqlalchemy.orm import Session
 
 from airflow import settings
 from airflow._shared.observability.metrics.base_stats_logger import StatsLogger
+from airflow._shared.timezones import timezone
 from airflow.assets.manager import AssetManager
 from airflow.models.asset import (
     AssetAliasModel,
@@ -44,6 +45,7 @@ from airflow.models.dag import DAG, DagModel
 from airflow.models.dagbundle import DagBundleModel
 from airflow.models.log import Log
 from airflow.models.team import Team
+from airflow.partition_mappers.identity import IdentityMapper
 from airflow.partition_mappers.temporal import FanOutMapper, StartOfWeekMapper
 from airflow.partition_mappers.window import WeekWindow
 from airflow.providers.standard.operators.empty import EmptyOperator
@@ -260,6 +262,7 @@ class TestAssetManager:
             try:
                 return AssetManager._get_or_create_apdr(
                     target_key="test_partition_key",
+                    target_partition_date=None,
                     target_dag=testing_dag,
                     rollup_fingerprint=rollup_fingerprint,
                     asset_id=asm.id,
@@ -282,6 +285,164 @@ class TestAssetManager:
         assert len(set(ids)) == 1
         assert 
session.scalar(select(func.count()).select_from(AssetPartitionDagRun)) == 1
 
+    @pytest.mark.usefixtures("clear_assets", "testing_dag_bundle")
+    def test_get_or_create_apdr_suppresses_conflicting_partition_date(self, 
session):
+        """Two events resolving the same target key to different dates → 
suppress to None.
+
+        Rather than an order-dependent first-event-wins, conflicting carried 
dates produce a
+        deterministic ``None`` so the consumer DagRun is not stamped with a 
wrong, unstable date.
+        """
+        asm = AssetModel(uri="test://asset1/", name="partition_asset", 
group="asset")
+        testing_dag = DagModel(dag_id="testing_dag_pd_conflict", 
is_stale=False, bundle_name="testing")
+        session.add_all([asm, testing_dag])
+        session.commit()
+        fp = {"asset-1|test://asset1/": {"__type": "IdentityMapper", "__var": 
{}}}
+
+        first = AssetManager._get_or_create_apdr(
+            target_key="2026-05-20",
+            target_partition_date=timezone.parse("2026-05-20T00:00:00"),
+            target_dag=testing_dag,
+            rollup_fingerprint=fp,
+            asset_id=asm.id,
+            session=session,
+        )
+        assert first.partition_date == timezone.parse("2026-05-20T00:00:00")
+
+        # A second contributing event resolves the same key to a DIFFERENT 
date.
+        second = AssetManager._get_or_create_apdr(
+            target_key="2026-05-20",
+            target_partition_date=timezone.parse("2026-05-21T00:00:00"),
+            target_dag=testing_dag,
+            rollup_fingerprint=fp,
+            asset_id=asm.id,
+            session=session,
+        )
+        assert second.id == first.id  # same pending APDR
+        assert second.partition_date is None  # conflict suppressed, 
deterministic
+
+    @pytest.mark.usefixtures("clear_assets", "testing_dag_bundle")
+    def test_get_or_create_apdr_keeps_agreeing_partition_date(self, session):
+        """A later event carrying the same (or no) date does not trip the 
conflict suppression."""
+        asm = AssetModel(uri="test://asset1/", name="partition_asset", 
group="asset")
+        testing_dag = DagModel(dag_id="testing_dag_pd_agree", is_stale=False, 
bundle_name="testing")
+        session.add_all([asm, testing_dag])
+        session.commit()
+        fp = {"asset-1|test://asset1/": {"__type": "IdentityMapper", "__var": 
{}}}
+        source_date = timezone.parse("2026-05-20T00:00:00")
+
+        kwargs = dict(
+            target_key="2026-05-20",
+            target_dag=testing_dag,
+            rollup_fingerprint=fp,
+            asset_id=asm.id,
+            session=session,
+        )
+        first = 
AssetManager._get_or_create_apdr(target_partition_date=source_date, **kwargs)
+        # Same date agrees → kept.
+        same = 
AssetManager._get_or_create_apdr(target_partition_date=source_date, **kwargs)
+        assert same.id == first.id
+        assert same.partition_date == source_date
+        # A None-carrying event (e.g. a temporal mapper, resolved by the 
scheduler) is not a
+        # conflict → the existing date is kept.
+        with_none = 
AssetManager._get_or_create_apdr(target_partition_date=None, **kwargs)
+        assert with_none.id == first.id
+        assert with_none.partition_date == source_date
+
+    @pytest.mark.usefixtures("clear_assets", "testing_dag_bundle")
+    def test_get_or_create_apdr_adopts_date_when_existing_is_none(self, 
session):
+        """An APDR created with no date adopts a later event's carried date 
(not dropped)."""
+        asm = AssetModel(uri="test://asset1/", name="partition_asset", 
group="asset")
+        testing_dag = DagModel(dag_id="testing_dag_pd_adopt", is_stale=False, 
bundle_name="testing")
+        session.add_all([asm, testing_dag])
+        session.commit()
+        fp = {"asset-1|test://asset1/": {"__type": "IdentityMapper", "__var": 
{}}}
+        source_date = timezone.parse("2026-05-20T00:00:00")
+
+        kwargs = dict(
+            target_key="2026-05-20",
+            target_dag=testing_dag,
+            rollup_fingerprint=fp,
+            asset_id=asm.id,
+            session=session,
+        )
+        # First event carries no date (e.g. producer had no partition_date).
+        first = AssetManager._get_or_create_apdr(target_partition_date=None, 
**kwargs)
+        assert first.partition_date is None
+        # A later identity event carries a real date → adopted, not silently 
dropped.
+        adopted = 
AssetManager._get_or_create_apdr(target_partition_date=source_date, **kwargs)
+        assert adopted.id == first.id
+        assert adopted.partition_date == source_date
+
+    @pytest.mark.usefixtures("clear_assets", "testing_dag_bundle")
+    def test_get_or_create_apdr_recovers_after_conflict(self, session):
+        """Once a conflict has suppressed the date to None, a later event 
re-adopts a date."""
+        asm = AssetModel(uri="test://asset1/", name="partition_asset", 
group="asset")
+        testing_dag = DagModel(dag_id="testing_dag_pd_recover", 
is_stale=False, bundle_name="testing")
+        session.add_all([asm, testing_dag])
+        session.commit()
+        fp = {"asset-1|test://asset1/": {"__type": "IdentityMapper", "__var": 
{}}}
+        date_1 = timezone.parse("2026-05-20T00:00:00")
+        date_2 = timezone.parse("2026-05-21T00:00:00")
+
+        kwargs = dict(
+            target_key="2026-05-20",
+            target_dag=testing_dag,
+            rollup_fingerprint=fp,
+            asset_id=asm.id,
+            session=session,
+        )
+        first = AssetManager._get_or_create_apdr(target_partition_date=date_1, 
**kwargs)
+        assert first.partition_date == date_1
+        # Conflicting date suppresses to None.
+        conflicted = 
AssetManager._get_or_create_apdr(target_partition_date=date_2, **kwargs)
+        assert conflicted.partition_date is None
+        # A subsequent event re-adopts (suppression is not permanently sticky).
+        recovered = 
AssetManager._get_or_create_apdr(target_partition_date=date_2, **kwargs)
+        assert recovered.id == first.id
+        assert recovered.partition_date == date_2
+
+    @pytest.mark.usefixtures("clear_assets", "testing_dag_bundle")
+    def test_carry_partition_date_failure_degrades_to_none(self, session, 
dag_maker, mock_task_instance):
+        """A mapper whose carry_partition_date raises must not abort the write.
+
+        The consumer is still queued via partition_key; only the carried 
partition_date is lost
+        (set to None), mirroring how a to_downstream failure is caught and 
handled in the loop.
+        """
+        _clear_partition_db()
+
+        asset_def = Asset(uri="s3://bucket/carry_raise", name="carry_raise")
+        with dag_maker(
+            dag_id="carry_raise_consumer",
+            schedule=PartitionedAssetTimetable(
+                assets=asset_def,
+                partition_mapper_config={asset_def: IdentityMapper()},
+            ),
+            serialized=True,
+        ):
+            EmptyOperator(task_id="t")
+        dag_maker.create_dagrun()
+        dag_maker.sync_dagbag_to_db()
+
+        with (
+            mock.patch.object(IdentityMapper, "carry_partition_date", 
side_effect=RuntimeError("boom")),
+            mock.patch("airflow.assets.manager.log") as mock_log,
+        ):
+            AssetManager.register_asset_change(
+                task_instance=mock_task_instance,
+                asset=asset_def,
+                session=session,
+                partition_key="2026-05-20",
+                partition_date=timezone.parse("2026-05-20T00:00:00"),
+            )
+            session.flush()
+
+        # Write not aborted: the consumer is still queued...
+        apdr = session.scalar(select(AssetPartitionDagRun))
+        assert apdr is not None
+        # ...but the failed carry degraded to None instead of propagating.
+        assert apdr.partition_date is None
+        mock_log.exception.assert_called_once()
+
     @pytest.mark.need_serialized_dag
     @pytest.mark.usefixtures("testing_dag_bundle")
     def test_queue_partitioned_dags_stamps_rollup_fingerprint(self, session, 
dag_maker):
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py 
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index f905fccb734..3b950f71d5b 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -10250,6 +10250,7 @@ def _produce_and_register_asset_event(
     session: Session,
     dag_maker: DagMaker,
     expected_partition_key: str | None = None,
+    partition_date: datetime.datetime | None = None,
 ) -> AssetPartitionDagRun:
     if expected_partition_key is None:
         expected_partition_key = partition_key
@@ -10257,7 +10258,11 @@ def _produce_and_register_asset_event(
     with dag_maker(dag_id=dag_id, schedule=PartitionAtRuntime(), 
session=session) as dag:
         EmptyOperator(task_id="hi", outlets=[asset])
 
-    dr = dag_maker.create_dagrun(partition_key=partition_key, session=session)
+    dr = dag_maker.create_dagrun(
+        partition_key=partition_key,
+        partition_date=partition_date,
+        session=session,
+    )
     [ti] = dr.get_task_instances(session=session)
     session.commit()
 
@@ -10479,6 +10484,289 @@ def test_partitioned_dag_run_with_customized_mapper(
     assert asset_event.source_run_id == "test"
 
 
+@pytest.mark.need_serialized_dag
+@pytest.mark.usefixtures("clear_asset_partition_rows")
+def test_consumer_dag_run_partition_date_identity_passthrough(dag_maker: 
DagMaker, session: Session):
+    """IdentityMapper can't reconstruct a date from its key, so the scheduler 
resolver falls
+    back to the producer's source date carried on the APDR and stamps it on 
the consumer DagRun.
+
+    Temporal and composite mappers are resolved by the scheduler via 
to_partition_date (covered
+    by the partition_mapper and resolver tests); this exercises the 
IdentityMapper carry, which
+    is the one case the scheduler cannot resolve from the key alone.
+    """
+    asset_1 = Asset(name="asset-1")
+    source_partition_date = pendulum.datetime(2026, 5, 20, 1, 0, 0, tz="UTC")
+
+    with dag_maker(
+        dag_id="asset-event-consumer",
+        schedule=PartitionedAssetTimetable(
+            assets=asset_1,
+            default_partition_mapper=IdentityMapper(),
+        ),
+        session=session,
+    ):
+        EmptyOperator(task_id="hi")
+    session.commit()
+
+    runner = SchedulerJobRunner(
+        job=Job(job_type=SchedulerJobRunner.job_type), 
executors=[MockExecutor(do_update=False)]
+    )
+
+    apdr = _produce_and_register_asset_event(
+        dag_id="asset-event-producer",
+        asset=asset_1,
+        partition_key="2026-05-20T01:00:00",
+        partition_date=source_partition_date,
+        session=session,
+        dag_maker=dag_maker,
+        expected_partition_key="2026-05-20T01:00:00",
+    )
+    partition_dags = 
runner._create_dagruns_for_partitioned_asset_dags(session=session)
+
+    session.refresh(apdr)
+    assert apdr.created_dag_run_id is not None
+    assert partition_dags == {"asset-event-consumer"}
+
+    dag_run = session.scalar(select(DagRun).where(DagRun.id == 
apdr.created_dag_run_id))
+    assert dag_run is not None
+    assert dag_run.partition_key == "2026-05-20T01:00:00"
+    assert dag_run.partition_date == source_partition_date
+
+
+@pytest.mark.need_serialized_dag
+@pytest.mark.usefixtures("clear_asset_partition_rows")
+@mock.patch.object(SchedulerJobRunner, "_resolve_partition_date", 
autospec=True, return_value=None)
+def test_consumer_dag_run_partition_date_not_masked_when_resolver_suppresses(
+    mock_resolve, dag_maker: DagMaker, session: Session
+):
+    """A carried IdentityMapper date must not mask a resolver suppression.
+
+    When temporal mappers feeding the same APDR conflict (or one raises), the 
resolver
+    deliberately returns None and logs it; the carried date is only ever 
applied inside the
+    resolver, never at the call site. Here the APDR carries a date 
(IdentityMapper) but the
+    resolver returns None, and the consumer DagRun's partition_date must stay 
None — a
+    regression re-adding a call-site fallback to ``apdr.partition_date`` would 
fail this.
+    """
+    asset_1 = Asset(name="asset-1")
+    source_partition_date = pendulum.datetime(2026, 5, 20, 1, 0, 0, tz="UTC")
+
+    with dag_maker(
+        dag_id="asset-event-consumer",
+        schedule=PartitionedAssetTimetable(
+            assets=asset_1,
+            default_partition_mapper=IdentityMapper(),
+        ),
+        session=session,
+    ):
+        EmptyOperator(task_id="hi")
+    session.commit()
+
+    runner = SchedulerJobRunner(
+        job=Job(job_type=SchedulerJobRunner.job_type), 
executors=[MockExecutor(do_update=False)]
+    )
+
+    apdr = _produce_and_register_asset_event(
+        dag_id="asset-event-producer",
+        asset=asset_1,
+        partition_key="2026-05-20T01:00:00",
+        partition_date=source_partition_date,
+        session=session,
+        dag_maker=dag_maker,
+        expected_partition_key="2026-05-20T01:00:00",
+    )
+    session.refresh(apdr)
+    # The IdentityMapper carry is stored on the APDR...
+    assert apdr.partition_date == source_partition_date
+
+    runner._create_dagruns_for_partitioned_asset_dags(session=session)
+
+    session.refresh(apdr)
+    assert apdr.created_dag_run_id is not None
+    dag_run = session.scalar(select(DagRun).where(DagRun.id == 
apdr.created_dag_run_id))
+    assert dag_run is not None
+    assert dag_run.partition_key == "2026-05-20T01:00:00"
+    # ...but the resolver suppressed a date, so the call site must NOT 
substitute the carry.
+    assert dag_run.partition_date is None
+
+
+@pytest.mark.need_serialized_dag
+@pytest.mark.usefixtures("clear_asset_partition_rows")
+def test_consumer_dag_run_partition_date_none_for_non_temporal_mapper(
+    dag_maker: DagMaker,
+    session: Session,
+    custom_partition_mapper_patch: Callable[[], ExitStack],
+):
+    """For mappers that aren't temporal/identity, the consumer DagRun's 
partition_date stays None."""
+    asset_1 = Asset(name="asset-1")
+
+    with custom_partition_mapper_patch():
+        with dag_maker(
+            dag_id="asset-event-consumer",
+            schedule=PartitionedAssetTimetable(
+                assets=asset_1,
+                default_partition_mapper=Key1Mapper(),  # type: 
ignore[arg-type]
+            ),
+            session=session,
+        ):
+            EmptyOperator(task_id="hi")
+        session.commit()
+
+    runner = SchedulerJobRunner(
+        job=Job(job_type=SchedulerJobRunner.job_type), 
executors=[MockExecutor(do_update=False)]
+    )
+    with custom_partition_mapper_patch():
+        apdr = _produce_and_register_asset_event(
+            dag_id="asset-event-producer",
+            asset=asset_1,
+            partition_key="this-is-not-key-1-before-mapped",
+            partition_date=pendulum.datetime(2026, 5, 20, 1, 0, 0, tz="UTC"),
+            session=session,
+            dag_maker=dag_maker,
+            expected_partition_key="key-1",
+        )
+        runner._create_dagruns_for_partitioned_asset_dags(session=session)
+
+    session.refresh(apdr)
+    assert apdr.created_dag_run_id is not None
+    dag_run = session.scalar(select(DagRun).where(DagRun.id == 
apdr.created_dag_run_id))
+    assert dag_run is not None
+    assert dag_run.partition_key == "key-1"
+    assert dag_run.partition_date is None
+
+
+@pytest.mark.need_serialized_dag
+@pytest.mark.usefixtures("clear_asset_partition_rows")
+def test_consumer_dag_run_partition_date_is_none_when_source_has_no_date(
+    dag_maker: DagMaker, session: Session
+):
+    """When the producer DagRun has no partition_date, IdentityMapper passes 
None through."""
+    asset_1 = Asset(name="asset-1")
+
+    with dag_maker(
+        dag_id="asset-event-consumer",
+        schedule=PartitionedAssetTimetable(
+            assets=asset_1,
+            default_partition_mapper=IdentityMapper(),
+        ),
+        session=session,
+    ):
+        EmptyOperator(task_id="hi")
+    session.commit()
+
+    runner = SchedulerJobRunner(
+        job=Job(job_type=SchedulerJobRunner.job_type), 
executors=[MockExecutor(do_update=False)]
+    )
+
+    apdr = _produce_and_register_asset_event(
+        dag_id="asset-event-producer",
+        asset=asset_1,
+        partition_key="2026-05-20T01:00:00",
+        partition_date=None,
+        session=session,
+        dag_maker=dag_maker,
+        expected_partition_key="2026-05-20T01:00:00",
+    )
+    runner._create_dagruns_for_partitioned_asset_dags(session=session)
+
+    session.refresh(apdr)
+    assert apdr.created_dag_run_id is not None
+    dag_run = session.scalar(select(DagRun).where(DagRun.id == 
apdr.created_dag_run_id))
+    assert dag_run is not None
+    assert dag_run.partition_key == "2026-05-20T01:00:00"
+    assert dag_run.partition_date is None
+
+
+@pytest.mark.need_serialized_dag
+@pytest.mark.usefixtures("clear_asset_partition_rows")
+def test_consumer_dag_run_partition_date_is_none_when_task_key_diverges(
+    dag_maker: DagMaker, session: Session
+):
+    """A task-emitted partition_key differing from the DagRun's drops the 
source date.
+
+    The producer DagRun carries a partition_date, but the task emits an outlet 
event with a
+    different partition_key. The run-level date refers to the run-level key, 
so it must not be
+    carried onto the divergent partition: the APDR (and the consumer DagRun 
created from it) keep
+    partition_date None even though the producer run had one.
+    """
+    asset_1 = Asset(name="asset-1")
+
+    with dag_maker(
+        dag_id="asset-event-consumer",
+        schedule=PartitionedAssetTimetable(
+            assets=asset_1,
+            default_partition_mapper=IdentityMapper(),
+        ),
+        session=session,
+    ):
+        EmptyOperator(task_id="hi")
+    session.commit()
+
+    runner = SchedulerJobRunner(
+        job=Job(job_type=SchedulerJobRunner.job_type), 
executors=[MockExecutor(do_update=False)]
+    )
+
+    with dag_maker(
+        dag_id="asset-event-producer",
+        schedule=PartitionAtRuntime(),
+        session=session,
+    ) as dag:
+        EmptyOperator(task_id="hi", outlets=[asset_1])
+
+    dr = dag_maker.create_dagrun(
+        partition_key="scheduler-key",
+        partition_date=pendulum.datetime(2026, 5, 20, 1, 0, 0, tz="UTC"),
+        session=session,
+    )
+    [ti] = dr.get_task_instances(session=session)
+    session.commit()
+
+    serialized_outlets = dag.get_task("hi").outlets
+    TaskInstance.register_asset_changes_in_db(
+        ti=ti,
+        task_outlets=[o.asprofile() for o in serialized_outlets],
+        outlet_events=[
+            {
+                "dest_asset_key": {"name": "asset-1", "uri": "asset-1"},
+                "extra": {},
+                "partition_key": "task-key",
+            },
+        ],
+        session=session,
+    )
+    session.commit()
+
+    event = session.scalar(
+        select(AssetEvent).where(
+            AssetEvent.source_dag_id == dag.dag_id,
+            AssetEvent.source_run_id == dr.run_id,
+        )
+    )
+    assert event is not None
+    assert event.partition_key == "task-key"
+
+    apdr = session.scalar(
+        select(AssetPartitionDagRun)
+        .join(
+            PartitionedAssetKeyLog,
+            PartitionedAssetKeyLog.asset_partition_dag_run_id == 
AssetPartitionDagRun.id,
+        )
+        .where(PartitionedAssetKeyLog.asset_event_id == event.id)
+    )
+    assert apdr is not None
+    # Divergent key → the threaded source date is dropped to None at APDR 
creation.
+    assert apdr.partition_key == "task-key"
+    assert apdr.partition_date is None
+
+    runner._create_dagruns_for_partitioned_asset_dags(session=session)
+
+    session.refresh(apdr)
+    assert apdr.created_dag_run_id is not None
+    dag_run = session.scalar(select(DagRun).where(DagRun.id == 
apdr.created_dag_run_id))
+    assert dag_run is not None
+    assert dag_run.partition_key == "task-key"
+    assert dag_run.partition_date is None
+
+
 @pytest.mark.need_serialized_dag
 @pytest.mark.usefixtures("clear_asset_partition_rows")
 def test_consumer_dag_listen_to_two_partitioned_asset(
@@ -12265,25 +12553,39 @@ def _make_runner() -> SchedulerJobRunner:
     )
 
 
+_CARRIED_DATE = datetime.datetime(2026, 5, 20, 1, 0, 0, 
tzinfo=datetime.timezone.utc)
+
+
 @pytest.mark.parametrize(
-    ("mappers", "partition_key", "expected"),
+    ("mappers", "partition_key", "carried_partition_date", "expected"),
     [
-        # Non-temporal mapper → no anchor.
-        pytest.param([CoreIdentityMapper()], "some-key", None, 
id="non-temporal-none"),
+        # Non-temporal mapper, nothing carried → None.
+        pytest.param([CoreIdentityMapper()], "some-key", None, None, 
id="non-temporal-none"),
+        # Non-temporal mapper with a carried producer date (IdentityMapper) → 
the carry.
+        pytest.param(
+            [CoreIdentityMapper()],
+            "some-key",
+            _CARRIED_DATE,
+            _CARRIED_DATE,
+            id="non-temporal-returns-carried-date",
+        ),
         # StartOfDayMapper(NY): "2024-03-15" → NY midnight = 04:00 UTC (EDT, 
DST since 2024-03-10),
         # localised with the mapper's own timezone rather than the global 
default.
         pytest.param(
             [CoreStartOfDayMapper(timezone="America/New_York")],
             "2024-03-15",
+            None,
             datetime.datetime(2024, 3, 15, 4, 0, 0, 
tzinfo=datetime.timezone.utc),
             id="non-utc-uses-mapper-timezone",
         ),
-        # Key cannot be decoded by the mapper's format → caught → None (no 
raise).
-        pytest.param([CoreStartOfDayMapper()], "not-a-date", None, 
id="decode-failure-none"),
+        # Key cannot be decoded by the mapper's format → caught → None, and 
the carried
+        # date is NOT substituted (the error is logged; masking it would hide 
that).
+        pytest.param([CoreStartOfDayMapper()], "not-a-date", _CARRIED_DATE, 
None, id="decode-failure-none"),
         # FanOutMapper unwraps to its downstream_mapper (daily), which owns 
the per-day key.
         pytest.param(
             [CoreFanOutMapper(upstream_mapper=CoreStartOfWeekMapper(), 
window=CoreWeekWindow())],
             "2024-01-16",
+            None,
             datetime.datetime(2024, 1, 16, 0, 0, 0, 
tzinfo=datetime.timezone.utc),
             id="fanout-uses-downstream-mapper",
         ),
@@ -12291,13 +12593,16 @@ def _make_runner() -> SchedulerJobRunner:
         pytest.param(
             [CoreStartOfDayMapper(), CoreStartOfDayMapper()],
             "2024-03-15",
+            None,
             datetime.datetime(2024, 3, 15, 0, 0, 0, 
tzinfo=datetime.timezone.utc),
             id="agreeing-mappers-anchor",
         ),
-        # Same key, UTC midnight (00:00Z) vs NY midnight (04:00Z) — distinct 
instants → None.
+        # Same key, UTC midnight (00:00Z) vs NY midnight (04:00Z) — distinct 
instants → None,
+        # and the carried date is NOT substituted (it would mask the logged 
conflict).
         pytest.param(
             [CoreStartOfDayMapper(timezone="UTC"), 
CoreStartOfDayMapper(timezone="America/New_York")],
             "2024-03-15",
+            _CARRIED_DATE,
             None,
             id="conflicting-mappers-none",
         ),
@@ -12306,15 +12611,19 @@ def _make_runner() -> SchedulerJobRunner:
         pytest.param(
             [CoreStartOfDayMapper(), CoreStartOfHourMapper()],
             "2024-03-15",
+            _CARRIED_DATE,
             None,
             id="one-failing-mapper-aborts",
         ),
     ],
 )
-def test_resolve_partition_date(mappers, partition_key, expected):
+def test_resolve_partition_date(mappers, partition_key, 
carried_partition_date, expected):
     """_resolve_partition_date over mapper compositions: temporal / fan-out / 
agree / conflict / failure.
 
-    The mappers are consumed one per upstream asset, so ``asset_infos`` is 
sized to ``mappers``.
+    The carried date (the producer's source date stamped on the APDR, set only 
for IdentityMapper)
+    is returned only when no temporal mapper contributes an anchor. On a 
conflict or a mapper
+    error the result is None — the carry must not mask the logged suppression. 
The mappers are
+    consumed one per upstream asset, so ``asset_infos`` is sized to 
``mappers``.
     """
     runner = _make_runner()
     timetable = mock.MagicMock()
@@ -12326,5 +12635,6 @@ def test_resolve_partition_date(mappers, partition_key, 
expected):
         asset_infos=asset_infos,
         partition_key=partition_key,
         dag_id="test-dag",
+        carried_partition_date=carried_partition_date,
     )
     assert result == expected
diff --git a/airflow-core/tests/unit/partition_mappers/test_base.py 
b/airflow-core/tests/unit/partition_mappers/test_base.py
index e7b0ba02f07..ba48057cc12 100644
--- a/airflow-core/tests/unit/partition_mappers/test_base.py
+++ b/airflow-core/tests/unit/partition_mappers/test_base.py
@@ -17,6 +17,7 @@
 from __future__ import annotations
 
 import re
+from datetime import datetime, timezone
 
 import pytest
 
@@ -29,6 +30,17 @@ from airflow.serialization.encoders import 
encode_partition_mapper
 from airflow.serialization.enums import Encoding
 
 
+class TestCarryPartitionDate:
+    def test_base_returns_none_by_default(self):
+        """Non-identity mappers don't carry the producer's date; it's derived 
from the key instead."""
+        dt = datetime(2026, 5, 20, 1, 0, 0, tzinfo=timezone.utc)
+        assert StartOfDayMapper().carry_partition_date(dt) is None
+        assert (
+            RollupMapper(upstream_mapper=StartOfDayMapper(), 
window=DayWindow()).carry_partition_date(dt)
+            is None
+        )
+
+
 class TestPartitionMapperInitSubclass:
     """Verify that __init_subclass__ enforces the decode/encode pair 
contract."""
 
diff --git a/airflow-core/tests/unit/partition_mappers/test_identity.py 
b/airflow-core/tests/unit/partition_mappers/test_identity.py
index 9a97e583db5..b992e76430e 100644
--- a/airflow-core/tests/unit/partition_mappers/test_identity.py
+++ b/airflow-core/tests/unit/partition_mappers/test_identity.py
@@ -16,6 +16,8 @@
 # under the License.
 from __future__ import annotations
 
+from datetime import datetime, timezone
+
 from airflow.partition_mappers.identity import IdentityMapper
 from airflow.serialization.decoders import decode_partition_mapper
 from airflow.serialization.encoders import encode_partition_mapper
@@ -27,6 +29,13 @@ class TestIdentityMapper:
         pm = IdentityMapper()
         assert pm.to_downstream("key") == "key"
 
+    def test_carry_partition_date_passes_source_through(self):
+        """IdentityMapper carries the producer's date through (its key can't 
reconstruct one)."""
+        pm = IdentityMapper()
+        dt = datetime(2026, 5, 20, 1, 0, 0, tzinfo=timezone.utc)
+        assert pm.carry_partition_date(dt) == dt
+        assert pm.carry_partition_date(None) is None
+
     def test_serialize(self):
         pm = IdentityMapper()
         assert pm.serialize() == {}
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py 
b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index 779bb423191..cb9a1ca15a3 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -1774,6 +1774,7 @@ class DAGRunResponse(BaseModel):
     bundle_version: Annotated[str | None, Field(title="Bundle Version")] = None
     dag_display_name: Annotated[str, Field(title="Dag Display Name")]
     partition_key: Annotated[str | None, Field(title="Partition Key")] = None
+    partition_date: Annotated[datetime | None, Field(title="Partition Date")] 
= None
 
 
 class DAGRunsBatchBody(BaseModel):
diff --git 
a/providers/standard/src/airflow/providers/standard/operators/python.py 
b/providers/standard/src/airflow/providers/standard/operators/python.py
index fb058b924b6..8698ddc403b 100644
--- a/providers/standard/src/airflow/providers/standard/operators/python.py
+++ b/providers/standard/src/airflow/providers/standard/operators/python.py
@@ -470,6 +470,8 @@ class _BasePythonVirtualenvOperator(PythonOperator, 
metaclass=ABCMeta):
         "prev_execution_date",
         "prev_execution_date_success",
     }
+    if AIRFLOW_V_3_3_PLUS:
+        PENDULUM_SERIALIZABLE_CONTEXT_KEYS.add("partition_date")
 
     AIRFLOW_SERIALIZABLE_CONTEXT_KEYS = {
         "macros",
diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py 
b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
index 812ca4d0418..bc03569386d 100644
--- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
+++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
@@ -775,6 +775,7 @@ class DagRun(BaseModel):
     triggering_user_name: Annotated[str | None, Field(title="Triggering User 
Name")] = None
     consumed_asset_events: Annotated[list[AssetEventDagRunReference], 
Field(title="Consumed Asset Events")]
     partition_key: Annotated[str | None, Field(title="Partition Key")] = None
+    partition_date: Annotated[AwareDatetime | None, Field(title="Partition 
Date")] = None
     note: Annotated[str | None, Field(title="Note")] = None
     team_name: Annotated[str | None, Field(title="Team Name")] = None
 
diff --git a/task-sdk/src/airflow/sdk/definitions/context.py 
b/task-sdk/src/airflow/sdk/definitions/context.py
index 462af80aeca..9d36203c2eb 100644
--- a/task-sdk/src/airflow/sdk/definitions/context.py
+++ b/task-sdk/src/airflow/sdk/definitions/context.py
@@ -64,6 +64,7 @@ class Context(TypedDict, total=False):
     outlets: list
     params: dict[str, Any]
     partition_key: NotRequired[str | None]
+    partition_date: NotRequired[DateTime | None]
     prev_data_interval_start_success: NotRequired[DateTime | None]
     prev_data_interval_end_success: NotRequired[DateTime | None]
     prev_start_date_success: NotRequired[DateTime | None]
diff --git a/task-sdk/src/airflow/sdk/execution_time/schema/schema.json 
b/task-sdk/src/airflow/sdk/execution_time/schema/schema.json
index 4807d9f53b3..b75957870d3 100644
--- a/task-sdk/src/airflow/sdk/execution_time/schema/schema.json
+++ b/task-sdk/src/airflow/sdk/execution_time/schema/schema.json
@@ -1277,6 +1277,19 @@
           "default": null,
           "title": "Partition Key"
         },
+        "partition_date": {
+          "anyOf": [
+            {
+              "format": "date-time",
+              "type": "string"
+            },
+            {
+              "type": "null"
+            }
+          ],
+          "default": null,
+          "title": "Partition Date"
+        },
         "note": {
           "anyOf": [
             {
@@ -4640,6 +4653,19 @@
           ],
           "title": "Partition Key"
         },
+        "partition_date": {
+          "anyOf": [
+            {
+              "format": "date-time",
+              "type": "string"
+            },
+            {
+              "type": "null"
+            }
+          ],
+          "default": null,
+          "title": "Partition Date"
+        },
         "note": {
           "anyOf": [
             {
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py 
b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 7eb52038ce9..a39e2f91b53 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -334,6 +334,7 @@ class RuntimeTaskInstance(TaskInstance):
                 # TODO: Assess if we need to pass these through 
timezone.coerce_datetime
                 "dag_run": dag_run,  # type: ignore[typeddict-item]  # 
Removable after #46522
                 "partition_key": dag_run.partition_key,
+                "partition_date": coerce_datetime(dag_run.partition_date),
                 "triggering_asset_events": TriggeringAssetEventsAccessor.build(
                     
AssetEventDagRunReferenceResult.from_asset_event_dag_run_reference(event)
                     for event in dag_run.consumed_asset_events
diff --git a/task-sdk/src/airflow/sdk/types.py 
b/task-sdk/src/airflow/sdk/types.py
index 711dbe71ca4..bdaa030cdbe 100644
--- a/task-sdk/src/airflow/sdk/types.py
+++ b/task-sdk/src/airflow/sdk/types.py
@@ -123,6 +123,7 @@ class DagRunProtocol(Protocol):
     triggering_user_name: str | None
     consumed_asset_events: list[AssetEventDagRunReference]
     partition_key: str | None
+    partition_date: AwareDatetime | None
     note: str | None
 
 
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py 
b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 48932581954..bd50711ecea 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -2397,6 +2397,7 @@ REQUEST_TEST_CASES = [
             "run_id": "prev_run",
             "logical_date": timezone.parse("2024-01-14T12:00:00Z"),
             "partition_key": None,
+            "partition_date": None,
             "run_type": "scheduled",
             "start_date": timezone.parse("2024-01-15T12:00:00Z"),
             "run_after": timezone.parse("2024-01-15T12:00:00Z"),
@@ -2450,6 +2451,7 @@ REQUEST_TEST_CASES = [
                 "run_id": "prev_run",
                 "logical_date": timezone.parse("2024-01-14T12:00:00Z"),
                 "partition_key": None,
+                "partition_date": None,
                 "run_type": "scheduled",
                 "start_date": timezone.parse("2024-01-15T12:00:00Z"),
                 "run_after": timezone.parse("2024-01-15T12:00:00Z"),
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py 
b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index 8c1312833e8..bcbf83e1249 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -2001,6 +2001,7 @@ class TestRuntimeTaskInstance:
             "ts_nodash": "20241201T010000",
             "ts_nodash_with_tz": "20241201T010000+0000",
             "partition_key": dr.partition_key,
+            "partition_date": dr.partition_date,
         }
 
     def test_partition_key_in_context(self, create_runtime_ti, 
mock_supervisor_comms):
@@ -2027,6 +2028,37 @@ class TestRuntimeTaskInstance:
         context = runtime_ti.get_template_context()
         assert context["partition_key"] == "some-partition"
 
+    def test_partition_date_in_context(self, create_runtime_ti, 
mock_supervisor_comms):
+        """Test that partition_date from dag_run is exposed in the template 
context."""
+        task = BaseOperator(task_id="hello")
+        runtime_ti = create_runtime_ti(task=task, dag_id="basic_task")
+
+        dr = runtime_ti._ti_context_from_server.dag_run
+
+        mock_supervisor_comms.send.return_value = PrevSuccessfulDagRunResult(
+            data_interval_end=dr.logical_date - timedelta(hours=1),
+            data_interval_start=dr.logical_date - timedelta(hours=2),
+            start_date=dr.start_date - timedelta(hours=1),
+            end_date=dr.start_date,
+        )
+
+        context = runtime_ti.get_template_context()
+
+        # Default: partition_date is None
+        assert context["partition_date"] is None
+
+        # Set partition_date on dag_run and verify it surfaces in context
+        partition_date = timezone.datetime(2026, 5, 20, 1, 0, 0)
+        dr.partition_date = partition_date
+        context = runtime_ti.get_template_context()
+        assert context["partition_date"] == partition_date
+
+        # Naive datetime is coerced to tz-aware so Jinja `| ds` / `| ts` 
filters
+        # operate on a real awareness boundary.
+        dr.partition_date = datetime(2026, 5, 20, 1, 0, 0)
+        context = runtime_ti.get_template_context()
+        assert context["partition_date"].tzinfo is not None
+
     def test_lazy_loading_not_triggered_until_accessed(self, 
create_runtime_ti, mock_supervisor_comms):
         """Ensure lazy-loaded attributes are not resolved until accessed."""
         task = BaseOperator(task_id="hello")

Reply via email to