This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new def02a537a [#6941] feat(gvfs): Add hook mechanism in Java and Python
GVFS (#6972)
def02a537a is described below
commit def02a537aad7fc51d16454164e64b9c320fc931
Author: mchades <[email protected]>
AuthorDate: Thu Apr 17 15:05:22 2025 +0800
[#6941] feat(gvfs): Add hook mechanism in Java and Python GVFS (#6972)
### What changes were proposed in this pull request?
Support pre and post hooks in GVFS ops
### Why are the changes needed?
Fix: #6941
### Does this PR introduce _any_ user-facing change?
yes, users can use their own hook class
### How was this patch tested?
tests added
---
clients/client-python/gravitino/filesystem/gvfs.py | 124 +++-
.../gravitino/filesystem/gvfs_config.py | 3 +
.../gravitino/filesystem/gvfs_hook.py | 649 +++++++++++++++++++++
.../tests/unittests/test_gvfs_with_hook.py | 386 ++++++++++++
.../hadoop/GravitinoVirtualFileSystem.java | 111 +++-
.../GravitinoVirtualFileSystemConfiguration.java | 6 +
.../hadoop/GravitinoVirtualFileSystemHook.java | 320 ++++++++++
.../gravitino/filesystem/hadoop/NoOpHook.java | 162 +++++
.../gravitino/filesystem/hadoop/MockGVFSHook.java | 205 +++++++
.../gravitino/filesystem/hadoop/TestGvfsBase.java | 42 ++
docs/how-to-use-gvfs.md | 2 +
11 files changed, 1968 insertions(+), 42 deletions(-)
diff --git a/clients/client-python/gravitino/filesystem/gvfs.py
b/clients/client-python/gravitino/filesystem/gvfs.py
index 9347879e3e..48cdd0e859 100644
--- a/clients/client-python/gravitino/filesystem/gvfs.py
+++ b/clients/client-python/gravitino/filesystem/gvfs.py
@@ -18,7 +18,7 @@ import functools
import importlib
import logging
import re
-from typing import Dict, Callable
+from typing import Dict, Optional, Callable
import fsspec
@@ -36,6 +36,7 @@ from gravitino.filesystem.gvfs_base_operations import (
)
from gravitino.filesystem.gvfs_config import GVFSConfig
from gravitino.filesystem.gvfs_default_operations import DefaultGVFSOperations
+from gravitino.filesystem.gvfs_hook import DEFAULT_HOOK,
GravitinoVirtualFileSystemHook
logger = logging.getLogger(__name__)
@@ -69,6 +70,8 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
:param options: Options for the GravitinoVirtualFileSystem
:param kwargs: Extra args for super filesystem
"""
+ self._hook = self._get_hook_class(options)
+ self._hook.initialize(options)
self._operations = self._get_gvfs_operations_class(
server_uri, metalake_name, options
)
@@ -79,6 +82,10 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
def fsid(self):
return PROTOCOL_NAME
+ @property
+ def hook(self):
+ return self._hook
+
@property
def operations(self):
return self._operations
@@ -125,10 +132,12 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
:param kwargs: Extra args
:return If details is true, returns a list of file info dicts, else
returns a list of file paths
"""
+ new_path = self._hook.pre_ls(path, detail, **kwargs)
decorated_ls = self._with_exception_translation(
FilesetDataOperation.LIST_STATUS
)(self._operations.ls)
- return decorated_ls(path, detail, **kwargs)
+ result = decorated_ls(new_path, detail, **kwargs)
+ return self._hook.post_ls(detail, result, **kwargs)
def info(self, path, **kwargs):
"""Get file info.
@@ -136,10 +145,15 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
:param kwargs: Extra args
:return A file info dict
"""
+ new_path = self._hook.pre_info(path, **kwargs)
decorated_info = self._with_exception_translation(
FilesetDataOperation.GET_FILE_STATUS
)(self._operations.info)
- return decorated_info(path, **kwargs)
+ result = decorated_info(new_path, **kwargs)
+ return self._hook.post_info(
+ result,
+ **kwargs,
+ )
def exists(self, path, **kwargs):
"""Check if a file or a directory exists.
@@ -147,24 +161,33 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
:param kwargs: Extra args
:return If a file or directory exists, it returns True, otherwise False
"""
+ new_path = self._hook.pre_exists(path, **kwargs)
decorated_exists = self._with_exception_translation(
FilesetDataOperation.EXISTS
)(self._operations.exists)
try:
- return decorated_exists(path, **kwargs)
+ result = decorated_exists(new_path, **kwargs)
except FilesetPathNotFoundError:
return False
+ return self._hook.post_exists(
+ new_path,
+ result,
+ **kwargs,
+ )
+
def cp_file(self, path1, path2, **kwargs):
"""Copy a file.
:param path1: Virtual src fileset path
:param path2: Virtual dst fileset path, should be consistent with the
src path fileset identifier
:param kwargs: Extra args
"""
+ new_path1, new_path2 = self._hook.pre_cp_file(path1, path2, **kwargs)
decorated_cp_file = self._with_exception_translation(
FilesetDataOperation.COPY_FILE
)(self._operations.cp_file)
- return decorated_cp_file(path1, path2, **kwargs)
+ decorated_cp_file(new_path1, new_path2, **kwargs)
+ self._hook.post_cp_file(new_path1, new_path2, **kwargs)
def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs):
"""Move a file to another directory.
@@ -176,10 +199,14 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
:param maxdepth: Maximum depth of recursive move
:param kwargs: Extra args
"""
+ new_path1, new_path2 = self._hook.pre_mv(
+ path1, path2, recursive, maxdepth, **kwargs
+ )
decorated_mv =
self._with_exception_translation(FilesetDataOperation.RENAME)(
self._operations.mv
)
- decorated_mv(path1, path2, recursive, maxdepth, **kwargs)
+ decorated_mv(new_path1, new_path2, recursive, maxdepth, **kwargs)
+ self._hook.post_mv(new_path1, new_path2, recursive, maxdepth, **kwargs)
def _rm(self, path):
raise GravitinoRuntimeException(
@@ -193,19 +220,23 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
When removing a directory, this parameter should be True.
:param maxdepth: The maximum depth to remove the directory recursively.
"""
+ new_path = self._hook.pre_rm(path, recursive, maxdepth)
decorated_rm =
self._with_exception_translation(FilesetDataOperation.DELETE)(
self._operations.rm
)
- decorated_rm(path, recursive, maxdepth)
+ decorated_rm(new_path, recursive, maxdepth)
+ self._hook.post_rm(new_path, recursive, maxdepth)
def rm_file(self, path):
"""Remove a file.
:param path: Virtual fileset path
"""
+ new_path = self._hook.pre_rm_file(path)
decorated_rm_file = self._with_exception_translation(
FilesetDataOperation.DELETE
)(self._operations.rm_file)
- decorated_rm_file(path)
+ decorated_rm_file(new_path)
+ self._hook.post_rm_file(new_path)
def rmdir(self, path):
"""Remove a directory.
@@ -213,10 +244,12 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
And it will throw an exception if delete a directory which is
non-empty for LocalFileSystem.
:param path: Virtual fileset path
"""
+ new_path = self._hook.pre_rmdir(path)
decorated_rmdir =
self._with_exception_translation(FilesetDataOperation.DELETE)(
self._operations.rmdir
)
- decorated_rmdir(path)
+ decorated_rmdir(new_path)
+ self._hook.post_rmdir(new_path)
def open(
self,
@@ -236,6 +269,9 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
:param kwargs: Extra args
:return A file-like object from the filesystem
"""
+ new_path = self._hook.pre_open(
+ path, mode, block_size, cache_options, compression, **kwargs
+ )
if mode in ("w", "wb"):
data_operation = FilesetDataOperation.OPEN_AND_WRITE
elif mode in ("a", "ab"):
@@ -247,8 +283,8 @@ class GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
self._operations.open
)
try:
- return decorated_open(
- path,
+ result = decorated_open(
+ new_path,
mode,
block_size,
cache_options,
@@ -258,11 +294,20 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
except FilesetPathNotFoundError as e:
if mode in ("w", "wb", "x", "xb", "a", "ab"):
raise OSError(
- f"Fileset is not found for path: {path} for operation
OPEN. This "
+ f"Fileset is not found for path: {new_path} for operation
OPEN. This "
f"may be caused by fileset related metadata not found or
not in use "
f"in Gravitino,"
) from e
raise
+ return self._hook.post_open(
+ new_path,
+ mode,
+ block_size,
+ cache_options,
+ compression,
+ result,
+ **kwargs,
+ )
def mkdir(self, path, create_parents=True, **kwargs):
"""Make a directory.
@@ -272,34 +317,38 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
:param create_parents: Create parent directories if missing when set
to True
:param kwargs: Extra args
"""
+ new_path = self._hook.pre_mkdir(path, create_parents, **kwargs)
decorated_mkdir =
self._with_exception_translation(FilesetDataOperation.MKDIRS)(
self._operations.mkdir
)
try:
- decorated_mkdir(path, create_parents, **kwargs)
+ decorated_mkdir(new_path, create_parents, **kwargs)
except FilesetPathNotFoundError as e:
raise OSError(
- f"Fileset is not found for path: {path} for operation MKDIRS.
This "
+ f"Fileset is not found for path: {new_path} for operation
MKDIRS. This "
f"may be caused by fileset related metadata not found or not
in use "
f"in Gravitino,"
) from e
+ self._hook.post_mkdir(new_path, create_parents, **kwargs)
def makedirs(self, path, exist_ok=True):
"""Make a directory recursively.
:param path: Virtual fileset path
:param exist_ok: Continue if a directory already exists
"""
+ new_path = self._hook.pre_makedirs(path, exist_ok)
decorated_makedirs = self._with_exception_translation(
FilesetDataOperation.MKDIRS
)(self._operations.makedirs)
try:
- decorated_makedirs(path, exist_ok)
+ decorated_makedirs(new_path, exist_ok)
except FilesetPathNotFoundError as e:
raise OSError(
- f"Fileset is not found for path: {path} for operation MKDIRS.
This "
+ f"Fileset is not found for path: {new_path} for operation
MKDIRS. This "
f"may be caused by fileset related metadata not found or not
in use "
f"in Gravitino,"
) from e
+ self._hook.post_makedirs(new_path, exist_ok)
def created(self, path):
"""Return the created timestamp of a file as a datetime.datetime
@@ -307,20 +356,30 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
:param path: Virtual fileset path
:return Created time(datetime.datetime)
"""
+ new_path = self._hook.pre_created(path)
decorated_created = self._with_exception_translation(
FilesetDataOperation.CREATED_TIME
)(self._operations.created)
- return decorated_created(path)
+ result = decorated_created(new_path)
+ return self._hook.post_created(
+ new_path,
+ result,
+ )
def modified(self, path):
"""Returns the modified time of the path file if it exists.
:param path: Virtual fileset path
:return Modified time(datetime.datetime)
"""
+ new_path = self._hook.pre_modified(path)
decorated_modified = self._with_exception_translation(
FilesetDataOperation.MODIFIED_TIME
)(self._operations.modified)
- return decorated_modified(path)
+ result = decorated_modified(new_path)
+ return self._hook.post_modified(
+ new_path,
+ result,
+ )
def cat_file(self, path, start=None, end=None, **kwargs):
"""Get the content of a file.
@@ -330,10 +389,18 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
:param kwargs: Extra args
:return File content
"""
+ new_path = self._hook.pre_cat_file(path, start, end, **kwargs)
decorated_cat_file = self._with_exception_translation(
FilesetDataOperation.CAT_FILE
)(self._operations.cat_file)
- return decorated_cat_file(path, start, end, **kwargs)
+ result = decorated_cat_file(new_path, start, end, **kwargs)
+ return self._hook.post_cat_file(
+ new_path,
+ start,
+ end,
+ result,
+ **kwargs,
+ )
def get_file(self, rpath, lpath, callback=None, outfile=None, **kwargs):
"""Copy single remote file to local.
@@ -343,16 +410,33 @@ class
GravitinoVirtualFileSystem(fsspec.AbstractFileSystem):
:param outfile: The output file path
:param kwargs: Extra args
"""
+ new_rpath = self._hook.pre_get_file(rpath, lpath, callback, outfile,
**kwargs)
decorated_get_file = self._with_exception_translation(
FilesetDataOperation.GET_FILE_STATUS
)(self._operations.get_file)
decorated_get_file(
- rpath,
+ new_rpath,
lpath,
callback,
outfile,
**kwargs,
)
+ self._hook.post_get_file(new_rpath, lpath, outfile, **kwargs)
+
+ def _get_hook_class(
+ self, options: Optional[Dict[str, str]]
+ ) -> GravitinoVirtualFileSystemHook:
+ hook_class = (
+ None if options is None else
options.get(GVFSConfig.GVFS_FILESYSTEM_HOOK)
+ )
+
+ if hook_class is not None:
+ module_name, class_name = hook_class.rsplit(".", 1)
+ module = importlib.import_module(module_name)
+ loaded_class = getattr(module, class_name)
+ return loaded_class()
+
+ return DEFAULT_HOOK
def _get_gvfs_operations_class(
self,
diff --git a/clients/client-python/gravitino/filesystem/gvfs_config.py
b/clients/client-python/gravitino/filesystem/gvfs_config.py
index a5d797bd9e..47aa55b763 100644
--- a/clients/client-python/gravitino/filesystem/gvfs_config.py
+++ b/clients/client-python/gravitino/filesystem/gvfs_config.py
@@ -64,3 +64,6 @@ class GVFSConfig:
# The configuration key for the class name of the file system operations.
# The default value is
"org.apache.gravitino.filesystem.hadoop.DefaultGVFSOperations".
GVFS_FILESYSTEM_OPERATIONS = "operations_class"
+
+ # The hook class that will be used to intercept file system operations.
+ GVFS_FILESYSTEM_HOOK = "hook_class"
diff --git a/clients/client-python/gravitino/filesystem/gvfs_hook.py
b/clients/client-python/gravitino/filesystem/gvfs_hook.py
new file mode 100644
index 0000000000..22018d3c41
--- /dev/null
+++ b/clients/client-python/gravitino/filesystem/gvfs_hook.py
@@ -0,0 +1,649 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from abc import ABC, abstractmethod
+from typing import Dict, Optional, Tuple, List, Any, Union
+
+from fsspec import Callback
+
+
+class GravitinoVirtualFileSystemHook(ABC):
+ """
+ Represents a hook that can be used to intercept file system operations.
The implementor
+ should handle the exception, if any, in the pre-hook method, otherwise the
exception will be
+ thrown to the caller and fail the operation. Besides, the implemented
pre-hook method should
+ be lightweight and fast, otherwise it will slow down the operation. The
pre-hook method may
+ be called more than once and in parallel, so the implementor should handle
the concurrent and
+ idempotent issues if required.
+ """
+
+ @abstractmethod
+ def initialize(self, config: Optional[Dict[str, str]]):
+ """
+ Initialize the hook with the configuration. This method will be called
in the GVFS initialize
+ method, and the configuration will be passed from the GVFS
configuration. The implementor
+ can initialize the hook with the configuration. The exception will be
thrown to the caller
+ and fail the GVFS initialization.
+
+ Args:
+ config: The configuration.
+ """
+ pass
+
+ @abstractmethod
+ def pre_ls(self, path: str, detail: bool, **kwargs) -> str:
+ """
+ Called before a directory is listed. The returned path will be used
for the ls operation.
+ The implementor can modify the path for customization. The exception
will be thrown to the
+ caller and fail the ls operation.
+
+ Args:
+ path: The path of the directory.
+ detail: Whether to list the directory in detail.
+ **kwargs: Additional arguments.
+
+ Returns:
+ The path to list.
+ """
+
+ @abstractmethod
+ def post_ls(
+ self, detail: bool, entries: List[Union[str, Dict[str, Any]]], **kwargs
+ ) -> List[Union[str, Dict[str, Any]]]:
+ """
+ Called after a directory is listed. The implementor can modify the
entries for
+ customization. The exception will be thrown to the caller and fail the
ls operation.
+
+ Args:
+ detail: Whether to list the directory in detail.
+ entries: The entries of the directory. It can be a list of str if
the detail is False,
+ or a list of dict if the detail is True.
+ **kwargs: Additional arguments.
+
+ Returns:
+ The entries to list.
+ """
+
+ @abstractmethod
+ def pre_info(self, path: str, **kwargs) -> str:
+ """
+ Called before the information of a file is retrieved. The returned
path will be used for
+ the info operation. The implementor can modify the path for
customization. The exception
+ will be thrown to the caller and fail the info operation.
+
+ Args:
+ path: The path of the file.
+ **kwargs: Additional arguments.
+
+ Returns:
+ The path to get the information.
+ """
+
+ @abstractmethod
+ def post_info(self, info: Dict[str, Any], **kwargs) -> Dict[str, Any]:
+ """
+ Called after the information of a file is retrieved. The implementor
can modify the info for
+ customization. The exception will be thrown to the caller and fail the
info operation.
+
+ Args:
+ info: The information of the file.
+ **kwargs: Additional arguments.
+
+ Returns:
+ The information to get.
+ """
+
+ @abstractmethod
+ def pre_exists(self, path: str, **kwargs) -> str:
+ """
+ Called before the existence of a file or a directory is checked. The
returned path will be
+ used for the exists operation. The implementor can modify the path for
customization. The
+ exception will be thrown to the caller and fail the exists operation.
+
+ Args:
+ path: The path of the file or the directory.
+ **kwargs: Additional arguments.
+
+ Returns:
+ The path to check the existence.
+ """
+
+ @abstractmethod
+ def post_exists(self, gvfs_path: str, exists: bool, **kwargs) -> bool:
+ """
+ Called after the existence of a file or a directory is checked. The
implementor can modify the
+ value for customization. The exception will be thrown to the caller
and fail the exists
+ operation.
+
+ Args:
+ gvfs_path: The GVFS path of the file or the directory
+ exists: The existence of the file or the directory.
+ **kwargs: Additional arguments.
+
+ Returns:
+ The existence to check.
+ """
+
+ @abstractmethod
+ def pre_cp_file(self, src: str, dst: str, **kwargs) -> Tuple[str, str]:
+ """
+ Called before a file is copied. The returned source and destination
will be used for the cp
+ operation. The implementor can modify the source and destination for
customization. The
+ exception will be thrown to the caller and fail the cp operation.
+
+ Args:
+ src: The source file.
+ dst: The destination file.
+ **kwargs: Additional arguments.
+
+ Returns:
+ The source and destination to copy.
+ """
+
+ @abstractmethod
+ def post_cp_file(self, src_gvfs_path: str, dst_gvfs_path: str, **kwargs):
+ """
+ Called after a file is copied. if this method is invoked, it means
that the cp_file is
+ succeeded. The exception in the method will be thrown to the caller
and fail the operation.
+
+ Args:
+ src_gvfs_path: The source GVFS path.
+ dst_gvfs_path: The destination GVFS path.
+ **kwargs: Additional arguments
+ """
+
+ @abstractmethod
+ def pre_mv(
+ self, src: str, dst: str, recursive: bool, maxdepth: int, **kwargs
+ ) -> Tuple[str, str]:
+ """
+ Called before a file or a directory is moved. The returned source and
destination will be
+ used for the mv operation. The implementor can modify the source and
destination for
+ customization. The exception will be thrown to the caller and fail the
mv operation.
+
+ Args:
+ src: The source file or directory.
+ dst: The destination file or directory.
+ recursive: Whether to move the file or directory recursively.
+ maxdepth: The maximum depth to move.
+ **kwargs: Additional arguments.
+
+ Returns:
+ The source and destination to move.
+ """
+
+ @abstractmethod
+ def post_mv(
+ self,
+ src_gvfs_path: str,
+ dst_gvfs_path: str,
+ recursive: bool,
+ maxdepth: int,
+ **kwargs
+ ):
+ """
+ Called after a file or a directory is moved. If this method is
invoked, it means that the mv
+ is succeeded. The exception in the method will be thrown to the caller
and fail the operation.
+
+ Args:
+ src_gvfs_path: The source GVFS path.
+ dst_gvfs_path: The destination GVFS path.
+ recursive: Whether to move the file or directory recursively.
+ maxdepth: The maximum depth to move.
+ **kwargs: Additional arguments
+ """
+
+ @abstractmethod
+ def pre_rm(self, path: str, recursive: bool, maxdepth: int) -> str:
+ """
+ Called before a file or a directory is removed. The returned path will
be used for the rm
+ operation. The implementor can modify the path for customization. The
exception will be
+ thrown to the caller and fail the rm operation.
+
+ Args:
+ path: The path of the file or the directory.
+ recursive: Whether to remove the file or directory recursively.
+ maxdepth: The maximum depth to remove.
+
+ Returns:
+ The path to remove.
+ """
+
+ @abstractmethod
+ def post_rm(self, gvfs_path: str, recursive: bool, maxdepth: int):
+ """
+ Called after a file or a directory is removed. If this method is
invoked, it means that the
+ rm is succeeded. The exception will be thrown to the caller and fail
the rm operation.
+
+ Args:
+ gvfs_path: The GVFS path of the file or the directory.
+ recursive: Whether to remove the file or directory recursively.
+ maxdepth: The maximum depth to remove.
+ """
+
+ @abstractmethod
+ def pre_rm_file(self, path: str) -> str:
+ """
+ Called before a file is removed. The returned path will be used for
the rm_file operation.
+ The implementor can modify the path for customization. The exception
will be thrown to the
+ caller and fail the rm_file operation.
+
+ Args:
+ path: The path of the file.
+
+ Returns:
+ The path to remove.
+ """
+
+ @abstractmethod
+ def post_rm_file(self, gvfs_path: str):
+ """
+ Called after a file is removed. If this method is invoked, it means
that the rm_file is
+ succeeded. The exception will be thrown to the caller and fail the
rm_file operation.
+
+ Args:
+ gvfs_path: The GVFS path of the file.
+ """
+
+ @abstractmethod
+ def pre_rmdir(self, path: str) -> str:
+ """
+ Called before a directory is removed. The returned path will be used
for the rmdir operation.
+ The implementor can modify the path for customization. The exception
will be thrown to the
+ caller and fail the rmdir operation.
+
+ Args:
+ path: The path of the directory.
+
+ Returns:
+ The path to remove.
+ """
+
+ def post_rmdir(self, gvfs_path: str):
+ """
+ Called after a directory is removed. If this method is invoked, it
means that the rmdir
+ is succeeded. The exception will be thrown to the caller and fail the
rmdir operation.
+
+ Args:
+ gvfs_path: The GVFS path of the directory.
+ """
+
+ @abstractmethod
+ def pre_open(
+ self,
+ path: str,
+ mode: str,
+ block_size: int,
+ cache_options: dict,
+ compression: str,
+ **kwargs
+ ) -> str:
+ """
+ Called before a file is opened. The returned path will be used for the
open operation. The
+ implementor can modify the path for customization. The exception will
be thrown to the caller
+ and fail the open operation.
+
+ Args:
+ path: The path of the file.
+ mode: The mode to open the file.
+ block_size: The block size of the file.
+ cache_options: The cache options of the file.
+ compression: The compression of the file.
+ **kwargs: Additional arguments.
+
+ Returns:
+ The path to open.
+ """
+
+ @abstractmethod
+ def post_open(
+ self,
+ gvfs_path: str,
+ mode: str,
+ block_size: int,
+ cache_options: dict,
+ compression: str,
+ file: Any,
+ **kwargs
+ ) -> Any:
+ """
+ Called after a file is opened. The implementor can modify the file
object for
+ customization. The exception will be thrown to the caller and fail the
open operation.
+
+ Args:
+ gvfs_path: The GVFS path of the file.
+ mode: The mode to open the file.
+ block_size: The block size of the file.
+ cache_options: The cache options of the file.
+ compression: The compression of the file.
+ file: The file object to open.
+ **kwargs: Additional arguments.
+
+ Returns:
+ The file to open.
+ """
+
+ @abstractmethod
+ def pre_mkdir(self, path: str, create_parents: bool, **kwargs) -> str:
+ """
+ Called before a directory is created. The returned path will be used
for the mkdir operation.
+ The implementor can modify the path for customization. The exception
will be thrown to the
+ caller and fail the mkdir operation.
+
+ Args:
+ path: The path of the directory.
+ create_parents: Whether to create the parent directories.
+ **kwargs: Additional arguments.
+
+ Returns:
+ The path to mkdir.
+ """
+
+ @abstractmethod
+ def post_mkdir(self, gvfs_path: str, create_parents: bool, **kwargs):
+ """
+ Called after a directory is created. If this method is invoked, it
means that the mkdir
+ is succeeded. The exception will be thrown to the caller and fail the
mkdir operation.
+
+ Args:
+ gvfs_path: The GVFS path of the directory.
+ create_parents: Whether to create the parent directories.
+ **kwargs: Additional arguments.
+ """
+
+ @abstractmethod
+ def pre_makedirs(self, path: str, exist_ok: bool) -> str:
+ """
+ Called before a directory is created. The returned path will be used
for the makedirs
+ operation. The implementor can modify the path for customization. The
exception will be
+ thrown to the caller and fail the makedirs operation.
+
+ Args:
+ path: The path of the directory.
+ exist_ok: Whether to exist the directory.
+
+ Returns:
+ The path to makedirs.
+ """
+
+ def post_makedirs(self, gvfs_path: str, exist_ok: bool):
+ """
+ Called after a directory is created. If this method is invoked, it
means that the
+ makedirs is succeeded. The exception will be thrown to the caller and
fail the
+ makedirs operation.
+
+ Args:
+ gvfs_path: The GVFS path of the directory.
+ exist_ok: Whether to exist the directory.
+ """
+
+ @abstractmethod
+ def pre_cat_file(self, path: str, start: int, end: int, **kwargs) -> str:
+ """
+ Called before a file is read. The returned path will be used for the
cat_file operation. The
+ implementor can modify the path for customization. The exception will
be thrown to the caller
+ and fail the cat_file operation.
+
+ Args:
+ path: The path of the file.
+ start: The start position to read.
+ end: The end position to read.
+ **kwargs: Additional arguments.
+
+ Returns:
+ The path to cat_file.
+ """
+
+ @abstractmethod
+ def post_cat_file(
+ self, gvfs_path: str, start: int, end: int, content: Any, **kwargs
+ ) -> Any:
+ """
+ Called after a file is read. The implementor can modify the content
for customization. The
+ exception will be thrown to the caller and fail the cat_file operation.
+
+ Args:
+ gvfs_path: The GVFS path of the file.
+ start: The start position to read.
+ end: The end position to read.
+ content: The content of the file.
+ **kwargs: Additional arguments.
+
+ Returns:
+ The content to cat_file.
+ """
+
+ @abstractmethod
+ def pre_get_file(
+ self, rpath: str, lpath: str, callback: Callback, outfile: str,
**kwargs
+ ) -> str:
+ """
+ Called before a file is downloaded. The returned path will be used for
the get_file operation.
+ The implementor can modify the path for customization. The exception
will be thrown to the caller
+ and fail the get_file operation.
+
+ Args:
+ rpath: The remote path of the file.
+ lpath: The local path of the file.
+ callback: The callback to call.
+ outfile: The output file.
+ **kwargs: Additional arguments.
+
+ Returns:
+ The path to get_file.
+ """
+
+ def post_get_file(self, gvfs_path: str, local_path: str, outfile: str,
**kwargs):
+ """
+ Called after a file is downloaded. If this method is invoked, it means
that the get_file
+ is succeeded. The exception will be thrown to the caller and fail the
get_file operation.
+
+ Args:
+ gvfs_path: The GVFS path of the file.
+ local_path: The local path of the file.
+ outfile: The output file.
+ **kwargs: Additional arguments
+ """
+
+ @abstractmethod
+ def pre_created(self, path: str) -> str:
+ """
+ Called before the creation time of a file is retrieved. The returned
path will be used for
+ the created operation. The implementor can modify the path for
customization. The exception
+ will be thrown to the caller and fail the created operation.
+
+ Args:
+ path: The path of the file.
+
+ Returns:
+ The path to get the creation time.
+ """
+
+ @abstractmethod
+ def post_created(self, gvfs_path: str, created: Any) -> Any:
+ """
+ Called after the creation time of a file is retrieved. The implementor
can modify the created
+ time for customization. The exception will be thrown to the caller and
fail the created operation.
+
+ Args:
+ gvfs_path: The GVFS path of the file.
+ created: The creation time of the file.
+
+ Returns:
+ The creation time to get.
+ """
+
+ @abstractmethod
+ def pre_modified(self, path: str) -> str:
+ """
+ Called before the modification time of a file is retrieved. The
returned path will be used for
+ the modified operation. The implementor can modify the path for
customization. The exception
+ will be thrown to the caller and fail the modified operation.
+
+ Args:
+ path: The path of the file.
+
+ Returns:
+ The path to get the modification time.
+ """
+
+ @abstractmethod
+ def post_modified(self, gvfs_path: str, modified: Any) -> Any:
+ """
+ Called after the modification time of a file is retrieved. The
implementor can modify the modified
+ time for customization. The exception will be thrown to the caller and
fail the modified operation.
+
+ Args:
+ gvfs_path: The GVFS path of the file.
+ modified: The modification time of the file.
+
+ Returns:
+ The modification time to get.
+ """
+
+
+class NoOpHook(GravitinoVirtualFileSystemHook):
+ """
+ A no-op hook that does nothing.
+ """
+
+ def initialize(self, config: Optional[Dict[str, str]]):
+ pass
+
+ def pre_ls(self, path: str, detail: bool, **kwargs) -> str:
+ return path
+
+ def post_ls(
+ self, detail: bool, entries: List[Union[str, Dict[str, Any]]], **kwargs
+ ) -> List[Union[str, Dict[str, Any]]]:
+ return entries
+
+ def pre_info(self, path: str, **kwargs) -> str:
+ return path
+
+ def post_info(self, info: Dict[str, Any], **kwargs) -> Dict[str, Any]:
+ return info
+
+ def pre_exists(self, path: str, **kwargs) -> str:
+ return path
+
+ def post_exists(self, gvfs_path: str, exists: bool, **kwargs) -> bool:
+ return exists
+
+ def pre_cp_file(self, src: str, dst: str, **kwargs) -> Tuple[str, str]:
+ return src, dst
+
+ def post_cp_file(self, src_gvfs_path: str, dst_gvfs_path: str, **kwargs):
+ pass
+
+ def pre_mv(
+ self, src: str, dst: str, recursive: bool, maxdepth: int, **kwargs
+ ) -> Tuple[str, str]:
+ return src, dst
+
+ def post_mv(
+ self,
+ src_gvfs_path: str,
+ dst_gvfs_path: str,
+ recursive: bool,
+ maxdepth: int,
+ **kwargs
+ ):
+ pass
+
+ def pre_rm(self, path: str, recursive: bool, maxdepth: int) -> str:
+ return path
+
+ def post_rm(self, gvfs_path: str, recursive: bool, maxdepth: int):
+ pass
+
+ def pre_rm_file(self, path: str) -> str:
+ return path
+
+ def post_rm_file(self, gvfs_path: str):
+ pass
+
+ def pre_rmdir(self, path: str) -> str:
+ return path
+
+ def post_rmdir(self, gvfs_path: str):
+ pass
+
+ def pre_open(
+ self,
+ path: str,
+ mode: str,
+ block_size: int,
+ cache_options: dict,
+ compression: str,
+ **kwargs
+ ) -> str:
+ return path
+
+ def post_open(
+ self,
+ gvfs_path: str,
+ mode: str,
+ block_size: int,
+ cache_options: dict,
+ compression: str,
+ file: Any,
+ **kwargs
+ ) -> Any:
+ return file
+
+ def pre_mkdir(self, path: str, create_parents: bool, **kwargs) -> str:
+ return path
+
+ def post_mkdir(self, gvfs_path: str, create_parents: bool, **kwargs):
+ pass
+
+ def pre_makedirs(self, path: str, exist_ok: bool) -> str:
+ return path
+
+ def post_makedirs(self, gvfs_path: str, exist_ok: bool):
+ pass
+
+ def pre_cat_file(self, path: str, start: int, end: int, **kwargs) -> str:
+ return path
+
+ def post_cat_file(
+ self, gvfs_path: str, start: int, end: int, content: Any, **kwargs
+ ) -> Any:
+ return content
+
+ def pre_get_file(
+ self, rpath: str, lpath: str, callback: Callback, outfile: str,
**kwargs
+ ) -> str:
+ return rpath
+
+ def post_get_file(self, gvfs_path: str, local_path: str, outfile: str,
**kwargs):
+ pass
+
+ def pre_modified(self, path: str) -> str:
+ return path
+
+ def post_modified(self, gvfs_path: str, modified: Any) -> Any:
+ return modified
+
+ def pre_created(self, path: str) -> str:
+ return path
+
+ def post_created(self, gvfs_path: str, created: Any) -> Any:
+ return created
+
+
+DEFAULT_HOOK = NoOpHook()
diff --git a/clients/client-python/tests/unittests/test_gvfs_with_hook.py
b/clients/client-python/tests/unittests/test_gvfs_with_hook.py
new file mode 100644
index 0000000000..44c721e738
--- /dev/null
+++ b/clients/client-python/tests/unittests/test_gvfs_with_hook.py
@@ -0,0 +1,386 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+from typing import Optional, Dict, List, Union, Any
+from unittest import mock
+from unittest.mock import patch
+
+from fsspec import Callback
+from fsspec.implementations.local import LocalFileSystem
+
+from gravitino.filesystem import gvfs
+from gravitino.filesystem.gvfs_config import GVFSConfig
+from gravitino.filesystem.gvfs_hook import GravitinoVirtualFileSystemHook
+from tests.unittests import mock_base
+from tests.unittests.test_gvfs_with_local import generate_unique_random_string
+
+
+class MockGVFSHook(GravitinoVirtualFileSystemHook):
+
+ # pylint: disable=too-many-instance-attributes
+
+ def __init__(self):
+ self.ls_called = False
+ self.info_called = False
+ self.exists_called = False
+ self.cp_file_called = False
+ self.mv_called = False
+ self.rm_called = False
+ self.rm_file_called = False
+ self.rmdir_called = False
+ self.open_called = False
+ self.mkdir_called = False
+ self.makedirs_called = False
+ self.cat_file_called = False
+ self.get_file_called = False
+ self.created_called = False
+ self.modified_called = False
+ self.post_ls_called = False
+ self.post_info_called = False
+ self.post_exists_called = False
+ self.post_cp_file_called = False
+ self.post_mv_called = False
+ self.post_rm_called = False
+ self.post_rm_file_called = False
+ self.post_rmdir_called = False
+ self.post_open_called = False
+ self.post_mkdir_called = False
+ self.post_makedirs_called = False
+ self.post_cat_file_called = False
+ self.post_get_file_called = False
+ self.post_created_called = False
+ self.post_modified_called = False
+
+ def initialize(self, config: Optional[Dict[str, str]]):
+ pass
+
+ def pre_ls(self, path: str, detail: bool, **kwargs):
+ self.ls_called = True
+ return path
+
+ def post_ls(
+ self, detail: bool, entries: List[Union[str, Dict[str, Any]]], **kwargs
+ ) -> List[Union[str, Dict[str, Any]]]:
+ self.post_ls_called = True
+ return entries
+
+ def pre_info(self, path: str, **kwargs):
+ self.info_called = True
+ return path
+
+ def post_info(self, info: Dict[str, Any], **kwargs) -> Dict[str, Any]:
+ self.post_info_called = True
+ return info
+
+ def pre_exists(self, path: str, **kwargs):
+ self.exists_called = True
+ return path
+
+ def post_exists(self, gvfs_path: str, exists: bool, **kwargs) -> bool:
+ self.post_exists_called = True
+ return exists
+
+ def pre_cp_file(self, src: str, dst: str, **kwargs):
+ self.cp_file_called = True
+ return src, dst
+
+ def post_cp_file(self, src_gvfs_path: str, dst_gvfs_path: str, **kwargs):
+ self.post_cp_file_called = True
+
+ def pre_mv(self, src: str, dst: str, recursive: bool, maxdepth: int,
**kwargs):
+ self.mv_called = True
+ return src, dst
+
+ def post_mv(
+ self,
+ src_gvfs_path: str,
+ dst_gvfs_path: str,
+ recursive: bool,
+ maxdepth: int,
+ **kwargs,
+ ):
+ self.post_mv_called = True
+
+ def pre_rm(self, path: str, recursive: bool, maxdepth: int):
+ self.rm_called = True
+ return path
+
+ def post_rm(self, gvfs_path: str, recursive: bool, maxdepth: int):
+ self.post_rm_called = True
+
+ def pre_rm_file(self, path: str):
+ self.rm_file_called = True
+ return path
+
+ def post_rm_file(self, gvfs_path: str):
+ self.post_rm_file_called = True
+
+ def pre_rmdir(self, path: str):
+ self.rmdir_called = True
+ return path
+
+ def post_rmdir(self, gvfs_path: str):
+ self.post_rmdir_called = True
+
+ def pre_open(
+ self,
+ path: str,
+ mode: str,
+ block_size: int,
+ cache_options: dict,
+ compression: str,
+ **kwargs,
+ ):
+ self.open_called = True
+ return path
+
+ def post_open(
+ self,
+ gvfs_path: str,
+ mode: str,
+ block_size: int,
+ cache_options: dict,
+ compression: str,
+ file: Any,
+ **kwargs,
+ ) -> Any:
+ self.post_open_called = True
+ return file
+
+ def pre_mkdir(self, path: str, create_parents: bool, **kwargs):
+ self.mkdir_called = True
+ return path
+
+ def post_mkdir(self, gvfs_path: str, create_parents: bool, **kwargs):
+ self.post_mkdir_called = True
+
+ def pre_makedirs(self, path: str, exist_ok: bool):
+ self.makedirs_called = True
+ return path
+
+ def post_makedirs(self, gvfs_path: str, exist_ok: bool):
+ self.post_makedirs_called = True
+
+ def pre_cat_file(self, path: str, start: int, end: int, **kwargs):
+ self.cat_file_called = True
+ return path
+
+ def post_cat_file(
+ self, gvfs_path: str, start: int, end: int, content: Any, **kwargs
+ ) -> Any:
+ self.post_cat_file_called = True
+ return content
+
+ def pre_get_file(
+ self, rpath: str, lpath: str, callback: Callback, outfile: str,
**kwargs
+ ):
+ self.get_file_called = True
+ return rpath
+
+ def post_get_file(self, gvfs_path: str, local_path: str, outfile: str,
**kwargs):
+ self.post_get_file_called = True
+
+ def pre_modified(self, path: str) -> str:
+ self.modified_called = True
+ return path
+
+ def post_modified(self, gvfs_path: str, modified: Any) -> Any:
+ self.post_modified_called = True
+ return modified
+
+ def pre_created(self, path: str) -> str:
+ self.created_called = True
+ return path
+
+ def post_created(self, gvfs_path: str, created: Any) -> Any:
+ self.post_created_called = True
+ return created
+
+
+@patch(
+ "gravitino.client.generic_fileset.GenericFileset.get_credentials",
+ return_value=[],
+)
+@mock_base.mock_data
+class TestGVFSHook(unittest.TestCase):
+ _local_base_dir_path: str = "file:/tmp/fileset"
+ _fileset_dir: str = (
+
f"{_local_base_dir_path}/{generate_unique_random_string(10)}/fileset_catalog_with_hook/tmp"
+ )
+
+ def setUp(self):
+ local_fs = LocalFileSystem()
+ if not local_fs.exists(self._fileset_dir):
+ local_fs.mkdir(self._fileset_dir)
+
+ def tearDown(self):
+ local_fs = LocalFileSystem()
+ if local_fs.exists(self._local_base_dir_path):
+ local_fs.rm(self._local_base_dir_path, recursive=True)
+
+ def test_hook(self, *mock_method):
+ # pylint: disable-msg=too-many-locals
+ # pylint: disable=too-many-statements
+ fileset_storage_location = f"{self._fileset_dir}/test_location"
+ fileset_virtual_path = "fileset/fileset_catalog/tmp/test_location"
+ actual_path = fileset_storage_location
+
+ with patch.multiple(
+ "gravitino.client.fileset_catalog.FilesetCatalog",
+ load_fileset=mock.MagicMock(
+ return_value=mock_base.mock_load_fileset(
+ "fileset", fileset_storage_location
+ )
+ ),
+ get_file_location=mock.MagicMock(return_value=actual_path),
+ ):
+ local_fs = LocalFileSystem()
+ local_fs.mkdir(fileset_storage_location)
+
+ fs = gvfs.GravitinoVirtualFileSystem(
+ server_uri="http://localhost:9090",
+ metalake_name="metalake_demo",
+ skip_instance_catch=True,
+ options={
+ GVFSConfig.GVFS_FILESYSTEM_HOOK:
"tests.unittests.test_gvfs_with_hook.MockGVFSHook"
+ },
+ )
+
+ # Test pre_exists and post_exists hook
+ fs.exists(fileset_virtual_path)
+ self.assertTrue(fs.hook.exists_called)
+ self.assertTrue(fs.hook.post_exists_called)
+
+ # Test pre_ls and post_ls hook
+ fs.ls(fileset_virtual_path)
+ self.assertTrue(fs.hook.ls_called)
+ self.assertTrue(fs.hook.post_ls_called)
+
+ # Test pre_info and post_info hook
+ fs.info(fileset_virtual_path)
+ self.assertTrue(fs.hook.info_called)
+ self.assertTrue(fs.hook.post_info_called)
+
+ # Test open.
+ src_file_name = "src_test_file"
+ with patch(
+
"gravitino.client.fileset_catalog.FilesetCatalog.get_file_location",
+ return_value=f"{fileset_storage_location}/{src_file_name}",
+ ):
+ test_file_path = f"{fileset_virtual_path}/{src_file_name}"
+
+ # Test pre_open and post_open hook
+ with fs.open(test_file_path, "wb") as f:
+ f.write(b"test")
+ self.assertTrue(fs.hook.open_called)
+ self.assertTrue(fs.hook.post_open_called)
+
+ fs.modified(test_file_path)
+ self.assertTrue(fs.hook.modified_called)
+ self.assertTrue(fs.hook.post_modified_called)
+
+ fs.created(test_file_path)
+ self.assertTrue(fs.hook.created_called)
+ self.assertTrue(fs.hook.post_created_called)
+
+ # Test pre_cat_file and post_cat_file hook
+ fs.cat_file(test_file_path)
+ self.assertTrue(fs.hook.cat_file_called)
+ self.assertTrue(fs.hook.post_cat_file_called)
+
+ # Test pre_get_file and post_get_file hook
+ local_file = (
+
f"{self._local_base_dir_path}/{generate_unique_random_string(10)}"
+ )
+ fs.get_file(test_file_path, local_file)
+ self.assertTrue(fs.hook.get_file_called)
+ self.assertTrue(fs.hook.post_get_file_called)
+
+ # Test cp.
+ dst_file_name = "dst_test_file"
+ with patch(
+
"gravitino.client.fileset_catalog.FilesetCatalog.get_file_location",
+ side_effect=[
+ f"{fileset_storage_location}/{src_file_name}",
+ f"{fileset_storage_location}/{dst_file_name}",
+ ],
+ ):
+ src_file_path = f"{fileset_virtual_path}/{src_file_name}"
+ dst_file_path = f"{fileset_virtual_path}/{dst_file_name}"
+
+ # Test pre_cp_file and post_cp_file hook
+ fs.cp_file(src_file_path, dst_file_path)
+ self.assertTrue(fs.hook.cp_file_called)
+ self.assertTrue(fs.hook.post_cp_file_called)
+
+ # Test mv.
+ dst_file_name_1 = "dst_test_file_1"
+ with patch(
+
"gravitino.client.fileset_catalog.FilesetCatalog.get_file_location",
+ side_effect=[
+ f"{fileset_storage_location}/{dst_file_name}",
+ f"{fileset_storage_location}/{dst_file_name_1}",
+ ],
+ ):
+ dst_file_path_1 = f"{fileset_virtual_path}/{dst_file_name_1}"
+
+ # Test pre_mv and post_mv hook
+ fs.mv(dst_file_path, dst_file_path_1)
+ self.assertTrue(fs.hook.mv_called)
+ self.assertTrue(fs.hook.post_mv_called)
+
+ # Test rm.
+ with patch(
+
"gravitino.client.fileset_catalog.FilesetCatalog.get_file_location",
+ side_effect=[
+ f"{fileset_storage_location}/{src_file_name}",
+ f"{fileset_storage_location}/{dst_file_name_1}",
+ ],
+ ):
+ # Test pre_rm and post_rm hook
+ fs.rm(src_file_path)
+ self.assertTrue(fs.hook.rm_called)
+ self.assertTrue(fs.hook.post_rm_called)
+
+ # Test pre_rm_file and post_rm_file hook
+ fs.rm_file(dst_file_path_1)
+ self.assertTrue(fs.hook.rm_file_called)
+ self.assertTrue(fs.hook.post_rm_file_called)
+
+ # Test mkdir, makedirs, rmdir.
+ with patch(
+
"gravitino.client.fileset_catalog.FilesetCatalog.get_file_location",
+ side_effect=[
+ f"{fileset_storage_location}/test_dir",
+ f"{fileset_storage_location}/test_dir_1",
+ f"{fileset_storage_location}/test_dir",
+ ],
+ ):
+ # Test pre_mkdir and post_mkdir hook
+ fs.mkdir(f"{fileset_virtual_path}/test_dir")
+ self.assertTrue(fs.hook.mkdir_called)
+ self.assertTrue(fs.hook.post_mkdir_called)
+
+ # Test pre_makedirs and post_makedirs hook
+ fs.makedirs(f"{fileset_virtual_path}/test_dir_1")
+ self.assertTrue(fs.hook.makedirs_called)
+ self.assertTrue(fs.hook.post_makedirs_called)
+
+ # Test pre_rmdir and post_rmdir hook
+ fs.rmdir(f"{fileset_virtual_path}/test_dir")
+ self.assertTrue(fs.hook.rmdir_called)
+ self.assertTrue(fs.hook.post_rmdir_called)
diff --git
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
index 673f3e50d2..33ef650d2e 100644
---
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
+++
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
@@ -18,10 +18,13 @@
*/
package org.apache.gravitino.filesystem.hadoop;
+import static
org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemUtils.getConfigMap;
+
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.gravitino.audit.FilesetDataOperation;
import org.apache.gravitino.exceptions.CatalogNotInUseException;
import org.apache.gravitino.exceptions.GravitinoRuntimeException;
@@ -52,6 +55,7 @@ public class GravitinoVirtualFileSystem extends FileSystem {
private Path workingDirectory;
private URI uri;
+ private GravitinoVirtualFileSystemHook hook;
private BaseGVFSOperations operations;
@Override
@@ -63,6 +67,19 @@ public class GravitinoVirtualFileSystem extends FileSystem {
name.getScheme(),
GravitinoVirtualFileSystemConfiguration.GVFS_SCHEME));
}
+ String hookClassName =
+ configuration.get(
+ GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_HOOK_CLASS,
+
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_HOOK_CLASS_DEFAULT);
+ try {
+ Class<? extends GravitinoVirtualFileSystemHook> clz =
+ (Class<? extends GravitinoVirtualFileSystemHook>)
Class.forName(hookClassName);
+ this.hook = clz.getDeclaredConstructor().newInstance();
+ hook.initialize(getConfigMap(configuration));
+ } catch (Exception e) {
+ throw new GravitinoRuntimeException(e, "Cannot create hook instance:
%s", hookClassName);
+ }
+
String operationsClassName =
configuration.get(
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_OPERATIONS_CLASS,
@@ -88,6 +105,11 @@ public class GravitinoVirtualFileSystem extends FileSystem {
super.initialize(uri, getConf());
}
+ @VisibleForTesting
+ GravitinoVirtualFileSystemHook getHook() {
+ return hook;
+ }
+
@VisibleForTesting
BaseGVFSOperations getOperations() {
return operations;
@@ -105,23 +127,29 @@ public class GravitinoVirtualFileSystem extends
FileSystem {
@Override
public synchronized void setWorkingDirectory(Path newDir) {
+ Path newPath = hook.preSetWorkingDirectory(newDir);
try {
runWithExceptionTranslation(
() -> {
- operations.setWorkingDirectory(newDir);
+ operations.setWorkingDirectory(newPath);
return null;
},
FilesetDataOperation.SET_WORKING_DIR);
} catch (FilesetPathNotFoundException e) {
throw new RuntimeException(e);
}
- this.workingDirectory = newDir;
+ this.workingDirectory = newPath;
+ hook.postSetWorkingDirectory(newPath);
}
@Override
public FSDataInputStream open(Path path, int bufferSize) throws IOException {
- return runWithExceptionTranslation(
- () -> operations.open(path, bufferSize), FilesetDataOperation.OPEN);
+ Path newPath = hook.preOpen(path, bufferSize);
+ return hook.postOpen(
+ newPath,
+ bufferSize,
+ runWithExceptionTranslation(
+ () -> operations.open(newPath, bufferSize),
FilesetDataOperation.OPEN));
}
@Override
@@ -134,9 +162,17 @@ public class GravitinoVirtualFileSystem extends FileSystem
{
long blockSize,
Progressable progress)
throws IOException {
+ Path newPath = hook.preCreate(path, permission, overwrite, bufferSize,
replication, blockSize);
try {
- return operations.create(
- path, permission, overwrite, bufferSize, replication, blockSize,
progress);
+ return hook.postCreate(
+ newPath,
+ permission,
+ overwrite,
+ bufferSize,
+ replication,
+ blockSize,
+ operations.create(
+ newPath, permission, overwrite, bufferSize, replication,
blockSize, progress));
} catch (NoSuchCatalogException
| CatalogNotInUseException
| NoSuchFilesetException
@@ -155,21 +191,33 @@ public class GravitinoVirtualFileSystem extends
FileSystem {
@Override
public FSDataOutputStream append(Path path, int bufferSize, Progressable
progress)
throws IOException {
- return runWithExceptionTranslation(
- () -> operations.append(path, bufferSize, progress),
FilesetDataOperation.APPEND);
+ Path newPath = hook.preAppend(path, bufferSize);
+ return hook.postAppend(
+ newPath,
+ bufferSize,
+ runWithExceptionTranslation(
+ () -> operations.append(newPath, bufferSize, progress),
FilesetDataOperation.APPEND));
}
@Override
public boolean rename(Path src, Path dst) throws IOException {
- return runWithExceptionTranslation(
- () -> operations.rename(src, dst), FilesetDataOperation.RENAME);
+ Pair<Path, Path> pair = hook.preRename(src, dst);
+ return hook.postRename(
+ pair.getLeft(),
+ pair.getRight(),
+ runWithExceptionTranslation(
+ () -> operations.rename(pair.getLeft(), pair.getRight()),
FilesetDataOperation.RENAME));
}
@Override
public boolean delete(Path path, boolean recursive) throws IOException {
+ Path newPath = hook.preDelete(path, recursive);
try {
- return runWithExceptionTranslation(
- () -> operations.delete(path, recursive),
FilesetDataOperation.DELETE);
+ return hook.postDelete(
+ newPath,
+ recursive,
+ runWithExceptionTranslation(
+ () -> operations.delete(newPath, recursive),
FilesetDataOperation.DELETE));
} catch (FilesetPathNotFoundException e) {
return false;
}
@@ -177,20 +225,25 @@ public class GravitinoVirtualFileSystem extends
FileSystem {
@Override
public FileStatus getFileStatus(Path path) throws IOException {
- return runWithExceptionTranslation(
- () -> operations.getFileStatus(path),
FilesetDataOperation.GET_FILE_STATUS);
+ Path newPath = hook.preGetFileStatus(path);
+ return hook.postGetFileStatus(
+ runWithExceptionTranslation(
+ () -> operations.getFileStatus(newPath),
FilesetDataOperation.GET_FILE_STATUS));
}
@Override
public FileStatus[] listStatus(Path path) throws IOException {
- return runWithExceptionTranslation(
- () -> operations.listStatus(path), FilesetDataOperation.LIST_STATUS);
+ Path newPath = hook.preListStatus(path);
+ return hook.postListStatus(
+ runWithExceptionTranslation(
+ () -> operations.listStatus(newPath),
FilesetDataOperation.LIST_STATUS));
}
@Override
public boolean mkdirs(Path path, FsPermission permission) throws IOException
{
+ Path newPath = hook.preMkdirs(path, permission);
try {
- return operations.mkdirs(path, permission);
+ return hook.postMkdirs(newPath, permission, operations.mkdirs(newPath,
permission));
} catch (NoSuchCatalogException
| CatalogNotInUseException
| NoSuchFilesetException
@@ -198,7 +251,7 @@ public class GravitinoVirtualFileSystem extends FileSystem {
| FilesetPathNotFoundException e) {
String message =
"Fileset is not found for path: "
- + path
+ + newPath
+ " for operation MKDIRS. "
+ "This may be caused by fileset related metadata not found or
not in use in "
+ "Gravitino, please check the fileset metadata in Gravitino.";
@@ -208,9 +261,13 @@ public class GravitinoVirtualFileSystem extends FileSystem
{
@Override
public short getDefaultReplication(Path f) {
+ Path newPath = hook.preGetDefaultReplication(f);
try {
- return runWithExceptionTranslation(
- () -> operations.getDefaultReplication(f),
FilesetDataOperation.GET_DEFAULT_REPLICATION);
+ return hook.postGetDefaultReplication(
+ newPath,
+ runWithExceptionTranslation(
+ () -> operations.getDefaultReplication(newPath),
+ FilesetDataOperation.GET_DEFAULT_REPLICATION));
} catch (IOException e) {
return 1;
}
@@ -218,9 +275,13 @@ public class GravitinoVirtualFileSystem extends FileSystem
{
@Override
public long getDefaultBlockSize(Path f) {
+ Path newPath = hook.preGetDefaultBlockSize(f);
try {
- return runWithExceptionTranslation(
- () -> operations.getDefaultBlockSize(f),
FilesetDataOperation.GET_DEFAULT_BLOCK_SIZE);
+ return hook.postGetDefaultBlockSize(
+ newPath,
+ runWithExceptionTranslation(
+ () -> operations.getDefaultBlockSize(newPath),
+ FilesetDataOperation.GET_DEFAULT_BLOCK_SIZE));
} catch (IOException e) {
return operations.defaultBlockSize();
}
@@ -233,6 +294,12 @@ public class GravitinoVirtualFileSystem extends FileSystem
{
@Override
public synchronized void close() throws IOException {
+ try {
+ hook.close();
+ } catch (IOException e) {
+ LOG.warn("Failed to close hook: {}", hook.getClass().getName(), e);
+ }
+
try {
operations.close();
} catch (IOException e) {
diff --git
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
index c73b2cb477..c79cd6e890 100644
---
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
+++
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
@@ -129,5 +129,11 @@ public class GravitinoVirtualFileSystemConfiguration {
/** The default block size of the GVFS file. */
public static final long FS_GRAVITINO_BLOCK_SIZE_DEFAULT = 32 * 1024 * 1024;
+ /** The configuration key for the Gravitino hook class. */
+ public static final String FS_GRAVITINO_HOOK_CLASS =
"fs.gravitino.hook.class";
+
+ /** The default value for the Gravitino hook class. */
+ public static final String FS_GRAVITINO_HOOK_CLASS_DEFAULT =
NoOpHook.class.getCanonicalName();
+
private GravitinoVirtualFileSystemConfiguration() {}
}
diff --git
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemHook.java
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemHook.java
new file mode 100644
index 0000000000..27e8f7684e
--- /dev/null
+++
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemHook.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.filesystem.hadoop;
+
+import java.io.Closeable;
+import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+/**
+ * A hook interface for injecting custom logic before the Gravitino Virtual
File System operations.
+ * The implementor should handle the exception, if any, in the pre-hook
method, otherwise the
+ * exception will be thrown to the caller and fail the operation. Besides, the
implemented pre-hook
+ * method should be lightweight and fast, otherwise it will slow down the
operation. The pre-hook
+ * method may be called more than once and in parallel, so the implementor
should handle the
+ * concurrent and idempotent issues if required.
+ */
+public interface GravitinoVirtualFileSystemHook extends Closeable {
+
+ /**
+ * Initialize the hook with the configuration. This method will be called in
the GVFS initialize
+ * method, and the configuration will be passed from the GVFS configuration.
The implementor can
+ * initialize the hook with the configuration. The exception will be thrown
to the caller and fail
+ * the GVFS initialization.
+ *
+ * @param config The configuration.
+ */
+ void initialize(Map<String, String> config);
+
+ /**
+ * Pre-hook for setWorkingDirectory operation. This method will be called
before the
+ * setWorkingDirectory operation. The returned path will be used for the
setWorkingDirectory
+ * operation. The implementor can modify the path for customization. The
exception will be thrown
+ * to the caller and fail the setWorkingDirectory operation.
+ *
+ * @param path The path to set working directory.
+ * @return The path to set working directory.
+ */
+ Path preSetWorkingDirectory(Path path);
+
+ /**
+ * Post-hook for setWorkingDirectory operation. This method will be called
after the
+ * setWorkingDirectory operation. The fileset path will be passed to the
post-hook method. The
+ * implementor can do some post-processing after the setWorkingDirectory
operation. The exception
+ * will be thrown to the caller and fail the setWorkingDirectory operation.
+ *
+ * @param gvfsPath the GVFS path to be set as the working directory
+ */
+ void postSetWorkingDirectory(Path gvfsPath);
+
+ /**
+ * Pre-hook for open operation. This method will be called before the open
operation. The returned
+ * path will be used for the open operation. The implementor can modify the
path for
+ * customization. The exception will be thrown to the caller and fail the
open operation.
+ *
+ * @param path The path to open.
+ * @param bufferSize The buffer size.
+ * @return The path to open.
+ */
+ Path preOpen(Path path, int bufferSize);
+
+ /**
+ * Post-hook for open operation. This method will be called after the open
operation. The input
+ * stream will be passed to the post-hook method. The implementor can do
some post-processing
+ * after the open operation. The exception will be thrown to the caller and
fail the open
+ * operation.
+ *
+ * @param gvfsPath The GVFS path to open.
+ * @param bufferSize The buffer size to open the file.
+ * @param inputStream The input stream.
+ * @return The input stream.
+ */
+ FSDataInputStream postOpen(Path gvfsPath, int bufferSize, FSDataInputStream
inputStream);
+
+ /**
+ * Pre-hook for create operation. This method will be called before the
create operation. The
+ * returned path will be used for the create operation. The implementor can
modify the path for
+ * customization. The exception will be thrown to the caller and fail the
create operation.
+ *
+ * @param path The path to create.
+ * @param permission The permission.
+ * @param overwrite Whether to overwrite the file.
+ * @param bufferSize The buffer size.
+ * @param replication The replication factor.
+ * @param blockSize The block size.
+ * @return The path to create.
+ */
+ Path preCreate(
+ Path path,
+ FsPermission permission,
+ boolean overwrite,
+ int bufferSize,
+ short replication,
+ long blockSize);
+
+ /**
+ * Post-hook for create operation. This method will be called after the
create operation. The
+ * output stream will be passed to the post-hook method. The implementor can
do some
+ * post-processing after the create operation. The exception will be thrown
to the caller and fail
+ * the create operation.
+ *
+ * @param gvfsPath The GVFS path to create.
+ * @param permission The permission.
+ * @param overwrite Whether to overwrite the file.
+ * @param bufferSize The buffer size.
+ * @param replication The replication factor.
+ * @param blockSize The block size.
+ * @param outputStream The output stream.
+ * @return The output stream.
+ */
+ FSDataOutputStream postCreate(
+ Path gvfsPath,
+ FsPermission permission,
+ boolean overwrite,
+ int bufferSize,
+ short replication,
+ long blockSize,
+ FSDataOutputStream outputStream);
+
+ /**
+ * Pre-hook for append operation. This method will be called before the
append operation. The
+ * returned path will be used for the append operation. The implementor can
modify the path for
+ * customization. The exception will be thrown to the caller and fail the
append operation.
+ *
+ * @param path The path to append.
+ * @param bufferSize The buffer size.
+ * @return The path to append.
+ */
+ Path preAppend(Path path, int bufferSize);
+
+ /**
+ * Post-hook for append operation. This method will be called after the
append operation. The
+ * output stream will be passed to the post-hook method. The implementor can
do some
+ * post-processing after the append operation. The exception will be thrown
to the caller and fail
+ * the append operation.
+ *
+ * @param gvfsPath The GVFS path to append.
+ * @param bufferSize The buffer size.
+ * @param outputStream The output stream.
+ * @return The output stream.
+ */
+ FSDataOutputStream postAppend(Path gvfsPath, int bufferSize,
FSDataOutputStream outputStream);
+
+ /**
+ * Pre-hook for rename operation. This method will be called before the
rename operation. The
+ * source path and destination path will be passed to the pre-hook method.
The implementor can
+ * modify the source path and destination path for customization. The
exception will be thrown to
exception will be thrown to
+ * the caller and fail the rename operation.
+ *
+ * @param src The source path.
+ * @param dst The destination path.
+ * @return The pair of source path and destination path.
+ */
+ Pair<Path, Path> preRename(Path src, Path dst);
+
+ /**
+ * Post-hook for rename operation. This method will be called after the
rename operation. The
+ * implementor can do some post-processing after the rename operation. The
exception will be
+ * thrown to the caller and fail the rename operation.
+ *
+ * @param srcGVFSPath The source GVFS path.
+ * @param dstGVFSPath The destination GVFS path.
+ * @param success Whether the rename operation is successful.
+ * @return Whether the rename operation is successful.
+ */
+ boolean postRename(Path srcGVFSPath, Path dstGVFSPath, boolean success);
+
+ /**
+ * Pre-hook for delete operation. This method will be called before the
delete operation. The
+ * returned path will be used for the delete operation. The implementor can
modify the path for
+ * customization. The exception will be thrown to the caller and fail the
delete operation.
+ *
+ * @param path The path to delete.
+ * @param recursive Whether to delete recursively.
+ * @return The path to delete.
+ */
+ Path preDelete(Path path, boolean recursive);
+
+ /**
+ * Post-hook for delete operation. This method will be called after the
delete operation. The
+ * implementor can do some post-processing after the delete operation. The
exception will be
+ * thrown to the caller and fail the delete operation.
+ *
+ * @param gvfsPath The GVFS path to delete.
+ * @param recursive Whether to delete recursively.
+ * @param success Whether the delete operation is successful.
+ * @return Whether the delete operation is successful.
+ */
+ boolean postDelete(Path gvfsPath, boolean recursive, boolean success);
+
+ /**
+ * Pre-hook for getFileStatus operation. This method will be called before
the getFileStatus operation. The
+ * returned path will be used for the getFileStatus operation. The
implementor can modify the path
+ * for customization. The exception will be thrown to the caller and fail
the getFileStatus
+ * operation.
+ *
+ * @param path The path to get file status.
+ * @return The path to get file status.
+ */
+ Path preGetFileStatus(Path path);
+
+ /**
+ * Post-hook for getFileStatus operation. This method will be called after
the getFileStatus
+ * operation. The file status will be passed to the post-hook method. The
implementor can do some
+ * post-processing after the getFileStatus operation. The exception will be
thrown to the caller
+ * and fail the getFileStatus operation.
+ *
+ * @param fileStatus The file status.
+ * @return The file status.
+ */
+ FileStatus postGetFileStatus(FileStatus fileStatus);
+
+ /**
+ * Pre-hook for listStatus operation.
+ *
+ * @param path The path to list status.
+ * @return The path to list status.
+ */
+ Path preListStatus(Path path);
+
+ /**
+ * Post-hook for listStatus operation. This method will be called after the
listStatus operation.
+ * The file statuses will be passed to the post-hook method. The implementor
can do some
+ * post-processing after the listStatus operation. The exception will be
thrown to the caller and
+ * fail the listStatus operation.
+ *
+ * @param fileStatuses The file statuses.
+ * @return The file statuses.
+ */
+ FileStatus[] postListStatus(FileStatus[] fileStatuses);
+
+ /**
+ * Pre-hook for mkdirs operation. This method will be called before the
mkdirs operation. The
+ * returned path will be used for the mkdirs operation. The implementor can
modify the path for
+ * customization. The exception will be thrown to the caller and fail the
mkdirs operation.
+ *
+ * @param path The path to mkdirs.
+ * @param permission The permission.
+ * @return The path to mkdirs.
+ */
+ Path preMkdirs(Path path, FsPermission permission);
+
+ /**
+ * Post-hook for mkdirs operation. This method will be called after the
mkdirs operation. The
+ * implementor can do some post-processing after the mkdirs operation. The
exception will be
+ * thrown to the caller and fail the mkdirs operation.
+ *
+ * @param gvfsPath The GVFS path to mkdirs.
+ * @param permission The permission.
+ * @param success Whether the mkdirs operation is successful.
+ * @return Whether the mkdirs operation is successful.
+ */
+ boolean postMkdirs(Path gvfsPath, FsPermission permission, boolean success);
+
+ /**
+ * Pre-hook for getDefaultReplication operation. This method will be called
before the
+ * getDefaultReplication operation. The returned path will be used for the
getDefaultReplication
+ * operation. The implementor can modify the path for customization. The
exception will be thrown
+ * to the caller and fail the getDefaultReplication operation.
+ *
+ * @param path The path to get default replication.
+ * @return The path to get default replication.
+ */
+ Path preGetDefaultReplication(Path path);
+
+ /**
+ * Post-hook for getDefaultReplication operation. This method will be called
after the
+ * getDefaultReplication operation. The default replication will be passed
to the post-hook
+ * method. The implementor can do some post-processing after the
getDefaultReplication operation.
+ * The exception will be thrown to the caller and fail the
getDefaultReplication operation.
+ *
+ * @param gvfsPath The GVFS path to get default replication.
+ * @param replication The default replication.
+ * @return The default replication.
+ */
+ short postGetDefaultReplication(Path gvfsPath, short replication);
+
+ /**
+ * Pre-hook for getDefaultBlockSize operation. This method will be called
before the
+ * getDefaultBlockSize operation. The returned path will be used for the
getDefaultBlockSize
+ * operation. The implementor can modify the path for customization. The
exception will be thrown
+ * to the caller and fail the getDefaultBlockSize operation.
+ *
+ * @param path The path to get default block size.
+ * @return The path to get default block size.
+ */
+ Path preGetDefaultBlockSize(Path path);
+
+ /**
+ * Post-hook for getDefaultBlockSize operation. This method will be called
after the
+ * getDefaultBlockSize operation. The default block size will be passed to
the post-hook method.
+ * The implementor can do some post-processing after the getDefaultBlockSize
operation. The
+ * exception will be thrown to the caller and fail the getDefaultBlockSize
operation.
+ *
+ * @param gvfsPath The GVFS path to get default block size.
+ * @param blockSize The default block size.
+ * @return The default block size.
+ */
+ long postGetDefaultBlockSize(Path gvfsPath, long blockSize);
+}
diff --git
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/NoOpHook.java
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/NoOpHook.java
new file mode 100644
index 0000000000..d1fff5478b
--- /dev/null
+++
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/NoOpHook.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.filesystem.hadoop;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+/**
+ * The default implementation of {@link GravitinoVirtualFileSystemHook}. This
class does nothing.
+ */
+public class NoOpHook implements GravitinoVirtualFileSystemHook {
+
+ @Override
+ public void initialize(Map<String, String> config) {}
+
+ @Override
+ public Path preSetWorkingDirectory(Path path) {
+ return path;
+ }
+
+ @Override
+ public void postSetWorkingDirectory(Path gvfsPath) {}
+
+ @Override
+ public Path preOpen(Path path, int bufferSize) {
+ return path;
+ }
+
+ @Override
+ public FSDataInputStream postOpen(Path gvfsPath, int bufferSize,
FSDataInputStream inputStream) {
+ return inputStream;
+ }
+
+ @Override
+ public Path preCreate(
+ Path path,
+ FsPermission permission,
+ boolean overwrite,
+ int bufferSize,
+ short replication,
+ long blockSize) {
+ return path;
+ }
+
+ @Override
+ public FSDataOutputStream postCreate(
+ Path gvfsPath,
+ FsPermission permission,
+ boolean overwrite,
+ int bufferSize,
+ short replication,
+ long blockSize,
+ FSDataOutputStream outputStream) {
+ return outputStream;
+ }
+
+ @Override
+ public Path preAppend(Path path, int bufferSize) {
+ return path;
+ }
+
+ @Override
+ public FSDataOutputStream postAppend(
+ Path gvfsPath, int bufferSize, FSDataOutputStream outputStream) {
+ return outputStream;
+ }
+
+ @Override
+ public Pair<Path, Path> preRename(Path src, Path dst) {
+ return Pair.of(src, dst);
+ }
+
+ @Override
+ public boolean postRename(Path srcGvfsPath, Path dstGvfsPath, boolean
success) {
+ return success;
+ }
+
+ @Override
+ public Path preDelete(Path path, boolean recursive) {
+ return path;
+ }
+
+ @Override
+ public boolean postDelete(Path gvfsPath, boolean recursive, boolean success)
{
+ return success;
+ }
+
+ @Override
+ public Path preGetFileStatus(Path path) {
+ return path;
+ }
+
+ @Override
+ public FileStatus postGetFileStatus(FileStatus fileStatus) {
+ return fileStatus;
+ }
+
+ @Override
+ public Path preListStatus(Path path) {
+ return path;
+ }
+
+ @Override
+ public FileStatus[] postListStatus(FileStatus[] fileStatuses) {
+ return fileStatuses;
+ }
+
+ @Override
+ public Path preMkdirs(Path path, FsPermission permission) {
+ return path;
+ }
+
+ @Override
+ public boolean postMkdirs(Path gvfsPath, FsPermission permission, boolean
success) {
+ return success;
+ }
+
+ @Override
+ public Path preGetDefaultReplication(Path path) {
+ return path;
+ }
+
+ @Override
+ public short postGetDefaultReplication(Path gvfsPath, short replication) {
+ return replication;
+ }
+
+ @Override
+ public Path preGetDefaultBlockSize(Path path) {
+ return path;
+ }
+
+ @Override
+ public long postGetDefaultBlockSize(Path gvfsPath, long blockSize) {
+ return blockSize;
+ }
+
+ @Override
+ public void close() throws IOException {}
+}
diff --git
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/MockGVFSHook.java
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/MockGVFSHook.java
new file mode 100644
index 0000000000..640af6325d
--- /dev/null
+++
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/MockGVFSHook.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.filesystem.hadoop;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+public class MockGVFSHook implements GravitinoVirtualFileSystemHook {
+
+ boolean preSetWorkingDirectoryCalled = false;
+ boolean preOpenCalled = false;
+ boolean preCreateCalled = false;
+ boolean preAppendCalled = false;
+ boolean preRenameCalled = false;
+ boolean preDeleteCalled = false;
+ boolean preGetFileStatusCalled = false;
+ boolean preListStatusCalled = false;
+ boolean preMkdirsCalled = false;
+ boolean preGetDefaultReplicationCalled = false;
+ boolean preGetDefaultBlockSizeCalled = false;
+ boolean postSetWorkingDirectoryCalled = false;
+ boolean postOpenCalled = false;
+ boolean postCreateCalled = false;
+ boolean postAppendCalled = false;
+ boolean postRenameCalled = false;
+ boolean postDeleteCalled = false;
+ boolean postGetFileStatusCalled = false;
+ boolean postListStatusCalled = false;
+ boolean postMkdirsCalled = false;
+ boolean postGetDefaultReplicationCalled = false;
+ boolean postGetDefaultBlockSizeCalled = false;
+
+ @Override
+ public void initialize(Map<String, String> config) {}
+
+ @Override
+ public Path preSetWorkingDirectory(Path path) {
+ this.preSetWorkingDirectoryCalled = true;
+ return path;
+ }
+
+ @Override
+ public void postSetWorkingDirectory(Path gvfsPath) {
+ this.postSetWorkingDirectoryCalled = true;
+ }
+
+ @Override
+ public Path preOpen(Path path, int bufferSize) {
+ this.preOpenCalled = true;
+ return path;
+ }
+
+ @Override
+ public FSDataInputStream postOpen(Path gvfsPath, int bufferSize,
FSDataInputStream inputStream) {
+ this.postOpenCalled = true;
+ return inputStream;
+ }
+
+ @Override
+ public Path preCreate(
+ Path path,
+ FsPermission permission,
+ boolean overwrite,
+ int bufferSize,
+ short replication,
+ long blockSize) {
+ this.preCreateCalled = true;
+ return path;
+ }
+
+ @Override
+ public FSDataOutputStream postCreate(
+ Path gvfsPath,
+ FsPermission permission,
+ boolean overwrite,
+ int bufferSize,
+ short replication,
+ long blockSize,
+ FSDataOutputStream outputStream) {
+ this.postCreateCalled = true;
+ return outputStream;
+ }
+
+ @Override
+ public Path preAppend(Path path, int bufferSize) {
+ this.preAppendCalled = true;
+ return path;
+ }
+
+ @Override
+ public FSDataOutputStream postAppend(
+ Path gvfsPath, int bufferSize, FSDataOutputStream outputStream) {
+ this.postAppendCalled = true;
+ return outputStream;
+ }
+
+ @Override
+ public Pair<Path, Path> preRename(Path src, Path dst) {
+ this.preRenameCalled = true;
+ return Pair.of(src, dst);
+ }
+
+ @Override
+ public boolean postRename(Path srcGvfsPath, Path dstGvfsPath, boolean
success) {
+ this.postRenameCalled = true;
+ return success;
+ }
+
+ @Override
+ public Path preDelete(Path path, boolean recursive) {
+ this.preDeleteCalled = true;
+ return path;
+ }
+
+ @Override
+ public boolean postDelete(Path gvfsPath, boolean recursive, boolean success)
{
+ this.postDeleteCalled = true;
+ return success;
+ }
+
+ @Override
+ public Path preGetFileStatus(Path path) {
+ this.preGetFileStatusCalled = true;
+ return path;
+ }
+
+ @Override
+ public FileStatus postGetFileStatus(FileStatus fileStatus) {
+ this.postGetFileStatusCalled = true;
+ return fileStatus;
+ }
+
+ @Override
+ public Path preListStatus(Path path) {
+ this.preListStatusCalled = true;
+ return path;
+ }
+
+ @Override
+ public FileStatus[] postListStatus(FileStatus[] fileStatuses) {
+ this.postListStatusCalled = true;
+ return fileStatuses;
+ }
+
+ @Override
+ public Path preMkdirs(Path path, FsPermission permission) {
+ this.preMkdirsCalled = true;
+ return path;
+ }
+
+ @Override
+ public boolean postMkdirs(Path gvfsPath, FsPermission permission, boolean
success) {
+ this.postMkdirsCalled = true;
+ return success;
+ }
+
+ @Override
+ public Path preGetDefaultReplication(Path path) {
+ this.preGetDefaultReplicationCalled = true;
+ return path;
+ }
+
+ @Override
+ public short postGetDefaultReplication(Path gvfsPath, short replication) {
+ this.postGetDefaultReplicationCalled = true;
+ return replication;
+ }
+
+ @Override
+ public Path preGetDefaultBlockSize(Path path) {
+ this.preGetDefaultBlockSizeCalled = true;
+ return path;
+ }
+
+ @Override
+ public long postGetDefaultBlockSize(Path gvfsPath, long blockSize) {
+ this.postGetDefaultBlockSizeCalled = true;
+ return blockSize;
+ }
+
+ @Override
+ public void close() throws IOException {}
+}
diff --git
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java
index 62ee35f12c..eb6c674961 100644
---
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java
+++
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java
@@ -103,6 +103,9 @@ public class TestGvfsBase extends GravitinoMockServerBase {
String.format(
"fs.%s.impl.disable.cache",
GravitinoVirtualFileSystemConfiguration.GVFS_SCHEME),
"true");
+ conf.set(
+ GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_HOOK_CLASS,
+ MockGVFSHook.class.getCanonicalName());
}
@AfterAll
@@ -127,6 +130,9 @@ public class TestGvfsBase extends GravitinoMockServerBase {
public void testOpsException() throws IOException, NoSuchFieldException,
IllegalAccessException {
Assumptions.assumeTrue(getClass() == TestGvfsBase.class);
Configuration newConf = new Configuration(conf);
+ newConf.set(
+ GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_HOOK_CLASS,
+ NoOpHook.class.getCanonicalName());
try (GravitinoVirtualFileSystem fs =
(GravitinoVirtualFileSystem) new
Path("gvfs://fileset/").getFileSystem(newConf)) {
BaseGVFSOperations mockOps = Mockito.mock(BaseGVFSOperations.class);
@@ -382,6 +388,9 @@ public class TestGvfsBase extends GravitinoMockServerBase {
FileSystemTestUtils.create(filePath, gravitinoFileSystem);
assertTrue(localFileSystem.exists(localFilePath));
localFileSystem.delete(localFilePath, true);
+ // test gvfs preCreate and postCreate are called
+ assertTrue(getHook(gravitinoFileSystem).preCreateCalled);
+ assertTrue(getHook(gravitinoFileSystem).postCreateCalled);
// mock the invalid fileset not in the server
String invalidFilesetName = "invalid_fileset";
@@ -434,6 +443,11 @@ public class TestGvfsBase extends GravitinoMockServerBase {
FileSystemTestUtils.create(localAppendFile, localFileSystem);
assertTrue(localFileSystem.exists(localAppendFile));
FileSystemTestUtils.append(appendFile, gravitinoFileSystem);
+
+ // test gvfs preAppend and postAppend are called
+ assertTrue(getHook(gravitinoFileSystem).preAppendCalled);
+ assertTrue(getHook(gravitinoFileSystem).postAppendCalled);
+
assertEquals(
"Hello, World!",
new String(
@@ -512,6 +526,10 @@ public class TestGvfsBase extends GravitinoMockServerBase {
assertTrue(localFileSystem.exists(dstLocalRenamePath2));
localFileSystem.delete(dstLocalRenamePath2, true);
+ // test gvfs preRename and postRename are called
+ assertTrue(getHook(gravitinoFileSystem).preRenameCalled);
+ assertTrue(getHook(gravitinoFileSystem).postRenameCalled);
+
// test invalid src path
Path invalidSrcPath =
FileSystemTestUtils.createFilesetPath(
@@ -566,6 +584,10 @@ public class TestGvfsBase extends GravitinoMockServerBase {
gravitinoFileSystem.delete(dirPath, true);
assertFalse(localFileSystem.exists(localDirPath));
+ // test gvfs preDelete and postDelete called
+ assertTrue(getHook(gravitinoFileSystem).preDeleteCalled);
+ assertTrue(getHook(gravitinoFileSystem).postDeleteCalled);
+
// mock the invalid fileset not in server
String invalidFilesetName = "invalid_fileset";
Path invalidFilesetPath =
@@ -612,6 +634,10 @@ public class TestGvfsBase extends GravitinoMockServerBase {
.replaceFirst(
GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX,
FileSystemTestUtils.localRootPrefix()));
+
+ // test gvfs preGetFileStatus and postGetFileStatus called
+ assertTrue(getHook(gravitinoFileSystem).preGetFileStatusCalled);
+ assertTrue(getHook(gravitinoFileSystem).postGetFileStatusCalled);
}
}
@@ -648,6 +674,10 @@ public class TestGvfsBase extends GravitinoMockServerBase {
gravitinoStatuses.sort(Comparator.comparing(FileStatus::getPath));
assertEquals(5, gravitinoStatuses.size());
+ // test gvfs preListStatus and postListStatus are called
+ assertTrue(getHook(gravitinoFileSystem).preListStatusCalled);
+ assertTrue(getHook(gravitinoFileSystem).postListStatusCalled);
+
List<FileStatus> localStatuses =
new
ArrayList<>(Arrays.asList(localFileSystem.listStatus(localPath)));
localStatuses.sort(Comparator.comparing(FileStatus::getPath));
@@ -708,6 +738,10 @@ public class TestGvfsBase extends GravitinoMockServerBase {
.replaceFirst(
GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX,
FileSystemTestUtils.localRootPrefix()));
+
+ // test gvfs preMkdirs and postMkdirs called
+ assertTrue(getHook(gravitinoFileSystem).preMkdirsCalled);
+ assertTrue(getHook(gravitinoFileSystem).postMkdirsCalled);
}
}
@@ -813,6 +847,8 @@ public class TestGvfsBase extends GravitinoMockServerBase {
buildMockResourceForCredential(filesetName, localPath.toString());
assertEquals(1, fs.getDefaultReplication(managedFilesetPath));
+ assertTrue(getHook(fs).preGetDefaultReplicationCalled);
+ assertTrue(getHook(fs).postGetDefaultReplicationCalled);
}
}
@@ -836,6 +872,8 @@ public class TestGvfsBase extends GravitinoMockServerBase {
buildMockResourceForCredential(filesetName, localPath.toString());
assertEquals(32 * 1024 * 1024,
fs.getDefaultBlockSize(managedFilesetPath));
+ assertTrue(getHook(fs).preGetDefaultBlockSizeCalled);
+ assertTrue(getHook(fs).postGetDefaultBlockSizeCalled);
}
}
@@ -926,4 +964,8 @@ public class TestGvfsBase extends GravitinoMockServerBase {
buildMockResource(
Method.GET, credentialsPath, ImmutableMap.of(), null,
credentialResponse, SC_OK);
}
+
+ private MockGVFSHook getHook(FileSystem gvfs) {
+ return (MockGVFSHook) ((GravitinoVirtualFileSystem) gvfs).getHook();
+ }
}
diff --git a/docs/how-to-use-gvfs.md b/docs/how-to-use-gvfs.md
index 3f768f136e..caaa2e92e8 100644
--- a/docs/how-to-use-gvfs.md
+++ b/docs/how-to-use-gvfs.md
@@ -68,6 +68,7 @@ the path mapping and convert automatically.
| `fs.gravitino.current.location.name` | The configuration used to select the location of the fileset. If this configuration is not set, the value of environment variable configured by `fs.gravitino.current.location.name.env.var` will be checked. If neither is set, the value of fileset property `default-location-name` will be used as the location name. | the value of fileset property `default-location-name` | No | 0.9.0-incubating |
| `fs.gravitino.current.location.name.env.var` | The environment
variable name to get the current location name.
| `CURRENT_LOCATION_NAME`
| No |
0.9.0-incubating |
| `fs.gravitino.operations.class` | The operations class
to provide the FS operations for the Gravitino Virtual File System. Users can
extends `BaseGVFSOperations` to implement their own operations and configure
the class name in this conf to use custom FS operations.
|
`org.apache.gravitino.filesystem.hadoop.DefaultGVFSOperations` | No
| 0.9.0-incubating |
+| `fs.gravitino.hook.class` | The hook class to inject into the Gravitino Virtual File System. Users can implement their own `GravitinoVirtualFileSystemHook` and configure the class name in this conf to inject custom code. | `org.apache.gravitino.filesystem.hadoop.NoOpHook` | No | 0.9.0-incubating |
Apart from the above properties, to access fileset like S3, GCS, OSS and
custom fileset, extra properties are needed, please see
[S3 GVFS Java client
configurations](./hadoop-catalog-with-s3.md#using-the-gvfs-java-client-to-access-the-fileset),
[GCS GVFS Java client
configurations](./hadoop-catalog-with-gcs.md#using-the-gvfs-java-client-to-access-the-fileset),
[OSS GVFS Java client
configurations](./hadoop-catalog-with-oss.md#using-the-gvfs-java-client-to-access-the-fileset)
and [Azure Blob Storage GVFS Java client
configurations](./hadoop-catalog-with-adls.md#using-the-gvfs-java-client-to-access-the-fileset)
for [...]
@@ -361,6 +362,7 @@ to recompile the native libraries like `libhdfs` and
others, and completely repl
| `current_location_name` | The configuration used to select the
location of the fileset. If this configuration is not set, the value of
environment variable configured by `current_location_name_env_var` will be
checked. If neither is set, the value of fileset property
`default-location-name` will be used as the location name. | the value of
fileset property `default-location-name` | No
| 0.9.0-incubating |
| `current_location_name_env_var` | The environment variable name to get the
current location name.
| `CURRENT_LOCATION_NAME`
| No | 0.9.0-incubating |
| `operations_class` | The operations class to provide the FS
operations for the Gravitino Virtual File System. Users can extends
`BaseGVFSOperations` to implement their own operations and configure the class
name in this conf to use custom FS operations.
|
`gravitino.filesystem.gvfs_default_operations.DefaultGVFSOperations` | No
| 0.9.0-incubating |
+| `hook_class` | The hook class to inject into the
Gravitino Virtual File System. Users can implement their own
`GravitinoVirtualFileSystemHook` and configure the class name in this conf to
inject custom code.
|
`gravitino.filesystem.gvfs_hook.NoOpHook` | No
| 0.9.0-incubating |
#### Configurations for S3, GCS, OSS and Azure Blob storage fileset