This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a7a0b830d [Python] Refactor BinaryRow to reuse keys and key fields 
(#6445)
2a7a0b830d is described below

commit 2a7a0b830d6aaf6d35729bda18343b9879c8844d
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Oct 21 21:36:24 2025 +0800

    [Python] Refactor BinaryRow to reuse keys and key fields (#6445)
---
 paimon-python/pypaimon/common/predicate.py         | 17 +++----------
 .../pypaimon/manifest/manifest_file_manager.py     | 16 ++++++-------
 .../pypaimon/manifest/manifest_list_manager.py     | 12 +++++-----
 .../pypaimon/manifest/schema/simple_stats.py       |  5 ++--
 .../pypaimon/manifest/simple_stats_evolution.py    |  1 -
 .../pypaimon/manifest/simple_stats_evolutions.py   |  1 -
 .../pypaimon/read/scanner/full_starting_scanner.py |  2 +-
 paimon-python/pypaimon/read/split_read.py          |  2 +-
 paimon-python/pypaimon/schema/table_schema.py      | 28 ----------------------
 paimon-python/pypaimon/table/file_store_table.py   |  5 ++++
 paimon-python/pypaimon/table/row/binary_row.py     |  3 ---
 paimon-python/pypaimon/table/row/generic_row.py    |  3 ---
 paimon-python/pypaimon/table/row/internal_row.py   |  6 -----
 paimon-python/pypaimon/table/row/offset_row.py     |  3 ---
 paimon-python/pypaimon/table/row/projected_row.py  |  7 ------
 paimon-python/pypaimon/tests/predicates_test.py    | 12 +++++-----
 paimon-python/pypaimon/write/file_store_commit.py  | 10 ++++----
 paimon-python/pypaimon/write/writer/data_writer.py | 16 ++++++-------
 .../pypaimon/write/writer/key_value_data_writer.py |  8 +++----
 19 files changed, 50 insertions(+), 107 deletions(-)

diff --git a/paimon-python/pypaimon/common/predicate.py 
b/paimon-python/pypaimon/common/predicate.py
index 5e47fdd5df..9ae2cdfce3 100644
--- a/paimon-python/pypaimon/common/predicate.py
+++ b/paimon-python/pypaimon/common/predicate.py
@@ -27,7 +27,6 @@ from pyarrow import compute as pyarrow_compute
 from pyarrow import dataset as pyarrow_dataset
 
 from pypaimon.manifest.schema.simple_stats import SimpleStats
-from pypaimon.table.row.generic_row import GenericRow
 from pypaimon.table.row.internal_row import InternalRow
 
 
@@ -74,25 +73,15 @@ class Predicate:
         if self.method == 'or':
             return any(p.test_by_simple_stats(stat, row_count) for p in 
self.literals)
 
-        # Get null count using the mapped index
-        null_count = stat.null_counts[self.index] if stat.null_counts and 
self.index < len(
-            stat.null_counts) else 0
+        null_count = stat.null_counts[self.index]
 
         if self.method == 'isNull':
             return null_count is not None and null_count > 0
         if self.method == 'isNotNull':
             return null_count is None or row_count is None or null_count < 
row_count
 
-        if not isinstance(stat.min_values, GenericRow):
-            # Parse field values using BinaryRow's direct field access by name
-            min_value = stat.min_values.get_field(self.index)
-            max_value = stat.max_values.get_field(self.index)
-        else:
-            # TODO transform partition to BinaryRow
-            min_values = stat.min_values.to_dict()
-            max_values = stat.max_values.to_dict()
-            min_value = min_values[self.field]
-            max_value = max_values[self.field]
+        min_value = stat.min_values.get_field(self.index)
+        max_value = stat.max_values.get_field(self.index)
 
         if min_value is None or max_value is None or (null_count is not None 
and null_count == row_count):
             # invalid stats, skip validation
diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py 
b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index c196845ff4..9fc92a4113 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -38,9 +38,9 @@ class ManifestFileManager:
         self.table: FileStoreTable = table
         self.manifest_path = table.table_path / "manifest"
         self.file_io = table.file_io
-        self.partition_key_fields = 
self.table.table_schema.get_partition_key_fields()
-        self.primary_key_fields = 
self.table.table_schema.get_primary_key_fields()
-        self.trimmed_primary_key_fields = 
self.table.table_schema.get_trimmed_primary_key_fields()
+        self.partition_keys_fields = self.table.partition_keys_fields
+        self.primary_keys_fields = self.table.primary_keys_fields
+        self.trimmed_primary_keys_fields = 
self.table.trimmed_primary_keys_fields
 
     def read(self, manifest_file_name: str, manifest_entry_filter=None, 
drop_stats=True) -> List[ManifestEntry]:
         manifest_file_path = self.manifest_path / manifest_file_name
@@ -55,8 +55,8 @@ class ManifestFileManager:
             file_dict = dict(record['_FILE'])
             key_dict = dict(file_dict['_KEY_STATS'])
             key_stats = SimpleStats(
-                min_values=BinaryRow(key_dict['_MIN_VALUES'], 
self.trimmed_primary_key_fields),
-                max_values=BinaryRow(key_dict['_MAX_VALUES'], 
self.trimmed_primary_key_fields),
+                min_values=BinaryRow(key_dict['_MIN_VALUES'], 
self.trimmed_primary_keys_fields),
+                max_values=BinaryRow(key_dict['_MAX_VALUES'], 
self.trimmed_primary_keys_fields),
                 null_counts=key_dict['_NULL_COUNTS'],
             )
 
@@ -80,8 +80,8 @@ class ManifestFileManager:
                 file_name=file_dict['_FILE_NAME'],
                 file_size=file_dict['_FILE_SIZE'],
                 row_count=file_dict['_ROW_COUNT'],
-                
min_key=GenericRowDeserializer.from_bytes(file_dict['_MIN_KEY'], 
self.trimmed_primary_key_fields),
-                
max_key=GenericRowDeserializer.from_bytes(file_dict['_MAX_KEY'], 
self.trimmed_primary_key_fields),
+                
min_key=GenericRowDeserializer.from_bytes(file_dict['_MIN_KEY'], 
self.trimmed_primary_keys_fields),
+                
max_key=GenericRowDeserializer.from_bytes(file_dict['_MAX_KEY'], 
self.trimmed_primary_keys_fields),
                 key_stats=key_stats,
                 value_stats=value_stats,
                 min_sequence_number=file_dict['_MIN_SEQUENCE_NUMBER'],
@@ -100,7 +100,7 @@ class ManifestFileManager:
             )
             entry = ManifestEntry(
                 kind=record['_KIND'],
-                
partition=GenericRowDeserializer.from_bytes(record['_PARTITION'], 
self.partition_key_fields),
+                
partition=GenericRowDeserializer.from_bytes(record['_PARTITION'], 
self.partition_keys_fields),
                 bucket=record['_BUCKET'],
                 total_buckets=record['_TOTAL_BUCKETS'],
                 file=file_meta
diff --git a/paimon-python/pypaimon/manifest/manifest_list_manager.py 
b/paimon-python/pypaimon/manifest/manifest_list_manager.py
index 0fc58652f6..367f802de5 100644
--- a/paimon-python/pypaimon/manifest/manifest_list_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_list_manager.py
@@ -25,8 +25,8 @@ from pypaimon.manifest.schema.manifest_file_meta import (
     MANIFEST_FILE_META_SCHEMA, ManifestFileMeta)
 from pypaimon.manifest.schema.simple_stats import SimpleStats
 from pypaimon.snapshot.snapshot import Snapshot
-from pypaimon.table.row.generic_row import (GenericRowDeserializer,
-                                            GenericRowSerializer)
+from pypaimon.table.row.binary_row import BinaryRow
+from pypaimon.table.row.generic_row import GenericRowSerializer
 
 
 class ManifestListManager:
@@ -61,13 +61,13 @@ class ManifestListManager:
         for record in reader:
             stats_dict = dict(record['_PARTITION_STATS'])
             partition_stats = SimpleStats(
-                min_values=GenericRowDeserializer.from_bytes(
+                min_values=BinaryRow(
                     stats_dict['_MIN_VALUES'],
-                    self.table.table_schema.get_partition_key_fields()
+                    self.table.partition_keys_fields
                 ),
-                max_values=GenericRowDeserializer.from_bytes(
+                max_values=BinaryRow(
                     stats_dict['_MAX_VALUES'],
-                    self.table.table_schema.get_partition_key_fields()
+                    self.table.partition_keys_fields
                 ),
                 null_counts=stats_dict['_NULL_COUNTS'],
             )
diff --git a/paimon-python/pypaimon/manifest/schema/simple_stats.py 
b/paimon-python/pypaimon/manifest/schema/simple_stats.py
index 1130a812fa..065728bc6a 100644
--- a/paimon-python/pypaimon/manifest/schema/simple_stats.py
+++ b/paimon-python/pypaimon/manifest/schema/simple_stats.py
@@ -17,7 +17,7 @@
 
################################################################################
 
 from dataclasses import dataclass
-from typing import List, Optional
+from typing import List
 from typing import ClassVar
 
 from pypaimon.table.row.generic_row import GenericRow
@@ -28,7 +28,8 @@ from pypaimon.table.row.internal_row import InternalRow
 class SimpleStats:
     min_values: InternalRow
     max_values: InternalRow
-    null_counts: Optional[List[int]]
+    # TODO convert null counts to InternalArray
+    null_counts: List[int]
 
     _empty_stats: ClassVar[object] = None
 
diff --git a/paimon-python/pypaimon/manifest/simple_stats_evolution.py 
b/paimon-python/pypaimon/manifest/simple_stats_evolution.py
index 56cea98a85..d2291c54de 100644
--- a/paimon-python/pypaimon/manifest/simple_stats_evolution.py
+++ b/paimon-python/pypaimon/manifest/simple_stats_evolution.py
@@ -59,7 +59,6 @@ class SimpleStatsEvolution:
             null_counts = self._project_array(null_counts, dense_index_mapping)
 
         if self.index_mapping is not None:
-            # TODO support schema evolution
             min_values = self._project_row(min_values, self.index_mapping)
             max_values = self._project_row(max_values, self.index_mapping)
 
diff --git a/paimon-python/pypaimon/manifest/simple_stats_evolutions.py 
b/paimon-python/pypaimon/manifest/simple_stats_evolutions.py
index 8331b7a5e5..373e333cd9 100644
--- a/paimon-python/pypaimon/manifest/simple_stats_evolutions.py
+++ b/paimon-python/pypaimon/manifest/simple_stats_evolutions.py
@@ -40,7 +40,6 @@ class SimpleStatsEvolutions:
         if self.table_schema_id == data_schema_id:
             evolution = 
SimpleStatsEvolution(self.schema_fields(data_schema_id), None, None)
         else:
-            # TODO support schema evolution
             if self.table_fields is None:
                 self.table_fields = self.table_data_fields
 
diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py 
b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index 47a2d86d15..b94364db99 100644
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -48,7 +48,7 @@ class FullStartingScanner(StartingScanner):
         self.manifest_file_manager = ManifestFileManager(table)
 
         self.primary_key_predicate = trim_and_transform_predicate(
-            self.predicate, self.table.field_names, 
self.table.table_schema.get_trimmed_primary_keys())
+            self.predicate, self.table.field_names, 
self.table.trimmed_primary_keys)
 
         self.partition_key_predicate = trim_and_transform_predicate(
             self.predicate, self.table.field_names, self.table.partition_keys)
diff --git a/paimon-python/pypaimon/read/split_read.py 
b/paimon-python/pypaimon/read/split_read.py
index 6b93b51d15..000e272e39 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -64,7 +64,7 @@ class SplitRead(ABC):
         self.split = split
         self.value_arity = len(read_type)
 
-        self.trimmed_primary_key = 
self.table.table_schema.get_trimmed_primary_keys()
+        self.trimmed_primary_key = self.table.trimmed_primary_keys
         self.read_fields = read_type
         if isinstance(self, MergeFileSplitRead):
             self.read_fields = self._create_key_value_fields(read_type)
diff --git a/paimon-python/pypaimon/schema/table_schema.py 
b/paimon-python/pypaimon/schema/table_schema.py
index d1875a7b0b..25dd19398b 100644
--- a/paimon-python/pypaimon/schema/table_schema.py
+++ b/paimon-python/pypaimon/schema/table_schema.py
@@ -145,31 +145,3 @@ class TableSchema:
             comment=self.comment,
             time_millis=self.time_millis
         )
-
-    def get_primary_key_fields(self) -> List[DataField]:
-        if not self.primary_keys:
-            return []
-        field_map = {field.name: field for field in self.fields}
-        return [field_map[name] for name in self.primary_keys if name in 
field_map]
-
-    def get_partition_key_fields(self) -> List[DataField]:
-        if not self.partition_keys:
-            return []
-        field_map = {field.name: field for field in self.fields}
-        return [field_map[name] for name in self.partition_keys if name in 
field_map]
-
-    def get_trimmed_primary_key_fields(self) -> List[DataField]:
-        if not self.primary_keys or not self.partition_keys:
-            return self.get_primary_key_fields()
-        adjusted = [pk for pk in self.primary_keys if pk not in 
self.partition_keys]
-        # Validate that filtered list is not empty
-        if not adjusted:
-            raise ValueError(
-                f"Primary key constraint {self.primary_keys} "
-                f"should not be same with partition fields 
{self.partition_keys}, "
-                "this will result in only one record in a partition")
-        field_map = {field.name: field for field in self.fields}
-        return [field_map[name] for name in adjusted if name in field_map]
-
-    def get_trimmed_primary_keys(self) -> List[str]:
-        return [field.name for field in self.get_trimmed_primary_key_fields()]
diff --git a/paimon-python/pypaimon/table/file_store_table.py 
b/paimon-python/pypaimon/table/file_store_table.py
index f0186b1657..cde282eff0 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -49,7 +49,12 @@ class FileStoreTable(Table):
         self.field_names = [field.name for field in table_schema.fields]
         self.field_dict = {field.name: field for field in self.fields}
         self.primary_keys = table_schema.primary_keys
+        self.primary_keys_fields = [self.field_dict[name] for name in 
self.primary_keys]
         self.partition_keys = table_schema.partition_keys
+        self.partition_keys_fields = [self.field_dict[name] for name in 
self.partition_keys]
+        self.trimmed_primary_keys = [pk for pk in self.primary_keys if pk not 
in self.partition_keys]
+        self.trimmed_primary_keys_fields = [self.field_dict[name] for name in 
self.trimmed_primary_keys]
+
         self.options = table_schema.options
         self.cross_partition_update = 
self.table_schema.cross_partition_update()
         self.is_primary_key_table = bool(self.primary_keys)
diff --git a/paimon-python/pypaimon/table/row/binary_row.py 
b/paimon-python/pypaimon/table/row/binary_row.py
index f908935d44..41773b57e3 100644
--- a/paimon-python/pypaimon/table/row/binary_row.py
+++ b/paimon-python/pypaimon/table/row/binary_row.py
@@ -54,8 +54,5 @@ class BinaryRow(InternalRow):
     def get_row_kind(self) -> RowKind:
         return self.row_kind
 
-    def is_null_at(self, pos: int) -> bool:
-        return self.get_field(pos) is None
-
     def __len__(self):
         return self.arity
diff --git a/paimon-python/pypaimon/table/row/generic_row.py 
b/paimon-python/pypaimon/table/row/generic_row.py
index 13e3742110..b05e475951 100644
--- a/paimon-python/pypaimon/table/row/generic_row.py
+++ b/paimon-python/pypaimon/table/row/generic_row.py
@@ -45,9 +45,6 @@ class GenericRow(InternalRow):
             raise IndexError(f"Position {pos} is out of bounds for row arity 
{len(self.values)}")
         return self.values[pos]
 
-    def is_null_at(self, pos: int) -> bool:
-        return self.get_field(pos) is None
-
     def get_row_kind(self) -> RowKind:
         return self.row_kind
 
diff --git a/paimon-python/pypaimon/table/row/internal_row.py 
b/paimon-python/pypaimon/table/row/internal_row.py
index ca19ebcddf..e70468348c 100644
--- a/paimon-python/pypaimon/table/row/internal_row.py
+++ b/paimon-python/pypaimon/table/row/internal_row.py
@@ -33,12 +33,6 @@ class InternalRow(ABC):
         Returns the value at the given position.
         """
 
-    @abstractmethod
-    def is_null_at(self, pos: int) -> bool:
-        """
-        Returns true if the element is null at the given position.
-        """
-
     @abstractmethod
     def get_row_kind(self) -> RowKind:
         """
diff --git a/paimon-python/pypaimon/table/row/offset_row.py 
b/paimon-python/pypaimon/table/row/offset_row.py
index 9a51a246c9..b6e6f8f432 100644
--- a/paimon-python/pypaimon/table/row/offset_row.py
+++ b/paimon-python/pypaimon/table/row/offset_row.py
@@ -47,9 +47,6 @@ class OffsetRow(InternalRow):
             raise IndexError(f"Position {pos} is out of bounds for row arity 
{self.arity}")
         return self.row_tuple[self.offset + pos]
 
-    def is_null_at(self, pos: int) -> bool:
-        return self.get_field(pos) is None
-
     def get_row_kind(self) -> RowKind:
         return RowKind(self.row_kind_byte)
 
diff --git a/paimon-python/pypaimon/table/row/projected_row.py 
b/paimon-python/pypaimon/table/row/projected_row.py
index 502338a605..d7a1cc6f40 100644
--- a/paimon-python/pypaimon/table/row/projected_row.py
+++ b/paimon-python/pypaimon/table/row/projected_row.py
@@ -50,13 +50,6 @@ class ProjectedRow(InternalRow):
             return None
         return self.row.get_field(self.index_mapping[pos])
 
-    def is_null_at(self, pos: int) -> bool:
-        """Returns true if the element is null at the given position."""
-        if self.index_mapping[pos] < 0:
-            # TODO move this logical to hive
-            return True
-        return self.row.is_null_at(self.index_mapping[pos])
-
     def get_row_kind(self) -> RowKind:
         """Returns the kind of change that this row describes in a 
changelog."""
         return self.row.get_row_kind()
diff --git a/paimon-python/pypaimon/tests/predicates_test.py 
b/paimon-python/pypaimon/tests/predicates_test.py
index 561641589f..6e6de2fcae 100644
--- a/paimon-python/pypaimon/tests/predicates_test.py
+++ b/paimon-python/pypaimon/tests/predicates_test.py
@@ -456,25 +456,25 @@ class PredicateTest(unittest.TestCase):
                 count += 1
                 self.assertEqual(len(split.files), 1)
                 min_values = 
GenericRowDeserializer.from_bytes(split.files[0].key_stats.min_values.data,
-                                                               
table.table_schema.get_primary_key_fields()).to_dict()
+                                                               
table.primary_keys_fields).to_dict()
                 max_values = 
GenericRowDeserializer.from_bytes(split.files[0].key_stats.max_values.data,
-                                                               
table.table_schema.get_primary_key_fields()).to_dict()
+                                                               
table.primary_keys_fields).to_dict()
                 self.assertTrue(min_values["key1"] == 1 and min_values["key2"] 
== "e"
                                 and max_values["key1"] == 4 and 
max_values["key2"] == "h")
             elif split.partition.values == ["p2", 2]:
                 count += 1
                 min_values = 
GenericRowDeserializer.from_bytes(split.files[0].key_stats.min_values.data,
-                                                               
table.table_schema.get_primary_key_fields()).to_dict()
+                                                               
table.primary_keys_fields).to_dict()
                 max_values = 
GenericRowDeserializer.from_bytes(split.files[0].key_stats.max_values.data,
-                                                               
table.table_schema.get_primary_key_fields()).to_dict()
+                                                               
table.primary_keys_fields).to_dict()
                 self.assertTrue(min_values["key1"] == 5 and min_values["key2"] 
== "a"
                                 and max_values["key1"] == 8 and 
max_values["key2"] == "d")
             elif split.partition.values == ["p1", 1]:
                 count += 1
                 min_values = 
