This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 534c72e8fb [python] Fix OSSParam to access DLF (#6332)
534c72e8fb is described below
commit 534c72e8fb543b753555ee1043c47d6c351915f3
Author: umi <[email protected]>
AuthorDate: Fri Sep 26 21:50:07 2025 +0800
[python] Fix OSSParam to access DLF (#6332)
---
.../pypaimon/catalog/rest/rest_catalog.py | 22 +++++++++-----
.../pypaimon/catalog/rest/rest_token_file_io.py | 8 ++---
paimon-python/pypaimon/common/config.py | 1 -
paimon-python/pypaimon/common/file_io.py | 33 +++++++++++++++++----
.../pypaimon/tests/py36/ao_simple_test.py | 34 ++++++++++++++++++++++
5 files changed, 80 insertions(+), 18 deletions(-)
diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
index 5e36559a7b..53db2abbaa 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
@@ -19,6 +19,9 @@ from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Union
from urllib.parse import urlparse
+import pyarrow
+from packaging.version import parse
+
from pypaimon.api.api_response import GetTableResponse, PagedList
from pypaimon.api.options import Options
from pypaimon.api.rest_api import RESTApi
@@ -200,17 +203,17 @@ class RESTCatalog(Catalog):
uuid=response.get_id()
)
- def file_io_from_options(self, table_path: Path) -> FileIO:
- return FileIO(str(table_path), self.context.options.data)
+ def file_io_from_options(self, table_path: str) -> FileIO:
+ return FileIO(table_path, self.context.options.data)
- def file_io_for_data(self, table_path: Path, identifier: Identifier):
+ def file_io_for_data(self, table_path: str, identifier: Identifier):
return RESTTokenFileIO(identifier, table_path,
self.context.options.data) \
if self.data_token_enabled else
self.file_io_from_options(table_path)
def load_table(self,
identifier: Identifier,
- internal_file_io: Callable[[Path], Any],
- external_file_io: Callable[[Path], Any],
+ internal_file_io: Callable[[str], Any],
+ external_file_io: Callable[[str], Any],
metadata_loader: Callable[[Identifier], TableMetadata],
) -> FileStoreTable:
metadata = metadata_loader(identifier)
@@ -223,9 +226,12 @@ class RESTCatalog(Catalog):
supports_version_management=True # REST catalogs support version management
)
path_parsed = urlparse(schema.options.get(CoreOptions.PATH))
- path = Path(path_parsed.path) if path_parsed.scheme is None else Path(schema.options.get(CoreOptions.PATH))
- table_path = path_parsed.netloc + "/" + path_parsed.path \
- if path_parsed.scheme == "file" else path_parsed.path[1:]
+ path = path_parsed.path if path_parsed.scheme is None else schema.options.get(CoreOptions.PATH)
+ if path_parsed.scheme == "file":
+ table_path = path_parsed.path
+ else:
+ table_path = path_parsed.netloc + path_parsed.path \
+ if parse(pyarrow.__version__) >= parse("7.0.0") else path_parsed.path[1:]
table = self.create(data_file_io(path),
Path(table_path),
schema,
diff --git a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
index b9671c8ae9..a65e96695c 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
@@ -31,7 +31,7 @@ from pypaimon.common.identifier import Identifier
class RESTTokenFileIO(FileIO):
- def __init__(self, identifier: Identifier, path: Path,
+ def __init__(self, identifier: Identifier, path: str,
catalog_options: Optional[dict] = None):
self.identifier = identifier
self.path = path
@@ -39,12 +39,12 @@ class RESTTokenFileIO(FileIO):
self.api_instance: Optional[RESTApi] = None
self.lock = threading.Lock()
self.log = logging.getLogger(__name__)
- super().__init__(str(path), catalog_options)
+ super().__init__(path, catalog_options)
- def _initialize_oss_fs(self) -> FileSystem:
+ def _initialize_oss_fs(self, path) -> FileSystem:
self.try_to_refresh_token()
self.properties.update(self.token.token)
- return super()._initialize_oss_fs()
+ return super()._initialize_oss_fs(path)
def new_output_stream(self, path: Path):
return self.filesystem.open_output_stream(str(path))
diff --git a/paimon-python/pypaimon/common/config.py b/paimon-python/pypaimon/common/config.py
index b3c9a673b3..0478c207bb 100644
--- a/paimon-python/pypaimon/common/config.py
+++ b/paimon-python/pypaimon/common/config.py
@@ -21,7 +21,6 @@ class OssOptions:
OSS_SECURITY_TOKEN = "fs.oss.securityToken"
OSS_ENDPOINT = "fs.oss.endpoint"
OSS_REGION = "fs.oss.region"
- OSS_BUCKET = "fs.oss.bucket"
class S3Options:
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index 809d65338c..6f5dfc6a2a 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -31,12 +31,12 @@ from pypaimon.common.config import OssOptions, S3Options
class FileIO:
- def __init__(self, warehouse: str, catalog_options: dict):
+ def __init__(self, path: str, catalog_options: dict):
self.properties = catalog_options
self.logger = logging.getLogger(__name__)
- scheme, netloc, path = self.parse_location(warehouse)
+ scheme, netloc, _ = self.parse_location(path)
if scheme in {"oss"}:
- self.filesystem = self._initialize_oss_fs()
+ self.filesystem = self._initialize_oss_fs(path)
elif scheme in {"s3", "s3a", "s3n"}:
self.filesystem = self._initialize_s3_fs()
elif scheme in {"hdfs", "viewfs"}:
@@ -56,7 +56,29 @@ class FileIO:
else:
return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
- def _initialize_oss_fs(self) -> FileSystem:
+ def _extract_oss_bucket(self, location) -> str:
+ uri = urlparse(location)
+ if uri.scheme and uri.scheme != "oss":
+ raise ValueError("Not an OSS URI: {}".format(location))
+
+ netloc = uri.netloc or ""
+ # parse oss://access_id:secret_key@Endpoint/bucket/path/to/object
+ if (getattr(uri, "username", None) or getattr(uri, "password", None)) or ("@" in netloc):
+ first_segment = uri.path.lstrip("/").split("/", 1)[0]
+ if not first_segment:
+ raise ValueError("Invalid OSS URI without bucket: {}".format(location))
+ return first_segment
+
+ # parse oss://bucket/... or oss://bucket.endpoint/...
+ host = getattr(uri, "hostname", None) or netloc
+ if not host:
+ raise ValueError("Invalid OSS URI without host: {}".format(location))
+ bucket = host.split(".", 1)[0]
+ if not bucket:
+ raise ValueError("Invalid OSS URI without bucket: {}".format(location))
+ return bucket
+
+ def _initialize_oss_fs(self, path) -> FileSystem:
from pyarrow.fs import S3FileSystem
client_kwargs = {
@@ -71,7 +93,8 @@ class FileIO:
client_kwargs['force_virtual_addressing'] = True
client_kwargs['endpoint_override'] = self.properties.get(OssOptions.OSS_ENDPOINT)
else:
- client_kwargs['endpoint_override'] = (self.properties.get(OssOptions.OSS_BUCKET) + "." +
+ oss_bucket = self._extract_oss_bucket(path)
+ client_kwargs['endpoint_override'] = (oss_bucket + "." +
self.properties.get(OssOptions.OSS_ENDPOINT))
return S3FileSystem(**client_kwargs)
diff --git a/paimon-python/pypaimon/tests/py36/ao_simple_test.py b/paimon-python/pypaimon/tests/py36/ao_simple_test.py
index 584ba87587..e2a61df301 100644
--- a/paimon-python/pypaimon/tests/py36/ao_simple_test.py
+++ b/paimon-python/pypaimon/tests/py36/ao_simple_test.py
@@ -15,11 +15,15 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
+from unittest.mock import patch
+
import pyarrow as pa
from pypaimon import Schema
from pypaimon.catalog.catalog_exception import TableNotExistException, TableAlreadyExistException, \
DatabaseNotExistException, DatabaseAlreadyExistException
+from pypaimon.common.config import OssOptions
+from pypaimon.common.file_io import FileIO
from pypaimon.tests.py36.pyarrow_compat import table_sort_by
from pypaimon.tests.rest.rest_base_test import RESTBaseTest
@@ -385,3 +389,33 @@ class AOSimpleTest(RESTBaseTest):
self.rest_catalog.drop_database("db1", True)
except DatabaseNotExistException:
self.fail("drop_database with ignore_if_exists=True should not raise DatabaseNotExistException")
+
+ def test_initialize_oss_fs_pyarrow_lt_7(self):
+ props = {
+ OssOptions.OSS_ACCESS_KEY_ID: "AKID",
+ OssOptions.OSS_ACCESS_KEY_SECRET: "SECRET",
+ OssOptions.OSS_SECURITY_TOKEN: "TOKEN",
+ OssOptions.OSS_REGION: "cn-hangzhou",
+ OssOptions.OSS_ENDPOINT: "oss-cn-hangzhou.aliyuncs.com",
+ }
+
+ with patch("pypaimon.common.file_io.pyarrow.__version__", "6.0.0"), \
+ patch("pyarrow.fs.S3FileSystem") as mock_s3fs:
+ FileIO("oss://oss-bucket/paimon-database/paimon-table", props)
+ mock_s3fs.assert_called_once_with(access_key="AKID",
+ secret_key="SECRET",
+ session_token="TOKEN",
+ region="cn-hangzhou",
+ endpoint_override="oss-bucket." + props[OssOptions.OSS_ENDPOINT])
+ FileIO("oss://oss-bucket.endpoint/paimon-database/paimon-table", props)
+ mock_s3fs.assert_called_with(access_key="AKID",
+ secret_key="SECRET",
+ session_token="TOKEN",
+ region="cn-hangzhou",
+ endpoint_override="oss-bucket." + props[OssOptions.OSS_ENDPOINT])
+ FileIO("oss://access_id:secret_key@Endpoint/oss-bucket/paimon-database/paimon-table", props)
+ mock_s3fs.assert_called_with(access_key="AKID",
+ secret_key="SECRET",
+ session_token="TOKEN",
+ region="cn-hangzhou",
+ endpoint_override="oss-bucket." + props[OssOptions.OSS_ENDPOINT])