Lee-W commented on code in PR #64571:
URL: https://github.com/apache/airflow/pull/64571#discussion_r3354281491


##########
airflow-core/docs/authoring-and-scheduling/assets.rst:
##########
@@ -638,6 +638,87 @@ including ``partition_key`` in the request body):
         "partition_key": "us|2026-03-10T09:00:00"
       }'
 
+Rollup mappers
+~~~~~~~~~~~~~~
+
+.. versionadded:: 3.3.0
+
+The mappers shown above match upstream keys to a single downstream key one-for-one.
+For a coarser downstream period made up of many upstream events (an hourly upstream
+that drives a daily summary, or daily inputs that compose a weekly report), use
+``RollupMapper``. ``RollupMapper`` composes an upstream mapper (which normalizes each
+upstream key to the grain of the window's members) with a ``Window`` that declares
+the full set of upstream keys required for one downstream key. The scheduler holds
+the Dag run until every upstream key in the window has arrived; partial windows
+stay pending on the next-run-assets view so operators can see progress.
+
+The shipped windows are ``HourWindow`` (sixty minutes per hour), ``DayWindow``
+(twenty-four hours per day), ``WeekWindow`` (seven days per week), ``MonthWindow``,
+``QuarterWindow``, and ``YearWindow``. Pair each window with an upstream mapper that
+decodes to the same temporal grain as the window's members; for example, pair
+``StartOfHourMapper`` with ``DayWindow``.
+
+The following hourly-to-daily example produces a daily summary once all twenty-four
+upstream hourly partitions for a calendar day have arrived:
+
+.. code-block:: python
+
+    from airflow.sdk import (
+        DAG,
+        Asset,
+        CronPartitionTimetable,
+        DayWindow,
+        PartitionedAssetTimetable,
+        RollupMapper,
+        StartOfHourMapper,
+        task,
+    )
+
+    hourly_sales = Asset(uri="file://incoming/sales/hourly.csv", name="hourly_sales")
+
+    # Producer: emits one partitioned event per hour (key looks like 2026-03-10T09:00:00).
+    with DAG(
+        dag_id="ingest_hourly_sales",
+        schedule=CronPartitionTimetable("0 * * * *", timezone="UTC"),
+    ):
+
+        @task(outlets=[hourly_sales])
+        def ingest():
+            pass
+
+        ingest()
+
+    # Consumer: fires once a day's twenty-four hourly partitions are all in.
+    with DAG(
+        dag_id="daily_sales_summary",
+        schedule=PartitionedAssetTimetable(
+            assets=hourly_sales,
+            default_partition_mapper=RollupMapper(
+                upstream_mapper=StartOfHourMapper(),
+                window=DayWindow(),
+            ),

Review Comment:
   For example, an upstream event carries a key like `2026-03-10T09:00:00`;
   `StartOfHourMapper` normalizes it to `2026-03-10T09`, which is the format the
   24 `DayWindow` members are matched against. An identity mapper wouldn't
   normalize, and pairing an identity mapper with a temporal window is actually
   rejected at construction.
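
To make the normalization point concrete, here is a minimal pure-Python sketch. `start_of_hour` and `day_window_members` are hypothetical stand-ins, not Airflow's `StartOfHourMapper` or `DayWindow`; the sketch only assumes the key formats quoted above (`2026-03-10T09:00:00` upstream, `2026-03-10T09` after normalization) and naive UTC timestamps, so every day has exactly 24 hours.

```python
from datetime import datetime, timedelta


# Hypothetical stand-in for StartOfHourMapper: truncate an upstream key such as
# "2026-03-10T09:00:00" to hour grain, giving "2026-03-10T09".
def start_of_hour(upstream_key: str) -> str:
    return datetime.fromisoformat(upstream_key).strftime("%Y-%m-%dT%H")


# Hypothetical stand-in for DayWindow: the 24 hour-grain keys that make up one
# calendar day (naive timestamps, so no DST gaps or repeated hours).
def day_window_members(day: str) -> list[str]:
    midnight = datetime.fromisoformat(day)
    return [
        (midnight + timedelta(hours=offset)).strftime("%Y-%m-%dT%H")
        for offset in range(24)
    ]


members = day_window_members("2026-03-10")
raw_key = "2026-03-10T09:00:00"

print(start_of_hour(raw_key))             # 2026-03-10T09
print(start_of_hour(raw_key) in members)  # True: the normalized key is a member
print(raw_key in members)                 # False: an un-normalized key never matches
print(len(members))                       # 24
```

The last two checks are the identity-mapper problem in miniature: the raw key has a different shape from the window's members, so it could never complete the window.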

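The hold-until-complete behavior described in the diff (the Dag run waits until every upstream key in the window has arrived, and partial windows stay pending) can be sketched as follows. `DailyRollupTracker` is hypothetical and is not how the Airflow scheduler stores state; it assumes keys have already been normalized to the `2026-03-10T09` form and that the first ten characters identify the calendar day.

```python
from collections import defaultdict


class DailyRollupTracker:
    """Hypothetical sketch of hold-until-complete semantics for a day window.

    Not Airflow's implementation: it groups already-normalized hour keys
    (e.g. "2026-03-10T09") by calendar day and reports what is still missing.
    """

    def __init__(self) -> None:
        self._arrived: dict[str, set[str]] = defaultdict(set)

    @staticmethod
    def expected(day: str) -> set[str]:
        # The 24 hour-grain keys one day window requires.
        return {f"{day}T{hour:02d}" for hour in range(24)}

    def record(self, hour_key: str) -> None:
        # The first ten characters ("YYYY-MM-DD") identify the downstream day.
        self._arrived[hour_key[:10]].add(hour_key)

    def missing(self, day: str) -> list[str]:
        # Sorted so progress reports are stable.
        return sorted(self.expected(day) - self._arrived.get(day, set()))

    def is_ready(self, day: str) -> bool:
        # Fire only when nothing is missing; a partial day stays pending.
        return not self.missing(day)


tracker = DailyRollupTracker()
for hour in range(23):  # hours 00 through 22 arrive
    tracker.record(f"2026-03-10T{hour:02d}")
print(tracker.is_ready("2026-03-10"))  # False
print(tracker.missing("2026-03-10"))   # ['2026-03-10T23']
tracker.record("2026-03-10T23")
print(tracker.is_ready("2026-03-10"))  # True
```

Returning the missing keys rather than a bare count is a design choice of this sketch; it makes partial progress easy to report.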


-- 
This is an automated message from the Apache Git Service.