This is an automated email from the ASF dual-hosted git repository.

rahulvats pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 52734ea7cc6 docs: asset partition (#63262)
52734ea7cc6 is described below

commit 52734ea7cc67615f63a5c9f61502f5d18e9e6094
Author: Wei Lee <[email protected]>
AuthorDate: Tue Mar 24 18:58:48 2026 +0800

    docs: asset partition (#63262)
    
    * docs: asset partition
    
    * fixup! docs: asset partition
---
 .../docs/authoring-and-scheduling/assets.rst       | 148 +++++++++++++++++++++
 1 file changed, 148 insertions(+)

diff --git a/airflow-core/docs/authoring-and-scheduling/assets.rst 
b/airflow-core/docs/authoring-and-scheduling/assets.rst
index b5b97b9979c..42bc78c0005 100644
--- a/airflow-core/docs/authoring-and-scheduling/assets.rst
+++ b/airflow-core/docs/authoring-and-scheduling/assets.rst
@@ -401,3 +401,151 @@ As mentioned in :ref:`Fetching information from previously emitted asset events<
         def consume_asset_alias_events(*, inlet_events):
             events = inlet_events[AssetAlias("example-alias")]
             last_row_count = events[-1].extra["row_count"]
+
+Asset partitions
+----------------
+
+.. versionadded:: 3.2.0
+
+Asset events can include a ``partition_key`` to make them *partitioned*. This
+lets you model the same asset at partition granularity (for example,
+``2026-03-10T09:00:00`` for an hourly partition).
+
+To produce partitioned events on a schedule, use
+``CronPartitionTimetable`` in the producer Dag (or ``@asset``). This timetable
+creates asset events with a partition key on each run.
+
+.. code-block:: python
+
+    from airflow.sdk import CronPartitionTimetable, asset
+
+
+    @asset(
+        uri="file://incoming/player-stats/team_b.csv",
+        schedule=CronPartitionTimetable("15 * * * *", timezone="UTC"),
+    )
+    def team_b_player_stats():
+        pass
+
+Partitioned events are intended for partition-aware downstream scheduling, and
+do not trigger non-partition-aware Dags.
+
+For downstream partition-aware scheduling, use ``PartitionedAssetTimetable``:
+
+.. code-block:: python
+
+    from airflow.sdk import DAG, HourlyMapper, PartitionedAssetTimetable
+
+    with DAG(
+        dag_id="clean_and_combine_player_stats",
+        schedule=PartitionedAssetTimetable(
+            assets=team_a_player_stats & team_b_player_stats & team_c_player_stats,
+            default_partition_mapper=HourlyMapper(),
+        ),
+        catchup=False,
+    ):
+        ...
+
+``PartitionedAssetTimetable`` requires partitioned asset events. If an asset
+event does not contain a ``partition_key``, it will not trigger a downstream
+Dag that uses ``PartitionedAssetTimetable``.
+
+``default_partition_mapper`` is used for every upstream asset unless you
+override it via ``partition_mapper_config``. The default mapper is
+``IdentityMapper`` (no key transformation).
+
+Partition mappers define how upstream partition keys are transformed to the
+downstream Dag partition key:
+
+* ``IdentityMapper`` keeps keys unchanged.
+* Temporal mappers such as ``HourlyMapper``, ``DailyMapper``, and
+  ``YearlyMapper`` normalize time keys to a chosen grain. For input key
+  ``2026-03-10T09:37:51``, the default outputs are:
+
+  * ``HourlyMapper`` -> ``2026-03-10T09``
+  * ``DailyMapper`` -> ``2026-03-10``
+  * ``YearlyMapper`` -> ``2026``
+* ``ProductMapper`` maps composite keys segment-by-segment.
+  It applies one mapper per segment and then rejoins the mapped segments.
+  For example, with key ``us|2026-03-10T09:00:00``,
+  ``ProductMapper(IdentityMapper(), DailyMapper())`` produces
+  ``us|2026-03-10``.
+* ``AllowedKeyMapper`` validates that keys are in a fixed allow-list and
+  passes the key through unchanged if valid.
+  For example, ``AllowedKeyMapper(["us", "eu", "apac"])`` accepts only those
+  region keys and rejects all others.
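+To make the mapper behaviors above concrete, here is a plain-Python sketch of
+the key transformations. These helper functions are illustrative stand-ins for
+the semantics described, not the actual SDK mapper classes:

```python
from datetime import datetime

# Illustrative stand-ins for the SDK mappers; they model only the key
# transformations described above, not the real Airflow classes.


def hourly(key: str) -> str:
    # Normalize a timestamp-like key to hourly grain, e.g. 2026-03-10T09
    return datetime.fromisoformat(key).strftime("%Y-%m-%dT%H")


def daily(key: str) -> str:
    # Normalize a timestamp-like key to daily grain, e.g. 2026-03-10
    return datetime.fromisoformat(key).strftime("%Y-%m-%d")


def identity(key: str) -> str:
    # Pass the key through unchanged.
    return key


def product(key: str, *mappers, sep: str = "|") -> str:
    # Split a composite key, apply one mapper per segment, rejoin.
    segments = key.split(sep)
    return sep.join(m(s) for m, s in zip(mappers, segments))


print(hourly("2026-03-10T09:37:51"))  # 2026-03-10T09
print(daily("2026-03-10T09:37:51"))  # 2026-03-10
print(product("us|2026-03-10T09:00:00", identity, daily))  # us|2026-03-10
```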
+
+Example of per-asset mapper configuration and composite-key mapping:
+
+.. code-block:: python
+
+    from airflow.sdk import (
+        Asset,
+        DAG,
+        DailyMapper,
+        IdentityMapper,
+        PartitionedAssetTimetable,
+        ProductMapper,
+    )
+
+    regional_sales = Asset(uri="file://incoming/sales/regional.csv", name="regional_sales")
+
+    with DAG(
+        dag_id="aggregate_regional_sales",
+        schedule=PartitionedAssetTimetable(
+            assets=regional_sales,
+            default_partition_mapper=ProductMapper(IdentityMapper(), DailyMapper()),
+        ),
+    ):
+        ...
+
+You can also override mappers for specific upstream assets with
+``partition_mapper_config``:
+
+.. code-block:: python
+
+    from airflow.sdk import Asset, DAG, DailyMapper, IdentityMapper, PartitionedAssetTimetable
+
+    hourly_sales = Asset(uri="file://incoming/sales/hourly.csv", name="hourly_sales")
+    daily_targets = Asset(uri="file://incoming/sales/targets.csv", name="daily_targets")
+
+    with DAG(
+        dag_id="join_sales_and_targets",
+        schedule=PartitionedAssetTimetable(
+            assets=hourly_sales & daily_targets,
+            # Default behavior: map timestamp-like keys to daily keys.
+            default_partition_mapper=DailyMapper(),
+            # Override for assets that already emit daily partition keys.
+            partition_mapper_config={
+                daily_targets: IdentityMapper(),
+            },
+        ),
+    ):
+        ...
+
+If transformed partition keys from all required upstream assets do not align,
+the downstream Dag will not be triggered for that partition.
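+The alignment rule can be sketched in plain Python. The ``aligned_partitions``
+helper below is hypothetical, illustrating only the intersection semantics
+described above:

```python
# Hypothetical sketch of the alignment rule: a downstream partitioned run
# fires only for keys that every required upstream asset has produced
# (after its mapper has transformed them).


def aligned_partitions(mapped_keys_by_asset: dict[str, set[str]]) -> set[str]:
    # Intersect the mapped key sets across all upstream assets; only keys
    # present for every asset can trigger a downstream run.
    sets = iter(mapped_keys_by_asset.values())
    result = set(next(sets))
    for keys in sets:
        result &= keys
    return result


events = {
    "hourly_sales": {"2026-03-10", "2026-03-11"},  # after DailyMapper
    "daily_targets": {"2026-03-10"},               # after IdentityMapper
}
print(sorted(aligned_partitions(events)))  # ['2026-03-10']
```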
+
+The same applies when a mapper cannot transform a key. For example, if an
+upstream event has ``partition_key="random-text"`` and the downstream mapping
+uses ``DailyMapper`` (which expects a timestamp-like key), no downstream
+partition match can be produced, so the downstream Dag is not triggered for
+that key.
+
+Inside partitioned Dag runs, access the resolved partition key through
+``dag_run.partition_key``.
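+For example, a task might use the partition key to derive a run-specific
+output location. The helper function and path layout below are illustrative
+assumptions, not part of the SDK:

```python
# Hypothetical helper: derive an output path from a run's partition key.
# The bucket name and path layout are assumptions for illustration.


def output_path_for(partition_key: str, base: str = "s3://bucket/sales") -> str:
    # Composite keys like "us|2026-03-10" become nested path segments.
    return "/".join([base, *partition_key.split("|")]) + "/part.parquet"


# Inside a task, the key would come from the run context, e.g.:
#     path = output_path_for(dag_run.partition_key)
print(output_path_for("us|2026-03-10"))  # s3://bucket/sales/us/2026-03-10/part.parquet
```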
+
+You can also trigger a DagRun manually with a partition key (for example,
+through the Trigger Dag window in the UI, or through the REST API by
+including ``partition_key`` in the request body):
+
+.. code-block:: bash
+
+    curl -X POST "http://<airflow-host>/api/v2/dags/aggregate_regional_sales/dagRuns" \
+      -H "Content-Type: application/json" \
+      -d '{
+        "logical_date": "2026-03-10T00:00:00Z",
+        "partition_key": "us|2026-03-10T09:00:00"
+      }'
+
+For complete runnable examples, see
+``airflow-core/src/airflow/example_dags/example_asset_partition.py``.
