This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f24cf39c4b [python] fix rest file io and blob-as-descriptor file io token merge issue and add rest catalog blob-as-descriptor sample (#7009)
f24cf39c4b is described below

commit f24cf39c4b3d4d3a8149088256467b7fa09cd01c
Author: XiaoHongbo <[email protected]>
AuthorDate: Mon Jan 12 21:47:51 2026 +0800

    [python] fix rest file io and blob-as-descriptor file io token merge issue and add rest catalog blob-as-descriptor sample (#7009)
---
 paimon-python/pypaimon/api/rest_util.py            |  21 +++
 .../pypaimon/catalog/rest/rest_token_file_io.py    |  14 +-
 .../pypaimon/sample/oss_blob_as_descriptor.py      | 111 ---------------
 .../rest_catalog_blob_as_descriptor_sample.py      | 155 +++++++++++++++++++++
 .../pypaimon/tests/rest/rest_token_file_io_test.py |  46 ++++++
 5 files changed, 234 insertions(+), 113 deletions(-)

diff --git a/paimon-python/pypaimon/api/rest_util.py b/paimon-python/pypaimon/api/rest_util.py
index 3fe4ec0625..97a709ecc3 100644
--- a/paimon-python/pypaimon/api/rest_util.py
+++ b/paimon-python/pypaimon/api/rest_util.py
@@ -43,3 +43,24 @@ class RESTUtil:
                 new_key = key[len(prefix):]
                 result[new_key] = str(value)
         return result
+
+    @staticmethod
+    def merge(
+            base_properties: Dict[str, str],
+            override_properties: Dict[str, str]) -> Dict[str, str]:
+        if override_properties is None:
+            override_properties = {}
+        if base_properties is None:
+            base_properties = {}
+        
+        result = {}
+        
+        for key, value in base_properties.items():
+            if value is not None and key not in override_properties:
+                result[key] = value
+        
+        for key, value in override_properties.items():
+            if value is not None:
+                result[key] = value
+        
+        return result
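
The merge above is asymmetric: override entries take precedence over base entries, and None values on either side are dropped instead of copied through. A minimal sketch of those semantics, using hypothetical option values (not taken from the patch):

    from pypaimon.api.rest_util import RESTUtil

    base = {'fs.oss.endpoint': 'oss-cn-hangzhou.aliyuncs.com', 'custom.key': 'custom.value'}
    override = {'fs.oss.endpoint': 'token-endpoint', 'fs.oss.securityToken': None}

    # Override wins for 'fs.oss.endpoint'; 'custom.key' survives from base;
    # the None-valued entry is dropped rather than written into the result.
    print(RESTUtil.merge(base, override))
    # {'custom.key': 'custom.value', 'fs.oss.endpoint': 'token-endpoint'}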
diff --git a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
index 76e6dcf340..2cec5df721 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_token_file_io.py
@@ -23,6 +23,7 @@ from typing import Optional
 from pyarrow._fs import FileSystem
 
 from pypaimon.api.rest_api import RESTApi
+from pypaimon.api.rest_util import RESTUtil
 from pypaimon.catalog.rest.rest_token import RESTToken
 from pypaimon.common.file_io import FileIO
 from pypaimon.common.identifier import Identifier
@@ -60,8 +61,17 @@ class RESTTokenFileIO(FileIO):
     def _initialize_oss_fs(self, path) -> FileSystem:
         self.try_to_refresh_token()
         merged_token = self._merge_token_with_catalog_options(self.token.token)
-        self.properties.data.update(merged_token)
-        return super()._initialize_oss_fs(path)
+        merged_properties = RESTUtil.merge(
+            self.properties.to_map() if self.properties else {},
+            merged_token
+        )
+        merged_options = Options(merged_properties)
+        original_properties = self.properties
+        self.properties = merged_options
+        try:
+            return super()._initialize_oss_fs(path)
+        finally:
+            self.properties = original_properties
 
     def _merge_token_with_catalog_options(self, token: dict) -> dict:
         """Merge token with catalog options, DLF OSS endpoint should override 
the standard OSS endpoint."""
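
This hunk is the heart of the token merge fix: the old code called self.properties.data.update(merged_token), mutating the shared catalog options in place, so each token refresh permanently leaked credentials into them. The new code merges into a fresh Options object, swaps it in only for the superclass call, and restores the original in a finally block. A generic sketch of that swap-and-restore pattern (hypothetical Holder class, not Paimon code):

    class Holder:
        """Toy stand-in for an object whose shared 'props' must stay pristine."""

        def __init__(self, props: dict):
            self.props = props

        def call_with(self, temp_props: dict, fn):
            original = self.props
            self.props = temp_props        # callee sees the temporary, merged view
            try:
                return fn()
            finally:
                self.props = original      # always restored, even if fn() raises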
diff --git a/paimon-python/pypaimon/sample/oss_blob_as_descriptor.py b/paimon-python/pypaimon/sample/oss_blob_as_descriptor.py
deleted file mode 100644
index 16696f298a..0000000000
--- a/paimon-python/pypaimon/sample/oss_blob_as_descriptor.py
+++ /dev/null
@@ -1,111 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-################################################################################
-
-import logging
-import pyarrow as pa
-
-from pypaimon.catalog.catalog_factory import CatalogFactory
-
-# Enable debug logging for catalog operations
-logging.basicConfig(level=logging.DEBUG, format='%(levelname)s: %(name)s: %(message)s')
-from pypaimon.schema.schema import Schema
-from pypaimon.table.row.blob import BlobDescriptor, Blob
-
-
-def oss_blob_as_descriptor():
-    warehouse = 'oss://<your-bucket>/<warehouse-path>'
-    catalog = CatalogFactory.create({
-        'warehouse': warehouse,
-        'fs.oss.endpoint': 'oss-<your-region>.aliyuncs.com',
-        'fs.oss.accessKeyId': '<your-ak>',
-        'fs.oss.accessKeySecret': '<your-sk>',
-        'fs.oss.region': '<your-region>'
-    })
-
-    pa_schema = pa.schema([
-        ('id', pa.int32()),
-        ('blob_data', pa.large_binary()),
-    ])
-
-    schema = Schema.from_pyarrow_schema(
-        pa_schema,
-        options={
-            'row-tracking.enabled': 'true',
-            'data-evolution.enabled': 'true',
-            'blob-as-descriptor': 'true',
-            'target-file-size': '100MB'
-        }
-    )
-
-    catalog.create_database("test_db", True)
-    catalog.create_table("test_db.blob_uri_scheme_test", schema, True)
-    table = catalog.get_table("test_db.blob_uri_scheme_test")
-
-    # Create external blob file in OSS
-    external_blob_uri = f"{warehouse.rstrip('/')}/external_blob_scheme_test.bin"
-    blob_content = b'This is external blob data'
-
-    with table.file_io.new_output_stream(external_blob_uri) as out_stream:
-        out_stream.write(blob_content)
-
-    # Create BlobDescriptor with OSS scheme
-    blob_descriptor = BlobDescriptor(external_blob_uri, 0, len(blob_content))
-    descriptor_bytes = blob_descriptor.serialize()
-
-    # Write the descriptor bytes to the table
-    test_data = pa.Table.from_pydict({
-        'id': [1],
-        'blob_data': [descriptor_bytes]
-    }, schema=pa_schema)
-
-    write_builder = table.new_batch_write_builder()
-    writer = write_builder.new_write()
-    writer.write_arrow(test_data)
-    commit_messages = writer.prepare_commit()
-    commit = write_builder.new_commit()
-    commit.commit(commit_messages)
-    writer.close()
-
-    read_builder = table.new_read_builder()
-    table_scan = read_builder.new_scan()
-    table_read = read_builder.new_read()
-    result = table_read.to_arrow(table_scan.plan().splits())
-
-    picture_bytes = result.column('blob_data').to_pylist()[0]
-    new_blob_descriptor = BlobDescriptor.deserialize(picture_bytes)
-
-    print(f"Original URI: {external_blob_uri}")
-    print(f"Read URI: {new_blob_descriptor.uri}")
-    assert new_blob_descriptor.uri.startswith('oss://'), \
-        f"URI scheme should be preserved. Got: {new_blob_descriptor.uri}"
-
-    from pypaimon.common.uri_reader import FileUriReader
-    uri_reader = FileUriReader(table.file_io)
-    blob = Blob.from_descriptor(uri_reader, new_blob_descriptor)
-
-    blob_descriptor_from_blob = blob.to_descriptor()
-    print(f"Blob descriptor URI from Blob.from_descriptor: 
{blob_descriptor_from_blob.uri}")
-
-    read_data = blob.to_data()
-    assert read_data == blob_content, "Blob data should match original content"
-
-    print("✅ All assertions passed!")
-
-
-if __name__ == '__main__':
-    oss_blob_as_descriptor()
diff --git a/paimon-python/pypaimon/sample/rest_catalog_blob_as_descriptor_sample.py b/paimon-python/pypaimon/sample/rest_catalog_blob_as_descriptor_sample.py
new file mode 100644
index 0000000000..802d1def10
--- /dev/null
+++ b/paimon-python/pypaimon/sample/rest_catalog_blob_as_descriptor_sample.py
@@ -0,0 +1,155 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+"""
+Sample demonstrating how to use blob-as-descriptor mode with REST catalog.
+"""
+from pypaimon import CatalogFactory
+import pyarrow as pa
+
+from pypaimon import Schema
+from pypaimon.table.row.blob import BlobDescriptor, Blob
+from pypaimon.common.file_io import FileIO
+from pypaimon.common.options import Options
+
+
+def write_table_with_blob(catalog, video_file_path: str, external_oss_options: dict):
+    database_name = 'blob_demo'
+    table_name = 'test_table_blob_' + str(int(__import__('time').time()))
+
+    catalog.create_database(
+        name=database_name,
+        ignore_if_exists=True,
+    )
+
+    pa_schema = pa.schema([
+        ('text', pa.string()),
+        ('names', pa.list_(pa.string())),
+        ('video', pa.large_binary())  # Blob column
+    ])
+
+    schema = Schema.from_pyarrow_schema(
+        pa_schema=pa_schema,
+        partition_keys=None,
+        primary_keys=None,
+        options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'blob-field': 'video',
+            'blob-as-descriptor': 'true'
+        },
+        comment='Table with blob column using blob-as-descriptor mode')
+
+    table_identifier = f'{database_name}.{table_name}'
+    catalog.create_table(
+        identifier=table_identifier,
+        schema=schema,
+        ignore_if_exists=True
+    )
+
+    table = catalog.get_table(table_identifier)
+    print(f"✓ Table created: {table_identifier}")
+
+    # Access external OSS file to get file size
+    try:
+        external_file_io = FileIO(video_file_path, Options(external_oss_options))
+        video_file_size = external_file_io.get_file_size(video_file_path)
+    except Exception as e:
+        raise FileNotFoundError(
+            f"Failed to access external OSS file: {video_file_path}\n"
+            f"Error: {e}\n"
+            f"Please check your external_oss_options credentials."
+        ) from e
+
+    # Create BlobDescriptor
+    blob_descriptor = BlobDescriptor(video_file_path, 0, video_file_size)
+    descriptor_bytes = blob_descriptor.serialize()
+    write_builder = table.new_batch_write_builder()
+    table_write = write_builder.new_write()
+    table_commit = write_builder.new_commit()
+
+    table_write.write_arrow(pa.Table.from_pydict({
+        'text': ['Sample video'],
+        'names': [['video1.mp4']],
+        'video': [descriptor_bytes]
+    }, schema=pa_schema))
+
+    table_commit.commit(table_write.prepare_commit())
+    print("✓ Data committed successfully")
+    table_write.close()
+    table_commit.close()
+    
+    return f'{database_name}.{table_name}'
+
+
+def read_table_with_blob(catalog, table_name: str):
+    table = catalog.get_table(table_name)
+
+    read_builder = table.new_read_builder()
+    table_scan = read_builder.new_scan()
+    splits = table_scan.plan().splits()
+    table_read = read_builder.new_read()
+    
+    result = table_read.to_arrow(splits)
+    print(f"✓ Read {result.num_rows} rows")
+    
+    video_bytes_list = result.column('video').to_pylist()
+    for video_bytes in video_bytes_list:
+        if video_bytes is None:
+            continue
+        blob_descriptor = BlobDescriptor.deserialize(video_bytes)
+        from pypaimon.common.uri_reader import FileUriReader
+        uri_reader = FileUriReader(table.file_io)
+        blob = Blob.from_descriptor(uri_reader, blob_descriptor)
+        blob_data = blob.to_data()
+        print(f"✓ Blob data verified: {len(blob_data) / 1024 / 1024:.2f} MB")
+        break
+    
+    return result
+
+
+if __name__ == '__main__':
+    external_oss_options = {
+        'fs.oss.accessKeyId': "YOUR_EXTERNAL_OSS_ACCESS_KEY_ID",
+        'fs.oss.accessKeySecret': "YOUR_EXTERNAL_OSS_ACCESS_KEY_SECRET",
+        'fs.oss.endpoint': "oss-cn-hangzhou.aliyuncs.com",
+        'fs.oss.region': "cn-hangzhou",
+    }
+    
+    video_file_path = "oss://your-bucket/blob_test/video.mov"
+    
+    catalog_options = {
+        'metastore': 'rest',
+        'uri': "http://your-rest-catalog-uri",
+        'warehouse': "your_warehouse",
+        'dlf.region': 'cn-hangzhou',
+        "token.provider": "dlf",
+        'dlf.access-key-id': "YOUR_DLF_ACCESS_KEY_ID",
+        'dlf.access-key-secret': "YOUR_DLF_ACCESS_KEY_SECRET",
+        'dlf.oss-endpoint': "oss-cn-hangzhou.aliyuncs.com",
+        **external_oss_options
+    }
+
+    catalog = CatalogFactory.create(catalog_options)
+    
+    try:
+        table_name = write_table_with_blob(catalog, video_file_path, external_oss_options)
+        result = read_table_with_blob(catalog, table_name)
+        print("✓ Test completed successfully!")
+    except Exception as e:
+        print(f'✗ Error: {e}')
+        raise
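
To try the new sample, fill in real OSS and DLF credentials in the __main__ block and run the file directly (invocation assumed from its repository path):

    python paimon-python/pypaimon/sample/rest_catalog_blob_as_descriptor_sample.py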
diff --git a/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py b/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py
index eb0c9d3688..07e445b12a 100644
--- a/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py
+++ b/paimon-python/pypaimon/tests/rest/rest_token_file_io_test.py
@@ -18,6 +18,7 @@
 import os
 import pickle
 import tempfile
+import time
 import unittest
 from unittest.mock import patch
 
@@ -192,6 +193,51 @@ class RESTTokenFileIOTest(unittest.TestCase):
                 "Other token properties should be preserved"
             )
 
+    def test_catalog_options_not_modified(self):
+        from pypaimon.api.rest_util import RESTUtil
+        from pypaimon.catalog.rest.rest_token import RESTToken
+        from pyarrow.fs import LocalFileSystem
+        
+        original_catalog_options = Options({
+            CatalogOptions.URI.key(): "http://test-uri";,
+            "custom.key": "custom.value"
+        })
+        
+        catalog_options_copy = Options(original_catalog_options.to_map())
+        
+        with patch.object(RESTTokenFileIO, 'try_to_refresh_token'):
+            file_io = RESTTokenFileIO(
+                self.identifier,
+                self.warehouse_path,
+                original_catalog_options
+            )
+            
+            token_dict = {
+                OssOptions.OSS_ACCESS_KEY_ID.key(): "token-access-key",
+                OssOptions.OSS_ACCESS_KEY_SECRET.key(): "token-secret-key",
+                OssOptions.OSS_ENDPOINT.key(): "token-endpoint"
+            }
+            file_io.token = RESTToken(token_dict, int(time.time() * 1000) + 3600000)
+            
+            with patch.object(FileIO, '_initialize_oss_fs', return_value=LocalFileSystem()):
+                file_io._initialize_oss_fs("file:///test/path")
+            
+            self.assertEqual(
+                original_catalog_options.to_map(),
+                catalog_options_copy.to_map(),
+                "Original catalog_options should not be modified"
+            )
+            
+            merged_properties = RESTUtil.merge(
+                original_catalog_options.to_map(),
+                file_io._merge_token_with_catalog_options(token_dict)
+            )
+            
+            self.assertIn("custom.key", merged_properties)
+            self.assertEqual(merged_properties["custom.key"], "custom.value")
+            self.assertIn(OssOptions.OSS_ACCESS_KEY_ID.key(), merged_properties)
+            self.assertEqual(merged_properties[OssOptions.OSS_ACCESS_KEY_ID.key()], "token-access-key")
+
 
 if __name__ == '__main__':
     unittest.main()
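
The new test can be run on its own with unittest (module path assumed, executed from the paimon-python directory):

    python -m unittest pypaimon.tests.rest.rest_token_file_io_test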
