This is an automated email from the ASF dual-hosted git repository.

bolke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7d0afbdfb2 Add lightweight serialization for deltalake tables (#35462)
7d0afbdfb2 is described below

commit 7d0afbdfb28edbeb6d0fcb344084e561192b6057
Author: Bolke de Bruin <bo...@xs4all.nl>
AuthorDate: Sun Nov 5 23:36:36 2023 +0100

    Add lightweight serialization for deltalake tables (#35462)
---
 airflow/serialization/serializers/deltalake.py     | 79 ++++++++++++++++++++++
 setup.py                                           |  5 ++
 .../serialization/serializers/test_serializers.py  | 24 +++++++
 3 files changed, 108 insertions(+)

diff --git a/airflow/serialization/serializers/deltalake.py b/airflow/serialization/serializers/deltalake.py
new file mode 100644
index 0000000000..60456baf80
--- /dev/null
+++ b/airflow/serialization/serializers/deltalake.py
@@ -0,0 +1,79 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from airflow.utils.module_loading import qualname
+
+# Fully qualified class names handled by this module; presumably read by
+# airflow.serialization.serde to route DeltaTable objects here. The same list
+# object is reused for deserialization and stringification.
+serializers = ["deltalake.table.DeltaTable"]
+deserializers = serializers
+# NOTE(review): this module defines no ``stringify`` function; confirm serde
+# does not expect one for class names listed in ``stringifiers``.
+stringifiers = serializers
+
+if TYPE_CHECKING:
+    from airflow.serialization.serde import U
+
+# Format version stamped on serialized data by ``serialize``; ``deserialize``
+# raises TypeError for data carrying a newer version than this.
+__version__ = 1
+
+
+def serialize(o: object) -> tuple[U, str, int, bool]:
+    from deltalake.table import DeltaTable
+
+    if not isinstance(o, DeltaTable):
+        return "", "", 0, False
+
+    from airflow.models.crypto import get_fernet
+
+    # we encrypt the information here until we have as part of the
+    # storage options can have sensitive information
+    fernet = get_fernet()
+    properties: dict = {}
+    for k, v in o._storage_options.items() if o._storage_options else {}:
+        properties[k] = fernet.encrypt(v.encode("utf-8")).decode("utf-8")
+
+    data = {
+        "table_uri": o.table_uri,
+        "version": o.version(),
+        "storage_options": properties,
+    }
+
+    return data, qualname(o), __version__, True
+
+
+def deserialize(classname: str, version: int, data: dict):
+    from deltalake.table import DeltaTable
+
+    from airflow.models.crypto import get_fernet
+
+    if version > __version__:
+        raise TypeError("serialized version is newer than class version")
+
+    if classname == qualname(DeltaTable):
+        fernet = get_fernet()
+        properties = {}
+        for k, v in data["storage_options"].items():
+            properties[k] = fernet.decrypt(v.encode("utf-8")).decode("utf-8")
+
+        if len(properties) == 0:
+            storage_options = None
+        else:
+            storage_options = properties
+
+        return DeltaTable(data["table_uri"], version=data["version"], 
storage_options=storage_options)
+
+    raise TypeError(f"do not know how to deserialize {classname}")
diff --git a/setup.py b/setup.py
index 9710aa2d79..8a3abf52ec 100644
--- a/setup.py
+++ b/setup.py
@@ -432,6 +432,10 @@ _devel_only_debuggers = [
     "ipdb",
 ]
 
+# Development-only: the DeltaTable serializer tests import ``deltalake``.
+_devel_only_deltalake = [
+    "deltalake>=0.12.0",
+]
+
 _devel_only_devscripts = [
     "click>=8.0",
     "gitpython",
@@ -493,6 +497,7 @@ devel_only = [
     *_devel_only_azure,
     *_devel_only_breeze,
     *_devel_only_debuggers,
+    *_devel_only_deltalake,
     *_devel_only_devscripts,
     *_devel_only_duckdb,
     *_devel_only_mongo,
diff --git a/tests/serialization/serializers/test_serializers.py b/tests/serialization/serializers/test_serializers.py
index b82438f6e5..aab9f91395 100644
--- a/tests/serialization/serializers/test_serializers.py
+++ b/tests/serialization/serializers/test_serializers.py
@@ -24,6 +24,7 @@ import numpy as np
 import pendulum.tz
 import pytest
 from dateutil.tz import tzutc
+from deltalake import DeltaTable
 from pendulum import DateTime
 from pyiceberg.catalog import Catalog
 from pyiceberg.io import FileIO
@@ -198,3 +199,26 @@ class TestSerializers:
         assert i == d
         mock_load_catalog.assert_called_with("catalog", uri=uri)
         mock_load_table.assert_called_with((identifier[1], identifier[2]))
+
+    @patch("deltalake.table.Metadata")
+    @patch("deltalake.table.RawDeltaTable")
+    @patch.object(DeltaTable, "version", return_value=0)
+    @patch.object(DeltaTable, "table_uri", new_callable=lambda: 
"/tmp/bucket/path")
+    def test_deltalake(self, mock_table_uri, mock_version, mock_deltalake, 
mock_metadata):
+        uri = "/tmp/bucket/path"
+
+        i = DeltaTable(uri, storage_options={"key": "value"})
+
+        e = serialize(i)
+        d = deserialize(e)
+        assert i.table_uri == d.table_uri
+        assert i.version() == d.version()
+        assert i._storage_options == d._storage_options
+
+        i = DeltaTable(uri)
+        e = serialize(i)
+        d = deserialize(e)
+        assert i.table_uri == d.table_uri
+        assert i.version() == d.version()
+        assert i._storage_options == d._storage_options
+        assert d._storage_options is None

Reply via email to