This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a78a912aa9 [core] Refactor dv index cache in SnapshotReaderImpl
a78a912aa9 is described below

commit a78a912aa908025c402e0eb5e77b2063accbd6e2
Author: JingsongLi <[email protected]>
AuthorDate: Mon Oct 27 18:59:29 2025 +0800

    [core] Refactor dv index cache in SnapshotReaderImpl
---
 .../generated/catalog_configuration.html           |   6 +-
 .../org/apache/paimon/options/CatalogOptions.java  |   6 +-
 .../java/org/apache/paimon/AbstractFileStore.java  |  11 +-
 .../src/main/java/org/apache/paimon/FileStore.java |   3 -
 .../org/apache/paimon/index/IndexFileHandler.java  | 151 +-------------
 .../apache/paimon/manifest/IndexManifestFile.java  |  15 +-
 .../paimon/privilege/PrivilegedFileStore.java      |   6 -
 .../paimon/table/AbstractFileStoreTable.java       |   4 +-
 .../table/source/snapshot/SnapshotReaderImpl.java  | 180 ++++++++++------
 .../java/org/apache/paimon/utils/DVMetaCache.java  | 121 ++---------
 .../manifest/IndexManifestFileHandlerTest.java     | 226 ---------------------
 .../org/apache/paimon/utils/DVMetaCacheTest.java   |  15 +-
 12 files changed, 161 insertions(+), 583 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html 
b/docs/layouts/shortcodes/generated/catalog_configuration.html
index eaeea12346..49641d340e 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -33,10 +33,10 @@ under the License.
             <td>Controls whether the catalog will cache databases, tables, 
manifests and partitions.</td>
         </tr>
         <tr>
-            <td><h5>cache.deletion-vectors.max-bucket-num</h5></td>
-            <td style="word-wrap: break-word;">20000</td>
+            <td><h5>cache.deletion-vectors.max-num</h5></td>
+            <td style="word-wrap: break-word;">500000</td>
             <td>Integer</td>
-            <td>Controls the maximum number of bucket-level deletion vector 
meta that can be cached.</td>
+            <td>Controls the maximum number of deletion vector meta that can 
be cached.</td>
         </tr>
         <tr>
             <td><h5>cache.expire-after-access</h5></td>
diff --git 
a/paimon-api/src/main/java/org/apache/paimon/options/CatalogOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/options/CatalogOptions.java
index 72674bdd7b..8cd80106a8 100644
--- a/paimon-api/src/main/java/org/apache/paimon/options/CatalogOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/options/CatalogOptions.java
@@ -137,11 +137,11 @@ public class CatalogOptions {
                             "Controls the max number for snapshots per table 
in the catalog are cached.");
 
     public static final ConfigOption<Integer> CACHE_DV_MAX_NUM =
-            key("cache.deletion-vectors.max-bucket-num")
+            key("cache.deletion-vectors.max-num")
                     .intType()
-                    .defaultValue(20000)
+                    .defaultValue(500_000)
                     .withDescription(
-                            "Controls the maximum number of bucket-level 
deletion vector meta that can be cached.");
+                            "Controls the maximum number of deletion vector 
meta that can be cached.");
 
     public static final ConfigOption<Boolean> CASE_SENSITIVE =
             ConfigOptions.key("case-sensitive")
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 2a5dbd7766..2e209fba45 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -59,7 +59,6 @@ import org.apache.paimon.tag.TagAutoManager;
 import org.apache.paimon.tag.TagPreview;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.ChangelogManager;
-import org.apache.paimon.utils.DVMetaCache;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.IndexFilePathFactories;
 import org.apache.paimon.utils.InternalRowPartitionComputer;
@@ -98,7 +97,6 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
 
     @Nullable private SegmentsCache<Path> readManifestCache;
     @Nullable private Cache<Path, Snapshot> snapshotCache;
-    @Nullable private DVMetaCache dvMetaCache;
 
     protected AbstractFileStore(
             FileIO fileIO,
@@ -218,8 +216,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
                 FileFormat.manifestFormat(options),
                 options.manifestCompression(),
                 pathFactory(),
-                readManifestCache,
-                dvMetaCache);
+                readManifestCache);
     }
 
     @Override
@@ -230,7 +227,6 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
                 indexManifestFileFactory().create(),
                 new IndexFilePathFactories(pathFactory()),
                 options.dvIndexFileTargetSize(),
-                this.dvMetaCache,
                 options.deletionVectorBitmap64());
     }
 
@@ -492,9 +488,4 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
     public void setSnapshotCache(Cache<Path, Snapshot> cache) {
         this.snapshotCache = cache;
     }
