This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f24cf39c4b [python] fix rest file io and blob-as-descriptor file io
token merge issue and add rest catalog blob-as-descriptor sample (#7009)
f24cf39c4b is described below
commit f24cf39c4b3d4d3a8149088256467b7fa09cd01c
Author: XiaoHongbo <[email protected]>
AuthorDate: Mon Jan 12 21:47:51 2026 +0800
[python] fix rest file io and blob-as-descriptor file io token merge issue
and add rest catalog blob-as-descriptor sample (#7009)
---
paimon-python/pypaimon/api/rest_util.py | 21 +++
.../pypaimon/catalog/rest/rest_token_file_io.py | 14 +-
.../pypaimon/sample/oss_blob_as_descriptor.py | 111 ---------------
.../rest_catalog_blob_as_descriptor_sample.py | 155 +++++++++++++++++++++
.../pypaimon/tests/rest/rest_token_file_io_test.py | 46 ++++++
5 files changed, 234 insertions(+), 113 deletions(-)
diff --git a/paimon-python/pypaimon/api/rest_util.py b/paimon-python/pypaimon/api/rest_util.py
index 3fe4ec0625..97a709ecc3 100644
--- a/paimon-python/pypaimon/api/rest_util.py
+++ b/paimon-python/pypaimon/api/rest_util.py
@@ -43,3 +43,24 @@ class RESTUtil:
new_key = key[len(prefix):]
result[new_key] = str(value)
return result
+
+ @staticmethod
+ def merge(
+ base_properties: Dict[str, str],
+ override_properties: Dict[str, str]) -> Dict[str, str]:
+ if override_properties is None:
+ override_properties = {}
+ if base_properties is None:
+ base_properties = {}
+
+ result = {}
+
+ for key, value in base_properties.items():
+ if value is not None and key not in override_properties:
+ result[key] = value
+
+ for key, value in override_properties.items():
+ if value is not None:
+ result[key] = value
+
+ return result
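For context, the new helper's semantics can be exercised in isolation: entries from the override dict win on key collisions, and None values on either side are dropped rather than copied into the result. A minimal sketch (the dict contents below are made up for illustration):

    from pypaimon.api.rest_util import RESTUtil

    base = {'fs.oss.endpoint': 'catalog-endpoint', 'custom.key': 'kept', 'stale': None}
    override = {'fs.oss.endpoint': 'token-endpoint', 'empty': None}

    merged = RESTUtil.merge(base, override)
    assert merged == {'custom.key': 'kept', 'fs.oss.endpoint': 'token-endpoint'}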
diff --git a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
index 76e6dcf340..2cec5df721 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
@@ -23,6 +23,7 @@ from typing import Optional
from pyarrow._fs import FileSystem
from pypaimon.api.rest_api import RESTApi
+from pypaimon.api.rest_util import RESTUtil
from pypaimon.catalog.rest.rest_token import RESTToken
from pypaimon.common.file_io import FileIO
from pypaimon.common.identifier import Identifier
@@ -60,8 +61,17 @@ class RESTTokenFileIO(FileIO):
def _initialize_oss_fs(self, path) -> FileSystem:
self.try_to_refresh_token()
merged_token = self._merge_token_with_catalog_options(self.token.token)
- self.properties.data.update(merged_token)
- return super()._initialize_oss_fs(path)
+ merged_properties = RESTUtil.merge(
+ self.properties.to_map() if self.properties else {},
+ merged_token
+ )
+ merged_options = Options(merged_properties)
+ original_properties = self.properties
+ self.properties = merged_options
+ try:
+ return super()._initialize_oss_fs(path)
+ finally:
+ self.properties = original_properties
def _merge_token_with_catalog_options(self, token: dict) -> dict:
"""Merge token with catalog options, DLF OSS endpoint should override
the standard OSS endpoint."""
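The change above follows a save-and-restore pattern: the merged token properties are visible to the superclass only for the duration of filesystem initialization, so short-lived credentials never leak back into the catalog's long-lived options. A standalone sketch of the same pattern, with made-up class and attribute names rather than the patch's actual API:

    class Base:
        def init_fs(self):
            # stand-in for the real filesystem construction
            return dict(self.properties)

    class TokenAware(Base):
        def __init__(self):
            self.properties = {'endpoint': 'catalog-endpoint'}

        def init_fs(self):
            merged = {**self.properties, 'access-key': 'short-lived-token'}
            original = self.properties
            self.properties = merged  # expose the merged view temporarily
            try:
                return super().init_fs()
            finally:
                self.properties = original  # catalog options stay untouched

    io = TokenAware()
    assert 'access-key' in io.init_fs()       # token was applied during init
    assert 'access-key' not in io.properties  # and did not persist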
diff --git a/paimon-python/pypaimon/sample/oss_blob_as_descriptor.py b/paimon-python/pypaimon/sample/oss_blob_as_descriptor.py
deleted file mode 100644
index 16696f298a..0000000000
--- a/paimon-python/pypaimon/sample/oss_blob_as_descriptor.py
+++ /dev/null
@@ -1,111 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-import logging
-import pyarrow as pa
-
-from pypaimon.catalog.catalog_factory import CatalogFactory
-
-# Enable debug logging for catalog operations
-logging.basicConfig(level=logging.DEBUG, format='%(levelname)s: %(name)s: %(message)s')
-from pypaimon.schema.schema import Schema
-from pypaimon.table.row.blob import BlobDescriptor, Blob
-
-
-def oss_blob_as_descriptor():
- warehouse = 'oss://<your-bucket>/<warehouse-path>'
- catalog = CatalogFactory.create({
- 'warehouse': warehouse,
- 'fs.oss.endpoint': 'oss-<your-region>.aliyuncs.com',
- 'fs.oss.accessKeyId': '<your-ak>',
- 'fs.oss.accessKeySecret': '<your-sk>',
- 'fs.oss.region': '<your-region>'
- })
-
- pa_schema = pa.schema([
- ('id', pa.int32()),
- ('blob_data', pa.large_binary()),
- ])
-
- schema = Schema.from_pyarrow_schema(
- pa_schema,
- options={
- 'row-tracking.enabled': 'true',
- 'data-evolution.enabled': 'true',
- 'blob-as-descriptor': 'true',
- 'target-file-size': '100MB'
- }
- )
-
- catalog.create_database("test_db", True)
- catalog.create_table("test_db.blob_uri_scheme_test", schema, True)
- table = catalog.get_table("test_db.blob_uri_scheme_test")
-
- # Create external blob file in OSS
- external_blob_uri = f"{warehouse.rstrip('/')}/external_blob_scheme_test.bin"
- blob_content = b'This is external blob data'
-
- with table.file_io.new_output_stream(external_blob_uri) as out_stream:
- out_stream.write(blob_content)
-
- # Create BlobDescriptor with OSS scheme
- blob_descriptor = BlobDescriptor(external_blob_uri, 0, len(blob_content))
- descriptor_bytes = blob_descriptor.serialize()
-
- # Write the descriptor bytes to the table
- test_data = pa.Table.from_pydict({
- 'id': [1],
- 'blob_data': [descriptor_bytes]
- }, schema=pa_schema)
-
- write_builder = table.new_batch_write_builder()
- writer = write_builder.new_write()
- writer.write_arrow(test_data)
- commit_messages = writer.prepare_commit()
- commit = write_builder.new_commit()
- commit.commit(commit_messages)
- writer.close()
-
- read_builder = table.new_read_builder()
- table_scan = read_builder.new_scan()
- table_read = read_builder.new_read()
- result = table_read.to_arrow(table_scan.plan().splits())
-
- picture_bytes = result.column('blob_data').to_pylist()[0]
- new_blob_descriptor = BlobDescriptor.deserialize(picture_bytes)
-
- print(f"Original URI: {external_blob_uri}")
- print(f"Read URI: {new_blob_descriptor.uri}")
- assert new_blob_descriptor.uri.startswith('oss://'), \
- f"URI scheme should be preserved. Got: {new_blob_descriptor.uri}"
-
- from pypaimon.common.uri_reader import FileUriReader
- uri_reader = FileUriReader(table.file_io)
- blob = Blob.from_descriptor(uri_reader, new_blob_descriptor)
-
- blob_descriptor_from_blob = blob.to_descriptor()
- print(f"Blob descriptor URI from Blob.from_descriptor:
{blob_descriptor_from_blob.uri}")
-
- read_data = blob.to_data()
- assert read_data == blob_content, "Blob data should match original content"
-
- print("✅ All assertions passed!")
-
-
-if __name__ == '__main__':
- oss_blob_as_descriptor()
diff --git a/paimon-python/pypaimon/sample/rest_catalog_blob_as_descriptor_sample.py b/paimon-python/pypaimon/sample/rest_catalog_blob_as_descriptor_sample.py
new file mode 100644
index 0000000000..802d1def10
--- /dev/null
+++ b/paimon-python/pypaimon/sample/rest_catalog_blob_as_descriptor_sample.py
@@ -0,0 +1,155 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""
+Sample demonstrating how to use blob-as-descriptor mode with a REST catalog.
+"""
+import time
+
+from pypaimon import CatalogFactory
+import pyarrow as pa
+
+from pypaimon import Schema
+from pypaimon.table.row.blob import BlobDescriptor, Blob
+from pypaimon.common.file_io import FileIO
+from pypaimon.common.options import Options
+
+
+def write_table_with_blob(catalog, video_file_path: str, external_oss_options: dict):
+ database_name = 'blob_demo'
+ table_name = 'test_table_blob_' + str(int(time.time()))
+
+ catalog.create_database(
+ name=database_name,
+ ignore_if_exists=True,
+ )
+
+ pa_schema = pa.schema([
+ ('text', pa.string()),
+ ('names', pa.list_(pa.string())),
+ ('video', pa.large_binary()) # Blob column
+ ])
+
+ schema = Schema.from_pyarrow_schema(
+ pa_schema=pa_schema,
+ partition_keys=None,
+ primary_keys=None,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'blob-field': 'video',
+ 'blob-as-descriptor': 'true'
+ },
+ comment='Table with blob column using blob-as-descriptor mode')
+
+ table_identifier = f'{database_name}.{table_name}'
+ catalog.create_table(
+ identifier=table_identifier,
+ schema=schema,
+ ignore_if_exists=True
+ )
+
+ table = catalog.get_table(table_identifier)
+ print(f"✓ Table created: {table_identifier}")
+
+ # Access external OSS file to get file size
+ try:
+ external_file_io = FileIO(video_file_path, Options(external_oss_options))
+ video_file_size = external_file_io.get_file_size(video_file_path)
+ except Exception as e:
+ raise FileNotFoundError(
+ f"Failed to access external OSS file: {video_file_path}\n"
+ f"Error: {e}\n"
+ f"Please check your external_oss_options credentials."
+ ) from e
+
+ # Create BlobDescriptor
+ blob_descriptor = BlobDescriptor(video_file_path, 0, video_file_size)
+ descriptor_bytes = blob_descriptor.serialize()
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ table_write.write_arrow(pa.Table.from_pydict({
+ 'text': ['Sample video'],
+ 'names': [['video1.mp4']],
+ 'video': [descriptor_bytes]
+ }, schema=pa_schema))
+
+ table_commit.commit(table_write.prepare_commit())
+ print("✓ Data committed successfully")
+ table_write.close()
+ table_commit.close()
+
+ return table_identifier
+
+
+def read_table_with_blob(catalog, table_name: str):
+ table = catalog.get_table(table_name)
+
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ splits = table_scan.plan().splits()
+ table_read = read_builder.new_read()
+
+ result = table_read.to_arrow(splits)
+ print(f"✓ Read {result.num_rows} rows")
+
+ from pypaimon.common.uri_reader import FileUriReader
+ uri_reader = FileUriReader(table.file_io)
+ video_bytes_list = result.column('video').to_pylist()
+ for video_bytes in video_bytes_list:
+ if video_bytes is None:
+ continue
+ blob_descriptor = BlobDescriptor.deserialize(video_bytes)
+ blob = Blob.from_descriptor(uri_reader, blob_descriptor)
+ blob_data = blob.to_data()
+ print(f"✓ Blob data verified: {len(blob_data) / 1024 / 1024:.2f} MB")
+ break
+
+ return result
+
+
+if __name__ == '__main__':
+ external_oss_options = {
+ 'fs.oss.accessKeyId': "YOUR_EXTERNAL_OSS_ACCESS_KEY_ID",
+ 'fs.oss.accessKeySecret': "YOUR_EXTERNAL_OSS_ACCESS_KEY_SECRET",
+ 'fs.oss.endpoint': "oss-cn-hangzhou.aliyuncs.com",
+ 'fs.oss.region': "cn-hangzhou",
+ }
+
+ video_file_path = "oss://your-bucket/blob_test/video.mov"
+
+ catalog_options = {
+ 'metastore': 'rest',
+ 'uri': "http://your-rest-catalog-uri",
+ 'warehouse': "your_warehouse",
+ 'dlf.region': 'cn-hangzhou',
+ "token.provider": "dlf",
+ 'dlf.access-key-id': "YOUR_DLF_ACCESS_KEY_ID",
+ 'dlf.access-key-secret': "YOUR_DLF_ACCESS_KEY_SECRET",
+ 'dlf.oss-endpoint': "oss-cn-hangzhou.aliyuncs.com",
+ **external_oss_options
+ }
+
+ catalog = CatalogFactory.create(catalog_options)
+
+ try:
+ table_name = write_table_with_blob(catalog, video_file_path, external_oss_options)
+ result = read_table_with_blob(catalog, table_name)
+ print("✓ Test completed successfully!")
+ except Exception as e:
+ print(f'✗ Error: {e}')
+ raise
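To try the sample, replace the placeholder credentials, endpoints, and warehouse values above with real ones, then run it directly (assuming pypaimon and pyarrow are installed in the current environment):

    python paimon-python/pypaimon/sample/rest_catalog_blob_as_descriptor_sample.py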
diff --git a/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py b/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py
index eb0c9d3688..07e445b12a 100644
--- a/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py
+++ b/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py
@@ -18,6 +18,7 @@
import os
import pickle
import tempfile
+import time
import unittest
from unittest.mock import patch
@@ -192,6 +193,51 @@ class RESTTokenFileIOTest(unittest.TestCase):
"Other token properties should be preserved"
)
+ def test_catalog_options_not_modified(self):
+ from pypaimon.api.rest_util import RESTUtil
+ from pypaimon.catalog.rest.rest_token import RESTToken
+ from pyarrow.fs import LocalFileSystem
+
+ original_catalog_options = Options({
+ CatalogOptions.URI.key(): "http://test-uri",
+ "custom.key": "custom.value"
+ })
+
+ catalog_options_copy = Options(original_catalog_options.to_map())
+
+ with patch.object(RESTTokenFileIO, 'try_to_refresh_token'):
+ file_io = RESTTokenFileIO(
+ self.identifier,
+ self.warehouse_path,
+ original_catalog_options
+ )
+
+ token_dict = {
+ OssOptions.OSS_ACCESS_KEY_ID.key(): "token-access-key",
+ OssOptions.OSS_ACCESS_KEY_SECRET.key(): "token-secret-key",
+ OssOptions.OSS_ENDPOINT.key(): "token-endpoint"
+ }
+ file_io.token = RESTToken(token_dict, int(time.time() * 1000) + 3600000)
+
+ with patch.object(FileIO, '_initialize_oss_fs', return_value=LocalFileSystem()):
+ file_io._initialize_oss_fs("file:///test/path")
+
+ self.assertEqual(
+ original_catalog_options.to_map(),
+ catalog_options_copy.to_map(),
+ "Original catalog_options should not be modified"
+ )
+
+ merged_properties = RESTUtil.merge(
+ original_catalog_options.to_map(),
+ file_io._merge_token_with_catalog_options(token_dict)
+ )
+
+ self.assertIn("custom.key", merged_properties)
+ self.assertEqual(merged_properties["custom.key"], "custom.value")
+ self.assertIn(OssOptions.OSS_ACCESS_KEY_ID.key(), merged_properties)
+
+ self.assertEqual(merged_properties[OssOptions.OSS_ACCESS_KEY_ID.key()], "token-access-key")
+
if __name__ == '__main__':
unittest.main()