This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ac64779640 [core] Introduce deletion vector meta cache at bucket level 
(#6407)
ac64779640 is described below

commit ac64779640b7ec43852ae2dd8588f042ef76bf0e
Author: bryndenZh <[email protected]>
AuthorDate: Mon Oct 27 20:19:08 2025 +0800

    [core] Introduce deletion vector meta cache at bucket level (#6407)
---
 .../generated/catalog_configuration.html           |   6 +
 .../org/apache/paimon/options/CatalogOptions.java  |   7 +
 .../java/org/apache/paimon/AbstractFileStore.java  |  11 +-
 .../src/main/java/org/apache/paimon/FileStore.java |   3 +
 .../org/apache/paimon/catalog/CachingCatalog.java  |  12 +-
 .../org/apache/paimon/index/IndexFileHandler.java  | 147 ++++++++++++++
 .../apache/paimon/manifest/IndexManifestFile.java  |  19 +-
 .../paimon/operation/metrics/ScanMetrics.java      |  10 +
 .../paimon/privilege/PrivilegedFileStore.java      |   6 +
 .../paimon/table/AbstractFileStoreTable.java       |   8 +
 .../paimon/table/DelegatedFileStoreTable.java      |   6 +
 .../org/apache/paimon/table/FileStoreTable.java    |   3 +
 .../table/source/snapshot/SnapshotReaderImpl.java  |  38 ++--
 .../java/org/apache/paimon/utils/DVMetaCache.java  | 179 ++++++++++++++++
 .../manifest/IndexManifestFileHandlerTest.java     | 226 +++++++++++++++++++++
 .../apache/paimon/manifest/ManifestFileTest.java   |   1 +
 .../paimon/operation/metrics/ScanMetricsTest.java  |   4 +-
 .../org/apache/paimon/utils/DVMetaCacheTest.java   | 150 ++++++++++++++
 18 files changed, 818 insertions(+), 18 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html 
b/docs/layouts/shortcodes/generated/catalog_configuration.html
index 9a8dcfb1ac..eaeea12346 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -32,6 +32,12 @@ under the License.
             <td>Boolean</td>
             <td>Controls whether the catalog will cache databases, tables, 
manifests and partitions.</td>
         </tr>
+        <tr>
+            <td><h5>cache.deletion-vectors.max-bucket-num</h5></td>
+            <td style="word-wrap: break-word;">20000</td>
+            <td>Integer</td>
+            <td>Controls the maximum number of bucket-level deletion vector 
meta that can be cached.</td>
+        </tr>
         <tr>
             <td><h5>cache.expire-after-access</h5></td>
             <td style="word-wrap: break-word;">10 min</td>
diff --git 
a/paimon-api/src/main/java/org/apache/paimon/options/CatalogOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/options/CatalogOptions.java
index 7a862e8fb2..72674bdd7b 100644
--- a/paimon-api/src/main/java/org/apache/paimon/options/CatalogOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/options/CatalogOptions.java
@@ -136,6 +136,13 @@ public class CatalogOptions {
                     .withDescription(
                             "Controls the max number for snapshots per table 
in the catalog are cached.");
 
+    public static final ConfigOption<Integer> CACHE_DV_MAX_NUM =
+            key("cache.deletion-vectors.max-bucket-num")
+                    .intType()
+                    .defaultValue(20000)
+                    .withDescription(
+                            "Controls the maximum number of bucket-level 
deletion vector meta that can be cached.");
+
     public static final ConfigOption<Boolean> CASE_SENSITIVE =
             ConfigOptions.key("case-sensitive")
                     .booleanType()
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 2e209fba45..2a5dbd7766 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -59,6 +59,7 @@ import org.apache.paimon.tag.TagAutoManager;
 import org.apache.paimon.tag.TagPreview;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.ChangelogManager;
+import org.apache.paimon.utils.DVMetaCache;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.IndexFilePathFactories;
 import org.apache.paimon.utils.InternalRowPartitionComputer;
@@ -97,6 +98,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
 
     @Nullable private SegmentsCache<Path> readManifestCache;
     @Nullable private Cache<Path, Snapshot> snapshotCache;
+    @Nullable private DVMetaCache dvMetaCache;
 
     protected AbstractFileStore(
             FileIO fileIO,
@@ -216,7 +218,8 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
                 FileFormat.manifestFormat(options),
                 options.manifestCompression(),
                 pathFactory(),
-                readManifestCache);
+                readManifestCache,
+                dvMetaCache);
     }
 
     @Override
@@ -227,6 +230,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
                 indexManifestFileFactory().create(),
                 new IndexFilePathFactories(pathFactory()),
                 options.dvIndexFileTargetSize(),
+                this.dvMetaCache,
                 options.deletionVectorBitmap64());
     }
 
@@ -488,4 +492,9 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
     public void setSnapshotCache(Cache<Path, Snapshot> cache) {
         this.snapshotCache = cache;
     }
+
+    @Override
+    public void setDVMetaCache(DVMetaCache dvMetaCache) {
+        this.dvMetaCache = dvMetaCache;
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index 929302673d..62bc65c744 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -40,6 +40,7 @@ import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.tag.TagAutoManager;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.ChangelogManager;
+import org.apache.paimon.utils.DVMetaCache;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.SegmentsCache;
 import org.apache.paimon.utils.SnapshotManager;
@@ -121,4 +122,6 @@ public interface FileStore<T> {
     void setManifestCache(SegmentsCache<Path> manifestCache);
 
     void setSnapshotCache(Cache<Path, Snapshot> cache);
+
+    void setDVMetaCache(DVMetaCache dvMetaCache);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
index 18ba10cdd1..9dfb32e888 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
@@ -28,6 +28,7 @@ import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.system.SystemTableLoader;
+import org.apache.paimon.utils.DVMetaCache;
 import org.apache.paimon.utils.SegmentsCache;
 
 import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
@@ -43,6 +44,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
+import static org.apache.paimon.options.CatalogOptions.CACHE_DV_MAX_NUM;
 import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
 import static 
org.apache.paimon.options.CatalogOptions.CACHE_EXPIRE_AFTER_ACCESS;
 import static 
org.apache.paimon.options.CatalogOptions.CACHE_EXPIRE_AFTER_WRITE;
@@ -66,9 +68,9 @@ public class CachingCatalog extends DelegateCatalog {
     protected Cache<String, Database> databaseCache;
     protected Cache<Identifier, Table> tableCache;
     @Nullable protected final SegmentsCache<Path> manifestCache;
-
     // partition cache will affect data latency
     @Nullable protected Cache<Identifier, List<Partition>> partitionCache;
+    @Nullable protected DVMetaCache dvMetaCache;
 
     public CachingCatalog(Catalog wrapped, Options options) {
         super(wrapped);
@@ -97,6 +99,11 @@ public class CachingCatalog extends DelegateCatalog {
         this.manifestCache = SegmentsCache.create(manifestMaxMemory, 
manifestCacheThreshold);
 
         this.cachedPartitionMaxNum = options.get(CACHE_PARTITION_MAX_NUM);
+
+        int cacheDvMaxNum = options.get(CACHE_DV_MAX_NUM);
+        if (cacheDvMaxNum > 0) {
+            this.dvMetaCache = new DVMetaCache(cacheDvMaxNum);
+        }
         init(Ticker.systemTicker());
     }
 
@@ -271,6 +278,9 @@ public class CachingCatalog extends DelegateCatalog {
             if (manifestCache != null) {
                 storeTable.setManifestCache(manifestCache);
             }
+            if (dvMetaCache != null) {
+                storeTable.setDVMetaCache(dvMetaCache);
+            }
         }
 
         tableCache.put(identifier, table);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java 
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
index 205db897ea..83a41161f3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.index;
 
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.deletionvectors.DeletionVector;
 import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
@@ -26,19 +27,27 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.IndexManifestFile;
+import org.apache.paimon.operation.metrics.CacheMetrics;
 import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.table.source.DeletionFile;
+import org.apache.paimon.utils.DVMetaCache;
 import org.apache.paimon.utils.IndexFilePathFactories;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SnapshotManager;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
 import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
@@ -52,6 +61,8 @@ public class IndexFileHandler {
     private final IndexFilePathFactories pathFactories;
     private final MemorySize dvTargetFileSize;
     private final boolean dvBitmap64;
+    @Nullable private CacheMetrics cacheMetrics;
+    @Nullable private final DVMetaCache dvMetaCache;
 
     public IndexFileHandler(
             FileIO fileIO,
@@ -59,6 +70,7 @@ public class IndexFileHandler {
             IndexManifestFile indexManifestFile,
             IndexFilePathFactories pathFactories,
             MemorySize dvTargetFileSize,
+            DVMetaCache dvMetaCache,
             boolean dvBitmap64) {
         this.fileIO = fileIO;
         this.snapshotManager = snapshotManager;
@@ -66,6 +78,7 @@ public class IndexFileHandler {
         this.indexManifestFile = indexManifestFile;
         this.dvTargetFileSize = dvTargetFileSize;
         this.dvBitmap64 = dvBitmap64;
+        this.dvMetaCache = dvMetaCache;
     }
 
     public HashIndexFile hashIndex(BinaryRow partition, int bucket) {
@@ -87,6 +100,140 @@ public class IndexFileHandler {
         return result.isEmpty() ? Optional.empty() : 
Optional.of(result.get(0));
     }
 
+    public void withCacheMetrics(@Nullable CacheMetrics cacheMetrics) {
+        this.cacheMetrics = cacheMetrics;
+    }
+
+    // Construct DataFile -> DeletionFile based on IndexFileMeta
+    @Nullable
+    @VisibleForTesting
+    public Map<String, DeletionFile> extractDeletionFileByMeta(
+            BinaryRow partition, Integer bucket, IndexFileMeta fileMeta) {
+        LinkedHashMap<String, DeletionVectorMeta> dvRanges = 
fileMeta.dvRanges();
+        String dvFilePath = dvIndex(partition, 
bucket).path(fileMeta).toString();
+        if (dvRanges != null && !dvRanges.isEmpty()) {
+            Map<String, DeletionFile> result = new HashMap<>();
+            for (DeletionVectorMeta dvMeta : dvRanges.values()) {
+                result.put(
+                        dvMeta.dataFileName(),
+                        new DeletionFile(
+                                dvFilePath,
+                                dvMeta.offset(),
+                                dvMeta.length(),
+                                dvMeta.cardinality()));
+            }
+            return result;
+        }
+        return null;
+    }
+
+    // Scan DV index file of given partition buckets
+    // returns <DataFile: DeletionFile> map grouped by partition and bucket
+    public Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> 
scanDVIndex(
+            Snapshot snapshot, Set<Pair<BinaryRow, Integer>> partitionBuckets) 
{
+        if (snapshot == null || snapshot.indexManifest() == null) {
+            return Collections.emptyMap();
+        }
+        Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> result = new 
HashMap<>();
+        // to avoid cache being frequently evicted,
+        //  currently we only read from cache when bucket number is 1
+        if (this.dvMetaCache != null && partitionBuckets.size() == 1) {
+            Pair<BinaryRow, Integer> partitionBucket = 
partitionBuckets.iterator().next();
+            Map<String, DeletionFile> deletionFiles =
+                    this.scanDVIndexWithCache(
+                            snapshot, partitionBucket.getLeft(), 
partitionBucket.getRight());
+            if (deletionFiles != null && deletionFiles.size() > 0) {
+                result.put(partitionBucket, deletionFiles);
+            }
+            return result;
+        }
+        Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> partitionFileMetas =
+                scan(
+                        snapshot,
+                        DELETION_VECTORS_INDEX,
+                        
partitionBuckets.stream().map(Pair::getLeft).collect(Collectors.toSet()));
+        partitionFileMetas.forEach(
+                (entry, indexFileMetas) -> {
+                    if (partitionBuckets.contains(entry)) {
+                        if (indexFileMetas != null) {
+                            indexFileMetas.forEach(
+                                    indexFileMeta -> {
+                                        Map<String, DeletionFile> dvMetas =
+                                                extractDeletionFileByMeta(
+                                                        entry.getLeft(),
+                                                        entry.getRight(),
+                                                        indexFileMeta);
+                                        if (dvMetas != null) {
+                                            result.computeIfAbsent(entry, k -> 
new HashMap<>())
+                                                    .putAll(dvMetas);
+                                        }
+                                    });
+                        }
+                    }
+                });
+        return result;
+    }
+
+    // Scan DV Meta Cache first, if not exist, scan DV index file, returns the 
exact deletion file
+    // of the specified partition/bucket
+    @VisibleForTesting
+    Map<String, DeletionFile> scanDVIndexWithCache(
+            Snapshot snapshot, BinaryRow partition, Integer bucket) {
+        // read from cache
+        String indexManifestName = snapshot.indexManifest();
+        Path indexManifestPath = 
this.indexManifestFile.indexManifestFilePath(indexManifestName);
+        Map<String, DeletionFile> result =
+                this.dvMetaCache.read(indexManifestPath, partition, bucket);
+        if (result != null) {
+            if (cacheMetrics != null) {
+                cacheMetrics.increaseHitObject();
+            }
+            return result;
+        }
+        if (cacheMetrics != null) {
+            cacheMetrics.increaseMissedObject();
+        }
+        // If miss, read the whole partition's deletion files
+        Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> partitionFileMetas =
+                scan(
+                        snapshot,
+                        DELETION_VECTORS_INDEX,
+                        new HashSet<>(Collections.singletonList(partition)));
+        // for each bucket, extract deletion files, and fill meta cache
+        for (Map.Entry<Pair<BinaryRow, Integer>, List<IndexFileMeta>> entry :
+                partitionFileMetas.entrySet()) {
+            Pair<BinaryRow, Integer> partitionBucket = entry.getKey();
+            List<IndexFileMeta> fileMetas = entry.getValue();
+            if (entry.getValue() != null) {
+                Map<String, DeletionFile> bucketDeletionFiles = new 
HashMap<>();
+                fileMetas.forEach(
+                        meta -> {
+                            Map<String, DeletionFile> bucketDVMetas =
+                                    extractDeletionFileByMeta(
+                                            partitionBucket.getLeft(),
+                                            partitionBucket.getRight(),
+                                            meta);
+                            if (bucketDVMetas != null) {
+                                bucketDeletionFiles.putAll(bucketDVMetas);
+                            }
+                        });
+                // bucketDeletionFiles can be empty
+                this.dvMetaCache.put(
+                        indexManifestPath,
+                        partitionBucket.getLeft(),
+                        partitionBucket.getRight(),
+                        bucketDeletionFiles);
+                if (partitionBucket.getRight() != null
+                        && partitionBucket.getLeft() != null
+                        && partitionBucket.getRight().equals(bucket)
+                        && partitionBucket.getLeft().equals(partition)) {
+                    result = bucketDeletionFiles;
+                }
+            }
+        }
+        return result;
+    }
+
     public List<IndexManifestEntry> scan(String indexType) {
         return scan(snapshotManager.latestSnapshot(), indexType);
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java
index 077d1a45aa..21086eadf3 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java
@@ -25,6 +25,7 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.DVMetaCache;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.ObjectsFile;
 import org.apache.paimon.utils.PathFactory;
@@ -39,6 +40,8 @@ import java.util.List;
 /** Index manifest file. */
 public class IndexManifestFile extends ObjectsFile<IndexManifestEntry> {
 
+    private final DVMetaCache dvMetaCache;
+
     private IndexManifestFile(
             FileIO fileIO,
             RowType schema,
@@ -46,7 +49,8 @@ public class IndexManifestFile extends 
ObjectsFile<IndexManifestEntry> {
             FormatWriterFactory writerFactory,
             String compression,
             PathFactory pathFactory,
-            @Nullable SegmentsCache<Path> cache) {
+            @Nullable SegmentsCache<Path> cache,
+            @Nullable DVMetaCache dvMetaCache) {
         super(
                 fileIO,
                 new IndexManifestEntrySerializer(),
@@ -56,6 +60,11 @@ public class IndexManifestFile extends 
ObjectsFile<IndexManifestEntry> {
                 compression,
                 pathFactory,
                 cache);
+        this.dvMetaCache = dvMetaCache;
+    }
+
+    public Path indexManifestFilePath(String fileName) {
+        return pathFactory.toPath(fileName);
     }
 
     /** Write new index files to index manifest. */
@@ -79,18 +88,21 @@ public class IndexManifestFile extends 
ObjectsFile<IndexManifestEntry> {
         private final String compression;
         private final FileStorePathFactory pathFactory;
         @Nullable private final SegmentsCache<Path> cache;
+        @Nullable private final DVMetaCache dvMetaCache;
 
         public Factory(
                 FileIO fileIO,
                 FileFormat fileFormat,
                 String compression,
                 FileStorePathFactory pathFactory,
-                @Nullable SegmentsCache<Path> cache) {
+                @Nullable SegmentsCache<Path> cache,
+                @Nullable DVMetaCache dvMetaCache) {
             this.fileIO = fileIO;
             this.fileFormat = fileFormat;
             this.compression = compression;
             this.pathFactory = pathFactory;
             this.cache = cache;
+            this.dvMetaCache = dvMetaCache;
         }
 
         public IndexManifestFile create() {
@@ -102,7 +114,8 @@ public class IndexManifestFile extends 
ObjectsFile<IndexManifestEntry> {
                     fileFormat.createWriterFactory(schema),
                     compression,
                     pathFactory.indexManifestFileFactory(),
-                    cache);
+                    cache,
+                    dvMetaCache);
         }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java
index 280f53a101..10246907ef 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java
@@ -35,10 +35,13 @@ public class ScanMetrics {
     public static final String LAST_SCAN_RESULTED_TABLE_FILES = 
"lastScanResultedTableFiles";
     public static final String MANIFEST_HIT_CACHE = "manifestHitCache";
     public static final String MANIFEST_MISSED_CACHE = "manifestMissedCache";
+    public static final String DVMETA_HIT_CACHE = "dvMetaHitCache";
+    public static final String DVMETA_MISSED_CACHE = "dvMetaMissedCache";
 
     private final MetricGroup metricGroup;
     private final Histogram durationHistogram;
     private final CacheMetrics cacheMetrics;
+    private final CacheMetrics dvMetaCacheMetrics;
 
     private ScanStats latestScan;
 
@@ -48,6 +51,7 @@ public class ScanMetrics {
                 LAST_SCAN_DURATION, () -> latestScan == null ? 0L : 
latestScan.getDuration());
         durationHistogram = metricGroup.histogram(SCAN_DURATION, 
HISTOGRAM_WINDOW_SIZE);
         cacheMetrics = new CacheMetrics();
+        dvMetaCacheMetrics = new CacheMetrics();
         metricGroup.gauge(
                 LAST_SCANNED_MANIFESTS,
                 () -> latestScan == null ? 0L : 
latestScan.getScannedManifests());
@@ -59,6 +63,8 @@ public class ScanMetrics {
                 () -> latestScan == null ? 0L : 
latestScan.getResultedTableFiles());
         metricGroup.gauge(MANIFEST_HIT_CACHE, () -> 
cacheMetrics.getHitObject().get());
         metricGroup.gauge(MANIFEST_MISSED_CACHE, () -> 
cacheMetrics.getMissedObject().get());
+        metricGroup.gauge(DVMETA_HIT_CACHE, () -> 
dvMetaCacheMetrics.getHitObject().get());
+        metricGroup.gauge(DVMETA_MISSED_CACHE, () -> 
dvMetaCacheMetrics.getMissedObject().get());
     }
 
     @VisibleForTesting
@@ -74,4 +80,8 @@ public class ScanMetrics {
     public CacheMetrics getCacheMetrics() {
         return cacheMetrics;
     }
+
+    public CacheMetrics getDvMetaCacheMetrics() {
+        return dvMetaCacheMetrics;
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
index 99069ddc31..b683174cd4 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
@@ -44,6 +44,7 @@ import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.tag.TagAutoManager;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.ChangelogManager;
+import org.apache.paimon.utils.DVMetaCache;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.SegmentsCache;
 import org.apache.paimon.utils.SnapshotManager;
@@ -232,4 +233,9 @@ public class PrivilegedFileStore<T> implements FileStore<T> 
{
     public void setSnapshotCache(Cache<Path, Snapshot> cache) {
         wrapped.setSnapshotCache(cache);
     }
+
+    @Override
+    public void setDVMetaCache(DVMetaCache dvMetaCache) {
+        wrapped.setDVMetaCache(dvMetaCache);
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index aa5b8bb70d..aeb223aaf5 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -55,6 +55,7 @@ import org.apache.paimon.tag.TagAutoManager;
 import org.apache.paimon.utils.BranchManager;
 import org.apache.paimon.utils.CatalogBranchManager;
 import org.apache.paimon.utils.ChangelogManager;
+import org.apache.paimon.utils.DVMetaCache;
 import org.apache.paimon.utils.FileSystemBranchManager;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SegmentsCache;
@@ -93,6 +94,7 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
     @Nullable protected transient SegmentsCache<Path> manifestCache;
     @Nullable protected transient Cache<Path, Snapshot> snapshotCache;
     @Nullable protected transient Cache<String, Statistics> statsCache;
+    @Nullable protected transient DVMetaCache dvmetaCache;
 
     protected AbstractFileStoreTable(
             FileIO fileIO,
@@ -138,6 +140,12 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
         this.statsCache = cache;
     }
 
+    @Override
+    public void setDVMetaCache(DVMetaCache cache) {
+        this.dvmetaCache = cache;
+        store().setDVMetaCache(cache);
+    }
+
     @Override
     public Optional<Snapshot> latestSnapshot() {
         Snapshot snapshot = store().snapshotManager().latestSnapshot();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
index e8898a1cd1..3a0d4ecc1c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
@@ -42,6 +42,7 @@ import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.tag.TagAutoManager;
 import org.apache.paimon.utils.BranchManager;
 import org.apache.paimon.utils.ChangelogManager;
+import org.apache.paimon.utils.DVMetaCache;
 import org.apache.paimon.utils.SegmentsCache;
 import org.apache.paimon.utils.SimpleFileReader;
 import org.apache.paimon.utils.SnapshotManager;
@@ -154,6 +155,11 @@ public abstract class DelegatedFileStoreTable implements 
FileStoreTable {
         wrapped.setStatsCache(cache);
     }
 
+    @Override
+    public void setDVMetaCache(DVMetaCache cache) {
+        wrapped.setDVMetaCache(cache);
+    }
+
     @Override
     public TableSchema schema() {
         return wrapped.schema();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
index 0ed51cb852..b07465a258 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
@@ -36,6 +36,7 @@ import org.apache.paimon.tag.TagAutoManager;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.BranchManager;
 import org.apache.paimon.utils.ChangelogManager;
+import org.apache.paimon.utils.DVMetaCache;
 import org.apache.paimon.utils.SegmentsCache;
 import org.apache.paimon.utils.TagManager;
 
@@ -63,6 +64,8 @@ public interface FileStoreTable extends DataTable {
 
     void setStatsCache(Cache<String, Statistics> cache);
 
+    void setDVMetaCache(DVMetaCache cache);
+
     @Override
     default RowType rowType() {
         return schema().logicalRowType();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 81c015a9d4..9157737915 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -286,7 +286,9 @@ public class SnapshotReaderImpl implements SnapshotReader {
 
     @Override
     public SnapshotReader withMetricRegistry(MetricRegistry registry) {
-        scan.withMetrics(new ScanMetrics(registry, tableName));
+        ScanMetrics scanMetrics = new ScanMetrics(registry, tableName);
+        scan.withMetrics(scanMetrics);
+        indexFileHandler.withCacheMetrics(scanMetrics.getDvMetaCacheMetrics());
         return this;
     }
 
@@ -349,14 +351,18 @@ public class SnapshotReaderImpl implements SnapshotReader 
{
             Map<BinaryRow, Map<Integer, List<ManifestEntry>>> 
groupedManifestEntries) {
         List<DataSplit> splits = new ArrayList<>();
         // Read deletion indexes at once to reduce file IO
-        Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> 
deletionIndexFilesMap = null;
+        Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> 
deletionFilesMap = null;
         if (!isStreaming) {
-            deletionIndexFilesMap =
+            Set<Pair<BinaryRow, Integer>> partitionBuckets =
+                    groupedManifestEntries.entrySet().stream()
+                            .flatMap(
+                                    e ->
+                                            e.getValue().keySet().stream()
+                                                    .map(bucket -> 
Pair.of(e.getKey(), bucket)))
+                            .collect(Collectors.toSet());
+            deletionFilesMap =
                     deletionVectors && snapshot != null
-                            ? indexFileHandler.scan(
-                                    snapshot,
-                                    DELETION_VECTORS_INDEX,
-                                    groupedManifestEntries.keySet())
+                            ? indexFileHandler.scanDVIndex(snapshot, 
partitionBuckets)
                             : Collections.emptyMap();
         }
         for (Map.Entry<BinaryRow, Map<Integer, List<ManifestEntry>>> entry :
@@ -387,16 +393,14 @@ public class SnapshotReaderImpl implements SnapshotReader 
{
                     builder.withDataFiles(dataFiles)
                             .rawConvertible(splitGroup.rawConvertible)
                             .withBucketPath(bucketPath);
-                    if (deletionVectors && deletionIndexFilesMap != null) {
+                    if (deletionVectors && deletionFilesMap != null) {
                         builder.withDataDeletionFiles(
                                 getDeletionFiles(
-                                        indexFileHandler.dvIndex(partition, 
bucket),
                                         dataFiles,
-                                        deletionIndexFilesMap.getOrDefault(
+                                        deletionFilesMap.getOrDefault(
                                                 Pair.of(partition, bucket),
-                                                Collections.emptyList())));
+                                                Collections.emptyMap())));
                     }
-
                     splits.add(builder.build());
                 }
             }
@@ -588,4 +592,14 @@ public class SnapshotReaderImpl implements SnapshotReader {
 
         return deletionFiles;
     }
+
+    public List<DeletionFile> getDeletionFiles(
+            List<DataFileMeta> dataFiles, Map<String, DeletionFile> 
deletionFilesMap) {
+        List<DeletionFile> deletionFiles = new ArrayList<>(dataFiles.size());
+        dataFiles.stream()
+                .map(DataFileMeta::fileName)
+                .map(f -> deletionFilesMap == null ? null : 
deletionFilesMap.get(f))
+                .forEach(deletionFiles::add);
+        return deletionFiles;
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/DVMetaCache.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/DVMetaCache.java
new file mode 100644
index 0000000000..d06c563187
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/DVMetaCache.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.source.DeletionFile;
+
+import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/** Cache for deletion vector meta. */
+public class DVMetaCache {
+    private final Cache<DVMetaCacheKey, List<DVMetaCacheValue>> cache;
+
+    public DVMetaCache(long maxElementSize) {
+        this.cache =
+                Caffeine.newBuilder()
+                        .maximumSize(maxElementSize)
+                        .softValues()
+                        .executor(Runnable::run)
+                        .build();
+    }
+
+    @Nullable
+    public Map<String, DeletionFile> read(Path path, BinaryRow partition, int 
bucket) {
+        DVMetaCacheKey cacheKey = new DVMetaCacheKey(path, partition, bucket);
+        List<DVMetaCacheValue> cacheValue = this.cache.getIfPresent(cacheKey);
+        if (cacheValue == null) {
+            return null;
+        }
+        // If the bucket doesn't have dv metas, return empty set.
+        Map<String, DeletionFile> dvFilesMap = new HashMap<>();
+        cacheValue.forEach(
+                dvMeta ->
+                        dvFilesMap.put(
+                                dvMeta.getDataFileName(),
+                                new DeletionFile(
+                                        dvMeta.getDeletionFilePath(),
+                                        dvMeta.getOffset(),
+                                        dvMeta.getLength(),
+                                        dvMeta.getCardinality())));
+        return dvFilesMap;
+    }
+
+    public void put(
+            Path path, BinaryRow partition, int bucket, Map<String, 
DeletionFile> dvFilesMap) {
+        DVMetaCacheKey key = new DVMetaCacheKey(path, partition, bucket);
+        List<DVMetaCacheValue> cacheValue = new ArrayList<>();
+        dvFilesMap.forEach(
+                (dataFileName, deletionFile) -> {
+                    DVMetaCacheValue dvMetaCacheValue =
+                            new DVMetaCacheValue(
+                                    dataFileName,
+                                    deletionFile.path(),
+                                    (int) deletionFile.offset(),
+                                    (int) deletionFile.length(),
+                                    deletionFile.cardinality());
+                    cacheValue.add(dvMetaCacheValue);
+                });
+        this.cache.put(key, cacheValue);
+    }
+
+    private static class DVMetaCacheValue {
+        private final String dataFileName;
+        private final String deletionFilePath;
+        private final int offset;
+        private final int length;
+        @Nullable private final Long cardinality;
+
+        public DVMetaCacheValue(
+                String dataFileName,
+                String deletionFilePath,
+                int start,
+                int length,
+                @Nullable Long cardinality) {
+            this.dataFileName = dataFileName;
+            this.deletionFilePath = deletionFilePath;
+            this.offset = start;
+            this.length = length;
+            this.cardinality = cardinality;
+        }
+
+        public String getDataFileName() {
+            return dataFileName;
+        }
+
+        public String getDeletionFilePath() {
+            return deletionFilePath;
+        }
+
+        public int getOffset() {
+            return offset;
+        }
+
+        public int getLength() {
+            return length;
+        }
+
+        @Nullable
+        public Long getCardinality() {
+            return cardinality;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            DVMetaCacheValue that = (DVMetaCacheValue) o;
+            return offset == that.offset
+                    && length == that.length
+                    && Objects.equals(dataFileName, that.dataFileName)
+                    && Objects.equals(deletionFilePath, that.deletionFilePath)
+                    && Objects.equals(cardinality, that.cardinality);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(dataFileName, deletionFilePath, offset, 
length, cardinality);
+        }
+    }
+
+    /** Cache key for deletion vector meta at bucket level. */
+    private static final class DVMetaCacheKey {
+        private final Path path;
+        private final BinaryRow row;
+        private final int bucket;
+
+        public DVMetaCacheKey(Path path, BinaryRow row, int bucket) {
+            this.path = path;
+            this.row = row;
+            this.bucket = bucket;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof DVMetaCacheKey)) {
+                return false;
+            }
+            DVMetaCacheKey that = (DVMetaCacheKey) o;
+            return bucket == that.bucket
+                    && Objects.equals(path, that.path)
+                    && Objects.equals(row, that.row);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(path, row, bucket);
+        }
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java
index c2c5baba0a..edc34bc766 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java
@@ -18,18 +18,40 @@
 
 package org.apache.paimon.manifest;
 
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.TestAppendFileStore;
+import org.apache.paimon.TestFileStore;
+import org.apache.paimon.TestKeyValueGenerator;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
 import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.index.DeletionVectorMeta;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.paimon.operation.metrics.CacheMetrics;
+import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.source.DeletionFile;
+import org.apache.paimon.utils.DVMetaCache;
+import org.apache.paimon.utils.IndexFilePathFactories;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.SegmentsCache;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
 
+import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
 import static 
org.apache.paimon.index.IndexFileMetaSerializerTest.randomDeletionVectorIndexFile;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -49,6 +71,7 @@ public class IndexManifestFileHandlerTest {
                                 FileFormat.manifestFormat(fileStore.options()),
                                 "zstd",
                                 fileStore.pathFactory(),
+                                null,
                                 null)
                         .create();
         IndexManifestFileHandler indexManifestFileHandler =
@@ -84,6 +107,7 @@ public class IndexManifestFileHandlerTest {
                                 FileFormat.manifestFormat(fileStore.options()),
                                 "zstd",
                                 fileStore.pathFactory(),
+                                null,
                                 null)
                         .create();
         IndexManifestFileHandler indexManifestFileHandler =
@@ -114,4 +138,206 @@ public class IndexManifestFileHandlerTest {
         assertThat(entries.contains(entry3)).isTrue();
         assertThat(entries.contains(entry4)).isTrue();
     }
+
+    @Test
+    public void testDVMetaCache() {
+        TestFileStore fileStore = keyValueFileStore();
+
+        // Setup cache and index-manifest file
+        MemorySize manifestMaxMemory = MemorySize.ofMebiBytes(128);
+        long manifestCacheThreshold = MemorySize.ofMebiBytes(1).getBytes();
+        SegmentsCache<Path> manifestCache =
+                SegmentsCache.create(manifestMaxMemory, 
manifestCacheThreshold);
+        DVMetaCache dvMetaCache = new DVMetaCache(1000000);
+
+        IndexManifestFile indexManifestFile =
+                new IndexManifestFile.Factory(
+                                fileStore.fileIO(),
+                                FileFormat.manifestFormat(fileStore.options()),
+                                "zstd",
+                                fileStore.pathFactory(),
+                                manifestCache,
+                                dvMetaCache)
+                        .create();
+
+        IndexManifestFileHandler indexManifestFileHandler =
+                new IndexManifestFileHandler(indexManifestFile, 
BucketMode.HASH_FIXED);
+
+        BinaryRow partition1 = partition(1);
+        BinaryRow partition2 = partition(2);
+
+        IndexManifestEntry entry1 =
+                new IndexManifestEntry(
+                        FileKind.ADD,
+                        partition1,
+                        0,
+                        deletionVectorIndexFile("data1.parquet", 
"data2.parquet"));
+        IndexManifestEntry entry2 =
+                new IndexManifestEntry(
+                        FileKind.ADD, partition1, 1, 
deletionVectorIndexFile("data3.parquet"));
+        IndexManifestEntry entry3 =
+                new IndexManifestEntry(
+                        FileKind.ADD,
+                        partition2,
+                        0,
+                        deletionVectorIndexFile("data4.parquet", 
"data5.parquet"));
+
+        String indexManifestFileName =
+                indexManifestFileHandler.write(null, Arrays.asList(entry1, 
entry2, entry3));
+
+        // Create IndexFileHandler with cache enabled
+        IndexFileHandler indexFileHandler =
+                new IndexFileHandler(
+                        fileStore.fileIO(),
+                        fileStore.snapshotManager(),
+                        indexManifestFile,
+                        new IndexFilePathFactories(fileStore.pathFactory()),
+                        MemorySize.ofMebiBytes(2),
+                        dvMetaCache,
+                        false);
+
+        Map<String, DeletionFile> expectedPartition1Bucket0Files =
+                indexFileHandler.extractDeletionFileByMeta(partition1, 0, 
entry1.indexFile());
+        Map<String, DeletionFile> expectedPartition1Bucket1Files =
+                indexFileHandler.extractDeletionFileByMeta(partition1, 1, 
entry2.indexFile());
+        Map<String, DeletionFile> expectedPartition2Bucket0Files =
+                indexFileHandler.extractDeletionFileByMeta(partition2, 0, 
entry3.indexFile());
+
+        CacheMetrics cacheMetrics = new CacheMetrics();
+        indexFileHandler.withCacheMetrics(cacheMetrics);
+
+        Snapshot snapshot = snapshot(indexManifestFileName);
+
+        // Test 1: First access should miss cache
+        Set<Pair<BinaryRow, Integer>> partitionBuckets = new HashSet<>();
+        partitionBuckets.add(Pair.of(partition1, 0));
+        Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> 
deletionFiles1 =
+                indexFileHandler.scanDVIndex(snapshot, partitionBuckets);
+        assertThat(deletionFiles1).isNotNull();
+        assertThat(deletionFiles1.containsKey(Pair.of(partition1, 
0))).isTrue();
+        assertThat(cacheMetrics.getMissedObject().get()).isEqualTo(1);
+        assertThat(cacheMetrics.getHitObject().get()).isEqualTo(0);
+        Map<String, DeletionFile> actualPartition1Bucket0Files =
+                deletionFiles1.get(Pair.of(partition1, 0));
+        
assertThat(actualPartition1Bucket0Files).isEqualTo(expectedPartition1Bucket0Files);
+
+        // Test 2: Second access to same partition/bucket should hit cache
+        Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> 
deletionFiles2 =
+                indexFileHandler.scanDVIndex(snapshot, partitionBuckets);
+        assertThat(deletionFiles2).isNotNull();
+        assertThat(deletionFiles1).isEqualTo(deletionFiles2);
+        assertThat(cacheMetrics.getHitObject().get()).isEqualTo(1);
+        Map<String, DeletionFile> cachedPartition1Bucket0Files =
+                deletionFiles2.get(Pair.of(partition1, 0));
+        
assertThat(cachedPartition1Bucket0Files).isEqualTo(expectedPartition1Bucket0Files);
+
+        // Test 3: Access different bucket in same partition should hit cache
+        partitionBuckets.clear();
+        partitionBuckets.add(Pair.of(partition1, 1));
+        Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> 
deletionFiles3 =
+                indexFileHandler.scanDVIndex(snapshot, partitionBuckets);
+        assertThat(deletionFiles3).isNotNull();
+        assertThat(cacheMetrics.getHitObject().get()).isEqualTo(2);
+        assertThat(cacheMetrics.getMissedObject().get()).isEqualTo(1);
+        Map<String, DeletionFile> actualPartition1Bucket1Files =
+                deletionFiles3.get(Pair.of(partition1, 1));
+        
assertThat(actualPartition1Bucket1Files).isEqualTo(expectedPartition1Bucket1Files);
+
+        // Test 4: Access different partition should miss cache
+        partitionBuckets.clear();
+        partitionBuckets.add(Pair.of(partition2, 0));
+        Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> 
deletionFiles4 =
+                indexFileHandler.scanDVIndex(snapshot, partitionBuckets);
+        assertThat(deletionFiles4).isNotNull();
+        assertThat(cacheMetrics.getMissedObject().get()).isEqualTo(2); // Now 
2 misses total
+        Map<String, DeletionFile> actualPartition2Bucket0Files =
+                deletionFiles4.get(Pair.of(partition2, 0));
+        
assertThat(actualPartition2Bucket0Files).isEqualTo(expectedPartition2Bucket0Files);
+
+        // Test 5: Test non-cache path by requesting multiple partition buckets
+        partitionBuckets.clear();
+        partitionBuckets.add(Pair.of(partition1, 0));
+        partitionBuckets.add(Pair.of(partition2, 0)); // Multiple buckets to 
avoid cache path
+        Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> 
deletionFiles5 =
+                indexFileHandler.scanDVIndex(snapshot, partitionBuckets);
+        assertThat(deletionFiles5).isNotNull();
+        assertThat(deletionFiles5.containsKey(Pair.of(partition1, 
0))).isTrue();
+        assertThat(deletionFiles5.containsKey(Pair.of(partition2, 
0))).isTrue();
+
+        Map<String, DeletionFile> nonCachePartition1Bucket0Files =
+                deletionFiles5.get(Pair.of(partition1, 0));
+        Map<String, DeletionFile> nonCachePartition2Bucket0Files =
+                deletionFiles5.get(Pair.of(partition2, 0));
+        
assertThat(nonCachePartition1Bucket0Files).isEqualTo(expectedPartition1Bucket0Files);
+        
assertThat(nonCachePartition2Bucket0Files).isEqualTo(expectedPartition2Bucket0Files);
+    }
+
+    // ============================ Test utils 
===================================
+
+    private BinaryRow partition(int partitionValue) {
+        BinaryRow partition = new BinaryRow(1);
+        BinaryRowWriter writer = new BinaryRowWriter(partition);
+        writer.writeInt(0, partitionValue);
+        writer.complete();
+        return partition;
+    }
+
+    private IndexFileMeta deletionVectorIndexFile(String... dataFileNames) {
+        LinkedHashMap<String, DeletionVectorMeta> dvRanges = new 
LinkedHashMap<>();
+        int offset = 0;
+        for (String dataFileName : dataFileNames) {
+            dvRanges.put(
+                    dataFileName,
+                    new DeletionVectorMeta(
+                            dataFileName, offset, 100 + offset, (long) (10 + 
offset)));
+            offset += 150;
+        }
+        return new IndexFileMeta(
+                DELETION_VECTORS_INDEX,
+                "dv_index_" + UUID.randomUUID().toString(),
+                1024,
+                512,
+                dvRanges,
+                null);
+    }
+
+    private Snapshot snapshot(String indexManifestFileName) {
+        String json =
+                "{\n"
+                        + "  \"version\" : 3,\n"
+                        + "  \"id\" : 1,\n"
+                        + "  \"schemaId\" : 0,\n"
+                        + "  \"baseManifestList\" : null,\n"
+                        + "  \"baseManifestListSize\" : 0,\n"
+                        + "  \"deltaManifestList\" : null,\n"
+                        + "  \"deltaManifestListSize\" : 0,\n"
+                        + "  \"changelogManifestListSize\" : 0,\n"
+                        + "  \"indexManifest\" : \""
+                        + indexManifestFileName
+                        + "\",\n"
+                        + "  \"commitUser\" : \"test\",\n"
+                        + "  \"commitIdentifier\" : 1,\n"
+                        + "  \"commitKind\" : \"APPEND\",\n"
+                        + "  \"timeMillis\" : "
+                        + System.currentTimeMillis()
+                        + ",\n"
+                        + "  \"totalRecordCount\" : null,\n"
+                        + "  \"deltaRecordCount\" : null\n"
+                        + "}";
+        return Snapshot.fromJson(json);
+    }
+
+    private TestFileStore keyValueFileStore() {
+        return new TestFileStore.Builder(
+                        "avro",
+                        tempDir.toString(),
+                        2, // 2 buckets for testing
+                        TestKeyValueGenerator.DEFAULT_PART_TYPE,
+                        TestKeyValueGenerator.KEY_TYPE,
+                        TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+                        
TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
+                        DeduplicateMergeFunction.factory(),
+                        null)
+                .build();
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
index 78045d9440..142ebc6a47 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
@@ -57,6 +57,6 @@ public class ManifestFileTest {
     public void testWriteAndReadManifestFile() {
         List<ManifestEntry> entries = generateData();
         ManifestFileMeta meta = gen.createManifestFileMeta(entries);
         ManifestFile manifestFile = createManifestFile(tempDir.toString());
 
         List<ManifestFileMeta> actualMetas = manifestFile.write(entries);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java
index 7ea86a2800..25b1b21f61 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java
@@ -50,7 +50,9 @@ public class ScanMetricsTest {
                         ScanMetrics.LAST_SCAN_SKIPPED_TABLE_FILES,
                         ScanMetrics.LAST_SCAN_RESULTED_TABLE_FILES,
                         ScanMetrics.MANIFEST_HIT_CACHE,
-                        ScanMetrics.MANIFEST_MISSED_CACHE);
+                        ScanMetrics.MANIFEST_MISSED_CACHE,
+                        ScanMetrics.DVMETA_HIT_CACHE,
+                        ScanMetrics.DVMETA_MISSED_CACHE);
     }
 
     /** Tests that the metrics are updated properly. */
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/utils/DVMetaCacheTest.java 
b/paimon-core/src/test/java/org/apache/paimon/utils/DVMetaCacheTest.java
new file mode 100644
index 0000000000..5b1c9611bb
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/DVMetaCacheTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.source.DeletionFile;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DVMetaCache}. */
+public class DVMetaCacheTest {
+
+    @Test
+    public void testPutAndRead() {
+        DVMetaCache cache = new DVMetaCache(100);
+        Path path = new Path("manifest/index-manifest-00001");
+        BinaryRow partition = partition("year=2023/month=12");
+
+        // Put data for bucket 1 with multiple files
+        Map<String, DeletionFile> dvFiles1 = new HashMap<>();
+        dvFiles1.put(
+                "data-a1b2c3d4-e5f6-7890-abcd-ef1234567890-1.parquet",
+                new 
DeletionFile("index-a1b2c3d4-e5f6-7890-abcd-ef1234567890-1", 0L, 100L, 42L));
+        dvFiles1.put(
+                "data-a1b2c3d4-e5f6-7890-abcd-ef1234567890-2.parquet",
+                new 
DeletionFile("index-a1b2c3d4-e5f6-7890-abcd-ef1234567890-1", 100L, 500L, null));
+        cache.put(path, partition, 1, dvFiles1);
+
+        // Put data for bucket 2 with single file
+        Map<String, DeletionFile> dvFiles2 = new HashMap<>();
+        dvFiles2.put(
+                "data-b2c3d4e5-f6g7-8901-bcde-f23456789012-1.parquet",
+                new 
DeletionFile("index-b2c3d4e5-f6g7-8901-bcde-f23456789012-1", 0L, 300L, 12L));
+        cache.put(path, partition, 2, dvFiles2);
+
+        // Read bucket 1 - verify multiple files
+        Map<String, DeletionFile> result1 = cache.read(path, partition, 1);
+        assertThat(result1).isNotNull().hasSize(2);
+        assertThat(result1)
+                .containsKeys(
+                        "data-a1b2c3d4-e5f6-7890-abcd-ef1234567890-1.parquet",
+                        "data-a1b2c3d4-e5f6-7890-abcd-ef1234567890-2.parquet");
+
+        DeletionFile file1 = 
result1.get("data-a1b2c3d4-e5f6-7890-abcd-ef1234567890-1.parquet");
+        
assertThat(file1.path()).isEqualTo("index-a1b2c3d4-e5f6-7890-abcd-ef1234567890-1");
+        assertThat(file1.offset()).isEqualTo(0L);
+        assertThat(file1.length()).isEqualTo(100L);
+        assertThat(file1.cardinality()).isEqualTo(42L);
+
+        DeletionFile file2 = 
result1.get("data-a1b2c3d4-e5f6-7890-abcd-ef1234567890-2.parquet");
+        
assertThat(file2.path()).isEqualTo("index-a1b2c3d4-e5f6-7890-abcd-ef1234567890-1");
+        assertThat(file2.cardinality()).isNull();
+
+        // Read bucket 2 - verify single file
+        Map<String, DeletionFile> result2 = cache.read(path, partition, 2);
+        assertThat(result2).isNotNull().hasSize(1);
+        
assertThat(result2).containsKey("data-b2c3d4e5-f6g7-8901-bcde-f23456789012-1.parquet");
+
+        // Read non-existent key
+        assertThat(cache.read(path, partition("year=2024/month=01"), 
0)).isNull();
+        assertThat(cache.read(path, partition, 999)).isNull();
+    }
+
+    @Test
+    public void testEmptyMap() {
+        DVMetaCache cache = new DVMetaCache(100);
+        Path path = new Path("manifest/index-manifest-00002");
+        BinaryRow partition = partition("year=2023/month=11");
+        cache.put(path, partition, 1, new HashMap<>());
+
+        // Should return empty map, not null
+        Map<String, DeletionFile> result = cache.read(path, partition, 1);
+        assertThat(result).isNotNull().isEmpty();
+    }
+
+    @Test
+    public void testCacheEviction() {
+        DVMetaCache cache = new DVMetaCache(2);
+        Path path = new Path("manifest/index-manifest-00004");
+        BinaryRow partition = partition("year=2023/month=09");
+
+        // Fill cache to capacity
+        Map<String, DeletionFile> dvFiles1 = new HashMap<>();
+        dvFiles1.put(
+                "data-e5f6g7h8-i9j0-1234-efgh-567890123456-1.parquet",
+                new 
DeletionFile("index-e5f6g7h8-i9j0-1234-efgh-567890123456-1", 0L, 100L, 1L));
+        dvFiles1.put(
+                "data-e5f6g7h8-i9j0-1234-efgh-567890123456-2.parquet",
+                new 
DeletionFile("index-e5f6g7h8-i9j0-1234-efgh-567890123456-1", 100L, 200L, 2L));
+        cache.put(path, partition, 1, dvFiles1);
+
+        Map<String, DeletionFile> dvFiles2 = new HashMap<>();
+        dvFiles2.put(
+                "data-f6g7h8i9-j0k1-2345-fghi-678901234567-1.parquet",
+                new 
DeletionFile("index-f6g7h8i9-j0k1-2345-fghi-678901234567-1", 0L, 100L, 2L));
+        cache.put(path, partition, 2, dvFiles2);
+
+        // Verify both buckets are cached
+        assertThat(cache.read(path, partition, 1)).isNotNull();
+        assertThat(cache.read(path, partition, 2)).isNotNull();
+
+        // Add third entry, should evict first one
+        Map<String, DeletionFile> dvFiles3 = new HashMap<>();
+        dvFiles3.put(
+                "data-g7h8i9j0-k1l2-3456-ghij-789012345678-1.parquet",
+                new 
DeletionFile("index-g7h8i9j0-k1l2-3456-ghij-789012345678-1", 0L, 100L, 3L));
+        cache.put(path, partition, 3, dvFiles3);
+
+        // First entry should be evicted
+        assertThat(cache.read(path, partition, 1)).isNull();
+        assertThat(cache.read(path, partition, 3)).isNotNull();
+    }
+
+    // ============================ Test utils 
===================================
+
+    private BinaryRow partition(String partitionValue) {
+        InternalRowSerializer serializer =
+                new InternalRowSerializer(RowType.of(DataTypes.STRING()));
+        return serializer
+                
.toBinaryRow(GenericRow.of(BinaryString.fromString(partitionValue)))
+                .copy();
+    }
+}

Reply via email to