This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e6c913216c [core] Don't use snapshotManager.latestSnapshot in
AbstractFileStoreWrite to prevent flooding catalog (#5680)
e6c913216c is described below
commit e6c913216ce19a42f030bf463a9010454435df05
Author: tsreaper <[email protected]>
AuthorDate: Fri May 30 19:05:53 2025 +0800
[core] Don't use snapshotManager.latestSnapshot in AbstractFileStoreWrite
to prevent flooding catalog (#5680)
---
.../apache/paimon/operation/AbstractFileStoreWrite.java | 5 ++++-
.../main/java/org/apache/paimon/utils/SnapshotManager.java | 14 +++++++++++++-
.../apache/paimon/spark/commands/PaimonSparkWriter.scala | 4 ++--
3 files changed, 19 insertions(+), 4 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 5b2ba064e9..3dd82e21c5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -433,7 +433,10 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
}
}
- Snapshot previousSnapshot = ignorePreviousFiles ? null : snapshotManager.latestSnapshot();
+ // NOTE: don't use snapshotManager.latestSnapshot() here,
+ // because we don't want to flood the catalog with high concurrency
+ Snapshot previousSnapshot =
+ ignorePreviousFiles ? null : snapshotManager.latestSnapshotFromFileSystem();
List<DataFileMeta> restoreFiles = new ArrayList<>();
int totalBuckets;
if (previousSnapshot != null) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 6a4620ca02..1356e090fe 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -172,7 +172,11 @@ public class SnapshotManager implements Serializable {
throw new UncheckedIOException(e);
}
}
- Long snapshotId = latestSnapshotId();
+ return latestSnapshotFromFileSystem();
+ }
+
+ public @Nullable Snapshot latestSnapshotFromFileSystem() {
+ Long snapshotId = latestSnapshotIdFromFileSystem();
return snapshotId == null ? null : snapshot(snapshotId);
}
@@ -184,6 +188,14 @@ public class SnapshotManager implements Serializable {
} catch (UnsupportedOperationException ignored) {
}
}
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to find latest snapshot id", e);
+ }
+ return latestSnapshotIdFromFileSystem();
+ }
+
+ public @Nullable Long latestSnapshotIdFromFileSystem() {
+ try {
return findLatest(snapshotDirectory(), SNAPSHOT_PREFIX, this::snapshotPath);
} catch (IOException e) {
throw new RuntimeException("Failed to find latest snapshot id", e);
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index ca1158e69a..33332f6e0a 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -209,7 +209,7 @@ case class PaimonSparkWriter(table: FileStoreTable) {
numAssigners)
}
- if (table.snapshotManager().latestSnapshot() == null) {
+ if (table.snapshotManager().latestSnapshotFromFileSystem() == null) {
// bootstrap mode
// Topology: input -> shuffle by special key & partition hash -> bucket-assigner
writeWithBucketAssigner(
@@ -287,7 +287,7 @@ case class PaimonSparkWriter(table: FileStoreTable) {
def persistDeletionVectors(deletionVectors: Dataset[SparkDeletionVector]): Seq[CommitMessage] = {
val sparkSession = deletionVectors.sparkSession
import sparkSession.implicits._
- val snapshot = table.snapshotManager().latestSnapshot()
+ val snapshot = table.snapshotManager().latestSnapshotFromFileSystem()
val serializedCommits = deletionVectors
.groupByKey(_.partitionAndBucket)
.mapGroups {