This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new faba3a15ca [Python] Support overwrite mode for writer (#6186)
faba3a15ca is described below

commit faba3a15ca9e840c18ea2082e9a0202aa73caf33
Author: ChengHui Chen <[email protected]>
AuthorDate: Fri Sep 5 22:26:10 2025 +0800

    [Python] Support overwrite mode for writer (#6186)
---
 .../pypaimon/manifest/manifest_file_manager.py     |  76 +++++-----
 paimon-python/pypaimon/read/plan.py                |   5 +
 paimon-python/pypaimon/read/table_scan.py          |  17 ++-
 .../pypaimon/tests/py36/ao_read_write_test.py      |  81 ++++++++++
 paimon-python/pypaimon/tests/reader_basic_test.py  |  81 +++++++++-
 .../pypaimon/tests/rest_table_read_write_test.py   |  81 ++++++++++
 .../pypaimon/tests/test_file_store_commit.py       |  52 +++----
 paimon-python/pypaimon/write/batch_table_commit.py |   2 +-
 .../pypaimon/write/batch_write_builder.py          |   2 +-
 paimon-python/pypaimon/write/file_store_commit.py  | 166 +++++++++++++--------
 10 files changed, 425 insertions(+), 138 deletions(-)

diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index 7c97f7b0ca..b2cd7868bb 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -24,9 +24,8 @@ from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.manifest.schema.manifest_entry import (MANIFEST_ENTRY_SCHEMA,
                                                      ManifestEntry)
 from pypaimon.manifest.schema.simple_stats import SimpleStats
-from pypaimon.table.row.binary_row import (BinaryRow, BinaryRowDeserializer,
+from pypaimon.table.row.binary_row import (BinaryRowDeserializer,
                                            BinaryRowSerializer)
-from pypaimon.write.commit_message import CommitMessage
 
 
 class ManifestFileManager:
@@ -99,46 +98,43 @@ class ManifestFileManager:
             entries.append(entry)
         return entries
 
-    def write(self, file_name, commit_messages: List[CommitMessage]):
+    def write(self, file_name, entries: List[ManifestEntry]):
         avro_records = []
-        for message in commit_messages:
-            partition_bytes = BinaryRowSerializer.to_bytes(
-                BinaryRow(list(message.partition), self.table.table_schema.get_partition_key_fields()))
-            for file in message.new_files:
-                avro_record = {
-                    "_VERSION": 2,
-                    "_KIND": 0,
-                    "_PARTITION": partition_bytes,
-                    "_BUCKET": message.bucket,
-                    "_TOTAL_BUCKETS": self.table.total_buckets,
-                    "_FILE": {
-                        "_FILE_NAME": file.file_name,
-                        "_FILE_SIZE": file.file_size,
-                        "_ROW_COUNT": file.row_count,
-                        "_MIN_KEY": BinaryRowSerializer.to_bytes(file.min_key),
-                        "_MAX_KEY": BinaryRowSerializer.to_bytes(file.max_key),
-                        "_KEY_STATS": {
-                            "_MIN_VALUES": BinaryRowSerializer.to_bytes(file.key_stats.min_values),
-                            "_MAX_VALUES": BinaryRowSerializer.to_bytes(file.key_stats.max_values),
-                            "_NULL_COUNTS": file.key_stats.null_counts,
-                        },
-                        "_VALUE_STATS": {
-                            "_MIN_VALUES": BinaryRowSerializer.to_bytes(file.value_stats.min_values),
-                            "_MAX_VALUES": BinaryRowSerializer.to_bytes(file.value_stats.max_values),
-                            "_NULL_COUNTS": file.value_stats.null_counts,
-                        },
-                        "_MIN_SEQUENCE_NUMBER": file.min_sequence_number,
-                        "_MAX_SEQUENCE_NUMBER": file.max_sequence_number,
-                        "_SCHEMA_ID": file.schema_id,
-                        "_LEVEL": file.level,
-                        "_EXTRA_FILES": file.extra_files,
-                        "_CREATION_TIME": file.creation_time,
-                        "_DELETE_ROW_COUNT": file.delete_row_count,
-                        "_EMBEDDED_FILE_INDEX": file.embedded_index,
-                        "_FILE_SOURCE": file.file_source,
-                    }
+        for entry in entries:
+            avro_record = {
+                "_VERSION": 2,
+                "_KIND": entry.kind,
+                "_PARTITION": BinaryRowSerializer.to_bytes(entry.partition),
+                "_BUCKET": entry.bucket,
+                "_TOTAL_BUCKETS": entry.bucket,
+                "_FILE": {
+                    "_FILE_NAME": entry.file.file_name,
+                    "_FILE_SIZE": entry.file.file_size,
+                    "_ROW_COUNT": entry.file.row_count,
+                    "_MIN_KEY": BinaryRowSerializer.to_bytes(entry.file.min_key),
+                    "_MAX_KEY": BinaryRowSerializer.to_bytes(entry.file.max_key),
+                    "_KEY_STATS": {
+                        "_MIN_VALUES": BinaryRowSerializer.to_bytes(entry.file.key_stats.min_values),
+                        "_MAX_VALUES": BinaryRowSerializer.to_bytes(entry.file.key_stats.max_values),
+                        "_NULL_COUNTS": entry.file.key_stats.null_counts,
+                    },
+                    "_VALUE_STATS": {
+                        "_MIN_VALUES": BinaryRowSerializer.to_bytes(entry.file.value_stats.min_values),
+                        "_MAX_VALUES": BinaryRowSerializer.to_bytes(entry.file.value_stats.max_values),
+                        "_NULL_COUNTS": entry.file.value_stats.null_counts,
+                    },
+                    "_MIN_SEQUENCE_NUMBER": entry.file.min_sequence_number,
+                    "_MAX_SEQUENCE_NUMBER": entry.file.max_sequence_number,
+                    "_SCHEMA_ID": entry.file.schema_id,
+                    "_LEVEL": entry.file.level,
+                    "_EXTRA_FILES": entry.file.extra_files,
+                    "_CREATION_TIME": entry.file.creation_time,
+                    "_DELETE_ROW_COUNT": entry.file.delete_row_count,
+                    "_EMBEDDED_FILE_INDEX": entry.file.embedded_index,
+                    "_FILE_SOURCE": entry.file.file_source,
                 }
-                avro_records.append(avro_record)
+            }
+            avro_records.append(avro_record)
 
         manifest_path = self.manifest_path / file_name
         try:
diff --git a/paimon-python/pypaimon/read/plan.py b/paimon-python/pypaimon/read/plan.py
index c3aeaa8a54..9a65fd6f12 100644
--- a/paimon-python/pypaimon/read/plan.py
+++ b/paimon-python/pypaimon/read/plan.py
@@ -19,13 +19,18 @@
 from dataclasses import dataclass
 from typing import List
 
+from pypaimon.manifest.schema.manifest_entry import ManifestEntry
 from pypaimon.read.split import Split
 
 
 @dataclass
 class Plan:
     """Implementation of Plan for native Python reading."""
+    _files: List[ManifestEntry]
     _splits: List[Split]
 
+    def files(self) -> List[ManifestEntry]:
+        return self._files
+
     def splits(self) -> List[Split]:
         return self._splits
diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py
index a4d92bb796..0b9f97db4f 100644
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -73,16 +73,25 @@ class TableScan:
     def plan(self) -> Plan:
         latest_snapshot = self.snapshot_manager.get_latest_snapshot()
         if not latest_snapshot:
-            return Plan([])
+            return Plan([], [])
         manifest_files = self.manifest_list_manager.read_all(latest_snapshot)
 
-        file_entries = []
+        deleted_entries = set()
+        added_entries = []
+        # TODO: filter manifest files by predicate
         for manifest_file in manifest_files:
             manifest_entries = self.manifest_file_manager.read(manifest_file.file_name,
                                                                lambda row: self._bucket_filter(row))
             for entry in manifest_entries:
                 if entry.kind == 0:
-                    file_entries.append(entry)
+                    added_entries.append(entry)
+                else:
+                    deleted_entries.add((tuple(entry.partition.values), entry.bucket, entry.file.file_name))
+
+        file_entries = [
+            entry for entry in added_entries
+            if (tuple(entry.partition.values), entry.bucket, entry.file.file_name) not in deleted_entries
+        ]
 
         if self.predicate:
             file_entries = self._filter_by_predicate(file_entries)
@@ -100,7 +109,7 @@ class TableScan:
 
         splits = self._apply_push_down_limit(splits)
 
-        return Plan(splits)
+        return Plan(file_entries, splits)
 
     def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'TableScan':
         self.idx_of_this_subtask = idx_of_this_subtask
diff --git a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
index 575bf535e3..7ae847c9f3 100644
--- a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
+++ b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
@@ -32,6 +32,87 @@ from pypaimon.tests.rest_catalog_base_test import RESTCatalogBaseTest
 
 class RESTTableReadWritePy36Test(RESTCatalogBaseTest):
 
+    def test_overwrite(self):
+        simple_pa_schema = pa.schema([
+            ('f0', pa.int32()),
+            ('f1', pa.string())
+        ])
+        schema = Schema.from_pyarrow_schema(simple_pa_schema, partition_keys=['f0'],
+                                            options={'dynamic-partition-overwrite': 'false'})
+        self.rest_catalog.create_table('default.test_overwrite', schema, False)
+        table = self.rest_catalog.get_table('default.test_overwrite')
+        read_builder = table.new_read_builder()
+
+        # test normal write
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        df0 = pd.DataFrame({
+            'f0': [1, 2],
+            'f1': ['apple', 'banana'],
+        })
+
+        table_write.write_pandas(df0)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        actual_df0 = table_read.to_pandas(table_scan.plan().splits()).sort_values(by='f0')
+        df0['f0'] = df0['f0'].astype('int32')
+        pd.testing.assert_frame_equal(
+            actual_df0.reset_index(drop=True), df0.reset_index(drop=True))
+
+        # test partially overwrite
+        write_builder = table.new_batch_write_builder().overwrite({'f0': 1})
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        df1 = pd.DataFrame({
+            'f0': [1],
+            'f1': ['watermelon'],
+        })
+
+        table_write.write_pandas(df1)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        actual_df1 = table_read.to_pandas(table_scan.plan().splits()).sort_values(by='f0')
+        expected_df1 = pd.DataFrame({
+            'f0': [1, 2],
+            'f1': ['watermelon', 'banana']
+        })
+        expected_df1['f0'] = expected_df1['f0'].astype('int32')
+        pd.testing.assert_frame_equal(
+            actual_df1.reset_index(drop=True), expected_df1.reset_index(drop=True))
+
+        # test fully overwrite
+        write_builder = table.new_batch_write_builder().overwrite()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        df2 = pd.DataFrame({
+            'f0': [3],
+            'f1': ['Neo'],
+        })
+
+        table_write.write_pandas(df2)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        actual_df2 = table_read.to_pandas(table_scan.plan().splits())
+        df2['f0'] = df2['f0'].astype('int32')
+        pd.testing.assert_frame_equal(
+            actual_df2.reset_index(drop=True), df2.reset_index(drop=True))
+
     def testParquetAppendOnlyReader(self):
         schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
         self.rest_catalog.create_table('default.test_append_only_parquet', schema, False)
diff --git a/paimon-python/pypaimon/tests/reader_basic_test.py b/paimon-python/pypaimon/tests/reader_basic_test.py
index 4cfb4cf2ac..445a65763f 100644
--- a/paimon-python/pypaimon/tests/reader_basic_test.py
+++ b/paimon-python/pypaimon/tests/reader_basic_test.py
@@ -68,8 +68,85 @@ class ReaderBasicTest(unittest.TestCase):
         shutil.rmtree(cls.tempdir, ignore_errors=True)
 
     def test_overwrite(self):
-        pass
-        # TODO: support overwrite
+        simple_pa_schema = pa.schema([
+            ('f0', pa.int32()),
+            ('f1', pa.string())
+        ])
+        schema = Schema.from_pyarrow_schema(simple_pa_schema, partition_keys=['f0'],
+                                            options={'dynamic-partition-overwrite': 'false'})
+        self.catalog.create_table('default.test_overwrite', schema, False)
+        table = self.catalog.get_table('default.test_overwrite')
+        read_builder = table.new_read_builder()
+
+        # test normal write
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        df0 = pd.DataFrame({
+            'f0': [1, 2],
+            'f1': ['apple', 'banana'],
+        })
+
+        table_write.write_pandas(df0)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        actual_df0 = table_read.to_pandas(table_scan.plan().splits()).sort_values(by='f0')
+        df0['f0'] = df0['f0'].astype('int32')
+        pd.testing.assert_frame_equal(
+            actual_df0.reset_index(drop=True), df0.reset_index(drop=True))
+
+        # test partially overwrite
+        write_builder = table.new_batch_write_builder().overwrite({'f0': 1})
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        df1 = pd.DataFrame({
+            'f0': [1],
+            'f1': ['watermelon'],
+        })
+
+        table_write.write_pandas(df1)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        actual_df1 = table_read.to_pandas(table_scan.plan().splits()).sort_values(by='f0')
+        expected_df1 = pd.DataFrame({
+            'f0': [1, 2],
+            'f1': ['watermelon', 'banana']
+        })
+        expected_df1['f0'] = expected_df1['f0'].astype('int32')
+        pd.testing.assert_frame_equal(
+            actual_df1.reset_index(drop=True), expected_df1.reset_index(drop=True))
+
+        # test fully overwrite
+        write_builder = table.new_batch_write_builder().overwrite()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        df2 = pd.DataFrame({
+            'f0': [3],
+            'f1': ['Neo'],
+        })
+
+        table_write.write_pandas(df2)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        actual_df2 = table_read.to_pandas(table_scan.plan().splits())
+        df2['f0'] = df2['f0'].astype('int32')
+        pd.testing.assert_frame_equal(
+            actual_df2.reset_index(drop=True), df2.reset_index(drop=True))
 
     def testWriteWrongSchema(self):
         self.catalog.create_table('default.test_wrong_schema',
diff --git a/paimon-python/pypaimon/tests/rest_table_read_write_test.py b/paimon-python/pypaimon/tests/rest_table_read_write_test.py
index 0608c4ef58..efa0839a33 100644
--- a/paimon-python/pypaimon/tests/rest_table_read_write_test.py
+++ b/paimon-python/pypaimon/tests/rest_table_read_write_test.py
@@ -32,6 +32,87 @@ from pypaimon.tests.rest_catalog_base_test import RESTCatalogBaseTest
 
 class RESTTableReadWriteTest(RESTCatalogBaseTest):
 
+    def test_overwrite(self):
+        simple_pa_schema = pa.schema([
+            ('f0', pa.int32()),
+            ('f1', pa.string())
+        ])
+        schema = Schema.from_pyarrow_schema(simple_pa_schema, partition_keys=['f0'],
+                                            options={'dynamic-partition-overwrite': 'false'})
+        self.rest_catalog.create_table('default.test_overwrite', schema, False)
+        table = self.rest_catalog.get_table('default.test_overwrite')
+        read_builder = table.new_read_builder()
+
+        # test normal write
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        df0 = pd.DataFrame({
+            'f0': [1, 2],
+            'f1': ['apple', 'banana'],
+        })
+
+        table_write.write_pandas(df0)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        actual_df0 = table_read.to_pandas(table_scan.plan().splits()).sort_values(by='f0')
+        df0['f0'] = df0['f0'].astype('int32')
+        pd.testing.assert_frame_equal(
+            actual_df0.reset_index(drop=True), df0.reset_index(drop=True))
+
+        # test partially overwrite
+        write_builder = table.new_batch_write_builder().overwrite({'f0': 1})
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        df1 = pd.DataFrame({
+            'f0': [1],
+            'f1': ['watermelon'],
+        })
+
+        table_write.write_pandas(df1)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        actual_df1 = table_read.to_pandas(table_scan.plan().splits()).sort_values(by='f0')
+        expected_df1 = pd.DataFrame({
+            'f0': [1, 2],
+            'f1': ['watermelon', 'banana']
+        })
+        expected_df1['f0'] = expected_df1['f0'].astype('int32')
+        pd.testing.assert_frame_equal(
+            actual_df1.reset_index(drop=True), expected_df1.reset_index(drop=True))
+
+        # test fully overwrite
+        write_builder = table.new_batch_write_builder().overwrite()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        df2 = pd.DataFrame({
+            'f0': [3],
+            'f1': ['Neo'],
+        })
+
+        table_write.write_pandas(df2)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        actual_df2 = table_read.to_pandas(table_scan.plan().splits())
+        df2['f0'] = df2['f0'].astype('int32')
+        pd.testing.assert_frame_equal(
+            actual_df2.reset_index(drop=True), df2.reset_index(drop=True))
+
     def testParquetAppendOnlyReader(self):
         schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
         self.rest_catalog.create_table('default.test_append_only_parquet', schema, False)
diff --git a/paimon-python/pypaimon/tests/test_file_store_commit.py b/paimon-python/pypaimon/tests/test_file_store_commit.py
index 6f32894c90..5110bc0f80 100644
--- a/paimon-python/pypaimon/tests/test_file_store_commit.py
+++ b/paimon-python/pypaimon/tests/test_file_store_commit.py
@@ -22,7 +22,9 @@ from pathlib import Path
 from unittest.mock import Mock, patch
 
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.manifest.schema.manifest_entry import ManifestEntry
 from pypaimon.snapshot.snapshot_commit import PartitionStatistics
+from pypaimon.table.row.binary_row import BinaryRow
 from pypaimon.write.commit_message import CommitMessage
 from pypaimon.write.file_store_commit import FileStoreCommit
 
@@ -84,7 +86,7 @@ class TestFileStoreCommit(unittest.TestCase):
         )
 
         # Test method
-        statistics = file_store_commit._generate_partition_statistics([commit_message])
+        statistics = file_store_commit._generate_partition_statistics(self._to_entries([commit_message]))
 
         # Verify results
         self.assertEqual(len(statistics), 1)
@@ -145,7 +147,7 @@ class TestFileStoreCommit(unittest.TestCase):
         )
 
         # Test method
-        statistics = file_store_commit._generate_partition_statistics([commit_message])
+        statistics = file_store_commit._generate_partition_statistics(self._to_entries([commit_message]))
 
         # Verify results
         self.assertEqual(len(statistics), 1)
@@ -213,7 +215,8 @@ class TestFileStoreCommit(unittest.TestCase):
         )
 
         # Test method
-        statistics = file_store_commit._generate_partition_statistics([commit_message_1, commit_message_2])
+        statistics = file_store_commit._generate_partition_statistics(
+            self._to_entries([commit_message_1, commit_message_2]))
 
         # Verify results
         self.assertEqual(len(statistics), 2)
@@ -268,7 +271,7 @@ class TestFileStoreCommit(unittest.TestCase):
         )
 
         # Test method
-        statistics = file_store_commit._generate_partition_statistics([commit_message])
+        statistics = file_store_commit._generate_partition_statistics(self._to_entries([commit_message]))
 
         # Verify results
         self.assertEqual(len(statistics), 1)
@@ -308,7 +311,7 @@ class TestFileStoreCommit(unittest.TestCase):
         )
 
         # Test method