-
-    @Override
-    public void setDVMetaCache(DVMetaCache dvMetaCache) {
-        this.dvMetaCache = dvMetaCache;
-    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index 62bc65c744..929302673d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -40,7 +40,6 @@ import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.tag.TagAutoManager;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.ChangelogManager;
-import org.apache.paimon.utils.DVMetaCache;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.SegmentsCache;
 import org.apache.paimon.utils.SnapshotManager;
@@ -122,6 +121,4 @@ public interface FileStore<T> {
     void setManifestCache(SegmentsCache<Path> manifestCache);
 
     void setSnapshotCache(Cache<Path, Snapshot> cache);
-
-    void setDVMetaCache(DVMetaCache dvMetaCache);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java 
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
index 83a41161f3..efa20fccdc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.index;
 
 import org.apache.paimon.Snapshot;
-import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.deletionvectors.DeletionVector;
 import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
@@ -27,27 +26,19 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.IndexManifestFile;
-import org.apache.paimon.operation.metrics.CacheMetrics;
 import org.apache.paimon.options.MemorySize;
-import org.apache.paimon.table.source.DeletionFile;
-import org.apache.paimon.utils.DVMetaCache;
 import org.apache.paimon.utils.IndexFilePathFactories;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SnapshotManager;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
 import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
@@ -61,8 +52,6 @@ public class IndexFileHandler {
     private final IndexFilePathFactories pathFactories;
     private final MemorySize dvTargetFileSize;
     private final boolean dvBitmap64;
-    @Nullable private CacheMetrics cacheMetrics;
-    @Nullable private final DVMetaCache dvMetaCache;
 
     public IndexFileHandler(
             FileIO fileIO,
@@ -70,7 +59,6 @@ public class IndexFileHandler {
             IndexManifestFile indexManifestFile,
             IndexFilePathFactories pathFactories,
             MemorySize dvTargetFileSize,
-            DVMetaCache dvMetaCache,
             boolean dvBitmap64) {
         this.fileIO = fileIO;
         this.snapshotManager = snapshotManager;
@@ -78,7 +66,6 @@ public class IndexFileHandler {
         this.indexManifestFile = indexManifestFile;
         this.dvTargetFileSize = dvTargetFileSize;
         this.dvBitmap64 = dvBitmap64;
-        this.dvMetaCache = dvMetaCache;
     }
 
     public HashIndexFile hashIndex(BinaryRow partition, int bucket) {
@@ -100,140 +87,6 @@ public class IndexFileHandler {
         return result.isEmpty() ? Optional.empty() : 
Optional.of(result.get(0));
     }
 
-    public void withCacheMetrics(@Nullable CacheMetrics cacheMetrics) {
-        this.cacheMetrics = cacheMetrics;
-    }
-
-    // Construct DataFile -> DeletionFile based on IndexFileMeta
-    @Nullable
-    @VisibleForTesting
-    public Map<String, DeletionFile> extractDeletionFileByMeta(
-            BinaryRow partition, Integer bucket, IndexFileMeta fileMeta) {
-        LinkedHashMap<String, DeletionVectorMeta> dvRanges = 
fileMeta.dvRanges();
-        String dvFilePath = dvIndex(partition, 
bucket).path(fileMeta).toString();
-        if (dvRanges != null && !dvRanges.isEmpty()) {
-            Map<String, DeletionFile> result = new HashMap<>();
-            for (DeletionVectorMeta dvMeta : dvRanges.values()) {
-                result.put(
-                        dvMeta.dataFileName(),
-                        new DeletionFile(
-                                dvFilePath,
-                                dvMeta.offset(),
-                                dvMeta.length(),
-                                dvMeta.cardinality()));
-            }
-            return result;
-        }
-        return null;
-    }
-
-    // Scan DV index file of given partition buckets
-    // returns <DataFile: DeletionFile> map grouped by partition and bucket
-    public Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> 
scanDVIndex(
-            Snapshot snapshot, Set<Pair<BinaryRow, Integer>> partitionBuckets) 
{
-        if (snapshot == null || snapshot.indexManifest() == null) {
-            return Collections.emptyMap();
-        }
-        Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> result = new 
HashMap<>();
-        // to avoid cache being frequently evicted,
-        //  currently we only read from cache when bucket number is 1
-        if (this.dvMetaCache != null && partitionBuckets.size() == 1) {
-            Pair<BinaryRow, Integer> partitionBucket = 
partitionBuckets.iterator().next();
-            Map<String, DeletionFile> deletionFiles =
-                    this.scanDVIndexWithCache(
-                            snapshot, partitionBucket.getLeft(), 
partitionBucket.getRight());
-            if (deletionFiles != null && deletionFiles.size() > 0) {
-                result.put(partitionBucket, deletionFiles);
-            }
-            return result;
-        }
-        Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> partitionFileMetas =
-                scan(
-                        snapshot,
-                        DELETION_VECTORS_INDEX,
-                        
partitionBuckets.stream().map(Pair::getLeft).collect(Collectors.toSet()));
-        partitionFileMetas.forEach(
-                (entry, indexFileMetas) -> {
-                    if (partitionBuckets.contains(entry)) {
-                        if (indexFileMetas != null) {
-                            indexFileMetas.forEach(
-                                    indexFileMeta -> {
-                                        Map<String, DeletionFile> dvMetas =
-                                                extractDeletionFileByMeta(
-                                                        entry.getLeft(),
-                                                        entry.getRight(),
-                                                        indexFileMeta);
-                                        if (dvMetas != null) {
-                                            result.computeIfAbsent(entry, k -> 
new HashMap<>())
-                                                    .putAll(dvMetas);
-                                        }
-                                    });
-                        }
-                    }
-                });
-        return result;
-    }
-
-    // Scan DV Meta Cache first, if not exist, scan DV index file, returns the 
exact deletion file
-    // of the specified partition/bucket
-    @VisibleForTesting
-    Map<String, DeletionFile> scanDVIndexWithCache(
-            Snapshot snapshot, BinaryRow partition, Integer bucket) {
-        // read from cache
-        String indexManifestName = snapshot.indexManifest();
-        Path indexManifestPath = 
this.indexManifestFile.indexManifestFilePath(indexManifestName);
-        Map<String, DeletionFile> result =
-                this.dvMetaCache.read(indexManifestPath, partition, bucket);
-        if (result != null) {
-            if (cacheMetrics != null) {
-                cacheMetrics.increaseHitObject();
-            }
-            return result;
-        }
-        if (cacheMetrics != null) {
-            cacheMetrics.increaseMissedObject();
-        }
-        // If miss, read the whole partition's deletion files
-        Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> partitionFileMetas =
-                scan(
-                        snapshot,
-                        DELETION_VECTORS_INDEX,
-                        new HashSet<>(Collections.singletonList(partition)));
-        // for each bucket, extract deletion files, and fill meta cache
-        for (Map.Entry<Pair<BinaryRow, Integer>, List<IndexFileMeta>> entry :
-                partitionFileMetas.entrySet()) {
-            Pair<BinaryRow, Integer> partitionBucket = entry.getKey();
-            List<IndexFileMeta> fileMetas = entry.getValue();
-            if (entry.getValue() != null) {
-                Map<String, DeletionFile> bucketDeletionFiles = new 
HashMap<>();
-                fileMetas.forEach(
-                        meta -> {
-                            Map<String, DeletionFile> bucketDVMetas =
-                                    extractDeletionFileByMeta(
-                                            partitionBucket.getLeft(),
-                                            partitionBucket.getRight(),
-                                            meta);
-                            if (bucketDVMetas != null) {
-                                bucketDeletionFiles.putAll(bucketDVMetas);
-                            }
-                        });
-                // bucketDeletionFiles can be empty
-                this.dvMetaCache.put(
-                        indexManifestPath,
-                        partitionBucket.getLeft(),
-                        partitionBucket.getRight(),
-                        bucketDeletionFiles);
-                if (partitionBucket.getRight() != null
-                        && partitionBucket.getLeft() != null
-                        && partitionBucket.getRight().equals(bucket)
-                        && partitionBucket.getLeft().equals(partition)) {
-                    result = bucketDeletionFiles;
-                }
-            }
-        }
-        return result;
-    }
-
     public List<IndexManifestEntry> scan(String indexType) {
         return scan(snapshotManager.latestSnapshot(), indexType);
     }
@@ -317,6 +170,10 @@ public class IndexFileHandler {
         return result;
     }
 
+    public Path indexManifestFilePath(String indexManifest) {
+        return indexManifestFile.indexManifestFilePath(indexManifest);
+    }
+
     public Path filePath(IndexManifestEntry entry) {
         return pathFactories.get(entry.partition(), 
entry.bucket()).toPath(entry.indexFile());
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java
index 21086eadf3..a46762c1af 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java
@@ -25,7 +25,6 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.DVMetaCache;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.ObjectsFile;
 import org.apache.paimon.utils.PathFactory;
@@ -40,8 +39,6 @@ import java.util.List;
 /** Index manifest file. */
 public class IndexManifestFile extends ObjectsFile<IndexManifestEntry> {
 
-    private final DVMetaCache dvMetaCache;
-
     private IndexManifestFile(
             FileIO fileIO,
             RowType schema,
@@ -49,8 +46,7 @@ public class IndexManifestFile extends 
ObjectsFile<IndexManifestEntry> {
             FormatWriterFactory writerFactory,
             String compression,
             PathFactory pathFactory,
-            @Nullable SegmentsCache<Path> cache,
-            @Nullable DVMetaCache dvMetaCache) {
+            @Nullable SegmentsCache<Path> cache) {
         super(
                 fileIO,
                 new IndexManifestEntrySerializer(),
@@ -60,7 +56,6 @@ public class IndexManifestFile extends 
ObjectsFile<IndexManifestEntry> {
                 compression,
                 pathFactory,
                 cache);
-        this.dvMetaCache = dvMetaCache;
     }
 
     public Path indexManifestFilePath(String fileName) {
@@ -88,21 +83,18 @@ public class IndexManifestFile extends 
ObjectsFile<IndexManifestEntry> {
         private final String compression;
         private final FileStorePathFactory pathFactory;
         @Nullable private final SegmentsCache<Path> cache;
-        @Nullable private final DVMetaCache dvMetaCache;
 
         public Factory(
                 FileIO fileIO,
                 FileFormat fileFormat,
                 String compression,
                 FileStorePathFactory pathFactory,
-                @Nullable SegmentsCache<Path> cache,
-                @Nullable DVMetaCache dvMetaCache) {
+                @Nullable SegmentsCache<Path> cache) {
             this.fileIO = fileIO;
             this.fileFormat = fileFormat;
             this.compression = compression;
             this.pathFactory = pathFactory;
             this.cache = cache;
-            this.dvMetaCache = dvMetaCache;
         }
 
         public IndexManifestFile create() {
@@ -114,8 +106,7 @@ public class IndexManifestFile extends 
ObjectsFile<IndexManifestEntry> {
                     fileFormat.createWriterFactory(schema),
                     compression,
                     pathFactory.indexManifestFileFactory(),
-                    cache,
-                    dvMetaCache);
+                    cache);
         }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
index b683174cd4..99069ddc31 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
@@ -44,7 +44,6 @@ import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.tag.TagAutoManager;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.ChangelogManager;
-import org.apache.paimon.utils.DVMetaCache;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.SegmentsCache;
 import org.apache.paimon.utils.SnapshotManager;
@@ -233,9 +232,4 @@ public class PrivilegedFileStore<T> implements FileStore<T> 
{
     public void setSnapshotCache(Cache<Path, Snapshot> cache) {
         wrapped.setSnapshotCache(cache);
     }
-
-    @Override
-    public void setDVMetaCache(DVMetaCache dvMetaCache) {
-        wrapped.setDVMetaCache(dvMetaCache);
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index aeb223aaf5..55892e25ee 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -143,7 +143,6 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
     @Override
     public void setDVMetaCache(DVMetaCache cache) {
         this.dvmetaCache = cache;
-        store().setDVMetaCache(cache);
     }
 
     @Override
@@ -269,7 +268,8 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
                 nonPartitionFilterConsumer(),
                 store().pathFactory(),
                 name(),
-                store().newIndexFileHandler());
+                store().newIndexFileHandler(),
+                dvmetaCache);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 9157737915..942c5bc41c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -25,6 +25,7 @@ import org.apache.paimon.codegen.RecordComparator;
 import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.index.DeletionVectorMeta;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
@@ -37,6 +38,7 @@ import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.operation.ManifestsReader;
+import org.apache.paimon.operation.metrics.CacheMetrics;
 import org.apache.paimon.operation.metrics.ScanMetrics;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
@@ -48,6 +50,7 @@ import org.apache.paimon.table.source.PlanImpl;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.SplitGenerator;
 import org.apache.paimon.utils.ChangelogManager;
+import org.apache.paimon.utils.DVMetaCache;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.LazyField;
@@ -89,9 +92,11 @@ public class SnapshotReaderImpl implements SnapshotReader {
     private final FileStorePathFactory pathFactory;
     private final String tableName;
     private final IndexFileHandler indexFileHandler;
+    @Nullable private final DVMetaCache dvMetaCache;
 
     private ScanMode scanMode = ScanMode.ALL;
     private RecordComparator lazyPartitionComparator;
+    private CacheMetrics dvMetaCacheMetrics;
 
     public SnapshotReaderImpl(
             FileStoreScan scan,
@@ -103,7 +108,8 @@ public class SnapshotReaderImpl implements SnapshotReader {
             BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer,
             FileStorePathFactory pathFactory,
             String tableName,
-            IndexFileHandler indexFileHandler) {
+            IndexFileHandler indexFileHandler,
+            @Nullable DVMetaCache dvMetaCache) {
         this.scan = scan;
         this.tableSchema = tableSchema;
         this.options = options;
@@ -121,6 +127,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
 
         this.tableName = tableName;
         this.indexFileHandler = indexFileHandler;
+        this.dvMetaCache = dvMetaCache;
     }
 
     @Override
@@ -287,8 +294,8 @@ public class SnapshotReaderImpl implements SnapshotReader {
     @Override
     public SnapshotReader withMetricRegistry(MetricRegistry registry) {
         ScanMetrics scanMetrics = new ScanMetrics(registry, tableName);
+        dvMetaCacheMetrics = scanMetrics.getDvMetaCacheMetrics();
         scan.withMetrics(scanMetrics);
-        indexFileHandler.withCacheMetrics(scanMetrics.getDvMetaCacheMetrics());
         return this;
     }
 
@@ -348,25 +355,17 @@ public class SnapshotReaderImpl implements SnapshotReader 
{
             @Nullable Snapshot snapshot,
             boolean isStreaming,
             SplitGenerator splitGenerator,
-            Map<BinaryRow, Map<Integer, List<ManifestEntry>>> 
groupedManifestEntries) {
+            Map<BinaryRow, Map<Integer, List<ManifestEntry>>> entries) {
         List<DataSplit> splits = new ArrayList<>();
         // Read deletion indexes at once to reduce file IO
         Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> 
deletionFilesMap = null;
         if (!isStreaming) {
-            Set<Pair<BinaryRow, Integer>> partitionBuckets =
-                    groupedManifestEntries.entrySet().stream()
-                            .flatMap(
-                                    e ->
-                                            e.getValue().keySet().stream()
-                                                    .map(bucket -> 
Pair.of(e.getKey(), bucket)))
-                            .collect(Collectors.toSet());
             deletionFilesMap =
                     deletionVectors && snapshot != null
-                            ? indexFileHandler.scanDVIndex(snapshot, 
partitionBuckets)
+                            ? scanDvIndex(snapshot, toPartBuckets(entries))
                             : Collections.emptyMap();
         }
-        for (Map.Entry<BinaryRow, Map<Integer, List<ManifestEntry>>> entry :
-                groupedManifestEntries.entrySet()) {
+        for (Map.Entry<BinaryRow, Map<Integer, List<ManifestEntry>>> entry : 
entries.entrySet()) {
             BinaryRow partition = entry.getKey();
             Map<Integer, List<ManifestEntry>> buckets = entry.getValue();
             for (Map.Entry<Integer, List<ManifestEntry>> bucketEntry : 
buckets.entrySet()) {
@@ -460,20 +459,16 @@ public class SnapshotReaderImpl implements SnapshotReader 
{
                         buckets.computeIfAbsent(part, k -> new HashSet<>())
                                 .addAll(bucketMap.keySet()));
         // Read deletion indexes at once to reduce file IO
-        Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> 
beforDeletionIndexFilesMap = null;
-        Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> 
deletionIndexFilesMap = null;
-        if (!isStreaming) {
-            beforDeletionIndexFilesMap =
-                    deletionVectors
-                            ? indexFileHandler.scan(
-                                    beforeSnapshot.get(),
-                                    DELETION_VECTORS_INDEX,
-                                    beforeFiles.keySet())
+        Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> 
beforeDeletionFilesMap = null;
+        Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> 
deletionFilesMap = null;
+        if (!isStreaming && deletionVectors) {
+            beforeDeletionFilesMap =
+                    beforeSnapshot.get() != null
+                            ? scanDvIndex(beforeSnapshot.get(), 
toPartBuckets(beforeFiles))
                             : Collections.emptyMap();
-            deletionIndexFilesMap =
-                    deletionVectors
-                            ? indexFileHandler.scan(
-                                    snapshot, DELETION_VECTORS_INDEX, 
dataFiles.keySet())
+            deletionFilesMap =
+                    snapshot != null
+                            ? scanDvIndex(snapshot, toPartBuckets(dataFiles))
                             : Collections.emptyMap();
         }
 
@@ -516,21 +511,19 @@ public class SnapshotReaderImpl implements SnapshotReader 
{
                                 .withDataFiles(data)
                                 .isStreaming(isStreaming)
                                 .withBucketPath(pathFactory.bucketPath(part, 
bucket).toString());
-                if (deletionVectors
-                        && beforDeletionIndexFilesMap != null
-                        && deletionIndexFilesMap != null) {
+                if (deletionVectors && beforeDeletionFilesMap != null) {
                     builder.withBeforeDeletionFiles(
                             getDeletionFiles(
-                                    indexFileHandler.dvIndex(part, bucket),
                                     before,
-                                    beforDeletionIndexFilesMap.getOrDefault(
-                                            Pair.of(part, bucket), 
Collections.emptyList())));
+                                    beforeDeletionFilesMap.getOrDefault(
+                                            Pair.of(part, bucket), 
Collections.emptyMap())));
+                }
+                if (deletionVectors && deletionFilesMap != null) {
                     builder.withDataDeletionFiles(
                             getDeletionFiles(
-                                    indexFileHandler.dvIndex(part, bucket),
                                     data,
-                                    deletionIndexFilesMap.getOrDefault(
-                                            Pair.of(part, bucket), 
Collections.emptyList())));
+                                    deletionFilesMap.getOrDefault(
+                                            Pair.of(part, bucket), 
Collections.emptyMap())));
                 }
                 splits.add(builder.build());
             }
@@ -561,45 +554,100 @@ public class SnapshotReaderImpl implements 
SnapshotReader {
     }
 
     private List<DeletionFile> getDeletionFiles(
-            DeletionVectorsIndexFile indexFile,
-            List<DataFileMeta> dataFiles,
-            List<IndexFileMeta> indexFileMetas) {
+            List<DataFileMeta> dataFiles, Map<String, DeletionFile> 
deletionFilesMap) {
         List<DeletionFile> deletionFiles = new ArrayList<>(dataFiles.size());
-        Map<String, IndexFileMeta> dataFileToIndexFileMeta = new HashMap<>();
-        for (IndexFileMeta indexFileMeta : indexFileMetas) {
-            if (indexFileMeta.dvRanges() != null) {
-                for (DeletionVectorMeta dvMeta : 
indexFileMeta.dvRanges().values()) {
-                    dataFileToIndexFileMeta.put(dvMeta.dataFileName(), 
indexFileMeta);
+        dataFiles.stream()
+                .map(DataFileMeta::fileName)
+                .map(f -> deletionFilesMap == null ? null : 
deletionFilesMap.get(f))
+                .forEach(deletionFiles::add);
+        return deletionFiles;
+    }
+
+    private Set<Pair<BinaryRow, Integer>> toPartBuckets(
+            Map<BinaryRow, Map<Integer, List<ManifestEntry>>> entries) {
+        return entries.entrySet().stream()
+                .flatMap(
+                        e ->
+                                e.getValue().keySet().stream()
+                                        .map(bucket -> Pair.of(e.getKey(), 
bucket)))
+                .collect(Collectors.toSet());
+    }
+
+    private Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> 
scanDvIndex(
+            @Nullable Snapshot snapshot, Set<Pair<BinaryRow, Integer>> 
buckets) {
+        if (snapshot == null || snapshot.indexManifest() == null) {
+            return Collections.emptyMap();
+        }
+        Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> result = new 
HashMap<>();
+        Path indexManifestPath = 
indexFileHandler.indexManifestFilePath(snapshot.indexManifest());
+
+        // 1. read from cache
+        if (dvMetaCache != null) {
+            Iterator<Pair<BinaryRow, Integer>> iterator = buckets.iterator();
+            while (iterator.hasNext()) {
+                Pair<BinaryRow, Integer> next = iterator.next();
+                BinaryRow partition = next.getLeft();
+                int bucket = next.getRight();
+                Map<String, DeletionFile> fromCache =
+                        dvMetaCache.read(indexManifestPath, partition, bucket);
+                if (fromCache != null) {
+                    result.put(next, fromCache);
+                    iterator.remove();
+                    if (dvMetaCacheMetrics != null) {
+                        dvMetaCacheMetrics.increaseHitObject();
+                    }
+                } else {
+                    if (dvMetaCacheMetrics != null) {
+                        dvMetaCacheMetrics.increaseMissedObject();
+                    }
                 }
             }
         }
-        for (DataFileMeta file : dataFiles) {
-            IndexFileMeta indexFileMeta = 
dataFileToIndexFileMeta.get(file.fileName());
-            if (indexFileMeta != null) {
-                LinkedHashMap<String, DeletionVectorMeta> dvMetas = 
indexFileMeta.dvRanges();
-                if (dvMetas != null && dvMetas.containsKey(file.fileName())) {
-                    deletionFiles.add(
+
+        // 2. read from file system
+        Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> partitionFileMetas =
+                indexFileHandler.scan(
+                        snapshot,
+                        DELETION_VECTORS_INDEX,
+                        
buckets.stream().map(Pair::getLeft).collect(Collectors.toSet()));
+        partitionFileMetas.forEach(
+                (entry, indexFileMetas) -> {
+                    Map<String, DeletionFile> deletionFiles =
+                            toDeletionFiles(entry, indexFileMetas);
+                    if (dvMetaCache != null) {
+                        dvMetaCache.put(
+                                indexManifestPath,
+                                entry.getLeft(),
+                                entry.getRight(),
+                                deletionFiles);
+                    }
+                    if (buckets.contains(entry)) {
+                        result.put(entry, deletionFiles);
+                    }
+                });
+        return result;
+    }
+
+    private Map<String, DeletionFile> toDeletionFiles(
+            Pair<BinaryRow, Integer> partitionBucket, List<IndexFileMeta> 
fileMetas) {
+        Map<String, DeletionFile> deletionFiles = new HashMap<>();
+        DeletionVectorsIndexFile dvIndex =
+                indexFileHandler.dvIndex(partitionBucket.getLeft(), 
partitionBucket.getRight());
+        for (IndexFileMeta indexFile : fileMetas) {
+            LinkedHashMap<String, DeletionVectorMeta> dvRanges = 
indexFile.dvRanges();
+            String dvFilePath = dvIndex.path(indexFile).toString();
+            if (dvRanges != null && !dvRanges.isEmpty()) {
+                for (DeletionVectorMeta dvMeta : dvRanges.values()) {
+                    deletionFiles.put(
+                            dvMeta.dataFileName(),
                             new DeletionFile(
-                                    indexFile.path(indexFileMeta).toString(),
-                                    dvMetas.get(file.fileName()).offset(),
-                                    dvMetas.get(file.fileName()).length(),
-                                    
dvMetas.get(file.fileName()).cardinality()));
-                    continue;
+                                    dvFilePath,
+                                    dvMeta.offset(),
+                                    dvMeta.length(),
+                                    dvMeta.cardinality()));
                 }
             }
-            deletionFiles.add(null);
         }
-
-        return deletionFiles;
-    }
-
-    public List<DeletionFile> getDeletionFiles(
-            List<DataFileMeta> dataFiles, Map<String, DeletionFile> 
deletionFilesMap) {
-        List<DeletionFile> deletionFiles = new ArrayList<>(dataFiles.size());
-        dataFiles.stream()
-                .map(DataFileMeta::fileName)
-                .map(f -> deletionFilesMap == null ? null : 
deletionFilesMap.get(f))
-                .forEach(deletionFiles::add);
         return deletionFiles;
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/DVMetaCache.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/DVMetaCache.java
index d06c563187..34b92f9bc3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/DVMetaCache.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/DVMetaCache.java
@@ -27,132 +27,49 @@ import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caff
 
 import javax.annotation.Nullable;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
 /** Cache for deletion vector meta. */
 public class DVMetaCache {
-    private final Cache<DVMetaCacheKey, List<DVMetaCacheValue>> cache;
 
-    public DVMetaCache(long maxElementSize) {
+    private final Cache<DVMetaCacheKey, Map<String, DeletionFile>> cache;
+
+    public DVMetaCache(long maxValueNumber) {
         this.cache =
                 Caffeine.newBuilder()
-                        .maximumSize(maxElementSize)
+                        .weigher(DVMetaCache::weigh)
+                        .maximumWeight(maxValueNumber)
                         .softValues()
                         .executor(Runnable::run)
                         .build();
     }
 
+    private static int weigh(DVMetaCacheKey cacheKey, Map<String, 
DeletionFile> cacheValue) {
+        return cacheValue.size() + 1;
+    }
+
     @Nullable
-    public Map<String, DeletionFile> read(Path path, BinaryRow partition, int 
bucket) {
-        DVMetaCacheKey cacheKey = new DVMetaCacheKey(path, partition, bucket);
-        List<DVMetaCacheValue> cacheValue = this.cache.getIfPresent(cacheKey);
-        if (cacheValue == null) {
-            return null;
-        }
-        // If the bucket doesn't have dv metas, return empty set.
-        Map<String, DeletionFile> dvFilesMap = new HashMap<>();
-        cacheValue.forEach(
-                dvMeta ->
-                        dvFilesMap.put(
-                                dvMeta.getDataFileName(),
-                                new DeletionFile(
-                                        dvMeta.getDeletionFilePath(),
-                                        dvMeta.getOffset(),
-                                        dvMeta.getLength(),
-                                        dvMeta.getCardinality())));
-        return dvFilesMap;
+    public Map<String, DeletionFile> read(Path manifestPath, BinaryRow 
partition, int bucket) {
+        DVMetaCacheKey cacheKey = new DVMetaCacheKey(manifestPath, partition, 
bucket);
+        return this.cache.getIfPresent(cacheKey);
     }
 
     public void put(
             Path path, BinaryRow partition, int bucket, Map<String, 
DeletionFile> dvFilesMap) {
         DVMetaCacheKey key = new DVMetaCacheKey(path, partition, bucket);
-        List<DVMetaCacheValue> cacheValue = new ArrayList<>();
-        dvFilesMap.forEach(
-                (dataFileName, deletionFile) -> {
-                    DVMetaCacheValue dvMetaCacheValue =
-                            new DVMetaCacheValue(
-                                    dataFileName,
-                                    deletionFile.path(),
-                                    (int) deletionFile.offset(),
-                                    (int) deletionFile.length(),
-                                    deletionFile.cardinality());
-                    cacheValue.add(dvMetaCacheValue);
-                });
-        this.cache.put(key, cacheValue);
-    }
-
-    private static class DVMetaCacheValue {
-        private final String dataFileName;
-        private final String deletionFilePath;
-        private final int offset;
-        private final int length;
-        @Nullable private final Long cardinality;
-
-        public DVMetaCacheValue(
-                String dataFileName,
-                String deletionFilePath,
-                int start,
-                int length,
-                @Nullable Long cardinality) {
-            this.dataFileName = dataFileName;
-            this.deletionFilePath = deletionFilePath;
-            this.offset = start;
-            this.length = length;
-            this.cardinality = cardinality;
-        }
-
-        public String getDataFileName() {
-            return dataFileName;
-        }
-
-        public String getDeletionFilePath() {
-            return deletionFilePath;
-        }
-
-        public int getOffset() {
-            return offset;
-        }
-
-        public int getLength() {
-            return length;
-        }
-
-        @Nullable
-        public Long getCardinality() {
-            return cardinality;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-            DVMetaCacheValue that = (DVMetaCacheValue) o;
-            return offset == that.offset
-                    && length == that.length
-                    && Objects.equals(dataFileName, that.dataFileName)
-                    && Objects.equals(deletionFilePath, that.deletionFilePath)
-                    && Objects.equals(cardinality, that.cardinality);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(dataFileName, deletionFilePath, offset, 
length, cardinality);
-        }
+        this.cache.put(key, dvFilesMap);
     }
 
     /** Cache key for deletion vector meta at bucket level. */
     private static final class DVMetaCacheKey {
-        private final Path path;
+
+        private final Path manifestPath;
         private final BinaryRow row;
         private final int bucket;
 
-        public DVMetaCacheKey(Path path, BinaryRow row, int bucket) {
-            this.path = path;
+        public DVMetaCacheKey(Path manifestPath, BinaryRow row, int bucket) {
+            this.manifestPath = manifestPath;
             this.row = row;
             this.bucket = bucket;
         }
@@ -167,13 +84,13 @@ public class DVMetaCache {
             }
             DVMetaCacheKey that = (DVMetaCacheKey) o;
             return bucket == that.bucket
-                    && Objects.equals(path, that.path)
+                    && Objects.equals(manifestPath, that.manifestPath)
                     && Objects.equals(row, that.row);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(path, row, bucket);
+            return Objects.hash(manifestPath, row, bucket);
         }
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java
index edc34bc766..c2c5baba0a 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java
@@ -18,40 +18,18 @@
 
 package org.apache.paimon.manifest;
 
-import org.apache.paimon.Snapshot;
 import org.apache.paimon.TestAppendFileStore;
-import org.apache.paimon.TestFileStore;
-import org.apache.paimon.TestKeyValueGenerator;
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.BinaryRowWriter;
 import org.apache.paimon.format.FileFormat;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.index.DeletionVectorMeta;
-import org.apache.paimon.index.IndexFileHandler;
-import org.apache.paimon.index.IndexFileMeta;
-import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
-import org.apache.paimon.operation.metrics.CacheMetrics;
-import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.table.BucketMode;
-import org.apache.paimon.table.source.DeletionFile;
-import org.apache.paimon.utils.DVMetaCache;
-import org.apache.paimon.utils.IndexFilePathFactories;
-import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.SegmentsCache;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
 
-import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
 import static 
org.apache.paimon.index.IndexFileMetaSerializerTest.randomDeletionVectorIndexFile;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -71,7 +49,6 @@ public class IndexManifestFileHandlerTest {
                                 FileFormat.manifestFormat(fileStore.options()),
                                 "zstd",
                                 fileStore.pathFactory(),
-                                null,
                                 null)
                         .create();
         IndexManifestFileHandler indexManifestFileHandler =
@@ -107,7 +84,6 @@ public class IndexManifestFileHandlerTest {
                                 FileFormat.manifestFormat(fileStore.options()),
                                 "zstd",
                                 fileStore.pathFactory(),
-                                null,
                                 null)
                         .create();
         IndexManifestFileHandler indexManifestFileHandler =
@@ -138,206 +114,4 @@ public class IndexManifestFileHandlerTest {
         assertThat(entries.contains(entry3)).isTrue();
         assertThat(entries.contains(entry4)).isTrue();
     }
-
-    @Test
-    public void testDVMetaCache() {
-        TestFileStore fileStore = keyValueFileStore();
-
-        // Setup cache and index-manifest file
-        MemorySize manifestMaxMemory = MemorySize.ofMebiBytes(128);
-        long manifestCacheThreshold = MemorySize.ofMebiBytes(1).getBytes();
-        SegmentsCache<Path> manifestCache =
-                SegmentsCache.create(manifestMaxMemory, 
manifestCacheThreshold);
-        DVMetaCache dvMetaCache = new DVMetaCache(1000000);
-
-        IndexManifestFile indexManifestFile =
-                new IndexManifestFile.Factory(
-                                fileStore.fileIO(),
-                                FileFormat.manifestFormat(fileStore.options()),
-                                "zstd",
-                                fileStore.pathFactory(),
-                                manifestCache,
-                                dvMetaCache)
-                        .create();
-
-        IndexManifestFileHandler indexManifestFileHandler =
-                new IndexManifestFileHandler(indexManifestFile, 
BucketMode.HASH_FIXED);
-
-        BinaryRow partition1 = partition(1);
-        BinaryRow partition2 = partition(2);
-
-        IndexManifestEntry entry1 =
-                new IndexManifestEntry(
-                        FileKind.ADD,
-                        partition1,
-                        0,
-                        deletionVectorIndexFile("data1.parquet", 
"data2.parquet"));
-        IndexManifestEntry entry2 =
-                new IndexManifestEntry(
-                        FileKind.ADD, partition1, 1, 
deletionVectorIndexFile("data3.parquet"));
-        IndexManifestEntry entry3 =
-                new IndexManifestEntry(
-                        FileKind.ADD,
-                        partition2,
-                        0,
-                        deletionVectorIndexFile("data4.parquet", 
"data5.parquet"));
-
-        String indexManifestFileName =
-                indexManifestFileHandler.write(null, Arrays.asList(entry1, 
entry2, entry3));
-
-        // Create IndexFileHandler with cache enabled
-        IndexFileHandler indexFileHandler =
-                new IndexFileHandler(
-                        fileStore.fileIO(),
-                        fileStore.snapshotManager(),
-                        indexManifestFile,
-                        new IndexFilePathFactories(fileStore.pathFactory()),
-                        MemorySize.ofMebiBytes(2),
-                        dvMetaCache,
-                        false);
-
-        Map<String, DeletionFile> expectedPartition1Bucket0Files =
-                indexFileHandler.extractDeletionFileByMeta(partition1, 0, 
entry1.indexFile());
-        Map<String, DeletionFile> expectedPartition1Bucket1Files =
-                indexFileHandler.extractDeletionFileByMeta(partition1, 1, 
entry2.indexFile());
-        Map<String, DeletionFile> expectedPartition2Bucket0Files =
-                indexFileHandler.extractDeletionFileByMeta(partition2, 0, 
entry3.indexFile());
-
-        CacheMetrics cacheMetrics = new CacheMetrics();
-        indexFileHandler.withCacheMetrics(cacheMetrics);
-
-        Snapshot snapshot = snapshot(indexManifestFileName);
-
-        // Test 1: First access should miss cache
-        Set<Pair<BinaryRow, Integer>> partitionBuckets = new HashSet<>();
-        partitionBuckets.add(Pair.of(partition1, 0));
-        Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> 
deletionFiles1 =
-                indexFileHandler.scanDVIndex(snapshot, partitionBuckets);
-        assertThat(deletionFiles1).isNotNull();
-        assertThat(deletionFiles1.containsKey(Pair.of(partition1, 
0))).isTrue();
-        assertThat(cacheMetrics.getMissedObject().get()).isEqualTo(1);
-        assertThat(cacheMetrics.getHitObject().get()).isEqualTo(0);
-        Map<String, DeletionFile> actualPartition1Bucket0Files =
-                deletionFiles1.get(Pair.of(partition1, 0));
-        
assertThat(actualPartition1Bucket0Files).isEqualTo(expectedPartition1Bucket0Files);
-
-        // Test 2: Second access to same partition/bucket should hit cache
-        Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> 
deletionFiles2 =
-                indexFileHandler.scanDVIndex(snapshot, partitionBuckets);
-        assertThat(deletionFiles2).isNotNull();
-        assertThat(deletionFiles1).isEqualTo(deletionFiles2);
-        assertThat(cacheMetrics.getHitObject().get()).isEqualTo(1);
-        Map<String, DeletionFile> cachedPartition1Bucket0Files =
-                deletionFiles2.get(Pair.of(partition1, 0));
-        
assertThat(cachedPartition1Bucket0Files).isEqualTo(expectedPartition1Bucket0Files);
-
-        // Test 3: Access different bucket in same partition should hit cache
-        partitionBuckets.clear();
-        partitionBuckets.add(Pair.of(partition1, 1));
-        Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> 
deletionFiles3 =
-                indexFileHandler.scanDVIndex(snapshot, partitionBuckets);
-        assertThat(deletionFiles3).isNotNull();
-        assertThat(cacheMetrics.getHitObject().get()).isEqualTo(2);
-        assertThat(cacheMetrics.getMissedObject().get()).isEqualTo(1);
-        Map<String, DeletionFile> actualPartition1Bucket1Files =
-                deletionFiles3.get(Pair.of(partition1, 1));
-        
assertThat(actualPartition1Bucket1Files).isEqualTo(expectedPartition1Bucket1Files);
-
-        // Test 4: Access different partition should miss cache
-        partitionBuckets.clear();
-        partitionBuckets.add(Pair.of(partition2, 0));
-        Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> 
deletionFiles4 =
-                indexFileHandler.scanDVIndex(snapshot, partitionBuckets);
-        assertThat(deletionFiles4).isNotNull();
-        assertThat(cacheMetrics.getMissedObject().get()).isEqualTo(2); // Now 
2 misses total
-        Map<String, DeletionFile> actualPartition2Bucket0Files =
-                deletionFiles4.get(Pair.of(partition2, 0));
-        
assertThat(actualPartition2Bucket0Files).isEqualTo(expectedPartition2Bucket0Files);
-
-        // Test 5: Test non-cache path by requesting multiple partition buckets
-        partitionBuckets.clear();
-        partitionBuckets.add(Pair.of(partition1, 0));
-        partitionBuckets.add(Pair.of(partition2, 0)); // Multiple buckets to 
avoid cache path
-        Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> 
deletionFiles5 =
-                indexFileHandler.scanDVIndex(snapshot, partitionBuckets);
-        assertThat(deletionFiles5).isNotNull();
-        assertThat(deletionFiles5.containsKey(Pair.of(partition1, 
0))).isTrue();
-        assertThat(deletionFiles5.containsKey(Pair.of(partition2, 
0))).isTrue();
-
-        Map<String, DeletionFile> nonCachePartition1Bucket0Files =
-                deletionFiles5.get(Pair.of(partition1, 0));
-        Map<String, DeletionFile> nonCachePartition2Bucket0Files =
-                deletionFiles5.get(Pair.of(partition2, 0));
-        
assertThat(nonCachePartition1Bucket0Files).isEqualTo(expectedPartition1Bucket0Files);
-        
assertThat(nonCachePartition2Bucket0Files).isEqualTo(expectedPartition2Bucket0Files);
-    }
-
-    // ============================ Test utils 
===================================
-
-    private BinaryRow partition(int partitionValue) {
-        BinaryRow partition = new BinaryRow(1);
-        BinaryRowWriter writer = new BinaryRowWriter(partition);
-        writer.writeInt(0, partitionValue);
-        writer.complete();
-        return partition;
-    }
-
-    private IndexFileMeta deletionVectorIndexFile(String... dataFileNames) {
-        LinkedHashMap<String, DeletionVectorMeta> dvRanges = new 
LinkedHashMap<>();
-        int offset = 0;
-        for (String dataFileName : dataFileNames) {
-            dvRanges.put(
-                    dataFileName,
-                    new DeletionVectorMeta(
-                            dataFileName, offset, 100 + offset, (long) (10 + 
offset)));
-            offset += 150;
-        }
-        return new IndexFileMeta(
-                DELETION_VECTORS_INDEX,
-                "dv_index_" + UUID.randomUUID().toString(),
-                1024,
-                512,
-                dvRanges,
-                null);
-    }
-
-    private Snapshot snapshot(String indexManifestFileName) {
-        String json =
-                "{\n"
-                        + "  \"version\" : 3,\n"
-                        + "  \"id\" : 1,\n"
-                        + "  \"schemaId\" : 0,\n"
-                        + "  \"baseManifestList\" : null,\n"
-                        + "  \"baseManifestListSize\" : 0,\n"
-                        + "  \"deltaManifestList\" : null,\n"
-                        + "  \"deltaManifestListSize\" : 0,\n"
-                        + "  \"changelogManifestListSize\" : 0,\n"
-                        + "  \"indexManifest\" : \""
-                        + indexManifestFileName
-                        + "\",\n"
-                        + "  \"commitUser\" : \"test\",\n"
-                        + "  \"commitIdentifier\" : 1,\n"
-                        + "  \"commitKind\" : \"APPEND\",\n"
-                        + "  \"timeMillis\" : "
-                        + System.currentTimeMillis()
-                        + ",\n"
-                        + "  \"totalRecordCount\" : null,\n"
-                        + "  \"deltaRecordCount\" : null\n"
-                        + "}";
-        return Snapshot.fromJson(json);
-    }
-
-    private TestFileStore keyValueFileStore() {
-        return new TestFileStore.Builder(
-                        "avro",
-                        tempDir.toString(),
-                        2, // 2 buckets for testing
-                        TestKeyValueGenerator.DEFAULT_PART_TYPE,
-                        TestKeyValueGenerator.KEY_TYPE,
-                        TestKeyValueGenerator.DEFAULT_ROW_TYPE,
-                        
TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
-                        DeduplicateMergeFunction.factory(),
-                        null)
-                .build();
-    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/utils/DVMetaCacheTest.java 
b/paimon-core/src/test/java/org/apache/paimon/utils/DVMetaCacheTest.java
index 5b1c9611bb..c0c28ce3ab 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/DVMetaCacheTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/DVMetaCacheTest.java
@@ -102,7 +102,7 @@ public class DVMetaCacheTest {
 
     @Test
     public void testCacheEviction() {
-        DVMetaCache cache = new DVMetaCache(2);
+        DVMetaCache cache = new DVMetaCache(5);
         Path path = new Path("manifest/index-manifest-00004");
         BinaryRow partition = partition("year=2023/month=09");
 
@@ -126,16 +126,25 @@ public class DVMetaCacheTest {
         assertThat(cache.read(path, partition, 1)).isNotNull();
         assertThat(cache.read(path, partition, 2)).isNotNull();
 
-        // Add third entry, should evict first one
+        // Add third entry, should evict itself
         Map<String, DeletionFile> dvFiles3 = new HashMap<>();
         dvFiles3.put(
                 "data-g7h8i9j0-k1l2-3456-ghij-789012345678-1.parquet",
                 new 
DeletionFile("index-g7h8i9j0-k1l2-3456-ghij-789012345678-1", 0L, 100L, 3L));
         cache.put(path, partition, 3, dvFiles3);
 
+        // Add fourth entry, should evict first one
+        Map<String, DeletionFile> dvFiles4 = new HashMap<>();
+        dvFiles4.put(
+                "data-g7h8i9j0-k1l2-311456-ghij-789012345678-1.parquet",
+                new 
DeletionFile("index-g117h8i9j0-k1l2-3456-ghij-789012345678-1", 0L, 100L, 3L));
+        cache.put(path, partition, 4, dvFiles4);
+
         // First entry should be evicted
         assertThat(cache.read(path, partition, 1)).isNull();
-        assertThat(cache.read(path, partition, 3)).isNotNull();
+        assertThat(cache.read(path, partition, 2)).isNotNull();
+        assertThat(cache.read(path, partition, 3)).isNull();
+        assertThat(cache.read(path, partition, 4)).isNotNull();
     }
 
     // ============================ Test utils 
===================================

Reply via email to