This is an automated email from the ASF dual-hosted git repository.
Lee-W pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e6fc8734077 fix(scheduler): populate partition_date for temporal asset
partitions (#68266)
e6fc8734077 is described below
commit e6fc873407711f5e254e35c81b51d82fee94f765
Author: Wei Lee <[email protected]>
AuthorDate: Wed Jun 10 22:31:43 2026 +0800
fix(scheduler): populate partition_date for temporal asset partitions
(#68266)
---
airflow-core/newsfragments/68266.bugfix.rst | 1 +
.../src/airflow/jobs/scheduler_job_runner.py | 79 +++++++++-
airflow-core/src/airflow/partition_mappers/base.py | 19 +++
.../src/airflow/partition_mappers/chain.py | 9 +-
.../src/airflow/partition_mappers/temporal.py | 12 ++
airflow-core/tests/unit/jobs/test_scheduler_job.py | 167 ++++++++++++++++++++-
.../tests/unit/partition_mappers/test_chain.py | 18 +++
.../tests/unit/partition_mappers/test_temporal.py | 95 ++++++++++++
8 files changed, 392 insertions(+), 8 deletions(-)
diff --git a/airflow-core/newsfragments/68266.bugfix.rst
b/airflow-core/newsfragments/68266.bugfix.rst
new file mode 100644
index 00000000000..0d41b654186
--- /dev/null
+++ b/airflow-core/newsfragments/68266.bugfix.rst
@@ -0,0 +1 @@
+Asset-triggered partitioned Dag runs now set ``partition_date`` when the
consumer's partition mapper is temporal (directly, or wrapped in
``RollupMapper`` / ``FanOutMapper`` / ``ChainMapper``). Non-temporal mappers
leave ``partition_date`` unset.
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index a6496665e8b..a860810b746 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -113,8 +113,8 @@ from airflow.partition_mappers.base import is_rollup
from airflow.serialization.definitions.assets import SerializedAssetUniqueKey
from airflow.serialization.definitions.notset import NOTSET
from airflow.ti_deps.dependencies_states import ACTIVE_STATES, EXECUTION_STATES
-from airflow.timetables.base import compute_rollup_fingerprint
-from airflow.timetables.simple import AssetTriggeredTimetable,
PartitionedAssetTimetable
+from airflow.timetables.base import Timetable, compute_rollup_fingerprint
+from airflow.timetables.simple import AssetTriggeredTimetable
from airflow.triggers.base import TriggerEvent
from airflow.utils.event_scheduler import EventScheduler
from airflow.utils.log.logging_mixin import LoggingMixin
@@ -1916,7 +1916,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
name: str,
uri: str,
apdr: AssetPartitionDagRun,
- timetable: PartitionedAssetTimetable,
+ timetable: Timetable,
actual_by_asset: dict[int, set[str]],
) -> bool:
"""
@@ -1953,6 +1953,68 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
)
return False
+ def _resolve_partition_date(
+ self,
+ *,
+ timetable: Timetable,
+ asset_infos: Iterable[tuple[str, str]],
+ partition_key: str,
+ dag_id: str,
+ ) -> datetime | None:
+ """
+ Return the temporal anchor (period-start datetime) for *partition_key*.
+
+ Resolves the temporal anchor (period-start datetime) for
*partition_key*
+ across *asset_infos* — the ``(name, uri)`` pairs of the upstream assets
+ that contributed to it. Each upstream mapper resolves the key via
+
:meth:`~airflow.partition_mappers.base.PartitionMapper.to_partition_date`:
+ temporal mappers decode the key, composite mappers delegate to their
+ child, and non-temporal mappers (e.g.
+ :class:`~airflow.partition_mappers.identity.IdentityMapper`) return
``None``.
+
+ A partitioned consumer has a single partition identity, so every
temporal
+ mapper feeding it must resolve the same key to the same instant.
Anchors
+ are compared by instant (timezone-aware), so equivalent moments
collapse
+ to one. When the temporal mappers agree, that anchor is returned; when
+ they disagree — a misconfiguration, e.g. assets mapping the same key
under
+ different timezones — ``partition_date`` is left unset and a warning is
+ logged rather than silently picking one by scan order. Returns
``None`` if
+ no mapper is temporal.
+
+ A failure in any mapper aborts the whole resolution and returns
``None``
+ (logged) — anchors accumulated from earlier mappers are discarded
rather
+ than used as a partial result, since a partial set could hide a
conflict.
+ A broken mapper must not crash the scheduler tick.
+ """
+ anchors: set[datetime] = set()
+ try:
+ for name, uri in asset_infos:
+ mapper = timetable.get_partition_mapper(name=name, uri=uri)
+ anchor = mapper.to_partition_date(partition_key)
+ if anchor is not None:
+ anchors.add(anchor)
+ except Exception:
+ self.log.exception(
+ "Failed to resolve partition_date for asset-triggered Dag run;
partition_date will be None.",
+ dag_id=dag_id,
+ partition_key=partition_key,
+ )
+ return None
+
+ if not anchors:
+ return None
+ if len(anchors) > 1:
+ self.log.warning(
+ "Upstream partition mappers resolved conflicting
partition_date values for the same "
+ "key; leaving partition_date unset. The consumer's assets
likely use inconsistent "
+ "partition mappers.",
+ dag_id=dag_id,
+ partition_key=partition_key,
+ partition_dates=sorted(anchor.isoformat() for anchor in
anchors),
+ )
+ return None
+ return anchors.pop()
+
def _create_dagruns_for_partitioned_asset_dags(self, session: Session) ->
set[str]:
"""
Create Dag runs for pending :class:`AssetPartitionDagRun` rows whose
partition is satisfied.
@@ -2119,8 +2181,6 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
for asset_id, (name, uri) in asset_info_per_apdr[apdr.id].items():
key = SerializedAssetUniqueKey(name=name, uri=uri)
if timetable.partitioned:
- if TYPE_CHECKING:
- assert isinstance(timetable, PartitionedAssetTimetable)
statuses[key] = self._resolve_asset_partition_status(
session=session,
asset_id=asset_id,
@@ -2137,6 +2197,14 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
partition_dag_ids.add(apdr.target_dag_id)
run_after = timezone.utcnow()
+ partition_date: datetime | None = None
+ if timetable.partitioned:
+ partition_date = self._resolve_partition_date(
+ timetable=timetable,
+ asset_infos=asset_info_per_apdr[apdr.id].values(),
+ partition_key=apdr.partition_key,
+ dag_id=apdr.target_dag_id,
+ )
dag_run = dag.create_dagrun(
run_id=DagRun.generate_run_id(
run_type=DagRunType.ASSET_TRIGGERED, logical_date=None,
run_after=run_after
@@ -2144,6 +2212,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
logical_date=None,
data_interval=None,
partition_key=apdr.partition_key,
+ partition_date=partition_date,
run_after=run_after,
run_type=DagRunType.ASSET_TRIGGERED,
triggered_by=DagRunTriggeredByType.ASSET,
diff --git a/airflow-core/src/airflow/partition_mappers/base.py
b/airflow-core/src/airflow/partition_mappers/base.py
index ba4a3ee658e..ee70842cdcf 100644
--- a/airflow-core/src/airflow/partition_mappers/base.py
+++ b/airflow-core/src/airflow/partition_mappers/base.py
@@ -22,6 +22,7 @@ from typing import TYPE_CHECKING, Any, ClassVar, TypeGuard
if TYPE_CHECKING:
from collections.abc import Iterable
+ from datetime import datetime
from airflow.partition_mappers.window import Window
@@ -92,6 +93,19 @@ class PartitionMapper(ABC):
"""
return decoded
+ def to_partition_date(self, downstream_key: str) -> datetime | None:
+ """
+ Return the temporal anchor (period-start datetime) for
*downstream_key*.
+
+ The scheduler stamps this on the asset-triggered Dag run as its
+ ``partition_date``. The base implementation returns ``None`` — a plain
+ partition key carries no temporal meaning. Temporal mappers override to
+ decode the key into its window anchor; composite mappers
+ (:class:`RollupMapper`,
:class:`~airflow.partition_mappers.temporal.FanOutMapper`)
+ delegate to whichever child owns the downstream key's identity.
+ """
+ return None
+
def serialize(self) -> dict[str, Any]:
return {}
@@ -138,6 +152,11 @@ class RollupMapper(PartitionMapper):
for expected_upstream in self.window.to_upstream(decoded)
)
+ def to_partition_date(self, downstream_key: str) -> datetime | None:
+ # The downstream key is in upstream_mapper's format (to_downstream
delegates
+ # to it), so the anchor is the upstream_mapper's to resolve.
+ return self.upstream_mapper.to_partition_date(downstream_key)
+
def serialize(self) -> dict[str, Any]:
from airflow.serialization.encoders import encode_partition_mapper,
encode_window
diff --git a/airflow-core/src/airflow/partition_mappers/chain.py
b/airflow-core/src/airflow/partition_mappers/chain.py
index 850517482cf..a4a2110c256 100644
--- a/airflow-core/src/airflow/partition_mappers/chain.py
+++ b/airflow-core/src/airflow/partition_mappers/chain.py
@@ -18,10 +18,13 @@
from __future__ import annotations
from collections.abc import Iterable
-from typing import Any
+from typing import TYPE_CHECKING, Any
from airflow.partition_mappers.base import PartitionMapper
+if TYPE_CHECKING:
+ from datetime import datetime
+
class ChainMapper(PartitionMapper):
"""Partition mapper that applies multiple mappers sequentially."""
@@ -60,6 +63,10 @@ class ChainMapper(PartitionMapper):
keys = next_keys
return keys[0] if len(keys) == 1 else keys
+ def to_partition_date(self, downstream_key: str) -> datetime | None:
+ # The last mapper in the chain formats the final downstream key, so it
owns the anchor.
+ return self.mappers[-1].to_partition_date(downstream_key)
+
def serialize(self) -> dict[str, Any]:
from airflow.serialization.encoders import encode_partition_mapper
diff --git a/airflow-core/src/airflow/partition_mappers/temporal.py
b/airflow-core/src/airflow/partition_mappers/temporal.py
index 4fc1ed45ed3..622f0c6a556 100644
--- a/airflow-core/src/airflow/partition_mappers/temporal.py
+++ b/airflow-core/src/airflow/partition_mappers/temporal.py
@@ -198,6 +198,14 @@ class _BaseTemporalMapper(PartitionMapper):
"""
return datetime.strptime(downstream_key, self.output_format)
+ def to_partition_date(self, downstream_key: str) -> datetime:
+ anchor = self.normalize(self.decode_downstream(downstream_key))
+ # decode_downstream returns a naive datetime; localise it with the
mapper's
+ # own timezone, mirroring to_downstream, so the stored instant is
correct.
+ if anchor.tzinfo is None:
+ anchor = make_aware(anchor, self._timezone)
+ return anchor
+
def encode_upstream(self, dt: datetime) -> str:
"""
Format *dt* as an upstream partition key string.
@@ -522,6 +530,10 @@ class FanOutMapper(PartitionMapper):
coarse = self.upstream_mapper.decode_downstream(formatted)
return [_format_with(self.downstream_mapper, item) for item in
self.window.to_upstream(coarse)]
+ def to_partition_date(self, downstream_key: str) -> datetime | None:
+ # Fan-out keys are formatted by downstream_mapper, so it owns the
anchor.
+ return self.downstream_mapper.to_partition_date(downstream_key)
+
def serialize(self) -> dict[str, Any]:
from airflow.serialization.encoders import encode_partition_mapper,
encode_window
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index d08628a51fb..6b9c6e34309 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -93,8 +93,18 @@ from airflow.partition_mappers.base import (
PartitionMapper as CorePartitionMapper,
RollupMapper as CoreRollupMapper,
)
-from airflow.partition_mappers.temporal import StartOfHourMapper as
CoreStartOfHourMapper
-from airflow.partition_mappers.window import DayWindow as CoreDayWindow,
HourWindow as CoreHourWindow
+from airflow.partition_mappers.identity import IdentityMapper as
CoreIdentityMapper
+from airflow.partition_mappers.temporal import (
+ FanOutMapper as CoreFanOutMapper,
+ StartOfDayMapper as CoreStartOfDayMapper,
+ StartOfHourMapper as CoreStartOfHourMapper,
+ StartOfWeekMapper as CoreStartOfWeekMapper,
+)
+from airflow.partition_mappers.window import (
+ DayWindow as CoreDayWindow,
+ HourWindow as CoreHourWindow,
+ WeekWindow as CoreWeekWindow,
+)
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.triggers.file import FileDeleteTrigger
@@ -109,6 +119,7 @@ from airflow.sdk import (
IdentityMapper,
RollupMapper,
SegmentWindow,
+ StartOfDayMapper,
StartOfHourMapper,
task,
)
@@ -11712,3 +11723,155 @@ class TestReapStaleConnectionTests:
session.expire_all()
assert session.get(ConnectionTestRequest, ct_success.id).state ==
ConnectionTestState.SUCCESS
assert session.get(ConnectionTestRequest, ct_failed.id).state ==
ConnectionTestState.FAILED
+
+
+@pytest.mark.need_serialized_dag
+@pytest.mark.usefixtures("clear_asset_partition_rows")
+@pytest.mark.parametrize(
+ ("sdk_mapper", "upstream_partition_key", "expected_downstream_key",
"expected_partition_date"),
+ [
+ (
+ StartOfDayMapper(),
+ "2024-03-15T10:30:00",
+ "2024-03-15",
+ datetime.datetime(2024, 3, 15, 0, 0, 0,
tzinfo=datetime.timezone.utc),
+ ),
+ (
+ RollupMapper(upstream_mapper=StartOfHourMapper(),
window=HourWindow()),
+ "2024-01-01T00:00:00",
+ "2024-01-01T00",
+ datetime.datetime(2024, 1, 1, 0, 0, 0,
tzinfo=datetime.timezone.utc),
+ ),
+ (
+ IdentityMapper(),
+ "key-abc",
+ "key-abc",
+ None,
+ ),
+ ],
+)
+def test_partition_date_populated_on_dagrun(
+ dag_maker: DagMaker,
+ session: Session,
+ sdk_mapper,
+ upstream_partition_key,
+ expected_downstream_key,
+ expected_partition_date,
+):
+ """DagRun.partition_date is set correctly for temporal /
rollup-of-temporal mappers."""
+ asset_1 = Asset(name="asset-pd-test")
+
+ with dag_maker(
+ dag_id="partition-date-consumer",
+ schedule=PartitionedAssetTimetable(
+ assets=asset_1,
+ default_partition_mapper=sdk_mapper,
+ ),
+ session=session,
+ ):
+ EmptyOperator(task_id="hi")
+ session.commit()
+
+ runner = SchedulerJobRunner(
+ job=Job(job_type=SchedulerJobRunner.job_type),
executors=[MockExecutor(do_update=False)]
+ )
+
+ apdr = _produce_and_register_asset_event(
+ dag_id="partition-date-producer",
+ asset=asset_1,
+ partition_key=upstream_partition_key,
+ session=session,
+ dag_maker=dag_maker,
+ expected_partition_key=expected_downstream_key,
+ )
+
+ # For the rollup case, send all 60 minute keys so the window is complete.
+ if isinstance(sdk_mapper, RollupMapper):
+ for minute in range(1, 60):
+ _produce_and_register_asset_event(
+ dag_id=f"partition-date-producer-{minute}",
+ asset=asset_1,
+ partition_key=f"2024-01-01T00:{minute:02d}:00",
+ session=session,
+ dag_maker=dag_maker,
+ expected_partition_key=expected_downstream_key,
+ )
+
+ runner._create_dagruns_for_partitioned_asset_dags(session=session)
+ session.refresh(apdr)
+
+ assert apdr.created_dag_run_id is not None
+ dag_run = session.scalar(select(DagRun).where(DagRun.id ==
apdr.created_dag_run_id))
+ assert dag_run is not None
+ assert dag_run.partition_date == expected_partition_date
+
+
+def _make_runner() -> SchedulerJobRunner:
+ return SchedulerJobRunner(
+ job=Job(job_type=SchedulerJobRunner.job_type),
executors=[MockExecutor(do_update=False)]
+ )
+
+
+@pytest.mark.parametrize(
+ ("mappers", "partition_key", "expected"),
+ [
+ # Non-temporal mapper → no anchor.
+ pytest.param([CoreIdentityMapper()], "some-key", None,
id="non-temporal-none"),
+ # StartOfDayMapper(NY): "2024-03-15" → NY midnight = 04:00 UTC (EDT,
DST since 2024-03-10),
+ # localised with the mapper's own timezone rather than the global
default.
+ pytest.param(
+ [CoreStartOfDayMapper(timezone="America/New_York")],
+ "2024-03-15",
+ datetime.datetime(2024, 3, 15, 4, 0, 0,
tzinfo=datetime.timezone.utc),
+ id="non-utc-uses-mapper-timezone",
+ ),
+ # Key cannot be decoded by the mapper's format → caught → None (no
raise).
+ pytest.param([CoreStartOfDayMapper()], "not-a-date", None,
id="decode-failure-none"),
+ # FanOutMapper unwraps to its downstream_mapper (daily), which owns
the per-day key.
+ pytest.param(
+ [CoreFanOutMapper(upstream_mapper=CoreStartOfWeekMapper(),
window=CoreWeekWindow())],
+ "2024-01-16",
+ datetime.datetime(2024, 1, 16, 0, 0, 0,
tzinfo=datetime.timezone.utc),
+ id="fanout-uses-downstream-mapper",
+ ),
+ # Two temporal mappers resolving the same instant → that single anchor.
+ pytest.param(
+ [CoreStartOfDayMapper(), CoreStartOfDayMapper()],
+ "2024-03-15",
+ datetime.datetime(2024, 3, 15, 0, 0, 0,
tzinfo=datetime.timezone.utc),
+ id="agreeing-mappers-anchor",
+ ),
+ # Same key, UTC midnight (00:00Z) vs NY midnight (04:00Z) — distinct
instants → None.
+ pytest.param(
+ [CoreStartOfDayMapper(timezone="UTC"),
CoreStartOfDayMapper(timezone="America/New_York")],
+ "2024-03-15",
+ None,
+ id="conflicting-mappers-none",
+ ),
+ # Second mapper (hour format) raises on the day key → whole resolution
aborts → None
+ # (the first mapper's anchor is discarded; all-or-nothing).
+ pytest.param(
+ [CoreStartOfDayMapper(), CoreStartOfHourMapper()],
+ "2024-03-15",
+ None,
+ id="one-failing-mapper-aborts",
+ ),
+ ],
+)
+def test_resolve_partition_date(mappers, partition_key, expected):
+ """_resolve_partition_date over mapper compositions: temporal / fan-out /
agree / conflict / failure.
+
+ The mappers are consumed one per upstream asset, so ``asset_infos`` is
sized to ``mappers``.
+ """
+ runner = _make_runner()
+ timetable = mock.MagicMock()
+ timetable.get_partition_mapper.side_effect = mappers
+ asset_infos = [(f"asset-{i}-name", f"asset-{i}-uri") for i in
range(len(mappers))]
+
+ result = runner._resolve_partition_date(
+ timetable=timetable,
+ asset_infos=asset_infos,
+ partition_key=partition_key,
+ dag_id="test-dag",
+ )
+ assert result == expected
diff --git a/airflow-core/tests/unit/partition_mappers/test_chain.py
b/airflow-core/tests/unit/partition_mappers/test_chain.py
index 79f888af75f..528a176b1e0 100644
--- a/airflow-core/tests/unit/partition_mappers/test_chain.py
+++ b/airflow-core/tests/unit/partition_mappers/test_chain.py
@@ -17,6 +17,8 @@
from __future__ import annotations
+from datetime import datetime, timezone
+
import pytest
from airflow.partition_mappers.base import PartitionMapper
@@ -40,6 +42,22 @@ class TestChainMapper:
sm = ChainMapper(StartOfHourMapper(),
StartOfDayMapper(input_format="%Y-%m-%dT%H"))
assert sm.to_downstream("2024-01-15T10:30:00") == "2024-01-15"
+ @pytest.mark.parametrize(
+ ("chain", "downstream_key", "expected"),
+ [
+ # Last mapper temporal → it owns the final downstream key, so it
owns the anchor.
+ (
+ ChainMapper(IdentityMapper(), StartOfDayMapper()),
+ "2024-03-15",
+ datetime(2024, 3, 15, 0, 0, 0, tzinfo=timezone.utc),
+ ),
+ # Last mapper non-temporal → no anchor.
+ (ChainMapper(StartOfDayMapper(), IdentityMapper()), "anything",
None),
+ ],
+ )
+ def test_to_partition_date_delegates_to_last_mapper(self, chain,
downstream_key, expected):
+ assert chain.to_partition_date(downstream_key) == expected
+
def test_to_downstream_invalid_non_iterable_return(self):
sm = ChainMapper(IdentityMapper(), _InvalidReturnMapper())
with pytest.raises(TypeError, match="must return a string or iterable
of strings"):
diff --git a/airflow-core/tests/unit/partition_mappers/test_temporal.py
b/airflow-core/tests/unit/partition_mappers/test_temporal.py
index 6ec84893506..ab2a79f52ce 100644
--- a/airflow-core/tests/unit/partition_mappers/test_temporal.py
+++ b/airflow-core/tests/unit/partition_mappers/test_temporal.py
@@ -16,11 +16,16 @@
# under the License.
from __future__ import annotations
+from datetime import datetime, timezone as dt_timezone
+
import pendulum
import pytest
from airflow import sdk
+from airflow.partition_mappers.base import RollupMapper
+from airflow.partition_mappers.identity import IdentityMapper
from airflow.partition_mappers.temporal import (
+ FanOutMapper,
StartOfDayMapper,
StartOfHourMapper,
StartOfMonthMapper,
@@ -30,6 +35,7 @@ from airflow.partition_mappers.temporal import (
_BaseTemporalMapper,
_compile_output_format_regex,
)
+from airflow.partition_mappers.window import HourWindow, WeekWindow
from airflow.serialization.decoders import decode_partition_mapper
from airflow.serialization.encoders import encode_partition_mapper
@@ -352,3 +358,92 @@ class TestOutputFormatValidation:
assert match is not None
assert match.group("first") == "foo"
assert match.group("last") == "bar"
+
+
+class TestTemporalMapperDecodeNormalizeRoundTrip:
+ """
+ ``normalize(decode_downstream(to_downstream(dt)))`` must equal the anchor
+ produced by ``normalize(dt)`` for every ``_BaseTemporalMapper`` subclass.
+
+ This is the "Step 2" semantic guarantee: ``decode_downstream`` reconstructs
+ the period-start, and ``normalize`` is idempotent, so the composed call
+ used in ``_resolve_partition_date`` must not drift from the direct anchor.
+ """
+
+ SAMPLE_DT = datetime(2024, 3, 15, 10, 42, 35)
+
+ @pytest.mark.parametrize(
+ "mapper",
+ [
+ StartOfHourMapper(),
+ StartOfDayMapper(),
+ StartOfWeekMapper(),
+ StartOfMonthMapper(),
+ StartOfQuarterMapper(),
+ StartOfYearMapper(),
+ ],
+ )
+ def test_round_trip_anchor_is_stable(self, mapper: _BaseTemporalMapper):
+ """``normalize(decode_downstream(to_downstream(dt)))`` ==
``normalize(dt)``."""
+ downstream_key =
mapper.to_downstream(self.SAMPLE_DT.strftime(mapper.input_format))
+ decoded = mapper.decode_downstream(downstream_key)
+ round_tripped = mapper.normalize(decoded)
+ direct_anchor = mapper.normalize(self.SAMPLE_DT)
+ assert round_tripped == direct_anchor, (
+ f"{type(mapper).__name__}: round-trip anchor {round_tripped!r} "
+ f"differs from direct anchor {direct_anchor!r}"
+ )
+
+ @pytest.mark.parametrize(
+ ("mapper", "expected_aware"),
+ [
+ # UTC mapper: UTC midnight stays at 00:00 UTC.
+ (
+ StartOfDayMapper(timezone="UTC"),
+ datetime(2024, 3, 15, 0, 0, 0, tzinfo=dt_timezone.utc),
+ ),
+ # Non-UTC mapper: NY midnight (EDT = UTC-4) → 04:00 UTC.
+ (
+ StartOfDayMapper(timezone="America/New_York"),
+ datetime(2024, 3, 15, 4, 0, 0, tzinfo=dt_timezone.utc),
+ ),
+ ],
+ )
+ def test_to_partition_date_uses_mapper_timezone(
+ self, mapper: _BaseTemporalMapper, expected_aware: datetime
+ ):
+ """``to_partition_date`` localises the anchor with
``mapper._timezone``, not the global default."""
+ downstream_key =
mapper.to_downstream(self.SAMPLE_DT.strftime(mapper.input_format))
+ aware = mapper.to_partition_date(downstream_key)
+ # Convert to UTC for a timezone-neutral comparison.
+ aware_utc = aware.astimezone(dt_timezone.utc)
+ assert aware_utc == expected_aware, (
+ f"{type(mapper).__name__} (tz={mapper._timezone}): "
+ f"to_partition_date produced {aware_utc!r}, expected
{expected_aware!r}"
+ )
+
+
+class TestToPartitionDateDelegation:
+ """Composite mappers delegate ``to_partition_date`` to the child that owns
the downstream key."""
+
+ @pytest.mark.parametrize(
+ ("mapper", "downstream_key", "expected"),
+ [
+ # RollupMapper (fan-in): downstream key is the upstream_mapper's
format → it owns it.
+ (
+ RollupMapper(upstream_mapper=StartOfHourMapper(),
window=HourWindow()),
+ "2024-01-01T00",
+ datetime(2024, 1, 1, 0, 0, 0, tzinfo=dt_timezone.utc),
+ ),
+ # FanOutMapper (fan-out): downstream keys are the
downstream_mapper's format → it owns them.
+ (
+ FanOutMapper(upstream_mapper=StartOfWeekMapper(),
window=WeekWindow()),
+ "2024-01-16",
+ datetime(2024, 1, 16, 0, 0, 0, tzinfo=dt_timezone.utc),
+ ),
+ # Non-temporal mapper → no anchor.
+ (IdentityMapper(), "anything", None),
+ ],
+ )
+ def test_to_partition_date(self, mapper, downstream_key, expected):
+ assert mapper.to_partition_date(downstream_key) == expected