phanikumv commented on code in PR #67475:
URL: https://github.com/apache/airflow/pull/67475#discussion_r3350168434


##########
airflow-core/src/airflow/partition_mappers/window.py:
##########
@@ -0,0 +1,233 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from datetime import datetime, timedelta
+from enum import Enum
+from typing import TYPE_CHECKING, Any, ClassVar
+
+if TYPE_CHECKING:
+    from collections.abc import Iterable
+
+
def _require_day_one(dt: datetime, window_cls: type) -> None:
    """
    Guard that *dt* falls on the first day of its month.

    The month-aligned windows step between periods with month arithmetic
    (``replace(month=...)``). That only works when every period begins on
    day 1: moving Jan 31 into February, for instance, fails with
    ``ValueError: day is out of range for month``. The built-in temporal
    upstream mappers already normalise to day 1, but a custom
    ``PartitionMapper.decode_downstream`` may not. Validating here swaps an
    obscure failure during a scheduler tick for an error that names the
    violated upstream-mapper contract.

    :param dt: The decoded period start to validate.
    :param window_cls: The window class performing the check; only its
        ``__name__`` is used, to make the error message specific.
    :raises ValueError: If ``dt.day`` is anything other than 1.
    """
    if dt.day == 1:
        return
    message = (
        f"{window_cls.__name__} expects a period start on day 1 of the month, "
        f"got {dt.isoformat()}. The paired upstream mapper's decode_downstream "
        "must normalise to day 1."
    )
    raise ValueError(message)
+
+
def _shift_months(dt: datetime, months: int) -> datetime:
    """
    Move *dt* by *months* calendar months, carrying into the year as needed.

    A negative *months* moves backwards. Floor division keeps the month index
    within ``0..11`` in both directions. Only the year and month fields are
    replaced; day, time and ``tzinfo`` are carried over unchanged.

    The caller must ensure *dt* is on day 1 (see :func:`_require_day_one`).
    Otherwise ``replace(month=...)`` may reject a day that does not exist in
    the target month.
    """
    # Zero-based month index so divmod splits it into a year carry and a
    # month offset in one step.
    year_carry, month_index = divmod(dt.month - 1 + months, 12)
    return dt.replace(year=dt.year + year_carry, month=month_index + 1)
+
+
class WindowDirection(str, Enum):
    """
    Direction of a :class:`Window` fan-out relative to the upstream key.

    Because the enum also subclasses ``str``, each member compares equal to
    its lowercase string value (``WindowDirection.FORWARD == "forward"``).
    """

    # NOTE(review): the temporal windows in this module implement FORWARD by
    # shifting the window *back* so that it ends at the anchor, whereas the
    # example in issue #65761 describes "forward" as the period that follows.
    # Confirm which meaning is intended.

    #: Default; yield the period the upstream key itself represents.
    BACKWARD = "backward"

    #: Yield the trailing period that ends at the upstream key (the mirror of
    #: ``BACKWARD``).
    FORWARD = "forward"
+
+
class Window(ABC):
    """
    Describes a rollup window: which decoded upstream items make up one decoded downstream period.

    Paired with an upstream mapper inside a :class:`RollupMapper`. The window
    operates purely in the upstream mapper's *decoded* form (``datetime`` for
    temporal mappers, domain-specific types for future segment / runtime
    mappers). It does not touch key strings, timezones, or formats — those
    belong to the upstream mapper. ``RollupMapper`` orchestrates the three:
    decode the downstream key, expand via the window, encode each upstream.

    The shipped temporal windows describe contiguous, non-overlapping periods
    in which each upstream key feeds exactly one downstream key. Sliding /
    overlapping semantics cannot be expressed by subclassing ``Window`` alone.
    Examples are one upstream key contributing to multiple adjacent downstream
    periods, a rolling 7-day window, or the ``modifies-past-2-hours`` example
    from the AIP-76 spec.

    The reason is that ``Window.to_upstream`` only enumerates which upstream
    keys one downstream needs. The complementary direction (which downstreams
    an upstream key feeds) lives on the paired
    :meth:`PartitionMapper.to_downstream`, which already supports returning an
    iterable to fan out one source key across multiple target APDRs. Sliding
    therefore requires customizing **both** ``Window.to_upstream`` and the
    paired ``PartitionMapper.to_downstream`` consistently, so the schema
    invariant ``upstream_key in window.to_upstream(D) ⇔ D in
    mapper.to_downstream(upstream_key)`` holds.
    """

    #: Type that ``to_upstream`` expects as its ``decoded_downstream`` argument.
    #: ``RollupMapper.__init__`` uses this to reject pairings where the upstream
    #: mapper's ``decode_downstream`` leaves the value as ``str`` (base identity)
    #: but the window needs a different type. Temporal windows declare ``datetime``.
    expected_decoded_type: ClassVar[type] = str

    def __init__(self, *, direction: WindowDirection | str = WindowDirection.BACKWARD) -> None:
        """
        Create a window that fans out in the given *direction*.

        :param direction: A :class:`WindowDirection` member or its string value
            (``"backward"`` / ``"forward"``). Strings are coerced to the member.
        :raises ValueError: If *direction* is not a valid ``WindowDirection`` value.
        """
        # Coerce eagerly. The temporal subclasses in this module branch on
        # ``self.direction is WindowDirection.FORWARD``, and ``serialize`` reads
        # ``.value``. A bare ``"forward"`` string compares *equal* to the member
        # (str-based Enum) but is not the *same object*. It would therefore
        # silently take the BACKWARD branch and then crash in ``serialize``.
        self.direction = WindowDirection(direction)

    @abstractmethod
    def to_upstream(self, decoded_downstream: Any) -> Iterable[Any]:
        """Yield each decoded upstream item composing *decoded_downstream*."""

    def serialize(self) -> dict[str, Any]:
        """Return this window's state as a dict; :meth:`deserialize` is the inverse."""
        return {"direction": self.direction.value}

    @classmethod
    def deserialize(cls, data: dict[str, Any]) -> Window:
        """
        Rebuild a window from the output of :meth:`serialize`.

        :raises KeyError: If *data* has no ``"direction"`` entry.
        :raises ValueError: If the stored direction is not a valid ``WindowDirection`` value.
        """
        return cls(direction=WindowDirection(data["direction"]))
