This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new aaa601939e [Python] Enable field merge read in row-tracking table (#6399)
aaa601939e is described below
commit aaa601939e9da177d53bda11838672190964085e
Author: umi <[email protected]>
AuthorDate: Wed Oct 15 10:54:23 2025 +0800
[Python] Enable field merge read in row-tracking table (#6399)
---
.../pypaimon/manifest/manifest_file_manager.py | 16 +-
.../pypaimon/manifest/schema/data_file_meta.py | 61 ++-
.../pypaimon/manifest/schema/manifest_entry.py | 20 +
.../read/reader/data_evolution_merge_reader.py | 85 ++++
..._record_reader.py => data_file_batch_reader.py} | 0
paimon-python/pypaimon/read/reader/field_bunch.py | 120 +++++
paimon-python/pypaimon/read/split_read.py | 196 ++++++++-
paimon-python/pypaimon/read/table_read.py | 10 +-
paimon-python/pypaimon/read/table_scan.py | 106 +++++
paimon-python/pypaimon/snapshot/snapshot.py | 1 +
.../pypaimon/tests/data_evolution_test.py | 483 +++++++++++++++++++++
.../pypaimon/tests/file_store_commit_test.py | 24 +-
.../pypaimon/tests/py36/data_evolution_test.py | 483 +++++++++++++++++++++
.../pypaimon/tests/py36/rest_ao_read_write_test.py | 6 +-
paimon-python/pypaimon/tests/reader_base_test.py | 6 +-
.../pypaimon/tests/rest/rest_read_write_test.py | 2 +-
paimon-python/pypaimon/write/batch_table_write.py | 19 +-
paimon-python/pypaimon/write/file_store_commit.py | 75 ++++
paimon-python/pypaimon/write/file_store_write.py | 2 +
paimon-python/pypaimon/write/writer/data_writer.py | 22 +-
20 files changed, 1708 insertions(+), 29 deletions(-)
diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index f4b0ab0be3..e3c9601cf4 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -60,9 +60,13 @@ class ManifestFileManager:
null_counts=key_dict['_NULL_COUNTS'],
)
value_dict = dict(file_dict['_VALUE_STATS'])
- if file_dict.get('_VALUE_STATS_COLS') is None:
- fields = self.table.table_schema.fields
- elif not file_dict.get('_VALUE_STATS_COLS'):
+ if file_dict['_VALUE_STATS_COLS'] is None:
+ if file_dict['_WRITE_COLS'] is None:
+ fields = self.table.table_schema.fields
+ else:
+ read_fields = file_dict['_WRITE_COLS']
+                    fields = [self.table.field_dict[col] for col in read_fields]
+ elif not file_dict['_VALUE_STATS_COLS']:
fields = []
else:
                fields = [self.table.field_dict[col] for col in file_dict['_VALUE_STATS_COLS']]
@@ -89,6 +93,9 @@ class ManifestFileManager:
embedded_index=file_dict['_EMBEDDED_FILE_INDEX'],
file_source=file_dict['_FILE_SOURCE'],
value_stats_cols=file_dict.get('_VALUE_STATS_COLS'),
+ external_path=file_dict.get('_EXTERNAL_PATH'),
+ first_row_id=file_dict['_FIRST_ROW_ID'],
+ write_cols=file_dict['_WRITE_COLS'],
)
entry = ManifestEntry(
kind=record['_KIND'],
@@ -137,6 +144,9 @@ class ManifestFileManager:
"_EMBEDDED_FILE_INDEX": entry.file.embedded_index,
"_FILE_SOURCE": entry.file.file_source,
"_VALUE_STATS_COLS": entry.file.value_stats_cols,
+ "_EXTERNAL_PATH": entry.file.external_path,
+ "_FIRST_ROW_ID": entry.file.first_row_id,
+ "_WRITE_COLS": entry.file.write_cols,
}
}
avro_records.append(avro_record)
diff --git a/paimon-python/pypaimon/manifest/schema/data_file_meta.py b/paimon-python/pypaimon/manifest/schema/data_file_meta.py
index 82dfb66918..1d1bcb56fb 100644
--- a/paimon-python/pypaimon/manifest/schema/data_file_meta.py
+++ b/paimon-python/pypaimon/manifest/schema/data_file_meta.py
@@ -47,6 +47,8 @@ class DataFileMeta:
file_source: Optional[str] = None
value_stats_cols: Optional[List[str]] = None
external_path: Optional[str] = None
+ first_row_id: Optional[int] = None
+ write_cols: Optional[List[str]] = None
# not a schema field, just for internal usage
file_path: str = None
@@ -59,6 +61,58 @@ class DataFileMeta:
        path_builder = path_builder / ("bucket-" + str(bucket)) / self.file_name
self.file_path = str(path_builder)
+ def assign_first_row_id(self, first_row_id: int) -> 'DataFileMeta':
+ """Create a new DataFileMeta with the assigned first_row_id."""
+ return DataFileMeta(
+ file_name=self.file_name,
+ file_size=self.file_size,
+ row_count=self.row_count,
+ min_key=self.min_key,
+ max_key=self.max_key,
+ key_stats=self.key_stats,
+ value_stats=self.value_stats,
+ min_sequence_number=self.min_sequence_number,
+ max_sequence_number=self.max_sequence_number,
+ schema_id=self.schema_id,
+ level=self.level,
+ extra_files=self.extra_files,
+ creation_time=self.creation_time,
+ delete_row_count=self.delete_row_count,
+ embedded_index=self.embedded_index,
+ file_source=self.file_source,
+ value_stats_cols=self.value_stats_cols,
+ external_path=self.external_path,
+ first_row_id=first_row_id,
+ write_cols=self.write_cols,
+ file_path=self.file_path
+ )
+
+    def assign_sequence_number(self, min_sequence_number: int, max_sequence_number: int) -> 'DataFileMeta':
+ """Create a new DataFileMeta with the assigned sequence numbers."""
+ return DataFileMeta(
+ file_name=self.file_name,
+ file_size=self.file_size,
+ row_count=self.row_count,
+ min_key=self.min_key,
+ max_key=self.max_key,
+ key_stats=self.key_stats,
+ value_stats=self.value_stats,
+ min_sequence_number=min_sequence_number,
+ max_sequence_number=max_sequence_number,
+ schema_id=self.schema_id,
+ level=self.level,
+ extra_files=self.extra_files,
+ creation_time=self.creation_time,
+ delete_row_count=self.delete_row_count,
+ embedded_index=self.embedded_index,
+ file_source=self.file_source,
+ value_stats_cols=self.value_stats_cols,
+ external_path=self.external_path,
+ first_row_id=self.first_row_id,
+ write_cols=self.write_cols,
+ file_path=self.file_path
+ )
+
DATA_FILE_META_SCHEMA = {
"type": "record",
@@ -83,9 +137,14 @@ DATA_FILE_META_SCHEMA = {
"default": None},
{"name": "_DELETE_ROW_COUNT", "type": ["null", "long"], "default":
None},
{"name": "_EMBEDDED_FILE_INDEX", "type": ["null", "bytes"], "default":
None},
- {"name": "_FILE_SOURCE", "type": ["null", "int"], "default": None},
+ {"name": "_FILE_SOURCE", "type": ["null", "string"], "default": None},
{"name": "_VALUE_STATS_COLS",
"type": ["null", {"type": "array", "items": "string"}],
"default": None},
+    {"name": "_EXTERNAL_PATH", "type": ["null", "string"], "default": None},
+ {"name": "_FIRST_ROW_ID", "type": ["null", "long"], "default": None},
+ {"name": "_WRITE_COLS",
+ "type": ["null", {"type": "array", "items": "string"}],
+ "default": None},
]
}
diff --git a/paimon-python/pypaimon/manifest/schema/manifest_entry.py b/paimon-python/pypaimon/manifest/schema/manifest_entry.py
index 9a02341175..9608fbbd37 100644
--- a/paimon-python/pypaimon/manifest/schema/manifest_entry.py
+++ b/paimon-python/pypaimon/manifest/schema/manifest_entry.py
@@ -31,6 +31,26 @@ class ManifestEntry:
total_buckets: int
file: DataFileMeta
+ def assign_first_row_id(self, first_row_id: int) -> 'ManifestEntry':
+ """Create a new ManifestEntry with the assigned first_row_id."""
+ return ManifestEntry(
+ kind=self.kind,
+ partition=self.partition,
+ bucket=self.bucket,
+ total_buckets=self.total_buckets,
+ file=self.file.assign_first_row_id(first_row_id)
+ )
+
+    def assign_sequence_number(self, min_sequence_number: int, max_sequence_number: int) -> 'ManifestEntry':
+ """Create a new ManifestEntry with the assigned sequence numbers."""
+ return ManifestEntry(
+ kind=self.kind,
+ partition=self.partition,
+ bucket=self.bucket,
+ total_buckets=self.total_buckets,
+            file=self.file.assign_sequence_number(min_sequence_number, max_sequence_number)
+ )
+
MANIFEST_ENTRY_SCHEMA = {
"type": "record",
diff --git a/paimon-python/pypaimon/read/reader/data_evolution_merge_reader.py b/paimon-python/pypaimon/read/reader/data_evolution_merge_reader.py
new file mode 100644
index 0000000000..43bf926862
--- /dev/null
+++ b/paimon-python/pypaimon/read/reader/data_evolution_merge_reader.py
@@ -0,0 +1,85 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import List, Optional
+
+import pyarrow as pa
+from pyarrow import RecordBatch
+
+from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
+
+
+class DataEvolutionMergeReader(RecordBatchReader):
+ """
+    This is a union reader that contains multiple inner readers; each reader is responsible for reading one file.
+
+    This reader assembles multiple readers into one combined reader and merges the batches from all of them.
+
+    For example, if rowOffsets is {0, 2, 0, 1, 2, 1} and fieldOffsets is {0, 0, 1, 1, 1, 0}, it means:
+ - The first field comes from batch0, and it is at offset 0 in batch0.
+ - The second field comes from batch2, and it is at offset 0 in batch2.
+ - The third field comes from batch0, and it is at offset 1 in batch0.
+ - The fourth field comes from batch1, and it is at offset 1 in batch1.
+ - The fifth field comes from batch2, and it is at offset 1 in batch2.
+ - The sixth field comes from batch1, and it is at offset 0 in batch1.
+ """
+
+    def __init__(self, row_offsets: List[int], field_offsets: List[int], readers: List[Optional[RecordBatchReader]]):
+ if row_offsets is None:
+ raise ValueError("Row offsets must not be null")
+ if field_offsets is None:
+ raise ValueError("Field offsets must not be null")
+ if len(row_offsets) != len(field_offsets):
+            raise ValueError("Row offsets and field offsets must have the same length")
+ if not row_offsets:
+ raise ValueError("Row offsets must not be empty")
+ if not readers or len(readers) < 1:
+ raise ValueError("Readers should be more than 0")
+ self.row_offsets = row_offsets
+ self.field_offsets = field_offsets
+ self.readers = readers
+
+ def read_arrow_batch(self) -> Optional[RecordBatch]:
+ batches: List[Optional[RecordBatch]] = [None] * len(self.readers)
+ for i, reader in enumerate(self.readers):
+ if reader is not None:
+ batch = reader.read_arrow_batch()
+ if batch is None:
+                    # all readers are aligned; once one returns None, the others have no more data either
+ return None
+ batches[i] = batch
+        # Assemble record batches from batches based on row_offsets and field_offsets
+ columns = []
+ names = []
+ for i in range(len(self.row_offsets)):
+ batch_index = self.row_offsets[i]
+ field_index = self.field_offsets[i]
+ if batches[batch_index] is not None:
+ column = batches[batch_index].column(field_index)
+ columns.append(column)
+ names.append(batches[batch_index].schema.names[field_index])
+ if columns:
+ return pa.RecordBatch.from_arrays(columns, names)
+ return None
+
+ def close(self) -> None:
+ try:
+ for reader in self.readers:
+ if reader is not None:
+ reader.close()
+ except Exception as e:
+ raise IOError("Failed to close inner readers") from e
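The offset mapping described in the DataEvolutionMergeReader docstring can be sketched with plain PyArrow, outside the reader stack. In the sketch below the batches and field names are invented for illustration; only the row_offsets/field_offsets convention comes from the class above.

    import pyarrow as pa

    # Three aligned batches, as if produced by three inner readers (invented data).
    batch0 = pa.RecordBatch.from_arrays([pa.array([1, 2]), pa.array(["a", "b"])], ["f0", "f2"])
    batch1 = pa.RecordBatch.from_arrays([pa.array([True, False]), pa.array([10, 20])], ["f5", "f3"])
    batch2 = pa.RecordBatch.from_arrays([pa.array([0.1, 0.2]), pa.array(["x", "y"])], ["f1", "f4"])
    batches = [batch0, batch1, batch2]

    # Same mapping as the docstring example: output field i is taken from
    # batches[row_offsets[i]] at column index field_offsets[i].
    row_offsets = [0, 2, 0, 1, 2, 1]
    field_offsets = [0, 0, 1, 1, 1, 0]

    columns = [batches[b].column(f) for b, f in zip(row_offsets, field_offsets)]
    names = [batches[b].schema.names[f] for b, f in zip(row_offsets, field_offsets)]
    merged = pa.RecordBatch.from_arrays(columns, names)
    print(merged.schema.names)  # ['f0', 'f1', 'f2', 'f3', 'f4', 'f5']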
diff --git a/paimon-python/pypaimon/read/reader/data_file_record_reader.py b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
similarity index 100%
rename from paimon-python/pypaimon/read/reader/data_file_record_reader.py
rename to paimon-python/pypaimon/read/reader/data_file_batch_reader.py
diff --git a/paimon-python/pypaimon/read/reader/field_bunch.py b/paimon-python/pypaimon/read/reader/field_bunch.py
new file mode 100644
index 0000000000..4ba82bd80e
--- /dev/null
+++ b/paimon-python/pypaimon/read/reader/field_bunch.py
@@ -0,0 +1,120 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+"""
+FieldBunch classes for organizing files by field in data evolution.
+
+These classes help organize DataFileMeta objects into groups based on their field content,
+supporting both regular data files and blob files.
+"""
+from abc import ABC
+from typing import List
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+
+
+class FieldBunch(ABC):
+ """Interface for files organized by field."""
+
+ def row_count(self) -> int:
+ """Return the total row count for this bunch."""
+ ...
+
+ def files(self) -> List[DataFileMeta]:
+ """Return the list of files in this bunch."""
+ ...
+
+
+class DataBunch(FieldBunch):
+ """Files for a single data file."""
+
+ def __init__(self, data_file: DataFileMeta):
+ self.data_file = data_file
+
+ def row_count(self) -> int:
+ return self.data_file.row_count
+
+ def files(self) -> List[DataFileMeta]:
+ return [self.data_file]
+
+
+class BlobBunch(FieldBunch):
+ """Files for partial field (blob files)."""
+
+ def __init__(self, expected_row_count: int):
+ self._files: List[DataFileMeta] = []
+ self.expected_row_count = expected_row_count
+ self.latest_first_row_id = -1
+ self.expected_next_first_row_id = -1
+ self.latest_max_sequence_number = -1
+ self._row_count = 0
+
+ def add(self, file: DataFileMeta) -> None:
+ """Add a blob file to this bunch."""
+ if not self._is_blob_file(file.file_name):
+ raise ValueError("Only blob file can be added to a blob bunch.")
+
+ if file.first_row_id == self.latest_first_row_id:
+ if file.max_sequence_number >= self.latest_max_sequence_number:
+ raise ValueError(
+                    "Blob file with same first row id should have decreasing sequence number."
+ )
+ return
+
+ if self._files:
+ first_row_id = file.first_row_id
+ if first_row_id < self.expected_next_first_row_id:
+ if file.max_sequence_number >= self.latest_max_sequence_number:
+ raise ValueError(
+                        "Blob file with overlapping row id should have decreasing sequence number."
+ )
+ return
+ elif first_row_id > self.expected_next_first_row_id:
+ raise ValueError(
+ f"Blob file first row id should be continuous, expect "
+ f"{self.expected_next_first_row_id} but got {first_row_id}"
+ )
+
+ if file.schema_id != self._files[0].schema_id:
+ raise ValueError(
+ "All files in a blob bunch should have the same schema id."
+ )
+ if file.write_cols != self._files[0].write_cols:
+ raise ValueError(
+                    "All files in a blob bunch should have the same write columns."
+ )
+
+ self._files.append(file)
+ self._row_count += file.row_count
+ if self._row_count > self.expected_row_count:
+ raise ValueError(
+                f"Blob files row count exceeds the expected {self.expected_row_count}"
+ )
+
+ self.latest_max_sequence_number = file.max_sequence_number
+ self.latest_first_row_id = file.first_row_id
+        self.expected_next_first_row_id = self.latest_first_row_id + file.row_count
+
+ def row_count(self) -> int:
+ return self._row_count
+
+ def files(self) -> List[DataFileMeta]:
+ return self._files
+
+ @staticmethod
+ def _is_blob_file(file_name: str) -> bool:
+ """Check if a file is a blob file based on its extension."""
+ return file_name.endswith('.blob')
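The bookkeeping that BlobBunch.add enforces is essentially a continuity check over first_row_id ranges: each blob file must start where the previous one ended. A minimal standalone sketch of that rule, using a made-up SimpleFile stand-in rather than the real DataFileMeta:

    from collections import namedtuple

    # Hypothetical stand-in for DataFileMeta, carrying only the fields the check needs.
    SimpleFile = namedtuple("SimpleFile", "file_name first_row_id row_count")

    def is_continuous(files):
        """Each blob file must start exactly where the previous one ended."""
        expected_next = files[0].first_row_id
        for f in files:
            if f.first_row_id != expected_next:
                return False
            expected_next = f.first_row_id + f.row_count
        return True

    blobs = [
        SimpleFile("a.blob", 0, 100),   # rows 0..99
        SimpleFile("b.blob", 100, 50),  # rows 100..149
        SimpleFile("c.blob", 150, 25),  # rows 150..174
    ]
    print(is_continuous(blobs))  # True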
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 7674db45f0..81ebdd86f8 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -23,11 +23,14 @@ from typing import List, Optional, Tuple
from pypaimon.common.core_options import CoreOptions
from pypaimon.common.predicate import Predicate
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.read.interval_partition import IntervalPartition, SortedRun
from pypaimon.read.partition_info import PartitionInfo
from pypaimon.read.reader.concat_batch_reader import ConcatBatchReader, ShardBatchReader
from pypaimon.read.reader.concat_record_reader import ConcatRecordReader
-from pypaimon.read.reader.data_file_record_reader import DataFileBatchReader
+from pypaimon.read.reader.data_file_batch_reader import DataFileBatchReader
+from pypaimon.read.reader.data_evolution_merge_reader import DataEvolutionMergeReader
+from pypaimon.read.reader.field_bunch import FieldBunch, DataBunch, BlobBunch
from pypaimon.read.reader.drop_delete_reader import DropDeleteRecordReader
from pypaimon.read.reader.empty_record_reader import EmptyFileRecordReader
from pypaimon.read.reader.filter_record_reader import FilterRecordReader
@@ -298,3 +301,194 @@ class MergeFileSplitRead(SplitRead):
def _get_all_data_fields(self):
return self._create_key_value_fields(self.table.fields)
+
+
+class DataEvolutionSplitRead(SplitRead):
+
+ def create_reader(self) -> RecordReader:
+ files = self.split.files
+ suppliers = []
+
+        # Split files by row ID using the same logic as Java DataEvolutionSplitGenerator.split
+ split_by_row_id = self._split_by_row_id(files)
+
+ for need_merge_files in split_by_row_id:
+ if len(need_merge_files) == 1 or not self.read_fields:
+ # No need to merge fields, just create a single file reader
+ suppliers.append(
+ lambda f=need_merge_files[0]: self._create_file_reader(f)
+ )
+ else:
+ suppliers.append(
+                    lambda files=need_merge_files: self._create_union_reader(files)
+ )
+
+ return ConcatBatchReader(suppliers)
+
+    def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]:
+ """Split files by firstRowId for data evolution."""
+
+ # Sort files by firstRowId and then by maxSequenceNumber
+ def sort_key(file: DataFileMeta) -> tuple:
+            first_row_id = file.first_row_id if file.first_row_id is not None else float('-inf')
+ is_blob = 1 if self._is_blob_file(file.file_name) else 0
+ max_seq = file.max_sequence_number
+ return (first_row_id, is_blob, -max_seq)
+
+ sorted_files = sorted(files, key=sort_key)
+
+ # Split files by firstRowId
+ split_by_row_id = []
+ last_row_id = -1
+ check_row_id_start = 0
+ current_split = []
+
+ for file in sorted_files:
+ first_row_id = file.first_row_id
+ if first_row_id is None:
+ split_by_row_id.append([file])
+ continue
+
+            if not self._is_blob_file(file.file_name) and first_row_id != last_row_id:
+ if current_split:
+ split_by_row_id.append(current_split)
+ if first_row_id < check_row_id_start:
+ raise ValueError(
+ f"There are overlapping files in the split: {files}, "
+ f"the wrong file is: {file}"
+ )
+ current_split = []
+ last_row_id = first_row_id
+ check_row_id_start = first_row_id + file.row_count
+ current_split.append(file)
+
+ if current_split:
+ split_by_row_id.append(current_split)
+
+ return split_by_row_id
+
+    def _create_union_reader(self, need_merge_files: List[DataFileMeta]) -> RecordReader:
+ """Create a DataEvolutionFileReader for merging multiple files."""
+ # Split field bunches
+ fields_files = self._split_field_bunches(need_merge_files)
+
+ # Validate row counts and first row IDs
+ row_count = fields_files[0].row_count()
+ first_row_id = fields_files[0].files()[0].first_row_id
+
+ for bunch in fields_files:
+ if bunch.row_count() != row_count:
+                raise ValueError("All files in a field merge split should have the same row count.")
+ if bunch.files()[0].first_row_id != first_row_id:
+ raise ValueError(
+                    "All files in a field merge split should have the same first row id, and it must not be null."
+ )
+
+ # Create the union reader
+ all_read_fields = self.read_fields
+ file_record_readers = [None] * len(fields_files)
+ read_field_index = [field.id for field in all_read_fields]
+
+ # Initialize offsets
+ row_offsets = [-1] * len(all_read_fields)
+ field_offsets = [-1] * len(all_read_fields)
+
+ for i, bunch in enumerate(fields_files):
+ first_file = bunch.files()[0]
+
+ # Get field IDs for this bunch
+ if self._is_blob_file(first_file.file_name):
+                # For blob files, we need to get the field ID from the write columns
+ field_ids = [self._get_field_id_from_write_cols(first_file)]
+ elif first_file.write_cols:
+                field_ids = self._get_field_ids_from_write_cols(first_file.write_cols)
+ else:
+ # For regular files, get all field IDs from the schema
+ field_ids = [field.id for field in self.table.fields]
+
+ read_fields = []
+ for j, read_field_id in enumerate(read_field_index):
+ for field_id in field_ids:
+ if read_field_id == field_id:
+ if row_offsets[j] == -1:
+ row_offsets[j] = i
+ field_offsets[j] = len(read_fields)
+ read_fields.append(all_read_fields[j])
+ break
+
+ if not read_fields:
+ file_record_readers[i] = None
+ else:
+ table_fields = self.read_fields
+                self.read_fields = read_fields  # create reader based on read_fields
+ # Create reader for this bunch
+ if len(bunch.files()) == 1:
+                    file_record_readers[i] = self._create_file_reader(bunch.files()[0])
+ else:
+ # Create concatenated reader for multiple files
+ suppliers = [
+                        lambda f=file: self._create_file_reader(f) for file in bunch.files()
+ ]
+ file_record_readers[i] = ConcatRecordReader(suppliers)
+ self.read_fields = table_fields
+
+ # Validate that all required fields are found
+ for i, field in enumerate(all_read_fields):
+ if row_offsets[i] == -1:
+ if not field.type.is_nullable():
+                    raise ValueError(f"Field {field} is not nullable but no file containing it was found.")
+
+        return DataEvolutionMergeReader(row_offsets, field_offsets, file_record_readers)
+
+ def _create_file_reader(self, file: DataFileMeta) -> RecordReader:
+ """Create a file reader for a single file."""
+        return self.file_reader_supplier(file_path=file.file_path, for_merge_read=False)
+
+    def _split_field_bunches(self, need_merge_files: List[DataFileMeta]) -> List[FieldBunch]:
+ """Split files into field bunches."""
+
+ fields_files = []
+ blob_bunch_map = {}
+ row_count = -1
+
+ for file in need_merge_files:
+ if self._is_blob_file(file.file_name):
+ field_id = self._get_field_id_from_write_cols(file)
+ if field_id not in blob_bunch_map:
+ blob_bunch_map[field_id] = BlobBunch(row_count)
+ blob_bunch_map[field_id].add(file)
+ else:
+ # Normal file, just add it to the current merge split
+ fields_files.append(DataBunch(file))
+ row_count = file.row_count
+
+ fields_files.extend(blob_bunch_map.values())
+ return fields_files
+
+ def _get_field_id_from_write_cols(self, file: DataFileMeta) -> int:
+ """Get field ID from write columns for blob files."""
+ if not file.write_cols or len(file.write_cols) == 0:
+ raise ValueError("Blob file must have write columns")
+
+ # Find the field by name in the table schema
+ field_name = file.write_cols[0]
+ for field in self.table.fields:
+ if field.name == field_name:
+ return field.id
+ raise ValueError(f"Field {field_name} not found in table schema")
+
+    def _get_field_ids_from_write_cols(self, write_cols: List[str]) -> List[int]:
+ field_ids = []
+ for field_name in write_cols:
+ for field in self.table.fields:
+ if field.name == field_name:
+ field_ids.append(field.id)
+ return field_ids
+
+ @staticmethod
+ def _is_blob_file(file_name: str) -> bool:
+ """Check if a file is a blob file based on its extension."""
+ return file_name.endswith('.blob')
+
+ def _get_all_data_fields(self):
+ return self.table.fields
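The grouping rule in DataEvolutionSplitRead._split_by_row_id above boils down to: sort by first_row_id (blobs after base files, newer sequence numbers first), start a new group at every new non-blob first_row_id, and read files without a first_row_id on their own. A simplified sketch under those assumptions, with a hypothetical lightweight record in place of DataFileMeta and the overlap validation omitted:

    from typing import List, NamedTuple, Optional

    class F(NamedTuple):  # hypothetical stand-in for DataFileMeta
        file_name: str
        first_row_id: Optional[int]
        row_count: int
        max_sequence_number: int

    def split_by_row_id(files: List[F]) -> List[List[F]]:
        def sort_key(f: F):
            rid = f.first_row_id if f.first_row_id is not None else float("-inf")
            return (rid, f.file_name.endswith(".blob"), -f.max_sequence_number)

        groups, current, last_rid = [], [], None
        for f in sorted(files, key=sort_key):
            if f.first_row_id is None:
                groups.append([f])            # no row tracking: read on its own
                continue
            if not f.file_name.endswith(".blob") and f.first_row_id != last_rid:
                if current:
                    groups.append(current)    # a new base first_row_id starts a new group
                current, last_rid = [], f.first_row_id
            current.append(f)
        if current:
            groups.append(current)
        return groups

    files = [F("base-0.parquet", 0, 2, 1), F("f1-patch.parquet", 0, 2, 2),
             F("base-1.parquet", 2, 3, 3)]
    print([len(g) for g in split_by_row_id(files)])  # [2, 1]: merge the first two, read the third alone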
diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py
index b5f7a7b765..b33fb2c6ad 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -26,7 +26,7 @@ from pypaimon.read.push_down_utils import extract_predicate_to_list
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
from pypaimon.read.split import Split
from pypaimon.read.split_read import (MergeFileSplitRead, RawFileSplitRead,
- SplitRead)
+ SplitRead, DataEvolutionSplitRead)
from pypaimon.schema.data_types import DataField, PyarrowFieldParser
from pypaimon.table.row.offset_row import OffsetRow
@@ -132,6 +132,14 @@ class TableRead:
read_type=self.read_type,
split=split
)
+        elif self.table.options.get('data-evolution.enabled', 'false').lower() == 'true':
+ return DataEvolutionSplitRead(
+ table=self.table,
+ predicate=self.predicate,
+ push_down_predicate=self.push_down_predicate,
+ read_type=self.read_type,
+ split=split
+ )
else:
return RawFileSplitRead(
table=self.table,
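The new branch in TableRead is driven purely by the 'data-evolution.enabled' table option. Condensed from the data_evolution_test.py added in this commit, the end-to-end path looks roughly like this (warehouse path and table name are placeholders):

    import pyarrow as pa
    from pypaimon import CatalogFactory, Schema

    catalog = CatalogFactory.create({'warehouse': '/tmp/warehouse'})
    catalog.create_database('default', False)

    pa_schema = pa.schema([('f0', pa.int8()), ('f1', pa.int16())])
    schema = Schema.from_pyarrow_schema(
        pa_schema,
        options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'})
    catalog.create_table('default.demo', schema, False)
    table = catalog.get_table('default.demo')

    # Write one batch, then read it back; the read goes through DataEvolutionSplitRead.
    write_builder = table.new_batch_write_builder()
    table_write = write_builder.new_write()
    table_commit = write_builder.new_commit()
    table_write.write_arrow(pa.Table.from_pydict({'f0': [1, 2], 'f1': [10, 20]},
                                                 schema=pa_schema))
    table_commit.commit(table_write.prepare_commit())
    table_write.close()
    table_commit.close()

    read_builder = table.new_read_builder()
    splits = read_builder.new_scan().plan().splits()
    print(read_builder.new_read().to_arrow(splits))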
diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py
index 6a6ab9f3f8..d76725ca97 100644
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -68,6 +68,7 @@ class TableScan:
self.only_read_real_buckets = True if int(
            self.table.options.get('bucket', -1)) == BucketMode.POSTPONE_BUCKET.value else False
+        self.data_evolution = self.table.options.get('data-evolution.enabled', 'false').lower() == 'true'
def plan(self) -> Plan:
file_entries = self.plan_files()
@@ -75,6 +76,8 @@ class TableScan:
return Plan([])
if self.table.is_primary_key_table:
splits = self._create_primary_key_splits(file_entries)
+ elif self.data_evolution:
+ splits = self._create_data_evolution_splits(file_entries)
else:
splits = self._create_append_only_splits(file_entries)
@@ -253,6 +256,48 @@ class TableScan:
"row_count": file_entry.file.row_count,
})
+    def _create_data_evolution_splits(self, file_entries: List[ManifestEntry]) -> List['Split']:
+ """
+        Create data evolution splits for append-only tables with schema evolution.
+        This method groups files by firstRowId and creates splits that can handle
+        column merging across different schema versions.
+ """
+ partitioned_files = defaultdict(list)
+ for entry in file_entries:
+            partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry)
+
+ if self.idx_of_this_subtask is not None:
+            partitioned_files, plan_start_row, plan_end_row = self._append_only_filter_by_shard(partitioned_files)
+
+ def weight_func(file_list: List[DataFileMeta]) -> int:
+            return max(sum(f.file_size for f in file_list), self.open_file_cost)
+
+ splits = []
+ for key, file_entries in partitioned_files.items():
+ if not file_entries:
+ continue
+
+ data_files: List[DataFileMeta] = [e.file for e in file_entries]
+
+ # Split files by firstRowId for data evolution
+ split_by_row_id = self._split_by_row_id(data_files)
+
+ # Pack the split groups for optimal split sizes
+            packed_files: List[List[List[DataFileMeta]]] = self._pack_for_ordered(split_by_row_id, weight_func,
+                                                                                  self.target_split_size)
+
+ # Flatten the packed files and build splits
+ flatten_packed_files: List[List[DataFileMeta]] = [
+ [file for sub_pack in pack for file in sub_pack]
+ for pack in packed_files
+ ]
+
+            splits += self._build_split_from_pack(flatten_packed_files, file_entries, False)
+
+ if self.idx_of_this_subtask is not None:
+            self._compute_split_start_end_row(splits, plan_start_row, plan_end_row)
+ return splits
+
    def _create_append_only_splits(self, file_entries: List[ManifestEntry]) -> List['Split']:
partitioned_files = defaultdict(list)
for entry in file_entries:
@@ -360,3 +405,64 @@ class TableScan:
packed.append(bin_items)
return packed
+
+ @staticmethod
+ def _is_blob_file(file_name: str) -> bool:
+ """Check if a file is a blob file based on its extension."""
+ return file_name.endswith('.blob')
+
+    def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]:
+ """
+ Split files by firstRowId for data evolution.
+        This method groups files that have the same firstRowId, which is essential
+        for handling schema evolution where files with different schemas need to be
+        read together to merge columns.
+ """
+ split_by_row_id = []
+
+ # Sort files by firstRowId and then by maxSequenceNumber
+ # Files with null firstRowId are treated as having Long.MIN_VALUE
+ def sort_key(file: DataFileMeta) -> tuple:
+            first_row_id = file.first_row_id if file.first_row_id is not None else float('-inf')
+ is_blob = 1 if self._is_blob_file(file.file_name) else 0
+            # For files with same firstRowId, sort by maxSequenceNumber in descending order
+ # (larger sequence number means more recent data)
+ max_seq = file.max_sequence_number
+ return (first_row_id, is_blob, -max_seq)
+
+ sorted_files = sorted(files, key=sort_key)
+
+ # Split files by firstRowId
+ last_row_id = -1
+ check_row_id_start = 0
+ current_split = []
+
+ for file in sorted_files:
+ first_row_id = file.first_row_id
+ if first_row_id is None:
+ # Files without firstRowId are treated as individual splits
+ split_by_row_id.append([file])
+ continue
+
+            if not self._is_blob_file(file.file_name) and first_row_id != last_row_id:
+ if current_split:
+ split_by_row_id.append(current_split)
+
+ # Validate that files don't overlap
+ if first_row_id < check_row_id_start:
+ file_names = [f.file_name for f in sorted_files]
+ raise ValueError(
+                        f"There are overlapping files in the split: {file_names}, "
+ f"the wrong file is: {file.file_name}"
+ )
+
+ current_split = []
+ last_row_id = first_row_id
+ check_row_id_start = first_row_id + file.row_count
+
+ current_split.append(file)
+
+ if current_split:
+ split_by_row_id.append(current_split)
+
+ return split_by_row_id
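The _pack_for_ordered call in _create_data_evolution_splits above then packs consecutive row-id groups into splits of roughly target_split_size. That helper is pre-existing and not shown in this diff; the idea is ordered bin packing, roughly as in this sketch with invented sizes:

    # Ordered bin packing: accumulate consecutive groups until the next one would
    # push the running weight past the target, then start a new split.
    def pack_for_ordered(items, weight, target):
        packed, current, acc = [], [], 0
        for item in items:
            w = weight(item)
            if current and acc + w > target:
                packed.append(current)
                current, acc = [], 0
            current.append(item)
            acc += w
        if current:
            packed.append(current)
        return packed

    groups = [['a'], ['b', 'c'], ['d']]            # groups of file names
    sizes = {'a': 60, 'b': 20, 'c': 20, 'd': 50}   # pretend file sizes
    print(pack_for_ordered(groups, lambda g: sum(sizes[f] for f in g), 100))
    # [[['a'], ['b', 'c']], [['d']]]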
diff --git a/paimon-python/pypaimon/snapshot/snapshot.py b/paimon-python/pypaimon/snapshot/snapshot.py
index 5bc92dcad4..96b287ab55 100644
--- a/paimon-python/pypaimon/snapshot/snapshot.py
+++ b/paimon-python/pypaimon/snapshot/snapshot.py
@@ -43,3 +43,4 @@ class Snapshot:
    changelog_record_count: Optional[int] = json_field("changelogRecordCount", default=None)
watermark: Optional[int] = json_field("watermark", default=None)
statistics: Optional[str] = json_field("statistics", default=None)
+ next_row_id: Optional[int] = json_field("nextRowId", default=None)
diff --git a/paimon-python/pypaimon/tests/data_evolution_test.py b/paimon-python/pypaimon/tests/data_evolution_test.py
new file mode 100644
index 0000000000..90abd2f916
--- /dev/null
+++ b/paimon-python/pypaimon/tests/data_evolution_test.py
@@ -0,0 +1,483 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import os
+import tempfile
+import unittest
+
+import pyarrow as pa
+from pypaimon import Schema, CatalogFactory
+
+
+class DataEvolutionTest(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.tempdir = tempfile.mkdtemp()
+ cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+ cls.catalog = CatalogFactory.create({
+ 'warehouse': cls.warehouse
+ })
+ cls.catalog.create_database('default', False)
+
+ def test_basic(self):
+ simple_pa_schema = pa.schema([
+ ('f0', pa.int8()),
+ ('f1', pa.int16()),
+ ])
+ schema = Schema.from_pyarrow_schema(simple_pa_schema,
+                                            options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'})
+ self.catalog.create_table('default.test_row_tracking', schema, False)
+ table = self.catalog.get_table('default.test_row_tracking')
+
+ # write 1
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ expect_data = pa.Table.from_pydict({
+ 'f0': [-1, 2],
+ 'f1': [-1001, 1002]
+ }, schema=simple_pa_schema)
+ table_write.write_arrow(expect_data)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ # write 2
+ table_write = write_builder.new_write().with_write_type(['f0'])
+ table_commit = write_builder.new_commit()
+ data2 = pa.Table.from_pydict({
+ 'f0': [3, 4],
+ }, schema=pa.schema([
+ ('f0', pa.int8()),
+ ]))
+ table_write.write_arrow(data2)
+ cmts = table_write.prepare_commit()
+ cmts[0].new_files[0].first_row_id = 0
+ table_commit.commit(cmts)
+ table_write.close()
+ table_commit.close()
+
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ actual_data = table_read.to_arrow(table_scan.plan().splits())
+ expect_data = pa.Table.from_pydict({
+ 'f0': [3, 4],
+ 'f1': [-1001, 1002]
+ }, schema=pa.schema([
+ ('f0', pa.int8()),
+ ('f1', pa.int16()),
+ ]))
+ self.assertEqual(actual_data, expect_data)
+
+ def test_multiple_appends(self):
+ simple_pa_schema = pa.schema([
+ ('f0', pa.int32()),
+ ('f1', pa.string()),
+ ('f2', pa.string()),
+ ])
+ schema = Schema.from_pyarrow_schema(
+ simple_pa_schema,
+            options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'}
+ )
+        self.catalog.create_table('default.test_multiple_appends', schema, False)
+ table = self.catalog.get_table('default.test_multiple_appends')
+
+ write_builder = table.new_batch_write_builder()
+
+ # write 100 rows: (1, "a", "b")
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ init_data = pa.Table.from_pydict({
+ 'f0': [1] * 100,
+ 'f1': ['a'] * 100,
+ 'f2': ['b'] * 100,
+ }, schema=simple_pa_schema)
+ table_write.write_arrow(init_data)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+ # append:set first_row_id = 100 to modify the row with columns write
+ write0 = write_builder.new_write().with_write_type(['f0', 'f1'])
+ write1 = write_builder.new_write().with_write_type(['f2'])
+ commit = write_builder.new_commit()
+        data0 = pa.Table.from_pydict({'f0': [2], 'f1': ['x']},
+                                     schema=pa.schema([('f0', pa.int32()), ('f1', pa.string())]))
+        data1 = pa.Table.from_pydict({'f2': ['y']}, schema=pa.schema([('f2', pa.string())]))
+ write0.write_arrow(data0)
+ write1.write_arrow(data1)
+ cmts = write0.prepare_commit() + write1.prepare_commit()
+ for c in cmts:
+ for nf in c.new_files:
+ nf.first_row_id = 100
+ commit.commit(cmts)
+ write0.close()
+ write1.close()
+ commit.close()
+
+ # append:write (3, "c") and ("d"), set first_row_id = 101
+ write0 = write_builder.new_write().with_write_type(['f0', 'f1'])
+ commit0 = write_builder.new_commit()
+ data0 = pa.Table.from_pydict({'f0': [3], 'f1': ['c']},
+                                     schema=pa.schema([('f0', pa.int32()), ('f1', pa.string())]))
+ write0.write_arrow(data0)
+ cmts0 = write0.prepare_commit()
+ for c in cmts0:
+ for nf in c.new_files:
+ nf.first_row_id = 101
+ commit0.commit(cmts0)
+ write0.close()
+ commit0.close()
+
+ write1 = write_builder.new_write().with_write_type(['f2'])
+ commit1 = write_builder.new_commit()
+        data1 = pa.Table.from_pydict({'f2': ['d']}, schema=pa.schema([('f2', pa.string())]))
+ write1.write_arrow(data1)
+ cmts1 = write1.prepare_commit()
+ for c in cmts1:
+ for nf in c.new_files:
+ nf.first_row_id = 101
+ commit1.commit(cmts1)
+ write1.close()
+ commit1.close()
+
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ actual = table_read.to_arrow(table_scan.plan().splits())
+
+ self.assertEqual(actual.num_rows, 102)
+ expect = pa.Table.from_pydict({
+ 'f0': [1] * 100 + [2] + [3],
+ 'f1': ['a'] * 100 + ['x'] + ['c'],
+ 'f2': ['b'] * 100 + ['y'] + ['d'],
+ }, schema=simple_pa_schema)
+ self.assertEqual(actual, expect)
+
+ def test_disorder_cols_append(self):
+ simple_pa_schema = pa.schema([
+ ('f0', pa.int32()),
+ ('f1', pa.string()),
+ ('f2', pa.string()),
+ ])
+ schema = Schema.from_pyarrow_schema(
+ simple_pa_schema,
+ options={'row-tracking.enabled': 'true', 'data-evolution.enabled':
'true'}
+ )
+ self.catalog.create_table('default.test_disorder_cols_append', schema,
False)
+ table = self.catalog.get_table('default.test_disorder_cols_append')
+
+ write_builder = table.new_batch_write_builder()
+ num_rows = 100
+ # write 1 rows: (1, "a", "b")
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ init_data = pa.Table.from_pydict({
+ 'f0': [1] * num_rows,
+ 'f1': ['a'] * num_rows,
+ 'f2': ['b'] * num_rows,
+ }, schema=simple_pa_schema)
+ table_write.write_arrow(init_data)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ # append:set first_row_id = 0 to modify the row with columns write
+ write0 = write_builder.new_write().with_write_type(['f0', 'f2'])
+ write1 = write_builder.new_write().with_write_type(['f1'])
+ commit = write_builder.new_commit()
+ data0 = pa.Table.from_pydict({'f0': [2] * num_rows, 'f2': ['y'] *
num_rows},
+ schema=pa.schema([('f0', pa.int32()),
('f2', pa.string())]))
+ data1 = pa.Table.from_pydict({'f1': ['x'] * num_rows},
schema=pa.schema([('f1', pa.string())]))
+ write0.write_arrow(data0)
+ write1.write_arrow(data1)
+ cmts = write0.prepare_commit() + write1.prepare_commit()
+ for c in cmts:
+ for nf in c.new_files:
+ nf.first_row_id = 0
+ commit.commit(cmts)
+ write0.close()
+ write1.close()
+ commit.close()
+
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ actual = table_read.to_arrow(table_scan.plan().splits())
+
+ self.assertEqual(actual.num_rows, 100)
+ expect = pa.Table.from_pydict({
+ 'f0': [2] * num_rows,
+ 'f1': ['x'] * num_rows,
+ 'f2': ['y'] * num_rows,
+ }, schema=simple_pa_schema)
+ self.assertEqual(actual, expect)
+
+ def test_only_some_columns(self):
+ simple_pa_schema = pa.schema([
+ ('f0', pa.int32()),
+ ('f1', pa.string()),
+ ('f2', pa.string()),
+ ])
+ schema = Schema.from_pyarrow_schema(
+ simple_pa_schema,
+ options={'row-tracking.enabled': 'true', 'data-evolution.enabled':
'true'}
+ )
+ self.catalog.create_table('default.test_only_some_columns', schema,
False)
+ table = self.catalog.get_table('default.test_only_some_columns')
+
+ write_builder = table.new_batch_write_builder()
+
+ # Commit 1: f0
+ w0 = write_builder.new_write().with_write_type(['f0'])
+ c0 = write_builder.new_commit()
+ d0 = pa.Table.from_pydict({'f0': [1]}, schema=pa.schema([('f0',
pa.int32())]))
+ w0.write_arrow(d0)
+ c0.commit(w0.prepare_commit())
+ w0.close()
+ c0.close()
+
+ # Commit 2: f1, first_row_id = 0
+ w1 = write_builder.new_write().with_write_type(['f1'])
+ c1 = write_builder.new_commit()
+ d1 = pa.Table.from_pydict({'f1': ['a']}, schema=pa.schema([('f1',
pa.string())]))
+ w1.write_arrow(d1)
+ cmts1 = w1.prepare_commit()
+ for c in cmts1:
+ for nf in c.new_files:
+ nf.first_row_id = 0
+ c1.commit(cmts1)
+ w1.close()
+ c1.close()
+
+ # Commit 3: f2, first_row_id = 0
+ w2 = write_builder.new_write().with_write_type(['f2'])
+ c2 = write_builder.new_commit()
+ d2 = pa.Table.from_pydict({'f2': ['b']}, schema=pa.schema([('f2',
pa.string())]))
+ w2.write_arrow(d2)
+ cmts2 = w2.prepare_commit()
+ for c in cmts2:
+ for nf in c.new_files:
+ nf.first_row_id = 0
+ c2.commit(cmts2)
+ w2.close()
+ c2.close()
+
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ actual = table_read.to_arrow(table_scan.plan().splits())
+
+ expect = pa.Table.from_pydict({
+ 'f0': [1],
+ 'f1': ['a'],
+ 'f2': ['b'],
+ }, schema=simple_pa_schema)
+ self.assertEqual(actual, expect)
+
+ def test_null_values(self):
+ simple_pa_schema = pa.schema([
+ ('f0', pa.int32()),
+ ('f1', pa.string()),
+ ('f2', pa.string()),
+ ])
+ schema = Schema.from_pyarrow_schema(
+ simple_pa_schema,
+ options={'row-tracking.enabled': 'true', 'data-evolution.enabled':
'true'}
+ )
+ self.catalog.create_table('default.test_null_values', schema, False)
+ table = self.catalog.get_table('default.test_null_values')
+
+ write_builder = table.new_batch_write_builder()
+
+ # Commit 1: some cols are null
+ w0 = write_builder.new_write().with_write_type(['f0', 'f1'])
+ w1 = write_builder.new_write().with_write_type(['f2'])
+ c = write_builder.new_commit()
+
+ d0 = pa.Table.from_pydict({'f0': [1], 'f1': [None]},
+ schema=pa.schema([('f0', pa.int32()), ('f1',
pa.string())]))
+ d1 = pa.Table.from_pydict({'f2': [None]}, schema=pa.schema([('f2',
pa.string())]))
+ w0.write_arrow(d0)
+ w1.write_arrow(d1)
+ cmts = w0.prepare_commit() + w1.prepare_commit()
+ for msg in cmts:
+ for nf in msg.new_files:
+ nf.first_row_id = 0
+ c.commit(cmts)
+ w0.close()
+ w1.close()
+ c.close()
+
+ # Commit 2
+ w1 = write_builder.new_write().with_write_type(['f2'])
+ c1 = write_builder.new_commit()
+ d1 = pa.Table.from_pydict({'f2': ['c']}, schema=pa.schema([('f2',
pa.string())]))
+ w1.write_arrow(d1)
+ cmts1 = w1.prepare_commit()
+ for msg in cmts1:
+ for nf in msg.new_files:
+ nf.first_row_id = 0
+ c1.commit(cmts1)
+ w1.close()
+ c1.close()
+
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ actual = table_read.to_arrow(table_scan.plan().splits())
+ expect = pa.Table.from_pydict({
+ 'f0': [1],
+ 'f1': [None],
+ 'f2': ['c'],
+ }, schema=simple_pa_schema)
+ self.assertEqual(actual, expect)
+
+ # different first_row_id append multiple times
+ def test_multiple_appends_different_first_row_ids(self):
+ simple_pa_schema = pa.schema([
+ ('f0', pa.int32()),
+ ('f1', pa.string()),
+ ('f2', pa.string()),
+ ])
+ schema = Schema.from_pyarrow_schema(
+ simple_pa_schema,
+ options={'row-tracking.enabled': 'true', 'data-evolution.enabled':
'true'}
+ )
+ self.catalog.create_table('default.test_multiple_appends_diff_rowid',
schema, False)
+ table =
self.catalog.get_table('default.test_multiple_appends_diff_rowid')
+
+ write_builder = table.new_batch_write_builder()
+
+ # commit 1
+ w0 = write_builder.new_write().with_write_type(['f0', 'f1'])
+ w1 = write_builder.new_write().with_write_type(['f2'])
+ c = write_builder.new_commit()
+ d0 = pa.Table.from_pydict({'f0': [1], 'f1': ['a']},
+ schema=pa.schema([('f0', pa.int32()), ('f1',
pa.string())]))
+ d1 = pa.Table.from_pydict({'f2': ['b']}, schema=pa.schema([('f2',
pa.string())]))
+ w0.write_arrow(d0)
+ w1.write_arrow(d1)
+ cmts = w0.prepare_commit() + w1.prepare_commit()
+ for msg in cmts:
+ for nf in msg.new_files:
+ nf.first_row_id = 0
+ c.commit(cmts)
+ w0.close()
+ w1.close()
+ c.close()
+
+ # commit 2
+ w0 = write_builder.new_write().with_write_type(['f0', 'f1'])
+ c0 = write_builder.new_commit()
+ d0 = pa.Table.from_pydict({'f0': [2], 'f1': ['c']},
+ schema=pa.schema([('f0', pa.int32()), ('f1',
pa.string())]))
+ w0.write_arrow(d0)
+ cmts0 = w0.prepare_commit()
+ for msg in cmts0:
+ for nf in msg.new_files:
+ nf.first_row_id = 1
+ c0.commit(cmts0)
+ w0.close()
+ c0.close()
+
+ # commit 3
+ w1 = write_builder.new_write().with_write_type(['f2'])
+ c1 = write_builder.new_commit()
+ d1 = pa.Table.from_pydict({'f2': ['d']}, schema=pa.schema([('f2',
pa.string())]))
+ w1.write_arrow(d1)
+ cmts1 = w1.prepare_commit()
+ for msg in cmts1:
+ for nf in msg.new_files:
+ nf.first_row_id = 1
+ c1.commit(cmts1)
+ w1.close()
+ c1.close()
+
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ actual = table_read.to_arrow(table_scan.plan().splits())
+
+ expect = pa.Table.from_pydict({
+ 'f0': [1, 2],
+ 'f1': ['a', 'c'],
+ 'f2': ['b', 'd'],
+ }, schema=simple_pa_schema)
+ self.assertEqual(actual, expect)
+
+ def test_more_data(self):
+ simple_pa_schema = pa.schema([
+ ('f0', pa.int32()),
+ ('f1', pa.string()),
+ ('f2', pa.string()),
+ ])
+ schema = Schema.from_pyarrow_schema(
+ simple_pa_schema,
+ options={'row-tracking.enabled': 'true', 'data-evolution.enabled':
'true'}
+ )
+ self.catalog.create_table('default.test_more_data', schema, False)
+ table = self.catalog.get_table('default.test_more_data')
+
+ write_builder = table.new_batch_write_builder()
+
+ # first commit:100k rows
+ w0 = write_builder.new_write().with_write_type(['f0', 'f1'])
+ w1 = write_builder.new_write().with_write_type(['f2'])
+ c = write_builder.new_commit()
+ size = 100000
+ d0 = pa.Table.from_pydict({
+ 'f0': list(range(size)),
+ 'f1': [f'a{i}' for i in range(size)],
+ }, schema=pa.schema([('f0', pa.int32()), ('f1', pa.string())]))
+ d1 = pa.Table.from_pydict({
+ 'f2': [f'b{i}' for i in range(size)],
+ }, schema=pa.schema([('f2', pa.string())]))
+ w0.write_arrow(d0)
+ w1.write_arrow(d1)
+ cmts = w0.prepare_commit() + w1.prepare_commit()
+ for msg in cmts:
+ for nf in msg.new_files:
+ nf.first_row_id = 0
+ c.commit(cmts)
+ w0.close()
+ w1.close()
+ c.close()
+
+ # second commit:overwrite f2 to 'c{i}'
+ w1 = write_builder.new_write().with_write_type(['f2'])
+ c1 = write_builder.new_commit()
+ d1 = pa.Table.from_pydict({
+ 'f2': [f'c{i}' for i in range(size)],
+ }, schema=pa.schema([('f2', pa.string())]))
+ w1.write_arrow(d1)
+ cmts1 = w1.prepare_commit()
+ c1.commit(cmts1)
+ w1.close()
+ c1.close()
+
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ actual = table_read.to_arrow(table_scan.plan().splits())
+
+ expect = pa.Table.from_pydict({
+ 'f0': list(range(size)),
+ 'f1': [f'a{i}' for i in range(size)],
+ 'f2': [f'c{i}' for i in range(size)],
+ }, schema=simple_pa_schema)
+ self.assertEqual(actual, expect)
diff --git a/paimon-python/pypaimon/tests/file_store_commit_test.py b/paimon-python/pypaimon/tests/file_store_commit_test.py
index ac7ce95094..ab566c3e52 100644
--- a/paimon-python/pypaimon/tests/file_store_commit_test.py
+++ b/paimon-python/pypaimon/tests/file_store_commit_test.py
@@ -76,7 +76,10 @@ class TestFileStoreCommit(unittest.TestCase):
schema_id=0,
level=0,
extra_files=None,
- creation_time=creation_time
+ creation_time=creation_time,
+ external_path=None,
+ first_row_id=None,
+ write_cols=None
)
commit_message = CommitMessage(
@@ -182,7 +185,10 @@ class TestFileStoreCommit(unittest.TestCase):
schema_id=0,
level=0,
extra_files=None,
- creation_time=creation_time
+ creation_time=creation_time,
+ external_path=None,
+ first_row_id=None,
+ write_cols=None
)
# File for partition 2
@@ -199,7 +205,10 @@ class TestFileStoreCommit(unittest.TestCase):
schema_id=0,
level=0,
extra_files=None,
- creation_time=creation_time
+ creation_time=creation_time,
+ external_path=None,
+ first_row_id=None,
+ write_cols=None
)
commit_message_1 = CommitMessage(
@@ -261,7 +270,10 @@ class TestFileStoreCommit(unittest.TestCase):
schema_id=0,
level=0,
extra_files=None,
- creation_time=creation_time
+ creation_time=creation_time,
+ external_path=None,
+ first_row_id=None,
+ write_cols=None
)
commit_message = CommitMessage(
@@ -389,7 +401,3 @@ class TestFileStoreCommit(unittest.TestCase):
file=file
))
return commit_entries
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/paimon-python/pypaimon/tests/py36/data_evolution_test.py b/paimon-python/pypaimon/tests/py36/data_evolution_test.py
new file mode 100644
index 0000000000..90abd2f916
--- /dev/null
+++ b/paimon-python/pypaimon/tests/py36/data_evolution_test.py
@@ -0,0 +1,483 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import os
+import tempfile
+import unittest
+
+import pyarrow as pa
+from pypaimon import Schema, CatalogFactory
+
+
+class DataEvolutionTest(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.tempdir = tempfile.mkdtemp()
+ cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+ cls.catalog = CatalogFactory.create({
+ 'warehouse': cls.warehouse
+ })
+ cls.catalog.create_database('default', False)
+
+ def test_basic(self):
+ simple_pa_schema = pa.schema([
+ ('f0', pa.int8()),
+ ('f1', pa.int16()),
+ ])
+ schema = Schema.from_pyarrow_schema(simple_pa_schema,
+ options={'row-tracking.enabled':
'true', 'data-evolution.enabled': 'true'})
+ self.catalog.create_table('default.test_row_tracking', schema, False)
+ table = self.catalog.get_table('default.test_row_tracking')
+
+ # write 1
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ expect_data = pa.Table.from_pydict({
+ 'f0': [-1, 2],
+ 'f1': [-1001, 1002]
+ }, schema=simple_pa_schema)
+ table_write.write_arrow(expect_data)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ # write 2
+ table_write = write_builder.new_write().with_write_type(['f0'])
+ table_commit = write_builder.new_commit()
+ data2 = pa.Table.from_pydict({
+ 'f0': [3, 4],
+ }, schema=pa.schema([
+ ('f0', pa.int8()),
+ ]))
+ table_write.write_arrow(data2)
+ cmts = table_write.prepare_commit()
+ cmts[0].new_files[0].first_row_id = 0
+ table_commit.commit(cmts)
+ table_write.close()
+ table_commit.close()
+
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ actual_data = table_read.to_arrow(table_scan.plan().splits())
+ expect_data = pa.Table.from_pydict({
+ 'f0': [3, 4],
+ 'f1': [-1001, 1002]
+ }, schema=pa.schema([
+ ('f0', pa.int8()),
+ ('f1', pa.int16()),
+ ]))
+ self.assertEqual(actual_data, expect_data)
+
+ def test_multiple_appends(self):
+ simple_pa_schema = pa.schema([
+ ('f0', pa.int32()),
+ ('f1', pa.string()),
+ ('f2', pa.string()),
+ ])
+ schema = Schema.from_pyarrow_schema(
+ simple_pa_schema,
+ options={'row-tracking.enabled': 'true', 'data-evolution.enabled':
'true'}
+ )
+ self.catalog.create_table('default.test_multiple_appends', schema,
False)
+ table = self.catalog.get_table('default.test_multiple_appends')
+
+ write_builder = table.new_batch_write_builder()
+
+ # write 100 rows: (1, "a", "b")
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ init_data = pa.Table.from_pydict({
+ 'f0': [1] * 100,
+ 'f1': ['a'] * 100,
+ 'f2': ['b'] * 100,
+ }, schema=simple_pa_schema)
+ table_write.write_arrow(init_data)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+ # append:set first_row_id = 100 to modify the row with columns write
+ write0 = write_builder.new_write().with_write_type(['f0', 'f1'])
+ write1 = write_builder.new_write().with_write_type(['f2'])
+ commit = write_builder.new_commit()
+ data0 = pa.Table.from_pydict({'f0': [2], 'f1': ['x']},
+ schema=pa.schema([('f0', pa.int32()),
('f1', pa.string())]))
+ data1 = pa.Table.from_pydict({'f2': ['y']}, schema=pa.schema([('f2',
pa.string())]))
+ write0.write_arrow(data0)
+ write1.write_arrow(data1)
+ cmts = write0.prepare_commit() + write1.prepare_commit()
+ for c in cmts:
+ for nf in c.new_files:
+ nf.first_row_id = 100
+ commit.commit(cmts)
+ write0.close()
+ write1.close()
+ commit.close()
+
+ # append:write (3, "c") and ("d"), set first_row_id = 101
+ write0 = write_builder.new_write().with_write_type(['f0', 'f1'])
+ commit0 = write_builder.new_commit()
+ data0 = pa.Table.from_pydict({'f0': [3], 'f1': ['c']},
+ schema=pa.schema([('f0', pa.int32()),
('f1', pa.string())]))
+ write0.write_arrow(data0)
+ cmts0 = write0.prepare_commit()
+ for c in cmts0:
+ for nf in c.new_files:
+ nf.first_row_id = 101
+ commit0.commit(cmts0)
+ write0.close()
+ commit0.close()
+
+ write1 = write_builder.new_write().with_write_type(['f2'])
+ commit1 = write_builder.new_commit()
+ data1 = pa.Table.from_pydict({'f2': ['d']}, schema=pa.schema([('f2',
pa.string())]))
+ write1.write_arrow(data1)
+ cmts1 = write1.prepare_commit()
+ for c in cmts1:
+ for nf in c.new_files:
+ nf.first_row_id = 101
+ commit1.commit(cmts1)
+ write1.close()
+ commit1.close()
+
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ actual = table_read.to_arrow(table_scan.plan().splits())
+
+ self.assertEqual(actual.num_rows, 102)
+ expect = pa.Table.from_pydict({
+ 'f0': [1] * 100 + [2] + [3],
+ 'f1': ['a'] * 100 + ['x'] + ['c'],
+ 'f2': ['b'] * 100 + ['y'] + ['d'],
+ }, schema=simple_pa_schema)
+ self.assertEqual(actual, expect)
+
+ def test_disorder_cols_append(self):
+ simple_pa_schema = pa.schema([
+ ('f0', pa.int32()),
+ ('f1', pa.string()),
+ ('f2', pa.string()),
+ ])
+ schema = Schema.from_pyarrow_schema(
+ simple_pa_schema,
+ options={'row-tracking.enabled': 'true', 'data-evolution.enabled':
'true'}
+ )
+ self.catalog.create_table('default.test_disorder_cols_append', schema,
False)
+ table = self.catalog.get_table('default.test_disorder_cols_append')
+
+ write_builder = table.new_batch_write_builder()
+ num_rows = 100
+ # write 1 rows: (1, "a", "b")
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ init_data = pa.Table.from_pydict({
+ 'f0': [1] * num_rows,
+ 'f1': ['a'] * num_rows,
+ 'f2': ['b'] * num_rows,
+ }, schema=simple_pa_schema)
+ table_write.write_arrow(init_data)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ # append:set first_row_id = 0 to modify the row with columns write
+ write0 = write_builder.new_write().with_write_type(['f0', 'f2'])
+ write1 = write_builder.new_write().with_write_type(['f1'])
+ commit = write_builder.new_commit()
+ data0 = pa.Table.from_pydict({'f0': [2] * num_rows, 'f2': ['y'] *
num_rows},
+ schema=pa.schema([('f0', pa.int32()),
('f2', pa.string())]))
+ data1 = pa.Table.from_pydict({'f1': ['x'] * num_rows},
schema=pa.schema([('f1', pa.string())]))
+ write0.write_arrow(data0)
+ write1.write_arrow(data1)
+ cmts = write0.prepare_commit() + write1.prepare_commit()
+ for c in cmts:
+ for nf in c.new_files:
+ nf.first_row_id = 0
+ commit.commit(cmts)
+ write0.close()
+ write1.close()
+ commit.close()
+
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ actual = table_read.to_arrow(table_scan.plan().splits())
+
+ self.assertEqual(actual.num_rows, 100)
+ expect = pa.Table.from_pydict({
+ 'f0': [2] * num_rows,
+ 'f1': ['x'] * num_rows,
+ 'f2': ['y'] * num_rows,
+ }, schema=simple_pa_schema)
+ self.assertEqual(actual, expect)
+
+ def test_only_some_columns(self):
+ simple_pa_schema = pa.schema([
+ ('f0', pa.int32()),
+ ('f1', pa.string()),
+ ('f2', pa.string()),
+ ])
+ schema = Schema.from_pyarrow_schema(
+ simple_pa_schema,
+ options={'row-tracking.enabled': 'true', 'data-evolution.enabled':
'true'}
+ )
+ self.catalog.create_table('default.test_only_some_columns', schema,
False)
+ table = self.catalog.get_table('default.test_only_some_columns')
+
+ write_builder = table.new_batch_write_builder()
+
+ # Commit 1: f0
+ w0 = write_builder.new_write().with_write_type(['f0'])
+ c0 = write_builder.new_commit()
+ d0 = pa.Table.from_pydict({'f0': [1]}, schema=pa.schema([('f0',
pa.int32())]))
+ w0.write_arrow(d0)
+ c0.commit(w0.prepare_commit())
+ w0.close()
+ c0.close()
+
+ # Commit 2: f1, first_row_id = 0
+ w1 = write_builder.new_write().with_write_type(['f1'])
+ c1 = write_builder.new_commit()
+ d1 = pa.Table.from_pydict({'f1': ['a']}, schema=pa.schema([('f1',
pa.string())]))
+ w1.write_arrow(d1)
+ cmts1 = w1.prepare_commit()
+ for c in cmts1:
+ for nf in c.new_files:
+ nf.first_row_id = 0
+ c1.commit(cmts1)
+ w1.close()
+ c1.close()
+
+ # Commit 3: f2, first_row_id = 0
+ w2 = write_builder.new_write().with_write_type(['f2'])
+ c2 = write_builder.new_commit()
+ d2 = pa.Table.from_pydict({'f2': ['b']}, schema=pa.schema([('f2',
pa.string())]))
+ w2.write_arrow(d2)
+ cmts2 = w2.prepare_commit()
+ for c in cmts2:
+ for nf in c.new_files:
+ nf.first_row_id = 0
+ c2.commit(cmts2)
+ w2.close()
+ c2.close()
+
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ actual = table_read.to_arrow(table_scan.plan().splits())
+
+ expect = pa.Table.from_pydict({
+ 'f0': [1],
+ 'f1': ['a'],
+ 'f2': ['b'],
+ }, schema=simple_pa_schema)
+ self.assertEqual(actual, expect)
+
+ def test_null_values(self):
+ simple_pa_schema = pa.schema([
+ ('f0', pa.int32()),
+ ('f1', pa.string()),
+ ('f2', pa.string()),
+ ])
+ schema = Schema.from_pyarrow_schema(
+ simple_pa_schema,
+ options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'}
+ )
+ self.catalog.create_table('default.test_null_values', schema, False)
+ table = self.catalog.get_table('default.test_null_values')
+
+ write_builder = table.new_batch_write_builder()
+
+ # Commit 1: some cols are null
+ w0 = write_builder.new_write().with_write_type(['f0', 'f1'])
+ w1 = write_builder.new_write().with_write_type(['f2'])
+ c = write_builder.new_commit()
+
+ d0 = pa.Table.from_pydict({'f0': [1], 'f1': [None]},
+ schema=pa.schema([('f0', pa.int32()), ('f1', pa.string())]))
+ d1 = pa.Table.from_pydict({'f2': [None]}, schema=pa.schema([('f2', pa.string())]))
+ w0.write_arrow(d0)
+ w1.write_arrow(d1)
+ cmts = w0.prepare_commit() + w1.prepare_commit()
+ for msg in cmts:
+ for nf in msg.new_files:
+ nf.first_row_id = 0
+ c.commit(cmts)
+ w0.close()
+ w1.close()
+ c.close()
+
+ # Commit 2
+ w1 = write_builder.new_write().with_write_type(['f2'])
+ c1 = write_builder.new_commit()
+ d1 = pa.Table.from_pydict({'f2': ['c']}, schema=pa.schema([('f2', pa.string())]))
+ w1.write_arrow(d1)
+ cmts1 = w1.prepare_commit()
+ for msg in cmts1:
+ for nf in msg.new_files:
+ nf.first_row_id = 0
+ c1.commit(cmts1)
+ w1.close()
+ c1.close()
+
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ actual = table_read.to_arrow(table_scan.plan().splits())
+ expect = pa.Table.from_pydict({
+ 'f0': [1],
+ 'f1': [None],
+ 'f2': ['c'],
+ }, schema=simple_pa_schema)
+ self.assertEqual(actual, expect)
+
+ # append multiple times with different first_row_id values
+ def test_multiple_appends_different_first_row_ids(self):
+ simple_pa_schema = pa.schema([
+ ('f0', pa.int32()),
+ ('f1', pa.string()),
+ ('f2', pa.string()),
+ ])
+ schema = Schema.from_pyarrow_schema(
+ simple_pa_schema,
+ options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'}
+ )
+ self.catalog.create_table('default.test_multiple_appends_diff_rowid', schema, False)
+ table = self.catalog.get_table('default.test_multiple_appends_diff_rowid')
+
+ write_builder = table.new_batch_write_builder()
+
+ # commit 1
+ w0 = write_builder.new_write().with_write_type(['f0', 'f1'])
+ w1 = write_builder.new_write().with_write_type(['f2'])
+ c = write_builder.new_commit()
+ d0 = pa.Table.from_pydict({'f0': [1], 'f1': ['a']},
+ schema=pa.schema([('f0', pa.int32()), ('f1', pa.string())]))
+ d1 = pa.Table.from_pydict({'f2': ['b']}, schema=pa.schema([('f2', pa.string())]))
+ w0.write_arrow(d0)
+ w1.write_arrow(d1)
+ cmts = w0.prepare_commit() + w1.prepare_commit()
+ for msg in cmts:
+ for nf in msg.new_files:
+ nf.first_row_id = 0
+ c.commit(cmts)
+ w0.close()
+ w1.close()
+ c.close()
+
+ # commit 2
+ w0 = write_builder.new_write().with_write_type(['f0', 'f1'])
+ c0 = write_builder.new_commit()
+ d0 = pa.Table.from_pydict({'f0': [2], 'f1': ['c']},
+ schema=pa.schema([('f0', pa.int32()), ('f1', pa.string())]))
+ w0.write_arrow(d0)
+ cmts0 = w0.prepare_commit()
+ for msg in cmts0:
+ for nf in msg.new_files:
+ nf.first_row_id = 1
+ c0.commit(cmts0)
+ w0.close()
+ c0.close()
+
+ # commit 3
+ w1 = write_builder.new_write().with_write_type(['f2'])
+ c1 = write_builder.new_commit()
+ d1 = pa.Table.from_pydict({'f2': ['d']}, schema=pa.schema([('f2', pa.string())]))
+ w1.write_arrow(d1)
+ cmts1 = w1.prepare_commit()
+ for msg in cmts1:
+ for nf in msg.new_files:
+ nf.first_row_id = 1
+ c1.commit(cmts1)
+ w1.close()
+ c1.close()
+
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ actual = table_read.to_arrow(table_scan.plan().splits())
+
+ expect = pa.Table.from_pydict({
+ 'f0': [1, 2],
+ 'f1': ['a', 'c'],
+ 'f2': ['b', 'd'],
+ }, schema=simple_pa_schema)
+ self.assertEqual(actual, expect)
+
+ def test_more_data(self):
+ simple_pa_schema = pa.schema([
+ ('f0', pa.int32()),
+ ('f1', pa.string()),
+ ('f2', pa.string()),
+ ])
+ schema = Schema.from_pyarrow_schema(
+ simple_pa_schema,
+ options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'}
+ )
+ self.catalog.create_table('default.test_more_data', schema, False)
+ table = self.catalog.get_table('default.test_more_data')
+
+ write_builder = table.new_batch_write_builder()
+
+ # first commit: write 100k rows
+ w0 = write_builder.new_write().with_write_type(['f0', 'f1'])
+ w1 = write_builder.new_write().with_write_type(['f2'])
+ c = write_builder.new_commit()
+ size = 100000
+ d0 = pa.Table.from_pydict({
+ 'f0': list(range(size)),
+ 'f1': [f'a{i}' for i in range(size)],
+ }, schema=pa.schema([('f0', pa.int32()), ('f1', pa.string())]))
+ d1 = pa.Table.from_pydict({
+ 'f2': [f'b{i}' for i in range(size)],
+ }, schema=pa.schema([('f2', pa.string())]))
+ w0.write_arrow(d0)
+ w1.write_arrow(d1)
+ cmts = w0.prepare_commit() + w1.prepare_commit()
+ for msg in cmts:
+ for nf in msg.new_files:
+ nf.first_row_id = 0
+ c.commit(cmts)
+ w0.close()
+ w1.close()
+ c.close()
+
+ # second commit: overwrite f2 with 'c{i}'
+ w1 = write_builder.new_write().with_write_type(['f2'])
+ c1 = write_builder.new_commit()
+ d1 = pa.Table.from_pydict({
+ 'f2': [f'c{i}' for i in range(size)],
+ }, schema=pa.schema([('f2', pa.string())]))
+ w1.write_arrow(d1)
+ cmts1 = w1.prepare_commit()
+ c1.commit(cmts1)
+ w1.close()
+ c1.close()
+
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ actual = table_read.to_arrow(table_scan.plan().splits())
+
+ expect = pa.Table.from_pydict({
+ 'f0': list(range(size)),
+ 'f1': [f'a{i}' for i in range(size)],
+ 'f2': [f'c{i}' for i in range(size)],
+ }, schema=simple_pa_schema)
+ self.assertEqual(actual, expect)
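
The tests above all follow the same column-wise append pattern, so a condensed sketch may help when skimming the diff. It assumes `table` is an existing row-tracking, data-evolution table with columns f0 (int32), f1 and f2 (string), exactly as created in the tests, and uses only APIs that appear in this diff:

    import pyarrow as pa

    # one writer per column group, each declaring the columns it will write
    write_builder = table.new_batch_write_builder()
    w_left = write_builder.new_write().with_write_type(['f0', 'f2'])
    w_right = write_builder.new_write().with_write_type(['f1'])
    commit = write_builder.new_commit()

    w_left.write_arrow(pa.Table.from_pydict({'f0': [2], 'f2': ['y']},
                                            schema=pa.schema([('f0', pa.int32()), ('f2', pa.string())])))
    w_right.write_arrow(pa.Table.from_pydict({'f1': ['x']}, schema=pa.schema([('f1', pa.string())])))

    # pin both column files to the same row range so the read path can zip them back together
    messages = w_left.prepare_commit() + w_right.prepare_commit()
    for message in messages:
        for new_file in message.new_files:
            new_file.first_row_id = 0
    commit.commit(messages)
    w_left.close(); w_right.close(); commit.close()

    # a plain read now returns the fully merged row (2, 'x', 'y')
    read_builder = table.new_read_builder()
    merged = read_builder.new_read().to_arrow(read_builder.new_scan().plan().splits())
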
diff --git a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
index 20e6a2c2d8..e6374132af 100644
--- a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
+++ b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
@@ -550,7 +550,7 @@ class RESTReadWritePy36Test(RESTBaseTest):
with self.assertRaises(ValueError) as e:
table_write.write_arrow_batch(record_batch)
- self.assertTrue(str(e.exception).startswith("Input schema isn't
consistent with table schema."))
+ self.assertTrue(str(e.exception).startswith("Input schema isn't
consistent with table schema and write cols."))
def test_write_wide_table_large_data(self):
logging.basicConfig(level=logging.INFO)
@@ -801,7 +801,9 @@ class RESTReadWritePy36Test(RESTBaseTest):
embedded_index=None,
file_source=None,
value_stats_cols=value_stats_cols, # This is the key field we're testing
- external_path=None
+ external_path=None,
+ first_row_id=None,
+ write_cols=None
)
# Create ManifestEntry
diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py
index ccf06d5597..6e9dc1ffc6 100644
--- a/paimon-python/pypaimon/tests/reader_base_test.py
+++ b/paimon-python/pypaimon/tests/reader_base_test.py
@@ -248,7 +248,7 @@ class ReaderBasicTest(unittest.TestCase):
with self.assertRaises(ValueError) as e:
table_write.write_arrow_batch(record_batch)
- self.assertTrue(str(e.exception).startswith("Input schema isn't
consistent with table schema."))
+ self.assertTrue(str(e.exception).startswith("Input schema isn't
consistent with table schema and write cols."))
def test_reader_iterator(self):
read_builder = self.table.new_read_builder()
@@ -609,7 +609,9 @@ class ReaderBasicTest(unittest.TestCase):
embedded_index=None,
file_source=None,
value_stats_cols=value_stats_cols, # This is the key field we're testing
- external_path=None
+ external_path=None,
+ first_row_id=None,
+ write_cols=None
)
# Create ManifestEntry
diff --git a/paimon-python/pypaimon/tests/rest/rest_read_write_test.py b/paimon-python/pypaimon/tests/rest/rest_read_write_test.py
index dc6c47e778..d05942a256 100644
--- a/paimon-python/pypaimon/tests/rest/rest_read_write_test.py
+++ b/paimon-python/pypaimon/tests/rest/rest_read_write_test.py
@@ -339,7 +339,7 @@ class RESTTableReadWriteTest(RESTBaseTest):
with self.assertRaises(ValueError) as e:
table_write.write_arrow_batch(record_batch)
- self.assertTrue(str(e.exception).startswith("Input schema isn't
consistent with table schema."))
+ self.assertTrue(str(e.exception).startswith("Input schema isn't
consistent with table schema and write cols."))
def test_reader_iterator(self):
read_builder = self.table.new_read_builder()
diff --git a/paimon-python/pypaimon/write/batch_table_write.py b/paimon-python/pypaimon/write/batch_table_write.py
index c2e533cef5..a71e9c0503 100644
--- a/paimon-python/pypaimon/write/batch_table_write.py
+++ b/paimon-python/pypaimon/write/batch_table_write.py
@@ -67,10 +67,21 @@ class BatchTableWrite:
self.batch_committed = True
return self.file_store_write.prepare_commit()
+ def with_write_type(self, write_cols: List[str]):
+ for col in write_cols:
+ if col not in self.table_pyarrow_schema.names:
+ raise ValueError(f"Column {col} is not in table schema.")
+ if len(write_cols) == len(self.table_pyarrow_schema.names):
+ write_cols = None
+ self.file_store_write.write_cols = write_cols
+ return self
+
def close(self):
self.file_store_write.close()
- def _validate_pyarrow_schema(self, data_schema):
- if data_schema != self.table_pyarrow_schema:
- raise ValueError(f"Input schema isn't consistent with table
schema. "
- f"Table schema is: {data_schema} Input schema is:
{self.table_pyarrow_schema}")
+ def _validate_pyarrow_schema(self, data_schema: pa.Schema):
+ if data_schema != self.table_pyarrow_schema and data_schema.names != self.file_store_write.write_cols:
+ raise ValueError(f"Input schema isn't consistent with table schema and write cols. "
+ f"Input schema is: {data_schema} "
+ f"Table schema is: {self.table_pyarrow_schema} "
+ f"Write cols is: {self.file_store_write.write_cols}")
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index 5920f50ad8..10d2796b76 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -130,6 +130,24 @@ class FileStoreCommit:
added_file_count = 0
deleted_file_count = 0
delta_record_count = 0
+ # allocate the ID for the new snapshot
+ new_snapshot_id = self._generate_snapshot_id()
+
+ # Check if row tracking is enabled
+ row_tracking_enabled = self.table.options.get('row-tracking.enabled', 'false').lower() == 'true'
+
+ # Apply row tracking logic if enabled
+ next_row_id = None
+ if row_tracking_enabled:
+ # Assign snapshot ID to delta files
+ commit_entries = self._assign_snapshot_id(new_snapshot_id, commit_entries)
+
+ # Get the next row ID start from the latest snapshot
+ first_row_id_start = self._get_next_row_id_start()
+
+ # Assign row IDs to new files and get the next row ID for the snapshot
+ commit_entries, next_row_id = self._assign_row_tracking_meta(first_row_id_start, commit_entries)
+
for entry in commit_entries:
if entry.kind == 0:
added_file_count += 1
@@ -194,6 +212,7 @@ class FileStoreCommit:
commit_identifier=commit_identifier,
commit_kind=commit_kind,
time_millis=int(time.time() * 1000),
+ next_row_id=next_row_id,
)
# Generate partition statistics for the commit
@@ -314,3 +333,59 @@ class FileStoreCommit:
)
for stats in partition_stats.values()
]
+
+ def _assign_snapshot_id(self, snapshot_id: int, commit_entries: List[ManifestEntry]) -> List[ManifestEntry]:
+ """Assign snapshot ID to all commit entries."""
+ return [entry.assign_sequence_number(snapshot_id, snapshot_id) for entry in commit_entries]
+
+ def _get_next_row_id_start(self) -> int:
+ """Get the next row ID start from the latest snapshot."""
+ latest_snapshot = self.snapshot_manager.get_latest_snapshot()
+ if latest_snapshot and hasattr(latest_snapshot, 'next_row_id') and latest_snapshot.next_row_id is not None:
+ return latest_snapshot.next_row_id
+ return 0
+
+ def _assign_row_tracking_meta(self, first_row_id_start: int, commit_entries: List[ManifestEntry]):
+ """
+ Assign row tracking metadata (first_row_id) to new files.
+ This follows the Java implementation logic from FileStoreCommitImpl.assignRowTrackingMeta.
+ """
+ if not commit_entries:
+ return commit_entries, first_row_id_start
+
+ row_id_assigned = []
+ start = first_row_id_start
+ blob_start = first_row_id_start
+
+ for entry in commit_entries:
+ # Check if this is an append file that needs row ID assignment
+ if (entry.kind == 0 and # ADD kind
+ entry.file.file_source == "APPEND" and # APPEND file
source
+ entry.file.first_row_id is None): # No existing
first_row_id
+
+ if self._is_blob_file(entry.file.file_name):
+ # Handle blob files specially
+ if blob_start >= start:
+ raise RuntimeError(
+ f"This is a bug, blobStart {blob_start} should be
less than start {start} "
+ f"when assigning a blob entry file."
+ )
+ row_count = entry.file.row_count
+ row_id_assigned.append(entry.assign_first_row_id(blob_start))
+ blob_start += row_count
+ else:
+ # Handle regular files
+ row_count = entry.file.row_count
+ row_id_assigned.append(entry.assign_first_row_id(start))
+ blob_start = start
+ start += row_count
+ else:
+ # For compact files or files that already have first_row_id, don't assign
+ row_id_assigned.append(entry)
+
+ return row_id_assigned, start
+
+ @staticmethod
+ def _is_blob_file(file_name: str) -> bool:
+ """Check if a file is a blob file based on its extension."""
+ return file_name.endswith('.blob')
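
To make the row-ID arithmetic in _assign_row_tracking_meta concrete: with next_row_id = 100 from the latest snapshot and a commit adding two regular APPEND files of 10 and 5 rows plus one blob file of 10 rows, the files receive first_row_id 100, 110 and 110 respectively, and the new snapshot records next_row_id = 115. A standalone sketch of just that arithmetic (the real method operates on ManifestEntry objects and additionally enforces the blob_start < start guard):

    def assign_first_row_ids(next_row_id_start, files):
        """files: list of (row_count, is_blob) for new APPEND files without a first_row_id."""
        start = blob_start = next_row_id_start
        assigned = []
        for row_count, is_blob in files:
            if is_blob:
                assigned.append(blob_start)   # blob rows reuse the range of the preceding regular file
                blob_start += row_count
            else:
                assigned.append(start)
                blob_start = start
                start += row_count
        return assigned, start                # `start` becomes the snapshot's next_row_id

    # assign_first_row_ids(100, [(10, False), (5, False), (10, True)])
    # -> ([100, 110, 110], 115)
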
diff --git a/paimon-python/pypaimon/write/file_store_write.py b/paimon-python/pypaimon/write/file_store_write.py
index bcef10a4c3..841fef3a65 100644
--- a/paimon-python/pypaimon/write/file_store_write.py
+++ b/paimon-python/pypaimon/write/file_store_write.py
@@ -34,6 +34,7 @@ class FileStoreWrite:
self.table: FileStoreTable = table
self.data_writers: Dict[Tuple, DataWriter] = {}
self.max_seq_numbers = self._seq_number_stats() # TODO: build this on-demand instead of on all
+ self.write_cols = None
def write(self, partition: Tuple, bucket: int, data: pa.RecordBatch):
key = (partition, bucket)
@@ -56,6 +57,7 @@ class FileStoreWrite:
partition=partition,
bucket=bucket,
max_seq_number=self.max_seq_numbers.get((partition, bucket), 1),
+ write_cols=self.write_cols
)
def prepare_commit(self) -> List[CommitMessage]:
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index cc0fc944a1..ad6e327c89 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -26,6 +26,7 @@ from typing import Dict, List, Optional, Tuple
from pypaimon.common.core_options import CoreOptions
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.manifest.schema.simple_stats import SimpleStats
+from pypaimon.schema.data_types import PyarrowFieldParser
from pypaimon.table.bucket_mode import BucketMode
from pypaimon.table.row.generic_row import GenericRow
@@ -33,7 +34,8 @@ from pypaimon.table.row.generic_row import GenericRow
class DataWriter(ABC):
"""Base class for data writers that handle PyArrow tables directly."""
- def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int):
+ def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int,
+ write_cols: Optional[List[str]] = None):
from pypaimon.table.file_store_table import FileStoreTable
self.table: FileStoreTable = table
@@ -55,6 +57,7 @@ class DataWriter(ABC):
self.pending_data: Optional[pa.Table] = None
self.committed_files: List[DataFileMeta] = []
+ self.write_cols = write_cols
def write(self, data: pa.RecordBatch):
processed_data = self._process_data(data)
@@ -126,11 +129,13 @@ class DataWriter(ABC):
max_key = [col.to_pylist()[0] for col in max_key_row_batch.columns]
# key stats & value stats
+ data_fields = self.table.fields if self.table.is_primary_key_table \
+ else PyarrowFieldParser.to_paimon_schema(data.schema)
column_stats = {
field.name: self._get_column_stats(data, field.name)
- for field in self.table.table_schema.fields
+ for field in data_fields
}
- all_fields = self.table.table_schema.fields
+ all_fields = data_fields
min_value_stats = [column_stats[field.name]['min_values'] for field in all_fields]
max_value_stats = [column_stats[field.name]['max_values'] for field in all_fields]
value_null_counts = [column_stats[field.name]['null_counts'] for field in all_fields]
@@ -156,8 +161,8 @@ class DataWriter(ABC):
key_null_counts,
),
value_stats=SimpleStats(
- GenericRow(min_value_stats, self.table.table_schema.fields),
- GenericRow(max_value_stats, self.table.table_schema.fields),
+ GenericRow(min_value_stats, data_fields),
+ GenericRow(max_value_stats, data_fields),
value_null_counts,
),
min_sequence_number=min_seq,
@@ -167,7 +172,12 @@ class DataWriter(ABC):
extra_files=[],
creation_time=datetime.now(),
delete_row_count=0,
- value_stats_cols=None, # None means all columns have statistics
+ file_source="APPEND",
+ value_stats_cols=None, # None means all columns in the data have statistics
+ external_path=None,
+ first_row_id=None,
+ write_cols=self.write_cols,
+ # write_cols=None means all columns in the table have been written
file_path=str(file_path),
))