-        statistics = file_store_commit._generate_partition_statistics([commit_message])
+        statistics = file_store_commit._generate_partition_statistics(self._to_entries([commit_message]))
 
         # Verify results
         self.assertEqual(len(statistics), 1)
@@ -347,7 +350,7 @@ class TestFileStoreCommit(unittest.TestCase):
         )
 
         # Test method
-        statistics = file_store_commit._generate_partition_statistics([commit_message])
+        statistics = file_store_commit._generate_partition_statistics(self._to_entries([commit_message]))
 
         # Verify results - should fallback to index-based naming
         self.assertEqual(len(statistics), 1)
@@ -372,29 +375,20 @@ class TestFileStoreCommit(unittest.TestCase):
         # Verify results
         self.assertEqual(len(statistics), 0)
 
-    def test_generate_partition_statistics_commit_message_no_files(
-            self, mock_manifest_list_manager, mock_manifest_file_manager, mock_snapshot_manager):
-        """Test partition statistics generation with commit message containing no files."""
-        # Create FileStoreCommit instance
-        file_store_commit = self._create_file_store_commit()
-
-        commit_message = CommitMessage(
-            partition=('2024-01-15', 'us-east-1'),
-            bucket=0,
-            new_files=[]  # No files
-        )
-
-        # Test method
-        statistics = file_store_commit._generate_partition_statistics([commit_message])
-
-        # Verify results - should still create a partition entry with zero counts
-        self.assertEqual(len(statistics), 1)
-
-        stat = statistics[0]
-        self.assertEqual(stat.spec, {'dt': '2024-01-15', 'region': 'us-east-1'})
-        self.assertEqual(stat.record_count, 0)
-        self.assertEqual(stat.file_count, 0)
-        self.assertEqual(stat.file_size_in_bytes, 0)
+    @staticmethod
+    def _to_entries(commit_messages):
+        commit_entries = []
+        for msg in commit_messages:
+            partition = BinaryRow(list(msg.partition), None)
+            for file in msg.new_files:
+                commit_entries.append(ManifestEntry(
+                    kind=0,
+                    partition=partition,
+                    bucket=msg.bucket,
+                    total_buckets=None,
+                    file=file
+                ))
+        return commit_entries
 
 
 if __name__ == '__main__':
