Lee-W commented on code in PR #64571:
URL: https://github.com/apache/airflow/pull/64571#discussion_r3354561345
##########
airflow-core/docs/authoring-and-scheduling/assets.rst:
##########
@@ -638,6 +638,87 @@ including ``partition_key`` in the request body):
"partition_key": "us|2026-03-10T09:00:00"
}'
+Rollup mappers
+~~~~~~~~~~~~~~
+
+.. versionadded:: 3.3.0
+
+The mappers shown above match upstream keys to a single downstream key one-for-one.
+For a coarser downstream period made up of many upstream events — an hourly upstream
+that drives a daily summary, daily inputs that compose a weekly report — use
+``RollupMapper``. ``RollupMapper`` composes an upstream mapper (which normalizes each
+upstream key to the downstream granularity) with a ``Window`` that declares the full
+set of upstream keys required for one downstream key. The scheduler holds the Dag
+run until every upstream key in the window has arrived; partial windows stay pending
+on the next-run-assets view so operators can see progress.
+
+The shipped windows are ``HourWindow`` (sixty minutes per hour), ``DayWindow``
+(twenty-four hours per day), ``WeekWindow`` (seven days per week), ``MonthWindow``,
+``QuarterWindow``, and ``YearWindow``. Pair each window with an upstream mapper that
+decodes to the same temporal grain — for example ``StartOfHourMapper`` with
+``DayWindow``.
+
+The following hourly-to-daily example produces a daily summary once all twenty-four
+upstream hourly partitions for a calendar day have arrived:
+
+.. code-block:: python
+
+    from airflow.sdk import (
+        DAG,
+        Asset,
+        CronPartitionTimetable,
+        DayWindow,
+        PartitionedAssetTimetable,
+        RollupMapper,
+        StartOfHourMapper,
+        task,
+    )
+
+    hourly_sales = Asset(uri="file://incoming/sales/hourly.csv", name="hourly_sales")
+
+    # Producer: emits one partitioned event per hour (key looks like 2026-03-10T09:00:00).
+    with DAG(
+        dag_id="ingest_hourly_sales",
+        schedule=CronPartitionTimetable("0 * * * *", timezone="UTC"),
+    ):
+
+        @task(outlets=[hourly_sales])
+        def ingest():
+            pass
+
+        ingest()
+
+    # Consumer: fires once a day's twenty-four hourly partitions are all in.
+    with DAG(
+        dag_id="daily_sales_summary",
+        schedule=PartitionedAssetTimetable(
+            assets=hourly_sales,
+            default_partition_mapper=RollupMapper(
+                upstream_mapper=StartOfHourMapper(),
+                window=DayWindow(),
+            ),
+        ),
+        catchup=False,
+    ):
+
+        @task
+        def summarize(dag_run=None):
+            # dag_run.partition_key is the day, e.g. "2026-03-10".
+            print(dag_run.partition_key)
+
+        summarize()
+
+A misconfigured ``RollupMapper`` — e.g. pairing an identity-decoding upstream mapper
+with a ``DayWindow`` — raises ``TypeError`` at Dag parse so the misconfiguration
+surfaces immediately instead of silently holding every downstream run forever.
+
+``DayWindow`` always enumerates twenty-four hourly steps. With an upstream mapper
+configured for a local timezone that observes daylight-saving time, the spring-forward
+day has only twenty-three real hours (one window member never has a matching event,
+so the run is held indefinitely) and the fall-back day has twenty-five (the repeated
+hour is dropped). Use a UTC-based upstream mapper for any rollup that crosses a DST
+boundary; see the ``DayWindow`` class docstring for the full discussion.
Review Comment:
open https://github.com/apache/airflow/issues/68004 as a follow-up issue
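
For reference on the DST caveat in the last paragraph of the diff, here is a minimal standard-library sketch of why a fixed twenty-four-step day window and a local-time day can disagree. It does not use any Airflow API: `expected_hourly_keys` and `real_hours_in_local_day` are hypothetical helpers, the key format is copied from the producer comment in the example, and `America/New_York` is only an illustrative zone (the sketch assumes the IANA time zone database is available to `zoneinfo`).

```python
from datetime import date, datetime, timedelta, timezone
from zoneinfo import ZoneInfo


def expected_hourly_keys(day: date) -> list[str]:
    """Hypothetical helper: the 24 wall-clock hourly keys a fixed day window expects."""
    start = datetime(day.year, day.month, day.day)
    return [
        (start + timedelta(hours=h)).strftime("%Y-%m-%dT%H:%M:%S")
        for h in range(24)
    ]


def real_hours_in_local_day(day: date, tz_name: str) -> int:
    """Hypothetical helper: elapsed hours from local midnight to the next local midnight."""
    tz = ZoneInfo(tz_name)
    next_day = day + timedelta(days=1)
    start = datetime(day.year, day.month, day.day, tzinfo=tz)
    end = datetime(next_day.year, next_day.month, next_day.day, tzinfo=tz)
    # Convert to UTC before subtracting: subtracting two datetimes that share
    # the same tzinfo compares wall-clock values and ignores the offset change.
    elapsed = end.astimezone(timezone.utc) - start.astimezone(timezone.utc)
    return int(elapsed.total_seconds() // 3600)


if __name__ == "__main__":
    for d in (date(2026, 3, 8), date(2026, 3, 10), date(2026, 11, 1)):
        print(d, len(expected_hourly_keys(d)), real_hours_in_local_day(d, "America/New_York"))
```

Under these assumptions the enumerated window always has twenty-four members, while the local day spans twenty-three hours on 2026-03-08 and twenty-five on 2026-11-01, which is the mismatch the paragraph warns about. In UTC every day spans twenty-four hours, so the two counts agree.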