This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8b542c5f13 [python] support blob type and blob write and read (#6390)
8b542c5f13 is described below
commit 8b542c5f13f76fdddbbf0298af27b2a96cd3954d
Author: jerry <[email protected]>
AuthorDate: Tue Oct 14 15:42:17 2025 +0800
[python] support blob type and blob write and read (#6390)
---
paimon-python/pypaimon/common/core_options.py | 1 +
.../pypaimon/common/delta_varint_compressor.py | 125 +++
paimon-python/pypaimon/common/file_io.py | 55 ++
.../pypaimon/read/reader/format_blob_reader.py | 199 +++++
paimon-python/pypaimon/read/split_read.py | 9 +-
paimon-python/pypaimon/schema/data_types.py | 5 +
paimon-python/pypaimon/table/row/blob.py | 247 ++++++
paimon-python/pypaimon/table/row/generic_row.py | 15 +-
paimon-python/pypaimon/tests/blob_test.py | 983 +++++++++++++++++++++
.../pypaimon/tests/delta_varint_compressor_test.py | 379 ++++++++
paimon-python/pypaimon/write/blob_format_writer.py | 107 +++
paimon-python/pypaimon/write/writer/data_writer.py | 2 +
12 files changed, 2124 insertions(+), 3 deletions(-)
diff --git a/paimon-python/pypaimon/common/core_options.py b/paimon-python/pypaimon/common/core_options.py
index 1a060bd206..754bb17c05 100644
--- a/paimon-python/pypaimon/common/core_options.py
+++ b/paimon-python/pypaimon/common/core_options.py
@@ -38,6 +38,7 @@ class CoreOptions(str, Enum):
FILE_FORMAT_ORC = "orc"
FILE_FORMAT_AVRO = "avro"
FILE_FORMAT_PARQUET = "parquet"
+ FILE_FORMAT_BLOB = "blob"
FILE_COMPRESSION = "file.compression"
FILE_COMPRESSION_PER_LEVEL = "file.compression.per.level"
FILE_FORMAT_PER_LEVEL = "file.format.per.level"
diff --git a/paimon-python/pypaimon/common/delta_varint_compressor.py b/paimon-python/pypaimon/common/delta_varint_compressor.py
new file mode 100644
index 0000000000..dc849886bf
--- /dev/null
+++ b/paimon-python/pypaimon/common/delta_varint_compressor.py
@@ -0,0 +1,125 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import io
+from typing import List
+
+
+class DeltaVarintCompressor:
+
+ @staticmethod
+ def compress(data: List[int]) -> bytes:
+ if not data:
+ return b''
+
+ # Estimate output size (conservative: 5 bytes per varint max)
+ out = io.BytesIO()
+ out.seek(0)
+
+ # Encode first value directly
+ DeltaVarintCompressor._encode_varint(data[0], out)
+
+ # Encode deltas without intermediate list creation
+ prev = data[0]
+ for i in range(1, len(data)):
+ current = data[i]
+ delta = current - prev
+ DeltaVarintCompressor._encode_varint(delta, out)
+ prev = current
+
+ # Return only the used portion of the buffer
+ position = out.tell()
+ result = out.getvalue()
+ out.close()
+ return result[:position]
+
+ @staticmethod
+ def decompress(compressed: bytes) -> List[int]:
+ if not compressed:
+ return []
+
+ # Fast path: decode directly into result without intermediate deltas list
+ in_stream = io.BytesIO(compressed)
+ result = []
+
+ try:
+ # Decode first value
+ first_value = DeltaVarintCompressor._decode_varint(in_stream)
+ result.append(first_value)
+
+ # Decode and reconstruct remaining values in one pass
+ current_value = first_value
+ while True:
+ try:
+ delta = DeltaVarintCompressor._decode_varint(in_stream)
+ current_value += delta
+ result.append(current_value)
+ except RuntimeError:
+ # End of stream reached
+ break
+
+ except RuntimeError:
+ # Handle empty stream case
+ pass
+ finally:
+ in_stream.close()
+
+ return result
+
+ @staticmethod
+ def _encode_varint(value: int, out: io.BytesIO) -> None:
+ # ZigZag encoding: maps signed integers to unsigned integers
+ if value >= 0:
+ zigzag = value << 1
+ else:
+ zigzag = ((-value) << 1) - 1
+
+ # Varint encoding
+ while zigzag >= 0x80:
+ out.write(bytes([(zigzag & 0x7F) | 0x80]))
+ zigzag >>= 7
+ out.write(bytes([zigzag & 0x7F]))
+
+ @staticmethod
+ def _decode_varint(in_stream: io.BytesIO) -> int:
+ result = 0
+ shift = 0
+ while True:
+ byte_data = in_stream.read(1)
+ if not byte_data:
+ if shift == 0:
+ # Natural end of stream
+ raise RuntimeError("End of stream")
+ else:
+ # Unexpected end in middle of varint
+ raise RuntimeError("Unexpected end of input")
+
+ b = byte_data[0]
+ result |= (b & 0x7F) << shift
+ if (b & 0x80) == 0:
+ break
+
+ shift += 7
+ if shift > 63:
+ raise RuntimeError("Varint overflow")
+
+ # ZigZag decoding: maps unsigned integers back to signed integers
+ if result & 1:
+ return -((result + 1) >> 1)
+ else:
+ return result >> 1
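For reference, a minimal round trip through the compressor added above (module path as introduced by this commit; the sample lengths are illustrative):

    from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor

    # Blob lengths tend to be close to each other, so the deltas stay small
    # and each zigzag-encoded varint usually fits in one or two bytes.
    lengths = [1024, 1030, 1018, 2048]
    packed = DeltaVarintCompressor.compress(lengths)
    assert DeltaVarintCompressor.decompress(packed) == lengths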
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index 6f5dfc6a2a..f6371d2d80 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -28,6 +28,11 @@ from packaging.version import parse
from pyarrow._fs import FileSystem
from pypaimon.common.config import OssOptions, S3Options
+from pypaimon.schema.data_types import DataField, AtomicType, PyarrowFieldParser
+from pypaimon.table.row.blob import BlobData
+from pypaimon.table.row.generic_row import GenericRow
+from pypaimon.table.row.row_kind import RowKind
+from pypaimon.write.blob_format_writer import BlobFormatWriter
class FileIO:
@@ -364,3 +369,53 @@ class FileIO:
with self.new_output_stream(path) as output_stream:
fastavro.writer(output_stream, avro_schema, records, **kwargs)
+
+ def write_blob(self, path: Path, data: pyarrow.Table, **kwargs):
+ try:
+ # Validate input constraints
+ if data.num_columns != 1:
+ raise RuntimeError(f"Blob format only supports a single
column, got {data.num_columns} columns")
+ # Check for null values
+ column = data.column(0)
+ if column.null_count > 0:
+ raise RuntimeError("Blob format does not support null values")
+ # Convert PyArrow schema to Paimon DataFields
+ # For blob files, we expect exactly one blob column
+ field = data.schema[0]
+ if pyarrow.types.is_large_binary(field.type):
+ fields = [DataField(0, field.name, AtomicType("BLOB"))]
+ else:
+ # Convert other types as needed
paimon_type = PyarrowFieldParser.to_paimon_type(field.type, field.nullable)
+ fields = [DataField(0, field.name, paimon_type)]
+ # Convert PyArrow Table to records
+ records_dict = data.to_pydict()
+ num_rows = data.num_rows
+ field_name = fields[0].name
+ with self.new_output_stream(path) as output_stream:
+ writer = BlobFormatWriter(output_stream)
+ # Write each row
+ for i in range(num_rows):
+ col_data = records_dict[field_name][i]
+ # Convert to appropriate type based on field type
if hasattr(fields[0].type, 'type') and fields[0].type.type == "BLOB":
+ if isinstance(col_data, bytes):
+ blob_data = BlobData(col_data)
+ else:
+ # Convert to bytes if needed
+ if hasattr(col_data, 'as_py'):
+ col_data = col_data.as_py()
+ if isinstance(col_data, str):
+ col_data = col_data.encode('utf-8')
+ blob_data = BlobData(col_data)
+ row_values = [blob_data]
+ else:
+ row_values = [col_data]
+ # Create GenericRow and write
+ row = GenericRow(row_values, fields, RowKind.INSERT)
+ writer.add_element(row)
+ writer.close()
+
+ except Exception as e:
+ self.delete_quietly(path)
+ raise RuntimeError(f"Failed to write blob file {path}: {e}") from e
diff --git a/paimon-python/pypaimon/read/reader/format_blob_reader.py b/paimon-python/pypaimon/read/reader/format_blob_reader.py
new file mode 100644
index 0000000000..709a0c27a3
--- /dev/null
+++ b/paimon-python/pypaimon/read/reader/format_blob_reader.py
@@ -0,0 +1,199 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import struct
+from pathlib import Path
+from typing import List, Optional, Any, Iterator
+
+import pyarrow as pa
+import pyarrow.dataset as ds
+from pyarrow import RecordBatch
+
+from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor
+from pypaimon.common.file_io import FileIO
+from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
+from pypaimon.schema.data_types import DataField, PyarrowFieldParser
+from pypaimon.table.row.blob import Blob, BlobDescriptor, BlobRef
+from pypaimon.table.row.generic_row import GenericRow
+
+
+class FormatBlobReader(RecordBatchReader):
+
+ def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
+ full_fields: List[DataField], push_down_predicate: Any):
+ self._file_io = file_io
+ self._file_path = file_path
+ self._push_down_predicate = push_down_predicate
+
+ # Get file size
+ self._file_size = file_io.get_file_size(file_path)
+
+ # Initialize the low-level blob format reader
+ self.file_path = file_path
+ self.blob_lengths: List[int] = []
+ self.blob_offsets: List[int] = []
+ self.returned = False
+ self._read_index()
+
+ # Set up fields and schema
+ if len(read_fields) > 1:
+ raise RuntimeError("Blob reader only supports one field.")
+ self._fields = read_fields
+ full_fields_map = {field.name: field for field in full_fields}
+ projected_data_fields = [full_fields_map[name] for name in read_fields]
self._schema = PyarrowFieldParser.from_paimon_schema(projected_data_fields)
+
+ # Initialize iterator
+ self._blob_iterator = None
+ self._current_batch = None
+
+ def read_arrow_batch(self) -> Optional[RecordBatch]:
+ if self._blob_iterator is None:
+ if self.returned:
+ return None
+ self.returned = True
batch_iterator = BlobRecordIterator(self.file_path, self.blob_lengths, self.blob_offsets, self._fields[0])
+ self._blob_iterator = iter(batch_iterator)
+
+ # Collect records for this batch
+ pydict_data = {name: [] for name in self._fields}
+ records_in_batch = 0
+
+ try:
+ while True:
+ # Get next blob record
+ blob_row = next(self._blob_iterator)
+ # Check if first read returns None, stop immediately
+ if blob_row is None:
+ break
+
+ # Extract blob data from the row
+ blob = blob_row.values[0] # Blob files have single blob field
+
+ # Convert blob to appropriate format for each requested field
+ for field_name in self._fields:
+ # For blob files, all fields should contain blob data
+ if isinstance(blob, Blob):
+ blob_data = blob.to_data()
+ else:
+ blob_data = bytes(blob) if blob is not None else None
+ pydict_data[field_name].append(blob_data)
+
+ records_in_batch += 1
+
+ except StopIteration:
+ # Stop immediately when StopIteration occurs
+ pass
+
+ if records_in_batch == 0:
+ return None
+
+ # Create RecordBatch
+ if self._push_down_predicate is None:
+ # Convert to Table first, then to RecordBatch
+ table = pa.Table.from_pydict(pydict_data, self._schema)
+ if table.num_rows > 0:
+ return table.to_batches()[0]
+ else:
+ return None
+ else:
+ # Apply predicate filtering
+ pa_batch = pa.Table.from_pydict(pydict_data, self._schema)
+ dataset = ds.InMemoryDataset(pa_batch)
+ scanner = dataset.scanner(filter=self._push_down_predicate)
+ combine_chunks = scanner.to_table().combine_chunks()
+ if combine_chunks.num_rows > 0:
+ return combine_chunks.to_batches()[0]
+ else:
+ return None
+
+ def close(self):
+ self._blob_iterator = None
+
+ def _read_index(self) -> None:
+ with self._file_io.new_input_stream(Path(self.file_path)) as f:
+ # Seek to header: last 5 bytes
+ f.seek(self._file_size - 5)
+ header = f.read(5)
+
+ if len(header) != 5:
+ raise IOError("Invalid blob file: cannot read header")
+
+ # Parse header
+ index_length = struct.unpack('<I', header[:4])[0] # Little endian
+ version = header[4]
+
+ if version != 1:
+ raise IOError(f"Unsupported blob file version: {version}")
+
+ # Read index data
+ f.seek(self._file_size - 5 - index_length)
+ index_bytes = f.read(index_length)
+
+ if len(index_bytes) != index_length:
+ raise IOError("Invalid blob file: cannot read index")
+
+ # Decompress blob lengths and compute offsets
+ blob_lengths = DeltaVarintCompressor.decompress(index_bytes)
+ blob_offsets = []
+ offset = 0
+ for length in blob_lengths:
+ blob_offsets.append(offset)
+ offset += length
+ self.blob_lengths = blob_lengths
+ self.blob_offsets = blob_offsets
+
+
+class BlobRecordIterator:
+ MAGIC_NUMBER_SIZE = 4
+ METADATA_OVERHEAD = 16
+
+ def __init__(self, file_path: str, blob_lengths: List[int], blob_offsets: List[int], field_name: str):
+ self.file_path = file_path
+ self.field_name = field_name
+ self.blob_lengths = blob_lengths
+ self.blob_offsets = blob_offsets
+ self.current_position = 0
+
+ def __iter__(self) -> Iterator[GenericRow]:
+ return self
+
+ def __next__(self) -> GenericRow:
+ if self.current_position >= len(self.blob_lengths):
+ raise StopIteration
+
+ # Create blob reference for the current blob
# Skip magic number (4 bytes) and exclude length (8 bytes) + CRC (4 bytes) = 12 bytes
+ blob_offset = self.blob_offsets[self.current_position] + self.MAGIC_NUMBER_SIZE # Skip magic number
+ blob_length = self.blob_lengths[self.current_position] - self.METADATA_OVERHEAD
+
+ # Create BlobDescriptor for this blob
+ descriptor = BlobDescriptor(self.file_path, blob_offset, blob_length)
+ blob = BlobRef(descriptor)
+
+ self.current_position += 1
+
+ # Return as GenericRow with single blob field
+ from pypaimon.schema.data_types import DataField, AtomicType
+ from pypaimon.table.row.row_kind import RowKind
+
+ fields = [DataField(0, self.field_name, AtomicType("BLOB"))]
+ return GenericRow([blob], fields, RowKind.INSERT)
+
+ def returned_position(self) -> int:
+ """Get current position in the iterator."""
+ return self.current_position
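Reading the same file back goes through FormatBlobReader, which locates the index from the 5-byte trailer and returns one Arrow batch of blob bytes. A sketch under the same illustrative paths as the write example above:

    from pypaimon.common.file_io import FileIO
    from pypaimon.read.reader.format_blob_reader import FormatBlobReader
    from pypaimon.schema.data_types import AtomicType, DataField

    file_io = FileIO("/tmp/blob-demo", {})  # illustrative
    fields = [DataField(0, "doc", AtomicType("BLOB"))]
    reader = FormatBlobReader(file_io=file_io,
                              file_path="/tmp/blob-demo/doc.blob",
                              read_fields=["doc"],
                              full_fields=fields,
                              push_down_predicate=None)
    batch = reader.read_arrow_batch()  # pyarrow.RecordBatch, or None when exhausted
    if batch is not None:
        print(batch.column(0)[0].as_py())
    reader.close()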
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index d9e9939c11..7674db45f0 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -21,6 +21,7 @@ from abc import ABC, abstractmethod
from functools import partial
from typing import List, Optional, Tuple
+from pypaimon.common.core_options import CoreOptions
from pypaimon.common.predicate import Predicate
from pypaimon.read.interval_partition import IntervalPartition, SortedRun
from pypaimon.read.partition_info import PartitionInfo
@@ -31,6 +32,7 @@ from pypaimon.read.reader.drop_delete_reader import DropDeleteRecordReader
from pypaimon.read.reader.empty_record_reader import EmptyFileRecordReader
from pypaimon.read.reader.filter_record_reader import FilterRecordReader
from pypaimon.read.reader.format_avro_reader import FormatAvroReader
+from pypaimon.read.reader.format_blob_reader import FormatBlobReader
from pypaimon.read.reader.format_pyarrow_reader import FormatPyArrowReader
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
from pypaimon.read.reader.iface.record_reader import RecordReader
@@ -73,10 +75,13 @@ class SplitRead(ABC):
file_format = extension[1:]
format_reader: RecordBatchReader
- if file_format == "avro":
+ if file_format == CoreOptions.FILE_FORMAT_AVRO:
format_reader = FormatAvroReader(self.table.file_io, file_path, self._get_final_read_data_fields(),
self.read_fields, self.push_down_predicate)
- elif file_format == "parquet" or file_format == "orc":
+ elif file_format == CoreOptions.FILE_FORMAT_BLOB:
+ format_reader = FormatBlobReader(self.table.file_io, file_path, self._get_final_read_data_fields(),
+ self.read_fields, self.push_down_predicate)
+ elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == CoreOptions.FILE_FORMAT_ORC:
format_reader = FormatPyArrowReader(self.table.file_io, file_format, file_path,
self._get_final_read_data_fields(), self.push_down_predicate)
else:
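The comparison against enum members above works because CoreOptions mixes in str: a member compares equal to the raw extension string parsed from the file name. A quick illustration:

    from pypaimon.common.core_options import CoreOptions

    # str-based enum members compare equal to plain strings.
    assert "blob" == CoreOptions.FILE_FORMAT_BLOB
    assert "parquet" == CoreOptions.FILE_FORMAT_PARQUET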
diff --git a/paimon-python/pypaimon/schema/data_types.py b/paimon-python/pypaimon/schema/data_types.py
index 5255ac6b1e..d1ce2354a8 100644
--- a/paimon-python/pypaimon/schema/data_types.py
+++ b/paimon-python/pypaimon/schema/data_types.py
@@ -247,6 +247,7 @@ class Keyword(Enum):
BINARY = "BINARY"
VARBINARY = "VARBINARY"
BYTES = "BYTES"
+ BLOB = "BLOB"
DECIMAL = "DECIMAL"
NUMERIC = "NUMERIC"
DEC = "DEC"
@@ -407,6 +408,8 @@ class PyarrowFieldParser:
return pyarrow.string()
elif type_name == 'BYTES' or type_name.startswith('VARBINARY'):
return pyarrow.binary()
+ elif type_name == 'BLOB':
+ return pyarrow.large_binary()
elif type_name.startswith('BINARY'):
if type_name == 'BINARY':
return pyarrow.binary(1)
@@ -506,6 +509,8 @@ class PyarrowFieldParser:
type_name = f'BINARY({pa_type.byte_width})'
elif types.is_binary(pa_type):
type_name = 'BYTES'
+ elif types.is_large_binary(pa_type):
+ type_name = 'BLOB'
elif types.is_decimal(pa_type):
type_name = f'DECIMAL({pa_type.precision}, {pa_type.scale})'
elif types.is_timestamp(pa_type) and pa_type.tz is None:
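For orientation, a small sketch of the type mapping added above (using the DataField/AtomicType/PyarrowFieldParser names from this commit, and assuming from_paimon_schema routes atomic types through the branch shown): a Paimon BLOB field surfaces as large_binary on the Arrow side, and large_binary parses back to the BLOB type name.

    import pyarrow as pa

    from pypaimon.schema.data_types import AtomicType, DataField, PyarrowFieldParser

    # Paimon -> Arrow: BLOB becomes pyarrow.large_binary() in the projected schema.
    arrow_schema = PyarrowFieldParser.from_paimon_schema(
        [DataField(0, "doc", AtomicType("BLOB"))])
    assert arrow_schema.field("doc").type == pa.large_binary()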
diff --git a/paimon-python/pypaimon/table/row/blob.py b/paimon-python/pypaimon/table/row/blob.py
new file mode 100644
index 0000000000..b9d8cc0e38
--- /dev/null
+++ b/paimon-python/pypaimon/table/row/blob.py
@@ -0,0 +1,247 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import io
+from abc import ABC, abstractmethod
+from typing import Optional, Union
+
+
+class BlobDescriptor:
+ CURRENT_VERSION = 1
+
+ def __init__(self, uri: str, offset: int, length: int, version: int = CURRENT_VERSION):
+ self._version = version
+ self._uri = uri
+ self._offset = offset
+ self._length = length
+
+ @property
+ def uri(self) -> str:
+ return self._uri
+
+ @property
+ def offset(self) -> int:
+ return self._offset
+
+ @property
+ def length(self) -> int:
+ return self._length
+
+ @property
+ def version(self) -> int:
+ return self._version
+
+ def serialize(self) -> bytes:
+ import struct
+
+ uri_bytes = self._uri.encode('utf-8')
+ uri_length = len(uri_bytes)
+
+ # Pack using little endian format
+ data = struct.pack('<B', self._version) # version (1 byte)
+ data += struct.pack('<I', uri_length) # uri length (4 bytes)
+ data += uri_bytes # uri bytes
+ data += struct.pack('<q', self._offset) # offset (8 bytes, signed)
+ data += struct.pack('<q', self._length) # length (8 bytes, signed)
+
+ return data
+
+ @classmethod
+ def deserialize(cls, data: bytes) -> 'BlobDescriptor':
+ import struct
+
+ if len(data) < 5: # minimum size: version(1) + uri_length(4)
+ raise ValueError("Invalid BlobDescriptor data: too short")
+
+ offset = 0
+
+ # Read version
+ version = struct.unpack('<B', data[offset:offset + 1])[0]
+ offset += 1
+
+ # For now, we only support version 1, but allow flexibility for future versions
+ if version < 1:
+ raise ValueError(f"Unsupported BlobDescriptor version: {version}")
+
+ # Read URI length
+ uri_length = struct.unpack('<I', data[offset:offset + 4])[0]
+ offset += 4
+
+ # Read URI bytes
+ if offset + uri_length > len(data):
+ raise ValueError("Invalid BlobDescriptor data: URI length exceeds
data size")
+
+ uri_bytes = data[offset:offset + uri_length]
+ uri = uri_bytes.decode('utf-8')
+ offset += uri_length
+
+ # Read offset and length
+ if offset + 16 > len(data):
+ raise ValueError("Invalid BlobDescriptor data: missing
offset/length")
+
+ blob_offset = struct.unpack('<q', data[offset:offset + 8])[0]
+ offset += 8
+
+ blob_length = struct.unpack('<q', data[offset:offset + 8])[0]
+
+ return cls(uri, blob_offset, blob_length, version)
+
+ def __eq__(self, other) -> bool:
+ """Check equality with another BlobDescriptor."""
+ if not isinstance(other, BlobDescriptor):
+ return False
+ return (self._version == other._version and
+ self._uri == other._uri and
+ self._offset == other._offset and
+ self._length == other._length)
+
+ def __hash__(self) -> int:
+ """Calculate hash for the BlobDescriptor."""
+ return hash((self._version, self._uri, self._offset, self._length))
+
+ def __str__(self) -> str:
+ """String representation of BlobDescriptor."""
+ return (f"BlobDescriptor(version={self._version}, uri='{self._uri}', "
+ f"offset={self._offset}, length={self._length})")
+
+ def __repr__(self) -> str:
+ """Detailed representation of BlobDescriptor."""
+ return self.__str__()
+
+
+class Blob(ABC):
+
+ @abstractmethod
+ def to_data(self) -> bytes:
+ pass
+
+ @abstractmethod
+ def to_descriptor(self) -> BlobDescriptor:
+ pass
+
+ @abstractmethod
+ def new_input_stream(self) -> io.BytesIO:
+ pass
+
+ @staticmethod
+ def from_data(data: bytes) -> 'Blob':
+ return BlobData(data)
+
+ @staticmethod
+ def from_local(file: str) -> 'Blob':
+ return Blob.from_file(file)
+
+ @staticmethod
+ def from_http(uri: str) -> 'Blob':
+ descriptor = BlobDescriptor(uri, 0, -1)
+ return BlobRef(descriptor)
+
+ @staticmethod
+ def from_file(file: str, offset: int = 0, length: int = -1) -> 'Blob':
+ descriptor = BlobDescriptor(file, offset, length)
+ return BlobRef(descriptor)
+
+ @staticmethod
+ def from_descriptor(descriptor: BlobDescriptor) -> 'Blob':
+ return BlobRef(descriptor)
+
+
+class BlobData(Blob):
+
+ def __init__(self, data: Optional[Union[bytes, bytearray]] = None):
+ if data is None:
+ self._data = b''
+ elif isinstance(data, (bytes, bytearray)):
+ self._data = bytes(data)
+ else:
+ raise TypeError(f"BlobData expects bytes, bytearray, or None, got {type(data)}")
+
+ @classmethod
+ def from_bytes(cls, data: bytes) -> 'BlobData':
+ return cls(data)
+
+ @property
+ def data(self) -> bytes:
+ return self._data
+
+ def to_data(self) -> bytes:
+ return self._data
+
+ def to_descriptor(self) -> 'BlobDescriptor':
+ raise RuntimeError("Blob data can not convert to descriptor.")
+
+ def new_input_stream(self) -> io.BytesIO:
+ return io.BytesIO(self._data)
+
+ def __eq__(self, other) -> bool:
+ if other is None or not isinstance(other, BlobData):
+ return False
+ return self._data == other._data
+
+ def __hash__(self) -> int:
+ return hash(self._data)
+
+
+class BlobRef(Blob):
+
+ def __init__(self, descriptor: BlobDescriptor):
+ self._descriptor = descriptor
+
+ def to_data(self) -> bytes:
+ try:
+ with self.new_input_stream() as stream:
+ return stream.read()
+ except Exception as e:
+ raise IOError(f"Failed to read blob data: {e}")
+
+ def to_descriptor(self) -> BlobDescriptor:
+ return self._descriptor
+
+ def new_input_stream(self) -> io.BytesIO:
+ uri = self._descriptor.uri
+ offset = self._descriptor.offset
+ length = self._descriptor.length
+
+ if uri.startswith('http://') or uri.startswith('https://'):
+ raise NotImplementedError("HTTP blob reading not implemented yet")
+ elif uri.startswith('file://') or '://' not in uri:
file_path = uri.replace('file://', '') if uri.startswith('file://') else uri
+
+ try:
+ with open(file_path, 'rb') as f:
+ if offset > 0:
+ f.seek(offset)
+
+ if length == -1:
+ data = f.read()
+ else:
+ data = f.read(length)
+
+ return io.BytesIO(data)
+ except Exception as e:
+ raise IOError(f"Failed to read file {file_path}: {e}")
+ else:
+ raise ValueError(f"Unsupported URI scheme: {uri}")
+
+ def __eq__(self, other) -> bool:
+ if not isinstance(other, BlobRef):
+ return False
+ return self._descriptor == other._descriptor
+
+ def __hash__(self) -> int:
+ return hash(self._descriptor)
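A short sketch of the two Blob flavors added above (the file path is illustrative): BlobData holds bytes in memory, while BlobRef only carries a BlobDescriptor and reads lazily; descriptors serialize to a compact little-endian layout and round-trip.

    from pypaimon.table.row.blob import Blob, BlobData, BlobDescriptor

    inline = Blob.from_data(b"raw bytes")                       # BlobData, in memory
    by_ref = Blob.from_file("/tmp/blob-demo/doc.blob", 4, 128)  # BlobRef, illustrative path

    desc = by_ref.to_descriptor()
    assert BlobDescriptor.deserialize(desc.serialize()) == desc
    assert inline.to_data() == b"raw bytes"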
diff --git a/paimon-python/pypaimon/table/row/generic_row.py b/paimon-python/pypaimon/table/row/generic_row.py
index a7612168d9..5f8f9f6d9b 100644
--- a/paimon-python/pypaimon/table/row/generic_row.py
+++ b/paimon-python/pypaimon/table/row/generic_row.py
@@ -24,6 +24,7 @@ from typing import Any, List
from pypaimon.schema.data_types import AtomicType, DataField, DataType
from pypaimon.table.row.row_kind import RowKind
+from pypaimon.table.row.blob import BlobData
@dataclass
@@ -111,6 +112,8 @@ class GenericRowDeserializer:
return cls._parse_string(bytes_data, base_offset, field_offset)
elif type_name.startswith('BINARY') or type_name.startswith('VARBINARY') or type_name == 'BYTES':
return cls._parse_binary(bytes_data, base_offset, field_offset)
+ elif type_name == 'BLOB':
+ return cls._parse_blob(bytes_data, base_offset, field_offset)
elif type_name.startswith('DECIMAL') or type_name.startswith('NUMERIC'):
return cls._parse_decimal(bytes_data, base_offset, field_offset, data_type)
elif type_name.startswith('TIMESTAMP'):
@@ -193,6 +196,13 @@ class GenericRowDeserializer:
length = (offset_and_len & cls.HIGHEST_SECOND_TO_EIGHTH_BIT) >> 56
return bytes_data[field_offset:field_offset + length]
+ @classmethod
+ def _parse_blob(cls, bytes_data: bytes, base_offset: int, field_offset: int) -> BlobData:
+ """Parse BLOB data from binary format and return a BlobData instance."""
+ # BLOB uses the same binary format as regular binary data
+ binary_data = cls._parse_binary(bytes_data, base_offset, field_offset)
+ return BlobData.from_bytes(binary_data)
+
@classmethod
def _parse_decimal(cls, bytes_data: bytes, base_offset: int, field_offset: int, data_type: DataType) -> Decimal:
unscaled_long = struct.unpack('<q', bytes_data[field_offset:field_offset + 8])[0]
@@ -260,9 +270,12 @@ class GenericRowSerializer:
raise ValueError(f"BinaryRow only support AtomicType yet, meet
{field.type.__class__}")
type_name = field.type.type.upper()
- if any(type_name.startswith(p) for p in ['CHAR', 'VARCHAR', 'STRING', 'BINARY', 'VARBINARY', 'BYTES']):
+ if any(type_name.startswith(p) for p in ['CHAR', 'VARCHAR', 'STRING',
+ 'BINARY', 'VARBINARY', 'BYTES', 'BLOB']):
if any(type_name.startswith(p) for p in ['CHAR', 'VARCHAR', 'STRING']):
value_bytes = str(value).encode('utf-8')
+ elif type_name == 'BLOB':
+ value_bytes = value.to_data()
else:
value_bytes = bytes(value)
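The serializer change above means a BLOB field round-trips through the row encoding like any other variable-length field and comes back as BlobData. A sketch mirroring the tests below:

    from pypaimon.schema.data_types import AtomicType, DataField
    from pypaimon.table.row.blob import BlobData
    from pypaimon.table.row.generic_row import (GenericRow, GenericRowDeserializer,
                                                GenericRowSerializer)
    from pypaimon.table.row.row_kind import RowKind

    fields = [DataField(0, "id", AtomicType("INT")),
              DataField(1, "payload", AtomicType("BLOB"))]
    row = GenericRow([7, BlobData(b"blob bytes")], fields, RowKind.INSERT)

    encoded = GenericRowSerializer.to_bytes(row)
    decoded = GenericRowDeserializer.from_bytes(encoded, fields)
    assert decoded.values[1].to_data() == b"blob bytes"  # deserialized as BlobData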
diff --git a/paimon-python/pypaimon/tests/blob_test.py b/paimon-python/pypaimon/tests/blob_test.py
new file mode 100644
index 0000000000..b8ed65e5bb
--- /dev/null
+++ b/paimon-python/pypaimon/tests/blob_test.py
@@ -0,0 +1,983 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import os
+import shutil
+import tempfile
+import unittest
+from pathlib import Path
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory
+from pypaimon.common.file_io import FileIO
+from pypaimon.read.reader.format_blob_reader import FormatBlobReader
+from pypaimon.schema.data_types import AtomicType, DataField
+from pypaimon.table.row.blob import Blob, BlobData, BlobRef, BlobDescriptor
+from pypaimon.table.row.generic_row import GenericRowDeserializer, GenericRowSerializer, GenericRow
+from pypaimon.table.row.row_kind import RowKind
+
+
+class MockFileIO:
+ """Mock FileIO for testing."""
+
+ def __init__(self, file_io: FileIO):
+ self._file_io = file_io
+
+ def get_file_size(self, path: str) -> int:
+ """Get file size."""
+ return self._file_io.get_file_size(Path(path))
+
+ def new_input_stream(self, path: Path):
+ """Create new input stream for reading."""
+ return self._file_io.new_input_stream(path)
+
+
+class BlobTest(unittest.TestCase):
+ """Tests for Blob interface following org.apache.paimon.data.BlobTest."""
+
+ def setUp(self):
+ """Set up test environment with temporary file."""
+ # Create a temporary directory and file
+ self.temp_dir = tempfile.mkdtemp()
+ self.file = os.path.join(self.temp_dir, "test.txt")
+
+ # Write test data to the file
+ with open(self.file, 'wb') as f:
+ f.write(b"test data")
+
+ def tearDown(self):
+ """Clean up temporary files."""
+ try:
+ if os.path.exists(self.file):
+ os.remove(self.file)
+ os.rmdir(self.temp_dir)
+ except OSError:
+ pass # Ignore cleanup errors
+
+ def test_from_data(self):
+ """Test Blob.from_data() method."""
+ test_data = b"test data"
+ blob = Blob.from_data(test_data)
+
+ # Verify it returns a BlobData instance
+ self.assertIsInstance(blob, BlobData)
+
+ # Verify the data matches
+ self.assertEqual(blob.to_data(), test_data)
+
+ def test_from_local(self):
+ """Test Blob.from_local() method."""
+ blob = Blob.from_local(self.file)
+
+ # Verify it returns a BlobRef instance
+ self.assertIsInstance(blob, BlobRef)
+
+ # Verify the data matches
+ self.assertEqual(blob.to_data(), b"test data")
+
+ def test_from_file_with_offset_and_length(self):
+ """Test Blob.from_file() method with offset and length."""
+ blob = Blob.from_file(self.file, 0, 4)
+
+ # Verify it returns a BlobRef instance
+ self.assertIsInstance(blob, BlobRef)
+
+ # Verify the data matches (first 4 bytes: "test")
+ self.assertEqual(blob.to_data(), b"test")
+
+ def test_from_file_full(self):
+ """Test Blob.from_file() method without offset and length."""
+ blob = Blob.from_file(self.file)
+
+ # Verify it returns a BlobRef instance
+ self.assertIsInstance(blob, BlobRef)
+
+ # Verify the data matches
+ self.assertEqual(blob.to_data(), b"test data")
+
+ def test_from_http(self):
+ """Test Blob.from_http() method."""
+ uri = "http://example.com/file.txt"
+ blob = Blob.from_http(uri)
+
+ # Verify it returns a BlobRef instance
+ self.assertIsInstance(blob, BlobRef)
+
+ # Verify the descriptor has the correct URI
+ descriptor = blob.to_descriptor()
+ self.assertEqual(descriptor.uri, uri)
+ self.assertEqual(descriptor.offset, 0)
+ self.assertEqual(descriptor.length, -1)
+
+ def test_blob_data_interface_compliance(self):
+ """Test that BlobData properly implements Blob interface."""
+ test_data = b"interface test data"
+ blob_data = BlobData(test_data)
+
+ # Test that it's a Blob
+ self.assertIsInstance(blob_data, Blob)
+
+ # Test interface methods
+ self.assertEqual(blob_data.to_data(), test_data)
+
+ # Test to_descriptor raises RuntimeError
+ with self.assertRaises(RuntimeError) as context:
+ blob_data.to_descriptor()
+ self.assertIn("Blob data can not convert to descriptor",
str(context.exception))
+
+ # Test new_input_stream
+ stream = blob_data.new_input_stream()
+ self.assertEqual(stream.read(), test_data)
+ stream.close()
+
+ def test_blob_ref_interface_compliance(self):
+ """Test that BlobRef properly implements Blob interface."""
+ blob_ref = Blob.from_local(self.file)
+
+ # Test that it's a Blob
+ self.assertIsInstance(blob_ref, Blob)
+
+ # Test interface methods
+ self.assertEqual(blob_ref.to_data(), b"test data")
+
+ # Test to_descriptor returns valid descriptor
+ descriptor = blob_ref.to_descriptor()
+ self.assertEqual(descriptor.uri, self.file)
+ self.assertEqual(descriptor.offset, 0)
+ self.assertEqual(descriptor.length, -1)
+
+ # Test new_input_stream
+ stream = blob_ref.new_input_stream()
+ self.assertEqual(stream.read(), b"test data")
+ stream.close()
+
+ def test_blob_equality_and_hashing(self):
+ """Test blob equality and hashing behavior."""
+ # Test BlobData equality
+ data1 = BlobData(b"same data")
+ data2 = BlobData(b"same data")
+ data3 = BlobData(b"different data")
+
+ self.assertEqual(data1, data2)
+ self.assertNotEqual(data1, data3)
+ self.assertEqual(hash(data1), hash(data2))
+
+ # Test BlobRef equality
+ ref1 = Blob.from_local(self.file)
+ ref2 = Blob.from_local(self.file)
+
+ self.assertEqual(ref1, ref2)
+ self.assertEqual(hash(ref1), hash(ref2))
+
+ def test_blob_factory_methods_return_correct_types(self):
+ """Test that all factory methods return the expected types."""
+ # from_data should return BlobData
+ blob_data = Blob.from_data(b"test")
+ self.assertIsInstance(blob_data, BlobData)
+ self.assertIsInstance(blob_data, Blob)
+
+ # from_local should return BlobRef
+ blob_ref = Blob.from_local(self.file)
+ self.assertIsInstance(blob_ref, BlobRef)
+ self.assertIsInstance(blob_ref, Blob)
+
+ # from_file should return BlobRef
+ blob_file = Blob.from_file(self.file)
+ self.assertIsInstance(blob_file, BlobRef)
+ self.assertIsInstance(blob_file, Blob)
+
+ # from_http should return BlobRef
+ blob_http = Blob.from_http("http://example.com/test.bin")
+ self.assertIsInstance(blob_http, BlobRef)
+ self.assertIsInstance(blob_http, Blob)
+
+ def test_blob_data_convenience_methods(self):
+ # Test from_bytes class method
+ blob2 = BlobData.from_bytes(b"from bytes")
+ self.assertEqual(blob2.to_data(), b"from bytes")
+
+ def test_generic_row_deserializer_parse_blob(self):
+ """Test GenericRowDeserializer._parse_blob method."""
+ # Create test data with BLOB field
+ test_blob_data = b"Test BLOB data for parsing"
+ blob_data = BlobData(test_blob_data)
+
+ # Create fields with BLOB type
+ fields = [
+ DataField(0, "id", AtomicType("INT")),
+ DataField(1, "blob_field", AtomicType("BLOB")),
+ ]
+
+ # Create and serialize a row with blob data
+ original_row = GenericRow([42, blob_data], fields, RowKind.INSERT)
+ serialized_bytes = GenericRowSerializer.to_bytes(original_row)
+
+ # Test the full deserialization process (which uses _parse_blob internally)
+ deserialized_row = GenericRowDeserializer.from_bytes(serialized_bytes, fields)
+
+ # Verify the deserialized blob
+ deserialized_blob = deserialized_row.values[1]
+ self.assertIsInstance(deserialized_blob, BlobData)
+ self.assertEqual(deserialized_blob.to_data(), test_blob_data)
+
+ # Test with empty blob data
+ empty_blob = BlobData(b"")
+ empty_row = GenericRow([1, empty_blob], fields, RowKind.INSERT)
+ empty_serialized = GenericRowSerializer.to_bytes(empty_row)
+ empty_deserialized = GenericRowDeserializer.from_bytes(empty_serialized, fields)
+
+ empty_deserialized_blob = empty_deserialized.values[1]
+ self.assertIsInstance(empty_deserialized_blob, BlobData)
+ self.assertEqual(empty_deserialized_blob.to_data(), b"")
+
+ # Test with binary data containing null bytes
+ binary_blob_data = b"\x00\x01\x02\x03\xff\xfe\xfd"
+ binary_blob = BlobData(binary_blob_data)
+ binary_row = GenericRow([99, binary_blob], fields, RowKind.INSERT)
+ binary_serialized = GenericRowSerializer.to_bytes(binary_row)
+ binary_deserialized = GenericRowDeserializer.from_bytes(binary_serialized, fields)
+
+ binary_deserialized_blob = binary_deserialized.values[1]
+ self.assertIsInstance(binary_deserialized_blob, BlobData)
+ self.assertEqual(binary_deserialized_blob.to_data(), binary_blob_data)
+
+ def test_generic_row_deserializer_parse_blob_with_multiple_fields(self):
+ """Test _parse_blob with multiple BLOB fields in a row."""
+ # Create test data with multiple BLOB fields
+ blob1_data = b"First BLOB data"
+ blob2_data = b"Second BLOB with different content"
+ blob3_data = b"" # Empty blob
+
+ blob1 = BlobData(blob1_data)
+ blob2 = BlobData(blob2_data)
+ blob3 = BlobData(blob3_data)
+
+ # Create fields with multiple BLOB types
+ fields = [
+ DataField(0, "id", AtomicType("INT")),
+ DataField(1, "name", AtomicType("STRING")),
+ DataField(2, "blob1", AtomicType("BLOB")),
+ DataField(3, "blob2", AtomicType("BLOB")),
+ DataField(4, "blob3", AtomicType("BLOB")),
+ ]
+
+ # Create and serialize a row with multiple blobs
+ original_row = GenericRow([123, "test_row", blob1, blob2, blob3], fields, RowKind.INSERT)
+ serialized_bytes = GenericRowSerializer.to_bytes(original_row)
+
+ # Deserialize and verify all blobs
+ deserialized_row = GenericRowDeserializer.from_bytes(serialized_bytes, fields)
+
+ # Verify each blob field
+ self.assertEqual(deserialized_row.values[0], 123)
+ self.assertEqual(deserialized_row.values[1], "test_row")
+
+ deserialized_blob1 = deserialized_row.values[2]
+ self.assertIsInstance(deserialized_blob1, BlobData)
+ self.assertEqual(deserialized_blob1.to_data(), blob1_data)
+
+ deserialized_blob2 = deserialized_row.values[3]
+ self.assertIsInstance(deserialized_blob2, BlobData)
+ self.assertEqual(deserialized_blob2.to_data(), blob2_data)
+
+ deserialized_blob3 = deserialized_row.values[4]
+ self.assertIsInstance(deserialized_blob3, BlobData)
+ self.assertEqual(deserialized_blob3.to_data(), blob3_data)
+
+ def test_generic_row_deserializer_parse_blob_with_null_values(self):
+ """Test _parse_blob with null BLOB values."""
+ # Create fields with BLOB type
+ fields = [
+ DataField(0, "id", AtomicType("INT")),
+ DataField(1, "blob_field", AtomicType("BLOB")),
+ DataField(2, "name", AtomicType("STRING")),
+ ]
+
+ # Create row with null blob (None value)
+ original_row = GenericRow([456, None, "test_with_null"], fields, RowKind.INSERT)
+ serialized_bytes = GenericRowSerializer.to_bytes(original_row)
+
+ # Deserialize and verify null blob is handled correctly
+ deserialized_row = GenericRowDeserializer.from_bytes(serialized_bytes, fields)
+
+ self.assertEqual(deserialized_row.values[0], 456)
+ self.assertIsNone(deserialized_row.values[1]) # Null blob should remain None
+ self.assertEqual(deserialized_row.values[2], "test_with_null")
+
+ def test_generic_row_deserializer_parse_blob_large_data(self):
+ """Test _parse_blob with large BLOB data."""
+ # Create large blob data (1MB)
+ large_blob_data = b"X" * (1024 * 1024) # 1MB of 'X' characters
+ large_blob = BlobData(large_blob_data)
+
+ fields = [
+ DataField(0, "id", AtomicType("INT")),
+ DataField(1, "large_blob", AtomicType("BLOB")),
+ ]
+
+ # Create and serialize row with large blob
+ original_row = GenericRow([789, large_blob], fields, RowKind.INSERT)
+ serialized_bytes = GenericRowSerializer.to_bytes(original_row)
+
+ # Deserialize and verify large blob
+ deserialized_row = GenericRowDeserializer.from_bytes(serialized_bytes, fields)
+
+ deserialized_large_blob = deserialized_row.values[1]
+ self.assertIsInstance(deserialized_large_blob, BlobData)
+ self.assertEqual(len(deserialized_large_blob.to_data()), 1024 * 1024)
+ self.assertEqual(deserialized_large_blob.to_data(), large_blob_data)
+
+ def test_blob_descriptor_creation(self):
+ """Test BlobDescriptor creation and properties."""
+ # Test basic creation
+ descriptor = BlobDescriptor("test://example.uri", 100, 200)
+
+ self.assertEqual(descriptor.uri, "test://example.uri")
+ self.assertEqual(descriptor.offset, 100)
+ self.assertEqual(descriptor.length, 200)
+ self.assertEqual(descriptor.version, BlobDescriptor.CURRENT_VERSION)
+
+ def test_blob_descriptor_creation_with_version(self):
+ """Test BlobDescriptor creation with explicit version."""
+ descriptor = BlobDescriptor("test://example.uri", 50, 150, version=2)
+
+ self.assertEqual(descriptor.uri, "test://example.uri")
+ self.assertEqual(descriptor.offset, 50)
+ self.assertEqual(descriptor.length, 150)
+ self.assertEqual(descriptor.version, 2)
+
+ def test_blob_descriptor_serialization_deserialization(self):
+ """Test BlobDescriptor serialization and deserialization."""
+ # Test with various URIs and parameters
+ test_cases = [
+ ("file:///path/to/file.bin", 0, -1),
+ ("https://example.com/data.blob", 1024, 2048),
+ ("s3://bucket/key", 0, 1000000),
+ ("test://simple", 42, 84),
+ ]
+
+ for uri, offset, length in test_cases:
+ with self.subTest(uri=uri, offset=offset, length=length):
+ # Create original descriptor
+ original = BlobDescriptor(uri, offset, length)
+
+ # Serialize
+ serialized = original.serialize()
+ self.assertIsInstance(serialized, bytes)
+ self.assertGreater(len(serialized), 0)
+
+ # Deserialize
+ deserialized = BlobDescriptor.deserialize(serialized)
+
+ # Verify equality
+ self.assertEqual(deserialized, original)
+ self.assertEqual(deserialized.uri, uri)
+ self.assertEqual(deserialized.offset, offset)
+ self.assertEqual(deserialized.length, length)
+ self.assertEqual(deserialized.version, BlobDescriptor.CURRENT_VERSION)
+
+ def test_blob_descriptor_serialization_with_unicode(self):
+ """Test BlobDescriptor serialization with Unicode characters."""
+ # Test with Unicode characters in URI
+ unicode_uri = "file:///测试/文件.bin"
+ descriptor = BlobDescriptor(unicode_uri, 0, 100)
+
+ # Serialize and deserialize
+ serialized = descriptor.serialize()
+ deserialized = BlobDescriptor.deserialize(serialized)
+
+ # Verify Unicode is preserved
+ self.assertEqual(deserialized.uri, unicode_uri)
+ self.assertEqual(deserialized, descriptor)
+
+ def test_blob_descriptor_deserialization_invalid_data(self):
+ """Test BlobDescriptor deserialization with invalid data."""
+ # Test with too short data
+ with self.assertRaises(ValueError) as context:
+ BlobDescriptor.deserialize(b"sho") # Only 3 bytes, need at least 5
+ self.assertIn("too short", str(context.exception))
+
+ # Test with invalid version (version 0)
+ # Create valid data but with wrong version
+ valid_descriptor = BlobDescriptor("test://uri", 0, 100)
+ valid_data = bytearray(valid_descriptor.serialize())
+ valid_data[0] = 0 # Set invalid version (0)
+
+ with self.assertRaises(ValueError) as context:
+ BlobDescriptor.deserialize(bytes(valid_data))
+ self.assertIn("Unsupported BlobDescriptor version",
str(context.exception))
+
+ # Test with incomplete data (missing URI bytes)
+ incomplete_data = b'\x01\x00\x00\x00\x10' # Version 1, URI length 16, but no URI bytes
+ with self.assertRaises(ValueError) as context:
+ BlobDescriptor.deserialize(incomplete_data)
+ self.assertIn("URI length exceeds data size", str(context.exception))
+
+ def test_blob_descriptor_equality_and_hashing(self):
+ """Test BlobDescriptor equality and hashing."""
+ # Create identical descriptors
+ desc1 = BlobDescriptor("test://uri", 100, 200)
+ desc2 = BlobDescriptor("test://uri", 100, 200)
+ desc3 = BlobDescriptor("test://uri", 100, 201) # Different length
+ desc4 = BlobDescriptor("test://other", 100, 200) # Different URI
+
+ # Test equality
+ self.assertEqual(desc1, desc2)
+ self.assertNotEqual(desc1, desc3)
+ self.assertNotEqual(desc1, desc4)
+ self.assertNotEqual(desc1, None)
+ self.assertNotEqual(desc1, "not a descriptor")
+
+ # Test hashing
+ self.assertEqual(hash(desc1), hash(desc2))
+ # Hash should be different for different descriptors (though not guaranteed)
+ self.assertNotEqual(hash(desc1), hash(desc3))
+ self.assertNotEqual(hash(desc1), hash(desc4))
+
+ def test_blob_descriptor_string_representation(self):
+ """Test BlobDescriptor string representation."""
+ descriptor = BlobDescriptor("test://example.uri", 42, 84)
+
+ str_repr = str(descriptor)
+ self.assertIn("test://example.uri", str_repr)
+ self.assertIn("42", str_repr)
+ self.assertIn("84", str_repr)
+ self.assertIn("BlobDescriptor", str_repr)
+
+ # __repr__ should be the same as __str__
+ self.assertEqual(str_repr, repr(descriptor))
+
+ def test_blob_descriptor_version_handling(self):
+ """Test BlobDescriptor version handling."""
+ # Test current version
+ descriptor = BlobDescriptor("test://uri", 0, 100)
+ self.assertEqual(descriptor.version, BlobDescriptor.CURRENT_VERSION)
+
+ # Test explicit version
+ descriptor_v2 = BlobDescriptor("test://uri", 0, 100, version=2)
+ self.assertEqual(descriptor_v2.version, 2)
+
+ # Serialize and deserialize should preserve version
+ serialized = descriptor_v2.serialize()
+ deserialized = BlobDescriptor.deserialize(serialized)
+ self.assertEqual(deserialized.version, 2)
+
+ def test_blob_descriptor_edge_cases(self):
+ """Test BlobDescriptor with edge cases."""
+ # Test with empty URI
+ empty_uri_desc = BlobDescriptor("", 0, 0)
+ serialized = empty_uri_desc.serialize()
+ deserialized = BlobDescriptor.deserialize(serialized)
+ self.assertEqual(deserialized.uri, "")
+
+ # Test with very long URI
+ long_uri = "file://" + "a" * 1000 + "/file.bin"
+ long_uri_desc = BlobDescriptor(long_uri, 0, 1000000)
+ serialized = long_uri_desc.serialize()
+ deserialized = BlobDescriptor.deserialize(serialized)
+ self.assertEqual(deserialized.uri, long_uri)
+
+ # Test with negative values
+ negative_desc = BlobDescriptor("test://uri", -1, -1)
+ serialized = negative_desc.serialize()
+ deserialized = BlobDescriptor.deserialize(serialized)
+ self.assertEqual(deserialized.offset, -1)
+ self.assertEqual(deserialized.length, -1)
+
+ def test_blob_descriptor_with_blob_ref(self):
+ """Test BlobDescriptor integration with BlobRef."""
+ # Create a descriptor
+ descriptor = BlobDescriptor(self.file, 0, -1)
+
+ # Create BlobRef from descriptor
+ blob_ref = BlobRef(descriptor)
+
+ # Verify descriptor is preserved
+ returned_descriptor = blob_ref.to_descriptor()
+ self.assertEqual(returned_descriptor, descriptor)
+
+ # Verify data can be read through BlobRef
+ data = blob_ref.to_data()
+ self.assertEqual(data, b"test data")
+
+ def test_blob_descriptor_serialization_format(self):
+ """Test BlobDescriptor serialization format details."""
+ descriptor = BlobDescriptor("test", 12345, 67890)
+ serialized = descriptor.serialize()
+
+ # Check that serialized data starts with version byte
+ self.assertEqual(serialized[0], BlobDescriptor.CURRENT_VERSION)
+
+ # Check minimum length (version + uri_length + uri + offset + length)
+ # 1 + 4 + len("test") + 8 + 8 = 25 bytes
+ self.assertEqual(len(serialized), 25)
+
+ # Verify round-trip consistency
+ deserialized = BlobDescriptor.deserialize(serialized)
+ re_serialized = deserialized.serialize()
+ self.assertEqual(serialized, re_serialized)
+
+
+class BlobEndToEndTest(unittest.TestCase):
+ """End-to-end tests for blob functionality with schema definition, file
writing, and reading."""
+
+ def setUp(self):
+ """Set up test environment."""
+ self.temp_dir = tempfile.mkdtemp()
+ self.warehouse = os.path.join(self.temp_dir, 'warehouse')
+ # Create catalog for table operations
+ self.catalog = CatalogFactory.create({
+ 'warehouse': self.warehouse
+ })
+ self.catalog.create_database('test_db', False)
+
+ def tearDown(self):
+ """Clean up test environment."""
+ try:
+ shutil.rmtree(self.temp_dir)
+ except OSError:
+ pass
+
+ def test_blob_end_to_end(self):
+ from pypaimon.schema.data_types import DataField, AtomicType
+ from pypaimon.table.row.blob import BlobData
+ from pypaimon.common.file_io import FileIO
+ from pathlib import Path
+
+ # Set up file I/O
+ file_io = FileIO(self.temp_dir, {})
+
+ blob_field_name = "blob_field"
+ # ========== Step 1: Check Type Validation ==========
+ blob_fields = [DataField(0, blob_field_name, AtomicType("BLOB"))]
+ for blob_field in blob_fields:
+ self.assertIsInstance(blob_field.type, AtomicType)
+ self.assertEqual(blob_field.type.type, "BLOB")
+
+ # ========== Step 2: Write Data ==========
+ test_data = {blob_field_name: BlobData(b'End-to-end test: PDF header %PDF-1.4\n...')}
+ blob_files = {}
+ blob_data = [test_data[blob_field_name].to_data()]
+ schema = pa.schema([pa.field(blob_field_name, pa.large_binary())])
+ table = pa.table([blob_data], schema=schema)
+ blob_files[blob_field_name] = Path(self.temp_dir) / (blob_field_name + ".blob")
+ file_io.write_blob(blob_files[blob_field_name], table)
+ self.assertTrue(file_io.exists(blob_files[blob_field_name]))
+
+ # ========== Step 3: Read Data and Check Data ==========
+ for field_name, file_path in blob_files.items():
+ read_fields = blob_fields
+ reader = FormatBlobReader(
+ file_io=file_io,
+ file_path=str(file_path),
+ read_fields=[field_name],
+ full_fields=read_fields,
+ push_down_predicate=None
+ )
+
+ # Read data
+ batch = reader.read_arrow_batch()
+ self.assertIsNotNone(batch, f"{field_name} batch should not be None")
+ self.assertEqual(batch.num_rows, 1, f"{field_name} should have 1 row")
+
+ # Verify data integrity
+ read_blob_data = batch.column(0)[0].as_py()
+ expected_blob_data = test_data[field_name].to_data()
+ self.assertEqual(read_blob_data, expected_blob_data, f"{field_name} data should match")
+
+ reader.close()
+
+ def test_blob_complex_types_throw_exception(self):
+ """Test that complex types containing BLOB elements throw exceptions
during read/write operations."""
+ from pypaimon.schema.data_types import DataField, AtomicType,
ArrayType, MultisetType, MapType
+ from pypaimon.table.row.blob import BlobData
+ from pypaimon.common.file_io import FileIO
+ from pypaimon.table.row.generic_row import GenericRow, GenericRowSerializer
+ from pypaimon.table.row.row_kind import RowKind
+ from pathlib import Path
+
+ # Set up file I/O
+ file_io = FileIO(self.temp_dir, {})
+
+ # ========== Test ArrayType(nullable=True, element_type=AtomicType("BLOB")) ==========
+ array_fields = [
+ DataField(0, "id", AtomicType("INT")),
+ DataField(1, "blob_array", ArrayType(nullable=True,
element_type=AtomicType("BLOB"))),
+ ]
+
+ # Test serialization throws exception for ArrayType<BLOB>
+ array_blob_data = [
+ BlobData(b"Array blob 1"),
+ BlobData(b"Array blob 2"),
+ BlobData(b"Array blob 3")
+ ]
+
+ array_row = GenericRow([1, array_blob_data], array_fields, RowKind.INSERT)
+
+ # GenericRowSerializer should throw exception for complex types
+ with self.assertRaises(ValueError) as context:
+ GenericRowSerializer.to_bytes(array_row)
+ self.assertIn("AtomicType", str(context.exception))
+
+ # Note: FileIO.write_blob validation for complex types is tested separately below
+
+ # ========== Test MultisetType(nullable=True, element_type=AtomicType("BLOB")) ==========
+ multiset_fields = [
+ DataField(0, "id", AtomicType("INT")),
+ DataField(1, "blob_multiset", MultisetType(nullable=True,
element_type=AtomicType("BLOB"))),
+ ]
+
+ # Test serialization throws exception for MultisetType<BLOB>
+ multiset_blob_data = [
+ BlobData(b"Multiset blob 1"),
+ BlobData(b"Multiset blob 2"),
+ BlobData(b"Multiset blob 1"), # Duplicate allowed in multiset
+ ]
+
+ multiset_row = GenericRow([2, multiset_blob_data], multiset_fields, RowKind.INSERT)
+
+ # GenericRowSerializer should throw exception for complex types
+ with self.assertRaises(ValueError) as context:
+ GenericRowSerializer.to_bytes(multiset_row)
+ self.assertIn("AtomicType", str(context.exception))
+ map_fields = [
+ DataField(0, "id", AtomicType("INT")),
+ DataField(1, "blob_map", MapType(
+ nullable=True, key_type=AtomicType("STRING"), value_type=AtomicType("BLOB")
+ )),
+ ]
+
+ # Test serialization throws exception for MapType<STRING, BLOB>
+ map_blob_data = {
+ "document": BlobData(b"Document content"),
+ "image": BlobData(b"Image data"),
+ "metadata": BlobData(b"Metadata content")
+ }
+
+ map_row = GenericRow([3, map_blob_data], map_fields, RowKind.INSERT)
+
+ # GenericRowSerializer should throw exception for complex types
+ with self.assertRaises(ValueError) as context:
+ GenericRowSerializer.to_bytes(map_row)
+ self.assertIn("AtomicType", str(context.exception))
+
+ # ========== Test FileIO.write_blob validation for complex types ==========
+ # Test that FileIO.write_blob properly validates and rejects complex types
+
+ # Create a table with multiple columns (should fail - blob format requires single column)
+ multi_column_schema = pa.schema([
+ pa.field("blob1", pa.large_binary()),
+ pa.field("blob2", pa.large_binary())
+ ])
+ multi_column_table = pa.table([
+ [b"blob1_data"],
+ [b"blob2_data"]
+ ], schema=multi_column_schema)
+
+ multi_column_file = Path(self.temp_dir) / "multi_column.blob"
+
+ # Should throw RuntimeError for multiple columns
+ with self.assertRaises(RuntimeError) as context:
+ file_io.write_blob(multi_column_file, multi_column_table)
+ self.assertIn("single column", str(context.exception))
+
+ # Test that FileIO.write_blob rejects null values
+ null_schema = pa.schema([pa.field("blob_with_nulls", pa.large_binary())])
+ null_table = pa.table([[b"data", None]], schema=null_schema)
+
+ null_file = Path(self.temp_dir) / "null_data.blob"
+
+ # Should throw RuntimeError for null values
+ with self.assertRaises(RuntimeError) as context:
+ file_io.write_blob(null_file, null_table)
+ self.assertIn("null values", str(context.exception))
+
+ # ========== Test FormatBlobReader with complex type schema ==========
+ # Create a valid blob file first
+ valid_blob_data = [b"Valid blob content"]
+ valid_schema = pa.schema([pa.field("valid_blob", pa.large_binary())])
+ valid_table = pa.table([valid_blob_data], schema=valid_schema)
+
+ valid_blob_file = Path(self.temp_dir) / "valid_blob.blob"
+ file_io.write_blob(valid_blob_file, valid_table)
+
+ # Try to read with complex type field definition - this should fail
+ # because FormatBlobReader tries to create PyArrow schema with complex types
+ complex_read_fields = [
+ DataField(0, "valid_blob", ArrayType(nullable=True,
element_type=AtomicType("BLOB")))
+ ]
+
+ # FormatBlobReader creation should work, but reading should fail due to schema mismatch
+ reader = FormatBlobReader(
+ file_io=file_io,
+ file_path=str(valid_blob_file),
+ read_fields=["valid_blob"],
+ full_fields=complex_read_fields,
+ push_down_predicate=None
+ )
+
+ # Reading should fail because the schema expects complex type but data is atomic
+ with self.assertRaises(Exception) as context:
+ reader.read_arrow_batch()
+ # The error could be ArrowTypeError or other PyArrow-related errors
+ self.assertTrue(
+ "ArrowTypeError" in str(type(context.exception)) or
+ "TypeError" in str(type(context.exception)) or
+ "ValueError" in str(type(context.exception))
+ )
+
+ reader.close()
+
+ def test_blob_advanced_scenarios(self):
+ """Test advanced blob scenarios: corruption, truncation, zero-length,
large blobs, compression, cross-format."""
+ from pypaimon.schema.data_types import DataField, AtomicType
+ from pypaimon.common.file_io import FileIO
+ from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor
+ from pathlib import Path
+
+ # Set up file I/O
+ file_io = FileIO(self.temp_dir, {})
+
+ # ========== Test 1: Corrupted file header test ==========
+
+ # Create a valid blob file first
+ valid_blob_data = [b"Test blob content for corruption test"]
+ valid_schema = pa.schema([pa.field("test_blob", pa.large_binary())])
+ valid_table = pa.table([valid_blob_data], schema=valid_schema)
+
+ header_test_file = Path(self.temp_dir) / "header_test.blob"
+ file_io.write_blob(header_test_file, valid_table)
+
+ # Read the file and corrupt the header (last 5 bytes: index_length + version)
+ with open(header_test_file, 'rb') as f:
+ original_data = f.read()
+
+ # Corrupt the version byte (last byte)
+ corrupted_data = bytearray(original_data)
+ corrupted_data[-1] = 99 # Invalid version (should be 1)
+
+ corrupted_header_file = Path(self.temp_dir) / "corrupted_header.blob"
+ with open(corrupted_header_file, 'wb') as f:
+ f.write(corrupted_data)
+
+ # Try to read corrupted file - should detect invalid version
+ fields = [DataField(0, "test_blob", AtomicType("BLOB"))]
+
+ # Reading should fail due to invalid version
+ with self.assertRaises(IOError) as context:
+ FormatBlobReader(
+ file_io=file_io,
+ file_path=str(corrupted_header_file),
+ read_fields=["test_blob"],
+ full_fields=fields,
+ push_down_predicate=None
+ )
+ self.assertIn("Unsupported blob file version", str(context.exception))
+
+ # ========== Test 2: Truncated blob file (mid-blob) read ==========
+
+ # Create a blob file with substantial content
+ large_content = b"Large blob content: " + b"X" * 1000 + b" End of content"
+ large_blob_data = [large_content]
+ large_schema = pa.schema([pa.field("large_blob", pa.large_binary())])
+ large_table = pa.table([large_blob_data], schema=large_schema)
+
+ full_blob_file = Path(self.temp_dir) / "full_blob.blob"
+ file_io.write_blob(full_blob_file, large_table)
+
+ # Read the full file and truncate it in the middle
+ with open(full_blob_file, 'rb') as f:
+ full_data = f.read()
+
+ # Truncate to about 50% of original size (mid-blob)
+ truncated_size = len(full_data) // 2
+ truncated_data = full_data[:truncated_size]
+
+ truncated_file = Path(self.temp_dir) / "truncated.blob"
+ with open(truncated_file, 'wb') as f:
+ f.write(truncated_data)
+
+ # Try to read truncated file - should fail gracefully
+ with self.assertRaises((IOError, OSError)) as context:
+ FormatBlobReader(
+ file_io=file_io,
+ file_path=str(truncated_file),
+ read_fields=["large_blob"],
+ full_fields=fields,
+ push_down_predicate=None
+ )
+ # Should detect truncation/incomplete data (either invalid header or invalid version)
+ self.assertTrue(
+ "cannot read header" in str(context.exception) or
+ "Unsupported blob file version" in str(context.exception)
+ )
+
+ # ========== Test 3: Zero-length blob handling ==========
+
+ # Create blob with zero-length content
+ zero_blob_data = [b""] # Empty blob
+ zero_schema = pa.schema([pa.field("zero_blob", pa.large_binary())])
+ zero_table = pa.table([zero_blob_data], schema=zero_schema)
+
+ zero_blob_file = Path(self.temp_dir) / "zero_length.blob"
+ file_io.write_blob(zero_blob_file, zero_table)
+
+ # Verify file was created
+ self.assertTrue(file_io.exists(zero_blob_file))
+ file_size = file_io.get_file_size(zero_blob_file)
+ self.assertGreater(file_size, 0) # File should have headers even with empty blob
+
+ # Read zero-length blob
+ zero_fields = [DataField(0, "zero_blob", AtomicType("BLOB"))]
+ zero_reader = FormatBlobReader(
+ file_io=file_io,
+ file_path=str(zero_blob_file),
+ read_fields=["zero_blob"],
+ full_fields=zero_fields,
+ push_down_predicate=None
+ )
+
+ zero_batch = zero_reader.read_arrow_batch()
+ self.assertIsNotNone(zero_batch)
+ self.assertEqual(zero_batch.num_rows, 1)
+
+ # Verify empty blob content
+ read_zero_blob = zero_batch.column(0)[0].as_py()
+ self.assertEqual(read_zero_blob, b"")
+ self.assertEqual(len(read_zero_blob), 0)
+ zero_reader.close()
+
+ # ========== Test 4: Large blob (multi-GB range) simulation ==========
+ # Simulate large blob without actually creating multi-GB data
+ # Test chunked writing and memory-safe reading patterns
+
+ # Create moderately large blob (10MB) to test chunking behavior
+ chunk_size = 1024 * 1024 # 1MB chunks
+ large_blob_content = b"LARGE_BLOB_CHUNK:" + b"L" * (chunk_size - 17) # Fill to 1MB
+
+ # Simulate multiple chunks
+ simulated_large_data = [large_blob_content * 10] # 10MB total
+ large_sim_schema = pa.schema([pa.field("large_sim_blob", pa.large_binary())])
+ large_sim_table = pa.table([simulated_large_data], schema=large_sim_schema)
+
+ large_sim_file = Path(self.temp_dir) / "large_simulation.blob"
+ file_io.write_blob(large_sim_file, large_sim_table)
+
+ # Verify large file was written
+ large_sim_size = file_io.get_file_size(large_sim_file)
+ self.assertGreater(large_sim_size, 10 * 1024 * 1024) # Should be > 10MB
+
+ # Read large blob in memory-safe manner
+ large_sim_fields = [DataField(0, "large_sim_blob", AtomicType("BLOB"))]
+ large_sim_reader = FormatBlobReader(
+ file_io=file_io,
+ file_path=str(large_sim_file),
+ read_fields=["large_sim_blob"],
+ full_fields=large_sim_fields,
+ push_down_predicate=None
+ )
+
+ large_sim_batch = large_sim_reader.read_arrow_batch()
+ self.assertIsNotNone(large_sim_batch)
+ self.assertEqual(large_sim_batch.num_rows, 1)
+
+ # Verify large blob content (check prefix to avoid loading it all into memory for comparison)
+ read_large_blob = large_sim_batch.column(0)[0].as_py()
+ self.assertTrue(read_large_blob.startswith(b"LARGE_BLOB_CHUNK:"))
+ self.assertEqual(len(read_large_blob), len(large_blob_content) * 10)
+ large_sim_reader.close()
+
+ # ========== Test 5: Index compression/decompression validation ==========
+ # Test DeltaVarintCompressor roundtrip
+ test_indices = [0, 100, 250, 1000, 5000, 10000, 50000]
+
+ # Compress indices
+ compressed_indices = DeltaVarintCompressor.compress(test_indices)
+ self.assertIsInstance(compressed_indices, bytes)
+ self.assertGreater(len(compressed_indices), 0)
+
+ # Decompress indices
+ decompressed_indices = DeltaVarintCompressor.decompress(compressed_indices)
+ self.assertEqual(decompressed_indices, test_indices)
+
+ # Test corruption detection in compressed indices
+ if len(compressed_indices) > 1:
+ # Corrupt the compressed data
+ corrupted_indices = bytearray(compressed_indices)
+ corrupted_indices[-1] = (corrupted_indices[-1] + 1) % 256 # Perturb the last byte
+
+ # Decompression should fail or produce different results
+ try:
+ corrupted_result = DeltaVarintCompressor.decompress(bytes(corrupted_indices))
+ # If decompression succeeds, result should be different
+ self.assertNotEqual(corrupted_result, test_indices)
+ except Exception:
+ pass
+
+ # ========== Test 6: Cross-format guard (multi-field tables) ==========
+ # Test that blob format rejects multi-field tables
+ multi_field_schema = pa.schema([
+ pa.field("blob_field", pa.large_binary()),
+ pa.field("string_field", pa.string()),
+ pa.field("int_field", pa.int64())
+ ])
+
+ multi_field_table = pa.table([
+ [b"blob_data_1", b"blob_data_2"],
+ ["string_1", "string_2"],
+ [100, 200]
+ ], schema=multi_field_schema)
+
+ multi_field_file = Path(self.temp_dir) / "multi_field.blob"
+
+ # Should reject multi-field table
+ with self.assertRaises(RuntimeError) as context:
+ file_io.write_blob(multi_field_file, multi_field_table)
+ self.assertIn("single column", str(context.exception))
+
+ # Test that blob format rejects non-binary field types
+ non_binary_schema = pa.schema([pa.field("string_field", pa.string())])
+ non_binary_table = pa.table([["not_binary_data"]], schema=non_binary_schema)
+
+ non_binary_file = Path(self.temp_dir) / "non_binary.blob"
+
+ # Should reject non-binary field
+ with self.assertRaises(RuntimeError) as context:
+ file_io.write_blob(non_binary_file, non_binary_table)
+ # Should fail due to type conversion issues (a non-binary field can't be converted to BLOB)
+ self.assertTrue(
+ "large_binary" in str(context.exception) or
+ "to_paimon_type" in str(context.exception) or
+ "missing" in str(context.exception) or
+ "Field must be Blob/BlobData instance" in str(context.exception)
+ )
+
+ # Test that blob format rejects tables with null values
+ null_schema = pa.schema([pa.field("blob_with_null", pa.large_binary())])
+ null_table = pa.table([[b"data", None, b"more_data"]], schema=null_schema)
+
+ null_file = Path(self.temp_dir) / "with_nulls.blob"
+
+ # Should reject null values
+ with self.assertRaises(RuntimeError) as context:
+ file_io.write_blob(null_file, null_table)
+ self.assertIn("null values", str(context.exception))
+
+
+if __name__ == '__main__':
+ unittest.main()
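
Illustration (not part of this commit): the corruption test above tampers with the trailing five bytes that BlobFormatWriter.close(), later in this diff, emits: a 4-byte little-endian index length followed by a 1-byte format version. A minimal sketch of parsing that trailer, with the helper name being ours:

    import struct

    def read_blob_trailer(path):
        # Last 5 bytes of a blob file: [index length: 4B LE][version: 1B]
        with open(path, 'rb') as f:
            f.seek(-5, 2)
            index_length = struct.unpack('<I', f.read(4))[0]
            version = f.read(1)[0]
        if version != 1:
            raise IOError(f"Unsupported blob file version: {version}")
        return index_length, version

Overwriting the final byte, as the test does, is therefore enough to trigger the "Unsupported blob file version" error path.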
diff --git a/paimon-python/pypaimon/tests/delta_varint_compressor_test.py b/paimon-python/pypaimon/tests/delta_varint_compressor_test.py
new file mode 100644
index 0000000000..5a04becde8
--- /dev/null
+++ b/paimon-python/pypaimon/tests/delta_varint_compressor_test.py
@@ -0,0 +1,379 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import random
+import sys
+import unittest
+
+from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor
+
+
+class DeltaVarintCompressorTest(unittest.TestCase):
+ """Tests for DeltaVarintCompressor following
org.apache.paimon.utils.DeltaVarintCompressorTest."""
+
+ def test_normal_case_1(self):
+ """Test case for normal compression and decompression."""
+ original = [80, 50, 90, 80, 70]
+ compressed = DeltaVarintCompressor.compress(original)
+ decompressed = DeltaVarintCompressor.decompress(compressed)
+
+ self.assertEqual(original, decompressed) # Verify data integrity
+ self.assertEqual(6, len(compressed)) # Optimized size for small deltas
+
+ def test_normal_case_2(self):
+ """Test case for normal compression and decompression."""
+ original = [100, 50, 150, 100, 200]
+ compressed = DeltaVarintCompressor.compress(original)
+ decompressed = DeltaVarintCompressor.decompress(compressed)
+
+ self.assertEqual(original, decompressed) # Verify data integrity
+ self.assertEqual(8, len(compressed)) # Optimized size for small deltas
+
+ def test_random(self):
+ """Test with random data to ensure robustness."""
+ # Run multiple iterations to test various random cases
+ for _ in range(100): # Reduced from 10000 for reasonable test time
+ original = []
+ for i in range(100):
+ # Use a smaller range than Java's Long.MAX_VALUE for Python compatibility
+ original.append(random.randint(-sys.maxsize, sys.maxsize))
+ compressed = DeltaVarintCompressor.compress(original)
+ decompressed = DeltaVarintCompressor.decompress(compressed)
+
+ self.assertEqual(original, decompressed) # Verify data integrity
+
+ def test_empty_array(self):
+ """Test case for empty array."""
+ original = []
+ compressed = DeltaVarintCompressor.compress(original)
+ decompressed = DeltaVarintCompressor.decompress(compressed)
+
+ self.assertEqual(original, decompressed)
+ self.assertEqual(0, len(compressed))
+
+ def test_single_element(self):
+ """Test case for single-element array."""
+ original = [42]
+ compressed = DeltaVarintCompressor.compress(original)
+ decompressed = DeltaVarintCompressor.decompress(compressed)
+
+ self.assertEqual(original, decompressed)
+ # Expected size: ZigZag(42) = 84 -> single-byte varint
+ self.assertEqual(1, len(compressed))
+
+ def test_extreme_values(self):
+ """Test case for extreme values (sys.maxsize and -sys.maxsize)."""
+ original = [-sys.maxsize, sys.maxsize]
+ compressed = DeltaVarintCompressor.compress(original)
+ decompressed = DeltaVarintCompressor.decompress(compressed)
+
+ self.assertEqual(original, decompressed)
+ # The compressed size will depend on the platform's sys.maxsize
+ # but should be reasonable for the delta encoding
+ self.assertGreater(len(compressed), 0)
+
+ def test_negative_deltas(self):
+ """Test case for negative deltas with ZigZag optimization."""
+ original = [100, -50, -150, -100] # Negative values
+ compressed = DeltaVarintCompressor.compress(original)
+ decompressed = DeltaVarintCompressor.decompress(compressed)
+
+ self.assertEqual(original, decompressed)
+ # Verify ZigZag optimization: -1 → 1 (1 byte)
+ # Delta sequence: [100, -150, -100, 50] → ZigZag → [200, 299, 199, 100]
+ # Each encoded in 1-2 bytes
+ self.assertLessEqual(len(compressed), 8) # Optimized size
+
+ def test_unsorted_data(self):
+ """Test case for unsorted data (worse compression ratio)."""
+ original = [1000, 5, 9999, 12345, 6789]
+ compressed = DeltaVarintCompressor.compress(original)
+ decompressed = DeltaVarintCompressor.decompress(compressed)
+
+ self.assertEqual(original, decompressed)
+ # Larger deltas → more bytes (e.g., 9994 → 3 bytes)
+ self.assertGreater(len(compressed), 5) # Worse than sorted case
+
+ def test_corrupted_input(self):
+ """Test case for corrupted input (invalid Varint)."""
+ # Invalid Varint (all continuation flags)
+ corrupted = bytes([0x80, 0x80, 0x80])
+ try:
+ result = DeltaVarintCompressor.decompress(corrupted)
+ # If it doesn't raise an exception, the result should be reasonable
+ self.assertIsInstance(result, list)
+ except (IndexError, ValueError, RuntimeError):
+ # It's acceptable to raise an exception for corrupted data
+ pass
+
+ def test_zero_values(self):
+ """Test case for arrays with zero values."""
+ original = [0, 0, 0, 0, 0]
+ compressed = DeltaVarintCompressor.compress(original)
+ decompressed = DeltaVarintCompressor.decompress(compressed)
+
+ self.assertEqual(original, decompressed)
+ # All deltas are 0, so should compress very well
+ self.assertLessEqual(len(compressed), 5)
+
+ def test_ascending_sequence(self):
+ """Test case for ascending sequence (optimal for delta compression)."""
+ original = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
+ compressed = DeltaVarintCompressor.compress(original)
+ decompressed = DeltaVarintCompressor.decompress(compressed)
+
+ self.assertEqual(original, decompressed)
+ # All deltas are 1, so should compress very well
+ self.assertLessEqual(len(compressed), 15) # Much smaller than original
+
+ def test_descending_sequence(self):
+ """Test case for descending sequence."""
+ original = [10, 9, 8, 7, 6, 5, 4, 3, 2, 1]
+ compressed = DeltaVarintCompressor.compress(original)
+ decompressed = DeltaVarintCompressor.decompress(compressed)
+
+ self.assertEqual(original, decompressed)
+ # All deltas are -1, should still compress well with ZigZag
+ self.assertLessEqual(len(compressed), 15)
+
+ def test_large_positive_values(self):
+ """Test case for large positive values."""
+ original = [1000000, 2000000, 3000000, 4000000]
+ compressed = DeltaVarintCompressor.compress(original)
+ decompressed = DeltaVarintCompressor.decompress(compressed)
+
+ self.assertEqual(original, decompressed)
+ # Large values but consistent deltas should still compress reasonably
+ self.assertGreater(len(compressed), 4) # Will be larger due to big numbers
+
+ def test_mixed_positive_negative(self):
+ """Test case for mixed positive and negative values."""
+ original = [100, -200, 300, -400, 500]
+ compressed = DeltaVarintCompressor.compress(original)
+ decompressed = DeltaVarintCompressor.decompress(compressed)
+
+ self.assertEqual(original, decompressed)
+ # Mixed signs create larger deltas
+ self.assertGreater(len(compressed), 5)
+
+ def test_compression_efficiency(self):
+ """Test that compression actually reduces size for suitable data."""
+ # Create a sequence with small deltas
+ original = []
+ base = 1000
+ for i in range(100):
+ base += random.randint(-10, 10) # Small deltas
+ original.append(base)
+
+ compressed = DeltaVarintCompressor.compress(original)
+ decompressed = DeltaVarintCompressor.decompress(compressed)
+
+ self.assertEqual(original, decompressed)
+ # For small deltas, compression should be effective
+ # Original would need 8 bytes per int (800 bytes); compressed should be much smaller
+ self.assertLess(len(compressed), len(original) * 4) # At least 50% compression
+
+ def test_round_trip_consistency(self):
+ """Test that multiple compress/decompress cycles are consistent."""
+ original = [1, 10, 100, 1000, 10000]
+ # First round trip
+ compressed1 = DeltaVarintCompressor.compress(original)
+ decompressed1 = DeltaVarintCompressor.decompress(compressed1)
+ # Second round trip
+ compressed2 = DeltaVarintCompressor.compress(decompressed1)
+ decompressed2 = DeltaVarintCompressor.decompress(compressed2)
+ # All should be identical
+ self.assertEqual(original, decompressed1)
+ self.assertEqual(original, decompressed2)
+ self.assertEqual(compressed1, compressed2)
+
+ def test_boundary_values(self):
+ """Test boundary values for varint encoding."""
+ # Test values around varint boundaries (127, 16383, etc.)
+ boundary_values = [
+ 0, 1, 127, 128, 255, 256,
+ 16383, 16384, 32767, 32768,
+ -1, -127, -128, -255, -256,
+ -16383, -16384, -32767, -32768
+ ]
+ compressed = DeltaVarintCompressor.compress(boundary_values)
+ decompressed = DeltaVarintCompressor.decompress(compressed)
+ self.assertEqual(boundary_values, decompressed)
+
+ def test_java_compatibility_zigzag_encoding(self):
+ """Test ZigZag encoding compatibility with Java implementation."""
+ # Test cases that verify ZigZag encoding matches Java's implementation
+ # ZigZag mapping: 0->0, -1->1, 1->2, -2->3, 2->4, -3->5, 3->6, etc.
+ zigzag_test_cases = [
+ (0, 0), # 0 -> 0
+ (-1, 1), # -1 -> 1
+ (1, 2), # 1 -> 2
+ (-2, 3), # -2 -> 3
+ (2, 4), # 2 -> 4
+ (-3, 5), # -3 -> 5
+ (3, 6), # 3 -> 6
+ (-64, 127), # -64 -> 127
+ (64, 128), # 64 -> 128
+ (-65, 129), # -65 -> 129
+ ]
+
+ for original_value, expected_zigzag in zigzag_test_cases:
+ # Test single value compression to verify ZigZag encoding
+ compressed = DeltaVarintCompressor.compress([original_value])
+ decompressed = DeltaVarintCompressor.decompress(compressed)
+
+ self.assertEqual([original_value], decompressed,
+ f"ZigZag encoding failed for value
{original_value}")
+
+ def test_java_compatibility_known_vectors(self):
+ """Test with known test vectors that should match Java
implementation."""
+ # Test vectors with expected compressed output (hexadecimal)
+ test_vectors = [
+ # Simple cases
+ ([0], "00"), # 0 -> ZigZag(0) = 0 -> Varint(0)
= 0x00
+ ([1], "02"), # 1 -> ZigZag(1) = 2 -> Varint(2)
= 0x02
+ ([-1], "01"), # -1 -> ZigZag(-1) = 1 ->
Varint(1) = 0x01
+ ([2], "04"), # 2 -> ZigZag(2) = 4 -> Varint(4)
= 0x04
+ ([-2], "03"), # -2 -> ZigZag(-2) = 3 ->
Varint(3) = 0x03
+
+ # Delta encoding cases
+ ([0, 1], "0002"), # [0, 1] -> [0, delta=1] -> [0x00,
0x02]
+ ([1, 2], "0202"), # [1, 2] -> [1, delta=1] -> [0x02,
0x02]
+ ([0, -1], "0001"), # [0, -1] -> [0, delta=-1] ->
[0x00, 0x01]
+ ([1, 0], "0201"), # [1, 0] -> [1, delta=-1] ->
[0x02, 0x01]
+
+ # Larger values
+ ([127], "fe01"), # 127 -> ZigZag(127) = 254 ->
Varint(254) = 0xfe01
+ ([-127], "fd01"), # -127 -> ZigZag(-127) = 253 ->
Varint(253) = 0xfd01
+ ([128], "8002"), # 128 -> ZigZag(128) = 256 ->
Varint(256) = 0x8002
+ ([-128], "ff01"), # -128 -> ZigZag(-128) = 255 ->
Varint(255) = 0xff01
+ ]
+
+ for original, expected_hex in test_vectors:
+ compressed = DeltaVarintCompressor.compress(original)
+ actual_hex = compressed.hex()
+
+ self.assertEqual(expected_hex, actual_hex,
+ f"Binary compatibility failed for {original}. "
+ f"Expected: {expected_hex}, Got: {actual_hex}")
+
+ # Also verify round-trip
+ decompressed = DeltaVarintCompressor.decompress(compressed)
+ self.assertEqual(original, decompressed,
+ f"Round-trip failed for {original}")
+
+ def test_java_compatibility_large_numbers(self):
+ """Test compatibility with Java for large numbers (64-bit range)."""
+ # Test cases covering the full 64-bit signed integer range
+ large_number_cases = [
+ 2147483647, # Integer.MAX_VALUE
+ -2147483648, # Integer.MIN_VALUE
+ 9223372036854775807, # Long.MAX_VALUE
+ -9223372036854775808 + 1, # Long.MIN_VALUE + 1 (avoid overflow in Python)
+ 4294967295, # 2^32 - 1
+ -4294967296, # -2^32
+ ]
+
+ for value in large_number_cases:
+ # Test individual values
+ compressed = DeltaVarintCompressor.compress([value])
+ decompressed = DeltaVarintCompressor.decompress(compressed)
+ self.assertEqual([value], decompressed,
+ f"Large number compatibility failed for {value}")
+
+ # Test as a sequence to verify delta encoding with large numbers
+ compressed_seq = DeltaVarintCompressor.compress(large_number_cases)
+ decompressed_seq = DeltaVarintCompressor.decompress(compressed_seq)
+ self.assertEqual(large_number_cases, decompressed_seq,
+ "Large number sequence compatibility failed")
+
+ def test_java_compatibility_varint_boundaries(self):
+ """Test Varint encoding boundaries that match Java implementation."""
+ # Test values at Varint encoding boundaries
+ varint_boundary_cases = [
+ # 1-byte Varint boundary
+ 63, # ZigZag(63) = 126, fits in 1 byte
+ 64, # ZigZag(64) = 128, needs 2 bytes
+ -64, # ZigZag(-64) = 127, fits in 1 byte
+ -65, # ZigZag(-65) = 129, needs 2 bytes
+
+ # 2-byte Varint boundary
+ 8191, # ZigZag(8191) = 16382, fits in 2 bytes
+ 8192, # ZigZag(8192) = 16384, needs 3 bytes
+ -8192, # ZigZag(-8192) = 16383, fits in 2 bytes
+ -8193, # ZigZag(-8193) = 16385, needs 3 bytes
+
+ # 3-byte Varint boundary
+ 1048575, # ZigZag(1048575) = 2097150, fits in 3 bytes
+ 1048576, # ZigZag(1048576) = 2097152, needs 4 bytes
+ ]
+
+ for value in varint_boundary_cases:
+ compressed = DeltaVarintCompressor.compress([value])
+ decompressed = DeltaVarintCompressor.decompress(compressed)
+ self.assertEqual([value], decompressed,
+ f"Varint boundary compatibility failed for
{value}")
+
+ def test_java_compatibility_delta_edge_cases(self):
+ """Test delta encoding edge cases for Java compatibility."""
+ # Edge cases that test delta encoding behavior
+ delta_edge_cases = [
+ # Maximum positive delta
+ [0, sys.maxsize],
+ # Maximum negative delta
+ [sys.maxsize, 0],
+ # Alternating large deltas
+ [0, 1000000, -1000000, 2000000, -2000000],
+ # Sequence with zero deltas
+ [42, 42, 42, 42],
+ # Mixed small and large deltas
+ [0, 1, 1000000, 1000001, 0],
+ ]
+
+ for case in delta_edge_cases:
+ compressed = DeltaVarintCompressor.compress(case)
+ decompressed = DeltaVarintCompressor.decompress(compressed)
+ self.assertEqual(case, decompressed,
+ f"Delta edge case compatibility failed for
{case}")
+
+ def test_java_compatibility_error_conditions(self):
+ """Test error conditions that should match Java behavior."""
+ # Test cases for error handling - our implementation gracefully handles
+ # truncated data by returning empty lists, which is acceptable behavior
+
+ # Test with various truncated/invalid byte sequences
+ invalid_cases = [
+ bytes([0x80]), # Single incomplete byte
+ bytes([0x80, 0x80]), # Incomplete 3-byte varint
+ bytes([0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x80]), # Long sequence
+ ]
+
+ for invalid_data in invalid_cases:
+ # Our implementation handles invalid data gracefully by returning an empty list
+ # This is acceptable behavior for robustness
+ result = DeltaVarintCompressor.decompress(invalid_data)
+ self.assertIsInstance(result, list,
+ f"Should return a list for invalid data:
{invalid_data.hex()}")
+ # Empty result is acceptable for invalid/truncated data
+
+ # Test that valid empty input returns empty list
+ empty_result = DeltaVarintCompressor.decompress(b'')
+ self.assertEqual([], empty_result, "Empty input should return empty list")
+
+
+if __name__ == '__main__':
+ unittest.main()
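
Illustration (not part of this commit): the hex vectors in test_java_compatibility_known_vectors follow the usual ZigZag plus little-endian base-128 varint scheme. The actual implementation lives in pypaimon/common/delta_varint_compressor.py; the sketch below only restates the encoding the vectors imply, assuming 64-bit two's-complement semantics:

    def zigzag(n: int) -> int:
        # Maps 0, -1, 1, -2, 2, ... to 0, 1, 2, 3, 4, ...
        return ((n << 1) ^ (n >> 63)) & 0xFFFFFFFFFFFFFFFF

    def varint(u: int) -> bytes:
        # 7 payload bits per byte, least-significant group first, MSB = continuation flag
        out = bytearray()
        while True:
            b = u & 0x7F
            u >>= 7
            if u:
                out.append(b | 0x80)
            else:
                out.append(b)
                return bytes(out)

    # varint(zigzag(127)).hex() == "fe01" and varint(zigzag(-128)).hex() == "ff01",
    # matching the vectors above.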
diff --git a/paimon-python/pypaimon/write/blob_format_writer.py b/paimon-python/pypaimon/write/blob_format_writer.py
new file mode 100644
index 0000000000..07b29bbc4e
--- /dev/null
+++ b/paimon-python/pypaimon/write/blob_format_writer.py
@@ -0,0 +1,107 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import struct
+import zlib
+from typing import BinaryIO, List
+
+from pypaimon.table.row.blob import Blob, BlobData
+from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor
+
+
+class BlobFormatWriter:
+ VERSION = 1
+ MAGIC_NUMBER = 1481511375
+ BUFFER_SIZE = 4096
+ METADATA_SIZE = 12 # 8-byte length + 4-byte CRC
+
+ def __init__(self, output_stream: BinaryIO):
+ self.output_stream = output_stream
+ self.lengths: List[int] = []
+ self.position = 0
+
+ def add_element(self, row) -> None:
+ if not hasattr(row, 'values') or len(row.values) != 1:
+ raise ValueError("BlobFormatWriter only supports one field")
+
+ blob_value = row.values[0]
+ if blob_value is None:
+ raise ValueError("BlobFormatWriter only supports non-null blob")
+
+ if not isinstance(blob_value, Blob):
+ raise ValueError("Field must be Blob/BlobData instance")
+
+ previous_pos = self.position
+ crc32 = 0 # Initialize CRC32
+
+ # Write magic number
+ magic_bytes = struct.pack('<I', self.MAGIC_NUMBER) # Little endian
+ crc32 = self._write_with_crc(magic_bytes, crc32)
+
+ # Write blob data
+ if isinstance(blob_value, BlobData):
+ data = blob_value.to_data()
+ crc32 = self._write_with_crc(data, crc32)
+ else:
+ # Stream from BlobRef/Blob
+ stream = blob_value.new_input_stream()
+ try:
+ chunk = stream.read(self.BUFFER_SIZE)
+ while chunk:
+ crc32 = self._write_with_crc(chunk, crc32)
+ chunk = stream.read(self.BUFFER_SIZE)
+ finally:
+ stream.close()
+
+ # Calculate total length including magic + data + metadata (length + CRC)
+ bin_length = self.position - previous_pos + self.METADATA_SIZE
+ self.lengths.append(bin_length)
+
+ # Write length (8 bytes, little endian)
+ length_bytes = struct.pack('<Q', bin_length)
+ self.output_stream.write(length_bytes)
+ self.position += 8
+
+ # Write CRC32 (4 bytes, little endian)
+ crc_bytes = struct.pack('<I', crc32 & 0xffffffff)
+ self.output_stream.write(crc_bytes)
+ self.position += 4
+
+ def _write_with_crc(self, data: bytes, crc32: int) -> int:
+ crc32 = zlib.crc32(data, crc32)
+ self.output_stream.write(data)
+ self.position += len(data)
+ return crc32
+
+ def reach_target_size(self, suggested_check: bool, target_size: int) -> bool:
+ return self.position >= target_size
+
+ def close(self) -> None:
+ index_bytes = DeltaVarintCompressor.compress(self.lengths)
+ self.output_stream.write(index_bytes)
+
+ # Write header (index length + version)
+ header = struct.pack('<I', len(index_bytes)) # Index length (4 bytes, little endian)
+ header += struct.pack('<B', self.VERSION) # Version (1 byte)
+ self.output_stream.write(header)
+
+ # Flush and close
+ if hasattr(self.output_stream, 'flush'):
+ self.output_stream.flush()
+ if hasattr(self.output_stream, 'close'):
+ self.output_stream.close()
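
Illustration (not part of this commit): the per-record framing that add_element() produces can be restated in a few lines, which may help when comparing against FormatBlobReader. The helper name is ours; the constants are taken from the class above:

    import struct
    import zlib

    def frame_record(payload: bytes) -> bytes:
        magic = struct.pack('<I', 1481511375)           # BlobFormatWriter.MAGIC_NUMBER, little endian
        crc = zlib.crc32(payload, zlib.crc32(magic))    # CRC32 covers magic + payload
        length = len(magic) + len(payload) + 12         # + METADATA_SIZE (8B length + 4B CRC)
        return (magic + payload
                + struct.pack('<Q', length)
                + struct.pack('<I', crc & 0xffffffff))

After the last record, close() appends the delta-varint-compressed index of record lengths, then the 4-byte index length and 1-byte version that form the trailer exercised by the corruption tests.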
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index e2f778b586..cc0fc944a1 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -111,6 +111,8 @@ class DataWriter(ABC):
self.file_io.write_orc(file_path, data,
compression=self.compression)
elif self.file_format == CoreOptions.FILE_FORMAT_AVRO:
self.file_io.write_avro(file_path, data)
+ elif self.file_format == CoreOptions.FILE_FORMAT_BLOB:
+ self.file_io.write_blob(file_path, data)
else:
raise ValueError(f"Unsupported file format: {self.file_format}")