This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e6c913216c [core] Don't use snapshotManager.latestSnapshot in AbstractFileStoreWrite to prevent flooding catalog (#5680)
e6c913216c is described below

commit e6c913216ce19a42f030bf463a9010454435df05
Author: tsreaper <[email protected]>
AuthorDate: Fri May 30 19:05:53 2025 +0800

    [core] Don't use snapshotManager.latestSnapshot in AbstractFileStoreWrite to prevent flooding catalog (#5680)
---
 .../apache/paimon/operation/AbstractFileStoreWrite.java    |  5 ++++-
 .../main/java/org/apache/paimon/utils/SnapshotManager.java | 14 +++++++++++++-
 .../apache/paimon/spark/commands/PaimonSparkWriter.scala   |  4 ++--
 3 files changed, 19 insertions(+), 4 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 5b2ba064e9..3dd82e21c5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -433,7 +433,10 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
             }
         }
 
-        Snapshot previousSnapshot = ignorePreviousFiles ? null : snapshotManager.latestSnapshot();
+        // NOTE: don't use snapshotManager.latestSnapshot() here,
+        // because we don't want to flood the catalog with high concurrency
+        Snapshot previousSnapshot =
+                ignorePreviousFiles ? null : snapshotManager.latestSnapshotFromFileSystem();
         List<DataFileMeta> restoreFiles = new ArrayList<>();
         int totalBuckets;
         if (previousSnapshot != null) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 6a4620ca02..1356e090fe 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -172,7 +172,11 @@ public class SnapshotManager implements Serializable {
                 throw new UncheckedIOException(e);
             }
         }
-        Long snapshotId = latestSnapshotId();
+        return latestSnapshotFromFileSystem();
+    }
+
+    public @Nullable Snapshot latestSnapshotFromFileSystem() {
+        Long snapshotId = latestSnapshotIdFromFileSystem();
         return snapshotId == null ? null : snapshot(snapshotId);
     }
 
@@ -184,6 +188,14 @@ public class SnapshotManager implements Serializable {
                 } catch (UnsupportedOperationException ignored) {
                 }
             }
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to find latest snapshot id", e);
+        }
+        return latestSnapshotIdFromFileSystem();
+    }
+
+    public @Nullable Long latestSnapshotIdFromFileSystem() {
+        try {
             return findLatest(snapshotDirectory(), SNAPSHOT_PREFIX, this::snapshotPath);
         } catch (IOException e) {
             throw new RuntimeException("Failed to find latest snapshot id", e);
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index ca1158e69a..33332f6e0a 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -209,7 +209,7 @@ case class PaimonSparkWriter(table: FileStoreTable) {
             numAssigners)
         }
 
-        if (table.snapshotManager().latestSnapshot() == null) {
+        if (table.snapshotManager().latestSnapshotFromFileSystem() == null) {
           // bootstrap mode
           // Topology: input -> shuffle by special key & partition hash -> bucket-assigner
           writeWithBucketAssigner(
@@ -287,7 +287,7 @@ case class PaimonSparkWriter(table: FileStoreTable) {
   def persistDeletionVectors(deletionVectors: Dataset[SparkDeletionVector]): Seq[CommitMessage] = {
     val sparkSession = deletionVectors.sparkSession
     import sparkSession.implicits._
-    val snapshot = table.snapshotManager().latestSnapshot()
+    val snapshot = table.snapshotManager().latestSnapshotFromFileSystem()
     val serializedCommits = deletionVectors
       .groupByKey(_.partitionAndBucket)
       .mapGroups {

Reply via email to