Lee-W commented on code in PR #64571:
URL: https://github.com/apache/airflow/pull/64571#discussion_r3279600029


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1859,40 +1876,175 @@ def _do_scheduling(self, session: Session) -> int:
 
         return num_queued_tis
 
+    def _check_rollup_asset_status(
+        self,
+        *,
+        asset_id: int,
+        apdr: AssetPartitionDagRun,
+        mapper: RollupMapper,
+        actual_by_asset: dict[int, set[str]],
+    ) -> bool:
+        if TYPE_CHECKING:
+            assert apdr.partition_key is not None
+        expected = mapper.to_upstream(apdr.partition_key)
+        return expected.issubset(actual_by_asset.get(asset_id, set()))
+
+    def _resolve_asset_partition_status(
+        self,
+        *,
+        session: Session,
+        asset_id: int,
+        name: str,
+        uri: str,
+        apdr: AssetPartitionDagRun,
+        timetable: PartitionedAssetTimetable,
+        actual_by_asset: dict[int, set[str]],
+    ) -> bool:
+        """
+        Return whether *asset_id* has been satisfied for *apdr*.
+
+        Non-rollup assets resolve to ``True`` because the caller only invokes
+        this for assets that already have at least one logged event for *APDR*
+        (see :class:`~airflow.models.asset.PartitionedAssetKeyLog`), which is
+        the non-rollup contract for "received". Rollup assets defer to
+        :meth:`_check_rollup_asset_status` for the upstream-window check.
+
+        A misconfigured mapper that raises returns ``False`` (treated as
+        not-yet-satisfied) and an audit log entry is written so the operator
+        can see why the Dag run is being held in the UI.
+        """
+        try:
+            mapper = timetable.get_partition_mapper(name=name, uri=uri)
+            if not mapper.is_rollup:
+                return True
+            return self._check_rollup_asset_status(
+                asset_id=asset_id,
+                apdr=apdr,
+                mapper=cast("RollupMapper", mapper),
+                actual_by_asset=actual_by_asset,
+            )
+        except Exception as err:
+            self.log.exception(
+                "Failed to evaluate rollup status for asset; treating as 
not-yet-satisfied. "
+                "This likely indicates a misconfigured partition mapper.",
+                dag_id=apdr.target_dag_id,
+                partition_key=apdr.partition_key,
+                asset_name=name,
+                asset_uri=uri,
+            )
+            if TYPE_CHECKING:
+                assert apdr.target_dag_id is not None
+            audit_key = (apdr.target_dag_id, name, uri)
+            if audit_key not in self._partition_audit_seen:
+                self._partition_audit_seen.add(audit_key)

Review Comment:
   It's now extracted into a separate method and uses `create_session(scoped=False)`.



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -1859,40 +1876,175 @@ def _do_scheduling(self, session: Session) -> int:
 
         return num_queued_tis
 
