This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1255e1494d [python] Add more logging for better troubleshooting (#7233)
1255e1494d is described below

commit 1255e1494de5adcd5d2050f87b2bcb0775281945
Author: XiaoHongbo <[email protected]>
AuthorDate: Mon Feb 9 10:54:36 2026 +0800

    [python] Add more logging for better troubleshooting (#7233)
---
 .../pypaimon/read/scanner/file_scanner.py          | 10 ++++
 .../pypaimon/snapshot/catalog_snapshot_commit.py   |  8 ++-
 .../pypaimon/snapshot/renaming_snapshot_commit.py  |  9 +--
 .../pypaimon/snapshot/snapshot_manager.py          | 14 ++++-
 paimon-python/pypaimon/write/file_store_commit.py  | 69 +++++++++++++++++++---
 paimon-python/pypaimon/write/table_commit.py       |  7 +++
 6 files changed, 101 insertions(+), 16 deletions(-)

diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py 
b/paimon-python/pypaimon/read/scanner/file_scanner.py
index 0f53ef0cd3..b5d13e561d 100755
--- a/paimon-python/pypaimon/read/scanner/file_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/file_scanner.py
@@ -16,8 +16,12 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 """
 import os
+import time
+import logging
 from typing import List, Optional, Dict, Set, Callable
 
+logger = logging.getLogger(__name__)
+
 from pypaimon.common.predicate import Predicate
 from pypaimon.globalindex import ScoredGlobalIndexResult
 from pypaimon.table.source.deletion_file import DeletionFile
@@ -143,6 +147,7 @@ class FileScanner:
         return 
self._scan_dv_index(self.snapshot_manager.get_latest_snapshot(), bucket_files)
 
     def scan(self) -> Plan:
+        start_ms = time.time() * 1000
         # Create appropriate split generator based on table type
         if self.table.is_primary_key_table:
             entries = self.plan_files()
@@ -176,6 +181,11 @@ class FileScanner:
         splits = split_generator.create_splits(entries)
 
         splits = self._apply_push_down_limit(splits)
+        duration_ms = int(time.time() * 1000 - start_ms)
+        logger.info(
+            "File store scan plan completed in %d ms. Files size: %d",
+            duration_ms, len(entries)
+        )
         return Plan(splits)
 
     def _create_data_evolution_split_generator(self):
diff --git a/paimon-python/pypaimon/snapshot/catalog_snapshot_commit.py 
b/paimon-python/pypaimon/snapshot/catalog_snapshot_commit.py
index 1ffc80af57..77e5492798 100755
--- a/paimon-python/pypaimon/snapshot/catalog_snapshot_commit.py
+++ b/paimon-python/pypaimon/snapshot/catalog_snapshot_commit.py
@@ -16,9 +16,12 @@
 # limitations under the License.
 
################################################################################
 
+import logging
 from typing import List
 
 from pypaimon.catalog.catalog import Catalog
+
+logger = logging.getLogger(__name__)
 from pypaimon.common.identifier import Identifier
 from pypaimon.snapshot.snapshot import Snapshot
 from pypaimon.snapshot.snapshot_commit import (PartitionStatistics,
@@ -64,7 +67,10 @@ class CatalogSnapshotCommit(SnapshotCommit):
 
         # Call catalog's commit_snapshot method
         if hasattr(self.catalog, 'commit_snapshot'):
-            return self.catalog.commit_snapshot(new_identifier, self.uuid, 
snapshot, statistics)
+            success = self.catalog.commit_snapshot(new_identifier, self.uuid, 
snapshot, statistics)
+            if success:
+                logger.info("Catalog snapshot commit succeeded for %s, 
snapshot id %d", new_identifier, snapshot.id)
+            return success
         else:
             # Fallback for catalogs that don't support snapshot commits
             raise NotImplementedError(
diff --git a/paimon-python/pypaimon/snapshot/renaming_snapshot_commit.py 
b/paimon-python/pypaimon/snapshot/renaming_snapshot_commit.py
index 27c5a54a2c..acc1650baf 100644
--- a/paimon-python/pypaimon/snapshot/renaming_snapshot_commit.py
+++ b/paimon-python/pypaimon/snapshot/renaming_snapshot_commit.py
@@ -16,9 +16,12 @@
 # limitations under the License.
 
################################################################################
 
+import logging
 from typing import List
 
 from pypaimon.common.file_io import FileIO
+
+logger = logging.getLogger(__name__)
 from pypaimon.common.json_util import JSON
 from pypaimon.snapshot.snapshot import Snapshot
 from pypaimon.snapshot.snapshot_commit import (PartitionStatistics,
@@ -62,12 +65,12 @@ class RenamingSnapshotCommit(SnapshotCommit):
         """
         new_snapshot_path = 
self.snapshot_manager.get_snapshot_path(snapshot.id)
         if not self.file_io.exists(new_snapshot_path):
-            """Internal function to perform the actual commit."""
             # Try to write atomically using the file IO
             committed = self.file_io.try_to_write_atomic(new_snapshot_path, 
JSON.to_json(snapshot, indent=2))
             if committed:
                 # Update the latest hint
                 self._commit_latest_hint(snapshot.id)
+                logger.info("Renaming snapshot commit succeeded, snapshot id 
%d", snapshot.id)
             return committed
         return False
 
@@ -89,6 +92,4 @@ class RenamingSnapshotCommit(SnapshotCommit):
                 # Fallback to regular write
                 self.file_io.write_file(latest_file, str(snapshot_id), 
overwrite=True)
         except Exception as e:
-            # Log the error but don't fail the commit for this
-            # In a production system, you might want to use proper logging
-            print(f"Warning: Failed to update LATEST hint: {e}")
+            logger.warning("Failed to update LATEST hint: %s", e)
diff --git a/paimon-python/pypaimon/snapshot/snapshot_manager.py 
b/paimon-python/pypaimon/snapshot/snapshot_manager.py
index cc044fbc1a..e6b2ca5c64 100644
--- a/paimon-python/pypaimon/snapshot/snapshot_manager.py
+++ b/paimon-python/pypaimon/snapshot/snapshot_manager.py
@@ -15,9 +15,12 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
+import logging
 from typing import Optional
 
 from pypaimon.common.file_io import FileIO
+
+logger = logging.getLogger(__name__)
 from pypaimon.common.json_util import JSON
 from pypaimon.snapshot.snapshot import Snapshot
 
@@ -115,9 +118,14 @@ class SnapshotManager:
         if self.file_io.exists(earliest_file):
             earliest_content = self.file_io.read_file_utf8(earliest_file)
             earliest_snapshot_id = int(earliest_content.strip())
-            return self.get_snapshot_by_id(earliest_snapshot_id)
-        else:
-            return self.get_snapshot_by_id(1)
+            snapshot = self.get_snapshot_by_id(earliest_snapshot_id)
+            if snapshot is None:
+                logger.warning(
+                    "The earliest snapshot or changelog was once identified 
but disappeared. "
+                    "It might have been expired by other jobs operating on 
this table."
+                )
+            return snapshot
+        return self.get_snapshot_by_id(1)
 
     def earlier_or_equal_time_mills(self, timestamp: int) -> 
Optional[Snapshot]:
         """
diff --git a/paimon-python/pypaimon/write/file_store_commit.py 
b/paimon-python/pypaimon/write/file_store_commit.py
index 0b5af2c9c3..881c9d52c3 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -97,6 +97,11 @@ class FileStoreCommit:
         if not commit_messages:
             return
 
+        logger.info(
+            "Ready to commit to table %s, number of commit messages: %d",
+            self.table.identifier,
+            len(commit_messages),
+        )
         commit_entries = []
         for msg in commit_messages:
             partition = GenericRow(list(msg.partition), 
self.table.partition_keys_fields)
@@ -109,6 +114,7 @@ class FileStoreCommit:
                     file=file
                 ))
 
+        logger.info("Finished collecting changes, including: %d entries", 
len(commit_entries))
         self._try_commit(commit_kind="APPEND",
                          commit_identifier=commit_identifier,
                          commit_entries_plan=lambda snapshot: commit_entries)
@@ -118,6 +124,11 @@ class FileStoreCommit:
         if not commit_messages:
             return
 
+        logger.info(
+            "Ready to overwrite to table %s, number of commit messages: %d",
+            self.table.identifier,
+            len(commit_messages),
+        )
         partition_filter = None
         # sanity check, all changes must be done within the given partition, 
meanwhile build a partition filter
         if len(overwrite_partition) > 0:
@@ -203,16 +214,43 @@ class FileStoreCommit:
             )
 
             if result.is_success():
+                commit_duration_ms = int(time.time() * 1000) - start_time_ms
                 logger.info(
-                    f"Thread {thread_id}: commit success {latest_snapshot.id + 
1 if latest_snapshot else 1} "
-                    f"after {retry_count} retries"
+                    "Thread %s: commit success %d after %d retries",
+                    thread_id,
+                    latest_snapshot.id + 1 if latest_snapshot else 1,
+                    retry_count,
                 )
+                if commit_kind == "OVERWRITE":
+                    logger.info(
+                        "Finished overwrite to table %s, duration %d ms",
+                        self.table.identifier,
+                        commit_duration_ms,
+                    )
+                else:
+                    logger.info(
+                        "Finished commit to table %s, duration %d ms",
+                        self.table.identifier,
+                        commit_duration_ms,
+                    )
                 break
 
             retry_result = result
 
             elapsed_ms = int(time.time() * 1000) - start_time_ms
             if elapsed_ms > self.commit_timeout or retry_count >= 
self.commit_max_retries:
+                if commit_kind == "OVERWRITE":
+                    logger.info(
+                        "Finished (Uncertain of success) overwrite to table 
%s, duration %d ms",
+                        self.table.identifier,
+                        elapsed_ms,
+                    )
+                else:
+                    logger.info(
+                        "Finished (Uncertain of success) commit to table %s, 
duration %d ms",
+                        self.table.identifier,
+                        elapsed_ms,
+                    )
                 error_msg = (
                     f"Commit failed {latest_snapshot.id + 1 if latest_snapshot 
else 1} "
                     f"after {elapsed_ms} millis with {retry_count} retries, "
@@ -229,6 +267,7 @@ class FileStoreCommit:
     def _try_commit_once(self, retry_result: Optional[RetryResult], 
commit_kind: str,
                          commit_entries: List[ManifestEntry], 
commit_identifier: int,
                          latest_snapshot: Optional[Snapshot]) -> CommitResult:
+        start_millis = int(time.time() * 1000)
         if self._is_duplicate_commit(retry_result, latest_snapshot, 
commit_identifier, commit_kind):
             return SuccessResult()
         
@@ -306,18 +345,32 @@ class FileStoreCommit:
             with self.snapshot_commit:
                 success = self.snapshot_commit.commit(snapshot_data, 
self.table.current_branch(), statistics)
                 if not success:
-                    logger.warning(f"Atomic commit failed for snapshot 
#{new_snapshot_id} failed")
+                    commit_time_s = (int(time.time() * 1000) - start_millis) / 
1000
+                    logger.warning(
+                        "Atomic commit failed for snapshot #%d by user %s "
+                        "with identifier %s and kind %s after %.0f seconds. "
+                        "Clean up and try again.",
+                        new_snapshot_id,
+                        self.commit_user,
+                        commit_identifier,
+                        commit_kind,
+                        commit_time_s,
+                    )
                     self._cleanup_preparation_failure(delta_manifest_list, 
base_manifest_list)
                     return RetryResult(latest_snapshot, None)
         except Exception as e:
             # Commit exception, not sure about the situation and should not 
clean up the files
-            logger.warning("Retry commit for exception")
+            logger.warning("Retry commit for exception.", exc_info=True)
             return RetryResult(latest_snapshot, e)
 
-        logger.warning(
-            f"Successfully commit snapshot {new_snapshot_id} to table 
{self.table.identifier} "
-            f"for snapshot-{new_snapshot_id} by user {self.commit_user} "
-            + f"with identifier {commit_identifier} and kind {commit_kind}."
+        logger.info(
+            "Successfully commit snapshot %d to table %s by user %s "
+            "with identifier %s and kind %s.",
+            new_snapshot_id,
+            self.table.identifier,
+            self.commit_user,
+            commit_identifier,
+            commit_kind,
         )
         return SuccessResult()
 
diff --git a/paimon-python/pypaimon/write/table_commit.py 
b/paimon-python/pypaimon/write/table_commit.py
index e2b75881ea..1eafafefc0 100644
--- a/paimon-python/pypaimon/write/table_commit.py
+++ b/paimon-python/pypaimon/write/table_commit.py
@@ -16,9 +16,12 @@
 # limitations under the License.
 
################################################################################
 
+import logging
 from typing import Dict, List, Optional
 
 from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER
+
+logger = logging.getLogger(__name__)
 from pypaimon.write.commit_message import CommitMessage
 from pypaimon.write.file_store_commit import FileStoreCommit
 
@@ -48,6 +51,10 @@ class TableCommit:
         if not non_empty_messages:
             return
 
+        logger.info(
+            "Committing batch table %s, %d non-empty messages",
+            self.table.identifier, len(non_empty_messages)
+        )
         if self.overwrite_partition is not None:
             self.file_store_commit.overwrite(
                 overwrite_partition=self.overwrite_partition,

Reply via email to