This is an automated email from the ASF dual-hosted git repository.

Lee-W pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3d0fb9f8993 docs: add wait policy examples to asset partition example (#68658)
3d0fb9f8993 is described below

commit 3d0fb9f899309dd6291a541242cc3c21807a1d63
Author: Wei Lee <[email protected]>
AuthorDate: Wed Jun 17 20:07:23 2026 +0800

    docs: add wait policy examples to asset partition example (#68658)
---
 .../example_dags/example_asset_partition.py        | 84 ++++++++++++++++++++++
 1 file changed, 84 insertions(+)

diff --git a/airflow-core/src/airflow/example_dags/example_asset_partition.py b/airflow-core/src/airflow/example_dags/example_asset_partition.py
index 65a078d3c3f..1b3e2aa31b6 100644
--- a/airflow-core/src/airflow/example_dags/example_asset_partition.py
+++ b/airflow-core/src/airflow/example_dags/example_asset_partition.py
@@ -28,6 +28,7 @@ from airflow.sdk import (
     FanOutMapper,
     FixedKeyMapper,
     IdentityMapper,
+    MinimumCount,
     MonthWindow,
     PartitionAtRuntime,
     PartitionedAssetTimetable,
@@ -39,7 +40,9 @@ from airflow.sdk import (
     StartOfMonthMapper,
     StartOfWeekMapper,
     StartOfYearMapper,
+    WaitForAll,
     WeekWindow,
+    Window,
     asset,
     task,
 )
@@ -306,6 +309,10 @@ with DAG(
         default_partition_mapper=RollupMapper(
             upstream_mapper=StartOfDayMapper(),
             window=DayWindow(),
+            # Explicit default wait policy: hold the run until all 24 hourly
+            # partitions arrive. Identical to omitting wait_policy entirely; shown
+            # here as the counterpart to the early-firing MinimumCount example below.
+            wait_policy=WaitForAll(),
         ),
     ),
     catchup=False,
@@ -396,6 +403,13 @@ with DAG(
         default_partition_mapper=FanOutMapper(
             upstream_mapper=StartOfWeekMapper(),
             window=WeekWindow(),
+            # Safety cap on how many downstream keys one upstream event may create.
+            # WeekWindow has exactly seven members, so max_downstream_keys=7 sits at
+            # the boundary and never blocks. A smaller value would skip queuing the
+            # runs for that event and record a "partition fan-out exceeded" audit log
+            # entry instead. Omitting it falls back to the global
+            # ``[scheduler] partition_mapper_max_downstream_keys`` (default 1000).
+            max_downstream_keys=7,
         ),
     ),
     catchup=False,
@@ -413,6 +427,38 @@ with DAG(
     run_inference()
 
 
+# --- Fan-out over a trailing window (Window.Direction.BACKWARD) --------------
+# ``daily_inference`` above fans the weekly artifact FORWARD: the seven days
+# *starting* at the upstream key. The same artifact can drive a trailing window —
+# the seven days *ending* at the key — e.g. to score the week leading up to a
+# model release. Direction is the only difference between the two Dags.
+
+with DAG(
+    dag_id="trailing_week_inference",
+    schedule=PartitionedAssetTimetable(
+        assets=weekly_model_artifact,
+        default_partition_mapper=FanOutMapper(
+            upstream_mapper=StartOfWeekMapper(),
+            # BACKWARD yields the trailing period ending at the upstream key — the
+            # mirror of the default FORWARD that daily_inference uses.
+            window=WeekWindow(direction=Window.Direction.BACKWARD),
+        ),
+    ),
+    catchup=False,
+    tags=["example", "model", "inference"],
+):
+    """Run inference over the trailing week: the seven days ending at the weekly key."""
+
+    @task
+    def run_trailing_inference(dag_run=None):
+        """Run inference for one daily partition in the trailing week."""
+        if TYPE_CHECKING:
+            assert dag_run
+        print(dag_run.partition_key)
+
+    run_trailing_inference()
+
+
 # --- Segment (categorical) rollup -------------------------------------------
 # ``multi_region_player_stats`` (defined above) emits one partition per region
 # (``us``, ``eu``, ``apac``) from a single run.  The Dag below holds a downstream
@@ -449,3 +495,41 @@ with DAG(
         print(f"All region partitions received. Partition: {dag_run.partition_key}")
 
     aggregate_all_regions()
+
+
+# --- Segment rollup with an early-fire wait policy (MinimumCount) ------------
+# ``segment_region_stats_rollup`` above waits for all three regions (WaitForAll).
+# This sibling fires as soon as any two of the three have arrived, tolerating one
+# slow or missing region rather than holding the downstream run indefinitely.
+
+with DAG(
+    dag_id="segment_region_stats_early_rollup",
+    schedule=PartitionedAssetTimetable(
+        assets=Asset.ref(name="multi_region_player_stats"),
+        default_partition_mapper=RollupMapper(
+            upstream_mapper=FixedKeyMapper("all_regions"),
+            window=SegmentWindow(["us", "eu", "apac"]),
+            # Fire once at least two of the three declared regions have arrived.
+            # MinimumCount(-1) ("at most one missing") is equivalent for this window.
+            wait_policy=MinimumCount(2),
+        ),
+    ),
+    catchup=False,
+    tags=["example", "player-stats", "rollup", "segment"],
+):
+    """
+    Categorical rollup that fires early.
+
+    Produces the cross-region summary once two of the three regions have arrived
+    instead of waiting for all of them — the early-firing counterpart to
+    ``segment_region_stats_rollup``.
+    """
+
+    @task
+    def aggregate_available_regions(dag_run=None):
+        """Produce the cross-region summary once the minimum region count is met."""
+        if TYPE_CHECKING:
+            assert dag_run
+        print(f"Minimum region partitions received. Partition: {dag_run.partition_key}")
+
+    aggregate_available_regions()

Reply via email to