This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit a214e7176e8b56f26d10be883bea635b1a0c10d4 Author: jerry <[email protected]> AuthorDate: Tue Oct 14 15:42:17 2025 +0800 [python] support blob type and blob write and read (#6390) --- paimon-python/pypaimon/common/core_options.py | 1 + .../pypaimon/common/delta_varint_compressor.py | 125 +++ paimon-python/pypaimon/common/file_io.py | 55 ++ .../pypaimon/read/reader/format_blob_reader.py | 199 +++++ paimon-python/pypaimon/read/split_read.py | 9 +- paimon-python/pypaimon/schema/data_types.py | 5 + paimon-python/pypaimon/table/row/blob.py | 247 ++++++ paimon-python/pypaimon/table/row/generic_row.py | 15 +- paimon-python/pypaimon/tests/blob_test.py | 983 +++++++++++++++++++++ .../pypaimon/tests/delta_varint_compressor_test.py | 379 ++++++++ paimon-python/pypaimon/write/blob_format_writer.py | 107 +++ paimon-python/pypaimon/write/writer/data_writer.py | 2 + 12 files changed, 2124 insertions(+), 3 deletions(-) diff --git a/paimon-python/pypaimon/common/core_options.py b/paimon-python/pypaimon/common/core_options.py index 1a060bd206..754bb17c05 100644 --- a/paimon-python/pypaimon/common/core_options.py +++ b/paimon-python/pypaimon/common/core_options.py @@ -38,6 +38,7 @@ class CoreOptions(str, Enum): FILE_FORMAT_ORC = "orc" FILE_FORMAT_AVRO = "avro" FILE_FORMAT_PARQUET = "parquet" + FILE_FORMAT_BLOB = "blob" FILE_COMPRESSION = "file.compression" FILE_COMPRESSION_PER_LEVEL = "file.compression.per.level" FILE_FORMAT_PER_LEVEL = "file.format.per.level" diff --git a/paimon-python/pypaimon/common/delta_varint_compressor.py b/paimon-python/pypaimon/common/delta_varint_compressor.py new file mode 100644 index 0000000000..dc849886bf --- /dev/null +++ b/paimon-python/pypaimon/common/delta_varint_compressor.py @@ -0,0 +1,125 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +import io +from typing import List + + +class DeltaVarintCompressor: + + @staticmethod + def compress(data: List[int]) -> bytes: + if not data: + return b'' + + # Estimate output size (conservative: 5 bytes per varint max) + out = io.BytesIO() + out.seek(0) + + # Encode first value directly + DeltaVarintCompressor._encode_varint(data[0], out) + + # Encode deltas without intermediate list creation + prev = data[0] + for i in range(1, len(data)): + current = data[i] + delta = current - prev + DeltaVarintCompressor._encode_varint(delta, out) + prev = current + + # Return only the used portion of the buffer + position = out.tell() + result = out.getvalue() + out.close() + return result[:position] + + @staticmethod + def decompress(compressed: bytes) -> List[int]: + if not compressed: + return [] + + # Fast path: decode directly into result without intermediate deltas list + in_stream = io.BytesIO(compressed) + result = [] + + try: + # Decode first value + first_value = DeltaVarintCompressor._decode_varint(in_stream) + result.append(first_value) + + # Decode and reconstruct remaining values in one pass + current_value = first_value + while True: + try: + delta = DeltaVarintCompressor._decode_varint(in_stream) + current_value += delta + result.append(current_value) + except RuntimeError: + # End of stream reached + break + + except RuntimeError: + # Handle empty stream case + pass + finally: + in_stream.close() + + return result + + @staticmethod + def _encode_varint(value: int, out: io.BytesIO) -> None: + # ZigZag encoding: maps signed integers to unsigned integers + if value >= 0: + zigzag = value << 1 + else: + zigzag = ((-value) << 1) - 1 + + # Varint encoding + while zigzag >= 0x80: + out.write(bytes([(zigzag & 0x7F) | 0x80])) + zigzag >>= 7 + out.write(bytes([zigzag & 0x7F])) + + @staticmethod + def _decode_varint(in_stream: io.BytesIO) -> int: + result = 0 + shift = 0 + while True: + byte_data = in_stream.read(1) + if not byte_data: + if shift == 0: + # Natural end of stream + raise RuntimeError("End of stream") + else: + # Unexpected end in middle of varint + raise RuntimeError("Unexpected end of input") + + b = byte_data[0] + result |= (b & 0x7F) << shift + if (b & 0x80) == 0: + break + + shift += 7 + if shift > 63: + raise RuntimeError("Varint overflow") + + # ZigZag decoding: maps unsigned integers back to signed integers + if result & 1: + return -((result + 1) >> 1) + else: + return result >> 1 diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py index 6f5dfc6a2a..f6371d2d80 100644 --- a/paimon-python/pypaimon/common/file_io.py +++ b/paimon-python/pypaimon/common/file_io.py @@ -28,6 +28,11 @@ from packaging.version import parse from pyarrow._fs import FileSystem from pypaimon.common.config import OssOptions, S3Options +from pypaimon.schema.data_types import DataField, AtomicType, PyarrowFieldParser +from pypaimon.table.row.blob import BlobData +from pypaimon.table.row.generic_row import GenericRow +from pypaimon.table.row.row_kind import RowKind +from pypaimon.write.blob_format_writer import BlobFormatWriter class FileIO: @@ -364,3 +369,53 @@ class FileIO: with self.new_output_stream(path) as output_stream: fastavro.writer(output_stream, avro_schema, records, **kwargs) + + def write_blob(self, path: Path, data: pyarrow.Table, **kwargs): + try: + # Validate input constraints + if data.num_columns != 1: + raise RuntimeError(f"Blob format only supports a 
single column, got {data.num_columns} columns") + # Check for null values + column = data.column(0) + if column.null_count > 0: + raise RuntimeError("Blob format does not support null values") + # Convert PyArrow schema to Paimon DataFields + # For blob files, we expect exactly one blob column + field = data.schema[0] + if pyarrow.types.is_large_binary(field.type): + fields = [DataField(0, field.name, AtomicType("BLOB"))] + else: + # Convert other types as needed + paimon_type = PyarrowFieldParser.to_paimon_type(field.type, field.nullable) + fields = [DataField(0, field.name, paimon_type)] + # Convert PyArrow Table to records + records_dict = data.to_pydict() + num_rows = data.num_rows + field_name = fields[0].name + with self.new_output_stream(path) as output_stream: + writer = BlobFormatWriter(output_stream) + # Write each row + for i in range(num_rows): + col_data = records_dict[field_name][i] + # Convert to appropriate type based on field type + if hasattr(fields[0].type, 'type') and fields[0].type.type == "BLOB": + if isinstance(col_data, bytes): + blob_data = BlobData(col_data) + else: + # Convert to bytes if needed + if hasattr(col_data, 'as_py'): + col_data = col_data.as_py() + if isinstance(col_data, str): + col_data = col_data.encode('utf-8') + blob_data = BlobData(col_data) + row_values = [blob_data] + else: + row_values = [col_data] + # Create GenericRow and write + row = GenericRow(row_values, fields, RowKind.INSERT) + writer.add_element(row) + writer.close() + + except Exception as e: + self.delete_quietly(path) + raise RuntimeError(f"Failed to write blob file {path}: {e}") from e diff --git a/paimon-python/pypaimon/read/reader/format_blob_reader.py b/paimon-python/pypaimon/read/reader/format_blob_reader.py new file mode 100644 index 0000000000..709a0c27a3 --- /dev/null +++ b/paimon-python/pypaimon/read/reader/format_blob_reader.py @@ -0,0 +1,199 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ +import struct +from pathlib import Path +from typing import List, Optional, Any, Iterator + +import pyarrow as pa +import pyarrow.dataset as ds +from pyarrow import RecordBatch + +from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor +from pypaimon.common.file_io import FileIO +from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader +from pypaimon.schema.data_types import DataField, PyarrowFieldParser +from pypaimon.table.row.blob import Blob, BlobDescriptor, BlobRef +from pypaimon.table.row.generic_row import GenericRow + + +class FormatBlobReader(RecordBatchReader): + + def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str], + full_fields: List[DataField], push_down_predicate: Any): + self._file_io = file_io + self._file_path = file_path + self._push_down_predicate = push_down_predicate + + # Get file size + self._file_size = file_io.get_file_size(file_path) + + # Initialize the low-level blob format reader + self.file_path = file_path + self.blob_lengths: List[int] = [] + self.blob_offsets: List[int] = [] + self.returned = False + self._read_index() + + # Set up fields and schema + if len(read_fields) > 1: + raise RuntimeError("Blob reader only supports one field.") + self._fields = read_fields + full_fields_map = {field.name: field for field in full_fields} + projected_data_fields = [full_fields_map[name] for name in read_fields] + self._schema = PyarrowFieldParser.from_paimon_schema(projected_data_fields) + + # Initialize iterator + self._blob_iterator = None + self._current_batch = None + + def read_arrow_batch(self) -> Optional[RecordBatch]: + if self._blob_iterator is None: + if self.returned: + return None + self.returned = True + batch_iterator = BlobRecordIterator(self.file_path, self.blob_lengths, self.blob_offsets, self._fields[0]) + self._blob_iterator = iter(batch_iterator) + + # Collect records for this batch + pydict_data = {name: [] for name in self._fields} + records_in_batch = 0 + + try: + while True: + # Get next blob record + blob_row = next(self._blob_iterator) + # Check if first read returns None, stop immediately + if blob_row is None: + break + + # Extract blob data from the row + blob = blob_row.values[0] # Blob files have single blob field + + # Convert blob to appropriate format for each requested field + for field_name in self._fields: + # For blob files, all fields should contain blob data + if isinstance(blob, Blob): + blob_data = blob.to_data() + else: + blob_data = bytes(blob) if blob is not None else None + pydict_data[field_name].append(blob_data) + + records_in_batch += 1 + + except StopIteration: + # Stop immediately when StopIteration occurs + pass + + if records_in_batch == 0: + return None + + # Create RecordBatch + if self._push_down_predicate is None: + # Convert to Table first, then to RecordBatch + table = pa.Table.from_pydict(pydict_data, self._schema) + if table.num_rows > 0: + return table.to_batches()[0] + else: + return None + else: + # Apply predicate filtering + pa_batch = pa.Table.from_pydict(pydict_data, self._schema) + dataset = ds.InMemoryDataset(pa_batch) + scanner = dataset.scanner(filter=self._push_down_predicate) + combine_chunks = scanner.to_table().combine_chunks() + if combine_chunks.num_rows > 0: + return combine_chunks.to_batches()[0] + else: + return None + + def close(self): + self._blob_iterator = None + + def _read_index(self) -> None: + with 
self._file_io.new_input_stream(Path(self.file_path)) as f: + # Seek to header: last 5 bytes + f.seek(self._file_size - 5) + header = f.read(5) + + if len(header) != 5: + raise IOError("Invalid blob file: cannot read header") + + # Parse header + index_length = struct.unpack('<I', header[:4])[0] # Little endian + version = header[4] + + if version != 1: + raise IOError(f"Unsupported blob file version: {version}") + + # Read index data + f.seek(self._file_size - 5 - index_length) + index_bytes = f.read(index_length) + + if len(index_bytes) != index_length: + raise IOError("Invalid blob file: cannot read index") + + # Decompress blob lengths and compute offsets + blob_lengths = DeltaVarintCompressor.decompress(index_bytes) + blob_offsets = [] + offset = 0 + for length in blob_lengths: + blob_offsets.append(offset) + offset += length + self.blob_lengths = blob_lengths + self.blob_offsets = blob_offsets + + +class BlobRecordIterator: + MAGIC_NUMBER_SIZE = 4 + METADATA_OVERHEAD = 16 + + def __init__(self, file_path: str, blob_lengths: List[int], blob_offsets: List[int], field_name: str): + self.file_path = file_path + self.field_name = field_name + self.blob_lengths = blob_lengths + self.blob_offsets = blob_offsets + self.current_position = 0 + + def __iter__(self) -> Iterator[GenericRow]: + return self + + def __next__(self) -> GenericRow: + if self.current_position >= len(self.blob_lengths): + raise StopIteration + + # Create blob reference for the current blob + # Skip magic number (4 bytes) and exclude length (8 bytes) + CRC (4 bytes) = 12 bytes + blob_offset = self.blob_offsets[self.current_position] + self.MAGIC_NUMBER_SIZE # Skip magic number + blob_length = self.blob_lengths[self.current_position] - self.METADATA_OVERHEAD + + # Create BlobDescriptor for this blob + descriptor = BlobDescriptor(self.file_path, blob_offset, blob_length) + blob = BlobRef(descriptor) + + self.current_position += 1 + + # Return as GenericRow with single blob field + from pypaimon.schema.data_types import DataField, AtomicType + from pypaimon.table.row.row_kind import RowKind + + fields = [DataField(0, self.field_name, AtomicType("BLOB"))] + return GenericRow([blob], fields, RowKind.INSERT) + + def returned_position(self) -> int: + """Get current position in the iterator.""" + return self.current_position diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index d9e9939c11..7674db45f0 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -21,6 +21,7 @@ from abc import ABC, abstractmethod from functools import partial from typing import List, Optional, Tuple +from pypaimon.common.core_options import CoreOptions from pypaimon.common.predicate import Predicate from pypaimon.read.interval_partition import IntervalPartition, SortedRun from pypaimon.read.partition_info import PartitionInfo @@ -31,6 +32,7 @@ from pypaimon.read.reader.drop_delete_reader import DropDeleteRecordReader from pypaimon.read.reader.empty_record_reader import EmptyFileRecordReader from pypaimon.read.reader.filter_record_reader import FilterRecordReader from pypaimon.read.reader.format_avro_reader import FormatAvroReader +from pypaimon.read.reader.format_blob_reader import FormatBlobReader from pypaimon.read.reader.format_pyarrow_reader import FormatPyArrowReader from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader from pypaimon.read.reader.iface.record_reader import RecordReader @@ -73,10 +75,13 @@ class SplitRead(ABC): 
file_format = extension[1:] format_reader: RecordBatchReader - if file_format == "avro": + if file_format == CoreOptions.FILE_FORMAT_AVRO: format_reader = FormatAvroReader(self.table.file_io, file_path, self._get_final_read_data_fields(), self.read_fields, self.push_down_predicate) - elif file_format == "parquet" or file_format == "orc": + elif file_format == CoreOptions.FILE_FORMAT_BLOB: + format_reader = FormatBlobReader(self.table.file_io, file_path, self._get_final_read_data_fields(), + self.read_fields, self.push_down_predicate) + elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == CoreOptions.FILE_FORMAT_ORC: format_reader = FormatPyArrowReader(self.table.file_io, file_format, file_path, self._get_final_read_data_fields(), self.push_down_predicate) else: diff --git a/paimon-python/pypaimon/schema/data_types.py b/paimon-python/pypaimon/schema/data_types.py index 5255ac6b1e..d1ce2354a8 100644 --- a/paimon-python/pypaimon/schema/data_types.py +++ b/paimon-python/pypaimon/schema/data_types.py @@ -247,6 +247,7 @@ class Keyword(Enum): BINARY = "BINARY" VARBINARY = "VARBINARY" BYTES = "BYTES" + BLOB = "BLOB" DECIMAL = "DECIMAL" NUMERIC = "NUMERIC" DEC = "DEC" @@ -407,6 +408,8 @@ class PyarrowFieldParser: return pyarrow.string() elif type_name == 'BYTES' or type_name.startswith('VARBINARY'): return pyarrow.binary() + elif type_name == 'BLOB': + return pyarrow.large_binary() elif type_name.startswith('BINARY'): if type_name == 'BINARY': return pyarrow.binary(1) @@ -506,6 +509,8 @@ class PyarrowFieldParser: type_name = f'BINARY({pa_type.byte_width})' elif types.is_binary(pa_type): type_name = 'BYTES' + elif types.is_large_binary(pa_type): + type_name = 'BLOB' elif types.is_decimal(pa_type): type_name = f'DECIMAL({pa_type.precision}, {pa_type.scale})' elif types.is_timestamp(pa_type) and pa_type.tz is None: diff --git a/paimon-python/pypaimon/table/row/blob.py b/paimon-python/pypaimon/table/row/blob.py new file mode 100644 index 0000000000..b9d8cc0e38 --- /dev/null +++ b/paimon-python/pypaimon/table/row/blob.py @@ -0,0 +1,247 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +import io +from abc import ABC, abstractmethod +from typing import Optional, Union + + +class BlobDescriptor: + CURRENT_VERSION = 1 + + def __init__(self, uri: str, offset: int, length: int, version: int = CURRENT_VERSION): + self._version = version + self._uri = uri + self._offset = offset + self._length = length + + @property + def uri(self) -> str: + return self._uri + + @property + def offset(self) -> int: + return self._offset + + @property + def length(self) -> int: + return self._length + + @property + def version(self) -> int: + return self._version + + def serialize(self) -> bytes: + import struct + + uri_bytes = self._uri.encode('utf-8') + uri_length = len(uri_bytes) + + # Pack using little endian format + data = struct.pack('<B', self._version) # version (1 byte) + data += struct.pack('<I', uri_length) # uri length (4 bytes) + data += uri_bytes # uri bytes + data += struct.pack('<q', self._offset) # offset (8 bytes, signed) + data += struct.pack('<q', self._length) # length (8 bytes, signed) + + return data + + @classmethod + def deserialize(cls, data: bytes) -> 'BlobDescriptor': + import struct + + if len(data) < 5: # minimum size: version(1) + uri_length(4) + raise ValueError("Invalid BlobDescriptor data: too short") + + offset = 0 + + # Read version + version = struct.unpack('<B', data[offset:offset + 1])[0] + offset += 1 + + # For now, we only support version 1, but allow flexibility for future versions + if version < 1: + raise ValueError(f"Unsupported BlobDescriptor version: {version}") + + # Read URI length + uri_length = struct.unpack('<I', data[offset:offset + 4])[0] + offset += 4 + + # Read URI bytes + if offset + uri_length > len(data): + raise ValueError("Invalid BlobDescriptor data: URI length exceeds data size") + + uri_bytes = data[offset:offset + uri_length] + uri = uri_bytes.decode('utf-8') + offset += uri_length + + # Read offset and length + if offset + 16 > len(data): + raise ValueError("Invalid BlobDescriptor data: missing offset/length") + + blob_offset = struct.unpack('<q', data[offset:offset + 8])[0] + offset += 8 + + blob_length = struct.unpack('<q', data[offset:offset + 8])[0] + + return cls(uri, blob_offset, blob_length, version) + + def __eq__(self, other) -> bool: + """Check equality with another BlobDescriptor.""" + if not isinstance(other, BlobDescriptor): + return False + return (self._version == other._version and + self._uri == other._uri and + self._offset == other._offset and + self._length == other._length) + + def __hash__(self) -> int: + """Calculate hash for the BlobDescriptor.""" + return hash((self._version, self._uri, self._offset, self._length)) + + def __str__(self) -> str: + """String representation of BlobDescriptor.""" + return (f"BlobDescriptor(version={self._version}, uri='{self._uri}', " + f"offset={self._offset}, length={self._length})") + + def __repr__(self) -> str: + """Detailed representation of BlobDescriptor.""" + return self.__str__() + + +class Blob(ABC): + + @abstractmethod + def to_data(self) -> bytes: + pass + + @abstractmethod + def to_descriptor(self) -> BlobDescriptor: + pass + + @abstractmethod + def new_input_stream(self) -> io.BytesIO: + pass + + @staticmethod + def from_data(data: bytes) -> 'Blob': + return BlobData(data) + + @staticmethod + def from_local(file: str) -> 'Blob': + return Blob.from_file(file) + + @staticmethod + def from_http(uri: str) -> 'Blob': + descriptor = BlobDescriptor(uri, 0, -1) + return 
BlobRef(descriptor) + + @staticmethod + def from_file(file: str, offset: int = 0, length: int = -1) -> 'Blob': + descriptor = BlobDescriptor(file, offset, length) + return BlobRef(descriptor) + + @staticmethod + def from_descriptor(descriptor: BlobDescriptor) -> 'Blob': + return BlobRef(descriptor) + + +class BlobData(Blob): + + def __init__(self, data: Optional[Union[bytes, bytearray]] = None): + if data is None: + self._data = b'' + elif isinstance(data, (bytes, bytearray)): + self._data = bytes(data) + else: + raise TypeError(f"BlobData expects bytes, bytearray, or None, got {type(data)}") + + @classmethod + def from_bytes(cls, data: bytes) -> 'BlobData': + return cls(data) + + @property + def data(self) -> bytes: + return self._data + + def to_data(self) -> bytes: + return self._data + + def to_descriptor(self) -> 'BlobDescriptor': + raise RuntimeError("Blob data can not convert to descriptor.") + + def new_input_stream(self) -> io.BytesIO: + return io.BytesIO(self._data) + + def __eq__(self, other) -> bool: + if other is None or not isinstance(other, BlobData): + return False + return self._data == other._data + + def __hash__(self) -> int: + return hash(self._data) + + +class BlobRef(Blob): + + def __init__(self, descriptor: BlobDescriptor): + self._descriptor = descriptor + + def to_data(self) -> bytes: + try: + with self.new_input_stream() as stream: + return stream.read() + except Exception as e: + raise IOError(f"Failed to read blob data: {e}") + + def to_descriptor(self) -> BlobDescriptor: + return self._descriptor + + def new_input_stream(self) -> io.BytesIO: + uri = self._descriptor.uri + offset = self._descriptor.offset + length = self._descriptor.length + + if uri.startswith('http://') or uri.startswith('https://'): + raise NotImplementedError("HTTP blob reading not implemented yet") + elif uri.startswith('file://') or '://' not in uri: + file_path = uri.replace('file://', '') if uri.startswith('file://') else uri + + try: + with open(file_path, 'rb') as f: + if offset > 0: + f.seek(offset) + + if length == -1: + data = f.read() + else: + data = f.read(length) + + return io.BytesIO(data) + except Exception as e: + raise IOError(f"Failed to read file {file_path}: {e}") + else: + raise ValueError(f"Unsupported URI scheme: {uri}") + + def __eq__(self, other) -> bool: + if not isinstance(other, BlobRef): + return False + return self._descriptor == other._descriptor + + def __hash__(self) -> int: + return hash(self._descriptor) diff --git a/paimon-python/pypaimon/table/row/generic_row.py b/paimon-python/pypaimon/table/row/generic_row.py index a7612168d9..5f8f9f6d9b 100644 --- a/paimon-python/pypaimon/table/row/generic_row.py +++ b/paimon-python/pypaimon/table/row/generic_row.py @@ -24,6 +24,7 @@ from typing import Any, List from pypaimon.schema.data_types import AtomicType, DataField, DataType from pypaimon.table.row.row_kind import RowKind +from pypaimon.table.row.blob import BlobData @dataclass @@ -111,6 +112,8 @@ class GenericRowDeserializer: return cls._parse_string(bytes_data, base_offset, field_offset) elif type_name.startswith('BINARY') or type_name.startswith('VARBINARY') or type_name == 'BYTES': return cls._parse_binary(bytes_data, base_offset, field_offset) + elif type_name == 'BLOB': + return cls._parse_blob(bytes_data, base_offset, field_offset) elif type_name.startswith('DECIMAL') or type_name.startswith('NUMERIC'): return cls._parse_decimal(bytes_data, base_offset, field_offset, data_type) elif type_name.startswith('TIMESTAMP'): @@ -193,6 +196,13 @@ class 
GenericRowDeserializer: length = (offset_and_len & cls.HIGHEST_SECOND_TO_EIGHTH_BIT) >> 56 return bytes_data[field_offset:field_offset + length] + @classmethod + def _parse_blob(cls, bytes_data: bytes, base_offset: int, field_offset: int) -> BlobData: + """Parse BLOB data from binary format and return a BlobData instance.""" + # BLOB uses the same binary format as regular binary data + binary_data = cls._parse_binary(bytes_data, base_offset, field_offset) + return BlobData.from_bytes(binary_data) + @classmethod def _parse_decimal(cls, bytes_data: bytes, base_offset: int, field_offset: int, data_type: DataType) -> Decimal: unscaled_long = struct.unpack('<q', bytes_data[field_offset:field_offset + 8])[0] @@ -260,9 +270,12 @@ class GenericRowSerializer: raise ValueError(f"BinaryRow only support AtomicType yet, meet {field.type.__class__}") type_name = field.type.type.upper() - if any(type_name.startswith(p) for p in ['CHAR', 'VARCHAR', 'STRING', 'BINARY', 'VARBINARY', 'BYTES']): + if any(type_name.startswith(p) for p in ['CHAR', 'VARCHAR', 'STRING', + 'BINARY', 'VARBINARY', 'BYTES', 'BLOB']): if any(type_name.startswith(p) for p in ['CHAR', 'VARCHAR', 'STRING']): value_bytes = str(value).encode('utf-8') + elif type_name == 'BLOB': + value_bytes = value.to_data() else: value_bytes = bytes(value) diff --git a/paimon-python/pypaimon/tests/blob_test.py b/paimon-python/pypaimon/tests/blob_test.py new file mode 100644 index 0000000000..b8ed65e5bb --- /dev/null +++ b/paimon-python/pypaimon/tests/blob_test.py @@ -0,0 +1,983 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+""" +import os +import shutil +import tempfile +import unittest +from pathlib import Path + +import pyarrow as pa + +from pypaimon import CatalogFactory +from pypaimon.common.file_io import FileIO +from pypaimon.read.reader.format_blob_reader import FormatBlobReader +from pypaimon.schema.data_types import AtomicType, DataField +from pypaimon.table.row.blob import Blob, BlobData, BlobRef, BlobDescriptor +from pypaimon.table.row.generic_row import GenericRowDeserializer, GenericRowSerializer, GenericRow +from pypaimon.table.row.row_kind import RowKind + + +class MockFileIO: + """Mock FileIO for testing.""" + + def __init__(self, file_io: FileIO): + self._file_io = file_io + + def get_file_size(self, path: str) -> int: + """Get file size.""" + return self._file_io.get_file_size(Path(path)) + + def new_input_stream(self, path: Path): + """Create new input stream for reading.""" + return self._file_io.new_input_stream(path) + + +class BlobTest(unittest.TestCase): + """Tests for Blob interface following org.apache.paimon.data.BlobTest.""" + + def setUp(self): + """Set up test environment with temporary file.""" + # Create a temporary directory and file + self.temp_dir = tempfile.mkdtemp() + self.file = os.path.join(self.temp_dir, "test.txt") + + # Write test data to the file + with open(self.file, 'wb') as f: + f.write(b"test data") + + def tearDown(self): + """Clean up temporary files.""" + try: + if os.path.exists(self.file): + os.remove(self.file) + os.rmdir(self.temp_dir) + except OSError: + pass # Ignore cleanup errors + + def test_from_data(self): + """Test Blob.from_data() method.""" + test_data = b"test data" + blob = Blob.from_data(test_data) + + # Verify it returns a BlobData instance + self.assertIsInstance(blob, BlobData) + + # Verify the data matches + self.assertEqual(blob.to_data(), test_data) + + def test_from_local(self): + """Test Blob.from_local() method.""" + blob = Blob.from_local(self.file) + + # Verify it returns a BlobRef instance + self.assertIsInstance(blob, BlobRef) + + # Verify the data matches + self.assertEqual(blob.to_data(), b"test data") + + def test_from_file_with_offset_and_length(self): + """Test Blob.from_file() method with offset and length.""" + blob = Blob.from_file(self.file, 0, 4) + + # Verify it returns a BlobRef instance + self.assertIsInstance(blob, BlobRef) + + # Verify the data matches (first 4 bytes: "test") + self.assertEqual(blob.to_data(), b"test") + + def test_from_file_full(self): + """Test Blob.from_file() method without offset and length.""" + blob = Blob.from_file(self.file) + + # Verify it returns a BlobRef instance + self.assertIsInstance(blob, BlobRef) + + # Verify the data matches + self.assertEqual(blob.to_data(), b"test data") + + def test_from_http(self): + """Test Blob.from_http() method.""" + uri = "http://example.com/file.txt" + blob = Blob.from_http(uri) + + # Verify it returns a BlobRef instance + self.assertIsInstance(blob, BlobRef) + + # Verify the descriptor has the correct URI + descriptor = blob.to_descriptor() + self.assertEqual(descriptor.uri, uri) + self.assertEqual(descriptor.offset, 0) + self.assertEqual(descriptor.length, -1) + + def test_blob_data_interface_compliance(self): + """Test that BlobData properly implements Blob interface.""" + test_data = b"interface test data" + blob_data = BlobData(test_data) + + # Test that it's a Blob + self.assertIsInstance(blob_data, Blob) + + # Test interface methods + self.assertEqual(blob_data.to_data(), test_data) + + # Test to_descriptor raises RuntimeError + with 
self.assertRaises(RuntimeError) as context: + blob_data.to_descriptor() + self.assertIn("Blob data can not convert to descriptor", str(context.exception)) + + # Test new_input_stream + stream = blob_data.new_input_stream() + self.assertEqual(stream.read(), test_data) + stream.close() + + def test_blob_ref_interface_compliance(self): + """Test that BlobRef properly implements Blob interface.""" + blob_ref = Blob.from_local(self.file) + + # Test that it's a Blob + self.assertIsInstance(blob_ref, Blob) + + # Test interface methods + self.assertEqual(blob_ref.to_data(), b"test data") + + # Test to_descriptor returns valid descriptor + descriptor = blob_ref.to_descriptor() + self.assertEqual(descriptor.uri, self.file) + self.assertEqual(descriptor.offset, 0) + self.assertEqual(descriptor.length, -1) + + # Test new_input_stream + stream = blob_ref.new_input_stream() + self.assertEqual(stream.read(), b"test data") + stream.close() + + def test_blob_equality_and_hashing(self): + """Test blob equality and hashing behavior.""" + # Test BlobData equality + data1 = BlobData(b"same data") + data2 = BlobData(b"same data") + data3 = BlobData(b"different data") + + self.assertEqual(data1, data2) + self.assertNotEqual(data1, data3) + self.assertEqual(hash(data1), hash(data2)) + + # Test BlobRef equality + ref1 = Blob.from_local(self.file) + ref2 = Blob.from_local(self.file) + + self.assertEqual(ref1, ref2) + self.assertEqual(hash(ref1), hash(ref2)) + + def test_blob_factory_methods_return_correct_types(self): + """Test that all factory methods return the expected types.""" + # from_data should return BlobData + blob_data = Blob.from_data(b"test") + self.assertIsInstance(blob_data, BlobData) + self.assertIsInstance(blob_data, Blob) + + # from_local should return BlobRef + blob_ref = Blob.from_local(self.file) + self.assertIsInstance(blob_ref, BlobRef) + self.assertIsInstance(blob_ref, Blob) + + # from_file should return BlobRef + blob_file = Blob.from_file(self.file) + self.assertIsInstance(blob_file, BlobRef) + self.assertIsInstance(blob_file, Blob) + + # from_http should return BlobRef + blob_http = Blob.from_http("http://example.com/test.bin") + self.assertIsInstance(blob_http, BlobRef) + self.assertIsInstance(blob_http, Blob) + + def test_blob_data_convenience_methods(self): + # Test from_bytes class method + blob2 = BlobData.from_bytes(b"from bytes") + self.assertEqual(blob2.to_data(), b"from bytes") + + def test_generic_row_deserializer_parse_blob(self): + """Test GenericRowDeserializer._parse_blob method.""" + # Create test data with BLOB field + test_blob_data = b"Test BLOB data for parsing" + blob_data = BlobData(test_blob_data) + + # Create fields with BLOB type + fields = [ + DataField(0, "id", AtomicType("INT")), + DataField(1, "blob_field", AtomicType("BLOB")), + ] + + # Create and serialize a row with blob data + original_row = GenericRow([42, blob_data], fields, RowKind.INSERT) + serialized_bytes = GenericRowSerializer.to_bytes(original_row) + + # Test the full deserialization process (which uses _parse_blob internally) + deserialized_row = GenericRowDeserializer.from_bytes(serialized_bytes, fields) + + # Verify the deserialized blob + deserialized_blob = deserialized_row.values[1] + self.assertIsInstance(deserialized_blob, BlobData) + self.assertEqual(deserialized_blob.to_data(), test_blob_data) + + # Test with empty blob data + empty_blob = BlobData(b"") + empty_row = GenericRow([1, empty_blob], fields, RowKind.INSERT) + empty_serialized = GenericRowSerializer.to_bytes(empty_row) + 
empty_deserialized = GenericRowDeserializer.from_bytes(empty_serialized, fields) + + empty_deserialized_blob = empty_deserialized.values[1] + self.assertIsInstance(empty_deserialized_blob, BlobData) + self.assertEqual(empty_deserialized_blob.to_data(), b"") + + # Test with binary data containing null bytes + binary_blob_data = b"\x00\x01\x02\x03\xff\xfe\xfd" + binary_blob = BlobData(binary_blob_data) + binary_row = GenericRow([99, binary_blob], fields, RowKind.INSERT) + binary_serialized = GenericRowSerializer.to_bytes(binary_row) + binary_deserialized = GenericRowDeserializer.from_bytes(binary_serialized, fields) + + binary_deserialized_blob = binary_deserialized.values[1] + self.assertIsInstance(binary_deserialized_blob, BlobData) + self.assertEqual(binary_deserialized_blob.to_data(), binary_blob_data) + + def test_generic_row_deserializer_parse_blob_with_multiple_fields(self): + """Test _parse_blob with multiple BLOB fields in a row.""" + # Create test data with multiple BLOB fields + blob1_data = b"First BLOB data" + blob2_data = b"Second BLOB with different content" + blob3_data = b"" # Empty blob + + blob1 = BlobData(blob1_data) + blob2 = BlobData(blob2_data) + blob3 = BlobData(blob3_data) + + # Create fields with multiple BLOB types + fields = [ + DataField(0, "id", AtomicType("INT")), + DataField(1, "name", AtomicType("STRING")), + DataField(2, "blob1", AtomicType("BLOB")), + DataField(3, "blob2", AtomicType("BLOB")), + DataField(4, "blob3", AtomicType("BLOB")), + ] + + # Create and serialize a row with multiple blobs + original_row = GenericRow([123, "test_row", blob1, blob2, blob3], fields, RowKind.INSERT) + serialized_bytes = GenericRowSerializer.to_bytes(original_row) + + # Deserialize and verify all blobs + deserialized_row = GenericRowDeserializer.from_bytes(serialized_bytes, fields) + + # Verify each blob field + self.assertEqual(deserialized_row.values[0], 123) + self.assertEqual(deserialized_row.values[1], "test_row") + + deserialized_blob1 = deserialized_row.values[2] + self.assertIsInstance(deserialized_blob1, BlobData) + self.assertEqual(deserialized_blob1.to_data(), blob1_data) + + deserialized_blob2 = deserialized_row.values[3] + self.assertIsInstance(deserialized_blob2, BlobData) + self.assertEqual(deserialized_blob2.to_data(), blob2_data) + + deserialized_blob3 = deserialized_row.values[4] + self.assertIsInstance(deserialized_blob3, BlobData) + self.assertEqual(deserialized_blob3.to_data(), blob3_data) + + def test_generic_row_deserializer_parse_blob_with_null_values(self): + """Test _parse_blob with null BLOB values.""" + # Create fields with BLOB type + fields = [ + DataField(0, "id", AtomicType("INT")), + DataField(1, "blob_field", AtomicType("BLOB")), + DataField(2, "name", AtomicType("STRING")), + ] + + # Create row with null blob (None value) + original_row = GenericRow([456, None, "test_with_null"], fields, RowKind.INSERT) + serialized_bytes = GenericRowSerializer.to_bytes(original_row) + + # Deserialize and verify null blob is handled correctly + deserialized_row = GenericRowDeserializer.from_bytes(serialized_bytes, fields) + + self.assertEqual(deserialized_row.values[0], 456) + self.assertIsNone(deserialized_row.values[1]) # Null blob should remain None + self.assertEqual(deserialized_row.values[2], "test_with_null") + + def test_generic_row_deserializer_parse_blob_large_data(self): + """Test _parse_blob with large BLOB data.""" + # Create large blob data (1MB) + large_blob_data = b"X" * (1024 * 1024) # 1MB of 'X' characters + large_blob = 
BlobData(large_blob_data) + + fields = [ + DataField(0, "id", AtomicType("INT")), + DataField(1, "large_blob", AtomicType("BLOB")), + ] + + # Create and serialize row with large blob + original_row = GenericRow([789, large_blob], fields, RowKind.INSERT) + serialized_bytes = GenericRowSerializer.to_bytes(original_row) + + # Deserialize and verify large blob + deserialized_row = GenericRowDeserializer.from_bytes(serialized_bytes, fields) + + deserialized_large_blob = deserialized_row.values[1] + self.assertIsInstance(deserialized_large_blob, BlobData) + self.assertEqual(len(deserialized_large_blob.to_data()), 1024 * 1024) + self.assertEqual(deserialized_large_blob.to_data(), large_blob_data) + + def test_blob_descriptor_creation(self): + """Test BlobDescriptor creation and properties.""" + # Test basic creation + descriptor = BlobDescriptor("test://example.uri", 100, 200) + + self.assertEqual(descriptor.uri, "test://example.uri") + self.assertEqual(descriptor.offset, 100) + self.assertEqual(descriptor.length, 200) + self.assertEqual(descriptor.version, BlobDescriptor.CURRENT_VERSION) + + def test_blob_descriptor_creation_with_version(self): + """Test BlobDescriptor creation with explicit version.""" + descriptor = BlobDescriptor("test://example.uri", 50, 150, version=2) + + self.assertEqual(descriptor.uri, "test://example.uri") + self.assertEqual(descriptor.offset, 50) + self.assertEqual(descriptor.length, 150) + self.assertEqual(descriptor.version, 2) + + def test_blob_descriptor_serialization_deserialization(self): + """Test BlobDescriptor serialization and deserialization.""" + # Test with various URIs and parameters + test_cases = [ + ("file:///path/to/file.bin", 0, -1), + ("https://example.com/data.blob", 1024, 2048), + ("s3://bucket/key", 0, 1000000), + ("test://simple", 42, 84), + ] + + for uri, offset, length in test_cases: + with self.subTest(uri=uri, offset=offset, length=length): + # Create original descriptor + original = BlobDescriptor(uri, offset, length) + + # Serialize + serialized = original.serialize() + self.assertIsInstance(serialized, bytes) + self.assertGreater(len(serialized), 0) + + # Deserialize + deserialized = BlobDescriptor.deserialize(serialized) + + # Verify equality + self.assertEqual(deserialized, original) + self.assertEqual(deserialized.uri, uri) + self.assertEqual(deserialized.offset, offset) + self.assertEqual(deserialized.length, length) + self.assertEqual(deserialized.version, BlobDescriptor.CURRENT_VERSION) + + def test_blob_descriptor_serialization_with_unicode(self): + """Test BlobDescriptor serialization with Unicode characters.""" + # Test with Unicode characters in URI + unicode_uri = "file:///测试/文件.bin" + descriptor = BlobDescriptor(unicode_uri, 0, 100) + + # Serialize and deserialize + serialized = descriptor.serialize() + deserialized = BlobDescriptor.deserialize(serialized) + + # Verify Unicode is preserved + self.assertEqual(deserialized.uri, unicode_uri) + self.assertEqual(deserialized, descriptor) + + def test_blob_descriptor_deserialization_invalid_data(self): + """Test BlobDescriptor deserialization with invalid data.""" + # Test with too short data + with self.assertRaises(ValueError) as context: + BlobDescriptor.deserialize(b"sho") # Only 3 bytes, need at least 5 + self.assertIn("too short", str(context.exception)) + + # Test with invalid version (version 0) + # Create valid data but with wrong version + valid_descriptor = BlobDescriptor("test://uri", 0, 100) + valid_data = bytearray(valid_descriptor.serialize()) + valid_data[0] = 0 # 
Set invalid version (0) + + with self.assertRaises(ValueError) as context: + BlobDescriptor.deserialize(bytes(valid_data)) + self.assertIn("Unsupported BlobDescriptor version", str(context.exception)) + + # Test with incomplete data (missing URI bytes) + incomplete_data = b'\x01\x00\x00\x00\x10' # Version 1, URI length 16, but no URI bytes + with self.assertRaises(ValueError) as context: + BlobDescriptor.deserialize(incomplete_data) + self.assertIn("URI length exceeds data size", str(context.exception)) + + def test_blob_descriptor_equality_and_hashing(self): + """Test BlobDescriptor equality and hashing.""" + # Create identical descriptors + desc1 = BlobDescriptor("test://uri", 100, 200) + desc2 = BlobDescriptor("test://uri", 100, 200) + desc3 = BlobDescriptor("test://uri", 100, 201) # Different length + desc4 = BlobDescriptor("test://other", 100, 200) # Different URI + + # Test equality + self.assertEqual(desc1, desc2) + self.assertNotEqual(desc1, desc3) + self.assertNotEqual(desc1, desc4) + self.assertNotEqual(desc1, None) + self.assertNotEqual(desc1, "not a descriptor") + + # Test hashing + self.assertEqual(hash(desc1), hash(desc2)) + # Hash should be different for different descriptors (though not guaranteed) + self.assertNotEqual(hash(desc1), hash(desc3)) + self.assertNotEqual(hash(desc1), hash(desc4)) + + def test_blob_descriptor_string_representation(self): + """Test BlobDescriptor string representation.""" + descriptor = BlobDescriptor("test://example.uri", 42, 84) + + str_repr = str(descriptor) + self.assertIn("test://example.uri", str_repr) + self.assertIn("42", str_repr) + self.assertIn("84", str_repr) + self.assertIn("BlobDescriptor", str_repr) + + # __repr__ should be the same as __str__ + self.assertEqual(str_repr, repr(descriptor)) + + def test_blob_descriptor_version_handling(self): + """Test BlobDescriptor version handling.""" + # Test current version + descriptor = BlobDescriptor("test://uri", 0, 100) + self.assertEqual(descriptor.version, BlobDescriptor.CURRENT_VERSION) + + # Test explicit version + descriptor_v2 = BlobDescriptor("test://uri", 0, 100, version=2) + self.assertEqual(descriptor_v2.version, 2) + + # Serialize and deserialize should preserve version + serialized = descriptor_v2.serialize() + deserialized = BlobDescriptor.deserialize(serialized) + self.assertEqual(deserialized.version, 2) + + def test_blob_descriptor_edge_cases(self): + """Test BlobDescriptor with edge cases.""" + # Test with empty URI + empty_uri_desc = BlobDescriptor("", 0, 0) + serialized = empty_uri_desc.serialize() + deserialized = BlobDescriptor.deserialize(serialized) + self.assertEqual(deserialized.uri, "") + + # Test with very long URI + long_uri = "file://" + "a" * 1000 + "/file.bin" + long_uri_desc = BlobDescriptor(long_uri, 0, 1000000) + serialized = long_uri_desc.serialize() + deserialized = BlobDescriptor.deserialize(serialized) + self.assertEqual(deserialized.uri, long_uri) + + # Test with negative values + negative_desc = BlobDescriptor("test://uri", -1, -1) + serialized = negative_desc.serialize() + deserialized = BlobDescriptor.deserialize(serialized) + self.assertEqual(deserialized.offset, -1) + self.assertEqual(deserialized.length, -1) + + def test_blob_descriptor_with_blob_ref(self): + """Test BlobDescriptor integration with BlobRef.""" + # Create a descriptor + descriptor = BlobDescriptor(self.file, 0, -1) + + # Create BlobRef from descriptor + blob_ref = BlobRef(descriptor) + + # Verify descriptor is preserved + returned_descriptor = blob_ref.to_descriptor() + 
self.assertEqual(returned_descriptor, descriptor) + + # Verify data can be read through BlobRef + data = blob_ref.to_data() + self.assertEqual(data, b"test data") + + def test_blob_descriptor_serialization_format(self): + """Test BlobDescriptor serialization format details.""" + descriptor = BlobDescriptor("test", 12345, 67890) + serialized = descriptor.serialize() + + # Check that serialized data starts with version byte + self.assertEqual(serialized[0], BlobDescriptor.CURRENT_VERSION) + + # Check minimum length (version + uri_length + uri + offset + length) + # 1 + 4 + len("test") + 8 + 8 = 25 bytes + self.assertEqual(len(serialized), 25) + + # Verify round-trip consistency + deserialized = BlobDescriptor.deserialize(serialized) + re_serialized = deserialized.serialize() + self.assertEqual(serialized, re_serialized) + + +class BlobEndToEndTest(unittest.TestCase): + """End-to-end tests for blob functionality with schema definition, file writing, and reading.""" + + def setUp(self): + """Set up test environment.""" + self.temp_dir = tempfile.mkdtemp() + self.warehouse = os.path.join(self.temp_dir, 'warehouse') + # Create catalog for table operations + self.catalog = CatalogFactory.create({ + 'warehouse': self.warehouse + }) + self.catalog.create_database('test_db', False) + + def tearDown(self): + """Clean up test environment.""" + try: + shutil.rmtree(self.temp_dir) + except OSError: + pass + + def test_blob_end_to_end(self): + from pypaimon.schema.data_types import DataField, AtomicType + from pypaimon.table.row.blob import BlobData + from pypaimon.common.file_io import FileIO + from pathlib import Path + + # Set up file I/O + file_io = FileIO(self.temp_dir, {}) + + blob_field_name = "blob_field" + # ========== Step 1: Check Type Validation ========== + blob_fields = [DataField(0, blob_field_name, AtomicType("BLOB"))] + for blob_field in blob_fields: + self.assertIsInstance(blob_field.type, AtomicType) + self.assertEqual(blob_field.type.type, "BLOB") + + # ========== Step 2: Write Data ========== + test_data = {blob_field_name: BlobData(b'End-to-end test: PDF header %PDF-1.4\n...')} + blob_files = {} + blob_data = [test_data[blob_field_name].to_data()] + schema = pa.schema([pa.field(blob_field_name, pa.large_binary())]) + table = pa.table([blob_data], schema=schema) + blob_files[blob_field_name] = Path(self.temp_dir) / (blob_field_name + ".blob") + file_io.write_blob(blob_files[blob_field_name], table) + self.assertTrue(file_io.exists(blob_files[blob_field_name])) + + # ========== Step 3: Read Data and Check Data ========== + for field_name, file_path in blob_files.items(): + read_fields = blob_fields + reader = FormatBlobReader( + file_io=file_io, + file_path=str(file_path), + read_fields=[field_name], + full_fields=read_fields, + push_down_predicate=None + ) + + # Read data + batch = reader.read_arrow_batch() + self.assertIsNotNone(batch, f"{field_name} batch should not be None") + self.assertEqual(batch.num_rows, 1, f"{field_name} should have 1 row") + + # Verify data integrity + read_blob_data = batch.column(0)[0].as_py() + expected_blob_data = test_data[field_name].to_data() + self.assertEqual(read_blob_data, expected_blob_data, f"{field_name} data should match") + + reader.close() + + def test_blob_complex_types_throw_exception(self): + """Test that complex types containing BLOB elements throw exceptions during read/write operations.""" + from pypaimon.schema.data_types import DataField, AtomicType, ArrayType, MultisetType, MapType + from pypaimon.table.row.blob import BlobData + 
from pypaimon.common.file_io import FileIO + from pypaimon.table.row.generic_row import GenericRow, GenericRowSerializer + from pypaimon.table.row.row_kind import RowKind + from pathlib import Path + + # Set up file I/O + file_io = FileIO(self.temp_dir, {}) + + # ========== Test ArrayType(nullable=True, element_type=AtomicType("BLOB")) ========== + array_fields = [ + DataField(0, "id", AtomicType("INT")), + DataField(1, "blob_array", ArrayType(nullable=True, element_type=AtomicType("BLOB"))), + ] + + # Test serialization throws exception for ArrayType<BLOB> + array_blob_data = [ + BlobData(b"Array blob 1"), + BlobData(b"Array blob 2"), + BlobData(b"Array blob 3") + ] + + array_row = GenericRow([1, array_blob_data], array_fields, RowKind.INSERT) + + # GenericRowSerializer should throw exception for complex types + with self.assertRaises(ValueError) as context: + GenericRowSerializer.to_bytes(array_row) + self.assertIn("AtomicType", str(context.exception)) + + # Note: FileIO.write_blob validation for complex types is tested separately below + + # ========== Test MultisetType(nullable=True, element_type=AtomicType("BLOB")) ========== + multiset_fields = [ + DataField(0, "id", AtomicType("INT")), + DataField(1, "blob_multiset", MultisetType(nullable=True, element_type=AtomicType("BLOB"))), + ] + + # Test serialization throws exception for MultisetType<BLOB> + multiset_blob_data = [ + BlobData(b"Multiset blob 1"), + BlobData(b"Multiset blob 2"), + BlobData(b"Multiset blob 1"), # Duplicate allowed in multiset + ] + + multiset_row = GenericRow([2, multiset_blob_data], multiset_fields, RowKind.INSERT) + + # GenericRowSerializer should throw exception for complex types + with self.assertRaises(ValueError) as context: + GenericRowSerializer.to_bytes(multiset_row) + self.assertIn("AtomicType", str(context.exception)) + map_fields = [ + DataField(0, "id", AtomicType("INT")), + DataField(1, "blob_map", MapType( + nullable=True, key_type=AtomicType("STRING"), value_type=AtomicType("BLOB") + )), + ] + + # Test serialization throws exception for MapType<STRING, BLOB> + map_blob_data = { + "document": BlobData(b"Document content"), + "image": BlobData(b"Image data"), + "metadata": BlobData(b"Metadata content") + } + + map_row = GenericRow([3, map_blob_data], map_fields, RowKind.INSERT) + + # GenericRowSerializer should throw exception for complex types + with self.assertRaises(ValueError) as context: + GenericRowSerializer.to_bytes(map_row) + self.assertIn("AtomicType", str(context.exception)) + + # ========== Test FileIO.write_blob validation for complex types ========== + # Test that FileIO.write_blob properly validates and rejects complex types + + # Create a table with multiple columns (should fail - blob format requires single column) + multi_column_schema = pa.schema([ + pa.field("blob1", pa.large_binary()), + pa.field("blob2", pa.large_binary()) + ]) + multi_column_table = pa.table([ + [b"blob1_data"], + [b"blob2_data"] + ], schema=multi_column_schema) + + multi_column_file = Path(self.temp_dir) / "multi_column.blob" + + # Should throw RuntimeError for multiple columns + with self.assertRaises(RuntimeError) as context: + file_io.write_blob(multi_column_file, multi_column_table) + self.assertIn("single column", str(context.exception)) + + # Test that FileIO.write_blob rejects null values + null_schema = pa.schema([pa.field("blob_with_nulls", pa.large_binary())]) + null_table = pa.table([[b"data", None]], schema=null_schema) + + null_file = Path(self.temp_dir) / "null_data.blob" + + # Should throw 
RuntimeError for null values + with self.assertRaises(RuntimeError) as context: + file_io.write_blob(null_file, null_table) + self.assertIn("null values", str(context.exception)) + + # ========== Test FormatBlobReader with complex type schema ========== + # Create a valid blob file first + valid_blob_data = [b"Valid blob content"] + valid_schema = pa.schema([pa.field("valid_blob", pa.large_binary())]) + valid_table = pa.table([valid_blob_data], schema=valid_schema) + + valid_blob_file = Path(self.temp_dir) / "valid_blob.blob" + file_io.write_blob(valid_blob_file, valid_table) + + # Try to read with complex type field definition - this should fail + # because FormatBlobReader tries to create PyArrow schema with complex types + complex_read_fields = [ + DataField(0, "valid_blob", ArrayType(nullable=True, element_type=AtomicType("BLOB"))) + ] + + # FormatBlobReader creation should work, but reading should fail due to schema mismatch + reader = FormatBlobReader( + file_io=file_io, + file_path=str(valid_blob_file), + read_fields=["valid_blob"], + full_fields=complex_read_fields, + push_down_predicate=None + ) + + # Reading should fail because the schema expects complex type but data is atomic + with self.assertRaises(Exception) as context: + reader.read_arrow_batch() + # The error could be ArrowTypeError or other PyArrow-related errors + self.assertTrue( + "ArrowTypeError" in str(type(context.exception)) or + "TypeError" in str(type(context.exception)) or + "ValueError" in str(type(context.exception)) + ) + + reader.close() + + def test_blob_advanced_scenarios(self): + """Test advanced blob scenarios: corruption, truncation, zero-length, large blobs, compression, cross-format.""" + from pypaimon.schema.data_types import DataField, AtomicType + from pypaimon.common.file_io import FileIO + from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor + from pathlib import Path + + # Set up file I/O + file_io = FileIO(self.temp_dir, {}) + + # ========== Test 1: Corrupted file header test ========== + + # Create a valid blob file first + valid_blob_data = [b"Test blob content for corruption test"] + valid_schema = pa.schema([pa.field("test_blob", pa.large_binary())]) + valid_table = pa.table([valid_blob_data], schema=valid_schema) + + header_test_file = Path(self.temp_dir) / "header_test.blob" + file_io.write_blob(header_test_file, valid_table) + + # Read the file and corrupt the header (last 5 bytes: index_length + version) + with open(header_test_file, 'rb') as f: + original_data = f.read() + + # Corrupt the version byte (last byte) + corrupted_data = bytearray(original_data) + corrupted_data[-1] = 99 # Invalid version (should be 1) + + corrupted_header_file = Path(self.temp_dir) / "corrupted_header.blob" + with open(corrupted_header_file, 'wb') as f: + f.write(corrupted_data) + + # Try to read corrupted file - should detect invalid version + fields = [DataField(0, "test_blob", AtomicType("BLOB"))] + + # Reading should fail due to invalid version + with self.assertRaises(IOError) as context: + FormatBlobReader( + file_io=file_io, + file_path=str(corrupted_header_file), + read_fields=["test_blob"], + full_fields=fields, + push_down_predicate=None + ) + self.assertIn("Unsupported blob file version", str(context.exception)) + + # ========== Test 2: Truncated blob file (mid-blob) read ========== + + # Create a blob file with substantial content + large_content = b"Large blob content: " + b"X" * 1000 + b" End of content" + large_blob_data = [large_content] + large_schema = 
pa.schema([pa.field("large_blob", pa.large_binary())]) + large_table = pa.table([large_blob_data], schema=large_schema) + + full_blob_file = Path(self.temp_dir) / "full_blob.blob" + file_io.write_blob(full_blob_file, large_table) + + # Read the full file and truncate it in the middle + with open(full_blob_file, 'rb') as f: + full_data = f.read() + + # Truncate to about 50% of original size (mid-blob) + truncated_size = len(full_data) // 2 + truncated_data = full_data[:truncated_size] + + truncated_file = Path(self.temp_dir) / "truncated.blob" + with open(truncated_file, 'wb') as f: + f.write(truncated_data) + + # Try to read truncated file - should fail gracefully + with self.assertRaises((IOError, OSError)) as context: + FormatBlobReader( + file_io=file_io, + file_path=str(truncated_file), + read_fields=["large_blob"], + full_fields=fields, + push_down_predicate=None + ) + # Should detect truncation/incomplete data (either invalid header or invalid version) + self.assertTrue( + "cannot read header" in str(context.exception) or + "Unsupported blob file version" in str(context.exception) + ) + + # ========== Test 3: Zero-length blob handling ========== + + # Create blob with zero-length content + zero_blob_data = [b""] # Empty blob + zero_schema = pa.schema([pa.field("zero_blob", pa.large_binary())]) + zero_table = pa.table([zero_blob_data], schema=zero_schema) + + zero_blob_file = Path(self.temp_dir) / "zero_length.blob" + file_io.write_blob(zero_blob_file, zero_table) + + # Verify file was created + self.assertTrue(file_io.exists(zero_blob_file)) + file_size = file_io.get_file_size(zero_blob_file) + self.assertGreater(file_size, 0) # File should have headers even with empty blob + + # Read zero-length blob + zero_fields = [DataField(0, "zero_blob", AtomicType("BLOB"))] + zero_reader = FormatBlobReader( + file_io=file_io, + file_path=str(zero_blob_file), + read_fields=["zero_blob"], + full_fields=zero_fields, + push_down_predicate=None + ) + + zero_batch = zero_reader.read_arrow_batch() + self.assertIsNotNone(zero_batch) + self.assertEqual(zero_batch.num_rows, 1) + + # Verify empty blob content + read_zero_blob = zero_batch.column(0)[0].as_py() + self.assertEqual(read_zero_blob, b"") + self.assertEqual(len(read_zero_blob), 0) + zero_reader.close() + + # ========== Test 4: Large blob (multi-GB range) simulation ========== + # Simulate large blob without actually creating multi-GB data + # Test chunked writing and memory-safe reading patterns + + # Create moderately large blob (10MB) to test chunking behavior + chunk_size = 1024 * 1024 # 1MB chunks + large_blob_content = b"LARGE_BLOB_CHUNK:" + b"L" * (chunk_size - 17) # Fill to 1MB + + # Simulate multiple chunks + simulated_large_data = [large_blob_content * 10] # 10MB total + large_sim_schema = pa.schema([pa.field("large_sim_blob", pa.large_binary())]) + large_sim_table = pa.table([simulated_large_data], schema=large_sim_schema) + + large_sim_file = Path(self.temp_dir) / "large_simulation.blob" + file_io.write_blob(large_sim_file, large_sim_table) + + # Verify large file was written + large_sim_size = file_io.get_file_size(large_sim_file) + self.assertGreater(large_sim_size, 10 * 1024 * 1024) # Should be > 10MB + + # Read large blob in memory-safe manner + large_sim_fields = [DataField(0, "large_sim_blob", AtomicType("BLOB"))] + large_sim_reader = FormatBlobReader( + file_io=file_io, + file_path=str(large_sim_file), + read_fields=["large_sim_blob"], + full_fields=large_sim_fields, + push_down_predicate=None + ) + + large_sim_batch = 
large_sim_reader.read_arrow_batch() + self.assertIsNotNone(large_sim_batch) + self.assertEqual(large_sim_batch.num_rows, 1) + + # Verify large blob content (check prefix to avoid loading all into memory for comparison) + read_large_blob = large_sim_batch.column(0)[0].as_py() + self.assertTrue(read_large_blob.startswith(b"LARGE_BLOB_CHUNK:")) + self.assertEqual(len(read_large_blob), len(large_blob_content) * 10) + large_sim_reader.close() + + # ========== Test 5: Index compression/decompression validation ========== + # Test DeltaVarintCompressor roundtrip + test_indices = [0, 100, 250, 1000, 5000, 10000, 50000] + + # Compress indices + compressed_indices = DeltaVarintCompressor.compress(test_indices) + self.assertIsInstance(compressed_indices, bytes) + self.assertGreater(len(compressed_indices), 0) + + # Decompress indices + decompressed_indices = DeltaVarintCompressor.decompress(compressed_indices) + self.assertEqual(decompressed_indices, test_indices) + + # Test corruption detection in compressed indices + if len(compressed_indices) > 1: + # Corrupt the compressed data + corrupted_indices = bytearray(compressed_indices) + corrupted_indices[-1] = (corrupted_indices[-1] + 1) % 256 # Flip last byte + + # Decompression should fail or produce different results + try: + corrupted_result = DeltaVarintCompressor.decompress(bytes(corrupted_indices)) + # If decompression succeeds, result should be different + self.assertNotEqual(corrupted_result, test_indices) + except Exception: + pass + + # ========== Test 6: Cross-format guard (multi-field tables) ========== + # Test that blob format rejects multi-field tables + multi_field_schema = pa.schema([ + pa.field("blob_field", pa.large_binary()), + pa.field("string_field", pa.string()), + pa.field("int_field", pa.int64()) + ]) + + multi_field_table = pa.table([ + [b"blob_data_1", b"blob_data_2"], + ["string_1", "string_2"], + [100, 200] + ], schema=multi_field_schema) + + multi_field_file = Path(self.temp_dir) / "multi_field.blob" + + # Should reject multi-field table + with self.assertRaises(RuntimeError) as context: + file_io.write_blob(multi_field_file, multi_field_table) + self.assertIn("single column", str(context.exception)) + + # Test that blob format rejects non-binary field types + non_binary_schema = pa.schema([pa.field("string_field", pa.string())]) + non_binary_table = pa.table([["not_binary_data"]], schema=non_binary_schema) + + non_binary_file = Path(self.temp_dir) / "non_binary.blob" + + # Should reject non-binary field + with self.assertRaises(RuntimeError) as context: + file_io.write_blob(non_binary_file, non_binary_table) + # Should fail due to type conversion issues (non-binary field can't be converted to BLOB) + self.assertTrue( + "large_binary" in str(context.exception) or + "to_paimon_type" in str(context.exception) or + "missing" in str(context.exception) or + "Field must be Blob/BlobData instance" in str(context.exception) + ) + + # Test that blob format rejects tables with null values + null_schema = pa.schema([pa.field("blob_with_null", pa.large_binary())]) + null_table = pa.table([[b"data", None, b"more_data"]], schema=null_schema) + + null_file = Path(self.temp_dir) / "with_nulls.blob" + + # Should reject null values + with self.assertRaises(RuntimeError) as context: + file_io.write_blob(null_file, null_table) + self.assertIn("null values", str(context.exception)) + + +if __name__ == '__main__': + unittest.main() diff --git a/paimon-python/pypaimon/tests/delta_varint_compressor_test.py 
b/paimon-python/pypaimon/tests/delta_varint_compressor_test.py new file mode 100644 index 0000000000..5a04becde8 --- /dev/null +++ b/paimon-python/pypaimon/tests/delta_varint_compressor_test.py @@ -0,0 +1,379 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import random +import sys +import unittest + +from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor + + +class DeltaVarintCompressorTest(unittest.TestCase): + """Tests for DeltaVarintCompressor following org.apache.paimon.utils.DeltaVarintCompressorTest.""" + + def test_normal_case_1(self): + """Test case for normal compression and decompression.""" + original = [80, 50, 90, 80, 70] + compressed = DeltaVarintCompressor.compress(original) + decompressed = DeltaVarintCompressor.decompress(compressed) + + self.assertEqual(original, decompressed) # Verify data integrity + self.assertEqual(6, len(compressed)) # Optimized size for small deltas + + def test_normal_case_2(self): + """Test case for normal compression and decompression.""" + original = [100, 50, 150, 100, 200] + compressed = DeltaVarintCompressor.compress(original) + decompressed = DeltaVarintCompressor.decompress(compressed) + + self.assertEqual(original, decompressed) # Verify data integrity + self.assertEqual(8, len(compressed)) # Optimized size for small deltas + + def test_random(self): + """Test with random data to ensure robustness.""" + # Run multiple iterations to test various random cases + for _ in range(100): # Reduced from 10000 for reasonable test time + original = [] + for i in range(100): + # Use a smaller range than Java's Long.MAX_VALUE for Python compatibility + original.append(random.randint(-sys.maxsize, sys.maxsize)) + compressed = DeltaVarintCompressor.compress(original) + decompressed = DeltaVarintCompressor.decompress(compressed) + + self.assertEqual(original, decompressed) # Verify data integrity + + def test_empty_array(self): + """Test case for empty array.""" + original = [] + compressed = DeltaVarintCompressor.compress(original) + decompressed = DeltaVarintCompressor.decompress(compressed) + + self.assertEqual(original, decompressed) + self.assertEqual(0, len(compressed)) + + def test_single_element(self): + """Test case for single-element array.""" + original = [42] + compressed = DeltaVarintCompressor.compress(original) + decompressed = DeltaVarintCompressor.decompress(compressed) + + self.assertEqual(original, decompressed) + # Calculate expected size: Varint encoding for 42 (0x2A -> 1 byte) + self.assertEqual(1, len(compressed)) + + def test_extreme_values(self): + """Test case for extreme values (sys.maxsize and -sys.maxsize).""" + original = [-sys.maxsize, sys.maxsize] + compressed = DeltaVarintCompressor.compress(original) + decompressed = DeltaVarintCompressor.decompress(compressed) + + 
self.assertEqual(original, decompressed) + # The compressed size will depend on the platform's sys.maxsize + # but should be reasonable for the delta encoding + self.assertGreater(len(compressed), 0) + + def test_negative_deltas(self): + """Test case for negative deltas with ZigZag optimization.""" + original = [100, -50, -150, -100] # Negative values + compressed = DeltaVarintCompressor.compress(original) + decompressed = DeltaVarintCompressor.decompress(compressed) + + self.assertEqual(original, decompressed) + # Verify ZigZag optimization: -1 → 1 (1 byte) + # Delta sequence: [100, -150, -100, 50] → ZigZag → + # Each encoded in 1-2 bytes + self.assertLessEqual(len(compressed), 8) # Optimized size + + def test_unsorted_data(self): + """Test case for unsorted data (worse compression ratio).""" + original = [1000, 5, 9999, 12345, 6789] + compressed = DeltaVarintCompressor.compress(original) + decompressed = DeltaVarintCompressor.decompress(compressed) + + self.assertEqual(original, decompressed) + # Larger deltas → more bytes (e.g., 9994 → 3 bytes) + self.assertGreater(len(compressed), 5) # Worse than sorted case + + def test_corrupted_input(self): + """Test case for corrupted input (invalid Varint).""" + # Invalid Varint (all continuation flags) + corrupted = bytes([0x80, 0x80, 0x80]) + try: + result = DeltaVarintCompressor.decompress(corrupted) + # If it doesn't raise an exception, the result should be reasonable + self.assertIsInstance(result, list) + except (IndexError, ValueError, RuntimeError): + # It's acceptable to raise an exception for corrupted data + pass + + def test_zero_values(self): + """Test case for arrays with zero values.""" + original = [0, 0, 0, 0, 0] + compressed = DeltaVarintCompressor.compress(original) + decompressed = DeltaVarintCompressor.decompress(compressed) + + self.assertEqual(original, decompressed) + # All deltas are 0, so should compress very well + self.assertLessEqual(len(compressed), 5) + + def test_ascending_sequence(self): + """Test case for ascending sequence (optimal for delta compression).""" + original = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + compressed = DeltaVarintCompressor.compress(original) + decompressed = DeltaVarintCompressor.decompress(compressed) + + self.assertEqual(original, decompressed) + # All deltas are 1, so should compress very well + self.assertLessEqual(len(compressed), 15) # Much smaller than original + + def test_descending_sequence(self): + """Test case for descending sequence.""" + original = [10, 9, 8, 7, 6, 5, 4, 3, 2, 1] + compressed = DeltaVarintCompressor.compress(original) + decompressed = DeltaVarintCompressor.decompress(compressed) + + self.assertEqual(original, decompressed) + # All deltas are -1, should still compress well with ZigZag + self.assertLessEqual(len(compressed), 15) + + def test_large_positive_values(self): + """Test case for large positive values.""" + original = [1000000, 2000000, 3000000, 4000000] + compressed = DeltaVarintCompressor.compress(original) + decompressed = DeltaVarintCompressor.decompress(compressed) + + self.assertEqual(original, decompressed) + # Large values but consistent deltas should still compress reasonably + self.assertGreater(len(compressed), 4) # Will be larger due to big numbers + + def test_mixed_positive_negative(self): + """Test case for mixed positive and negative values.""" + original = [100, -200, 300, -400, 500] + compressed = DeltaVarintCompressor.compress(original) + decompressed = DeltaVarintCompressor.decompress(compressed) + + self.assertEqual(original, decompressed) 
+ # Mixed signs create larger deltas + self.assertGreater(len(compressed), 5) + + def test_compression_efficiency(self): + """Test that compression actually reduces size for suitable data.""" + # Create a sequence with small deltas + original = [] + base = 1000 + for i in range(100): + base += random.randint(-10, 10) # Small deltas + original.append(base) + + compressed = DeltaVarintCompressor.compress(original) + decompressed = DeltaVarintCompressor.decompress(compressed) + + self.assertEqual(original, decompressed) + # For small deltas, compression should be effective + # Original would need 8 bytes per int (800 bytes), compressed should be much smaller + self.assertLess(len(compressed), len(original) * 4) # At least 50% compression + + def test_round_trip_consistency(self): + """Test that multiple compress/decompress cycles are consistent.""" + original = [1, 10, 100, 1000, 10000] + # First round trip + compressed1 = DeltaVarintCompressor.compress(original) + decompressed1 = DeltaVarintCompressor.decompress(compressed1) + # Second round trip + compressed2 = DeltaVarintCompressor.compress(decompressed1) + decompressed2 = DeltaVarintCompressor.decompress(compressed2) + # All should be identical + self.assertEqual(original, decompressed1) + self.assertEqual(original, decompressed2) + self.assertEqual(compressed1, compressed2) + + def test_boundary_values(self): + """Test boundary values for varint encoding.""" + # Test values around varint boundaries (127, 16383, etc.) + boundary_values = [ + 0, 1, 127, 128, 255, 256, + 16383, 16384, 32767, 32768, + -1, -127, -128, -255, -256, + -16383, -16384, -32767, -32768 + ] + compressed = DeltaVarintCompressor.compress(boundary_values) + decompressed = DeltaVarintCompressor.decompress(compressed) + self.assertEqual(boundary_values, decompressed) + + def test_java_compatibility_zigzag_encoding(self): + """Test ZigZag encoding compatibility with Java implementation.""" + # Test cases that verify ZigZag encoding matches Java's implementation + # ZigZag mapping: 0->0, -1->1, 1->2, -2->3, 2->4, -3->5, 3->6, etc. 
+ zigzag_test_cases = [ + (0, 0), # 0 -> 0 + (-1, 1), # -1 -> 1 + (1, 2), # 1 -> 2 + (-2, 3), # -2 -> 3 + (2, 4), # 2 -> 4 + (-3, 5), # -3 -> 5 + (3, 6), # 3 -> 6 + (-64, 127), # -64 -> 127 + (64, 128), # 64 -> 128 + (-65, 129), # -65 -> 129 + ] + + for original_value, expected_zigzag in zigzag_test_cases: + # Test single value compression to verify ZigZag encoding + compressed = DeltaVarintCompressor.compress([original_value]) + decompressed = DeltaVarintCompressor.decompress(compressed) + + self.assertEqual([original_value], decompressed, + f"ZigZag encoding failed for value {original_value}") + + def test_java_compatibility_known_vectors(self): + """Test with known test vectors that should match Java implementation.""" + # Test vectors with expected compressed output (hexadecimal) + test_vectors = [ + # Simple cases + ([0], "00"), # 0 -> ZigZag(0) = 0 -> Varint(0) = 0x00 + ([1], "02"), # 1 -> ZigZag(1) = 2 -> Varint(2) = 0x02 + ([-1], "01"), # -1 -> ZigZag(-1) = 1 -> Varint(1) = 0x01 + ([2], "04"), # 2 -> ZigZag(2) = 4 -> Varint(4) = 0x04 + ([-2], "03"), # -2 -> ZigZag(-2) = 3 -> Varint(3) = 0x03 + + # Delta encoding cases + ([0, 1], "0002"), # [0, 1] -> [0, delta=1] -> [0x00, 0x02] + ([1, 2], "0202"), # [1, 2] -> [1, delta=1] -> [0x02, 0x02] + ([0, -1], "0001"), # [0, -1] -> [0, delta=-1] -> [0x00, 0x01] + ([1, 0], "0201"), # [1, 0] -> [1, delta=-1] -> [0x02, 0x01] + + # Larger values + ([127], "fe01"), # 127 -> ZigZag(127) = 254 -> Varint(254) = 0xfe01 + ([-127], "fd01"), # -127 -> ZigZag(-127) = 253 -> Varint(253) = 0xfd01 + ([128], "8002"), # 128 -> ZigZag(128) = 256 -> Varint(256) = 0x8002 + ([-128], "ff01"), # -128 -> ZigZag(-128) = 255 -> Varint(255) = 0xff01 + ] + + for original, expected_hex in test_vectors: + compressed = DeltaVarintCompressor.compress(original) + actual_hex = compressed.hex() + + self.assertEqual(expected_hex, actual_hex, + f"Binary compatibility failed for {original}. 
" + f"Expected: {expected_hex}, Got: {actual_hex}") + + # Also verify round-trip + decompressed = DeltaVarintCompressor.decompress(compressed) + self.assertEqual(original, decompressed, + f"Round-trip failed for {original}") + + def test_java_compatibility_large_numbers(self): + """Test compatibility with Java for large numbers (64-bit range).""" + # Test cases covering the full 64-bit signed integer range + large_number_cases = [ + 2147483647, # Integer.MAX_VALUE + -2147483648, # Integer.MIN_VALUE + 9223372036854775807, # Long.MAX_VALUE + -9223372036854775808 + 1, # Long.MIN_VALUE + 1 (avoid overflow in Python) + 4294967295, # 2^32 - 1 + -4294967296, # -2^32 + ] + + for value in large_number_cases: + # Test individual values + compressed = DeltaVarintCompressor.compress([value]) + decompressed = DeltaVarintCompressor.decompress(compressed) + self.assertEqual([value], decompressed, + f"Large number compatibility failed for {value}") + + # Test as a sequence to verify delta encoding with large numbers + compressed_seq = DeltaVarintCompressor.compress(large_number_cases) + decompressed_seq = DeltaVarintCompressor.decompress(compressed_seq) + self.assertEqual(large_number_cases, decompressed_seq, + "Large number sequence compatibility failed") + + def test_java_compatibility_varint_boundaries(self): + """Test Varint encoding boundaries that match Java implementation.""" + # Test values at Varint encoding boundaries + varint_boundary_cases = [ + # 1-byte Varint boundary + 63, # ZigZag(63) = 126, fits in 1 byte + 64, # ZigZag(64) = 128, needs 2 bytes + -64, # ZigZag(-64) = 127, fits in 1 byte + -65, # ZigZag(-65) = 129, needs 2 bytes + + # 2-byte Varint boundary + 8191, # ZigZag(8191) = 16382, fits in 2 bytes + 8192, # ZigZag(8192) = 16384, needs 3 bytes + -8192, # ZigZag(-8192) = 16383, fits in 2 bytes + -8193, # ZigZag(-8193) = 16385, needs 3 bytes + + # 3-byte Varint boundary + 1048575, # ZigZag(1048575) = 2097150, fits in 3 bytes + 1048576, # ZigZag(1048576) = 2097152, needs 4 bytes + ] + + for value in varint_boundary_cases: + compressed = DeltaVarintCompressor.compress([value]) + decompressed = DeltaVarintCompressor.decompress(compressed) + self.assertEqual([value], decompressed, + f"Varint boundary compatibility failed for {value}") + + def test_java_compatibility_delta_edge_cases(self): + """Test delta encoding edge cases for Java compatibility.""" + # Edge cases that test delta encoding behavior + delta_edge_cases = [ + # Maximum positive delta + [0, sys.maxsize], + # Maximum negative delta + [sys.maxsize, 0], + # Alternating large deltas + [0, 1000000, -1000000, 2000000, -2000000], + # Sequence with zero deltas + [42, 42, 42, 42], + # Mixed small and large deltas + [0, 1, 1000000, 1000001, 0], + ] + + for case in delta_edge_cases: + compressed = DeltaVarintCompressor.compress(case) + decompressed = DeltaVarintCompressor.decompress(compressed) + self.assertEqual(case, decompressed, + f"Delta edge case compatibility failed for {case}") + + def test_java_compatibility_error_conditions(self): + """Test error conditions that should match Java behavior.""" + # Test cases for error handling - our implementation gracefully handles + # truncated data by returning empty lists, which is acceptable behavior + + # Test with various truncated/invalid byte sequences + invalid_cases = [ + bytes([0x80]), # Single incomplete byte + bytes([0x80, 0x80]), # Incomplete 3-byte varint + bytes([0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x80]), # Long sequence + ] + + for invalid_data in 
invalid_cases: + # Our implementation handles invalid data gracefully by returning empty list + # This is acceptable behavior for robustness + result = DeltaVarintCompressor.decompress(invalid_data) + self.assertIsInstance(result, list, + f"Should return a list for invalid data: {invalid_data.hex()}") + # Empty result is acceptable for invalid/truncated data + + # Test that valid empty input returns empty list + empty_result = DeltaVarintCompressor.decompress(b'') + self.assertEqual([], empty_result, "Empty input should return empty list") + + +if __name__ == '__main__': + unittest.main() diff --git a/paimon-python/pypaimon/write/blob_format_writer.py b/paimon-python/pypaimon/write/blob_format_writer.py new file mode 100644 index 0000000000..07b29bbc4e --- /dev/null +++ b/paimon-python/pypaimon/write/blob_format_writer.py @@ -0,0 +1,107 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import struct +import zlib +from typing import BinaryIO, List + +from pypaimon.table.row.blob import Blob, BlobData +from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor + + +class BlobFormatWriter: + VERSION = 1 + MAGIC_NUMBER = 1481511375 + BUFFER_SIZE = 4096 + METADATA_SIZE = 12 # 8-byte length + 4-byte CRC + + def __init__(self, output_stream: BinaryIO): + self.output_stream = output_stream + self.lengths: List[int] = [] + self.position = 0 + + def add_element(self, row) -> None: + if not hasattr(row, 'values') or len(row.values) != 1: + raise ValueError("BlobFormatWriter only supports one field") + + blob_value = row.values[0] + if blob_value is None: + raise ValueError("BlobFormatWriter only supports non-null blob") + + if not isinstance(blob_value, Blob): + raise ValueError("Field must be Blob/BlobData instance") + + previous_pos = self.position + crc32 = 0 # Initialize CRC32 + + # Write magic number + magic_bytes = struct.pack('<I', self.MAGIC_NUMBER) # Little endian + crc32 = self._write_with_crc(magic_bytes, crc32) + + # Write blob data + if isinstance(blob_value, BlobData): + data = blob_value.to_data() + crc32 = self._write_with_crc(data, crc32) + else: + # Stream from BlobRef/Blob + stream = blob_value.new_input_stream() + try: + chunk = stream.read(self.BUFFER_SIZE) + while chunk: + crc32 = self._write_with_crc(chunk, crc32) + chunk = stream.read(self.BUFFER_SIZE) + finally: + stream.close() + + # Calculate total length including magic + data + metadata (length + CRC) + bin_length = self.position - previous_pos + self.METADATA_SIZE + self.lengths.append(bin_length) + + # Write length (8 bytes, little endian) + length_bytes = struct.pack('<Q', bin_length) + 
self.output_stream.write(length_bytes) + self.position += 8 + + # Write CRC32 (4 bytes, little endian) + crc_bytes = struct.pack('<I', crc32 & 0xffffffff) + self.output_stream.write(crc_bytes) + self.position += 4 + + def _write_with_crc(self, data: bytes, crc32: int) -> int: + crc32 = zlib.crc32(data, crc32) + self.output_stream.write(data) + self.position += len(data) + return crc32 + + def reach_target_size(self, suggested_check: bool, target_size: int) -> bool: + return self.position >= target_size + + def close(self) -> None: + index_bytes = DeltaVarintCompressor.compress(self.lengths) + self.output_stream.write(index_bytes) + + # Write header (index length + version) + header = struct.pack('<I', len(index_bytes)) # Index length (4 bytes, little endian) + header += struct.pack('<B', self.VERSION) # Version (1 byte) + self.output_stream.write(header) + + # Flush and close + if hasattr(self.output_stream, 'flush'): + self.output_stream.flush() + if hasattr(self.output_stream, 'close'): + self.output_stream.close() diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index e2f778b586..cc0fc944a1 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -111,6 +111,8 @@ class DataWriter(ABC): self.file_io.write_orc(file_path, data, compression=self.compression) elif self.file_format == CoreOptions.FILE_FORMAT_AVRO: self.file_io.write_avro(file_path, data) + elif self.file_format == CoreOptions.FILE_FORMAT_BLOB: + self.file_io.write_blob(file_path, data) else: raise ValueError(f"Unsupported file format: {self.file_format}")
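
Note on the on-disk layout (not part of the patch itself): BlobFormatWriter above writes, per record, a 4-byte little-endian magic number, the raw blob bytes, then an 8-byte record length and a 4-byte CRC32 computed over magic + data; on close it appends the delta-varint-compressed list of record lengths followed by a 4-byte index length and a 1-byte version. The sketch below is a minimal, illustrative decoder under those assumptions only — read_blob_file is a hypothetical helper, and the actual read path in this commit is FormatBlobReader in pypaimon/read/reader/format_blob_reader.py.

    import struct
    import zlib
    from typing import List

    from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor

    MAGIC_NUMBER = 1481511375
    VERSION = 1

    def read_blob_file(path: str) -> List[bytes]:
        # Hypothetical helper: decodes a file written by BlobFormatWriter.
        with open(path, 'rb') as f:
            content = f.read()

        # Footer: 4-byte little-endian index length + 1-byte version.
        index_length, version = struct.unpack('<IB', content[-5:])
        if version != VERSION:
            raise IOError(f"Unsupported blob file version: {version}")

        # The index is the delta-varint-compressed list of per-record lengths.
        index_bytes = content[-5 - index_length:-5]
        lengths = DeltaVarintCompressor.decompress(index_bytes)

        blobs = []
        offset = 0
        for length in lengths:
            # Record layout: magic(4) | data | length(8) | crc32(4), little-endian.
            record = content[offset:offset + length]
            magic = struct.unpack('<I', record[:4])[0]
            if magic != MAGIC_NUMBER:
                raise IOError("Corrupted blob record: bad magic number")
            data = record[4:length - 12]
            stored_len, crc = struct.unpack('<QI', record[length - 12:])
            if stored_len != length:
                raise IOError("Corrupted blob record: length mismatch")
            if zlib.crc32(record[:length - 12]) & 0xffffffff != crc:
                raise IOError("Corrupted blob record: CRC mismatch")
            blobs.append(data)
            offset += length
        return blobs

Keeping the length index and the 5-byte footer at the tail means a reader can locate every record after inspecting only the end of the file, which is also what the corrupted-header and truncated-file tests above exercise.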
