Lee-W commented on code in PR #64571:
URL: https://github.com/apache/airflow/pull/64571#discussion_r3309961331
##########
airflow-core/src/airflow/partition_mappers/base.py:
##########
@@ -18,26 +18,128 @@
from __future__ import annotations
from abc import ABC, abstractmethod
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING, Any, ClassVar
if TYPE_CHECKING:
from collections.abc import Iterable
+ from airflow.partition_mappers.window import Window
+
class PartitionMapper(ABC):
"""
Base partition mapper class.
- Maps keys from asset events to target dag run partitions.
+ Maps keys from asset events to target Dag run partitions.
"""
+ is_rollup: ClassVar[bool] = False
+
+ def __init_subclass__(cls, **kwargs: Any) -> None:
+ super().__init_subclass__(**kwargs)
+ decode_overridden = cls.decode_downstream is not
PartitionMapper.decode_downstream
+ encode_overridden = cls.encode_upstream is not
PartitionMapper.encode_upstream
+ if decode_overridden ^ encode_overridden:
+ raise TypeError(
+ f"{cls.__qualname__} overrides only one side of the
decode/encode pair. "
+ "Both 'decode_downstream' and 'encode_upstream' must be
overridden together "
+ "or not at all. Overriding only one side leaves
RollupMapper.to_upstream "
+ "producing non-str members, causing the scheduler's
upstream-window check "
+ "to silently never satisfy and the Dag run to be held forever."
+ )
+
@abstractmethod
def to_downstream(self, key: str) -> str | Iterable[str]:
"""Return the target key that the given source partition key maps
to."""
+ def decode_downstream(self, downstream_key: str) -> Any:
+ """
+ Recover the canonical decoded form of *downstream_key*.
+
+ Used by :class:`RollupMapper` to hand the window an opaque "anchor"
+ for the downstream period; the window iterates in this decoded space
+ and the mapper re-encodes each expected upstream via
+ :meth:`encode_upstream`. Default is identity (string in, string out)
+ — temporal mappers override to return ``datetime``, future segment
+ mappers will return whatever shape suits them.
+
+ .. warning::
+
+ ``decode_downstream`` and :meth:`encode_upstream` form an inverse
+ pair. If you override this method to return a non-str (e.g.
+ ``datetime``, ``tuple``), you **must** also override
+ ``encode_upstream`` to convert the decoded form back to an
+ upstream key string. Overriding only one side leaves
+ :class:`RollupMapper.to_upstream` producing non-str members, the
+ scheduler's upstream-window check silently never satisfies, and
+ the Dag run is held forever with no audit log entry.
+ """
+ return downstream_key
+
+ def encode_upstream(self, decoded: Any) -> str:
+ """
+ Encode an expected upstream object back into a key string.
+
+ Pair of :meth:`decode_downstream`. Default is identity. Temporal
+ mappers override to apply timezone + ``input_format``.
+
+ .. warning::
+
+ The default identity implementation is only correct when
+ :meth:`decode_downstream` also uses the identity default (str in,
+ str out). See :meth:`decode_downstream` for the consequences of
+ overriding only one side of the pair.
+ """
+ return decoded
+
def serialize(self) -> dict[str, Any]:
return {}
@classmethod
def deserialize(cls, data: dict[str, Any]) -> PartitionMapper:
return cls()
+
+
+class RollupMapper(PartitionMapper):
+ """
+ Partition mapper that rolls up many upstream keys into one downstream key.
+
+ Compose a ``upstream_mapper`` (which normalizes each upstream key to the
+ downstream granularity) with a ``window`` that declares the full set of
+ upstream keys required for a given downstream key. The scheduler holds
+ the Dag run until every upstream key in the window has arrived.
+ """
+
+ is_rollup: ClassVar[bool] = True
+
+ def __init__(self, *, upstream_mapper: PartitionMapper, window: Window) ->
None:
Review Comment:
`Window` now declares `expected_decoded_type: ClassVar[type] = str`
(the default), and the built-in temporal windows override it to `datetime`.
`RollupMapper.__init__` raises `TypeError` when the upstream mapper leaves
`decode_downstream` at the base identity (which returns `str`) while the paired
window expects a non-str decoded type. This matches the style of the
decode/encode-pair guard.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]