This is an automated email from the ASF dual-hosted git repository.

Lee-W pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9cfd9943d68 Support timezone in SDK temporal partition mappers (#67164)
9cfd9943d68 is described below

commit 9cfd9943d68afa288dead1780a1ca48883b9aa3d
Author: Wei Lee <[email protected]>
AuthorDate: Fri May 22 09:25:20 2026 +0800

    Support timezone in SDK temporal partition mappers (#67164)
---
 airflow-core/newsfragments/67164.significant.rst   |  34 ++++++
 airflow-core/src/airflow/serialization/encoders.py |   3 +-
 .../tests/unit/partition_mappers/test_temporal.py  | 136 ++++++++++++++++++++-
 .../unit/serialization/test_serialized_objects.py  |  79 +++++++++---
 .../sdk/definitions/partition_mappers/temporal.py  |  13 ++
 5 files changed, 243 insertions(+), 22 deletions(-)

diff --git a/airflow-core/newsfragments/67164.significant.rst b/airflow-core/newsfragments/67164.significant.rst
new file mode 100644
index 00000000000..283ac07d7af
--- /dev/null
+++ b/airflow-core/newsfragments/67164.significant.rst
@@ -0,0 +1,34 @@
+SDK temporal partition mappers: ``timezone`` kwarg and keyword-only constructor
+
+``StartOfHourMapper``, ``StartOfDayMapper``, ``StartOfWeekMapper``,
+``StartOfMonthMapper``, ``StartOfQuarterMapper``, and ``StartOfYearMapper``
+(imported from ``airflow.sdk``) now accept a ``timezone`` keyword argument,
+matching the core ``_BaseTemporalMapper`` signature. The constructor is now
+keyword-only.
+
+**Behaviour changes:**
+
+- ``input_format`` and ``output_format`` are no longer accepted positionally.
+  Callers that relied on ``StartOfDayMapper("%Y-%m-%dT%H:%M:%S")`` (valid in
+  ``task-sdk`` 1.2.1) must switch to
+  ``StartOfDayMapper(input_format="%Y-%m-%dT%H:%M:%S")``.
+- A string ``timezone`` is now resolved via ``parse_timezone`` at
+  construction, so unknown names raise ``pendulum.tz.exceptions.InvalidTimezone``
+  immediately instead of being stored verbatim and failing later (or, in some
+  paths, being silently dropped during serialization).
+
+**Migration:**
+
+- Update any ``StartOf*Mapper(...)`` call sites to pass ``input_format`` and
+  ``output_format`` by name.
+
+* Types of change
+
+  * [ ] Dag changes
+  * [ ] Config changes
+  * [ ] API changes
+  * [ ] CLI changes
+  * [x] Behaviour changes
+  * [ ] Plugin changes
+  * [ ] Dependency changes
+  * [x] Code interface changes
diff --git a/airflow-core/src/airflow/serialization/encoders.py b/airflow-core/src/airflow/serialization/encoders.py
index 741347943ca..e97dcff2623 100644
--- a/airflow-core/src/airflow/serialization/encoders.py
+++ b/airflow-core/src/airflow/serialization/encoders.py
@@ -45,6 +45,7 @@ from airflow.sdk import (
     PartitionMapper,
     ProductMapper,
     StartOfDayMapper,
+    StartOfHourMapper,
     StartOfMonthMapper,
     StartOfQuarterMapper,
     StartOfWeekMapper,
@@ -52,7 +53,6 @@ from airflow.sdk import (
 )
 from airflow.sdk.bases.timetable import BaseTimetable
 from airflow.sdk.definitions.asset import AssetRef
-from airflow.sdk.definitions.partition_mappers.temporal import 
StartOfHourMapper
 from airflow.sdk.definitions.timetables.assets import (
     AssetTriggeredTimetable,
     PartitionAtRuntime,
@@ -463,6 +463,7 @@ class _Serializer:
         | StartOfYearMapper,
     ) -> dict[str, Any]:
         return {
+            "timezone": encode_timezone(partition_mapper._timezone),
             "input_format": partition_mapper.input_format,
             "output_format": partition_mapper.output_format,
         }
diff --git a/airflow-core/tests/unit/partition_mappers/test_temporal.py b/airflow-core/tests/unit/partition_mappers/test_temporal.py
index c54ca8a51f9..0eb10991f31 100644
--- a/airflow-core/tests/unit/partition_mappers/test_temporal.py
+++ b/airflow-core/tests/unit/partition_mappers/test_temporal.py
@@ -16,8 +16,10 @@
 # under the License.
 from __future__ import annotations
 
+import pendulum
 import pytest
 
+from airflow import sdk
 from airflow.partition_mappers.temporal import (
     StartOfDayMapper,
     StartOfHourMapper,
@@ -27,6 +29,8 @@ from airflow.partition_mappers.temporal import (
     StartOfYearMapper,
     _BaseTemporalMapper,
 )
+from airflow.serialization.decoders import decode_partition_mapper
+from airflow.serialization.encoders import encode_partition_mapper
 
 
 class TestTemporalMappers:
@@ -57,7 +61,7 @@ class TestTemporalMappers:
         ],
     )
     @pytest.mark.parametrize(
-        ("mapper_cls", "expected_outut_format"),
+        ("mapper_cls", "expected_output_format"),
         [
             (StartOfHourMapper, "%Y-%m-%dT%H"),
             (StartOfDayMapper, "%Y-%m-%d"),
@@ -70,7 +74,7 @@ class TestTemporalMappers:
     def test_serialize(
         self,
         mapper_cls: type[_BaseTemporalMapper],
-        expected_outut_format: str,
+        expected_output_format: str,
         timezone: str | None,
         expected_timezone: str,
     ):
@@ -78,9 +82,16 @@ class TestTemporalMappers:
         assert pm.serialize() == {
             "timezone": expected_timezone,
             "input_format": "%Y-%m-%dT%H:%M:%S",
-            "output_format": expected_outut_format,
+            "output_format": expected_output_format,
         }
 
+    @pytest.mark.parametrize(
+        ("timezone_payload", "expected_tz_name"),
+        [
+            ("UTC", "UTC"),
+            ("Asia/Taipei", "Asia/Taipei"),
+        ],
+    )
     @pytest.mark.parametrize(
         "mapper_cls",
         [
@@ -92,10 +103,10 @@ class TestTemporalMappers:
             StartOfYearMapper,
         ],
     )
-    def test_deserialize(self, mapper_cls):
+    def test_deserialize(self, mapper_cls, timezone_payload, expected_tz_name):
         pm = mapper_cls.deserialize(
             {
-                "timezone": "UTC",
+                "timezone": timezone_payload,
                 "input_format": "%Y-%m-%dT%H:%M:%S",
                 "output_format": "customized-format",
             }
@@ -103,6 +114,31 @@ class TestTemporalMappers:
         assert isinstance(pm, mapper_cls)
         assert pm.input_format == "%Y-%m-%dT%H:%M:%S"
         assert pm.output_format == "customized-format"
+        assert pm._timezone.name == expected_tz_name
+
+    @pytest.mark.parametrize(
+        "mapper_cls",
+        [
+            StartOfHourMapper,
+            StartOfDayMapper,
+            StartOfWeekMapper,
+            StartOfMonthMapper,
+            StartOfQuarterMapper,
+            StartOfYearMapper,
+        ],
+    )
+    def test_deserialize_legacy_payload_without_timezone(self, mapper_cls):
+        """Payloads written by ``task-sdk`` 1.2.1 omit ``timezone`` from the
+        SDK-mapper wire format; the core decoder must default it to UTC so
+        those serialized Dags can still be loaded."""
+        pm = mapper_cls.deserialize(
+            {
+                "input_format": "%Y-%m-%dT%H:%M:%S",
+                "output_format": "customized-format",
+            }
+        )
+        assert isinstance(pm, mapper_cls)
+        assert pm._timezone.name == "UTC"
 
     def test_to_downstream_timezone_aware(self):
         """Input is interpreted as local time in the given timezone."""
@@ -125,3 +161,93 @@ class TestTemporalMappers:
         # 2026-02-11T06:00:00+00:00 UTC == 2026-02-11T01:00:00-05:00 New York
         # → start-of-day in New York is 2026-02-11
         assert pm.to_downstream("2026-02-11T06:00:00+0000") == "2026-02-11"
+
+
+class TestSdkTemporalMappersTimezoneSerialization:
+    """
+    SDK-side temporal mappers (``airflow.sdk.StartOf*Mapper``) must accept a
+    ``timezone`` kwarg in their constructor and round-trip it through the
+    encoder/decoder path. Previously the SDK class signature omitted timezone
+    entirely and the dispatch handler in ``encoders._Serializer`` dropped it,
+    so a Dag using ``StartOfDayMapper(timezone="Asia/Taipei")`` silently fell
+    back to UTC after serialization.
+    """
+
+    @pytest.mark.parametrize("timezone", ["UTC", "America/New_York", 
"Asia/Taipei"])
+    @pytest.mark.parametrize(
+        "sdk_mapper_name",
+        [
+            "StartOfHourMapper",
+            "StartOfDayMapper",
+            "StartOfWeekMapper",
+            "StartOfMonthMapper",
+            "StartOfQuarterMapper",
+            "StartOfYearMapper",
+        ],
+    )
+    def test_sdk_constructor_accepts_timezone(self, sdk_mapper_name, timezone):
+        sdk_cls = getattr(sdk, sdk_mapper_name)
+        mapper = sdk_cls(timezone=timezone)
+        assert mapper._timezone.name == timezone
+
+    @pytest.mark.parametrize("timezone", ["UTC", "America/New_York", 
"Asia/Taipei"])
+    @pytest.mark.parametrize(
+        ("sdk_mapper_name", "core_cls"),
+        [
+            ("StartOfHourMapper", StartOfHourMapper),
+            ("StartOfDayMapper", StartOfDayMapper),
+            ("StartOfWeekMapper", StartOfWeekMapper),
+            ("StartOfMonthMapper", StartOfMonthMapper),
+            ("StartOfQuarterMapper", StartOfQuarterMapper),
+            ("StartOfYearMapper", StartOfYearMapper),
+        ],
+    )
+    def test_encode_decode_round_trip_preserves_timezone(self, 
sdk_mapper_name, core_cls, timezone):
+        sdk_cls = getattr(sdk, sdk_mapper_name)
+        original = sdk_cls(timezone=timezone)
+        restored = decode_partition_mapper(encode_partition_mapper(original))
+
+        # decode resolves to the Core class via BUILTIN_PARTITION_MAPPERS.
+        assert isinstance(restored, core_cls)
+        assert restored._timezone.name == timezone
+
+    @pytest.mark.parametrize(
+        "sdk_mapper_name",
+        [
+            "StartOfHourMapper",
+            "StartOfDayMapper",
+            "StartOfWeekMapper",
+            "StartOfMonthMapper",
+            "StartOfQuarterMapper",
+            "StartOfYearMapper",
+        ],
+    )
+    def test_encode_decode_round_trip_accepts_pendulum_tzinfo(self, 
sdk_mapper_name):
+        """The SDK ``timezone`` kwarg is advertised as ``str | Timezone | 
FixedTimezone``;
+        a pendulum tz object must survive the encoder pipeline 
(encode_timezone handles
+        the object branch) and land on the core class with the matching IANA 
name."""
+        sdk_cls = getattr(sdk, sdk_mapper_name)
+        original = sdk_cls(timezone=pendulum.timezone("Asia/Taipei"))
+        restored = decode_partition_mapper(encode_partition_mapper(original))
+
+        assert restored._timezone.name == "Asia/Taipei"
+
+    @pytest.mark.parametrize(
+        "sdk_mapper_name",
+        [
+            "StartOfHourMapper",
+            "StartOfDayMapper",
+            "StartOfWeekMapper",
+            "StartOfMonthMapper",
+            "StartOfQuarterMapper",
+            "StartOfYearMapper",
+        ],
+    )
+    def test_sdk_constructor_invalid_timezone_raises_eagerly(self, 
sdk_mapper_name):
+        """Passing an unknown timezone string must raise at construction time
+        (via ``parse_timezone``), not silently fall back to UTC or fail later."""
+        from pendulum.tz.exceptions import InvalidTimezone
+
+        sdk_cls = getattr(sdk, sdk_mapper_name)
+        with pytest.raises(InvalidTimezone):
+            sdk_cls(timezone="Not/A/Real/Zone")
diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py b/airflow-core/tests/unit/serialization/test_serialized_objects.py
index 05463439e1f..1767209ca73 100644
--- a/airflow-core/tests/unit/serialization/test_serialized_objects.py
+++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py
@@ -808,51 +808,95 @@ def test_encode_timezone():
 
 
 @pytest.mark.parametrize(
-    ("cls", "args", "encode_type", "encode_var"),
+    ("cls", "kwargs", "encode_type", "encode_var"),
     [
-        (IdentityMapper, [], 
"airflow.partition_mappers.identity.IdentityMapper", {}),
+        (IdentityMapper, {}, 
"airflow.partition_mappers.identity.IdentityMapper", {}),
         (
             StartOfHourMapper,
-            [],
+            {},
             "airflow.partition_mappers.temporal.StartOfHourMapper",
-            {"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": 
"%Y-%m-%dT%H"},
+            {"timezone": "UTC", "input_format": "%Y-%m-%dT%H:%M:%S", 
"output_format": "%Y-%m-%dT%H"},
+        ),
+        (
+            StartOfHourMapper,
+            {"timezone": "Asia/Taipei"},
+            "airflow.partition_mappers.temporal.StartOfHourMapper",
+            {"timezone": "Asia/Taipei", "input_format": "%Y-%m-%dT%H:%M:%S", 
"output_format": "%Y-%m-%dT%H"},
+        ),
+        (
+            StartOfDayMapper,
+            {},
+            "airflow.partition_mappers.temporal.StartOfDayMapper",
+            {"timezone": "UTC", "input_format": "%Y-%m-%dT%H:%M:%S", 
"output_format": "%Y-%m-%d"},
         ),
         (
             StartOfDayMapper,
-            [],
+            {"timezone": "Asia/Taipei"},
             "airflow.partition_mappers.temporal.StartOfDayMapper",
-            {"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-%m-%d"},
+            {"timezone": "Asia/Taipei", "input_format": "%Y-%m-%dT%H:%M:%S", 
"output_format": "%Y-%m-%d"},
         ),
         (
             StartOfWeekMapper,
-            [],
+            {},
             "airflow.partition_mappers.temporal.StartOfWeekMapper",
-            {"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-%m-%d 
(W%V)"},
+            {"timezone": "UTC", "input_format": "%Y-%m-%dT%H:%M:%S", 
"output_format": "%Y-%m-%d (W%V)"},
+        ),
+        (
+            StartOfWeekMapper,
+            {"timezone": "Asia/Taipei"},
+            "airflow.partition_mappers.temporal.StartOfWeekMapper",
+            {
+                "timezone": "Asia/Taipei",
+                "input_format": "%Y-%m-%dT%H:%M:%S",
+                "output_format": "%Y-%m-%d (W%V)",
+            },
         ),
         (
             StartOfMonthMapper,
-            [],
+            {},
             "airflow.partition_mappers.temporal.StartOfMonthMapper",
-            {"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-%m"},
+            {"timezone": "UTC", "input_format": "%Y-%m-%dT%H:%M:%S", 
"output_format": "%Y-%m"},
+        ),
+        (
+            StartOfMonthMapper,
+            {"timezone": "Asia/Taipei"},
+            "airflow.partition_mappers.temporal.StartOfMonthMapper",
+            {"timezone": "Asia/Taipei", "input_format": "%Y-%m-%dT%H:%M:%S", 
"output_format": "%Y-%m"},
+        ),
+        (
+            StartOfQuarterMapper,
+            {},
+            "airflow.partition_mappers.temporal.StartOfQuarterMapper",
+            {"timezone": "UTC", "input_format": "%Y-%m-%dT%H:%M:%S", 
"output_format": "%Y-Q{quarter}"},
         ),
         (
             StartOfQuarterMapper,
-            [],
+            {"timezone": "Asia/Taipei"},
             "airflow.partition_mappers.temporal.StartOfQuarterMapper",
-            {"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": 
"%Y-Q{quarter}"},
+            {
+                "timezone": "Asia/Taipei",
+                "input_format": "%Y-%m-%dT%H:%M:%S",
+                "output_format": "%Y-Q{quarter}",
+            },
+        ),
+        (
+            StartOfYearMapper,
+            {},
+            "airflow.partition_mappers.temporal.StartOfYearMapper",
+            {"timezone": "UTC", "input_format": "%Y-%m-%dT%H:%M:%S", 
"output_format": "%Y"},
         ),
         (
             StartOfYearMapper,
-            [],
+            {"timezone": "Asia/Taipei"},
             "airflow.partition_mappers.temporal.StartOfYearMapper",
-            {"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y"},
+            {"timezone": "Asia/Taipei", "input_format": "%Y-%m-%dT%H:%M:%S", 
"output_format": "%Y"},
         ),
     ],
 )
-def test_encode_partition_mapper(cls, args, encode_type, encode_var):
+def test_encode_partition_mapper(cls, kwargs, encode_type, encode_var):
     from airflow.serialization.encoders import encode_partition_mapper
 
-    partition_mapper = cls(*args)
+    partition_mapper = cls(**kwargs)
     assert encode_partition_mapper(partition_mapper) == {
         Encoding.TYPE: encode_type,
         Encoding.VAR: encode_var,
@@ -913,6 +957,7 @@ def test_encode_product_mapper():
                 {
                     Encoding.TYPE: 
"airflow.partition_mappers.temporal.StartOfHourMapper",
                     Encoding.VAR: {
+                        "timezone": "UTC",
                         "input_format": "%Y-%m-%dT%H:%M:%S",
                         "output_format": "%Y-%m-%dT%H",
                     },
@@ -951,6 +996,7 @@ def test_encode_chain_mapper():
                 {
                     Encoding.TYPE: 
"airflow.partition_mappers.temporal.StartOfHourMapper",
                     Encoding.VAR: {
+                        "timezone": "UTC",
                         "input_format": "%Y-%m-%dT%H:%M:%S",
                         "output_format": "%Y-%m-%dT%H",
                     },
@@ -958,6 +1004,7 @@ def test_encode_chain_mapper():
                 {
                     Encoding.TYPE: 
"airflow.partition_mappers.temporal.StartOfDayMapper",
                     Encoding.VAR: {
+                        "timezone": "UTC",
                         "input_format": "%Y-%m-%dT%H",
                         "output_format": "%Y-%m-%d",
                     },
diff --git a/task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py b/task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py
index 60ca18f5044..afcbcac6beb 100644
--- a/task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py
+++ b/task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py
@@ -16,19 +16,32 @@
 # under the License.
 from __future__ import annotations
 
+from typing import TYPE_CHECKING
+
+from airflow.sdk._shared.timezones.timezone import parse_timezone
 from airflow.sdk.definitions.partition_mappers.base import PartitionMapper
 
+if TYPE_CHECKING:
+    from pendulum import FixedTimezone, Timezone
+
 
 class _BaseTemporalMapper(PartitionMapper):
+    """Base class for Temporal Partition Mappers."""
+
     default_output_format: str
 
     def __init__(
         self,
+        *,
+        timezone: str | Timezone | FixedTimezone = "UTC",
         input_format: str = "%Y-%m-%dT%H:%M:%S",
         output_format: str | None = None,
     ) -> None:
         self.input_format = input_format
         self.output_format = output_format or self.default_output_format
+        if isinstance(timezone, str):
+            timezone = parse_timezone(timezone)
+        self._timezone = timezone
 
 
 class StartOfHourMapper(_BaseTemporalMapper):

Reply via email to