This is an automated email from the ASF dual-hosted git repository.
Lee-W pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3d0fb9f8993 docs: add wait policy examples to asset partition example (#68658)
3d0fb9f8993 is described below
commit 3d0fb9f899309dd6291a541242cc3c21807a1d63
Author: Wei Lee <[email protected]>
AuthorDate: Wed Jun 17 20:07:23 2026 +0800
docs: add wait policy examples to asset partition example (#68658)
---
.../example_dags/example_asset_partition.py | 84 ++++++++++++++++++++++
1 file changed, 84 insertions(+)
diff --git a/airflow-core/src/airflow/example_dags/example_asset_partition.py b/airflow-core/src/airflow/example_dags/example_asset_partition.py
index 65a078d3c3f..1b3e2aa31b6 100644
--- a/airflow-core/src/airflow/example_dags/example_asset_partition.py
+++ b/airflow-core/src/airflow/example_dags/example_asset_partition.py
@@ -28,6 +28,7 @@ from airflow.sdk import (
FanOutMapper,
FixedKeyMapper,
IdentityMapper,
+ MinimumCount,
MonthWindow,
PartitionAtRuntime,
PartitionedAssetTimetable,
@@ -39,7 +40,9 @@ from airflow.sdk import (
StartOfMonthMapper,
StartOfWeekMapper,
StartOfYearMapper,
+ WaitForAll,
WeekWindow,
+ Window,
asset,
task,
)
@@ -306,6 +309,10 @@ with DAG(
default_partition_mapper=RollupMapper(
upstream_mapper=StartOfDayMapper(),
window=DayWindow(),
+ # Explicit default wait policy: hold the run until all 24 hourly
+ # partitions arrive. Identical to omitting wait_policy entirely; shown
+ # here as the counterpart to the early-firing MinimumCount example below.
+ wait_policy=WaitForAll(),
),
),
catchup=False,
@@ -396,6 +403,13 @@ with DAG(
default_partition_mapper=FanOutMapper(
upstream_mapper=StartOfWeekMapper(),
window=WeekWindow(),
+ # Safety cap on how many downstream keys one upstream event may create.
+ # WeekWindow has exactly seven members, so max_downstream_keys=7 sits at
+ # the boundary and never blocks. A smaller value would skip queuing the
+ # runs for that event and record a "partition fan-out exceeded" audit log
+ # entry instead. Omitting it falls back to the global
+ # ``[scheduler] partition_mapper_max_downstream_keys`` (default 1000).
+ max_downstream_keys=7,
),
),
catchup=False,
@@ -413,6 +427,38 @@ with DAG(
run_inference()
+# --- Fan-out over a trailing window (Window.Direction.BACKWARD) --------------
+# ``daily_inference`` above fans the weekly artifact FORWARD: the seven days
+# *starting* at the upstream key. The same artifact can drive a trailing window —
+# the seven days *ending* at the key — e.g. to score the week leading up to a
+# model release. Direction is the only difference between the two Dags.
+
+with DAG(
+ dag_id="trailing_week_inference",
+ schedule=PartitionedAssetTimetable(
+ assets=weekly_model_artifact,
+ default_partition_mapper=FanOutMapper(
+ upstream_mapper=StartOfWeekMapper(),
+ # BACKWARD yields the trailing period ending at the upstream key — the
+ # mirror of the default FORWARD that daily_inference uses.
+ window=WeekWindow(direction=Window.Direction.BACKWARD),
+ ),
+ ),
+ catchup=False,
+ tags=["example", "model", "inference"],
+):
+ """Run inference over the trailing week: the seven days ending at the weekly key."""
+
+ @task
+ def run_trailing_inference(dag_run=None):
+ """Run inference for one daily partition in the trailing week."""
+ if TYPE_CHECKING:
+ assert dag_run
+ print(dag_run.partition_key)
+
+ run_trailing_inference()
+
+
# --- Segment (categorical) rollup -------------------------------------------
# ``multi_region_player_stats`` (defined above) emits one partition per region
# (``us``, ``eu``, ``apac``) from a single run. The Dag below holds a downstream
@@ -449,3 +495,41 @@ with DAG(
print(f"All region partitions received. Partition: {dag_run.partition_key}")
aggregate_all_regions()
+
+
+# --- Segment rollup with an early-fire wait policy (MinimumCount) ------------
+# ``segment_region_stats_rollup`` above waits for all three regions (WaitForAll).
+# This sibling fires as soon as any two of the three have arrived, tolerating one
+# slow or missing region rather than holding the downstream run indefinitely.
+
+with DAG(
+ dag_id="segment_region_stats_early_rollup",
+ schedule=PartitionedAssetTimetable(
+ assets=Asset.ref(name="multi_region_player_stats"),
+ default_partition_mapper=RollupMapper(
+ upstream_mapper=FixedKeyMapper("all_regions"),
+ window=SegmentWindow(["us", "eu", "apac"]),
+ # Fire once at least two of the three declared regions have arrived.
+ # MinimumCount(-1) ("at most one missing") is equivalent for this window.
+ wait_policy=MinimumCount(2),
+ ),
+ ),
+ catchup=False,
+ tags=["example", "player-stats", "rollup", "segment"],
+):
+ """
+ Categorical rollup that fires early.
+
+ Produces the cross-region summary once two of the three regions have arrived
+ instead of waiting for all of them — the early-firing counterpart to
+ ``segment_region_stats_rollup``.
+ """
+
+ @task
+ def aggregate_available_regions(dag_run=None):
+ """Produce the cross-region summary once the minimum region count is met."""
+ if TYPE_CHECKING:
+ assert dag_run
+ print(f"Minimum region partitions received. Partition: {dag_run.partition_key}")
+
+ aggregate_available_regions()