This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 470b381171 [python] Remove all redundant changes in pr 6969 (#6977)
470b381171 is described below

commit 470b38117198e941083fafd9b7e439ffe4bf093e
Author: YeJunHao <[email protected]>
AuthorDate: Thu Jan 8 23:26:30 2026 +0800

    [python] Remove all redundant changes in pr 6969 (#6977)
---
 paimon-python/pypaimon/common/file_io.py           | 19 +----
 paimon-python/pypaimon/filesystem/local.py         | 34 ++++++++
 .../pypaimon/tests/reader_append_only_test.py      | 60 +++++++++++++
 paimon-python/pypaimon/write/file_store_commit.py  | 98 +++++++++-------------
 4 files changed, 138 insertions(+), 73 deletions(-)

diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index 2ec1909306..556d9e9ae7 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -18,7 +18,6 @@
 import logging
 import os
 import subprocess
-import threading
 import uuid
 from pathlib import Path
 from typing import Any, Dict, List, Optional
@@ -26,11 +25,12 @@ from urllib.parse import splitport, urlparse
 
 import pyarrow
 from packaging.version import parse
-from pyarrow._fs import FileSystem, LocalFileSystem
+from pyarrow._fs import FileSystem
 
 from pypaimon.common.options import Options
 from pypaimon.common.options.config import OssOptions, S3Options
 from pypaimon.common.uri_reader import UriReaderFactory
+from pypaimon.filesystem.local import PaimonLocalFileSystem
 from pypaimon.schema.data_types import DataField, AtomicType, PyarrowFieldParser
 from pypaimon.table.row.blob import BlobData, BlobDescriptor, Blob
 from pypaimon.table.row.generic_row import GenericRow
@@ -39,8 +39,6 @@ from pypaimon.write.blob_format_writer import BlobFormatWriter
 
 
 class FileIO:
-    rename_lock = threading.Lock()
-
     def __init__(self, path: str, catalog_options: Options):
         self.properties = catalog_options
         self.logger = logging.getLogger(__name__)
@@ -183,9 +181,8 @@ class FileIO:
         )
 
     def _initialize_local_fs(self) -> FileSystem:
-        from pyarrow.fs import LocalFileSystem
 
-        return LocalFileSystem()
+        return PaimonLocalFileSystem()
 
     def new_input_stream(self, path: str):
         path_str = self.to_filesystem_path(path)
@@ -255,15 +252,7 @@ class FileIO:
                 self.mkdirs(str(dst_parent))
 
             src_str = self.to_filesystem_path(src)
-            if isinstance(self.filesystem, LocalFileSystem):
-                if self.exists(dst):
-                    return False
-                with FileIO.rename_lock:
-                    if self.exists(dst):
-                        return False
-                    self.filesystem.move(src_str, dst_str)
-            else:
-                self.filesystem.move(src_str, dst_str)
+            self.filesystem.move(src_str, dst_str)
             return True
         except Exception as e:
             self.logger.warning(f"Failed to rename {src} to {dst}: {e}")
diff --git a/paimon-python/pypaimon/filesystem/local.py b/paimon-python/pypaimon/filesystem/local.py
new file mode 100644
index 0000000000..c845f8547c
--- /dev/null
+++ b/paimon-python/pypaimon/filesystem/local.py
@@ -0,0 +1,34 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+import threading
+import pyarrow.fs
+from pyarrow._fs import LocalFileSystem
+
+
+class PaimonLocalFileSystem(LocalFileSystem):
+
+    rename_lock = threading.Lock()
+
+    def move(self, src, dst):
+        # Lock so the existence check and the rename are atomic in this process.
+        with PaimonLocalFileSystem.rename_lock:
+            file_info = self.get_file_info([dst])[0]
+            if file_info.type != pyarrow.fs.FileType.NotFound:
+                raise FileExistsError(f"Target file already exists: {dst}")
+
+            super().move(src, dst)
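The class above rejects a rename whose target already exists instead of silently
overwriting it, which is what FileIO.rename now relies on. A minimal usage
sketch, assuming a scratch directory; the paths and the try/except are
illustrative, not part of the commit:

    import os
    import tempfile

    from pypaimon.filesystem.local import PaimonLocalFileSystem

    fs = PaimonLocalFileSystem()
    work_dir = tempfile.mkdtemp()
    src = os.path.join(work_dir, "part-0.tmp")
    dst = os.path.join(work_dir, "part-0")

    open(src, "w").close()
    fs.move(src, dst)  # succeeds: dst does not exist yet

    open(src, "w").close()
    try:
        fs.move(src, dst)  # dst exists now, so the guarded move refuses
    except Exception as e:
        print(f"rename rejected as expected: {e}")
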
diff --git a/paimon-python/pypaimon/tests/reader_append_only_test.py b/paimon-python/pypaimon/tests/reader_append_only_test.py
index b47f5d1f67..d65658ef5c 100644
--- a/paimon-python/pypaimon/tests/reader_append_only_test.py
+++ b/paimon-python/pypaimon/tests/reader_append_only_test.py
@@ -28,7 +28,11 @@ import pyarrow as pa
 
 from pypaimon import CatalogFactory, Schema
 from pypaimon.common.options.core_options import CoreOptions
+from pypaimon.manifest.schema.manifest_entry import ManifestEntry
+from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER
 from pypaimon.snapshot.snapshot_manager import SnapshotManager
+from pypaimon.table.row.generic_row import GenericRow
+from pypaimon.write.file_store_commit import RetryResult
 
 
 class AoReaderTest(unittest.TestCase):
@@ -153,6 +157,62 @@ class AoReaderTest(unittest.TestCase):
         actual = self._read_test_table(read_builder).sort_by('user_id')
         self.assertEqual(actual, self.expected)
 
+    def test_commit_retry_filter(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
+        self.catalog.create_table('default.test_commit_retry_filter', schema, False)
+        table = self.catalog.get_table('default.test_commit_retry_filter')
+        write_builder = table.new_batch_write_builder()
+
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        data1 = {
+            'user_id': [1, 2, 3, 4],
+            'item_id': [1001, 1002, 1003, 1004],
+            'behavior': ['a', 'b', 'c', None],
+            'dt': ['p1', 'p1', 'p2', 'p1'],
+        }
+        pa_table1 = pa.Table.from_pydict(data1, schema=self.pa_schema)
+        data2 = {
+            'user_id': [5, 6, 7, 8],
+            'item_id': [1005, 1006, 1007, 1008],
+            'behavior': ['e', 'f', 'g', 'h'],
+            'dt': ['p2', 'p1', 'p2', 'p2'],
+        }
+        pa_table2 = pa.Table.from_pydict(data2, schema=self.pa_schema)
+
+        table_write.write_arrow(pa_table1)
+        table_write.write_arrow(pa_table2)
+
+        messages = table_write.prepare_commit()
+        table_commit.commit(messages)
+        table_write.close()
+
+        snapshot_manager = SnapshotManager(table)
+        latest_snapshot = snapshot_manager.get_latest_snapshot()
+        commit_entries = []
+        for msg in messages:
+            partition = GenericRow(list(msg.partition), table.partition_keys_fields)
+            for file in msg.new_files:
+                commit_entries.append(ManifestEntry(
+                    kind=0,
+                    partition=partition,
+                    bucket=msg.bucket,
+                    total_buckets=table.total_buckets,
+                    file=file
+                ))
+        # Mock a retry of the same commit: the duplicate check should recognize it and report success.
+        success = table_commit.file_store_commit._try_commit_once(
+            RetryResult(None),
+            "APPEND",
+            commit_entries,
+            BATCH_COMMIT_IDENTIFIER,
+            latest_snapshot)
+        self.assertTrue(success.is_success())
+        table_commit.close()
+        read_builder = table.new_read_builder()
+        actual = self._read_test_table(read_builder).sort_by('user_id')
+        self.assertEqual(actual, self.expected)
+
     def test_over_1000_cols_read(self):
         num_rows = 1
         num_cols = 10
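
The retry test above replays the entries of an already-committed batch through
_try_commit_once to mimic a retry. It passes because the duplicate check added
below in file_store_commit.py matches an existing snapshot on its (commit_user,
commit_identifier, commit_kind) triple, roughly:

    # Sketch of the predicate the test relies on; simplified from
    # _is_duplicate_commit below, not the exact code.
    def is_duplicate(snapshot, commit_user, commit_identifier, commit_kind):
        return (snapshot is not None
                and snapshot.commit_user == commit_user
                and snapshot.commit_identifier == commit_identifier
                and snapshot.commit_kind == commit_kind)
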
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index e55e25f7c8..6c65c5173a 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -110,8 +110,8 @@ class FileStoreCommit:
                 ))
 
         self._try_commit(commit_kind="APPEND",
-                         commit_entries=commit_entries,
-                         commit_identifier=commit_identifier)
+                         commit_identifier=commit_identifier,
+                         commit_entries_plan=lambda snapshot: commit_entries)
 
     def overwrite(self, overwrite_partition, commit_messages: List[CommitMessage], commit_identifier: int):
         """Commit the given commit messages in overwrite mode."""
@@ -133,16 +133,14 @@ class FileStoreCommit:
                     raise RuntimeError(f"Trying to overwrite partition 
{overwrite_partition}, but the changes "
                                        f"in {msg.partition} does not belong to 
this partition")
 
-        self._overwrite_partition_filter = partition_filter
-        self._overwrite_commit_messages = commit_messages
-
         self._try_commit(
             commit_kind="OVERWRITE",
-            commit_entries=None,  # Will be generated in _try_commit based on latest snapshot
-            commit_identifier=commit_identifier
+            commit_identifier=commit_identifier,
+            commit_entries_plan=lambda snapshot: self._generate_overwrite_entries(
+                snapshot, partition_filter, commit_messages)
         )
 
-    def _try_commit(self, commit_kind, commit_entries, commit_identifier):
+    def _try_commit(self, commit_kind, commit_identifier, commit_entries_plan):
         import threading
 
         retry_count = 0
@@ -151,9 +149,7 @@ class FileStoreCommit:
         thread_id = threading.current_thread().name
         while True:
             latest_snapshot = self.snapshot_manager.get_latest_snapshot()
-
-            if commit_kind == "OVERWRITE":
-                commit_entries = self._generate_overwrite_entries()
+            commit_entries = commit_entries_plan(latest_snapshot)
 
             result = self._try_commit_once(
                 retry_result=retry_result,
@@ -164,7 +160,7 @@ class FileStoreCommit:
             )
 
             if result.is_success():
-                logger.warning(
+                logger.info(
                     f"Thread {thread_id}: commit success {latest_snapshot.id + 
1 if latest_snapshot else 1} "
                     f"after {retry_count} retries"
                 )
@@ -190,24 +186,9 @@ class FileStoreCommit:
     def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str,
                          commit_entries: List[ManifestEntry], commit_identifier: int,
                          latest_snapshot: Optional[Snapshot]) -> CommitResult:
-        start_time_ms = int(time.time() * 1000)
-
-        if retry_result is not None and latest_snapshot is not None:
-            start_check_snapshot_id = 1  # Snapshot.FIRST_SNAPSHOT_ID
-            if retry_result.latest_snapshot is not None:
-                start_check_snapshot_id = retry_result.latest_snapshot.id + 1
-
-            for snapshot_id in range(start_check_snapshot_id, latest_snapshot.id + 2):
-                snapshot = self.snapshot_manager.get_snapshot_by_id(snapshot_id)
-                if (snapshot and snapshot.commit_user == self.commit_user and
-                        snapshot.commit_identifier == commit_identifier and
-                        snapshot.commit_kind == commit_kind):
-                    logger.info(
-                        f"Commit already completed (snapshot {snapshot_id}), "
-                        f"user: {self.commit_user}, identifier: 
{commit_identifier}"
-                    )
-                    return SuccessResult()
-
+        if self._is_duplicate_commit(retry_result, latest_snapshot, commit_identifier, commit_kind):
+            return SuccessResult()
+
         unique_id = uuid.uuid4()
         base_manifest_list = f"manifest-list-{unique_id}-0"
         delta_manifest_list = f"manifest-list-{unique_id}-1"
@@ -242,10 +223,8 @@ class FileStoreCommit:
             else:
                 deleted_file_count += 1
                 delta_record_count -= entry.file.row_count
-
         try:
             self.manifest_file_manager.write(new_manifest_file, commit_entries)
-
             # TODO: implement noConflictsOrFail logic
             partition_columns = list(zip(*(entry.partition.values for entry in commit_entries)))
             partition_min_stats = [min(col) for col in partition_columns]
@@ -253,13 +232,10 @@ class FileStoreCommit:
             partition_null_counts = [sum(value == 0 for value in col) for col in partition_columns]
             if not all(count == 0 for count in partition_null_counts):
                 raise RuntimeError("Partition value should not be null")
-
             manifest_file_path = f"{self.manifest_file_manager.manifest_path}/{new_manifest_file}"
-            file_size = self.table.file_io.get_file_size(manifest_file_path)
-
             new_manifest_file_meta = ManifestFileMeta(
                 file_name=new_manifest_file,
-                file_size=file_size,
+                file_size=self.table.file_io.get_file_size(manifest_file_path),
                 num_added_files=added_file_count,
                 num_deleted_files=deleted_file_count,
                 partition_stats=SimpleStats(
@@ -275,7 +251,6 @@ class FileStoreCommit:
                 ),
                 schema_id=self.table.table_schema.id,
             )
-
             self.manifest_list_manager.write(delta_manifest_list, [new_manifest_file_meta])
 
             # process existing_manifest
@@ -287,8 +262,8 @@ class FileStoreCommit:
                     total_record_count += previous_record_count
             else:
                 existing_manifest_files = []
-
             self.manifest_list_manager.write(base_manifest_list, existing_manifest_files)
+
             total_record_count += delta_record_count
             snapshot_data = Snapshot(
                 version=3,
@@ -307,8 +282,7 @@ class FileStoreCommit:
             # Generate partition statistics for the commit
             statistics = self._generate_partition_statistics(commit_entries)
         except Exception as e:
-            self._cleanup_preparation_failure(new_manifest_file, delta_manifest_list,
-                                              base_manifest_list)
+            self._cleanup_preparation_failure(delta_manifest_list, base_manifest_list)
             logger.warning(f"Exception occurs when preparing snapshot: {e}", 
exc_info=True)
             raise RuntimeError(f"Failed to prepare snapshot: {e}")
 
@@ -317,16 +291,8 @@ class FileStoreCommit:
             with self.snapshot_commit:
                 success = self.snapshot_commit.commit(snapshot_data, self.table.current_branch(), statistics)
                 if not success:
-                    # Commit failed, clean up temporary files and retry
-                    commit_time_sec = (int(time.time() * 1000) - start_time_ms) / 1000
-                    logger.warning(
-                        f"Atomic commit failed for snapshot #{new_snapshot_id} "
-                        f"by user {self.commit_user} "
-                        f"with identifier {commit_identifier} and kind {commit_kind} after {commit_time_sec}s. "
-                        f"Clean up and try again."
-                    )
-                    self._cleanup_preparation_failure(new_manifest_file, delta_manifest_list,
-                                                      base_manifest_list)
+                    logger.warning(f"Atomic commit failed for snapshot #{new_snapshot_id}")
+                    self._cleanup_preparation_failure(delta_manifest_list, base_manifest_list)
                     return RetryResult(latest_snapshot, None)
         except Exception as e:
             # Commit raised an exception; the outcome is unknown, so do not clean up the files
@@ -340,14 +306,34 @@ class FileStoreCommit:
         )
         return SuccessResult()
 
-    def _generate_overwrite_entries(self):
+    def _is_duplicate_commit(self, retry_result, latest_snapshot, commit_identifier, commit_kind) -> bool:
+        if retry_result is not None and latest_snapshot is not None:
+            start_check_snapshot_id = 1  # Snapshot.FIRST_SNAPSHOT_ID
+            if retry_result.latest_snapshot is not None:
+                start_check_snapshot_id = retry_result.latest_snapshot.id + 1
+
+            for snapshot_id in range(start_check_snapshot_id, latest_snapshot.id + 1):
+                snapshot = self.snapshot_manager.get_snapshot_by_id(snapshot_id)
+                if (snapshot and snapshot.commit_user == self.commit_user and
+                        snapshot.commit_identifier == commit_identifier and
+                        snapshot.commit_kind == commit_kind):
+                    logger.info(
+                        f"Commit already completed (snapshot {snapshot_id}), "
+                        f"user: {self.commit_user}, identifier: 
{commit_identifier}"
+                    )
+                    return True
+        return False
+
+    def _generate_overwrite_entries(self, latest_snapshot, partition_filter, commit_messages):
         """Generate commit entries for OVERWRITE mode based on latest snapshot."""
         entries = []
-        current_entries = FullStartingScanner(self.table, self._overwrite_partition_filter, None).plan_files()
+        current_entries = [] if latest_snapshot is None \
+            else (FullStartingScanner(self.table, partition_filter, None).
+                  read_manifest_entries(self.manifest_list_manager.read_all(latest_snapshot)))
         for entry in current_entries:
             entry.kind = 1  # DELETE
             entries.append(entry)
-        for msg in self._overwrite_commit_messages:
+        for msg in commit_messages:
             partition = GenericRow(list(msg.partition), self.table.partition_keys_fields)
             for file in msg.new_files:
                 entries.append(ManifestEntry(
@@ -377,7 +363,7 @@ class FileStoreCommit:
         )
         time.sleep(total_wait_ms / 1000.0)
 
-    def _cleanup_preparation_failure(self, manifest_file: Optional[str],
+    def _cleanup_preparation_failure(self,
                                      delta_manifest_list: Optional[str],
                                      base_manifest_list: Optional[str]):
         try:
@@ -394,10 +380,6 @@ class FileStoreCommit:
             if base_manifest_list:
                 base_path = f"{manifest_path}/{base_manifest_list}"
                 self.table.file_io.delete_quietly(base_path)
-
-            if manifest_file:
-                manifest_file_path = f"{self.manifest_file_manager.manifest_path}/{manifest_file}"
-                self.table.file_io.delete_quietly(manifest_file_path)
         except Exception as e:
             logger.warning(f"Failed to clean up temporary files during 
preparation failure: {e}", exc_info=True)
 
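A note on the file_store_commit.py refactor: _try_commit no longer receives a
precomputed entry list (or reads overwrite state stashed on self); it takes a
commit_entries_plan callback that is re-evaluated against the snapshot observed
on each retry attempt. A simplified sketch of the resulting retry loop, with
logging and backoff omitted (not the exact code above):

    def _try_commit(self, commit_kind, commit_identifier, commit_entries_plan):
        retry_result = None
        while True:
            latest_snapshot = self.snapshot_manager.get_latest_snapshot()
            # Re-planned on every attempt, so an OVERWRITE deletes exactly
            # the files visible in the snapshot this attempt races against.
            commit_entries = commit_entries_plan(latest_snapshot)
            result = self._try_commit_once(retry_result, commit_kind,
                                           commit_entries, commit_identifier,
                                           latest_snapshot)
            if result.is_success():
                return
            retry_result = result  # retry against a newer snapshot

APPEND passes a constant plan (lambda snapshot: commit_entries), while OVERWRITE
regenerates its delete-plus-add entries from the latest snapshot, as the diff
above shows.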