+    def _check_rollup_asset_status(
+        self,
+        *,
+        asset_id: int,
+        apdr: AssetPartitionDagRun,
+        mapper: RollupMapper,
+        actual_by_asset: dict[int, set[str]],
+    ) -> bool:
+        if TYPE_CHECKING:
+            assert apdr.partition_key is not None
+        expected = mapper.to_upstream(apdr.partition_key)
+        return expected.issubset(actual_by_asset.get(asset_id, set()))
+
+    def _resolve_asset_partition_status(
+        self,
+        *,
+        session: Session,
+        asset_id: int,
+        name: str,
+        uri: str,
+        apdr: AssetPartitionDagRun,
+        timetable: PartitionedAssetTimetable,
+        actual_by_asset: dict[int, set[str]],
+    ) -> bool:
+        """
+        Return whether *asset_id* has been satisfied for *apdr*.
+
+        Non-rollup assets resolve to ``True`` because the caller only invokes
+        this for assets that already have at least one logged event for *APDR*
+        (see :class:`~airflow.models.asset.PartitionedAssetKeyLog`), which is
+        the non-rollup contract for "received". Rollup assets defer to
+        :meth:`_check_rollup_asset_status` for the upstream-window check.
+
+        A misconfigured mapper that raises returns ``False`` (treated as
+        not-yet-satisfied) and an audit log entry is written so the operator
+        can see why the Dag run is being held in the UI.
+        """
+        try:
+            mapper = timetable.get_partition_mapper(name=name, uri=uri)
+            if not mapper.is_rollup:
+                return True
+            return self._check_rollup_asset_status(
+                asset_id=asset_id,
+                apdr=apdr,
+                mapper=cast("RollupMapper", mapper),
+                actual_by_asset=actual_by_asset,
+            )
+        except Exception as err:
+            self.log.exception(
+                "Failed to evaluate rollup status for asset; treating as 
not-yet-satisfied. "
+                "This likely indicates a misconfigured partition mapper.",
+                dag_id=apdr.target_dag_id,
+                partition_key=apdr.partition_key,
+                asset_name=name,
+                asset_uri=uri,
+            )
+            if TYPE_CHECKING:
+                assert apdr.target_dag_id is not None
+            audit_key = (apdr.target_dag_id, name, uri)
+            if audit_key not in self._partition_audit_seen:
+                self._partition_audit_seen.add(audit_key)
+                session.add(
+                    Log(
+                        event="failed to evaluate rollup status",
+                        dag_id=apdr.target_dag_id,
+                        extra=(
+                            "Could not evaluate rollup status for 
partition_key "
+                            f"'{apdr.partition_key}' on asset (name='{name}', 
uri='{uri}') "
+                            f"in target Dag '{apdr.target_dag_id}'. This 
likely indicates "
+                            "that the rollup mapper is misconfigured or does 
not support "
+                            f"this partition key.\n{type(err).__name__}: {err}"
+                        ),
+                    )
+                )
+            return False
+
     def _create_dagruns_for_partitioned_asset_dags(self, session: Session) -> 
