This is an automated email from the ASF dual-hosted git repository.

Lee-W pushed a commit to branch v3-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-3-test by this push:
     new 31bb38d8747 [v3-3-test] Add segment fan-out example to asset partition 
example Dag (#68722) (#68737)
31bb38d8747 is described below

commit 31bb38d87477e2cc7043d997d1153a38c277af0b
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Jun 19 18:07:08 2026 +0800

    [v3-3-test] Add segment fan-out example to asset partition example Dag 
(#68722) (#68737)
    
    Co-authored-by: Wei Lee <[email protected]>
---
 .../example_dags/example_asset_partition.py        | 44 ++++++++++++++++++++--
 1 file changed, 40 insertions(+), 4 deletions(-)

diff --git a/airflow-core/src/airflow/example_dags/example_asset_partition.py 
b/airflow-core/src/airflow/example_dags/example_asset_partition.py
index 1b3e2aa31b6..baba6ced1fc 100644
--- a/airflow-core/src/airflow/example_dags/example_asset_partition.py
+++ b/airflow-core/src/airflow/example_dags/example_asset_partition.py
@@ -294,7 +294,7 @@ def multi_region_player_stats(self, outlet_events):
 
 daily_sales = Asset(uri="s3://sales/daily", name="daily_sales")
 daily_costs = Asset(uri="s3://costs/daily", name="daily_costs")
-# --- Chained rollup: hourly → daily → monthly --------------------------------
+# --- Chained rollup: hourly -> daily -> monthly 
--------------------------------
 # The hourly source asset already exists above (``team_a_player_stats``).
 # Each rollup Dag publishes its own asset so the next level can consume it.
 
@@ -319,7 +319,7 @@ with DAG(
     tags=["example", "player-stats", "rollup"],
 ):
     """
-    First rollup level: 24 hourly partitions of ``team_a_player_stats`` → one 
daily summary.
+    First rollup level: 24 hourly partitions of ``team_a_player_stats`` -> one 
daily summary.
 
     ``StartOfDayMapper`` normalizes each upstream hourly timestamp 
(``%Y-%m-%dT%H:%M:%S``)
     to its day-start (``%Y-%m-%d``); ``DayWindow`` declares the downstream run 
needs
@@ -352,7 +352,7 @@ with DAG(
     tags=["example", "player-stats", "rollup"],
 ):
     """
-    Chained rollup: every day of ``daily_team_a`` (itself a rollup) → one 
monthly summary.
+    Chained rollup: every day of ``daily_team_a`` (itself a rollup) -> one 
monthly summary.
 
     Demonstrates how a rollup output can feed another rollup. 
``StartOfMonthMapper``
     is configured with ``input_format="%Y-%m-%d"`` so it can parse the day keys
@@ -371,7 +371,7 @@ with DAG(
     summarise_team_a_month()
 
 
-# --- Fan-out: one weekly upstream → seven daily downstream Dag runs ----------
+# --- Fan-out: one weekly upstream -> seven daily downstream Dag runs 
----------
 
 weekly_model_artifact = Asset(uri="file://artifacts/models/weekly.bin", 
name="weekly_model_artifact")
 
@@ -533,3 +533,39 @@ with DAG(
         print(f"Minimum region partitions received. Partition: 
{dag_run.partition_key}")
 
     aggregate_available_regions()
+
+
+# --- Segment fan-out: one upstream event scatters across the segment set -----
+# The 1 -> N mirror of ``segment_region_stats_rollup``: a single upstream event
+# fans OUT to one downstream run per declared region. ``SegmentWindow`` has no
+# default-table entry, so an explicit ``downstream_mapper`` is required.
+
+with DAG(
+    dag_id="scatter_live_region_to_segments",
+    schedule=PartitionedAssetTimetable(
+        assets=Asset.ref(name="live_region_player_stats"),
+        default_partition_mapper=FanOutMapper(
+            upstream_mapper=IdentityMapper(),
+            window=SegmentWindow(["us", "eu", "apac"]),
+            downstream_mapper=IdentityMapper(),  # required: SegmentWindow has 
no default-table entry
+        ),
+    ),
+    catchup=False,
+    tags=["example", "player-stats", "fan-out", "segment"],
+):
+    """
+    Categorical fan-out: scatter one upstream event across a fixed segment set.
+
+    One ``live_region_player_stats`` event fans out to one downstream run per
+    declared region (``us``, ``eu``, ``apac``) — the 1->N counterpart to the
+    segment rollup above.
+    """
+
+    @task
+    def process_region_segment(dag_run=None):
+        """Process one region segment produced by the fan-out."""
+        if TYPE_CHECKING:
+            assert dag_run
+        print(dag_run.partition_key)
+
+    process_region_segment()

Reply via email to