diff --git a/paimon-python/pypaimon/write/batch_table_commit.py b/paimon-python/pypaimon/write/batch_table_commit.py
index 55d8de0905..7f42e1cef1 100644
--- a/paimon-python/pypaimon/write/batch_table_commit.py
+++ b/paimon-python/pypaimon/write/batch_table_commit.py
@@ -53,7 +53,7 @@ class BatchTableCommit:
         try:
             if self.overwrite_partition is not None:
                 self.file_store_commit.overwrite(
-                    partition=self.overwrite_partition,
+                    overwrite_partition=self.overwrite_partition,
                     commit_messages=non_empty_messages,
                     commit_identifier=commit_identifier
                 )
diff --git a/paimon-python/pypaimon/write/batch_write_builder.py b/paimon-python/pypaimon/write/batch_write_builder.py
index 0a274cfee0..2380530fbc 100644
--- a/paimon-python/pypaimon/write/batch_write_builder.py
+++ b/paimon-python/pypaimon/write/batch_write_builder.py
@@ -33,7 +33,7 @@ class BatchWriteBuilder:
         self.static_partition = None
 
     def overwrite(self, static_partition: Optional[dict] = None):
-        self.static_partition = static_partition
+        self.static_partition = static_partition if static_partition is not None else {}
         return self
 
     def new_write(self) -> BatchTableWrite:
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index efe8207ebe..ed9f5d16fc 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -21,15 +21,19 @@ import uuid
 from pathlib import Path
 from typing import List
 
