bolkedebruin commented on code in PR #34729:
URL: https://github.com/apache/airflow/pull/34729#discussion_r1347370339


##########
airflow/io/fs/__init__.py:
##########
@@ -0,0 +1,966 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import functools
+import os.path
+import uuid
+from typing import cast
+from urllib.parse import urlparse
+from dataclasses import dataclass
+from os import PathLike
+
+from fsspec import AbstractFileSystem
+from fsspec.callbacks import NoOpCallback
+
+from airflow.io.fsspec import SCHEME_TO_FS
+
+
+@dataclass
+class Mount(PathLike):
+    source: str
+    mount_point: str
+
+    fs: AbstractFileSystem
+
+    conn_id: str | None = None
+
+    def wrap(self, method: str, *args, **kwargs):
+        """
+        Wrap a filesystem method to replace the mount point with the original source.
+
+        :param method: the method to wrap
+        :type method: str
+        :param args: the arguments to pass to the method
+        :type args: tuple
+        :param kwargs: the keyword arguments to pass to the method
+        :type kwargs: dict
+        :return: the result of the method
+        :rtype: Any
+        """
+        path = kwargs.pop("path") if "path" in kwargs else args[0]
+        path = self.replace_mount_point(cast(str, path))
+
+        return getattr(self.fs, method)(path, *args[1:], **kwargs)
+
+    def __fspath__(self):
+        return self.mount_point
+
+    def __str__(self):
+        return self.mount_point
+
+    def __truediv__(self, other):
+        return os.path.join(self.mount_point, other.lstrip(os.sep))
+
+    def __getattr__(self, item):
+        return functools.partial(self.wrap, item)
+
+    def replace_mount_point(self, path: str) -> str:
+        new_path = path.replace(self.mount_point, self.source, 1).replace("//", "/")
+
+        # check for traversal?
+        if self.source not in new_path:
+            new_path = os.path.join(self.source, new_path.lstrip(os.sep))
+
+        return new_path
+
+
+MOUNTS: dict[str:Mount] = {}
+
+
+def get_mount(path: str) -> Mount:
+    """
+    Get the mount point for a given path.
+
+    :param path: the path to get the mount point for
+    :type path: str
+    :return: the mount point
+    :rtype: str
+    """
+    mount_point = None
+    mount_points = sorted(MOUNTS.keys(), key=len, reverse=True)
+
+    for prefix in mount_points:
+        if os.path.commonprefix([prefix, path]) == prefix:
+            mount_point = prefix
+
+    if mount_point is None:
+        raise ValueError(f"No mount point found for path: {path}")
+
+    return MOUNTS.get(mount_point)
+
+
+def _replace_mount_point(path: str) -> str:
+    """
+    Replace the mount point in a path with the original source.
+
+    :param path: the path to replace the mount point in
+    :type path: str
+    :return: the path with the mount point replaced
+    :rtype: str
+    """
+    mnt = get_mount(path)
+
+    return path.replace(mnt.mount_point, mnt.source)
+
+
+def _rewrite_path(path: str, mnt: Mount) -> str:
+    """
+    Rewrite a path to include the mount point and remove the original source.
+
+    :param path: the path to rewrite
+    :type path: str
+    :param mnt: the mount point to include in the path
+    :type mnt: Mount
+    :return: the rewritten path
+    :rtype: str
+    """
+    return os.path.join(mnt.mount_point, path.replace(mnt.source, "").lstrip(os.sep))
+
+
+def _rewrite_info(info: dict, mnt: Mount) -> dict:
+    """
+    Rewrite the path in a file info dict to include the mount point and remove the original source.
+
+    :param info: the file info dict to rewrite
+    :type info: dict
+    :param mnt: the mount point to include in the path
+    :type mnt: Mount
+    :return: the rewritten file info dict
+    :rtype: dict
+    """
+    info["original_name"] = info["name"]
+    info["name"] = _rewrite_path(info["name"], mnt)
+
+    return info
+
+
+def mount(
+    source: str,
+    mount_point: str | None = None,
+    conn_id: str | None = None,
+    encryption_type: str | None = "",
+    fs_type: AbstractFileSystem | None = None,
+    remount: bool = False,
+) -> Mount:
+    """
+    Mount a filesystem or object storage to a mount point.
+
+    :param source: the source path to mount
+    :type source: str
+    :param mount_point: the target mount point
+    :type mount_point: str
+    :param conn_id: the connection to use to connect to the filesystem
+    :type conn_id: str
+    :param encryption_type: the encryption type to use to connect to the filesystem
+    :type encryption_type: str
+    :param fs_type: the filesystem type to use to connect to the filesystem
+    :type fs_type: AbstractFileSystem
+    :param remount: whether to remount the filesystem if it is already mounted
+    :type remount: bool
+    """
+    if not remount and mount_point and mount_point in MOUNTS:
+        raise ValueError(f"Mount point {mount_point} already mounted")

Review Comment:
   We are mimicking POSIX behavior for `mount`, hence failing unless a remount is requested (`remount=True` replaces the existing mount). Additionally, following Pythonic practice, isn't it better to be explicit than implicit?
   
   I'm not particularly against it, but it just feels a bit wrong to me :-).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to