This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 61179c7f11 [Python] SimpleStats supports BinaryRow (#6444)
61179c7f11 is described below
commit 61179c7f1111326f1767e20670510c4caba5040c
Author: umi <[email protected]>
AuthorDate: Tue Oct 21 20:29:42 2025 +0800
[Python] SimpleStats supports BinaryRow (#6444)
---
paimon-python/pypaimon/common/predicate.py | 49 ++-
.../pypaimon/manifest/manifest_file_manager.py | 12 +-
.../pypaimon/manifest/schema/simple_stats.py | 5 +-
.../pypaimon/manifest/simple_stats_evolution.py | 124 ++++++++
.../pypaimon/manifest/simple_stats_evolutions.py | 76 +++++
.../pypaimon/read/scanner/full_starting_scanner.py | 43 ++-
paimon-python/pypaimon/schema/schema_manager.py | 4 +-
paimon-python/pypaimon/table/row/binary_row.py | 61 ++++
paimon-python/pypaimon/table/row/generic_row.py | 30 +-
paimon-python/pypaimon/table/row/projected_row.py | 84 ++++++
paimon-python/pypaimon/tests/binary_row_test.py | 334 +++++++++++++++++++++
paimon-python/pypaimon/tests/predicates_test.py | 19 +-
.../pypaimon/tests/py36/rest_ao_read_write_test.py | 26 +-
paimon-python/pypaimon/tests/reader_base_test.py | 28 +-
14 files changed, 805 insertions(+), 90 deletions(-)
diff --git a/paimon-python/pypaimon/common/predicate.py b/paimon-python/pypaimon/common/predicate.py
index 6a760e473f..5e47fdd5df 100644
--- a/paimon-python/pypaimon/common/predicate.py
+++ b/paimon-python/pypaimon/common/predicate.py
@@ -27,6 +27,7 @@ from pyarrow import compute as pyarrow_compute
from pyarrow import dataset as pyarrow_dataset
from pypaimon.manifest.schema.simple_stats import SimpleStats
+from pypaimon.table.row.generic_row import GenericRow
from pypaimon.table.row.internal_row import InternalRow
@@ -67,32 +68,31 @@ class Predicate:
raise ValueError(f"Unsupported predicate method: {self.method}")
def test_by_simple_stats(self, stat: SimpleStats, row_count: int) -> bool:
- return self.test_by_stats({
- "min_values": stat.min_values.to_dict(),
- "max_values": stat.max_values.to_dict(),
- "null_counts": {
- stat.min_values.fields[i].name: stat.null_counts[i] for i in range(len(stat.min_values.fields))
- },
- "row_count": row_count,
- })
-
- def test_by_stats(self, stat: Dict) -> bool:
+ """Test predicate against BinaryRow stats with denseIndexMapping like
Java implementation."""
if self.method == 'and':
- return all(p.test_by_stats(stat) for p in self.literals)
+ return all(p.test_by_simple_stats(stat, row_count) for p in self.literals)
if self.method == 'or':
- t = any(p.test_by_stats(stat) for p in self.literals)
- return t
+ return any(p.test_by_simple_stats(stat, row_count) for p in self.literals)
- null_count = stat["null_counts"][self.field]
- row_count = stat["row_count"]
+ # Get null count using the mapped index
+ null_count = stat.null_counts[self.index] if stat.null_counts and self.index < len(
+ stat.null_counts) else 0
if self.method == 'isNull':
return null_count is not None and null_count > 0
if self.method == 'isNotNull':
return null_count is None or row_count is None or null_count < row_count
- min_value = stat["min_values"][self.field]
- max_value = stat["max_values"][self.field]
+ if not isinstance(stat.min_values, GenericRow):
+ # Parse field values using BinaryRow's direct field access by name
+ min_value = stat.min_values.get_field(self.index)
+ max_value = stat.max_values.get_field(self.index)
+ else:
+ # TODO transform partition to BinaryRow
+ min_values = stat.min_values.to_dict()
+ max_values = stat.max_values.to_dict()
+ min_value = min_values[self.field]
+ max_value = max_values[self.field]
if min_value is None or max_value is None or (null_count is not None
and null_count == row_count):
# invalid stats, skip validation
@@ -164,7 +164,6 @@ class RegisterMeta(ABCMeta):
class Tester(ABC, metaclass=RegisterMeta):
-
name = None
@abstractmethod
@@ -187,7 +186,6 @@ class Tester(ABC, metaclass=RegisterMeta):
class Equal(Tester):
-
name = 'equal'
def test_by_value(self, val, literals) -> bool:
@@ -201,7 +199,6 @@ class Equal(Tester):
class NotEqual(Tester):
-
name = "notEqual"
def test_by_value(self, val, literals) -> bool:
@@ -215,7 +212,6 @@ class NotEqual(Tester):
class LessThan(Tester):
-
name = "lessThan"
def test_by_value(self, val, literals) -> bool:
@@ -229,7 +225,6 @@ class LessThan(Tester):
class LessOrEqual(Tester):
-
name = "lessOrEqual"
def test_by_value(self, val, literals) -> bool:
@@ -243,7 +238,6 @@ class LessOrEqual(Tester):
class GreaterThan(Tester):
-
name = "greaterThan"
def test_by_value(self, val, literals) -> bool:
@@ -257,7 +251,6 @@ class GreaterThan(Tester):
class GreaterOrEqual(Tester):
-
name = "greaterOrEqual"
def test_by_value(self, val, literals) -> bool:
@@ -271,7 +264,6 @@ class GreaterOrEqual(Tester):
class In(Tester):
-
name = "in"
def test_by_value(self, val, literals) -> bool:
@@ -285,7 +277,6 @@ class In(Tester):
class NotIn(Tester):
-
name = "notIn"
def test_by_value(self, val, literals) -> bool:
@@ -299,7 +290,6 @@ class NotIn(Tester):
class Between(Tester):
-
name = "between"
def test_by_value(self, val, literals) -> bool:
@@ -313,7 +303,6 @@ class Between(Tester):
class StartsWith(Tester):
-
name = "startsWith"
def test_by_value(self, val, literals) -> bool:
@@ -329,7 +318,6 @@ class StartsWith(Tester):
class EndsWith(Tester):
-
name = "endsWith"
def test_by_value(self, val, literals) -> bool:
@@ -343,7 +331,6 @@ class EndsWith(Tester):
class Contains(Tester):
-
name = "contains"
def test_by_value(self, val, literals) -> bool:
@@ -357,7 +344,6 @@ class Contains(Tester):
class IsNull(Tester):
-
name = "isNull"
def test_by_value(self, val, literals) -> bool:
@@ -371,7 +357,6 @@ class IsNull(Tester):
class IsNotNull(Tester):
-
name = "isNotNull"
def test_by_value(self, val, literals) -> bool:
diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index bb9251df7e..c196845ff4 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -26,6 +26,7 @@ from pypaimon.manifest.schema.manifest_entry import (MANIFEST_ENTRY_SCHEMA,
from pypaimon.manifest.schema.simple_stats import SimpleStats
from pypaimon.table.row.generic_row import (GenericRowDeserializer,
GenericRowSerializer)
+from pypaimon.table.row.binary_row import BinaryRow
class ManifestFileManager:
@@ -54,12 +55,11 @@ class ManifestFileManager:
file_dict = dict(record['_FILE'])
key_dict = dict(file_dict['_KEY_STATS'])
key_stats = SimpleStats(
- min_values=GenericRowDeserializer.from_bytes(key_dict['_MIN_VALUES'],
- self.trimmed_primary_key_fields),
- max_values=GenericRowDeserializer.from_bytes(key_dict['_MAX_VALUES'],
- self.trimmed_primary_key_fields),
+ min_values=BinaryRow(key_dict['_MIN_VALUES'], self.trimmed_primary_key_fields),
+ max_values=BinaryRow(key_dict['_MAX_VALUES'], self.trimmed_primary_key_fields),
null_counts=key_dict['_NULL_COUNTS'],
)
+
value_dict = dict(file_dict['_VALUE_STATS'])
if file_dict['_VALUE_STATS_COLS'] is None:
if file_dict['_WRITE_COLS'] is None:
@@ -72,8 +72,8 @@ class ManifestFileManager:
else:
fields = [self.table.field_dict[col] for col in file_dict['_VALUE_STATS_COLS']]
value_stats = SimpleStats(
- min_values=GenericRowDeserializer.from_bytes(value_dict['_MIN_VALUES'], fields),
- max_values=GenericRowDeserializer.from_bytes(value_dict['_MAX_VALUES'], fields),
+ min_values=BinaryRow(value_dict['_MIN_VALUES'], fields),
+ max_values=BinaryRow(value_dict['_MAX_VALUES'], fields),
null_counts=value_dict['_NULL_COUNTS'],
)
file_meta = DataFileMeta(
diff --git a/paimon-python/pypaimon/manifest/schema/simple_stats.py b/paimon-python/pypaimon/manifest/schema/simple_stats.py
index 19816fdd0f..1130a812fa 100644
--- a/paimon-python/pypaimon/manifest/schema/simple_stats.py
+++ b/paimon-python/pypaimon/manifest/schema/simple_stats.py
@@ -21,12 +21,13 @@ from typing import List, Optional
from typing import ClassVar
from pypaimon.table.row.generic_row import GenericRow
+from pypaimon.table.row.internal_row import InternalRow
@dataclass
class SimpleStats:
- min_values: GenericRow
- max_values: GenericRow
+ min_values: InternalRow
+ max_values: InternalRow
null_counts: Optional[List[int]]
_empty_stats: ClassVar[object] = None
diff --git a/paimon-python/pypaimon/manifest/simple_stats_evolution.py b/paimon-python/pypaimon/manifest/simple_stats_evolution.py
new file mode 100644
index 0000000000..56cea98a85
--- /dev/null
+++ b/paimon-python/pypaimon/manifest/simple_stats_evolution.py
@@ -0,0 +1,124 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import List, Optional, Dict, Any
+import threading
+
+from pypaimon.schema.data_types import DataField
+from pypaimon.manifest.schema.simple_stats import SimpleStats
+from pypaimon.table.row.generic_row import GenericRow
+from pypaimon.table.row.projected_row import ProjectedRow
+
+
+class SimpleStatsEvolution:
+ """Converter for array of SimpleColStats."""
+
+ def __init__(self, data_fields: List[DataField], index_mapping: Optional[List[int]],
+ cast_field_getters: Optional[List[Any]]):
+ self.field_names = [field.name for field in data_fields]
+ self.index_mapping = index_mapping
+ self.cast_field_getters = cast_field_getters
+ self.index_mappings: Dict[tuple, List[int]] = {}
+ self._lock = threading.Lock()
+
+ # Create empty values for optimization
+ self.empty_values = GenericRow([None] * len(self.field_names), data_fields)
+ self.empty_null_counts = [0] * len(self.field_names)
+
+ def evolution(self, stats: SimpleStats, row_count: Optional[int], stats_fields: Optional[List[str]]) -> 'SimpleStats':
+ min_values = stats.min_values
+ max_values = stats.max_values
+ null_counts = stats.null_counts
+
+ if stats_fields is not None and not stats_fields:
+ # Optimize for empty dense fields
+ min_values = self.empty_values
+ max_values = self.empty_values
+ null_counts = self.empty_null_counts
+ elif stats_fields is not None:
+ # Apply dense field mapping
+ dense_index_mapping = self._get_dense_index_mapping(stats_fields)
+ min_values = self._project_row(min_values, dense_index_mapping)
+ max_values = self._project_row(max_values, dense_index_mapping)
+ null_counts = self._project_array(null_counts, dense_index_mapping)
+
+ if self.index_mapping is not None:
+ # TODO support schema evolution
+ min_values = self._project_row(min_values, self.index_mapping)
+ max_values = self._project_row(max_values, self.index_mapping)
+
+ if row_count is None:
+ raise RuntimeError("Schema Evolution for stats needs row
count.")
+
+ null_counts = self._evolve_null_counts(null_counts, self.index_mapping, row_count)
+
+ return SimpleStats(min_values, max_values, null_counts)
+
+ def _get_dense_index_mapping(self, dense_fields: List[str]) -> List[int]:
+ """
+ Get dense index mapping similar to Java:
+ fieldNames.stream().mapToInt(denseFields::indexOf).toArray()
+ """
+ dense_fields_tuple = tuple(dense_fields)
+
+ if dense_fields_tuple not in self.index_mappings:
+ with self._lock:
+ # Double-check locking
+ if dense_fields_tuple not in self.index_mappings:
+ mapping = []
+ for field_name in self.field_names:
+ try:
+ index = dense_fields.index(field_name)
+ mapping.append(index)
+ except ValueError:
+ mapping.append(-1) # Field not found
+ self.index_mappings[dense_fields_tuple] = mapping
+
+ return self.index_mappings[dense_fields_tuple]
+
+ def _project_row(self, row: Any, index_mapping: List[int]) -> Any:
+ """Project row based on index mapping using ProjectedRow."""
+ projected_row = ProjectedRow.from_index_mapping(index_mapping)
+ return projected_row.replace_row(row)
+
+ def _project_array(self, array: List[Any], index_mapping: List[int]) -> List[Any]:
+ """Project array based on index mapping."""
+ if not array:
+ return [0] * len(index_mapping)
+
+ projected = []
+ for mapped_index in index_mapping:
+ if mapped_index >= 0 and mapped_index < len(array):
+ projected.append(array[mapped_index])
+ else:
+ projected.append(0) # Default value for missing fields
+
+ return projected
+
+ def _evolve_null_counts(self, null_counts: List[Any], index_mapping: List[int],
+ not_found_value: int) -> List[Any]:
+ """Evolve null counts with schema evolution mapping."""
+ evolved = []
+ for mapped_index in index_mapping:
+ if mapped_index >= 0 and mapped_index < len(null_counts):
+ evolved.append(null_counts[mapped_index])
+ else:
+ evolved.append(not_found_value)  # Use row count for missing fields
+
+ return evolved
diff --git a/paimon-python/pypaimon/manifest/simple_stats_evolutions.py b/paimon-python/pypaimon/manifest/simple_stats_evolutions.py
new file mode 100644
index 0000000000..8331b7a5e5
--- /dev/null
+++ b/paimon-python/pypaimon/manifest/simple_stats_evolutions.py
@@ -0,0 +1,76 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import Callable, Dict, List, Optional
+
+from pypaimon.manifest.simple_stats_evolution import SimpleStatsEvolution
+from pypaimon.schema.data_types import DataField
+
+
+class SimpleStatsEvolutions:
+ """Converters to create col stats array serializer."""
+
+ def __init__(self, schema_fields: Callable[[int], List[DataField]], table_schema_id: int):
+ self.schema_fields = schema_fields
+ self.table_schema_id = table_schema_id
+ self.table_data_fields = schema_fields(table_schema_id)
+ self.table_fields = None
+ self.evolutions: Dict[int, SimpleStatsEvolution] = {}
+
+ def get_or_create(self, data_schema_id: int) -> SimpleStatsEvolution:
+ """Get or create SimpleStatsEvolution for given schema id."""
+ if data_schema_id in self.evolutions:
+ return self.evolutions[data_schema_id]
+
+ if self.table_schema_id == data_schema_id:
+ evolution = SimpleStatsEvolution(self.schema_fields(data_schema_id), None, None)
+ else:
+ # TODO support schema evolution
+ if self.table_fields is None:
+ self.table_fields = self.table_data_fields
+
+ data_fields = self.schema_fields(data_schema_id)
+ index_cast_mapping = self._create_index_cast_mapping(self.table_fields, data_fields)
+ index_mapping = index_cast_mapping.get('index_mapping')
+ cast_mapping = index_cast_mapping.get('cast_mapping')
+
+ evolution = SimpleStatsEvolution(data_fields, index_mapping, cast_mapping)
+
+ self.evolutions[data_schema_id] = evolution
+ return evolution
+
+ def _create_index_cast_mapping(self, table_fields: List[DataField],
+ data_fields: List[DataField]) -> Dict[str, Optional[List[int]]]:
+ """
+ Create index and cast mapping between table fields and data fields.
+ This is a simplified implementation.
+ """
+ # Create a mapping from field names to indices in data_fields
+ data_field_map = {field.name: i for i, field in enumerate(data_fields)}
+
+ index_mapping = []
+ for table_field in table_fields:
+ if table_field.name in data_field_map:
+ index_mapping.append(data_field_map[table_field.name])
+ else:
+ index_mapping.append(-1) # Field not found in data schema
+
+ return {
+ 'index_mapping': index_mapping,
+ 'cast_mapping': None # Simplified - no casting for now
+ }
diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index f1ade2c4e8..47a2d86d15 100644
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -32,6 +32,7 @@ from pypaimon.read.scanner.starting_scanner import StartingScanner
from pypaimon.read.split import Split
from pypaimon.snapshot.snapshot_manager import SnapshotManager
from pypaimon.table.bucket_mode import BucketMode
+from pypaimon.manifest.simple_stats_evolutions import SimpleStatsEvolutions
class FullStartingScanner(StartingScanner):
@@ -62,6 +63,19 @@ class FullStartingScanner(StartingScanner):
self.table.options.get('bucket', -1)) == BucketMode.POSTPONE_BUCKET.value else False
self.data_evolution = self.table.options.get(CoreOptions.DATA_EVOLUTION_ENABLED, 'false').lower() == 'true'
+ self._schema_cache = {}
+
+ def schema_fields_func(schema_id: int):
+ if schema_id not in self._schema_cache:
+ schema = self.table.schema_manager.read_schema(schema_id)
+ self._schema_cache[schema_id] = schema
+ return self._schema_cache[schema_id].fields if self._schema_cache[schema_id] else []
+
+ self.simple_stats_evolutions = SimpleStatsEvolutions(
+ schema_fields_func,
+ self.table.table_schema.id
+ )
+
def scan(self) -> Plan:
file_entries = self.plan_files()
if not file_entries:
@@ -215,22 +229,35 @@ class FullStartingScanner(StartingScanner):
return False
if self.partition_key_predicate and not self.partition_key_predicate.test(entry.partition):
return False
+
+ # Get SimpleStatsEvolution for this schema
+ evolution = self.simple_stats_evolutions.get_or_create(entry.file.schema_id)
+
+ # Apply evolution to stats
if self.table.is_primary_key_table:
predicate = self.primary_key_predicate
stats = entry.file.key_stats
+ stats_fields = None
else:
predicate = self.predicate
stats = entry.file.value_stats
+ if entry.file.value_stats_cols is None and entry.file.write_cols is not None:
+ stats_fields = entry.file.write_cols
+ else:
+ stats_fields = entry.file.value_stats_cols
if not predicate:
return True
- return predicate.test_by_stats({
- "min_values": stats.min_values.to_dict(),
- "max_values": stats.max_values.to_dict(),
- "null_counts": {
- stats.min_values.fields[i].name: stats.null_counts[i] for i in range(len(stats.min_values.fields))
- },
- "row_count": entry.file.row_count
- })
+ evolved_stats = evolution.evolution(
+ stats,
+ entry.file.row_count,
+ stats_fields
+ )
+
+ # Test predicate against evolved stats
+ return predicate.test_by_simple_stats(
+ evolved_stats,
+ entry.file.row_count
+ )
def _create_append_only_splits(self, file_entries: List[ManifestEntry]) -> List['Split']:
partitioned_files = defaultdict(list)
diff --git a/paimon-python/pypaimon/schema/schema_manager.py b/paimon-python/pypaimon/schema/schema_manager.py
index 31297cc2b3..b9c4cdbddc 100644
--- a/paimon-python/pypaimon/schema/schema_manager.py
+++ b/paimon-python/pypaimon/schema/schema_manager.py
@@ -39,7 +39,7 @@ class SchemaManager:
return None
max_version = max(versions)
- return self._read_schema(max_version)
+ return self.read_schema(max_version)
except Exception as e:
raise RuntimeError(f"Failed to load schema from path:
{self.schema_path}") from e
@@ -64,7 +64,7 @@ class SchemaManager:
def _to_schema_path(self, schema_id: int) -> Path:
return self.schema_path / f"{self.schema_prefix}{schema_id}"
- def _read_schema(self, schema_id: int) -> Optional['TableSchema']:
+ def read_schema(self, schema_id: int) -> Optional['TableSchema']:
schema_path = self._to_schema_path(schema_id)
if not self.file_io.exists(schema_path):
return None
diff --git a/paimon-python/pypaimon/table/row/binary_row.py b/paimon-python/pypaimon/table/row/binary_row.py
new file mode 100644
index 0000000000..f908935d44
--- /dev/null
+++ b/paimon-python/pypaimon/table/row/binary_row.py
@@ -0,0 +1,61 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+from typing import Any, List
+from pypaimon.schema.data_types import DataField
+from pypaimon.table.row.internal_row import InternalRow
+from pypaimon.table.row.row_kind import RowKind
+
+
+class BinaryRow(InternalRow):
+ """
+ BinaryRow is a compact binary format for storing a row of data.
+ """
+
+ def __init__(self, data: bytes, fields: List[DataField]):
+ """
+ Initialize BinaryRow with raw binary data and field definitions.
+ """
+ self.data = data
+ self.arity = int.from_bytes(data[:4], 'big')
+ # Skip the arity prefix (4 bytes) if present
+ self.actual_data = data[4:] if len(data) >= 4 else data
+ self.fields = fields
+ self.row_kind = RowKind(self.actual_data[0])
+
+ def get_field(self, index: int) -> Any:
+ from pypaimon.table.row.generic_row import GenericRowDeserializer
+ """Get field value by index."""
+ if index >= self.arity or index < 0:
+ raise IndexError(f"Field index {index} out of range [0,
{self.arity})")
+
+ if GenericRowDeserializer.is_null_at(self.actual_data, 0, index):
+ return None
+
+ return GenericRowDeserializer.parse_field_value(self.actual_data, 0,
+ GenericRowDeserializer.calculate_bit_set_width_in_bytes(
+ self.arity),
+ index, self.fields[index].type)
+
+ def get_row_kind(self) -> RowKind:
+ return self.row_kind
+
+ def is_null_at(self, pos: int) -> bool:
+ return self.get_field(pos) is None
+
+ def __len__(self):
+ return self.arity
diff --git a/paimon-python/pypaimon/table/row/generic_row.py b/paimon-python/pypaimon/table/row/generic_row.py
index b409d3f2eb..13e3742110 100644
--- a/paimon-python/pypaimon/table/row/generic_row.py
+++ b/paimon-python/pypaimon/table/row/generic_row.py
@@ -17,12 +17,14 @@
################################################################################
import struct
-from dataclasses import dataclass
from datetime import date, datetime, time, timedelta
from decimal import Decimal
-from typing import Any, List
+from typing import Any, List, Union
+
+from dataclasses import dataclass
from pypaimon.schema.data_types import AtomicType, DataField, DataType
+from pypaimon.table.row.binary_row import BinaryRow
from pypaimon.table.row.internal_row import InternalRow, RowKind
from pypaimon.table.row.blob import BlobData
@@ -38,7 +40,7 @@ class GenericRow(InternalRow):
def to_dict(self):
return {self.fields[i].name: self.values[i] for i in range(len(self.fields))}
- def get_field(self, pos: int):
+ def get_field(self, pos: int) -> Any:
if pos >= len(self.values):
raise IndexError(f"Position {pos} is out of bounds for row arity
{len(self.values)}")
return self.values[pos]
@@ -74,28 +76,28 @@ class GenericRowDeserializer:
actual_data = bytes_data[4:]
fields = []
- null_bits_size_in_bytes = cls._calculate_bit_set_width_in_bytes(arity)
+ null_bits_size_in_bytes = cls.calculate_bit_set_width_in_bytes(arity)
for i, data_field in enumerate(data_fields):
value = None
- if not cls._is_null_at(actual_data, 0, i):
- value = cls._parse_field_value(actual_data, 0, null_bits_size_in_bytes, i, data_field.type)
+ if not cls.is_null_at(actual_data, 0, i):
+ value = cls.parse_field_value(actual_data, 0, null_bits_size_in_bytes, i, data_field.type)
fields.append(value)
return GenericRow(fields, data_fields, RowKind(actual_data[0]))
@classmethod
- def _calculate_bit_set_width_in_bytes(cls, arity: int) -> int:
+ def calculate_bit_set_width_in_bytes(cls, arity: int) -> int:
return ((arity + 63 + cls.HEADER_SIZE_IN_BITS) // 64) * 8
@classmethod
- def _is_null_at(cls, bytes_data: bytes, offset: int, pos: int) -> bool:
+ def is_null_at(cls, bytes_data: bytes, offset: int, pos: int) -> bool:
index = pos + cls.HEADER_SIZE_IN_BITS
byte_index = offset + (index // 8)
bit_index = index % 8
return (bytes_data[byte_index] & (1 << bit_index)) != 0
@classmethod
- def _parse_field_value(
+ def parse_field_value(
cls,
bytes_data: bytes,
base_offset: int,
@@ -264,17 +266,19 @@ class GenericRowSerializer:
MAX_FIX_PART_DATA_SIZE = 7
@classmethod
- def to_bytes(cls, binary_row: GenericRow) -> bytes:
- arity = len(binary_row.fields)
+ def to_bytes(cls, row: Union[GenericRow, BinaryRow]) -> bytes:
+ if isinstance(row, BinaryRow):
+ return row.data
+ arity = len(row.fields)
null_bits_size_in_bytes = cls._calculate_bit_set_width_in_bytes(arity)
fixed_part_size = null_bits_size_in_bytes + arity * 8
fixed_part = bytearray(fixed_part_size)
- fixed_part[0] = binary_row.row_kind.value
+ fixed_part[0] = row.row_kind.value
variable_part_data = []
current_variable_offset = 0
- for i, (value, field) in enumerate(zip(binary_row.values, binary_row.fields)):
+ for i, (value, field) in enumerate(zip(row.values, row.fields)):
field_fixed_offset = null_bits_size_in_bytes + i * 8
if value is None:
diff --git a/paimon-python/pypaimon/table/row/projected_row.py b/paimon-python/pypaimon/table/row/projected_row.py
new file mode 100644
index 0000000000..502338a605
--- /dev/null
+++ b/paimon-python/pypaimon/table/row/projected_row.py
@@ -0,0 +1,84 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import Any, List
+from pypaimon.table.row.internal_row import InternalRow
+from pypaimon.table.row.row_kind import RowKind
+
+
+class ProjectedRow(InternalRow):
+ """
+ An implementation of InternalRow which provides a projected view of the underlying InternalRow.
+ Projection includes both reducing the accessible fields and reordering them.
+ Note: This class supports only top-level projections, not nested projections.
+ """
+
+ def __init__(self, index_mapping: List[int]):
+ """
+ Initialize ProjectedRow with index mapping.
+ Args:
+ index_mapping: Array representing the mapping of fields. For example,
+ [0, 2, 1] specifies to include in the following order the 1st field, the 3rd field
+ and the 2nd field of the row.
+ """
+ self.index_mapping = index_mapping
+ self.row = None
+
+ def replace_row(self, row: InternalRow) -> 'ProjectedRow':
+ self.row = row
+ return self
+
+ def get_field(self, pos: int) -> Any:
+ """Returns the value at the given position."""
+ if self.index_mapping[pos] < 0:
+ # TODO move this logical to hive
+ return None
+ return self.row.get_field(self.index_mapping[pos])
+
+ def is_null_at(self, pos: int) -> bool:
+ """Returns true if the element is null at the given position."""
+ if self.index_mapping[pos] < 0:
+ # TODO move this logical to hive
+ return True
+ return self.row.is_null_at(self.index_mapping[pos])
+
+ def get_row_kind(self) -> RowKind:
+ """Returns the kind of change that this row describes in a
changelog."""
+ return self.row.get_row_kind()
+
+ def __len__(self) -> int:
+ """Returns the number of fields in this row."""
+ return len(self.row)
+
+ def __str__(self) -> str:
+ """String representation of the projected row."""
+ return (f"{self.row.get_row_kind().name if self.row else 'None'}"
+ f"{{index_mapping={self.index_mapping}, row={self.row}}}")
+
+ @staticmethod
+ def from_index_mapping(projection: List[int]) -> 'ProjectedRow':
+ """
+ Create an empty ProjectedRow starting from a projection array.
+ Args:
+ projection: Array representing the mapping of fields. For example,
+ [0, 2, 1] specifies to include in the following order
+ the 1st field, the 3rd field and the 2nd field of the row.
+ Returns:
+ ProjectedRow instance
+ """
+ return ProjectedRow(projection)
diff --git a/paimon-python/pypaimon/tests/binary_row_test.py b/paimon-python/pypaimon/tests/binary_row_test.py
new file mode 100644
index 0000000000..5bfabfb121
--- /dev/null
+++ b/paimon-python/pypaimon/tests/binary_row_test.py
@@ -0,0 +1,334 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+import random
+import tempfile
+import unittest
+from typing import List
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+from pypaimon.manifest.schema.manifest_entry import ManifestEntry
+from pypaimon.manifest.schema.simple_stats import SimpleStats
+from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner
+from pypaimon.table.row.generic_row import GenericRow, GenericRowDeserializer
+
+
+def _random_format():
+ return random.choice(['parquet', 'avro', 'orc'])
+
+
+class BinaryRowTest(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+ cls.tempdir = tempfile.mkdtemp()
+ cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+ cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+ cls.catalog.create_database('default', False)
+ pa_schema = pa.schema([
+ ('f0', pa.int64()),
+ ('f1', pa.string()),
+ ('f2', pa.int64()),
+ ])
+ cls.catalog.create_table('default.test_append', Schema.from_pyarrow_schema(
+ pa_schema, partition_keys=['f0'], options={'file.format': _random_format()}), False)
+ cls.catalog.create_table('default.test_pk', Schema.from_pyarrow_schema(
+ pa_schema, partition_keys=['f2'], primary_keys=['f0'],
+ options={'bucket': '1', 'file.format': _random_format()}), False)
+ cls.data = pa.Table.from_pydict({
+ 'f0': [1, 2, 3, 4, 5],
+ 'f1': ['abc', 'abbc', 'bc', 'd', None],
+ 'f2': [6, 7, 8, 9, 10],
+ }, schema=pa_schema)
+
+ append_table = cls.catalog.get_table('default.test_append')
+ write_builder = append_table.new_batch_write_builder()
+ write = write_builder.new_write()
+ commit = write_builder.new_commit()
+ write.write_arrow(cls.data)
+ commit.commit(write.prepare_commit())
+ write.close()
+ commit.close()
+
+ pk_table = cls.catalog.get_table('default.test_pk')
+ write_builder = pk_table.new_batch_write_builder()
+ write = write_builder.new_write()
+ commit = write_builder.new_commit()
+ write.write_arrow(cls.data)
+ commit.commit(write.prepare_commit())
+ write.close()
+ commit.close()
+
+ def test_not_equal_append(self):
+ table = self.catalog.get_table('default.test_append')
+ self._overwrite_manifest_entry(table)
+ read_builder = table.new_read_builder()
+ predicate_builder = read_builder.new_predicate_builder()
+ predicate = predicate_builder.not_equal('f2', 6)  # test stats filter when filtering ManifestEntry
+ splits, actual = self._read_result(read_builder.with_filter(predicate))
+ expected = self.data.slice(1, 4)
+ self.assertEqual(expected, actual)
+ self.assertEqual(len(expected), len(splits))
+
+ def test_less_than_append(self):
+ table = self.catalog.get_table('default.test_append')
+
+ self._overwrite_manifest_entry(table)
+
+ read_builder = table.new_read_builder()
+ predicate_builder = read_builder.new_predicate_builder()
+ predicate = predicate_builder.less_than('f2', 8)
+ splits, actual = self._read_result(read_builder.with_filter(predicate))
+ expected = self.data.slice(0, 2)
+ self.assertEqual(actual, expected)
+ self.assertEqual(len(expected), len(splits))  # test stats filter when filtering ManifestEntry
+
+ def test_is_null_append(self):
+ table = self.catalog.get_table('default.test_append')
+ read_builder = table.new_read_builder()
+ predicate_builder = read_builder.new_predicate_builder()
+ predicate = predicate_builder.is_null('f1') # value_stats_cols=None
+ splits, actual = self._read_result(read_builder.with_filter(predicate))
+ expected = self.data.slice(4, 1)
+ self.assertEqual(expected, actual)
+ self.assertEqual(len(expected), len(splits))
+
+ def test_is_not_null_append(self):
+ table = self.catalog.get_table('default.test_append')
+ starting_scanner = FullStartingScanner(table, None, None)
+ latest_snapshot = starting_scanner.snapshot_manager.get_latest_snapshot()
+ manifest_files = starting_scanner.manifest_list_manager.read_all(latest_snapshot)
+ manifest_entries = starting_scanner.manifest_file_manager.read(manifest_files[0].file_name)
+ self._transform_manifest_entries(manifest_entries, [])
+ l = ['abc', 'abbc', 'bc', 'd', None]
+ for i, entry in enumerate(manifest_entries):
+ entry.file.value_stats_cols = ['f1']
+ entry.file.value_stats = SimpleStats(
+ GenericRow([l[i]], [table.fields[1]]),
+ GenericRow([l[i]], [table.fields[1]]),
+ [1 if l[i] is None else 0],
+ )
+
+ starting_scanner.manifest_file_manager.write(manifest_files[0].file_name, manifest_entries)
+
+ read_builder = table.new_read_builder()
+ predicate_builder = read_builder.new_predicate_builder()
+ predicate = predicate_builder.is_not_null('f1')
+ splits, actual = self._read_result(read_builder.with_filter(predicate))
+ expected = self.data.slice(0, 4)
+ self.assertEqual(expected, actual)
+ self.assertEqual(len(expected), len(splits))
+
+ def test_is_in_append(self):
+ table = self.catalog.get_table('default.test_append')
+ self._overwrite_manifest_entry(table)
+ read_builder = table.new_read_builder()
+ predicate_builder = read_builder.new_predicate_builder()
+ predicate = predicate_builder.is_in('f2', [6, 8])
+ splits, actual = self._read_result(read_builder.with_filter(predicate))
+ expected = self.data.take([0, 2])
+ self.assertEqual(expected, actual)
+ self.assertEqual(len(expected), len(splits))
+
+ def test_equal_pk(self):
+ table = self.catalog.get_table('default.test_pk')
+ read_builder = table.new_read_builder()
+ predicate_builder = read_builder.new_predicate_builder()
+ predicate = predicate_builder.equal('f2', 6)
+ splits, actual = self._read_result(read_builder.with_filter(predicate))
+ expected = self.data.slice(0, 1)
+ self.assertEqual(expected, actual)
+ self.assertEqual(len(splits), len(expected))  # test partition filter when filtering ManifestEntry
+
+ def test_not_equal_pk(self):
+ table = self.catalog.get_table('default.test_pk')
+ read_builder = table.new_read_builder()
+ predicate_builder = read_builder.new_predicate_builder()
+ predicate = predicate_builder.not_equal('f2', 6)
+ splits, actual = self._read_result(read_builder.with_filter(predicate))
+ expected = self.data.slice(1, 4)
+ self.assertEqual(actual, expected)
+ self.assertEqual(len(splits), len(expected))  # test partition filter when filtering ManifestEntry
+
+ def test_less_than_pk(self):
+ table = self.catalog.get_table('default.test_pk')
+ read_builder = table.new_read_builder()
+ predicate_builder = read_builder.new_predicate_builder()
+ predicate = predicate_builder.less_than('f0', 3)
+ splits, actual = self._read_result(read_builder.with_filter(predicate))
+ expected = self.data.slice(0, 2)
+ self.assertEqual(expected, actual)
+ self.assertEqual(len(expected), len(splits))  # test key stats filter when filtering ManifestEntry
+
+ def test_is_null_pk(self):
+ table = self.catalog.get_table('default.test_pk')
+ read_builder = table.new_read_builder()
+ predicate_builder = read_builder.new_predicate_builder()
+ predicate = predicate_builder.is_null('f1')
+ splits, actual = self._read_result(read_builder.with_filter(predicate))
+ expected = self.data.slice(4, 1)
+ self.assertEqual(actual, expected)
+
+ def test_is_not_null_pk(self):
+ table = self.catalog.get_table('default.test_pk')
+ read_builder = table.new_read_builder()
+ predicate_builder = read_builder.new_predicate_builder()
+ predicate = predicate_builder.is_not_null('f1')
+ splits, actual = self._read_result(read_builder.with_filter(predicate))
+ expected = self.data.slice(0, 4)
+ self.assertEqual(actual, expected)
+
+ def test_is_in_pk(self):
+ table = self.catalog.get_table('default.test_pk')
+ read_builder = table.new_read_builder()
+ predicate_builder = read_builder.new_predicate_builder()
+ predicate = predicate_builder.is_in('f0', [1, 5])
+ splits, actual = self._read_result(read_builder.with_filter(predicate))
+ # expected rows: indices [0, 4]
+ expected = self.data.take([0, 4])
+ self.assertEqual(actual, expected)
+ self.assertEqual(len(splits), len(expected))  # test key stats filter when filtering ManifestEntry
+
+ def test_append_multi_cols(self):
+ # Create a 10-column append table and write 10 rows
+ pa_schema = pa.schema([
+ ('f0', pa.int64()),
+ ('f1', pa.string()),
+ ('f2', pa.int64()),
+ ('f3', pa.string()),
+ ('f4', pa.int64()),
+ ('f5', pa.string()),
+ ('f6', pa.int64()),
+ ('f7', pa.string()),
+ ('f8', pa.int64()),
+ ('f9', pa.string()),
+ ])
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ partition_keys=['f0'],
+ options={'file.format': _random_format()}
+ )
+ self.catalog.create_table('default.test_append_10cols', schema, False)
+ table = self.catalog.get_table('default.test_append_10cols')
+
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ data = {
'f0': list(range(1, 11)),  # 1..10
+ 'f1': ['a0', 'bb', 'a2', 'a3', 'a4', 'a5', 'a6', 'a7', 'a8', 'a9'],  # contains 'bb' at index 1
+ 'f2': [10, 20, 30, 40, 50, 60, 70, 80, 90, 100],
+ 'f3': ['x0', 'x1', 'x2', 'x3', 'x4', 'x5', 'x6', 'x7', 'x8', 'x9'],
+ 'f4': [0, 1, 0, 1, 0, 1, 0, 1, 0, 1],
+ 'f5': ['y0', 'y1', 'y2', 'y3', 'y4', 'y5', 'y6', 'y7', 'y8', 'y9'],
+ 'f6': [100, 200, 300, 400, 500, 600, 700, 800, 900, 1000],
+ 'f7': ['z0', 'z1', 'z2', 'z3', 'z4', 'z5', 'z6', 'z7', 'z8', 'z9'],
+ 'f8': [5, 4, 3, 2, 1, 0, -1, -2, -3, -4],
+ 'f9': ['w0', 'w1', 'w2', 'w3', 'w4', 'w5', 'w6', 'w7', 'w8', 'w9'],
+ }
+ pa_table = pa.Table.from_pydict(data, schema=pa_schema)
+ table_write.write_arrow(pa_table)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ starting_scanner = FullStartingScanner(table, None, None)
+ latest_snapshot = starting_scanner.snapshot_manager.get_latest_snapshot()
+ manifest_files = starting_scanner.manifest_list_manager.read_all(latest_snapshot)
+ manifest_entries = starting_scanner.manifest_file_manager.read(manifest_files[0].file_name)
+ self._transform_manifest_entries(manifest_entries, [])
+ for i, entry in enumerate(manifest_entries):
+ entry.file.value_stats_cols = ['f2', 'f6', 'f8']
+ entry.file.value_stats = SimpleStats(
+ GenericRow([10 * (i + 1), 100 * (i + 1), 5 - i], [table.fields[2], table.fields[6], table.fields[8]]),
+ GenericRow([10 * (i + 1), 100 * (i + 1), 5 - i], [table.fields[2], table.fields[6], table.fields[8]]),
+ [0, 0, 0],
+ )
+
+ starting_scanner.manifest_file_manager.write(manifest_files[0].file_name, manifest_entries)
+ # Build multiple predicates and combine them
+ read_builder = table.new_read_builder()
+ predicate_builder = read_builder.new_predicate_builder()
+ p_in = predicate_builder.is_in('f6', [100, 600, 1000])
+ p_contains = predicate_builder.less_or_equal('f8', -3)
+ p_not_null = predicate_builder.is_not_null('f2')
+ p_ge = predicate_builder.greater_or_equal('f2', 50)
+ p_or = predicate_builder.or_predicates([p_in, p_contains])
+ combined = predicate_builder.and_predicates([p_or, p_not_null, p_ge])
+
+ splits, actual = self._read_result(read_builder.with_filter(combined))
+
+ # Expected rows after filter: indices 5, 8 and 9
+ expected_data = {'f0': [6, 9, 10],
+ 'f1': ['a5', 'a8', 'a9'],
+ 'f2': [60, 90, 100],
+ 'f3': ['x5', 'x8', 'x9'],
+ 'f4': [1, 0, 1],
+ 'f5': ['y5', 'y8', 'y9'],
+ 'f6': [600, 900, 1000],
+ 'f7': ['z5', 'z8', 'z9'],
+ 'f8': [0, -3, -4],
+ 'f9': ['w5', 'w8', 'w9']
+ }
+ self.assertEqual(expected_data, actual.to_pydict())
+
+ starting_scanner = FullStartingScanner(table, None, None)
+ latest_snapshot = starting_scanner.snapshot_manager.get_latest_snapshot()
+ manifest_files = starting_scanner.manifest_list_manager.read_all(latest_snapshot)
+ manifest_entries = starting_scanner.manifest_file_manager.read(manifest_files[0].file_name)
+ self._transform_manifest_entries(manifest_entries, [])
+ for i, entry in enumerate(manifest_entries):
+ entry.file.value_stats_cols = ['f2', 'f6', 'f8']
+ entry.file.value_stats = SimpleStats(
+ GenericRow([0, 100 * (i + 1), 5 - i], [table.fields[2], table.fields[6], table.fields[8]]),
+ GenericRow([0, 100 * (i + 1), 5 - i], [table.fields[2], table.fields[6], table.fields[8]]),
+ [0, 0, 0],
+ )
+
+ starting_scanner.manifest_file_manager.write(manifest_files[0].file_name, manifest_entries)
+ splits, actual = self._read_result(read_builder.with_filter(combined))
+ self.assertFalse(actual)
+
+ def _read_result(self, read_builder):
+ scan = read_builder.new_scan()
+ read = read_builder.new_read()
+ splits = scan.plan().splits()
+ actual = read.to_arrow(splits)
+ return splits, actual
+
+ def _transform_manifest_entries(self, manifest_entries: List[ManifestEntry], trimmed_pk_fields):
+ for entry in manifest_entries:
+ entry.file.key_stats.min_values = GenericRowDeserializer.from_bytes(entry.file.key_stats.min_values.data,
+ trimmed_pk_fields)
+ entry.file.key_stats.max_values = GenericRowDeserializer.from_bytes(entry.file.key_stats.max_values.data,
+ trimmed_pk_fields)
+
+ def _overwrite_manifest_entry(self, table):
+ starting_scanner = FullStartingScanner(table, None, None)
+ latest_snapshot = starting_scanner.snapshot_manager.get_latest_snapshot()
+ manifest_files = starting_scanner.manifest_list_manager.read_all(latest_snapshot)
+ manifest_entries = starting_scanner.manifest_file_manager.read(manifest_files[0].file_name)
+ self._transform_manifest_entries(manifest_entries, [])
+ for i, entry in enumerate(manifest_entries):
+ entry.file.value_stats_cols = ['f2']
+ entry.file.value_stats = SimpleStats(
+ GenericRow([6 + i], [table.fields[2]]),
+ GenericRow([6 + i], [table.fields[2]]),
+ [0],
+ )
+
+ starting_scanner.manifest_file_manager.write(manifest_files[0].file_name, manifest_entries)
diff --git a/paimon-python/pypaimon/tests/predicates_test.py b/paimon-python/pypaimon/tests/predicates_test.py
index 6158d1d88b..561641589f 100644
--- a/paimon-python/pypaimon/tests/predicates_test.py
+++ b/paimon-python/pypaimon/tests/predicates_test.py
@@ -24,6 +24,7 @@ import pandas as pd
import pyarrow as pa
from pypaimon import CatalogFactory, Schema
+from pypaimon.table.row.generic_row import GenericRowDeserializer
def _check_filtered_result(read_builder, expected_df):
@@ -454,20 +455,26 @@ class PredicateTest(unittest.TestCase):
if split.partition.values == ["p1", 2]:
count += 1
self.assertEqual(len(split.files), 1)
- min_values = split.files[0].key_stats.min_values.to_dict()
- max_values = split.files[0].key_stats.max_values.to_dict()
+ min_values = GenericRowDeserializer.from_bytes(split.files[0].key_stats.min_values.data,
+ table.table_schema.get_primary_key_fields()).to_dict()
+ max_values = GenericRowDeserializer.from_bytes(split.files[0].key_stats.max_values.data,
+ table.table_schema.get_primary_key_fields()).to_dict()
self.assertTrue(min_values["key1"] == 1 and min_values["key2"] == "e"
and max_values["key1"] == 4 and max_values["key2"] == "h")
elif split.partition.values == ["p2", 2]:
count += 1
- min_values = split.files[0].key_stats.min_values.to_dict()
- max_values = split.files[0].key_stats.max_values.to_dict()
+ min_values = GenericRowDeserializer.from_bytes(split.files[0].key_stats.min_values.data,
+ table.table_schema.get_primary_key_fields()).to_dict()
+ max_values = GenericRowDeserializer.from_bytes(split.files[0].key_stats.max_values.data,
+ table.table_schema.get_primary_key_fields()).to_dict()
self.assertTrue(min_values["key1"] == 5 and min_values["key2"] == "a"
and max_values["key1"] == 8 and max_values["key2"] == "d")
elif split.partition.values == ["p1", 1]:
count += 1
- min_values = split.files[0].key_stats.min_values.to_dict()
- max_values = split.files[0].key_stats.max_values.to_dict()
+ min_values = GenericRowDeserializer.from_bytes(split.files[0].key_stats.min_values.data,
+ table.table_schema.get_primary_key_fields()).to_dict()
+ max_values = GenericRowDeserializer.from_bytes(split.files[0].key_stats.max_values.data,
+ table.table_schema.get_primary_key_fields()).to_dict()
self.assertTrue(min_values["key1"] == max_values["key1"] == 7
and max_values["key2"] == max_values["key2"] == "b")
self.assertEqual(count, 3)
diff --git a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
index 9be66d9759..6e6d57f963 100644
--- a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
+++ b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
@@ -183,8 +183,10 @@ class RESTAOReadWritePy36Test(RESTBaseTest):
manifest_files[0].file_name,
lambda row: table_scan.starting_scanner._filter_manifest_entry(row),
drop_stats=False)
- min_value_stats = manifest_entries[0].file.value_stats.min_values.values
- max_value_stats = manifest_entries[0].file.value_stats.max_values.values
+ min_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.min_values.data,
+ table.fields).values
+ max_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.max_values.data,
+ table.fields).values
expected_min_values = [col[0].as_py() for col in expect_data]
expected_max_values = [col[1].as_py() for col in expect_data]
self.assertEqual(min_value_stats, expected_min_values)
@@ -865,23 +867,27 @@ class RESTAOReadWritePy36Test(RESTBaseTest):
# Verify value_stats structure based on the logic
if value_stats_cols is None:
# Should use all table fields - verify we have data for all fields
- self.assertEqual(len(read_entry.file.value_stats.min_values.values), expected_fields_count)
- self.assertEqual(len(read_entry.file.value_stats.max_values.values), expected_fields_count)
+ self.assertEqual(read_entry.file.value_stats.min_values.arity, expected_fields_count)
+ self.assertEqual(read_entry.file.value_stats.min_values.arity, expected_fields_count)
self.assertEqual(len(read_entry.file.value_stats.null_counts), expected_fields_count)
elif not value_stats_cols: # Empty list
# Should use empty fields - verify we have no field data
- self.assertEqual(len(read_entry.file.value_stats.min_values.values), 0)
- self.assertEqual(len(read_entry.file.value_stats.max_values.values), 0)
+ self.assertEqual(read_entry.file.value_stats.min_values.arity, 0)
+ self.assertEqual(read_entry.file.value_stats.max_values.arity, 0)
self.assertEqual(len(read_entry.file.value_stats.null_counts), 0)
else:
# Should use specified fields - verify we have data for specified fields only
- self.assertEqual(len(read_entry.file.value_stats.min_values.values), expected_fields_count)
- self.assertEqual(len(read_entry.file.value_stats.max_values.values), expected_fields_count)
+ self.assertEqual(read_entry.file.value_stats.min_values.arity, expected_fields_count)
+ self.assertEqual(read_entry.file.value_stats.max_values.arity, expected_fields_count)
self.assertEqual(len(read_entry.file.value_stats.null_counts), expected_fields_count)
# Verify the actual values match what we expect
if expected_fields_count > 0:
- self.assertEqual(read_entry.file.value_stats.min_values.values, min_values)
- self.assertEqual(read_entry.file.value_stats.max_values.values, max_values)
+ self.assertEqual(
+ GenericRowDeserializer.from_bytes(read_entry.file.value_stats.min_values.data, test_fields).values,
+ min_values)
+ self.assertEqual(
+ GenericRowDeserializer.from_bytes(read_entry.file.value_stats.max_values.data, test_fields).values,
+ max_values)
self.assertEqual(read_entry.file.value_stats.null_counts, null_counts)
diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py
index a06e120e95..ebe98e4fc5 100644
--- a/paimon-python/pypaimon/tests/reader_base_test.py
+++ b/paimon-python/pypaimon/tests/reader_base_test.py
@@ -37,7 +37,7 @@ from pypaimon.schema.data_types import (ArrayType, AtomicType, DataField,
MapType, PyarrowFieldParser)
from pypaimon.schema.table_schema import TableSchema
from pypaimon.snapshot.snapshot_manager import SnapshotManager
-from pypaimon.table.row.generic_row import GenericRow
+from pypaimon.table.row.generic_row import GenericRow, GenericRowDeserializer
from pypaimon.write.file_store_commit import FileStoreCommit
@@ -226,8 +226,10 @@ class ReaderBasicTest(unittest.TestCase):
manifest_files = table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
manifest_entries = table_scan.starting_scanner.manifest_file_manager.read(
manifest_files[0].file_name, lambda row: table_scan.starting_scanner._filter_manifest_entry(row), False)
- min_value_stats = manifest_entries[0].file.value_stats.min_values.values
- max_value_stats = manifest_entries[0].file.value_stats.max_values.values
+ min_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.min_values.data,
+ table.fields).values
+ max_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.max_values.data,
+ table.fields).values
expected_min_values = [col[0].as_py() for col in expect_data]
expected_max_values = [col[1].as_py() for col in expect_data]
self.assertEqual(min_value_stats, expected_min_values)
@@ -649,23 +651,27 @@ class ReaderBasicTest(unittest.TestCase):
# Verify value_stats structure based on the logic
if value_stats_cols is None:
# Should use all table fields - verify we have data for all fields
- self.assertEqual(len(read_entry.file.value_stats.min_values.values), expected_fields_count)
- self.assertEqual(len(read_entry.file.value_stats.max_values.values), expected_fields_count)
+ self.assertEqual(read_entry.file.value_stats.min_values.arity, expected_fields_count)
+ self.assertEqual(read_entry.file.value_stats.min_values.arity, expected_fields_count)
self.assertEqual(len(read_entry.file.value_stats.null_counts), expected_fields_count)
elif not value_stats_cols: # Empty list
# Should use empty fields - verify we have no field data
- self.assertEqual(len(read_entry.file.value_stats.min_values.values), 0)
- self.assertEqual(len(read_entry.file.value_stats.max_values.values), 0)
+ self.assertEqual(read_entry.file.value_stats.min_values.arity, 0)
+ self.assertEqual(read_entry.file.value_stats.max_values.arity, 0)
self.assertEqual(len(read_entry.file.value_stats.null_counts), 0)
else:
# Should use specified fields - verify we have data for specified fields only
- self.assertEqual(len(read_entry.file.value_stats.min_values.values), expected_fields_count)
- self.assertEqual(len(read_entry.file.value_stats.max_values.values), expected_fields_count)
+ self.assertEqual(read_entry.file.value_stats.min_values.arity, expected_fields_count)
+ self.assertEqual(read_entry.file.value_stats.max_values.arity, expected_fields_count)
self.assertEqual(len(read_entry.file.value_stats.null_counts), expected_fields_count)
# Verify the actual values match what we expect
if expected_fields_count > 0:
- self.assertEqual(read_entry.file.value_stats.min_values.values, min_values)
- self.assertEqual(read_entry.file.value_stats.max_values.values, max_values)
+ self.assertEqual(
+ GenericRowDeserializer.from_bytes(read_entry.file.value_stats.min_values.data, test_fields).values,
+ min_values)
+ self.assertEqual(
+ GenericRowDeserializer.from_bytes(read_entry.file.value_stats.max_values.data, test_fields).values,
+ max_values)
self.assertEqual(read_entry.file.value_stats.null_counts, null_counts)