uranusjr commented on code in PR #66030:
URL: https://github.com/apache/airflow/pull/66030#discussion_r3278829279
##########
airflow-core/src/airflow/partition_mappers/temporal.py:
##########
@@ -386,3 +389,121 @@ def normalize(self, dt: datetime) -> datetime:
second=0,
microsecond=0,
)
+
+
class FanOutMapper(PartitionMapper):
    """
    Partition mapper that fans one upstream key out into multiple downstream keys.

    Compose an ``upstream_mapper`` (parses the coarse upstream key and
    normalizes it to its period start) with a ``window`` (enumerates the
    members of that period). ``downstream_mapper`` formats each member into a
    downstream key string; if omitted, a default is chosen from the window
    class.

    ``downstream_mapper`` must be passed explicitly for any window without an
    entry in the default table — currently ``HourWindow`` and any custom
    ``Window`` subclass. Constructing a ``FanOutMapper`` for those windows
    without a ``downstream_mapper`` raises ``ValueError`` at Dag-parse time.

    Symmetric to :class:`~airflow.partition_mappers.base.RollupMapper`: rollup
    is N→1 (downstream waits until all members arrive), fanout is 1→N (one
    upstream event creates one downstream Dag run per member).

    .. code-block:: python

        # Weekly upstream → 7 daily downstream Dag runs
        FanOutMapper(upstream_mapper=StartOfWeekMapper(), window=WeekWindow())
    """

    def __init__(
        self,
        *,
        upstream_mapper: PartitionMapper,
        window: Window,
        downstream_mapper: PartitionMapper | None = None,
    ) -> None:
        """
        Store the composed mappers, resolving the default downstream mapper if needed.

        :param upstream_mapper: Mapper used to normalize and decode the upstream key.
        :param window: Window that enumerates the members of the decoded period.
        :param downstream_mapper: Mapper used to format each member; when
            ``None`` the default for *window* is looked up.
        :raises ValueError: If *downstream_mapper* is ``None`` and *window* has
            no default downstream mapper.
        """
        self.upstream_mapper = upstream_mapper
        self.window = window
        # Compare against ``None`` instead of relying on truthiness: a mapper
        # that defines ``__bool__`` or ``__len__`` may be falsy, and an
        # explicitly supplied mapper must never be swapped for the default.
        if downstream_mapper is None:
            downstream_mapper = _resolve_default_downstream_mapper(window)
        self.downstream_mapper = downstream_mapper

    def to_downstream(self, key: str) -> Iterable[str]:
        """
        Return one downstream key per member of the window that *key* falls in.

        :param key: The upstream partition key.
        :raises TypeError: If ``upstream_mapper.to_downstream`` does not return
            a single ``str`` (chained fan-out is not supported).
        """
        # Round-trip the upstream key through its mapper to obtain the
        # period-start datetime (decoded form). This keeps the upstream_mapper
        # opaque — we don't need to know whether it's temporal or segment.
        formatted = self.upstream_mapper.to_downstream(key)
        if not isinstance(formatted, str):
            raise TypeError(
                "FanOutMapper.upstream_mapper must produce a single key from "
                "to_downstream; chained fan-out (mapper that itself returns multiple keys) "
                "is not supported."
            )
        coarse = self.upstream_mapper.decode_downstream(formatted)
        # ``window.to_upstream`` yields the members of the period; each one is
        # rendered as a key string through the downstream mapper.
        return [_format_with(self.downstream_mapper, item) for item in self.window.to_upstream(coarse)]

    def serialize(self) -> dict[str, Any]:
        """Encode both mappers and the window into a plain dict."""
        # Imported here rather than at module level, as in the original code.
        from airflow.serialization.encoders import encode_partition_mapper, encode_window

        return {
            "upstream_mapper": encode_partition_mapper(self.upstream_mapper),
            "window": encode_window(self.window),
            # Always the resolved mapper, so ``deserialize`` never needs the
            # default lookup.
            "downstream_mapper": encode_partition_mapper(self.downstream_mapper),
        }

    @classmethod
    def deserialize(cls, data: dict[str, Any]) -> PartitionMapper:
        """Rebuild a mapper from the dict produced by :meth:`serialize`."""
        from airflow.serialization.decoders import decode_partition_mapper, decode_window

        return cls(
            upstream_mapper=decode_partition_mapper(data["upstream_mapper"]),
            window=decode_window(data["window"]),
            downstream_mapper=decode_partition_mapper(data["downstream_mapper"]),
        )
+
+
def _format_with(mapper: PartitionMapper, decoded: Any) -> str:
    """
    Render *decoded* as a key string using *mapper*.

    If *mapper* has a callable ``format`` attribute it is called with
    *decoded* and its result is returned; temporal mappers provide
    ``format(dt) -> str`` based on their ``output_format``. Any other mapper
    (segment, custom) falls back to ``str(decoded)`` unless it supplies its
    own ``format`` method.
    """
    render = getattr(mapper, "format", None)
    return render(decoded) if callable(render) else str(decoded)
+
+
# Default downstream mapper class per window, keyed by the window's class
# *name*.
#
# A copy of this table lives in the SDK at
# ``task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py`` and
# must be kept in sync: the SDK cannot import core, so the two class
# hierarchies are independent and the lookup is done by class name. When a new
# ``Window`` subclass is added, extend the table on both sides; a window
# without an entry makes ``FanOutMapper.__init__`` raise ``ValueError`` (see
# ``_resolve_default_downstream_mapper``).
_DEFAULT_DOWNSTREAM_MAPPER_BY_WINDOW_NAME: dict[str, type[_BaseTemporalMapper]] = {
    "DayWindow": StartOfHourMapper,
    # Windows whose default downstream mapper is ``StartOfDayMapper``.
    **dict.fromkeys(("WeekWindow", "MonthWindow"), StartOfDayMapper),
    # Windows whose default downstream mapper is ``StartOfMonthMapper``.
    **dict.fromkeys(("QuarterWindow", "YearWindow"), StartOfMonthMapper),
}
+
+
def _resolve_default_downstream_mapper(window: Window) -> PartitionMapper:
    """
    Build the default downstream mapper associated with *window*.

    The lookup key is the **name** of the window's class, not the class
    object, so that the SDK ``Window`` classes (used in Dag-author code) and
    the core ``Window`` classes (used after deserialization) resolve to the
    same default.

    :raises ValueError: If the window's class name has no entry in the
        default table.
    """
    mapper_cls = _DEFAULT_DOWNSTREAM_MAPPER_BY_WINDOW_NAME.get(type(window).__name__)
    if mapper_cls is not None:
        return mapper_cls()
    raise ValueError(
        f"FanOutMapper has no default downstream_mapper for window type "
        f"{type(window).__name__}; pass downstream_mapper explicitly."
    )
Review Comment:
Should this be a method on FanOutMapper so that users can subclass it to
override the defaults?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]