+from pypaimon.common.predicate_builder import PredicateBuilder
 from pypaimon.manifest.manifest_file_manager import ManifestFileManager
 from pypaimon.manifest.manifest_list_manager import ManifestListManager
+from pypaimon.manifest.schema.manifest_entry import ManifestEntry
 from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
 from pypaimon.manifest.schema.simple_stats import SimpleStats
+from pypaimon.read.table_scan import TableScan
 from pypaimon.snapshot.snapshot import Snapshot
 from pypaimon.snapshot.snapshot_commit import (PartitionStatistics,
                                                SnapshotCommit)
 from pypaimon.snapshot.snapshot_manager import SnapshotManager
 from pypaimon.table.row.binary_row import BinaryRow
+from pypaimon.table.row.offset_row import OffsetRow
 from pypaimon.write.commit_message import CommitMessage
 
 
@@ -60,15 +64,83 @@ class FileStoreCommit:
         if not commit_messages:
             return
 
+        commit_entries = []
+        for msg in commit_messages:
+            partition = BinaryRow(list(msg.partition), self.table.table_schema.get_partition_key_fields())
+            for file in msg.new_files:
+                commit_entries.append(ManifestEntry(
+                    kind=0,
+                    partition=partition,
+                    bucket=msg.bucket,
+                    total_buckets=self.table.total_buckets,
+                    file=file
+                ))
+
+        self._try_commit(commit_kind="APPEND",
+                         commit_entries=commit_entries,
+                         commit_identifier=commit_identifier)
+
+    def overwrite(self, overwrite_partition, commit_messages: List[CommitMessage], commit_identifier: int):
+        """Commit the given commit messages in overwrite mode."""
+        if not commit_messages:
+            return
+
+        partition_filter = None
+        # sanity check, all changes must be done within the given partition, meanwhile build a partition filter
+        if len(overwrite_partition) > 0:
+            predicate_builder = PredicateBuilder(self.table.table_schema.get_partition_key_fields())
+            sub_predicates = []
+            for key, value in overwrite_partition.items():
+                sub_predicates.append(predicate_builder.equal(key, value))
+            partition_filter = predicate_builder.and_predicates(sub_predicates)
+
+            for msg in commit_messages:
+                row = OffsetRow(msg.partition, 0, len(msg.partition))
+                if not partition_filter.test(row):
+                    raise RuntimeError(f"Trying to overwrite partition {overwrite_partition}, but the changes "
+                                       f"in {msg.partition} does not belong to this partition")
+
+        commit_entries = []
+        current_entries = TableScan(self.table, partition_filter, None, []).plan().files()
+        for entry in current_entries:
+            entry.kind = 1
+            commit_entries.append(entry)
+        for msg in commit_messages:
+            partition = BinaryRow(list(msg.partition), self.table.table_schema.get_partition_key_fields())
+            for file in msg.new_files:
+                commit_entries.append(ManifestEntry(
+                    kind=0,
+                    partition=partition,
+                    bucket=msg.bucket,
+                    total_buckets=self.table.total_buckets,
+                    file=file
+                ))
+
+        self._try_commit(commit_kind="OVERWRITE",
+                         commit_entries=commit_entries,
+                         commit_identifier=commit_identifier)
+
+    def _try_commit(self, commit_kind, commit_entries, commit_identifier):
         unique_id = uuid.uuid4()
         base_manifest_list = f"manifest-list-{unique_id}-0"
         delta_manifest_list = f"manifest-list-{unique_id}-1"
 
         # process new_manifest
         new_manifest_file = f"manifest-{str(uuid.uuid4())}-0"
