This is an automated email from the ASF dual-hosted git repository.

Lee-W pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9ed5a08fd1f AIP-76: Add example and docs for runtime asset partitioning (#67307)
9ed5a08fd1f is described below

commit 9ed5a08fd1f85900baedf0a1dee3663b3fe70370
Author: Anish Giri <[email protected]>
AuthorDate: Fri May 22 04:48:34 2026 -0500

    AIP-76: Add example and docs for runtime asset partitioning (#67307)
---
 .../docs/authoring-and-scheduling/assets.rst       | 42 +++++++++++++++++
 .../example_dags/example_asset_partition.py        | 53 ++++++++++++++++++++++
 2 files changed, 95 insertions(+)

diff --git a/airflow-core/docs/authoring-and-scheduling/assets.rst b/airflow-core/docs/authoring-and-scheduling/assets.rst
index a983544900c..9fbb232cbb3 100644
--- a/airflow-core/docs/authoring-and-scheduling/assets.rst
+++ b/airflow-core/docs/authoring-and-scheduling/assets.rst
@@ -596,5 +596,47 @@ including ``partition_key`` in the request body):
         "partition_key": "us|2026-03-10T09:00:00"
       }'
 
+Setting partition keys at runtime
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+When the partition key is not known ahead of time (for example, a watermark
+discovered from the source data, a late-arriving file, or a backfill request),
+let the producing task decide it while it runs. Schedule the producer with
+``PartitionAtRuntime()`` and record the key(s) on the emitted event with
+``outlet_events[self].add_partitions(...)``:
+
+.. code-block:: python
+
+    from airflow.sdk import PartitionAtRuntime, asset
+
+
+    @asset(
+        uri="file://incoming/player-stats/live-region.csv",
+        schedule=PartitionAtRuntime(),
+    )
+    def live_region_player_stats(self, outlet_events):
+        # The key is only known once the task runs.
+        outlet_events[self].add_partitions("us")
+
+Inside an ``@asset`` function, ``self`` (the emitted ``Asset``) and
+``outlet_events`` (the outlet event accessor) are reserved parameter names that
+Airflow populates at runtime. Pass a single key, or a list to fan out to several
+partitions in one run. Each key produces its own asset event, and duplicate
+keys collapse to a single event:
+
+.. code-block:: python
+
+    @asset(
+        uri="file://incoming/player-stats/multi-region.csv",
+        schedule=PartitionAtRuntime(),
+    )
+    def multi_region_player_stats(self, outlet_events):
+        outlet_events[self].add_partitions(["us", "eu", "apac"])
+
+When a runtime run emits exactly one partition key, the producing
+``dag_run.partition_key`` is back-filled to that key. Downstream Dags consume
+these events the same way as timetable-produced partitions, through
+``PartitionedAssetTimetable``.
+
 For complete runnable examples, see
 ``airflow-core/src/airflow/example_dags/example_asset_partition.py``.
diff --git a/airflow-core/src/airflow/example_dags/example_asset_partition.py b/airflow-core/src/airflow/example_dags/example_asset_partition.py
index 75d582f6cad..f9c8258f951 100644
--- a/airflow-core/src/airflow/example_dags/example_asset_partition.py
+++ b/airflow-core/src/airflow/example_dags/example_asset_partition.py
@@ -25,6 +25,7 @@ from airflow.sdk import (
     Asset,
     CronPartitionTimetable,
     IdentityMapper,
+    PartitionAtRuntime,
     PartitionedAssetTimetable,
     ProductMapper,
     StartOfDayMapper,
@@ -225,3 +226,55 @@ def regional_stats_breakdown():
     keys belong to a fixed set of allowed values (``us``, ``eu``, ``apac``) rather than time-based partitions.
     """
     pass
+
+
+@asset(
+    uri="file://incoming/player-stats/live-region.csv",
+    schedule=PartitionAtRuntime(),
+    tags=["player-stats", "runtime"],
+)
+def live_region_player_stats(self, outlet_events):
+    """
+    Produce a single region partition whose key is decided at runtime.
+
+    This asset demonstrates PartitionAtRuntime, which records the partition key on the
+    emitted event with ``add_partitions`` while the task runs rather than from a timetable.
+    """
+    outlet_events[self].add_partitions("us")
+
+
+with DAG(
+    dag_id="summarize_live_region_stats",
+    schedule=PartitionedAssetTimetable(assets=Asset.ref(name="live_region_player_stats")),
+    catchup=False,
+    tags=["player-stats", "runtime"],
+):
+    """
+    Summarize the live region statistics for each runtime-emitted partition.
+
+    Triggered once per partition key recorded upstream at runtime.
+    """
+
+    @task
+    def summarize_live_region(dag_run=None):
+        """Summarize stats for the matched runtime partition."""
+        if TYPE_CHECKING:
+            assert dag_run
+        print(dag_run.partition_key)
+
+    summarize_live_region()
+
+
+@asset(
+    uri="file://incoming/player-stats/multi-region.csv",
+    schedule=PartitionAtRuntime(),
+    tags=["player-stats", "runtime"],
+)
+def multi_region_player_stats(self, outlet_events):
+    """
+    Produce several region partitions from a single run.
+
+    This asset demonstrates runtime fan-out, where each key emits its own asset event
+    and duplicate keys collapse to a single event.
+    """
+    outlet_events[self].add_partitions(["us", "eu", "apac"])

Reply via email to