This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new cfbfbeb35c [python] Support blob read && write (#6420)
cfbfbeb35c is described below
commit cfbfbeb35c5198e5566695a8969ef6ce31921409
Author: YeJunHao <[email protected]>
AuthorDate: Fri Oct 17 13:33:45 2025 +0800
[python] Support blob read && write (#6420)
---
paimon-python/pypaimon/common/core_options.py | 2 +
paimon-python/pypaimon/common/file_io.py | 1 -
.../pypaimon/read/reader/concat_batch_reader.py | 68 ++
.../pypaimon/read/scanner/full_starting_scanner.py | 117 +-
paimon-python/pypaimon/read/split_read.py | 42 +-
paimon-python/pypaimon/read/table_read.py | 3 +-
paimon-python/pypaimon/schema/data_types.py | 1 +
paimon-python/pypaimon/schema/schema.py | 36 +-
paimon-python/pypaimon/tests/blob_table_test.py | 1177 ++++++++++++++++++++
paimon-python/pypaimon/write/file_store_commit.py | 3 +-
paimon-python/pypaimon/write/file_store_write.py | 22 +-
.../writer/blob_writer.py} | 61 +-
.../pypaimon/write/writer/data_blob_writer.py | 321 ++++++
paimon-python/pypaimon/write/writer/data_writer.py | 49 +-
14 files changed, 1844 insertions(+), 59 deletions(-)
diff --git a/paimon-python/pypaimon/common/core_options.py b/paimon-python/pypaimon/common/core_options.py
index 8ab26fe062..da1ad0674e 100644
--- a/paimon-python/pypaimon/common/core_options.py
+++ b/paimon-python/pypaimon/common/core_options.py
@@ -49,3 +49,5 @@ class CoreOptions(str, Enum):
INCREMENTAL_BETWEEN_TIMESTAMP = "incremental-between-timestamp"
# Commit options
COMMIT_USER_PREFIX = "commit.user-prefix"
+ ROW_TRACKING_ENABLED = "row-tracking.enabled"
+ DATA_EVOLUTION_ENABLED = "data-evolution.enabled"
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index eb32ebb755..f881ba77bc 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -25,7 +25,6 @@ from urllib.parse import splitport, urlparse
import pyarrow
from packaging.version import parse
from pyarrow._fs import FileSystem
-
from pypaimon.common.config import OssOptions, S3Options
from pypaimon.common.uri_reader import UriReaderFactory
from pypaimon.schema.data_types import DataField, AtomicType, PyarrowFieldParser
diff --git a/paimon-python/pypaimon/read/reader/concat_batch_reader.py b/paimon-python/pypaimon/read/reader/concat_batch_reader.py
index a5a596e1ea..de4f10c15b 100644
--- a/paimon-python/pypaimon/read/reader/concat_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/concat_batch_reader.py
@@ -19,6 +19,7 @@
import collections
from typing import Callable, List, Optional
+import pyarrow as pa
from pyarrow import RecordBatch
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
@@ -76,3 +77,70 @@ class ShardBatchReader(ConcatBatchReader):
return batch.slice(0, self.split_end_row - cur_begin)
else:
return batch
+
+
+class MergeAllBatchReader(RecordBatchReader):
+ """
+ A reader that accepts multiple reader suppliers and concatenates all their arrow batches
+ into one big batch. This is useful when you want to merge all data from multiple sources
+ into a single batch for processing.
+ """
+
+ def __init__(self, reader_suppliers: List[Callable]):
+ self.reader_suppliers = reader_suppliers
+ self.merged_batch: Optional[RecordBatch] = None
+ self.batch_created = False
+
+ def read_arrow_batch(self) -> Optional[RecordBatch]:
+ if self.batch_created:
+ return None
+
+ all_batches = []
+
+ # Read all batches from all reader suppliers
+ for supplier in self.reader_suppliers:
+ reader = supplier()
+ try:
+ while True:
+ batch = reader.read_arrow_batch()
+ if batch is None:
+ break
+ all_batches.append(batch)
+ finally:
+ reader.close()
+
+ # Concatenate all batches into one big batch
+ if all_batches:
+ # For PyArrow < 17.0.0, use Table.concat_tables approach
+ # Convert batches to tables and concatenate
+ tables = [pa.Table.from_batches([batch]) for batch in all_batches]
+ if len(tables) == 1:
+ # Single table, just get the first batch
+ self.merged_batch = tables[0].to_batches()[0]
+ else:
+ # Multiple tables, concatenate them
+ concatenated_table = pa.concat_tables(tables)
+ # Convert back to a single batch by taking all batches and combining
+ all_concatenated_batches = concatenated_table.to_batches()
+ if len(all_concatenated_batches) == 1:
+ self.merged_batch = all_concatenated_batches[0]
+ else:
+ # If still multiple batches, we need to manually combine them
+ # This shouldn't happen with concat_tables, but just in case
+ combined_arrays = []
+ for i in range(len(all_concatenated_batches[0].columns)):
+ column_arrays = [batch.column(i) for batch in all_concatenated_batches]
+ combined_arrays.append(pa.concat_arrays(column_arrays))
+ self.merged_batch = pa.RecordBatch.from_arrays(
+ combined_arrays,
+ names=all_concatenated_batches[0].schema.names
+ )
+ else:
+ self.merged_batch = None
+
+ self.batch_created = True
+ return self.merged_batch
+
+ def close(self) -> None:
+ self.merged_batch = None
+ self.batch_created = False
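For context on the concatenation path above: the same merge can be reproduced with plain pyarrow. A minimal sketch (illustrative data only, not part of the commit), assuming two suppliers have already produced their batches:

    import pyarrow as pa

    # Two small batches standing in for the output of two reader suppliers.
    batch_a = pa.RecordBatch.from_pydict({'id': [1, 2], 'name': ['a', 'b']})
    batch_b = pa.RecordBatch.from_pydict({'id': [3], 'name': ['c']})

    # Concatenate via tables, then collapse back into a single RecordBatch,
    # mirroring the pa.concat_tables path taken by MergeAllBatchReader above.
    table = pa.concat_tables([pa.Table.from_batches([b]) for b in (batch_a, batch_b)])
    merged = table.combine_chunks().to_batches()[0]
    assert merged.num_rows == 3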
diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index 4275b3f3e2..73915a0412 100644
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -18,6 +18,7 @@ limitations under the License.
from collections import defaultdict
from typing import Callable, List, Optional
+from pypaimon.common.core_options import CoreOptions
from pypaimon.common.predicate import Predicate
from pypaimon.common.predicate_builder import PredicateBuilder
from pypaimon.manifest.manifest_file_manager import ManifestFileManager
@@ -65,6 +66,7 @@ class FullStartingScanner(StartingScanner):
self.only_read_real_buckets = True if int(
self.table.options.get('bucket', -1)) == BucketMode.POSTPONE_BUCKET.value else False
+ self.data_evolution = self.table.options.get(CoreOptions.DATA_EVOLUTION_ENABLED, 'false').lower() == 'true'
def scan(self) -> Plan:
file_entries = self.plan_files()
@@ -72,6 +74,8 @@ class FullStartingScanner(StartingScanner):
return Plan([])
if self.table.is_primary_key_table:
splits = self._create_primary_key_splits(file_entries)
+ elif self.data_evolution:
+ splits = self._create_data_evolution_splits(file_entries)
else:
splits = self._create_append_only_splits(file_entries)
@@ -104,7 +108,7 @@ class FullStartingScanner(StartingScanner):
file_entries = self._filter_by_predicate(file_entries)
return file_entries
- def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'TableScan':
+ def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'FullStartingScanner':
if idx_of_this_subtask >= number_of_para_subtasks:
raise Exception("idx_of_this_subtask must be less than number_of_para_subtasks")
self.idx_of_this_subtask = idx_of_this_subtask
@@ -357,3 +361,114 @@ class FullStartingScanner(StartingScanner):
packed.append(bin_items)
return packed
+
+ def _create_data_evolution_splits(self, file_entries: List[ManifestEntry]) -> List['Split']:
+ partitioned_files = defaultdict(list)
+ for entry in file_entries:
+ partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry)
+
+ if self.idx_of_this_subtask is not None:
+ partitioned_files, plan_start_row, plan_end_row = self._append_only_filter_by_shard(partitioned_files)
+
+ def weight_func(file_list: List[DataFileMeta]) -> int:
+ return max(sum(f.file_size for f in file_list), self.open_file_cost)
+
+ splits = []
+ for key, file_entries in partitioned_files.items():
+ if not file_entries:
+ continue
+
+ data_files: List[DataFileMeta] = [e.file for e in file_entries]
+
+ # Split files by firstRowId for data evolution
+ split_by_row_id = self._split_by_row_id(data_files)
+
+ # Pack the split groups for optimal split sizes
+ packed_files: List[List[List[DataFileMeta]]] = self._pack_for_ordered(split_by_row_id, weight_func,
+ self.target_split_size)
+
+ # Flatten the packed files and build splits
+ flatten_packed_files: List[List[DataFileMeta]] = [
+ [file for sub_pack in pack for file in sub_pack]
+ for pack in packed_files
+ ]
+
+ splits += self._build_split_from_pack(flatten_packed_files, file_entries, False)
+
+ if self.idx_of_this_subtask is not None:
+ self._compute_split_start_end_row(splits, plan_start_row, plan_end_row)
+ return splits
+
+ def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]:
+ split_by_row_id = []
+
+ def sort_key(file: DataFileMeta) -> tuple:
+ first_row_id = file.first_row_id if file.first_row_id is not None else float('-inf')
+ is_blob = 1 if self._is_blob_file(file.file_name) else 0
+ # For files with same firstRowId, sort by maxSequenceNumber in descending order
+ # (larger sequence number means more recent data)
+ max_seq = file.max_sequence_number
+ return (first_row_id, is_blob, -max_seq)
+
+ sorted_files = sorted(files, key=sort_key)
+
+ # Filter blob files to only include those within the row ID range of non-blob files
+ sorted_files = self._filter_blob(sorted_files)
+
+ # Split files by firstRowId
+ last_row_id = -1
+ check_row_id_start = 0
+ current_split = []
+
+ for file in sorted_files:
+ first_row_id = file.first_row_id
+ if first_row_id is None:
+ # Files without firstRowId are treated as individual splits
+ split_by_row_id.append([file])
+ continue
+
+ if not self._is_blob_file(file.file_name) and first_row_id != last_row_id:
+ if current_split:
+ split_by_row_id.append(current_split)
+
+ # Validate that files don't overlap
+ if first_row_id < check_row_id_start:
+ file_names = [f.file_name for f in sorted_files]
+ raise ValueError(
+ f"There are overlapping files in the split:
{file_names}, "
+ f"the wrong file is: {file.file_name}"
+ )
+
+ current_split = []
+ last_row_id = first_row_id
+ check_row_id_start = first_row_id + file.row_count
+
+ current_split.append(file)
+
+ if current_split:
+ split_by_row_id.append(current_split)
+
+ return split_by_row_id
+
+ @staticmethod
+ def _is_blob_file(file_name: str) -> bool:
+ return file_name.endswith('.blob')
+
+ @staticmethod
+ def _filter_blob(files: List[DataFileMeta]) -> List[DataFileMeta]:
+ result = []
+ row_id_start = -1
+ row_id_end = -1
+
+ for file in files:
+ if not FullStartingScanner._is_blob_file(file.file_name):
+ if file.first_row_id is not None:
+ row_id_start = file.first_row_id
+ row_id_end = file.first_row_id + file.row_count
+ result.append(file)
+ else:
+ if file.first_row_id is not None and row_id_start != -1:
+ if row_id_start <= file.first_row_id < row_id_end:
+ result.append(file)
+
+ return result
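To illustrate how _split_by_row_id groups files: after sorting by (firstRowId, is_blob, -maxSequenceNumber), a data file and the blob file covering the same row-id range fall into one group, and a non-blob file with a new firstRowId starts the next group. A minimal sketch with a hypothetical stand-in for DataFileMeta (attribute names chosen for illustration only):

    from collections import namedtuple

    # Hypothetical stand-in exposing only the attributes the grouping logic reads.
    FakeFile = namedtuple('FakeFile', 'file_name first_row_id row_count max_sequence_number file_size')

    files = [
        FakeFile('f1.parquet', 100, 50, 2, 512),
        FakeFile('f0.blob', 0, 100, 1, 4096),     # blob rows for the same range as f0.parquet
        FakeFile('f0.parquet', 0, 100, 1, 1024),
    ]

    # Expected grouping after sorting:
    #   [[f0.parquet, f0.blob], [f1.parquet]]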
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 372679ed63..64a28ac63c 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -26,7 +26,7 @@ from pypaimon.common.predicate import Predicate
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.read.interval_partition import IntervalPartition, SortedRun
from pypaimon.read.partition_info import PartitionInfo
-from pypaimon.read.reader.concat_batch_reader import ConcatBatchReader, ShardBatchReader
+from pypaimon.read.reader.concat_batch_reader import ConcatBatchReader, ShardBatchReader, MergeAllBatchReader
from pypaimon.read.reader.concat_record_reader import ConcatRecordReader
from pypaimon.read.reader.data_file_batch_reader import DataFileBatchReader
from pypaimon.read.reader.data_evolution_merge_reader import DataEvolutionMergeReader
@@ -73,21 +73,21 @@ class SplitRead(ABC):
def create_reader(self) -> RecordReader:
"""Create a record reader for the given split."""
- def file_reader_supplier(self, file_path: str, for_merge_read: bool):
+ def file_reader_supplier(self, file_path: str, for_merge_read: bool, read_fields: List[str]):
_, extension = os.path.splitext(file_path)
file_format = extension[1:]
format_reader: RecordBatchReader
if file_format == CoreOptions.FILE_FORMAT_AVRO:
- format_reader = FormatAvroReader(self.table.file_io, file_path, self._get_final_read_data_fields(),
+ format_reader = FormatAvroReader(self.table.file_io, file_path, read_fields,
self.read_fields, self.push_down_predicate)
elif file_format == CoreOptions.FILE_FORMAT_BLOB:
blob_as_descriptor = self.table.options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, False)
- format_reader = FormatBlobReader(self.table.file_io, file_path, self._get_final_read_data_fields(),
+ format_reader = FormatBlobReader(self.table.file_io, file_path, read_fields,
self.read_fields, self.push_down_predicate, blob_as_descriptor)
elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == CoreOptions.FILE_FORMAT_ORC:
format_reader = FormatPyArrowReader(self.table.file_io, file_format, file_path,
- self._get_final_read_data_fields(), self.push_down_predicate)
+ read_fields, self.push_down_predicate)
else:
raise ValueError(f"Unexpected file format: {file_format}")
@@ -253,7 +253,12 @@ class RawFileSplitRead(SplitRead):
def create_reader(self) -> RecordReader:
data_readers = []
for file_path in self.split.file_paths:
- supplier = partial(self.file_reader_supplier, file_path=file_path, for_merge_read=False)
+ supplier = partial(
+ self.file_reader_supplier,
+ file_path=file_path,
+ for_merge_read=False,
+ read_fields=self._get_final_read_data_fields(),
+ )
data_readers.append(supplier)
if not data_readers:
@@ -274,7 +279,12 @@ class RawFileSplitRead(SplitRead):
class MergeFileSplitRead(SplitRead):
def kv_reader_supplier(self, file_path):
- reader_supplier = partial(self.file_reader_supplier, file_path=file_path, for_merge_read=True)
+ reader_supplier = partial(
+ self.file_reader_supplier,
+ file_path=file_path,
+ for_merge_read=True,
+ read_fields=self._get_final_read_data_fields()
+ )
return KeyValueWrapReader(reader_supplier(), len(self.trimmed_primary_key), self.value_arity)
def section_reader_supplier(self, section: List[SortedRun]):
@@ -317,7 +327,7 @@ class DataEvolutionSplitRead(SplitRead):
if len(need_merge_files) == 1 or not self.read_fields:
# No need to merge fields, just create a single file reader
suppliers.append(
- lambda f=need_merge_files[0]: self._create_file_reader(f)
+ lambda f=need_merge_files[0]: self._create_file_reader(f, self._get_final_read_data_fields())
)
else:
suppliers.append(
@@ -424,26 +434,30 @@ class DataEvolutionSplitRead(SplitRead):
self.read_fields = read_fields # create reader based on read_fields
# Create reader for this bunch
if len(bunch.files()) == 1:
- file_record_readers[i] = self._create_file_reader(bunch.files()[0])
+ file_record_readers[i] = self._create_file_reader(
+ bunch.files()[0], [field.name for field in read_fields]
+ )
else:
# Create concatenated reader for multiple files
suppliers = [
- lambda f=file: self._create_file_reader(f) for file in bunch.files()
+ lambda f=file: self._create_file_reader(
+ f, [field.name for field in read_fields]
+ ) for file in bunch.files()
]
- file_record_readers[i] = ConcatRecordReader(suppliers)
+ file_record_readers[i] = MergeAllBatchReader(suppliers)
self.read_fields = table_fields
# Validate that all required fields are found
for i, field in enumerate(all_read_fields):
if row_offsets[i] == -1:
- if not field.type.is_nullable():
+ if not field.type.nullable:
raise ValueError(f"Field {field} is not null but can't
find any file contains it.")
return DataEvolutionMergeReader(row_offsets, field_offsets,
file_record_readers)
- def _create_file_reader(self, file: DataFileMeta) -> RecordReader:
+ def _create_file_reader(self, file: DataFileMeta, read_fields: [str]) -> RecordReader:
"""Create a file reader for a single file."""
- return self.file_reader_supplier(file_path=file.file_path, for_merge_read=False)
+ return self.file_reader_supplier(file_path=file.file_path, for_merge_read=False, read_fields=read_fields)
def _split_field_bunches(self, need_merge_files: List[DataFileMeta]) -> List[FieldBunch]:
"""Split files into field bunches."""
diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py
index cf32d04d72..4c2c615d89 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -20,6 +20,7 @@ from typing import Any, Iterator, List, Optional
import pandas
import pyarrow
+from pypaimon.common.core_options import CoreOptions
from pypaimon.common.predicate import Predicate
from pypaimon.common.predicate_builder import PredicateBuilder
from pypaimon.read.push_down_utils import extract_predicate_to_list
@@ -132,7 +133,7 @@ class TableRead:
read_type=self.read_type,
split=split
)
- elif self.table.options.get('data-evolution.enabled', 'false').lower() == 'true':
+ elif self.table.options.get(CoreOptions.DATA_EVOLUTION_ENABLED, 'false').lower() == 'true':
return DataEvolutionSplitRead(
table=self.table,
predicate=self.predicate,
diff --git a/paimon-python/pypaimon/schema/data_types.py b/paimon-python/pypaimon/schema/data_types.py
index d1ce2354a8..c65771d7f2 100644
--- a/paimon-python/pypaimon/schema/data_types.py
+++ b/paimon-python/pypaimon/schema/data_types.py
@@ -548,6 +548,7 @@ class PyarrowFieldParser:
@staticmethod
def to_paimon_schema(pa_schema: pyarrow.Schema) -> List[DataField]:
+ # Convert PyArrow schema to Paimon fields
fields = []
for i, pa_field in enumerate(pa_schema):
pa_field: pyarrow.Field
diff --git a/paimon-python/pypaimon/schema/schema.py b/paimon-python/pypaimon/schema/schema.py
index 965fe2255b..0ad53f99d3 100644
--- a/paimon-python/pypaimon/schema/schema.py
+++ b/paimon-python/pypaimon/schema/schema.py
@@ -20,6 +20,7 @@ from typing import Dict, List, Optional
import pyarrow as pa
+from pypaimon.common.core_options import CoreOptions
from pypaimon.common.json_util import json_field
from pypaimon.schema.data_types import DataField, PyarrowFieldParser
@@ -51,4 +52,37 @@ class Schema:
def from_pyarrow_schema(pa_schema: pa.Schema, partition_keys: Optional[List[str]] = None,
primary_keys: Optional[List[str]] = None, options: Optional[Dict] = None,
comment: Optional[str] = None):
- return Schema(PyarrowFieldParser.to_paimon_schema(pa_schema), partition_keys, primary_keys, options, comment)
+ # Convert PyArrow schema to Paimon fields
+ fields = PyarrowFieldParser.to_paimon_schema(pa_schema)
+
+ # Check if Blob type exists in the schema
+ has_blob_type = any(
+ 'blob' in str(field.type).lower()
+ for field in fields
+ )
+
+ # If Blob type exists, validate required options
+ if has_blob_type:
+ if options is None:
+ options = {}
+
+ required_options = {
+ CoreOptions.ROW_TRACKING_ENABLED: 'true',
+ CoreOptions.DATA_EVOLUTION_ENABLED: 'true'
+ }
+
+ missing_options = []
+ for key, expected_value in required_options.items():
+ if key not in options or options[key] != expected_value:
+ missing_options.append(f"{key}='{expected_value}'")
+
+ if missing_options:
+ raise ValueError(
+ f"Schema contains Blob type but is missing required
options: {', '.join(missing_options)}. "
+ f"Please add these options to the schema."
+ )
+
+ if primary_keys is not None:
+ raise ValueError("Blob type is not supported with primary
key.")
+
+ return Schema(fields, partition_keys, primary_keys, options, comment)
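The check above is what the tests below exercise: a pyarrow schema containing a blob-typed field must carry both options, otherwise from_pyarrow_schema raises ValueError. A minimal sketch mirroring the tests (table and field names are illustrative):

    import pyarrow as pa
    from pypaimon import Schema

    pa_schema = pa.schema([('id', pa.int32()), ('doc', pa.large_binary())])

    # Schema.from_pyarrow_schema(pa_schema)  # would raise ValueError: missing options

    schema = Schema.from_pyarrow_schema(
        pa_schema,
        options={
            'row-tracking.enabled': 'true',
            'data-evolution.enabled': 'true',
        },
    )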
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py
new file mode 100644
index 0000000000..3c8273bde4
--- /dev/null
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -0,0 +1,1177 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory
+from pypaimon.write.commit_message import CommitMessage
+
+
+class DataBlobWriterTest(unittest.TestCase):
+ """Tests for DataBlobWriter functionality with paimon table operations."""
+
+ @classmethod
+ def setUpClass(cls):
+ """Set up test environment."""
+ cls.temp_dir = tempfile.mkdtemp()
+ cls.warehouse = os.path.join(cls.temp_dir, 'warehouse')
+ # Create catalog for table operations
+ cls.catalog = CatalogFactory.create({
+ 'warehouse': cls.warehouse
+ })
+ cls.catalog.create_database('test_db', False)
+
+ @classmethod
+ def tearDownClass(cls):
+ """Clean up test environment."""
+ try:
+ shutil.rmtree(cls.temp_dir)
+ except OSError:
+ pass
+
+ def test_data_blob_writer_basic_functionality(self):
+ """Test basic DataBlobWriter functionality with paimon table."""
+ from pypaimon import Schema
+
+ # Create schema with normal and blob columns
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('blob_data', pa.large_binary()), # This will be detected as blob
+ ])
+
+ # Create Paimon schema
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true'
+ }
+ )
+
+ # Create table
+ self.catalog.create_table('test_db.blob_writer_test', schema, False)
+ table = self.catalog.get_table('test_db.blob_writer_test')
+
+ # Test data
+ test_data = pa.Table.from_pydict({
+ 'id': [1, 2, 3],
+ 'name': ['Alice', 'Bob', 'Charlie'],
+ 'blob_data': [b'blob_data_1', b'blob_data_2', b'blob_data_3']
+ }, schema=pa_schema)
+
+ # Test DataBlobWriter initialization using proper table API
+ # Use proper table API to create writer
+ write_builder = table.new_batch_write_builder()
+ blob_writer = write_builder.new_write()
+
+ # Write test data using BatchTableWrite API
+ blob_writer.write_arrow(test_data)
+
+ # Test prepare commit
+ commit_messages = blob_writer.prepare_commit()
+ self.assertIsInstance(commit_messages, list)
+ self.assertGreater(len(commit_messages), 0)
+
+ # Verify commit message structure
+ for commit_msg in commit_messages:
+ self.assertIsInstance(commit_msg.new_files, list)
+ self.assertGreater(len(commit_msg.new_files), 0)
+
+ # Verify file metadata structure
+ for file_meta in commit_msg.new_files:
+ self.assertIsNotNone(file_meta.file_name)
+ self.assertGreater(file_meta.file_size, 0)
+ self.assertGreater(file_meta.row_count, 0)
+
+ blob_writer.close()
+
+ def test_data_blob_writer_schema_detection(self):
+ """Test that DataBlobWriter correctly detects blob columns from
schema."""
+ from pypaimon import Schema
+
+ # Test schema with blob column
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('blob_field', pa.large_binary()),
+ ])
+
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true'
+ }
+ )
+ self.catalog.create_table('test_db.blob_detection_test', schema, False)
+ table = self.catalog.get_table('test_db.blob_detection_test')
+
+ # Use proper table API to create writer
+ write_builder = table.new_batch_write_builder()
+ blob_writer = write_builder.new_write()
+
+ # Test that DataBlobWriter was created internally
+ # We can verify this by checking the internal data writers
+ test_data = pa.Table.from_pydict({
+ 'id': [1, 2, 3],
+ 'blob_field': [b'blob1', b'blob2', b'blob3']
+ }, schema=pa_schema)
+
+ # Write data to trigger writer creation
+ blob_writer.write_arrow(test_data)
+
+ # Verify that a DataBlobWriter was created internally
+ data_writers = blob_writer.file_store_write.data_writers
+ self.assertGreater(len(data_writers), 0)
+
+ # Check that the writer is a DataBlobWriter
+ for writer in data_writers.values():
+ from pypaimon.write.writer.data_blob_writer import DataBlobWriter
+ self.assertIsInstance(writer, DataBlobWriter)
+
+ blob_writer.close()
+
+ def test_data_blob_writer_no_blob_column(self):
+ """Test that DataBlobWriter raises error when no blob column is
found."""
+ from pypaimon import Schema
+
+ # Test schema without blob column
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ])
+
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true'
+ }
+ )
+ self.catalog.create_table('test_db.no_blob_test', schema, False)
+ table = self.catalog.get_table('test_db.no_blob_test')
+
+ # Use proper table API to create writer
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+
+ # Test that a regular writer (not DataBlobWriter) was created
+ test_data = pa.Table.from_pydict({
+ 'id': [1, 2, 3],
+ 'name': ['Alice', 'Bob', 'Charlie']
+ }, schema=pa_schema)
+
+ # Write data to trigger writer creation
+ writer.write_arrow(test_data)
+
+ # Verify that a regular writer was created (not DataBlobWriter)
+ data_writers = writer.file_store_write.data_writers
+ self.assertGreater(len(data_writers), 0)
+
+ # Check that the writer is NOT a DataBlobWriter
+ for writer_instance in data_writers.values():
+ from pypaimon.write.writer.data_blob_writer import DataBlobWriter
+ self.assertNotIsInstance(writer_instance, DataBlobWriter)
+
+ writer.close()
+
+ def test_data_blob_writer_multiple_blob_columns(self):
+ """Test that DataBlobWriter raises error when multiple blob columns
are found."""
+ from pypaimon import Schema
+
+ # Test schema with multiple blob columns
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('blob1', pa.large_binary()),
+ ('blob2', pa.large_binary()),
+ ])
+
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true'
+ }
+ )
+ self.catalog.create_table('test_db.multiple_blob_test', schema, False)
+ table = self.catalog.get_table('test_db.multiple_blob_test')
+
+ # Use proper table API to create writer
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+
+ # Test data with multiple blob columns
+ test_data = pa.Table.from_pydict({
+ 'id': [1, 2, 3],
+ 'blob1': [b'blob1_1', b'blob1_2', b'blob1_3'],
+ 'blob2': [b'blob2_1', b'blob2_2', b'blob2_3']
+ }, schema=pa_schema)
+
+ # This should raise an error when DataBlobWriter is created internally
+ with self.assertRaises(ValueError) as context:
+ writer.write_arrow(test_data)
+ self.assertIn("Limit exactly one blob field in one paimon table yet",
str(context.exception))
+
+ def test_data_blob_writer_write_operations(self):
+ """Test DataBlobWriter write operations with real data."""
+ from pypaimon import Schema
+
+ # Create schema with blob column
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('document', pa.large_binary()),
+ ])
+
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true'
+ }
+ )
+ self.catalog.create_table('test_db.write_test', schema, False)
+ table = self.catalog.get_table('test_db.write_test')
+
+ # Use proper table API to create writer
+ write_builder = table.new_batch_write_builder()
+ blob_writer = write_builder.new_write()
+
+ # Test data
+ test_data = pa.Table.from_pydict({
+ 'id': [1, 2],
+ 'name': ['Alice', 'Bob'],
+ 'document': [b'document_content_1', b'document_content_2']
+ }, schema=pa_schema)
+
+ # Test writing data
+ for batch in test_data.to_batches():
+ blob_writer.write_arrow_batch(batch)
+
+ # Test prepare commit
+ commit_messages = blob_writer.prepare_commit()
+ self.assertIsInstance(commit_messages, list)
+
+ blob_writer.close()
+
+ def test_data_blob_writer_write_large_blob(self):
+ """Test DataBlobWriter with very large blob data (50MB per item) in 10
batches."""
+ from pypaimon import Schema
+
+ # Create schema with blob column
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('description', pa.string()),
+ ('large_blob', pa.large_binary()),
+ ])
+
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true'
+ }
+ )
+ self.catalog.create_table('test_db.large_blob_test', schema, False)
+ table = self.catalog.get_table('test_db.large_blob_test')
+
+ # Use proper table API to create writer
+ write_builder = table.new_batch_write_builder()
+ blob_writer = write_builder.new_write()
+
+ # Create 50MB blob data per item
+ # Using a pattern to make the data more realistic and compressible
+ target_size = 50 * 1024 * 1024 # 50MB in bytes
+ blob_pattern = b'LARGE_BLOB_DATA_PATTERN_' + b'X' * 1024 # ~1KB pattern
+ pattern_size = len(blob_pattern)
+ repetitions = target_size // pattern_size
+ large_blob_data = blob_pattern * repetitions
+
+ # Verify the blob size is approximately 50MB
+ blob_size_mb = len(large_blob_data) / (1024 * 1024)
+ self.assertGreater(blob_size_mb, 49) # Should be at least 49MB
+ self.assertLess(blob_size_mb, 51) # Should be less than 51MB
+
+ total_rows = 0
+
+ # Write 10 batches, each with 5 rows (50 rows total)
+ # Total data volume: 50 rows * 50MB = 2.5GB of blob data
+ for batch_num in range(10):
+ batch_data = pa.Table.from_pydict({
+ 'id': [batch_num * 5 + i for i in range(5)],
+ 'description': [f'Large blob batch {batch_num}, row {i}' for i in range(5)],
+ 'large_blob': [large_blob_data] * 5 # 5 rows per batch, each with 50MB blob
+ }, schema=pa_schema)
+
+ # Write each batch
+ for batch in batch_data.to_batches():
+ blob_writer.write_arrow_batch(batch)
+ total_rows += batch.num_rows
+
+ # Log progress for large data processing
+ print(f"Completed batch {batch_num + 1}/10 with {batch.num_rows}
rows")
+
+ # Record count is tracked internally by DataBlobWriter
+
+ # Test prepare commit
+ commit_messages: CommitMessage = blob_writer.prepare_commit()
+ self.assertIsInstance(commit_messages, list)
+ # Verify we have commit messages
+ self.assertEqual(len(commit_messages), 1)
+ commit_message = commit_messages[0]
+ normal_file_meta = commit_message.new_files[0]
+ blob_file_metas = commit_message.new_files[1:]
+ # Validate row count consistency
+ parquet_row_count = normal_file_meta.row_count
+ blob_row_count_sum = sum(meta.row_count for meta in blob_file_metas)
+ self.assertEqual(parquet_row_count, blob_row_count_sum,
+ f"Parquet row count ({parquet_row_count}) should
equal "
+ f"sum of blob row counts ({blob_row_count_sum})")
+
+ # Verify commit message structure and file metadata
+ total_file_size = 0
+ total_row_count = parquet_row_count
+ for commit_msg in commit_messages:
+ self.assertIsInstance(commit_msg.new_files, list)
+ self.assertGreater(len(commit_msg.new_files), 0)
+
+ # Verify file metadata structure
+ for file_meta in commit_msg.new_files:
+ self.assertIsNotNone(file_meta.file_name)
+ self.assertGreater(file_meta.file_size, 0)
+ self.assertGreater(file_meta.row_count, 0)
+ total_file_size += file_meta.file_size
+
+ # Verify total data written (10 batches * 5 rows = 50 rows)
+ self.assertEqual(total_row_count, 50)
+
+ # Verify total file size is substantial (roughly the 2.5GB of raw blob data)
+ total_size_mb = total_file_size / (1024 * 1024)
+ self.assertGreater(total_size_mb, 2000) # Should be at least 2GB
+
+ total_files = sum(len(commit_msg.new_files) for commit_msg in commit_messages)
+ print(f"Total data written: {total_size_mb:.2f}MB across {total_files} files")
+ print(f"Total rows processed: {total_row_count}")
+
+ blob_writer.close()
+
+ def test_data_blob_writer_abort_functionality(self):
+ """Test DataBlobWriter abort functionality."""
+ from pypaimon import Schema
+
+ # Create schema with blob column
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('blob_data', pa.large_binary()),
+ ])
+
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true'
+ }
+ )
+ self.catalog.create_table('test_db.abort_test', schema, False)
+ table = self.catalog.get_table('test_db.abort_test')
+
+ # Use proper table API to create writer
+ write_builder = table.new_batch_write_builder()
+ blob_writer = write_builder.new_write()
+
+ # Test data
+ test_data = pa.Table.from_pydict({
+ 'id': [1, 2],
+ 'blob_data': [b'blob_1', b'blob_2']
+ }, schema=pa_schema)
+
+ # Write some data
+ for batch in test_data.to_batches():
+ blob_writer.write_arrow_batch(batch)
+
+ # Test abort - BatchTableWrite doesn't have abort method
+ # The abort functionality is handled internally by DataBlobWriter
+
+ blob_writer.close()
+
+ def test_data_blob_writer_multiple_batches(self):
+ """Test DataBlobWriter with multiple batches and verify results."""
+ from pypaimon import Schema
+
+ # Create schema with blob column
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('document', pa.large_binary()),
+ ])
+
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true'
+ }
+ )
+ self.catalog.create_table('test_db.multiple_batches_test', schema, False)
+ table = self.catalog.get_table('test_db.multiple_batches_test')
+
+ # Use proper table API to create writer
+ write_builder = table.new_batch_write_builder()
+ blob_writer = write_builder.new_write()
+
+ # Test data - multiple batches
+ batch1_data = pa.Table.from_pydict({
+ 'id': [1, 2],
+ 'name': ['Alice', 'Bob'],
+ 'document': [b'document_1_content', b'document_2_content']
+ }, schema=pa_schema)
+
+ batch2_data = pa.Table.from_pydict({
+ 'id': [3, 4, 5],
+ 'name': ['Charlie', 'David', 'Eve'],
+ 'document': [b'document_3_content', b'document_4_content', b'document_5_content']
+ }, schema=pa_schema)
+
+ batch3_data = pa.Table.from_pydict({
+ 'id': [6],
+ 'name': ['Frank'],
+ 'document': [b'document_6_content']
+ }, schema=pa_schema)
+
+ # Write multiple batches
+ total_rows = 0
+ for batch in batch1_data.to_batches():
+ blob_writer.write_arrow_batch(batch)
+ total_rows += batch.num_rows
+
+ for batch in batch2_data.to_batches():
+ blob_writer.write_arrow_batch(batch)
+ total_rows += batch.num_rows
+
+ for batch in batch3_data.to_batches():
+ blob_writer.write_arrow_batch(batch)
+ total_rows += batch.num_rows
+
+ # Record count is tracked internally by DataBlobWriter
+
+ # Test prepare commit
+ commit_messages = blob_writer.prepare_commit()
+ self.assertIsInstance(commit_messages, list)
+
+ # Verify we have committed files
+ self.assertGreater(len(commit_messages), 0)
+
+ blob_writer.close()
+
+ def test_data_blob_writer_large_batches(self):
+ """Test DataBlobWriter with large batches to test rolling behavior."""
+ from pypaimon import Schema
+
+ # Create schema with blob column
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('description', pa.string()),
+ ('large_blob', pa.large_binary()),
+ ])
+
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true'
+ }
+ )
+ self.catalog.create_table('test_db.large_batches_test', schema, False)
+ table = self.catalog.get_table('test_db.large_batches_test')
+
+ # Use proper table API to create writer
+ write_builder = table.new_batch_write_builder()
+ blob_writer = write_builder.new_write()
+
+ # Create large batches with substantial blob data
+ large_blob_data = b'L' * 10000 # 10KB blob data
+
+ # Batch 1: 100 rows
+ batch1_data = pa.Table.from_pydict({
+ 'id': list(range(1, 101)),
+ 'description': [f'Description for row {i}' for i in range(1, 101)],
+ 'large_blob': [large_blob_data] * 100
+ }, schema=pa_schema)
+
+ # Batch 2: 50 rows
+ batch2_data = pa.Table.from_pydict({
+ 'id': list(range(101, 151)),
+ 'description': [f'Description for row {i}' for i in range(101, 151)],
+ 'large_blob': [large_blob_data] * 50
+ }, schema=pa_schema)
+
+ # Write large batches
+ total_rows = 0
+ for batch in batch1_data.to_batches():
+ blob_writer.write_arrow_batch(batch)
+ total_rows += batch.num_rows
+
+ for batch in batch2_data.to_batches():
+ blob_writer.write_arrow_batch(batch)
+ total_rows += batch.num_rows
+
+ # Record count is tracked internally by DataBlobWriter
+
+ # Test prepare commit
+ commit_messages = blob_writer.prepare_commit()
+ self.assertIsInstance(commit_messages, list)
+
+ # Verify we have committed files
+ self.assertGreater(len(commit_messages), 0)
+
+ blob_writer.close()
+
+ def test_data_blob_writer_mixed_data_types(self):
+ """Test DataBlobWriter with mixed data types in blob column."""
+ from pypaimon import Schema
+
+ # Create schema with blob column
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('type', pa.string()),
+ ('data', pa.large_binary()),
+ ])
+
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true'
+ }
+ )
+ self.catalog.create_table('test_db.mixed_data_test', schema, False)
+ table = self.catalog.get_table('test_db.mixed_data_test')
+
+ # Use proper table API to create writer
+ write_builder = table.new_batch_write_builder()
+ blob_writer = write_builder.new_write()
+
+ # Test data with different types of blob content
+ test_data = pa.Table.from_pydict({
+ 'id': [1, 2, 3, 4, 5],
+ 'type': ['text', 'json', 'binary', 'image', 'pdf'],
+ 'data': [
+ b'This is text content',
+ b'{"key": "value", "number": 42}',
+ b'\x00\x01\x02\x03\xff\xfe\xfd',
+ b'PNG_IMAGE_DATA_PLACEHOLDER',
+ b'%PDF-1.4\nPDF_CONTENT_PLACEHOLDER'
+ ]
+ }, schema=pa_schema)
+
+ # Write mixed data
+ total_rows = 0
+ for batch in test_data.to_batches():
+ blob_writer.write_arrow_batch(batch)
+ total_rows += batch.num_rows
+
+ # Record count is tracked internally by DataBlobWriter
+
+ # Test prepare commit
+ commit_messages = blob_writer.prepare_commit()
+ self.assertIsInstance(commit_messages, list)
+
+ # Verify we have committed files
+ self.assertGreater(len(commit_messages), 0)
+
+ # Verify commit message structure
+ for commit_msg in commit_messages:
+ self.assertIsInstance(commit_msg.new_files, list)
+ self.assertGreater(len(commit_msg.new_files), 0)
+
+ # Verify file metadata structure
+ for file_meta in commit_msg.new_files:
+ self.assertIsNotNone(file_meta.file_name)
+ self.assertGreater(file_meta.file_size, 0)
+ self.assertGreater(file_meta.row_count, 0)
+
+ # Should have both normal and blob files
+ file_names = [f.file_name for f in commit_msg.new_files]
+ parquet_files = [f for f in file_names if f.endswith('.parquet')]
+ blob_files = [f for f in file_names if f.endswith('.blob')]
+
+ self.assertGreater(len(parquet_files), 0, "Should have at least one parquet file")
+ self.assertGreater(len(blob_files), 0, "Should have at least one blob file")
+
+ # Create commit and commit the data
+ commit = write_builder.new_commit()
+ commit.commit(commit_messages)
+ blob_writer.close()
+
+ # Read data back using table API
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ result = table_read.to_arrow(table_scan.plan().splits())
+
+ # Verify the data was read back correctly
+ self.assertEqual(result.num_rows, 5, "Should have 5 rows")
+ self.assertEqual(result.num_columns, 3, "Should have 3 columns")
+
+ # Convert result to pandas for easier comparison
+ result_df = result.to_pandas()
+
+ # Verify each row matches the original data
+ for i in range(5):
+ original_id = test_data.column('id')[i].as_py()
+ original_type = test_data.column('type')[i].as_py()
+ original_data = test_data.column('data')[i].as_py()
+
+ result_id = result_df.iloc[i]['id']
+ result_type = result_df.iloc[i]['type']
+ result_data = result_df.iloc[i]['data']
+
+ self.assertEqual(result_id, original_id, f"Row {i+1}: ID should match")
+ self.assertEqual(result_type, original_type, f"Row {i+1}: Type should match")
+ self.assertEqual(result_data, original_data, f"Row {i+1}: Blob data should match")
+
+ def test_data_blob_writer_empty_batches(self):
+ """Test DataBlobWriter with empty batches."""
+ from pypaimon import Schema
+
+ # Create schema with blob column
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('data', pa.large_binary()),
+ ])
+
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true'
+ }
+ )
+ self.catalog.create_table('test_db.empty_batches_test', schema, False)
+ table = self.catalog.get_table('test_db.empty_batches_test')
+
+ # Use proper table API to create writer
+ write_builder = table.new_batch_write_builder()
+ blob_writer = write_builder.new_write()
+
+ # Test data with some empty batches
+ batch1_data = pa.Table.from_pydict({
+ 'id': [1, 2],
+ 'data': [b'data1', b'data2']
+ }, schema=pa_schema)
+
+ # Empty batch
+ empty_batch = pa.Table.from_pydict({
+ 'id': [],
+ 'data': []
+ }, schema=pa_schema)
+
+ batch2_data = pa.Table.from_pydict({
+ 'id': [3],
+ 'data': [b'data3']
+ }, schema=pa_schema)
+
+ # Write batches including empty ones
+ total_rows = 0
+ for batch in batch1_data.to_batches():
+ blob_writer.write_arrow_batch(batch)
+ total_rows += batch.num_rows
+
+ for batch in empty_batch.to_batches():
+ blob_writer.write_arrow_batch(batch)
+ total_rows += batch.num_rows
+
+ for batch in batch2_data.to_batches():
+ blob_writer.write_arrow_batch(batch)
+ total_rows += batch.num_rows
+
+ # Verify record count (empty batch should not affect count)
+ # Record count is tracked internally by DataBlobWriter
+
+ # Test prepare commit
+ commit_messages = blob_writer.prepare_commit()
+ self.assertIsInstance(commit_messages, list)
+
+ blob_writer.close()
+
+ def test_data_blob_writer_rolling_behavior(self):
+ """Test DataBlobWriter rolling behavior with multiple commits."""
+ from pypaimon import Schema
+
+ # Create schema with blob column
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('content', pa.string()),
+ ('blob_data', pa.large_binary()),
+ ])
+
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true'
+ }
+ )
+ self.catalog.create_table('test_db.rolling_test', schema, False)
+ table = self.catalog.get_table('test_db.rolling_test')
+
+ # Use proper table API to create writer
+ write_builder = table.new_batch_write_builder()
+ blob_writer = write_builder.new_write()
+
+ # Create data that should trigger rolling
+ large_content = 'X' * 1000 # Large string content
+ large_blob = b'B' * 5000 # Large blob data
+
+ # Write multiple batches to test rolling
+ for i in range(10): # 10 batches
+ batch_data = pa.Table.from_pydict({
+ 'id': [i * 10 + j for j in range(10)],
+ 'content': [f'{large_content}_{i}_{j}' for j in range(10)],
+ 'blob_data': [large_blob] * 10
+ }, schema=pa_schema)
+
+ for batch in batch_data.to_batches():
+ blob_writer.write_arrow_batch(batch)
+
+ # Verify total record count
+ # Record count is tracked internally by DataBlobWriter
+
+ # Test prepare commit
+ commit_messages = blob_writer.prepare_commit()
+ self.assertIsInstance(commit_messages, list)
+
+ # Verify we have committed files
+ self.assertGreater(len(commit_messages), 0)
+
+ # Verify file metadata structure
+ for commit_msg in commit_messages:
+ for file_meta in commit_msg.new_files:
+ self.assertIsNotNone(file_meta.file_name)
+ self.assertGreater(file_meta.file_size, 0)
+ self.assertGreater(file_meta.row_count, 0)
+
+ blob_writer.close()
+
+ def test_blob_write_read_end_to_end(self):
+ """Test complete end-to-end blob functionality: write blob data and
read it back to verify correctness."""
+ from pypaimon import Schema
+
+ # Create schema with blob column
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('description', pa.string()),
+ ('blob_data', pa.large_binary()),
+ ])
+
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true'
+ }
+ )
+ self.catalog.create_table('test_db.blob_write_read_e2e', schema, False)
+ table = self.catalog.get_table('test_db.blob_write_read_e2e')
+
+ # Test data with various blob sizes and types
+ test_data = pa.Table.from_pydict({
+ 'id': [1, 2, 3, 4, 5],
+ 'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
+ 'description': ['User 1', 'User 2', 'User 3', 'User 4', 'User 5'],
+ 'blob_data': [
+ b'small_blob_1',
+ b'medium_blob_data_2_with_more_content',
+ b'large_blob_data_3_with_even_more_content_and_details',
+ b'very_large_blob_data_4_with_extensive_content_and_multiple_details_here', # noqa: E501
+ b'extremely_large_blob_data_5_with_comprehensive_content_and_extensive_details_covering_multiple_aspects' # noqa: E501
+ ]
+ }, schema=pa_schema)
+
+ # Write data using table API
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.write_arrow(test_data)
+
+ # Commit the data
+ commit_messages = writer.prepare_commit()
+ self.assertGreater(len(commit_messages), 0)
+
+ # Verify commit message structure
+ for commit_msg in commit_messages:
+ self.assertIsInstance(commit_msg.new_files, list)
+ self.assertGreater(len(commit_msg.new_files), 0)
+
+ # Should have both normal and blob files
+ file_names = [f.file_name for f in commit_msg.new_files]
+ parquet_files = [f for f in file_names if f.endswith('.parquet')]
+ blob_files = [f for f in file_names if f.endswith('.blob')]
+
+ self.assertGreater(len(parquet_files), 0, "Should have at least one parquet file")
+ self.assertGreater(len(blob_files), 0, "Should have at least one blob file")
+
+ # Create commit and commit the data
+ commit = write_builder.new_commit()
+ commit.commit(commit_messages)
+ writer.close()
+
+ # Read data back using table API
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ result = table_read.to_arrow(table_scan.plan().splits())
+
+ # Verify the data was read back correctly
+ self.assertEqual(result.num_rows, 5, "Should have 5 rows")
+ self.assertEqual(result.num_columns, 4, "Should have 4 columns")
+
+ # Verify normal columns
+ self.assertEqual(result.column('id').to_pylist(), [1, 2, 3, 4, 5], "ID column should match")
+ self.assertEqual(result.column('name').to_pylist(), ['Alice', 'Bob', 'Charlie', 'David', 'Eve'], "Name column should match") # noqa: E501
+ self.assertEqual(result.column('description').to_pylist(), ['User 1', 'User 2', 'User 3', 'User 4', 'User 5'], "Description column should match") # noqa: E501
+
+ # Verify blob data correctness
+ blob_data = result.column('blob_data').to_pylist()
+ expected_blobs = [
+ b'small_blob_1',
+ b'medium_blob_data_2_with_more_content',
+ b'large_blob_data_3_with_even_more_content_and_details',
+ b'very_large_blob_data_4_with_extensive_content_and_multiple_details_here', # noqa: E501
+ b'extremely_large_blob_data_5_with_comprehensive_content_and_extensive_details_covering_multiple_aspects' # noqa: E501
+ ]
+
+ self.assertEqual(len(blob_data), 5, "Should have 5 blob records")
+ self.assertEqual(blob_data, expected_blobs, "Blob data should match exactly")
+
+ # Verify individual blob sizes
+ for i, (actual_blob, expected_blob) in enumerate(zip(blob_data, expected_blobs)):
+ self.assertEqual(len(actual_blob), len(expected_blob), f"Blob {i+1} size should match")
+ self.assertEqual(actual_blob, expected_blob, f"Blob {i+1} content should match exactly")
+
+ print(f"✅ End-to-end blob write/read test passed: wrote and read back
{len(blob_data)} blob records correctly") # noqa: E501
+
+ def test_blob_write_read_large_data_end_to_end(self):
+ """Test end-to-end blob functionality with large blob data (1MB per
blob)."""
+ from pypaimon import Schema
+
+ # Create schema with blob column
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('metadata', pa.string()),
+ ('large_blob', pa.large_binary()),
+ ])
+
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true'
+ }
+ )
+ self.catalog.create_table('test_db.blob_large_write_read_e2e', schema, False)
+ table = self.catalog.get_table('test_db.blob_large_write_read_e2e')
+
+ # Create large blob data (1MB per blob)
+ large_blob_size = 1024 * 1024 # 1MB
+ blob_pattern = b'LARGE_BLOB_PATTERN_' + b'X' * 1024 # ~1KB pattern
+ pattern_size = len(blob_pattern)
+ repetitions = large_blob_size // pattern_size
+ large_blob_data = blob_pattern * repetitions
+
+ # Test data with large blobs
+ test_data = pa.Table.from_pydict({
+ 'id': [1, 2, 3],
+ 'metadata': ['Large blob 1', 'Large blob 2', 'Large blob 3'],
+ 'large_blob': [large_blob_data, large_blob_data, large_blob_data]
+ }, schema=pa_schema)
+
+ # Write data
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.write_arrow(test_data)
+
+ # Commit the data
+ commit_messages = writer.prepare_commit()
+ self.assertGreater(len(commit_messages), 0)
+
+ # Verify commit message structure
+ for commit_msg in commit_messages:
+ self.assertIsInstance(commit_msg.new_files, list)
+ self.assertGreater(len(commit_msg.new_files), 0)
+
+ # Should have both normal and blob files
+ file_names = [f.file_name for f in commit_msg.new_files]
+ parquet_files = [f for f in file_names if f.endswith('.parquet')]
+ blob_files = [f for f in file_names if f.endswith('.blob')]
+
+ self.assertGreater(len(parquet_files), 0, "Should have at least one parquet file")
+ self.assertGreater(len(blob_files), 0, "Should have at least one blob file")
+
+ commit = write_builder.new_commit()
+ commit.commit(commit_messages)
+ writer.close()
+
+ # Read data back
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ result = table_read.to_arrow(table_scan.plan().splits())
+
+ # Verify the data
+ self.assertEqual(result.num_rows, 3, "Should have 3 rows")
+ self.assertEqual(result.num_columns, 3, "Should have 3 columns")
+
+ # Verify normal columns
+ self.assertEqual(result.column('id').to_pylist(), [1, 2, 3], "ID
column should match")
+ self.assertEqual(result.column('metadata').to_pylist(), ['Large blob
1', 'Large blob 2', 'Large blob 3'], "Metadata column should match") # noqa:
E501
+
+ # Verify blob data integrity
+ blob_data = result.column('large_blob').to_pylist()
+ self.assertEqual(len(blob_data), 3, "Should have 3 blob records")
+
+ for i, blob in enumerate(blob_data):
+ self.assertEqual(len(blob), len(large_blob_data), f"Blob {i+1} should be {large_blob_size} bytes")
+ self.assertEqual(blob, large_blob_data, f"Blob {i+1} content should match exactly")
+ print(f"✅ Verified large blob {i+1}: {len(blob)} bytes")
+
+ print(f"✅ Large blob end-to-end test passed: wrote and read back
{len(blob_data)} large blob records correctly") # noqa: E501
+
+ def test_blob_write_read_mixed_sizes_end_to_end(self):
+ """Test end-to-end blob functionality with mixed blob sizes."""
+ from pypaimon import Schema
+
+ # Create schema with blob column
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('size_category', pa.string()),
+ ('blob_data', pa.large_binary()),
+ ])
+
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true'
+ }
+ )
+ self.catalog.create_table('test_db.blob_mixed_sizes_write_read_e2e', schema, False)
+ table = self.catalog.get_table('test_db.blob_mixed_sizes_write_read_e2e')
+
+ # Create blobs of different sizes
+ tiny_blob = b'tiny'
+ small_blob = b'small_blob_data' * 10 # ~140 bytes
+ medium_blob = b'medium_blob_data' * 100 # ~1.4KB
+ large_blob = b'large_blob_data' * 1000 # ~14KB
+ huge_blob = b'huge_blob_data' * 10000 # ~140KB
+
+ test_data = pa.Table.from_pydict({
+ 'id': [1, 2, 3, 4, 5],
+ 'size_category': ['tiny', 'small', 'medium', 'large', 'huge'],
+ 'blob_data': [tiny_blob, small_blob, medium_blob, large_blob, huge_blob]
+ }, schema=pa_schema)
+
+ # Write data
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.write_arrow(test_data)
+
+ # Commit
+ commit_messages = writer.prepare_commit()
+ self.assertGreater(len(commit_messages), 0)
+
+ # Verify commit message structure
+ for commit_msg in commit_messages:
+ self.assertIsInstance(commit_msg.new_files, list)
+ self.assertGreater(len(commit_msg.new_files), 0)
+
+ # Should have both normal and blob files
+ file_names = [f.file_name for f in commit_msg.new_files]
+ parquet_files = [f for f in file_names if f.endswith('.parquet')]
+ blob_files = [f for f in file_names if f.endswith('.blob')]
+
+ self.assertGreater(len(parquet_files), 0, "Should have at least one parquet file")
+ self.assertGreater(len(blob_files), 0, "Should have at least one blob file")
+
+ commit = write_builder.new_commit()
+ commit.commit(commit_messages)
+ writer.close()
+
+ # Read back
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ result = table_read.to_arrow(table_scan.plan().splits())
+
+ # Verify
+ self.assertEqual(result.num_rows, 5, "Should have 5 rows")
+ self.assertEqual(result.num_columns, 3, "Should have 3 columns")
+
+ # Verify normal columns
+ self.assertEqual(result.column('id').to_pylist(), [1, 2, 3, 4, 5], "ID column should match")
+ self.assertEqual(result.column('size_category').to_pylist(), ['tiny', 'small', 'medium', 'large', 'huge'], "Size category column should match") # noqa: E501
+
+ # Verify blob data
+ blob_data = result.column('blob_data').to_pylist()
+ expected_blobs = [tiny_blob, small_blob, medium_blob, large_blob, huge_blob]
+
+ self.assertEqual(len(blob_data), 5, "Should have 5 blob records")
+ self.assertEqual(blob_data, expected_blobs, "Blob data should match exactly")
+
+ # Verify sizes
+ sizes = [len(blob) for blob in blob_data]
+ expected_sizes = [len(blob) for blob in expected_blobs]
+ self.assertEqual(sizes, expected_sizes, "Blob sizes should match")
+
+ # Verify individual blob content
+ for i, (actual_blob, expected_blob) in enumerate(zip(blob_data, expected_blobs)):
+ self.assertEqual(actual_blob, expected_blob, f"Blob {i+1} content should match exactly")
+
+ print(f"✅ Mixed sizes end-to-end test passed: wrote and read back
blobs ranging from {min(sizes)} to {max(sizes)} bytes") # noqa: E501
+
+ def test_blob_write_read_large_data_end_to_end_with_rolling(self):
+ """Test end-to-end blob functionality with large blob data (50MB per
blob) and rolling behavior (40 blobs)."""
+ from pypaimon import Schema
+
+ # Create schema with blob column
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('batch_id', pa.int32()),
+ ('metadata', pa.string()),
+ ('large_blob', pa.large_binary()),
+ ])
+
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true'
+ }
+ )
+ self.catalog.create_table('test_db.blob_large_rolling_e2e', schema, False)
+ table = self.catalog.get_table('test_db.blob_large_rolling_e2e')
+
+ # Create large blob data (50MB per blob)
+ large_blob_size = 50 * 1024 * 1024 # 50MB
+ blob_pattern = b'LARGE_BLOB_PATTERN_' + b'X' * 1024 # ~1KB pattern
+ pattern_size = len(blob_pattern)
+ repetitions = large_blob_size // pattern_size
+ large_blob_data = blob_pattern * repetitions
+
+ # Verify the blob size is approximately 50MB
+ actual_size = len(large_blob_data)
+ print(f"Created blob data: {actual_size:,} bytes ({actual_size /
(1024*1024):.2f} MB)")
+
+ # Write 40 batches of data (each with 1 blob of 50MB)
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+
+ # Write all 40 batches first
+ for batch_id in range(40):
+ # Create test data for this batch
+ test_data = pa.Table.from_pydict({
+ 'id': [batch_id + 1],
+ 'batch_id': [batch_id],
+ 'metadata': [f'Large blob batch {batch_id + 1}'],
+ 'large_blob': [large_blob_data]
+ }, schema=pa_schema)
+
+ # Write data
+ writer.write_arrow(test_data)
+
+ # Print progress every 10 batches
+ if (batch_id + 1) % 10 == 0:
+ print(f"✅ Written batch {batch_id + 1}/40:
{len(large_blob_data):,} bytes")
+
+ print("✅ Successfully wrote all 40 batches of 50MB blobs")
+
+ # Commit all data at once
+ commit_messages = writer.prepare_commit()
+ self.assertGreater(len(commit_messages), 0)
+
+ # Verify commit message structure
+ for commit_msg in commit_messages:
+ self.assertIsInstance(commit_msg.new_files, list)
+ self.assertGreater(len(commit_msg.new_files), 0)
+
+ # Should have both normal and blob files
+ file_names = [f.file_name for f in commit_msg.new_files]
+ parquet_files = [f for f in file_names if f.endswith('.parquet')]
+ blob_files = [f for f in file_names if f.endswith('.blob')]
+
+ self.assertGreater(len(parquet_files), 0, "Should have at least one parquet file")
+ self.assertGreater(len(blob_files), 0, "Should have at least one blob file")
+
+ # Commit the data
+ commit = write_builder.new_commit()
+ commit.commit(commit_messages)
+ writer.close()
+
+ print(f"✅ Successfully committed {len(commit_messages)} commit
messages with 40 batches of 50MB blobs")
+
+ # Read data back
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ result = table_read.to_arrow(table_scan.plan().splits())
+
+ # Verify the data
+ self.assertEqual(result.num_rows, 40, "Should have 40 rows")
+ self.assertEqual(result.num_columns, 4, "Should have 4 columns")
+
+ # Verify normal columns
+ expected_ids = list(range(1, 41))
+ expected_batch_ids = list(range(40))
+ expected_metadata = [f'Large blob batch {i}' for i in range(1, 41)]
+
+ self.assertEqual(result.column('id').to_pylist(), expected_ids, "ID column should match")
+ self.assertEqual(result.column('batch_id').to_pylist(), expected_batch_ids, "Batch ID column should match")  # noqa: E501
+ self.assertEqual(result.column('metadata').to_pylist(), expected_metadata, "Metadata column should match")  # noqa: E501
+
+ # Verify blob data integrity
+ blob_data = result.column('large_blob').to_pylist()
+ self.assertEqual(len(blob_data), 40, "Should have 40 blob records")
+
+ # Verify each blob
+ for i, blob in enumerate(blob_data):
+ self.assertEqual(len(blob), len(large_blob_data), f"Blob {i+1} should be {large_blob_size:,} bytes")
+ self.assertEqual(blob, large_blob_data, f"Blob {i+1} content should match exactly")
+
+ # Print progress every 10 blobs
+ if (i + 1) % 10 == 0:
+ print(f"✅ Verified blob {i+1}/40: {len(blob):,} bytes")
+
+ # Verify total data size
+ total_blob_size = sum(len(blob) for blob in blob_data)
+ expected_total_size = 40 * len(large_blob_data)
+ self.assertEqual(total_blob_size, expected_total_size,
+ f"Total blob size should be {expected_total_size:,}
bytes")
+
+ print("✅ Large blob rolling end-to-end test passed:")
+ print(" - Wrote and read back 40 blobs of 50MB each")
+ print(f" - Total data size: {total_blob_size:,} bytes
({total_blob_size / (1024*1024*1024):.2f} GB)") # noqa: E501
+ print(" - All blob content verified as correct")
+
+
+if __name__ == '__main__':
+ unittest.main()
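The tests above exercise the whole path inside a unittest harness; stripped of the assertions, the blob round trip is just a write-commit-read sequence. A minimal sketch, assuming a `table` object already obtained from a catalog (as in the tests' setUp, not shown here); every builder and reader call below mirrors the test code:

    import pyarrow as pa

    def blob_round_trip(table, data: pa.Table) -> pa.Table:
        # Write an arrow table (including its large_binary blob column), commit,
        # then read every split back into a single arrow table.
        write_builder = table.new_batch_write_builder()
        writer = write_builder.new_write()
        try:
            writer.write_arrow(data)
            commit_messages = writer.prepare_commit()
            write_builder.new_commit().commit(commit_messages)
        finally:
            writer.close()

        read_builder = table.new_read_builder()
        splits = read_builder.new_scan().plan().splits()
        return read_builder.new_read().to_arrow(splits)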
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index 4e5b4d723e..97804bbf6b 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -21,6 +21,7 @@ import uuid
from pathlib import Path
from typing import List
+from pypaimon.common.core_options import CoreOptions
from pypaimon.common.predicate_builder import PredicateBuilder
from pypaimon.manifest.manifest_file_manager import ManifestFileManager
from pypaimon.manifest.manifest_list_manager import ManifestListManager
@@ -134,7 +135,7 @@ class FileStoreCommit:
new_snapshot_id = self._generate_snapshot_id()
# Check if row tracking is enabled
- row_tracking_enabled = self.table.options.get('row-tracking.enabled', 'false').lower() == 'true'
+ row_tracking_enabled = self.table.options.get(CoreOptions.ROW_TRACKING_ENABLED, 'false').lower() == 'true'
# Apply row tracking logic if enabled
next_row_id = None
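Switching from the raw 'row-tracking.enabled' string to CoreOptions.ROW_TRACKING_ENABLED works because CoreOptions mixes in str, so a member hashes and compares like its plain value and can be used directly as a dictionary key. A minimal sketch with a trimmed-down stand-in enum (not the full pypaimon class):

    from enum import Enum

    class CoreOptions(str, Enum):
        # Trimmed-down stand-in for pypaimon.common.core_options.CoreOptions.
        ROW_TRACKING_ENABLED = "row-tracking.enabled"
        DATA_EVOLUTION_ENABLED = "data-evolution.enabled"

    options = {"row-tracking.enabled": "true"}
    # The str mix-in lets the enum member match the plain-string key.
    row_tracking_enabled = options.get(CoreOptions.ROW_TRACKING_ENABLED, "false").lower() == "true"
    print(row_tracking_enabled)  # True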
diff --git a/paimon-python/pypaimon/write/file_store_write.py b/paimon-python/pypaimon/write/file_store_write.py
index 841fef3a65..35b4a7d980 100644
--- a/paimon-python/pypaimon/write/file_store_write.py
+++ b/paimon-python/pypaimon/write/file_store_write.py
@@ -21,6 +21,7 @@ import pyarrow as pa
from pypaimon.write.commit_message import CommitMessage
from pypaimon.write.writer.append_only_data_writer import AppendOnlyDataWriter
+from pypaimon.write.writer.data_blob_writer import DataBlobWriter
from pypaimon.write.writer.data_writer import DataWriter
from pypaimon.write.writer.key_value_data_writer import KeyValueDataWriter
@@ -44,7 +45,15 @@ class FileStoreWrite:
writer.write(data)
def _create_data_writer(self, partition: Tuple, bucket: int) -> DataWriter:
- if self.table.is_primary_key_table:
+ # Check if table has blob columns
+ if self._has_blob_columns():
+ return DataBlobWriter(
+ table=self.table,
+ partition=partition,
+ bucket=bucket,
+ max_seq_number=self.max_seq_numbers.get((partition, bucket), 1),
+ )
+ elif self.table.is_primary_key_table:
return KeyValueDataWriter(
table=self.table,
partition=partition,
@@ -60,6 +69,17 @@ class FileStoreWrite:
write_cols=self.write_cols
)
+ def _has_blob_columns(self) -> bool:
+ """Check if the table schema contains blob columns."""
+ for field in self.table.table_schema.fields:
+ # Check if field type is blob
+ if hasattr(field.type, 'type') and field.type.type == 'BLOB':
+ return True
+ # Alternative: check for specific blob type class
+ elif hasattr(field.type, '__class__') and 'blob' in field.type.__class__.__name__.lower():
+ return True
+ return False
+
def prepare_commit(self) -> List[CommitMessage]:
commit_messages = []
for (partition, bucket), writer in self.data_writers.items():
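The `_has_blob_columns` dispatch above only needs a duck-typed look at the field types before choosing a writer. A hedged sketch with simplified stand-ins for the schema classes (the real DataField/AtomicType live in pypaimon.schema.data_types and carry more state):

    from dataclasses import dataclass

    @dataclass
    class AtomicType:
        type: str  # e.g. 'INT', 'STRING', 'BLOB'

    @dataclass
    class DataField:
        name: str
        type: AtomicType

    def has_blob_columns(fields) -> bool:
        # Mirrors FileStoreWrite._has_blob_columns: accept either a 'BLOB' type tag
        # or a type class whose name mentions blob.
        for field in fields:
            if getattr(field.type, 'type', None) == 'BLOB':
                return True
            if 'blob' in type(field.type).__name__.lower():
                return True
        return False

    fields = [DataField('id', AtomicType('INT')), DataField('data', AtomicType('BLOB'))]
    print(has_blob_columns(fields))  # True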
diff --git a/paimon-python/pypaimon/common/core_options.py b/paimon-python/pypaimon/write/writer/blob_writer.py
similarity index 50%
copy from paimon-python/pypaimon/common/core_options.py
copy to paimon-python/pypaimon/write/writer/blob_writer.py
index 8ab26fe062..ff153da843 100644
--- a/paimon-python/pypaimon/common/core_options.py
+++ b/paimon-python/pypaimon/write/writer/blob_writer.py
@@ -16,36 +16,31 @@
# limitations under the License.
################################################################################
-from enum import Enum
-
-
-class CoreOptions(str, Enum):
- """Core options for paimon."""
-
- def __str__(self):
- return self.value
-
- # Basic options
- AUTO_CREATE = "auto-create"
- PATH = "path"
- TYPE = "type"
- BRANCH = "branch"
- BUCKET = "bucket"
- BUCKET_KEY = "bucket-key"
- WAREHOUSE = "warehouse"
- # File format options
- FILE_FORMAT = "file.format"
- FILE_FORMAT_ORC = "orc"
- FILE_FORMAT_AVRO = "avro"
- FILE_FORMAT_PARQUET = "parquet"
- FILE_FORMAT_BLOB = "blob"
- FILE_COMPRESSION = "file.compression"
- FILE_COMPRESSION_PER_LEVEL = "file.compression.per.level"
- FILE_FORMAT_PER_LEVEL = "file.format.per.level"
- FILE_BLOCK_SIZE = "file.block-size"
- FILE_BLOB_AS_DESCRIPTOR = "blob-as-descriptor"
- # Scan options
- SCAN_FALLBACK_BRANCH = "scan.fallback-branch"
- INCREMENTAL_BETWEEN_TIMESTAMP = "incremental-between-timestamp"
- # Commit options
- COMMIT_USER_PREFIX = "commit.user-prefix"
+import logging
+from typing import Tuple
+
+from pypaimon.common.core_options import CoreOptions
+from pypaimon.write.writer.append_only_data_writer import AppendOnlyDataWriter
+
+logger = logging.getLogger(__name__)
+
+
+class BlobWriter(AppendOnlyDataWriter):
+
+ def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int, blob_column: str):
+ super().__init__(table, partition, bucket, max_seq_number, [blob_column])
+
+ # Override file format to "blob"
+ self.file_format = CoreOptions.FILE_FORMAT_BLOB
+
+ logger.info("Initialized BlobWriter with blob file format")
+
+ @staticmethod
+ def _get_column_stats(record_batch, column_name: str):
+ column_array = record_batch.column(column_name)
+ # For blob data, don't generate min/max values
+ return {
+ "min_values": None,
+ "max_values": None,
+ "null_counts": column_array.null_count,
+ }
diff --git a/paimon-python/pypaimon/write/writer/data_blob_writer.py b/paimon-python/pypaimon/write/writer/data_blob_writer.py
new file mode 100644
index 0000000000..e34b9a5701
--- /dev/null
+++ b/paimon-python/pypaimon/write/writer/data_blob_writer.py
@@ -0,0 +1,321 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import logging
+import uuid
+from datetime import datetime
+from pathlib import Path
+from typing import List, Optional, Tuple
+
+import pyarrow as pa
+
+from pypaimon.common.core_options import CoreOptions
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.manifest.schema.simple_stats import SimpleStats
+from pypaimon.table.row.generic_row import GenericRow
+from pypaimon.write.writer.data_writer import DataWriter
+
+logger = logging.getLogger(__name__)
+
+
+class DataBlobWriter(DataWriter):
+ """
+ A rolling file writer that handles both normal data and blob data. This writer creates separate
+ files for normal columns and blob columns, managing their lifecycle independently.
+
+ For example, given a table schema with normal columns (id INT, name STRING) and a blob column
+ (data BLOB), this writer will create separate files for (id, name) and (data).
+
+ Key features:
+ - Blob data can roll independently when normal data doesn't need rolling
+ - When normal data rolls, blob data MUST also be closed (Java behavior)
+ - Blob data uses more aggressive rolling (smaller target size) to prevent memory issues
+ - One normal data file may correspond to multiple blob data files
+ - Blob data is written immediately to disk to prevent memory corruption
+ - Blob file metadata is stored as separate DataFileMeta objects after normal file metadata
+
+ Rolling behavior:
+ - Normal data rolls: Both normal and blob writers are closed together, blob metadata added after normal metadata
+ - Blob data rolls independently: Only blob writer is closed, blob metadata is cached until normal data rolls
+
+ Metadata organization:
+ - Normal file metadata is added first to committed_files
+ - Blob file metadata is added after normal file metadata in committed_files
+ - When blob rolls independently, metadata is cached until normal data rolls
+ - Result: [normal_meta, blob_meta1, blob_meta2, blob_meta3, ...]
+
+ Example file organization:
+ committed_files = [
+ normal_file1_meta, # f1.parquet metadata
+ blob_file1_meta, # b1.blob metadata
+ blob_file2_meta, # b2.blob metadata
+ blob_file3_meta, # b3.blob metadata
+ normal_file2_meta, # f1-2.parquet metadata
+ blob_file4_meta, # b4.blob metadata
+ blob_file5_meta, # b5.blob metadata
+ ]
+
+ This matches the Java RollingBlobFileWriter behavior exactly.
+ """
+
+ # Constant for checking rolling condition periodically
+ CHECK_ROLLING_RECORD_CNT = 1000
+
+ def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int):
+ super().__init__(table, partition, bucket, max_seq_number)
+
+ # Determine blob column from table schema
+ self.blob_column_name = self._get_blob_columns_from_schema()
+
+ # Split schema into normal and blob columns
+ all_column_names = [field.name for field in self.table.table_schema.fields]
+ self.normal_column_names = [col for col in all_column_names if col != self.blob_column_name]
+ self.write_cols = self.normal_column_names
+
+ # State management for blob writer
+ self.record_count = 0
+ self.closed = False
+
+ # Track pending data for normal data only
+ self.pending_normal_data: Optional[pa.Table] = None
+
+ # Initialize blob writer with blob column name
+ from pypaimon.write.writer.blob_writer import BlobWriter
+ self.blob_writer = BlobWriter(
+ table=self.table,
+ partition=self.partition,
+ bucket=self.bucket,
+ max_seq_number=max_seq_number,
+ blob_column=self.blob_column_name
+ )
+
+ logger.info(f"Initialized DataBlobWriter with blob column:
{self.blob_column_name}")
+
+ def _get_blob_columns_from_schema(self) -> str:
+ blob_columns = []
+ for field in self.table.table_schema.fields:
+ type_str = str(field.type).lower()
+ if 'blob' in type_str:
+ blob_columns.append(field.name)
+
+ # Validate blob column count (matching Java constraint)
+ if len(blob_columns) == 0:
+ raise ValueError("No blob field found in table schema.")
+ elif len(blob_columns) > 1:
+ raise ValueError("Limit exactly one blob field in one paimon table
yet.")
+
+ return blob_columns[0] # Return single blob column name
+
+ def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch:
+ normal_data, _ = self._split_data(data)
+ return normal_data
+
+ def _merge_data(self, existing_data: pa.Table, new_data: pa.Table) -> pa.Table:
+ return self._merge_normal_data(existing_data, new_data)
+
+ def write(self, data: pa.RecordBatch):
+ try:
+ # Split data into normal and blob parts
+ normal_data, blob_data = self._split_data(data)
+
+ # Process and accumulate normal data
+ processed_normal = self._process_normal_data(normal_data)
+ if self.pending_normal_data is None:
+ self.pending_normal_data = processed_normal
+ else:
+ self.pending_normal_data = self._merge_normal_data(self.pending_normal_data, processed_normal)
+
+ # Write blob data directly to blob writer (handles its own rolling)
+ if blob_data is not None and blob_data.num_rows > 0:
+ # Write blob data directly to blob writer
+ self.blob_writer.write(blob_data)
+
+ self.record_count += data.num_rows
+
+ # Check if normal data rolling is needed
+ if self._should_roll_normal():
+ # When normal data rolls, close both writers and fetch blob metadata
+ self._close_current_writers()
+
+ except Exception as e:
+ logger.error("Exception occurs when writing data. Cleaning up.",
exc_info=e)
+ self.abort()
+ raise e
+
+ def prepare_commit(self) -> List[DataFileMeta]:
+ # Close any remaining data
+ self._close_current_writers()
+
+ return self.committed_files.copy()
+
+ def close(self):
+ if self.closed:
+ return
+
+ try:
+ if self.pending_normal_data is not None and self.pending_normal_data.num_rows > 0:
+ self._close_current_writers()
+ except Exception as e:
+ logger.error("Exception occurs when closing writer. Cleaning up.",
exc_info=e)
+ self.abort()
+ finally:
+ self.closed = True
+ self.pending_normal_data = None
+
+ def abort(self):
+ """Abort all writers and clean up resources."""
+ self.blob_writer.abort()
+ self.pending_normal_data = None
+ self.committed_files.clear()
+
+ def _split_data(self, data: pa.RecordBatch) -> Tuple[pa.RecordBatch, pa.RecordBatch]:
+ """Split data into normal and blob parts based on column names."""
+ # Use the pre-computed column names
+ normal_columns = self.normal_column_names
+ blob_columns = [self.blob_column_name] # Single blob column
+
+ # Create projected batches
+ normal_data = data.select(normal_columns) if normal_columns else None
+ blob_data = data.select(blob_columns) if blob_columns else None
+
+ return normal_data, blob_data
+
+ def _process_normal_data(self, data: pa.RecordBatch) -> pa.Table:
+ """Process normal data (similar to base DataWriter)."""
+ if data is None or data.num_rows == 0:
+ return pa.Table.from_batches([])
+ return pa.Table.from_batches([data])
+
+ def _merge_normal_data(self, existing_data: pa.Table, new_data: pa.Table) -> pa.Table:
+ return pa.concat_tables([existing_data, new_data])
+
+ def _should_roll_normal(self) -> bool:
+ if self.pending_normal_data is None:
+ return False
+
+ # Check rolling condition periodically (every CHECK_ROLLING_RECORD_CNT records)
+ if self.record_count % self.CHECK_ROLLING_RECORD_CNT != 0:
+ return False
+
+ # Check if normal data exceeds target size
+ current_size = self.pending_normal_data.nbytes
+ return current_size > self.target_file_size
+
+ def _close_current_writers(self):
+ """Close both normal and blob writers and add blob metadata after
normal metadata (Java behavior)."""
+ if self.pending_normal_data is None or
self.pending_normal_data.num_rows == 0:
+ return
+
+ # Close normal writer and get metadata
+ normal_meta = self._write_normal_data_to_file(self.pending_normal_data)
+
+ # Fetch blob metadata from blob writer
+ blob_metas = self.blob_writer.prepare_commit()
+
+ # Validate consistency between normal and blob files (Java behavior)
+ self._validate_consistency(normal_meta, blob_metas)
+
+ # Add normal file metadata first
+ self.committed_files.append(normal_meta)
+
+ # Add blob file metadata after normal metadata
+ self.committed_files.extend(blob_metas)
+
+ # Reset pending data
+ self.pending_normal_data = None
+
+ logger.info(f"Closed both writers - normal: {normal_meta.file_name}, "
+ f"added {len(blob_metas)} blob file metadata after normal
metadata")
+
+ def _write_normal_data_to_file(self, data: pa.Table) -> DataFileMeta:
+ if data.num_rows == 0:
+ return None
+
+ file_name = f"data-{uuid.uuid4()}-0.{self.file_format}"
+ file_path = self._generate_file_path(file_name)
+
+ # Write file based on format
+ if self.file_format == CoreOptions.FILE_FORMAT_PARQUET:
+ self.file_io.write_parquet(file_path, data, compression=self.compression)
+ elif self.file_format == CoreOptions.FILE_FORMAT_ORC:
+ self.file_io.write_orc(file_path, data, compression=self.compression)
+ elif self.file_format == CoreOptions.FILE_FORMAT_AVRO:
+ self.file_io.write_avro(file_path, data)
+ else:
+ raise ValueError(f"Unsupported file format: {self.file_format}")
+
+ # Generate metadata
+ return self._create_data_file_meta(file_name, file_path, data)
+
+ def _create_data_file_meta(self, file_name: str, file_path: Path, data: pa.Table) -> DataFileMeta:
+ # Column stats (only for normal columns)
+ column_stats = {
+ field.name: self._get_column_stats(data, field.name)
+ for field in self.table.table_schema.fields
+ if field.name != self.blob_column_name
+ }
+
+ # Get normal fields only
+ normal_fields = [field for field in self.table.table_schema.fields
+ if field.name != self.blob_column_name]
+
+ min_value_stats = [column_stats[field.name]['min_values'] for field in normal_fields]
+ max_value_stats = [column_stats[field.name]['max_values'] for field in normal_fields]
+ value_null_counts = [column_stats[field.name]['null_counts'] for field in normal_fields]
+
+ self.sequence_generator.start = self.sequence_generator.current
+
+ return DataFileMeta(
+ file_name=file_name,
+ file_size=self.file_io.get_file_size(file_path),
+ row_count=data.num_rows,
+ min_key=GenericRow([], []),
+ max_key=GenericRow([], []),
+ key_stats=SimpleStats(
+ GenericRow([], []),
+ GenericRow([], []),
+ []),
+ value_stats=SimpleStats(
+ GenericRow(min_value_stats, normal_fields),
+ GenericRow(max_value_stats, normal_fields),
+ value_null_counts),
+ min_sequence_number=-1,
+ max_sequence_number=-1,
+ schema_id=self.table.table_schema.id,
+ level=0,
+ extra_files=[],
+ creation_time=datetime.now(),
+ delete_row_count=0,
+ file_source="APPEND",
+ value_stats_cols=self.normal_column_names,
+ file_path=str(file_path),
+ write_cols=self.write_cols)
+
+ def _validate_consistency(self, normal_meta: DataFileMeta, blob_metas: List[DataFileMeta]):
+ if normal_meta is None:
+ return
+
+ normal_row_count = normal_meta.row_count
+ blob_row_count = sum(meta.row_count for meta in blob_metas)
+
+ if normal_row_count != blob_row_count:
+ raise RuntimeError(
+ f"This is a bug: The row count of main file and blob files
does not match. "
+ f"Main file: {normal_meta.file_name} (row count:
{normal_row_count}), "
+ f"blob files: {[meta.file_name for meta in blob_metas]} (total
row count: {blob_row_count})"
+ )
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index 0a063fb2f4..502d196ae6 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -61,14 +61,21 @@ class DataWriter(ABC):
self.blob_as_descriptor = options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, False)
def write(self, data: pa.RecordBatch):
- processed_data = self._process_data(data)
+ try:
+ processed_data = self._process_data(data)
- if self.pending_data is None:
- self.pending_data = processed_data
- else:
- self.pending_data = self._merge_data(self.pending_data, processed_data)
+ if self.pending_data is None:
+ self.pending_data = processed_data
+ else:
+ self.pending_data = self._merge_data(self.pending_data, processed_data)
- self._check_and_roll_if_needed()
+ self._check_and_roll_if_needed()
+ except Exception as e:
+ import logging
+ logger = logging.getLogger(__name__)
+ logger.warning("Exception occurs when writing data. Cleaning up.",
exc_info=e)
+ self.abort()
+ raise e
def prepare_commit(self) -> List[DataFileMeta]:
if self.pending_data is not None and self.pending_data.num_rows > 0:
@@ -78,6 +85,36 @@ class DataWriter(ABC):
return self.committed_files.copy()
def close(self):
+ try:
+ if self.pending_data is not None and self.pending_data.num_rows > 0:
+ self._write_data_to_file(self.pending_data)
+ except Exception as e:
+ import logging
+ logger = logging.getLogger(__name__)
+ logger.warning("Exception occurs when closing writer. Cleaning
up.", exc_info=e)
+ self.abort()
+ raise e
+ finally:
+ self.pending_data = None
+ # Note: Don't clear committed_files in close() - they should be returned by prepare_commit()
+
+ def abort(self):
+ """
+ Abort all writers and clean up resources. This method should be called when an error occurs
+ during writing. It deletes any files that were written and cleans up resources.
+ """
+ # Delete any files that were written
+ for file_meta in self.committed_files:
+ try:
+ if file_meta.file_path:
+ self.file_io.delete_quietly(file_meta.file_path)
+ except Exception as e:
+ # Log but don't raise - we want to clean up as much as possible
+ import logging
+ logger = logging.getLogger(__name__)
+ logger.warning(f"Failed to delete file {file_meta.file_path}
during abort: {e}")
+
+ # Clean up resources
self.pending_data = None
self.committed_files.clear()
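The reworked write()/close() paths give callers a simple contract: if either call raises, abort() has already deleted the data files written so far, so nothing stale is left to commit. A hedged usage sketch; write_all_or_nothing is an illustrative helper, not part of pypaimon, and `writer` is any DataWriter subclass instance:

    def write_all_or_nothing(writer, batches):
        # If a write() fails, DataWriter.abort() has already removed the files
        # written so far, so the caller just lets the exception propagate.
        for batch in batches:
            writer.write(batch)
        file_metas = writer.prepare_commit()  # flushes pending data, returns metadata
        writer.close()                        # keeps committed_files, drops pending data
        return file_metas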