This is an automated email from the ASF dual-hosted git repository.
Lee-W pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 953aa794504 Add FanOutMapper for one-to-many partition fan-out (#66030)
953aa794504 is described below
commit 953aa7945040c64856b755f1ee89538d05d856de
Author: Wei Lee <[email protected]>
AuthorDate: Mon Jun 8 19:11:17 2026 +0800
Add FanOutMapper for one-to-many partition fan-out (#66030)
---
.pre-commit-config.yaml | 10 +
airflow-core/newsfragments/66030.feature.rst | 1 +
airflow-core/src/airflow/assets/manager.py | 33 ++-
.../src/airflow/config_templates/config.yml | 13 ++
.../example_dags/example_asset_partition.py | 52 +++++
.../src/airflow/partition_mappers/temporal.py | 134 +++++++++++-
airflow-core/src/airflow/serialization/encoders.py | 10 +
airflow-core/tests/unit/assets/test_manager.py | 88 +++++++-
.../tests/unit/models/test_taskinstance.py | 2 +-
.../tests/unit/partition_mappers/test_fan_out.py | 235 +++++++++++++++++++++
.../unit/serialization/test_serialized_objects.py | 68 ++++++
.../check_partition_mapper_defaults_in_sync.py | 143 +++++++++++++
task-sdk/docs/api.rst | 2 +
task-sdk/src/airflow/sdk/__init__.py | 3 +
task-sdk/src/airflow/sdk/__init__.pyi | 2 +
.../sdk/definitions/partition_mappers/temporal.py | 77 +++++++
16 files changed, 865 insertions(+), 8 deletions(-)
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 4fab6689d16..e3c1b6faf47 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -248,6 +248,16 @@ repos:
files: ^\.github/workflows/ci-(arm|amd)\.yml$
pass_filenames: false
require_serial: true
+ # Runs only when either copy of partition_mappers/temporal.py (core or SDK) changes;
+ # the script is invoked without filenames and serially (see keys below).
+ - id: check-partition-mapper-defaults-in-sync
+ name: Check FanOutMapper default mapper table stays in sync (core/SDK)
+ entry: ./scripts/ci/prek/check_partition_mapper_defaults_in_sync.py
+ language: python
+ files: >
+ (?x)
+ ^airflow-core/src/airflow/partition_mappers/temporal\.py$|
+ ^task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal\.py$
+ pass_filenames: false
+ require_serial: true
- id: sync-uv-min-version-markers
name: Sync `# sync-uv-min-version` markers with [tool.uv]
required-version
entry: ./scripts/ci/prek/sync_uv_min_version_markers.py
diff --git a/airflow-core/newsfragments/66030.feature.rst
b/airflow-core/newsfragments/66030.feature.rst
new file mode 100644
index 00000000000..c6382dc0979
--- /dev/null
+++ b/airflow-core/newsfragments/66030.feature.rst
@@ -0,0 +1 @@
+Add ``FanOutMapper`` for one-to-many partition mapping (e.g. one weekly
upstream key to seven daily downstream Dag runs). It composes ``upstream_mapper
+ window + downstream_mapper``, mirroring the shape of ``RollupMapper`` and
reusing the existing ``Window`` classes (``DayWindow``, ``WeekWindow``,
``MonthWindow``, ``QuarterWindow``, ``YearWindow``). A new ``[scheduler]
partition_mapper_max_downstream_keys`` config caps the number of downstream
keys produced per upstream event by any ``P [...]
diff --git a/airflow-core/src/airflow/assets/manager.py
b/airflow-core/src/airflow/assets/manager.py
index 1d2fe479ea8..b11698c49c0 100644
--- a/airflow-core/src/airflow/assets/manager.py
+++ b/airflow-core/src/airflow/assets/manager.py
@@ -552,6 +552,8 @@ class AssetManager(LoggingMixin):
)
return
+ max_downstream_keys = conf.getint("scheduler",
"partition_mapper_max_downstream_keys")
+
for target_dag in partition_dags:
if TYPE_CHECKING:
assert partition_key is not None
@@ -600,15 +602,36 @@ class AssetManager(LoggingMixin):
continue
if is_container(target_key):
- # TODO (AIP-76): This never happens now. When we implement
- # one-to-many partition key mapping, this should also add a
- # config to cap the iterable size so the scheduler does not
- # blow up with an incorrectly implemented PartitionMapper.
- target_keys: Iterable[str] = target_key
+ target_keys: list[str] = list(target_key)
else:
target_keys = [target_key]
del target_key
+ if len(target_keys) > max_downstream_keys:
+ log.error(
+ "Partition mapper produced more downstream keys than
allowed; skipping queue.",
+ asset_id=asset_id,
+ source_partition_key=partition_key,
+ target_dag=target_dag.dag_id,
+ produced_keys=len(target_keys),
+ max_downstream_keys=max_downstream_keys,
+ )
+ session.add(
+ Log(
+ event="partition fan-out exceeded",
+ extra=(
+ f"Partition mapper for asset
(name='{asset_model.name}', "
+ f"uri='{asset_model.uri}') in target Dag
'{target_dag.dag_id}' "
+ f"produced {len(target_keys)} downstream keys from
"
+ f"partition_key='{partition_key}', exceeding "
+ f"[scheduler]
partition_mapper_max_downstream_keys={max_downstream_keys}. "
+ f"No Dag runs were queued for this event."
+ ),
+ task_instance=task_instance,
+ )
+ )
+ continue
+
for target_key in target_keys:
apdr = cls._get_or_create_apdr(
target_key=target_key,
diff --git a/airflow-core/src/airflow/config_templates/config.yml
b/airflow-core/src/airflow/config_templates/config.yml
index 03fd2794b5a..bba829b3d08 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -2688,6 +2688,19 @@ scheduler:
type: integer
default: "20"
see_also: ":ref:`scheduler:ha:tunables`"
+ partition_mapper_max_downstream_keys:
+ description: |
+ Maximum number of downstream partition keys a single
``PartitionMapper``
+ invocation may produce. When any partition mapper (built-in or custom)
+ expands one upstream key into more keys than this limit, the scheduler
+ skips queuing the runs for that asset event and logs an error against
+ the source task instance. This prevents a misconfigured
+ ``PartitionMapper`` from queuing an unbounded number of Dag runs per
+ upstream event.
+ version_added: 3.3.0
+ type: integer
+ example: ~
+ default: "1000"
use_job_schedule:
description: |
Turn off scheduler use of cron intervals by setting this to ``False``.
diff --git a/airflow-core/src/airflow/example_dags/example_asset_partition.py
b/airflow-core/src/airflow/example_dags/example_asset_partition.py
index 3995702df6b..775fea80ec3 100644
--- a/airflow-core/src/airflow/example_dags/example_asset_partition.py
+++ b/airflow-core/src/airflow/example_dags/example_asset_partition.py
@@ -25,6 +25,7 @@ from airflow.sdk import (
Asset,
CronPartitionTimetable,
DayWindow,
+ FanOutMapper,
IdentityMapper,
MonthWindow,
PartitionAtRuntime,
@@ -34,7 +35,9 @@ from airflow.sdk import (
StartOfDayMapper,
StartOfHourMapper,
StartOfMonthMapper,
+ StartOfWeekMapper,
StartOfYearMapper,
+ WeekWindow,
asset,
task,
)
@@ -357,3 +360,52 @@ with DAG(
print(f"All daily partitions received. Month: {dag_run.partition_key}")
summarise_team_a_month()
+
+
+# --- Fan-out: one weekly upstream → seven daily downstream Dag runs ----------
+
+# Asset produced by ``train_weekly_model`` (task outlet) and consumed by ``daily_inference``.
weekly_model_artifact = Asset(uri="file://artifacts/models/weekly.bin",
name="weekly_model_artifact")
+
+
+# Upstream: cron-partitioned Dag firing Mondays at 00:00 UTC; its task lists the asset as an outlet.
with DAG(
+ dag_id="train_weekly_model",
+ schedule=CronPartitionTimetable("0 0 * * 1", timezone="UTC"),
+ catchup=False,
+ tags=["example", "model", "training"],
+):
+ """Train a weekly model artifact every Monday at 00:00 UTC."""
+
+ @task(outlets=[weekly_model_artifact])
+ def train_model():
+ """Materialize the model artifact for the current weekly partition."""
+ pass
+
+ train_model()
+
+
+# Downstream: scheduled from partitioned events on the same asset; the FanOutMapper
+# turns each weekly upstream key into one daily downstream key per day of that week.
with DAG(
+ dag_id="daily_inference",
+ schedule=PartitionedAssetTimetable(
+ assets=weekly_model_artifact,
+ # FanOutMapper composes upstream_mapper + window + (optional)
downstream_mapper.
+ # WeekWindow.to_upstream() yields seven daily datetimes inside one
week,
+ # and the default downstream_mapper for WeekWindow is
StartOfDayMapper, so
+ # a weekly upstream key fans out to seven ``%Y-%m-%d`` downstream keys.
+ default_partition_mapper=FanOutMapper(
+ upstream_mapper=StartOfWeekMapper(),
+ window=WeekWindow(),
+ ),
+ ),
+ catchup=False,
+ tags=["example", "model", "inference"],
+):
+ """Run daily inference, fanning the weekly model artifact out to one Dag
run per day."""
+
+ @task
+ def run_inference(dag_run=None):
+ """Run inference for one daily partition derived from the weekly
model."""
+ if TYPE_CHECKING:
+ assert dag_run
+ print(dag_run.partition_key)
+
+ run_inference()
diff --git a/airflow-core/src/airflow/partition_mappers/temporal.py
b/airflow-core/src/airflow/partition_mappers/temporal.py
index b45ca9da3a4..18656761122 100644
--- a/airflow-core/src/airflow/partition_mappers/temporal.py
+++ b/airflow-core/src/airflow/partition_mappers/temporal.py
@@ -18,8 +18,9 @@ from __future__ import annotations
import re
from abc import abstractmethod
+from collections.abc import Iterable
from datetime import datetime, timedelta
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING, Any, ClassVar
from airflow._shared.timezones.timezone import make_aware, parse_timezone
from airflow.partition_mappers.base import PartitionMapper
@@ -27,6 +28,8 @@ from airflow.partition_mappers.base import PartitionMapper
if TYPE_CHECKING:
from pendulum import FixedTimezone, Timezone
+ from airflow.partition_mappers.window import Window
+
_STRPTIME_PATTERNS: dict[str, str] = {
"%Y": r"\d{4}",
@@ -419,3 +422,132 @@ class StartOfYearMapper(_BaseTemporalMapper):
second=0,
microsecond=0,
)
+
+
+class FanOutMapper(PartitionMapper):
+ """
+ Partition mapper that fans one upstream key out into multiple downstream
keys.
+
+ Compose an ``upstream_mapper`` (parses the coarse upstream key and
+ normalizes it to its period start) with a ``window`` (enumerates the
+ members of that period). ``downstream_mapper`` formats each member into a
+ downstream key string; if omitted, a default is chosen from the window
+ class.
+
+ ``downstream_mapper`` must be passed explicitly for any window without an
+ entry in the default table — currently ``HourWindow`` and any custom
+ ``Window`` subclass. Constructing a ``FanOutMapper`` for those windows
+ without a ``downstream_mapper`` raises ``ValueError`` at Dag-parse time.
+
+ Symmetric to :class:`~airflow.partition_mappers.base.RollupMapper`: rollup
+ is N→1 (downstream waits until all members arrive), fan-out is 1→N (one
+ upstream event creates one downstream Dag run per member).
+
+ .. code-block:: python
+
+ # Weekly upstream → 7 daily downstream Dag runs
+ FanOutMapper(upstream_mapper=StartOfWeekMapper(), window=WeekWindow())
+ """
+
+ # Keep ``FanOutMapper.default_downstream_mapper_by_window_name`` in sync
with
+ # the SDK copy in
+ # ``task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py`` —
+ # the SDK and core class hierarchies are independent (the SDK cannot import
+ # core), so both sides carry the same defaults and the lookup is by class
+ # name. When adding a new ``Window`` subclass, extend the table on both
+ # sides; a missing entry raises ``ValueError`` at ``FanOutMapper.__init__``
+ # (see ``FanOutMapper._resolve_default_downstream_mapper``). The
+ # ``check-partition-mapper-defaults-in-sync`` prek hook enforces that the
+ # two tables stay identical.
+ default_downstream_mapper_by_window_name: ClassVar[dict[str,
type[_BaseTemporalMapper]]] = {
+ "DayWindow": StartOfHourMapper,
+ "WeekWindow": StartOfDayMapper,
+ "MonthWindow": StartOfDayMapper,
+ "QuarterWindow": StartOfMonthMapper,
+ "YearWindow": StartOfMonthMapper,
+ }
+
+ @classmethod
+ def _resolve_default_downstream_mapper(cls, window: Window) ->
PartitionMapper:
+ """
+ Return the conventional downstream mapper for *window*.
+
+ Looked up by the window's class **name** rather than identity so that
+ the SDK ``Window`` classes (used in Dag-author code) and the core
+ ``Window`` classes (used after deserialization) both resolve to the
+ same default. Subclasses can extend or override the defaults by
+ setting :attr:`default_downstream_mapper_by_window_name` on the
+ subclass.
+ """
+ mapper_cls =
cls.default_downstream_mapper_by_window_name.get(type(window).__name__)
+ if mapper_cls is None:
+ raise ValueError(
+ f"{cls.__name__} has no default downstream_mapper for window
type "
+ f"{type(window).__name__}; pass downstream_mapper explicitly."
+ )
+ return mapper_cls()
+
+ def __init__(
+ self,
+ *,
+ upstream_mapper: PartitionMapper,
+ window: Window,
+ downstream_mapper: PartitionMapper | None = None,
+ ) -> None:
+ self.upstream_mapper = upstream_mapper
+ self.window = window
+ self.downstream_mapper = downstream_mapper or
self._resolve_default_downstream_mapper(window)
+
+ def to_downstream(self, key: str) -> Iterable[str]:
+ # Round-trip the upstream key through its mapper to obtain the
+ # period-start datetime (decoded form). This keeps the upstream_mapper
+ # opaque — we don't need to know whether it's temporal or segment.
+ formatted = self.upstream_mapper.to_downstream(key)
+ if not isinstance(formatted, str):
+ raise TypeError(
+ "FanOutMapper.upstream_mapper must produce a single key from "
+ "to_downstream; chained fan-out (mapper that itself returns
multiple keys) "
+ "is not supported."
+ )
+ coarse = self.upstream_mapper.decode_downstream(formatted)
+ return [_format_with(self.downstream_mapper, item) for item in
self.window.to_upstream(coarse)]
+
+ def serialize(self) -> dict[str, Any]:
+ from airflow.serialization.encoders import encode_partition_mapper,
encode_window
+
+ return {
+ "upstream_mapper": encode_partition_mapper(self.upstream_mapper),
+ "window": encode_window(self.window),
+ "downstream_mapper":
encode_partition_mapper(self.downstream_mapper),
+ }
+
+ @classmethod
+ def deserialize(cls, data: dict[str, Any]) -> PartitionMapper:
+ from airflow.serialization.decoders import decode_partition_mapper,
decode_window
+
+ return cls(
+ upstream_mapper=decode_partition_mapper(data["upstream_mapper"]),
+ window=decode_window(data["window"]),
+
downstream_mapper=decode_partition_mapper(data["downstream_mapper"]),
+ )
+
+
+def _format_with(mapper: PartitionMapper, decoded: Any) -> str:
+ """
+ Format *decoded* using *mapper*'s downstream format.
+
+ Three-layer fallback, in order:
+
+ 1. *mapper* exposes a callable ``format`` attribute (all temporal mappers
do)
+ — delegates to ``mapper.format(decoded)``, which uses ``output_format``.
+ 2. *decoded* is a ``datetime`` instance — returns ``decoded.isoformat()``,
+ producing a stable ISO-8601 string that round-trips via
+ ``datetime.fromisoformat``.
+ 3. Anything else — ``str(decoded)`` as a last-resort fallback.
+ """
+ formatter = getattr(mapper, "format", None)
+ if callable(formatter):
+ return formatter(decoded)
+ if isinstance(decoded, datetime):
+ return decoded.isoformat()
+ return str(decoded)
diff --git a/airflow-core/src/airflow/serialization/encoders.py
b/airflow-core/src/airflow/serialization/encoders.py
index 707a3c145a3..6301891b3c0 100644
--- a/airflow-core/src/airflow/serialization/encoders.py
+++ b/airflow-core/src/airflow/serialization/encoders.py
@@ -42,6 +42,7 @@ from airflow.sdk import (
DeltaDataIntervalTimetable,
DeltaTriggerTimetable,
EventsTimetable,
+ FanOutMapper,
HourWindow,
IdentityMapper,
MonthWindow,
@@ -433,6 +434,7 @@ class _Serializer:
BUILTIN_PARTITION_MAPPERS: dict[type, str] = {
AllowedKeyMapper:
"airflow.partition_mappers.allowed_key.AllowedKeyMapper",
ChainMapper: "airflow.partition_mappers.chain.ChainMapper",
+ FanOutMapper: "airflow.partition_mappers.temporal.FanOutMapper",
IdentityMapper: "airflow.partition_mappers.identity.IdentityMapper",
ProductMapper: "airflow.partition_mappers.product.ProductMapper",
RollupMapper: "airflow.partition_mappers.base.RollupMapper",
@@ -499,6 +501,14 @@ class _Serializer:
"window": encode_window(partition_mapper.window),
}
+ @serialize_partition_mapper.register
+ def _(self, partition_mapper: FanOutMapper) -> dict[str, Any]:
+ return {
+ "upstream_mapper":
encode_partition_mapper(partition_mapper.upstream_mapper),
+ "window": encode_window(partition_mapper.window),
+ "downstream_mapper":
encode_partition_mapper(partition_mapper.downstream_mapper),
+ }
+
BUILTIN_WINDOWS: dict[type, str] = {
HourWindow: "airflow.partition_mappers.window.HourWindow",
DayWindow: "airflow.partition_mappers.window.DayWindow",
diff --git a/airflow-core/tests/unit/assets/test_manager.py
b/airflow-core/tests/unit/assets/test_manager.py
index 99befc9287f..e25a1f0c204 100644
--- a/airflow-core/tests/unit/assets/test_manager.py
+++ b/airflow-core/tests/unit/assets/test_manager.py
@@ -18,6 +18,7 @@
from __future__ import annotations
import concurrent.futures
+import itertools
import logging
from collections import Counter
from typing import TYPE_CHECKING
@@ -38,10 +39,16 @@ from airflow.models.asset import (
DagScheduleAssetAliasReference,
DagScheduleAssetReference,
)
-from airflow.models.dag import DagModel
+from airflow.models.dag import DAG, DagModel
+from airflow.models.log import Log
+from airflow.partition_mappers.temporal import FanOutMapper, StartOfWeekMapper
+from airflow.partition_mappers.window import WeekWindow
+from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk.definitions.asset import Asset
+from airflow.sdk.definitions.timetables.assets import PartitionedAssetTimetable
from tests_common.test_utils.config import conf_vars
+from tests_common.test_utils.db import clear_db_apdr, clear_db_logs,
clear_db_pakl
from unit.listeners import asset_listener
pytestmark = pytest.mark.db_test
@@ -65,6 +72,19 @@ def mock_task_instance():
return None
+def create_mock_dag():
+ for dag_id in itertools.count(1):
+ mock_dag = mock.Mock(spec=DAG)
+ mock_dag.dag_id = dag_id
+ yield mock_dag
+
+
+def _clear_partition_db() -> None:
+ clear_db_apdr()
+ clear_db_pakl()
+ clear_db_logs()
+
+
class TestAssetManager:
def test_register_asset_change_asset_doesnt_exist(self,
mock_task_instance):
mock_task_instance = mock.Mock()
@@ -388,6 +408,72 @@ class TestAssetManager:
assert
session.scalar(select(func.count()).select_from(AssetDagRunQueue)) == 0
+ @pytest.mark.parametrize(
+ ("cap", "expect_trip"),
+ [
+ # WeekWindow always fans a weekly upstream out into 7 daily keys.
+ pytest.param(2, True, id="way_over_cap"),
+ # at-cap (cap == 7) and one-over (cap == 6) pin the boundary at
+ # ``>`` not ``>=``; a flipped comparison would still pass way_over.
+ pytest.param(7, False, id="at_cap_allowed"),
+ pytest.param(6, True, id="one_over_cap_trips"),
+ ],
+ )
+ @pytest.mark.usefixtures("clear_assets", "testing_dag_bundle")
+ def test_partition_fan_out_cap(self, session, dag_maker,
mock_task_instance, cap, expect_trip):
+ """The ``[scheduler] partition_mapper_max_downstream_keys`` cap gates
fan-out.
+
+ A WeekWindow fan-out of 7 daily keys is either queued in full (cap >=
7)
+ or skipped entirely — no APDR queued, ``log.error`` fired, and a Log
row
+ written — when it exceeds the cap.
+ """
+ # Start from empty APDR / PAKL / log tables so the counts below are exact.
+ _clear_partition_db()
+
+ asset_def = Asset(uri=f"s3://bucket/weekly_{cap}",
name=f"weekly_{cap}")
+ mapper = FanOutMapper(upstream_mapper=StartOfWeekMapper(),
window=WeekWindow())
+ dag_id = f"fan_out_dag_cap_{cap}"
+ with dag_maker(
+ dag_id=dag_id,
+ schedule=PartitionedAssetTimetable(assets=asset_def,
partition_mapper_config={asset_def: mapper}),
+ serialized=True,
+ ):
+ EmptyOperator(task_id="t")
+ dag_maker.create_dagrun()
+ dag_maker.sync_dagbag_to_db()
+
+ # Override the cap for this case and capture the manager's module logger.
+ with (
+ conf_vars({("scheduler", "partition_mapper_max_downstream_keys"):
str(cap)}),
+ mock.patch("airflow.assets.manager.log") as mock_log,
+ ):
+ AssetManager.register_asset_change(
+ task_instance=mock_task_instance,
+ asset=asset_def,
+ session=session,
+ partition_key="2024-06-03T00:00:00",
+ )
+ session.flush()
+
+ apdr_count =
session.scalar(select(func.count()).select_from(AssetPartitionDagRun))
+ log_extras = session.scalars(select(Log.extra).where(Log.event ==
"partition fan-out exceeded")).all()
+ # Within the cap: all 7 daily keys are queued and no error is recorded.
+ if not expect_trip:
+ assert apdr_count == 7
+ assert log_extras == []
+ mock_log.error.assert_not_called()
+ return
+
+ # Over the cap: nothing queued and exactly one "partition fan-out exceeded" Log row.
+ assert apdr_count == 0
+ assert len(log_extras) == 1
+ assert dag_id in log_extras[0]
+ assert f"partition_mapper_max_downstream_keys={cap}" in log_extras[0]
+ # The scheduler-log `log.error` line is a separate observable from the
+ # DB Log row; pin its keyword fields so a rename / level flip is
caught.
+ mock_log.error.assert_called_once()
+ error_call = mock_log.error.call_args
+ assert error_call.kwargs["target_dag"] == dag_id
+ assert error_call.kwargs["source_partition_key"] ==
"2024-06-03T00:00:00"
+ assert error_call.kwargs["produced_keys"] == 7
+ assert error_call.kwargs["max_downstream_keys"] == cap
+
def _make_dag(dag_id: str) -> DagModel:
dag = mock.Mock(spec=DagModel)
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py
b/airflow-core/tests/unit/models/test_taskinstance.py
index d099691bff1..e840b01b859 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -3661,7 +3661,7 @@ def
test_runtime_partition_key_does_not_overwrite_scheduler_partition(dag_maker,
def test_runtime_partition_keys_fan_out_to_one_event_per_key(dag_maker,
session):
"""Multiple distinct runtime keys produce one AssetEvent each;
DagRun.partition_key stays None."""
asset = Asset(name="hello")
- with dag_maker(dag_id="rt_pk_fanout", schedule=None) as dag:
+ with dag_maker(dag_id="rt_pk_fan_out", schedule=None) as dag:
EmptyOperator(task_id="hi", outlets=[asset])
dr = dag_maker.create_dagrun(session=session)
assert dr.partition_key is None
diff --git a/airflow-core/tests/unit/partition_mappers/test_fan_out.py
b/airflow-core/tests/unit/partition_mappers/test_fan_out.py
new file mode 100644
index 00000000000..683931a84f2
--- /dev/null
+++ b/airflow-core/tests/unit/partition_mappers/test_fan_out.py
@@ -0,0 +1,235 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest.mock import MagicMock
+
+import pytest
+
+from airflow.partition_mappers.base import PartitionMapper
+from airflow.partition_mappers.temporal import (
+ FanOutMapper,
+ StartOfDayMapper,
+ StartOfHourMapper,
+ StartOfMonthMapper,
+ StartOfQuarterMapper,
+ StartOfWeekMapper,
+ StartOfYearMapper,
+)
+from airflow.partition_mappers.window import (
+ DayWindow,
+ HourWindow,
+ MonthWindow,
+ QuarterWindow,
+ WeekWindow,
+ YearWindow,
+)
+
+
+class TestFanOutMapper:
+ def test_week_to_seven_daily_keys(self):
+ """A weekly upstream key produces seven consecutive daily keys."""
+ mapper = FanOutMapper(upstream_mapper=StartOfWeekMapper(),
window=WeekWindow())
+ # 2024-01-15 is a Monday, so it's already the week start under the
default ISO week.
+ result = list(mapper.to_downstream("2024-01-15T00:00:00"))
+ assert result == [
+ "2024-01-15",
+ "2024-01-16",
+ "2024-01-17",
+ "2024-01-18",
+ "2024-01-19",
+ "2024-01-20",
+ "2024-01-21",
+ ]
+
+ def test_normalises_mid_week_upstream_to_week_start(self):
+ """An upstream timestamp on Wednesday still yields the seven days from
Monday."""
+ mapper = FanOutMapper(upstream_mapper=StartOfWeekMapper(),
window=WeekWindow())
+ result = list(mapper.to_downstream("2024-01-17T13:42:00"))
+ assert result == [
+ "2024-01-15",
+ "2024-01-16",
+ "2024-01-17",
+ "2024-01-18",
+ "2024-01-19",
+ "2024-01-20",
+ "2024-01-21",
+ ]
+
+ def test_day_to_24_hourly_keys(self):
+ """A daily upstream key produces 24 consecutive hourly keys."""
+ mapper = FanOutMapper(upstream_mapper=StartOfDayMapper(),
window=DayWindow())
+ result = list(mapper.to_downstream("2024-01-15T03:30:00"))
+ assert result == [f"2024-01-15T{hour:02d}" for hour in range(24)]
+
+ def test_month_to_daily_keys(self):
+ """A monthly upstream key produces one downstream key per day of the
month."""
+ mapper = FanOutMapper(upstream_mapper=StartOfMonthMapper(),
window=MonthWindow())
+ result = list(mapper.to_downstream("2024-02-10T00:00:00"))
+ # February 2024 has 29 days (leap year).
+ assert result == [f"2024-02-{day:02d}" for day in range(1, 30)]
+
+ def test_quarter_to_three_monthly_keys(self):
+ """A quarterly upstream key produces three monthly keys."""
+ mapper = FanOutMapper(upstream_mapper=StartOfQuarterMapper(),
window=QuarterWindow())
+ result = list(mapper.to_downstream("2024-02-10T00:00:00"))
+ assert result == ["2024-01", "2024-02", "2024-03"]
+
+ def test_year_to_twelve_monthly_keys(self):
+ """A yearly upstream key produces twelve monthly keys."""
+ mapper = FanOutMapper(upstream_mapper=StartOfYearMapper(),
window=YearWindow())
+ result = list(mapper.to_downstream("2024-07-04T00:00:00"))
+ assert result == [f"2024-{month:02d}" for month in range(1, 13)]
+
+ @pytest.mark.parametrize(
+ ("window", "expected_cls"),
+ [
+ (WeekWindow(), StartOfDayMapper),
+ (MonthWindow(), StartOfDayMapper),
+ (DayWindow(), StartOfHourMapper),
+ (QuarterWindow(), StartOfMonthMapper),
+ (YearWindow(), StartOfMonthMapper),
+ ],
+ )
+ def test_default_downstream_mapper_resolution(self, window, expected_cls):
+ """Each window class resolves to a sensible default downstream
mapper."""
+ mapper = FanOutMapper(upstream_mapper=StartOfWeekMapper(),
window=window)
+ assert isinstance(mapper.downstream_mapper, expected_cls)
+
+ def
test_no_default_for_hour_window_requires_explicit_downstream_mapper(self):
+ """HourWindow has no default downstream_mapper (no minute-level mapper
exists)."""
+ with pytest.raises(ValueError, match="HourWindow"):
+ FanOutMapper(upstream_mapper=StartOfDayMapper(),
window=HourWindow())
+
+ def test_explicit_downstream_mapper_overrides_default(self):
+ """Passing downstream_mapper explicitly overrides the lookup."""
+ custom_downstream = StartOfDayMapper(output_format="%Y/%m/%d")
+ mapper = FanOutMapper(
+ upstream_mapper=StartOfWeekMapper(),
+ window=WeekWindow(),
+ downstream_mapper=custom_downstream,
+ )
+ result = list(mapper.to_downstream("2024-01-15T00:00:00"))
+ assert result == [
+ "2024/01/15",
+ "2024/01/16",
+ "2024/01/17",
+ "2024/01/18",
+ "2024/01/19",
+ "2024/01/20",
+ "2024/01/21",
+ ]
+
+ def test_non_temporal_downstream_mapper_falls_back_to_str(self):
+ """A downstream_mapper without ``format`` falls back to the isoformat
middle layer.
+
+ Documents the contract in ``_format_with``: temporal mappers use
+ ``format(dt)`` (layer 1); a mapper without ``format`` but with datetime
+ values uses ``isoformat()`` (layer 2). Pinning this guards against a
+ silent regression that skips the isoformat layer and falls to bare
+ ``str(decoded)``.
+ """
+ # ``spec=PartitionMapper`` ensures the mock has no ``format``
attribute,
+ # which is the trigger for the ``str(decoded)`` fallback.
+ downstream = MagicMock(spec=PartitionMapper)
+ mapper = FanOutMapper(
+ upstream_mapper=StartOfWeekMapper(),
+ window=WeekWindow(),
+ downstream_mapper=downstream,
+ )
+ result = list(mapper.to_downstream("2024-01-15T00:00:00"))
+ # WeekWindow yields 7 naive datetimes; without ``format`` each is
rendered via ``isoformat()``.
+ assert result == [f"2024-01-{day:02d}T00:00:00" for day in range(15,
22)]
+
+ def test_chained_fan_out_upstream_rejected(self):
+ """An upstream_mapper that itself returns multiple keys is not
supported."""
+ # FanOutMapper's upstream_mapper must produce a single coarse key, so
+ # nesting one FanOutMapper inside another is rejected with a clear
+ # error rather than silently producing wrong output.
+ outer = FanOutMapper(
+ upstream_mapper=FanOutMapper(
+ upstream_mapper=StartOfMonthMapper(),
+ window=MonthWindow(),
+ ),
+ window=DayWindow(),
+ downstream_mapper=StartOfHourMapper(),
+ )
+ with pytest.raises(
+ TypeError,
+ match=(
+ r"FanOutMapper\.upstream_mapper must produce a single key from
to_downstream; "
+ r"chained fan-out \(mapper that itself returns multiple keys\)
is not supported\."
+ ),
+ ):
+ list(outer.to_downstream("2024-02-10T00:00:00"))
+
+ def test_serialize_roundtrip(self):
+ """Serialize + deserialize reconstructs an equivalent mapper (explicit
downstream mapper)."""
+ mapper = FanOutMapper(
+ upstream_mapper=StartOfWeekMapper(timezone="UTC"),
+ window=WeekWindow(),
+ downstream_mapper=StartOfDayMapper(output_format="%Y/%m/%d"),
+ )
+ data = mapper.serialize()
+ restored = FanOutMapper.deserialize(data)
+ assert isinstance(restored, FanOutMapper)
+ assert isinstance(restored.upstream_mapper, StartOfWeekMapper)
+ assert isinstance(restored.window, WeekWindow)
+ assert isinstance(restored.downstream_mapper, StartOfDayMapper)
+ assert restored.downstream_mapper.output_format == "%Y/%m/%d"
+ # Behavior round-trips too.
+ assert list(restored.to_downstream("2024-01-15T00:00:00")) == list(
+ mapper.to_downstream("2024-01-15T00:00:00")
+ )
+
+ @pytest.mark.parametrize(
+ ("upstream_factory", "window_cls", "expected_downstream_cls",
"upstream_key"),
+ [
+ pytest.param(StartOfDayMapper, DayWindow, StartOfHourMapper,
"2024-01-15T00:00:00", id="day"),
+ pytest.param(StartOfWeekMapper, WeekWindow, StartOfDayMapper,
"2024-01-15T00:00:00", id="week"),
+ pytest.param(
+ StartOfMonthMapper, MonthWindow, StartOfDayMapper,
"2024-02-10T00:00:00", id="month"
+ ),
+ pytest.param(
+ StartOfQuarterMapper,
+ QuarterWindow,
+ StartOfMonthMapper,
+ "2024-02-10T00:00:00",
+ id="quarter",
+ ),
+ pytest.param(StartOfYearMapper, YearWindow, StartOfMonthMapper,
"2024-07-04T00:00:00", id="year"),
+ ],
+ )
+ def test_serialize_roundtrip_default_downstream_mapper_all_windows(
+ self, upstream_factory, window_cls, expected_downstream_cls,
upstream_key
+ ):
+ """The auto-resolved default downstream mapper survives encode →
decode for every supported window.
+
+ The default-table lookup happens in ``__init__`` (so the restored
+ mapper carries a concrete ``downstream_mapper`` instance, not a
+ sentinel). Round-tripping the serialized blob must reconstruct the
+ same class on the downstream side and produce byte-identical
+ ``to_downstream`` output.
+ """
+ mapper = FanOutMapper(upstream_mapper=upstream_factory(),
window=window_cls())
+ restored = FanOutMapper.deserialize(mapper.serialize())
+
+ assert isinstance(restored, FanOutMapper)
+ assert isinstance(restored.upstream_mapper, upstream_factory)
+ assert isinstance(restored.window, window_cls)
+ assert isinstance(restored.downstream_mapper, expected_downstream_cls)
+ assert list(restored.to_downstream(upstream_key)) ==
list(mapper.to_downstream(upstream_key))
diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py
b/airflow-core/tests/unit/serialization/test_serialized_objects.py
index b3e34ca76fc..4801761b74b 100644
--- a/airflow-core/tests/unit/serialization/test_serialized_objects.py
+++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py
@@ -1105,6 +1105,74 @@ def test_decode_product_mapper():
assert core_pm.to_downstream("2024-06-15T10:30:00|2024-06-15T10:30:00") ==
"2024-06-15T10|2024-06-15"
+def test_encode_fan_out_mapper():
+    """Encoding a FanOutMapper nests the encoded upstream mapper, window and downstream mapper."""
+    from airflow.sdk import FanOutMapper, StartOfDayMapper, StartOfWeekMapper, WeekWindow
+    from airflow.serialization.encoders import encode_partition_mapper
+
+    def encoded_temporal(type_path, output_format):
+        # Both temporal mappers encode the same timezone and input format; only the output format differs.
+        return {
+            Encoding.TYPE: type_path,
+            Encoding.VAR: {
+                "timezone": "UTC",
+                "input_format": "%Y-%m-%dT%H:%M:%S",
+                "output_format": output_format,
+            },
+        }
+
+    expected = {
+        Encoding.TYPE: "airflow.partition_mappers.temporal.FanOutMapper",
+        Encoding.VAR: {
+            "upstream_mapper": encoded_temporal(
+                "airflow.partition_mappers.temporal.StartOfWeekMapper", "%Y-%m-%d (W%V)"
+            ),
+            "window": {
+                Encoding.TYPE: "airflow.partition_mappers.window.WeekWindow",
+                Encoding.VAR: {},
+            },
+            "downstream_mapper": encoded_temporal(
+                "airflow.partition_mappers.temporal.StartOfDayMapper", "%Y-%m-%d"
+            ),
+        },
+    }
+
+    fan_out = FanOutMapper(
+        upstream_mapper=StartOfWeekMapper(),
+        window=WeekWindow(),
+        downstream_mapper=StartOfDayMapper(),
+    )
+    assert encode_partition_mapper(fan_out) == expected
+
+
+def test_decode_fan_out_mapper():
+    """Decoding an SDK-encoded FanOutMapper yields core classes, including the auto-resolved default."""
+    from airflow.partition_mappers.temporal import (
+        FanOutMapper as CoreFanOutMapper,
+        StartOfDayMapper as CoreStartOfDayMapper,
+        StartOfWeekMapper as CoreStartOfWeekMapper,
+    )
+    from airflow.partition_mappers.window import WeekWindow as CoreWeekWindow
+    from airflow.sdk import FanOutMapper, StartOfWeekMapper, WeekWindow
+    from airflow.serialization.decoders import decode_partition_mapper
+    from airflow.serialization.encoders import encode_partition_mapper
+
+    sdk_mapper = FanOutMapper(upstream_mapper=StartOfWeekMapper(), window=WeekWindow())
+    decoded = decode_partition_mapper(encode_partition_mapper(sdk_mapper))
+
+    assert isinstance(decoded, CoreFanOutMapper)
+    assert isinstance(decoded.upstream_mapper, CoreStartOfWeekMapper)
+    assert isinstance(decoded.window, CoreWeekWindow)
+    # No downstream_mapper was passed, so the WeekWindow default (StartOfDayMapper) applies.
+    assert isinstance(decoded.downstream_mapper, CoreStartOfDayMapper)
+
+    # The week beginning 2024-01-15 fans out to seven daily keys, 15th through 21st.
+    expected_days = [f"2024-01-{day}" for day in range(15, 22)]
+    assert list(decoded.to_downstream("2024-01-15T00:00:00")) == expected_days
+
+
def test_encode_chain_mapper():
from airflow.sdk import ChainMapper, StartOfDayMapper, StartOfHourMapper
from airflow.serialization.encoders import encode_partition_mapper
diff --git a/scripts/ci/prek/check_partition_mapper_defaults_in_sync.py
b/scripts/ci/prek/check_partition_mapper_defaults_in_sync.py
new file mode 100755
index 00000000000..0c1adea9669
--- /dev/null
+++ b/scripts/ci/prek/check_partition_mapper_defaults_in_sync.py
@@ -0,0 +1,143 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# /// script
+# requires-python = ">=3.10"
+# dependencies = [
+# "rich>=13.6.0",
+# ]
+# ///
+"""
+Verify ``FanOutMapper.default_downstream_mapper_by_window_name`` stays in sync
+between core and the Task SDK.
+
+The default downstream-mapper table is defined twice — once in the core class
+hierarchy and once in the SDK copy — because the two hierarchies are
+independent (the SDK cannot import core) and the lookup is by ``Window`` class
+*name*. Both copies must list the same ``Window`` name -> mapper class mapping,
+otherwise a ``FanOutMapper`` resolves a different default depending on whether
+it runs in Dag-author code (SDK) or after deserialization (core).
+
+This check parses the ``default_downstream_mapper_by_window_name`` class
+attribute from both files via AST and asserts the two mappings are identical.
+
+Run from the repo root:
+
+ uv run --project scripts python
scripts/ci/prek/check_partition_mapper_defaults_in_sync.py
+
+Exits 0 if the two tables match, 1 (with a diff) otherwise.
+"""
+
+from __future__ import annotations
+
+import ast
+import sys
+from pathlib import Path
+
+from common_prek_utils import (
+ AIRFLOW_CORE_SOURCES_PATH,
+ AIRFLOW_TASK_SDK_SOURCES_PATH,
+)
+from rich.console import Console
+
+console = Console(color_system="standard", width=200)
+
+# The class and class attribute whose default-mapper table must match in core and the SDK.
+CLASS_NAME = "FanOutMapper"
+ATTR_NAME = "default_downstream_mapper_by_window_name"
+
+# The two modules that each define a copy of that table.
+CORE_FILE = AIRFLOW_CORE_SOURCES_PATH.joinpath("airflow", "partition_mappers", "temporal.py")
+SDK_FILE = AIRFLOW_TASK_SDK_SOURCES_PATH.joinpath(
+    "airflow", "sdk", "definitions", "partition_mappers", "temporal.py"
+)
+
+
+def _named_assignment(stmt: ast.stmt) -> tuple[str, ast.expr | None] | None:
+    """Return ``(name, value)`` for a single bare-name (optionally annotated) assignment, else ``None``."""
+    if isinstance(stmt, ast.AnnAssign) and isinstance(stmt.target, ast.Name):
+        return stmt.target.id, stmt.value
+    if isinstance(stmt, ast.Assign) and len(stmt.targets) == 1 and isinstance(stmt.targets[0], ast.Name):
+        return stmt.targets[0].id, stmt.value
+    return None
+
+
+def _find_attr_value(file_path: Path) -> ast.Dict:
+    """
+    Return the dict literal assigned to ``FanOutMapper.default_downstream_mapper_by_window_name``.
+
+    Both ``name = {...}`` and ``name: T = {...}`` class-level assignments are
+    recognised. Only the first ``FanOutMapper`` class reached by
+    :func:`ast.walk` is inspected.
+
+    :param file_path: Python source file to parse.
+    :raises ValueError: if the class or the attribute is absent, or the
+        attribute is not bound to a dict literal.
+    """
+    tree = ast.parse(file_path.read_text(encoding="utf-8"), filename=str(file_path))
+    class_node = next(
+        (
+            node
+            for node in ast.walk(tree)
+            if isinstance(node, ast.ClassDef) and node.name == CLASS_NAME
+        ),
+        None,
+    )
+    if class_node is None:
+        raise ValueError(f"{file_path}: no class {CLASS_NAME} found.")
+
+    for stmt in class_node.body:
+        binding = _named_assignment(stmt)
+        if binding is None or binding[0] != ATTR_NAME:
+            continue
+        value = binding[1]
+        if isinstance(value, ast.Dict):
+            return value
+        raise ValueError(
+            f"{file_path}: {CLASS_NAME}.{ATTR_NAME} is not a dict literal; "
+            f"this check parses a dict literal and must be updated."
+        )
+    raise ValueError(f"{file_path}: {CLASS_NAME} has no {ATTR_NAME} attribute.")
+
+
+def _extract_mapping(file_path: Path) -> dict[str, str]:
+    """
+    Read the ``{window_name: mapper_class_name}`` table out of *file_path*.
+
+    Keys must be string literals and values bare class names. Anything else
+    (``**`` unpacking, computed keys, dotted references) raises ``ValueError``
+    so the comparison stays purely syntactic. Duplicate keys resolve to the
+    last occurrence, as in the dict literal itself.
+    """
+
+    def _entry(key_node: ast.expr | None, value_node: ast.expr) -> tuple[str, str]:
+        # Validate one ``key: value`` pair of the literal and return it as plain strings.
+        if not (isinstance(key_node, ast.Constant) and isinstance(key_node.value, str)):
+            raise ValueError(
+                f"{file_path}: {CLASS_NAME}.{ATTR_NAME} has a non-string-literal key; "
+                f"this check expects window class names as string literals."
+            )
+        if not isinstance(value_node, ast.Name):
+            raise ValueError(
+                f"{file_path}: {CLASS_NAME}.{ATTR_NAME}[{key_node.value!r}] is not a bare "
+                f"class name; this check expects a mapper class reference."
+            )
+        return key_node.value, value_node.id
+
+    dict_node = _find_attr_value(file_path)
+    return dict(_entry(key, value) for key, value in zip(dict_node.keys, dict_node.values))
+
+
+def main() -> int:
+    """
+    Compare the default downstream-mapper tables of core and the Task SDK.
+
+    :return: process exit code. ``0`` when both tables are identical. ``1``
+        when they differ (a per-window report is printed) or when either table
+        cannot be read (the underlying error is printed).
+    """
+    try:
+        core_mapping = _extract_mapping(CORE_FILE)
+        sdk_mapping = _extract_mapping(SDK_FILE)
+    except (ValueError, OSError, SyntaxError) as exc:
+        # ValueError: the table is not in the expected literal form. OSError / SyntaxError:
+        # a file is missing, unreadable or does not parse; report it the same way, not as a traceback.
+        console.print(f"[red]Could not read the default mapper table:[/red] {exc}")
+        return 1
+
+    if core_mapping == sdk_mapping:
+        return 0
+
+    console.print(f"[red]{CLASS_NAME}.{ATTR_NAME} is out of sync between core and the Task SDK.[/red]\n")
+    for window in sorted(core_mapping.keys() | sdk_mapping.keys()):
+        core_val = core_mapping.get(window, "<missing>")
+        sdk_val = sdk_mapping.get(window, "<missing>")
+        if core_val == sdk_val:
+            # Two spaces so matching rows line up with the two-character "->" marker.
+            marker, open_tag, close_tag = "  ", "", ""
+        else:
+            marker, open_tag, close_tag = "->", "[red]", "[/red]"
+        console.print(f"{open_tag}{marker} {window}: core={core_val} sdk={sdk_val}{close_tag}")
+    console.print(
+        f"\nMake both tables list the same window -> mapper entries:\n core: {CORE_FILE}\n sdk: {SDK_FILE}"
+    )
+    return 1
+
+
+if __name__ == "__main__":
+    # Propagate main()'s 0/1 result as the process exit status.
+    exit_status = main()
+    sys.exit(exit_status)
diff --git a/task-sdk/docs/api.rst b/task-sdk/docs/api.rst
index 2cc9e07bf1c..18fb8a08bbc 100644
--- a/task-sdk/docs/api.rst
+++ b/task-sdk/docs/api.rst
@@ -245,6 +245,8 @@ Partition Mapper
.. autoapiclass:: airflow.sdk.AllowedKeyMapper
+.. autoapiclass:: airflow.sdk.FanOutMapper
+
Rollup Windows
~~~~~~~~~~~~~~
diff --git a/task-sdk/src/airflow/sdk/__init__.py
b/task-sdk/src/airflow/sdk/__init__.py
index b922cad9e1f..ce5b8ab2c2e 100644
--- a/task-sdk/src/airflow/sdk/__init__.py
+++ b/task-sdk/src/airflow/sdk/__init__.py
@@ -54,6 +54,7 @@ __all__ = [
"EdgeModifier",
"EventsTimetable",
"ExceptionRetryPolicy",
+ "FanOutMapper",
"HourWindow",
"IdentityMapper",
"Label",
@@ -156,6 +157,7 @@ if TYPE_CHECKING:
from airflow.sdk.definitions.partition_mappers.identity import
IdentityMapper
from airflow.sdk.definitions.partition_mappers.product import ProductMapper
from airflow.sdk.definitions.partition_mappers.temporal import (
+ FanOutMapper,
StartOfDayMapper,
StartOfHourMapper,
StartOfMonthMapper,
@@ -241,6 +243,7 @@ __lazy_imports: dict[str, str] = {
"EdgeModifier": ".definitions.edges",
"EventsTimetable": ".definitions.timetables.events",
"ExceptionRetryPolicy": ".definitions.retry_policy",
+ "FanOutMapper": ".definitions.partition_mappers.temporal",
"HourWindow": ".definitions.partition_mappers.window",
"IdentityMapper": ".definitions.partition_mappers.identity",
"Label": ".definitions.edges",
diff --git a/task-sdk/src/airflow/sdk/__init__.pyi
b/task-sdk/src/airflow/sdk/__init__.pyi
index 703e3d4f0e6..1bb975e2202 100644
--- a/task-sdk/src/airflow/sdk/__init__.pyi
+++ b/task-sdk/src/airflow/sdk/__init__.pyi
@@ -70,6 +70,7 @@ from airflow.sdk.definitions.partition_mappers.chain import
ChainMapper
from airflow.sdk.definitions.partition_mappers.identity import IdentityMapper
from airflow.sdk.definitions.partition_mappers.product import ProductMapper
from airflow.sdk.definitions.partition_mappers.temporal import (
+ FanOutMapper,
StartOfDayMapper,
StartOfHourMapper,
StartOfMonthMapper,
@@ -152,6 +153,7 @@ __all__ = [
"EdgeModifier",
"EventsTimetable",
"ExceptionRetryPolicy",
+ "FanOutMapper",
"HourWindow",
"IdentityMapper",
"Label",
diff --git a/task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py
b/task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py
index 06364cacd39..200a547c403 100644
--- a/task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py
+++ b/task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py
@@ -25,6 +25,8 @@ from airflow.sdk.definitions.partition_mappers.base import
PartitionMapper
if TYPE_CHECKING:
from pendulum import FixedTimezone, Timezone
+ from airflow.sdk.definitions.partition_mappers.window import Window
+
class _BaseTemporalMapper(PartitionMapper):
"""Base class for Temporal Partition Mappers."""
@@ -104,3 +106,78 @@ class StartOfYearMapper(_BaseTemporalMapper):
"""
default_output_format = "%Y"
+
+
+class FanOutMapper(PartitionMapper):
+    """
+    Partition mapper that fans one upstream key out into multiple downstream keys.
+
+    Compose an ``upstream_mapper`` (which parses the coarse upstream key and
+    normalizes it to a period start) with a ``window`` that enumerates the
+    members of that period. ``downstream_mapper`` formats each member into a
+    downstream key string; if omitted, a default is chosen from the window
+    class (e.g. ``WeekWindow`` → ``StartOfDayMapper``).
+
+    ``downstream_mapper`` must be passed explicitly for any window without an
+    entry in the default table — currently ``HourWindow`` and any custom
+    ``Window`` subclass. Constructing a ``FanOutMapper`` for those windows
+    without a ``downstream_mapper`` raises ``ValueError`` at Dag-parse time.
+
+    Symmetric to :class:`~airflow.sdk.RollupMapper`: rollup is N→1 (downstream
+    waits for all members), fan-out is 1→N (one upstream event creates many
+    downstream Dag runs).
+
+    .. code-block:: python
+
+        # Weekly upstream → 7 daily downstream Dag runs
+        FanOutMapper(upstream_mapper=StartOfWeekMapper(), window=WeekWindow())
+
+    :param upstream_mapper: mapper that parses the coarse upstream key and
+        normalizes it to the start of its period.
+    :param window: window that enumerates the members of that period.
+    :param downstream_mapper: mapper that formats each member into a downstream
+        key. ``None`` (the default) selects the entry for the window's class
+        name in :attr:`default_downstream_mapper_by_window_name`.
+    :raises ValueError: if ``downstream_mapper`` is ``None`` and the window's
+        class name has no entry in the default table.
+    """
+
+    # Keep ``FanOutMapper.default_downstream_mapper_by_window_name`` in sync with
+    # the core copy in
+    # ``airflow-core/src/airflow/partition_mappers/temporal.py`` —
+    # the SDK and core class hierarchies are independent (the SDK cannot import
+    # core), so both sides carry the same defaults and the lookup is by class
+    # name. When adding a new ``Window`` subclass, extend the table on both
+    # sides; a missing entry raises ``ValueError`` at ``FanOutMapper.__init__``
+    # (see ``FanOutMapper._resolve_default_downstream_mapper``). The
+    # ``check-partition-mapper-defaults-in-sync`` prek hook enforces that the
+    # two tables stay identical; it parses this as a dict literal of bare class names.
+    default_downstream_mapper_by_window_name: ClassVar[dict[str, type[_BaseTemporalMapper]]] = {
+        "DayWindow": StartOfHourMapper,
+        "WeekWindow": StartOfDayMapper,
+        "MonthWindow": StartOfDayMapper,
+        "QuarterWindow": StartOfMonthMapper,
+        "YearWindow": StartOfMonthMapper,
+    }
+
+    @classmethod
+    def _resolve_default_downstream_mapper(cls, window: Window) -> PartitionMapper:
+        """
+        Return the conventional downstream mapper for *window*.
+
+        Looked up by the window's class **name** rather than identity so that
+        the SDK ``Window`` classes (used in Dag-author code) and the core
+        ``Window`` classes (used after deserialization) both resolve to the
+        same default. Subclasses can extend or override the defaults by
+        setting :attr:`default_downstream_mapper_by_window_name` on the
+        subclass.
+
+        :raises ValueError: if the window's class name has no entry in the table.
+        """
+        mapper_cls = cls.default_downstream_mapper_by_window_name.get(type(window).__name__)
+        if mapper_cls is None:
+            raise ValueError(
+                f"{cls.__name__} has no default downstream_mapper for window type "
+                f"{type(window).__name__}; pass downstream_mapper explicitly."
+            )
+        return mapper_cls()
+
+    def __init__(
+        self,
+        *,
+        upstream_mapper: PartitionMapper,
+        window: Window,
+        downstream_mapper: PartitionMapper | None = None,
+    ) -> None:
+        self.upstream_mapper = upstream_mapper
+        self.window = window
+        # Test for None explicitly rather than truthiness: an explicitly supplied
+        # mapper must never be swapped for the default (or trigger the
+        # "no default" ValueError) just because its class defines __bool__/__len__.
+        if downstream_mapper is None:
+            downstream_mapper = self._resolve_default_downstream_mapper(window)
+        self.downstream_mapper = downstream_mapper