This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f9a8fb65d [core] Add cache for snapshots in CachingCatalog (#4565)
5f9a8fb65d is described below

commit 5f9a8fb65d079fb572f80dc682728bc1e048d46b
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Nov 22 19:11:22 2024 +0800

    [core] Add cache for snapshots in CachingCatalog (#4565)
---
 .../generated/catalog_configuration.html           |  6 +++
 .../org/apache/paimon/options/CatalogOptions.java  |  7 +++
 .../java/org/apache/paimon/AbstractFileStore.java  | 10 +++-
 .../src/main/java/org/apache/paimon/FileStore.java |  4 ++
 .../src/main/java/org/apache/paimon/Snapshot.java  | 58 ++++++++++++----------
 .../org/apache/paimon/catalog/CachingCatalog.java  | 35 ++++++++++---
 .../apache/paimon/operation/OrphanFilesClean.java  |  3 +-
 .../paimon/privilege/PrivilegedFileStore.java      |  8 +++
 .../org/apache/paimon/schema/SchemaManager.java    | 15 +-----
 .../java/org/apache/paimon/schema/TableSchema.java | 39 ++++++++++-----
 .../paimon/table/AbstractFileStoreTable.java       | 10 +++-
 .../java/org/apache/paimon/table/DataTable.java    |  3 ++
 .../paimon/table/DelegatedFileStoreTable.java      | 13 +++++
 .../paimon/table/FallbackReadFileStoreTable.java   |  3 +-
 .../org/apache/paimon/table/FileStoreTable.java    |  5 ++
 .../table/system/AggregationFieldsTable.java       |  7 +--
 .../apache/paimon/table/system/AuditLogTable.java  |  6 +++
 .../paimon/table/system/CompactBucketsTable.java   |  6 +++
 .../paimon/table/system/FileMonitorTable.java      |  6 +++
 .../org/apache/paimon/table/system/FilesTable.java |  3 +-
 .../apache/paimon/table/system/OptionsTable.java   | 24 ++++-----
 .../paimon/table/system/ReadOptimizedTable.java    |  6 +++
 .../apache/paimon/table/system/SchemasTable.java   | 57 ++++-----------------
 .../apache/paimon/table/system/SnapshotsTable.java |  6 +--
 .../src/main/java/org/apache/paimon/tag/Tag.java   | 48 +++++++++---------
 .../org/apache/paimon/utils/BranchManager.java     |  4 +-
 .../org/apache/paimon/utils/SnapshotManager.java   | 54 ++++++++++++++------
 .../java/org/apache/paimon/utils/TagManager.java   |  8 +--
 .../apache/paimon/catalog/CachingCatalogTest.java  | 25 +++++++++-
 .../paimon/catalog/TestableCachingCatalog.java     |  9 +++-
 .../paimon/table/AppendOnlyFileDataTableTest.java  |  2 +-
 .../AppendOnlyTableColumnTypeFileDataTest.java     |  2 +-
 .../AppendOnlyTableColumnTypeFileMetaTest.java     |  2 +-
 .../table/AppendOnlyTableFileMetaFilterTest.java   |  2 +-
 .../paimon/table/FileStoreTableTestBase.java       |  4 +-
 .../table/PrimaryKeyColumnTypeFileDataTest.java    |  2 +-
 .../paimon/table/PrimaryKeyFileDataTableTest.java  |  2 +-
 .../paimon/table/PrimaryKeyFileMetaFilterTest.java |  2 +-
 .../PrimaryKeyTableColumnTypeFileMetaTest.java     |  2 +-
 .../apache/paimon/utils/SnapshotManagerTest.java   |  4 +-
 40 files changed, 316 insertions(+), 196 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html 
b/docs/layouts/shortcodes/generated/catalog_configuration.html
index 3686fa20c6..6706d5c421 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -68,6 +68,12 @@ under the License.
             <td>Long</td>
             <td>Controls the max number for which partitions in the catalog 
are cached.</td>
         </tr>
+        <tr>
+            <td><h5>cache.snapshot.max-num-per-table</h5></td>
+            <td style="word-wrap: break-word;">20</td>
+            <td>Integer</td>
+            <td>Controls the max number for snapshots per table in the catalog 
are cached.</td>
+        </tr>
         <tr>
             <td><h5>client-pool-size</h5></td>
             <td style="word-wrap: break-word;">2</td>
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
index 6ad9f3350a..f69af2d599 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
@@ -123,6 +123,13 @@ public class CatalogOptions {
                     .noDefaultValue()
                     .withDescription("Controls the maximum memory to cache 
manifest content.");
 
+    public static final ConfigOption<Integer> CACHE_SNAPSHOT_MAX_NUM_PER_TABLE 
=
+            key("cache.snapshot.max-num-per-table")
+                    .intType()
+                    .defaultValue(20)
+                    .withDescription(
+                            "Controls the max number for snapshots per table 
in the catalog are cached.");
+
     public static final ConfigOption<String> LINEAGE_META =
             key("lineage-meta")
                     .stringType()
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 14665961a8..ae4552aa71 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -54,6 +54,8 @@ import org.apache.paimon.utils.SegmentsCache;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
+import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+
 import javax.annotation.Nullable;
 
 import java.time.Duration;
@@ -79,6 +81,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
 
     @Nullable private final SegmentsCache<Path> writeManifestCache;
     @Nullable private SegmentsCache<Path> readManifestCache;
+    @Nullable private Cache<Path, Snapshot> snapshotCache;
 
     protected AbstractFileStore(
             FileIO fileIO,
@@ -116,7 +119,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
 
     @Override
     public SnapshotManager snapshotManager() {
-        return new SnapshotManager(fileIO, options.path(), options.branch());
+        return new SnapshotManager(fileIO, options.path(), options.branch(), 
snapshotCache);
     }
 
     @Override
@@ -340,4 +343,9 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
     public void setManifestCache(SegmentsCache<Path> manifestCache) {
         this.readManifestCache = manifestCache;
     }
+
+    @Override
+    public void setSnapshotCache(Cache<Path, Snapshot> cache) {
+        this.snapshotCache = cache;
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index f9bf4c8440..e50d4ada13 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -44,6 +44,8 @@ import org.apache.paimon.utils.SegmentsCache;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
+import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+
 import javax.annotation.Nullable;
 
 import java.util.List;
@@ -107,4 +109,6 @@ public interface FileStore<T> {
     List<TagCallback> createTagCallbacks();
 
     void setManifestCache(SegmentsCache<Path> manifestCache);
+
+    void setSnapshotCache(Cache<Path, Snapshot> cache);
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java 
b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
index 3b8d2fa15b..baee7bad95 100644
--- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
+++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
@@ -29,9 +29,6 @@ import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgn
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import javax.annotation.Nullable;
 
 import java.io.FileNotFoundException;
@@ -65,7 +62,6 @@ import java.util.Objects;
 @Public
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class Snapshot {
-    private static final Logger LOG = LoggerFactory.getLogger(Snapshot.class);
 
     public static final long FIRST_SNAPSHOT_ID = 1;
 
@@ -355,28 +351,6 @@ public class Snapshot {
         return JsonSerdeUtil.toJson(this);
     }
 
-    public static Snapshot fromJson(String json) {
-        return JsonSerdeUtil.fromJson(json, Snapshot.class);
-    }
-
-    public static Snapshot fromPath(FileIO fileIO, Path path) {
-        try {
-            return Snapshot.fromJson(fileIO.readFileUtf8(path));
-        } catch (FileNotFoundException e) {
-            String errorMessage =
-                    String.format(
-                            "Snapshot file %s does not exist. "
-                                    + "It might have been expired by other 
jobs operating on this table. "
-                                    + "In this case, you can avoid concurrent 
modification issues by configuring "
-                                    + "write-only = true and use a dedicated 
compaction job, or configuring "
-                                    + "different expiration thresholds for 
different jobs.",
-                            path);
-            throw new RuntimeException(errorMessage, e);
-        } catch (IOException e) {
-            throw new RuntimeException("Fails to read snapshot from path " + 
path, e);
-        }
-    }
-
     @Override
     public int hashCode() {
         return Objects.hash(
@@ -437,4 +411,36 @@ public class Snapshot {
         /** Collect statistics. */
         ANALYZE
     }
+
+    // =================== Utils for reading =========================
+
+    public static Snapshot fromJson(String json) {
+        return JsonSerdeUtil.fromJson(json, Snapshot.class);
+    }
+
+    public static Snapshot fromPath(FileIO fileIO, Path path) {
+        try {
+            return tryFromPath(fileIO, path);
+        } catch (FileNotFoundException e) {
+            String errorMessage =
+                    String.format(
+                            "Snapshot file %s does not exist. "
+                                    + "It might have been expired by other 
jobs operating on this table. "
+                                    + "In this case, you can avoid concurrent 
modification issues by configuring "
+                                    + "write-only = true and use a dedicated 
compaction job, or configuring "
+                                    + "different expiration thresholds for 
different jobs.",
+                            path);
+            throw new RuntimeException(errorMessage, e);
+        }
+    }
+
+    public static Snapshot tryFromPath(FileIO fileIO, Path path) throws 
FileNotFoundException {
+        try {
+            return Snapshot.fromJson(fileIO.readFileUtf8(path));
+        } catch (FileNotFoundException e) {
+            throw e;
+        } catch (IOException e) {
+            throw new RuntimeException("Fails to read snapshot from path " + 
path, e);
+        }
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
index 003f0edb4f..1912ad6062 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
@@ -55,6 +55,7 @@ import static 
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY
 import static 
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_MEMORY;
 import static 
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_THRESHOLD;
 import static org.apache.paimon.options.CatalogOptions.CACHE_PARTITION_MAX_NUM;
+import static 
org.apache.paimon.options.CatalogOptions.CACHE_SNAPSHOT_MAX_NUM_PER_TABLE;
 import static org.apache.paimon.table.system.SystemTableLoader.SYSTEM_TABLES;
 
 /** A {@link Catalog} to cache databases and tables and manifests. */
@@ -62,6 +63,9 @@ public class CachingCatalog extends DelegateCatalog {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(CachingCatalog.class);
 
+    private final Duration expirationInterval;
+    private final int snapshotMaxNumPerTable;
+
     protected final Cache<String, Database> databaseCache;
     protected final Cache<Identifier, Table> tableCache;
     @Nullable protected final SegmentsCache<Path> manifestCache;
@@ -75,7 +79,8 @@ public class CachingCatalog extends DelegateCatalog {
                 CACHE_EXPIRATION_INTERVAL_MS.defaultValue(),
                 CACHE_MANIFEST_SMALL_FILE_MEMORY.defaultValue(),
                 CACHE_MANIFEST_SMALL_FILE_THRESHOLD.defaultValue().getBytes(),
-                CACHE_PARTITION_MAX_NUM.defaultValue());
+                CACHE_PARTITION_MAX_NUM.defaultValue(),
+                CACHE_SNAPSHOT_MAX_NUM_PER_TABLE.defaultValue());
     }
 
     public CachingCatalog(
@@ -83,13 +88,15 @@ public class CachingCatalog extends DelegateCatalog {
             Duration expirationInterval,
             MemorySize manifestMaxMemory,
             long manifestCacheThreshold,
-            long cachedPartitionMaxNum) {
+            long cachedPartitionMaxNum,
+            int snapshotMaxNumPerTable) {
         this(
                 wrapped,
                 expirationInterval,
                 manifestMaxMemory,
                 manifestCacheThreshold,
                 cachedPartitionMaxNum,
+                snapshotMaxNumPerTable,
                 Ticker.systemTicker());
     }
 
@@ -99,6 +106,7 @@ public class CachingCatalog extends DelegateCatalog {
             MemorySize manifestMaxMemory,
             long manifestCacheThreshold,
             long cachedPartitionMaxNum,
+            int snapshotMaxNumPerTable,
             Ticker ticker) {
         super(wrapped);
         if (expirationInterval.isZero() || expirationInterval.isNegative()) {
@@ -106,6 +114,9 @@ public class CachingCatalog extends DelegateCatalog {
                     "When cache.expiration-interval is set to negative or 0, 
the catalog cache should be disabled.");
         }
 
+        this.expirationInterval = expirationInterval;
+        this.snapshotMaxNumPerTable = snapshotMaxNumPerTable;
+
         this.databaseCache =
                 Caffeine.newBuilder()
                         .softValues()
@@ -121,6 +132,7 @@ public class CachingCatalog extends DelegateCatalog {
                         .expireAfterAccess(expirationInterval)
                         .ticker(ticker)
                         .build();
+        this.manifestCache = SegmentsCache.create(manifestMaxMemory, 
manifestCacheThreshold);
         this.partitionCache =
                 cachedPartitionMaxNum == 0
                         ? null
@@ -134,7 +146,6 @@ public class CachingCatalog extends DelegateCatalog {
                                 .maximumWeight(cachedPartitionMaxNum)
                                 .ticker(ticker)
                                 .build();
-        this.manifestCache = SegmentsCache.create(manifestMaxMemory, 
manifestCacheThreshold);
     }
 
     public static Catalog tryToCreate(Catalog catalog, Options options) {
@@ -155,7 +166,8 @@ public class CachingCatalog extends DelegateCatalog {
                 options.get(CACHE_EXPIRATION_INTERVAL_MS),
                 manifestMaxMemory,
                 manifestThreshold,
-                options.get(CACHE_PARTITION_MAX_NUM));
+                options.get(CACHE_PARTITION_MAX_NUM),
+                options.get(CACHE_SNAPSHOT_MAX_NUM_PER_TABLE));
     }
 
     @Override
@@ -244,9 +256,20 @@ public class CachingCatalog extends DelegateCatalog {
     }
 
     private void putTableCache(Identifier identifier, Table table) {
-        if (manifestCache != null && table instanceof FileStoreTable) {
-            ((FileStoreTable) table).setManifestCache(manifestCache);
+        if (table instanceof FileStoreTable) {
+            FileStoreTable storeTable = (FileStoreTable) table;
+            storeTable.setSnapshotCache(
+                    Caffeine.newBuilder()
+                            .softValues()
+                            .expireAfterAccess(expirationInterval)
+                            .maximumSize(snapshotMaxNumPerTable)
+                            .executor(Runnable::run)
+                            .build());
+            if (manifestCache != null) {
+                storeTable.setManifestCache(manifestCache);
+            }
         }
+
         tableCache.put(identifier, table);
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index 5698908cb9..869100d9cf 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -105,7 +105,8 @@ public abstract class OrphanFilesClean implements 
Serializable {
 
         List<String> abnormalBranches = new ArrayList<>();
         for (String branch : branches) {
-            if (!new SchemaManager(table.fileIO(), table.location(), 
branch).latest().isPresent()) {
+            SchemaManager schemaManager = 
table.schemaManager().copyWithBranch(branch);
+            if (!schemaManager.latest().isPresent()) {
                 abnormalBranches.add(branch);
             }
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
index 59243a5356..3ee0d5fa9b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
@@ -20,6 +20,7 @@ package org.apache.paimon.privilege;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.FileStore;
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.index.IndexFileHandler;
@@ -47,6 +48,8 @@ import org.apache.paimon.utils.SegmentsCache;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
+import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+
 import javax.annotation.Nullable;
 
 import java.util.List;
@@ -210,4 +213,9 @@ public class PrivilegedFileStore<T> implements FileStore<T> 
{
     public void setManifestCache(SegmentsCache<Path> manifestCache) {
         wrapped.setManifestCache(manifestCache);
     }
+
+    @Override
+    public void setSnapshotCache(Cache<Path, Snapshot> cache) {
+        wrapped.setSnapshotCache(cache);
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index a84348810b..d827ffd0fb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -45,7 +45,6 @@ import org.apache.paimon.types.MapType;
 import org.apache.paimon.types.ReassignFieldId;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.BranchManager;
-import org.apache.paimon.utils.JsonSerdeUtil;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.StringUtils;
@@ -769,11 +768,7 @@ public class SchemaManager implements Serializable {
 
     /** Read schema for schema id. */
     public TableSchema schema(long id) {
-        try {
-            return 
JsonSerdeUtil.fromJson(fileIO.readFileUtf8(toSchemaPath(id)), 
TableSchema.class);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
+        return TableSchema.fromPath(fileIO, toSchemaPath(id));
     }
 
     /** Check if a schema exists. */
@@ -789,14 +784,6 @@ public class SchemaManager implements Serializable {
         }
     }
 
-    public static TableSchema fromPath(FileIO fileIO, Path path) {
-        try {
-            return JsonSerdeUtil.fromJson(fileIO.readFileUtf8(path), 
TableSchema.class);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
-    }
-
     private String branchPath() {
         return BranchManager.branchPath(tableRoot, branch);
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
index b5bdeccf10..a0a149d1ae 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
@@ -29,8 +29,10 @@ import org.apache.paimon.utils.StringUtils;
 
 import javax.annotation.Nullable;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.Serializable;
+import java.io.UncheckedIOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
@@ -296,19 +298,6 @@ public class TableSchema implements Serializable {
                 timeMillis);
     }
 
-    public static TableSchema fromJson(String json) {
-        return JsonSerdeUtil.fromJson(json, TableSchema.class);
-    }
-
-    public static TableSchema fromPath(FileIO fileIO, Path path) {
-        try {
-            String json = fileIO.readFileUtf8(path);
-            return TableSchema.fromJson(json);
-        } catch (IOException e) {
-            throw new RuntimeException("Fails to read schema from path " + 
path, e);
-        }
-    }
-
     @Override
     public String toString() {
         return JsonSerdeUtil.toJson(this);
@@ -341,4 +330,28 @@ public class TableSchema implements Serializable {
     public static List<DataField> newFields(RowType rowType) {
         return rowType.getFields();
     }
+
+    // =================== Utils for reading =========================
+
+    public static TableSchema fromJson(String json) {
+        return JsonSerdeUtil.fromJson(json, TableSchema.class);
+    }
+
+    public static TableSchema fromPath(FileIO fileIO, Path path) {
+        try {
+            return tryFromPath(fileIO, path);
+        } catch (FileNotFoundException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+
+    public static TableSchema tryFromPath(FileIO fileIO, Path path) throws 
FileNotFoundException {
+        try {
+            return fromJson(fileIO.readFileUtf8(path));
+        } catch (FileNotFoundException e) {
+            throw e;
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 07c0e88645..4180ff11c1 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -68,6 +68,8 @@ import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.SnapshotNotExistException;
 import org.apache.paimon.utils.TagManager;
 
+import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -123,6 +125,11 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
         store().setManifestCache(manifestCache);
     }
 
+    @Override
+    public void setSnapshotCache(Cache<Path, Snapshot> cache) {
+        store().setSnapshotCache(cache);
+    }
+
     @Override
     public OptionalLong latestSnapshotId() {
         Long snapshot = store().snapshotManager().latestSnapshotId();
@@ -340,7 +347,8 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
                 : new PrimaryKeyFileStoreTable(fileIO, path, newTableSchema, 
catalogEnvironment);
     }
 
-    protected SchemaManager schemaManager() {
+    @Override
+    public SchemaManager schemaManager() {
         return new SchemaManager(fileIO(), path, currentBranch());
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
index e330db0e04..7979daccf7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.utils.BranchManager;
@@ -39,6 +40,8 @@ public interface DataTable extends InnerTable {
 
     SnapshotManager snapshotManager();
 
+    SchemaManager schemaManager();
+
     TagManager tagManager();
 
     BranchManager branchManager();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
index 2b369e5005..624476b5b4 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
@@ -27,6 +27,7 @@ import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.stats.Statistics;
 import org.apache.paimon.table.query.LocalTableQuery;
@@ -44,6 +45,8 @@ import org.apache.paimon.utils.SimpleFileReader;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
+import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+
 import java.time.Duration;
 import java.util.Objects;
 import java.util.Optional;
@@ -92,6 +95,11 @@ public abstract class DelegatedFileStoreTable implements 
FileStoreTable {
         return wrapped.snapshotManager();
     }
 
+    @Override
+    public SchemaManager schemaManager() {
+        return wrapped.schemaManager();
+    }
+
     @Override
     public TagManager tagManager() {
         return wrapped.tagManager();
@@ -117,6 +125,11 @@ public abstract class DelegatedFileStoreTable implements 
FileStoreTable {
         wrapped.setManifestCache(manifestCache);
     }
 
+    @Override
+    public void setSnapshotCache(Cache<Path, Snapshot> cache) {
+        wrapped.setSnapshotCache(cache);
+    }
+
     @Override
     public TableSchema schema() {
         return wrapped.schema();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index f1a60b9713..e3e290f060 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -28,7 +28,6 @@ import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.source.DataFilePlan;
 import org.apache.paimon.table.source.DataSplit;
@@ -103,7 +102,7 @@ public class FallbackReadFileStoreTable extends 
DelegatedFileStoreTable {
 
     private FileStoreTable switchWrappedToBranch(String branchName) {
         Optional<TableSchema> optionalSchema =
-                new SchemaManager(wrapped.fileIO(), wrapped.location(), 
branchName).latest();
+                wrapped.schemaManager().copyWithBranch(branchName).latest();
         Preconditions.checkArgument(
                 optionalSchema.isPresent(), "Branch " + branchName + " does 
not exist");
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
index 01227dd354..d37e57e4e5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.table;
 
 import org.apache.paimon.FileStore;
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.ManifestCacheFilter;
@@ -30,6 +31,8 @@ import org.apache.paimon.table.sink.TableWriteImpl;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.SegmentsCache;
 
+import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -42,6 +45,8 @@ public interface FileStoreTable extends DataTable {
 
     void setManifestCache(SegmentsCache<Path> manifestCache);
 
+    void setSnapshotCache(Cache<Path, Snapshot> cache);
+
     @Override
     default RowType rowType() {
         return schema().logicalRowType();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java
index 10a046ca70..8c0eed4d6b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AggregationFieldsTable.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.table.system;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
@@ -27,7 +26,6 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.ReadonlyTable;
@@ -78,14 +76,12 @@ public class AggregationFieldsTable implements 
ReadonlyTable {
 
     private final FileIO fileIO;
     private final Path location;
-    private final String branch;
 
     private final FileStoreTable dataTable;
 
     public AggregationFieldsTable(FileStoreTable dataTable) {
         this.fileIO = dataTable.fileIO();
         this.location = dataTable.location();
-        this.branch = CoreOptions.branch(dataTable.schema().options());
         this.dataTable = dataTable;
     }
 
@@ -192,8 +188,7 @@ public class AggregationFieldsTable implements 
ReadonlyTable {
             if (!(split instanceof AggregationSplit)) {
                 throw new IllegalArgumentException("Unsupported split: " + 
split.getClass());
             }
-            Path location = ((AggregationSplit) split).location;
-            TableSchema schemas = new SchemaManager(fileIO, location, 
branch).latest().get();
+            TableSchema schemas = dataTable.schemaManager().latest().get();
             Iterator<InternalRow> rows = createInternalRowIterator(schemas);
             if (readType != null) {
                 rows =
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index b0cbe0772b..1cb967f8d1 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -39,6 +39,7 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.predicate.PredicateReplaceVisitor;
 import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.ReadonlyTable;
@@ -187,6 +188,11 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
         return wrapped.snapshotManager();
     }
 
+    @Override
+    public SchemaManager schemaManager() {
+        return wrapped.schemaManager();
+    }
+
     @Override
     public TagManager tagManager() {
         return wrapped.tagManager();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/CompactBucketsTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/system/CompactBucketsTable.java
index ff40c9502e..31cecbfb15 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/CompactBucketsTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/CompactBucketsTable.java
@@ -33,6 +33,7 @@ import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.ReadonlyTable;
@@ -145,6 +146,11 @@ public class CompactBucketsTable implements DataTable, 
ReadonlyTable {
         return wrapped.snapshotManager();
     }
 
+    @Override
+    public SchemaManager schemaManager() {
+        return wrapped.schemaManager();
+    }
+
     @Override
     public TagManager tagManager() {
         return wrapped.tagManager();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
index fc1bb2a5b1..522335aaa6 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
@@ -34,6 +34,7 @@ import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.ReadonlyTable;
@@ -131,6 +132,11 @@ public class FileMonitorTable implements DataTable, 
ReadonlyTable {
         return wrapped.snapshotManager();
     }
 
+    @Override
+    public SchemaManager schemaManager() {
+        return wrapped.schemaManager();
+    }
+
     @Override
     public TagManager tagManager() {
         return wrapped.tagManager();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
index 0232fc2d2d..6dcbb322d6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
@@ -143,8 +143,7 @@ public class FilesTable implements ReadonlyTable {
 
     @Override
     public InnerTableRead newRead() {
-        return new FilesRead(
-                new SchemaManager(storeTable.fileIO(), storeTable.location()), 
storeTable);
+        return new FilesRead(storeTable.schemaManager(), storeTable);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java
index c7dec03343..ed20896646 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/OptionsTable.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.table.system;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
@@ -27,7 +26,6 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.ReadonlyTable;
 import org.apache.paimon.table.Table;
@@ -44,7 +42,6 @@ import org.apache.paimon.utils.ProjectedRow;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
 
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
@@ -70,14 +67,12 @@ public class OptionsTable implements ReadonlyTable {
 
     private final FileIO fileIO;
     private final Path location;
-    private final String branch;
 
     private final FileStoreTable dataTable;
 
     public OptionsTable(FileStoreTable dataTable) {
         this.fileIO = dataTable.fileIO();
         this.location = dataTable.location();
-        this.branch = CoreOptions.branch(dataTable.schema().options());
         this.dataTable = dataTable;
     }
 
@@ -178,14 +173,20 @@ public class OptionsTable implements ReadonlyTable {
         }
 
         @Override
-        public RecordReader<InternalRow> createReader(Split split) throws 
IOException {
+        public RecordReader<InternalRow> createReader(Split split) {
             if (!(split instanceof OptionsSplit)) {
                 throw new IllegalArgumentException("Unsupported split: " + 
split.getClass());
             }
-            Path location = ((OptionsSplit) split).location;
             Iterator<InternalRow> rows =
                     Iterators.transform(
-                            options(fileIO, location, 
branch).entrySet().iterator(), this::toRow);
+                            dataTable
+                                    .schemaManager()
+                                    .latest()
+                                    .orElseThrow(() -> new 
RuntimeException("Table not exists."))
+                                    .options()
+                                    .entrySet()
+                                    .iterator(),
+                            this::toRow);
             if (readType != null) {
                 rows =
                         Iterators.transform(
@@ -203,11 +204,4 @@ public class OptionsTable implements ReadonlyTable {
                     BinaryString.fromString(option.getValue()));
         }
     }
-
-    private static Map<String, String> options(FileIO fileIO, Path location, 
String branchName) {
-        return new SchemaManager(fileIO, location, branchName)
-                .latest()
-                .orElseThrow(() -> new RuntimeException("Table not exists."))
-                .options();
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
index deb149791c..5308005053 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
@@ -26,6 +26,7 @@ import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.operation.DefaultValueAssigner;
+import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.ReadonlyTable;
@@ -165,6 +166,11 @@ public class ReadOptimizedTable implements DataTable, 
ReadonlyTable {
         return wrapped.snapshotManager();
     }
 
+    @Override
+    public SchemaManager schemaManager() {
+        return wrapped.schemaManager();
+    }
+
     @Override
     public TagManager tagManager() {
         return wrapped.tagManager();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
index d0df75b34f..3cb0ff4783 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
@@ -18,13 +18,11 @@
 
 package org.apache.paimon.table.system;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.predicate.And;
 import org.apache.paimon.predicate.CompoundPredicate;
@@ -61,8 +59,6 @@ import org.apache.paimon.utils.SerializationUtils;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
 
-import javax.annotation.Nullable;
-
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
@@ -73,7 +69,6 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 
 import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;
@@ -98,16 +93,12 @@ public class SchemasTable implements ReadonlyTable {
                             new DataField(5, "comment", 
SerializationUtils.newStringType(true)),
                             new DataField(6, "update_time", new 
TimestampType(false, 3))));
 
-    private final FileIO fileIO;
     private final Path location;
-    private final String branch;
 
     private final FileStoreTable dataTable;
 
     public SchemasTable(FileStoreTable dataTable) {
-        this.fileIO = dataTable.fileIO();
         this.location = dataTable.location();
-        this.branch = CoreOptions.branch(dataTable.schema().options());
         this.dataTable = dataTable;
     }
 
@@ -133,7 +124,7 @@ public class SchemasTable implements ReadonlyTable {
 
     @Override
     public InnerTableRead newRead() {
-        return new SchemasRead(fileIO);
+        return new SchemasRead();
     }
 
     @Override
@@ -141,24 +132,16 @@ public class SchemasTable implements ReadonlyTable {
         return new SchemasTable(dataTable.copy(dynamicOptions));
     }
 
-    private class SchemasScan extends ReadOnceTableScan {
-        private @Nullable LeafPredicate schemaId;
+    private static class SchemasScan extends ReadOnceTableScan {
 
         @Override
-        public InnerTableScan withFilter(Predicate predicate) {
-            if (predicate == null) {
-                return this;
-            }
-
-            Map<String, LeafPredicate> leafPredicates =
-                    predicate.visit(LeafPredicateExtractor.INSTANCE);
-            schemaId = leafPredicates.get("schema_id");
-            return this;
+        public Plan innerPlan() {
+            return () -> Collections.singletonList(new SchemasSplit());
         }
 
         @Override
-        public Plan innerPlan() {
-            return () -> Collections.singletonList(new SchemasSplit(location, 
schemaId));
+        public InnerTableScan withFilter(Predicate predicate) {
+            return this;
         }
     }
 
@@ -167,47 +150,29 @@ public class SchemasTable implements ReadonlyTable {
 
         private static final long serialVersionUID = 1L;
 
-        private final Path location;
-
-        private final @Nullable LeafPredicate schemaId;
-
-        private SchemasSplit(Path location, @Nullable LeafPredicate schemaId) {
-            this.location = location;
-            this.schemaId = schemaId;
-        }
-
+        @Override
         public boolean equals(Object o) {
             if (this == o) {
                 return true;
             }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-            SchemasSplit that = (SchemasSplit) o;
-            return Objects.equals(location, that.location)
-                    && Objects.equals(schemaId, that.schemaId);
+            return o != null && getClass() == o.getClass();
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(location, schemaId);
+            return 0;
         }
     }
 
     /** {@link TableRead} implementation for {@link SchemasTable}. */
     private class SchemasRead implements InnerTableRead {
 
-        private final FileIO fileIO;
         private RowType readType;
 
         private Optional<Long> optionalFilterSchemaIdMax = Optional.empty();
         private Optional<Long> optionalFilterSchemaIdMin = Optional.empty();
         private final List<Long> schemaIds = new ArrayList<>();
 
-        public SchemasRead(FileIO fileIO) {
-            this.fileIO = fileIO;
-        }
-
         @Override
         public InnerTableRead withFilter(Predicate predicate) {
             if (predicate == null) {
@@ -287,9 +252,7 @@ public class SchemasTable implements ReadonlyTable {
             if (!(split instanceof SchemasSplit)) {
                 throw new IllegalArgumentException("Unsupported split: " + 
split.getClass());
             }
-            SchemasSplit schemasSplit = (SchemasSplit) split;
-            Path location = schemasSplit.location;
-            SchemaManager manager = new SchemaManager(fileIO, location, 
branch);
+            SchemaManager manager = dataTable.schemaManager();
 
             Collection<TableSchema> tableSchemas;
             if (!schemaIds.isEmpty()) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
index 10e5b691ac..2af13ee937 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.table.system;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
@@ -111,14 +110,12 @@ public class SnapshotsTable implements ReadonlyTable {
 
     private final FileIO fileIO;
     private final Path location;
-    private final String branch;
 
     private final FileStoreTable dataTable;
 
     public SnapshotsTable(FileStoreTable dataTable) {
         this.fileIO = dataTable.fileIO();
         this.location = dataTable.location();
-        this.branch = CoreOptions.branch(dataTable.schema().options());
         this.dataTable = dataTable;
     }
 
@@ -289,9 +286,8 @@ public class SnapshotsTable implements ReadonlyTable {
             if (!(split instanceof SnapshotsSplit)) {
                 throw new IllegalArgumentException("Unsupported split: " + 
split.getClass());
             }
-            SnapshotManager snapshotManager =
-                    new SnapshotManager(fileIO, ((SnapshotsSplit) 
split).location, branch);
 
+            SnapshotManager snapshotManager = dataTable.snapshotManager();
             Iterator<Snapshot> snapshots;
             if (!snapshotIds.isEmpty()) {
                 snapshots = snapshotManager.snapshotsWithId(snapshotIds);
diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java 
b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java
index f1ac879d33..53641a2eb6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java
@@ -33,6 +33,7 @@ import javax.annotation.Nullable;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.time.Duration;
 import java.time.LocalDateTime;
 import java.util.Map;
@@ -113,29 +114,6 @@ public class Tag extends Snapshot {
         return JsonSerdeUtil.toJson(this);
     }
 
-    public static Tag fromJson(String json) {
-        return JsonSerdeUtil.fromJson(json, Tag.class);
-    }
-
-    public static Tag fromPath(FileIO fileIO, Path path) {
-        try {
-            String json = fileIO.readFileUtf8(path);
-            return Tag.fromJson(json);
-        } catch (IOException e) {
-            throw new RuntimeException("Fails to read tag from path " + path, 
e);
-        }
-    }
-
-    @Nullable
-    public static Tag safelyFromPath(FileIO fileIO, Path path) throws 
IOException {
-        try {
-            String json = fileIO.readFileUtf8(path);
-            return Tag.fromJson(json);
-        } catch (FileNotFoundException e) {
-            return null;
-        }
-    }
-
     public static Tag fromSnapshotAndTagTtl(
             Snapshot snapshot, Duration tagTimeRetained, LocalDateTime 
tagCreateTime) {
         return new Tag(
@@ -201,4 +179,28 @@ public class Tag extends Snapshot {
         return Objects.equals(tagCreateTime, that.tagCreateTime)
                 && Objects.equals(tagTimeRetained, that.tagTimeRetained);
     }
+
+    // =================== Utils for reading =========================
+
+    public static Tag fromJson(String json) {
+        return JsonSerdeUtil.fromJson(json, Tag.class);
+    }
+
+    public static Tag fromPath(FileIO fileIO, Path path) {
+        try {
+            return tryFromPath(fileIO, path);
+        } catch (FileNotFoundException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+
+    public static Tag tryFromPath(FileIO fileIO, Path path) throws 
FileNotFoundException {
+        try {
+            return fromJson(fileIO.readFileUtf8(path));
+        } catch (FileNotFoundException e) {
+            throw e;
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
index bc353bb10d..2ea5f542f4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
@@ -23,6 +23,7 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.tag.Tag;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -178,7 +179,7 @@ public class BranchManager {
             List<Path> deleteSchemaPaths = schemaManager.schemaPaths(id -> id 
>= earliestSchemaId);
             List<Path> deleteTagPaths =
                     tagManager.tagPaths(
-                            path -> Snapshot.fromPath(fileIO, path).id() >= 
earliestSnapshotId);
+                            path -> Tag.fromPath(fileIO, path).id() >= 
earliestSnapshotId);
 
             List<Path> deletePaths =
                     Stream.of(deleteSnapshotPaths, deleteSchemaPaths, 
deleteTagPaths)
@@ -201,6 +202,7 @@ public class BranchManager {
                     tagManager.copyWithBranch(branchName).tagDirectory(),
                     tagManager.tagDirectory(),
                     true);
+            snapshotManager.invalidateCache();
         } catch (IOException e) {
             throw new RuntimeException(
                     String.format(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 5902d4c84c..9a120042ea 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -24,6 +24,8 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 
+import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,16 +76,26 @@ public class SnapshotManager implements Serializable {
     private final FileIO fileIO;
     private final Path tablePath;
     private final String branch;
+    @Nullable private final Cache<Path, Snapshot> cache;
 
     public SnapshotManager(FileIO fileIO, Path tablePath) {
         this(fileIO, tablePath, DEFAULT_MAIN_BRANCH);
     }
 
     /** Specify the default branch for data writing. */
-    public SnapshotManager(FileIO fileIO, Path tablePath, String branchName) {
+    public SnapshotManager(FileIO fileIO, Path tablePath, @Nullable String 
branchName) {
+        this(fileIO, tablePath, branchName, null);
+    }
+
+    public SnapshotManager(
+            FileIO fileIO,
+            Path tablePath,
+            @Nullable String branchName,
+            @Nullable Cache<Path, Snapshot> cache) {
         this.fileIO = fileIO;
         this.tablePath = tablePath;
         this.branch = BranchManager.normalizeBranch(branchName);
+        this.cache = cache;
     }
 
     public SnapshotManager copyWithBranch(String branchName) {
@@ -120,20 +132,34 @@ public class SnapshotManager implements Serializable {
         return new Path(branchPath(tablePath, branch) + "/snapshot");
     }
 
+    public void invalidateCache() {
+        if (cache != null) {
+            cache.invalidateAll();
+        }
+    }
+
     public Snapshot snapshot(long snapshotId) {
-        Path snapshotPath = snapshotPath(snapshotId);
-        return Snapshot.fromPath(fileIO, snapshotPath);
+        Path path = snapshotPath(snapshotId);
+        Snapshot snapshot = cache == null ? null : cache.getIfPresent(path);
+        if (snapshot == null) {
+            snapshot = Snapshot.fromPath(fileIO, path);
+            if (cache != null) {
+                cache.put(path, snapshot);
+            }
+        }
+        return snapshot;
     }
 
     public Snapshot tryGetSnapshot(long snapshotId) throws 
FileNotFoundException {
-        try {
-            Path snapshotPath = snapshotPath(snapshotId);
-            return Snapshot.fromJson(fileIO.readFileUtf8(snapshotPath));
-        } catch (FileNotFoundException fileNotFoundException) {
-            throw fileNotFoundException;
-        } catch (IOException ioException) {
-            throw new RuntimeException(ioException);
+        Path path = snapshotPath(snapshotId);
+        Snapshot snapshot = cache == null ? null : cache.getIfPresent(path);
+        if (snapshot == null) {
+            snapshot = Snapshot.tryFromPath(fileIO, path);
+            if (cache != null) {
+                cache.put(path, snapshot);
+            }
         }
+        return snapshot;
     }
 
     public Changelog changelog(long snapshotId) {
@@ -486,11 +512,9 @@ public class SnapshotManager implements Serializable {
         collectSnapshots(
                 path -> {
                     try {
-                        
snapshots.add(Snapshot.fromJson(fileIO.readFileUtf8(path)));
-                    } catch (IOException e) {
-                        if (!(e instanceof FileNotFoundException)) {
-                            throw new RuntimeException(e);
-                        }
+                        // do not pollution cache
+                        snapshots.add(Snapshot.tryFromPath(fileIO, path));
+                    } catch (FileNotFoundException ignored) {
                     }
                 },
                 paths);
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index 65963aafdf..1e05a100d7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -353,7 +353,7 @@ public class TagManager {
                 // If the tag file is not found, it might be deleted by
                 // other processes, so just skip this tag
                 try {
-                    Snapshot snapshot = 
Snapshot.fromJson(fileIO.readFileUtf8(path));
+                    Snapshot snapshot = Tag.tryFromPath(fileIO, 
path).trimToSnapshot();
                     tags.computeIfAbsent(snapshot, s -> new 
ArrayList<>()).add(tagName);
                 } catch (FileNotFoundException ignored) {
                 }
@@ -371,9 +371,9 @@ public class TagManager {
             List<Pair<Tag, String>> tags = new ArrayList<>();
             for (Path path : paths) {
                 String tagName = path.getName().substring(TAG_PREFIX.length());
-                Tag tag = Tag.safelyFromPath(fileIO, path);
-                if (tag != null) {
-                    tags.add(Pair.of(tag, tagName));
+                try {
+                    tags.add(Pair.of(Tag.tryFromPath(fileIO, path), tagName));
+                } catch (FileNotFoundException ignored) {
                 }
             }
             return tags;
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
index 65ed5ce0b7..e4f0a1510b 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.catalog;
 
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.PartitionEntry;
@@ -332,6 +333,27 @@ class CachingCatalogTest extends CatalogTestBase {
                 .toArray(Identifier[]::new);
     }
 
+    @Test
+    public void testSnapshotCache() throws Exception {
+        TestableCachingCatalog wrappedCatalog =
+                new TestableCachingCatalog(this.catalog, EXPIRATION_TTL, 
ticker);
+        Identifier tableIdent = new Identifier("db", "tbl");
+        wrappedCatalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
+        Table table = wrappedCatalog.getTable(tableIdent);
+
+        // write
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        try (BatchTableWrite write = writeBuilder.newWrite();
+                BatchTableCommit commit = writeBuilder.newCommit()) {
+            write.write(GenericRow.of(1, fromString("1"), fromString("1")));
+            write.write(GenericRow.of(2, fromString("2"), fromString("2")));
+            commit.commit(write.prepareCommit());
+        }
+
+        Snapshot snapshot = table.snapshot(1);
+        assertThat(snapshot).isSameAs(table.snapshot(1));
+    }
+
     @Test
     public void testManifestCache() throws Exception {
         innerTestManifestCache(Long.MAX_VALUE);
@@ -346,7 +368,8 @@ class CachingCatalogTest extends CatalogTestBase {
                         Duration.ofSeconds(10),
                         MemorySize.ofMebiBytes(1),
                         manifestCacheThreshold,
-                        0L);
+                        0L,
+                        10);
         Identifier tableIdent = new Identifier("db", "tbl");
         catalog.dropTable(tableIdent, true);
         catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
 
b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
index 4c70a0232c..1d4a9b0e8a 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
@@ -38,7 +38,14 @@ public class TestableCachingCatalog extends CachingCatalog {
     private final Duration cacheExpirationInterval;
 
     public TestableCachingCatalog(Catalog catalog, Duration 
expirationInterval, Ticker ticker) {
-        super(catalog, expirationInterval, MemorySize.ZERO, Long.MAX_VALUE, 
Long.MAX_VALUE, ticker);
+        super(
+                catalog,
+                expirationInterval,
+                MemorySize.ZERO,
+                Long.MAX_VALUE,
+                Long.MAX_VALUE,
+                Integer.MAX_VALUE,
+                ticker);
         this.cacheExpirationInterval = expirationInterval;
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileDataTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileDataTableTest.java
index 93854e7661..9ce3db0b1a 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileDataTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileDataTableTest.java
@@ -33,7 +33,7 @@ public class AppendOnlyFileDataTableTest extends 
FileDataFilterTestBase {
         return new AppendOnlyFileStoreTable(
                 FileIOFinder.find(tablePath), tablePath, 
schemaManager.latest().get()) {
             @Override
-            protected SchemaManager schemaManager() {
+            public SchemaManager schemaManager() {
                 return schemaManager;
             }
         };
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableColumnTypeFileDataTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableColumnTypeFileDataTest.java
index b4c16cef20..64d0c728d1 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableColumnTypeFileDataTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableColumnTypeFileDataTest.java
@@ -37,7 +37,7 @@ public class AppendOnlyTableColumnTypeFileDataTest extends 
ColumnTypeFileDataTes
         SchemaManager schemaManager = new TestingSchemaManager(tablePath, 
tableSchemas);
         return new AppendOnlyFileStoreTable(fileIO, tablePath, 
schemaManager.latest().get()) {
             @Override
-            protected SchemaManager schemaManager() {
+            public SchemaManager schemaManager() {
                 return schemaManager;
             }
         };
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableColumnTypeFileMetaTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableColumnTypeFileMetaTest.java
index f398d28cc5..300483a9f3 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableColumnTypeFileMetaTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableColumnTypeFileMetaTest.java
@@ -40,7 +40,7 @@ public class AppendOnlyTableColumnTypeFileMetaTest extends 
ColumnTypeFileMetaTes
         SchemaManager schemaManager = new TestingSchemaManager(tablePath, 
tableSchemas);
         return new AppendOnlyFileStoreTable(fileIO, tablePath, 
schemaManager.latest().get()) {
             @Override
-            protected SchemaManager schemaManager() {
+            public SchemaManager schemaManager() {
                 return schemaManager;
             }
         };
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableFileMetaFilterTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableFileMetaFilterTest.java
index f65546af75..85ed802997 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableFileMetaFilterTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyTableFileMetaFilterTest.java
@@ -40,7 +40,7 @@ public class AppendOnlyTableFileMetaFilterTest extends 
FileMetaFilterTestBase {
         SchemaManager schemaManager = new TestingSchemaManager(tablePath, 
tableSchemas);
         return new AppendOnlyFileStoreTable(fileIO, tablePath, 
schemaManager.latest().get()) {
             @Override
-            protected SchemaManager schemaManager() {
+            public SchemaManager schemaManager() {
                 return schemaManager;
             }
         };
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index 4d8408955d..75e284a68c 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -1193,7 +1193,7 @@ public abstract class FileStoreTableTestBase {
         SchemaManager schemaManager =
                 new SchemaManager(new TraceableFileIO(), tablePath, 
"test-branch");
         TableSchema branchSchema =
-                SchemaManager.fromPath(new TraceableFileIO(), 
schemaManager.toSchemaPath(0));
+                TableSchema.fromPath(new TraceableFileIO(), 
schemaManager.toSchemaPath(0));
         TableSchema schema0 = schemaManager.schema(0);
         assertThat(branchSchema.equals(schema0)).isTrue();
     }
@@ -1344,7 +1344,7 @@ public abstract class FileStoreTableTestBase {
         // verify schema in branch1 and main branch is same
         SchemaManager schemaManager = new SchemaManager(new TraceableFileIO(), 
tablePath);
         TableSchema branchSchema =
-                SchemaManager.fromPath(
+                TableSchema.fromPath(
                         new TraceableFileIO(),
                         
schemaManager.copyWithBranch(BRANCH_NAME).toSchemaPath(0));
         TableSchema schema0 = schemaManager.schema(0);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java
index 8ba25c6617..64bb5f21ab 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java
@@ -91,7 +91,7 @@ public class PrimaryKeyColumnTypeFileDataTest extends 
ColumnTypeFileDataTestBase
         SchemaManager schemaManager = new TestingSchemaManager(tablePath, 
tableSchemas);
         return new PrimaryKeyFileStoreTable(fileIO, tablePath, 
schemaManager.latest().get()) {
             @Override
-            protected SchemaManager schemaManager() {
+            public SchemaManager schemaManager() {
                 return schemaManager;
             }
         };
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileDataTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileDataTableTest.java
index 1be3219754..ba98138044 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileDataTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileDataTableTest.java
@@ -247,7 +247,7 @@ public class PrimaryKeyFileDataTableTest extends 
FileDataFilterTestBase {
         return new PrimaryKeyFileStoreTable(fileIO, tablePath, 
schemaManager.latest().get()) {
 
             @Override
-            protected SchemaManager schemaManager() {
+            public SchemaManager schemaManager() {
                 return schemaManager;
             }
         };
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileMetaFilterTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileMetaFilterTest.java
index 618e8691c6..88928fe991 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileMetaFilterTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileMetaFilterTest.java
@@ -146,7 +146,7 @@ public class PrimaryKeyFileMetaFilterTest extends 
FileMetaFilterTestBase {
         SchemaManager schemaManager = new TestingSchemaManager(tablePath, 
tableSchemas);
         return new PrimaryKeyFileStoreTable(fileIO, tablePath, 
schemaManager.latest().get()) {
             @Override
-            protected SchemaManager schemaManager() {
+            public SchemaManager schemaManager() {
                 return schemaManager;
             }
         };
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java
index 489c1ba052..32a4138be5 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java
@@ -54,7 +54,7 @@ public class PrimaryKeyTableColumnTypeFileMetaTest extends 
ColumnTypeFileMetaTes
         SchemaManager schemaManager = new TestingSchemaManager(tablePath, 
tableSchemas);
         return new PrimaryKeyFileStoreTable(fileIO, tablePath, 
schemaManager.latest().get()) {
             @Override
-            protected SchemaManager schemaManager() {
+            public SchemaManager schemaManager() {
                 return schemaManager;
             }
         };
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java 
b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
index 6b7b28263a..26480cf411 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
@@ -281,8 +281,8 @@ public class SnapshotManagerTest {
     @Test
     public void testTraversalSnapshotsFromLatestSafely() throws IOException, 
InterruptedException {
         FileIO localFileIO = LocalFileIO.create();
-        SnapshotManager snapshotManager =
-                new SnapshotManager(localFileIO, new Path(tempDir.toString()));
+        Path path = new Path(tempDir.toString());
+        SnapshotManager snapshotManager = new SnapshotManager(localFileIO, 
path);
         // create 10 snapshots
         for (long i = 0; i < 10; i++) {
             Snapshot snapshot =

Reply via email to