This is an automated email from the ASF dual-hosted git repository.
Lee-W pushed a commit to branch v3-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-3-test by this push:
new 75d69a2ac77 [v3-3-test] Add wait policy examples to asset partition
example (#68658) (#68691)
75d69a2ac77 is described below
commit 75d69a2ac773267e73c725f2eb74e395f49e50c3
Author: Wei Lee <[email protected]>
AuthorDate: Thu Jun 18 12:37:04 2026 +0800
[v3-3-test] Add wait policy examples to asset partition example (#68658)
(#68691)
---
.../example_dags/example_asset_partition.py | 84 ++++++++++++++++++++++
1 file changed, 84 insertions(+)
diff --git a/airflow-core/src/airflow/example_dags/example_asset_partition.py
b/airflow-core/src/airflow/example_dags/example_asset_partition.py
index 65a078d3c3f..1b3e2aa31b6 100644
--- a/airflow-core/src/airflow/example_dags/example_asset_partition.py
+++ b/airflow-core/src/airflow/example_dags/example_asset_partition.py
@@ -28,6 +28,7 @@ from airflow.sdk import (
FanOutMapper,
FixedKeyMapper,
IdentityMapper,
+ MinimumCount,
MonthWindow,
PartitionAtRuntime,
PartitionedAssetTimetable,
@@ -39,7 +40,9 @@ from airflow.sdk import (
StartOfMonthMapper,
StartOfWeekMapper,
StartOfYearMapper,
+ WaitForAll,
WeekWindow,
+ Window,
asset,
task,
)
@@ -306,6 +309,10 @@ with DAG(
default_partition_mapper=RollupMapper(
upstream_mapper=StartOfDayMapper(),
window=DayWindow(),
+ # Explicit default wait policy: hold the run until all 24 hourly
+ # partitions arrive. Identical to omitting wait_policy entirely; shown
+ # here as the counterpart to the early-firing MinimumCount example below.
+ wait_policy=WaitForAll(),
),
),
catchup=False,
@@ -396,6 +403,13 @@ with DAG(
default_partition_mapper=FanOutMapper(
upstream_mapper=StartOfWeekMapper(),
window=WeekWindow(),
+ # Safety cap on how many downstream keys one upstream event may create.
+ # WeekWindow has exactly seven members, so max_downstream_keys=7 sits at
+ # the boundary and never blocks. A smaller value would skip queuing the
+ # runs for that event and record a "partition fan-out exceeded" audit log
+ # entry instead. Omitting it falls back to the global
+ # ``[scheduler] partition_mapper_max_downstream_keys`` (default 1000).
+ max_downstream_keys=7,
),
),
catchup=False,
@@ -413,6 +427,38 @@ with DAG(
run_inference()
+# --- Fan-out over a trailing window (Window.Direction.BACKWARD) --------------
+# ``daily_inference`` above fans the weekly artifact FORWARD: the seven days
+# *starting* at the upstream key. The same artifact can drive a trailing window —
+# the seven days *ending* at the key — e.g. to score the week leading up to a
+# model release. Direction is the only difference between the two Dags.
+
+with DAG(
+    dag_id="trailing_week_inference",
+    schedule=PartitionedAssetTimetable(
+        assets=weekly_model_artifact,
+        default_partition_mapper=FanOutMapper(
+            upstream_mapper=StartOfWeekMapper(),
+            # BACKWARD yields the trailing period ending at the upstream key — the
+            # mirror of the default FORWARD that daily_inference uses.
+            window=WeekWindow(direction=Window.Direction.BACKWARD),
+        ),
+    ),
+    catchup=False,
+    tags=["example", "model", "inference"],
+):
+    """Run inference over the trailing week: the seven days ending at the weekly key."""
+
+    @task
+    def run_trailing_inference(dag_run=None):
+        """Run inference for one daily partition in the trailing week."""
+        if TYPE_CHECKING:
+            assert dag_run
+        print(dag_run.partition_key)
+
+    run_trailing_inference()
+
+
# --- Segment (categorical) rollup -------------------------------------------
# ``multi_region_player_stats`` (defined above) emits one partition per region
# (``us``, ``eu``, ``apac``) from a single run. The Dag below holds a downstream
@@ -449,3 +495,41 @@ with DAG(
print(f"All region partitions received. Partition:
{dag_run.partition_key}")
aggregate_all_regions()
+
+
+# --- Segment rollup with an early-fire wait policy (MinimumCount) ------------
+# ``segment_region_stats_rollup`` above waits for all three regions (WaitForAll).
+# This sibling fires as soon as any two of the three have arrived, tolerating one
+# slow or missing region rather than holding the downstream run indefinitely.
+
+with DAG(
+    dag_id="segment_region_stats_early_rollup",
+    schedule=PartitionedAssetTimetable(
+        assets=Asset.ref(name="multi_region_player_stats"),
+        default_partition_mapper=RollupMapper(
+            upstream_mapper=FixedKeyMapper("all_regions"),
+            window=SegmentWindow(["us", "eu", "apac"]),
+            # Fire once at least two of the three declared regions have arrived.
+            # MinimumCount(-1) ("at most one missing") is equivalent for this window.
+            wait_policy=MinimumCount(2),
+        ),
+    ),
+    catchup=False,
+    tags=["example", "player-stats", "rollup", "segment"],
+):
+    """
+    Categorical rollup that fires early.
+
+    Produces the cross-region summary once two of the three regions have arrived
+    instead of waiting for all of them — the early-firing counterpart to
+    ``segment_region_stats_rollup``.
+    """
+
+    @task
+    def aggregate_available_regions(dag_run=None):
+        """Produce the cross-region summary once the minimum region count is met."""
+        if TYPE_CHECKING:
+            assert dag_run
+        print(f"Minimum region partitions received. Partition: {dag_run.partition_key}")
+
+    aggregate_available_regions()