This is an automated email from the ASF dual-hosted git repository.

jerryjing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 50979b1cf1 [python] support rolling for blob write when blob-as-descriptor (#6578)
50979b1cf1 is described below

commit 50979b1cf18446036dbd3a8bd27739763778176c
Author: XiaoHongbo <[email protected]>
AuthorDate: Wed Nov 12 09:26:59 2025 +0800

    [python] support rolling for blob write when blob-as-descriptor (#6578)
---
 .../paimon/append/RollingBlobFileWriterTest.java   | 353 ++++++++++++-
 paimon-python/pypaimon/common/core_options.py      |  18 +
 paimon-python/pypaimon/tests/blob_table_test.py    | 559 +++++++++++++++++++++
 .../pypaimon/write/writer/blob_file_writer.py      | 111 ++++
 paimon-python/pypaimon/write/writer/blob_writer.py | 216 +++++++-
 paimon-python/pypaimon/write/writer/data_writer.py |   2 +-
 6 files changed, 1252 insertions(+), 7 deletions(-)
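
For readers skimming the diff: the new pypaimon behaviour is driven entirely by table options. The sketch below is distilled from the test code in this commit (the table name is a placeholder, not part of the change) and shows the options that enable blob rolling when blob-as-descriptor is on.

    import pyarrow as pa
    from pypaimon import Schema

    pa_schema = pa.schema([('id', pa.int32()), ('blob_data', pa.large_binary())])
    schema = Schema.from_pyarrow_schema(pa_schema, options={
        'row-tracking.enabled': 'true',
        'data-evolution.enabled': 'true',
        'blob-as-descriptor': 'true',       # blob column holds descriptors of external files
        'blob.target-file-size': '1MB',     # roll to a new .blob file once this size is reached
    })
    # catalog.create_table('test_db.example_blob_table', schema, False)
    # Writing more blob payload than the target size then yields several
    # data-{uuid}-{count}.blob files sharing one UUID with sequential counters.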

diff --git a/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
index fb74b5f203..e2e68905c8 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
@@ -81,7 +81,8 @@ public class RollingBlobFileWriterTest {
                 new DataFilePathFactory(
                         new Path(tempDir + "/bucket-0"),
                         "parquet",
-                        "data",
+                        "data-", // dataFilePrefix should include the hyphen to match expected
+                        // format: data-{uuid}-{count}
                         "changelog",
                         false,
                         null,
@@ -192,7 +193,8 @@ public class RollingBlobFileWriterTest {
                         new DataFilePathFactory(
                                 new Path(tempDir + "/blob-size-test"),
                                 "parquet",
-                                "data",
+                                "data-", // dataFilePrefix should include the hyphen to match
+                                // expected format: data-{uuid}-{count}
                                 "changelog",
                                 false,
                                 null,
@@ -261,6 +263,353 @@ public class RollingBlobFileWriterTest {
         results.forEach(file -> assertThat(file.schemaId()).isEqualTo(SCHEMA_ID));
     }
 
+    @Test
+    void testBlobFileNameFormatWithSharedUuid() throws IOException {
+        long blobTargetFileSize = 2 * 1024 * 1024L; // 2 MB for blob files
+
+        RollingBlobFileWriter fileNameTestWriter =
+                new RollingBlobFileWriter(
+                        LocalFileIO.create(),
+                        SCHEMA_ID,
+                        FileFormat.fromIdentifier("parquet", new Options()),
+                        128 * 1024 * 1024,
+                        blobTargetFileSize,
+                        SCHEMA,
+                        pathFactory, // Use the same pathFactory to ensure shared UUID
+                        new LongCounter(),
+                        COMPRESSION,
+                        new StatsCollectorFactories(new CoreOptions(new Options())),
+                        new FileIndexOptions(),
+                        FileSource.APPEND,
+                        false, // asyncFileWrite
+                        false // statsDenseStore
+                        );
+
+        // Create blob data that will trigger rolling
+        byte[] blobData = new byte[1024 * 1024]; // 1 MB blob data
+        new Random(456).nextBytes(blobData);
+
+        // Write enough rows to trigger multiple blob file rollings
+        for (int i = 0; i < 10; i++) {
+            InternalRow row =
+                    GenericRow.of(i, BinaryString.fromString("test-" + i), new BlobData(blobData));
+            fileNameTestWriter.write(row);
+        }
+
+        fileNameTestWriter.close();
+        List<DataFileMeta> results = fileNameTestWriter.result();
+
+        // Filter blob files
+        List<DataFileMeta> blobFiles =
+                results.stream()
+                        .filter(file -> "blob".equals(file.fileFormat()))
+                        .collect(java.util.stream.Collectors.toList());
+
+        assertThat(blobFiles)
+                .as("Should have multiple blob files due to rolling")
+                .hasSizeGreaterThan(1);
+
+        // Extract UUID and counter from file names
+        // Format: data-{uuid}-{count}.blob
+        String firstFileName = blobFiles.get(0).fileName();
+        assertThat(firstFileName)
+                .as("File name should match expected format: data-{uuid}-{count}.blob")
+                .matches(
+                        "data-[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}-\\d+\\.blob");
+
+        // Extract UUID from first file name
+        String uuid = firstFileName.substring(5, firstFileName.lastIndexOf('-'));
+        int firstCounter =
+                Integer.parseInt(
+                        firstFileName.substring(
+                                firstFileName.lastIndexOf('-') + 1,
+                                firstFileName.lastIndexOf('.')));
+
+        // Verify all blob files use the same UUID and have sequential counters
+        for (int i = 0; i < blobFiles.size(); i++) {
+            String fileName = blobFiles.get(i).fileName();
+            String fileUuid = fileName.substring(5, fileName.lastIndexOf('-'));
+            int counter =
+                    Integer.parseInt(
+                            fileName.substring(
+                                    fileName.lastIndexOf('-') + 1, fileName.lastIndexOf('.')));
+
+            assertThat(fileUuid).as("All blob files should use the same UUID").isEqualTo(uuid);
+
+            assertThat(counter)
+                    .as("File counter should be sequential starting from first counter")
+                    .isEqualTo(firstCounter + i);
+        }
+    }
+
+    @Test
+    void testBlobFileNameFormatWithSharedUuidNonDescriptorMode() throws IOException {
+        long blobTargetFileSize = 2 * 1024 * 1024L; // 2 MB for blob files
+
+        RollingBlobFileWriter fileNameTestWriter =
+                new RollingBlobFileWriter(
+                        LocalFileIO.create(),
+                        SCHEMA_ID,
+                        FileFormat.fromIdentifier("parquet", new Options()),
+                        128 * 1024 * 1024,
+                        blobTargetFileSize,
+                        SCHEMA,
+                        pathFactory, // Use the same pathFactory to ensure shared UUID
+                        new LongCounter(),
+                        COMPRESSION,
+                        new StatsCollectorFactories(new CoreOptions(new Options())),
+                        new FileIndexOptions(),
+                        FileSource.APPEND,
+                        false, // asyncFileWrite
+                        false // statsDenseStore
+                        );
+
+        // Create blob data that will trigger rolling (non-descriptor mode: direct blob data)
+        byte[] blobData = new byte[1024 * 1024]; // 1 MB blob data
+        new Random(789).nextBytes(blobData);
+
+        // Write enough rows to trigger multiple blob file rollings
+        for (int i = 0; i < 10; i++) {
+            InternalRow row =
+                    GenericRow.of(i, BinaryString.fromString("test-" + i), new BlobData(blobData));
+            fileNameTestWriter.write(row);
+        }
+
+        fileNameTestWriter.close();
+        List<DataFileMeta> results = fileNameTestWriter.result();
+
+        // Filter blob files
+        List<DataFileMeta> blobFiles =
+                results.stream()
+                        .filter(file -> "blob".equals(file.fileFormat()))
+                        .collect(java.util.stream.Collectors.toList());
+
+        assertThat(blobFiles.size()).as("Should have at least one blob file").isPositive();
+
+        // Extract UUID and counter from file names
+        // Format: data-{uuid}-{count}.blob
+        String firstFileName = blobFiles.get(0).fileName();
+        assertThat(firstFileName)
+                .as("File name should match expected format: data-{uuid}-{count}.blob")
+                .matches(
+                        "data-[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}-\\d+\\.blob");
+
+        // Extract UUID from first file name
+        String uuid = firstFileName.substring(5, firstFileName.lastIndexOf('-'));
+        int firstCounter =
+                Integer.parseInt(
+                        firstFileName.substring(
+                                firstFileName.lastIndexOf('-') + 1,
+                                firstFileName.lastIndexOf('.')));
+
+        // Verify all blob files use the same UUID and have sequential counters
+        for (int i = 0; i < blobFiles.size(); i++) {
+            String fileName = blobFiles.get(i).fileName();
+            String fileUuid = fileName.substring(5, fileName.lastIndexOf('-'));
+            int counter =
+                    Integer.parseInt(
+                            fileName.substring(
+                                    fileName.lastIndexOf('-') + 1, fileName.lastIndexOf('.')));
+
+            assertThat(fileUuid).as("All blob files should use the same UUID").isEqualTo(uuid);
+
+            assertThat(counter)
+                    .as("File counter should be sequential starting from first counter")
+                    .isEqualTo(firstCounter + i);
+        }
+    }
+
+    @Test
+    void testSequenceNumberIncrementInBlobAsDescriptorMode() throws IOException {
+        // Write multiple rows to trigger one-by-one writing in blob-as-descriptor mode
+        int numRows = 10;
+        for (int i = 0; i < numRows; i++) {
+            InternalRow row =
+                    GenericRow.of(
+                            i, BinaryString.fromString("test" + i), new BlobData(testBlobData));
+            writer.write(row);
+        }
+
+        writer.close();
+        List<DataFileMeta> metasResult = writer.result();
+
+        // Extract blob files (skip the first normal file)
+        List<DataFileMeta> blobFiles =
+                metasResult.stream()
+                        .filter(f -> f.fileFormat().equals("blob"))
+                        .collect(java.util.stream.Collectors.toList());
+
+        assertThat(blobFiles).as("Should have at least one blob file").isNotEmpty();
+
+        // Verify sequence numbers for each blob file
+        for (DataFileMeta blobFile : blobFiles) {
+            long minSeq = blobFile.minSequenceNumber();
+            long maxSeq = blobFile.maxSequenceNumber();
+            long rowCount = blobFile.rowCount();
+
+            // Critical assertion: min_seq should NOT equal max_seq when there are multiple rows
+            if (rowCount > 1) {
+                assertThat(minSeq)
+                        .as(
+                                "Sequence numbers should be different for files with multiple rows. "
+                                        + "File: %s, row_count: %d, min_seq: %d, max_seq: %d. "
+                                        + "This indicates sequence generator was not incremented for each row.",
+                                blobFile.fileName(), rowCount, minSeq, maxSeq)
+                        .isNotEqualTo(maxSeq);
+
+                // Verify that max_seq - min_seq + 1 equals row_count
+                // (each row should have a unique sequence number)
+                assertThat(maxSeq - minSeq + 1)
+                        .as(
+                                "Sequence number range should match row count. "
+                                        + "File: %s, row_count: %d, min_seq: %d, max_seq: %d, "
+                                        + "expected range: %d, actual range: %d",
+                                blobFile.fileName(),
+                                rowCount,
+                                minSeq,
+                                maxSeq,
+                                rowCount,
+                                maxSeq - minSeq + 1)
+                        .isEqualTo(rowCount);
+            } else {
+                // For single row files, min_seq == max_seq is acceptable
+                assertThat(minSeq)
+                        .as(
+                                "Single row file should have min_seq == max_seq. "
+                                        + "File: %s, min_seq: %d, max_seq: %d",
+                                blobFile.fileName(), minSeq, maxSeq)
+                        .isEqualTo(maxSeq);
+            }
+        }
+
+        // Verify total record count
+        assertThat(writer.recordCount()).isEqualTo(numRows);
+    }
+
+    @Test
+    void testSequenceNumberIncrementInNonDescriptorMode() throws IOException {
+        // Write multiple rows as a batch to trigger batch writing in non-descriptor mode
+        // (blob-as-descriptor=false, which is the default)
+        int numRows = 10;
+        for (int i = 0; i < numRows; i++) {
+            InternalRow row =
+                    GenericRow.of(
+                            i, BinaryString.fromString("test" + i), new BlobData(testBlobData));
+            writer.write(row);
+        }
+
+        writer.close();
+        List<DataFileMeta> metasResult = writer.result();
+
+        // Extract blob files (skip the first normal file)
+        List<DataFileMeta> blobFiles =
+                metasResult.stream()
+                        .filter(f -> f.fileFormat().equals("blob"))
+                        .collect(java.util.stream.Collectors.toList());
+
+        assertThat(blobFiles).as("Should have at least one blob file").isNotEmpty();
+
+        // Verify sequence numbers for each blob file
+        for (DataFileMeta blobFile : blobFiles) {
+            long minSeq = blobFile.minSequenceNumber();
+            long maxSeq = blobFile.maxSequenceNumber();
+            long rowCount = blobFile.rowCount();
+
+            // Critical assertion: min_seq should NOT equal max_seq when there are multiple rows
+            if (rowCount > 1) {
+                assertThat(minSeq)
+                        .as(
+                                "Sequence numbers should be different for files with multiple rows. "
+                                        + "File: %s, row_count: %d, min_seq: %d, max_seq: %d. "
+                                        + "This indicates sequence generator was not incremented for each row in batch.",
+                                blobFile.fileName(), rowCount, minSeq, maxSeq)
+                        .isNotEqualTo(maxSeq);
+
+                // Verify that max_seq - min_seq + 1 equals row_count
+                // (each row should have a unique sequence number)
+                assertThat(maxSeq - minSeq + 1)
+                        .as(
+                                "Sequence number range should match row count. "
+                                        + "File: %s, row_count: %d, min_seq: %d, max_seq: %d, "
+                                        + "expected range: %d, actual range: %d",
+                                blobFile.fileName(),
+                                rowCount,
+                                minSeq,
+                                maxSeq,
+                                rowCount,
+                                maxSeq - minSeq + 1)
+                        .isEqualTo(rowCount);
+            } else {
+                // For single row files, min_seq == max_seq is acceptable
+                assertThat(minSeq)
+                        .as(
+                                "Single row file should have min_seq == max_seq. "
+                                        + "File: %s, min_seq: %d, max_seq: %d",
+                                blobFile.fileName(), minSeq, maxSeq)
+                        .isEqualTo(maxSeq);
+            }
+        }
+
+        // Verify total record count
+        assertThat(writer.recordCount()).isEqualTo(numRows);
+    }
+
+    @Test
+    void testBlobStatsSchemaWithCustomColumnName() throws IOException {
+        RowType customSchema =
+                RowType.builder()
+                        .field("id", DataTypes.INT())
+                        .field("name", DataTypes.STRING())
+                        .field("my_custom_blob", DataTypes.BLOB()) // Custom blob column name
+                        .build();
+
+        // Reinitialize writer with custom schema
+        writer =
+                new RollingBlobFileWriter(
+                        LocalFileIO.create(),
+                        SCHEMA_ID,
+                        FileFormat.fromIdentifier("parquet", new Options()),
+                        TARGET_FILE_SIZE,
+                        TARGET_FILE_SIZE,
+                        customSchema, // Use custom schema
+                        pathFactory,
+                        seqNumCounter,
+                        COMPRESSION,
+                        new StatsCollectorFactories(new CoreOptions(new Options())),
+                        new FileIndexOptions(),
+                        FileSource.APPEND,
+                        false, // asyncFileWrite
+                        false // statsDenseStore
+                        );
+
+        // Write data
+        for (int i = 0; i < 3; i++) {
+            InternalRow row =
+                    GenericRow.of(
+                            i, BinaryString.fromString("test" + i), new BlobData(testBlobData));
+            writer.write(row);
+        }
+
+        writer.close();
+        List<DataFileMeta> metasResult = writer.result();
+
+        // Extract blob files
+        List<DataFileMeta> blobFiles =
+                metasResult.stream()
+                        .filter(f -> f.fileFormat().equals("blob"))
+                        .collect(java.util.stream.Collectors.toList());
+
+        assertThat(blobFiles).as("Should have at least one blob file").isNotEmpty();
+
+        for (DataFileMeta blobFile : blobFiles) {
+            assertThat(blobFile.fileName()).endsWith(".blob");
+            assertThat(blobFile.rowCount()).isGreaterThan(0);
+        }
+
+        // Verify total record count
+        assertThat(writer.recordCount()).isEqualTo(3);
+    }
+
     /** Simple implementation of BundleRecords for testing. */
     private static class TestBundleRecords implements BundleRecords {
         private final List<InternalRow> rows;
diff --git a/paimon-python/pypaimon/common/core_options.py b/paimon-python/pypaimon/common/core_options.py
index d6643399bb..87b8e9034a 100644
--- a/paimon-python/pypaimon/common/core_options.py
+++ b/paimon-python/pypaimon/common/core_options.py
@@ -47,6 +47,8 @@ class CoreOptions(str, Enum):
     FILE_FORMAT_PER_LEVEL = "file.format.per.level"
     FILE_BLOCK_SIZE = "file.block-size"
     FILE_BLOB_AS_DESCRIPTOR = "blob-as-descriptor"
+    TARGET_FILE_SIZE = "target-file-size"
+    BLOB_TARGET_FILE_SIZE = "blob.target-file-size"
     # Scan options
     SCAN_FALLBACK_BRANCH = "scan.fallback-branch"
     INCREMENTAL_BETWEEN_TIMESTAMP = "incremental-between-timestamp"
@@ -76,3 +78,19 @@ class CoreOptions(str, Enum):
             cost_str = options[CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST]
             return MemorySize.parse(cost_str).get_bytes()
         return MemorySize.of_mebi_bytes(4).get_bytes()
+
+    @staticmethod
+    def get_target_file_size(options: dict, has_primary_key: bool = False) -> int:
+        """Get target file size from options, default to 128MB for primary key table, 256MB for append-only table."""
+        if CoreOptions.TARGET_FILE_SIZE in options:
+            size_str = options[CoreOptions.TARGET_FILE_SIZE]
+            return MemorySize.parse(size_str).get_bytes()
+        return MemorySize.of_mebi_bytes(128 if has_primary_key else 256).get_bytes()
+
+    @staticmethod
+    def get_blob_target_file_size(options: dict) -> int:
+        """Get blob target file size from options, default to target-file-size (256MB for append-only table)."""
+        if CoreOptions.BLOB_TARGET_FILE_SIZE in options:
+            size_str = options[CoreOptions.BLOB_TARGET_FILE_SIZE]
+            return MemorySize.parse(size_str).get_bytes()
+        return CoreOptions.get_target_file_size(options, has_primary_key=False)
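
As a quick illustration of the resolution order implemented by the two helpers above (a sketch only; the values follow the docstrings, and the plain options dict mirrors how the writers pass table options):

    from pypaimon.common.core_options import CoreOptions

    opts = {}
    CoreOptions.get_target_file_size(opts)          # 256 MB default for append-only tables
    CoreOptions.get_target_file_size(opts, True)    # 128 MB default for primary-key tables
    CoreOptions.get_blob_target_file_size(opts)     # no blob override -> falls back to 256 MB

    opts = {'blob.target-file-size': '1MB'}
    CoreOptions.get_blob_target_file_size(opts)     # explicit override -> 1 MB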
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py
index 1b76499efb..3a2eee98fb 100644
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -1518,6 +1518,565 @@ class DataBlobWriterTest(unittest.TestCase):
                 [RowKind.INSERT, RowKind.UPDATE_BEFORE, RowKind.UPDATE_AFTER, RowKind.DELETE],
                 f"Row {row_id}: RowKind should be valid")
 
+    def test_blob_as_descriptor_target_file_size_rolling(self):
+        import random
+        import os
+        from pypaimon import Schema
+        from pypaimon.table.row.blob import BlobDescriptor
+
+        pa_schema = pa.schema([('id', pa.int32()), ('name', pa.string()), ('blob_data', pa.large_binary())])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true',
+            'blob-as-descriptor': 'true', 'target-file-size': '1MB'
+        })
+        self.catalog.create_table('test_db.blob_target_size_test', schema, False)
+        table = self.catalog.get_table('test_db.blob_target_size_test')
+
+        # Create 5 external blob files (2MB each, > target-file-size)
+        num_blobs, blob_size = 5, 2 * 1024 * 1024
+        random.seed(42)
+        descriptors = []
+        for i in range(num_blobs):
+            path = os.path.join(self.temp_dir, f'external_blob_{i}')
+            data = bytes(bytearray([random.randint(0, 255) for _ in range(blob_size)]))
+            with open(path, 'wb') as f:
+                f.write(data)
+            descriptors.append(BlobDescriptor(path, 0, len(data)))
+
+        # Write data
+        test_data = pa.Table.from_pydict({
+            'id': list(range(1, num_blobs + 1)),
+            'name': [f'item_{i}' for i in range(1, num_blobs + 1)],
+            'blob_data': [d.serialize() for d in descriptors]
+        }, schema=pa_schema)
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(test_data)
+        commit_messages = writer.prepare_commit()
+        write_builder.new_commit().commit(commit_messages)
+        writer.close()
+
+        # Check blob files
+        all_files = [f for msg in commit_messages for f in msg.new_files]
+        blob_files = [f for f in all_files if f.file_name.endswith('.blob')]
+        total_size = sum(f.file_size for f in blob_files)
+
+        # Verify that rolling works correctly: should have multiple files when total size exceeds target
+        # Each blob is 2MB, target-file-size is 1MB, so should have multiple files
+        self.assertGreater(
+            len(blob_files), 1,
+            f"Should have multiple blob files when total size ({total_size / 1024 / 1024:.2f}MB) exceeds target (1MB), "
+            f"but got {len(blob_files)} file(s)"
+        )
+
+        # Verify data integrity
+        result = table.new_read_builder().new_read().to_arrow(table.new_read_builder().new_scan().plan().splits())
+        self.assertEqual(result.num_rows, num_blobs)
+        self.assertEqual(result.column('id').to_pylist(), list(range(1, num_blobs + 1)))
+
+    def test_blob_file_name_format_with_shared_uuid(self):
+        import random
+        import re
+        from pypaimon import Schema
+        from pypaimon.table.row.blob import BlobDescriptor
+
+        # Create schema with blob column
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('blob_data', pa.large_binary())
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'blob-as-descriptor': 'true',
+            'blob.target-file-size': '1MB'  # Small target size to trigger multiple rollings
+        })
+
+        self.catalog.create_table('test_db.blob_file_name_test', schema, False)
+        table = self.catalog.get_table('test_db.blob_file_name_test')
+
+        # Create multiple external blob files (2MB each, > target-file-size)
+        # This will trigger multiple blob file rollings
+        num_blobs, blob_size = 5, 2 * 1024 * 1024
+        random.seed(789)
+        descriptors = []
+        for i in range(num_blobs):
+            path = os.path.join(self.temp_dir, f'external_blob_{i}')
+            data = bytes(bytearray([random.randint(0, 255) for _ in range(blob_size)]))
+            with open(path, 'wb') as f:
+                f.write(data)
+            descriptors.append(BlobDescriptor(path, 0, len(data)))
+
+        # Write data
+        test_data = pa.Table.from_pydict({
+            'id': list(range(1, num_blobs + 1)),
+            'name': [f'item_{i}' for i in range(1, num_blobs + 1)],
+            'blob_data': [d.serialize() for d in descriptors]
+        }, schema=pa_schema)
+
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(test_data)
+        commit_messages = writer.prepare_commit()
+        write_builder.new_commit().commit(commit_messages)
+        writer.close()
+
+        # Extract blob files from commit messages
+        all_files = [f for msg in commit_messages for f in msg.new_files]
+        blob_files = [f for f in all_files if f.file_name.endswith('.blob')]
+
+        # Should have multiple blob files due to rolling
+        self.assertGreater(len(blob_files), 1, "Should have multiple blob files due to rolling")
+
+        # Verify file name format: data-{uuid}-{count}.blob
+        file_name_pattern = re.compile(
+            r'^data-[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-'
+            r'[a-f0-9]{12}-(\d+)\.blob$'
+        )
+
+        first_file_name = blob_files[0].file_name
+        self.assertTrue(
+            file_name_pattern.match(first_file_name),
+            f"File name should match expected format: data-{{uuid}}-{{count}}.blob, got: {first_file_name}"
+        )
+
+        first_match = file_name_pattern.match(first_file_name)
+        first_counter = int(first_match.group(1))
+
+        # Extract UUID (everything between "data-" and last "-")
+        uuid_start = len("data-")
+        uuid_end = first_file_name.rfind('-', uuid_start)
+        shared_uuid = first_file_name[uuid_start:uuid_end]
+
+        # Verify all blob files use the same UUID and have sequential counters
+        for i, blob_file in enumerate(blob_files):
+            file_name = blob_file.file_name
+            match = file_name_pattern.match(file_name)
+
+            self.assertIsNotNone(
+                match,
+                f"File name should match expected format: data-{{uuid}}-{{count}}.blob, got: {file_name}"
+            )
+
+            counter = int(match.group(1))
+
+            # Extract UUID from this file
+            file_uuid = file_name[uuid_start:file_name.rfind('-', uuid_start)]
+
+            self.assertEqual(
+                file_uuid,
+                shared_uuid,
+                f"All blob files should use the same UUID. Expected: {shared_uuid}, got: {file_uuid} in {file_name}"
+            )
+
+            self.assertEqual(
+                counter,
+                first_counter + i,
+                f"File counter should be sequential. Expected: {first_counter + i}, got: {counter} in {file_name}"
+            )
+
+        # Verify data integrity
+        result = table.new_read_builder().new_read().to_arrow(table.new_read_builder().new_scan().plan().splits())
+        self.assertEqual(result.num_rows, num_blobs)
+        self.assertEqual(result.column('id').to_pylist(), list(range(1, num_blobs + 1)))
+
+    def test_blob_as_descriptor_sequence_number_increment(self):
+        import os
+        from pypaimon import Schema
+        from pypaimon.table.row.blob import BlobDescriptor
+
+        # Create schema with blob column (blob-as-descriptor=true)
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('blob_data', pa.large_binary())
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'blob-as-descriptor': 'true'
+        })
+
+        self.catalog.create_table('test_db.blob_sequence_test', schema, False)
+        table = self.catalog.get_table('test_db.blob_sequence_test')
+
+        # Create multiple external blob files
+        num_blobs = 10
+        descriptors = []
+        for i in range(num_blobs):
+            path = os.path.join(self.temp_dir, f'external_blob_seq_{i}')
+            data = f"blob data {i}".encode('utf-8')
+            with open(path, 'wb') as f:
+                f.write(data)
+            descriptors.append(BlobDescriptor(path, 0, len(data)))
+
+        # Write data row by row (this triggers the one-by-one writing in blob-as-descriptor mode)
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+
+        # Write each row separately to ensure one-by-one writing
+        for i in range(num_blobs):
+            test_data = pa.Table.from_pydict({
+                'id': [i + 1],
+                'name': [f'item_{i}'],
+                'blob_data': [descriptors[i].serialize()]
+            }, schema=pa_schema)
+            writer.write_arrow(test_data)
+
+        commit_messages = writer.prepare_commit()
+        write_builder.new_commit().commit(commit_messages)
+        writer.close()
+
+        # Extract blob files from commit messages
+        all_files = [f for msg in commit_messages for f in msg.new_files]
+        blob_files = [f for f in all_files if f.file_name.endswith('.blob')]
+
+        # Verify that we have at least one blob file
+        self.assertGreater(len(blob_files), 0, "Should have at least one blob file")
+
+        # Verify sequence numbers for each blob file
+        for blob_file in blob_files:
+            min_seq = blob_file.min_sequence_number
+            max_seq = blob_file.max_sequence_number
+            row_count = blob_file.row_count
+
+            # Critical assertion: min_seq should NOT equal max_seq when there are multiple rows
+            if row_count > 1:
+                self.assertNotEqual(
+                    min_seq, max_seq,
+                    f"Sequence numbers should be different for files with multiple rows. "
+                    f"File: {blob_file.file_name}, row_count: {row_count}, "
+                    f"min_seq: {min_seq}, max_seq: {max_seq}. "
+                    f"This indicates sequence generator was not incremented for each row."
+                )
+                self.assertEqual(
+                    max_seq - min_seq + 1, row_count,
+                    f"Sequence number range should match row count. "
+                    f"File: {blob_file.file_name}, row_count: {row_count}, "
+                    f"min_seq: {min_seq}, max_seq: {max_seq}, "
+                    f"expected range: {row_count}, actual range: {max_seq - min_seq + 1}"
+                )
+            else:
+                # For single row files, min_seq == max_seq is acceptable
+                self.assertEqual(
+                    min_seq, max_seq,
+                    f"Single row file should have min_seq == max_seq. "
+                    f"File: {blob_file.file_name}, min_seq: {min_seq}, max_seq: {max_seq}"
+                )
+
+        # Verify data integrity
+        result = table.new_read_builder().new_read().to_arrow(table.new_read_builder().new_scan().plan().splits())
+        self.assertEqual(result.num_rows, num_blobs)
+        self.assertEqual(result.column('id').to_pylist(), list(range(1, num_blobs + 1)))
+
+    def test_blob_non_descriptor_sequence_number_increment(self):
+        from pypaimon import Schema
+
+        # Create schema with blob column (blob-as-descriptor=false, normal mode)
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('blob_data', pa.large_binary())
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'blob-as-descriptor': 'false'  # Normal mode, not descriptor mode
+        })
+
+        self.catalog.create_table('test_db.blob_sequence_non_desc_test', schema, False)
+        table = self.catalog.get_table('test_db.blob_sequence_non_desc_test')
+
+        # Create test data with multiple rows in a batch
+        num_rows = 10
+        test_data = pa.Table.from_pydict({
+            'id': list(range(1, num_rows + 1)),
+            'name': [f'item_{i}' for i in range(num_rows)],
+            'blob_data': [f'blob data {i}'.encode('utf-8') for i in range(num_rows)]
+        }, schema=pa_schema)
+
+        # Write data as a batch (this triggers batch writing in non-descriptor mode)
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(test_data)
+
+        commit_messages = writer.prepare_commit()
+        write_builder.new_commit().commit(commit_messages)
+        writer.close()
+
+        # Extract blob files from commit messages
+        all_files = [f for msg in commit_messages for f in msg.new_files]
+        blob_files = [f for f in all_files if f.file_name.endswith('.blob')]
+
+        # Verify that we have at least one blob file
+        self.assertGreater(len(blob_files), 0, "Should have at least one blob file")
+
+        # Verify sequence numbers for each blob file
+        for blob_file in blob_files:
+            min_seq = blob_file.min_sequence_number
+            max_seq = blob_file.max_sequence_number
+            row_count = blob_file.row_count
+
+            # Critical assertion: min_seq should NOT equal max_seq when there are multiple rows
+            if row_count > 1:
+                self.assertNotEqual(
+                    min_seq, max_seq,
+                    f"Sequence numbers should be different for files with multiple rows. "
+                    f"File: {blob_file.file_name}, row_count: {row_count}, "
+                    f"min_seq: {min_seq}, max_seq: {max_seq}. "
+                    f"This indicates sequence generator was not incremented for each row in batch."
+                )
+                self.assertEqual(
+                    max_seq - min_seq + 1, row_count,
+                    f"Sequence number range should match row count. "
+                    f"File: {blob_file.file_name}, row_count: {row_count}, "
+                    f"min_seq: {min_seq}, max_seq: {max_seq}, "
+                    f"expected range: {row_count}, actual range: {max_seq - min_seq + 1}"
+                )
+            else:
+                # For single row files, min_seq == max_seq is acceptable
+                self.assertEqual(
+                    min_seq, max_seq,
+                    f"Single row file should have min_seq == max_seq. "
+                    f"File: {blob_file.file_name}, min_seq: {min_seq}, max_seq: {max_seq}"
+                )
+
+        print("✅ Non-descriptor mode sequence number increment test passed")
+
+    def test_blob_stats_schema_with_custom_column_name(self):
+        from pypaimon import Schema
+
+        # Create schema with blob column using a custom name (not 'blob_data')
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('my_custom_blob', pa.large_binary())  # Custom blob column name
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true'
+        })
+
+        self.catalog.create_table('test_db.blob_custom_name_test', schema, False)
+        table = self.catalog.get_table('test_db.blob_custom_name_test')
+
+        # Write data
+        test_data = pa.Table.from_pydict({
+            'id': [1, 2, 3],
+            'name': ['Alice', 'Bob', 'Charlie'],
+            'my_custom_blob': [b'blob1', b'blob2', b'blob3']
+        }, schema=pa_schema)
+
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(test_data)
+        commit_messages = writer.prepare_commit()
+        write_builder.new_commit().commit(commit_messages)
+        writer.close()
+
+        # Extract blob files from commit messages
+        all_files = [f for msg in commit_messages for f in msg.new_files]
+        blob_files = [f for f in all_files if f.file_name.endswith('.blob')]
+
+        # Verify we have at least one blob file
+        self.assertGreater(len(blob_files), 0, "Should have at least one blob file")
+
+        # Verify that the stats schema uses the actual blob column name, not hardcoded 'blob_data'
+        for blob_file in blob_files:
+            value_stats = blob_file.value_stats
+            if value_stats is not None and value_stats.min_values is not None:
+                # Get the field names from the stats
+                # The stats should use 'my_custom_blob', not 'blob_data'
+                min_values = value_stats.min_values
+                if hasattr(min_values, 'fields') and len(min_values.fields) > 0:
+                    # Check if the field name matches the actual blob column name
+                    field_name = min_values.fields[0].name
+                    self.assertEqual(
+                        field_name, 'my_custom_blob',
+                        f"Blob stats field name should be 'my_custom_blob' (actual column name), "
+                        f"but got '{field_name}'. This indicates the field name was hardcoded "
+                        f"instead of using the blob_column parameter."
+                    )
+
+        # Verify data integrity
+        result = table.new_read_builder().new_read().to_arrow(table.new_read_builder().new_scan().plan().splits())
+        self.assertEqual(result.num_rows, 3)
+        self.assertEqual(result.column('id').to_pylist(), [1, 2, 3])
+        self.assertEqual(result.column('name').to_pylist(), ['Alice', 'Bob', 'Charlie'])
+
+    def test_blob_file_name_format_with_shared_uuid_non_descriptor_mode(self):
+        import random
+        import re
+        from pypaimon import Schema
+
+        # Create schema with blob column (blob-as-descriptor=false)
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('blob_data', pa.large_binary())
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'blob-as-descriptor': 'false',  # Non-descriptor mode
+            'target-file-size': '1MB'  # Small target size to trigger multiple rollings
+        })
+
+        self.catalog.create_table('test_db.blob_file_name_test_non_desc', schema, False)
+        table = self.catalog.get_table('test_db.blob_file_name_test_non_desc')
+
+        num_blobs, blob_size = 5, 2 * 1024 * 1024
+        random.seed(123)
+        blob_data_list = []
+        for i in range(num_blobs):
+            blob_data = bytes(bytearray([random.randint(0, 255) for _ in range(blob_size)]))
+            blob_data_list.append(blob_data)
+
+        # Write data in batches to trigger multiple file rollings
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+
+        # Write data that will trigger rolling
+        test_data = pa.Table.from_pydict({
+            'id': list(range(1, num_blobs + 1)),
+            'name': [f'item_{i}' for i in range(1, num_blobs + 1)],
+            'blob_data': blob_data_list
+        }, schema=pa_schema)
+
+        writer.write_arrow(test_data)
+        commit_messages = writer.prepare_commit()
+        write_builder.new_commit().commit(commit_messages)
+        writer.close()
+
+        # Extract blob files from commit messages
+        all_files = [f for msg in commit_messages for f in msg.new_files]
+        blob_files = [f for f in all_files if f.file_name.endswith('.blob')]
+
+        # Should have at least one blob file (may have multiple if rolling occurred)
+        self.assertGreaterEqual(len(blob_files), 1, "Should have at least one blob file")
+
+        # Verify file name format: data-{uuid}-{count}.blob
+        file_name_pattern = re.compile(
+            r'^data-[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-'
+            r'[a-f0-9]{12}-(\d+)\.blob$'
+        )
+
+        first_file_name = blob_files[0].file_name
+        self.assertTrue(
+            file_name_pattern.match(first_file_name),
+            f"File name should match expected format: data-{{uuid}}-{{count}}.blob, got: {first_file_name}"
+        )
+
+        # Extract UUID and counter from first file name
+        first_match = file_name_pattern.match(first_file_name)
+        first_counter = int(first_match.group(1))
+
+        # Extract UUID (everything between "data-" and last "-")
+        uuid_start = len("data-")
+        uuid_end = first_file_name.rfind('-', uuid_start)
+        shared_uuid = first_file_name[uuid_start:uuid_end]
+
+        # Verify all blob files use the same UUID and have sequential counters
+        for i, blob_file in enumerate(blob_files):
+            file_name = blob_file.file_name
+            match = file_name_pattern.match(file_name)
+
+            self.assertIsNotNone(
+                match,
+                f"File name should match expected format: data-{{uuid}}-{{count}}.blob, got: {file_name}"
+            )
+
+            counter = int(match.group(1))
+
+            # Extract UUID from this file
+            file_uuid = file_name[uuid_start:file_name.rfind('-', uuid_start)]
+
+            self.assertEqual(
+                file_uuid,
+                shared_uuid,
+                f"All blob files should use the same UUID. Expected: {shared_uuid}, got: {file_uuid} in {file_name}"
+            )
+
+            self.assertEqual(
+                counter,
+                first_counter + i,
+                f"File counter should be sequential. Expected: {first_counter + i}, got: {counter} in {file_name}"
+            )
+
+        # Verify data integrity
+        result = table.new_read_builder().new_read().to_arrow(table.new_read_builder().new_scan().plan().splits())
+        self.assertEqual(result.num_rows, num_blobs)
+        self.assertEqual(result.column('id').to_pylist(), list(range(1, num_blobs + 1)))
+
+    def test_blob_non_descriptor_target_file_size_rolling(self):
+        """Test that blob.target-file-size is respected in non-descriptor mode."""
+        from pypaimon import Schema
+
+        # Create schema with blob column (non-descriptor mode)
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('blob_data', pa.large_binary()),
+        ])
+
+        # Test with blob.target-file-size set to a small value
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true',
+                'blob.target-file-size': '1MB'  # Set blob-specific target file size
+            }
+        )
+
+        self.catalog.create_table('test_db.blob_non_descriptor_rolling', schema, False)
+        table = self.catalog.get_table('test_db.blob_non_descriptor_rolling')
+
+        # Write multiple blobs that together exceed the target size
+        # Each blob is 0.6MB, so 3 blobs = 1.8MB > 1MB target
+        num_blobs = 3
+        blob_size = int(0.6 * 1024 * 1024)  # 0.6MB per blob
+
+        test_data = pa.Table.from_pydict({
+            'id': list(range(1, num_blobs + 1)),
+            'blob_data': [b'x' * blob_size for _ in range(num_blobs)]
+        }, schema=pa_schema)
+
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(test_data)
+        commit_messages = writer.prepare_commit()
+        commit = write_builder.new_commit()
+        commit.commit(commit_messages)
+        writer.close()
+
+        # Extract blob files from commit messages
+        all_files = [f for msg in commit_messages for f in msg.new_files]
+        blob_files = [f for f in all_files if f.file_name.endswith('.blob')]
+
+        # The key test: verify that blob.target-file-size is used instead of target-file-size
+        # If target-file-size (default 256MB for append-only) was used, we'd have 1 file
+        # If blob.target-file-size (1MB) is used, we should have multiple files
+        total_data_size = num_blobs * blob_size
+
+        # Verify that the rolling logic used blob_target_file_size (1MB) not target_file_size (256MB)
+        # If target_file_size was used, all data would fit in one file
+        # If blob_target_file_size was used, data should be split
+        if total_data_size > 1024 * 1024:  # Total > 1MB
+            self.assertGreater(
+                len(blob_files), 1,
+                f"Should have multiple blob files when using blob.target-file-size (1MB). "
+                f"Total data size: {total_data_size / 1024 / 1024:.2f}MB, "
+                f"got {len(blob_files)} file(s). "
+                f"This indicates blob.target-file-size was ignored and target-file-size was used instead."
+            )
+
+        # Verify data integrity
+        result = table.new_read_builder().new_read().to_arrow(
+            table.new_read_builder().new_scan().plan().splits()
+        )
+        self.assertEqual(result.num_rows, num_blobs)
+        self.assertEqual(result.column('id').to_pylist(), list(range(1, num_blobs + 1)))
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/paimon-python/pypaimon/write/writer/blob_file_writer.py b/paimon-python/pypaimon/write/writer/blob_file_writer.py
new file mode 100644
index 0000000000..2b3ba1a1af
--- /dev/null
+++ b/paimon-python/pypaimon/write/writer/blob_file_writer.py
@@ -0,0 +1,111 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+import pyarrow as pa
+from pathlib import Path
+
+from pypaimon.write.blob_format_writer import BlobFormatWriter
+from pypaimon.table.row.generic_row import GenericRow, RowKind
+from pypaimon.table.row.blob import Blob, BlobData, BlobDescriptor
+from pypaimon.schema.data_types import DataField, PyarrowFieldParser
+
+
+class BlobFileWriter:
+    """
+    Single blob file writer
+    Writes rows one by one and tracks file size.
+    """
+
+    def __init__(self, file_io, file_path: Path, blob_as_descriptor: bool):
+        self.file_io = file_io
+        self.file_path = file_path
+        self.blob_as_descriptor = blob_as_descriptor
+        self.output_stream = file_io.new_output_stream(file_path)
+        self.writer = BlobFormatWriter(self.output_stream)
+        self.row_count = 0
+        self.closed = False
+
+    def write_row(self, row_data: pa.Table):
+        """Write a single row to the blob file."""
+        if row_data.num_rows != 1:
+            raise ValueError(f"Expected 1 row, got {row_data.num_rows}")
+
+        # Convert PyArrow row to GenericRow
+        records_dict = row_data.to_pydict()
+        field_name = row_data.schema[0].name
+        col_data = records_dict[field_name][0]
+
+        # Convert to Blob
+        if self.blob_as_descriptor:
+            # In blob-as-descriptor mode, we need to read external file data
+            # for rolling size calculation (based on external file size)
+            if isinstance(col_data, bytes):
+                blob_descriptor = BlobDescriptor.deserialize(col_data)
+            else:
+                # Handle PyArrow types
+                if hasattr(col_data, 'as_py'):
+                    col_data = col_data.as_py()
+                if isinstance(col_data, str):
+                    col_data = col_data.encode('utf-8')
+                blob_descriptor = BlobDescriptor.deserialize(col_data)
+            # Read external file data for rolling size calculation
+            uri_reader = self.file_io.uri_reader_factory.create(blob_descriptor.uri)
+            blob_data = Blob.from_descriptor(uri_reader, blob_descriptor)
+        elif isinstance(col_data, bytes):
+            blob_data = BlobData(col_data)
+        else:
+            if hasattr(col_data, 'as_py'):
+                col_data = col_data.as_py()
+            if isinstance(col_data, str):
+                col_data = col_data.encode('utf-8')
+            blob_data = BlobData(col_data)
+
+        # Create GenericRow
+        fields = [DataField(0, field_name, PyarrowFieldParser.to_paimon_type(row_data.schema[0].type, False))]
+        row = GenericRow([blob_data], fields, RowKind.INSERT)
+
+        # Write to blob format writer
+        self.writer.add_element(row)
+        self.row_count += 1
+
+    def reach_target_size(self, suggested_check: bool, target_size: int) -> bool:
+        return self.writer.reach_target_size(suggested_check, target_size)
+
+    def close(self) -> int:
+        if self.closed:
+            return self.file_io.get_file_size(self.file_path)
+
+        self.writer.close()
+        self.closed = True
+
+        # Get actual file size
+        file_size = self.file_io.get_file_size(self.file_path)
+        return file_size
+
+    def abort(self):
+        """Abort the writer and delete the file."""
+        if not self.closed:
+            try:
+                if hasattr(self.output_stream, 'close'):
+                    self.output_stream.close()
+            except Exception:
+                pass
+            self.closed = True
+
+        # Delete the file
+        self.file_io.delete_quietly(self.file_path)
diff --git a/paimon-python/pypaimon/write/writer/blob_writer.py b/paimon-python/pypaimon/write/writer/blob_writer.py
index ff153da843..09bce43a3f 100644
--- a/paimon-python/pypaimon/write/writer/blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/blob_writer.py
@@ -17,13 +17,19 @@
 ################################################################################
 
 import logging
-from typing import Tuple
+import uuid
+import pyarrow as pa
+from pathlib import Path
+from typing import Optional, Tuple
 
 from pypaimon.common.core_options import CoreOptions
 from pypaimon.write.writer.append_only_data_writer import AppendOnlyDataWriter
+from pypaimon.write.writer.blob_file_writer import BlobFileWriter
 
 logger = logging.getLogger(__name__)
 
+CHECK_ROLLING_RECORD_CNT = 1000
+
 
 class BlobWriter(AppendOnlyDataWriter):
 
@@ -33,11 +39,213 @@ class BlobWriter(AppendOnlyDataWriter):
         # Override file format to "blob"
         self.file_format = CoreOptions.FILE_FORMAT_BLOB
 
-        logger.info("Initialized BlobWriter with blob file format")
+        # Store blob column name for use in metadata creation
+        self.blob_column = blob_column
+
+        options = self.table.options
+        self.blob_target_file_size = CoreOptions.get_blob_target_file_size(options)
+
+        self.current_writer: Optional[BlobFileWriter] = None
+        self.current_file_path: Optional[Path] = None
+        self.record_count = 0
+
+        self.file_uuid = str(uuid.uuid4())
+        self.file_count = 0
+
+        logger.info(f"Initialized BlobWriter with blob file format, blob_target_file_size={self.blob_target_file_size}")
+
+    def _check_and_roll_if_needed(self):
+        if self.pending_data is None:
+            return
+
+        if self.blob_as_descriptor:
+            # blob-as-descriptor=true: Write row by row and check actual file size
+            for i in range(self.pending_data.num_rows):
+                row_data = self.pending_data.slice(i, 1)
+                self._write_row_to_file(row_data)
+                self.record_count += 1
+
+                if self.rolling_file(False):
+                    self.close_current_writer()
+
+            # All data has been written
+            self.pending_data = None
+        else:
+            # blob-as-descriptor=false: Use blob_target_file_size instead of target_file_size
+            current_size = self.pending_data.nbytes
+            if current_size > self.blob_target_file_size:
+                split_row = self._find_optimal_split_point(self.pending_data, self.blob_target_file_size)
+                if split_row > 0:
+                    data_to_write = self.pending_data.slice(0, split_row)
+                    remaining_data = self.pending_data.slice(split_row)
+
+                    self._write_data_to_file(data_to_write)
+                    self.pending_data = remaining_data
+                    self._check_and_roll_if_needed()
+
+    def _write_row_to_file(self, row_data: pa.Table):
+        """Write a single row to the current blob file. Opens a new file if needed."""
+        if row_data.num_rows == 0:
+            return
+
+        if self.current_writer is None:
+            self.open_current_writer()
+
+        self.current_writer.write_row(row_data)
+        # This ensures each row has a unique sequence number for data versioning and consistency
+        self.sequence_generator.next()
+
+    def open_current_writer(self):
+        file_name = f"data-{self.file_uuid}-{self.file_count}.{self.file_format}"
+        self.file_count += 1  # Increment counter for next file
+        file_path = self._generate_file_path(file_name)
+        self.current_file_path = file_path
+        self.current_writer = BlobFileWriter(self.file_io, file_path, self.blob_as_descriptor)
+
+    def rolling_file(self, force_check: bool = False) -> bool:
+        if self.current_writer is None:
+            return False
+
+        should_check = force_check or (self.record_count % CHECK_ROLLING_RECORD_CNT == 0)
+        return self.current_writer.reach_target_size(should_check, self.blob_target_file_size)
+
+    def close_current_writer(self):
+        """Close current writer and create metadata."""
+        if self.current_writer is None:
+            return
+
+        file_size = self.current_writer.close()
+        file_name = self.current_file_path.name
+        row_count = self.current_writer.row_count
+
+        self._add_file_metadata(file_name, self.current_file_path, row_count, file_size)
+
+        self.current_writer = None
+        self.current_file_path = None
+
+    def _write_data_to_file(self, data):
+        """
+        Override for blob format in normal mode (blob-as-descriptor=false).
+        Only difference from parent: use shared UUID + counter for file naming.
+        """
+        if data.num_rows == 0:
+            return
+
+        # This ensures each row gets a unique sequence number, matching the behavior expected
+        for _ in range(data.num_rows):
+            self.sequence_generator.next()
+
+        file_name = f"data-{self.file_uuid}-{self.file_count}.{self.file_format}"
+        self.file_count += 1
+        file_path = self._generate_file_path(file_name)
+
+        # Write blob file (parent class already supports blob format)
+        self.file_io.write_blob(file_path, data, self.blob_as_descriptor)
+
+        file_size = self.file_io.get_file_size(file_path)
+
+        # Reuse _add_file_metadata for consistency (blob table is append-only, no primary keys)
+        self._add_file_metadata(file_name, file_path, data, file_size)
+
+    def _add_file_metadata(self, file_name: str, file_path: Path, data_or_row_count, file_size: int):
+        """Add file metadata to committed_files."""
+        from datetime import datetime
+        from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+        from pypaimon.manifest.schema.simple_stats import SimpleStats
+        from pypaimon.table.row.generic_row import GenericRow
+        from pypaimon.schema.data_types import PyarrowFieldParser
+
+        # Handle both Table and row_count
+        if isinstance(data_or_row_count, pa.Table):
+            data = data_or_row_count
+            row_count = data.num_rows
+            data_fields = PyarrowFieldParser.to_paimon_schema(data.schema)
+            # Compute statistics across all batches, not just the first one
+            # This ensures correct min/max/null_counts when data has multiple batches
+            column_stats = {
+                field.name: self._get_column_stats(data, field.name)
+                for field in data_fields
+            }
+            min_value_stats = [column_stats[field.name]['min_values'] for field in data_fields]
+            max_value_stats = [column_stats[field.name]['max_values'] for field in data_fields]
+            value_null_counts = [column_stats[field.name]['null_counts'] for field in data_fields]
+        else:
+            # row_count only (from BlobFileWriter)
+            row_count = data_or_row_count
+            data_fields = [PyarrowFieldParser.to_paimon_schema(pa.schema([(self.blob_column, pa.large_binary())]))[0]]
+            min_value_stats = [None]
+            max_value_stats = [None]
+            value_null_counts = [0]
+
+        min_seq = self.sequence_generator.current - row_count
+        max_seq = self.sequence_generator.current - 1
+        self.sequence_generator.start = self.sequence_generator.current
+
+        self.committed_files.append(DataFileMeta(
+            file_name=file_name,
+            file_size=file_size,
+            row_count=row_count,
+            min_key=GenericRow([], []),
+            max_key=GenericRow([], []),
+            key_stats=SimpleStats(GenericRow([], []), GenericRow([], []), []),
+            value_stats=SimpleStats(
+                GenericRow(min_value_stats, data_fields),
+                GenericRow(max_value_stats, data_fields),
+                value_null_counts),
+            min_sequence_number=min_seq,
+            max_sequence_number=max_seq,
+            schema_id=self.table.table_schema.id,
+            level=0,
+            extra_files=[],
+            creation_time=datetime.now(),
+            delete_row_count=0,
+            file_source=0,  # FileSource.APPEND = 0
+            value_stats_cols=None,
+            external_path=None,
+            first_row_id=None,
+            write_cols=self.write_cols,
+            file_path=str(file_path),
+        ))
+
+    def prepare_commit(self):
+        """Prepare commit, ensuring all data is written."""
+        # Close current file if open (blob-as-descriptor=true mode)
+        if self.current_writer is not None:
+            self.close_current_writer()
+
+        # Call parent to handle pending_data (blob-as-descriptor=false mode)
+        return super().prepare_commit()
+
+    def close(self):
+        """Close current writer if open."""
+        # Close current file if open (blob-as-descriptor=true mode)
+        if self.current_writer is not None:
+            self.close_current_writer()
+
+        # Call parent to handle pending_data (blob-as-descriptor=false mode)
+        super().close()
+
+    def abort(self):
+        if self.current_writer is not None:
+            try:
+                self.current_writer.abort()
+            except Exception as e:
+                logger.warning(f"Error aborting blob writer: {e}", exc_info=e)
+            self.current_writer = None
+            self.current_file_path = None
+        super().abort()
 
     @staticmethod
-    def _get_column_stats(record_batch, column_name: str):
-        column_array = record_batch.column(column_name)
+    def _get_column_stats(data_or_batch, column_name: str):
+        """
+        Compute column statistics for a column in a Table or RecordBatch.
+        """
+        # Handle both Table and RecordBatch
+        if isinstance(data_or_batch, pa.Table):
+            column_array = data_or_batch.column(column_name)
+        else:
+            column_array = data_or_batch.column(column_name)
+
         # For blob data, don't generate min/max values
         return {
             "min_values": None,
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index 351ff32979..9cbb686441 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -47,7 +47,7 @@ class DataWriter(ABC):
         self.trimmed_primary_keys = self.table.trimmed_primary_keys
 
         options = self.table.options
-        self.target_file_size = 256 * 1024 * 1024
+        self.target_file_size = CoreOptions.get_target_file_size(options, self.table.is_primary_key_table)
         self.file_format = options.get(CoreOptions.FILE_FORMAT,
                                        CoreOptions.FILE_FORMAT_PARQUET
                                        if self.bucket != BucketMode.POSTPONE_BUCKET.value
