This is an automated email from the ASF dual-hosted git repository.

bolke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f3b7cfc992 Make Datasets Pathlike (#36947)
f3b7cfc992 is described below

commit f3b7cfc9925d4a0abec29698a7398c2750ca5a15
Author: Bolke de Bruin <bo...@xs4all.nl>
AuthorDate: Tue Jan 23 14:51:37 2024 +0100

    Make Datasets Pathlike (#36947)
    
    This makes datasets inherit from os.PathLike so they can be used
    directly by the Object Storage API.
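
    A minimal sketch of what this enables (the s3 URI is illustrative;
    the behavior mirrors the new tests below):

        import os

        from airflow.datasets import Dataset
        from airflow.io.path import ObjectStoragePath

        dataset = Dataset(uri="s3://bucket/key")
        assert os.fspath(dataset) == "s3://bucket/key"  # __fspath__ returns the uri
        path = ObjectStoragePath(dataset)  # accepted like any os.PathLike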
---
 airflow/datasets/__init__.py   | 6 +++++-
 tests/datasets/test_dataset.py | 8 ++++++++
 tests/io/test_path.py          | 9 +++++++++
 3 files changed, 22 insertions(+), 1 deletion(-)

diff --git a/airflow/datasets/__init__.py b/airflow/datasets/__init__.py
index 0dc635a00b..a4a127e3f7 100644
--- a/airflow/datasets/__init__.py
+++ b/airflow/datasets/__init__.py
@@ -16,6 +16,7 @@
 # under the License.
 from __future__ import annotations
 
+import os
 from typing import Any, ClassVar
 from urllib.parse import urlsplit
 
@@ -23,7 +24,7 @@ import attr
 
 
 @attr.define()
-class Dataset:
+class Dataset(os.PathLike):
     """A Dataset is used for marking data dependencies between workflows."""
 
     uri: str = attr.field(validator=[attr.validators.min_len(1), attr.validators.max_len(3000)])
@@ -42,3 +43,6 @@ class Dataset:
         parsed = urlsplit(uri)
         if parsed.scheme and parsed.scheme.lower() == "airflow":
             raise ValueError(f"{attr.name!r} scheme `airflow` is reserved")
+
+    def __fspath__(self):
+        return self.uri
diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py
index f707be0792..9e9ca99513 100644
--- a/tests/datasets/test_dataset.py
+++ b/tests/datasets/test_dataset.py
@@ -17,6 +17,8 @@
 
 from __future__ import annotations
 
+import os
+
 import pytest
 
 from airflow.datasets import Dataset
@@ -46,3 +48,9 @@ def test_uri_with_scheme():
 def test_uri_without_scheme():
     dataset = Dataset(uri="example_dataset")
     EmptyOperator(task_id="task1", outlets=[dataset])
+
+
+def test_fspath():
+    uri = "s3://example_dataset"
+    dataset = Dataset(uri=uri)
+    assert os.fspath(dataset) == uri
diff --git a/tests/io/test_path.py b/tests/io/test_path.py
index ab143b038e..deb8d412cc 100644
--- a/tests/io/test_path.py
+++ b/tests/io/test_path.py
@@ -26,6 +26,7 @@ import pytest
 from fsspec.implementations.local import LocalFileSystem
 from fsspec.utils import stringify_path
 
+from airflow.datasets import Dataset
 from airflow.io import _register_filesystems, get_fs
 from airflow.io.path import ObjectStoragePath
 from airflow.io.store import _STORE_CACHE, ObjectStore, attach
@@ -309,3 +310,11 @@ class TestFs:
         finally:
             # Reset the cache to avoid side effects
             _register_filesystems.cache_clear()
+
+    def test_dataset(self):
+        p = "s3"
+        f = "/tmp/foo"
+        i = Dataset(uri=f"{p}://{f}", extra={"foo": "bar"})
+        o = ObjectStoragePath(i)
+        assert o.protocol == p
+        assert o.path == f
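
For context, a hedged note beyond the tests above: fsspec's
stringify_path (imported in tests/io/test_path.py) resolves
os.PathLike objects via __fspath__, so a Dataset should now stringify
to its URI anywhere fsspec accepts a path-like value. A minimal
sketch, assuming that documented fsspec behavior:

    from fsspec.utils import stringify_path

    from airflow.datasets import Dataset

    # stringify_path falls back to __fspath__ for os.PathLike objects,
    # so the Dataset resolves to its uri string.
    assert stringify_path(Dataset(uri="s3://bucket/key")) == "s3://bucket/key"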
