This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b542c5f13 [python] support blob type and blob write and read (#6390)
8b542c5f13 is described below

commit 8b542c5f13f76fdddbbf0298af27b2a96cd3954d
Author: jerry <[email protected]>
AuthorDate: Tue Oct 14 15:42:17 2025 +0800

    [python] support blob type and blob write and read (#6390)
---
 paimon-python/pypaimon/common/core_options.py      |   1 +
 .../pypaimon/common/delta_varint_compressor.py     | 125 +++
 paimon-python/pypaimon/common/file_io.py           |  55 ++
 .../pypaimon/read/reader/format_blob_reader.py     | 199 +++++
 paimon-python/pypaimon/read/split_read.py          |   9 +-
 paimon-python/pypaimon/schema/data_types.py        |   5 +
 paimon-python/pypaimon/table/row/blob.py           | 247 ++++++
 paimon-python/pypaimon/table/row/generic_row.py    |  15 +-
 paimon-python/pypaimon/tests/blob_test.py          | 983 +++++++++++++++++++++
 .../pypaimon/tests/delta_varint_compressor_test.py | 379 ++++++++
 paimon-python/pypaimon/write/blob_format_writer.py | 107 +++
 paimon-python/pypaimon/write/writer/data_writer.py |   2 +
 12 files changed, 2124 insertions(+), 3 deletions(-)

diff --git a/paimon-python/pypaimon/common/core_options.py b/paimon-python/pypaimon/common/core_options.py
index 1a060bd206..754bb17c05 100644
--- a/paimon-python/pypaimon/common/core_options.py
+++ b/paimon-python/pypaimon/common/core_options.py
@@ -38,6 +38,7 @@ class CoreOptions(str, Enum):
     FILE_FORMAT_ORC = "orc"
     FILE_FORMAT_AVRO = "avro"
     FILE_FORMAT_PARQUET = "parquet"
+    FILE_FORMAT_BLOB = "blob"
     FILE_COMPRESSION = "file.compression"
     FILE_COMPRESSION_PER_LEVEL = "file.compression.per.level"
     FILE_FORMAT_PER_LEVEL = "file.format.per.level"
diff --git a/paimon-python/pypaimon/common/delta_varint_compressor.py b/paimon-python/pypaimon/common/delta_varint_compressor.py
new file mode 100644
index 0000000000..dc849886bf
--- /dev/null
+++ b/paimon-python/pypaimon/common/delta_varint_compressor.py
@@ -0,0 +1,125 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import io
+from typing import List
+
+
+class DeltaVarintCompressor:
+
+    @staticmethod
+    def compress(data: List[int]) -> bytes:
+        if not data:
+            return b''
+
+        # Estimate output size (conservative: 5 bytes per varint max)
+        out = io.BytesIO()
+        out.seek(0)
+
+        # Encode first value directly
+        DeltaVarintCompressor._encode_varint(data[0], out)
+
+        # Encode deltas without intermediate list creation
+        prev = data[0]
+        for i in range(1, len(data)):
+            current = data[i]
+            delta = current - prev
+            DeltaVarintCompressor._encode_varint(delta, out)
+            prev = current
+
+        # Return only the used portion of the buffer
+        position = out.tell()
+        result = out.getvalue()
+        out.close()
+        return result[:position]
+
+    @staticmethod
+    def decompress(compressed: bytes) -> List[int]:
+        if not compressed:
+            return []
+
+        # Fast path: decode directly into result without intermediate deltas list
+        in_stream = io.BytesIO(compressed)
+        result = []
+
+        try:
+            # Decode first value
+            first_value = DeltaVarintCompressor._decode_varint(in_stream)
+            result.append(first_value)
+
+            # Decode and reconstruct remaining values in one pass
+            current_value = first_value
+            while True:
+                try:
+                    delta = DeltaVarintCompressor._decode_varint(in_stream)
+                    current_value += delta
+                    result.append(current_value)
+                except RuntimeError:
+                    # End of stream reached
+                    break
+
+        except RuntimeError:
+            # Handle empty stream case
+            pass
+        finally:
+            in_stream.close()
+
+        return result
+
+    @staticmethod
+    def _encode_varint(value: int, out: io.BytesIO) -> None:
+        # ZigZag encoding: maps signed integers to unsigned integers
+        if value >= 0:
+            zigzag = value << 1
+        else:
+            zigzag = ((-value) << 1) - 1
+
+        # Varint encoding
+        while zigzag >= 0x80:
+            out.write(bytes([(zigzag & 0x7F) | 0x80]))
+            zigzag >>= 7
+        out.write(bytes([zigzag & 0x7F]))
+
+    @staticmethod
+    def _decode_varint(in_stream: io.BytesIO) -> int:
+        result = 0
+        shift = 0
+        while True:
+            byte_data = in_stream.read(1)
+            if not byte_data:
+                if shift == 0:
+                    # Natural end of stream
+                    raise RuntimeError("End of stream")
+                else:
+                    # Unexpected end in middle of varint
+                    raise RuntimeError("Unexpected end of input")
+
+            b = byte_data[0]
+            result |= (b & 0x7F) << shift
+            if (b & 0x80) == 0:
+                break
+
+            shift += 7
+            if shift > 63:
+                raise RuntimeError("Varint overflow")
+
+        # ZigZag decoding: maps unsigned integers back to signed integers
+        if result & 1:
+            return -((result + 1) >> 1)
+        else:
+            return result >> 1
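
A minimal round-trip sketch of the compressor above; the sample lengths are illustrative only, not taken from this commit:

    from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor

    # Blob lengths are stored as zigzag-varint-encoded deltas in the file index.
    lengths = [120, 118, 131, 131, 145]
    packed = DeltaVarintCompressor.compress(lengths)
    assert DeltaVarintCompressor.decompress(packed) == lengths
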
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index 6f5dfc6a2a..f6371d2d80 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -28,6 +28,11 @@ from packaging.version import parse
 from pyarrow._fs import FileSystem
 
 from pypaimon.common.config import OssOptions, S3Options
+from pypaimon.schema.data_types import DataField, AtomicType, PyarrowFieldParser
+from pypaimon.table.row.blob import BlobData
+from pypaimon.table.row.generic_row import GenericRow
+from pypaimon.table.row.row_kind import RowKind
+from pypaimon.write.blob_format_writer import BlobFormatWriter
 
 
 class FileIO:
@@ -364,3 +369,53 @@ class FileIO:
 
         with self.new_output_stream(path) as output_stream:
             fastavro.writer(output_stream, avro_schema, records, **kwargs)
+
+    def write_blob(self, path: Path, data: pyarrow.Table, **kwargs):
+        try:
+            # Validate input constraints
+            if data.num_columns != 1:
+                raise RuntimeError(f"Blob format only supports a single 
column, got {data.num_columns} columns")
+            # Check for null values
+            column = data.column(0)
+            if column.null_count > 0:
+                raise RuntimeError("Blob format does not support null values")
+            # Convert PyArrow schema to Paimon DataFields
+            # For blob files, we expect exactly one blob column
+            field = data.schema[0]
+            if pyarrow.types.is_large_binary(field.type):
+                fields = [DataField(0, field.name, AtomicType("BLOB"))]
+            else:
+                # Convert other types as needed
+                paimon_type = PyarrowFieldParser.to_paimon_type(field.type, field.nullable)
+                fields = [DataField(0, field.name, paimon_type)]
+            # Convert PyArrow Table to records
+            records_dict = data.to_pydict()
+            num_rows = data.num_rows
+            field_name = fields[0].name
+            with self.new_output_stream(path) as output_stream:
+                writer = BlobFormatWriter(output_stream)
+                # Write each row
+                for i in range(num_rows):
+                    col_data = records_dict[field_name][i]
+                    # Convert to appropriate type based on field type
+                    if hasattr(fields[0].type, 'type') and fields[0].type.type == "BLOB":
+                        if isinstance(col_data, bytes):
+                            blob_data = BlobData(col_data)
+                        else:
+                            # Convert to bytes if needed
+                            if hasattr(col_data, 'as_py'):
+                                col_data = col_data.as_py()
+                            if isinstance(col_data, str):
+                                col_data = col_data.encode('utf-8')
+                            blob_data = BlobData(col_data)
+                        row_values = [blob_data]
+                    else:
+                        row_values = [col_data]
+                    # Create GenericRow and write
+                    row = GenericRow(row_values, fields, RowKind.INSERT)
+                    writer.add_element(row)
+                writer.close()
+
+        except Exception as e:
+            self.delete_quietly(path)
+            raise RuntimeError(f"Failed to write blob file {path}: {e}") from e
diff --git a/paimon-python/pypaimon/read/reader/format_blob_reader.py b/paimon-python/pypaimon/read/reader/format_blob_reader.py
new file mode 100644
index 0000000000..709a0c27a3
--- /dev/null
+++ b/paimon-python/pypaimon/read/reader/format_blob_reader.py
@@ -0,0 +1,199 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import struct
+from pathlib import Path
+from typing import List, Optional, Any, Iterator
+
+import pyarrow as pa
+import pyarrow.dataset as ds
+from pyarrow import RecordBatch
+
+from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor
+from pypaimon.common.file_io import FileIO
+from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
+from pypaimon.schema.data_types import DataField, PyarrowFieldParser
+from pypaimon.table.row.blob import Blob, BlobDescriptor, BlobRef
+from pypaimon.table.row.generic_row import GenericRow
+
+
+class FormatBlobReader(RecordBatchReader):
+
+    def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
+                 full_fields: List[DataField], push_down_predicate: Any):
+        self._file_io = file_io
+        self._file_path = file_path
+        self._push_down_predicate = push_down_predicate
+
+        # Get file size
+        self._file_size = file_io.get_file_size(file_path)
+
+        # Initialize the low-level blob format reader
+        self.file_path = file_path
+        self.blob_lengths: List[int] = []
+        self.blob_offsets: List[int] = []
+        self.returned = False
+        self._read_index()
+
+        # Set up fields and schema
+        if len(read_fields) > 1:
+            raise RuntimeError("Blob reader only supports one field.")
+        self._fields = read_fields
+        full_fields_map = {field.name: field for field in full_fields}
+        projected_data_fields = [full_fields_map[name] for name in read_fields]
+        self._schema = PyarrowFieldParser.from_paimon_schema(projected_data_fields)
+
+        # Initialize iterator
+        self._blob_iterator = None
+        self._current_batch = None
+
+    def read_arrow_batch(self) -> Optional[RecordBatch]:
+        if self._blob_iterator is None:
+            if self.returned:
+                return None
+            self.returned = True
+            batch_iterator = BlobRecordIterator(self.file_path, self.blob_lengths, self.blob_offsets, self._fields[0])
+            self._blob_iterator = iter(batch_iterator)
+
+        # Collect records for this batch
+        pydict_data = {name: [] for name in self._fields}
+        records_in_batch = 0
+
+        try:
+            while True:
+                # Get next blob record
+                blob_row = next(self._blob_iterator)
+                # Check if first read returns None, stop immediately
+                if blob_row is None:
+                    break
+
+                # Extract blob data from the row
+                blob = blob_row.values[0]  # Blob files have single blob field
+
+                # Convert blob to appropriate format for each requested field
+                for field_name in self._fields:
+                    # For blob files, all fields should contain blob data
+                    if isinstance(blob, Blob):
+                        blob_data = blob.to_data()
+                    else:
+                        blob_data = bytes(blob) if blob is not None else None
+                    pydict_data[field_name].append(blob_data)
+
+                records_in_batch += 1
+
+        except StopIteration:
+            # Stop immediately when StopIteration occurs
+            pass
+
+        if records_in_batch == 0:
+            return None
+
+        # Create RecordBatch
+        if self._push_down_predicate is None:
+            # Convert to Table first, then to RecordBatch
+            table = pa.Table.from_pydict(pydict_data, self._schema)
+            if table.num_rows > 0:
+                return table.to_batches()[0]
+            else:
+                return None
+        else:
+            # Apply predicate filtering
+            pa_batch = pa.Table.from_pydict(pydict_data, self._schema)
+            dataset = ds.InMemoryDataset(pa_batch)
+            scanner = dataset.scanner(filter=self._push_down_predicate)
+            combine_chunks = scanner.to_table().combine_chunks()
+            if combine_chunks.num_rows > 0:
+                return combine_chunks.to_batches()[0]
+            else:
+                return None
+
+    def close(self):
+        self._blob_iterator = None
+
+    def _read_index(self) -> None:
+        with self._file_io.new_input_stream(Path(self.file_path)) as f:
+            # Seek to header: last 5 bytes
+            f.seek(self._file_size - 5)
+            header = f.read(5)
+
+            if len(header) != 5:
+                raise IOError("Invalid blob file: cannot read header")
+
+            # Parse header
+            index_length = struct.unpack('<I', header[:4])[0]  # Little endian
+            version = header[4]
+
+            if version != 1:
+                raise IOError(f"Unsupported blob file version: {version}")
+
+            # Read index data
+            f.seek(self._file_size - 5 - index_length)
+            index_bytes = f.read(index_length)
+
+            if len(index_bytes) != index_length:
+                raise IOError("Invalid blob file: cannot read index")
+
+            # Decompress blob lengths and compute offsets
+            blob_lengths = DeltaVarintCompressor.decompress(index_bytes)
+            blob_offsets = []
+            offset = 0
+            for length in blob_lengths:
+                blob_offsets.append(offset)
+                offset += length
+            self.blob_lengths = blob_lengths
+            self.blob_offsets = blob_offsets
+
+
+class BlobRecordIterator:
+    MAGIC_NUMBER_SIZE = 4
+    METADATA_OVERHEAD = 16
+
+    def __init__(self, file_path: str, blob_lengths: List[int], blob_offsets: List[int], field_name: str):
+        self.file_path = file_path
+        self.field_name = field_name
+        self.blob_lengths = blob_lengths
+        self.blob_offsets = blob_offsets
+        self.current_position = 0
+
+    def __iter__(self) -> Iterator[GenericRow]:
+        return self
+
+    def __next__(self) -> GenericRow:
+        if self.current_position >= len(self.blob_lengths):
+            raise StopIteration
+
+        # Create blob reference for the current blob
+        # Skip magic number (4 bytes) and exclude length (8 bytes) + CRC (4 bytes) = 12 bytes
+        blob_offset = self.blob_offsets[self.current_position] + self.MAGIC_NUMBER_SIZE  # Skip magic number
+        blob_length = self.blob_lengths[self.current_position] - self.METADATA_OVERHEAD
+
+        # Create BlobDescriptor for this blob
+        descriptor = BlobDescriptor(self.file_path, blob_offset, blob_length)
+        blob = BlobRef(descriptor)
+
+        self.current_position += 1
+
+        # Return as GenericRow with single blob field
+        from pypaimon.schema.data_types import DataField, AtomicType
+        from pypaimon.table.row.row_kind import RowKind
+
+        fields = [DataField(0, self.field_name, AtomicType("BLOB"))]
+        return GenericRow([blob], fields, RowKind.INSERT)
+
+    def returned_position(self) -> int:
+        """Get current position in the iterator."""
+        return self.current_position
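
A reading sketch that pairs with the writer above, following the end-to-end test added in this commit; paths are placeholders:

    from pypaimon.common.file_io import FileIO
    from pypaimon.read.reader.format_blob_reader import FormatBlobReader
    from pypaimon.schema.data_types import AtomicType, DataField

    file_io = FileIO("/tmp/warehouse", {})  # hypothetical local warehouse
    reader = FormatBlobReader(
        file_io=file_io,
        file_path="/tmp/warehouse/blob_field.blob",
        read_fields=["blob_field"],
        full_fields=[DataField(0, "blob_field", AtomicType("BLOB"))],
        push_down_predicate=None,
    )
    batch = reader.read_arrow_batch()  # pyarrow RecordBatch, or None when exhausted
    payload = batch.column(0)[0].as_py()
    reader.close()
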
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index d9e9939c11..7674db45f0 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -21,6 +21,7 @@ from abc import ABC, abstractmethod
 from functools import partial
 from typing import List, Optional, Tuple
 
+from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.predicate import Predicate
 from pypaimon.read.interval_partition import IntervalPartition, SortedRun
 from pypaimon.read.partition_info import PartitionInfo
@@ -31,6 +32,7 @@ from pypaimon.read.reader.drop_delete_reader import DropDeleteRecordReader
 from pypaimon.read.reader.empty_record_reader import EmptyFileRecordReader
 from pypaimon.read.reader.filter_record_reader import FilterRecordReader
 from pypaimon.read.reader.format_avro_reader import FormatAvroReader
+from pypaimon.read.reader.format_blob_reader import FormatBlobReader
 from pypaimon.read.reader.format_pyarrow_reader import FormatPyArrowReader
 from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
 from pypaimon.read.reader.iface.record_reader import RecordReader
@@ -73,10 +75,13 @@ class SplitRead(ABC):
         file_format = extension[1:]
 
         format_reader: RecordBatchReader
-        if file_format == "avro":
+        if file_format == CoreOptions.FILE_FORMAT_AVRO:
             format_reader = FormatAvroReader(self.table.file_io, file_path, self._get_final_read_data_fields(),
                                              self.read_fields, self.push_down_predicate)
-        elif file_format == "parquet" or file_format == "orc":
+        elif file_format == CoreOptions.FILE_FORMAT_BLOB:
+            format_reader = FormatBlobReader(self.table.file_io, file_path, self._get_final_read_data_fields(),
+                                             self.read_fields, self.push_down_predicate)
+        elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == CoreOptions.FILE_FORMAT_ORC:
             format_reader = FormatPyArrowReader(self.table.file_io, file_format, file_path,
                                                 self._get_final_read_data_fields(), self.push_down_predicate)
         else:
diff --git a/paimon-python/pypaimon/schema/data_types.py b/paimon-python/pypaimon/schema/data_types.py
index 5255ac6b1e..d1ce2354a8 100644
--- a/paimon-python/pypaimon/schema/data_types.py
+++ b/paimon-python/pypaimon/schema/data_types.py
@@ -247,6 +247,7 @@ class Keyword(Enum):
     BINARY = "BINARY"
     VARBINARY = "VARBINARY"
     BYTES = "BYTES"
+    BLOB = "BLOB"
     DECIMAL = "DECIMAL"
     NUMERIC = "NUMERIC"
     DEC = "DEC"
@@ -407,6 +408,8 @@ class PyarrowFieldParser:
                 return pyarrow.string()
             elif type_name == 'BYTES' or type_name.startswith('VARBINARY'):
                 return pyarrow.binary()
+            elif type_name == 'BLOB':
+                return pyarrow.large_binary()
             elif type_name.startswith('BINARY'):
                 if type_name == 'BINARY':
                     return pyarrow.binary(1)
@@ -506,6 +509,8 @@ class PyarrowFieldParser:
             type_name = f'BINARY({pa_type.byte_width})'
         elif types.is_binary(pa_type):
             type_name = 'BYTES'
+        elif types.is_large_binary(pa_type):
+            type_name = 'BLOB'
         elif types.is_decimal(pa_type):
             type_name = f'DECIMAL({pa_type.precision}, {pa_type.scale})'
         elif types.is_timestamp(pa_type) and pa_type.tz is None:
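
With the additions above, BLOB is expected to map to pyarrow.large_binary() and back; a small sketch of that mapping, under the assumption that from_paimon_schema returns a pyarrow schema as used by the blob reader:

    import pyarrow
    from pypaimon.schema.data_types import AtomicType, DataField, PyarrowFieldParser

    fields = [DataField(0, "doc", AtomicType("BLOB"))]
    schema = PyarrowFieldParser.from_paimon_schema(fields)
    assert schema.field(0).type == pyarrow.large_binary()
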
diff --git a/paimon-python/pypaimon/table/row/blob.py b/paimon-python/pypaimon/table/row/blob.py
new file mode 100644
index 0000000000..b9d8cc0e38
--- /dev/null
+++ b/paimon-python/pypaimon/table/row/blob.py
@@ -0,0 +1,247 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import io
+from abc import ABC, abstractmethod
+from typing import Optional, Union
+
+
+class BlobDescriptor:
+    CURRENT_VERSION = 1
+
+    def __init__(self, uri: str, offset: int, length: int, version: int = CURRENT_VERSION):
+        self._version = version
+        self._uri = uri
+        self._offset = offset
+        self._length = length
+
+    @property
+    def uri(self) -> str:
+        return self._uri
+
+    @property
+    def offset(self) -> int:
+        return self._offset
+
+    @property
+    def length(self) -> int:
+        return self._length
+
+    @property
+    def version(self) -> int:
+        return self._version
+
+    def serialize(self) -> bytes:
+        import struct
+
+        uri_bytes = self._uri.encode('utf-8')
+        uri_length = len(uri_bytes)
+
+        # Pack using little endian format
+        data = struct.pack('<B', self._version)  # version (1 byte)
+        data += struct.pack('<I', uri_length)  # uri length (4 bytes)
+        data += uri_bytes  # uri bytes
+        data += struct.pack('<q', self._offset)  # offset (8 bytes, signed)
+        data += struct.pack('<q', self._length)  # length (8 bytes, signed)
+
+        return data
+
+    @classmethod
+    def deserialize(cls, data: bytes) -> 'BlobDescriptor':
+        import struct
+
+        if len(data) < 5:  # minimum size: version(1) + uri_length(4)
+            raise ValueError("Invalid BlobDescriptor data: too short")
+
+        offset = 0
+
+        # Read version
+        version = struct.unpack('<B', data[offset:offset + 1])[0]
+        offset += 1
+
+        # For now, we only support version 1, but allow flexibility for future versions
+        if version < 1:
+            raise ValueError(f"Unsupported BlobDescriptor version: {version}")
+
+        # Read URI length
+        uri_length = struct.unpack('<I', data[offset:offset + 4])[0]
+        offset += 4
+
+        # Read URI bytes
+        if offset + uri_length > len(data):
+            raise ValueError("Invalid BlobDescriptor data: URI length exceeds 
data size")
+
+        uri_bytes = data[offset:offset + uri_length]
+        uri = uri_bytes.decode('utf-8')
+        offset += uri_length
+
+        # Read offset and length
+        if offset + 16 > len(data):
+            raise ValueError("Invalid BlobDescriptor data: missing 
offset/length")
+
+        blob_offset = struct.unpack('<q', data[offset:offset + 8])[0]
+        offset += 8
+
+        blob_length = struct.unpack('<q', data[offset:offset + 8])[0]
+
+        return cls(uri, blob_offset, blob_length, version)
+
+    def __eq__(self, other) -> bool:
+        """Check equality with another BlobDescriptor."""
+        if not isinstance(other, BlobDescriptor):
+            return False
+        return (self._version == other._version and
+                self._uri == other._uri and
+                self._offset == other._offset and
+                self._length == other._length)
+
+    def __hash__(self) -> int:
+        """Calculate hash for the BlobDescriptor."""
+        return hash((self._version, self._uri, self._offset, self._length))
+
+    def __str__(self) -> str:
+        """String representation of BlobDescriptor."""
+        return (f"BlobDescriptor(version={self._version}, uri='{self._uri}', "
+                f"offset={self._offset}, length={self._length})")
+
+    def __repr__(self) -> str:
+        """Detailed representation of BlobDescriptor."""
+        return self.__str__()
+
+
+class Blob(ABC):
+
+    @abstractmethod
+    def to_data(self) -> bytes:
+        pass
+
+    @abstractmethod
+    def to_descriptor(self) -> BlobDescriptor:
+        pass
+
+    @abstractmethod
+    def new_input_stream(self) -> io.BytesIO:
+        pass
+
+    @staticmethod
+    def from_data(data: bytes) -> 'Blob':
+        return BlobData(data)
+
+    @staticmethod
+    def from_local(file: str) -> 'Blob':
+        return Blob.from_file(file)
+
+    @staticmethod
+    def from_http(uri: str) -> 'Blob':
+        descriptor = BlobDescriptor(uri, 0, -1)
+        return BlobRef(descriptor)
+
+    @staticmethod
+    def from_file(file: str, offset: int = 0, length: int = -1) -> 'Blob':
+        descriptor = BlobDescriptor(file, offset, length)
+        return BlobRef(descriptor)
+
+    @staticmethod
+    def from_descriptor(descriptor: BlobDescriptor) -> 'Blob':
+        return BlobRef(descriptor)
+
+
+class BlobData(Blob):
+
+    def __init__(self, data: Optional[Union[bytes, bytearray]] = None):
+        if data is None:
+            self._data = b''
+        elif isinstance(data, (bytes, bytearray)):
+            self._data = bytes(data)
+        else:
+            raise TypeError(f"BlobData expects bytes, bytearray, or None, got 
{type(data)}")
+
+    @classmethod
+    def from_bytes(cls, data: bytes) -> 'BlobData':
+        return cls(data)
+
+    @property
+    def data(self) -> bytes:
+        return self._data
+
+    def to_data(self) -> bytes:
+        return self._data
+
+    def to_descriptor(self) -> 'BlobDescriptor':
+        raise RuntimeError("Blob data can not convert to descriptor.")
+
+    def new_input_stream(self) -> io.BytesIO:
+        return io.BytesIO(self._data)
+
+    def __eq__(self, other) -> bool:
+        if other is None or not isinstance(other, BlobData):
+            return False
+        return self._data == other._data
+
+    def __hash__(self) -> int:
+        return hash(self._data)
+
+
+class BlobRef(Blob):
+
+    def __init__(self, descriptor: BlobDescriptor):
+        self._descriptor = descriptor
+
+    def to_data(self) -> bytes:
+        try:
+            with self.new_input_stream() as stream:
+                return stream.read()
+        except Exception as e:
+            raise IOError(f"Failed to read blob data: {e}")
+
+    def to_descriptor(self) -> BlobDescriptor:
+        return self._descriptor
+
+    def new_input_stream(self) -> io.BytesIO:
+        uri = self._descriptor.uri
+        offset = self._descriptor.offset
+        length = self._descriptor.length
+
+        if uri.startswith('http://') or uri.startswith('https://'):
+            raise NotImplementedError("HTTP blob reading not implemented yet")
+        elif uri.startswith('file://') or '://' not in uri:
+            file_path = uri.replace('file://', '') if uri.startswith('file://') else uri
+
+            try:
+                with open(file_path, 'rb') as f:
+                    if offset > 0:
+                        f.seek(offset)
+
+                    if length == -1:
+                        data = f.read()
+                    else:
+                        data = f.read(length)
+
+                    return io.BytesIO(data)
+            except Exception as e:
+                raise IOError(f"Failed to read file {file_path}: {e}")
+        else:
+            raise ValueError(f"Unsupported URI scheme: {uri}")
+
+    def __eq__(self, other) -> bool:
+        if not isinstance(other, BlobRef):
+            return False
+        return self._descriptor == other._descriptor
+
+    def __hash__(self) -> int:
+        return hash(self._descriptor)
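
A quick sketch of the Blob factory methods and descriptor round trip defined above; the file path is a placeholder:

    from pypaimon.table.row.blob import Blob, BlobData, BlobDescriptor

    in_memory = Blob.from_data(b"payload")          # BlobData, bytes held in memory
    assert in_memory.to_data() == b"payload"

    on_disk = Blob.from_file("/tmp/example.bin", offset=0, length=4)  # BlobRef, read lazily
    descriptor = on_disk.to_descriptor()

    # Descriptors serialize to a little-endian byte layout and round-trip losslessly.
    assert BlobDescriptor.deserialize(descriptor.serialize()) == descriptor
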
diff --git a/paimon-python/pypaimon/table/row/generic_row.py b/paimon-python/pypaimon/table/row/generic_row.py
index a7612168d9..5f8f9f6d9b 100644
--- a/paimon-python/pypaimon/table/row/generic_row.py
+++ b/paimon-python/pypaimon/table/row/generic_row.py
@@ -24,6 +24,7 @@ from typing import Any, List
 
 from pypaimon.schema.data_types import AtomicType, DataField, DataType
 from pypaimon.table.row.row_kind import RowKind
+from pypaimon.table.row.blob import BlobData
 
 
 @dataclass
@@ -111,6 +112,8 @@ class GenericRowDeserializer:
             return cls._parse_string(bytes_data, base_offset, field_offset)
         elif type_name.startswith('BINARY') or type_name.startswith('VARBINARY') or type_name == 'BYTES':
             return cls._parse_binary(bytes_data, base_offset, field_offset)
+        elif type_name == 'BLOB':
+            return cls._parse_blob(bytes_data, base_offset, field_offset)
         elif type_name.startswith('DECIMAL') or type_name.startswith('NUMERIC'):
             return cls._parse_decimal(bytes_data, base_offset, field_offset, data_type)
         elif type_name.startswith('TIMESTAMP'):
@@ -193,6 +196,13 @@ class GenericRowDeserializer:
             length = (offset_and_len & cls.HIGHEST_SECOND_TO_EIGHTH_BIT) >> 56
             return bytes_data[field_offset:field_offset + length]
 
+    @classmethod
+    def _parse_blob(cls, bytes_data: bytes, base_offset: int, field_offset: int) -> BlobData:
+        """Parse BLOB data from binary format and return a BlobData instance."""
+        # BLOB uses the same binary format as regular binary data
+        binary_data = cls._parse_binary(bytes_data, base_offset, field_offset)
+        return BlobData.from_bytes(binary_data)
+
     @classmethod
     def _parse_decimal(cls, bytes_data: bytes, base_offset: int, field_offset: int, data_type: DataType) -> Decimal:
         unscaled_long = struct.unpack('<q', bytes_data[field_offset:field_offset + 8])[0]
@@ -260,9 +270,12 @@ class GenericRowSerializer:
                 raise ValueError(f"BinaryRow only support AtomicType yet, meet 
{field.type.__class__}")
 
             type_name = field.type.type.upper()
-            if any(type_name.startswith(p) for p in ['CHAR', 'VARCHAR', 'STRING', 'BINARY', 'VARBINARY', 'BYTES']):
+            if any(type_name.startswith(p) for p in ['CHAR', 'VARCHAR', 'STRING',
+                                                     'BINARY', 'VARBINARY', 'BYTES', 'BLOB']):
                 if any(type_name.startswith(p) for p in ['CHAR', 'VARCHAR', 'STRING']):
                     value_bytes = str(value).encode('utf-8')
+                elif type_name == 'BLOB':
+                    value_bytes = value.to_data()
                 else:
                     value_bytes = bytes(value)
 
diff --git a/paimon-python/pypaimon/tests/blob_test.py b/paimon-python/pypaimon/tests/blob_test.py
new file mode 100644
index 0000000000..b8ed65e5bb
--- /dev/null
+++ b/paimon-python/pypaimon/tests/blob_test.py
@@ -0,0 +1,983 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import os
+import shutil
+import tempfile
+import unittest
+from pathlib import Path
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory
+from pypaimon.common.file_io import FileIO
+from pypaimon.read.reader.format_blob_reader import FormatBlobReader
+from pypaimon.schema.data_types import AtomicType, DataField
+from pypaimon.table.row.blob import Blob, BlobData, BlobRef, BlobDescriptor
+from pypaimon.table.row.generic_row import GenericRowDeserializer, GenericRowSerializer, GenericRow
+from pypaimon.table.row.row_kind import RowKind
+
+
+class MockFileIO:
+    """Mock FileIO for testing."""
+
+    def __init__(self, file_io: FileIO):
+        self._file_io = file_io
+
+    def get_file_size(self, path: str) -> int:
+        """Get file size."""
+        return self._file_io.get_file_size(Path(path))
+
+    def new_input_stream(self, path: Path):
+        """Create new input stream for reading."""
+        return self._file_io.new_input_stream(path)
+
+
+class BlobTest(unittest.TestCase):
+    """Tests for Blob interface following org.apache.paimon.data.BlobTest."""
+
+    def setUp(self):
+        """Set up test environment with temporary file."""
+        # Create a temporary directory and file
+        self.temp_dir = tempfile.mkdtemp()
+        self.file = os.path.join(self.temp_dir, "test.txt")
+
+        # Write test data to the file
+        with open(self.file, 'wb') as f:
+            f.write(b"test data")
+
+    def tearDown(self):
+        """Clean up temporary files."""
+        try:
+            if os.path.exists(self.file):
+                os.remove(self.file)
+            os.rmdir(self.temp_dir)
+        except OSError:
+            pass  # Ignore cleanup errors
+
+    def test_from_data(self):
+        """Test Blob.from_data() method."""
+        test_data = b"test data"
+        blob = Blob.from_data(test_data)
+
+        # Verify it returns a BlobData instance
+        self.assertIsInstance(blob, BlobData)
+
+        # Verify the data matches
+        self.assertEqual(blob.to_data(), test_data)
+
+    def test_from_local(self):
+        """Test Blob.from_local() method."""
+        blob = Blob.from_local(self.file)
+
+        # Verify it returns a BlobRef instance
+        self.assertIsInstance(blob, BlobRef)
+
+        # Verify the data matches
+        self.assertEqual(blob.to_data(), b"test data")
+
+    def test_from_file_with_offset_and_length(self):
+        """Test Blob.from_file() method with offset and length."""
+        blob = Blob.from_file(self.file, 0, 4)
+
+        # Verify it returns a BlobRef instance
+        self.assertIsInstance(blob, BlobRef)
+
+        # Verify the data matches (first 4 bytes: "test")
+        self.assertEqual(blob.to_data(), b"test")
+
+    def test_from_file_full(self):
+        """Test Blob.from_file() method without offset and length."""
+        blob = Blob.from_file(self.file)
+
+        # Verify it returns a BlobRef instance
+        self.assertIsInstance(blob, BlobRef)
+
+        # Verify the data matches
+        self.assertEqual(blob.to_data(), b"test data")
+
+    def test_from_http(self):
+        """Test Blob.from_http() method."""
+        uri = "http://example.com/file.txt";
+        blob = Blob.from_http(uri)
+
+        # Verify it returns a BlobRef instance
+        self.assertIsInstance(blob, BlobRef)
+
+        # Verify the descriptor has the correct URI
+        descriptor = blob.to_descriptor()
+        self.assertEqual(descriptor.uri, uri)
+        self.assertEqual(descriptor.offset, 0)
+        self.assertEqual(descriptor.length, -1)
+
+    def test_blob_data_interface_compliance(self):
+        """Test that BlobData properly implements Blob interface."""
+        test_data = b"interface test data"
+        blob_data = BlobData(test_data)
+
+        # Test that it's a Blob
+        self.assertIsInstance(blob_data, Blob)
+
+        # Test interface methods
+        self.assertEqual(blob_data.to_data(), test_data)
+
+        # Test to_descriptor raises RuntimeError
+        with self.assertRaises(RuntimeError) as context:
+            blob_data.to_descriptor()
+        self.assertIn("Blob data can not convert to descriptor", 
str(context.exception))
+
+        # Test new_input_stream
+        stream = blob_data.new_input_stream()
+        self.assertEqual(stream.read(), test_data)
+        stream.close()
+
+    def test_blob_ref_interface_compliance(self):
+        """Test that BlobRef properly implements Blob interface."""
+        blob_ref = Blob.from_local(self.file)
+
+        # Test that it's a Blob
+        self.assertIsInstance(blob_ref, Blob)
+
+        # Test interface methods
+        self.assertEqual(blob_ref.to_data(), b"test data")
+
+        # Test to_descriptor returns valid descriptor
+        descriptor = blob_ref.to_descriptor()
+        self.assertEqual(descriptor.uri, self.file)
+        self.assertEqual(descriptor.offset, 0)
+        self.assertEqual(descriptor.length, -1)
+
+        # Test new_input_stream
+        stream = blob_ref.new_input_stream()
+        self.assertEqual(stream.read(), b"test data")
+        stream.close()
+
+    def test_blob_equality_and_hashing(self):
+        """Test blob equality and hashing behavior."""
+        # Test BlobData equality
+        data1 = BlobData(b"same data")
+        data2 = BlobData(b"same data")
+        data3 = BlobData(b"different data")
+
+        self.assertEqual(data1, data2)
+        self.assertNotEqual(data1, data3)
+        self.assertEqual(hash(data1), hash(data2))
+
+        # Test BlobRef equality
+        ref1 = Blob.from_local(self.file)
+        ref2 = Blob.from_local(self.file)
+
+        self.assertEqual(ref1, ref2)
+        self.assertEqual(hash(ref1), hash(ref2))
+
+    def test_blob_factory_methods_return_correct_types(self):
+        """Test that all factory methods return the expected types."""
+        # from_data should return BlobData
+        blob_data = Blob.from_data(b"test")
+        self.assertIsInstance(blob_data, BlobData)
+        self.assertIsInstance(blob_data, Blob)
+
+        # from_local should return BlobRef
+        blob_ref = Blob.from_local(self.file)
+        self.assertIsInstance(blob_ref, BlobRef)
+        self.assertIsInstance(blob_ref, Blob)
+
+        # from_file should return BlobRef
+        blob_file = Blob.from_file(self.file)
+        self.assertIsInstance(blob_file, BlobRef)
+        self.assertIsInstance(blob_file, Blob)
+
+        # from_http should return BlobRef
+        blob_http = Blob.from_http("http://example.com/test.bin")
+        self.assertIsInstance(blob_http, BlobRef)
+        self.assertIsInstance(blob_http, Blob)
+
+    def test_blob_data_convenience_methods(self):
+        # Test from_bytes class method
+        blob2 = BlobData.from_bytes(b"from bytes")
+        self.assertEqual(blob2.to_data(), b"from bytes")
+
+    def test_generic_row_deserializer_parse_blob(self):
+        """Test GenericRowDeserializer._parse_blob method."""
+        # Create test data with BLOB field
+        test_blob_data = b"Test BLOB data for parsing"
+        blob_data = BlobData(test_blob_data)
+
+        # Create fields with BLOB type
+        fields = [
+            DataField(0, "id", AtomicType("INT")),
+            DataField(1, "blob_field", AtomicType("BLOB")),
+        ]
+
+        # Create and serialize a row with blob data
+        original_row = GenericRow([42, blob_data], fields, RowKind.INSERT)
+        serialized_bytes = GenericRowSerializer.to_bytes(original_row)
+
+        # Test the full deserialization process (which uses _parse_blob internally)
+        deserialized_row = GenericRowDeserializer.from_bytes(serialized_bytes, fields)
+
+        # Verify the deserialized blob
+        deserialized_blob = deserialized_row.values[1]
+        self.assertIsInstance(deserialized_blob, BlobData)
+        self.assertEqual(deserialized_blob.to_data(), test_blob_data)
+
+        # Test with empty blob data
+        empty_blob = BlobData(b"")
+        empty_row = GenericRow([1, empty_blob], fields, RowKind.INSERT)
+        empty_serialized = GenericRowSerializer.to_bytes(empty_row)
+        empty_deserialized = GenericRowDeserializer.from_bytes(empty_serialized, fields)
+
+        empty_deserialized_blob = empty_deserialized.values[1]
+        self.assertIsInstance(empty_deserialized_blob, BlobData)
+        self.assertEqual(empty_deserialized_blob.to_data(), b"")
+
+        # Test with binary data containing null bytes
+        binary_blob_data = b"\x00\x01\x02\x03\xff\xfe\xfd"
+        binary_blob = BlobData(binary_blob_data)
+        binary_row = GenericRow([99, binary_blob], fields, RowKind.INSERT)
+        binary_serialized = GenericRowSerializer.to_bytes(binary_row)
+        binary_deserialized = GenericRowDeserializer.from_bytes(binary_serialized, fields)
+
+        binary_deserialized_blob = binary_deserialized.values[1]
+        self.assertIsInstance(binary_deserialized_blob, BlobData)
+        self.assertEqual(binary_deserialized_blob.to_data(), binary_blob_data)
+
+    def test_generic_row_deserializer_parse_blob_with_multiple_fields(self):
+        """Test _parse_blob with multiple BLOB fields in a row."""
+        # Create test data with multiple BLOB fields
+        blob1_data = b"First BLOB data"
+        blob2_data = b"Second BLOB with different content"
+        blob3_data = b""  # Empty blob
+
+        blob1 = BlobData(blob1_data)
+        blob2 = BlobData(blob2_data)
+        blob3 = BlobData(blob3_data)
+
+        # Create fields with multiple BLOB types
+        fields = [
+            DataField(0, "id", AtomicType("INT")),
+            DataField(1, "name", AtomicType("STRING")),
+            DataField(2, "blob1", AtomicType("BLOB")),
+            DataField(3, "blob2", AtomicType("BLOB")),
+            DataField(4, "blob3", AtomicType("BLOB")),
+        ]
+
+        # Create and serialize a row with multiple blobs
+        original_row = GenericRow([123, "test_row", blob1, blob2, blob3], fields, RowKind.INSERT)
+        serialized_bytes = GenericRowSerializer.to_bytes(original_row)
+
+        # Deserialize and verify all blobs
+        deserialized_row = GenericRowDeserializer.from_bytes(serialized_bytes, fields)
+
+        # Verify each blob field
+        self.assertEqual(deserialized_row.values[0], 123)
+        self.assertEqual(deserialized_row.values[1], "test_row")
+
+        deserialized_blob1 = deserialized_row.values[2]
+        self.assertIsInstance(deserialized_blob1, BlobData)
+        self.assertEqual(deserialized_blob1.to_data(), blob1_data)
+
+        deserialized_blob2 = deserialized_row.values[3]
+        self.assertIsInstance(deserialized_blob2, BlobData)
+        self.assertEqual(deserialized_blob2.to_data(), blob2_data)
+
+        deserialized_blob3 = deserialized_row.values[4]
+        self.assertIsInstance(deserialized_blob3, BlobData)
+        self.assertEqual(deserialized_blob3.to_data(), blob3_data)
+
+    def test_generic_row_deserializer_parse_blob_with_null_values(self):
+        """Test _parse_blob with null BLOB values."""
+        # Create fields with BLOB type
+        fields = [
+            DataField(0, "id", AtomicType("INT")),
+            DataField(1, "blob_field", AtomicType("BLOB")),
+            DataField(2, "name", AtomicType("STRING")),
+        ]
+
+        # Create row with null blob (None value)
+        original_row = GenericRow([456, None, "test_with_null"], fields, RowKind.INSERT)
+        serialized_bytes = GenericRowSerializer.to_bytes(original_row)
+
+        # Deserialize and verify null blob is handled correctly
+        deserialized_row = GenericRowDeserializer.from_bytes(serialized_bytes, fields)
+
+        self.assertEqual(deserialized_row.values[0], 456)
+        self.assertIsNone(deserialized_row.values[1])  # Null blob should remain None
+        self.assertEqual(deserialized_row.values[2], "test_with_null")
+
+    def test_generic_row_deserializer_parse_blob_large_data(self):
+        """Test _parse_blob with large BLOB data."""
+        # Create large blob data (1MB)
+        large_blob_data = b"X" * (1024 * 1024)  # 1MB of 'X' characters
+        large_blob = BlobData(large_blob_data)
+
+        fields = [
+            DataField(0, "id", AtomicType("INT")),
+            DataField(1, "large_blob", AtomicType("BLOB")),
+        ]
+
+        # Create and serialize row with large blob
+        original_row = GenericRow([789, large_blob], fields, RowKind.INSERT)
+        serialized_bytes = GenericRowSerializer.to_bytes(original_row)
+
+        # Deserialize and verify large blob
+        deserialized_row = GenericRowDeserializer.from_bytes(serialized_bytes, fields)
+
+        deserialized_large_blob = deserialized_row.values[1]
+        self.assertIsInstance(deserialized_large_blob, BlobData)
+        self.assertEqual(len(deserialized_large_blob.to_data()), 1024 * 1024)
+        self.assertEqual(deserialized_large_blob.to_data(), large_blob_data)
+
+    def test_blob_descriptor_creation(self):
+        """Test BlobDescriptor creation and properties."""
+        # Test basic creation
+        descriptor = BlobDescriptor("test://example.uri", 100, 200)
+
+        self.assertEqual(descriptor.uri, "test://example.uri")
+        self.assertEqual(descriptor.offset, 100)
+        self.assertEqual(descriptor.length, 200)
+        self.assertEqual(descriptor.version, BlobDescriptor.CURRENT_VERSION)
+
+    def test_blob_descriptor_creation_with_version(self):
+        """Test BlobDescriptor creation with explicit version."""
+        descriptor = BlobDescriptor("test://example.uri", 50, 150, version=2)
+
+        self.assertEqual(descriptor.uri, "test://example.uri")
+        self.assertEqual(descriptor.offset, 50)
+        self.assertEqual(descriptor.length, 150)
+        self.assertEqual(descriptor.version, 2)
+
+    def test_blob_descriptor_serialization_deserialization(self):
+        """Test BlobDescriptor serialization and deserialization."""
+        # Test with various URIs and parameters
+        test_cases = [
+            ("file:///path/to/file.bin", 0, -1),
+            ("https://example.com/data.blob";, 1024, 2048),
+            ("s3://bucket/key", 0, 1000000),
+            ("test://simple", 42, 84),
+        ]
+
+        for uri, offset, length in test_cases:
+            with self.subTest(uri=uri, offset=offset, length=length):
+                # Create original descriptor
+                original = BlobDescriptor(uri, offset, length)
+
+                # Serialize
+                serialized = original.serialize()
+                self.assertIsInstance(serialized, bytes)
+                self.assertGreater(len(serialized), 0)
+
+                # Deserialize
+                deserialized = BlobDescriptor.deserialize(serialized)
+
+                # Verify equality
+                self.assertEqual(deserialized, original)
+                self.assertEqual(deserialized.uri, uri)
+                self.assertEqual(deserialized.offset, offset)
+                self.assertEqual(deserialized.length, length)
+                self.assertEqual(deserialized.version, BlobDescriptor.CURRENT_VERSION)
+
+    def test_blob_descriptor_serialization_with_unicode(self):
+        """Test BlobDescriptor serialization with Unicode characters."""
+        # Test with Unicode characters in URI
+        unicode_uri = "file:///测试/文件.bin"
+        descriptor = BlobDescriptor(unicode_uri, 0, 100)
+
+        # Serialize and deserialize
+        serialized = descriptor.serialize()
+        deserialized = BlobDescriptor.deserialize(serialized)
+
+        # Verify Unicode is preserved
+        self.assertEqual(deserialized.uri, unicode_uri)
+        self.assertEqual(deserialized, descriptor)
+
+    def test_blob_descriptor_deserialization_invalid_data(self):
+        """Test BlobDescriptor deserialization with invalid data."""
+        # Test with too short data
+        with self.assertRaises(ValueError) as context:
+            BlobDescriptor.deserialize(b"sho")  # Only 3 bytes, need at least 5
+        self.assertIn("too short", str(context.exception))
+
+        # Test with invalid version (version 0)
+        # Create valid data but with wrong version
+        valid_descriptor = BlobDescriptor("test://uri", 0, 100)
+        valid_data = bytearray(valid_descriptor.serialize())
+        valid_data[0] = 0  # Set invalid version (0)
+
+        with self.assertRaises(ValueError) as context:
+            BlobDescriptor.deserialize(bytes(valid_data))
+        self.assertIn("Unsupported BlobDescriptor version", 
str(context.exception))
+
+        # Test with incomplete data (missing URI bytes)
+        incomplete_data = b'\x01\x00\x00\x00\x10'  # Version 1, URI length 16, but no URI bytes
+        with self.assertRaises(ValueError) as context:
+            BlobDescriptor.deserialize(incomplete_data)
+        self.assertIn("URI length exceeds data size", str(context.exception))
+
+    def test_blob_descriptor_equality_and_hashing(self):
+        """Test BlobDescriptor equality and hashing."""
+        # Create identical descriptors
+        desc1 = BlobDescriptor("test://uri", 100, 200)
+        desc2 = BlobDescriptor("test://uri", 100, 200)
+        desc3 = BlobDescriptor("test://uri", 100, 201)  # Different length
+        desc4 = BlobDescriptor("test://other", 100, 200)  # Different URI
+
+        # Test equality
+        self.assertEqual(desc1, desc2)
+        self.assertNotEqual(desc1, desc3)
+        self.assertNotEqual(desc1, desc4)
+        self.assertNotEqual(desc1, None)
+        self.assertNotEqual(desc1, "not a descriptor")
+
+        # Test hashing
+        self.assertEqual(hash(desc1), hash(desc2))
+        # Hash should be different for different descriptors (though not guaranteed)
+        self.assertNotEqual(hash(desc1), hash(desc3))
+        self.assertNotEqual(hash(desc1), hash(desc4))
+
+    def test_blob_descriptor_string_representation(self):
+        """Test BlobDescriptor string representation."""
+        descriptor = BlobDescriptor("test://example.uri", 42, 84)
+
+        str_repr = str(descriptor)
+        self.assertIn("test://example.uri", str_repr)
+        self.assertIn("42", str_repr)
+        self.assertIn("84", str_repr)
+        self.assertIn("BlobDescriptor", str_repr)
+
+        # __repr__ should be the same as __str__
+        self.assertEqual(str_repr, repr(descriptor))
+
+    def test_blob_descriptor_version_handling(self):
+        """Test BlobDescriptor version handling."""
+        # Test current version
+        descriptor = BlobDescriptor("test://uri", 0, 100)
+        self.assertEqual(descriptor.version, BlobDescriptor.CURRENT_VERSION)
+
+        # Test explicit version
+        descriptor_v2 = BlobDescriptor("test://uri", 0, 100, version=2)
+        self.assertEqual(descriptor_v2.version, 2)
+
+        # Serialize and deserialize should preserve version
+        serialized = descriptor_v2.serialize()
+        deserialized = BlobDescriptor.deserialize(serialized)
+        self.assertEqual(deserialized.version, 2)
+
+    def test_blob_descriptor_edge_cases(self):
+        """Test BlobDescriptor with edge cases."""
+        # Test with empty URI
+        empty_uri_desc = BlobDescriptor("", 0, 0)
+        serialized = empty_uri_desc.serialize()
+        deserialized = BlobDescriptor.deserialize(serialized)
+        self.assertEqual(deserialized.uri, "")
+
+        # Test with very long URI
+        long_uri = "file://" + "a" * 1000 + "/file.bin"
+        long_uri_desc = BlobDescriptor(long_uri, 0, 1000000)
+        serialized = long_uri_desc.serialize()
+        deserialized = BlobDescriptor.deserialize(serialized)
+        self.assertEqual(deserialized.uri, long_uri)
+
+        # Test with negative values
+        negative_desc = BlobDescriptor("test://uri", -1, -1)
+        serialized = negative_desc.serialize()
+        deserialized = BlobDescriptor.deserialize(serialized)
+        self.assertEqual(deserialized.offset, -1)
+        self.assertEqual(deserialized.length, -1)
+
+    def test_blob_descriptor_with_blob_ref(self):
+        """Test BlobDescriptor integration with BlobRef."""
+        # Create a descriptor
+        descriptor = BlobDescriptor(self.file, 0, -1)
+
+        # Create BlobRef from descriptor
+        blob_ref = BlobRef(descriptor)
+
+        # Verify descriptor is preserved
+        returned_descriptor = blob_ref.to_descriptor()
+        self.assertEqual(returned_descriptor, descriptor)
+
+        # Verify data can be read through BlobRef
+        data = blob_ref.to_data()
+        self.assertEqual(data, b"test data")
+
+    def test_blob_descriptor_serialization_format(self):
+        """Test BlobDescriptor serialization format details."""
+        descriptor = BlobDescriptor("test", 12345, 67890)
+        serialized = descriptor.serialize()
+
+        # Check that serialized data starts with version byte
+        self.assertEqual(serialized[0], BlobDescriptor.CURRENT_VERSION)
+
+        # Check minimum length (version + uri_length + uri + offset + length)
+        # 1 + 4 + len("test") + 8 + 8 = 25 bytes
+        self.assertEqual(len(serialized), 25)
+
+        # Verify round-trip consistency
+        deserialized = BlobDescriptor.deserialize(serialized)
+        re_serialized = deserialized.serialize()
+        self.assertEqual(serialized, re_serialized)
+
+
+class BlobEndToEndTest(unittest.TestCase):
+    """End-to-end tests for blob functionality with schema definition, file 
writing, and reading."""
+
+    def setUp(self):
+        """Set up test environment."""
+        self.temp_dir = tempfile.mkdtemp()
+        self.warehouse = os.path.join(self.temp_dir, 'warehouse')
+        # Create catalog for table operations
+        self.catalog = CatalogFactory.create({
+            'warehouse': self.warehouse
+        })
+        self.catalog.create_database('test_db', False)
+
+    def tearDown(self):
+        """Clean up test environment."""
+        try:
+            shutil.rmtree(self.temp_dir)
+        except OSError:
+            pass
+
+    def test_blob_end_to_end(self):
+        from pypaimon.schema.data_types import DataField, AtomicType
+        from pypaimon.table.row.blob import BlobData
+        from pypaimon.common.file_io import FileIO
+        from pathlib import Path
+
+        # Set up file I/O
+        file_io = FileIO(self.temp_dir, {})
+
+        blob_field_name = "blob_field"
+        # ========== Step 1: Check Type Validation ==========
+        blob_fields = [DataField(0, blob_field_name, AtomicType("BLOB"))]
+        for blob_field in blob_fields:
+            self.assertIsInstance(blob_field.type, AtomicType)
+            self.assertEqual(blob_field.type.type, "BLOB")
+
+        # ========== Step 2: Write Data ==========
+        test_data = {blob_field_name: BlobData(b'End-to-end test: PDF header %PDF-1.4\n...')}
+        blob_files = {}
+        blob_data = [test_data[blob_field_name].to_data()]
+        schema = pa.schema([pa.field(blob_field_name, pa.large_binary())])
+        table = pa.table([blob_data], schema=schema)
+        blob_files[blob_field_name] = Path(self.temp_dir) / (blob_field_name + ".blob")
+        file_io.write_blob(blob_files[blob_field_name], table)
+        self.assertTrue(file_io.exists(blob_files[blob_field_name]))
+
+        # ========== Step 3: Read Data and Check Data ==========
+        for field_name, file_path in blob_files.items():
+            read_fields = blob_fields
+            reader = FormatBlobReader(
+                file_io=file_io,
+                file_path=str(file_path),
+                read_fields=[field_name],
+                full_fields=read_fields,
+                push_down_predicate=None
+            )
+
+            # Read data
+            batch = reader.read_arrow_batch()
+            self.assertIsNotNone(batch, f"{field_name} batch should not be None")
+            self.assertEqual(batch.num_rows, 1, f"{field_name} should have 1 row")
+
+            # Verify data integrity
+            read_blob_data = batch.column(0)[0].as_py()
+            expected_blob_data = test_data[field_name].to_data()
+            self.assertEqual(read_blob_data, expected_blob_data, f"{field_name} data should match")
+
+            reader.close()
+
+    def test_blob_complex_types_throw_exception(self):
+        """Test that complex types containing BLOB elements throw exceptions 
during read/write operations."""
+        from pypaimon.schema.data_types import DataField, AtomicType, 
ArrayType, MultisetType, MapType
+        from pypaimon.table.row.blob import BlobData
+        from pypaimon.common.file_io import FileIO
+        from pypaimon.table.row.generic_row import GenericRow, GenericRowSerializer
+        from pypaimon.table.row.row_kind import RowKind
+        from pathlib import Path
+
+        # Set up file I/O
+        file_io = FileIO(self.temp_dir, {})
+
+        # ========== Test ArrayType(nullable=True, element_type=AtomicType("BLOB")) ==========
+        array_fields = [
+            DataField(0, "id", AtomicType("INT")),
+            DataField(1, "blob_array", ArrayType(nullable=True, 
element_type=AtomicType("BLOB"))),
+        ]
+
+        # Test serialization throws exception for ArrayType<BLOB>
+        array_blob_data = [
+            BlobData(b"Array blob 1"),
+            BlobData(b"Array blob 2"),
+            BlobData(b"Array blob 3")
+        ]
+
+        array_row = GenericRow([1, array_blob_data], array_fields, RowKind.INSERT)
+
+        # GenericRowSerializer should throw exception for complex types
+        with self.assertRaises(ValueError) as context:
+            GenericRowSerializer.to_bytes(array_row)
+        self.assertIn("AtomicType", str(context.exception))
+
+        # Note: FileIO.write_blob validation for complex types is tested separately below
+
+        # ========== Test MultisetType(nullable=True, element_type=AtomicType("BLOB")) ==========
+        multiset_fields = [
+            DataField(0, "id", AtomicType("INT")),
+            DataField(1, "blob_multiset", MultisetType(nullable=True, 
element_type=AtomicType("BLOB"))),
+        ]
+
+        # Test serialization throws exception for MultisetType<BLOB>
+        multiset_blob_data = [
+            BlobData(b"Multiset blob 1"),
+            BlobData(b"Multiset blob 2"),
+            BlobData(b"Multiset blob 1"),  # Duplicate allowed in multiset
+        ]
+
+        multiset_row = GenericRow([2, multiset_blob_data], multiset_fields, RowKind.INSERT)
+
+        # GenericRowSerializer should throw exception for complex types
+        with self.assertRaises(ValueError) as context:
+            GenericRowSerializer.to_bytes(multiset_row)
+        self.assertIn("AtomicType", str(context.exception))
+        map_fields = [
+            DataField(0, "id", AtomicType("INT")),
+            DataField(1, "blob_map", MapType(
+                nullable=True, key_type=AtomicType("STRING"), 
value_type=AtomicType("BLOB")
+            )),
+        ]
+
+        # Test serialization throws exception for MapType<STRING, BLOB>
+        map_blob_data = {
+            "document": BlobData(b"Document content"),
+            "image": BlobData(b"Image data"),
+            "metadata": BlobData(b"Metadata content")
+        }
+
+        map_row = GenericRow([3, map_blob_data], map_fields, RowKind.INSERT)
+
+        # GenericRowSerializer should throw exception for complex types
+        with self.assertRaises(ValueError) as context:
+            GenericRowSerializer.to_bytes(map_row)
+        self.assertIn("AtomicType", str(context.exception))
+
+        # ========== Test FileIO.write_blob validation for complex types ==========
+        # Test that FileIO.write_blob properly validates and rejects complex types
+
+        # Create a table with multiple columns (should fail - blob format requires single column)
+        multi_column_schema = pa.schema([
+            pa.field("blob1", pa.large_binary()),
+            pa.field("blob2", pa.large_binary())
+        ])
+        multi_column_table = pa.table([
+            [b"blob1_data"],
+            [b"blob2_data"]
+        ], schema=multi_column_schema)
+
+        multi_column_file = Path(self.temp_dir) / "multi_column.blob"
+
+        # Should throw RuntimeError for multiple columns
+        with self.assertRaises(RuntimeError) as context:
+            file_io.write_blob(multi_column_file, multi_column_table)
+        self.assertIn("single column", str(context.exception))
+
+        # Test that FileIO.write_blob rejects null values
+        null_schema = pa.schema([pa.field("blob_with_nulls", 
pa.large_binary())])
+        null_table = pa.table([[b"data", None]], schema=null_schema)
+
+        null_file = Path(self.temp_dir) / "null_data.blob"
+
+        # Should throw RuntimeError for null values
+        with self.assertRaises(RuntimeError) as context:
+            file_io.write_blob(null_file, null_table)
+        self.assertIn("null values", str(context.exception))
+
+        # ========== Test FormatBlobReader with complex type schema ==========
+        # Create a valid blob file first
+        valid_blob_data = [b"Valid blob content"]
+        valid_schema = pa.schema([pa.field("valid_blob", pa.large_binary())])
+        valid_table = pa.table([valid_blob_data], schema=valid_schema)
+
+        valid_blob_file = Path(self.temp_dir) / "valid_blob.blob"
+        file_io.write_blob(valid_blob_file, valid_table)
+
+        # Try to read with complex type field definition - this should fail
+        # because FormatBlobReader tries to create PyArrow schema with complex types
+        complex_read_fields = [
+            DataField(0, "valid_blob", ArrayType(nullable=True, 
element_type=AtomicType("BLOB")))
+        ]
+
+        # FormatBlobReader creation should work, but reading should fail due to schema mismatch
+        reader = FormatBlobReader(
+            file_io=file_io,
+            file_path=str(valid_blob_file),
+            read_fields=["valid_blob"],
+            full_fields=complex_read_fields,
+            push_down_predicate=None
+        )
+
+        # Reading should fail because the schema expects complex type but data is atomic
+        with self.assertRaises(Exception) as context:
+            reader.read_arrow_batch()
+        # The error could be ArrowTypeError or other PyArrow-related errors
+        self.assertTrue(
+            "ArrowTypeError" in str(type(context.exception)) or
+            "TypeError" in str(type(context.exception)) or
+            "ValueError" in str(type(context.exception))
+        )
+
+        reader.close()
+
+    def test_blob_advanced_scenarios(self):
+        """Test advanced blob scenarios: corruption, truncation, zero-length, 
large blobs, compression, cross-format."""
+        from pypaimon.schema.data_types import DataField, AtomicType
+        from pypaimon.common.file_io import FileIO
+        from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor
+        from pathlib import Path
+
+        # Set up file I/O
+        file_io = FileIO(self.temp_dir, {})
+
+        # ========== Test 1: Corrupted file header test ==========
+
+        # Create a valid blob file first
+        valid_blob_data = [b"Test blob content for corruption test"]
+        valid_schema = pa.schema([pa.field("test_blob", pa.large_binary())])
+        valid_table = pa.table([valid_blob_data], schema=valid_schema)
+
+        header_test_file = Path(self.temp_dir) / "header_test.blob"
+        file_io.write_blob(header_test_file, valid_table)
+
+        # Read the file and corrupt the header (last 5 bytes: index_length + version)
+        with open(header_test_file, 'rb') as f:
+            original_data = f.read()
+
+        # Corrupt the version byte (last byte)
+        corrupted_data = bytearray(original_data)
+        corrupted_data[-1] = 99  # Invalid version (should be 1)
+
+        corrupted_header_file = Path(self.temp_dir) / "corrupted_header.blob"
+        with open(corrupted_header_file, 'wb') as f:
+            f.write(corrupted_data)
+
+        # Try to read corrupted file - should detect invalid version
+        fields = [DataField(0, "test_blob", AtomicType("BLOB"))]
+
+        # Reading should fail due to invalid version
+        with self.assertRaises(IOError) as context:
+            FormatBlobReader(
+                file_io=file_io,
+                file_path=str(corrupted_header_file),
+                read_fields=["test_blob"],
+                full_fields=fields,
+                push_down_predicate=None
+            )
+        self.assertIn("Unsupported blob file version", str(context.exception))
+
+        # ========== Test 2: Truncated blob file (mid-blob) read ==========
+
+        # Create a blob file with substantial content
+        large_content = b"Large blob content: " + b"X" * 1000 + b" End of 
content"
+        large_blob_data = [large_content]
+        large_schema = pa.schema([pa.field("large_blob", pa.large_binary())])
+        large_table = pa.table([large_blob_data], schema=large_schema)
+
+        full_blob_file = Path(self.temp_dir) / "full_blob.blob"
+        file_io.write_blob(full_blob_file, large_table)
+
+        # Read the full file and truncate it in the middle
+        with open(full_blob_file, 'rb') as f:
+            full_data = f.read()
+
+        # Truncate to about 50% of original size (mid-blob)
+        truncated_size = len(full_data) // 2
+        truncated_data = full_data[:truncated_size]
+
+        truncated_file = Path(self.temp_dir) / "truncated.blob"
+        with open(truncated_file, 'wb') as f:
+            f.write(truncated_data)
+
+        # Try to read truncated file - should fail gracefully
+        with self.assertRaises((IOError, OSError)) as context:
+            FormatBlobReader(
+                file_io=file_io,
+                file_path=str(truncated_file),
+                read_fields=["large_blob"],
+                full_fields=fields,
+                push_down_predicate=None
+            )
+        # Should detect truncation/incomplete data (either invalid header or invalid version)
+        self.assertTrue(
+            "cannot read header" in str(context.exception) or
+            "Unsupported blob file version" in str(context.exception)
+        )
+
+        # ========== Test 3: Zero-length blob handling ==========
+
+        # Create blob with zero-length content
+        zero_blob_data = [b""]  # Empty blob
+        zero_schema = pa.schema([pa.field("zero_blob", pa.large_binary())])
+        zero_table = pa.table([zero_blob_data], schema=zero_schema)
+
+        zero_blob_file = Path(self.temp_dir) / "zero_length.blob"
+        file_io.write_blob(zero_blob_file, zero_table)
+
+        # Verify file was created
+        self.assertTrue(file_io.exists(zero_blob_file))
+        file_size = file_io.get_file_size(zero_blob_file)
+        self.assertGreater(file_size, 0)  # File should have headers even with empty blob
+
+        # Read zero-length blob
+        zero_fields = [DataField(0, "zero_blob", AtomicType("BLOB"))]
+        zero_reader = FormatBlobReader(
+            file_io=file_io,
+            file_path=str(zero_blob_file),
+            read_fields=["zero_blob"],
+            full_fields=zero_fields,
+            push_down_predicate=None
+        )
+
+        zero_batch = zero_reader.read_arrow_batch()
+        self.assertIsNotNone(zero_batch)
+        self.assertEqual(zero_batch.num_rows, 1)
+
+        # Verify empty blob content
+        read_zero_blob = zero_batch.column(0)[0].as_py()
+        self.assertEqual(read_zero_blob, b"")
+        self.assertEqual(len(read_zero_blob), 0)
+        zero_reader.close()
+
+        # ========== Test 4: Large blob (multi-GB range) simulation ==========
+        # Simulate large blob without actually creating multi-GB data
+        # Test chunked writing and memory-safe reading patterns
+
+        # Create moderately large blob (10MB) to test chunking behavior
+        chunk_size = 1024 * 1024  # 1MB chunks
+        large_blob_content = b"LARGE_BLOB_CHUNK:" + b"L" * (chunk_size - 17)  
# Fill to 1MB
+
+        # Simulate multiple chunks
+        simulated_large_data = [large_blob_content * 10]  # 10MB total
+        large_sim_schema = pa.schema([pa.field("large_sim_blob", pa.large_binary())])
+        large_sim_table = pa.table([simulated_large_data], schema=large_sim_schema)
+
+        large_sim_file = Path(self.temp_dir) / "large_simulation.blob"
+        file_io.write_blob(large_sim_file, large_sim_table)
+
+        # Verify large file was written
+        large_sim_size = file_io.get_file_size(large_sim_file)
+        self.assertGreater(large_sim_size, 10 * 1024 * 1024)  # Should be > 10MB
+
+        # Read large blob in memory-safe manner
+        large_sim_fields = [DataField(0, "large_sim_blob", AtomicType("BLOB"))]
+        large_sim_reader = FormatBlobReader(
+            file_io=file_io,
+            file_path=str(large_sim_file),
+            read_fields=["large_sim_blob"],
+            full_fields=large_sim_fields,
+            push_down_predicate=None
+        )
+
+        large_sim_batch = large_sim_reader.read_arrow_batch()
+        self.assertIsNotNone(large_sim_batch)
+        self.assertEqual(large_sim_batch.num_rows, 1)
+
+        # Verify large blob content (check prefix to avoid loading all into memory for comparison)
+        read_large_blob = large_sim_batch.column(0)[0].as_py()
+        self.assertTrue(read_large_blob.startswith(b"LARGE_BLOB_CHUNK:"))
+        self.assertEqual(len(read_large_blob), len(large_blob_content) * 10)
+        large_sim_reader.close()
+
+        # ========== Test 5: Index compression/decompression validation ==========
+        # Test DeltaVarintCompressor roundtrip
+        test_indices = [0, 100, 250, 1000, 5000, 10000, 50000]
+
+        # Compress indices
+        compressed_indices = DeltaVarintCompressor.compress(test_indices)
+        self.assertIsInstance(compressed_indices, bytes)
+        self.assertGreater(len(compressed_indices), 0)
+
+        # Decompress indices
+        decompressed_indices = DeltaVarintCompressor.decompress(compressed_indices)
+        self.assertEqual(decompressed_indices, test_indices)
+
+        # Test corruption detection in compressed indices
+        if len(compressed_indices) > 1:
+            # Corrupt the compressed data
+            corrupted_indices = bytearray(compressed_indices)
+            corrupted_indices[-1] = (corrupted_indices[-1] + 1) % 256  # Flip last byte
+
+            # Decompression should fail or produce different results
+            try:
+                corrupted_result = DeltaVarintCompressor.decompress(bytes(corrupted_indices))
+                # If decompression succeeds, result should be different
+                self.assertNotEqual(corrupted_result, test_indices)
+            except Exception:
+                pass
+
+        # ========== Test 6: Cross-format guard (multi-field tables) ==========
+        # Test that blob format rejects multi-field tables
+        multi_field_schema = pa.schema([
+            pa.field("blob_field", pa.large_binary()),
+            pa.field("string_field", pa.string()),
+            pa.field("int_field", pa.int64())
+        ])
+
+        multi_field_table = pa.table([
+            [b"blob_data_1", b"blob_data_2"],
+            ["string_1", "string_2"],
+            [100, 200]
+        ], schema=multi_field_schema)
+
+        multi_field_file = Path(self.temp_dir) / "multi_field.blob"
+
+        # Should reject multi-field table
+        with self.assertRaises(RuntimeError) as context:
+            file_io.write_blob(multi_field_file, multi_field_table)
+        self.assertIn("single column", str(context.exception))
+
+        # Test that blob format rejects non-binary field types
+        non_binary_schema = pa.schema([pa.field("string_field", pa.string())])
+        non_binary_table = pa.table([["not_binary_data"]], schema=non_binary_schema)
+
+        non_binary_file = Path(self.temp_dir) / "non_binary.blob"
+
+        # Should reject non-binary field
+        with self.assertRaises(RuntimeError) as context:
+            file_io.write_blob(non_binary_file, non_binary_table)
+        # Should fail due to type conversion issues (non-binary field can't be converted to BLOB)
+        self.assertTrue(
+            "large_binary" in str(context.exception) or
+            "to_paimon_type" in str(context.exception) or
+            "missing" in str(context.exception) or
+            "Field must be Blob/BlobData instance" in str(context.exception)
+        )
+
+        # Test that blob format rejects tables with null values
+        null_schema = pa.schema([pa.field("blob_with_null", 
pa.large_binary())])
+        null_table = pa.table([[b"data", None, b"more_data"]], 
schema=null_schema)
+
+        null_file = Path(self.temp_dir) / "with_nulls.blob"
+
+        # Should reject null values
+        with self.assertRaises(RuntimeError) as context:
+            file_io.write_blob(null_file, null_table)
+        self.assertIn("null values", str(context.exception))
+
+
+if __name__ == '__main__':
+    unittest.main()
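
The reader tests above rely on the on-disk layout produced by BlobFormatWriter further down in this commit: each record is a 4-byte little-endian magic number, the raw payload, an 8-byte record length and a 4-byte CRC32, and the file ends with a delta-varint-compressed index of record lengths followed by a 4-byte index length and a 1-byte version. The snippet below is a minimal, hand-rolled decoder for that layout; read_blob_records is an illustrative helper, not part of the pypaimon API.

    import struct

    from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor

    def read_blob_records(path):
        # Illustrative decoder for the layout written by BlobFormatWriter.
        with open(path, 'rb') as f:
            data = f.read()
        # Trailer: 4-byte little-endian index length + 1-byte format version.
        index_length = struct.unpack('<I', data[-5:-1])[0]
        if data[-1] != 1:
            raise IOError("Unsupported blob file version: %d" % data[-1])
        # The index stores the total on-disk length of every record.
        lengths = DeltaVarintCompressor.decompress(data[-5 - index_length:-5])
        records, pos = [], 0
        for length in lengths:
            record = data[pos:pos + length]
            # Record layout: 4-byte magic + payload + 8-byte length + 4-byte CRC32.
            records.append(record[4:-12])
            pos += length
        return records
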
diff --git a/paimon-python/pypaimon/tests/delta_varint_compressor_test.py b/paimon-python/pypaimon/tests/delta_varint_compressor_test.py
new file mode 100644
index 0000000000..5a04becde8
--- /dev/null
+++ b/paimon-python/pypaimon/tests/delta_varint_compressor_test.py
@@ -0,0 +1,379 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import random
+import sys
+import unittest
+
+from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor
+
+
+class DeltaVarintCompressorTest(unittest.TestCase):
+    """Tests for DeltaVarintCompressor following 
org.apache.paimon.utils.DeltaVarintCompressorTest."""
+
+    def test_normal_case_1(self):
+        """Test case for normal compression and decompression."""
+        original = [80, 50, 90, 80, 70]
+        compressed = DeltaVarintCompressor.compress(original)
+        decompressed = DeltaVarintCompressor.decompress(compressed)
+
+        self.assertEqual(original, decompressed)  # Verify data integrity
+        self.assertEqual(6, len(compressed))  # Optimized size for small deltas
+
+    def test_normal_case_2(self):
+        """Test case for normal compression and decompression."""
+        original = [100, 50, 150, 100, 200]
+        compressed = DeltaVarintCompressor.compress(original)
+        decompressed = DeltaVarintCompressor.decompress(compressed)
+
+        self.assertEqual(original, decompressed)  # Verify data integrity
+        self.assertEqual(8, len(compressed))  # Optimized size for small deltas
+
+    def test_random(self):
+        """Test with random data to ensure robustness."""
+        # Run multiple iterations to test various random cases
+        for _ in range(100):  # Reduced from 10000 for reasonable test time
+            original = []
+            for i in range(100):
+                # Use a smaller range than Java's Long.MAX_VALUE for Python compatibility
+                original.append(random.randint(-sys.maxsize, sys.maxsize))
+            compressed = DeltaVarintCompressor.compress(original)
+            decompressed = DeltaVarintCompressor.decompress(compressed)
+
+            self.assertEqual(original, decompressed)  # Verify data integrity
+
+    def test_empty_array(self):
+        """Test case for empty array."""
+        original = []
+        compressed = DeltaVarintCompressor.compress(original)
+        decompressed = DeltaVarintCompressor.decompress(compressed)
+
+        self.assertEqual(original, decompressed)
+        self.assertEqual(0, len(compressed))
+
+    def test_single_element(self):
+        """Test case for single-element array."""
+        original = [42]
+        compressed = DeltaVarintCompressor.compress(original)
+        decompressed = DeltaVarintCompressor.decompress(compressed)
+
+        self.assertEqual(original, decompressed)
+        # Calculate expected size: Varint encoding for 42 (0x2A -> 1 byte)
+        self.assertEqual(1, len(compressed))
+
+    def test_extreme_values(self):
+        """Test case for extreme values (sys.maxsize and -sys.maxsize)."""
+        original = [-sys.maxsize, sys.maxsize]
+        compressed = DeltaVarintCompressor.compress(original)
+        decompressed = DeltaVarintCompressor.decompress(compressed)
+
+        self.assertEqual(original, decompressed)
+        # The compressed size will depend on the platform's sys.maxsize
+        # but should be reasonable for the delta encoding
+        self.assertGreater(len(compressed), 0)
+
+    def test_negative_deltas(self):
+        """Test case for negative deltas with ZigZag optimization."""
+        original = [100, -50, -150, -100]  # Negative values
+        compressed = DeltaVarintCompressor.compress(original)
+        decompressed = DeltaVarintCompressor.decompress(compressed)
+
+        self.assertEqual(original, decompressed)
+        # Verify ZigZag optimization: -1 → 1 (1 byte)
+        # Delta sequence: [100, -150, -100, 50] → ZigZag →
+        # Each encoded in 1-2 bytes
+        self.assertLessEqual(len(compressed), 8)  # Optimized size
+
+    def test_unsorted_data(self):
+        """Test case for unsorted data (worse compression ratio)."""
+        original = [1000, 5, 9999, 12345, 6789]
+        compressed = DeltaVarintCompressor.compress(original)
+        decompressed = DeltaVarintCompressor.decompress(compressed)
+
+        self.assertEqual(original, decompressed)
+        # Larger deltas → more bytes (e.g., 9994 → 3 bytes)
+        self.assertGreater(len(compressed), 5)  # Worse than sorted case
+
+    def test_corrupted_input(self):
+        """Test case for corrupted input (invalid Varint)."""
+        # Invalid Varint (all continuation flags)
+        corrupted = bytes([0x80, 0x80, 0x80])
+        try:
+            result = DeltaVarintCompressor.decompress(corrupted)
+            # If it doesn't raise an exception, the result should be reasonable
+            self.assertIsInstance(result, list)
+        except (IndexError, ValueError, RuntimeError):
+            # It's acceptable to raise an exception for corrupted data
+            pass
+
+    def test_zero_values(self):
+        """Test case for arrays with zero values."""
+        original = [0, 0, 0, 0, 0]
+        compressed = DeltaVarintCompressor.compress(original)
+        decompressed = DeltaVarintCompressor.decompress(compressed)
+
+        self.assertEqual(original, decompressed)
+        # All deltas are 0, so should compress very well
+        self.assertLessEqual(len(compressed), 5)
+
+    def test_ascending_sequence(self):
+        """Test case for ascending sequence (optimal for delta compression)."""
+        original = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
+        compressed = DeltaVarintCompressor.compress(original)
+        decompressed = DeltaVarintCompressor.decompress(compressed)
+
+        self.assertEqual(original, decompressed)
+        # All deltas are 1, so should compress very well
+        self.assertLessEqual(len(compressed), 15)  # Much smaller than original
+
+    def test_descending_sequence(self):
+        """Test case for descending sequence."""
+        original = [10, 9, 8, 7, 6, 5, 4, 3, 2, 1]
+        compressed = DeltaVarintCompressor.compress(original)
+        decompressed = DeltaVarintCompressor.decompress(compressed)
+
+        self.assertEqual(original, decompressed)
+        # All deltas are -1, should still compress well with ZigZag
+        self.assertLessEqual(len(compressed), 15)
+
+    def test_large_positive_values(self):
+        """Test case for large positive values."""
+        original = [1000000, 2000000, 3000000, 4000000]
+        compressed = DeltaVarintCompressor.compress(original)
+        decompressed = DeltaVarintCompressor.decompress(compressed)
+
+        self.assertEqual(original, decompressed)
+        # Large values but consistent deltas should still compress reasonably
+        self.assertGreater(len(compressed), 4)  # Will be larger due to big numbers
+
+    def test_mixed_positive_negative(self):
+        """Test case for mixed positive and negative values."""
+        original = [100, -200, 300, -400, 500]
+        compressed = DeltaVarintCompressor.compress(original)
+        decompressed = DeltaVarintCompressor.decompress(compressed)
+
+        self.assertEqual(original, decompressed)
+        # Mixed signs create larger deltas
+        self.assertGreater(len(compressed), 5)
+
+    def test_compression_efficiency(self):
+        """Test that compression actually reduces size for suitable data."""
+        # Create a sequence with small deltas
+        original = []
+        base = 1000
+        for i in range(100):
+            base += random.randint(-10, 10)  # Small deltas
+            original.append(base)
+
+        compressed = DeltaVarintCompressor.compress(original)
+        decompressed = DeltaVarintCompressor.decompress(compressed)
+
+        self.assertEqual(original, decompressed)
+        # For small deltas, compression should be effective
+        # Original would need 8 bytes per int (800 bytes), compressed should be much smaller
+        self.assertLess(len(compressed), len(original) * 4)  # At least 50% compression
+
+    def test_round_trip_consistency(self):
+        """Test that multiple compress/decompress cycles are consistent."""
+        original = [1, 10, 100, 1000, 10000]
+        # First round trip
+        compressed1 = DeltaVarintCompressor.compress(original)
+        decompressed1 = DeltaVarintCompressor.decompress(compressed1)
+        # Second round trip
+        compressed2 = DeltaVarintCompressor.compress(decompressed1)
+        decompressed2 = DeltaVarintCompressor.decompress(compressed2)
+        # All should be identical
+        self.assertEqual(original, decompressed1)
+        self.assertEqual(original, decompressed2)
+        self.assertEqual(compressed1, compressed2)
+
+    def test_boundary_values(self):
+        """Test boundary values for varint encoding."""
+        # Test values around varint boundaries (127, 16383, etc.)
+        boundary_values = [
+            0, 1, 127, 128, 255, 256,
+            16383, 16384, 32767, 32768,
+            -1, -127, -128, -255, -256,
+            -16383, -16384, -32767, -32768
+        ]
+        compressed = DeltaVarintCompressor.compress(boundary_values)
+        decompressed = DeltaVarintCompressor.decompress(compressed)
+        self.assertEqual(boundary_values, decompressed)
+
+    def test_java_compatibility_zigzag_encoding(self):
+        """Test ZigZag encoding compatibility with Java implementation."""
+        # Test cases that verify ZigZag encoding matches Java's implementation
+        # ZigZag mapping: 0->0, -1->1, 1->2, -2->3, 2->4, -3->5, 3->6, etc.
+        zigzag_test_cases = [
+            (0, 0),      # 0 -> 0
+            (-1, 1),     # -1 -> 1
+            (1, 2),      # 1 -> 2
+            (-2, 3),     # -2 -> 3
+            (2, 4),      # 2 -> 4
+            (-3, 5),     # -3 -> 5
+            (3, 6),      # 3 -> 6
+            (-64, 127),  # -64 -> 127
+            (64, 128),   # 64 -> 128
+            (-65, 129),  # -65 -> 129
+        ]
+
+        for original_value, expected_zigzag in zigzag_test_cases:
+            # Test single value compression to verify ZigZag encoding
+            compressed = DeltaVarintCompressor.compress([original_value])
+            decompressed = DeltaVarintCompressor.decompress(compressed)
+
+            self.assertEqual([original_value], decompressed,
+                             f"ZigZag encoding failed for value 
{original_value}")
+
+    def test_java_compatibility_known_vectors(self):
+        """Test with known test vectors that should match Java 
implementation."""
+        # Test vectors with expected compressed output (hexadecimal)
+        test_vectors = [
+            # Simple cases
+            ([0], "00"),                    # 0 -> ZigZag(0) = 0 -> Varint(0) 
= 0x00
+            ([1], "02"),                    # 1 -> ZigZag(1) = 2 -> Varint(2) 
= 0x02
+            ([-1], "01"),                   # -1 -> ZigZag(-1) = 1 -> 
Varint(1) = 0x01
+            ([2], "04"),                    # 2 -> ZigZag(2) = 4 -> Varint(4) 
= 0x04
+            ([-2], "03"),                   # -2 -> ZigZag(-2) = 3 -> 
Varint(3) = 0x03
+
+            # Delta encoding cases
+            ([0, 1], "0002"),               # [0, 1] -> [0, delta=1] -> [0x00, 
0x02]
+            ([1, 2], "0202"),               # [1, 2] -> [1, delta=1] -> [0x02, 
0x02]
+            ([0, -1], "0001"),              # [0, -1] -> [0, delta=-1] -> 
[0x00, 0x01]
+            ([1, 0], "0201"),               # [1, 0] -> [1, delta=-1] -> 
[0x02, 0x01]
+
+            # Larger values
+            ([127], "fe01"),                # 127 -> ZigZag(127) = 254 -> 
Varint(254) = 0xfe01
+            ([-127], "fd01"),               # -127 -> ZigZag(-127) = 253 -> 
Varint(253) = 0xfd01
+            ([128], "8002"),                # 128 -> ZigZag(128) = 256 -> 
Varint(256) = 0x8002
+            ([-128], "ff01"),               # -128 -> ZigZag(-128) = 255 -> 
Varint(255) = 0xff01
+        ]
+
+        for original, expected_hex in test_vectors:
+            compressed = DeltaVarintCompressor.compress(original)
+            actual_hex = compressed.hex()
+
+            self.assertEqual(expected_hex, actual_hex,
+                             f"Binary compatibility failed for {original}. "
+                             f"Expected: {expected_hex}, Got: {actual_hex}")
+
+            # Also verify round-trip
+            decompressed = DeltaVarintCompressor.decompress(compressed)
+            self.assertEqual(original, decompressed,
+                             f"Round-trip failed for {original}")
+
+    def test_java_compatibility_large_numbers(self):
+        """Test compatibility with Java for large numbers (64-bit range)."""
+        # Test cases covering the full 64-bit signed integer range
+        large_number_cases = [
+            2147483647,          # Integer.MAX_VALUE
+            -2147483648,         # Integer.MIN_VALUE
+            9223372036854775807,  # Long.MAX_VALUE
+            -9223372036854775808 + 1,  # Long.MIN_VALUE + 1 (avoid overflow in Python)
+            4294967295,          # 2^32 - 1
+            -4294967296,         # -2^32
+        ]
+
+        for value in large_number_cases:
+            # Test individual values
+            compressed = DeltaVarintCompressor.compress([value])
+            decompressed = DeltaVarintCompressor.decompress(compressed)
+            self.assertEqual([value], decompressed,
+                             f"Large number compatibility failed for {value}")
+
+        # Test as a sequence to verify delta encoding with large numbers
+        compressed_seq = DeltaVarintCompressor.compress(large_number_cases)
+        decompressed_seq = DeltaVarintCompressor.decompress(compressed_seq)
+        self.assertEqual(large_number_cases, decompressed_seq,
+                         "Large number sequence compatibility failed")
+
+    def test_java_compatibility_varint_boundaries(self):
+        """Test Varint encoding boundaries that match Java implementation."""
+        # Test values at Varint encoding boundaries
+        varint_boundary_cases = [
+            # 1-byte Varint boundary
+            63,    # ZigZag(63) = 126, fits in 1 byte
+            64,    # ZigZag(64) = 128, needs 2 bytes
+            -64,   # ZigZag(-64) = 127, fits in 1 byte
+            -65,   # ZigZag(-65) = 129, needs 2 bytes
+
+            # 2-byte Varint boundary
+            8191,   # ZigZag(8191) = 16382, fits in 2 bytes
+            8192,   # ZigZag(8192) = 16384, needs 3 bytes
+            -8192,  # ZigZag(-8192) = 16383, fits in 2 bytes
+            -8193,  # ZigZag(-8193) = 16385, needs 3 bytes
+
+            # 3-byte Varint boundary
+            1048575,  # ZigZag(1048575) = 2097150, fits in 3 bytes
+            1048576,  # ZigZag(1048576) = 2097152, needs 4 bytes
+        ]
+
+        for value in varint_boundary_cases:
+            compressed = DeltaVarintCompressor.compress([value])
+            decompressed = DeltaVarintCompressor.decompress(compressed)
+            self.assertEqual([value], decompressed,
+                             f"Varint boundary compatibility failed for 
{value}")
+
+    def test_java_compatibility_delta_edge_cases(self):
+        """Test delta encoding edge cases for Java compatibility."""
+        # Edge cases that test delta encoding behavior
+        delta_edge_cases = [
+            # Maximum positive delta
+            [0, sys.maxsize],
+            # Maximum negative delta
+            [sys.maxsize, 0],
+            # Alternating large deltas
+            [0, 1000000, -1000000, 2000000, -2000000],
+            # Sequence with zero deltas
+            [42, 42, 42, 42],
+            # Mixed small and large deltas
+            [0, 1, 1000000, 1000001, 0],
+        ]
+
+        for case in delta_edge_cases:
+            compressed = DeltaVarintCompressor.compress(case)
+            decompressed = DeltaVarintCompressor.decompress(compressed)
+            self.assertEqual(case, decompressed,
+                             f"Delta edge case compatibility failed for 
{case}")
+
+    def test_java_compatibility_error_conditions(self):
+        """Test error conditions that should match Java behavior."""
+        # Test cases for error handling - our implementation gracefully handles
+        # truncated data by returning empty lists, which is acceptable behavior
+
+        # Test with various truncated/invalid byte sequences
+        invalid_cases = [
+            bytes([0x80]),           # Single incomplete byte
+            bytes([0x80, 0x80]),     # Incomplete 3-byte varint
+            bytes([0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x80]),  # Long sequence
+        ]
+
+        for invalid_data in invalid_cases:
+            # Our implementation handles invalid data gracefully by returning an empty list
+            # This is acceptable behavior for robustness
+            result = DeltaVarintCompressor.decompress(invalid_data)
+            self.assertIsInstance(result, list,
+                                  f"Should return a list for invalid data: 
{invalid_data.hex()}")
+            # Empty result is acceptable for invalid/truncated data
+
+        # Test that valid empty input returns empty list
+        empty_result = DeltaVarintCompressor.decompress(b'')
+        self.assertEqual([], empty_result, "Empty input should return empty list")
+
+
+if __name__ == '__main__':
+    unittest.main()
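
For readers following the expected hex strings above, here is a minimal stand-alone sketch of the delta + ZigZag + varint scheme these vectors assume; it is not the production DeltaVarintCompressor and it glosses over 64-bit wraparound.

    def _zigzag(value):
        # 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ... (same mapping as in the tests)
        return (value << 1) ^ (value >> 63)

    def _varint(value):
        # 7 data bits per byte, most significant bit set on continuation bytes.
        out = bytearray()
        while value >= 0x80:
            out.append((value & 0x7F) | 0x80)
            value >>= 7
        out.append(value)
        return bytes(out)

    def compress(values):
        # First value is encoded directly, later values as deltas to their predecessor.
        out, prev = bytearray(), 0
        for i, value in enumerate(values):
            delta = value if i == 0 else value - prev
            out += _varint(_zigzag(delta))
            prev = value
        return bytes(out)

    assert compress([0, 1]).hex() == "0002"
    assert compress([127]).hex() == "fe01"

Decompression reverses the steps: read varints, undo the ZigZag mapping, and accumulate the deltas.
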
diff --git a/paimon-python/pypaimon/write/blob_format_writer.py b/paimon-python/pypaimon/write/blob_format_writer.py
new file mode 100644
index 0000000000..07b29bbc4e
--- /dev/null
+++ b/paimon-python/pypaimon/write/blob_format_writer.py
@@ -0,0 +1,107 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import struct
+import zlib
+from typing import BinaryIO, List
+
+from pypaimon.table.row.blob import Blob, BlobData
+from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor
+
+
+class BlobFormatWriter:
+    VERSION = 1
+    MAGIC_NUMBER = 1481511375
+    BUFFER_SIZE = 4096
+    METADATA_SIZE = 12  # 8-byte length + 4-byte CRC
+
+    def __init__(self, output_stream: BinaryIO):
+        self.output_stream = output_stream
+        self.lengths: List[int] = []
+        self.position = 0
+
+    def add_element(self, row) -> None:
+        if not hasattr(row, 'values') or len(row.values) != 1:
+            raise ValueError("BlobFormatWriter only supports one field")
+
+        blob_value = row.values[0]
+        if blob_value is None:
+            raise ValueError("BlobFormatWriter only supports non-null blob")
+
+        if not isinstance(blob_value, Blob):
+            raise ValueError("Field must be Blob/BlobData instance")
+
+        previous_pos = self.position
+        crc32 = 0  # Initialize CRC32
+
+        # Write magic number
+        magic_bytes = struct.pack('<I', self.MAGIC_NUMBER)  # Little endian
+        crc32 = self._write_with_crc(magic_bytes, crc32)
+
+        # Write blob data
+        if isinstance(blob_value, BlobData):
+            data = blob_value.to_data()
+            crc32 = self._write_with_crc(data, crc32)
+        else:
+            # Stream from BlobRef/Blob
+            stream = blob_value.new_input_stream()
+            try:
+                chunk = stream.read(self.BUFFER_SIZE)
+                while chunk:
+                    crc32 = self._write_with_crc(chunk, crc32)
+                    chunk = stream.read(self.BUFFER_SIZE)
+            finally:
+                stream.close()
+
+        # Calculate total length including magic + data + metadata (length + CRC)
+        bin_length = self.position - previous_pos + self.METADATA_SIZE
+        self.lengths.append(bin_length)
+
+        # Write length (8 bytes, little endian)
+        length_bytes = struct.pack('<Q', bin_length)
+        self.output_stream.write(length_bytes)
+        self.position += 8
+
+        # Write CRC32 (4 bytes, little endian)
+        crc_bytes = struct.pack('<I', crc32 & 0xffffffff)
+        self.output_stream.write(crc_bytes)
+        self.position += 4
+
+    def _write_with_crc(self, data: bytes, crc32: int) -> int:
+        crc32 = zlib.crc32(data, crc32)
+        self.output_stream.write(data)
+        self.position += len(data)
+        return crc32
+
+    def reach_target_size(self, suggested_check: bool, target_size: int) -> bool:
+        return self.position >= target_size
+
+    def close(self) -> None:
+        index_bytes = DeltaVarintCompressor.compress(self.lengths)
+        self.output_stream.write(index_bytes)
+
+        # Write header (index length + version)
+        header = struct.pack('<I', len(index_bytes))  # Index length (4 bytes, little endian)
+        header += struct.pack('<B', self.VERSION)  # Version (1 byte)
+        self.output_stream.write(header)
+
+        # Flush and close
+        if hasattr(self.output_stream, 'flush'):
+            self.output_stream.flush()
+        if hasattr(self.output_stream, 'close'):
+            self.output_stream.close()
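
A rough usage sketch for the writer follows. The GenericRow and BlobData constructors are taken from the tests above, and it assumes GenericRow exposes its values via a .values attribute, which is what add_element checks for; in the normal write path the writer is driven through DataWriter (next file), not directly.

    import io

    from pypaimon.schema.data_types import AtomicType, DataField
    from pypaimon.table.row.blob import BlobData
    from pypaimon.table.row.generic_row import GenericRow
    from pypaimon.table.row.row_kind import RowKind
    from pypaimon.write.blob_format_writer import BlobFormatWriter

    fields = [DataField(0, "blob_field", AtomicType("BLOB"))]
    stream = io.BytesIO()
    writer = BlobFormatWriter(stream)
    writer.add_element(GenericRow([BlobData(b"hello blob")], fields, RowKind.INSERT))
    writer.close()  # appends the compressed length index, its 4-byte size and the version byte
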
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index e2f778b586..cc0fc944a1 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -111,6 +111,8 @@ class DataWriter(ABC):
             self.file_io.write_orc(file_path, data, compression=self.compression)
         elif self.file_format == CoreOptions.FILE_FORMAT_AVRO:
             self.file_io.write_avro(file_path, data)
+        elif self.file_format == CoreOptions.FILE_FORMAT_BLOB:
+            self.file_io.write_blob(file_path, data)
         else:
             raise ValueError(f"Unsupported file format: {self.file_format}")
 