+
+
class HourWindow(Window):
    """
    One hour expressed as sixty consecutive minute period-starts.

    With the default ``BACKWARD`` direction the minutes run from the anchor
    onward (``anchor`` through ``anchor + 59 min``). With ``FORWARD`` the same
    sixty-minute span is shifted so that it ends at the anchor
    (``anchor - 59 min`` through ``anchor``).
    """

    expected_decoded_type: ClassVar[type] = datetime

    def to_upstream(self, period_start: datetime) -> Iterable[datetime]:
        """Lazily yield the sixty minute starts that make up the hour."""
        one_minute = timedelta(minutes=1)
        first = period_start
        if self.direction is WindowDirection.FORWARD:
            # The anchor becomes the last member instead of the first.
            first -= 59 * one_minute
        return (first + offset * one_minute for offset in range(60))
+
+
class DayWindow(Window):
    """
    One day expressed as twenty-four consecutive hourly period-starts.

    Members are produced by naive one-hour ``timedelta`` steps, so the window
    is always exactly 24 steps long regardless of DST transitions. The
    upstream mapper is responsible for timezone awareness when it encodes
    each member back into a key string.

    .. warning:: **DST edge cases with local-timezone upstream mappers**

        ``DayWindow`` always enumerates exactly 24 steps. A local calendar day
        that is not 24 hours long therefore does not line up with it when the
        upstream mapper uses a local timezone (for example
        ``StartOfDayMapper(timezone="America/New_York")``):

        - **Spring-forward (clock skips ahead)**: the local day has fewer than
          24 real hours, so one naive step falls in the gap (e.g. 02:00 ET does
          not exist on spring-forward day). The upstream mapper encodes it as
          the *next* local hour. The resulting key (e.g. ``"2024-03-10T03"``)
          matches no upstream event, so the rollup window can never be fully
          satisfied.
        - **Fall-back (clock repeats)**: the local day has 25 real hours, but
          only 24 steps are enumerated. Events from the extra hour are never
          in the expected set, so they do not contribute to any rollup.

        **Mitigation**: use a UTC ``input_format`` (e.g. ``%Y-%m-%dT%H%z``)
        and ensure upstream producers emit UTC partition keys, so local-clock
        ambiguity never arises.

        ``DayWindow(direction=WindowDirection.FORWARD)`` relies on the same
        24-step assumption. Its members are the naive hourly steps ending at
        the anchor, not the "previous calendar day" in local time.
    """

    expected_decoded_type: ClassVar[type] = datetime

    def to_upstream(self, period_start: datetime) -> Iterable[datetime]:
        """Lazily yield the twenty-four hourly starts that make up the day."""
        one_hour = timedelta(hours=1)
        first = period_start
        if self.direction is WindowDirection.FORWARD:
            # The anchor becomes the final hour of the window.
            first -= 23 * one_hour
        return (first + step * one_hour for step in range(24))
+
+
+class WeekWindow(Window):
+    """Seven consecutive daily period-starts making up one week."""
+
+    expected_decoded_type: ClassVar[type] = datetime
+
+    def to_upstream(self, period_start: datetime) -> Iterable[datetime]:
+        if self.direction is WindowDirection.FORWARD:
+            period_start = period_start - timedelta(days=6)

Review Comment:
     `FORWARD` here yields the days *ending at* the anchor — for a Mon `2024-03-04`
     key it fans out `Feb 27 – Mar 4` (the preceding seven days, ending on and including the anchor day).

     That's the opposite direction from the example in the issue this closes (#65761),
     which defines forward as the days that *follow*: "W10 (March 4–10) → the seven
     following days (March 11–17, forward)". Is inverting that meaning intentional?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to