This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 45b823907d [pvfs] Support define endpoint in path (#6008)
45b823907d is described below
commit 45b823907dd93cc21bf2a3c1adcae4bb26731438
Author: jerry <[email protected]>
AuthorDate: Fri Aug 1 15:25:56 2025 +0800
[pvfs] Support define endpoint in path (#6008)
---
paimon-python/pypaimon/pvfs/__init__.py | 87 +++++++++++++++++++------------
paimon-python/pypaimon/tests/pvfs_test.py | 4 ++
2 files changed, 59 insertions(+), 32 deletions(-)
diff --git a/paimon-python/pypaimon/pvfs/__init__.py b/paimon-python/pypaimon/pvfs/__init__.py
index ec1cb0f05c..09273d65f4 100644
--- a/paimon-python/pypaimon/pvfs/__init__.py
+++ b/paimon-python/pypaimon/pvfs/__init__.py
@@ -16,7 +16,6 @@
# under the License.
import importlib
-import re
import time
import datetime
from abc import ABC
@@ -45,22 +44,37 @@ class StorageType(Enum):
class PVFSIdentifier(ABC):
catalog: str
+ endpoint: str
+
+ def get_cache_key(self) -> str:
+ return f"{self.catalog}.{self.__remove_endpoint_schema(self.endpoint)}"
+
+ @staticmethod
+ def __remove_endpoint_schema(url):
+ if url.startswith('https://'):
+ return url[8:]
+ elif url.startswith('http://'):
+ return url[7:]
+ return url
@dataclass
class PVFSCatalogIdentifier(PVFSIdentifier):
catalog: str
+ endpoint: str
@dataclass
class PVFSDatabaseIdentifier(PVFSIdentifier):
- database: str
catalog: str
+ endpoint: str
+ database: str
@dataclass
class PVFSTableIdentifier(PVFSIdentifier):
catalog: str
+ endpoint: str
database: str
table: str
sub_path: str = None
@@ -114,7 +128,6 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
options: Dict[str, Any]
protocol = PROTOCOL_NAME
- _identifier_pattern = re.compile("^pvfs://([^/]+)/([^/]+)/([^/]+)(?:/[^/]+)*/?$")
def __init__(self, options: Dict = None, **kwargs):
options.update({CatalogOptions.HTTP_USER_AGENT_HEADER: 'PythonPVFS'})
@@ -150,7 +163,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
def ls(self, path, detail=True, **kwargs):
pvfs_identifier = self._extract_pvfs_identifier(path)
- rest_api = self.__rest_api(pvfs_identifier.catalog)
+ rest_api = self.__rest_api(pvfs_identifier)
if isinstance(pvfs_identifier, PVFSCatalogIdentifier):
databases = rest_api.list_databases()
if detail:
@@ -209,7 +222,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
self._convert_database_virtual_path(pvfs_identifier.catalog, pvfs_identifier.database)
)
elif isinstance(pvfs_identifier, PVFSTableIdentifier):
- rest_api = self.__rest_api(pvfs_identifier.catalog)
+ rest_api = self.__rest_api(pvfs_identifier)
table_path = self._get_table_store(rest_api, pvfs_identifier).path
storage_type = self._get_storage_type(table_path)
storage_location = table_path
@@ -224,14 +237,14 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
if isinstance(pvfs_identifier, PVFSCatalogIdentifier):
return True
elif isinstance(pvfs_identifier, PVFSDatabaseIdentifier):
- rest_api = self.__rest_api(pvfs_identifier.catalog)
+ rest_api = self.__rest_api(pvfs_identifier)
try:
rest_api.get_database(pvfs_identifier.database)
return True
except NoSuchResourceException:
return False
elif isinstance(pvfs_identifier, PVFSTableIdentifier):
- rest_api = self.__rest_api(pvfs_identifier.catalog)
+ rest_api = self.__rest_api(pvfs_identifier)
try:
table_path = self._get_table_store(rest_api, pvfs_identifier).path
if table_path is not None and pvfs_identifier.sub_path is None:
@@ -252,7 +265,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
and target.sub_path is not None
and source.sub_path is not None
and source == target):
- rest_api = self.__rest_api(source.catalog)
+ rest_api = self.__rest_api(source)
table_path = self._get_table_store(rest_api, source).path
storage_type = self._get_storage_type(table_path)
storage_location = table_path
@@ -274,7 +287,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
if (isinstance(source, PVFSTableIdentifier) and
isinstance(target, PVFSTableIdentifier) and
target.catalog == source.catalog):
- rest_api = self.__rest_api(source.catalog)
+ rest_api = self.__rest_api(source)
if target.sub_path is None and source.sub_path is None:
source_identifier = Identifier.create(source.database, source.table)
target_identifier = Identifier.create(target.database, target.table)
@@ -306,7 +319,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
def rm(self, path, recursive=False, maxdepth=None):
pvfs_identifier = self._extract_pvfs_identifier(path)
- rest_api = self.__rest_api(pvfs_identifier.catalog)
+ rest_api = self.__rest_api(pvfs_identifier)
if isinstance(pvfs_identifier, PVFSDatabaseIdentifier):
database_name = pvfs_identifier.database
if not recursive and len(rest_api.list_tables(database_name)) > 0:
@@ -342,7 +355,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
def rm_file(self, path):
pvfs_identifier = self._extract_pvfs_identifier(path)
if isinstance(pvfs_identifier, PVFSTableIdentifier):
- rest_api = self.__rest_api(pvfs_identifier.catalog)
+ rest_api = self.__rest_api(pvfs_identifier)
table_path = self._get_table_store(rest_api, pvfs_identifier).path
if pvfs_identifier.sub_path is not None:
storage_type = self._get_storage_type(table_path)
@@ -360,7 +373,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
files = self.ls(path)
if len(files) == 0:
pvfs_identifier = self._extract_pvfs_identifier(path)
- rest_api = self.__rest_api(pvfs_identifier.catalog)
+ rest_api = self.__rest_api(pvfs_identifier)
if isinstance(pvfs_identifier, PVFSDatabaseIdentifier):
database_name = pvfs_identifier.database
rest_api.drop_database(database_name)
@@ -406,7 +419,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
f"open is not supported for path: {path}"
)
elif isinstance(pvfs_identifier, PVFSTableIdentifier):
- rest_api = self.__rest_api(pvfs_identifier.catalog)
+ rest_api = self.__rest_api(pvfs_identifier)
table_path = self._get_table_store(rest_api, pvfs_identifier).path
if pvfs_identifier.sub_path is None:
raise Exception(
@@ -428,7 +441,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
def mkdir(self, path, create_parents=True, **kwargs):
pvfs_identifier = self._extract_pvfs_identifier(path)
- rest_api = self.__rest_api(pvfs_identifier.catalog)
+ rest_api = self.__rest_api(pvfs_identifier)
if isinstance(pvfs_identifier, PVFSCatalogIdentifier):
raise Exception(
f"mkdir is not supported for path: {path}"
@@ -471,7 +484,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
def makedirs(self, path, exist_ok=True):
pvfs_identifier = self._extract_pvfs_identifier(path)
- rest_api = self.__rest_api(pvfs_identifier.catalog)
+ rest_api = self.__rest_api(pvfs_identifier)
if isinstance(pvfs_identifier, PVFSCatalogIdentifier):
raise Exception(
f"makedirs is not supported for path: {path}"
@@ -511,7 +524,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
def created(self, path):
pvfs_identifier = self._extract_pvfs_identifier(path)
- rest_api = self.__rest_api(pvfs_identifier.catalog)
+ rest_api = self.__rest_api(pvfs_identifier)
if isinstance(pvfs_identifier, PVFSCatalogIdentifier):
raise Exception(
f"created is not supported for path: {path}"
@@ -533,7 +546,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
def modified(self, path):
pvfs_identifier = self._extract_pvfs_identifier(path)
- rest_api = self.__rest_api(pvfs_identifier.catalog)
+ rest_api = self.__rest_api(pvfs_identifier)
if isinstance(pvfs_identifier, PVFSCatalogIdentifier):
raise Exception(
f"modified is not supported for path: {path}"
@@ -570,7 +583,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
f"cat file is not supported for path: {path}"
)
else:
- rest_api = self.__rest_api(pvfs_identifier.catalog)
+ rest_api = self.__rest_api(pvfs_identifier)
table = self._get_table_store(rest_api, pvfs_identifier)
storage_type = self._get_storage_type(table.path)
storage_location = table.path
@@ -594,7 +607,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
f"get file is not supported for path: {rpath}"
)
elif isinstance(pvfs_identifier, PVFSTableIdentifier):
- rest_api = self.__rest_api(pvfs_identifier.catalog)
+ rest_api = self.__rest_api(pvfs_identifier)
if pvfs_identifier.sub_path is None:
raise Exception(
f"get file is not supported for path: {rpath}"
@@ -617,7 +630,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
)
def _create_object_table(self, pvfs_identifier: PVFSTableIdentifier):
- rest_api = self.__rest_api(pvfs_identifier.catalog)
+ rest_api = self.__rest_api(pvfs_identifier)
schema = Schema(options={'type': 'object-table'})
table_identifier = pvfs_identifier.get_identifier()
rest_api.create_table(table_identifier, schema)
@@ -714,8 +727,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
return path[len(f"{StorageType.OSS.value}://"):]
return path
- @staticmethod
- def _extract_pvfs_identifier(path: str) -> Optional['PVFSIdentifier']:
+ def _extract_pvfs_identifier(self, path: str) -> Optional['PVFSIdentifier']:
if not isinstance(path, str):
raise Exception("path is not a string")
path_without_protocol = path
@@ -726,19 +738,26 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
return None
components = [component for component in
path_without_protocol.rstrip('/').split('/') if component]
-
+ catalog: str = None
+ endpoint: str = self.options.get(CatalogOptions.URI)
+ if len(components) > 0:
+ if '.' in components[0]:
+ (catalog, endpoint) = components[0].split('.', 1)
+ else:
+ catalog = components[0]
if len(components) == 0:
return None
elif len(components) == 1:
- return PVFSCatalogIdentifier(components[0])
+ return (PVFSCatalogIdentifier(endpoint=endpoint, catalog=catalog))
elif len(components) == 2:
- return PVFSDatabaseIdentifier(catalog=components[0], database=components[1])
+ return PVFSDatabaseIdentifier(endpoint=endpoint, catalog=catalog, database=components[1])
elif len(components) == 3:
- return PVFSTableIdentifier(catalog=components[0], database=components[1], table=components[2])
+ return PVFSTableIdentifier(endpoint=endpoint, catalog=catalog, database=components[1], table=components[2])
elif len(components) > 3:
sub_path = '/'.join(components[3:])
return PVFSTableIdentifier(
- catalog=components[0], database=components[1],
+ endpoint=endpoint,
+ catalog=catalog, database=components[1],
table=components[2], sub_path=sub_path
)
return None
@@ -775,11 +794,15 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
modified = self.__converse_ts_to_datatime(table.updated_at)
return TableStore(path=table.path, created=created, modified=modified)
- def __rest_api(self, catalog: str):
+ def __rest_api(self, pvfs_identifier: PVFSIdentifier):
read_lock = self._rest_api_cache_lock.gen_rlock()
+ catalog = pvfs_identifier.catalog
+ if pvfs_identifier.endpoint is None or catalog is None:
+ raise ValueError("Endpoint or catalog is not set.")
+ key = pvfs_identifier.get_cache_key()
try:
read_lock.acquire()
- rest_api = self._rest_api_cache.get(catalog)
+ rest_api = self._rest_api_cache.get(key)
if rest_api is not None:
return rest_api
finally:
@@ -788,10 +811,10 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
write_lock = self._rest_api_cache_lock.gen_wlock()
try:
write_lock.acquire()
- rest_api = self._rest_api_cache.get(catalog)
+ rest_api = self._rest_api_cache.get(key)
if rest_api is None:
options = self.options.copy()
- options.update({CatalogOptions.WAREHOUSE: catalog})
+ options.update({CatalogOptions.WAREHOUSE: catalog, CatalogOptions.URI: pvfs_identifier.endpoint})
rest_api = RESTApi(options)
self._rest_api_cache[catalog] = rest_api
return rest_api
@@ -819,7 +842,7 @@ class PaimonVirtualFileSystem(fsspec.AbstractFileSystem):
if storage_type == StorageType.LOCAL:
fs = LocalFileSystem()
elif storage_type == StorageType.OSS:
- rest_api = self.__rest_api(pvfs_table_identifier.catalog)
+ rest_api = self.__rest_api(pvfs_table_identifier)
load_token_response: GetTableTokenResponse = rest_api.load_table_token(
Identifier.create(pvfs_table_identifier.database, pvfs_table_identifier.table))
fs = self._get_oss_filesystem(load_token_response.token)
diff --git a/paimon-python/pypaimon/tests/pvfs_test.py b/paimon-python/pypaimon/tests/pvfs_test.py
index b81914a347..fe25358759 100644
--- a/paimon-python/pypaimon/tests/pvfs_test.py
+++ b/paimon-python/pypaimon/tests/pvfs_test.py
@@ -155,6 +155,10 @@ class PVFSTestCase(unittest.TestCase):
self.assertSetEqual(set(table_dirs), expect_table_dirs)
database_virtual_path = f"pvfs://{self.catalog}/{self.database}"
self.assertEqual(database_virtual_path, self.pvfs.info(database_virtual_path).get('name'))
+
+ database_virtual_path_with_endpoint = f"pvfs://{self.catalog}.localhost:{self.server.port}/{self.database}"
+ self.assertEqual(database_virtual_path, self.pvfs.info(database_virtual_path_with_endpoint).get('name'))
+
self.assertEqual(True, self.pvfs.exists(database_virtual_path))
table_virtual_path = f"pvfs://{self.catalog}/{self.database}/{self.table}"
self.assertEqual(table_virtual_path, self.pvfs.info(table_virtual_path).get('name'))