This is an automated email from the ASF dual-hosted git repository.

hemant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f2234c7b6 HDDS-10262. Encapsulate SnapshotCache inside OmSnapshotManager (#6135)
2f2234c7b6 is described below

commit 2f2234c7b61714404399ada8f31b3fb4772b613a
Author: Cyrill <[email protected]>
AuthorDate: Fri Feb 9 22:04:16 2024 +0300

    HDDS-10262. Encapsulate SnapshotCache inside OmSnapshotManager (#6135)
---
 .../hadoop/ozone/freon/TestOMSnapshotDAG.java      |  41 +++-----
 .../ozone/om/TestSnapshotBackgroundServices.java   |  16 ++-
 .../ozone/om/TestSnapshotDeletingService.java      |  23 ++---
 .../hadoop/ozone/om/snapshot/TestOmSnapshot.java   |   4 +-
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |  19 ++--
 .../apache/hadoop/ozone/om/OmSnapshotManager.java  | 110 +++++++++++++++------
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  47 ++++-----
 .../hadoop/ozone/om/SstFilteringService.java       |  18 ++--
 .../hadoop/ozone/om/request/OMClientRequest.java   |   5 +-
 .../request/snapshot/OMSnapshotPurgeRequest.java   |   3 +-
 .../key/OMDirectoriesPurgeResponseWithFSO.java     |  12 +--
 .../ozone/om/response/key/OMKeyPurgeResponse.java  |  12 +--
 .../OMSnapshotMoveDeletedKeysResponse.java         |  21 ++--
 .../ozone/om/service/DirectoryDeletingService.java |  10 +-
 .../ozone/om/service/KeyDeletingService.java       |  32 +++---
 .../ozone/om/service/SnapshotDeletingService.java  |  23 ++---
 .../service/SnapshotDirectoryCleaningService.java  |  32 +++---
 .../hadoop/ozone/om/snapshot/ReferenceCounted.java |   6 +-
 .../hadoop/ozone/om/snapshot/SnapshotCache.java    |  46 +++------
 .../ozone/om/snapshot/SnapshotDiffManager.java     |  24 ++---
 .../hadoop/ozone/om/TestOmSnapshotManager.java     |  12 +--
 .../hadoop/ozone/om/TestSstFilteringService.java   |  12 +--
 .../key/TestOMKeyPurgeRequestAndResponse.java      |  12 +--
 .../ozone/om/request/key/TestOMKeyRequest.java     |   3 +-
 .../s3/multipart/TestS3MultipartRequest.java       |   3 +-
 .../snapshot/TestOMSnapshotDeleteRequest.java      |   3 -
 .../TestOMSnapshotPurgeRequestAndResponse.java     |   3 +-
 .../ozone/om/service/TestKeyDeletingService.java   |  10 +-
 .../ozone/om/snapshot/TestSnapshotCache.java       |  50 +++-------
 .../ozone/om/snapshot/TestSnapshotDiffManager.java |  76 +++++++-------
 30 files changed, 304 insertions(+), 384 deletions(-)

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
index bca21aebd1..c566cae414 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneVolume;
-import org.apache.hadoop.ozone.om.IOmMetadataReader;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmSnapshot;
@@ -39,7 +38,6 @@ import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
-import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
 import org.apache.ozone.rocksdiff.DifferSnapshotInfo;
 import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
 import org.apache.ozone.test.GenericTestUtils;
@@ -215,20 +213,16 @@ public class TestOMSnapshotDAG {
     OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
     RDBStore rdbStore = (RDBStore) omMetadataManager.getStore();
     RocksDBCheckpointDiffer differ = rdbStore.getRocksDBCheckpointDiffer();
-    ReferenceCounted<IOmMetadataReader, SnapshotCache>
-        snapDB1 = ozoneManager.getOmSnapshotManager()
-        .getSnapshotCache().get(
-            SnapshotInfo.getTableKey(volumeName, bucketName, "snap1"));
-    ReferenceCounted<IOmMetadataReader, SnapshotCache>
-        snapDB2 = ozoneManager.getOmSnapshotManager()
-        .getSnapshotCache().get(
-            SnapshotInfo.getTableKey(volumeName, bucketName, "snap2"));
+    ReferenceCounted<OmSnapshot> snapDB1 = ozoneManager.getOmSnapshotManager()
+        .getActiveSnapshot(volumeName, bucketName, "snap1");
+    ReferenceCounted<OmSnapshot> snapDB2 = ozoneManager.getOmSnapshotManager()
+        .getActiveSnapshot(volumeName, bucketName, "snap2");
     DifferSnapshotInfo snap1 = getDifferSnapshotInfo(omMetadataManager,
         volumeName, bucketName, "snap1",
-        ((RDBStore)((OmSnapshot)snapDB1.get())
+        ((RDBStore) snapDB1.get()
             .getMetadataManager().getStore()).getDb().getManagedRocksDb());
     DifferSnapshotInfo snap2 = getDifferSnapshotInfo(omMetadataManager,
-        volumeName, bucketName, "snap2", ((RDBStore)((OmSnapshot)snapDB2.get())
+        volumeName, bucketName, "snap2", ((RDBStore) snapDB2.get()
             .getMetadataManager().getStore()).getDb().getManagedRocksDb());
 
       // RocksDB does checkpointing in a separate thread, wait for it
@@ -247,13 +241,11 @@ public class TestOMSnapshotDAG {
 
     resp = store.createSnapshot(volumeName, bucketName, "snap3");
     LOG.debug("Snapshot created: {}", resp);
-    ReferenceCounted<IOmMetadataReader, SnapshotCache>
-        snapDB3 = ozoneManager.getOmSnapshotManager()
-        .getSnapshotCache().get(
-            SnapshotInfo.getTableKey(volumeName, bucketName, "snap3"));
+    ReferenceCounted<OmSnapshot> snapDB3 = ozoneManager.getOmSnapshotManager()
+        .getActiveSnapshot(volumeName, bucketName, "snap3");
     DifferSnapshotInfo snap3 = getDifferSnapshotInfo(omMetadataManager,
         volumeName, bucketName, "snap3",
-        ((RDBStore)((OmSnapshot)snapDB3.get())
+        ((RDBStore) snapDB3.get()
             .getMetadataManager().getStore()).getDb().getManagedRocksDb());
     final File checkpointSnap3 = new File(snap3.getDbPath());
     GenericTestUtils.waitFor(checkpointSnap3::exists, 2000, 20000);
@@ -274,24 +266,21 @@ public class TestOMSnapshotDAG {
     ozoneManager = cluster.getOzoneManager();
     omMetadataManager = ozoneManager.getMetadataManager();
     snapDB1 = ozoneManager.getOmSnapshotManager()
-        .getSnapshotCache().get(
-            SnapshotInfo.getTableKey(volumeName, bucketName, "snap1"));
+        .getActiveSnapshot(volumeName, bucketName, "snap1");
     snapDB2 = ozoneManager.getOmSnapshotManager()
-        .getSnapshotCache().get(
-            SnapshotInfo.getTableKey(volumeName, bucketName, "snap2"));
+        .getActiveSnapshot(volumeName, bucketName, "snap2");
     snap1 = getDifferSnapshotInfo(omMetadataManager,
         volumeName, bucketName, "snap1",
-        ((RDBStore)((OmSnapshot)snapDB1.get())
+        ((RDBStore) snapDB1.get()
             .getMetadataManager().getStore()).getDb().getManagedRocksDb());
     snap2 = getDifferSnapshotInfo(omMetadataManager,
-        volumeName, bucketName, "snap2", ((RDBStore)((OmSnapshot)snapDB2.get())
+        volumeName, bucketName, "snap2", ((RDBStore) snapDB2.get()
             .getMetadataManager().getStore()).getDb().getManagedRocksDb());
     snapDB3 = ozoneManager.getOmSnapshotManager()
-        .getSnapshotCache().get(
-            SnapshotInfo.getTableKey(volumeName, bucketName, "snap3"));
+        .getActiveSnapshot(volumeName, bucketName, "snap3");
     snap3 = getDifferSnapshotInfo(omMetadataManager,
         volumeName, bucketName, "snap3",
-        ((RDBStore)((OmSnapshot)snapDB3.get())
+        ((RDBStore) snapDB3.get()
             .getMetadataManager().getStore()).getDb().getManagedRocksDb());
     List<String> sstDiffList21Run2 = differ.getSSTDiffList(snap2, snap1);
     assertEquals(sstDiffList21, sstDiffList21Run2);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSnapshotBackgroundServices.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSnapshotBackgroundServices.java
index a7bc554464..83386693d7 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSnapshotBackgroundServices.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSnapshotBackgroundServices.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServerConfig;
 import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
-import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
 import org.apache.hadoop.ozone.snapshot.SnapshotDiffReportOzone;
 import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse;
 import org.apache.ozone.compaction.log.CompactionLogEntry;
@@ -76,7 +75,6 @@ import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SE
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL;
 import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPath;
-import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
 import static 
org.apache.hadoop.ozone.om.TestOzoneManagerHAWithStoppedNodes.createKey;
 import static 
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.DONE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -259,12 +257,11 @@ public class TestSnapshotBackgroundServices {
 
     // get snapshot c
     OmSnapshot snapC;
-    try (ReferenceCounted<IOmMetadataReader, SnapshotCache> rcC = newLeaderOM
+    try (ReferenceCounted<OmSnapshot> rcC = newLeaderOM
         .getOmSnapshotManager()
-        .checkForSnapshot(volumeName, bucketName,
-            getSnapshotPrefix(snapshotInfoC.getName()), true)) {
+        .getSnapshot(volumeName, bucketName, snapshotInfoC.getName())) {
       assertNotNull(rcC);
-      snapC = (OmSnapshot) rcC.get();
+      snapC = rcC.get();
     }
 
     // assert that key a is in snapshot c's deleted table
@@ -284,12 +281,11 @@ public class TestSnapshotBackgroundServices {
 
     // get snapshot d
     OmSnapshot snapD;
-    try (ReferenceCounted<IOmMetadataReader, SnapshotCache> rcD = newLeaderOM
+    try (ReferenceCounted<OmSnapshot> rcD = newLeaderOM
         .getOmSnapshotManager()
-        .checkForSnapshot(volumeName, bucketName,
-            getSnapshotPrefix(snapshotInfoD.getName()), true)) {
+        .getSnapshot(volumeName, bucketName, snapshotInfoD.getName())) {
       assertNotNull(rcD);
-      snapD = (OmSnapshot) rcD.get();
+      snapD = rcD.get();
     }
 
     // wait until key a appears in deleted table of snapshot d
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSnapshotDeletingService.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSnapshotDeletingService.java
index e627a880fd..9f697d4148 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSnapshotDeletingService.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSnapshotDeletingService.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.service.SnapshotDeletingService;
 import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
-import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
 import org.apache.ozone.test.GenericTestUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -58,7 +57,6 @@ import java.util.concurrent.TimeoutException;
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
-import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_TIMEOUT;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -133,9 +131,8 @@ public class TestSnapshotDeletingService {
     GenericTestUtils.waitFor(() -> snapshotDeletingService
             .getSuccessfulRunCount() >= 1, 1000, 10000);
 
-    OmSnapshot bucket1snap3 = (OmSnapshot) om.getOmSnapshotManager()
-        .checkForSnapshot(VOLUME_NAME, BUCKET_NAME_ONE,
-            getSnapshotPrefix("bucket1snap3"), true).get();
+    OmSnapshot bucket1snap3 = om.getOmSnapshotManager()
+        .getSnapshot(VOLUME_NAME, BUCKET_NAME_ONE, "bucket1snap3").get();
 
     // Check bucket1key1 added to next non deleted snapshot db.
     List<? extends Table.KeyValue<String, RepeatedOmKeyInfo>> omKeyInfos =
@@ -190,8 +187,7 @@ public class TestSnapshotDeletingService {
 
     // verify the cache of purged snapshot
     // /vol1/bucket2/bucket2snap1 has been cleaned up from cache map
-    SnapshotCache snapshotCache = om.getOmSnapshotManager().getSnapshotCache();
-    assertEquals(2, snapshotCache.size());
+    assertEquals(2, om.getOmSnapshotManager().getSnapshotCacheSize());
   }
 
   @SuppressWarnings("checkstyle:MethodLength")
@@ -359,9 +355,8 @@ public class TestSnapshotDeletingService {
     assertTableRowCount(om.getMetadataManager().getSnapshotInfoTable(), 2);
 
     verifySnapshotChain(deletedSnap, "/vol1/bucket2/snap3");
-    OmSnapshot snap3 = (OmSnapshot) om.getOmSnapshotManager()
-        .checkForSnapshot(VOLUME_NAME, BUCKET_NAME_TWO,
-            getSnapshotPrefix("snap3"), true).get();
+    OmSnapshot snap3 = om.getOmSnapshotManager()
+        .getSnapshot(VOLUME_NAME, BUCKET_NAME_TWO, "snap3").get();
 
     Table<String, OmKeyInfo> snapDeletedDirTable =
         snap3.getMetadataManager().getDeletedDirTable();
@@ -388,10 +383,10 @@ public class TestSnapshotDeletingService {
     assertTableRowCount(renamedTable, 4);
     assertTableRowCount(deletedDirTable, 3);
 
-    ReferenceCounted<IOmMetadataReader, SnapshotCache> rcSnap1 =
-        om.getOmSnapshotManager().checkForSnapshot(
-            VOLUME_NAME, BUCKET_NAME_TWO, getSnapshotPrefix("snap1"), true);
-    OmSnapshot snap1 = (OmSnapshot) rcSnap1.get();
+    ReferenceCounted<OmSnapshot> rcSnap1 =
+        om.getOmSnapshotManager().getSnapshot(
+            VOLUME_NAME, BUCKET_NAME_TWO, "snap1");
+    OmSnapshot snap1 = rcSnap1.get();
     Table<String, OmKeyInfo> snap1KeyTable =
         snap1.getMetadataManager().getFileTable();
     try (TableIterator<String, ? extends Table.KeyValue<String,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshot.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshot.java
index 587ae18f86..dafd43be04 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshot.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestOmSnapshot.java
@@ -2031,7 +2031,7 @@ public abstract class TestOmSnapshot {
     String snapPrefix = createSnapshot(volumeName, bucketName);
     try (RDBStore snapshotDBStore = (RDBStore)
         ((OmSnapshot) cluster.getOzoneManager().getOmSnapshotManager()
-            .checkForSnapshot(volumeName, bucketName, snapPrefix, false).get())
+            .getActiveFsMetadataOrSnapshot(volumeName, bucketName, snapPrefix).get())
             .getMetadataManager().getStore()) {
       for (String table : snapshotDBStore.getTableNames().values()) {
         assertTrue(snapshotDBStore.getDb().getColumnFamily(table)
@@ -2161,7 +2161,7 @@ public abstract class TestOmSnapshot {
 
     OmSnapshot omSnapshot = (OmSnapshot) cluster.getOzoneManager()
         .getOmSnapshotManager()
-        .checkForSnapshot(volumeName, bucketName, snapshotName, false).get();
+        .getActiveFsMetadataOrSnapshot(volumeName, bucketName, snapshotName).get();
 
     RDBStore snapshotDbStore =
         (RDBStore) omSnapshot.getMetadataManager().getStore();
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 05d6e772fc..982e04df04 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -89,7 +89,6 @@ import 
org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTrans
 import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
 import org.apache.hadoop.ozone.om.request.util.OMMultipartUploadUtils;
 import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
-import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
 import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadInfo;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ExpiredMultipartUploadsBucket;
@@ -109,7 +108,6 @@ import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_FS_SNAPSHOT_MAX_L
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_FS_SNAPSHOT_MAX_LIMIT_DEFAULT;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_CHECKPOINT_DIR_CREATION_POLL_TIMEOUT;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_CHECKPOINT_DIR_CREATION_POLL_TIMEOUT_DEFAULT;
-import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_FOUND;
@@ -1555,7 +1553,7 @@ public class OmMetadataManagerImpl implements 
OMMetadataManager,
           OmBucketInfo bucketInfo = getBucketTable().get(bucketKey);
 
           // Get the latest snapshot in snapshot path.
-          try (ReferenceCounted<IOmMetadataReader, SnapshotCache>
+          try (ReferenceCounted<OmSnapshot>
               rcLatestSnapshot = getLatestActiveSnapshot(
                   keySplit[1], keySplit[2], omSnapshotManager)) {
 
@@ -1573,13 +1571,12 @@ public class OmMetadataManagerImpl implements 
OMMetadataManager,
 
               if (rcLatestSnapshot != null) {
                 Table<String, OmKeyInfo> prevKeyTable =
-                    ((OmSnapshot) rcLatestSnapshot.get())
+                    rcLatestSnapshot.get()
                         .getMetadataManager()
                         .getKeyTable(bucketInfo.getBucketLayout());
 
                 Table<String, RepeatedOmKeyInfo> prevDeletedTable =
-                    ((OmSnapshot) rcLatestSnapshot.get())
-                        .getMetadataManager().getDeletedTable();
+                    rcLatestSnapshot.get().getMetadataManager().getDeletedTable();
                 String prevKeyTableDBKey = getSnapshotRenamedTable()
                     .get(dbRenameKey);
                 String prevDelTableDBKey = getOzoneKey(info.getVolumeName(),
@@ -1665,8 +1662,7 @@ public class OmMetadataManagerImpl implements 
OMMetadataManager,
   /**
    * Get the latest OmSnapshot for a snapshot path.
    */
-  public ReferenceCounted<
-      IOmMetadataReader, SnapshotCache> getLatestActiveSnapshot(
+  public ReferenceCounted<OmSnapshot> getLatestActiveSnapshot(
           String volumeName, String bucketName,
           OmSnapshotManager snapshotManager)
       throws IOException {
@@ -1700,13 +1696,12 @@ public class OmMetadataManagerImpl implements 
OMMetadataManager,
       }
     }
 
-    Optional<ReferenceCounted<IOmMetadataReader, SnapshotCache>> rcOmSnapshot =
+    Optional<ReferenceCounted<OmSnapshot>> rcOmSnapshot =
         snapshotInfo.isPresent() ?
             Optional.ofNullable(
-                snapshotManager.checkForSnapshot(volumeName,
+                snapshotManager.getSnapshot(volumeName,
                     bucketName,
-                    getSnapshotPrefix(snapshotInfo.get().getName()),
-                    true)
+                    snapshotInfo.get().getName())
             ) :
             Optional.empty();
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index 2dab56ede6..e22d6b3097 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -271,10 +271,10 @@ public final class OmSnapshotManager implements 
AutoCloseable {
     };
 
     // Init snapshot cache
-    this.snapshotCache = new SnapshotCache(this, loader, softCacheSize);
+    this.snapshotCache = new SnapshotCache(loader, softCacheSize);
 
     this.snapshotDiffManager = new SnapshotDiffManager(snapshotDiffDb, differ,
-        ozoneManager, snapshotCache, snapDiffJobCf, snapDiffReportCf,
+        ozoneManager, snapDiffJobCf, snapDiffReportCf,
         columnFamilyOptions, codecRegistry);
 
     diffCleanupServiceInterval = ozoneManager.getConfiguration()
@@ -397,11 +397,32 @@ public final class OmSnapshotManager implements 
AutoCloseable {
   }
 
   /**
-   * Get snapshot instance LRU cache.
-   * @return LoadingCache
+   * Get snapshot instance LRU cache size.
+   * @return cache size.
    */
-  public SnapshotCache getSnapshotCache() {
-    return snapshotCache;
+  @VisibleForTesting
+  public int getSnapshotCacheSize() {
+    return snapshotCache == null ? 0 : snapshotCache.size();
+  }
+
+  /**
+   * Immediately invalidate all entries and close their DB instances in cache.
+   */
+  public void invalidateCache() {
+    if (snapshotCache != null) {
+      snapshotCache.invalidateAll();
+    }
+  }
+
+  /**
+   * Immediately invalidate an entry.
+   *
+   * @param key DB snapshot table key
+   */
+  public void invalidateCacheEntry(String key) throws IOException {
+    if (snapshotCache != null) {
+      snapshotCache.invalidate(key);
+    }
   }
 
   /**
@@ -590,11 +611,11 @@ public final class OmSnapshotManager implements 
AutoCloseable {
   }
 
   // Get OmSnapshot if the keyName has ".snapshot" key indicator
-  public ReferenceCounted<IOmMetadataReader, SnapshotCache> checkForSnapshot(
+  @SuppressWarnings("unchecked")
+  public ReferenceCounted<IOmMetadataReader> getActiveFsMetadataOrSnapshot(
       String volumeName,
       String bucketName,
-      String keyName,
-      boolean skipActiveCheck) throws IOException {
+      String keyName) throws IOException {
     if (keyName == null || !ozoneManager.isFilesystemSnapshotEnabled()) {
       return ozoneManager.getOmMetadataReader();
     }
@@ -603,31 +624,58 @@ public final class OmSnapshotManager implements 
AutoCloseable {
     String[] keyParts = keyName.split(OM_KEY_PREFIX);
     if (isSnapshotKey(keyParts)) {
       String snapshotName = keyParts[1];
-      if (snapshotName == null || snapshotName.isEmpty()) {
-        // don't allow snapshot indicator without snapshot name
-        throw new OMException(INVALID_KEY_NAME);
-      }
-      String snapshotTableKey = SnapshotInfo.getTableKey(volumeName,
-          bucketName, snapshotName);
-
-      // Block FS API reads when snapshot is not active.
-      if (!skipActiveCheck) {
-        checkSnapshotActive(ozoneManager, snapshotTableKey);
-      }
 
-      // Warn if actual cache size exceeds the soft limit already.
-      if (snapshotCache.size() > softCacheSize) {
-        LOG.warn("Snapshot cache size ({}) exceeds configured soft-limit ({}).",
-            snapshotCache.size(), softCacheSize);
-      }
-
-      // retrieve the snapshot from the cache
-      return snapshotCache.get(snapshotTableKey, skipActiveCheck);
+      return (ReferenceCounted<IOmMetadataReader>) (ReferenceCounted<?>)
+          getActiveSnapshot(volumeName, bucketName, snapshotName);
     } else {
       return ozoneManager.getOmMetadataReader();
     }
   }
 
+  public ReferenceCounted<OmSnapshot> getActiveSnapshot(
+      String volumeName,
+      String bucketName,
+      String snapshotName) throws IOException {
+    return getSnapshot(volumeName, bucketName, snapshotName, false);
+  }
+
+  public ReferenceCounted<OmSnapshot> getSnapshot(
+      String volumeName,
+      String bucketName,
+      String snapshotName) throws IOException {
+    return getSnapshot(volumeName, bucketName, snapshotName, true);
+  }
+
+  private ReferenceCounted<OmSnapshot> getSnapshot(
+      String volumeName,
+      String bucketName,
+      String snapshotName,
+      boolean skipActiveCheck) throws IOException {
+
+    if (snapshotName == null || snapshotName.isEmpty()) {
+      // don't allow snapshot indicator without snapshot name
+      throw new OMException(INVALID_KEY_NAME);
+    }
+
+    String snapshotTableKey = SnapshotInfo.getTableKey(volumeName,
+        bucketName, snapshotName);
+
+    return getSnapshot(snapshotTableKey, skipActiveCheck);
+  }
+
+  private ReferenceCounted<OmSnapshot> getSnapshot(
+      String snapshotTableKey,
+      boolean skipActiveCheck) throws IOException {
+
+    // Block FS API reads when snapshot is not active.
+    if (!skipActiveCheck) {
+      checkSnapshotActive(ozoneManager, snapshotTableKey);
+    }
+
+    // retrieve the snapshot from the cache
+    return snapshotCache.get(snapshotTableKey);
+  }
+
   /**
    * Returns true if the snapshot is in given status.
    * @param key DB snapshot table key
@@ -894,9 +942,9 @@ public final class OmSnapshotManager implements 
AutoCloseable {
     if (snapshotDiffManager != null) {
       snapshotDiffManager.close();
     }
-    if (snapshotCache != null) {
-      snapshotCache.invalidateAll();
-    }
+
+    invalidateCache();
+
     if (snapshotDiffCleanupService != null) {
       snapshotDiffCleanupService.shutdown();
     }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 410e08a4db..c4e9eb2ed3 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -100,7 +100,6 @@ import org.apache.hadoop.ozone.om.s3.S3SecretStoreProvider;
 import org.apache.hadoop.ozone.om.service.OMRangerBGSyncService;
 import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
 import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
-import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
 import org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature;
 import org.apache.hadoop.ozone.security.acl.OzoneAuthorizerFactory;
 import org.apache.hadoop.ozone.snapshot.CancelSnapshotDiffResponse;
@@ -485,7 +484,7 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
   private OmMetadataReader omMetadataReader;
   // Wrap active DB metadata reader in ReferenceCounted once to avoid
   // instance creation every single time.
-  private ReferenceCounted<IOmMetadataReader, SnapshotCache> rcOmMetadataReader;
+  private ReferenceCounted<IOmMetadataReader> rcOmMetadataReader;
   private OmSnapshotManager omSnapshotManager;
 
   @SuppressWarnings("methodlength")
@@ -2580,8 +2579,7 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     return allowListAllVolumes;
   }
 
-  public ReferenceCounted<
-      IOmMetadataReader, SnapshotCache> getOmMetadataReader() {
+  public ReferenceCounted<IOmMetadataReader> getOmMetadataReader() {
     return rcOmMetadataReader;
   }
 
@@ -2851,8 +2849,7 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
    */
   @Override
   public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException {
-    try (ReferenceCounted<IOmMetadataReader, SnapshotCache>
-        rcReader = getReader(args)) {
+    try (ReferenceCounted<IOmMetadataReader> rcReader = getReader(args)) {
       return rcReader.get().lookupKey(args);
     }
   }
@@ -2864,8 +2861,7 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
   public KeyInfoWithVolumeContext getKeyInfo(final OmKeyArgs args,
                                              boolean assumeS3Context)
       throws IOException {
-    try (ReferenceCounted<IOmMetadataReader, SnapshotCache> rcReader =
-        getReader(args)) {
+    try (ReferenceCounted<IOmMetadataReader> rcReader = getReader(args)) {
       return rcReader.get().getKeyInfo(args, assumeS3Context);
     }
   }
@@ -2877,7 +2873,7 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
   public ListKeysResult listKeys(String volumeName, String bucketName,
                                  String startKey, String keyPrefix, int maxKeys)
       throws IOException {
-    try (ReferenceCounted<IOmMetadataReader, SnapshotCache> rcReader =
+    try (ReferenceCounted<IOmMetadataReader> rcReader =
              getReader(volumeName, bucketName, keyPrefix)) {
       return rcReader.get().listKeys(
           volumeName, bucketName, startKey, keyPrefix, maxKeys);
@@ -3629,7 +3625,7 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
    */
   @Override
   public OzoneFileStatus getFileStatus(OmKeyArgs args) throws IOException {
-    try (ReferenceCounted<IOmMetadataReader, SnapshotCache> rcReader =
+    try (ReferenceCounted<IOmMetadataReader> rcReader =
         getReader(args)) {
       return rcReader.get().getFileStatus(args);
     }
@@ -3640,7 +3636,7 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
    */
   @Override
   public OmKeyInfo lookupFile(OmKeyArgs args) throws IOException {
-    try (ReferenceCounted<IOmMetadataReader, SnapshotCache> rcReader =
+    try (ReferenceCounted<IOmMetadataReader> rcReader =
         getReader(args)) {
       return rcReader.get().lookupFile(args);
     }
@@ -3659,7 +3655,7 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
   public List<OzoneFileStatus> listStatus(OmKeyArgs args, boolean recursive,
       String startKey, long numEntries, boolean allowPartialPrefixes)
       throws IOException {
-    try (ReferenceCounted<IOmMetadataReader, SnapshotCache> rcReader =
+    try (ReferenceCounted<IOmMetadataReader> rcReader =
         getReader(args)) {
       return rcReader.get().listStatus(
           args, recursive, startKey, numEntries, allowPartialPrefixes);
@@ -3683,7 +3679,7 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
    */
   @Override
   public List<OzoneAcl> getAcl(OzoneObj obj) throws IOException {
-    try (ReferenceCounted<IOmMetadataReader, SnapshotCache> rcReader =
+    try (ReferenceCounted<IOmMetadataReader> rcReader =
         getReader(obj)) {
       return rcReader.get().getAcl(obj);
     }
@@ -3751,7 +3747,7 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
       keyManager.stop();
       stopSecretManager();
       stopTrashEmptier();
-      omSnapshotManager.getSnapshotCache().invalidateAll();
+      omSnapshotManager.invalidateCache();
       // Pause the State Machine so that no new transactions can be applied.
       // This action also clears the OM Double Buffer so that if there are any
       // pending transactions in the buffer, they are discarded.
@@ -4705,12 +4701,10 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
    * @param keyArgs OmKeyArgs
    * @return ReferenceCounted<IOmMetadataReader, SnapshotCache>
    */
-  private ReferenceCounted<
-      IOmMetadataReader, SnapshotCache> getReader(OmKeyArgs keyArgs)
+  private ReferenceCounted<IOmMetadataReader> getReader(OmKeyArgs keyArgs)
       throws IOException {
-    return omSnapshotManager.checkForSnapshot(
-        keyArgs.getVolumeName(), keyArgs.getBucketName(), keyArgs.getKeyName(),
-        false);
+    return omSnapshotManager.getActiveFsMetadataOrSnapshot(
+        keyArgs.getVolumeName(), keyArgs.getBucketName(), keyArgs.getKeyName());
   }
 
   /**
@@ -4722,11 +4716,10 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
    * @param key key path
    * @return ReferenceCounted<IOmMetadataReader, SnapshotCache>
    */
-  private ReferenceCounted<
-      IOmMetadataReader, SnapshotCache> getReader(
+  private ReferenceCounted<IOmMetadataReader> getReader(
           String volumeName, String bucketName, String key) throws IOException {
-    return omSnapshotManager.checkForSnapshot(
-        volumeName, bucketName, key, false);
+    return omSnapshotManager.getActiveFsMetadataOrSnapshot(
+        volumeName, bucketName, key);
   }
 
   /**
@@ -4736,14 +4729,12 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
    * @param ozoneObj OzoneObj
    * @return ReferenceCounted<IOmMetadataReader, SnapshotCache>
    */
-  private ReferenceCounted<
-      IOmMetadataReader, SnapshotCache> getReader(OzoneObj ozoneObj)
+  private ReferenceCounted<IOmMetadataReader> getReader(OzoneObj ozoneObj)
       throws IOException {
-    return omSnapshotManager.checkForSnapshot(
+    return omSnapshotManager.getActiveFsMetadataOrSnapshot(
         ozoneObj.getVolumeName(),
         ozoneObj.getBucketName(),
-        ozoneObj.getKeyName(),
-        false);
+        ozoneObj.getKeyName());
   }
 
   @SuppressWarnings("parameternumber")
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java
index cae9bc4b3f..20d0ab0e53 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.lock.OMLockDetails;
 import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
-import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
 import org.rocksdb.RocksDBException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -147,10 +146,9 @@ public class SstFilteringService extends BackgroundService
     @Override
     public BackgroundTaskResult call() throws Exception {
 
-      Optional<SnapshotCache> snapshotCache = Optional.ofNullable(ozoneManager)
-          .map(OzoneManager::getOmSnapshotManager)
-          .map(OmSnapshotManager::getSnapshotCache);
-      if (!snapshotCache.isPresent()) {
+      Optional<OmSnapshotManager> snapshotManager = Optional.ofNullable(ozoneManager)
+          .map(OzoneManager::getOmSnapshotManager);
+      if (!snapshotManager.isPresent()) {
         return BackgroundTaskResult.EmptyTaskResult.newResult();
       }
       Table<String, SnapshotInfo> snapshotInfoTable =
@@ -183,10 +181,12 @@ public class SstFilteringService extends BackgroundService
                     snapshotInfo.getBucketName());
 
             try (
-                ReferenceCounted<IOmMetadataReader, SnapshotCache>
-                    snapshotMetadataReader = snapshotCache.get().get(
-                        snapshotInfo.getTableKey())) {
-              OmSnapshot omSnapshot = (OmSnapshot) snapshotMetadataReader.get();
+                ReferenceCounted<OmSnapshot> snapshotMetadataReader =
+                    snapshotManager.get().getActiveSnapshot(
+                        snapshotInfo.getVolumeName(),
+                        snapshotInfo.getBucketName(),
+                        snapshotInfo.getName())) {
+              OmSnapshot omSnapshot = snapshotMetadataReader.get();
               RDBStore rdbStore = (RDBStore) omSnapshot.getMetadataManager()
                   .getStore();
               RocksDatabase db = rdbStore.getDb();
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
index 2698d12f9f..d0dd2caa54 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
@@ -42,7 +42,6 @@ import 
org.apache.hadoop.ozone.om.protocolPB.grpc.GrpcClientConstants;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
-import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.LayoutVersion;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
@@ -296,7 +295,7 @@ public abstract class OMClientRequest implements 
RequestAuditor {
         contextBuilder.setOwnerName(bucketOwner);
       }
 
-      try (ReferenceCounted<IOmMetadataReader, SnapshotCache> rcMetadataReader =
+      try (ReferenceCounted<IOmMetadataReader> rcMetadataReader =
           ozoneManager.getOmMetadataReader()) {
         OmMetadataReader omMetadataReader =
             (OmMetadataReader) rcMetadataReader.get();
@@ -362,7 +361,7 @@ public abstract class OMClientRequest implements 
RequestAuditor {
       String bucketOwner)
       throws IOException {
 
-    try (ReferenceCounted<IOmMetadataReader, SnapshotCache> rcMetadataReader =
+    try (ReferenceCounted<IOmMetadataReader> rcMetadataReader =
         ozoneManager.getOmMetadataReader()) {
       OzoneAclUtils.checkAllAcls((OmMetadataReader) rcMetadataReader.get(),
           resType, storeType, aclType,
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotPurgeRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotPurgeRequest.java
index b7dba82602..1533ceebe3 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotPurgeRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotPurgeRequest.java
@@ -91,8 +91,7 @@ public class OMSnapshotPurgeRequest extends OMClientRequest {
             trxnLogIndex, updatedSnapInfos);
         updateSnapshotChainAndCache(omMetadataManager, fromSnapshot,
             trxnLogIndex, updatedPathPreviousAndGlobalSnapshots);
-        ozoneManager.getOmSnapshotManager().getSnapshotCache()
-            .invalidate(snapTableKey);
+        ozoneManager.getOmSnapshotManager().invalidateCacheEntry(snapTableKey);
       }
 
       omClientResponse = new OMSnapshotPurgeResponse(omResponse.build(),
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMDirectoriesPurgeResponseWithFSO.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMDirectoriesPurgeResponseWithFSO.java
index bb9562dff2..848c5c3089 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMDirectoriesPurgeResponseWithFSO.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMDirectoriesPurgeResponseWithFSO.java
@@ -23,7 +23,6 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.ozone.OmUtils;
-import org.apache.hadoop.ozone.om.IOmMetadataReader;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OmSnapshot;
@@ -36,7 +35,6 @@ import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.request.key.OMDirectoriesPurgeRequestWithFSO;
 import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
 import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
-import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.slf4j.Logger;
@@ -50,7 +48,6 @@ import static 
org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_DIR_TABLE
 import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_TABLE;
 import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DIRECTORY_TABLE;
 import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.FILE_TABLE;
-import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
 
 /**
  * Response for {@link OMDirectoriesPurgeRequestWithFSO} request.
@@ -86,13 +83,12 @@ public class OMDirectoriesPurgeResponseWithFSO extends 
OmKeyResponse {
           ((OmMetadataManagerImpl) metadataManager)
               .getOzoneManager().getOmSnapshotManager();
 
-      try (ReferenceCounted<IOmMetadataReader, SnapshotCache>
-          rcFromSnapshotInfo = omSnapshotManager.checkForSnapshot(
+      try (ReferenceCounted<OmSnapshot>
+          rcFromSnapshotInfo = omSnapshotManager.getSnapshot(
               fromSnapshotInfo.getVolumeName(),
               fromSnapshotInfo.getBucketName(),
-              getSnapshotPrefix(fromSnapshotInfo.getName()),
-              true)) {
-        OmSnapshot fromSnapshot = (OmSnapshot) rcFromSnapshotInfo.get();
+              fromSnapshotInfo.getName())) {
+        OmSnapshot fromSnapshot = rcFromSnapshotInfo.get();
         DBStore fromSnapshotStore = fromSnapshot.getMetadataManager()
             .getStore();
         // Init Batch Operation for snapshot db.
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java
index 4e9ee75633..b16ba95d78 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/key/OMKeyPurgeResponse.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.ozone.om.response.key;
 
 import org.apache.hadoop.hdds.utils.db.DBStore;
-import org.apache.hadoop.ozone.om.IOmMetadataReader;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OmSnapshot;
@@ -29,7 +28,6 @@ import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
 import org.apache.hadoop.ozone.om.request.key.OMKeyPurgeRequest;
 import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
-import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveKeyInfos;
@@ -41,7 +39,6 @@ import java.util.List;
 import jakarta.annotation.Nonnull;
 
 import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DELETED_TABLE;
-import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
 import static 
org.apache.hadoop.ozone.om.response.snapshot.OMSnapshotMoveDeletedKeysResponse.createRepeatedOmKeyInfo;
 
 /**
@@ -81,14 +78,13 @@ public class OMKeyPurgeResponse extends OmKeyResponse {
           ((OmMetadataManagerImpl) omMetadataManager)
               .getOzoneManager().getOmSnapshotManager();
 
-      try (ReferenceCounted<IOmMetadataReader, SnapshotCache> rcOmFromSnapshot =
-          omSnapshotManager.checkForSnapshot(
+      try (ReferenceCounted<OmSnapshot> rcOmFromSnapshot =
+          omSnapshotManager.getSnapshot(
               fromSnapshot.getVolumeName(),
               fromSnapshot.getBucketName(),
-              getSnapshotPrefix(fromSnapshot.getName()),
-              true)) {
+              fromSnapshot.getName())) {
 
-        OmSnapshot fromOmSnapshot = (OmSnapshot) rcOmFromSnapshot.get();
+        OmSnapshot fromOmSnapshot = rcOmFromSnapshot.get();
         DBStore fromSnapshotStore =
             fromOmSnapshot.getMetadataManager().getStore();
         // Init Batch Operation for snapshot db.
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveDeletedKeysResponse.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveDeletedKeysResponse.java
index 1255e4ae7f..3726faacfd 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveDeletedKeysResponse.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotMoveDeletedKeysResponse.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.ozone.om.response.snapshot;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.hdds.utils.db.RDBStore;
-import org.apache.hadoop.ozone.om.IOmMetadataReader;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OmSnapshot;
@@ -32,7 +31,6 @@ import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
-import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveKeyInfos;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
@@ -42,7 +40,6 @@ import java.io.IOException;
 import java.util.List;
 
 import static 
org.apache.hadoop.ozone.om.OmMetadataManagerImpl.SNAPSHOT_INFO_TABLE;
-import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
 
 /**
  * Response for OMSnapshotMoveDeletedKeysRequest.
@@ -93,24 +90,22 @@ public class OMSnapshotMoveDeletedKeysResponse extends 
OMClientResponse {
         ((OmMetadataManagerImpl) omMetadataManager)
             .getOzoneManager().getOmSnapshotManager();
 
-    try (ReferenceCounted<IOmMetadataReader, SnapshotCache> rcOmFromSnapshot =
-        omSnapshotManager.checkForSnapshot(
+    try (ReferenceCounted<OmSnapshot> rcOmFromSnapshot =
+        omSnapshotManager.getSnapshot(
             fromSnapshot.getVolumeName(),
             fromSnapshot.getBucketName(),
-            getSnapshotPrefix(fromSnapshot.getName()),
-            true)) {
+            fromSnapshot.getName())) {
 
-      OmSnapshot fromOmSnapshot = (OmSnapshot) rcOmFromSnapshot.get();
+      OmSnapshot fromOmSnapshot = rcOmFromSnapshot.get();
 
       if (nextSnapshot != null) {
-        try (ReferenceCounted<IOmMetadataReader, SnapshotCache>
-            rcOmNextSnapshot = omSnapshotManager.checkForSnapshot(
+        try (ReferenceCounted<OmSnapshot>
+            rcOmNextSnapshot = omSnapshotManager.getSnapshot(
                 nextSnapshot.getVolumeName(),
                 nextSnapshot.getBucketName(),
-                getSnapshotPrefix(nextSnapshot.getName()),
-                true)) {
+                nextSnapshot.getName())) {
 
-          OmSnapshot nextOmSnapshot = (OmSnapshot) rcOmNextSnapshot.get();
+          OmSnapshot nextOmSnapshot = rcOmNextSnapshot.get();
           RDBStore nextSnapshotStore =
               (RDBStore) nextOmSnapshot.getMetadataManager().getStore();
           // Init Batch Operation for snapshot db.
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
index 9643fa8296..d7205b2c1b 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
-import org.apache.hadoop.ozone.om.IOmMetadataReader;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OmSnapshot;
@@ -35,7 +34,6 @@ import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
-import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgePathRequest;
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.protocol.ClientId;
@@ -238,7 +236,7 @@ public class DirectoryDeletingService extends 
AbstractKeyDeletingService {
       OmMetadataManagerImpl metadataManager = (OmMetadataManagerImpl)
           getOzoneManager().getMetadataManager();
 
-      try (ReferenceCounted<IOmMetadataReader, SnapshotCache> rcLatestSnapshot =
+      try (ReferenceCounted<OmSnapshot> rcLatestSnapshot =
           metadataManager.getLatestActiveSnapshot(
               deletedDirInfo.getVolumeName(),
               deletedDirInfo.getBucketName(),
@@ -249,11 +247,9 @@ public class DirectoryDeletingService extends 
AbstractKeyDeletingService {
               .getRenameKey(deletedDirInfo.getVolumeName(),
                   deletedDirInfo.getBucketName(), deletedDirInfo.getObjectID());
           Table<String, OmDirectoryInfo> prevDirTable =
-              ((OmSnapshot) rcLatestSnapshot.get())
-                  .getMetadataManager().getDirectoryTable();
+              rcLatestSnapshot.get().getMetadataManager().getDirectoryTable();
           Table<String, OmKeyInfo> prevDeletedDirTable =
-              ((OmSnapshot) rcLatestSnapshot.get())
-                  .getMetadataManager().getDeletedDirTable();
+              rcLatestSnapshot.get().getMetadataManager().getDeletedDirTable();
           OmKeyInfo prevDeletedDirInfo = prevDeletedDirTable.get(key);
           if (prevDeletedDirInfo != null) {
             return true;
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
index e89608e82d..83991668c9 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.common.BlockGroup;
-import org.apache.hadoop.ozone.om.IOmMetadataReader;
 import org.apache.hadoop.ozone.om.KeyManager;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OmSnapshot;
@@ -46,7 +45,6 @@ import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
 import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
 import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
-import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotSize;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetSnapshotPropertyRequest;
@@ -58,7 +56,6 @@ import 
org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
 
 import com.google.common.annotations.VisibleForTesting;
 
-import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
 import static 
org.apache.hadoop.ozone.om.helpers.SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT;
@@ -264,13 +261,12 @@ public class KeyDeletingService extends 
AbstractKeyDeletingService {
             continue;
           }
 
-          try (ReferenceCounted<IOmMetadataReader, SnapshotCache>
-              rcCurrOmSnapshot = omSnapshotManager.checkForSnapshot(
+          try (ReferenceCounted<OmSnapshot>
+              rcCurrOmSnapshot = omSnapshotManager.getSnapshot(
                   currSnapInfo.getVolumeName(),
                   currSnapInfo.getBucketName(),
-                  getSnapshotPrefix(currSnapInfo.getName()),
-                  true)) {
-            OmSnapshot currOmSnapshot = (OmSnapshot) rcCurrOmSnapshot.get();
+                  currSnapInfo.getName())) {
+            OmSnapshot currOmSnapshot = rcCurrOmSnapshot.get();
 
             Table<String, RepeatedOmKeyInfo> snapDeletedTable =
                 currOmSnapshot.getMetadataManager().getDeletedTable();
@@ -304,18 +300,16 @@ public class KeyDeletingService extends 
AbstractKeyDeletingService {
 
             Table<String, OmKeyInfo> previousKeyTable = null;
             Table<String, String> prevRenamedTable = null;
-            ReferenceCounted<IOmMetadataReader, SnapshotCache>
-                rcPrevOmSnapshot = null;
+            ReferenceCounted<OmSnapshot> rcPrevOmSnapshot = null;
 
             // Split RepeatedOmKeyInfo and update current snapshot
             // deletedKeyTable and next snapshot deletedKeyTable.
             if (previousSnapshot != null) {
-              rcPrevOmSnapshot = omSnapshotManager.checkForSnapshot(
+              rcPrevOmSnapshot = omSnapshotManager.getSnapshot(
                   previousSnapshot.getVolumeName(),
                   previousSnapshot.getBucketName(),
-                  getSnapshotPrefix(previousSnapshot.getName()), true);
-              OmSnapshot omPreviousSnapshot = (OmSnapshot)
-                  rcPrevOmSnapshot.get();
+                  previousSnapshot.getName());
+              OmSnapshot omPreviousSnapshot = rcPrevOmSnapshot.get();
 
               previousKeyTable = omPreviousSnapshot.getMetadataManager()
                   .getKeyTable(bucketInfo.getBucketLayout());
@@ -324,15 +318,13 @@ public class KeyDeletingService extends 
AbstractKeyDeletingService {
             }
 
             Table<String, OmKeyInfo> previousToPrevKeyTable = null;
-            ReferenceCounted<IOmMetadataReader, SnapshotCache>
-                rcPrevToPrevOmSnapshot = null;
+            ReferenceCounted<OmSnapshot> rcPrevToPrevOmSnapshot = null;
             if (previousToPrevSnapshot != null) {
-              rcPrevToPrevOmSnapshot = omSnapshotManager.checkForSnapshot(
+              rcPrevToPrevOmSnapshot = omSnapshotManager.getSnapshot(
                   previousToPrevSnapshot.getVolumeName(),
                   previousToPrevSnapshot.getBucketName(),
-                  getSnapshotPrefix(previousToPrevSnapshot.getName()), true);
-              OmSnapshot omPreviousToPrevSnapshot = (OmSnapshot)
-                  rcPrevToPrevOmSnapshot.get();
+                  previousToPrevSnapshot.getName());
+              OmSnapshot omPreviousToPrevSnapshot = rcPrevToPrevOmSnapshot.get();
 
               previousToPrevKeyTable = omPreviousToPrevSnapshot
                   .getMetadataManager()
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
index cc275b4e8e..29b2b31953 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.ozone.ClientVersion;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
-import org.apache.hadoop.ozone.om.IOmMetadataReader;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.KeyManagerImpl;
@@ -52,7 +51,6 @@ import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
 import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
-import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgePathRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveDeletedKeysRequest;
@@ -78,7 +76,6 @@ import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_KEY_DELETIN
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_KEY_DELETING_LIMIT_PER_TASK_DEFAULT;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DELETING_LIMIT_PER_TASK;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DELETING_LIMIT_PER_TASK_DEFAULT;
-import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
 
 /**
  * Background Service to clean-up deleted snapshot and reclaim space.
@@ -143,10 +140,8 @@ public class SnapshotDeletingService extends 
AbstractKeyDeletingService {
 
       getRunCount().incrementAndGet();
 
-      ReferenceCounted<IOmMetadataReader, SnapshotCache> rcOmSnapshot =
-          null;
-      ReferenceCounted<IOmMetadataReader, SnapshotCache> rcOmPreviousSnapshot =
-          null;
+      ReferenceCounted<OmSnapshot> rcOmSnapshot = null;
+      ReferenceCounted<OmSnapshot> rcOmPreviousSnapshot = null;
 
       Table<String, SnapshotInfo> snapshotInfoTable =
           ozoneManager.getMetadataManager().getSnapshotInfoTable();
@@ -169,12 +164,11 @@ public class SnapshotDeletingService extends 
AbstractKeyDeletingService {
 
           // Note: Can refactor this to use try-with-resources.
           // Handling RC decrements manually for now to minimize conflicts.
-          rcOmSnapshot = omSnapshotManager.checkForSnapshot(
+          rcOmSnapshot = omSnapshotManager.getSnapshot(
               snapInfo.getVolumeName(),
               snapInfo.getBucketName(),
-              getSnapshotPrefix(snapInfo.getName()),
-              true);
-          OmSnapshot omSnapshot = (OmSnapshot) rcOmSnapshot.get();
+              snapInfo.getName());
+          OmSnapshot omSnapshot = rcOmSnapshot.get();
 
           Table<String, RepeatedOmKeyInfo> snapshotDeletedTable =
               omSnapshot.getMetadataManager().getDeletedTable();
@@ -226,12 +220,11 @@ public class SnapshotDeletingService extends 
AbstractKeyDeletingService {
           // Split RepeatedOmKeyInfo and update current snapshot deletedKeyTable
           // and next snapshot deletedKeyTable.
           if (previousSnapshot != null) {
-            rcOmPreviousSnapshot = omSnapshotManager.checkForSnapshot(
+            rcOmPreviousSnapshot = omSnapshotManager.getSnapshot(
                 previousSnapshot.getVolumeName(),
                 previousSnapshot.getBucketName(),
-                getSnapshotPrefix(previousSnapshot.getName()),
-                true);
-            omPreviousSnapshot = (OmSnapshot) rcOmPreviousSnapshot.get();
+                previousSnapshot.getName());
+            omPreviousSnapshot = rcOmPreviousSnapshot.get();
 
             previousKeyTable = omPreviousSnapshot
                 
.getMetadataManager().getKeyTable(bucketInfo.getBucketLayout());
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDirectoryCleaningService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDirectoryCleaningService.java
index 9a60f63038..fe0f6e111e 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDirectoryCleaningService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDirectoryCleaningService.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.ozone.common.BlockGroup;
-import org.apache.hadoop.ozone.om.IOmMetadataReader;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OmSnapshot;
@@ -44,7 +43,6 @@ import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
 import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
 import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
-import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetSnapshotPropertyRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotSize;
@@ -63,7 +61,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
-import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
 import static 
org.apache.hadoop.ozone.om.helpers.SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE;
 import static 
org.apache.hadoop.ozone.om.request.file.OMFileRequest.getDirectoryInfo;
 import static 
org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.getOzonePathKeyForFso;
@@ -158,10 +155,8 @@ public class SnapshotDirectoryCleaningService
             continue;
           }
 
-          ReferenceCounted<IOmMetadataReader, SnapshotCache>
-              rcPrevOmSnapshot = null;
-          ReferenceCounted<IOmMetadataReader, SnapshotCache>
-              rcPrevToPrevOmSnapshot = null;
+          ReferenceCounted<OmSnapshot> rcPrevOmSnapshot = null;
+          ReferenceCounted<OmSnapshot> rcPrevToPrevOmSnapshot = null;
           try {
             long volumeId = metadataManager
                 .getVolumeId(currSnapInfo.getVolumeName());
@@ -189,12 +184,11 @@ public class SnapshotDirectoryCleaningService
             Table<String, String> prevRenamedTable = null;
 
             if (previousSnapshot != null) {
-              rcPrevOmSnapshot = omSnapshotManager.checkForSnapshot(
+              rcPrevOmSnapshot = omSnapshotManager.getActiveSnapshot(
                   previousSnapshot.getVolumeName(),
                   previousSnapshot.getBucketName(),
-                  getSnapshotPrefix(previousSnapshot.getName()), false);
-              OmSnapshot omPreviousSnapshot = (OmSnapshot)
-                  rcPrevOmSnapshot.get();
+                  previousSnapshot.getName());
+              OmSnapshot omPreviousSnapshot = rcPrevOmSnapshot.get();
 
               previousKeyTable = omPreviousSnapshot.getMetadataManager()
                   .getKeyTable(bucketInfo.getBucketLayout());
@@ -206,12 +200,11 @@ public class SnapshotDirectoryCleaningService
 
             Table<String, OmKeyInfo> previousToPrevKeyTable = null;
             if (previousToPrevSnapshot != null) {
-              rcPrevToPrevOmSnapshot = omSnapshotManager.checkForSnapshot(
+              rcPrevToPrevOmSnapshot = omSnapshotManager.getActiveSnapshot(
                   previousToPrevSnapshot.getVolumeName(),
                   previousToPrevSnapshot.getBucketName(),
-                  getSnapshotPrefix(previousToPrevSnapshot.getName()), false);
-              OmSnapshot omPreviousToPrevSnapshot = (OmSnapshot)
-                  rcPrevToPrevOmSnapshot.get();
+                  previousToPrevSnapshot.getName());
+              OmSnapshot omPreviousToPrevSnapshot = rcPrevToPrevOmSnapshot.get();
 
               previousToPrevKeyTable = omPreviousToPrevSnapshot
                   .getMetadataManager()
@@ -220,14 +213,13 @@ public class SnapshotDirectoryCleaningService
 
             String dbBucketKeyForDir = getOzonePathKeyForFso(metadataManager,
                 currSnapInfo.getVolumeName(), currSnapInfo.getBucketName());
-            try (ReferenceCounted<IOmMetadataReader, SnapshotCache>
-                     rcCurrOmSnapshot = omSnapshotManager.checkForSnapshot(
+            try (ReferenceCounted<OmSnapshot>
+                     rcCurrOmSnapshot = omSnapshotManager.getActiveSnapshot(
                 currSnapInfo.getVolumeName(),
                 currSnapInfo.getBucketName(),
-                getSnapshotPrefix(currSnapInfo.getName()),
-                false)) {
+                currSnapInfo.getName())) {
 
-              OmSnapshot currOmSnapshot = (OmSnapshot) rcCurrOmSnapshot.get();
+              OmSnapshot currOmSnapshot = rcCurrOmSnapshot.get();
               Table<String, OmKeyInfo> snapDeletedDirTable =
                   currOmSnapshot.getMetadataManager().getDeletedDirTable();
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/ReferenceCounted.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/ReferenceCounted.java
index 808a5ed4c1..0a9d47fc86 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/ReferenceCounted.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/ReferenceCounted.java
@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicLong;
 /**
  * Add reference counter to an object instance.
  */
-public class ReferenceCounted<T, U>
+public class ReferenceCounted<T>
     implements AutoCloseable {
 
   /**
@@ -51,10 +51,10 @@ public class ReferenceCounted<T, U>
   /**
    * Parent instance whose callback will be triggered upon this RC closure.
    */
-  private final U parentWithCallback;
+  private final Object parentWithCallback;
 
   public ReferenceCounted(T obj, boolean disableCounter,
-      U parentWithCallback) {
+      Object parentWithCallback) {
     // A param to allow disabling ref counting to reduce active DB
     //  access penalties due to AtomicLong operations.
     this.obj = obj;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java
index 226acbb7dd..e776968fca 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java
@@ -19,9 +19,7 @@ package org.apache.hadoop.ozone.om.snapshot;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.CacheLoader;
-import org.apache.hadoop.ozone.om.IOmMetadataReader;
 import org.apache.hadoop.ozone.om.OmSnapshot;
-import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,7 +30,6 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND;
-import static org.apache.hadoop.ozone.om.helpers.SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE;
 
 /**
  * Thread-safe custom unbounded LRU cache to manage open snapshot DB instances.
@@ -45,26 +42,23 @@ public class SnapshotCache {
   // Key:   DB snapshot table key
   // Value: OmSnapshot instance, each holds a DB instance handle inside
   // TODO: [SNAPSHOT] Consider wrapping SoftReference<> around IOmMetadataReader
-  private final ConcurrentHashMap<String, ReferenceCounted<IOmMetadataReader, SnapshotCache>> dbMap;
+  private final ConcurrentHashMap<String, ReferenceCounted<OmSnapshot>> dbMap;
 
-  private final OmSnapshotManager omSnapshotManager;
   private final CacheLoader<String, OmSnapshot> cacheLoader;
   // Soft-limit of the total number of snapshot DB instances allowed to be
   // opened on the OM.
   private final int cacheSizeLimit;
 
   public SnapshotCache(
-      OmSnapshotManager omSnapshotManager,
       CacheLoader<String, OmSnapshot> cacheLoader,
       int cacheSizeLimit) {
     this.dbMap = new ConcurrentHashMap<>();
-    this.omSnapshotManager = omSnapshotManager;
     this.cacheLoader = cacheLoader;
     this.cacheSizeLimit = cacheSizeLimit;
   }
 
   @VisibleForTesting
-  ConcurrentHashMap<String, ReferenceCounted<IOmMetadataReader, SnapshotCache>> getDbMap() {
+  ConcurrentHashMap<String, ReferenceCounted<OmSnapshot>> getDbMap() {
     return dbMap;
   }
 
@@ -85,7 +79,7 @@ public class SnapshotCache {
         LOG.warn("Key: '{}' does not exist in cache.", k);
       } else {
         try {
-          ((OmSnapshot) v.get()).close();
+          v.get().close();
         } catch (IOException e) {
           throw new IllegalStateException("Failed to close snapshot: " + key, e);
         }
@@ -98,12 +92,12 @@ public class SnapshotCache {
    * Immediately invalidate all entries and close their DB instances in cache.
    */
   public void invalidateAll() {
-    Iterator<Map.Entry<String, ReferenceCounted<IOmMetadataReader, SnapshotCache>>>
+    Iterator<Map.Entry<String, ReferenceCounted<OmSnapshot>>>
         it = dbMap.entrySet().iterator();
 
     while (it.hasNext()) {
-      Map.Entry<String, ReferenceCounted<IOmMetadataReader, SnapshotCache>> entry = it.next();
-      OmSnapshot omSnapshot = (OmSnapshot) entry.getValue().get();
+      Map.Entry<String, ReferenceCounted<OmSnapshot>> entry = it.next();
+      OmSnapshot omSnapshot = entry.getValue().get();
       try {
         // TODO: If wrapped with SoftReference<>, omSnapshot could be null?
         omSnapshot.close();
@@ -125,21 +119,22 @@ public class SnapshotCache {
     GARBAGE_COLLECTION_WRITE
   }
 
-  public ReferenceCounted<IOmMetadataReader, SnapshotCache> get(String key) throws IOException {
-    return get(key, false);
-  }
-
   /**
    * Get or load OmSnapshot. Shall be close()d after use.
    * TODO: [SNAPSHOT] Can add reason enum to param list later.
    * @param key snapshot table key
    * @return an OmSnapshot instance, or null on error
    */
-  public ReferenceCounted<IOmMetadataReader, SnapshotCache> get(String key, boolean skipActiveCheck)
+  public ReferenceCounted<OmSnapshot> get(String key)
       throws IOException {
+    // Warn if actual cache size exceeds the soft limit already.
+    if (size() > cacheSizeLimit) {
+      LOG.warn("Snapshot cache size ({}) exceeds configured soft-limit ({}).",
+          size(), cacheSizeLimit);
+    }
     // Atomic operation to initialize the OmSnapshot instance (once) if the key
     // does not exist, and increment the reference count on the instance.
-    ReferenceCounted<IOmMetadataReader, SnapshotCache> rcOmSnapshot =
+    ReferenceCounted<OmSnapshot> rcOmSnapshot =
         dbMap.compute(key, (k, v) -> {
           if (v == null) {
             LOG.info("Loading snapshot. Table key: {}", k);
@@ -173,17 +168,6 @@ public class SnapshotCache {
           OMException.ResultCodes.FILE_NOT_FOUND);
     }
 
-    // If the snapshot is already loaded in cache, the check inside the loader
-    // above is ignored. But we would still want to reject all get()s except
-    // when called from SDT (and some) if the snapshot is not active anymore.
-    if (!skipActiveCheck && !omSnapshotManager.isSnapshotStatus(key, SNAPSHOT_ACTIVE)) {
-      // Ref count was incremented. Need to decrement on exception here.
-      rcOmSnapshot.decrementRefCount();
-      throw new OMException("Unable to load snapshot. " +
-          "Snapshot with table key '" + key + "' is no longer active",
-          FILE_NOT_FOUND);
-    }
-
     // Check if any entries can be cleaned up.
     // At this point, cache size might temporarily exceed cacheSizeLimit
     // even if there are entries that can be evicted, which is fine since it
@@ -237,7 +221,7 @@ public class SnapshotCache {
    * TODO: [SNAPSHOT] Add new ozone debug CLI command to trigger this directly.
    */
   private void cleanupInternal() {
-    for (Map.Entry<String, ReferenceCounted<IOmMetadataReader, SnapshotCache>> entry : dbMap.entrySet()) {
+    for (Map.Entry<String, ReferenceCounted<OmSnapshot>> entry : dbMap.entrySet()) {
       dbMap.compute(entry.getKey(), (k, v) -> {
         if (v == null) {
           throw new IllegalStateException("Key '" + k + "' does not exist in cache. The RocksDB " +
@@ -252,7 +236,7 @@ public class SnapshotCache {
           LOG.debug("Closing Snapshot {}. It is not being referenced anymore.", k);
           // Close the instance, which also closes its DB handle.
           try {
-            ((OmSnapshot) v.get()).close();
+            v.get().close();
           } catch (IOException ex) {
             throw new IllegalStateException("Error while closing snapshot DB", ex);
           }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
index 41e990097e..2a5da96f63 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.hdds.utils.db.managed.ManagedSSTDumpTool;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
 import org.apache.hadoop.ozone.OFSPath;
 import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.om.IOmMetadataReader;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
@@ -114,7 +113,6 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIFF_DIS
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIFF_DISABLE_NATIVE_LIBS_DEFAULT;
 import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DIRECTORY_TABLE;
 import static org.apache.hadoop.ozone.om.OmSnapshotManager.DELIMITER;
-import static org.apache.hadoop.ozone.om.helpers.SnapshotInfo.getTableKey;
 import static org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.checkSnapshotActive;
 import static org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.dropColumnFamilyHandle;
 import static org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.getColumnFamilyToKeyPrefixMap;
@@ -150,7 +148,6 @@ public class SnapshotDiffManager implements AutoCloseable {
   private final ManagedRocksDB db;
   private final RocksDBCheckpointDiffer differ;
   private final OzoneManager ozoneManager;
-  private final SnapshotCache snapshotCache;
   private final CodecRegistry codecRegistry;
   private final ManagedColumnFamilyOptions familyOptions;
   // TODO: [SNAPSHOT] Use different wait time based of job status.
@@ -199,7 +196,6 @@ public class SnapshotDiffManager implements AutoCloseable {
   public SnapshotDiffManager(ManagedRocksDB db,
                              RocksDBCheckpointDiffer differ,
                              OzoneManager ozoneManager,
-                             SnapshotCache snapshotCache,
                              ColumnFamilyHandle snapDiffJobCfh,
                              ColumnFamilyHandle snapDiffReportCfh,
                              ManagedColumnFamilyOptions familyOptions,
@@ -207,7 +203,6 @@ public class SnapshotDiffManager implements AutoCloseable {
     this.db = db;
     this.differ = differ;
     this.ozoneManager = ozoneManager;
-    this.snapshotCache = snapshotCache;
     this.familyOptions = familyOptions;
     this.codecRegistry = codecRegistry;
     this.defaultWaitTime = ozoneManager.getConfiguration().getTimeDuration(
@@ -832,8 +827,8 @@ public class SnapshotDiffManager implements AutoCloseable {
     // job by RocksDBCheckpointDiffer#pruneOlderSnapshotsWithCompactionHistory.
     Path path = Paths.get(sstBackupDirForSnapDiffJobs + "/" + jobId);
 
-    ReferenceCounted<IOmMetadataReader, SnapshotCache> rcFromSnapshot = null;
-    ReferenceCounted<IOmMetadataReader, SnapshotCache> rcToSnapshot = null;
+    ReferenceCounted<OmSnapshot> rcFromSnapshot = null;
+    ReferenceCounted<OmSnapshot> rcToSnapshot = null;
 
     try {
       if (!areDiffJobAndSnapshotsActive(volumeName, bucketName,
@@ -841,14 +836,15 @@ public class SnapshotDiffManager implements AutoCloseable {
         return;
       }
 
-      String fsKey = getTableKey(volumeName, bucketName, fromSnapshotName);
-      String tsKey = getTableKey(volumeName, bucketName, toSnapshotName);
+      rcFromSnapshot =
+          ozoneManager.getOmSnapshotManager()
+              .getActiveSnapshot(volumeName, bucketName, fromSnapshotName);
+      rcToSnapshot =
+          ozoneManager.getOmSnapshotManager()
+              .getActiveSnapshot(volumeName, bucketName, toSnapshotName);
 
-      rcFromSnapshot = snapshotCache.get(fsKey);
-      rcToSnapshot = snapshotCache.get(tsKey);
-
-      OmSnapshot fromSnapshot = (OmSnapshot) rcFromSnapshot.get();
-      OmSnapshot toSnapshot = (OmSnapshot) rcToSnapshot.get();
+      OmSnapshot fromSnapshot = rcFromSnapshot.get();
+      OmSnapshot toSnapshot = rcToSnapshot.get();
       SnapshotInfo fsInfo = getSnapshotInfo(ozoneManager,
           volumeName, bucketName, fromSnapshotName);
       SnapshotInfo tsInfo = getSnapshotInfo(ozoneManager,
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
index e1ae8f57d1..9cc79012dc 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
@@ -66,7 +66,6 @@ import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.BUCKET_TABLE;
 import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.VOLUME_TABLE;
 import static org.apache.hadoop.ozone.om.OmSnapshotManager.OM_HARDLINK_FILE;
 import static org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.getINode;
-import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
 import static org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.truncateFileName;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -176,9 +175,9 @@ class TestOmSnapshotManager {
 
     // retrieve it and setup store mock
     OmSnapshotManager omSnapshotManager = om.getOmSnapshotManager();
-    OmSnapshot firstSnapshot = (OmSnapshot) omSnapshotManager
-        .checkForSnapshot(first.getVolumeName(),
-        first.getBucketName(), getSnapshotPrefix(first.getName()), false).get();
+    OmSnapshot firstSnapshot = omSnapshotManager
+        .getActiveSnapshot(first.getVolumeName(), first.getBucketName(), first.getName())
+        .get();
     DBStore firstSnapshotStore = mock(DBStore.class);
     HddsWhiteboxTestUtils.setInternalState(
         firstSnapshot.getMetadataManager(), "store", firstSnapshotStore);
@@ -192,13 +191,12 @@ class TestOmSnapshotManager {
 
     // read in second snapshot to evict first
     omSnapshotManager
-        .checkForSnapshot(second.getVolumeName(),
-        second.getBucketName(), getSnapshotPrefix(second.getName()), false);
+        .getActiveSnapshot(second.getVolumeName(), second.getBucketName(), second.getName());
 
     // As a workaround, invalidate all cache entries in order to trigger
     // instances close in this test case, since JVM GC most likely would not
     // have triggered and closed the instances yet at this point.
-    omSnapshotManager.getSnapshotCache().invalidateAll();
+    omSnapshotManager.invalidateCache();
 
     // confirm store was closed
     verify(firstSnapshotStore, timeout(3000).times(1)).close();
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestSstFilteringService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestSstFilteringService.java
index a8b026af05..8ebf76cbf7 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestSstFilteringService.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestSstFilteringService.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
 import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
-import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
 import org.apache.ratis.util.ExitUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
@@ -461,11 +460,12 @@ public class TestSstFilteringService {
                                           String snapshot) throws IOException {
     SnapshotInfo snapshotInfo = om.getMetadataManager().getSnapshotInfoTable()
         .get(SnapshotInfo.getTableKey(volume, bucket, snapshot));
-    try (ReferenceCounted<IOmMetadataReader, SnapshotCache>
-             snapshotMetadataReader = om.getOmSnapshotManager()
-        .getSnapshotCache()
-        .get(snapshotInfo.getTableKey())) {
-      OmSnapshot omSnapshot = (OmSnapshot) snapshotMetadataReader.get();
+    try (ReferenceCounted<OmSnapshot> snapshotMetadataReader =
+             om.getOmSnapshotManager().getActiveSnapshot(
+                 snapshotInfo.getVolumeName(),
+                 snapshotInfo.getBucketName(),
+                 snapshotInfo.getName())) {
+      OmSnapshot omSnapshot = snapshotMetadataReader.get();
       return getKeysFromDb(omSnapshot.getMetadataManager(), volume, bucket);
     }
   }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java
index ff3db1abbe..a912f549b3 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyPurgeRequestAndResponse.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
-import org.apache.hadoop.ozone.om.IOmMetadataReader;
 import org.apache.hadoop.ozone.om.OmSnapshot;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
@@ -31,7 +30,6 @@ import org.apache.hadoop.ozone.om.request.snapshot.OMSnapshotCreateRequest;
 import org.apache.hadoop.ozone.om.request.snapshot.TestOMSnapshotCreateRequest;
 import org.apache.hadoop.ozone.om.response.snapshot.OMSnapshotCreateResponse;
 import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
-import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
 import org.junit.jupiter.api.Test;
 
 import org.apache.hadoop.ozone.om.response.key.OMKeyPurgeResponse;
@@ -44,7 +42,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 
-import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -219,13 +216,12 @@ public class TestOMKeyPurgeRequestAndResponse extends TestOMKeyRequest {
         .setName("snap1")
         .build();
 
-    ReferenceCounted<IOmMetadataReader, SnapshotCache> rcOmSnapshot =
-        ozoneManager.getOmSnapshotManager().checkForSnapshot(
+    ReferenceCounted<OmSnapshot> rcOmSnapshot =
+        ozoneManager.getOmSnapshotManager().getSnapshot(
             fromSnapshotInfo.getVolumeName(),
             fromSnapshotInfo.getBucketName(),
-            getSnapshotPrefix(fromSnapshotInfo.getName()),
-            true);
-    OmSnapshot omSnapshot = (OmSnapshot) rcOmSnapshot.get();
+            fromSnapshotInfo.getName());
+    OmSnapshot omSnapshot = rcOmSnapshot.get();
 
     // The keys should be present in the snapshot's deletedTable
     for (String deletedKey : deletedKeyNames) {
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
index 47b090f88d..fde83d7b76 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
@@ -42,7 +42,6 @@ import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
 import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
-import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
 import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
 import org.apache.hadoop.ozone.security.acl.OzoneNativeAuthorizer;
@@ -168,7 +167,7 @@ public class TestOMKeyRequest {
     when(ozoneManager.getAccessAuthorizer())
         .thenReturn(new OzoneNativeAuthorizer());
 
-    ReferenceCounted<IOmMetadataReader, SnapshotCache> rcOmMetadataReader =
+    ReferenceCounted<IOmMetadataReader> rcOmMetadataReader =
         mock(ReferenceCounted.class);
     when(ozoneManager.getOmMetadataReader()).thenReturn(rcOmMetadataReader);
     // Init OmMetadataReader to let the test pass
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
index c01bb459b8..16cb9b6821 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/multipart/TestS3MultipartRequest.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmMetadataReader;
 import org.apache.hadoop.ozone.om.IOmMetadataReader;
-import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
 import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
 import org.apache.hadoop.ozone.om.OMMetrics;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
@@ -86,7 +85,7 @@ public class TestS3MultipartRequest {
     when(ozoneManager.getMetrics()).thenReturn(omMetrics);
     when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
     auditLogger = mock(AuditLogger.class);
-    ReferenceCounted<IOmMetadataReader, SnapshotCache> rcOmMetadataReader =
+    ReferenceCounted<IOmMetadataReader> rcOmMetadataReader =
         mock(ReferenceCounted.class);
     when(ozoneManager.getOmMetadataReader()).thenReturn(rcOmMetadataReader);
     // Init OmMetadataReader to let the test pass
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotDeleteRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotDeleteRequest.java
index ca737d2bd2..03dc7862e3 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotDeleteRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotDeleteRequest.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
-import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
 import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
@@ -110,8 +109,6 @@ public class TestOMSnapshotDeleteRequest {
     doNothing().when(auditLogger).logWrite(any(AuditMessage.class));
 
     OmSnapshotManager omSnapshotManager = mock(OmSnapshotManager.class);
-    when(omSnapshotManager.getSnapshotCache())
-        .thenReturn(mock(SnapshotCache.class));
     when(ozoneManager.getOmSnapshotManager()).thenReturn(omSnapshotManager);
 
     volumeName = UUID.randomUUID().toString();
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotPurgeRequestAndResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotPurgeRequestAndResponse.java
index a3b0dae463..71882c3423 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotPurgeRequestAndResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotPurgeRequestAndResponse.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
 import org.apache.hadoop.ozone.om.response.snapshot.OMSnapshotCreateResponse;
 import org.apache.hadoop.ozone.om.response.snapshot.OMSnapshotPurgeResponse;
 import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
-import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
 import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotPurgeRequest;
@@ -115,7 +114,7 @@ public class TestOMSnapshotPurgeRequestAndResponse {
     when(ozoneManager.isAdmin(any())).thenReturn(true);
     when(ozoneManager.isFilesystemSnapshotEnabled()).thenReturn(true);
 
-    ReferenceCounted<IOmMetadataReader, SnapshotCache> rcOmMetadataReader =
+    ReferenceCounted<IOmMetadataReader> rcOmMetadataReader =
         mock(ReferenceCounted.class);
     when(ozoneManager.getOmMetadataReader()).thenReturn(rcOmMetadataReader);
     omSnapshotManager = new OmSnapshotManager(ozoneManager);
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
index 77bf15ed76..d745a01e62 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyDeletingService.java
@@ -36,7 +36,6 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
-import org.apache.hadoop.ozone.om.IOmMetadataReader;
 import org.apache.hadoop.ozone.om.KeyManager;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmSnapshot;
@@ -52,7 +51,6 @@ import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
 import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
-import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
 import org.apache.ozone.test.OzoneTestBase;
 import org.apache.ratis.util.ExitUtils;
 import org.junit.jupiter.api.AfterAll;
@@ -82,7 +80,6 @@ import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL;
-import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -359,10 +356,9 @@ class TestKeyDeletingService extends OzoneTestBase {
 
       keyDeletingService.resume();
 
-      try (ReferenceCounted<IOmMetadataReader, SnapshotCache> rcOmSnapshot =
-               om.getOmSnapshotManager().checkForSnapshot(
-                   volumeName, bucketName, getSnapshotPrefix(snap3), true)) {
-        OmSnapshot snapshot3 = (OmSnapshot) rcOmSnapshot.get();
+      try (ReferenceCounted<OmSnapshot> rcOmSnapshot =
+               om.getOmSnapshotManager().getSnapshot(volumeName, bucketName, snap3)) {
+        OmSnapshot snapshot3 = rcOmSnapshot.get();
 
         Table<String, RepeatedOmKeyInfo> snap3deletedTable =
             snapshot3.getMetadataManager().getDeletedTable();
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotCache.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotCache.java
index cecd7a99af..2a70a1f09a 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotCache.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotCache.java
@@ -18,9 +18,7 @@
 package org.apache.hadoop.ozone.om.snapshot;
 
 import com.google.common.cache.CacheLoader;
-import org.apache.hadoop.ozone.om.IOmMetadataReader;
 import org.apache.hadoop.ozone.om.OmSnapshot;
-import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.ozone.test.GenericTestUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
@@ -34,14 +32,12 @@ import org.slf4j.event.Level;
 
 import java.io.IOException;
 
-import static 
org.apache.hadoop.ozone.om.helpers.SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -52,15 +48,11 @@ import static org.mockito.Mockito.when;
 class TestSnapshotCache {
 
   private static final int CACHE_SIZE_LIMIT = 3;
-  private static OmSnapshotManager omSnapshotManager;
   private static CacheLoader<String, OmSnapshot> cacheLoader;
   private SnapshotCache snapshotCache;
 
   @BeforeAll
   static void beforeAll() throws Exception {
-    omSnapshotManager = mock(OmSnapshotManager.class);
-    when(omSnapshotManager.isSnapshotStatus(any(), eq(SNAPSHOT_ACTIVE)))
-        .thenReturn(true);
     cacheLoader = mock(CacheLoader.class);
     // Create a difference mock OmSnapshot instance each time load() is called
     when(cacheLoader.load(any())).thenAnswer(
@@ -81,8 +73,7 @@ class TestSnapshotCache {
   @BeforeEach
   void setUp() {
     // Reset cache for each test case
-    snapshotCache = new SnapshotCache(
-        omSnapshotManager, cacheLoader, CACHE_SIZE_LIMIT);
+    snapshotCache = new SnapshotCache(cacheLoader, CACHE_SIZE_LIMIT);
   }
 
   @AfterEach
@@ -95,8 +86,7 @@ class TestSnapshotCache {
   @DisplayName("01. get()")
   void testGet() throws IOException {
     final String dbKey1 = "dbKey1";
-    ReferenceCounted<IOmMetadataReader, SnapshotCache> omSnapshot =
-        snapshotCache.get(dbKey1);
+    ReferenceCounted<OmSnapshot> omSnapshot = snapshotCache.get(dbKey1);
     assertNotNull(omSnapshot);
     assertNotNull(omSnapshot.get());
     assertInstanceOf(OmSnapshot.class, omSnapshot.get());
@@ -107,13 +97,11 @@ class TestSnapshotCache {
   @DisplayName("02. get() same entry twice yields one cache entry only")
   void testGetTwice() throws IOException {
     final String dbKey1 = "dbKey1";
-    ReferenceCounted<IOmMetadataReader, SnapshotCache> omSnapshot1 =
-        snapshotCache.get(dbKey1);
+    ReferenceCounted<OmSnapshot> omSnapshot1 = snapshotCache.get(dbKey1);
     assertNotNull(omSnapshot1);
     assertEquals(1, snapshotCache.size());
 
-    ReferenceCounted<IOmMetadataReader, SnapshotCache> omSnapshot1again =
-        snapshotCache.get(dbKey1);
+    ReferenceCounted<OmSnapshot> omSnapshot1again = snapshotCache.get(dbKey1);
     // Should be the same instance
     assertEquals(omSnapshot1, omSnapshot1again);
     assertEquals(omSnapshot1.get(), omSnapshot1again.get());
@@ -124,8 +112,7 @@ class TestSnapshotCache {
   @DisplayName("03. release(String)")
   void testReleaseByDbKey() throws IOException {
     final String dbKey1 = "dbKey1";
-    ReferenceCounted<IOmMetadataReader, SnapshotCache> omSnapshot1 =
-        snapshotCache.get(dbKey1);
+    ReferenceCounted<OmSnapshot> omSnapshot1 = snapshotCache.get(dbKey1);
     assertNotNull(omSnapshot1);
     assertNotNull(omSnapshot1.get());
     assertEquals(1, snapshotCache.size());
@@ -139,12 +126,11 @@ class TestSnapshotCache {
   @DisplayName("04. release(OmSnapshot)")
   void testReleaseByOmSnapshotInstance() throws IOException {
     final String dbKey1 = "dbKey1";
-    ReferenceCounted<IOmMetadataReader, SnapshotCache> omSnapshot1 =
-        snapshotCache.get(dbKey1);
+    ReferenceCounted<OmSnapshot> omSnapshot1 = snapshotCache.get(dbKey1);
     assertNotNull(omSnapshot1);
     assertEquals(1, snapshotCache.size());
 
-    snapshotCache.release((OmSnapshot) omSnapshot1.get());
+    snapshotCache.release(omSnapshot1.get());
     // Entry will not be immediately evicted
     assertEquals(1, snapshotCache.size());
   }
@@ -153,8 +139,7 @@ class TestSnapshotCache {
   @DisplayName("05. invalidate()")
   void testInvalidate() throws IOException {
     final String dbKey1 = "dbKey1";
-    ReferenceCounted<IOmMetadataReader, SnapshotCache> omSnapshot =
-        snapshotCache.get(dbKey1);
+    ReferenceCounted<OmSnapshot> omSnapshot = snapshotCache.get(dbKey1);
     assertNotNull(omSnapshot);
     assertEquals(1, snapshotCache.size());
 
@@ -170,22 +155,19 @@ class TestSnapshotCache {
   @DisplayName("06. invalidateAll()")
   void testInvalidateAll() throws IOException {
     final String dbKey1 = "dbKey1";
-    ReferenceCounted<IOmMetadataReader, SnapshotCache> omSnapshot1 =
-        snapshotCache.get(dbKey1);
+    ReferenceCounted<OmSnapshot> omSnapshot1 = snapshotCache.get(dbKey1);
     assertNotNull(omSnapshot1);
     assertEquals(1, snapshotCache.size());
 
     final String dbKey2 = "dbKey2";
-    ReferenceCounted<IOmMetadataReader, SnapshotCache> omSnapshot2 =
-        snapshotCache.get(dbKey2);
+    ReferenceCounted<OmSnapshot> omSnapshot2 = snapshotCache.get(dbKey2);
     assertNotNull(omSnapshot2);
     assertEquals(2, snapshotCache.size());
     // Should be difference omSnapshot instances
     assertNotEquals(omSnapshot1, omSnapshot2);
 
     final String dbKey3 = "dbKey3";
-    ReferenceCounted<IOmMetadataReader, SnapshotCache> omSnapshot3 =
-        snapshotCache.get(dbKey3);
+    ReferenceCounted<OmSnapshot> omSnapshot3 = snapshotCache.get(dbKey3);
     assertNotNull(omSnapshot3);
     assertEquals(3, snapshotCache.size());
 
@@ -279,7 +261,7 @@ class TestSnapshotCache {
   void testEviction3WithClose() throws IOException {
 
     final String dbKey1 = "dbKey1";
-    try (ReferenceCounted<IOmMetadataReader, SnapshotCache> rcOmSnapshot = 
snapshotCache.get(dbKey1)) {
+    try (ReferenceCounted<OmSnapshot> rcOmSnapshot = 
snapshotCache.get(dbKey1)) {
       assertEquals(1L, rcOmSnapshot.getTotalRefCount());
       assertEquals(1, snapshotCache.size());
     }
@@ -289,11 +271,11 @@ class TestSnapshotCache {
     assertEquals(1, snapshotCache.size());
 
     final String dbKey2 = "dbKey2";
-    try (ReferenceCounted<IOmMetadataReader, SnapshotCache> rcOmSnapshot = 
snapshotCache.get(dbKey2)) {
+    try (ReferenceCounted<OmSnapshot> rcOmSnapshot = 
snapshotCache.get(dbKey2)) {
       assertEquals(1L, rcOmSnapshot.getTotalRefCount());
       assertEquals(2, snapshotCache.size());
       // Get dbKey2 entry a second time
-      try (ReferenceCounted<IOmMetadataReader, SnapshotCache> rcOmSnapshot2 = 
snapshotCache.get(dbKey2)) {
+      try (ReferenceCounted<OmSnapshot> rcOmSnapshot2 = 
snapshotCache.get(dbKey2)) {
         assertEquals(2L, rcOmSnapshot.getTotalRefCount());
         assertEquals(2L, rcOmSnapshot2.getTotalRefCount());
         assertEquals(2, snapshotCache.size());
@@ -304,7 +286,7 @@ class TestSnapshotCache {
     assertEquals(2, snapshotCache.size());
 
     final String dbKey3 = "dbKey3";
-    try (ReferenceCounted<IOmMetadataReader, SnapshotCache> rcOmSnapshot = 
snapshotCache.get(dbKey3)) {
+    try (ReferenceCounted<OmSnapshot> rcOmSnapshot = 
snapshotCache.get(dbKey3)) {
       assertEquals(1L, rcOmSnapshot.getTotalRefCount());
       assertEquals(3, snapshotCache.size());
     }
@@ -312,7 +294,7 @@ class TestSnapshotCache {
     assertEquals(3, snapshotCache.size());
 
     final String dbKey4 = "dbKey4";
-    try (ReferenceCounted<IOmMetadataReader, SnapshotCache> rcOmSnapshot = 
snapshotCache.get(dbKey4)) {
+    try (ReferenceCounted<OmSnapshot> rcOmSnapshot = 
snapshotCache.get(dbKey4)) {
       assertEquals(1L, rcOmSnapshot.getTotalRefCount());
       assertEquals(1, snapshotCache.size());
     }
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
index b92546c289..a6461182f2 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
@@ -39,7 +39,6 @@ import 
org.apache.hadoop.hdds.utils.db.managed.ManagedSSTDumpTool;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
-import org.apache.hadoop.ozone.om.IOmMetadataReader;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OMMetrics;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
@@ -217,9 +216,6 @@ public class TestSnapshotDiffManager {
   private OzoneManager ozoneManager;
   @Mock
   private OzoneConfiguration configuration;
-
-  private SnapshotCache snapshotCache;
-
   @Mock
   private Table<String, SnapshotInfo> snapshotInfoTable;
   @Mock
@@ -382,23 +378,30 @@ public class TestSnapshotDiffManager {
     when(ozoneManager.getConfiguration()).thenReturn(configuration);
     when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
 
-    CacheLoader<String, OmSnapshot> loader =
-        new CacheLoader<String, OmSnapshot>() {
-          @Nonnull
-          @Override
-          public OmSnapshot load(@Nonnull String key) {
-            return getMockedOmSnapshot(key);
-          }
-        };
-
     omSnapshotManager = mock(OmSnapshotManager.class);
     when(omSnapshotManager.isSnapshotStatus(
         any(), any())).thenReturn(true);
-    snapshotCache = new SnapshotCache(omSnapshotManager, loader, 10);
-
+    SnapshotCache snapshotCache = new SnapshotCache(mockCacheLoader(), 10);
+
+    when(omSnapshotManager.getActiveSnapshot(anyString(), anyString(), 
anyString()))
+        .thenAnswer(invocationOnMock -> {
+          String snapshotTableKey = 
SnapshotInfo.getTableKey(invocationOnMock.getArgument(0),
+              invocationOnMock.getArgument(1), 
invocationOnMock.getArgument(2));
+          return snapshotCache.get(snapshotTableKey);
+        });
+    when(ozoneManager.getOmSnapshotManager()).thenReturn(omSnapshotManager);
     snapshotDiffManager = new SnapshotDiffManager(db, differ, ozoneManager,
-        snapshotCache, snapDiffJobTable, snapDiffReportTable,
-        columnFamilyOptions, codecRegistry);
+        snapDiffJobTable, snapDiffReportTable, columnFamilyOptions, 
codecRegistry);
+  }
+
+  private CacheLoader<String, OmSnapshot> mockCacheLoader() {
+    return new CacheLoader<String, OmSnapshot>() {
+      @Nonnull
+      @Override
+      public OmSnapshot load(@Nonnull String key) {
+        return getMockedOmSnapshot(key);
+      }
+    };
   }
 
   @AfterEach
@@ -444,12 +447,12 @@ public class TestSnapshotDiffManager {
         eq(diffDir))
     ).thenReturn(Lists.newArrayList(randomStrings));
 
-    ReferenceCounted<IOmMetadataReader, SnapshotCache> rcFromSnapshot =
-        snapshotCache.get(snap1.toString());
-    ReferenceCounted<IOmMetadataReader, SnapshotCache> rcToSnapshot =
-        snapshotCache.get(snap2.toString());
-    OmSnapshot fromSnapshot = (OmSnapshot) rcFromSnapshot.get();
-    OmSnapshot toSnapshot = (OmSnapshot) rcToSnapshot.get();
+    ReferenceCounted<OmSnapshot> rcFromSnapshot =
+        omSnapshotManager.getActiveSnapshot(VOLUME_NAME, BUCKET_NAME, 
snap1.toString());
+    ReferenceCounted<OmSnapshot> rcToSnapshot =
+        omSnapshotManager.getActiveSnapshot(VOLUME_NAME, BUCKET_NAME, 
snap2.toString());
+    OmSnapshot fromSnapshot = rcFromSnapshot.get();
+    OmSnapshot toSnapshot = rcToSnapshot.get();
 
     SnapshotInfo fromSnapshotInfo = getMockedSnapshotInfo(snap1);
     SnapshotInfo toSnapshotInfo = getMockedSnapshotInfo(snap2);
@@ -509,12 +512,12 @@ public class TestSnapshotDiffManager {
             .thenReturn(Collections.emptyList());
       }
 
-      ReferenceCounted<IOmMetadataReader, SnapshotCache> rcFromSnapshot =
-          snapshotCache.get(snap1.toString());
-      ReferenceCounted<IOmMetadataReader, SnapshotCache> rcToSnapshot =
-          snapshotCache.get(snap2.toString());
-      OmSnapshot fromSnapshot = (OmSnapshot) rcFromSnapshot.get();
-      OmSnapshot toSnapshot = (OmSnapshot) rcToSnapshot.get();
+      ReferenceCounted<OmSnapshot> rcFromSnapshot =
+          omSnapshotManager.getActiveSnapshot(VOLUME_NAME, BUCKET_NAME, 
snap1.toString());
+      ReferenceCounted<OmSnapshot> rcToSnapshot =
+          omSnapshotManager.getActiveSnapshot(VOLUME_NAME, BUCKET_NAME, 
snap2.toString());
+      OmSnapshot fromSnapshot = rcFromSnapshot.get();
+      OmSnapshot toSnapshot = rcToSnapshot.get();
 
       SnapshotInfo fromSnapshotInfo = getMockedSnapshotInfo(snap1);
       SnapshotInfo toSnapshotInfo = getMockedSnapshotInfo(snap1);
@@ -572,12 +575,12 @@ public class TestSnapshotDiffManager {
               any(DifferSnapshotInfo.class),
               anyString());
 
-      ReferenceCounted<IOmMetadataReader, SnapshotCache> rcFromSnapshot =
-          snapshotCache.get(snap1.toString());
-      ReferenceCounted<IOmMetadataReader, SnapshotCache> rcToSnapshot =
-          snapshotCache.get(snap2.toString());
-      OmSnapshot fromSnapshot = (OmSnapshot) rcFromSnapshot.get();
-      OmSnapshot toSnapshot = (OmSnapshot) rcToSnapshot.get();
+      ReferenceCounted<OmSnapshot> rcFromSnapshot =
+          omSnapshotManager.getActiveSnapshot(VOLUME_NAME, BUCKET_NAME, 
snap1.toString());
+      ReferenceCounted<OmSnapshot> rcToSnapshot =
+          omSnapshotManager.getActiveSnapshot(VOLUME_NAME, BUCKET_NAME, 
snap2.toString());
+      OmSnapshot fromSnapshot = rcFromSnapshot.get();
+      OmSnapshot toSnapshot = rcToSnapshot.get();
 
       SnapshotInfo fromSnapshotInfo = getMockedSnapshotInfo(snap1);
       SnapshotInfo toSnapshotInfo = getMockedSnapshotInfo(snap1);
@@ -680,8 +683,7 @@ public class TestSnapshotDiffManager {
           getMockedTable(fromSnapshotTableMap, snapshotTableName);
 
       snapshotDiffManager = new SnapshotDiffManager(db, differ, ozoneManager,
-          snapshotCache, snapDiffJobTable, snapDiffReportTable,
-          columnFamilyOptions, codecRegistry);
+          snapDiffJobTable, snapDiffReportTable, columnFamilyOptions, 
codecRegistry);
       SnapshotDiffManager spy = spy(snapshotDiffManager);
 
       doAnswer(invocation -> {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to