This is an automated email from the ASF dual-hosted git repository.
Lee-W pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9ed5a08fd1f AIP-76: Add example and docs for runtime asset partitioning (#67307)
9ed5a08fd1f is described below
commit 9ed5a08fd1f85900baedf0a1dee3663b3fe70370
Author: Anish Giri <[email protected]>
AuthorDate: Fri May 22 04:48:34 2026 -0500
AIP-76: Add example and docs for runtime asset partitioning (#67307)
---
.../docs/authoring-and-scheduling/assets.rst | 42 +++++++++++++++++
.../example_dags/example_asset_partition.py | 53 ++++++++++++++++++++++
2 files changed, 95 insertions(+)
diff --git a/airflow-core/docs/authoring-and-scheduling/assets.rst b/airflow-core/docs/authoring-and-scheduling/assets.rst
index a983544900c..9fbb232cbb3 100644
--- a/airflow-core/docs/authoring-and-scheduling/assets.rst
+++ b/airflow-core/docs/authoring-and-scheduling/assets.rst
@@ -596,5 +596,47 @@ including ``partition_key`` in the request body):
"partition_key": "us|2026-03-10T09:00:00"
}'
+Setting partition keys at runtime
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+When the partition key is not known ahead of time (for example, a watermark
+discovered from the source data, a late-arriving file, or a backfill request),
+let the producing task decide it while it runs. Schedule the producer with
+``PartitionAtRuntime()`` and record the key(s) on the emitted event with
+``outlet_events[self].add_partitions(...)``:
+
+.. code-block:: python
+
+ from airflow.sdk import PartitionAtRuntime, asset
+
+
+ @asset(
+ uri="file://incoming/player-stats/live-region.csv",
+ schedule=PartitionAtRuntime(),
+ )
+ def live_region_player_stats(self, outlet_events):
+ # The key is only known once the task runs.
+ outlet_events[self].add_partitions("us")
+
+Inside an ``@asset`` function, ``self`` (the emitted ``Asset``) and
+``outlet_events`` (the outlet event accessor) are reserved parameter names that
+Airflow populates at runtime. Pass a single key, or a list to fan out to several
+partitions in one run. Each key produces its own asset event, and duplicate
+keys collapse to a single event:
+
+.. code-block:: python
+
+ @asset(
+ uri="file://incoming/player-stats/multi-region.csv",
+ schedule=PartitionAtRuntime(),
+ )
+ def multi_region_player_stats(self, outlet_events):
+ outlet_events[self].add_partitions(["us", "eu", "apac"])
+
+When a runtime run emits exactly one partition key, the producing
+``dag_run.partition_key`` is back-filled to that key. Downstream Dags consume
+these events the same way as timetable-produced partitions, through
+``PartitionedAssetTimetable``.
+
For complete runnable examples, see
``airflow-core/src/airflow/example_dags/example_asset_partition.py``.
diff --git a/airflow-core/src/airflow/example_dags/example_asset_partition.py b/airflow-core/src/airflow/example_dags/example_asset_partition.py
index 75d582f6cad..f9c8258f951 100644
--- a/airflow-core/src/airflow/example_dags/example_asset_partition.py
+++ b/airflow-core/src/airflow/example_dags/example_asset_partition.py
@@ -25,6 +25,7 @@ from airflow.sdk import (
Asset,
CronPartitionTimetable,
IdentityMapper,
+ PartitionAtRuntime,
PartitionedAssetTimetable,
ProductMapper,
StartOfDayMapper,
@@ -225,3 +226,55 @@ def regional_stats_breakdown():
keys belong to a fixed set of allowed values (``us``, ``eu``, ``apac``)
rather than time-based partitions.
"""
pass
+
+
+@asset(
+ uri="file://incoming/player-stats/live-region.csv",
+ schedule=PartitionAtRuntime(),
+ tags=["player-stats", "runtime"],
+)
+def live_region_player_stats(self, outlet_events):
+ """
+ Produce a single region partition whose key is decided at runtime.
+
+ This asset demonstrates PartitionAtRuntime, which records the partition key on the
+ emitted event with ``add_partitions`` while the task runs rather than from a timetable.
+ """
+ outlet_events[self].add_partitions("us")
+
+
+with DAG(
+ dag_id="summarize_live_region_stats",
+ schedule=PartitionedAssetTimetable(assets=Asset.ref(name="live_region_player_stats")),
+ catchup=False,
+ tags=["player-stats", "runtime"],
+):
+ """
+ Summarize the live region statistics for each runtime-emitted partition.
+
+ Triggered once per partition key recorded upstream at runtime.
+ """
+
+ @task
+ def summarize_live_region(dag_run=None):
+ """Summarize stats for the matched runtime partition."""
+ if TYPE_CHECKING:
+ assert dag_run
+ print(dag_run.partition_key)
+
+ summarize_live_region()
+
+
+@asset(
+ uri="file://incoming/player-stats/multi-region.csv",
+ schedule=PartitionAtRuntime(),
+ tags=["player-stats", "runtime"],
+)
+def multi_region_player_stats(self, outlet_events):
+ """
+ Produce several region partitions from a single run.
+
+ This asset demonstrates runtime fan-out, where each key emits its own asset event
+ and duplicate keys collapse to a single event.
+ """
+ outlet_events[self].add_partitions(["us", "eu", "apac"])