This is an automated email from the ASF dual-hosted git repository.

rahulvats pushed a commit to branch py-client-sync
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 8ea2481f97d2dd777ecb9c9daecd9aa187401083
Author: Wei Lee <[email protected]>
AuthorDate: Tue Mar 24 18:58:48 2026 +0800

    docs: asset partition (#63262)

    * docs: asset partition

    * fixup! docs: asset partition
---
 .../docs/authoring-and-scheduling/assets.rst | 148 +++++++++++++++++++++
 1 file changed, 148 insertions(+)

diff --git a/airflow-core/docs/authoring-and-scheduling/assets.rst b/airflow-core/docs/authoring-and-scheduling/assets.rst
index b5b97b9979c..42bc78c0005 100644
--- a/airflow-core/docs/authoring-and-scheduling/assets.rst
+++ b/airflow-core/docs/authoring-and-scheduling/assets.rst
@@ -401,3 +401,151 @@ As mentioned in :ref:`Fetching information from previously emitted asset events<
     def consume_asset_alias_events(*, inlet_events):
         events = inlet_events[AssetAlias("example-alias")]
         last_row_count = events[-1].extra["row_count"]
+
+Asset partitions
+----------------
+
+.. versionadded:: 3.2.0
+
+An asset event can include a ``partition_key``, which makes it *partitioned*.
+This lets you model the same asset at partition granularity (for example,
+``2026-03-10T09:00:00`` for an hourly partition).
+
+To produce partitioned events on a schedule, use
+``CronPartitionTimetable`` in the producer Dag (or ``@asset``). This timetable
+creates asset events with a partition key on each run.
+
+.. code-block:: python
+
+    from airflow.sdk import CronPartitionTimetable, asset
+
+
+    @asset(
+        uri="file://incoming/player-stats/team_b.csv",
+        schedule=CronPartitionTimetable("15 * * * *", timezone="UTC"),
+    )
+    def team_b_player_stats():
+        pass
+
+Partitioned events are intended for partition-aware downstream scheduling and
+do not trigger non-partition-aware Dags.
+
+For downstream partition-aware scheduling, use ``PartitionedAssetTimetable``:
+
+.. code-block:: python
+
+    from airflow.sdk import DAG, HourlyMapper, PartitionedAssetTimetable
+
+    with DAG(
+        dag_id="clean_and_combine_player_stats",
+        schedule=PartitionedAssetTimetable(
+            assets=team_a_player_stats & team_b_player_stats & team_c_player_stats,
+            default_partition_mapper=HourlyMapper(),
+        ),
+        catchup=False,
+    ):
+        ...
+
+``PartitionedAssetTimetable`` requires partitioned asset events. If an asset
+event does not contain a ``partition_key``, it will not trigger a downstream
+Dag that uses ``PartitionedAssetTimetable``.
+
+``default_partition_mapper`` is used for every upstream asset unless you
+override it via ``partition_mapper_config``. The default mapper is
+``IdentityMapper`` (no key transformation).
+
+Partition mappers define how upstream partition keys are transformed into the
+downstream Dag partition key:
+
+* ``IdentityMapper`` keeps keys unchanged.
+* Temporal mappers such as ``HourlyMapper``, ``DailyMapper``, and
+  ``YearlyMapper`` normalize time keys to a chosen grain. For the input key
+  ``2026-03-10T09:37:51``, the default outputs are:
+
+  * ``HourlyMapper`` -> ``2026-03-10T09``
+  * ``DailyMapper`` -> ``2026-03-10``
+  * ``YearlyMapper`` -> ``2026``
+
+* ``ProductMapper`` maps composite keys segment by segment.
+  It applies one mapper per segment and then rejoins the mapped segments.
+  For example, with the key ``us|2026-03-10T09:00:00``,
+  ``ProductMapper(IdentityMapper(), DailyMapper())`` produces
+  ``us|2026-03-10``.
+* ``AllowedKeyMapper`` validates that keys are in a fixed allow-list and
+  passes the key through unchanged if valid.
+  For example, ``AllowedKeyMapper(["us", "eu", "apac"])`` accepts only those
+  region keys and rejects all others.
+
+Example of composite-key mapping with ``ProductMapper``:
+
+.. code-block:: python
+
+    from airflow.sdk import (
+        Asset,
+        DAG,
+        DailyMapper,
+        IdentityMapper,
+        PartitionedAssetTimetable,
+        ProductMapper,
+    )
+
+    regional_sales = Asset(uri="file://incoming/sales/regional.csv", name="regional_sales")
+
+    with DAG(
+        dag_id="aggregate_regional_sales",
+        schedule=PartitionedAssetTimetable(
+            assets=regional_sales,
+            default_partition_mapper=ProductMapper(IdentityMapper(), DailyMapper()),
+        ),
+    ):
+        ...
+
+You can also override mappers for specific upstream assets with
+``partition_mapper_config``:
+
+.. code-block:: python
+
+    from airflow.sdk import Asset, DAG, DailyMapper, IdentityMapper, PartitionedAssetTimetable
+
+    hourly_sales = Asset(uri="file://incoming/sales/hourly.csv", name="hourly_sales")
+    daily_targets = Asset(uri="file://incoming/sales/targets.csv", name="daily_targets")
+
+    with DAG(
+        dag_id="join_sales_and_targets",
+        schedule=PartitionedAssetTimetable(
+            assets=hourly_sales & daily_targets,
+            # Default behavior: map timestamp-like keys to daily keys.
+            default_partition_mapper=DailyMapper(),
+            # Override for assets that already emit daily partition keys.
+            partition_mapper_config={
+                daily_targets: IdentityMapper(),
+            },
+        ),
+    ):
+        ...
+
+If the transformed partition keys from all required upstream assets do not
+align, the downstream Dag is not triggered for that partition.
+
+The same applies when a mapper cannot transform a key. For example, if an
+upstream event has ``partition_key="random-text"`` and the downstream mapping
+uses ``DailyMapper`` (which expects a timestamp-like key), no downstream
+partition match can be produced, so the downstream Dag is not triggered for
+that key.
+
+Inside partitioned Dag runs, access the resolved partition through
+``dag_run.partition_key``.
+
+You can also trigger a DagRun manually with a partition key (for example,
+through the Trigger Dag window in the UI, or through the REST API by
+including ``partition_key`` in the request body):
+
+.. code-block:: bash
+
+    curl -X POST "http://<airflow-host>/api/v2/dags/aggregate_regional_sales/dagRuns" \
+        -H "Content-Type: application/json" \
+        -d '{
+            "logical_date": "2026-03-10T00:00:00Z",
+            "partition_key": "us|2026-03-10T09:00:00"
+        }'
+
+For complete runnable examples, see
+``airflow-core/src/airflow/example_dags/example_asset_partition.py``.
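[Editor's note] The key-normalization semantics that the diff above documents for the temporal mappers and ``ProductMapper`` can be sketched in plain Python. These are toy stand-ins for illustration only, not the ``airflow.sdk`` classes; the function names and the ``|`` separator convention are assumptions taken from the examples in the doc text.

```python
from datetime import datetime

# Toy sketches of the mapper semantics described in the doc change.
# NOT the airflow.sdk implementations.


def hourly(key: str) -> str:
    # Normalize a timestamp-like key to hourly grain.
    return datetime.fromisoformat(key).strftime("%Y-%m-%dT%H")


def daily(key: str) -> str:
    # Normalize a timestamp-like key to daily grain.
    return datetime.fromisoformat(key).strftime("%Y-%m-%d")


def yearly(key: str) -> str:
    # Normalize a timestamp-like key to yearly grain.
    return datetime.fromisoformat(key).strftime("%Y")


def identity(key: str) -> str:
    # Pass the key through unchanged.
    return key


def product(mappers, key: str, sep: str = "|") -> str:
    # Map a composite key segment by segment, then rejoin the segments.
    segments = key.split(sep)
    if len(segments) != len(mappers):
        raise ValueError("segment/mapper count mismatch")
    return sep.join(m(s) for m, s in zip(mappers, segments))


print(hourly("2026-03-10T09:37:51"))  # 2026-03-10T09
print(daily("2026-03-10T09:37:51"))   # 2026-03-10
print(yearly("2026-03-10T09:37:51"))  # 2026
print(product([identity, daily], "us|2026-03-10T09:00:00"))  # us|2026-03-10
```

The outputs match the mapper table in the diff: the temporal mappers truncate an ISO timestamp to their grain, and the product mapper applies one mapper per ``|``-separated segment.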
