This is an automated email from the ASF dual-hosted git repository.
bolke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c49f857e9d Fix Airflow serialization for namedtuple (#37168)
c49f857e9d is described below
commit c49f857e9dd793add5f7d7d6075aaee85ea6e858
Author: Joffrey Bienvenu <[email protected]>
AuthorDate: Sun Feb 18 10:53:38 2024 +0100
Fix Airflow serialization for namedtuple (#37168)
Namedtuples are serialized like 'builtins.tuple'.
The serialize method (in airflow/serialization/serializers/builtin.py) calls
qualname() on the namedtuple, which returns the namedtuple's own arbitrary
name. If that name is used as the classname, deserialization fails, because
there is no deserializer for it.
---
airflow/serialization/serde.py | 29 +++++++++++++++++++++++------
tests/serialization/test_serde.py | 9 +++++++++
2 files changed, 32 insertions(+), 6 deletions(-)
diff --git a/airflow/serialization/serde.py b/airflow/serialization/serde.py
index fd7eb33af7..42e5a3a658 100644
--- a/airflow/serialization/serde.py
+++ b/airflow/serialization/serde.py
@@ -134,6 +134,20 @@ def serialize(o: object, depth: int = 0) -> U | None:
cls = type(o)
qn = qualname(o)
+ classname = None
+
+ # Serialize namedtuple like tuples
+ # We also override the classname returned by the builtin.py serializer. The classname
+ # has to be "builtins.tuple", so that the deserializer can deserialize the object into tuple.
+ if _is_namedtuple(o):
+ qn = "builtins.tuple"
+ classname = qn
+
+ # if there is a builtin serializer available use that
+ if qn in _serializers:
+ data, serialized_classname, version, is_serialized = _serializers[qn].serialize(o)
+ if is_serialized:
+ return encode(classname or serialized_classname, version, serialize(data, depth + 1))
# custom serializers
dct = {
@@ -141,12 +155,6 @@ def serialize(o: object, depth: int = 0) -> U | None:
VERSION: getattr(cls, "__version__", DEFAULT_VERSION),
}
- # if there is a builtin serializer available use that
- if qn in _serializers:
- data, classname, version, is_serialized = _serializers[qn].serialize(o)
- if is_serialized:
- return encode(classname, version, serialize(data, depth + 1))
-
# object / class brings their own
if hasattr(o, "serialize"):
data = getattr(o, "serialize")()
@@ -337,6 +345,15 @@ def _is_pydantic(cls: Any) -> bool:
return hasattr(cls, "model_config") and hasattr(cls, "model_fields") and hasattr(cls, "model_fields_set")
+def _is_namedtuple(cls: Any) -> bool:
+ """Return True if the class is a namedtuple.
+
+ Checking is done by attributes as it is significantly faster than
+ using isinstance.
+ """
+ return hasattr(cls, "_asdict") and hasattr(cls, "_fields") and hasattr(cls, "_field_defaults")
+
+
def _register():
"""Register builtin serializers and deserializers for types that don't have any themselves."""
_serializers.clear()
diff --git a/tests/serialization/test_serde.py b/tests/serialization/test_serde.py
index fc37aa6063..6298de53e6 100644
--- a/tests/serialization/test_serde.py
+++ b/tests/serialization/test_serde.py
@@ -18,6 +18,7 @@ from __future__ import annotations
import datetime
import enum
+from collections import namedtuple
from dataclasses import dataclass
from importlib import import_module
from typing import ClassVar
@@ -185,6 +186,14 @@ class TestSerDe:
i = {SCHEMA_ID: "cannot"}
serialize(i)
+ def test_ser_namedtuple(self):
+ CustomTuple = namedtuple("CustomTuple", ["id", "value"])
+ data = CustomTuple(id=1, value="something")
+
+ i = deserialize(serialize(data))
+ e = (1, "something")
+ assert i == e
+
def test_no_serializer(self):
with pytest.raises(TypeError, match="^cannot serialize"):
i = Exception