This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 10a71b1d0722412346a23623dcd6223a6a63b021 Author: umi <[email protected]> AuthorDate: Tue Oct 21 20:29:42 2025 +0800 [Python] SimpleStats supports BinaryRow (#6444) --- paimon-python/pypaimon/common/predicate.py | 49 ++- .../pypaimon/manifest/manifest_file_manager.py | 12 +- .../pypaimon/manifest/schema/simple_stats.py | 5 +- .../pypaimon/manifest/simple_stats_evolution.py | 124 ++++++++ .../pypaimon/manifest/simple_stats_evolutions.py | 76 +++++ .../pypaimon/read/scanner/full_starting_scanner.py | 43 ++- paimon-python/pypaimon/schema/schema_manager.py | 4 +- paimon-python/pypaimon/table/row/binary_row.py | 61 ++++ paimon-python/pypaimon/table/row/generic_row.py | 30 +- paimon-python/pypaimon/table/row/projected_row.py | 84 ++++++ paimon-python/pypaimon/tests/binary_row_test.py | 334 +++++++++++++++++++++ paimon-python/pypaimon/tests/predicates_test.py | 19 +- .../pypaimon/tests/py36/rest_ao_read_write_test.py | 26 +- paimon-python/pypaimon/tests/reader_base_test.py | 28 +- 14 files changed, 805 insertions(+), 90 deletions(-) diff --git a/paimon-python/pypaimon/common/predicate.py b/paimon-python/pypaimon/common/predicate.py index 6a760e473f..5e47fdd5df 100644 --- a/paimon-python/pypaimon/common/predicate.py +++ b/paimon-python/pypaimon/common/predicate.py @@ -27,6 +27,7 @@ from pyarrow import compute as pyarrow_compute from pyarrow import dataset as pyarrow_dataset from pypaimon.manifest.schema.simple_stats import SimpleStats +from pypaimon.table.row.generic_row import GenericRow from pypaimon.table.row.internal_row import InternalRow @@ -67,32 +68,31 @@ class Predicate: raise ValueError(f"Unsupported predicate method: {self.method}") def test_by_simple_stats(self, stat: SimpleStats, row_count: int) -> bool: - return self.test_by_stats({ - "min_values": stat.min_values.to_dict(), - "max_values": stat.max_values.to_dict(), - "null_counts": { - stat.min_values.fields[i].name: stat.null_counts[i] for i in range(len(stat.min_values.fields)) - }, - "row_count": row_count, - }) - - def test_by_stats(self, stat: Dict) -> bool: + """Test predicate against BinaryRow stats with denseIndexMapping like Java implementation.""" if self.method == 'and': - return all(p.test_by_stats(stat) for p in self.literals) + return all(p.test_by_simple_stats(stat, row_count) for p in self.literals) if self.method == 'or': - t = any(p.test_by_stats(stat) for p in self.literals) - return t + return any(p.test_by_simple_stats(stat, row_count) for p in self.literals) - null_count = stat["null_counts"][self.field] - row_count = stat["row_count"] + # Get null count using the mapped index + null_count = stat.null_counts[self.index] if stat.null_counts and self.index < len( + stat.null_counts) else 0 if self.method == 'isNull': return null_count is not None and null_count > 0 if self.method == 'isNotNull': return null_count is None or row_count is None or null_count < row_count - min_value = stat["min_values"][self.field] - max_value = stat["max_values"][self.field] + if not isinstance(stat.min_values, GenericRow): + # Parse field values using BinaryRow's direct field access by name + min_value = stat.min_values.get_field(self.index) + max_value = stat.max_values.get_field(self.index) + else: + # TODO transform partition to BinaryRow + min_values = stat.min_values.to_dict() + max_values = stat.max_values.to_dict() + min_value = min_values[self.field] + max_value = max_values[self.field] if min_value is None or max_value is None or (null_count is not None and null_count == row_count): # invalid stats, skip validation 
@@ -164,7 +164,6 @@ class RegisterMeta(ABCMeta): class Tester(ABC, metaclass=RegisterMeta): - name = None @abstractmethod @@ -187,7 +186,6 @@ class Tester(ABC, metaclass=RegisterMeta): class Equal(Tester): - name = 'equal' def test_by_value(self, val, literals) -> bool: @@ -201,7 +199,6 @@ class Equal(Tester): class NotEqual(Tester): - name = "notEqual" def test_by_value(self, val, literals) -> bool: @@ -215,7 +212,6 @@ class NotEqual(Tester): class LessThan(Tester): - name = "lessThan" def test_by_value(self, val, literals) -> bool: @@ -229,7 +225,6 @@ class LessThan(Tester): class LessOrEqual(Tester): - name = "lessOrEqual" def test_by_value(self, val, literals) -> bool: @@ -243,7 +238,6 @@ class LessOrEqual(Tester): class GreaterThan(Tester): - name = "greaterThan" def test_by_value(self, val, literals) -> bool: @@ -257,7 +251,6 @@ class GreaterThan(Tester): class GreaterOrEqual(Tester): - name = "greaterOrEqual" def test_by_value(self, val, literals) -> bool: @@ -271,7 +264,6 @@ class GreaterOrEqual(Tester): class In(Tester): - name = "in" def test_by_value(self, val, literals) -> bool: @@ -285,7 +277,6 @@ class In(Tester): class NotIn(Tester): - name = "notIn" def test_by_value(self, val, literals) -> bool: @@ -299,7 +290,6 @@ class NotIn(Tester): class Between(Tester): - name = "between" def test_by_value(self, val, literals) -> bool: @@ -313,7 +303,6 @@ class Between(Tester): class StartsWith(Tester): - name = "startsWith" def test_by_value(self, val, literals) -> bool: @@ -329,7 +318,6 @@ class StartsWith(Tester): class EndsWith(Tester): - name = "endsWith" def test_by_value(self, val, literals) -> bool: @@ -343,7 +331,6 @@ class EndsWith(Tester): class Contains(Tester): - name = "contains" def test_by_value(self, val, literals) -> bool: @@ -357,7 +344,6 @@ class Contains(Tester): class IsNull(Tester): - name = "isNull" def test_by_value(self, val, literals) -> bool: @@ -371,7 +357,6 @@ class IsNull(Tester): class IsNotNull(Tester): - name = "isNotNull" def test_by_value(self, val, literals) -> bool: diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py index bb9251df7e..c196845ff4 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py @@ -26,6 +26,7 @@ from pypaimon.manifest.schema.manifest_entry import (MANIFEST_ENTRY_SCHEMA, from pypaimon.manifest.schema.simple_stats import SimpleStats from pypaimon.table.row.generic_row import (GenericRowDeserializer, GenericRowSerializer) +from pypaimon.table.row.binary_row import BinaryRow class ManifestFileManager: @@ -54,12 +55,11 @@ class ManifestFileManager: file_dict = dict(record['_FILE']) key_dict = dict(file_dict['_KEY_STATS']) key_stats = SimpleStats( - min_values=GenericRowDeserializer.from_bytes(key_dict['_MIN_VALUES'], - self.trimmed_primary_key_fields), - max_values=GenericRowDeserializer.from_bytes(key_dict['_MAX_VALUES'], - self.trimmed_primary_key_fields), + min_values=BinaryRow(key_dict['_MIN_VALUES'], self.trimmed_primary_key_fields), + max_values=BinaryRow(key_dict['_MAX_VALUES'], self.trimmed_primary_key_fields), null_counts=key_dict['_NULL_COUNTS'], ) + value_dict = dict(file_dict['_VALUE_STATS']) if file_dict['_VALUE_STATS_COLS'] is None: if file_dict['_WRITE_COLS'] is None: @@ -72,8 +72,8 @@ class ManifestFileManager: else: fields = [self.table.field_dict[col] for col in file_dict['_VALUE_STATS_COLS']] value_stats = SimpleStats( - 
min_values=GenericRowDeserializer.from_bytes(value_dict['_MIN_VALUES'], fields), - max_values=GenericRowDeserializer.from_bytes(value_dict['_MAX_VALUES'], fields), + min_values=BinaryRow(value_dict['_MIN_VALUES'], fields), + max_values=BinaryRow(value_dict['_MAX_VALUES'], fields), null_counts=value_dict['_NULL_COUNTS'], ) file_meta = DataFileMeta( diff --git a/paimon-python/pypaimon/manifest/schema/simple_stats.py b/paimon-python/pypaimon/manifest/schema/simple_stats.py index 19816fdd0f..1130a812fa 100644 --- a/paimon-python/pypaimon/manifest/schema/simple_stats.py +++ b/paimon-python/pypaimon/manifest/schema/simple_stats.py @@ -21,12 +21,13 @@ from typing import List, Optional from typing import ClassVar from pypaimon.table.row.generic_row import GenericRow +from pypaimon.table.row.internal_row import InternalRow @dataclass class SimpleStats: - min_values: GenericRow - max_values: GenericRow + min_values: InternalRow + max_values: InternalRow null_counts: Optional[List[int]] _empty_stats: ClassVar[object] = None diff --git a/paimon-python/pypaimon/manifest/simple_stats_evolution.py b/paimon-python/pypaimon/manifest/simple_stats_evolution.py new file mode 100644 index 0000000000..56cea98a85 --- /dev/null +++ b/paimon-python/pypaimon/manifest/simple_stats_evolution.py @@ -0,0 +1,124 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +from typing import List, Optional, Dict, Any +import threading + +from pypaimon.schema.data_types import DataField +from pypaimon.manifest.schema.simple_stats import SimpleStats +from pypaimon.table.row.generic_row import GenericRow +from pypaimon.table.row.projected_row import ProjectedRow + + +class SimpleStatsEvolution: + """Converter for array of SimpleColStats.""" + + def __init__(self, data_fields: List[DataField], index_mapping: Optional[List[int]], + cast_field_getters: Optional[List[Any]]): + self.field_names = [field.name for field in data_fields] + self.index_mapping = index_mapping + self.cast_field_getters = cast_field_getters + self.index_mappings: Dict[tuple, List[int]] = {} + self._lock = threading.Lock() + + # Create empty values for optimization + self.empty_values = GenericRow([None] * len(self.field_names), data_fields) + self.empty_null_counts = [0] * len(self.field_names) + + def evolution(self, stats: SimpleStats, row_count: Optional[int], + stats_fields: Optional[List[str]]) -> 'SimpleStats': + min_values = stats.min_values + max_values = stats.max_values + null_counts = stats.null_counts + + if stats_fields is not None and not stats_fields: + # Optimize for empty dense fields + min_values = self.empty_values + max_values = self.empty_values + null_counts = self.empty_null_counts + elif stats_fields is not None: + # Apply dense field mapping + dense_index_mapping = self._get_dense_index_mapping(stats_fields) + min_values = self._project_row(min_values, dense_index_mapping) + max_values = self._project_row(max_values, dense_index_mapping) + null_counts = self._project_array(null_counts, dense_index_mapping) + + if self.index_mapping is not None: + # TODO support schema evolution + min_values = self._project_row(min_values, self.index_mapping) + max_values = self._project_row(max_values, self.index_mapping) + + if row_count is None: + raise RuntimeError("Schema Evolution for stats needs row count.") + + null_counts = self._evolve_null_counts(null_counts, self.index_mapping, row_count) + + return SimpleStats(min_values, max_values, null_counts) + + def _get_dense_index_mapping(self, dense_fields: List[str]) -> List[int]: + """ + Get dense index mapping similar to Java: + fieldNames.stream().mapToInt(denseFields::indexOf).toArray() + """ + dense_fields_tuple = tuple(dense_fields) + + if dense_fields_tuple not in self.index_mappings: + with self._lock: + # Double-check locking + if dense_fields_tuple not in self.index_mappings: + mapping = [] + for field_name in self.field_names: + try: + index = dense_fields.index(field_name) + mapping.append(index) + except ValueError: + mapping.append(-1) # Field not found + self.index_mappings[dense_fields_tuple] = mapping + + return self.index_mappings[dense_fields_tuple] + + def _project_row(self, row: Any, index_mapping: List[int]) -> Any: + """Project row based on index mapping using ProjectedRow.""" + projected_row = ProjectedRow.from_index_mapping(index_mapping) + return projected_row.replace_row(row) + + def _project_array(self, array: List[Any], index_mapping: List[int]) -> List[Any]: + """Project array based on index mapping.""" + if not array: + return [0] * len(index_mapping) + + projected = [] + for mapped_index in index_mapping: + if mapped_index >= 0 and mapped_index < len(array): + projected.append(array[mapped_index]) + else: + projected.append(0) # Default value for missing fields + + return projected + + def 
_evolve_null_counts(self, null_counts: List[Any], index_mapping: List[int], + not_found_value: int) -> List[Any]: + """Evolve null counts with schema evolution mapping.""" + evolved = [] + for mapped_index in index_mapping: + if mapped_index >= 0 and mapped_index < len(null_counts): + evolved.append(null_counts[mapped_index]) + else: + evolved.append(not_found_value) # Use row count for missing fields + + return evolved diff --git a/paimon-python/pypaimon/manifest/simple_stats_evolutions.py b/paimon-python/pypaimon/manifest/simple_stats_evolutions.py new file mode 100644 index 0000000000..8331b7a5e5 --- /dev/null +++ b/paimon-python/pypaimon/manifest/simple_stats_evolutions.py @@ -0,0 +1,76 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +from typing import Callable, Dict, List, Optional + +from pypaimon.manifest.simple_stats_evolution import SimpleStatsEvolution +from pypaimon.schema.data_types import DataField + + +class SimpleStatsEvolutions: + """Converters to create col stats array serializer.""" + + def __init__(self, schema_fields: Callable[[int], List[DataField]], table_schema_id: int): + self.schema_fields = schema_fields + self.table_schema_id = table_schema_id + self.table_data_fields = schema_fields(table_schema_id) + self.table_fields = None + self.evolutions: Dict[int, SimpleStatsEvolution] = {} + + def get_or_create(self, data_schema_id: int) -> SimpleStatsEvolution: + """Get or create SimpleStatsEvolution for given schema id.""" + if data_schema_id in self.evolutions: + return self.evolutions[data_schema_id] + + if self.table_schema_id == data_schema_id: + evolution = SimpleStatsEvolution(self.schema_fields(data_schema_id), None, None) + else: + # TODO support schema evolution + if self.table_fields is None: + self.table_fields = self.table_data_fields + + data_fields = self.schema_fields(data_schema_id) + index_cast_mapping = self._create_index_cast_mapping(self.table_fields, data_fields) + index_mapping = index_cast_mapping.get('index_mapping') + cast_mapping = index_cast_mapping.get('cast_mapping') + + evolution = SimpleStatsEvolution(data_fields, index_mapping, cast_mapping) + + self.evolutions[data_schema_id] = evolution + return evolution + + def _create_index_cast_mapping(self, table_fields: List[DataField], + data_fields: List[DataField]) -> Dict[str, Optional[List[int]]]: + """ + Create index and cast mapping between table fields and data fields. + This is a simplified implementation. 
+ """ + # Create a mapping from field names to indices in data_fields + data_field_map = {field.name: i for i, field in enumerate(data_fields)} + + index_mapping = [] + for table_field in table_fields: + if table_field.name in data_field_map: + index_mapping.append(data_field_map[table_field.name]) + else: + index_mapping.append(-1) # Field not found in data schema + + return { + 'index_mapping': index_mapping, + 'cast_mapping': None # Simplified - no casting for now + } diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py index f1ade2c4e8..47a2d86d15 100644 --- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py +++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py @@ -32,6 +32,7 @@ from pypaimon.read.scanner.starting_scanner import StartingScanner from pypaimon.read.split import Split from pypaimon.snapshot.snapshot_manager import SnapshotManager from pypaimon.table.bucket_mode import BucketMode +from pypaimon.manifest.simple_stats_evolutions import SimpleStatsEvolutions class FullStartingScanner(StartingScanner): @@ -62,6 +63,19 @@ class FullStartingScanner(StartingScanner): self.table.options.get('bucket', -1)) == BucketMode.POSTPONE_BUCKET.value else False self.data_evolution = self.table.options.get(CoreOptions.DATA_EVOLUTION_ENABLED, 'false').lower() == 'true' + self._schema_cache = {} + + def schema_fields_func(schema_id: int): + if schema_id not in self._schema_cache: + schema = self.table.schema_manager.read_schema(schema_id) + self._schema_cache[schema_id] = schema + return self._schema_cache[schema_id].fields if self._schema_cache[schema_id] else [] + + self.simple_stats_evolutions = SimpleStatsEvolutions( + schema_fields_func, + self.table.table_schema.id + ) + def scan(self) -> Plan: file_entries = self.plan_files() if not file_entries: @@ -215,22 +229,35 @@ class FullStartingScanner(StartingScanner): return False if self.partition_key_predicate and not self.partition_key_predicate.test(entry.partition): return False + + # Get SimpleStatsEvolution for this schema + evolution = self.simple_stats_evolutions.get_or_create(entry.file.schema_id) + + # Apply evolution to stats if self.table.is_primary_key_table: predicate = self.primary_key_predicate stats = entry.file.key_stats + stats_fields = None else: predicate = self.predicate stats = entry.file.value_stats + if entry.file.value_stats_cols is None and entry.file.write_cols is not None: + stats_fields = entry.file.write_cols + else: + stats_fields = entry.file.value_stats_cols if not predicate: return True - return predicate.test_by_stats({ - "min_values": stats.min_values.to_dict(), - "max_values": stats.max_values.to_dict(), - "null_counts": { - stats.min_values.fields[i].name: stats.null_counts[i] for i in range(len(stats.min_values.fields)) - }, - "row_count": entry.file.row_count - }) + evolved_stats = evolution.evolution( + stats, + entry.file.row_count, + stats_fields + ) + + # Test predicate against evolved stats + return predicate.test_by_simple_stats( + evolved_stats, + entry.file.row_count + ) def _create_append_only_splits(self, file_entries: List[ManifestEntry]) -> List['Split']: partitioned_files = defaultdict(list) diff --git a/paimon-python/pypaimon/schema/schema_manager.py b/paimon-python/pypaimon/schema/schema_manager.py index 31297cc2b3..b9c4cdbddc 100644 --- a/paimon-python/pypaimon/schema/schema_manager.py +++ b/paimon-python/pypaimon/schema/schema_manager.py @@ -39,7 +39,7 @@ class SchemaManager: 
return None max_version = max(versions) - return self._read_schema(max_version) + return self.read_schema(max_version) except Exception as e: raise RuntimeError(f"Failed to load schema from path: {self.schema_path}") from e @@ -64,7 +64,7 @@ class SchemaManager: def _to_schema_path(self, schema_id: int) -> Path: return self.schema_path / f"{self.schema_prefix}{schema_id}" - def _read_schema(self, schema_id: int) -> Optional['TableSchema']: + def read_schema(self, schema_id: int) -> Optional['TableSchema']: schema_path = self._to_schema_path(schema_id) if not self.file_io.exists(schema_path): return None diff --git a/paimon-python/pypaimon/table/row/binary_row.py b/paimon-python/pypaimon/table/row/binary_row.py new file mode 100644 index 0000000000..f908935d44 --- /dev/null +++ b/paimon-python/pypaimon/table/row/binary_row.py @@ -0,0 +1,61 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +from typing import Any, List +from pypaimon.schema.data_types import DataField +from pypaimon.table.row.internal_row import InternalRow +from pypaimon.table.row.row_kind import RowKind + + +class BinaryRow(InternalRow): + """ + BinaryRow is a compact binary format for storing a row of data. + """ + + def __init__(self, data: bytes, fields: List[DataField]): + """ + Initialize BinaryRow with raw binary data and field definitions. 
+ """ + self.data = data + self.arity = int.from_bytes(data[:4], 'big') + # Skip the arity prefix (4 bytes) if present + self.actual_data = data[4:] if len(data) >= 4 else data + self.fields = fields + self.row_kind = RowKind(self.actual_data[0]) + + def get_field(self, index: int) -> Any: + from pypaimon.table.row.generic_row import GenericRowDeserializer + """Get field value by index.""" + if index >= self.arity or index < 0: + raise IndexError(f"Field index {index} out of range [0, {self.arity})") + + if GenericRowDeserializer.is_null_at(self.actual_data, 0, index): + return None + + return GenericRowDeserializer.parse_field_value(self.actual_data, 0, + GenericRowDeserializer.calculate_bit_set_width_in_bytes( + self.arity), + index, self.fields[index].type) + + def get_row_kind(self) -> RowKind: + return self.row_kind + + def is_null_at(self, pos: int) -> bool: + return self.get_field(pos) is None + + def __len__(self): + return self.arity diff --git a/paimon-python/pypaimon/table/row/generic_row.py b/paimon-python/pypaimon/table/row/generic_row.py index b409d3f2eb..13e3742110 100644 --- a/paimon-python/pypaimon/table/row/generic_row.py +++ b/paimon-python/pypaimon/table/row/generic_row.py @@ -17,12 +17,14 @@ ################################################################################ import struct -from dataclasses import dataclass from datetime import date, datetime, time, timedelta from decimal import Decimal -from typing import Any, List +from typing import Any, List, Union + +from dataclasses import dataclass from pypaimon.schema.data_types import AtomicType, DataField, DataType +from pypaimon.table.row.binary_row import BinaryRow from pypaimon.table.row.internal_row import InternalRow, RowKind from pypaimon.table.row.blob import BlobData @@ -38,7 +40,7 @@ class GenericRow(InternalRow): def to_dict(self): return {self.fields[i].name: self.values[i] for i in range(len(self.fields))} - def get_field(self, pos: int): + def get_field(self, pos: int) -> Any: if pos >= len(self.values): raise IndexError(f"Position {pos} is out of bounds for row arity {len(self.values)}") return self.values[pos] @@ -74,28 +76,28 @@ class GenericRowDeserializer: actual_data = bytes_data[4:] fields = [] - null_bits_size_in_bytes = cls._calculate_bit_set_width_in_bytes(arity) + null_bits_size_in_bytes = cls.calculate_bit_set_width_in_bytes(arity) for i, data_field in enumerate(data_fields): value = None - if not cls._is_null_at(actual_data, 0, i): - value = cls._parse_field_value(actual_data, 0, null_bits_size_in_bytes, i, data_field.type) + if not cls.is_null_at(actual_data, 0, i): + value = cls.parse_field_value(actual_data, 0, null_bits_size_in_bytes, i, data_field.type) fields.append(value) return GenericRow(fields, data_fields, RowKind(actual_data[0])) @classmethod - def _calculate_bit_set_width_in_bytes(cls, arity: int) -> int: + def calculate_bit_set_width_in_bytes(cls, arity: int) -> int: return ((arity + 63 + cls.HEADER_SIZE_IN_BITS) // 64) * 8 @classmethod - def _is_null_at(cls, bytes_data: bytes, offset: int, pos: int) -> bool: + def is_null_at(cls, bytes_data: bytes, offset: int, pos: int) -> bool: index = pos + cls.HEADER_SIZE_IN_BITS byte_index = offset + (index // 8) bit_index = index % 8 return (bytes_data[byte_index] & (1 << bit_index)) != 0 @classmethod - def _parse_field_value( + def parse_field_value( cls, bytes_data: bytes, base_offset: int, @@ -264,17 +266,19 @@ class GenericRowSerializer: MAX_FIX_PART_DATA_SIZE = 7 @classmethod - def to_bytes(cls, binary_row: GenericRow) -> bytes: 
- arity = len(binary_row.fields) + def to_bytes(cls, row: Union[GenericRow, BinaryRow]) -> bytes: + if isinstance(row, BinaryRow): + return row.data + arity = len(row.fields) null_bits_size_in_bytes = cls._calculate_bit_set_width_in_bytes(arity) fixed_part_size = null_bits_size_in_bytes + arity * 8 fixed_part = bytearray(fixed_part_size) - fixed_part[0] = binary_row.row_kind.value + fixed_part[0] = row.row_kind.value variable_part_data = [] current_variable_offset = 0 - for i, (value, field) in enumerate(zip(binary_row.values, binary_row.fields)): + for i, (value, field) in enumerate(zip(row.values, row.fields)): field_fixed_offset = null_bits_size_in_bytes + i * 8 if value is None: diff --git a/paimon-python/pypaimon/table/row/projected_row.py b/paimon-python/pypaimon/table/row/projected_row.py new file mode 100644 index 0000000000..502338a605 --- /dev/null +++ b/paimon-python/pypaimon/table/row/projected_row.py @@ -0,0 +1,84 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +from typing import Any, List +from pypaimon.table.row.internal_row import InternalRow +from pypaimon.table.row.row_kind import RowKind + + +class ProjectedRow(InternalRow): + """ + An implementation of InternalRow which provides a projected view of the underlying InternalRow. + Projection includes both reducing the accessible fields and reordering them. + Note: This class supports only top-level projections, not nested projections. + """ + + def __init__(self, index_mapping: List[int]): + """ + Initialize ProjectedRow with index mapping. + Args: + index_mapping: Array representing the mapping of fields. For example, + [0, 2, 1] specifies to include in the following order the 1st field, the 3rd field + and the 2nd field of the row. 
+ """ + self.index_mapping = index_mapping + self.row = None + + def replace_row(self, row: InternalRow) -> 'ProjectedRow': + self.row = row + return self + + def get_field(self, pos: int) -> Any: + """Returns the value at the given position.""" + if self.index_mapping[pos] < 0: + # TODO move this logical to hive + return None + return self.row.get_field(self.index_mapping[pos]) + + def is_null_at(self, pos: int) -> bool: + """Returns true if the element is null at the given position.""" + if self.index_mapping[pos] < 0: + # TODO move this logical to hive + return True + return self.row.is_null_at(self.index_mapping[pos]) + + def get_row_kind(self) -> RowKind: + """Returns the kind of change that this row describes in a changelog.""" + return self.row.get_row_kind() + + def __len__(self) -> int: + """Returns the number of fields in this row.""" + return len(self.row) + + def __str__(self) -> str: + """String representation of the projected row.""" + return (f"{self.row.get_row_kind().name if self.row else 'None'}" + f"{{index_mapping={self.index_mapping}, row={self.row}}}") + + @staticmethod + def from_index_mapping(projection: List[int]) -> 'ProjectedRow': + """ + Create an empty ProjectedRow starting from a projection array. + Args: + projection: Array representing the mapping of fields. For example, + [0, 2, 1] specifies to include in the following order + the 1st field, the 3rd field and the 2nd field of the row. + Returns: + ProjectedRow instance + """ + return ProjectedRow(projection) diff --git a/paimon-python/pypaimon/tests/binary_row_test.py b/paimon-python/pypaimon/tests/binary_row_test.py new file mode 100644 index 0000000000..5bfabfb121 --- /dev/null +++ b/paimon-python/pypaimon/tests/binary_row_test.py @@ -0,0 +1,334 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ +import os +import random +import tempfile +import unittest +from typing import List + +import pyarrow as pa + +from pypaimon import CatalogFactory, Schema +from pypaimon.manifest.schema.manifest_entry import ManifestEntry +from pypaimon.manifest.schema.simple_stats import SimpleStats +from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner +from pypaimon.table.row.generic_row import GenericRow, GenericRowDeserializer + + +def _random_format(): + return random.choice(['parquet', 'avro', 'orc']) + + +class BinaryRowTest(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.tempdir = tempfile.mkdtemp() + cls.warehouse = os.path.join(cls.tempdir, 'warehouse') + cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse}) + cls.catalog.create_database('default', False) + pa_schema = pa.schema([ + ('f0', pa.int64()), + ('f1', pa.string()), + ('f2', pa.int64()), + ]) + cls.catalog.create_table('default.test_append', Schema.from_pyarrow_schema( + pa_schema, partition_keys=['f0'], options={'file.format': _random_format()}), False) + cls.catalog.create_table('default.test_pk', Schema.from_pyarrow_schema( + pa_schema, partition_keys=['f2'], primary_keys=['f0'], + options={'bucket': '1', 'file.format': _random_format()}), False) + cls.data = pa.Table.from_pydict({ + 'f0': [1, 2, 3, 4, 5], + 'f1': ['abc', 'abbc', 'bc', 'd', None], + 'f2': [6, 7, 8, 9, 10], + }, schema=pa_schema) + + append_table = cls.catalog.get_table('default.test_append') + write_builder = append_table.new_batch_write_builder() + write = write_builder.new_write() + commit = write_builder.new_commit() + write.write_arrow(cls.data) + commit.commit(write.prepare_commit()) + write.close() + commit.close() + + pk_table = cls.catalog.get_table('default.test_pk') + write_builder = pk_table.new_batch_write_builder() + write = write_builder.new_write() + commit = write_builder.new_commit() + write.write_arrow(cls.data) + commit.commit(write.prepare_commit()) + write.close() + commit.close() + + def test_not_equal_append(self): + table = self.catalog.get_table('default.test_append') + self._overwrite_manifest_entry(table) + read_builder = table.new_read_builder() + predicate_builder = read_builder.new_predicate_builder() + predicate = predicate_builder.not_equal('f2', 6) # test stats filter when filtering ManifestEntry + splits, actual = self._read_result(read_builder.with_filter(predicate)) + expected = self.data.slice(1, 4) + self.assertEqual(expected, actual) + self.assertEqual(len(expected), len(splits)) + + def test_less_than_append(self): + table = self.catalog.get_table('default.test_append') + + self._overwrite_manifest_entry(table) + + read_builder = table.new_read_builder() + predicate_builder = read_builder.new_predicate_builder() + predicate = predicate_builder.less_than('f2', 8) + splits, actual = self._read_result(read_builder.with_filter(predicate)) + expected = self.data.slice(0, 2) + self.assertEqual(actual, expected) + self.assertEqual(len(expected), len(splits)) # test stats filter when filtering ManifestEntry + + def test_is_null_append(self): + table = self.catalog.get_table('default.test_append') + read_builder = table.new_read_builder() + predicate_builder = read_builder.new_predicate_builder() + predicate = predicate_builder.is_null('f1') # value_stats_cols=None + splits, actual = self._read_result(read_builder.with_filter(predicate)) + expected = self.data.slice(4, 1) + self.assertEqual(expected, 
actual) + self.assertEqual(len(expected), len(splits)) + + def test_is_not_null_append(self): + table = self.catalog.get_table('default.test_append') + starting_scanner = FullStartingScanner(table, None, None) + latest_snapshot = starting_scanner.snapshot_manager.get_latest_snapshot() + manifest_files = starting_scanner.manifest_list_manager.read_all(latest_snapshot) + manifest_entries = starting_scanner.manifest_file_manager.read(manifest_files[0].file_name) + self._transform_manifest_entries(manifest_entries, []) + l = ['abc', 'abbc', 'bc', 'd', None] + for i, entry in enumerate(manifest_entries): + entry.file.value_stats_cols = ['f1'] + entry.file.value_stats = SimpleStats( + GenericRow([l[i]], [table.fields[1]]), + GenericRow([l[i]], [table.fields[1]]), + [1 if l[i] is None else 0], + ) + starting_scanner.manifest_file_manager.write(manifest_files[0].file_name, manifest_entries) + + read_builder = table.new_read_builder() + predicate_builder = read_builder.new_predicate_builder() + predicate = predicate_builder.is_not_null('f1') + splits, actual = self._read_result(read_builder.with_filter(predicate)) + expected = self.data.slice(0, 4) + self.assertEqual(expected, actual) + self.assertEqual(len(expected), len(splits)) + + def test_is_in_append(self): + table = self.catalog.get_table('default.test_append') + self._overwrite_manifest_entry(table) + read_builder = table.new_read_builder() + predicate_builder = read_builder.new_predicate_builder() + predicate = predicate_builder.is_in('f2', [6, 8]) + splits, actual = self._read_result(read_builder.with_filter(predicate)) + expected = self.data.take([0, 2]) + self.assertEqual(expected, actual) + self.assertEqual(len(expected), len(splits)) + + def test_equal_pk(self): + table = self.catalog.get_table('default.test_pk') + read_builder = table.new_read_builder() + predicate_builder = read_builder.new_predicate_builder() + predicate = predicate_builder.equal('f2', 6) + splits, actual = self._read_result(read_builder.with_filter(predicate)) + expected = self.data.slice(0, 1) + self.assertEqual(expected, actual) + self.assertEqual(len(splits), len(expected)) # test partition filter when filtering ManifestEntry + + def test_not_equal_pk(self): + table = self.catalog.get_table('default.test_pk') + read_builder = table.new_read_builder() + predicate_builder = read_builder.new_predicate_builder() + predicate = predicate_builder.not_equal('f2', 6) + splits, actual = self._read_result(read_builder.with_filter(predicate)) + expected = self.data.slice(1, 4) + self.assertEqual(actual, expected) + self.assertEqual(len(splits), len(expected)) # test partition filter when filtering ManifestEntry + + def test_less_than_pk(self): + table = self.catalog.get_table('default.test_pk') + read_builder = table.new_read_builder() + predicate_builder = read_builder.new_predicate_builder() + predicate = predicate_builder.less_than('f0', 3) + splits, actual = self._read_result(read_builder.with_filter(predicate)) + expected = self.data.slice(0, 2) + self.assertEqual(expected, actual) + self.assertEqual(len(expected), len(splits)) # test key stats filter when filtering ManifestEntry + + def test_is_null_pk(self): + table = self.catalog.get_table('default.test_pk') + read_builder = table.new_read_builder() + predicate_builder = read_builder.new_predicate_builder() + predicate = predicate_builder.is_null('f1') + splits, actual = self._read_result(read_builder.with_filter(predicate)) + expected = self.data.slice(4, 1) + self.assertEqual(actual, expected) + + def 
test_is_not_null_pk(self): + table = self.catalog.get_table('default.test_pk') + read_builder = table.new_read_builder() + predicate_builder = read_builder.new_predicate_builder() + predicate = predicate_builder.is_not_null('f1') + splits, actual = self._read_result(read_builder.with_filter(predicate)) + expected = self.data.slice(0, 4) + self.assertEqual(actual, expected) + + def test_is_in_pk(self): + table = self.catalog.get_table('default.test_pk') + read_builder = table.new_read_builder() + predicate_builder = read_builder.new_predicate_builder() + predicate = predicate_builder.is_in('f0', [1, 5]) + splits, actual = self._read_result(read_builder.with_filter(predicate)) + # expected rows: indices [0, 3] + expected = self.data.take([0, 4]) + self.assertEqual(actual, expected) + self.assertEqual(len(splits), len(expected)) # test key stats filter when filtering ManifestEntry + + def test_append_multi_cols(self): + # Create a 10-column append table and write 10 rows + pa_schema = pa.schema([ + ('f0', pa.int64()), + ('f1', pa.string()), + ('f2', pa.int64()), + ('f3', pa.string()), + ('f4', pa.int64()), + ('f5', pa.string()), + ('f6', pa.int64()), + ('f7', pa.string()), + ('f8', pa.int64()), + ('f9', pa.string()), + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + partition_keys=['f0'], + options={'file.format': _random_format()} + ) + self.catalog.create_table('default.test_append_10cols', schema, False) + table = self.catalog.get_table('default.test_append_10cols') + + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + + data = { + 'f0': list(range(1, 11)), # 0..9 + 'f1': ['a0', 'bb', 'a2', 'a3', 'a4', 'a5', 'a6', 'a7', 'a8', 'a9'], # contains 'bb' at index 1 + 'f2': [10, 20, 30, 40, 50, 60, 70, 80, 90, 100], + 'f3': ['x0', 'x1', 'x2', 'x3', 'x4', 'x5', 'x6', 'x7', 'x8', 'x9'], + 'f4': [0, 1, 0, 1, 0, 1, 0, 1, 0, 1], + 'f5': ['y0', 'y1', 'y2', 'y3', 'y4', 'y5', 'y6', 'y7', 'y8', 'y9'], + 'f6': [100, 200, 300, 400, 500, 600, 700, 800, 900, 1000], + 'f7': ['z0', 'z1', 'z2', 'z3', 'z4', 'z5', 'z6', 'z7', 'z8', 'z9'], + 'f8': [5, 4, 3, 2, 1, 0, -1, -2, -3, -4], + 'f9': ['w0', 'w1', 'w2', 'w3', 'w4', 'w5', 'w6', 'w7', 'w8', 'w9'], + } + pa_table = pa.Table.from_pydict(data, schema=pa_schema) + table_write.write_arrow(pa_table) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + starting_scanner = FullStartingScanner(table, None, None) + latest_snapshot = starting_scanner.snapshot_manager.get_latest_snapshot() + manifest_files = starting_scanner.manifest_list_manager.read_all(latest_snapshot) + manifest_entries = starting_scanner.manifest_file_manager.read(manifest_files[0].file_name) + self._transform_manifest_entries(manifest_entries, []) + for i, entry in enumerate(manifest_entries): + entry.file.value_stats_cols = ['f2', 'f6', 'f8'] + entry.file.value_stats = SimpleStats( + GenericRow([10 * (i + 1), 100 * (i + 1), 5 - i], [table.fields[2], table.fields[6], table.fields[8]]), + GenericRow([10 * (i + 1), 100 * (i + 1), 5 - i], [table.fields[2], table.fields[6], table.fields[8]]), + [0, 0, 0], + ) + starting_scanner.manifest_file_manager.write(manifest_files[0].file_name, manifest_entries) + # Build multiple predicates and combine them + read_builder = table.new_read_builder() + predicate_builder = read_builder.new_predicate_builder() + p_in = predicate_builder.is_in('f6', [100, 600, 1000]) + p_contains = predicate_builder.less_or_equal('f8', -3) + 
p_not_null = predicate_builder.is_not_null('f2') + p_ge = predicate_builder.greater_or_equal('f2', 50) + p_or = predicate_builder.or_predicates([p_in, p_contains]) + combined = predicate_builder.and_predicates([p_or, p_not_null, p_ge]) + + splits, actual = self._read_result(read_builder.with_filter(combined)) + + # Expected rows after filter: indices 5 and 9 + expected_data = {'f0': [6, 9, 10], + 'f1': ['a5', 'a8', 'a9'], + 'f2': [60, 90, 100], + 'f3': ['x5', 'x8', 'x9'], + 'f4': [1, 0, 1], + 'f5': ['y5', 'y8', 'y9'], + 'f6': [600, 900, 1000], + 'f7': ['z5', 'z8', 'z9'], + 'f8': [0, -3, -4], + 'f9': ['w5', 'w8', 'w9'] + } + self.assertEqual(expected_data, actual.to_pydict()) + + starting_scanner = FullStartingScanner(table, None, None) + latest_snapshot = starting_scanner.snapshot_manager.get_latest_snapshot() + manifest_files = starting_scanner.manifest_list_manager.read_all(latest_snapshot) + manifest_entries = starting_scanner.manifest_file_manager.read(manifest_files[0].file_name) + self._transform_manifest_entries(manifest_entries, []) + for i, entry in enumerate(manifest_entries): + entry.file.value_stats_cols = ['f2', 'f6', 'f8'] + entry.file.value_stats = SimpleStats( + GenericRow([0, 100 * (i + 1), 5 - i], [table.fields[2], table.fields[6], table.fields[8]]), + GenericRow([0, 100 * (i + 1), 5 - i], [table.fields[2], table.fields[6], table.fields[8]]), + [0, 0, 0], + ) + starting_scanner.manifest_file_manager.write(manifest_files[0].file_name, manifest_entries) + splits, actual = self._read_result(read_builder.with_filter(combined)) + self.assertFalse(actual) + + def _read_result(self, read_builder): + scan = read_builder.new_scan() + read = read_builder.new_read() + splits = scan.plan().splits() + actual = read.to_arrow(splits) + return splits, actual + + def _transform_manifest_entries(self, manifest_entries: List[ManifestEntry], trimmed_pk_fields): + for entry in manifest_entries: + entry.file.key_stats.min_values = GenericRowDeserializer.from_bytes(entry.file.key_stats.min_values.data, + trimmed_pk_fields) + entry.file.key_stats.max_values = GenericRowDeserializer.from_bytes(entry.file.key_stats.max_values.data, + trimmed_pk_fields) + + def _overwrite_manifest_entry(self, table): + starting_scanner = FullStartingScanner(table, None, None) + latest_snapshot = starting_scanner.snapshot_manager.get_latest_snapshot() + manifest_files = starting_scanner.manifest_list_manager.read_all(latest_snapshot) + manifest_entries = starting_scanner.manifest_file_manager.read(manifest_files[0].file_name) + self._transform_manifest_entries(manifest_entries, []) + for i, entry in enumerate(manifest_entries): + entry.file.value_stats_cols = ['f2'] + entry.file.value_stats = SimpleStats( + GenericRow([6 + i], [table.fields[2]]), + GenericRow([6 + i], [table.fields[2]]), + [0], + ) + starting_scanner.manifest_file_manager.write(manifest_files[0].file_name, manifest_entries) diff --git a/paimon-python/pypaimon/tests/predicates_test.py b/paimon-python/pypaimon/tests/predicates_test.py index 6158d1d88b..561641589f 100644 --- a/paimon-python/pypaimon/tests/predicates_test.py +++ b/paimon-python/pypaimon/tests/predicates_test.py @@ -24,6 +24,7 @@ import pandas as pd import pyarrow as pa from pypaimon import CatalogFactory, Schema +from pypaimon.table.row.generic_row import GenericRowDeserializer def _check_filtered_result(read_builder, expected_df): @@ -454,20 +455,26 @@ class PredicateTest(unittest.TestCase): if split.partition.values == ["p1", 2]: count += 1 self.assertEqual(len(split.files), 1) - 
min_values = split.files[0].key_stats.min_values.to_dict() - max_values = split.files[0].key_stats.max_values.to_dict() + min_values = GenericRowDeserializer.from_bytes(split.files[0].key_stats.min_values.data, + table.table_schema.get_primary_key_fields()).to_dict() + max_values = GenericRowDeserializer.from_bytes(split.files[0].key_stats.max_values.data, + table.table_schema.get_primary_key_fields()).to_dict() self.assertTrue(min_values["key1"] == 1 and min_values["key2"] == "e" and max_values["key1"] == 4 and max_values["key2"] == "h") elif split.partition.values == ["p2", 2]: count += 1 - min_values = split.files[0].key_stats.min_values.to_dict() - max_values = split.files[0].key_stats.max_values.to_dict() + min_values = GenericRowDeserializer.from_bytes(split.files[0].key_stats.min_values.data, + table.table_schema.get_primary_key_fields()).to_dict() + max_values = GenericRowDeserializer.from_bytes(split.files[0].key_stats.max_values.data, + table.table_schema.get_primary_key_fields()).to_dict() self.assertTrue(min_values["key1"] == 5 and min_values["key2"] == "a" and max_values["key1"] == 8 and max_values["key2"] == "d") elif split.partition.values == ["p1", 1]: count += 1 - min_values = split.files[0].key_stats.min_values.to_dict() - max_values = split.files[0].key_stats.max_values.to_dict() + min_values = GenericRowDeserializer.from_bytes(split.files[0].key_stats.min_values.data, + table.table_schema.get_primary_key_fields()).to_dict() + max_values = GenericRowDeserializer.from_bytes(split.files[0].key_stats.max_values.data, + table.table_schema.get_primary_key_fields()).to_dict() self.assertTrue(min_values["key1"] == max_values["key1"] == 7 and max_values["key2"] == max_values["key2"] == "b") self.assertEqual(count, 3) diff --git a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py index 9be66d9759..6e6d57f963 100644 --- a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py +++ b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py @@ -183,8 +183,10 @@ class RESTAOReadWritePy36Test(RESTBaseTest): manifest_files[0].file_name, lambda row: table_scan.starting_scanner._filter_manifest_entry(row), drop_stats=False) - min_value_stats = manifest_entries[0].file.value_stats.min_values.values - max_value_stats = manifest_entries[0].file.value_stats.max_values.values + min_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.min_values.data, + table.fields).values + max_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.max_values.data, + table.fields).values expected_min_values = [col[0].as_py() for col in expect_data] expected_max_values = [col[1].as_py() for col in expect_data] self.assertEqual(min_value_stats, expected_min_values) @@ -865,23 +867,27 @@ class RESTAOReadWritePy36Test(RESTBaseTest): # Verify value_stats structure based on the logic if value_stats_cols is None: # Should use all table fields - verify we have data for all fields - self.assertEqual(len(read_entry.file.value_stats.min_values.values), expected_fields_count) - self.assertEqual(len(read_entry.file.value_stats.max_values.values), expected_fields_count) + self.assertEqual(read_entry.file.value_stats.min_values.arity, expected_fields_count) + self.assertEqual(read_entry.file.value_stats.min_values.arity, expected_fields_count) self.assertEqual(len(read_entry.file.value_stats.null_counts), expected_fields_count) elif not value_stats_cols: # Empty list # Should use 
empty fields - verify we have no field data - self.assertEqual(len(read_entry.file.value_stats.min_values.values), 0) - self.assertEqual(len(read_entry.file.value_stats.max_values.values), 0) + self.assertEqual(read_entry.file.value_stats.min_values.arity, 0) + self.assertEqual(read_entry.file.value_stats.max_values.arity, 0) self.assertEqual(len(read_entry.file.value_stats.null_counts), 0) else: # Should use specified fields - verify we have data for specified fields only - self.assertEqual(len(read_entry.file.value_stats.min_values.values), expected_fields_count) - self.assertEqual(len(read_entry.file.value_stats.max_values.values), expected_fields_count) + self.assertEqual(read_entry.file.value_stats.min_values.arity, expected_fields_count) + self.assertEqual(read_entry.file.value_stats.max_values.arity, expected_fields_count) self.assertEqual(len(read_entry.file.value_stats.null_counts), expected_fields_count) # Verify the actual values match what we expect if expected_fields_count > 0: - self.assertEqual(read_entry.file.value_stats.min_values.values, min_values) - self.assertEqual(read_entry.file.value_stats.max_values.values, max_values) + self.assertEqual( + GenericRowDeserializer.from_bytes(read_entry.file.value_stats.min_values.data, test_fields).values, + min_values) + self.assertEqual( + GenericRowDeserializer.from_bytes(read_entry.file.value_stats.max_values.data, test_fields).values, + max_values) self.assertEqual(read_entry.file.value_stats.null_counts, null_counts) diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py index a06e120e95..ebe98e4fc5 100644 --- a/paimon-python/pypaimon/tests/reader_base_test.py +++ b/paimon-python/pypaimon/tests/reader_base_test.py @@ -37,7 +37,7 @@ from pypaimon.schema.data_types import (ArrayType, AtomicType, DataField, MapType, PyarrowFieldParser) from pypaimon.schema.table_schema import TableSchema from pypaimon.snapshot.snapshot_manager import SnapshotManager -from pypaimon.table.row.generic_row import GenericRow +from pypaimon.table.row.generic_row import GenericRow, GenericRowDeserializer from pypaimon.write.file_store_commit import FileStoreCommit @@ -226,8 +226,10 @@ class ReaderBasicTest(unittest.TestCase): manifest_files = table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot) manifest_entries = table_scan.starting_scanner.manifest_file_manager.read( manifest_files[0].file_name, lambda row: table_scan.starting_scanner._filter_manifest_entry(row), False) - min_value_stats = manifest_entries[0].file.value_stats.min_values.values - max_value_stats = manifest_entries[0].file.value_stats.max_values.values + min_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.min_values.data, + table.fields).values + max_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.max_values.data, + table.fields).values expected_min_values = [col[0].as_py() for col in expect_data] expected_max_values = [col[1].as_py() for col in expect_data] self.assertEqual(min_value_stats, expected_min_values) @@ -649,23 +651,27 @@ class ReaderBasicTest(unittest.TestCase): # Verify value_stats structure based on the logic if value_stats_cols is None: # Should use all table fields - verify we have data for all fields - self.assertEqual(len(read_entry.file.value_stats.min_values.values), expected_fields_count) - self.assertEqual(len(read_entry.file.value_stats.max_values.values), expected_fields_count) + 
self.assertEqual(read_entry.file.value_stats.min_values.arity, expected_fields_count) + self.assertEqual(read_entry.file.value_stats.min_values.arity, expected_fields_count) self.assertEqual(len(read_entry.file.value_stats.null_counts), expected_fields_count) elif not value_stats_cols: # Empty list # Should use empty fields - verify we have no field data - self.assertEqual(len(read_entry.file.value_stats.min_values.values), 0) - self.assertEqual(len(read_entry.file.value_stats.max_values.values), 0) + self.assertEqual(read_entry.file.value_stats.min_values.arity, 0) + self.assertEqual(read_entry.file.value_stats.max_values.arity, 0) self.assertEqual(len(read_entry.file.value_stats.null_counts), 0) else: # Should use specified fields - verify we have data for specified fields only - self.assertEqual(len(read_entry.file.value_stats.min_values.values), expected_fields_count) - self.assertEqual(len(read_entry.file.value_stats.max_values.values), expected_fields_count) + self.assertEqual(read_entry.file.value_stats.min_values.arity, expected_fields_count) + self.assertEqual(read_entry.file.value_stats.max_values.arity, expected_fields_count) self.assertEqual(len(read_entry.file.value_stats.null_counts), expected_fields_count) # Verify the actual values match what we expect if expected_fields_count > 0: - self.assertEqual(read_entry.file.value_stats.min_values.values, min_values) - self.assertEqual(read_entry.file.value_stats.max_values.values, max_values) + self.assertEqual( + GenericRowDeserializer.from_bytes(read_entry.file.value_stats.min_values.data, test_fields).values, + min_values) + self.assertEqual( + GenericRowDeserializer.from_bytes(read_entry.file.value_stats.max_values.data, test_fields).values, + max_values) self.assertEqual(read_entry.file.value_stats.null_counts, null_counts)
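For readers skimming the patch, here is a minimal usage sketch (not part of the commit) of the ProjectedRow class added above; SimpleStatsEvolution.evolution() uses the same mechanism to re-expose densely stored min/max stats at the table schema's field positions. The fields list below is a placeholder (normally a List[DataField] such as table.fields in the tests above), and a GenericRow stands in for the BinaryRow stats that the manifest reader now produces.

    # Sketch only: the field metadata is a placeholder; GenericRow.get_field reads by position.
    from pypaimon.table.row.generic_row import GenericRow
    from pypaimon.table.row.projected_row import ProjectedRow

    fields = []                                      # placeholder for List[DataField], e.g. table.fields
    stats_row = GenericRow([6, "abc", 60], fields)   # pretend min values for f0, f1, f2

    # [2, 0, -1] exposes the 3rd underlying field first, then the 1st; -1 marks a column
    # with no stored stats, which ProjectedRow surfaces as None (the evolution code pads
    # the matching null_counts entry with 0, or with the row count for schema evolution).
    projected = ProjectedRow.from_index_mapping([2, 0, -1]).replace_row(stats_row)
    assert projected.get_field(0) == 60
    assert projected.get_field(1) == 6
    assert projected.get_field(2) is None

Predicate.test_by_simple_stats() then reads these projected min/max rows by field index rather than converting the stats to dicts, which is the switch made in the predicate.py and full_starting_scanner.py hunks above.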
