This is an automated email from the ASF dual-hosted git repository.
bolke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 573d650708 AIP-58: Add object storage backend for xcom (#37058)
573d650708 is described below
commit 573d650708334c5e4ea4f1d72d01b976edefc6cf
Author: Bolke de Bruin <[email protected]>
AuthorDate: Sat Feb 3 09:30:48 2024 +0100
AIP-58: Add object storage backend for xcom (#37058)
* Add object storage backend for xcom
This adds the possibility to store xcoms on a configurable object storage
supported backend.
---------
Co-authored-by: Ephraim Anierobi <[email protected]>
---
airflow/models/xcom.py | 18 +-
airflow/provider.yaml.schema.json | 7 +
airflow/providers/common/io/provider.yaml | 34 +++
.../common/io/{provider.yaml => xcom/__init__.py} | 39 +--
airflow/providers/common/io/xcom/backend.py | 168 +++++++++++++
.../configurations-ref.rst | 18 ++
docs/apache-airflow-providers-common-io/index.rst | 2 +
.../xcom_backend.rst | 42 ++++
docs/apache-airflow/core-concepts/xcoms.rst | 29 +++
docs/spelling_wordlist.txt | 1 +
tests/models/test_xcom.py | 4 +-
.../providers/common/io/xcom/__init__.py | 29 ---
tests/providers/common/io/xcom/test_backend.py | 263 +++++++++++++++++++++
13 files changed, 596 insertions(+), 58 deletions(-)
diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py
index ee74dd89f7..f36f3ba3f7 100644
--- a/airflow/models/xcom.py
+++ b/airflow/models/xcom.py
@@ -29,6 +29,7 @@ from functools import cached_property, wraps
from typing import TYPE_CHECKING, Any, Generator, Iterable, cast, overload
import attr
+from deprecated import deprecated
from sqlalchemy import (
Column,
ForeignKeyConstraint,
@@ -368,6 +369,7 @@ class BaseXCom(TaskInstanceDependencies, LoggingMixin):
@staticmethod
@provide_session
@internal_api_call
+ @deprecated
def get_one(
execution_date: datetime.datetime | None = None,
key: str | None = None,
@@ -418,7 +420,7 @@ class BaseXCom(TaskInstanceDependencies, LoggingMixin):
result = query.with_entities(BaseXCom.value).first()
if result:
- return BaseXCom.deserialize_value(result)
+ return XCom.deserialize_value(result)
return None
@overload
@@ -556,9 +558,15 @@ class BaseXCom(TaskInstanceDependencies, LoggingMixin):
for xcom in xcoms:
if not isinstance(xcom, XCom):
raise TypeError(f"Expected XCom; received
{xcom.__class__.__name__}")
+ XCom.purge(xcom, session)
session.delete(xcom)
session.commit()
+    @staticmethod
+    def purge(xcom: XCom, session: Session) -> None:
+        """Purge an XCom entry from underlying storage implementations.
+
+        This hook is invoked right before the XCom row is deleted from the
+        database. The base implementation does nothing; backends that keep
+        data outside the database override it.
+
+        :param xcom: the XCom entry that is about to be deleted
+        :param session: the database session the deletion runs in
+        """
+        return None
+
@overload
@staticmethod
@internal_api_call
@@ -641,7 +649,13 @@ class BaseXCom(TaskInstanceDependencies, LoggingMixin):
query = session.query(BaseXCom).filter_by(dag_id=dag_id,
task_id=task_id, run_id=run_id)
if map_index is not None:
query = query.filter_by(map_index=map_index)
- query.delete()
+
+ for xcom in query:
+ # print(f"Clearing XCOM {xcom} with value {xcom.value}")
+ XCom.purge(xcom, session)
+ session.delete(xcom)
+
+ session.commit()
@staticmethod
def serialize_value(
diff --git a/airflow/provider.yaml.schema.json
b/airflow/provider.yaml.schema.json
index 430cf2e632..ff14ea59a7 100644
--- a/airflow/provider.yaml.schema.json
+++ b/airflow/provider.yaml.schema.json
@@ -189,6 +189,13 @@
"type": "string"
}
},
+ "xcom": {
+ "type": "array",
+ "description": "XCom module names",
+ "items": {
+ "type": "string"
+ }
+ },
"transfers": {
"type": "array",
"items": {
diff --git a/airflow/providers/common/io/provider.yaml
b/airflow/providers/common/io/provider.yaml
index 7c32a42d0c..3c919320fc 100644
--- a/airflow/providers/common/io/provider.yaml
+++ b/airflow/providers/common/io/provider.yaml
@@ -43,3 +43,37 @@ operators:
- integration-name: Common IO
python-modules:
- airflow.providers.common.io.operators.file_transfer
+
+xcom:
+ - airflow.providers.common.io.xcom.backend
+
+config:
+ common.io:
+ description: Common IO configuration section
+ options:
+ xcom_objectstorage_path:
+ description: |
+ Path to a location on object storage where XComs can be stored in
url format.
+ version_added: 1.3.0
+ type: string
+ example: "s3://conn_id@bucket/path"
+ default: ""
+ xcom_objectstorage_threshold:
+ description: |
+ Threshold in bytes for storing XComs in object storage. -1 means
always store in the
+ database. 0 means always store in object storage. Any positive
number means
+ it will be stored in object storage if the size of the value is
greater than the threshold.
+ version_added: 1.3.0
+ type: integer
+ example: "1000000"
+ default: "-1"
+ xcom_objectstorage_compression:
+ description: |
+ Compression algorithm to use when storing XComs in object storage.
Supported algorithms
+ include, among others: snappy, zip, gzip, bz2, and lzma. If not specified, no
compression will be used.
+ Note that the compression algorithm must be available in the Python
installation (e.g.
+ python-snappy for snappy). Zip, gzip and bz2 are available by default.
+ version_added: 1.3.0
+ type: string
+ example: "gzip"
+ default: ""
diff --git a/airflow/providers/common/io/provider.yaml
b/airflow/providers/common/io/xcom/__init__.py
similarity index 56%
copy from airflow/providers/common/io/provider.yaml
copy to airflow/providers/common/io/xcom/__init__.py
index 7c32a42d0c..7977c868bb 100644
--- a/airflow/providers/common/io/provider.yaml
+++ b/airflow/providers/common/io/xcom/__init__.py
@@ -14,32 +14,19 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+from __future__ import annotations
----
-package-name: apache-airflow-providers-common-io
-name: Common IO
-description: |
- ``Common IO Provider``
+import packaging.version
-state: ready
-source-date-epoch: 1704610529
-versions:
- - 1.2.0
- - 1.1.0
- - 1.0.1
- - 1.0.0
+try:
+ from airflow import __version__ as airflow_version
+except ImportError:
+ from airflow.version import version as airflow_version
-dependencies:
- - apache-airflow>=2.8.0
-
-integrations:
- - integration-name: Common IO
- external-doc-url:
https://filesystem-spec.readthedocs.io/en/latest/index.html
- how-to-guide:
- - /docs/apache-airflow-providers-common-io/operators.rst
- tags: [software]
-
-operators:
- - integration-name: Common IO
- python-modules:
- - airflow.providers.common.io.operators.file_transfer
+if
packaging.version.parse(packaging.version.parse(airflow_version).base_version)
< packaging.version.parse(
+ "2.9.0"
+):
+ raise RuntimeError(
+ "The package xcom backend feature of
`apache-airflow-providers-common-io` needs "
+ "Apache Airflow 2.9.0+"
+ )
diff --git a/airflow/providers/common/io/xcom/backend.py
b/airflow/providers/common/io/xcom/backend.py
new file mode 100644
index 0000000000..0bb5dd286f
--- /dev/null
+++ b/airflow/providers/common/io/xcom/backend.py
@@ -0,0 +1,168 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+import uuid
+from typing import TYPE_CHECKING, Any, TypeVar
+from urllib.parse import urlsplit
+
+import fsspec.utils
+
+from airflow.configuration import conf
+from airflow.io.path import ObjectStoragePath
+from airflow.models.xcom import BaseXCom
+from airflow.utils.json import XComDecoder, XComEncoder
+
+if TYPE_CHECKING:
+ from sqlalchemy.orm import Session
+
+ from airflow.models import XCom
+
+T = TypeVar("T")
+
+SECTION = "common.io"
+
+
+def _is_relative_to(o: ObjectStoragePath, other: ObjectStoragePath) -> bool:
+    """Tell whether ``o`` lies underneath ``other``.
+
+    Port of ``pathlib.Path.is_relative_to``, which is not available in Python 3.8.
+    The native method is used whenever the path object provides it.
+
+    :param o: the path to test
+    :param other: the candidate ancestor path
+    :return: ``True`` if ``o`` is relative to ``other``, ``False`` otherwise
+    """
+    if not hasattr(o, "is_relative_to"):
+        # Python 3.8 fallback: relative_to() raises ValueError for unrelated paths
+        try:
+            o.relative_to(other)
+        except ValueError:
+            return False
+        return True
+
+    return o.is_relative_to(other)
+
+
+def _get_compression_suffix(compression: str) -> str:
+    """Return the file suffix registered in fsspec for the given compression.
+
+    ``fsspec.utils.compressions`` maps suffixes to compression names; this does
+    the reverse lookup and returns the first suffix whose name matches.
+
+    :param compression: name of the compression algorithm
+    :raises ValueError: if the compression is not supported
+    """
+    suffix = next(
+        (ext for ext, name in fsspec.utils.compressions.items() if name == compression),
+        None,
+    )
+    if suffix is None:
+        raise ValueError(f"Compression {compression} is not supported. Make sure it is installed.")
+    return suffix
+
+
+class XComObjectStoreBackend(BaseXCom):
+    """XCom backend that stores data in an object store or database depending on the size of the data.
+
+    If the value is larger than the configured threshold, it will be stored in an object store.
+    Otherwise, it will be stored in the database. If it is stored in an object store, the path
+    to the object in the store will be returned and saved in the database (by BaseXCom). Otherwise,
+    the value itself will be returned and thus saved in the database.
+
+    Options are read from the ``[common.io]`` section. The names declared in the provider
+    configuration (``xcom_objectstorage_path``, ``xcom_objectstorage_threshold`` and
+    ``xcom_objectstorage_compression``) take precedence; the ``xcom_objectstore_*`` spelling is
+    still honoured as a fallback so existing setups keep working.
+    """
+
+    @staticmethod
+    def _get_option(name: str) -> str:
+        """Return the string option ``xcom_objectstorage_<name>``, or ``""`` when it is not set.
+
+        Falls back to the legacy ``xcom_objectstore_<name>`` spelling when the documented
+        option is missing or empty.
+        """
+        value = conf.get(SECTION, f"xcom_objectstorage_{name}", fallback="")
+        if not value:
+            value = conf.get(SECTION, f"xcom_objectstore_{name}", fallback="")
+        return value or ""
+
+    @staticmethod
+    def _get_threshold() -> int:
+        """Return the size threshold in bytes; a negative value means "always use the database".
+
+        Falls back to the legacy ``xcom_objectstore_threshold`` option when the documented
+        one is unset or negative.
+        """
+        threshold = conf.getint(SECTION, "xcom_objectstorage_threshold", fallback=-1)
+        if threshold < 0:
+            threshold = conf.getint(SECTION, "xcom_objectstore_threshold", fallback=-1)
+        return threshold
+
+    @staticmethod
+    def _get_key(data: str) -> str:
+        """Get the key from the url and normalize it to be relative to the configured path.
+
+        :param data: the url of the stored object
+        :return: the part of ``data`` below the configured path, without leading slashes
+        :raises ValueError: if the key is not relative to the configured path
+        :raises TypeError: if the url is not a valid url or cannot be split
+        """
+        path = XComObjectStoreBackend._get_option("path")
+        p = ObjectStoragePath(path)
+
+        try:
+            url = urlsplit(data)
+        except AttributeError as e:
+            # urlsplit() fails this way for values that are not string-like
+            raise TypeError(f"Not a valid url: {data}") from e
+
+        if not url.scheme:
+            raise ValueError(f"Not a valid url: {data}")
+
+        if not _is_relative_to(ObjectStoragePath(data), p):
+            raise ValueError(f"Invalid key: {data}")
+
+        return data.replace(path, "", 1).lstrip("/")
+
+    @staticmethod
+    def serialize_value(
+        value: T,
+        *,
+        key: str | None = None,
+        task_id: str | None = None,
+        dag_id: str | None = None,
+        run_id: str | None = None,
+        map_index: int | None = None,
+    ) -> bytes | str:
+        """Serialize ``value`` and store it in object storage when it exceeds the threshold.
+
+        :return: the serialized value itself when it stays in the database, otherwise the
+            serialized path of the object that was written
+        :raises ValueError: if the configured compression is not supported
+        """
+        # we will always serialize ourselves and not by BaseXCom as the deserialize method
+        # from BaseXCom accepts only XCom objects and not the value directly
+        s_val = json.dumps(value, cls=XComEncoder).encode("utf-8")
+        path = XComObjectStoreBackend._get_option("path")
+        compression = XComObjectStoreBackend._get_option("compression") or None
+
+        # resolved before the size check so a bad compression setting always surfaces
+        suffix = f".{_get_compression_suffix(compression)}" if compression else ""
+
+        threshold = XComObjectStoreBackend._get_threshold()
+
+        if not path or threshold < 0 or len(s_val) <= threshold:
+            return s_val
+
+        # safeguard against collisions
+        base = ObjectStoragePath(path)
+        while True:
+            p = base / f"{dag_id}/{run_id}/{task_id}/{uuid.uuid4()}{suffix}"
+            if not p.exists():
+                break
+
+        if not p.parent.exists():
+            p.parent.mkdir(parents=True, exist_ok=True)
+
+        with p.open("wb", compression=compression) as f:
+            f.write(s_val)
+
+        return BaseXCom.serialize_value(str(p))
+
+    @staticmethod
+    def deserialize_value(
+        result: XCom,
+    ) -> Any:
+        """Deserialize the value from the database or object storage.
+
+        Compression is inferred from the file extension. If the stored value is not a
+        reference below the configured path, it is returned as is.
+        """
+        data = BaseXCom.deserialize_value(result)
+        path = XComObjectStoreBackend._get_option("path")
+
+        try:
+            p = ObjectStoragePath(path) / XComObjectStoreBackend._get_key(data)
+            # context manager so the handle is closed even when decoding fails
+            with p.open("rb", compression="infer") as f:
+                return json.load(f, cls=XComDecoder)
+        except (TypeError, ValueError):
+            return data
+
+    @staticmethod
+    def purge(xcom: XCom, session: Session) -> None:
+        """Delete the object referenced by ``xcom`` from object storage, if there is one."""
+        if not isinstance(xcom.value, str):
+            return
+
+        path = XComObjectStoreBackend._get_option("path")
+        try:
+            p = ObjectStoragePath(path) / XComObjectStoreBackend._get_key(xcom.value)
+            p.unlink(missing_ok=True)
+        except (TypeError, ValueError):
+            # the value is not a reference below the configured path: nothing to remove
+            pass
diff --git a/docs/apache-airflow-providers-common-io/configurations-ref.rst
b/docs/apache-airflow-providers-common-io/configurations-ref.rst
new file mode 100644
index 0000000000..5885c9d91b
--- /dev/null
+++ b/docs/apache-airflow-providers-common-io/configurations-ref.rst
@@ -0,0 +1,18 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+.. include:: ../exts/includes/providers-configurations-ref.rst
diff --git a/docs/apache-airflow-providers-common-io/index.rst
b/docs/apache-airflow-providers-common-io/index.rst
index 6cd056dfdc..488582fcd0 100644
--- a/docs/apache-airflow-providers-common-io/index.rst
+++ b/docs/apache-airflow-providers-common-io/index.rst
@@ -28,6 +28,7 @@
Home <self>
Changelog <changelog>
Security <security>
+ Configuration <configurations-ref>
.. toctree::
:hidden:
@@ -36,6 +37,7 @@
Transferring a file <transfer>
Operators <operators>
+ Object Storage XCom Backend <xcom_backend>
.. toctree::
:hidden:
diff --git a/docs/apache-airflow-providers-common-io/xcom_backend.rst
b/docs/apache-airflow-providers-common-io/xcom_backend.rst
new file mode 100644
index 0000000000..9216fff711
--- /dev/null
+++ b/docs/apache-airflow-providers-common-io/xcom_backend.rst
@@ -0,0 +1,42 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+.. http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+Object Storage XCom Backend
+===========================
+
+The default XCom backend is the :class:`~airflow.models.xcom.BaseXCom` class,
which stores XComs in the Airflow database. This is fine for small values, but
can be problematic for large values, or for large numbers of XComs.
+
+To enable storing XComs in an object store, you can set the ``xcom_backend``
configuration option to
``airflow.providers.common.io.xcom.backend.XComObjectStoreBackend``. You will
also need to set ``xcom_objectstorage_path`` to the desired location. The
connection
+id is obtained from the user part of the URL that you provide, e.g.
``xcom_objectstorage_path = s3://conn_id@mybucket/key``. Furthermore,
``xcom_objectstorage_threshold`` is required
+to be something larger than -1. Any object smaller than the threshold in bytes
will be stored in the database and anything larger will be
+put in object storage. This will allow a hybrid setup. If an xcom is stored on
object storage a reference will be
+saved in the database. Finally, you can set ``xcom_objectstorage_compression``
to fsspec supported compression methods like ``zip`` or ``snappy`` to
+compress the data before storing it in object storage.
+
+So for example the following configuration will store anything above 1MB in S3
and will compress it using gzip::
+
+ [core]
+ xcom_backend =
airflow.providers.common.io.xcom.backend.XComObjectStoreBackend
+
+ [common.io]
+ xcom_objectstorage_path = s3://conn_id@mybucket/key
+ xcom_objectstorage_threshold = 1048576
+ xcom_objectstorage_compression = gzip
+
+.. note::
+
+ Compression requires that support for it is installed in your Python
environment. For example, to use ``snappy`` compression, you need to install
``python-snappy``. Zip, gzip and bz2 work out of the box.
diff --git a/docs/apache-airflow/core-concepts/xcoms.rst
b/docs/apache-airflow/core-concepts/xcoms.rst
index b4685b4cb0..b2dc2b665c 100644
--- a/docs/apache-airflow/core-concepts/xcoms.rst
+++ b/docs/apache-airflow/core-concepts/xcoms.rst
@@ -56,6 +56,35 @@ XComs are a relative of :doc:`variables`, with the main
difference being that XC
If the first task run is not succeeded then on every retry task XComs will
be cleared to make the task run idempotent.
+
+Object Storage XCom Backend
+---------------------------
+
+The default XCom backend is the :class:`~airflow.models.xcom.BaseXCom` class,
which stores XComs in the Airflow database. This is fine for small values, but
can be problematic for large values, or for large numbers of XComs.
+
+To enable storing XComs in an object store, you can set the ``xcom_backend``
configuration option to
``airflow.providers.common.io.xcom.backend.XComObjectStoreBackend``. You will
also need to set ``xcom_objectstorage_path`` to the desired location. The
connection
+id is obtained from the user part of the URL that you provide, e.g.
``xcom_objectstorage_path = s3://conn_id@mybucket/key``. Furthermore,
``xcom_objectstorage_threshold`` is required
+to be something larger than -1. Any object smaller than the threshold in bytes
will be stored in the database and anything larger will be
+put in object storage. This will allow a hybrid setup. If an xcom is stored on
object storage a reference will be
+saved in the database. Finally, you can set ``xcom_objectstorage_compression``
to fsspec supported compression methods like ``zip`` or ``snappy`` to
+compress the data before storing it in object storage.
+
+So for example the following configuration will store anything above 1MB in S3
and will compress it using gzip::
+
+ [core]
+ xcom_backend =
airflow.providers.common.io.xcom.backend.XComObjectStoreBackend
+
+ [common.io]
+ xcom_objectstorage_path = s3://conn_id@mybucket/key
+ xcom_objectstorage_threshold = 1048576
+ xcom_objectstorage_compression = gzip
+
+
+.. note::
+
+ Compression requires that support for it is installed in your Python
environment. For example, to use ``snappy`` compression, you need to install
``python-snappy``. Zip, gzip and bz2 work out of the box.
+
+
Custom XCom Backends
--------------------
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 1dc8d5693b..08d2817eae 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1080,6 +1080,7 @@ Oauth
oauth
Oauthlib
objectORfile
+objectstorage
observability
od
odbc
diff --git a/tests/models/test_xcom.py b/tests/models/test_xcom.py
index 8ab7d4eb56..14a9da7f6a 100644
--- a/tests/models/test_xcom.py
+++ b/tests/models/test_xcom.py
@@ -569,7 +569,8 @@ class TestXComClear:
push_simple_json_xcom(ti=task_instance, key="xcom_1", value={"key":
"value"})
@pytest.mark.usefixtures("setup_for_xcom_clear")
- def test_xcom_clear(self, session, task_instance):
+ @mock.patch("airflow.models.xcom.XCom.purge")
+ def test_xcom_clear(self, mock_purge, session, task_instance):
assert session.query(XCom).count() == 1
XCom.clear(
dag_id=task_instance.dag_id,
@@ -578,6 +579,7 @@ class TestXComClear:
session=session,
)
assert session.query(XCom).count() == 0
+ assert mock_purge.call_count == 1
@pytest.mark.usefixtures("setup_for_xcom_clear")
def test_xcom_clear_with_execution_date(self, session, task_instance):
diff --git a/airflow/providers/common/io/provider.yaml
b/tests/providers/common/io/xcom/__init__.py
similarity index 56%
copy from airflow/providers/common/io/provider.yaml
copy to tests/providers/common/io/xcom/__init__.py
index 7c32a42d0c..13a83393a9 100644
--- a/airflow/providers/common/io/provider.yaml
+++ b/tests/providers/common/io/xcom/__init__.py
@@ -14,32 +14,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
----
-package-name: apache-airflow-providers-common-io
-name: Common IO
-description: |
- ``Common IO Provider``
-
-state: ready
-source-date-epoch: 1704610529
-versions:
- - 1.2.0
- - 1.1.0
- - 1.0.1
- - 1.0.0
-
-dependencies:
- - apache-airflow>=2.8.0
-
-integrations:
- - integration-name: Common IO
- external-doc-url:
https://filesystem-spec.readthedocs.io/en/latest/index.html
- how-to-guide:
- - /docs/apache-airflow-providers-common-io/operators.rst
- tags: [software]
-
-operators:
- - integration-name: Common IO
- python-modules:
- - airflow.providers.common.io.operators.file_transfer
diff --git a/tests/providers/common/io/xcom/test_backend.py
b/tests/providers/common/io/xcom/test_backend.py
new file mode 100644
index 0000000000..fce5ed985e
--- /dev/null
+++ b/tests/providers/common/io/xcom/test_backend.py
@@ -0,0 +1,263 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from configparser import DuplicateSectionError
+from typing import TYPE_CHECKING
+
+import pytest
+
+import airflow.models.xcom
+from airflow import settings
+from airflow.configuration import conf
+from airflow.io.path import ObjectStoragePath
+from airflow.models.dagrun import DagRun
+from airflow.models.taskinstance import TaskInstance
+from airflow.models.xcom import BaseXCom, resolve_xcom_backend
+from airflow.operators.empty import EmptyOperator
+from airflow.providers.common.io.xcom.backend import XComObjectStoreBackend
+from airflow.utils import timezone
+from airflow.utils.session import create_session
+from airflow.utils.types import DagRunType
+from airflow.utils.xcom import XCOM_RETURN_KEY
+from tests.test_utils.config import conf_vars
+
+if TYPE_CHECKING:
+ from sqlalchemy.orm import Session
+
+
+@pytest.fixture(autouse=True)
+def reset_db():
+    """Reset XCom entries.
+
+    Runs automatically for every test in this module and deletes all DagRun
+    and XCom rows before the test body executes.
+    """
+    with create_session() as session:
+        session.query(DagRun).delete()
+        session.query(airflow.models.xcom.XCom).delete()
+
+
+@pytest.fixture()
+def task_instance_factory(request, session: Session):
+    """Provide a factory that creates a persisted DagRun plus TaskInstance.
+
+    The returned callable takes ``dag_id``, ``task_id`` and ``execution_date``
+    as keyword arguments and returns the TaskInstance. Each call registers a
+    finalizer that deletes the DagRun it created.
+    """
+
+    def func(*, dag_id, task_id, execution_date):
+        run_id = DagRun.generate_run_id(DagRunType.SCHEDULED, execution_date)
+        run = DagRun(
+            dag_id=dag_id,
+            run_type=DagRunType.SCHEDULED,
+            run_id=run_id,
+            execution_date=execution_date,
+        )
+        session.add(run)
+        ti = TaskInstance(EmptyOperator(task_id=task_id), run_id=run_id)
+        ti.dag_id = dag_id
+        session.add(ti)
+        session.commit()
+
+        def cleanup_database():
+            # This should also clear task instances by cascading.
+            session.query(DagRun).filter_by(id=run.id).delete()
+            session.commit()
+
+        request.addfinalizer(cleanup_database)
+        return ti
+
+    return func
+
+
+@pytest.fixture()
+def task_instance(task_instance_factory):
+    """Provide one persisted TaskInstance (``dag`` / ``task_1``) with a fixed execution date."""
+    return task_instance_factory(
+        dag_id="dag",
+        task_id="task_1",
+        execution_date=timezone.datetime(2021, 12, 3, 4, 56),
+    )
+
+
+class TestXcomObjectStoreBackend:
+    """Exercise XComObjectStoreBackend against a local ``file:`` object store.
+
+    Every test runs with a threshold of 50 bytes, so small values stay in the
+    database and large ones are written below ``path``.
+    """
+
+    path = "file:/tmp/xcom"
+
+    def setup_method(self):
+        # the tests replace the module-level XCom class; remember it for teardown
+        self._original_xcom = airflow.models.xcom.XCom
+        try:
+            conf.add_section("common.io")
+        except DuplicateSectionError:
+            pass
+        conf.set("core", "xcom_backend", "airflow.providers.common.io.xcom.backend.XComObjectStoreBackend")
+        conf.set("common.io", "xcom_objectstore_path", self.path)
+        conf.set("common.io", "xcom_objectstore_threshold", "50")
+        settings.configure_vars()
+
+    def teardown_method(self):
+        # undo the global swap so later tests do not inherit the object store backend
+        airflow.models.xcom.XCom = self._original_xcom
+        conf.remove_option("core", "xcom_backend")
+        conf.remove_option("common.io", "xcom_objectstore_path")
+        conf.remove_option("common.io", "xcom_objectstore_threshold")
+        settings.configure_vars()
+        p = ObjectStoragePath(self.path)
+        if p.exists():
+            p.rmdir(recursive=True)
+
+    @pytest.mark.db_test
+    def test_value_db(self, task_instance, session):
+        """A value below the threshold round-trips through the database."""
+        XCom = resolve_xcom_backend()
+        airflow.models.xcom.XCom = XCom
+
+        XCom.set(
+            key=XCOM_RETURN_KEY,
+            value={"key": "value"},
+            dag_id=task_instance.dag_id,
+            task_id=task_instance.task_id,
+            run_id=task_instance.run_id,
+            session=session,
+        )
+
+        value = XCom.get_value(
+            key=XCOM_RETURN_KEY,
+            ti_key=task_instance.key,
+            session=session,
+        )
+        assert value == {"key": "value"}
+
+        qry = XCom.get_many(
+            key=XCOM_RETURN_KEY,
+            dag_ids=task_instance.dag_id,
+            task_ids=task_instance.task_id,
+            run_id=task_instance.run_id,
+            session=session,
+        )
+        assert qry.first().value == {"key": "value"}
+
+    @pytest.mark.db_test
+    def test_value_storage(self, task_instance, session):
+        """A value above the threshold is written to storage and referenced from the database."""
+        XCom = resolve_xcom_backend()
+        airflow.models.xcom.XCom = XCom
+
+        XCom.set(
+            key=XCOM_RETURN_KEY,
+            value={"key": "bigvaluebigvaluebigvalue" * 100},
+            dag_id=task_instance.dag_id,
+            task_id=task_instance.task_id,
+            run_id=task_instance.run_id,
+            session=session,
+        )
+
+        res = (
+            XCom.get_many(
+                key=XCOM_RETURN_KEY,
+                dag_ids=task_instance.dag_id,
+                task_ids=task_instance.task_id,
+                run_id=task_instance.run_id,
+                session=session,
+            )
+            .with_entities(BaseXCom.value)
+            .first()
+        )
+
+        # the raw database value is the location of the stored object
+        data = BaseXCom.deserialize_value(res)
+        p = ObjectStoragePath(self.path) / XComObjectStoreBackend._get_key(data)
+        assert p.exists() is True
+
+        value = XCom.get_value(
+            key=XCOM_RETURN_KEY,
+            ti_key=task_instance.key,
+            session=session,
+        )
+        assert value == {"key": "bigvaluebigvaluebigvalue" * 100}
+
+        qry = XCom.get_many(
+            key=XCOM_RETURN_KEY,
+            dag_ids=task_instance.dag_id,
+            task_ids=task_instance.task_id,
+            run_id=task_instance.run_id,
+            session=session,
+        )
+        assert self.path in qry.first().value
+
+    @pytest.mark.db_test
+    def test_clear(self, task_instance, session):
+        """Clearing XComs also removes the object from storage."""
+        XCom = resolve_xcom_backend()
+        airflow.models.xcom.XCom = XCom
+
+        XCom.set(
+            key=XCOM_RETURN_KEY,
+            value={"key": "superlargevalue" * 100},
+            dag_id=task_instance.dag_id,
+            task_id=task_instance.task_id,
+            run_id=task_instance.run_id,
+            session=session,
+        )
+
+        res = (
+            XCom.get_many(
+                key=XCOM_RETURN_KEY,
+                dag_ids=task_instance.dag_id,
+                task_ids=task_instance.task_id,
+                run_id=task_instance.run_id,
+                session=session,
+            )
+            .with_entities(BaseXCom.value)
+            .first()
+        )
+
+        data = BaseXCom.deserialize_value(res)
+        p = ObjectStoragePath(self.path) / XComObjectStoreBackend._get_key(data)
+        assert p.exists() is True
+
+        XCom.clear(
+            dag_id=task_instance.dag_id,
+            task_id=task_instance.task_id,
+            run_id=task_instance.run_id,
+            session=session,
+        )
+
+        assert p.exists() is False
+
+    @pytest.mark.db_test
+    @conf_vars({("common.io", "xcom_objectstore_compression"): "gzip"})
+    def test_compression(self, task_instance, session):
+        """With gzip configured the stored object gets a ``.gz`` suffix and still round-trips."""
+        XCom = resolve_xcom_backend()
+        airflow.models.xcom.XCom = XCom
+
+        XCom.set(
+            key=XCOM_RETURN_KEY,
+            value={"key": "superlargevalue" * 100},
+            dag_id=task_instance.dag_id,
+            task_id=task_instance.task_id,
+            run_id=task_instance.run_id,
+            session=session,
+        )
+
+        res = (
+            XCom.get_many(
+                key=XCOM_RETURN_KEY,
+                dag_ids=task_instance.dag_id,
+                task_ids=task_instance.task_id,
+                run_id=task_instance.run_id,
+                session=session,
+            )
+            .with_entities(BaseXCom.value)
+            .first()
+        )
+
+        data = BaseXCom.deserialize_value(res)
+        p = ObjectStoragePath(self.path) / XComObjectStoreBackend._get_key(data)
+        assert p.exists() is True
+        assert p.suffix == ".gz"
+
+        value = XCom.get_value(
+            key=XCOM_RETURN_KEY,
+            ti_key=task_instance.key,
+            session=session,
+        )
+
+        assert value == {"key": "superlargevalue" * 100}