This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new faba3a15ca [Python] Support overwrite mode for writer (#6186)
faba3a15ca is described below
commit faba3a15ca9e840c18ea2082e9a0202aa73caf33
Author: ChengHui Chen <[email protected]>
AuthorDate: Fri Sep 5 22:26:10 2025 +0800
[Python] Support overwrite mode for writer (#6186)
---
.../pypaimon/manifest/manifest_file_manager.py | 76 +++++-----
paimon-python/pypaimon/read/plan.py | 5 +
paimon-python/pypaimon/read/table_scan.py | 17 ++-
.../pypaimon/tests/py36/ao_read_write_test.py | 81 ++++++++++
paimon-python/pypaimon/tests/reader_basic_test.py | 81 +++++++++-
.../pypaimon/tests/rest_table_read_write_test.py | 81 ++++++++++
.../pypaimon/tests/test_file_store_commit.py | 52 +++----
paimon-python/pypaimon/write/batch_table_commit.py | 2 +-
.../pypaimon/write/batch_write_builder.py | 2 +-
paimon-python/pypaimon/write/file_store_commit.py | 166 +++++++++++++--------
10 files changed, 425 insertions(+), 138 deletions(-)
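
For reviewers unfamiliar with the new API, here is a minimal usage sketch of the overwrite mode added by this commit, adapted from the tests below. The catalog construction and the table name are illustrative, and the package-level Schema import is an assumption (the tests' import lines are not part of this diff):

    import pandas as pd
    import pyarrow as pa
    from pypaimon import Schema  # assumed package-level export

    # Partitioned table with dynamic partition overwrite disabled, as in the new tests.
    pa_schema = pa.schema([('f0', pa.int32()), ('f1', pa.string())])
    schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['f0'],
                                        options={'dynamic-partition-overwrite': 'false'})
    catalog.create_table('default.demo_overwrite', schema, False)  # 'catalog' obtained elsewhere
    table = catalog.get_table('default.demo_overwrite')

    # overwrite({'f0': 1}) replaces only partition f0=1; overwrite() with no
    # argument replaces the whole table.
    write_builder = table.new_batch_write_builder().overwrite({'f0': 1})
    table_write = write_builder.new_write()
    table_commit = write_builder.new_commit()
    table_write.write_pandas(pd.DataFrame({'f0': [1], 'f1': ['watermelon']}))
    table_commit.commit(table_write.prepare_commit())
    table_write.close()
    table_commit.close()
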
diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index 7c97f7b0ca..b2cd7868bb 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -24,9 +24,8 @@ from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.manifest.schema.manifest_entry import (MANIFEST_ENTRY_SCHEMA,
ManifestEntry)
from pypaimon.manifest.schema.simple_stats import SimpleStats
-from pypaimon.table.row.binary_row import (BinaryRow, BinaryRowDeserializer,
+from pypaimon.table.row.binary_row import (BinaryRowDeserializer,
BinaryRowSerializer)
-from pypaimon.write.commit_message import CommitMessage
class ManifestFileManager:
@@ -99,46 +98,43 @@ class ManifestFileManager:
entries.append(entry)
return entries
- def write(self, file_name, commit_messages: List[CommitMessage]):
+ def write(self, file_name, entries: List[ManifestEntry]):
avro_records = []
- for message in commit_messages:
- partition_bytes = BinaryRowSerializer.to_bytes(
- BinaryRow(list(message.partition), self.table.table_schema.get_partition_key_fields()))
- for file in message.new_files:
- avro_record = {
- "_VERSION": 2,
- "_KIND": 0,
- "_PARTITION": partition_bytes,
- "_BUCKET": message.bucket,
- "_TOTAL_BUCKETS": self.table.total_buckets,
- "_FILE": {
- "_FILE_NAME": file.file_name,
- "_FILE_SIZE": file.file_size,
- "_ROW_COUNT": file.row_count,
- "_MIN_KEY": BinaryRowSerializer.to_bytes(file.min_key),
- "_MAX_KEY": BinaryRowSerializer.to_bytes(file.max_key),
- "_KEY_STATS": {
- "_MIN_VALUES":
BinaryRowSerializer.to_bytes(file.key_stats.min_values),
- "_MAX_VALUES":
BinaryRowSerializer.to_bytes(file.key_stats.max_values),
- "_NULL_COUNTS": file.key_stats.null_counts,
- },
- "_VALUE_STATS": {
- "_MIN_VALUES":
BinaryRowSerializer.to_bytes(file.value_stats.min_values),
- "_MAX_VALUES":
BinaryRowSerializer.to_bytes(file.value_stats.max_values),
- "_NULL_COUNTS": file.value_stats.null_counts,
- },
- "_MIN_SEQUENCE_NUMBER": file.min_sequence_number,
- "_MAX_SEQUENCE_NUMBER": file.max_sequence_number,
- "_SCHEMA_ID": file.schema_id,
- "_LEVEL": file.level,
- "_EXTRA_FILES": file.extra_files,
- "_CREATION_TIME": file.creation_time,
- "_DELETE_ROW_COUNT": file.delete_row_count,
- "_EMBEDDED_FILE_INDEX": file.embedded_index,
- "_FILE_SOURCE": file.file_source,
- }
+ for entry in entries:
+ avro_record = {
+ "_VERSION": 2,
+ "_KIND": entry.kind,
+ "_PARTITION": BinaryRowSerializer.to_bytes(entry.partition),
+ "_BUCKET": entry.bucket,
+ "_TOTAL_BUCKETS": entry.bucket,
+ "_FILE": {
+ "_FILE_NAME": entry.file.file_name,
+ "_FILE_SIZE": entry.file.file_size,
+ "_ROW_COUNT": entry.file.row_count,
+ "_MIN_KEY":
BinaryRowSerializer.to_bytes(entry.file.min_key),
+ "_MAX_KEY":
BinaryRowSerializer.to_bytes(entry.file.max_key),
+ "_KEY_STATS": {
+ "_MIN_VALUES":
BinaryRowSerializer.to_bytes(entry.file.key_stats.min_values),
+ "_MAX_VALUES":
BinaryRowSerializer.to_bytes(entry.file.key_stats.max_values),
+ "_NULL_COUNTS": entry.file.key_stats.null_counts,
+ },
+ "_VALUE_STATS": {
+ "_MIN_VALUES":
BinaryRowSerializer.to_bytes(entry.file.value_stats.min_values),
+ "_MAX_VALUES":
BinaryRowSerializer.to_bytes(entry.file.value_stats.max_values),
+ "_NULL_COUNTS": entry.file.value_stats.null_counts,
+ },
+ "_MIN_SEQUENCE_NUMBER": entry.file.min_sequence_number,
+ "_MAX_SEQUENCE_NUMBER": entry.file.max_sequence_number,
+ "_SCHEMA_ID": entry.file.schema_id,
+ "_LEVEL": entry.file.level,
+ "_EXTRA_FILES": entry.file.extra_files,
+ "_CREATION_TIME": entry.file.creation_time,
+ "_DELETE_ROW_COUNT": entry.file.delete_row_count,
+ "_EMBEDDED_FILE_INDEX": entry.file.embedded_index,
+ "_FILE_SOURCE": entry.file.file_source,
}
- avro_records.append(avro_record)
+ }
+ avro_records.append(avro_record)
manifest_path = self.manifest_path / file_name
try:
diff --git a/paimon-python/pypaimon/read/plan.py b/paimon-python/pypaimon/read/plan.py
index c3aeaa8a54..9a65fd6f12 100644
--- a/paimon-python/pypaimon/read/plan.py
+++ b/paimon-python/pypaimon/read/plan.py
@@ -19,13 +19,18 @@
from dataclasses import dataclass
from typing import List
+from pypaimon.manifest.schema.manifest_entry import ManifestEntry
from pypaimon.read.split import Split
@dataclass
class Plan:
"""Implementation of Plan for native Python reading."""
+ _files: List[ManifestEntry]
_splits: List[Split]
+ def files(self) -> List[ManifestEntry]:
+ return self._files
+
def splits(self) -> List[Split]:
return self._splits
diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py
index a4d92bb796..0b9f97db4f 100644
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -73,16 +73,25 @@ class TableScan:
def plan(self) -> Plan:
latest_snapshot = self.snapshot_manager.get_latest_snapshot()
if not latest_snapshot:
- return Plan([])
+ return Plan([], [])
manifest_files = self.manifest_list_manager.read_all(latest_snapshot)
- file_entries = []
+ deleted_entries = set()
+ added_entries = []
+ # TODO: filter manifest files by predicate
for manifest_file in manifest_files:
manifest_entries = self.manifest_file_manager.read(manifest_file.file_name, lambda row: self._bucket_filter(row))
for entry in manifest_entries:
if entry.kind == 0:
- file_entries.append(entry)
+ added_entries.append(entry)
+ else:
+ deleted_entries.add((tuple(entry.partition.values), entry.bucket, entry.file.file_name))
+
+ file_entries = [
+ entry for entry in added_entries
+ if (tuple(entry.partition.values), entry.bucket, entry.file.file_name) not in deleted_entries
+ ]
if self.predicate:
file_entries = self._filter_by_predicate(file_entries)
@@ -100,7 +109,7 @@ class TableScan:
splits = self._apply_push_down_limit(splits)
- return Plan(splits)
+ return Plan(file_entries, splits)
def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'TableScan':
self.idx_of_this_subtask = idx_of_this_subtask
diff --git a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
index 575bf535e3..7ae847c9f3 100644
--- a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
+++ b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
@@ -32,6 +32,87 @@ from pypaimon.tests.rest_catalog_base_test import RESTCatalogBaseTest
class RESTTableReadWritePy36Test(RESTCatalogBaseTest):
+ def test_overwrite(self):
+ simple_pa_schema = pa.schema([
+ ('f0', pa.int32()),
+ ('f1', pa.string())
+ ])
+ schema = Schema.from_pyarrow_schema(simple_pa_schema, partition_keys=['f0'],
+                                     options={'dynamic-partition-overwrite': 'false'})
+ self.rest_catalog.create_table('default.test_overwrite', schema, False)
+ table = self.rest_catalog.get_table('default.test_overwrite')
+ read_builder = table.new_read_builder()
+
+ # test normal write
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ df0 = pd.DataFrame({
+ 'f0': [1, 2],
+ 'f1': ['apple', 'banana'],
+ })
+
+ table_write.write_pandas(df0)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ actual_df0 = table_read.to_pandas(table_scan.plan().splits()).sort_values(by='f0')
+ df0['f0'] = df0['f0'].astype('int32')
+ pd.testing.assert_frame_equal(
+ actual_df0.reset_index(drop=True), df0.reset_index(drop=True))
+
+ # test partially overwrite
+ write_builder = table.new_batch_write_builder().overwrite({'f0': 1})
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ df1 = pd.DataFrame({
+ 'f0': [1],
+ 'f1': ['watermelon'],
+ })
+
+ table_write.write_pandas(df1)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ actual_df1 = table_read.to_pandas(table_scan.plan().splits()).sort_values(by='f0')
+ expected_df1 = pd.DataFrame({
+ 'f0': [1, 2],
+ 'f1': ['watermelon', 'banana']
+ })
+ expected_df1['f0'] = expected_df1['f0'].astype('int32')
+ pd.testing.assert_frame_equal(
+ actual_df1.reset_index(drop=True), expected_df1.reset_index(drop=True))
+
+ # test fully overwrite
+ write_builder = table.new_batch_write_builder().overwrite()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ df2 = pd.DataFrame({
+ 'f0': [3],
+ 'f1': ['Neo'],
+ })
+
+ table_write.write_pandas(df2)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ actual_df2 = table_read.to_pandas(table_scan.plan().splits())
+ df2['f0'] = df2['f0'].astype('int32')
+ pd.testing.assert_frame_equal(
+ actual_df2.reset_index(drop=True), df2.reset_index(drop=True))
+
def testParquetAppendOnlyReader(self):
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
self.rest_catalog.create_table('default.test_append_only_parquet', schema, False)
diff --git a/paimon-python/pypaimon/tests/reader_basic_test.py b/paimon-python/pypaimon/tests/reader_basic_test.py
index 4cfb4cf2ac..445a65763f 100644
--- a/paimon-python/pypaimon/tests/reader_basic_test.py
+++ b/paimon-python/pypaimon/tests/reader_basic_test.py
@@ -68,8 +68,85 @@ class ReaderBasicTest(unittest.TestCase):
shutil.rmtree(cls.tempdir, ignore_errors=True)
def test_overwrite(self):
- pass
- # TODO: support overwrite
+ simple_pa_schema = pa.schema([
+ ('f0', pa.int32()),
+ ('f1', pa.string())
+ ])
+ schema = Schema.from_pyarrow_schema(simple_pa_schema, partition_keys=['f0'],
+                                     options={'dynamic-partition-overwrite': 'false'})
+ self.catalog.create_table('default.test_overwrite', schema, False)
+ table = self.catalog.get_table('default.test_overwrite')
+ read_builder = table.new_read_builder()
+
+ # test normal write
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ df0 = pd.DataFrame({
+ 'f0': [1, 2],
+ 'f1': ['apple', 'banana'],
+ })
+
+ table_write.write_pandas(df0)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ actual_df0 = table_read.to_pandas(table_scan.plan().splits()).sort_values(by='f0')
+ df0['f0'] = df0['f0'].astype('int32')
+ pd.testing.assert_frame_equal(
+ actual_df0.reset_index(drop=True), df0.reset_index(drop=True))
+
+ # test partially overwrite
+ write_builder = table.new_batch_write_builder().overwrite({'f0': 1})
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ df1 = pd.DataFrame({
+ 'f0': [1],
+ 'f1': ['watermelon'],
+ })
+
+ table_write.write_pandas(df1)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ actual_df1 = table_read.to_pandas(table_scan.plan().splits()).sort_values(by='f0')
+ expected_df1 = pd.DataFrame({
+ 'f0': [1, 2],
+ 'f1': ['watermelon', 'banana']
+ })
+ expected_df1['f0'] = expected_df1['f0'].astype('int32')
+ pd.testing.assert_frame_equal(
+ actual_df1.reset_index(drop=True), expected_df1.reset_index(drop=True))
+
+ # test fully overwrite
+ write_builder = table.new_batch_write_builder().overwrite()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ df2 = pd.DataFrame({
+ 'f0': [3],
+ 'f1': ['Neo'],
+ })
+
+ table_write.write_pandas(df2)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ actual_df2 = table_read.to_pandas(table_scan.plan().splits())
+ df2['f0'] = df2['f0'].astype('int32')
+ pd.testing.assert_frame_equal(
+ actual_df2.reset_index(drop=True), df2.reset_index(drop=True))
def testWriteWrongSchema(self):
self.catalog.create_table('default.test_wrong_schema',
diff --git a/paimon-python/pypaimon/tests/rest_table_read_write_test.py b/paimon-python/pypaimon/tests/rest_table_read_write_test.py
index 0608c4ef58..efa0839a33 100644
--- a/paimon-python/pypaimon/tests/rest_table_read_write_test.py
+++ b/paimon-python/pypaimon/tests/rest_table_read_write_test.py
@@ -32,6 +32,87 @@ from pypaimon.tests.rest_catalog_base_test import RESTCatalogBaseTest
class RESTTableReadWriteTest(RESTCatalogBaseTest):
+ def test_overwrite(self):
+ simple_pa_schema = pa.schema([
+ ('f0', pa.int32()),
+ ('f1', pa.string())
+ ])
+ schema = Schema.from_pyarrow_schema(simple_pa_schema, partition_keys=['f0'],
+                                     options={'dynamic-partition-overwrite': 'false'})
+ self.rest_catalog.create_table('default.test_overwrite', schema, False)
+ table = self.rest_catalog.get_table('default.test_overwrite')
+ read_builder = table.new_read_builder()
+
+ # test normal write
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ df0 = pd.DataFrame({
+ 'f0': [1, 2],
+ 'f1': ['apple', 'banana'],
+ })
+
+ table_write.write_pandas(df0)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ actual_df0 = table_read.to_pandas(table_scan.plan().splits()).sort_values(by='f0')
+ df0['f0'] = df0['f0'].astype('int32')
+ pd.testing.assert_frame_equal(
+ actual_df0.reset_index(drop=True), df0.reset_index(drop=True))
+
+ # test partially overwrite
+ write_builder = table.new_batch_write_builder().overwrite({'f0': 1})
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ df1 = pd.DataFrame({
+ 'f0': [1],
+ 'f1': ['watermelon'],
+ })
+
+ table_write.write_pandas(df1)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ actual_df1 = table_read.to_pandas(table_scan.plan().splits()).sort_values(by='f0')
+ expected_df1 = pd.DataFrame({
+ 'f0': [1, 2],
+ 'f1': ['watermelon', 'banana']
+ })
+ expected_df1['f0'] = expected_df1['f0'].astype('int32')
+ pd.testing.assert_frame_equal(
+ actual_df1.reset_index(drop=True), expected_df1.reset_index(drop=True))
+
+ # test fully overwrite
+ write_builder = table.new_batch_write_builder().overwrite()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ df2 = pd.DataFrame({
+ 'f0': [3],
+ 'f1': ['Neo'],
+ })
+
+ table_write.write_pandas(df2)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ actual_df2 = table_read.to_pandas(table_scan.plan().splits())
+ df2['f0'] = df2['f0'].astype('int32')
+ pd.testing.assert_frame_equal(
+ actual_df2.reset_index(drop=True), df2.reset_index(drop=True))
+
def testParquetAppendOnlyReader(self):
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
self.rest_catalog.create_table('default.test_append_only_parquet', schema, False)
diff --git a/paimon-python/pypaimon/tests/test_file_store_commit.py b/paimon-python/pypaimon/tests/test_file_store_commit.py
index 6f32894c90..5110bc0f80 100644
--- a/paimon-python/pypaimon/tests/test_file_store_commit.py
+++ b/paimon-python/pypaimon/tests/test_file_store_commit.py
@@ -22,7 +22,9 @@ from pathlib import Path
from unittest.mock import Mock, patch
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.manifest.schema.manifest_entry import ManifestEntry
from pypaimon.snapshot.snapshot_commit import PartitionStatistics
+from pypaimon.table.row.binary_row import BinaryRow
from pypaimon.write.commit_message import CommitMessage
from pypaimon.write.file_store_commit import FileStoreCommit
@@ -84,7 +86,7 @@ class TestFileStoreCommit(unittest.TestCase):
)
# Test method
- statistics = file_store_commit._generate_partition_statistics([commit_message])
+ statistics = file_store_commit._generate_partition_statistics(self._to_entries([commit_message]))
# Verify results
self.assertEqual(len(statistics), 1)
@@ -145,7 +147,7 @@ class TestFileStoreCommit(unittest.TestCase):
)
# Test method
- statistics = file_store_commit._generate_partition_statistics([commit_message])
+ statistics = file_store_commit._generate_partition_statistics(self._to_entries([commit_message]))
# Verify results
self.assertEqual(len(statistics), 1)
@@ -213,7 +215,8 @@ class TestFileStoreCommit(unittest.TestCase):
)
# Test method
- statistics = file_store_commit._generate_partition_statistics([commit_message_1, commit_message_2])
+ statistics = file_store_commit._generate_partition_statistics(
+ self._to_entries([commit_message_1, commit_message_2]))
# Verify results
self.assertEqual(len(statistics), 2)
@@ -268,7 +271,7 @@ class TestFileStoreCommit(unittest.TestCase):
)
# Test method
- statistics = file_store_commit._generate_partition_statistics([commit_message])
+ statistics = file_store_commit._generate_partition_statistics(self._to_entries([commit_message]))
# Verify results
self.assertEqual(len(statistics), 1)
@@ -308,7 +311,7 @@ class TestFileStoreCommit(unittest.TestCase):
)
# Test method
- statistics = file_store_commit._generate_partition_statistics([commit_message])
+ statistics = file_store_commit._generate_partition_statistics(self._to_entries([commit_message]))
# Verify results
self.assertEqual(len(statistics), 1)
@@ -347,7 +350,7 @@ class TestFileStoreCommit(unittest.TestCase):
)
# Test method
- statistics = file_store_commit._generate_partition_statistics([commit_message])
+ statistics = file_store_commit._generate_partition_statistics(self._to_entries([commit_message]))
# Verify results - should fallback to index-based naming
self.assertEqual(len(statistics), 1)
@@ -372,29 +375,20 @@ class TestFileStoreCommit(unittest.TestCase):
# Verify results
self.assertEqual(len(statistics), 0)
- def test_generate_partition_statistics_commit_message_no_files(
- self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager):
- """Test partition statistics generation with commit message containing no files."""
- # Create FileStoreCommit instance
- file_store_commit = self._create_file_store_commit()
-
- commit_message = CommitMessage(
- partition=('2024-01-15', 'us-east-1'),
- bucket=0,
- new_files=[] # No files
- )
-
- # Test method
- statistics = file_store_commit._generate_partition_statistics([commit_message])
-
- # Verify results - should still create a partition entry with zero counts
- self.assertEqual(len(statistics), 1)
-
- stat = statistics[0]
- self.assertEqual(stat.spec, {'dt': '2024-01-15', 'region': 'us-east-1'})
- self.assertEqual(stat.record_count, 0)
- self.assertEqual(stat.file_count, 0)
- self.assertEqual(stat.file_size_in_bytes, 0)
+ @staticmethod
+ def _to_entries(commit_messages):
+ commit_entries = []
+ for msg in commit_messages:
+ partition = BinaryRow(list(msg.partition), None)
+ for file in msg.new_files:
+ commit_entries.append(ManifestEntry(
+ kind=0,
+ partition=partition,
+ bucket=msg.bucket,
+ total_buckets=None,
+ file=file
+ ))
+ return commit_entries
if __name__ == '__main__':
diff --git a/paimon-python/pypaimon/write/batch_table_commit.py b/paimon-python/pypaimon/write/batch_table_commit.py
index 55d8de0905..7f42e1cef1 100644
--- a/paimon-python/pypaimon/write/batch_table_commit.py
+++ b/paimon-python/pypaimon/write/batch_table_commit.py
@@ -53,7 +53,7 @@ class BatchTableCommit:
try:
if self.overwrite_partition is not None:
self.file_store_commit.overwrite(
- partition=self.overwrite_partition,
+ overwrite_partition=self.overwrite_partition,
commit_messages=non_empty_messages,
commit_identifier=commit_identifier
)
diff --git a/paimon-python/pypaimon/write/batch_write_builder.py b/paimon-python/pypaimon/write/batch_write_builder.py
index 0a274cfee0..2380530fbc 100644
--- a/paimon-python/pypaimon/write/batch_write_builder.py
+++ b/paimon-python/pypaimon/write/batch_write_builder.py
@@ -33,7 +33,7 @@ class BatchWriteBuilder:
self.static_partition = None
def overwrite(self, static_partition: Optional[dict] = None):
- self.static_partition = static_partition
+ self.static_partition = static_partition if static_partition is not None else {}
return self
def new_write(self) -> BatchTableWrite:
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index efe8207ebe..ed9f5d16fc 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -21,15 +21,19 @@ import uuid
from pathlib import Path
from typing import List
+from pypaimon.common.predicate_builder import PredicateBuilder
from pypaimon.manifest.manifest_file_manager import ManifestFileManager
from pypaimon.manifest.manifest_list_manager import ManifestListManager
+from pypaimon.manifest.schema.manifest_entry import ManifestEntry
from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
from pypaimon.manifest.schema.simple_stats import SimpleStats
+from pypaimon.read.table_scan import TableScan
from pypaimon.snapshot.snapshot import Snapshot
from pypaimon.snapshot.snapshot_commit import (PartitionStatistics,
SnapshotCommit)
from pypaimon.snapshot.snapshot_manager import SnapshotManager
from pypaimon.table.row.binary_row import BinaryRow
+from pypaimon.table.row.offset_row import OffsetRow
from pypaimon.write.commit_message import CommitMessage
@@ -60,15 +64,83 @@ class FileStoreCommit:
if not commit_messages:
return
+ commit_entries = []
+ for msg in commit_messages:
+ partition = BinaryRow(list(msg.partition), self.table.table_schema.get_partition_key_fields())
+ for file in msg.new_files:
+ commit_entries.append(ManifestEntry(
+ kind=0,
+ partition=partition,
+ bucket=msg.bucket,
+ total_buckets=self.table.total_buckets,
+ file=file
+ ))
+
+ self._try_commit(commit_kind="APPEND",
+ commit_entries=commit_entries,
+ commit_identifier=commit_identifier)
+
+ def overwrite(self, overwrite_partition, commit_messages: List[CommitMessage], commit_identifier: int):
+ """Commit the given commit messages in overwrite mode."""
+ if not commit_messages:
+ return
+
+ partition_filter = None
+ # sanity check, all changes must be done within the given partition, meanwhile build a partition filter
+ if len(overwrite_partition) > 0:
+ predicate_builder = PredicateBuilder(self.table.table_schema.get_partition_key_fields())
+ sub_predicates = []
+ for key, value in overwrite_partition.items():
+ sub_predicates.append(predicate_builder.equal(key, value))
+ partition_filter = predicate_builder.and_predicates(sub_predicates)
+
+ for msg in commit_messages:
+ row = OffsetRow(msg.partition, 0, len(msg.partition))
+ if not partition_filter.test(row):
+ raise RuntimeError(f"Trying to overwrite partition
{overwrite_partition}, but the changes "
+ f"in {msg.partition} does not belong to
this partition")
+
+ commit_entries = []
+ current_entries = TableScan(self.table, partition_filter, None, []).plan().files()
+ for entry in current_entries:
+ entry.kind = 1
+ commit_entries.append(entry)
+ for msg in commit_messages:
+ partition = BinaryRow(list(msg.partition), self.table.table_schema.get_partition_key_fields())
+ for file in msg.new_files:
+ commit_entries.append(ManifestEntry(
+ kind=0,
+ partition=partition,
+ bucket=msg.bucket,
+ total_buckets=self.table.total_buckets,
+ file=file
+ ))
+
+ self._try_commit(commit_kind="OVERWRITE",
+ commit_entries=commit_entries,
+ commit_identifier=commit_identifier)
+
+ def _try_commit(self, commit_kind, commit_entries, commit_identifier):
unique_id = uuid.uuid4()
base_manifest_list = f"manifest-list-{unique_id}-0"
delta_manifest_list = f"manifest-list-{unique_id}-1"
# process new_manifest
new_manifest_file = f"manifest-{str(uuid.uuid4())}-0"
- self.manifest_file_manager.write(new_manifest_file, commit_messages)
+ added_file_count = 0
+ deleted_file_count = 0
+ delta_record_count = 0
+ for entry in commit_entries:
+ if entry.kind == 0:
+ added_file_count += 1
+ delta_record_count += entry.file.row_count
+ else:
+ deleted_file_count += 1
+ delta_record_count -= entry.file.row_count
+ self.manifest_file_manager.write(new_manifest_file, commit_entries)
+ # TODO: implement noConflictsOrFail logic
- partition_columns = list(zip(*(msg.partition for msg in commit_messages)))
+ partition_columns = list(zip(*(entry.partition.values for entry in commit_entries)))
partition_min_stats = [min(col) for col in partition_columns]
partition_max_stats = [max(col) for col in partition_columns]
partition_null_counts = [sum(value == 0 for value in col) for col in partition_columns]
@@ -78,8 +150,8 @@ class FileStoreCommit:
new_manifest_list = ManifestFileMeta(
file_name=new_manifest_file,
file_size=self.table.file_io.get_file_size(self.manifest_file_manager.manifest_path / new_manifest_file),
- num_added_files=sum(len(msg.new_files) for msg in commit_messages),
- num_deleted_files=0,
+ num_added_files=added_file_count,
+ num_deleted_files=deleted_file_count,
partition_stats=SimpleStats(
min_values=BinaryRow(
values=partition_min_stats,
@@ -109,8 +181,7 @@ class FileStoreCommit:
# process snapshot
new_snapshot_id = self._generate_snapshot_id()
- record_count_add = self._generate_record_count_add(commit_messages)
- total_record_count += record_count_add
+ total_record_count += delta_record_count
snapshot_data = Snapshot(
version=1,
id=new_snapshot_id,
@@ -118,15 +189,15 @@ class FileStoreCommit:
base_manifest_list=base_manifest_list,
delta_manifest_list=delta_manifest_list,
total_record_count=total_record_count,
- delta_record_count=record_count_add,
+ delta_record_count=delta_record_count,
commit_user=self.commit_user,
commit_identifier=commit_identifier,
- commit_kind="APPEND",
+ commit_kind=commit_kind,
time_millis=int(time.time() * 1000),
)
# Generate partition statistics for the commit
- statistics = self._generate_partition_statistics(commit_messages)
+ statistics = self._generate_partition_statistics(commit_entries)
# Use SnapshotCommit for atomic commit
with self.snapshot_commit:
@@ -134,10 +205,6 @@ class FileStoreCommit:
if not success:
raise RuntimeError(f"Failed to commit snapshot
{new_snapshot_id}")
- def overwrite(self, partition, commit_messages: List[CommitMessage], commit_identifier: int):
- """Commit the given commit messages in overwrite mode."""
- raise RuntimeError("overwrite unsupported yet")
-
def abort(self, commit_messages: List[CommitMessage]):
for message in commit_messages:
for file in message.new_files:
@@ -161,25 +228,25 @@ class FileStoreCommit:
else:
return 1
- def _generate_partition_statistics(self, commit_messages: List[CommitMessage]) -> List[PartitionStatistics]:
+ def _generate_partition_statistics(self, commit_entries: List[ManifestEntry]) -> List[PartitionStatistics]:
"""
- Generate partition statistics from commit messages.
+ Generate partition statistics from commit entries.
This method follows the Java implementation pattern from
org.apache.paimon.manifest.PartitionEntry.fromManifestEntry() and
PartitionEntry.merge() methods.
Args:
- commit_messages: List of commit messages to analyze
+ commit_entries: List of commit entries to analyze
Returns:
List of PartitionStatistics for each unique partition
"""
partition_stats = {}
- for message in commit_messages:
+ for entry in commit_entries:
# Convert partition tuple to dictionary for PartitionStatistics
- partition_value = message.partition  # Call the method to get partition value
+ partition_value = tuple(entry.partition.values)  # Call the method to get partition value
if partition_value:
# Assuming partition is a tuple and we need to convert it to a dict
# This may need adjustment based on actual partition format
@@ -211,30 +278,29 @@ class FileStoreCommit:
'last_file_creation_time': 0
}
- # Process each file in the commit message
# Following Java implementation: PartitionEntry.fromDataFile()
- for file_meta in message.new_files:
- # Extract actual file metadata (following Java DataFileMeta pattern)
- record_count = file_meta.row_count
- file_size_in_bytes = file_meta.file_size
- file_count = 1
-
- # Convert creation_time to milliseconds (Java uses epoch millis)
- if file_meta.creation_time:
- file_creation_time = int(file_meta.creation_time.timestamp() * 1000)
- else:
- file_creation_time = int(time.time() * 1000)
+ file_meta = entry.file
+ # Extract actual file metadata (following Java DataFileMeta pattern)
+ record_count = file_meta.row_count
+ file_size_in_bytes = file_meta.file_size
+ file_count = 1
+
+ # Convert creation_time to milliseconds (Java uses epoch millis)
+ if file_meta.creation_time:
+ file_creation_time = int(file_meta.creation_time.timestamp() * 1000)
+ else:
+ file_creation_time = int(time.time() * 1000)
- # Accumulate statistics (following Java PartitionEntry.merge() logic)
- partition_stats[partition_key]['record_count'] += record_count
- partition_stats[partition_key]['file_size_in_bytes'] += file_size_in_bytes
- partition_stats[partition_key]['file_count'] += file_count
+ # Accumulate statistics (following Java PartitionEntry.merge() logic)
+ partition_stats[partition_key]['record_count'] += record_count
+ partition_stats[partition_key]['file_size_in_bytes'] += file_size_in_bytes
+ partition_stats[partition_key]['file_count'] += file_count
- # Keep the latest creation time
- partition_stats[partition_key]['last_file_creation_time'] =
max(
- partition_stats[partition_key]['last_file_creation_time'],
- file_creation_time
- )
+ # Keep the latest creation time
+ partition_stats[partition_key]['last_file_creation_time'] = max(
+ partition_stats[partition_key]['last_file_creation_time'],
+ file_creation_time
+ )
# Convert to PartitionStatistics objects
# Following Java PartitionEntry.toPartitionStatistics() pattern
@@ -248,25 +314,3 @@ class FileStoreCommit:
)
for stats in partition_stats.values()
]
-
- def _generate_record_count_add(self, commit_messages: List[CommitMessage]) -> int:
- """
- Generate record count add from commit messages.
-
- This method follows the Java implementation pattern from
- org.apache.paimon.manifest.ManifestEntry.recordCountAdd().
-
- Args:
- commit_messages: List of commit messages to analyze
-
- Returns:
- Count of add record
- """
- record_count = 0
-
- for message in commit_messages:
- new_files = message.new_files
- for file_meta in new_files:
- record_count += file_meta.row_count
-
- return record_count