This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e4817fd1f46 Remove custom serializers from airflow-core as its in task sdk now (#59631)
e4817fd1f46 is described below
commit e4817fd1f469cf08c3e31ae8e9683e202c5d6a40
Author: Amogh Desai <[email protected]>
AuthorDate: Sat Dec 20 21:39:26 2025 +0530
Remove custom serializers from airflow-core as its in task sdk now (#59631)
---
.../airflow/serialization/serializers/__init__.py | 23 +++++
.../airflow/serialization/serializers/bignum.py | 59 ------------
.../airflow/serialization/serializers/builtin.py | 59 ------------
.../airflow/serialization/serializers/datetime.py | 101 --------------------
.../airflow/serialization/serializers/deltalake.py | 79 ----------------
.../airflow/serialization/serializers/iceberg.py | 76 ---------------
.../serialization/serializers/kubernetes.py | 64 -------------
.../src/airflow/serialization/serializers/numpy.py | 88 ------------------
.../airflow/serialization/serializers/pandas.py | 75 ---------------
.../airflow/serialization/serializers/pydantic.py | 75 ---------------
.../airflow/serialization/serializers/timezone.py | 102 ---------------------
11 files changed, 23 insertions(+), 778 deletions(-)
diff --git a/airflow-core/src/airflow/serialization/serializers/__init__.py b/airflow-core/src/airflow/serialization/serializers/__init__.py
index 217e5db9607..d10a53a480b 100644
--- a/airflow-core/src/airflow/serialization/serializers/__init__.py
+++ b/airflow-core/src/airflow/serialization/serializers/__init__.py
@@ -15,3 +15,26 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
+"""Deprecated serializers module - moved to airflow.sdk.serde.serializers."""
+
+from __future__ import annotations
+
+import importlib
+import warnings
+
+from airflow.utils.deprecation_tools import DeprecatedImportWarning
+
+
+def __getattr__(name: str):
+ """Redirect all submodule imports to airflow.sdk.serde.serializers with deprecation warning."""
+ warnings.warn(
+ f"`airflow.serialization.serializers.{name}` is deprecated. "
+ f"Please use `airflow.sdk.serde.serializers.{name}` instead.",
+ DeprecatedImportWarning,
+ stacklevel=2,
+ )
+ try:
+ return importlib.import_module(f"airflow.sdk.serde.serializers.{name}")
+ except ModuleNotFoundError:
+ raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
diff --git a/airflow-core/src/airflow/serialization/serializers/bignum.py b/airflow-core/src/airflow/serialization/serializers/bignum.py
deleted file mode 100644
index 4dfa7dd2397..00000000000
--- a/airflow-core/src/airflow/serialization/serializers/bignum.py
+++ /dev/null
@@ -1,59 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-from typing import TYPE_CHECKING
-
-from airflow._shared.module_loading import qualname
-
-if TYPE_CHECKING:
- import decimal
-
- from airflow.serialization.serde import U
-
-
-serializers = ["decimal.Decimal"]
-deserializers = serializers
-
-__version__ = 1
-
-
-def serialize(o: object) -> tuple[U, str, int, bool]:
- from decimal import Decimal
-
- if not isinstance(o, Decimal):
- return "", "", 0, False
- name = qualname(o)
- _, _, exponent = o.as_tuple()
- if isinstance(exponent, int) and exponent >= 0: # No digits after the decimal point.
- return int(o), name, __version__, True
- # Technically lossy due to floating point errors, but the best we
- # can do without implementing a custom encode function.
- return float(o), name, __version__, True
-
-
-def deserialize(cls: type, version: int, data: object) -> decimal.Decimal:
- from decimal import Decimal
-
- if version > __version__:
- raise TypeError(f"serialized {version} of {qualname(cls)} > {__version__}")
-
- if cls is not Decimal:
- raise TypeError(f"do not know how to deserialize {qualname(cls)}")
-
- return Decimal(str(data))
diff --git a/airflow-core/src/airflow/serialization/serializers/builtin.py b/airflow-core/src/airflow/serialization/serializers/builtin.py
deleted file mode 100644
index 0edb56cb335..00000000000
--- a/airflow-core/src/airflow/serialization/serializers/builtin.py
+++ /dev/null
@@ -1,59 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-from typing import TYPE_CHECKING, cast
-
-from airflow._shared.module_loading import qualname
-
-if TYPE_CHECKING:
- from airflow.serialization.serde import U
-
-__version__ = 1
-
-serializers = ["builtins.frozenset", "builtins.set", "builtins.tuple"]
-deserializers = serializers
-stringifiers = serializers
-
-
-def serialize(o: object) -> tuple[U, str, int, bool]:
- return list(cast("list", o)), qualname(o), __version__, True
-
-
-def deserialize(cls: type, version: int, data: list) -> tuple | set | frozenset:
- if version > __version__:
- raise TypeError(f"serialized version {version} is newer than class version {__version__}")
-
- if cls is tuple:
- return tuple(data)
-
- if cls is set:
- return set(data)
-
- if cls is frozenset:
- return frozenset(data)
-
- raise TypeError(f"do not know how to deserialize {qualname(cls)}")
-
-
-def stringify(classname: str, version: int, data: list) -> str:
- if classname not in stringifiers:
- raise TypeError(f"do not know how to stringify {classname}")
-
- s = ",".join(str(d) for d in data)
- return f"({s})"
diff --git a/airflow-core/src/airflow/serialization/serializers/datetime.py b/airflow-core/src/airflow/serialization/serializers/datetime.py
deleted file mode 100644
index 9d455d010b2..00000000000
--- a/airflow-core/src/airflow/serialization/serializers/datetime.py
+++ /dev/null
@@ -1,101 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-from typing import TYPE_CHECKING
-
-from airflow._shared.module_loading import qualname
-from airflow._shared.timezones.timezone import parse_timezone
-from airflow.serialization.serializers.timezone import (
- deserialize as deserialize_timezone,
- serialize as serialize_timezone,
-)
-
-if TYPE_CHECKING:
- import datetime
-
- from airflow.serialization.serde import U
-
-__version__ = 2
-
-serializers = ["datetime.date", "datetime.datetime", "datetime.timedelta", "pendulum.datetime.DateTime"]
-deserializers = serializers
-
-TIMESTAMP = "timestamp"
-TIMEZONE = "tz"
-
-
-def serialize(o: object) -> tuple[U, str, int, bool]:
- from datetime import date, datetime, timedelta
-
- if isinstance(o, datetime):
- qn = qualname(o)
-
- tz = serialize_timezone(o.tzinfo) if o.tzinfo else None
-
- return {TIMESTAMP: o.timestamp(), TIMEZONE: tz}, qn, __version__, True
-
- if isinstance(o, date):
- return o.isoformat(), qualname(o), __version__, True
-
- if isinstance(o, timedelta):
- return o.total_seconds(), qualname(o), __version__, True
-
- return "", "", 0, False
-
-
-def deserialize(cls: type, version: int, data: dict | str) -> datetime.date | datetime.timedelta:
- import datetime
-
- from pendulum import DateTime
-
- tz: datetime.tzinfo | None = None
- if isinstance(data, dict) and TIMEZONE in data:
- if version == 1:
- # try to deserialize unsupported timezones
- timezone_mapping = {
- "EDT": parse_timezone(-4 * 3600),
- "CDT": parse_timezone(-5 * 3600),
- "MDT": parse_timezone(-6 * 3600),
- "PDT": parse_timezone(-7 * 3600),
- "CEST": parse_timezone("CET"),
- }
- if data[TIMEZONE] in timezone_mapping:
- tz = timezone_mapping[data[TIMEZONE]]
- else:
- tz = parse_timezone(data[TIMEZONE])
- else:
- tz = (
- deserialize_timezone(data[TIMEZONE][1], data[TIMEZONE][2], data[TIMEZONE][0])
- if data[TIMEZONE]
- else None
- )
-
- if cls is datetime.datetime and isinstance(data, dict):
- return datetime.datetime.fromtimestamp(float(data[TIMESTAMP]), tz=tz)
-
- if cls is DateTime and isinstance(data, dict):
- return DateTime.fromtimestamp(float(data[TIMESTAMP]), tz=tz)
-
- if cls is datetime.timedelta and isinstance(data, str | float):
- return datetime.timedelta(seconds=float(data))
-
- if cls is datetime.date and isinstance(data, str):
- return datetime.date.fromisoformat(data)
-
- raise TypeError(f"unknown date/time format {qualname(cls)}")
diff --git a/airflow-core/src/airflow/serialization/serializers/deltalake.py b/airflow-core/src/airflow/serialization/serializers/deltalake.py
deleted file mode 100644
index 86cdd9c4cf5..00000000000
--- a/airflow-core/src/airflow/serialization/serializers/deltalake.py
+++ /dev/null
@@ -1,79 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-from typing import TYPE_CHECKING
-
-from airflow._shared.module_loading import qualname
-
-serializers = ["deltalake.table.DeltaTable"]
-deserializers = serializers
-stringifiers = serializers
-
-if TYPE_CHECKING:
- from airflow.serialization.serde import U
-
-__version__ = 1
-
-
-def serialize(o: object) -> tuple[U, str, int, bool]:
- from deltalake.table import DeltaTable
-
- if not isinstance(o, DeltaTable):
- return "", "", 0, False
-
- from airflow.models.crypto import get_fernet
-
- # we encrypt the information here until we have as part of the
- # storage options can have sensitive information
- fernet = get_fernet()
- properties: dict = {}
- for k, v in o._storage_options.items() if o._storage_options else {}:
- properties[k] = fernet.encrypt(v.encode("utf-8")).decode("utf-8")
-
- data = {
- "table_uri": o.table_uri,
- "version": o.version(),
- "storage_options": properties,
- }
-
- return data, qualname(o), __version__, True
-
-
-def deserialize(cls: type, version: int, data: dict):
- from deltalake.table import DeltaTable
-
- from airflow.models.crypto import get_fernet
-
- if version > __version__:
- raise TypeError("serialized version is newer than class version")
-
- if cls is DeltaTable:
- fernet = get_fernet()
- properties = {}
- for k, v in data["storage_options"].items():
- properties[k] = fernet.decrypt(v.encode("utf-8")).decode("utf-8")
-
- if len(properties) == 0:
- storage_options = None
- else:
- storage_options = properties
-
- return DeltaTable(data["table_uri"], version=data["version"], storage_options=storage_options)
-
- raise TypeError(f"do not know how to deserialize {qualname(cls)}")
diff --git a/airflow-core/src/airflow/serialization/serializers/iceberg.py b/airflow-core/src/airflow/serialization/serializers/iceberg.py
deleted file mode 100644
index d14d5a404b4..00000000000
--- a/airflow-core/src/airflow/serialization/serializers/iceberg.py
+++ /dev/null
@@ -1,76 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-from typing import TYPE_CHECKING
-
-from airflow._shared.module_loading import qualname
-
-serializers = ["pyiceberg.table.Table"]
-deserializers = serializers
-stringifiers = serializers
-
-if TYPE_CHECKING:
- from airflow.serialization.serde import U
-
-__version__ = 1
-
-
-def serialize(o: object) -> tuple[U, str, int, bool]:
- from pyiceberg.table import Table
-
- if not isinstance(o, Table):
- return "", "", 0, False
-
- from airflow.models.crypto import get_fernet
-
- # we encrypt the catalog information here until we have
- # global catalog management in airflow and the properties
- # can have sensitive information
- fernet = get_fernet()
- properties = {}
- for k, v in o.catalog.properties.items():
- properties[k] = fernet.encrypt(v.encode("utf-8")).decode("utf-8")
-
- data = {
- "identifier": o._identifier,
- "catalog_properties": properties,
- }
-
- return data, qualname(o), __version__, True
-
-
-def deserialize(cls: type, version: int, data: dict):
- from pyiceberg.catalog import load_catalog
- from pyiceberg.table import Table
-
- from airflow.models.crypto import get_fernet
-
- if version > __version__:
- raise TypeError("serialized version is newer than class version")
-
- if cls is Table:
- fernet = get_fernet()
- properties = {}
- for k, v in data["catalog_properties"].items():
- properties[k] = fernet.decrypt(v.encode("utf-8")).decode("utf-8")
-
- catalog = load_catalog(data["identifier"][0], **properties)
- return catalog.load_table((data["identifier"][1], data["identifier"][2]))
-
- raise TypeError(f"do not know how to deserialize {qualname(cls)}")
diff --git a/airflow-core/src/airflow/serialization/serializers/kubernetes.py b/airflow-core/src/airflow/serialization/serializers/kubernetes.py
deleted file mode 100644
index 5a6c2bbeb30..00000000000
--- a/airflow-core/src/airflow/serialization/serializers/kubernetes.py
+++ /dev/null
@@ -1,64 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import logging
-from typing import TYPE_CHECKING
-
-from airflow._shared.module_loading import qualname
-
-# lazy loading for performance reasons
-serializers = [
- "kubernetes.client.models.v1_resource_requirements.V1ResourceRequirements",
- "kubernetes.client.models.v1_pod.V1Pod",
-]
-
-if TYPE_CHECKING:
- from airflow.serialization.serde import U
-
-__version__ = 1
-
-deserializers: list[type[object]] = []
-log = logging.getLogger(__name__)
-
-
-def serialize(o: object) -> tuple[U, str, int, bool]:
- from kubernetes.client import models as k8s
-
- if not k8s:
- return "", "", 0, False
-
- if isinstance(o, (k8s.V1Pod, k8s.V1ResourceRequirements)):
- from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
-
- # We're running this in an except block, so we don't want it to fail
- # under any circumstances, e.g. accessing a non-existing attribute.
- def safe_get_name(pod):
- try:
- return pod.metadata.name
- except Exception:
- return None
-
- try:
- return PodGenerator.serialize_pod(o), qualname(o), __version__, True
- except Exception:
- log.warning("Serialization failed for pod %s", safe_get_name(o))
- log.debug("traceback for serialization error", exc_info=True)
- return "", "", 0, False
-
- return "", "", 0, False
diff --git a/airflow-core/src/airflow/serialization/serializers/numpy.py b/airflow-core/src/airflow/serialization/serializers/numpy.py
deleted file mode 100644
index 300f47f6677..00000000000
--- a/airflow-core/src/airflow/serialization/serializers/numpy.py
+++ /dev/null
@@ -1,88 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-from typing import TYPE_CHECKING, Any
-
-from airflow._shared.module_loading import qualname
-
-# lazy loading for performance reasons
-serializers = [
- "numpy.int8",
- "numpy.int16",
- "numpy.int32",
- "numpy.int64",
- "numpy.uint8",
- "numpy.uint16",
- "numpy.uint32",
- "numpy.uint64",
- "numpy.float64",
- "numpy.float32",
- "numpy.float16",
- "numpy.complex128",
- "numpy.complex64",
- "numpy.bool",
- "numpy.bool_",
-]
-
-if TYPE_CHECKING:
- from airflow.serialization.serde import U
-
-deserializers = serializers
-
-__version__ = 1
-
-
-def serialize(o: object) -> tuple[U, str, int, bool]:
- import numpy as np
-
- if np is None:
- return "", "", 0, False
-
- name = qualname(o)
- metadata = (name, __version__, True)
- if isinstance(
- o,
- np.int_
- | np.intc
- | np.intp
- | np.int8
- | np.int16
- | np.int32
- | np.int64
- | np.uint8
- | np.uint16
- | np.uint32
- | np.uint64,
- ):
- return int(o), *metadata
-
- if hasattr(np, "bool") and isinstance(o, np.bool) or isinstance(o, np.bool_):
- return bool(o), *metadata
-
- if isinstance(o, (np.float16, np.float32, np.float64, np.complex64, np.complex128)):
- return float(o), *metadata
-
- return "", "", 0, False
-
-
-def deserialize(cls: type, version: int, data: str) -> Any:
- if version > __version__:
- raise TypeError("serialized version is newer than class version")
-
- return cls(data)
diff --git a/airflow-core/src/airflow/serialization/serializers/pandas.py b/airflow-core/src/airflow/serialization/serializers/pandas.py
deleted file mode 100644
index 422dcec497d..00000000000
--- a/airflow-core/src/airflow/serialization/serializers/pandas.py
+++ /dev/null
@@ -1,75 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-from typing import TYPE_CHECKING
-
-from airflow._shared.module_loading import qualname
-
-# lazy loading for performance reasons
-serializers = [
- "pandas.core.frame.DataFrame",
-]
-deserializers = serializers
-
-if TYPE_CHECKING:
- import pandas as pd
-
- from airflow.serialization.serde import U
-
-__version__ = 1
-
-
-def serialize(o: object) -> tuple[U, str, int, bool]:
- import pandas as pd
- import pyarrow as pa
- from pyarrow import parquet as pq
-
- if not isinstance(o, pd.DataFrame):
- return "", "", 0, False
-
- # for now, we *always* serialize into in memory
- # until we have a generic backend that manages
- # sinks
- table = pa.Table.from_pandas(o)
- buf = pa.BufferOutputStream()
- pq.write_table(table, buf, compression="snappy")
-
- return buf.getvalue().hex().decode("utf-8"), qualname(o), __version__, True
-
-
-def deserialize(cls: type, version: int, data: object) -> pd.DataFrame:
- if version > __version__:
- raise TypeError(f"serialized {version} of {qualname(cls)} > {__version__}")
-
- import pandas as pd
-
- if cls is not pd.DataFrame:
- raise TypeError(f"do not know how to deserialize {qualname(cls)}")
-
- if not isinstance(data, str):
- raise TypeError(f"serialized {qualname(cls)} has wrong data type {type(data)}")
-
- from io import BytesIO
-
- from pyarrow import parquet as pq
-
- with BytesIO(bytes.fromhex(data)) as buf:
- df = pq.read_table(buf).to_pandas()
-
- return df
diff --git a/airflow-core/src/airflow/serialization/serializers/pydantic.py b/airflow-core/src/airflow/serialization/serializers/pydantic.py
deleted file mode 100644
index aa2d79485fe..00000000000
--- a/airflow-core/src/airflow/serialization/serializers/pydantic.py
+++ /dev/null
@@ -1,75 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-from typing import TYPE_CHECKING
-
-from airflow._shared.module_loading import qualname
-from airflow.serialization.typing import is_pydantic_model
-
-if TYPE_CHECKING:
- from airflow.serialization.serde import U
-
-serializers = [
- "pydantic.main.BaseModel",
-]
-deserializers = serializers
-
-__version__ = 1
-
-
-def serialize(o: object) -> tuple[U, str, int, bool]:
- """
- Serialize a Pydantic BaseModel instance into a dict of built-in types.
-
- Returns a tuple of:
- - serialized data (as built-in types)
- - fixed class name for registration (BaseModel)
- - version number
- - is_serialized flag (True if handled)
- """
- if not is_pydantic_model(o):
- return "", "", 0, False
-
- data = o.model_dump(mode="json") # type: ignore
-
- return data, qualname(o), __version__, True
-
-
-def deserialize(cls: type, version: int, data: dict):
- """
- Deserialize a Pydantic class.
-
- Pydantic models can be serialized into a Python dictionary via `pydantic.main.BaseModel.model_dump`
- and the dictionary can be deserialized through `pydantic.main.BaseModel.model_validate`. This function
- can deserialize arbitrary Pydantic models that are in `allowed_deserialization_classes`.
-
- :param cls: The actual model class
- :param version: Serialization version (must not exceed __version__)
- :param data: Dictionary with built-in types, typically from model_dump()
- :return: An instance of the actual Pydantic model
- """
- if version > __version__:
- raise TypeError(f"Serialized version {version} is newer than the supported version {__version__}")
-
- if not is_pydantic_model(cls):
- # no deserializer available
- raise TypeError(f"No deserializer found for {qualname(cls)}")
-
- # Perform validation-based reconstruction
- return cls.model_validate(data) # type: ignore
diff --git a/airflow-core/src/airflow/serialization/serializers/timezone.py b/airflow-core/src/airflow/serialization/serializers/timezone.py
deleted file mode 100644
index 8ed0aad27a6..00000000000
--- a/airflow-core/src/airflow/serialization/serializers/timezone.py
+++ /dev/null
@@ -1,102 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import datetime
-from typing import TYPE_CHECKING, Any, cast
-
-from airflow._shared.module_loading import qualname
-
-if TYPE_CHECKING:
- from airflow.serialization.serde import U
-
-
-serializers = [
- "pendulum.tz.timezone.FixedTimezone",
- "pendulum.tz.timezone.Timezone",
- "zoneinfo.ZoneInfo",
-]
-
-deserializers = serializers
-
-__version__ = 1
-
-
-def serialize(o: object) -> tuple[U, str, int, bool]:
- """
- Encode a Pendulum Timezone for serialization.
-
- Airflow only supports timezone objects that implements Pendulum's Timezone
- interface. We try to keep as much information as possible to make conversion
- round-tripping possible (see ``decode_timezone``). We need to special-case
- UTC; Pendulum implements it as a FixedTimezone (i.e. it gets encoded as
- 0 without the special case), but passing 0 into ``pendulum.timezone`` does
- not give us UTC (but ``+00:00``).
- """
- from pendulum.tz.timezone import FixedTimezone
-
- name = qualname(o)
-
- if isinstance(o, FixedTimezone):
- if o.offset == 0:
- return "UTC", name, __version__, True
- return o.offset, name, __version__, True
-
- tz_name = _get_tzinfo_name(cast("datetime.tzinfo", o))
- if tz_name is not None:
- return tz_name, name, __version__, True
-
- if cast("datetime.tzinfo", o).utcoffset(None) == datetime.timedelta(0):
- return "UTC", qualname(FixedTimezone), __version__, True
-
- return "", "", 0, False
-
-
-def deserialize(cls: type, version: int, data: object) -> Any:
- from zoneinfo import ZoneInfo
-
- from airflow._shared.timezones.timezone import parse_timezone
-
- if not isinstance(data, (str, int)):
- raise TypeError(f"{data} is not of type int or str but of {type(data)}")
-
- if version > __version__:
- raise TypeError(f"serialized {version} of {qualname(cls)} > {__version__}")
-
- if cls is ZoneInfo and isinstance(data, str):
- return ZoneInfo(data)
-
- return parse_timezone(data)
-
-
-# ported from pendulum.tz.timezone._get_tzinfo_name
-def _get_tzinfo_name(tzinfo: datetime.tzinfo | None) -> str | None:
- if tzinfo is None:
- return None
-
- if hasattr(tzinfo, "key"):
- # zoneinfo timezone
- return tzinfo.key
- if hasattr(tzinfo, "name"):
- # Pendulum timezone
- return tzinfo.name
- if hasattr(tzinfo, "zone"):
- # pytz timezone
- return tzinfo.zone
-
- return None