set[str]:
+        """
+        Create Dag runs for pending :class:`AssetPartitionDagRun` rows whose 
partition is satisfied.
+
+        Returns the set of ``dag_id`` strings that received a new 
partition-driven Dag run in this
+        tick. The caller (:meth:`_create_dagruns_for_dags`) uses this set to 
exclude the same Dags
+        from the standard schedule-driven and asset-triggered creation paths 
so a single Dag never
+        gets two Dag runs for the same tick when it appears in more than one 
creation path. We
+        return ``dag_id`` strings rather than full Dag/DagRun objects because 
the only downstream
+        use is membership lookup, and a heavier return type would just be 
discarded.
+        """
+        # Cap per-tick work so the scheduler transaction stays bounded and 
other
+        # scheduling work isn't starved. Remaining APDRs drain across 
subsequent ticks.
+        # Note: with strict FIFO ordering, persistently-unsatisfied APDRs at 
the head
+        # of the queue would block newer ones; switch to updated_at-based 
ordering if
+        # that becomes an issue.
+        pending_apdrs = session.scalars(
+            select(AssetPartitionDagRun)
+            .join(DagModel, DagModel.dag_id == 
AssetPartitionDagRun.target_dag_id)
+            .where(
+                AssetPartitionDagRun.created_dag_run_id.is_(None),
+                DagModel.is_stale.is_(False),
+            )
+            .order_by(AssetPartitionDagRun.created_at)

Review Comment:
   Yep, both sides now use FIFO ordering; I unified the order.



##########
airflow-core/src/airflow/partition_mappers/window.py:
##########
@@ -0,0 +1,140 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from datetime import datetime, timedelta
+from typing import TYPE_CHECKING, Any
+
+if TYPE_CHECKING:
+    from collections.abc import Iterable
+
+
+def _shift_months(dt: datetime, months: int) -> datetime:
+    """
+    Return *dt* shifted forward by *months*, wrapping the year as needed.
+
+    All built-in temporal upstream mappers normalise to day 1, so
+    :meth:`datetime.replace` on the new month is always valid.
+    """
+    total = dt.month - 1 + months
+    return dt.replace(year=dt.year + total // 12, month=total % 12 + 1)
+
+
+class Window(ABC):
+    """
+    Describes a rollup window: which decoded upstream items make up one 
decoded downstream period.
+
+    Paired with a upstream mapper inside a :class:`RollupMapper`. The window
+    operates purely in the upstream mapper's *decoded* form (``datetime`` for
+    temporal mappers, domain-specific types for future segment / runtime
+    mappers). It does not touch key strings, timezones, or formats — those
+    belong to the upstream mapper. ``RollupMapper`` orchestrates the three:
+    decode the downstream key, expand via the window, encode each upstream.
+    """
+
+    @abstractmethod
+    def to_upstream(self, decoded_downstream: Any) -> Iterable[Any]:
+        """Yield each decoded upstream item composing *decoded_downstream*."""
+
+    def serialize(self) -> dict[str, Any]:
+        return {}
+
+    @classmethod
+    def deserialize(cls, data: dict[str, Any]) -> Window:
+        return cls()
+
+
+class HourWindow(Window):
+    """Sixty consecutive minute period-starts making up one hour."""
+
+    def to_upstream(self, period_start: datetime) -> Iterable[datetime]:
+        return (period_start + timedelta(minutes=i) for i in range(60))
+
+
+class DayWindow(Window):
+    """
+    Twenty-four consecutive hourly period-starts making up one day.
+
+    Arithmetic is done on naive datetime steps so the 24-hour stride is
+    unambiguous across DST transitions; the upstream mapper handles timezone
+    awareness when it encodes each upstream member back to a key string.
+
+    .. warning:: **DST edge cases with local-timezone upstream mappers**
+
+        ``DayWindow`` always yields exactly 24 steps regardless of the local
+        calendar date. When the upstream mapper uses a local timezone
+        (e.g. ``StartOfDayMapper(timezone="America/New_York")``), DST gaps
+        and folds can cause a mismatch:
+
+        - **Spring-forward (clock skips ahead)**: the local day has fewer than
+          24 real hours. One naive step falls in the gap (e.g. 02:00 ET on
+          spring-forward day does not exist), so the upstream mapper encodes it
+          to the *next* local hour. That key (e.g. ``"2024-03-10T03"``) does
+          not match any upstream event — the rollup window can never be fully
+          satisfied.
+        - **Fall-back (clock repeats)**: the local day has 25 real hours, but
+          ``DayWindow`` only enumerates 24 steps. The extra hour's upstream
+          events are never included in the expected set, so those events do not
+          contribute to any rollup.
+
+        **Mitigation**: use UTC ``input_format`` (e.g. ``%Y-%m-%dT%H%z``) and
+        ensure upstream producers emit UTC partition keys so local-clock
+        ambiguity never arises.
+    """
+
+    def to_upstream(self, period_start: datetime) -> Iterable[datetime]:
+        return (period_start + timedelta(hours=i) for i in range(24))
+
+
+class WeekWindow(Window):
+    """Seven consecutive daily period-starts making up one week."""
+
+    def to_upstream(self, period_start: datetime) -> Iterable[datetime]:
+        return (period_start + timedelta(days=i) for i in range(7))
+
+
+class MonthWindow(Window):
+    """
+    All daily period-starts making up one calendar month.
+
+    Assumes *period_start* falls on day 1 of the month (which all built-in
+    temporal upstream mappers normalise to). Iterates from day 1 to the last
+    day of that calendar month, so the yielded count varies between 28 and
+    31 depending on the month.
+    """
+
+    def to_upstream(self, period_start: datetime) -> Iterable[datetime]:
+        next_month = period_start.month % 12 + 1
+        next_year = period_start.year + (1 if period_start.month == 12 else 0)
+        next_start = period_start.replace(year=next_year, month=next_month)

Review Comment:
   Added a new `_require_day_one` function to enforce it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to