This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 67e7bd8e96 [Python] Blob Table supports partition (#6488)
67e7bd8e96 is described below
commit 67e7bd8e9679b7272714a1ee2766daab2276c6af
Author: umi <[email protected]>
AuthorDate: Wed Oct 29 13:47:36 2025 +0800
[Python] Blob Table supports partition (#6488)
---
paimon-python/pypaimon/read/split_read.py | 5 +-
paimon-python/pypaimon/tests/blob_table_test.py | 158 ++++++++++++++++++++----
2 files changed, 134 insertions(+), 29 deletions(-)
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index a744fbc4f0..5c90c2ddb6 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -461,18 +461,19 @@ class DataEvolutionSplitRead(SplitRead):
if not read_fields:
file_record_readers[i] = None
else:
+ read_field_names = self._remove_partition_fields(read_fields)
table_fields = self.read_fields
self.read_fields = read_fields # create reader based on read_fields
# Create reader for this bunch
if len(bunch.files()) == 1:
file_record_readers[i] = self._create_file_reader(
- bunch.files()[0], [field.name for field in read_fields]
+ bunch.files()[0], read_field_names
)
else:
# Create concatenated reader for multiple files
suppliers = [
lambda f=file: self._create_file_reader(
- f, [field.name for field in read_fields]
+ f, read_field_names
) for file in bunch.files()
]
file_record_readers[i] = MergeAllBatchReader(suppliers)
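Note on the hunk above: the new `read_field_names` comes from a `_remove_partition_fields` helper whose body is not part of this diff. A minimal sketch of what such a helper might do, assuming partition columns can be identified from the table's partition keys and are therefore excluded from the per-file read schema (hypothetical illustration, not the code in this commit):

    # Hypothetical sketch: partition values are not stored in the data files,
    # so drop partition columns before building each file reader.
    def _remove_partition_fields(self, read_fields):
        partition_keys = set(self.table_schema.partition_keys)  # assumed attribute
        return [field.name for field in read_fields
                if field.name not in partition_keys]

With a helper along these lines, the file readers created in this hunk receive only non-partition column names; partition values would then be restored from the split's partition metadata when rows are assembled.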
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py
index fe987ea0a6..67813ca5c3 100644
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -310,7 +310,7 @@ class DataBlobWriterTest(unittest.TestCase):
# Verify the blob size is approximately 50MB
blob_size_mb = len(large_blob_data) / (1024 * 1024)
self.assertGreater(blob_size_mb, 49) # Should be at least 49MB
- self.assertLess(blob_size_mb, 51) # Should be less than 51MB
+ self.assertLess(blob_size_mb, 51) # Should be less than 51MB
total_rows = 0
@@ -646,9 +646,9 @@ class DataBlobWriterTest(unittest.TestCase):
result_type = result_df.iloc[i]['type']
result_data = result_df.iloc[i]['data']
- self.assertEqual(result_id, original_id, f"Row {i+1}: ID should
match")
- self.assertEqual(result_type, original_type, f"Row {i+1}: Type
should match")
- self.assertEqual(result_data, original_data, f"Row {i+1}: Blob
data should match")
+ self.assertEqual(result_id, original_id, f"Row {i + 1}: ID should
match")
+ self.assertEqual(result_type, original_type, f"Row {i + 1}: Type
should match")
+ self.assertEqual(result_data, original_data, f"Row {i + 1}: Blob
data should match")
def test_data_blob_writer_empty_batches(self):
"""Test DataBlobWriter with empty batches."""
@@ -742,7 +742,7 @@ class DataBlobWriterTest(unittest.TestCase):
# Create data that should trigger rolling
large_content = 'X' * 1000 # Large string content
- large_blob = b'B' * 5000 # Large blob data
+ large_blob = b'B' * 5000 # Large blob data
# Write multiple batches to test rolling
for i in range(10): # 10 batches
@@ -806,7 +806,8 @@ class DataBlobWriterTest(unittest.TestCase):
b'medium_blob_data_2_with_more_content',
b'large_blob_data_3_with_even_more_content_and_details',
b'very_large_blob_data_4_with_extensive_content_and_multiple_details_here', # noqa: E501
- b'extremely_large_blob_data_5_with_comprehensive_content_and_extensive_details_covering_multiple_aspects' # noqa: E501
+ b'extremely_large_blob_data_5_with_comprehensive_content_and_'
+ b'extensive_details_covering_multiple_aspects' # noqa: E501
]
}, schema=pa_schema)
@@ -849,8 +850,10 @@ class DataBlobWriterTest(unittest.TestCase):
# Verify normal columns
self.assertEqual(result.column('id').to_pylist(), [1, 2, 3, 4, 5], "ID
column should match")
- self.assertEqual(result.column('name').to_pylist(), ['Alice', 'Bob',
'Charlie', 'David', 'Eve'], "Name column should match") # noqa: E501
- self.assertEqual(result.column('description').to_pylist(), ['User 1',
'User 2', 'User 3', 'User 4', 'User 5'], "Description column should match") #
noqa: E501
+ self.assertEqual(result.column('name').to_pylist(), ['Alice', 'Bob',
'Charlie', 'David', 'Eve'],
+ "Name column should match") # noqa: E501
+ self.assertEqual(result.column('description').to_pylist(), ['User 1',
'User 2', 'User 3', 'User 4', 'User 5'],
+ "Description column should match") # noqa: E501
# Verify blob data correctness
blob_data = result.column('blob_data').to_pylist()
@@ -859,7 +862,8 @@ class DataBlobWriterTest(unittest.TestCase):
b'medium_blob_data_2_with_more_content',
b'large_blob_data_3_with_even_more_content_and_details',
b'very_large_blob_data_4_with_extensive_content_and_multiple_details_here', # noqa: E501
- b'extremely_large_blob_data_5_with_comprehensive_content_and_extensive_details_covering_multiple_aspects' # noqa: E501
+ b'extremely_large_blob_data_5_with_comprehensive_content_and_extensive_details_covering_multiple_aspects'
+ # noqa: E501
]
self.assertEqual(len(blob_data), 5, "Should have 5 blob records")
@@ -867,10 +871,103 @@ class DataBlobWriterTest(unittest.TestCase):
# Verify individual blob sizes
for i, (actual_blob, expected_blob) in enumerate(zip(blob_data, expected_blobs)):
- self.assertEqual(len(actual_blob), len(expected_blob), f"Blob {i+1} size should match")
- self.assertEqual(actual_blob, expected_blob, f"Blob {i+1} content should match exactly")
+ self.assertEqual(len(actual_blob), len(expected_blob), f"Blob {i + 1} size should match")
+ self.assertEqual(actual_blob, expected_blob, f"Blob {i + 1} content should match exactly")
- print(f"✅ End-to-end blob write/read test passed: wrote and read back {len(blob_data)} blob records correctly") # noqa: E501
+ print(
+ f"✅ End-to-end blob write/read test passed: wrote and read back {len(blob_data)} blob records correctly") # noqa: E501
+
+ def test_blob_write_read_partition(self):
+ """Test complete end-to-end blob functionality: write blob data and
read it back to verify correctness."""
+ from pypaimon import Schema
+
+ # Create schema with blob column
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('description', pa.string()),
+ ('blob_data', pa.large_binary()),
+ ])
+
+ schema = Schema.from_pyarrow_schema(
+ pa_schema, partition_keys=['name'],
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true'
+ }
+ )
+ self.catalog.create_table('test_db.blob_write_read_partition', schema, False)
+ table = self.catalog.get_table('test_db.blob_write_read_partition')
+
+ # Test data with various blob sizes and types
+ test_data = pa.Table.from_pydict({
+ 'id': [1, 2, 3, 4, 5],
+ 'name': ['Alice', 'Alice', 'David', 'David', 'David'],
+ 'description': ['User 1', 'User 2', 'User 3', 'User 4', 'User 5'],
+ 'blob_data': [
+ b'small_blob_1',
+ b'medium_blob_data_2_with_more_content',
+ b'large_blob_data_3_with_even_more_content_and_details',
+ b'very_large_blob_data_4_with_extensive_content_and_multiple_details_here', # noqa: E501
+ b'extremely_large_blob_data_5_with_comprehensive_content_and_'
+ b'extensive_details_covering_multiple_aspects'
+ # noqa: E501
+ ]
+ }, schema=pa_schema)
+
+ # Write data using table API
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.write_arrow(test_data)
+
+ # Commit the data
+ commit_messages = writer.prepare_commit()
+
+ # Create commit and commit the data
+ commit = write_builder.new_commit()
+ commit.commit(commit_messages)
+ writer.close()
+
+ # Read data back using table API
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ splits = table_scan.plan().splits()
+ result = table_read.to_arrow(splits)
+
+ # Verify the data was read back correctly
+ self.assertEqual(result.num_rows, 5, "Should have 5 rows")
+ self.assertEqual(result.num_columns, 4, "Should have 4 columns")
+
+ # Verify normal columns
+ self.assertEqual(result.column('id').to_pylist(), [1, 2, 3, 4, 5], "ID
column should match")
+ self.assertEqual(result.column('name').to_pylist(), ['Alice', 'Alice',
'David', 'David', 'David'],
+ "Name column should match") # noqa: E501
+ self.assertEqual(result.column('description').to_pylist(), ['User 1',
'User 2', 'User 3', 'User 4', 'User 5'],
+ "Description column should match") # noqa: E501
+
+ # Verify blob data correctness
+ blob_data = result.column('blob_data').to_pylist()
+ expected_blobs = [
+ b'small_blob_1',
+ b'medium_blob_data_2_with_more_content',
+ b'large_blob_data_3_with_even_more_content_and_details',
+ b'very_large_blob_data_4_with_extensive_content_and_multiple_details_here', # noqa: E501
+ b'extremely_large_blob_data_5_with_comprehensive_content_and_extensive_details_covering_multiple_aspects' # noqa: E501
+ ]
+
+ self.assertEqual(len(blob_data), 5, "Should have 5 blob records")
+ self.assertEqual(blob_data, expected_blobs, "Blob data should match
exactly")
+
+ # Verify individual blob sizes
+ for i, (actual_blob, expected_blob) in enumerate(zip(blob_data, expected_blobs)):
+ self.assertEqual(len(actual_blob), len(expected_blob), f"Blob {i + 1} size should match")
+ self.assertEqual(actual_blob, expected_blob, f"Blob {i + 1} content should match exactly")
+
+ print(
+ f"✅ End-to-end blob write/read test passed: wrote and read back {len(blob_data)} blob records correctly") # noqa: E501
def test_blob_write_read_end_to_end_with_descriptor(self):
"""Test end-to-end blob functionality using blob descriptors."""
@@ -1044,18 +1141,20 @@ class DataBlobWriterTest(unittest.TestCase):
# Verify normal columns
self.assertEqual(result.column('id').to_pylist(), [1, 2, 3], "ID
column should match")
- self.assertEqual(result.column('metadata').to_pylist(), ['Large blob
1', 'Large blob 2', 'Large blob 3'], "Metadata column should match") # noqa:
E501
+ self.assertEqual(result.column('metadata').to_pylist(), ['Large blob
1', 'Large blob 2', 'Large blob 3'],
+ "Metadata column should match") # noqa: E501
# Verify blob data integrity
blob_data = result.column('large_blob').to_pylist()
self.assertEqual(len(blob_data), 3, "Should have 3 blob records")
for i, blob in enumerate(blob_data):
- self.assertEqual(len(blob), len(large_blob_data), f"Blob {i+1}
should be {large_blob_size} bytes")
- self.assertEqual(blob, large_blob_data, f"Blob {i+1} content
should match exactly")
- print(f"✅ Verified large blob {i+1}: {len(blob)} bytes")
+ self.assertEqual(len(blob), len(large_blob_data), f"Blob {i + 1}
should be {large_blob_size} bytes")
+ self.assertEqual(blob, large_blob_data, f"Blob {i + 1} content
should match exactly")
+ print(f"✅ Verified large blob {i + 1}: {len(blob)} bytes")
- print(f"✅ Large blob end-to-end test passed: wrote and read back
{len(blob_data)} large blob records correctly") # noqa: E501
+ print(
+ f"✅ Large blob end-to-end test passed: wrote and read back
{len(blob_data)} large blob records correctly") # noqa: E501
def test_blob_write_read_mixed_sizes_end_to_end(self):
"""Test end-to-end blob functionality with mixed blob sizes."""
@@ -1129,7 +1228,8 @@ class DataBlobWriterTest(unittest.TestCase):
# Verify normal columns
self.assertEqual(result.column('id').to_pylist(), [1, 2, 3, 4, 5], "ID
column should match")
- self.assertEqual(result.column('size_category').to_pylist(), ['tiny',
'small', 'medium', 'large', 'huge'], "Size category column should match") #
noqa: E501
+ self.assertEqual(result.column('size_category').to_pylist(), ['tiny',
'small', 'medium', 'large', 'huge'],
+ "Size category column should match") # noqa: E501
# Verify blob data
blob_data = result.column('blob_data').to_pylist()
@@ -1145,9 +1245,10 @@ class DataBlobWriterTest(unittest.TestCase):
# Verify individual blob content
for i, (actual_blob, expected_blob) in enumerate(zip(blob_data, expected_blobs)):
- self.assertEqual(actual_blob, expected_blob, f"Blob {i+1} content should match exactly")
+ self.assertEqual(actual_blob, expected_blob, f"Blob {i + 1} content should match exactly")
- print(f"✅ Mixed sizes end-to-end test passed: wrote and read back blobs ranging from {min(sizes)} to {max(sizes)} bytes") # noqa: E501
+ print(
+ f"✅ Mixed sizes end-to-end test passed: wrote and read back blobs ranging from {min(sizes)} to {max(sizes)} bytes") # noqa: E501
def test_blob_write_read_large_data_end_to_end_with_rolling(self):
"""Test end-to-end blob functionality with large blob data (50MB per
blob) and rolling behavior (40 blobs)."""
@@ -1180,7 +1281,7 @@ class DataBlobWriterTest(unittest.TestCase):
# Verify the blob size is exactly 50MB
actual_size = len(large_blob_data)
- print(f"Created blob data: {actual_size:,} bytes ({actual_size /
(1024*1024):.2f} MB)")
+ print(f"Created blob data: {actual_size:,} bytes ({actual_size / (1024
* 1024):.2f} MB)")
# Write 40 batches of data (each with 1 blob of 50MB)
write_builder = table.new_batch_write_builder()
@@ -1245,8 +1346,10 @@ class DataBlobWriterTest(unittest.TestCase):
expected_metadata = [f'Large blob batch {i}' for i in range(1, 41)]
self.assertEqual(result.column('id').to_pylist(), expected_ids, "ID
column should match")
- self.assertEqual(result.column('batch_id').to_pylist(),
expected_batch_ids, "Batch ID column should match") # noqa: E501
- self.assertEqual(result.column('metadata').to_pylist(),
expected_metadata, "Metadata column should match") # noqa: E501
+ self.assertEqual(result.column('batch_id').to_pylist(),
expected_batch_ids,
+ "Batch ID column should match") # noqa: E501
+ self.assertEqual(result.column('metadata').to_pylist(),
expected_metadata,
+ "Metadata column should match") # noqa: E501
# Verify blob data integrity
blob_data = result.column('large_blob').to_pylist()
@@ -1254,12 +1357,12 @@ class DataBlobWriterTest(unittest.TestCase):
# Verify each blob
for i, blob in enumerate(blob_data):
- self.assertEqual(len(blob), len(large_blob_data), f"Blob {i+1}
should be {large_blob_size:,} bytes")
- self.assertEqual(blob, large_blob_data, f"Blob {i+1} content
should match exactly")
+ self.assertEqual(len(blob), len(large_blob_data), f"Blob {i + 1}
should be {large_blob_size:,} bytes")
+ self.assertEqual(blob, large_blob_data, f"Blob {i + 1} content
should match exactly")
# Print progress every 10 blobs
if (i + 1) % 10 == 0:
- print(f"✅ Verified blob {i+1}/40: {len(blob):,} bytes")
+ print(f"✅ Verified blob {i + 1}/40: {len(blob):,} bytes")
# Verify total data size
total_blob_size = sum(len(blob) for blob in blob_data)
@@ -1269,7 +1372,8 @@ class DataBlobWriterTest(unittest.TestCase):
print("✅ Large blob rolling end-to-end test passed:")
print(" - Wrote and read back 40 blobs of 50MB each")
- print(f" - Total data size: {total_blob_size:,} bytes
({total_blob_size / (1024*1024*1024):.2f} GB)") # noqa: E501
+ print(
+ f" - Total data size: {total_blob_size:,} bytes
({total_blob_size / (1024 * 1024 * 1024):.2f} GB)") # noqa: E501
print(" - All blob content verified as correct")
def test_data_blob_writer_with_shard(self):