uranusjr commented on code in PR #67716:
URL: https://github.com/apache/airflow/pull/67716#discussion_r3378422464
##########
airflow-core/src/airflow/partition_mappers/window.py:
##########
@@ -194,3 +194,52 @@ class YearWindow(Window):
def to_upstream(self, period_start: datetime) -> Iterable[datetime]:
    """
    Expand a yearly downstream period into its upstream anchors.

    ``period_start`` is first checked by ``_require_day_one`` (presumably
    enforcing a first-of-month anchor — confirm against the helper). The
    check runs eagerly, before anything is produced.

    :return: a lazy generator yielding ``_shift_months(period_start, k)``
        for ``k`` in 0..11, i.e. twelve values.
    """
    _require_day_one(period_start, type(self))
    month_offsets = range(12)
    return (_shift_months(period_start, offset) for offset in month_offsets)
+
+
+class SegmentWindow(Window):
+ """
+ A fixed categorical set of string keys that constitute one downstream
period.
+
+ Paired with :class:`~airflow.partition_mappers.fixed_key.FixedKeyMapper`
inside a
+ :class:`~airflow.partition_mappers.base.RollupMapper` to express a
categorical
+ rollup: the scheduler holds the downstream run until every declared
segment key
+ has arrived from the upstream producer, then fires once.
+
+ ``to_upstream`` returns the complete segment set regardless of the
downstream
+ anchor value — the anchor is intentionally ignored because all segments
map onto
+ a single downstream partition key, not a time-based period.
+
+ :param segments: Non-empty iterable of non-empty string segment keys.
Duplicates
+ are silently de-duplicated.
+ :raises ValueError: if *segments* is empty, contains a non-``str``
element, or
+ contains an empty-string element.
+ """
+
+ expected_decoded_type: ClassVar[type] = str
+
def __init__(self, segments: Iterable[str]) -> None:
    """
    Validate and store the declared segment keys.

    :param segments: Non-empty iterable of non-empty string segment keys.
        Duplicates are collapsed. It must not be a bare ``str``.
    :raises ValueError: if *segments* is a bare ``str``, is empty, contains
        a non-``str`` element, or contains an empty-string element.
    """
    # A bare string is itself an iterable of str, so without this guard
    # SegmentWindow("eu") would silently become the segments {"e", "u"}.
    if isinstance(segments, str):
        raise ValueError(
            "SegmentWindow expects an iterable of segment keys, not a single str; "
            f"got {segments!r}. Wrap it in a list, e.g. [{segments!r}]."
        )
    # Materialise once so one-shot iterables (generators) can be validated
    # and then stored.
    collected: list[str] = list(segments)
    if not collected:
        raise ValueError("SegmentWindow requires at least one segment key; got an empty iterable.")
    for i, item in enumerate(collected):
        if not isinstance(item, str):
            raise ValueError(
                "SegmentWindow segment keys must be str; "
                f"got {type(item).__name__!r} at index {i}: {item!r}"
            )
        if item == "":
            raise ValueError(
                f"SegmentWindow segment keys must be non-empty strings; got an empty string at index {i}."
            )
    # frozenset de-duplicates and is immutable, so it can be shared freely.
    self._segments: frozenset[str] = frozenset(collected)
+
def to_upstream(self, decoded_downstream: Any) -> frozenset[str]:
    """
    Return the full declared segment set, ignoring the downstream anchor.

    :param decoded_downstream: The decoded downstream partition value. It is
        deliberately unused, because every segment maps onto the same
        downstream key.
    :return: The frozenset of segment keys given at construction time.
    """
    # ``_segments`` is a frozenset and therefore immutable, so callers cannot
    # mutate it. Handing out the stored instance is safe, and a defensive
    # copy would add nothing.
    return self._segments
Review Comment:
Why does this need to do a copy?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]