This is an automated email from the ASF dual-hosted git repository.

Lee-W pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 953aa794504 Add FanOutMapper for one-to-many partition fan-out (#66030)
953aa794504 is described below

commit 953aa7945040c64856b755f1ee89538d05d856de
Author: Wei Lee <[email protected]>
AuthorDate: Mon Jun 8 19:11:17 2026 +0800

    Add FanOutMapper for one-to-many partition fan-out (#66030)
---
 .pre-commit-config.yaml                            |  10 +
 airflow-core/newsfragments/66030.feature.rst       |   1 +
 airflow-core/src/airflow/assets/manager.py         |  33 ++-
 .../src/airflow/config_templates/config.yml        |  13 ++
 .../example_dags/example_asset_partition.py        |  52 +++++
 .../src/airflow/partition_mappers/temporal.py      | 134 +++++++++++-
 airflow-core/src/airflow/serialization/encoders.py |  10 +
 airflow-core/tests/unit/assets/test_manager.py     |  88 +++++++-
 .../tests/unit/models/test_taskinstance.py         |   2 +-
 .../tests/unit/partition_mappers/test_fan_out.py   | 235 +++++++++++++++++++++
 .../unit/serialization/test_serialized_objects.py  |  68 ++++++
 .../check_partition_mapper_defaults_in_sync.py     | 143 +++++++++++++
 task-sdk/docs/api.rst                              |   2 +
 task-sdk/src/airflow/sdk/__init__.py               |   3 +
 task-sdk/src/airflow/sdk/__init__.pyi              |   2 +
 .../sdk/definitions/partition_mappers/temporal.py  |  77 +++++++
 16 files changed, 865 insertions(+), 8 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 4fab6689d16..e3c1b6faf47 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -248,6 +248,16 @@ repos:
         files: ^\.github/workflows/ci-(arm|amd)\.yml$
         pass_filenames: false
         require_serial: true
+      - id: check-partition-mapper-defaults-in-sync
+        name: Check FanOutMapper default mapper table stays in sync (core/SDK)
+        entry: ./scripts/ci/prek/check_partition_mapper_defaults_in_sync.py
+        language: python
+        files: >
+          (?x)
+          ^airflow-core/src/airflow/partition_mappers/temporal\.py$|
+          ^task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal\.py$
+        pass_filenames: false
+        require_serial: true
       - id: sync-uv-min-version-markers
         name: Sync `# sync-uv-min-version` markers with [tool.uv] 
required-version
         entry: ./scripts/ci/prek/sync_uv_min_version_markers.py
diff --git a/airflow-core/newsfragments/66030.feature.rst 
b/airflow-core/newsfragments/66030.feature.rst
new file mode 100644
index 00000000000..c6382dc0979
--- /dev/null
+++ b/airflow-core/newsfragments/66030.feature.rst
@@ -0,0 +1 @@
+Add ``FanOutMapper`` for one-to-many partition mapping (e.g. one weekly 
upstream key to seven daily downstream Dag runs). It composes ``upstream_mapper 
+ window + downstream_mapper``, mirroring the shape of ``RollupMapper`` and 
reusing the existing ``Window`` classes (``DayWindow``, ``WeekWindow``, 
``MonthWindow``, ``QuarterWindow``, ``YearWindow``). A new ``[scheduler] 
partition_mapper_max_downstream_keys`` config caps the number of downstream 
keys produced per upstream event by any ``P [...]
diff --git a/airflow-core/src/airflow/assets/manager.py 
b/airflow-core/src/airflow/assets/manager.py
index 1d2fe479ea8..b11698c49c0 100644
--- a/airflow-core/src/airflow/assets/manager.py
+++ b/airflow-core/src/airflow/assets/manager.py
@@ -552,6 +552,8 @@ class AssetManager(LoggingMixin):
             )
             return
 
+        max_downstream_keys = conf.getint("scheduler", 
"partition_mapper_max_downstream_keys")
+
         for target_dag in partition_dags:
             if TYPE_CHECKING:
                 assert partition_key is not None
@@ -600,15 +602,36 @@ class AssetManager(LoggingMixin):
                 continue
 
             if is_container(target_key):
-                # TODO (AIP-76): This never happens now. When we implement
-                # one-to-many partition key mapping, this should also add a
-                # config to cap the iterable size so the scheduler does not
-                # blow up with an incorrectly implemented PartitionMapper.
-                target_keys: Iterable[str] = target_key
+                target_keys: list[str] = list(target_key)
             else:
                 target_keys = [target_key]
             del target_key
 
+            if len(target_keys) > max_downstream_keys:
+                log.error(
+                    "Partition mapper produced more downstream keys than 
allowed; skipping queue.",
+                    asset_id=asset_id,
+                    source_partition_key=partition_key,
+                    target_dag=target_dag.dag_id,
+                    produced_keys=len(target_keys),
+                    max_downstream_keys=max_downstream_keys,
+                )
+                session.add(
+                    Log(
+                        event="partition fan-out exceeded",
+                        extra=(
+                            f"Partition mapper for asset 
(name='{asset_model.name}', "
+                            f"uri='{asset_model.uri}') in target Dag 
'{target_dag.dag_id}' "
+                            f"produced {len(target_keys)} downstream keys from 
"
+                            f"partition_key='{partition_key}', exceeding "
+                            f"[scheduler] 
partition_mapper_max_downstream_keys={max_downstream_keys}. "
+                            f"No Dag runs were queued for this event."
+                        ),
+                        task_instance=task_instance,
+                    )
+                )
+                continue
+
             for target_key in target_keys:
                 apdr = cls._get_or_create_apdr(
                     target_key=target_key,
diff --git a/airflow-core/src/airflow/config_templates/config.yml 
b/airflow-core/src/airflow/config_templates/config.yml
index 03fd2794b5a..bba829b3d08 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -2688,6 +2688,19 @@ scheduler:
       type: integer
       default: "20"
       see_also: ":ref:`scheduler:ha:tunables`"
+    partition_mapper_max_downstream_keys:
+      description: |
+        Maximum number of downstream partition keys a single 
``PartitionMapper``
+        invocation may produce. When any partition mapper (built-in or custom)
+        expands one upstream key into more keys than this limit, the scheduler
+        skips queuing the runs for that asset event and logs an error against
+        the source task instance. This guards against a misconfigured
+        ``PartitionMapper`` from queuing an unbounded number of Dag runs per
+        upstream event.
+      version_added: 3.3.0
+      type: integer
+      example: ~
+      default: "1000"
     use_job_schedule:
       description: |
         Turn off scheduler use of cron intervals by setting this to ``False``.
diff --git a/airflow-core/src/airflow/example_dags/example_asset_partition.py 
b/airflow-core/src/airflow/example_dags/example_asset_partition.py
index 3995702df6b..775fea80ec3 100644
--- a/airflow-core/src/airflow/example_dags/example_asset_partition.py
+++ b/airflow-core/src/airflow/example_dags/example_asset_partition.py
@@ -25,6 +25,7 @@ from airflow.sdk import (
     Asset,
     CronPartitionTimetable,
     DayWindow,
+    FanOutMapper,
     IdentityMapper,
     MonthWindow,
     PartitionAtRuntime,
@@ -34,7 +35,9 @@ from airflow.sdk import (
     StartOfDayMapper,
     StartOfHourMapper,
     StartOfMonthMapper,
+    StartOfWeekMapper,
     StartOfYearMapper,
+    WeekWindow,
     asset,
     task,
 )
@@ -357,3 +360,52 @@ with DAG(
         print(f"All daily partitions received. Month: {dag_run.partition_key}")
 
     summarise_team_a_month()
+
+
+# --- Fan-out: one weekly upstream → seven daily downstream Dag runs ----------
+
+weekly_model_artifact = Asset(uri="file://artifacts/models/weekly.bin", 
name="weekly_model_artifact")
+
+
+with DAG(
+    dag_id="train_weekly_model",
+    schedule=CronPartitionTimetable("0 0 * * 1", timezone="UTC"),
+    catchup=False,
+    tags=["example", "model", "training"],
+):
+    """Train a weekly model artifact every Monday at 00:00 UTC."""
+
+    @task(outlets=[weekly_model_artifact])
+    def train_model():
+        """Materialize the model artifact for the current weekly partition."""
+        pass
+
+    train_model()
+
+
+with DAG(
+    dag_id="daily_inference",
+    schedule=PartitionedAssetTimetable(
+        assets=weekly_model_artifact,
+        # FanOutMapper composes upstream_mapper + window + (optional) 
downstream_mapper.
+        # WeekWindow.to_upstream() yields seven daily datetimes inside one 
week,
+        # and the default downstream_mapper for WeekWindow is 
StartOfDayMapper, so
+        # a weekly upstream key fans out to seven ``%Y-%m-%d`` downstream keys.
+        default_partition_mapper=FanOutMapper(
+            upstream_mapper=StartOfWeekMapper(),
+            window=WeekWindow(),
+        ),
+    ),
+    catchup=False,
+    tags=["example", "model", "inference"],
+):
+    """Run daily inference, fanning the weekly model artifact out to one Dag 
run per day."""
+
+    @task
+    def run_inference(dag_run=None):
+        """Run inference for one daily partition derived from the weekly 
model."""
+        if TYPE_CHECKING:
+            assert dag_run
+        print(dag_run.partition_key)
+
+    run_inference()
diff --git a/airflow-core/src/airflow/partition_mappers/temporal.py 
b/airflow-core/src/airflow/partition_mappers/temporal.py
index b45ca9da3a4..18656761122 100644
--- a/airflow-core/src/airflow/partition_mappers/temporal.py
+++ b/airflow-core/src/airflow/partition_mappers/temporal.py
@@ -18,8 +18,9 @@ from __future__ import annotations
 
 import re
 from abc import abstractmethod
+from collections.abc import Iterable
 from datetime import datetime, timedelta
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING, Any, ClassVar
 
 from airflow._shared.timezones.timezone import make_aware, parse_timezone
 from airflow.partition_mappers.base import PartitionMapper
@@ -27,6 +28,8 @@ from airflow.partition_mappers.base import PartitionMapper
 if TYPE_CHECKING:
     from pendulum import FixedTimezone, Timezone
 
+    from airflow.partition_mappers.window import Window
+
 
 _STRPTIME_PATTERNS: dict[str, str] = {
     "%Y": r"\d{4}",
@@ -419,3 +422,132 @@ class StartOfYearMapper(_BaseTemporalMapper):
             second=0,
             microsecond=0,
         )
+
+
+class FanOutMapper(PartitionMapper):
+    """
+    Partition mapper that fans one upstream key out into multiple downstream 
keys.
+
+    Compose an ``upstream_mapper`` (parses the coarse upstream key and
+    normalizes it to its period start) with a ``window`` (enumerates the
+    members of that period). ``downstream_mapper`` formats each member into a
+    downstream key string; if omitted, a default is chosen from the window
+    class.
+
+    ``downstream_mapper`` must be passed explicitly for any window without an
+    entry in the default table — currently ``HourWindow`` and any custom
+    ``Window`` subclass. Constructing a ``FanOutMapper`` for those windows
+    without a ``downstream_mapper`` raises ``ValueError`` at Dag-parse time.
+
+    Symmetric to :class:`~airflow.partition_mappers.base.RollupMapper`: rollup
+    is N→1 (downstream waits until all members arrive), fan-out is 1→N (one
+    upstream event creates one downstream Dag run per member).
+
+    .. code-block:: python
+
+        # Weekly upstream → 7 daily downstream Dag runs
+        FanOutMapper(upstream_mapper=StartOfWeekMapper(), window=WeekWindow())
+    """
+
+    # Keep ``FanOutMapper.default_downstream_mapper_by_window_name`` in sync 
with
+    # the SDK copy in
+    # ``task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py`` —
+    # the SDK and core class hierarchies are independent (the SDK cannot import
+    # core), so both sides carry the same defaults and the lookup is by class
+    # name. When adding a new ``Window`` subclass, extend the table on both
+    # sides; a missing entry raises ``ValueError`` at ``FanOutMapper.__init__``
+    # (see ``FanOutMapper._resolve_default_downstream_mapper``). The
+    # ``check-partition-mapper-defaults-in-sync`` prek hook enforces that the
+    # two tables stay identical.
+    default_downstream_mapper_by_window_name: ClassVar[dict[str, 
type[_BaseTemporalMapper]]] = {
+        "DayWindow": StartOfHourMapper,
+        "WeekWindow": StartOfDayMapper,
+        "MonthWindow": StartOfDayMapper,
+        "QuarterWindow": StartOfMonthMapper,
+        "YearWindow": StartOfMonthMapper,
+    }
+
+    @classmethod
+    def _resolve_default_downstream_mapper(cls, window: Window) -> 
PartitionMapper:
+        """
+        Return the conventional downstream mapper for *window*.
+
+        Looked up by the window's class **name** rather than identity so that
+        the SDK ``Window`` classes (used in Dag-author code) and the core
+        ``Window`` classes (used after deserialization) both resolve to the
+        same default. Subclasses can extend or override the defaults by
+        setting :attr:`default_downstream_mapper_by_window_name` on the
+        subclass.
+        """
+        mapper_cls = 
cls.default_downstream_mapper_by_window_name.get(type(window).__name__)
+        if mapper_cls is None:
+            raise ValueError(
+                f"{cls.__name__} has no default downstream_mapper for window 
type "
+                f"{type(window).__name__}; pass downstream_mapper explicitly."
+            )
+        return mapper_cls()
+
+    def __init__(
+        self,
+        *,
+        upstream_mapper: PartitionMapper,
+        window: Window,
+        downstream_mapper: PartitionMapper | None = None,
+    ) -> None:
+        self.upstream_mapper = upstream_mapper
+        self.window = window
+        self.downstream_mapper = downstream_mapper or 
self._resolve_default_downstream_mapper(window)
+
+    def to_downstream(self, key: str) -> Iterable[str]:
+        # Round-trip the upstream key through its mapper to obtain the
+        # period-start datetime (decoded form). This keeps the upstream_mapper
+        # opaque — we don't need to know whether it's temporal or segment.
+        formatted = self.upstream_mapper.to_downstream(key)
+        if not isinstance(formatted, str):
+            raise TypeError(
+                "FanOutMapper.upstream_mapper must produce a single key from "
+                "to_downstream; chained fan-out (mapper that itself returns 
multiple keys) "
+                "is not supported."
+            )
+        coarse = self.upstream_mapper.decode_downstream(formatted)
+        return [_format_with(self.downstream_mapper, item) for item in 
self.window.to_upstream(coarse)]
+
+    def serialize(self) -> dict[str, Any]:
+        from airflow.serialization.encoders import encode_partition_mapper, 
encode_window
+
+        return {
+            "upstream_mapper": encode_partition_mapper(self.upstream_mapper),
+            "window": encode_window(self.window),
+            "downstream_mapper": 
encode_partition_mapper(self.downstream_mapper),
+        }
+
+    @classmethod
+    def deserialize(cls, data: dict[str, Any]) -> PartitionMapper:
+        from airflow.serialization.decoders import decode_partition_mapper, 
decode_window
+
+        return cls(
+            upstream_mapper=decode_partition_mapper(data["upstream_mapper"]),
+            window=decode_window(data["window"]),
+            
downstream_mapper=decode_partition_mapper(data["downstream_mapper"]),
+        )
+
+
+def _format_with(mapper: PartitionMapper, decoded: Any) -> str:
+    """
+    Format *decoded* using *mapper*'s downstream format.
+
+    Three-layer fallback, in order:
+
+    1. *mapper* exposes a callable ``format`` attribute (all temporal mappers 
do)
+       — delegates to ``mapper.format(decoded)``, which uses ``output_format``.
+    2. *decoded* is a ``datetime`` instance — returns ``decoded.isoformat()``,
+       producing a stable ISO-8601 string that round-trips via
+       ``datetime.fromisoformat``.
+    3. Anything else — ``str(decoded)`` as a last-resort fallback.
+    """
+    formatter = getattr(mapper, "format", None)
+    if callable(formatter):
+        return formatter(decoded)
+    if isinstance(decoded, datetime):
+        return decoded.isoformat()
+    return str(decoded)
diff --git a/airflow-core/src/airflow/serialization/encoders.py 
b/airflow-core/src/airflow/serialization/encoders.py
index 707a3c145a3..6301891b3c0 100644
--- a/airflow-core/src/airflow/serialization/encoders.py
+++ b/airflow-core/src/airflow/serialization/encoders.py
@@ -42,6 +42,7 @@ from airflow.sdk import (
     DeltaDataIntervalTimetable,
     DeltaTriggerTimetable,
     EventsTimetable,
+    FanOutMapper,
     HourWindow,
     IdentityMapper,
     MonthWindow,
@@ -433,6 +434,7 @@ class _Serializer:
     BUILTIN_PARTITION_MAPPERS: dict[type, str] = {
         AllowedKeyMapper: 
"airflow.partition_mappers.allowed_key.AllowedKeyMapper",
         ChainMapper: "airflow.partition_mappers.chain.ChainMapper",
+        FanOutMapper: "airflow.partition_mappers.temporal.FanOutMapper",
         IdentityMapper: "airflow.partition_mappers.identity.IdentityMapper",
         ProductMapper: "airflow.partition_mappers.product.ProductMapper",
         RollupMapper: "airflow.partition_mappers.base.RollupMapper",
@@ -499,6 +501,14 @@ class _Serializer:
             "window": encode_window(partition_mapper.window),
         }
 
+    @serialize_partition_mapper.register
+    def _(self, partition_mapper: FanOutMapper) -> dict[str, Any]:
+        return {
+            "upstream_mapper": 
encode_partition_mapper(partition_mapper.upstream_mapper),
+            "window": encode_window(partition_mapper.window),
+            "downstream_mapper": 
encode_partition_mapper(partition_mapper.downstream_mapper),
+        }
+
     BUILTIN_WINDOWS: dict[type, str] = {
         HourWindow: "airflow.partition_mappers.window.HourWindow",
         DayWindow: "airflow.partition_mappers.window.DayWindow",
diff --git a/airflow-core/tests/unit/assets/test_manager.py 
b/airflow-core/tests/unit/assets/test_manager.py
index 99befc9287f..e25a1f0c204 100644
--- a/airflow-core/tests/unit/assets/test_manager.py
+++ b/airflow-core/tests/unit/assets/test_manager.py
@@ -18,6 +18,7 @@
 from __future__ import annotations
 
 import concurrent.futures
+import itertools
 import logging
 from collections import Counter
 from typing import TYPE_CHECKING
@@ -38,10 +39,16 @@ from airflow.models.asset import (
     DagScheduleAssetAliasReference,
     DagScheduleAssetReference,
 )
-from airflow.models.dag import DagModel
+from airflow.models.dag import DAG, DagModel
+from airflow.models.log import Log
+from airflow.partition_mappers.temporal import FanOutMapper, StartOfWeekMapper
+from airflow.partition_mappers.window import WeekWindow
+from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.sdk.definitions.asset import Asset
+from airflow.sdk.definitions.timetables.assets import PartitionedAssetTimetable
 
 from tests_common.test_utils.config import conf_vars
+from tests_common.test_utils.db import clear_db_apdr, clear_db_logs, 
clear_db_pakl
 from unit.listeners import asset_listener
 
 pytestmark = pytest.mark.db_test
@@ -65,6 +72,19 @@ def mock_task_instance():
     return None
 
 
+def create_mock_dag():
+    for dag_id in itertools.count(1):
+        mock_dag = mock.Mock(spec=DAG)
+        mock_dag.dag_id = dag_id
+        yield mock_dag
+
+
+def _clear_partition_db() -> None:
+    clear_db_apdr()
+    clear_db_pakl()
+    clear_db_logs()
+
+
 class TestAssetManager:
     def test_register_asset_change_asset_doesnt_exist(self, 
mock_task_instance):
         mock_task_instance = mock.Mock()
@@ -388,6 +408,72 @@ class TestAssetManager:
 
         assert 
session.scalar(select(func.count()).select_from(AssetDagRunQueue)) == 0
 
+    @pytest.mark.parametrize(
+        ("cap", "expect_trip"),
+        [
+            # WeekWindow always fans a weekly upstream out into 7 daily keys.
+            pytest.param(2, True, id="way_over_cap"),
+            # at-cap (cap == 7) and one-over (cap == 6) pin the boundary at
+            # ``>`` not ``>=``; a flipped comparison would still pass way_over.
+            pytest.param(7, False, id="at_cap_allowed"),
+            pytest.param(6, True, id="one_over_cap_trips"),
+        ],
+    )
+    @pytest.mark.usefixtures("clear_assets", "testing_dag_bundle")
+    def test_partition_fan_out_cap(self, session, dag_maker, 
mock_task_instance, cap, expect_trip):
+        """The ``[scheduler] partition_mapper_max_downstream_keys`` cap gates 
fan-out.
+
+        A WeekWindow fan-out of 7 daily keys is either queued in full (cap >= 
7)
+        or skipped entirely — no APDR queued, ``log.error`` fired, and a Log 
row
+        written — when it exceeds the cap.
+        """
+        _clear_partition_db()
+
+        asset_def = Asset(uri=f"s3://bucket/weekly_{cap}", 
name=f"weekly_{cap}")
+        mapper = FanOutMapper(upstream_mapper=StartOfWeekMapper(), 
window=WeekWindow())
+        dag_id = f"fan_out_dag_cap_{cap}"
+        with dag_maker(
+            dag_id=dag_id,
+            schedule=PartitionedAssetTimetable(assets=asset_def, 
partition_mapper_config={asset_def: mapper}),
+            serialized=True,
+        ):
+            EmptyOperator(task_id="t")
+        dag_maker.create_dagrun()
+        dag_maker.sync_dagbag_to_db()
+
+        with (
+            conf_vars({("scheduler", "partition_mapper_max_downstream_keys"): 
str(cap)}),
+            mock.patch("airflow.assets.manager.log") as mock_log,
+        ):
+            AssetManager.register_asset_change(
+                task_instance=mock_task_instance,
+                asset=asset_def,
+                session=session,
+                partition_key="2024-06-03T00:00:00",
+            )
+            session.flush()
+
+        apdr_count = 
session.scalar(select(func.count()).select_from(AssetPartitionDagRun))
+        log_extras = session.scalars(select(Log.extra).where(Log.event == 
"partition fan-out exceeded")).all()
+        if not expect_trip:
+            assert apdr_count == 7
+            assert log_extras == []
+            mock_log.error.assert_not_called()
+            return
+
+        assert apdr_count == 0
+        assert len(log_extras) == 1
+        assert dag_id in log_extras[0]
+        assert f"partition_mapper_max_downstream_keys={cap}" in log_extras[0]
+        # The scheduler-log `log.error` line is a separate observable from the
+        # DB Log row; pin its keyword fields so a rename / level flip is 
caught.
+        mock_log.error.assert_called_once()
+        error_call = mock_log.error.call_args
+        assert error_call.kwargs["target_dag"] == dag_id
+        assert error_call.kwargs["source_partition_key"] == 
"2024-06-03T00:00:00"
+        assert error_call.kwargs["produced_keys"] == 7
+        assert error_call.kwargs["max_downstream_keys"] == cap
+
 
 def _make_dag(dag_id: str) -> DagModel:
     dag = mock.Mock(spec=DagModel)
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py 
b/airflow-core/tests/unit/models/test_taskinstance.py
index d099691bff1..e840b01b859 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -3661,7 +3661,7 @@ def 
test_runtime_partition_key_does_not_overwrite_scheduler_partition(dag_maker,
 def test_runtime_partition_keys_fan_out_to_one_event_per_key(dag_maker, 
session):
     """Multiple distinct runtime keys produce one AssetEvent each; 
DagRun.partition_key stays None."""
     asset = Asset(name="hello")
-    with dag_maker(dag_id="rt_pk_fanout", schedule=None) as dag:
+    with dag_maker(dag_id="rt_pk_fan_out", schedule=None) as dag:
         EmptyOperator(task_id="hi", outlets=[asset])
     dr = dag_maker.create_dagrun(session=session)
     assert dr.partition_key is None
diff --git a/airflow-core/tests/unit/partition_mappers/test_fan_out.py 
b/airflow-core/tests/unit/partition_mappers/test_fan_out.py
new file mode 100644
index 00000000000..683931a84f2
--- /dev/null
+++ b/airflow-core/tests/unit/partition_mappers/test_fan_out.py
@@ -0,0 +1,235 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest.mock import MagicMock
+
+import pytest
+
+from airflow.partition_mappers.base import PartitionMapper
+from airflow.partition_mappers.temporal import (
+    FanOutMapper,
+    StartOfDayMapper,
+    StartOfHourMapper,
+    StartOfMonthMapper,
+    StartOfQuarterMapper,
+    StartOfWeekMapper,
+    StartOfYearMapper,
+)
+from airflow.partition_mappers.window import (
+    DayWindow,
+    HourWindow,
+    MonthWindow,
+    QuarterWindow,
+    WeekWindow,
+    YearWindow,
+)
+
+
+class TestFanOutMapper:
+    def test_week_to_seven_daily_keys(self):
+        """A weekly upstream key produces seven consecutive daily keys."""
+        mapper = FanOutMapper(upstream_mapper=StartOfWeekMapper(), 
window=WeekWindow())
+        # 2024-01-15 is a Monday, so it's already the week start under the 
default ISO week.
+        result = list(mapper.to_downstream("2024-01-15T00:00:00"))
+        assert result == [
+            "2024-01-15",
+            "2024-01-16",
+            "2024-01-17",
+            "2024-01-18",
+            "2024-01-19",
+            "2024-01-20",
+            "2024-01-21",
+        ]
+
+    def test_normalises_mid_week_upstream_to_week_start(self):
+        """An upstream timestamp on Wednesday still yields the seven days from 
Monday."""
+        mapper = FanOutMapper(upstream_mapper=StartOfWeekMapper(), 
window=WeekWindow())
+        result = list(mapper.to_downstream("2024-01-17T13:42:00"))
+        assert result == [
+            "2024-01-15",
+            "2024-01-16",
+            "2024-01-17",
+            "2024-01-18",
+            "2024-01-19",
+            "2024-01-20",
+            "2024-01-21",
+        ]
+
+    def test_day_to_24_hourly_keys(self):
+        """A daily upstream key produces 24 consecutive hourly keys."""
+        mapper = FanOutMapper(upstream_mapper=StartOfDayMapper(), 
window=DayWindow())
+        result = list(mapper.to_downstream("2024-01-15T03:30:00"))
+        assert result == [f"2024-01-15T{hour:02d}" for hour in range(24)]
+
+    def test_month_to_daily_keys(self):
+        """A monthly upstream key produces one downstream key per day of the 
month."""
+        mapper = FanOutMapper(upstream_mapper=StartOfMonthMapper(), 
window=MonthWindow())
+        result = list(mapper.to_downstream("2024-02-10T00:00:00"))
+        # February 2024 has 29 days (leap year).
+        assert result == [f"2024-02-{day:02d}" for day in range(1, 30)]
+
+    def test_quarter_to_three_monthly_keys(self):
+        """A quarterly upstream key produces three monthly keys."""
+        mapper = FanOutMapper(upstream_mapper=StartOfQuarterMapper(), 
window=QuarterWindow())
+        result = list(mapper.to_downstream("2024-02-10T00:00:00"))
+        assert result == ["2024-01", "2024-02", "2024-03"]
+
+    def test_year_to_twelve_monthly_keys(self):
+        """A yearly upstream key produces twelve monthly keys."""
+        mapper = FanOutMapper(upstream_mapper=StartOfYearMapper(), 
window=YearWindow())
+        result = list(mapper.to_downstream("2024-07-04T00:00:00"))
+        assert result == [f"2024-{month:02d}" for month in range(1, 13)]
+
+    @pytest.mark.parametrize(
+        ("window", "expected_cls"),
+        [
+            (WeekWindow(), StartOfDayMapper),
+            (MonthWindow(), StartOfDayMapper),
+            (DayWindow(), StartOfHourMapper),
+            (QuarterWindow(), StartOfMonthMapper),
+            (YearWindow(), StartOfMonthMapper),
+        ],
+    )
+    def test_default_downstream_mapper_resolution(self, window, expected_cls):
+        """Each window class resolves to a sensible default downstream 
mapper."""
+        mapper = FanOutMapper(upstream_mapper=StartOfWeekMapper(), 
window=window)
+        assert isinstance(mapper.downstream_mapper, expected_cls)
+
+    def 
test_no_default_for_hour_window_requires_explicit_downstream_mapper(self):
+        """HourWindow has no default downstream_mapper (no minute-level mapper 
exists)."""
+        with pytest.raises(ValueError, match="HourWindow"):
+            FanOutMapper(upstream_mapper=StartOfDayMapper(), 
window=HourWindow())
+
+    def test_explicit_downstream_mapper_overrides_default(self):
+        """Passing downstream_mapper explicitly overrides the lookup."""
+        custom_downstream = StartOfDayMapper(output_format="%Y/%m/%d")
+        mapper = FanOutMapper(
+            upstream_mapper=StartOfWeekMapper(),
+            window=WeekWindow(),
+            downstream_mapper=custom_downstream,
+        )
+        result = list(mapper.to_downstream("2024-01-15T00:00:00"))
+        assert result == [
+            "2024/01/15",
+            "2024/01/16",
+            "2024/01/17",
+            "2024/01/18",
+            "2024/01/19",
+            "2024/01/20",
+            "2024/01/21",
+        ]
+
+    def test_non_temporal_downstream_mapper_falls_back_to_str(self):
+        """A downstream_mapper without ``format`` falls back to the isoformat 
middle layer.
+
+        Documents the contract in ``_format_with``: temporal mappers use
+        ``format(dt)`` (layer 1); a mapper without ``format`` but with datetime
+        values uses ``isoformat()`` (layer 2). Pinning this guards against a
+        silent regression that skips the isoformat layer and falls to bare
+        ``str(decoded)``.
+        """
+        # ``spec=PartitionMapper`` ensures the mock has no ``format`` 
attribute,
+        # which is the trigger for the ``str(decoded)`` fallback.
+        downstream = MagicMock(spec=PartitionMapper)
+        mapper = FanOutMapper(
+            upstream_mapper=StartOfWeekMapper(),
+            window=WeekWindow(),
+            downstream_mapper=downstream,
+        )
+        result = list(mapper.to_downstream("2024-01-15T00:00:00"))
+        # WeekWindow yields 7 naive datetimes; without ``format`` each is 
rendered via ``isoformat()``.
+        assert result == [f"2024-01-{day:02d}T00:00:00" for day in range(15, 
22)]
+
+    def test_chained_fan_out_upstream_rejected(self):
+        """An upstream_mapper that itself returns multiple keys is not 
supported."""
+        # FanOutMapper's upstream_mapper must produce a single coarse key, so
+        # nesting one FanOutMapper inside another is rejected with a clear
+        # error rather than silently producing wrong output.
+        outer = FanOutMapper(
+            upstream_mapper=FanOutMapper(
+                upstream_mapper=StartOfMonthMapper(),
+                window=MonthWindow(),
+            ),
+            window=DayWindow(),
+            downstream_mapper=StartOfHourMapper(),
+        )
+        with pytest.raises(
+            TypeError,
+            match=(
+                r"FanOutMapper\.upstream_mapper must produce a single key from 
to_downstream; "
+                r"chained fan-out \(mapper that itself returns multiple keys\) 
is not supported\."
+            ),
+        ):
+            list(outer.to_downstream("2024-02-10T00:00:00"))
+
+    def test_serialize_roundtrip(self):
+        """Serialize + deserialize reconstructs an equivalent mapper (explicit 
downstream mapper)."""
+        mapper = FanOutMapper(
+            upstream_mapper=StartOfWeekMapper(timezone="UTC"),
+            window=WeekWindow(),
+            downstream_mapper=StartOfDayMapper(output_format="%Y/%m/%d"),
+        )
+        data = mapper.serialize()
+        restored = FanOutMapper.deserialize(data)
+        assert isinstance(restored, FanOutMapper)
+        assert isinstance(restored.upstream_mapper, StartOfWeekMapper)
+        assert isinstance(restored.window, WeekWindow)
+        assert isinstance(restored.downstream_mapper, StartOfDayMapper)
+        assert restored.downstream_mapper.output_format == "%Y/%m/%d"
+        # Behavior round-trips too.
+        assert list(restored.to_downstream("2024-01-15T00:00:00")) == list(
+            mapper.to_downstream("2024-01-15T00:00:00")
+        )
+
+    @pytest.mark.parametrize(
+        ("upstream_factory", "window_cls", "expected_downstream_cls", 
"upstream_key"),
+        [
+            pytest.param(StartOfDayMapper, DayWindow, StartOfHourMapper, 
"2024-01-15T00:00:00", id="day"),
+            pytest.param(StartOfWeekMapper, WeekWindow, StartOfDayMapper, 
"2024-01-15T00:00:00", id="week"),
+            pytest.param(
+                StartOfMonthMapper, MonthWindow, StartOfDayMapper, 
"2024-02-10T00:00:00", id="month"
+            ),
+            pytest.param(
+                StartOfQuarterMapper,
+                QuarterWindow,
+                StartOfMonthMapper,
+                "2024-02-10T00:00:00",
+                id="quarter",
+            ),
+            pytest.param(StartOfYearMapper, YearWindow, StartOfMonthMapper, 
"2024-07-04T00:00:00", id="year"),
+        ],
+    )
+    def test_serialize_roundtrip_default_downstream_mapper_all_windows(
+        self, upstream_factory, window_cls, expected_downstream_cls, 
upstream_key
+    ):
+        """The auto-resolved default downstream mapper survives encode → 
decode for every supported window.
+
+        The default-table lookup happens in ``__init__`` (so the restored
+        mapper carries a concrete ``downstream_mapper`` instance, not a
+        sentinel). Round-tripping the serialized blob must reconstruct the
+        same class on the downstream side and produce byte-identical
+        ``to_downstream`` output.
+        """
+        mapper = FanOutMapper(upstream_mapper=upstream_factory(), 
window=window_cls())
+        restored = FanOutMapper.deserialize(mapper.serialize())
+
+        assert isinstance(restored, FanOutMapper)
+        assert isinstance(restored.upstream_mapper, upstream_factory)
+        assert isinstance(restored.window, window_cls)
+        assert isinstance(restored.downstream_mapper, expected_downstream_cls)
+        assert list(restored.to_downstream(upstream_key)) == 
list(mapper.to_downstream(upstream_key))
diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py 
b/airflow-core/tests/unit/serialization/test_serialized_objects.py
index b3e34ca76fc..4801761b74b 100644
--- a/airflow-core/tests/unit/serialization/test_serialized_objects.py
+++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py
@@ -1105,6 +1105,74 @@ def test_decode_product_mapper():
     assert core_pm.to_downstream("2024-06-15T10:30:00|2024-06-15T10:30:00") == 
"2024-06-15T10|2024-06-15"
 
 
+def test_encode_fan_out_mapper():
+    from airflow.sdk import FanOutMapper, StartOfDayMapper, StartOfWeekMapper, 
WeekWindow
+    from airflow.serialization.encoders import encode_partition_mapper
+
+    partition_mapper = FanOutMapper(
+        upstream_mapper=StartOfWeekMapper(),
+        window=WeekWindow(),
+        downstream_mapper=StartOfDayMapper(),
+    )
+    assert encode_partition_mapper(partition_mapper) == {
+        Encoding.TYPE: "airflow.partition_mappers.temporal.FanOutMapper",
+        Encoding.VAR: {
+            "upstream_mapper": {
+                Encoding.TYPE: 
"airflow.partition_mappers.temporal.StartOfWeekMapper",
+                Encoding.VAR: {
+                    "timezone": "UTC",
+                    "input_format": "%Y-%m-%dT%H:%M:%S",
+                    "output_format": "%Y-%m-%d (W%V)",
+                },
+            },
+            "window": {
+                Encoding.TYPE: "airflow.partition_mappers.window.WeekWindow",
+                Encoding.VAR: {},
+            },
+            "downstream_mapper": {
+                Encoding.TYPE: 
"airflow.partition_mappers.temporal.StartOfDayMapper",
+                Encoding.VAR: {
+                    "timezone": "UTC",
+                    "input_format": "%Y-%m-%dT%H:%M:%S",
+                    "output_format": "%Y-%m-%d",
+                },
+            },
+        },
+    }
+
+
+def test_decode_fan_out_mapper():
+    from airflow.partition_mappers.temporal import (
+        FanOutMapper as CoreFanOutMapper,
+        StartOfDayMapper as CoreStartOfDayMapper,
+        StartOfWeekMapper as CoreStartOfWeekMapper,
+    )
+    from airflow.partition_mappers.window import WeekWindow as CoreWeekWindow
+    from airflow.sdk import FanOutMapper, StartOfWeekMapper, WeekWindow
+    from airflow.serialization.decoders import decode_partition_mapper
+    from airflow.serialization.encoders import encode_partition_mapper
+
+    partition_mapper = FanOutMapper(upstream_mapper=StartOfWeekMapper(), 
window=WeekWindow())
+    encoded_pm = encode_partition_mapper(partition_mapper)
+
+    core_pm = decode_partition_mapper(encoded_pm)
+
+    assert isinstance(core_pm, CoreFanOutMapper)
+    assert isinstance(core_pm.upstream_mapper, CoreStartOfWeekMapper)
+    assert isinstance(core_pm.window, CoreWeekWindow)
+    # downstream_mapper is auto-resolved to StartOfDayMapper for WeekWindow.
+    assert isinstance(core_pm.downstream_mapper, CoreStartOfDayMapper)
+    assert list(core_pm.to_downstream("2024-01-15T00:00:00")) == [
+        "2024-01-15",
+        "2024-01-16",
+        "2024-01-17",
+        "2024-01-18",
+        "2024-01-19",
+        "2024-01-20",
+        "2024-01-21",
+    ]
+
+
 def test_encode_chain_mapper():
     from airflow.sdk import ChainMapper, StartOfDayMapper, StartOfHourMapper
     from airflow.serialization.encoders import encode_partition_mapper
diff --git a/scripts/ci/prek/check_partition_mapper_defaults_in_sync.py 
b/scripts/ci/prek/check_partition_mapper_defaults_in_sync.py
new file mode 100755
index 00000000000..0c1adea9669
--- /dev/null
+++ b/scripts/ci/prek/check_partition_mapper_defaults_in_sync.py
@@ -0,0 +1,143 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# /// script
+# requires-python = ">=3.10"
+# dependencies = [
+#   "rich>=13.6.0",
+# ]
+# ///
+"""
+Verify ``FanOutMapper.default_downstream_mapper_by_window_name`` stays in sync
+between core and the Task SDK.
+
+The default downstream-mapper table is defined twice — once in the core class
+hierarchy and once in the SDK copy — because the two hierarchies are
+independent (the SDK cannot import core) and the lookup is by ``Window`` class
+*name*. Both copies must list the same ``Window`` name -> mapper class mapping,
+otherwise a ``FanOutMapper`` resolves a different default depending on whether
+it runs in Dag-author code (SDK) or after deserialization (core).
+
+This check parses the ``default_downstream_mapper_by_window_name`` class
+attribute from both files via AST and asserts the two mappings are identical.
+
+Run from the repo root:
+
+    uv run --project scripts python 
scripts/ci/prek/check_partition_mapper_defaults_in_sync.py
+
+Exits 0 if the two tables match, 1 (with a diff) otherwise.
+"""
+
+from __future__ import annotations
+
+import ast
+import sys
+from pathlib import Path
+
+from common_prek_utils import (
+    AIRFLOW_CORE_SOURCES_PATH,
+    AIRFLOW_TASK_SDK_SOURCES_PATH,
+)
+from rich.console import Console
+
+console = Console(color_system="standard", width=200)
+
+CLASS_NAME = "FanOutMapper"
+ATTR_NAME = "default_downstream_mapper_by_window_name"
+
+CORE_FILE = AIRFLOW_CORE_SOURCES_PATH / "airflow" / "partition_mappers" / 
"temporal.py"
+SDK_FILE = (
+    AIRFLOW_TASK_SDK_SOURCES_PATH / "airflow" / "sdk" / "definitions" / 
"partition_mappers" / "temporal.py"
+)
+
+
+def _find_attr_value(file_path: Path) -> ast.Dict:
+    """Return the AST node assigned to 
``FanOutMapper.default_downstream_mapper_by_window_name``."""
+    tree = ast.parse(file_path.read_text(encoding="utf-8"), 
filename=str(file_path))
+    for node in ast.walk(tree):
+        if not (isinstance(node, ast.ClassDef) and node.name == CLASS_NAME):
+            continue
+        for stmt in node.body:
+            target_name = None
+            if isinstance(stmt, ast.AnnAssign) and isinstance(stmt.target, 
ast.Name):
+                target_name = stmt.target.id
+            elif (
+                isinstance(stmt, ast.Assign)
+                and len(stmt.targets) == 1
+                and isinstance(stmt.targets[0], ast.Name)
+            ):
+                target_name = stmt.targets[0].id
+            if target_name != ATTR_NAME:
+                continue
+            if not isinstance(stmt.value, ast.Dict):
+                raise ValueError(
+                    f"{file_path}: {CLASS_NAME}.{ATTR_NAME} is not a dict 
literal; "
+                    f"this check parses a dict literal and must be updated."
+                )
+            return stmt.value
+        raise ValueError(f"{file_path}: {CLASS_NAME} has no {ATTR_NAME} 
attribute.")
+    raise ValueError(f"{file_path}: no class {CLASS_NAME} found.")
+
+
+def _extract_mapping(file_path: Path) -> dict[str, str]:
+    """Extract the ``{window_name: mapper_class_name}`` mapping from a dict 
literal."""
+    dict_node = _find_attr_value(file_path)
+    mapping: dict[str, str] = {}
+    for key_node, value_node in zip(dict_node.keys, dict_node.values):
+        if not (isinstance(key_node, ast.Constant) and 
isinstance(key_node.value, str)):
+            raise ValueError(
+                f"{file_path}: {CLASS_NAME}.{ATTR_NAME} has a 
non-string-literal key; "
+                f"this check expects window class names as string literals."
+            )
+        if not isinstance(value_node, ast.Name):
+            raise ValueError(
+                f"{file_path}: {CLASS_NAME}.{ATTR_NAME}[{key_node.value!r}] is 
not a bare "
+                f"class name; this check expects a mapper class reference."
+            )
+        mapping[key_node.value] = value_node.id
+    return mapping
+
+
+def main() -> int:
+    try:
+        core_mapping = _extract_mapping(CORE_FILE)
+        sdk_mapping = _extract_mapping(SDK_FILE)
+    except ValueError as exc:
+        console.print(f"[red]Could not read the default mapper table:[/red] 
{exc}")
+        return 1
+
+    if core_mapping == sdk_mapping:
+        return 0
+
+    console.print(f"[red]{CLASS_NAME}.{ATTR_NAME} is out of sync between core 
and the Task SDK.[/red]\n")
+    all_windows = sorted(core_mapping.keys() | sdk_mapping.keys())
+    for window in all_windows:
+        core_val = core_mapping.get(window, "<missing>")
+        sdk_val = sdk_mapping.get(window, "<missing>")
+        marker = "  " if core_val == sdk_val else "->"
+        color = "" if core_val == sdk_val else "[red]"
+        end = "" if core_val == sdk_val else "[/red]"
+        console.print(f"{color}{marker} {window}: core={core_val} 
sdk={sdk_val}{end}")
+    console.print(
+        f"\nMake both tables list the same window -> mapper entries:\n  core: 
{CORE_FILE}\n  sdk:  {SDK_FILE}"
+    )
+    return 1
+
+
+if __name__ == "__main__":
+    sys.exit(main())
diff --git a/task-sdk/docs/api.rst b/task-sdk/docs/api.rst
index 2cc9e07bf1c..18fb8a08bbc 100644
--- a/task-sdk/docs/api.rst
+++ b/task-sdk/docs/api.rst
@@ -245,6 +245,8 @@ Partition Mapper
 
 .. autoapiclass:: airflow.sdk.AllowedKeyMapper
 
+.. autoapiclass:: airflow.sdk.FanOutMapper
+
 Rollup Windows
 ~~~~~~~~~~~~~~
 
diff --git a/task-sdk/src/airflow/sdk/__init__.py 
b/task-sdk/src/airflow/sdk/__init__.py
index b922cad9e1f..ce5b8ab2c2e 100644
--- a/task-sdk/src/airflow/sdk/__init__.py
+++ b/task-sdk/src/airflow/sdk/__init__.py
@@ -54,6 +54,7 @@ __all__ = [
     "EdgeModifier",
     "EventsTimetable",
     "ExceptionRetryPolicy",
+    "FanOutMapper",
     "HourWindow",
     "IdentityMapper",
     "Label",
@@ -156,6 +157,7 @@ if TYPE_CHECKING:
     from airflow.sdk.definitions.partition_mappers.identity import 
IdentityMapper
     from airflow.sdk.definitions.partition_mappers.product import ProductMapper
     from airflow.sdk.definitions.partition_mappers.temporal import (
+        FanOutMapper,
         StartOfDayMapper,
         StartOfHourMapper,
         StartOfMonthMapper,
@@ -241,6 +243,7 @@ __lazy_imports: dict[str, str] = {
     "EdgeModifier": ".definitions.edges",
     "EventsTimetable": ".definitions.timetables.events",
     "ExceptionRetryPolicy": ".definitions.retry_policy",
+    "FanOutMapper": ".definitions.partition_mappers.temporal",
     "HourWindow": ".definitions.partition_mappers.window",
     "IdentityMapper": ".definitions.partition_mappers.identity",
     "Label": ".definitions.edges",
diff --git a/task-sdk/src/airflow/sdk/__init__.pyi 
b/task-sdk/src/airflow/sdk/__init__.pyi
index 703e3d4f0e6..1bb975e2202 100644
--- a/task-sdk/src/airflow/sdk/__init__.pyi
+++ b/task-sdk/src/airflow/sdk/__init__.pyi
@@ -70,6 +70,7 @@ from airflow.sdk.definitions.partition_mappers.chain import 
ChainMapper
 from airflow.sdk.definitions.partition_mappers.identity import IdentityMapper
 from airflow.sdk.definitions.partition_mappers.product import ProductMapper
 from airflow.sdk.definitions.partition_mappers.temporal import (
+    FanOutMapper,
     StartOfDayMapper,
     StartOfHourMapper,
     StartOfMonthMapper,
@@ -152,6 +153,7 @@ __all__ = [
     "EdgeModifier",
     "EventsTimetable",
     "ExceptionRetryPolicy",
+    "FanOutMapper",
     "HourWindow",
     "IdentityMapper",
     "Label",
diff --git a/task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py 
b/task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py
index 06364cacd39..200a547c403 100644
--- a/task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py
+++ b/task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py
@@ -25,6 +25,8 @@ from airflow.sdk.definitions.partition_mappers.base import 
PartitionMapper
 if TYPE_CHECKING:
     from pendulum import FixedTimezone, Timezone
 
+    from airflow.sdk.definitions.partition_mappers.window import Window
+
 
 class _BaseTemporalMapper(PartitionMapper):
     """Base class for Temporal Partition Mappers."""
@@ -104,3 +106,78 @@ class StartOfYearMapper(_BaseTemporalMapper):
     """
 
     default_output_format = "%Y"
+
+
+class FanOutMapper(PartitionMapper):
+    """
+    Partition mapper that fans one upstream key out into multiple downstream 
keys.
+
+    Compose an ``upstream_mapper`` (which parses the coarse upstream key and
+    normalizes it to a period start) with a ``window`` that enumerates the
+    members of that period. ``downstream_mapper`` formats each member into a
+    downstream key string; if omitted, a default is chosen from the window
+    class (e.g. ``WeekWindow`` → ``StartOfDayMapper``).
+
+    ``downstream_mapper`` must be passed explicitly for any window without an
+    entry in the default table — currently ``HourWindow`` and any custom
+    ``Window`` subclass. Constructing a ``FanOutMapper`` for those windows
+    without a ``downstream_mapper`` raises ``ValueError`` at Dag-parse time.
+
+    Symmetric to :class:`~airflow.sdk.RollupMapper`: rollup is N→1 (downstream
+    waits for all members), fan-out is 1→N (one upstream event creates many
+    downstream Dag runs).
+
+    .. code-block:: python
+
+        # Weekly upstream → 7 daily downstream Dag runs
+        FanOutMapper(upstream_mapper=StartOfWeekMapper(), window=WeekWindow())
+    """
+
+    # Keep ``FanOutMapper.default_downstream_mapper_by_window_name`` in sync 
with
+    # the core copy in
+    # ``airflow-core/src/airflow/partition_mappers/temporal.py`` —
+    # the SDK and core class hierarchies are independent (the SDK cannot import
+    # core), so both sides carry the same defaults and the lookup is by class
+    # name. When adding a new ``Window`` subclass, extend the table on both
+    # sides; a missing entry raises ``ValueError`` at ``FanOutMapper.__init__``
+    # (see ``FanOutMapper._resolve_default_downstream_mapper``). The
+    # ``check-partition-mapper-defaults-in-sync`` prek hook enforces that the
+    # two tables stay identical.
+    default_downstream_mapper_by_window_name: ClassVar[dict[str, 
type[_BaseTemporalMapper]]] = {
+        "DayWindow": StartOfHourMapper,
+        "WeekWindow": StartOfDayMapper,
+        "MonthWindow": StartOfDayMapper,
+        "QuarterWindow": StartOfMonthMapper,
+        "YearWindow": StartOfMonthMapper,
+    }
+
+    @classmethod
+    def _resolve_default_downstream_mapper(cls, window: Window) -> 
PartitionMapper:
+        """
+        Return the conventional downstream mapper for *window*.
+
+        Looked up by the window's class **name** rather than identity so that
+        the SDK ``Window`` classes (used in Dag-author code) and the core
+        ``Window`` classes (used after deserialization) both resolve to the
+        same default. Subclasses can extend or override the defaults by
+        setting :attr:`default_downstream_mapper_by_window_name` on the
+        subclass.
+        """
+        mapper_cls = 
cls.default_downstream_mapper_by_window_name.get(type(window).__name__)
+        if mapper_cls is None:
+            raise ValueError(
+                f"{cls.__name__} has no default downstream_mapper for window 
type "
+                f"{type(window).__name__}; pass downstream_mapper explicitly."
+            )
+        return mapper_cls()
+
+    def __init__(
+        self,
+        *,
+        upstream_mapper: PartitionMapper,
+        window: Window,
+        downstream_mapper: PartitionMapper | None = None,
+    ) -> None:
+        self.upstream_mapper = upstream_mapper
+        self.window = window
+        self.downstream_mapper = downstream_mapper or 
self._resolve_default_downstream_mapper(window)

Reply via email to