Lee-W commented on code in PR #64571:
URL: https://github.com/apache/airflow/pull/64571#discussion_r3279682297
##########
airflow-core/src/airflow/partition_mappers/temporal.py:
##########
@@ -99,27 +99,135 @@ def normalize(self, dt: datetime) -> datetime:
class StartOfWeekMapper(_BaseTemporalMapper):
- """Map a time-based partition key to week."""
+ """Map a time-based partition key to the start of its week."""
default_output_format = "%Y-%m-%d (W%V)"
+ def __init__(
+ self,
+ *,
+ week_start: int = 0,
+ timezone: str | Timezone | FixedTimezone = "UTC",
+ input_format: str = "%Y-%m-%dT%H:%M:%S",
+ output_format: str | None = None,
+ ) -> None:
+ super().__init__(timezone=timezone, input_format=input_format, output_format=output_format)
+ self.week_start = week_start # 0 = Monday (ISO default), 6 = Sunday
+
def normalize(self, dt: datetime) -> datetime:
- start = dt - timedelta(days=dt.weekday())
+ days_since_start = (dt.weekday() - self.week_start) % 7
+ start = dt - timedelta(days=days_since_start)
return start.replace(hour=0, minute=0, second=0, microsecond=0)
+ def serialize(self) -> dict[str, Any]:
+ return {**super().serialize(), "week_start": self.week_start}
+
+ @classmethod
+ def deserialize(cls, data: dict[str, Any]) -> PartitionMapper:
+ return cls(
+ week_start=data.get("week_start", 0),
+ timezone=parse_timezone(data.get("timezone", "UTC")),
+ input_format=data["input_format"],
+ output_format=data["output_format"],
+ )
+
+
+class WeeklyRollupMapper(StartOfWeekMapper, RollupMapper):
+ """
+ Map a time-based partition key to the start of its week, requiring all 7 daily keys.
+
+ Use this when a partitioned Dag should only run once every daily asset partition
+ for a full week has been produced. Configure ``week_start`` to set which day begins
+ the week (0 = Monday, 6 = Sunday).
+ """
+
+ def __init__(self, **kwargs) -> None:
+ super().__init__(**kwargs)
+ if "%Y-%m-%d" not in self.output_format:
+ raise ValueError(
+ f"WeeklyRollupMapper requires output_format to contain '%Y-%m-%d' so that "
+ f"to_upstream() can recover the week-start date, got: {self.output_format!r}"
+ )
+
+ def to_upstream(self, downstream_key: str) -> frozenset[str]:
+ # Python strptime raises ValueError when %V (ISO week number) appears without
+ # %G and a weekday directive, so we cannot parse via the full output_format.
+ # Instead, locate %Y-%m-%d in the format string — __init__ guarantees it is
+ # present — and parse only the matching 10-char slice of the key.
+ # The prefix before %Y-%m-%d is literal text (no format directives), so its
+ # length in the format string equals its length in the formatted output.
+ ymd_fmt = "%Y-%m-%d"
+ key_start = len(self.output_format[: self.output_format.index(ymd_fmt)])
+ week_start_naive = datetime.strptime(downstream_key[key_start : key_start + 10], ymd_fmt)
Review Comment:
Fixed by the same `_compile_output_format_regex` refactor.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]