This is an automated email from the ASF dual-hosted git repository. bolke pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new f3b7cfc992 Make Datasets Pathlike (#36947) f3b7cfc992 is described below commit f3b7cfc9925d4a0abec29698a7398c2750ca5a15 Author: Bolke de Bruin <bo...@xs4all.nl> AuthorDate: Tue Jan 23 14:51:37 2024 +0100 Make Datasets Pathlike (#36947) This makes datasets inherit from os.PathLike so they can directly be used by the Object Storage API. --- airflow/datasets/__init__.py | 6 +++++- tests/datasets/test_dataset.py | 8 ++++++++ tests/io/test_path.py | 9 +++++++++ 3 files changed, 22 insertions(+), 1 deletion(-) diff --git a/airflow/datasets/__init__.py b/airflow/datasets/__init__.py index 0dc635a00b..a4a127e3f7 100644 --- a/airflow/datasets/__init__.py +++ b/airflow/datasets/__init__.py @@ -16,6 +16,7 @@ # under the License. from __future__ import annotations +import os from typing import Any, ClassVar from urllib.parse import urlsplit @@ -23,7 +24,7 @@ import attr @attr.define() -class Dataset: +class Dataset(os.PathLike): """A Dataset is used for marking data dependencies between workflows.""" uri: str = attr.field(validator=[attr.validators.min_len(1), attr.validators.max_len(3000)]) @@ -42,3 +43,6 @@ class Dataset: parsed = urlsplit(uri) if parsed.scheme and parsed.scheme.lower() == "airflow": raise ValueError(f"{attr.name!r} scheme `airflow` is reserved") + + def __fspath__(self): + return self.uri diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py index f707be0792..9e9ca99513 100644 --- a/tests/datasets/test_dataset.py +++ b/tests/datasets/test_dataset.py @@ -17,6 +17,8 @@ from __future__ import annotations +import os + import pytest from airflow.datasets import Dataset @@ -46,3 +48,9 @@ def test_uri_with_scheme(): def test_uri_without_scheme(): dataset = Dataset(uri="example_dataset") EmptyOperator(task_id="task1", outlets=[dataset]) + + +def test_fspath(): + uri = "s3://example_dataset" + dataset = Dataset(uri=uri) + assert os.fspath(dataset) == uri diff --git 
a/tests/io/test_path.py b/tests/io/test_path.py index ab143b038e..deb8d412cc 100644 --- a/tests/io/test_path.py +++ b/tests/io/test_path.py @@ -26,6 +26,7 @@ import pytest from fsspec.implementations.local import LocalFileSystem from fsspec.utils import stringify_path +from airflow.datasets import Dataset from airflow.io import _register_filesystems, get_fs from airflow.io.path import ObjectStoragePath from airflow.io.store import _STORE_CACHE, ObjectStore, attach @@ -309,3 +310,11 @@ class TestFs: finally: # Reset the cache to avoid side effects _register_filesystems.cache_clear() + + def test_dataset(self): + p = "s3" + f = "/tmp/foo" + i = Dataset(uri=f"{p}://{f}", extra={"foo": "bar"}) + o = ObjectStoragePath(i) + assert o.protocol == p + assert o.path == f