This is an automated email from the ASF dual-hosted git repository.
Lee-W pushed a commit to branch v3-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-3-test by this push:
new 31bb38d8747 [v3-3-test] Add segment fan-out example to asset partition
example Dag (#68722) (#68737)
31bb38d8747 is described below
commit 31bb38d87477e2cc7043d997d1153a38c277af0b
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Jun 19 18:07:08 2026 +0800
[v3-3-test] Add segment fan-out example to asset partition example Dag
(#68722) (#68737)
Co-authored-by: Wei Lee <[email protected]>
---
.../example_dags/example_asset_partition.py | 44 ++++++++++++++++++++--
1 file changed, 40 insertions(+), 4 deletions(-)
diff --git a/airflow-core/src/airflow/example_dags/example_asset_partition.py
b/airflow-core/src/airflow/example_dags/example_asset_partition.py
index 1b3e2aa31b6..baba6ced1fc 100644
--- a/airflow-core/src/airflow/example_dags/example_asset_partition.py
+++ b/airflow-core/src/airflow/example_dags/example_asset_partition.py
@@ -294,7 +294,7 @@ def multi_region_player_stats(self, outlet_events):
daily_sales = Asset(uri="s3://sales/daily", name="daily_sales")
daily_costs = Asset(uri="s3://costs/daily", name="daily_costs")
-# --- Chained rollup: hourly → daily → monthly --------------------------------
+# --- Chained rollup: hourly -> daily -> monthly
--------------------------------
# The hourly source asset already exists above (``team_a_player_stats``).
# Each rollup Dag publishes its own asset so the next level can consume it.
@@ -319,7 +319,7 @@ with DAG(
tags=["example", "player-stats", "rollup"],
):
"""
- First rollup level: 24 hourly partitions of ``team_a_player_stats`` → one
daily summary.
+ First rollup level: 24 hourly partitions of ``team_a_player_stats`` -> one
daily summary.
``StartOfDayMapper`` normalizes each upstream hourly timestamp
(``%Y-%m-%dT%H:%M:%S``)
to its day-start (``%Y-%m-%d``); ``DayWindow`` declares the downstream run
needs
@@ -352,7 +352,7 @@ with DAG(
tags=["example", "player-stats", "rollup"],
):
"""
- Chained rollup: every day of ``daily_team_a`` (itself a rollup) → one
monthly summary.
+ Chained rollup: every day of ``daily_team_a`` (itself a rollup) -> one
monthly summary.
Demonstrates how a rollup output can feed another rollup.
``StartOfMonthMapper``
is configured with ``input_format="%Y-%m-%d"`` so it can parse the day keys
@@ -371,7 +371,7 @@ with DAG(
summarise_team_a_month()
-# --- Fan-out: one weekly upstream → seven daily downstream Dag runs ----------
+# --- Fan-out: one weekly upstream -> seven daily downstream Dag runs
----------
weekly_model_artifact = Asset(uri="file://artifacts/models/weekly.bin",
name="weekly_model_artifact")
@@ -533,3 +533,39 @@ with DAG(
print(f"Minimum region partitions received. Partition:
{dag_run.partition_key}")
aggregate_available_regions()
+
+
+# --- Segment fan-out: one upstream event scatters across the segment set -----
+# The 1 -> N mirror of ``segment_region_stats_rollup``: a single upstream event
+# fans OUT to one downstream run per declared region. ``SegmentWindow`` has no
+# default-table entry, so an explicit ``downstream_mapper`` is required.
+
+with DAG(
+ dag_id="scatter_live_region_to_segments",
+ schedule=PartitionedAssetTimetable(
+ assets=Asset.ref(name="live_region_player_stats"),
+ default_partition_mapper=FanOutMapper(
+ upstream_mapper=IdentityMapper(),
+ window=SegmentWindow(["us", "eu", "apac"]),
+ downstream_mapper=IdentityMapper(), # required: SegmentWindow has
no default-table entry
+ ),
+ ),
+ catchup=False,
+ tags=["example", "player-stats", "fan-out", "segment"],
+):
+ """
+ Categorical fan-out: scatter one upstream event across a fixed segment set.
+
+ One ``live_region_player_stats`` event fans out to one downstream run per
+ declared region (``us``, ``eu``, ``apac``) — the 1->N counterpart to the
+ segment rollup above.
+ """
+
+ @task
+ def process_region_segment(dag_run=None):
+ """Process one region segment produced by the fan-out."""
+ if TYPE_CHECKING:
+ assert dag_run
+ print(dag_run.partition_key)
+
+ process_region_segment()