This is an automated email from the ASF dual-hosted git repository.
jerryjing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 50979b1cf1 [python] support rolling for blob write when blob-as-descriptor (#6578)
50979b1cf1 is described below
commit 50979b1cf18446036dbd3a8bd27739763778176c
Author: XiaoHongbo <[email protected]>
AuthorDate: Wed Nov 12 09:26:59 2025 +0800
[python] support rolling for blob write when blob-as-descriptor (#6578)
---
.../paimon/append/RollingBlobFileWriterTest.java | 353 ++++++++++++-
paimon-python/pypaimon/common/core_options.py | 18 +
paimon-python/pypaimon/tests/blob_table_test.py | 559 +++++++++++++++++++++
.../pypaimon/write/writer/blob_file_writer.py | 111 ++++
paimon-python/pypaimon/write/writer/blob_writer.py | 216 +++++++-
paimon-python/pypaimon/write/writer/data_writer.py | 2 +-
6 files changed, 1252 insertions(+), 7 deletions(-)
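
For reference, a minimal usage sketch of the behavior exercised by the new tests (illustrative only, not part of the patch): it creates an append-only table with blob-as-descriptor enabled and a small blob.target-file-size, writes several 2 MB blobs via serialized BlobDescriptor values, and checks that the commit rolled them into multiple data-{uuid}-{count}.blob files. The table name, database, temp paths, and the `catalog` object are placeholders assumed to exist; the calls mirror paimon-python/pypaimon/tests/blob_table_test.py in this patch.

import os
import tempfile

import pyarrow as pa
from pypaimon import Schema
from pypaimon.table.row.blob import BlobDescriptor

# Schema and options mirror the new tests in blob_table_test.py.
pa_schema = pa.schema([('id', pa.int32()), ('name', pa.string()),
                       ('blob_data', pa.large_binary())])
schema = Schema.from_pyarrow_schema(pa_schema, options={
    'row-tracking.enabled': 'true',
    'data-evolution.enabled': 'true',
    'blob-as-descriptor': 'true',
    'blob.target-file-size': '1MB',  # small target size so 2 MB blobs force rolling
})
catalog.create_table('test_db.blob_rolling_demo', schema, False)  # `catalog` assumed to exist
table = catalog.get_table('test_db.blob_rolling_demo')

# Write 5 external blob files of 2 MB each and reference them via descriptors.
tmp_dir = tempfile.mkdtemp()
descriptors = []
for i in range(5):
    path = os.path.join(tmp_dir, f'external_blob_{i}')
    with open(path, 'wb') as f:
        f.write(b'x' * (2 * 1024 * 1024))
    descriptors.append(BlobDescriptor(path, 0, 2 * 1024 * 1024))

data = pa.Table.from_pydict({
    'id': list(range(1, 6)),
    'name': [f'item_{i}' for i in range(1, 6)],
    'blob_data': [d.serialize() for d in descriptors],
}, schema=pa_schema)

write_builder = table.new_batch_write_builder()
writer = write_builder.new_write()
writer.write_arrow(data)
commit_messages = writer.prepare_commit()
write_builder.new_commit().commit(commit_messages)
writer.close()

# Rolling should have produced several blob files that share one UUID and
# carry sequential counters: data-{uuid}-{count}.blob.
blob_files = [f for msg in commit_messages for f in msg.new_files
              if f.file_name.endswith('.blob')]
assert len(blob_files) > 1
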
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
index fb74b5f203..e2e68905c8 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
@@ -81,7 +81,8 @@ public class RollingBlobFileWriterTest {
new DataFilePathFactory(
new Path(tempDir + "/bucket-0"),
"parquet",
- "data",
+ "data-", // dataFilePrefix should include the hyphen
to match expected
+ // format: data-{uuid}-{count}
"changelog",
false,
null,
@@ -192,7 +193,8 @@ public class RollingBlobFileWriterTest {
new DataFilePathFactory(
new Path(tempDir + "/blob-size-test"),
"parquet",
- "data",
+ "data-", // dataFilePrefix should include the
hyphen to match
+ // expected format: data-{uuid}-{count}
"changelog",
false,
null,
@@ -261,6 +263,353 @@ public class RollingBlobFileWriterTest {
results.forEach(file -> assertThat(file.schemaId()).isEqualTo(SCHEMA_ID));
}
+ @Test
+ void testBlobFileNameFormatWithSharedUuid() throws IOException {
+ long blobTargetFileSize = 2 * 1024 * 1024L; // 2 MB for blob files
+
+ RollingBlobFileWriter fileNameTestWriter =
+ new RollingBlobFileWriter(
+ LocalFileIO.create(),
+ SCHEMA_ID,
+ FileFormat.fromIdentifier("parquet", new Options()),
+ 128 * 1024 * 1024,
+ blobTargetFileSize,
+ SCHEMA,
+ pathFactory, // Use the same pathFactory to ensure shared UUID
+ new LongCounter(),
+ COMPRESSION,
+ new StatsCollectorFactories(new CoreOptions(new Options())),
+ new FileIndexOptions(),
+ FileSource.APPEND,
+ false, // asyncFileWrite
+ false // statsDenseStore
+ );
+
+ // Create blob data that will trigger rolling
+ byte[] blobData = new byte[1024 * 1024]; // 1 MB blob data
+ new Random(456).nextBytes(blobData);
+
+ // Write enough rows to trigger multiple blob file rollings
+ for (int i = 0; i < 10; i++) {
+ InternalRow row =
+ GenericRow.of(i, BinaryString.fromString("test-" + i), new BlobData(blobData));
+ fileNameTestWriter.write(row);
+ }
+
+ fileNameTestWriter.close();
+ List<DataFileMeta> results = fileNameTestWriter.result();
+
+ // Filter blob files
+ List<DataFileMeta> blobFiles =
+ results.stream()
+ .filter(file -> "blob".equals(file.fileFormat()))
+ .collect(java.util.stream.Collectors.toList());
+
+ assertThat(blobFiles)
+ .as("Should have multiple blob files due to rolling")
+ .hasSizeGreaterThan(1);
+
+ // Extract UUID and counter from file names
+ // Format: data-{uuid}-{count}.blob
+ String firstFileName = blobFiles.get(0).fileName();
+ assertThat(firstFileName)
+ .as("File name should match expected format:
data-{uuid}-{count}.blob")
+ .matches(
+
"data-[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}-\\d+\\.blob");
+
+ // Extract UUID from first file name
+ String uuid = firstFileName.substring(5, firstFileName.lastIndexOf('-'));
+ int firstCounter =
+ Integer.parseInt(
+ firstFileName.substring(
+ firstFileName.lastIndexOf('-') + 1,
+ firstFileName.lastIndexOf('.')));
+
+ // Verify all blob files use the same UUID and have sequential counters
+ for (int i = 0; i < blobFiles.size(); i++) {
+ String fileName = blobFiles.get(i).fileName();
+ String fileUuid = fileName.substring(5, fileName.lastIndexOf('-'));
+ int counter =
+ Integer.parseInt(
+ fileName.substring(
+ fileName.lastIndexOf('-') + 1, fileName.lastIndexOf('.')));
+
+ assertThat(fileUuid).as("All blob files should use the same
UUID").isEqualTo(uuid);
+
+ assertThat(counter)
+ .as("File counter should be sequential starting from first
counter")
+ .isEqualTo(firstCounter + i);
+ }
+ }
+
+ @Test
+ void testBlobFileNameFormatWithSharedUuidNonDescriptorMode() throws IOException {
+ long blobTargetFileSize = 2 * 1024 * 1024L; // 2 MB for blob files
+
+ RollingBlobFileWriter fileNameTestWriter =
+ new RollingBlobFileWriter(
+ LocalFileIO.create(),
+ SCHEMA_ID,
+ FileFormat.fromIdentifier("parquet", new Options()),
+ 128 * 1024 * 1024,
+ blobTargetFileSize,
+ SCHEMA,
+ pathFactory, // Use the same pathFactory to ensure shared UUID
+ new LongCounter(),
+ COMPRESSION,
+ new StatsCollectorFactories(new CoreOptions(new Options())),
+ new FileIndexOptions(),
+ FileSource.APPEND,
+ false, // asyncFileWrite
+ false // statsDenseStore
+ );
+
+ // Create blob data that will trigger rolling (non-descriptor mode: direct blob data)
+ byte[] blobData = new byte[1024 * 1024]; // 1 MB blob data
+ new Random(789).nextBytes(blobData);
+
+ // Write enough rows to trigger multiple blob file rollings
+ for (int i = 0; i < 10; i++) {
+ InternalRow row =
+ GenericRow.of(i, BinaryString.fromString("test-" + i), new BlobData(blobData));
+ fileNameTestWriter.write(row);
+ }
+
+ fileNameTestWriter.close();
+ List<DataFileMeta> results = fileNameTestWriter.result();
+
+ // Filter blob files
+ List<DataFileMeta> blobFiles =
+ results.stream()
+ .filter(file -> "blob".equals(file.fileFormat()))
+ .collect(java.util.stream.Collectors.toList());
+
+ assertThat(blobFiles.size()).as("Should have at least one blob file").isPositive();
+
+ // Extract UUID and counter from file names
+ // Format: data-{uuid}-{count}.blob
+ String firstFileName = blobFiles.get(0).fileName();
+ assertThat(firstFileName)
+ .as("File name should match expected format:
data-{uuid}-{count}.blob")
+ .matches(
+
"data-[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}-\\d+\\.blob");
+
+ // Extract UUID from first file name
+ String uuid = firstFileName.substring(5, firstFileName.lastIndexOf('-'));
+ int firstCounter =
+ Integer.parseInt(
+ firstFileName.substring(
+ firstFileName.lastIndexOf('-') + 1,
+ firstFileName.lastIndexOf('.')));
+
+ // Verify all blob files use the same UUID and have sequential counters
+ for (int i = 0; i < blobFiles.size(); i++) {
+ String fileName = blobFiles.get(i).fileName();
+ String fileUuid = fileName.substring(5, fileName.lastIndexOf('-'));
+ int counter =
+ Integer.parseInt(
+ fileName.substring(
+ fileName.lastIndexOf('-') + 1, fileName.lastIndexOf('.')));
+
+ assertThat(fileUuid).as("All blob files should use the same
UUID").isEqualTo(uuid);
+
+ assertThat(counter)
+ .as("File counter should be sequential starting from first
counter")
+ .isEqualTo(firstCounter + i);
+ }
+ }
+
+ @Test
+ void testSequenceNumberIncrementInBlobAsDescriptorMode() throws IOException {
+ // Write multiple rows to trigger one-by-one writing in blob-as-descriptor mode
+ int numRows = 10;
+ for (int i = 0; i < numRows; i++) {
+ InternalRow row =
+ GenericRow.of(
+ i, BinaryString.fromString("test" + i), new
BlobData(testBlobData));
+ writer.write(row);
+ }
+
+ writer.close();
+ List<DataFileMeta> metasResult = writer.result();
+
+ // Extract blob files (skip the first normal file)
+ List<DataFileMeta> blobFiles =
+ metasResult.stream()
+ .filter(f -> f.fileFormat().equals("blob"))
+ .collect(java.util.stream.Collectors.toList());
+
+ assertThat(blobFiles).as("Should have at least one blob
file").isNotEmpty();
+
+ // Verify sequence numbers for each blob file
+ for (DataFileMeta blobFile : blobFiles) {
+ long minSeq = blobFile.minSequenceNumber();
+ long maxSeq = blobFile.maxSequenceNumber();
+ long rowCount = blobFile.rowCount();
+
+ // Critical assertion: min_seq should NOT equal max_seq when there are multiple rows
+ if (rowCount > 1) {
+ assertThat(minSeq)
+ .as(
+ "Sequence numbers should be different for
files with multiple rows. "
+ + "File: %s, row_count: %d, min_seq:
%d, max_seq: %d. "
+ + "This indicates sequence generator
was not incremented for each row.",
+ blobFile.fileName(), rowCount, minSeq, maxSeq)
+ .isNotEqualTo(maxSeq);
+
+ // Verify that max_seq - min_seq + 1 equals row_count
+ // (each row should have a unique sequence number)
+ assertThat(maxSeq - minSeq + 1)
+ .as(
+ "Sequence number range should match row count.
"
+ + "File: %s, row_count: %d, min_seq:
%d, max_seq: %d, "
+ + "expected range: %d, actual range:
%d",
+ blobFile.fileName(),
+ rowCount,
+ minSeq,
+ maxSeq,
+ rowCount,
+ maxSeq - minSeq + 1)
+ .isEqualTo(rowCount);
+ } else {
+ // For single row files, min_seq == max_seq is acceptable
+ assertThat(minSeq)
+ .as(
+ "Single row file should have min_seq ==
max_seq. "
+ + "File: %s, min_seq: %d, max_seq: %d",
+ blobFile.fileName(), minSeq, maxSeq)
+ .isEqualTo(maxSeq);
+ }
+ }
+
+ // Verify total record count
+ assertThat(writer.recordCount()).isEqualTo(numRows);
+ }
+
+ @Test
+ void testSequenceNumberIncrementInNonDescriptorMode() throws IOException {
+ // Write multiple rows as a batch to trigger batch writing in non-descriptor mode
+ // (blob-as-descriptor=false, which is the default)
+ int numRows = 10;
+ for (int i = 0; i < numRows; i++) {
+ InternalRow row =
+ GenericRow.of(
+ i, BinaryString.fromString("test" + i), new
BlobData(testBlobData));
+ writer.write(row);
+ }
+
+ writer.close();
+ List<DataFileMeta> metasResult = writer.result();
+
+ // Extract blob files (skip the first normal file)
+ List<DataFileMeta> blobFiles =
+ metasResult.stream()
+ .filter(f -> f.fileFormat().equals("blob"))
+ .collect(java.util.stream.Collectors.toList());
+
+ assertThat(blobFiles).as("Should have at least one blob
file").isNotEmpty();
+
+ // Verify sequence numbers for each blob file
+ for (DataFileMeta blobFile : blobFiles) {
+ long minSeq = blobFile.minSequenceNumber();
+ long maxSeq = blobFile.maxSequenceNumber();
+ long rowCount = blobFile.rowCount();
+
+ // Critical assertion: min_seq should NOT equal max_seq when there are multiple rows
+ if (rowCount > 1) {
+ assertThat(minSeq)
+ .as(
+ "Sequence numbers should be different for
files with multiple rows. "
+ + "File: %s, row_count: %d, min_seq:
%d, max_seq: %d. "
+ + "This indicates sequence generator
was not incremented for each row in batch.",
+ blobFile.fileName(), rowCount, minSeq, maxSeq)
+ .isNotEqualTo(maxSeq);
+
+ // Verify that max_seq - min_seq + 1 equals row_count
+ // (each row should have a unique sequence number)
+ assertThat(maxSeq - minSeq + 1)
+ .as(
+ "Sequence number range should match row count.
"
+ + "File: %s, row_count: %d, min_seq:
%d, max_seq: %d, "
+ + "expected range: %d, actual range:
%d",
+ blobFile.fileName(),
+ rowCount,
+ minSeq,
+ maxSeq,
+ rowCount,
+ maxSeq - minSeq + 1)
+ .isEqualTo(rowCount);
+ } else {
+ // For single row files, min_seq == max_seq is acceptable
+ assertThat(minSeq)
+ .as(
+ "Single row file should have min_seq ==
max_seq. "
+ + "File: %s, min_seq: %d, max_seq: %d",
+ blobFile.fileName(), minSeq, maxSeq)
+ .isEqualTo(maxSeq);
+ }
+ }
+
+ // Verify total record count
+ assertThat(writer.recordCount()).isEqualTo(numRows);
+ }
+
+ @Test
+ void testBlobStatsSchemaWithCustomColumnName() throws IOException {
+ RowType customSchema =
+ RowType.builder()
+ .field("id", DataTypes.INT())
+ .field("name", DataTypes.STRING())
+ .field("my_custom_blob", DataTypes.BLOB()) // Custom
blob column name
+ .build();
+
+ // Reinitialize writer with custom schema
+ writer =
+ new RollingBlobFileWriter(
+ LocalFileIO.create(),
+ SCHEMA_ID,
+ FileFormat.fromIdentifier("parquet", new Options()),
+ TARGET_FILE_SIZE,
+ TARGET_FILE_SIZE,
+ customSchema, // Use custom schema
+ pathFactory,
+ seqNumCounter,
+ COMPRESSION,
+ new StatsCollectorFactories(new CoreOptions(new Options())),
+ new FileIndexOptions(),
+ FileSource.APPEND,
+ false, // asyncFileWrite
+ false // statsDenseStore
+ );
+
+ // Write data
+ for (int i = 0; i < 3; i++) {
+ InternalRow row =
+ GenericRow.of(
+ i, BinaryString.fromString("test" + i), new
BlobData(testBlobData));
+ writer.write(row);
+ }
+
+ writer.close();
+ List<DataFileMeta> metasResult = writer.result();
+
+ // Extract blob files
+ List<DataFileMeta> blobFiles =
+ metasResult.stream()
+ .filter(f -> f.fileFormat().equals("blob"))
+ .collect(java.util.stream.Collectors.toList());
+
+ assertThat(blobFiles).as("Should have at least one blob
file").isNotEmpty();
+
+ for (DataFileMeta blobFile : blobFiles) {
+ assertThat(blobFile.fileName()).endsWith(".blob");
+ assertThat(blobFile.rowCount()).isGreaterThan(0);
+ }
+
+ // Verify total record count
+ assertThat(writer.recordCount()).isEqualTo(3);
+ }
+
/** Simple implementation of BundleRecords for testing. */
private static class TestBundleRecords implements BundleRecords {
private final List<InternalRow> rows;
diff --git a/paimon-python/pypaimon/common/core_options.py b/paimon-python/pypaimon/common/core_options.py
index d6643399bb..87b8e9034a 100644
--- a/paimon-python/pypaimon/common/core_options.py
+++ b/paimon-python/pypaimon/common/core_options.py
@@ -47,6 +47,8 @@ class CoreOptions(str, Enum):
FILE_FORMAT_PER_LEVEL = "file.format.per.level"
FILE_BLOCK_SIZE = "file.block-size"
FILE_BLOB_AS_DESCRIPTOR = "blob-as-descriptor"
+ TARGET_FILE_SIZE = "target-file-size"
+ BLOB_TARGET_FILE_SIZE = "blob.target-file-size"
# Scan options
SCAN_FALLBACK_BRANCH = "scan.fallback-branch"
INCREMENTAL_BETWEEN_TIMESTAMP = "incremental-between-timestamp"
@@ -76,3 +78,19 @@ class CoreOptions(str, Enum):
cost_str = options[CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST]
return MemorySize.parse(cost_str).get_bytes()
return MemorySize.of_mebi_bytes(4).get_bytes()
+
+ @staticmethod
+ def get_target_file_size(options: dict, has_primary_key: bool = False) -> int:
+ """Get target file size from options, default to 128MB for primary key table, 256MB for append-only table."""
+ if CoreOptions.TARGET_FILE_SIZE in options:
+ size_str = options[CoreOptions.TARGET_FILE_SIZE]
+ return MemorySize.parse(size_str).get_bytes()
+ return MemorySize.of_mebi_bytes(128 if has_primary_key else 256).get_bytes()
+
+ @staticmethod
+ def get_blob_target_file_size(options: dict) -> int:
+ """Get blob target file size from options, default to target-file-size
(256MB for append-only table)."""
+ if CoreOptions.BLOB_TARGET_FILE_SIZE in options:
+ size_str = options[CoreOptions.BLOB_TARGET_FILE_SIZE]
+ return MemorySize.parse(size_str).get_bytes()
+ return CoreOptions.get_target_file_size(options, has_primary_key=False)
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py
index 1b76499efb..3a2eee98fb 100644
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -1518,6 +1518,565 @@ class DataBlobWriterTest(unittest.TestCase):
[RowKind.INSERT, RowKind.UPDATE_BEFORE, RowKind.UPDATE_AFTER, RowKind.DELETE],
f"Row {row_id}: RowKind should be valid")
+ def test_blob_as_descriptor_target_file_size_rolling(self):
+ import random
+ import os
+ from pypaimon import Schema
+ from pypaimon.table.row.blob import BlobDescriptor
+
+ pa_schema = pa.schema([('id', pa.int32()), ('name', pa.string()), ('blob_data', pa.large_binary())])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true',
+ 'blob-as-descriptor': 'true', 'target-file-size': '1MB'
+ })
+ self.catalog.create_table('test_db.blob_target_size_test', schema, False)
+ table = self.catalog.get_table('test_db.blob_target_size_test')
+
+ # Create 5 external blob files (2MB each, > target-file-size)
+ num_blobs, blob_size = 5, 2 * 1024 * 1024
+ random.seed(42)
+ descriptors = []
+ for i in range(num_blobs):
+ path = os.path.join(self.temp_dir, f'external_blob_{i}')
+ data = bytes(bytearray([random.randint(0, 255) for _ in range(blob_size)]))
+ with open(path, 'wb') as f:
+ f.write(data)
+ descriptors.append(BlobDescriptor(path, 0, len(data)))
+
+ # Write data
+ test_data = pa.Table.from_pydict({
+ 'id': list(range(1, num_blobs + 1)),
+ 'name': [f'item_{i}' for i in range(1, num_blobs + 1)],
+ 'blob_data': [d.serialize() for d in descriptors]
+ }, schema=pa_schema)
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.write_arrow(test_data)
+ commit_messages = writer.prepare_commit()
+ write_builder.new_commit().commit(commit_messages)
+ writer.close()
+
+ # Check blob files
+ all_files = [f for msg in commit_messages for f in msg.new_files]
+ blob_files = [f for f in all_files if f.file_name.endswith('.blob')]
+ total_size = sum(f.file_size for f in blob_files)
+
+ # Verify that rolling works correctly: should have multiple files when total size exceeds target
+ # Each blob is 2MB, target-file-size is 1MB, so should have multiple files
+ self.assertGreater(
+ len(blob_files), 1,
+ f"Should have multiple blob files when total size ({total_size /
1024 / 1024:.2f}MB) exceeds target (1MB), "
+ f"but got {len(blob_files)} file(s)"
+ )
+
+ # Verify data integrity
+ result = table.new_read_builder().new_read().to_arrow(table.new_read_builder().new_scan().plan().splits())
+ self.assertEqual(result.num_rows, num_blobs)
+ self.assertEqual(result.column('id').to_pylist(), list(range(1, num_blobs + 1)))
+
+ def test_blob_file_name_format_with_shared_uuid(self):
+ import random
+ import re
+ from pypaimon import Schema
+ from pypaimon.table.row.blob import BlobDescriptor
+
+ # Create schema with blob column
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('blob_data', pa.large_binary())
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'blob-as-descriptor': 'true',
+ 'blob.target-file-size': '1MB' # Small target size to trigger multiple rollings
+ })
+
+ self.catalog.create_table('test_db.blob_file_name_test', schema, False)
+ table = self.catalog.get_table('test_db.blob_file_name_test')
+
+ # Create multiple external blob files (2MB each, > target-file-size)
+ # This will trigger multiple blob file rollings
+ num_blobs, blob_size = 5, 2 * 1024 * 1024
+ random.seed(789)
+ descriptors = []
+ for i in range(num_blobs):
+ path = os.path.join(self.temp_dir, f'external_blob_{i}')
+ data = bytes(bytearray([random.randint(0, 255) for _ in range(blob_size)]))
+ with open(path, 'wb') as f:
+ f.write(data)
+ descriptors.append(BlobDescriptor(path, 0, len(data)))
+
+ # Write data
+ test_data = pa.Table.from_pydict({
+ 'id': list(range(1, num_blobs + 1)),
+ 'name': [f'item_{i}' for i in range(1, num_blobs + 1)],
+ 'blob_data': [d.serialize() for d in descriptors]
+ }, schema=pa_schema)
+
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.write_arrow(test_data)
+ commit_messages = writer.prepare_commit()
+ write_builder.new_commit().commit(commit_messages)
+ writer.close()
+
+ # Extract blob files from commit messages
+ all_files = [f for msg in commit_messages for f in msg.new_files]
+ blob_files = [f for f in all_files if f.file_name.endswith('.blob')]
+
+ # Should have multiple blob files due to rolling
+ self.assertGreater(len(blob_files), 1, "Should have multiple blob files due to rolling")
+
+ # Verify file name format: data-{uuid}-{count}.blob
+ file_name_pattern = re.compile(
+ r'^data-[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-'
+ r'[a-f0-9]{12}-(\d+)\.blob$'
+ )
+
+ first_file_name = blob_files[0].file_name
+ self.assertTrue(
+ file_name_pattern.match(first_file_name),
+ f"File name should match expected format:
data-{{uuid}}-{{count}}.blob, got: {first_file_name}"
+ )
+
+ first_match = file_name_pattern.match(first_file_name)
+ first_counter = int(first_match.group(1))
+
+ # Extract UUID (everything between "data-" and last "-")
+ uuid_start = len("data-")
+ uuid_end = first_file_name.rfind('-', uuid_start)
+ shared_uuid = first_file_name[uuid_start:uuid_end]
+
+ # Verify all blob files use the same UUID and have sequential counters
+ for i, blob_file in enumerate(blob_files):
+ file_name = blob_file.file_name
+ match = file_name_pattern.match(file_name)
+
+ self.assertIsNotNone(
+ match,
+ f"File name should match expected format:
data-{{uuid}}-{{count}}.blob, got: {file_name}"
+ )
+
+ counter = int(match.group(1))
+
+ # Extract UUID from this file
+ file_uuid = file_name[uuid_start:file_name.rfind('-', uuid_start)]
+
+ self.assertEqual(
+ file_uuid,
+ shared_uuid,
+ f"All blob files should use the same UUID. Expected:
{shared_uuid}, got: {file_uuid} in {file_name}"
+ )
+
+ self.assertEqual(
+ counter,
+ first_counter + i,
+ f"File counter should be sequential. Expected: {first_counter
+ i}, got: {counter} in {file_name}"
+ )
+
+ # Verify data integrity
+ result = table.new_read_builder().new_read().to_arrow(table.new_read_builder().new_scan().plan().splits())
+ self.assertEqual(result.num_rows, num_blobs)
+ self.assertEqual(result.column('id').to_pylist(), list(range(1, num_blobs + 1)))
+
+ def test_blob_as_descriptor_sequence_number_increment(self):
+ import os
+ from pypaimon import Schema
+ from pypaimon.table.row.blob import BlobDescriptor
+
+ # Create schema with blob column (blob-as-descriptor=true)
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('blob_data', pa.large_binary())
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'blob-as-descriptor': 'true'
+ })
+
+ self.catalog.create_table('test_db.blob_sequence_test', schema, False)
+ table = self.catalog.get_table('test_db.blob_sequence_test')
+
+ # Create multiple external blob files
+ num_blobs = 10
+ descriptors = []
+ for i in range(num_blobs):
+ path = os.path.join(self.temp_dir, f'external_blob_seq_{i}')
+ data = f"blob data {i}".encode('utf-8')
+ with open(path, 'wb') as f:
+ f.write(data)
+ descriptors.append(BlobDescriptor(path, 0, len(data)))
+
+ # Write data row by row (this triggers the one-by-one writing in blob-as-descriptor mode)
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+
+ # Write each row separately to ensure one-by-one writing
+ for i in range(num_blobs):
+ test_data = pa.Table.from_pydict({
+ 'id': [i + 1],
+ 'name': [f'item_{i}'],
+ 'blob_data': [descriptors[i].serialize()]
+ }, schema=pa_schema)
+ writer.write_arrow(test_data)
+
+ commit_messages = writer.prepare_commit()
+ write_builder.new_commit().commit(commit_messages)
+ writer.close()
+
+ # Extract blob files from commit messages
+ all_files = [f for msg in commit_messages for f in msg.new_files]
+ blob_files = [f for f in all_files if f.file_name.endswith('.blob')]
+
+ # Verify that we have at least one blob file
+ self.assertGreater(len(blob_files), 0, "Should have at least one blob file")
+
+ # Verify sequence numbers for each blob file
+ for blob_file in blob_files:
+ min_seq = blob_file.min_sequence_number
+ max_seq = blob_file.max_sequence_number
+ row_count = blob_file.row_count
+
+ # Critical assertion: min_seq should NOT equal max_seq when there are multiple rows
+ if row_count > 1:
+ self.assertNotEqual(
+ min_seq, max_seq,
+ f"Sequence numbers should be different for files with
multiple rows. "
+ f"File: {blob_file.file_name}, row_count: {row_count}, "
+ f"min_seq: {min_seq}, max_seq: {max_seq}. "
+ f"This indicates sequence generator was not incremented
for each row."
+ )
+ self.assertEqual(
+ max_seq - min_seq + 1, row_count,
+ f"Sequence number range should match row count. "
+ f"File: {blob_file.file_name}, row_count: {row_count}, "
+ f"min_seq: {min_seq}, max_seq: {max_seq}, "
+ f"expected range: {row_count}, actual range: {max_seq -
min_seq + 1}"
+ )
+ else:
+ # For single row files, min_seq == max_seq is acceptable
+ self.assertEqual(
+ min_seq, max_seq,
+ f"Single row file should have min_seq == max_seq. "
+ f"File: {blob_file.file_name}, min_seq: {min_seq},
max_seq: {max_seq}"
+ )
+
+ # Verify data integrity
+ result = table.new_read_builder().new_read().to_arrow(table.new_read_builder().new_scan().plan().splits())
+ self.assertEqual(result.num_rows, num_blobs)
+ self.assertEqual(result.column('id').to_pylist(), list(range(1, num_blobs + 1)))
+
+ def test_blob_non_descriptor_sequence_number_increment(self):
+ from pypaimon import Schema
+
+ # Create schema with blob column (blob-as-descriptor=false, normal mode)
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('blob_data', pa.large_binary())
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'blob-as-descriptor': 'false' # Normal mode, not descriptor mode
+ })
+
+ self.catalog.create_table('test_db.blob_sequence_non_desc_test', schema, False)
+ table = self.catalog.get_table('test_db.blob_sequence_non_desc_test')
+
+ # Create test data with multiple rows in a batch
+ num_rows = 10
+ test_data = pa.Table.from_pydict({
+ 'id': list(range(1, num_rows + 1)),
+ 'name': [f'item_{i}' for i in range(num_rows)],
+ 'blob_data': [f'blob data {i}'.encode('utf-8') for i in range(num_rows)]
+ }, schema=pa_schema)
+
+ # Write data as a batch (this triggers batch writing in non-descriptor mode)
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.write_arrow(test_data)
+
+ commit_messages = writer.prepare_commit()
+ write_builder.new_commit().commit(commit_messages)
+ writer.close()
+
+ # Extract blob files from commit messages
+ all_files = [f for msg in commit_messages for f in msg.new_files]
+ blob_files = [f for f in all_files if f.file_name.endswith('.blob')]
+
+ # Verify that we have at least one blob file
+ self.assertGreater(len(blob_files), 0, "Should have at least one blob file")
+
+ # Verify sequence numbers for each blob file
+ for blob_file in blob_files:
+ min_seq = blob_file.min_sequence_number
+ max_seq = blob_file.max_sequence_number
+ row_count = blob_file.row_count
+
+ # Critical assertion: min_seq should NOT equal max_seq when there are multiple rows
+ if row_count > 1:
+ self.assertNotEqual(
+ min_seq, max_seq,
+ f"Sequence numbers should be different for files with
multiple rows. "
+ f"File: {blob_file.file_name}, row_count: {row_count}, "
+ f"min_seq: {min_seq}, max_seq: {max_seq}. "
+ f"This indicates sequence generator was not incremented
for each row in batch."
+ )
+ self.assertEqual(
+ max_seq - min_seq + 1, row_count,
+ f"Sequence number range should match row count. "
+ f"File: {blob_file.file_name}, row_count: {row_count}, "
+ f"min_seq: {min_seq}, max_seq: {max_seq}, "
+ f"expected range: {row_count}, actual range: {max_seq -
min_seq + 1}"
+ )
+ else:
+ # For single row files, min_seq == max_seq is acceptable
+ self.assertEqual(
+ min_seq, max_seq,
+ f"Single row file should have min_seq == max_seq. "
+ f"File: {blob_file.file_name}, min_seq: {min_seq},
max_seq: {max_seq}"
+ )
+
+ print("✅ Non-descriptor mode sequence number increment test passed")
+
+ def test_blob_stats_schema_with_custom_column_name(self):
+ from pypaimon import Schema
+
+ # Create schema with blob column using a custom name (not 'blob_data')
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('my_custom_blob', pa.large_binary()) # Custom blob column name
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true'
+ })
+
+ self.catalog.create_table('test_db.blob_custom_name_test', schema, False)
+ table = self.catalog.get_table('test_db.blob_custom_name_test')
+
+ # Write data
+ test_data = pa.Table.from_pydict({
+ 'id': [1, 2, 3],
+ 'name': ['Alice', 'Bob', 'Charlie'],
+ 'my_custom_blob': [b'blob1', b'blob2', b'blob3']
+ }, schema=pa_schema)
+
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.write_arrow(test_data)
+ commit_messages = writer.prepare_commit()
+ write_builder.new_commit().commit(commit_messages)
+ writer.close()
+
+ # Extract blob files from commit messages
+ all_files = [f for msg in commit_messages for f in msg.new_files]
+ blob_files = [f for f in all_files if f.file_name.endswith('.blob')]
+
+ # Verify we have at least one blob file
+ self.assertGreater(len(blob_files), 0, "Should have at least one blob file")
+
+ # Verify that the stats schema uses the actual blob column name, not hardcoded 'blob_data'
+ for blob_file in blob_files:
+ value_stats = blob_file.value_stats
+ if value_stats is not None and value_stats.min_values is not None:
+ # Get the field names from the stats
+ # The stats should use 'my_custom_blob', not 'blob_data'
+ min_values = value_stats.min_values
+ if hasattr(min_values, 'fields') and len(min_values.fields) > 0:
+ # Check if the field name matches the actual blob column name
+ field_name = min_values.fields[0].name
+ self.assertEqual(
+ field_name, 'my_custom_blob',
+ f"Blob stats field name should be 'my_custom_blob'
(actual column name), "
+ f"but got '{field_name}'. This indicates the field
name was hardcoded "
+ f"instead of using the blob_column parameter."
+ )
+
+ # Verify data integrity
+ result = table.new_read_builder().new_read().to_arrow(table.new_read_builder().new_scan().plan().splits())
+ self.assertEqual(result.num_rows, 3)
+ self.assertEqual(result.column('id').to_pylist(), [1, 2, 3])
+ self.assertEqual(result.column('name').to_pylist(), ['Alice', 'Bob', 'Charlie'])
+
+ def test_blob_file_name_format_with_shared_uuid_non_descriptor_mode(self):
+ import random
+ import re
+ from pypaimon import Schema
+
+ # Create schema with blob column (blob-as-descriptor=false)
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('blob_data', pa.large_binary())
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'blob-as-descriptor': 'false', # Non-descriptor mode
+ 'target-file-size': '1MB' # Small target size to trigger multiple rollings
+ })
+
+ self.catalog.create_table('test_db.blob_file_name_test_non_desc', schema, False)
+ table = self.catalog.get_table('test_db.blob_file_name_test_non_desc')
+
+ num_blobs, blob_size = 5, 2 * 1024 * 1024
+ random.seed(123)
+ blob_data_list = []
+ for i in range(num_blobs):
+ blob_data = bytes(bytearray([random.randint(0, 255) for _ in range(blob_size)]))
+ blob_data_list.append(blob_data)
+
+ # Write data in batches to trigger multiple file rollings
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+
+ # Write data that will trigger rolling
+ test_data = pa.Table.from_pydict({
+ 'id': list(range(1, num_blobs + 1)),
+ 'name': [f'item_{i}' for i in range(1, num_blobs + 1)],
+ 'blob_data': blob_data_list
+ }, schema=pa_schema)
+
+ writer.write_arrow(test_data)
+ commit_messages = writer.prepare_commit()
+ write_builder.new_commit().commit(commit_messages)
+ writer.close()
+
+ # Extract blob files from commit messages
+ all_files = [f for msg in commit_messages for f in msg.new_files]
+ blob_files = [f for f in all_files if f.file_name.endswith('.blob')]
+
+ # Should have at least one blob file (may have multiple if rolling occurred)
+ self.assertGreaterEqual(len(blob_files), 1, "Should have at least one blob file")
+
+ # Verify file name format: data-{uuid}-{count}.blob
+ file_name_pattern = re.compile(
+ r'^data-[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-'
+ r'[a-f0-9]{12}-(\d+)\.blob$'
+ )
+
+ first_file_name = blob_files[0].file_name
+ self.assertTrue(
+ file_name_pattern.match(first_file_name),
+ f"File name should match expected format:
data-{{uuid}}-{{count}}.blob, got: {first_file_name}"
+ )
+
+ # Extract UUID and counter from first file name
+ first_match = file_name_pattern.match(first_file_name)
+ first_counter = int(first_match.group(1))
+
+ # Extract UUID (everything between "data-" and last "-")
+ uuid_start = len("data-")
+ uuid_end = first_file_name.rfind('-', uuid_start)
+ shared_uuid = first_file_name[uuid_start:uuid_end]
+
+ # Verify all blob files use the same UUID and have sequential counters
+ for i, blob_file in enumerate(blob_files):
+ file_name = blob_file.file_name
+ match = file_name_pattern.match(file_name)
+
+ self.assertIsNotNone(
+ match,
+ f"File name should match expected format:
data-{{uuid}}-{{count}}.blob, got: {file_name}"
+ )
+
+ counter = int(match.group(1))
+
+ # Extract UUID from this file
+ file_uuid = file_name[uuid_start:file_name.rfind('-', uuid_start)]
+
+ self.assertEqual(
+ file_uuid,
+ shared_uuid,
+ f"All blob files should use the same UUID. Expected:
{shared_uuid}, got: {file_uuid} in {file_name}"
+ )
+
+ self.assertEqual(
+ counter,
+ first_counter + i,
+ f"File counter should be sequential. Expected: {first_counter
+ i}, got: {counter} in {file_name}"
+ )
+
+ # Verify data integrity
+ result = table.new_read_builder().new_read().to_arrow(table.new_read_builder().new_scan().plan().splits())
+ self.assertEqual(result.num_rows, num_blobs)
+ self.assertEqual(result.column('id').to_pylist(), list(range(1, num_blobs + 1)))
+
+ def test_blob_non_descriptor_target_file_size_rolling(self):
+ """Test that blob.target-file-size is respected in non-descriptor
mode."""
+ from pypaimon import Schema
+
+ # Create schema with blob column (non-descriptor mode)
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('blob_data', pa.large_binary()),
+ ])
+
+ # Test with blob.target-file-size set to a small value
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'blob.target-file-size': '1MB' # Set blob-specific target file size
+ }
+ )
+
+ self.catalog.create_table('test_db.blob_non_descriptor_rolling', schema, False)
+ table = self.catalog.get_table('test_db.blob_non_descriptor_rolling')
+
+ # Write multiple blobs that together exceed the target size
+ # Each blob is 0.6MB, so 3 blobs = 1.8MB > 1MB target
+ num_blobs = 3
+ blob_size = int(0.6 * 1024 * 1024) # 0.6MB per blob
+
+ test_data = pa.Table.from_pydict({
+ 'id': list(range(1, num_blobs + 1)),
+ 'blob_data': [b'x' * blob_size for _ in range(num_blobs)]
+ }, schema=pa_schema)
+
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.write_arrow(test_data)
+ commit_messages = writer.prepare_commit()
+ commit = write_builder.new_commit()
+ commit.commit(commit_messages)
+ writer.close()
+
+ # Extract blob files from commit messages
+ all_files = [f for msg in commit_messages for f in msg.new_files]
+ blob_files = [f for f in all_files if f.file_name.endswith('.blob')]
+
+ # The key test: verify that blob.target-file-size is used instead of target-file-size
+ # If target-file-size (default 256MB for append-only) was used, we'd have 1 file
+ # If blob.target-file-size (1MB) is used, we should have multiple files
+ total_data_size = num_blobs * blob_size
+
+ # Verify that the rolling logic used blob_target_file_size (1MB) not target_file_size (256MB)
+ # If target_file_size was used, all data would fit in one file
+ # If blob_target_file_size was used, data should be split
+ if total_data_size > 1024 * 1024: # Total > 1MB
+ self.assertGreater(
+ len(blob_files), 1,
+ f"Should have multiple blob files when using
blob.target-file-size (1MB). "
+ f"Total data size: {total_data_size / 1024 / 1024:.2f}MB, "
+ f"got {len(blob_files)} file(s). "
+ f"This indicates blob.target-file-size was ignored and
target-file-size was used instead."
+ )
+
+ # Verify data integrity
+ result = table.new_read_builder().new_read().to_arrow(
+ table.new_read_builder().new_scan().plan().splits()
+ )
+ self.assertEqual(result.num_rows, num_blobs)
+ self.assertEqual(result.column('id').to_pylist(), list(range(1, num_blobs + 1)))
if __name__ == '__main__':
unittest.main()
diff --git a/paimon-python/pypaimon/write/writer/blob_file_writer.py b/paimon-python/pypaimon/write/writer/blob_file_writer.py
new file mode 100644
index 0000000000..2b3ba1a1af
--- /dev/null
+++ b/paimon-python/pypaimon/write/writer/blob_file_writer.py
@@ -0,0 +1,111 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import pyarrow as pa
+from pathlib import Path
+
+from pypaimon.write.blob_format_writer import BlobFormatWriter
+from pypaimon.table.row.generic_row import GenericRow, RowKind
+from pypaimon.table.row.blob import Blob, BlobData, BlobDescriptor
+from pypaimon.schema.data_types import DataField, PyarrowFieldParser
+
+
+class BlobFileWriter:
+ """
+ Single blob file writer
+ Writes rows one by one and tracks file size.
+ """
+
+ def __init__(self, file_io, file_path: Path, blob_as_descriptor: bool):
+ self.file_io = file_io
+ self.file_path = file_path
+ self.blob_as_descriptor = blob_as_descriptor
+ self.output_stream = file_io.new_output_stream(file_path)
+ self.writer = BlobFormatWriter(self.output_stream)
+ self.row_count = 0
+ self.closed = False
+
+ def write_row(self, row_data: pa.Table):
+ """Write a single row to the blob file."""
+ if row_data.num_rows != 1:
+ raise ValueError(f"Expected 1 row, got {row_data.num_rows}")
+
+ # Convert PyArrow row to GenericRow
+ records_dict = row_data.to_pydict()
+ field_name = row_data.schema[0].name
+ col_data = records_dict[field_name][0]
+
+ # Convert to Blob
+ if self.blob_as_descriptor:
+ # In blob-as-descriptor mode, we need to read external file data
+ # for rolling size calculation (based on external file size)
+ if isinstance(col_data, bytes):
+ blob_descriptor = BlobDescriptor.deserialize(col_data)
+ else:
+ # Handle PyArrow types
+ if hasattr(col_data, 'as_py'):
+ col_data = col_data.as_py()
+ if isinstance(col_data, str):
+ col_data = col_data.encode('utf-8')
+ blob_descriptor = BlobDescriptor.deserialize(col_data)
+ # Read external file data for rolling size calculation
+ uri_reader = self.file_io.uri_reader_factory.create(blob_descriptor.uri)
+ blob_data = Blob.from_descriptor(uri_reader, blob_descriptor)
+ elif isinstance(col_data, bytes):
+ blob_data = BlobData(col_data)
+ else:
+ if hasattr(col_data, 'as_py'):
+ col_data = col_data.as_py()
+ if isinstance(col_data, str):
+ col_data = col_data.encode('utf-8')
+ blob_data = BlobData(col_data)
+
+ # Create GenericRow
+ fields = [DataField(0, field_name, PyarrowFieldParser.to_paimon_type(row_data.schema[0].type, False))]
+ row = GenericRow([blob_data], fields, RowKind.INSERT)
+
+ # Write to blob format writer
+ self.writer.add_element(row)
+ self.row_count += 1
+
+ def reach_target_size(self, suggested_check: bool, target_size: int) -> bool:
+ return self.writer.reach_target_size(suggested_check, target_size)
+
+ def close(self) -> int:
+ if self.closed:
+ return self.file_io.get_file_size(self.file_path)
+
+ self.writer.close()
+ self.closed = True
+
+ # Get actual file size
+ file_size = self.file_io.get_file_size(self.file_path)
+ return file_size
+
+ def abort(self):
+ """Abort the writer and delete the file."""
+ if not self.closed:
+ try:
+ if hasattr(self.output_stream, 'close'):
+ self.output_stream.close()
+ except Exception:
+ pass
+ self.closed = True
+
+ # Delete the file
+ self.file_io.delete_quietly(self.file_path)
diff --git a/paimon-python/pypaimon/write/writer/blob_writer.py b/paimon-python/pypaimon/write/writer/blob_writer.py
index ff153da843..09bce43a3f 100644
--- a/paimon-python/pypaimon/write/writer/blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/blob_writer.py
@@ -17,13 +17,19 @@
################################################################################
import logging
-from typing import Tuple
+import uuid
+import pyarrow as pa
+from pathlib import Path
+from typing import Optional, Tuple
from pypaimon.common.core_options import CoreOptions
from pypaimon.write.writer.append_only_data_writer import AppendOnlyDataWriter
+from pypaimon.write.writer.blob_file_writer import BlobFileWriter
logger = logging.getLogger(__name__)
+CHECK_ROLLING_RECORD_CNT = 1000
+
class BlobWriter(AppendOnlyDataWriter):
@@ -33,11 +39,213 @@ class BlobWriter(AppendOnlyDataWriter):
# Override file format to "blob"
self.file_format = CoreOptions.FILE_FORMAT_BLOB
- logger.info("Initialized BlobWriter with blob file format")
+ # Store blob column name for use in metadata creation
+ self.blob_column = blob_column
+
+ options = self.table.options
+ self.blob_target_file_size = CoreOptions.get_blob_target_file_size(options)
+
+ self.current_writer: Optional[BlobFileWriter] = None
+ self.current_file_path: Optional[Path] = None
+ self.record_count = 0
+
+ self.file_uuid = str(uuid.uuid4())
+ self.file_count = 0
+
+ logger.info(f"Initialized BlobWriter with blob file format,
blob_target_file_size={self.blob_target_file_size}")
+
+ def _check_and_roll_if_needed(self):
+ if self.pending_data is None:
+ return
+
+ if self.blob_as_descriptor:
+ # blob-as-descriptor=true: Write row by row and check actual file size
+ for i in range(self.pending_data.num_rows):
+ row_data = self.pending_data.slice(i, 1)
+ self._write_row_to_file(row_data)
+ self.record_count += 1
+
+ if self.rolling_file(False):
+ self.close_current_writer()
+
+ # All data has been written
+ self.pending_data = None
+ else:
+ # blob-as-descriptor=false: Use blob_target_file_size instead of target_file_size
+ current_size = self.pending_data.nbytes
+ if current_size > self.blob_target_file_size:
+ split_row = self._find_optimal_split_point(self.pending_data, self.blob_target_file_size)
+ if split_row > 0:
+ data_to_write = self.pending_data.slice(0, split_row)
+ remaining_data = self.pending_data.slice(split_row)
+
+ self._write_data_to_file(data_to_write)
+ self.pending_data = remaining_data
+ self._check_and_roll_if_needed()
+
+ def _write_row_to_file(self, row_data: pa.Table):
+ """Write a single row to the current blob file. Opens a new file if
needed."""
+ if row_data.num_rows == 0:
+ return
+
+ if self.current_writer is None:
+ self.open_current_writer()
+
+ self.current_writer.write_row(row_data)
+ # This ensures each row has a unique sequence number for data versioning and consistency
+ self.sequence_generator.next()
+
+ def open_current_writer(self):
+ file_name = f"data-{self.file_uuid}-{self.file_count}.{self.file_format}"
+ self.file_count += 1 # Increment counter for next file
+ file_path = self._generate_file_path(file_name)
+ self.current_file_path = file_path
+ self.current_writer = BlobFileWriter(self.file_io, file_path, self.blob_as_descriptor)
+
+ def rolling_file(self, force_check: bool = False) -> bool:
+ if self.current_writer is None:
+ return False
+
+ should_check = force_check or (self.record_count % CHECK_ROLLING_RECORD_CNT == 0)
+ return self.current_writer.reach_target_size(should_check, self.blob_target_file_size)
+
+ def close_current_writer(self):
+ """Close current writer and create metadata."""
+ if self.current_writer is None:
+ return
+
+ file_size = self.current_writer.close()
+ file_name = self.current_file_path.name
+ row_count = self.current_writer.row_count
+
+ self._add_file_metadata(file_name, self.current_file_path, row_count, file_size)
+
+ self.current_writer = None
+ self.current_file_path = None
+
+ def _write_data_to_file(self, data):
+ """
+ Override for blob format in normal mode (blob-as-descriptor=false).
+ Only difference from parent: use shared UUID + counter for file naming.
+ """
+ if data.num_rows == 0:
+ return
+
+ # This ensures each row gets a unique sequence number, matching the behavior expected
+ for _ in range(data.num_rows):
+ self.sequence_generator.next()
+
+ file_name = f"data-{self.file_uuid}-{self.file_count}.{self.file_format}"
+ self.file_count += 1
+ file_path = self._generate_file_path(file_name)
+
+ # Write blob file (parent class already supports blob format)
+ self.file_io.write_blob(file_path, data, self.blob_as_descriptor)
+
+ file_size = self.file_io.get_file_size(file_path)
+
+ # Reuse _add_file_metadata for consistency (blob table is append-only, no primary keys)
+ self._add_file_metadata(file_name, file_path, data, file_size)
+
+ def _add_file_metadata(self, file_name: str, file_path: Path, data_or_row_count, file_size: int):
+ """Add file metadata to committed_files."""
+ from datetime import datetime
+ from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+ from pypaimon.manifest.schema.simple_stats import SimpleStats
+ from pypaimon.table.row.generic_row import GenericRow
+ from pypaimon.schema.data_types import PyarrowFieldParser
+
+ # Handle both Table and row_count
+ if isinstance(data_or_row_count, pa.Table):
+ data = data_or_row_count
+ row_count = data.num_rows
+ data_fields = PyarrowFieldParser.to_paimon_schema(data.schema)
+ # Compute statistics across all batches, not just the first one
+ # This ensures correct min/max/null_counts when data has multiple batches
+ column_stats = {
+ field.name: self._get_column_stats(data, field.name)
+ for field in data_fields
+ }
+ min_value_stats = [column_stats[field.name]['min_values'] for field in data_fields]
+ max_value_stats = [column_stats[field.name]['max_values'] for field in data_fields]
+ value_null_counts = [column_stats[field.name]['null_counts'] for field in data_fields]
+ else:
+ # row_count only (from BlobFileWriter)
+ row_count = data_or_row_count
+ data_fields = [PyarrowFieldParser.to_paimon_schema(pa.schema([(self.blob_column, pa.large_binary())]))[0]]
+ min_value_stats = [None]
+ max_value_stats = [None]
+ value_null_counts = [0]
+
+ min_seq = self.sequence_generator.current - row_count
+ max_seq = self.sequence_generator.current - 1
+ self.sequence_generator.start = self.sequence_generator.current
+
+ self.committed_files.append(DataFileMeta(
+ file_name=file_name,
+ file_size=file_size,
+ row_count=row_count,
+ min_key=GenericRow([], []),
+ max_key=GenericRow([], []),
+ key_stats=SimpleStats(GenericRow([], []), GenericRow([], []), []),
+ value_stats=SimpleStats(
+ GenericRow(min_value_stats, data_fields),
+ GenericRow(max_value_stats, data_fields),
+ value_null_counts),
+ min_sequence_number=min_seq,
+ max_sequence_number=max_seq,
+ schema_id=self.table.table_schema.id,
+ level=0,
+ extra_files=[],
+ creation_time=datetime.now(),
+ delete_row_count=0,
+ file_source=0, # FileSource.APPEND = 0
+ value_stats_cols=None,
+ external_path=None,
+ first_row_id=None,
+ write_cols=self.write_cols,
+ file_path=str(file_path),
+ ))
+
+ def prepare_commit(self):
+ """Prepare commit, ensuring all data is written."""
+ # Close current file if open (blob-as-descriptor=true mode)
+ if self.current_writer is not None:
+ self.close_current_writer()
+
+ # Call parent to handle pending_data (blob-as-descriptor=false mode)
+ return super().prepare_commit()
+
+ def close(self):
+ """Close current writer if open."""
+ # Close current file if open (blob-as-descriptor=true mode)
+ if self.current_writer is not None:
+ self.close_current_writer()
+
+ # Call parent to handle pending_data (blob-as-descriptor=false mode)
+ super().close()
+
+ def abort(self):
+ if self.current_writer is not None:
+ try:
+ self.current_writer.abort()
+ except Exception as e:
+ logger.warning(f"Error aborting blob writer: {e}", exc_info=e)
+ self.current_writer = None
+ self.current_file_path = None
+ super().abort()
@staticmethod
- def _get_column_stats(record_batch, column_name: str):
- column_array = record_batch.column(column_name)
+ def _get_column_stats(data_or_batch, column_name: str):
+ """
+ Compute column statistics for a column in a Table or RecordBatch.
+ """
+ # Handle both Table and RecordBatch
+ if isinstance(data_or_batch, pa.Table):
+ column_array = data_or_batch.column(column_name)
+ else:
+ column_array = data_or_batch.column(column_name)
+
# For blob data, don't generate min/max values
return {
"min_values": None,
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index 351ff32979..9cbb686441 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -47,7 +47,7 @@ class DataWriter(ABC):
self.trimmed_primary_keys = self.table.trimmed_primary_keys
options = self.table.options
- self.target_file_size = 256 * 1024 * 1024
+ self.target_file_size = CoreOptions.get_target_file_size(options, self.table.is_primary_key_table)
self.file_format = options.get(CoreOptions.FILE_FORMAT, CoreOptions.FILE_FORMAT_PARQUET
if self.bucket != BucketMode.POSTPONE_BUCKET.value