-        self.manifest_file_manager.write(new_manifest_file, commit_messages)
+        added_file_count = 0
+        deleted_file_count = 0
+        delta_record_count = 0
+        for entry in commit_entries:
+            if entry.kind == 0:
+                added_file_count += 1
+                delta_record_count += entry.file.row_count
+            else:
+                deleted_file_count += 1
+                delta_record_count -= entry.file.row_count
+        self.manifest_file_manager.write(new_manifest_file, commit_entries)
+        # TODO: implement noConflictsOrFail logic
 
-        partition_columns = list(zip(*(msg.partition for msg in commit_messages)))
+        partition_columns = list(zip(*(entry.partition.values for entry in commit_entries)))
         partition_min_stats = [min(col) for col in partition_columns]
         partition_max_stats = [max(col) for col in partition_columns]
         partition_null_counts = [sum(value == 0 for value in col) for col in partition_columns]
@@ -78,8 +150,8 @@ class FileStoreCommit:
         new_manifest_list = ManifestFileMeta(
             file_name=new_manifest_file,
             file_size=self.table.file_io.get_file_size(self.manifest_file_manager.manifest_path / new_manifest_file),
-            num_added_files=sum(len(msg.new_files) for msg in commit_messages),
-            num_deleted_files=0,
+            num_added_files=added_file_count,
+            num_deleted_files=deleted_file_count,
             partition_stats=SimpleStats(
                 min_values=BinaryRow(
                     values=partition_min_stats,
@@ -109,8 +181,7 @@ class FileStoreCommit:
 
         # process snapshot
         new_snapshot_id = self._generate_snapshot_id()
-        record_count_add = self._generate_record_count_add(commit_messages)
-        total_record_count += record_count_add
+        total_record_count += delta_record_count
         snapshot_data = Snapshot(
             version=1,
             id=new_snapshot_id,
@@ -118,15 +189,15 @@ class FileStoreCommit:
             base_manifest_list=base_manifest_list,
             delta_manifest_list=delta_manifest_list,
             total_record_count=total_record_count,
-            delta_record_count=record_count_add,
+            delta_record_count=delta_record_count,
             commit_user=self.commit_user,
             commit_identifier=commit_identifier,
-            commit_kind="APPEND",
+            commit_kind=commit_kind,
             time_millis=int(time.time() * 1000),
         )
 
         # Generate partition statistics for the commit
-        statistics = self._generate_partition_statistics(commit_messages)
+        statistics = self._generate_partition_statistics(commit_entries)
 
         # Use SnapshotCommit for atomic commit
         with self.snapshot_commit:
@@ -134,10 +205,6 @@ class FileStoreCommit:
             if not success:
                 raise RuntimeError(f"Failed to commit snapshot {new_snapshot_id}")
 
-    def overwrite(self, partition, commit_messages: List[CommitMessage], commit_identifier: int):
-        """Commit the given commit messages in overwrite mode."""
-        raise RuntimeError("overwrite unsupported yet")
-
     def abort(self, commit_messages: List[CommitMessage]):
         for message in commit_messages:
             for file in message.new_files:
@@ -161,25 +228,25 @@ class FileStoreCommit:
         else:
             return 1
 
-    def _generate_partition_statistics(self, commit_messages: List[CommitMessage]) -> List[PartitionStatistics]:
+    def _generate_partition_statistics(self, commit_entries: List[ManifestEntry]) -> List[PartitionStatistics]:
         """
-        Generate partition statistics from commit messages.
+        Generate partition statistics from commit entries.
 
         This method follows the Java implementation pattern from
         org.apache.paimon.manifest.PartitionEntry.fromManifestEntry() and
         PartitionEntry.merge() methods.
 
         Args:
-            commit_messages: List of commit messages to analyze
+            commit_entries: List of commit entries to analyze
 
         Returns:
             List of PartitionStatistics for each unique partition
         """
         partition_stats = {}
 
-        for message in commit_messages:
+        for entry in commit_entries:
             # Convert partition tuple to dictionary for PartitionStatistics
-            partition_value = message.partition  # Call the method to get partition value
+            partition_value = tuple(entry.partition.values)  # Call the method to get partition value
             if partition_value:
                 # Assuming partition is a tuple and we need to convert it to a dict
                 # This may need adjustment based on actual partition format
@@ -211,30 +278,29 @@ class FileStoreCommit:
                     'last_file_creation_time': 0
                 }
 
-            # Process each file in the commit message
             # Following Java implementation: PartitionEntry.fromDataFile()
-            for file_meta in message.new_files:
-                # Extract actual file metadata (following Java DataFileMeta pattern)
-                record_count = file_meta.row_count
-                file_size_in_bytes = file_meta.file_size
-                file_count = 1
-
-                # Convert creation_time to milliseconds (Java uses epoch millis)
-                if file_meta.creation_time:
-                    file_creation_time = int(file_meta.creation_time.timestamp() * 1000)
-                else:
-                    file_creation_time = int(time.time() * 1000)
+            file_meta = entry.file
+            # Extract actual file metadata (following Java DataFileMeta pattern)
+            record_count = file_meta.row_count
+            file_size_in_bytes = file_meta.file_size
+            file_count = 1
+
+            # Convert creation_time to milliseconds (Java uses epoch millis)
+            if file_meta.creation_time:
+                file_creation_time = int(file_meta.creation_time.timestamp() * 1000)
+            else:
+                file_creation_time = int(time.time() * 1000)
 
-                # Accumulate statistics (following Java PartitionEntry.merge() logic)
-                partition_stats[partition_key]['record_count'] += record_count
-                partition_stats[partition_key]['file_size_in_bytes'] += file_size_in_bytes
-                partition_stats[partition_key]['file_count'] += file_count
+            # Accumulate statistics (following Java PartitionEntry.merge() logic)
+            partition_stats[partition_key]['record_count'] += record_count
+            partition_stats[partition_key]['file_size_in_bytes'] += file_size_in_bytes
+            partition_stats[partition_key]['file_count'] += file_count
 
-                # Keep the latest creation time
-                partition_stats[partition_key]['last_file_creation_time'] = max(
-                    partition_stats[partition_key]['last_file_creation_time'],
-                    file_creation_time
-                )
+            # Keep the latest creation time
+            partition_stats[partition_key]['last_file_creation_time'] = max(
+                partition_stats[partition_key]['last_file_creation_time'],
+                file_creation_time
+            )
 
         # Convert to PartitionStatistics objects
         # Following Java PartitionEntry.toPartitionStatistics() pattern
@@ -248,25 +314,3 @@ class FileStoreCommit:
             )
             for stats in partition_stats.values()
         ]
-
-    def _generate_record_count_add(self, commit_messages: List[CommitMessage]) -> int:
-        """
-        Generate record count add from commit messages.
-
-        This method follows the Java implementation pattern from
-        org.apache.paimon.manifest.ManifestEntry.recordCountAdd().
-
-        Args:
-            commit_messages: List of commit messages to analyze
-
-        Returns:
-            Count of add record
-        """
-        record_count = 0
-
-        for message in commit_messages:
-            new_files = message.new_files
-            for file_meta in new_files:
-                record_count += file_meta.row_count
-
-        return record_count


Reply via email to