This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.3 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 445b217d1ea24396b2c2f7175f50211e1ff337cb Author: Jingsong Lee <[email protected]> AuthorDate: Tue Oct 21 21:36:24 2025 +0800 [Python] Refactor BinaryRow to reuse keys and key fields (#6445) --- paimon-python/pypaimon/common/predicate.py | 17 +++---------- .../pypaimon/manifest/manifest_file_manager.py | 16 ++++++------- .../pypaimon/manifest/manifest_list_manager.py | 12 +++++----- .../pypaimon/manifest/schema/simple_stats.py | 5 ++-- .../pypaimon/manifest/simple_stats_evolution.py | 1 - .../pypaimon/manifest/simple_stats_evolutions.py | 1 - .../pypaimon/read/scanner/full_starting_scanner.py | 2 +- paimon-python/pypaimon/read/split_read.py | 2 +- paimon-python/pypaimon/schema/table_schema.py | 28 ---------------------- paimon-python/pypaimon/table/file_store_table.py | 5 ++++ paimon-python/pypaimon/table/row/binary_row.py | 3 --- paimon-python/pypaimon/table/row/generic_row.py | 3 --- paimon-python/pypaimon/table/row/internal_row.py | 6 ----- paimon-python/pypaimon/table/row/offset_row.py | 3 --- paimon-python/pypaimon/table/row/projected_row.py | 7 ------ paimon-python/pypaimon/tests/predicates_test.py | 12 +++++----- paimon-python/pypaimon/write/file_store_commit.py | 10 ++++---- paimon-python/pypaimon/write/writer/data_writer.py | 16 ++++++------- .../pypaimon/write/writer/key_value_data_writer.py | 8 +++---- 19 files changed, 50 insertions(+), 107 deletions(-) diff --git a/paimon-python/pypaimon/common/predicate.py b/paimon-python/pypaimon/common/predicate.py index 5e47fdd5df..9ae2cdfce3 100644 --- a/paimon-python/pypaimon/common/predicate.py +++ b/paimon-python/pypaimon/common/predicate.py @@ -27,7 +27,6 @@ from pyarrow import compute as pyarrow_compute from pyarrow import dataset as pyarrow_dataset from pypaimon.manifest.schema.simple_stats import SimpleStats -from pypaimon.table.row.generic_row import GenericRow from pypaimon.table.row.internal_row import InternalRow @@ -74,25 +73,15 @@ class Predicate: if self.method == 'or': return any(p.test_by_simple_stats(stat, row_count) for p in self.literals) - # Get null count using the mapped index - null_count = stat.null_counts[self.index] if stat.null_counts and self.index < len( - stat.null_counts) else 0 + null_count = stat.null_counts[self.index] if self.method == 'isNull': return null_count is not None and null_count > 0 if self.method == 'isNotNull': return null_count is None or row_count is None or null_count < row_count - if not isinstance(stat.min_values, GenericRow): - # Parse field values using BinaryRow's direct field access by name - min_value = stat.min_values.get_field(self.index) - max_value = stat.max_values.get_field(self.index) - else: - # TODO transform partition to BinaryRow - min_values = stat.min_values.to_dict() - max_values = stat.max_values.to_dict() - min_value = min_values[self.field] - max_value = max_values[self.field] + min_value = stat.min_values.get_field(self.index) + max_value = stat.max_values.get_field(self.index) if min_value is None or max_value is None or (null_count is not None and null_count == row_count): # invalid stats, skip validation diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py index c196845ff4..9fc92a4113 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py @@ -38,9 +38,9 @@ class ManifestFileManager: self.table: FileStoreTable = table self.manifest_path = table.table_path / "manifest" self.file_io = table.file_io - self.partition_key_fields = self.table.table_schema.get_partition_key_fields() - self.primary_key_fields = self.table.table_schema.get_primary_key_fields() - self.trimmed_primary_key_fields = self.table.table_schema.get_trimmed_primary_key_fields() + self.partition_keys_fields = self.table.partition_keys_fields + self.primary_keys_fields = self.table.primary_keys_fields + self.trimmed_primary_keys_fields = self.table.trimmed_primary_keys_fields def read(self, manifest_file_name: str, manifest_entry_filter=None, drop_stats=True) -> List[ManifestEntry]: manifest_file_path = self.manifest_path / manifest_file_name @@ -55,8 +55,8 @@ class ManifestFileManager: file_dict = dict(record['_FILE']) key_dict = dict(file_dict['_KEY_STATS']) key_stats = SimpleStats( - min_values=BinaryRow(key_dict['_MIN_VALUES'], self.trimmed_primary_key_fields), - max_values=BinaryRow(key_dict['_MAX_VALUES'], self.trimmed_primary_key_fields), + min_values=BinaryRow(key_dict['_MIN_VALUES'], self.trimmed_primary_keys_fields), + max_values=BinaryRow(key_dict['_MAX_VALUES'], self.trimmed_primary_keys_fields), null_counts=key_dict['_NULL_COUNTS'], ) @@ -80,8 +80,8 @@ class ManifestFileManager: file_name=file_dict['_FILE_NAME'], file_size=file_dict['_FILE_SIZE'], row_count=file_dict['_ROW_COUNT'], - min_key=GenericRowDeserializer.from_bytes(file_dict['_MIN_KEY'], self.trimmed_primary_key_fields), - max_key=GenericRowDeserializer.from_bytes(file_dict['_MAX_KEY'], self.trimmed_primary_key_fields), + min_key=GenericRowDeserializer.from_bytes(file_dict['_MIN_KEY'], self.trimmed_primary_keys_fields), + max_key=GenericRowDeserializer.from_bytes(file_dict['_MAX_KEY'], self.trimmed_primary_keys_fields), key_stats=key_stats, value_stats=value_stats, min_sequence_number=file_dict['_MIN_SEQUENCE_NUMBER'], @@ -100,7 +100,7 @@ class ManifestFileManager: ) entry = ManifestEntry( kind=record['_KIND'], - partition=GenericRowDeserializer.from_bytes(record['_PARTITION'], self.partition_key_fields), + partition=GenericRowDeserializer.from_bytes(record['_PARTITION'], self.partition_keys_fields), bucket=record['_BUCKET'], total_buckets=record['_TOTAL_BUCKETS'], file=file_meta diff --git a/paimon-python/pypaimon/manifest/manifest_list_manager.py b/paimon-python/pypaimon/manifest/manifest_list_manager.py index 0fc58652f6..367f802de5 100644 --- a/paimon-python/pypaimon/manifest/manifest_list_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_list_manager.py @@ -25,8 +25,8 @@ from pypaimon.manifest.schema.manifest_file_meta import ( MANIFEST_FILE_META_SCHEMA, ManifestFileMeta) from pypaimon.manifest.schema.simple_stats import SimpleStats from pypaimon.snapshot.snapshot import Snapshot -from pypaimon.table.row.generic_row import (GenericRowDeserializer, - GenericRowSerializer) +from pypaimon.table.row.binary_row import BinaryRow +from pypaimon.table.row.generic_row import GenericRowSerializer class ManifestListManager: @@ -61,13 +61,13 @@ class ManifestListManager: for record in reader: stats_dict = dict(record['_PARTITION_STATS']) partition_stats = SimpleStats( - min_values=GenericRowDeserializer.from_bytes( + min_values=BinaryRow( stats_dict['_MIN_VALUES'], - self.table.table_schema.get_partition_key_fields() + self.table.partition_keys_fields ), - max_values=GenericRowDeserializer.from_bytes( + max_values=BinaryRow( stats_dict['_MAX_VALUES'], - self.table.table_schema.get_partition_key_fields() + self.table.partition_keys_fields ), null_counts=stats_dict['_NULL_COUNTS'], ) diff --git a/paimon-python/pypaimon/manifest/schema/simple_stats.py b/paimon-python/pypaimon/manifest/schema/simple_stats.py index 1130a812fa..065728bc6a 100644 --- a/paimon-python/pypaimon/manifest/schema/simple_stats.py +++ b/paimon-python/pypaimon/manifest/schema/simple_stats.py @@ -17,7 +17,7 @@ ################################################################################ from dataclasses import dataclass -from typing import List, Optional +from typing import List from typing import ClassVar from pypaimon.table.row.generic_row import GenericRow @@ -28,7 +28,8 @@ from pypaimon.table.row.internal_row import InternalRow class SimpleStats: min_values: InternalRow max_values: InternalRow - null_counts: Optional[List[int]] + # TODO convert null counts to InternalArray + null_counts: List[int] _empty_stats: ClassVar[object] = None diff --git a/paimon-python/pypaimon/manifest/simple_stats_evolution.py b/paimon-python/pypaimon/manifest/simple_stats_evolution.py index 56cea98a85..d2291c54de 100644 --- a/paimon-python/pypaimon/manifest/simple_stats_evolution.py +++ b/paimon-python/pypaimon/manifest/simple_stats_evolution.py @@ -59,7 +59,6 @@ class SimpleStatsEvolution: null_counts = self._project_array(null_counts, dense_index_mapping) if self.index_mapping is not None: - # TODO support schema evolution min_values = self._project_row(min_values, self.index_mapping) max_values = self._project_row(max_values, self.index_mapping) diff --git a/paimon-python/pypaimon/manifest/simple_stats_evolutions.py b/paimon-python/pypaimon/manifest/simple_stats_evolutions.py index 8331b7a5e5..373e333cd9 100644 --- a/paimon-python/pypaimon/manifest/simple_stats_evolutions.py +++ b/paimon-python/pypaimon/manifest/simple_stats_evolutions.py @@ -40,7 +40,6 @@ class SimpleStatsEvolutions: if self.table_schema_id == data_schema_id: evolution = SimpleStatsEvolution(self.schema_fields(data_schema_id), None, None) else: - # TODO support schema evolution if self.table_fields is None: self.table_fields = self.table_data_fields diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py index 47a2d86d15..b94364db99 100644 --- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py +++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py @@ -48,7 +48,7 @@ class FullStartingScanner(StartingScanner): self.manifest_file_manager = ManifestFileManager(table) self.primary_key_predicate = trim_and_transform_predicate( - self.predicate, self.table.field_names, self.table.table_schema.get_trimmed_primary_keys()) + self.predicate, self.table.field_names, self.table.trimmed_primary_keys) self.partition_key_predicate = trim_and_transform_predicate( self.predicate, self.table.field_names, self.table.partition_keys) diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index 6b93b51d15..000e272e39 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -64,7 +64,7 @@ class SplitRead(ABC): self.split = split self.value_arity = len(read_type) - self.trimmed_primary_key = self.table.table_schema.get_trimmed_primary_keys() + self.trimmed_primary_key = self.table.trimmed_primary_keys self.read_fields = read_type if isinstance(self, MergeFileSplitRead): self.read_fields = self._create_key_value_fields(read_type) diff --git a/paimon-python/pypaimon/schema/table_schema.py b/paimon-python/pypaimon/schema/table_schema.py index d1875a7b0b..25dd19398b 100644 --- a/paimon-python/pypaimon/schema/table_schema.py +++ b/paimon-python/pypaimon/schema/table_schema.py @@ -145,31 +145,3 @@ class TableSchema: comment=self.comment, time_millis=self.time_millis ) - - def get_primary_key_fields(self) -> List[DataField]: - if not self.primary_keys: - return [] - field_map = {field.name: field for field in self.fields} - return [field_map[name] for name in self.primary_keys if name in field_map] - - def get_partition_key_fields(self) -> List[DataField]: - if not self.partition_keys: - return [] - field_map = {field.name: field for field in self.fields} - return [field_map[name] for name in self.partition_keys if name in field_map] - - def get_trimmed_primary_key_fields(self) -> List[DataField]: - if not self.primary_keys or not self.partition_keys: - return self.get_primary_key_fields() - adjusted = [pk for pk in self.primary_keys if pk not in self.partition_keys] - # Validate that filtered list is not empty - if not adjusted: - raise ValueError( - f"Primary key constraint {self.primary_keys} " - f"should not be same with partition fields {self.partition_keys}, " - "this will result in only one record in a partition") - field_map = {field.name: field for field in self.fields} - return [field_map[name] for name in adjusted if name in field_map] - - def get_trimmed_primary_keys(self) -> List[str]: - return [field.name for field in self.get_trimmed_primary_key_fields()] diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py index f0186b1657..cde282eff0 100644 --- a/paimon-python/pypaimon/table/file_store_table.py +++ b/paimon-python/pypaimon/table/file_store_table.py @@ -49,7 +49,12 @@ class FileStoreTable(Table): self.field_names = [field.name for field in table_schema.fields] self.field_dict = {field.name: field for field in self.fields} self.primary_keys = table_schema.primary_keys + self.primary_keys_fields = [self.field_dict[name] for name in self.primary_keys] self.partition_keys = table_schema.partition_keys + self.partition_keys_fields = [self.field_dict[name] for name in self.partition_keys] + self.trimmed_primary_keys = [pk for pk in self.primary_keys if pk not in self.partition_keys] + self.trimmed_primary_keys_fields = [self.field_dict[name] for name in self.trimmed_primary_keys] + self.options = table_schema.options self.cross_partition_update = self.table_schema.cross_partition_update() self.is_primary_key_table = bool(self.primary_keys) diff --git a/paimon-python/pypaimon/table/row/binary_row.py b/paimon-python/pypaimon/table/row/binary_row.py index f908935d44..41773b57e3 100644 --- a/paimon-python/pypaimon/table/row/binary_row.py +++ b/paimon-python/pypaimon/table/row/binary_row.py @@ -54,8 +54,5 @@ class BinaryRow(InternalRow): def get_row_kind(self) -> RowKind: return self.row_kind - def is_null_at(self, pos: int) -> bool: - return self.get_field(pos) is None - def __len__(self): return self.arity diff --git a/paimon-python/pypaimon/table/row/generic_row.py b/paimon-python/pypaimon/table/row/generic_row.py index 13e3742110..b05e475951 100644 --- a/paimon-python/pypaimon/table/row/generic_row.py +++ b/paimon-python/pypaimon/table/row/generic_row.py @@ -45,9 +45,6 @@ class GenericRow(InternalRow): raise IndexError(f"Position {pos} is out of bounds for row arity {len(self.values)}") return self.values[pos] - def is_null_at(self, pos: int) -> bool: - return self.get_field(pos) is None - def get_row_kind(self) -> RowKind: return self.row_kind diff --git a/paimon-python/pypaimon/table/row/internal_row.py b/paimon-python/pypaimon/table/row/internal_row.py index ca19ebcddf..e70468348c 100644 --- a/paimon-python/pypaimon/table/row/internal_row.py +++ b/paimon-python/pypaimon/table/row/internal_row.py @@ -33,12 +33,6 @@ class InternalRow(ABC): Returns the value at the given position. """ - @abstractmethod - def is_null_at(self, pos: int) -> bool: - """ - Returns true if the element is null at the given position. - """ - @abstractmethod def get_row_kind(self) -> RowKind: """ diff --git a/paimon-python/pypaimon/table/row/offset_row.py b/paimon-python/pypaimon/table/row/offset_row.py index 9a51a246c9..b6e6f8f432 100644 --- a/paimon-python/pypaimon/table/row/offset_row.py +++ b/paimon-python/pypaimon/table/row/offset_row.py @@ -47,9 +47,6 @@ class OffsetRow(InternalRow): raise IndexError(f"Position {pos} is out of bounds for row arity {self.arity}") return self.row_tuple[self.offset + pos] - def is_null_at(self, pos: int) -> bool: - return self.get_field(pos) is None - def get_row_kind(self) -> RowKind: return RowKind(self.row_kind_byte) diff --git a/paimon-python/pypaimon/table/row/projected_row.py b/paimon-python/pypaimon/table/row/projected_row.py index 502338a605..d7a1cc6f40 100644 --- a/paimon-python/pypaimon/table/row/projected_row.py +++ b/paimon-python/pypaimon/table/row/projected_row.py @@ -50,13 +50,6 @@ class ProjectedRow(InternalRow): return None return self.row.get_field(self.index_mapping[pos]) - def is_null_at(self, pos: int) -> bool: - """Returns true if the element is null at the given position.""" - if self.index_mapping[pos] < 0: - # TODO move this logical to hive - return True - return self.row.is_null_at(self.index_mapping[pos]) - def get_row_kind(self) -> RowKind: """Returns the kind of change that this row describes in a changelog.""" return self.row.get_row_kind() diff --git a/paimon-python/pypaimon/tests/predicates_test.py b/paimon-python/pypaimon/tests/predicates_test.py index 561641589f..6e6de2fcae 100644 --- a/paimon-python/pypaimon/tests/predicates_test.py +++ b/paimon-python/pypaimon/tests/predicates_test.py @@ -456,25 +456,25 @@ class PredicateTest(unittest.TestCase): count += 1 self.assertEqual(len(split.files), 1) min_values = GenericRowDeserializer.from_bytes(split.files[0].key_stats.min_values.data, - table.table_schema.get_primary_key_fields()).to_dict() + table.primary_keys_fields).to_dict() max_values = GenericRowDeserializer.from_bytes(split.files[0].key_stats.max_values.data, - table.table_schema.get_primary_key_fields()).to_dict() + table.primary_keys_fields).to_dict() self.assertTrue(min_values["key1"] == 1 and min_values["key2"] == "e" and max_values["key1"] == 4 and max_values["key2"] == "h") elif split.partition.values == ["p2", 2]: count += 1 min_values = GenericRowDeserializer.from_bytes(split.files[0].key_stats.min_values.data, - table.table_schema.get_primary_key_fields()).to_dict() + table.primary_keys_fields).to_dict() max_values = GenericRowDeserializer.from_bytes(split.files[0].key_stats.max_values.data, - table.table_schema.get_primary_key_fields()).to_dict() + table.primary_keys_fields).to_dict() self.assertTrue(min_values["key1"] == 5 and min_values["key2"] == "a" and max_values["key1"] == 8 and max_values["key2"] == "d") elif split.partition.values == ["p1", 1]: count += 1 min_values = GenericRowDeserializer.from_bytes(split.files[0].key_stats.min_values.data, - table.table_schema.get_primary_key_fields()).to_dict() + table.primary_keys_fields).to_dict() max_values = GenericRowDeserializer.from_bytes(split.files[0].key_stats.max_values.data, - table.table_schema.get_primary_key_fields()).to_dict() + table.primary_keys_fields).to_dict() self.assertTrue(min_values["key1"] == max_values["key1"] == 7 and max_values["key2"] == max_values["key2"] == "b") self.assertEqual(count, 3) diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index c2fe33105b..afeba52b3c 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -67,7 +67,7 @@ class FileStoreCommit: commit_entries = [] for msg in commit_messages: - partition = GenericRow(list(msg.partition), self.table.table_schema.get_partition_key_fields()) + partition = GenericRow(list(msg.partition), self.table.partition_keys_fields) for file in msg.new_files: commit_entries.append(ManifestEntry( kind=0, @@ -89,7 +89,7 @@ class FileStoreCommit: partition_filter = None # sanity check, all changes must be done within the given partition, meanwhile build a partition filter if len(overwrite_partition) > 0: - predicate_builder = PredicateBuilder(self.table.table_schema.get_partition_key_fields()) + predicate_builder = PredicateBuilder(self.table.partition_keys_fields) sub_predicates = [] for key, value in overwrite_partition.items(): sub_predicates.append(predicate_builder.equal(key, value)) @@ -107,7 +107,7 @@ class FileStoreCommit: entry.kind = 1 commit_entries.append(entry) for msg in commit_messages: - partition = GenericRow(list(msg.partition), self.table.table_schema.get_partition_key_fields()) + partition = GenericRow(list(msg.partition), self.table.partition_keys_fields) for file in msg.new_files: commit_entries.append(ManifestEntry( kind=0, @@ -174,11 +174,11 @@ class FileStoreCommit: partition_stats=SimpleStats( min_values=GenericRow( values=partition_min_stats, - fields=self.table.table_schema.get_partition_key_fields(), + fields=self.table.partition_keys_fields ), max_values=GenericRow( values=partition_max_stats, - fields=self.table.table_schema.get_partition_key_fields(), + fields=self.table.partition_keys_fields ), null_counts=partition_null_counts, ), diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index 3515991933..24e3b0ca48 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -43,8 +43,8 @@ class DataWriter(ABC): self.bucket = bucket self.file_io = self.table.file_io - self.trimmed_primary_key_fields = self.table.table_schema.get_trimmed_primary_key_fields() - self.trimmed_primary_key = self.table.table_schema.get_trimmed_primary_keys() + self.trimmed_primary_keys_fields = self.table.trimmed_primary_keys_fields + self.trimmed_primary_keys = self.table.trimmed_primary_keys options = self.table.options self.target_file_size = 256 * 1024 * 1024 @@ -159,7 +159,7 @@ class DataWriter(ABC): # min key & max key - selected_table = data.select(self.trimmed_primary_key) + selected_table = data.select(self.trimmed_primary_keys) key_columns_batch = selected_table.to_batches()[0] min_key_row_batch = key_columns_batch.slice(0, 1) max_key_row_batch = key_columns_batch.slice(key_columns_batch.num_rows - 1, 1) @@ -177,7 +177,7 @@ class DataWriter(ABC): min_value_stats = [column_stats[field.name]['min_values'] for field in all_fields] max_value_stats = [column_stats[field.name]['max_values'] for field in all_fields] value_null_counts = [column_stats[field.name]['null_counts'] for field in all_fields] - key_fields = self.trimmed_primary_key_fields + key_fields = self.trimmed_primary_keys_fields min_key_stats = [column_stats[field.name]['min_values'] for field in key_fields] max_key_stats = [column_stats[field.name]['max_values'] for field in key_fields] key_null_counts = [column_stats[field.name]['null_counts'] for field in key_fields] @@ -191,11 +191,11 @@ class DataWriter(ABC): file_name=file_name, file_size=self.file_io.get_file_size(file_path), row_count=data.num_rows, - min_key=GenericRow(min_key, self.trimmed_primary_key_fields), - max_key=GenericRow(max_key, self.trimmed_primary_key_fields), + min_key=GenericRow(min_key, self.trimmed_primary_keys_fields), + max_key=GenericRow(max_key, self.trimmed_primary_keys_fields), key_stats=SimpleStats( - GenericRow(min_key_stats, self.trimmed_primary_key_fields), - GenericRow(max_key_stats, self.trimmed_primary_key_fields), + GenericRow(min_key_stats, self.trimmed_primary_keys_fields), + GenericRow(max_key_stats, self.trimmed_primary_keys_fields), key_null_counts, ), value_stats=SimpleStats( diff --git a/paimon-python/pypaimon/write/writer/key_value_data_writer.py b/paimon-python/pypaimon/write/writer/key_value_data_writer.py index fb929710e8..05cad9bca9 100644 --- a/paimon-python/pypaimon/write/writer/key_value_data_writer.py +++ b/paimon-python/pypaimon/write/writer/key_value_data_writer.py @@ -38,23 +38,23 @@ class KeyValueDataWriter(DataWriter): num_rows = data.num_rows enhanced_table = data - for pk_key in reversed(self.trimmed_primary_key): + for pk_key in reversed(self.trimmed_primary_keys): if pk_key in data.column_names: key_column = data.column(pk_key) enhanced_table = enhanced_table.add_column(0, f'_KEY_{pk_key}', key_column) sequence_column = pa.array([self.sequence_generator.next() for _ in range(num_rows)], type=pa.int64()) - enhanced_table = enhanced_table.add_column(len(self.trimmed_primary_key), '_SEQUENCE_NUMBER', sequence_column) + enhanced_table = enhanced_table.add_column(len(self.trimmed_primary_keys), '_SEQUENCE_NUMBER', sequence_column) # TODO: support real row kind here value_kind_column = pa.array([0] * num_rows, type=pa.int32()) - enhanced_table = enhanced_table.add_column(len(self.trimmed_primary_key) + 1, '_VALUE_KIND', + enhanced_table = enhanced_table.add_column(len(self.trimmed_primary_keys) + 1, '_VALUE_KIND', value_kind_column) return enhanced_table def _sort_by_primary_key(self, data: pa.RecordBatch) -> pa.RecordBatch: - sort_keys = [(key, 'ascending') for key in self.trimmed_primary_key] + sort_keys = [(key, 'ascending') for key in self.trimmed_primary_keys] if '_SEQUENCE_NUMBER' in data.column_names: sort_keys.append(('_SEQUENCE_NUMBER', 'ascending'))