GenericRowDeserializer.from_bytes(split.files[0].key_stats.min_values.data,
-                                                               
table.table_schema.get_primary_key_fields()).to_dict()
+                                                               
table.primary_keys_fields).to_dict()
                 max_values = 
GenericRowDeserializer.from_bytes(split.files[0].key_stats.max_values.data,
-                                                               
table.table_schema.get_primary_key_fields()).to_dict()
+                                                               
table.primary_keys_fields).to_dict()
                 self.assertTrue(min_values["key1"] == max_values["key1"] == 7
                                 and max_values["key2"] == max_values["key2"] 
== "b")
         self.assertEqual(count, 3)
diff --git a/paimon-python/pypaimon/write/file_store_commit.py 
b/paimon-python/pypaimon/write/file_store_commit.py
index c2fe33105b..afeba52b3c 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -67,7 +67,7 @@ class FileStoreCommit:
 
         commit_entries = []
         for msg in commit_messages:
-            partition = GenericRow(list(msg.partition), 
self.table.table_schema.get_partition_key_fields())
+            partition = GenericRow(list(msg.partition), 
self.table.partition_keys_fields)
             for file in msg.new_files:
                 commit_entries.append(ManifestEntry(
                     kind=0,
@@ -89,7 +89,7 @@ class FileStoreCommit:
         partition_filter = None
         # sanity check, all changes must be done within the given partition, 
meanwhile build a partition filter
         if len(overwrite_partition) > 0:
-            predicate_builder = 
PredicateBuilder(self.table.table_schema.get_partition_key_fields())
+            predicate_builder = 
PredicateBuilder(self.table.partition_keys_fields)
             sub_predicates = []
             for key, value in overwrite_partition.items():
                 sub_predicates.append(predicate_builder.equal(key, value))
@@ -107,7 +107,7 @@ class FileStoreCommit:
             entry.kind = 1
             commit_entries.append(entry)
         for msg in commit_messages:
-            partition = GenericRow(list(msg.partition), 
self.table.table_schema.get_partition_key_fields())
+            partition = GenericRow(list(msg.partition), 
self.table.partition_keys_fields)
             for file in msg.new_files:
                 commit_entries.append(ManifestEntry(
                     kind=0,
@@ -174,11 +174,11 @@ class FileStoreCommit:
             partition_stats=SimpleStats(
                 min_values=GenericRow(
                     values=partition_min_stats,
-                    fields=self.table.table_schema.get_partition_key_fields(),
+                    fields=self.table.partition_keys_fields
                 ),
                 max_values=GenericRow(
                     values=partition_max_stats,
-                    fields=self.table.table_schema.get_partition_key_fields(),
+                    fields=self.table.partition_keys_fields
                 ),
                 null_counts=partition_null_counts,
             ),
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py 
b/paimon-python/pypaimon/write/writer/data_writer.py
index 3515991933..24e3b0ca48 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -43,8 +43,8 @@ class DataWriter(ABC):
         self.bucket = bucket
 
         self.file_io = self.table.file_io
-        self.trimmed_primary_key_fields = 
self.table.table_schema.get_trimmed_primary_key_fields()
-        self.trimmed_primary_key = 
self.table.table_schema.get_trimmed_primary_keys()
+        self.trimmed_primary_keys_fields = 
self.table.trimmed_primary_keys_fields
+        self.trimmed_primary_keys = self.table.trimmed_primary_keys
 
         options = self.table.options
         self.target_file_size = 256 * 1024 * 1024
@@ -159,7 +159,7 @@ class DataWriter(ABC):
 
         # min key & max key
 
-        selected_table = data.select(self.trimmed_primary_key)
+        selected_table = data.select(self.trimmed_primary_keys)
         key_columns_batch = selected_table.to_batches()[0]
         min_key_row_batch = key_columns_batch.slice(0, 1)
         max_key_row_batch = key_columns_batch.slice(key_columns_batch.num_rows 
- 1, 1)
@@ -177,7 +177,7 @@ class DataWriter(ABC):
         min_value_stats = [column_stats[field.name]['min_values'] for field in 
all_fields]
         max_value_stats = [column_stats[field.name]['max_values'] for field in 
all_fields]
         value_null_counts = [column_stats[field.name]['null_counts'] for field 
in all_fields]
-        key_fields = self.trimmed_primary_key_fields
+        key_fields = self.trimmed_primary_keys_fields
         min_key_stats = [column_stats[field.name]['min_values'] for field in 
key_fields]
         max_key_stats = [column_stats[field.name]['max_values'] for field in 
key_fields]
         key_null_counts = [column_stats[field.name]['null_counts'] for field 
in key_fields]
@@ -191,11 +191,11 @@ class DataWriter(ABC):
             file_name=file_name,
             file_size=self.file_io.get_file_size(file_path),
             row_count=data.num_rows,
-            min_key=GenericRow(min_key, self.trimmed_primary_key_fields),
-            max_key=GenericRow(max_key, self.trimmed_primary_key_fields),
+            min_key=GenericRow(min_key, self.trimmed_primary_keys_fields),
+            max_key=GenericRow(max_key, self.trimmed_primary_keys_fields),
             key_stats=SimpleStats(
-                GenericRow(min_key_stats, self.trimmed_primary_key_fields),
-                GenericRow(max_key_stats, self.trimmed_primary_key_fields),
+                GenericRow(min_key_stats, self.trimmed_primary_keys_fields),
+                GenericRow(max_key_stats, self.trimmed_primary_keys_fields),
                 key_null_counts,
             ),
             value_stats=SimpleStats(
diff --git a/paimon-python/pypaimon/write/writer/key_value_data_writer.py 
b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
index fb929710e8..05cad9bca9 100644
--- a/paimon-python/pypaimon/write/writer/key_value_data_writer.py
+++ b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
@@ -38,23 +38,23 @@ class KeyValueDataWriter(DataWriter):
         num_rows = data.num_rows
         enhanced_table = data
 
-        for pk_key in reversed(self.trimmed_primary_key):
+        for pk_key in reversed(self.trimmed_primary_keys):
             if pk_key in data.column_names:
                 key_column = data.column(pk_key)
                 enhanced_table = enhanced_table.add_column(0, 
f'_KEY_{pk_key}', key_column)
 
         sequence_column = pa.array([self.sequence_generator.next() for _ in 
range(num_rows)], type=pa.int64())
-        enhanced_table = 
enhanced_table.add_column(len(self.trimmed_primary_key), '_SEQUENCE_NUMBER', 
sequence_column)
+        enhanced_table = 
enhanced_table.add_column(len(self.trimmed_primary_keys), '_SEQUENCE_NUMBER', 
sequence_column)
 
         # TODO: support real row kind here
         value_kind_column = pa.array([0] * num_rows, type=pa.int32())
-        enhanced_table = 
enhanced_table.add_column(len(self.trimmed_primary_key) + 1, '_VALUE_KIND',
+        enhanced_table = 
enhanced_table.add_column(len(self.trimmed_primary_keys) + 1, '_VALUE_KIND',
                                                    value_kind_column)
 
         return enhanced_table
 
     def _sort_by_primary_key(self, data: pa.RecordBatch) -> pa.RecordBatch:
-        sort_keys = [(key, 'ascending') for key in self.trimmed_primary_key]
+        sort_keys = [(key, 'ascending') for key in self.trimmed_primary_keys]
         if '_SEQUENCE_NUMBER' in data.column_names:
             sort_keys.append(('_SEQUENCE_NUMBER', 'ascending'))
 

Reply via email to