This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new fd413d7bfe [python] Blob type more test for descriptor (#6422)
fd413d7bfe is described below
commit fd413d7bfe3ac24eb1364ea031aece76fd3e7d7f
Author: YeJunHao <[email protected]>
AuthorDate: Fri Oct 17 15:09:12 2025 +0800
[python] Blob type more test for descriptor (#6422)
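This adds an end-to-end test for blob columns written and read in blob-as-descriptor mode, and a small CoreOptions helper that normalizes the string-valued option into a bool for the readers and writers. A minimal sketch of the helper's intended behavior (illustrative only, not part of the patch; it assumes FILE_BLOB_AS_DESCRIPTOR resolves to the 'blob-as-descriptor' key used in the new test):

    from pypaimon.common.core_options import CoreOptions

    # True only when the option is explicitly set to "true" (case-insensitive);
    # a missing option defaults to False.
    assert CoreOptions.get_blob_as_descriptor({'blob-as-descriptor': 'TRUE'}) is True
    assert CoreOptions.get_blob_as_descriptor({}) is False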
---
paimon-python/pypaimon/common/core_options.py | 4 +
paimon-python/pypaimon/read/split_read.py | 2 +-
paimon-python/pypaimon/tests/blob_table_test.py | 100 +++++++++++++++++++++
paimon-python/pypaimon/write/writer/data_writer.py | 2 +-
4 files changed, 106 insertions(+), 2 deletions(-)
diff --git a/paimon-python/pypaimon/common/core_options.py b/paimon-python/pypaimon/common/core_options.py
index da1ad0674e..cbf35b33e4 100644
--- a/paimon-python/pypaimon/common/core_options.py
+++ b/paimon-python/pypaimon/common/core_options.py
@@ -51,3 +51,7 @@ class CoreOptions(str, Enum):
COMMIT_USER_PREFIX = "commit.user-prefix"
ROW_TRACKING_ENABLED = "row-tracking.enabled"
DATA_EVOLUTION_ENABLED = "data-evolution.enabled"
+
+ @staticmethod
+ def get_blob_as_descriptor(options: dict) -> bool:
+ return options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, "false").lower() == 'true'
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 64a28ac63c..5a8bc4825b 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -82,7 +82,7 @@ class SplitRead(ABC):
format_reader = FormatAvroReader(self.table.file_io, file_path, read_fields, self.read_fields, self.push_down_predicate)
elif file_format == CoreOptions.FILE_FORMAT_BLOB:
- blob_as_descriptor = self.table.options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, False)
+ blob_as_descriptor = CoreOptions.get_blob_as_descriptor(self.table.options)
format_reader = FormatBlobReader(self.table.file_io, file_path, read_fields, self.read_fields, self.push_down_predicate, blob_as_descriptor)
elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == CoreOptions.FILE_FORMAT_ORC:
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py
index 3c8273bde4..d24b3f0d5a 100644
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -23,6 +23,7 @@ import unittest
import pyarrow as pa
from pypaimon import CatalogFactory
+from pypaimon.table.file_store_table import FileStoreTable
from pypaimon.write.commit_message import CommitMessage
@@ -871,6 +872,105 @@ class DataBlobWriterTest(unittest.TestCase):
print(f"✅ End-to-end blob write/read test passed: wrote and read back
{len(blob_data)} blob records correctly") # noqa: E501
+ def test_blob_write_read_end_to_end_with_descriptor(self):
+ """Test end-to-end blob functionality using blob descriptors."""
+ import random
+ from pypaimon import Schema
+ from pypaimon.table.row.blob import BlobDescriptor, Blob
+ from pypaimon.common.uri_reader import UriReaderFactory
+ from pypaimon.common.config import CatalogOptions
+
+ # Create schema with blob column
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('picture', pa.large_binary()),
+ ])
+
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'blob-as-descriptor': 'true'
+ }
+ )
+
+ # Create table
+ self.catalog.create_table('test_db.blob_descriptor_test', schema, False)
+ table: FileStoreTable = self.catalog.get_table('test_db.blob_descriptor_test')
+
+ # Create test blob data (1MB)
+ blob_data = bytearray(1024 * 1024)
+ random.seed(42) # For reproducible tests
+ for i in range(len(blob_data)):
+ blob_data[i] = random.randint(0, 255)
+ blob_data = bytes(blob_data)
+
+ # Create external blob file
+ external_blob_path = os.path.join(self.temp_dir, 'external_blob')
+ with open(external_blob_path, 'wb') as f:
+ f.write(blob_data)
+
+ # Create blob descriptor pointing to external file
+ blob_descriptor = BlobDescriptor(external_blob_path, 0, len(blob_data))
+
+ # Create test data with blob descriptor
+ test_data = pa.Table.from_pydict({
+ 'id': [1],
+ 'name': ['paimon'],
+ 'picture': [blob_descriptor.serialize()]
+ }, schema=pa_schema)
+
+ # Write data using table API
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.write_arrow(test_data)
+
+ # Commit the data
+ commit_messages = writer.prepare_commit()
+ commit = write_builder.new_commit()
+ commit.commit(commit_messages)
+
+ # Read data back
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ result = table_read.to_arrow(table_scan.plan().splits())
+
+ # Verify the data was written and read correctly
+ self.assertEqual(result.num_rows, 1, "Should have 1 row")
+ self.assertEqual(result.column('id').to_pylist(), [1], "ID should
match")
+ self.assertEqual(result.column('name').to_pylist(), ['paimon'], "Name
should match")
+
+ # Get the blob descriptor bytes from the result
+ picture_bytes = result.column('picture').to_pylist()[0]
+ self.assertIsInstance(picture_bytes, bytes, "Picture should be bytes")
+
+ # Deserialize the blob descriptor
+ new_blob_descriptor = BlobDescriptor.deserialize(picture_bytes)
+
+ # The URI might be different if the blob was stored in the table's data directory
+ # Let's verify the descriptor properties and try to read the data
+ # Note: offset might be non-zero due to blob file format overhead
+ self.assertGreaterEqual(new_blob_descriptor.offset, 0, "Offset should
be non-negative")
+ self.assertEqual(new_blob_descriptor.length, len(blob_data), "Length
should match")
+
+ # Create URI reader factory and read the blob data
+ catalog_options = {CatalogOptions.WAREHOUSE: self.warehouse}
+ uri_reader_factory = UriReaderFactory(catalog_options)
+ uri_reader = uri_reader_factory.create(new_blob_descriptor.uri)
+ blob = Blob.from_descriptor(uri_reader, new_blob_descriptor)
+
+ # Verify the blob data matches the original
+ self.assertEqual(blob.to_data(), blob_data, "Blob data should match
original")
+
+ print("✅ Blob descriptor end-to-end test passed:")
+ print(" - Created external blob file and descriptor")
+ print(" - Wrote and read blob descriptor successfully")
+ print(" - Verified blob data can be read from descriptor")
+ print(" - Tested blob-as-descriptor=true mode")
+
def test_blob_write_read_large_data_end_to_end(self):
"""Test end-to-end blob functionality with large blob data (1MB per
blob)."""
from pypaimon import Schema
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index 502d196ae6..6ee92082db 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -58,7 +58,7 @@ class DataWriter(ABC):
self.pending_data: Optional[pa.Table] = None
self.committed_files: List[DataFileMeta] = []
self.write_cols = write_cols
- self.blob_as_descriptor = options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, False)
+ self.blob_as_descriptor = CoreOptions.get_blob_as_descriptor(options)
def write(self, data: pa.RecordBatch):
try: