This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ac64779640 [core] Introduce deletion vector meta cache at bucket level (#6407)
ac64779640 is described below
commit ac64779640b7ec43852ae2dd8588f042ef76bf0e
Author: bryndenZh <[email protected]>
AuthorDate: Mon Oct 27 20:19:08 2025 +0800
[core] Introduce deletion vector meta cache at bucket level (#6407)
---
.../generated/catalog_configuration.html | 6 +
.../org/apache/paimon/options/CatalogOptions.java | 7 +
.../java/org/apache/paimon/AbstractFileStore.java | 11 +-
.../src/main/java/org/apache/paimon/FileStore.java | 3 +
.../org/apache/paimon/catalog/CachingCatalog.java | 12 +-
.../org/apache/paimon/index/IndexFileHandler.java | 147 ++++++++++++++
.../apache/paimon/manifest/IndexManifestFile.java | 19 +-
.../paimon/operation/metrics/ScanMetrics.java | 10 +
.../paimon/privilege/PrivilegedFileStore.java | 6 +
.../paimon/table/AbstractFileStoreTable.java | 8 +
.../paimon/table/DelegatedFileStoreTable.java | 6 +
.../org/apache/paimon/table/FileStoreTable.java | 3 +
.../table/source/snapshot/SnapshotReaderImpl.java | 38 ++--
.../java/org/apache/paimon/utils/DVMetaCache.java | 179 ++++++++++++++++
.../manifest/IndexManifestFileHandlerTest.java | 226 +++++++++++++++++++++
.../apache/paimon/manifest/ManifestFileTest.java | 1 +
.../paimon/operation/metrics/ScanMetricsTest.java | 4 +-
.../org/apache/paimon/utils/DVMetaCacheTest.java | 150 ++++++++++++++
18 files changed, 818 insertions(+), 18 deletions(-)
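
For reviewers who want to try this out: the cache is created by CachingCatalog from the new catalog option below, so it only takes effect when catalog caching is enabled. A minimal sketch of enabling it (the option key and default come from this diff; the surrounding class is illustrative, not part of the commit):

    // Illustrative sketch only, not part of this commit.
    import org.apache.paimon.options.CatalogOptions;
    import org.apache.paimon.options.Options;

    public class EnableDvMetaCacheSketch {
        public static void main(String[] args) {
            Options options = new Options();
            // Default is 20000 cached buckets; CachingCatalog only creates the
            // DVMetaCache when this value is > 0, so 0 disables it.
            options.set(CatalogOptions.CACHE_DV_MAX_NUM, 20000);
            // CachingCatalog then calls setDVMetaCache on every FileStoreTable
            // it returns, wiring the cache into IndexFileHandler.
        }
    }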
diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html b/docs/layouts/shortcodes/generated/catalog_configuration.html
index 9a8dcfb1ac..eaeea12346 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -32,6 +32,12 @@ under the License.
<td>Boolean</td>
      <td>Controls whether the catalog will cache databases, tables, manifests and partitions.</td>
</tr>
+ <tr>
+ <td><h5>cache.deletion-vectors.max-bucket-num</h5></td>
+ <td style="word-wrap: break-word;">20000</td>
+ <td>Integer</td>
+      <td>Controls the maximum number of bucket-level deletion vector meta that can be cached.</td>
+ </tr>
<tr>
<td><h5>cache.expire-after-access</h5></td>
<td style="word-wrap: break-word;">10 min</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/options/CatalogOptions.java b/paimon-api/src/main/java/org/apache/paimon/options/CatalogOptions.java
index 7a862e8fb2..72674bdd7b 100644
--- a/paimon-api/src/main/java/org/apache/paimon/options/CatalogOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/options/CatalogOptions.java
@@ -136,6 +136,13 @@ public class CatalogOptions {
.withDescription(
"Controls the max number for snapshots per table
in the catalog are cached.");
+ public static final ConfigOption<Integer> CACHE_DV_MAX_NUM =
+ key("cache.deletion-vectors.max-bucket-num")
+ .intType()
+ .defaultValue(20000)
+ .withDescription(
+                                "Controls the maximum number of bucket-level deletion vector meta that can be cached.");
+
public static final ConfigOption<Boolean> CASE_SENSITIVE =
ConfigOptions.key("case-sensitive")
.booleanType()
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 2e209fba45..2a5dbd7766 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -59,6 +59,7 @@ import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.tag.TagPreview;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ChangelogManager;
+import org.apache.paimon.utils.DVMetaCache;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.IndexFilePathFactories;
import org.apache.paimon.utils.InternalRowPartitionComputer;
@@ -97,6 +98,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
@Nullable private SegmentsCache<Path> readManifestCache;
@Nullable private Cache<Path, Snapshot> snapshotCache;
+ @Nullable private DVMetaCache dvMetaCache;
protected AbstractFileStore(
FileIO fileIO,
@@ -216,7 +218,8 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
FileFormat.manifestFormat(options),
options.manifestCompression(),
pathFactory(),
- readManifestCache);
+ readManifestCache,
+ dvMetaCache);
}
@Override
@@ -227,6 +230,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
indexManifestFileFactory().create(),
new IndexFilePathFactories(pathFactory()),
options.dvIndexFileTargetSize(),
+ this.dvMetaCache,
options.deletionVectorBitmap64());
}
@@ -488,4 +492,9 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
public void setSnapshotCache(Cache<Path, Snapshot> cache) {
this.snapshotCache = cache;
}
+
+ @Override
+ public void setDVMetaCache(DVMetaCache dvMetaCache) {
+ this.dvMetaCache = dvMetaCache;
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index 929302673d..62bc65c744 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -40,6 +40,7 @@ import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ChangelogManager;
+import org.apache.paimon.utils.DVMetaCache;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.SnapshotManager;
@@ -121,4 +122,6 @@ public interface FileStore<T> {
void setManifestCache(SegmentsCache<Path> manifestCache);
void setSnapshotCache(Cache<Path, Snapshot> cache);
+
+ void setDVMetaCache(DVMetaCache dvMetaCache);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
index 18ba10cdd1..9dfb32e888 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
@@ -28,6 +28,7 @@ import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.system.SystemTableLoader;
+import org.apache.paimon.utils.DVMetaCache;
import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
@@ -43,6 +44,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
+import static org.apache.paimon.options.CatalogOptions.CACHE_DV_MAX_NUM;
import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRE_AFTER_ACCESS;
import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRE_AFTER_WRITE;
@@ -66,9 +68,9 @@ public class CachingCatalog extends DelegateCatalog {
protected Cache<String, Database> databaseCache;
protected Cache<Identifier, Table> tableCache;
@Nullable protected final SegmentsCache<Path> manifestCache;
-
// partition cache will affect data latency
@Nullable protected Cache<Identifier, List<Partition>> partitionCache;
+ @Nullable protected DVMetaCache dvMetaCache;
public CachingCatalog(Catalog wrapped, Options options) {
super(wrapped);
@@ -97,6 +99,11 @@ public class CachingCatalog extends DelegateCatalog {
        this.manifestCache = SegmentsCache.create(manifestMaxMemory, manifestCacheThreshold);
this.cachedPartitionMaxNum = options.get(CACHE_PARTITION_MAX_NUM);
+
+ int cacheDvMaxNum = options.get(CACHE_DV_MAX_NUM);
+ if (cacheDvMaxNum > 0) {
+ this.dvMetaCache = new DVMetaCache(cacheDvMaxNum);
+ }
init(Ticker.systemTicker());
}
@@ -271,6 +278,9 @@ public class CachingCatalog extends DelegateCatalog {
if (manifestCache != null) {
storeTable.setManifestCache(manifestCache);
}
+ if (dvMetaCache != null) {
+ storeTable.setDVMetaCache(dvMetaCache);
+ }
}
tableCache.put(identifier, table);
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
index 205db897ea..83a41161f3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -19,6 +19,7 @@
package org.apache.paimon.index;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
@@ -26,19 +27,27 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.IndexManifestFile;
+import org.apache.paimon.operation.metrics.CacheMetrics;
import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.table.source.DeletionFile;
+import org.apache.paimon.utils.DVMetaCache;
import org.apache.paimon.utils.IndexFilePathFactories;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.stream.Collectors;
import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
@@ -52,6 +61,8 @@ public class IndexFileHandler {
private final IndexFilePathFactories pathFactories;
private final MemorySize dvTargetFileSize;
private final boolean dvBitmap64;
+ @Nullable private CacheMetrics cacheMetrics;
+ @Nullable private final DVMetaCache dvMetaCache;
public IndexFileHandler(
FileIO fileIO,
@@ -59,6 +70,7 @@ public class IndexFileHandler {
IndexManifestFile indexManifestFile,
IndexFilePathFactories pathFactories,
MemorySize dvTargetFileSize,
+ DVMetaCache dvMetaCache,
boolean dvBitmap64) {
this.fileIO = fileIO;
this.snapshotManager = snapshotManager;
@@ -66,6 +78,7 @@ public class IndexFileHandler {
this.indexManifestFile = indexManifestFile;
this.dvTargetFileSize = dvTargetFileSize;
this.dvBitmap64 = dvBitmap64;
+ this.dvMetaCache = dvMetaCache;
}
public HashIndexFile hashIndex(BinaryRow partition, int bucket) {
@@ -87,6 +100,140 @@ public class IndexFileHandler {
        return result.isEmpty() ? Optional.empty() : Optional.of(result.get(0));
}
+ public void withCacheMetrics(@Nullable CacheMetrics cacheMetrics) {
+ this.cacheMetrics = cacheMetrics;
+ }
+
+ // Construct DataFile -> DeletionFile based on IndexFileMeta
+ @Nullable
+ @VisibleForTesting
+ public Map<String, DeletionFile> extractDeletionFileByMeta(
+ BinaryRow partition, Integer bucket, IndexFileMeta fileMeta) {
+        LinkedHashMap<String, DeletionVectorMeta> dvRanges = fileMeta.dvRanges();
+        String dvFilePath = dvIndex(partition, bucket).path(fileMeta).toString();
+ if (dvRanges != null && !dvRanges.isEmpty()) {
+ Map<String, DeletionFile> result = new HashMap<>();
+ for (DeletionVectorMeta dvMeta : dvRanges.values()) {
+ result.put(
+ dvMeta.dataFileName(),
+ new DeletionFile(
+ dvFilePath,
+ dvMeta.offset(),
+ dvMeta.length(),
+ dvMeta.cardinality()));
+ }
+ return result;
+ }
+ return null;
+ }
+
+    // Scans the DV index files of the given partition buckets;
+    // returns a <DataFile: DeletionFile> map grouped by partition and bucket.
+    public Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> scanDVIndex(
+            Snapshot snapshot, Set<Pair<BinaryRow, Integer>> partitionBuckets) {
+ if (snapshot == null || snapshot.indexManifest() == null) {
+ return Collections.emptyMap();
+ }
+        Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> result = new HashMap<>();
+        // To avoid frequent cache eviction, we currently only read from the
+        // cache when exactly one (partition, bucket) pair is requested.
+ if (this.dvMetaCache != null && partitionBuckets.size() == 1) {
+            Pair<BinaryRow, Integer> partitionBucket = partitionBuckets.iterator().next();
+            Map<String, DeletionFile> deletionFiles =
+                    this.scanDVIndexWithCache(
+                            snapshot, partitionBucket.getLeft(), partitionBucket.getRight());
+ if (deletionFiles != null && deletionFiles.size() > 0) {
+ result.put(partitionBucket, deletionFiles);
+ }
+ return result;
+ }
+ Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> partitionFileMetas =
+ scan(
+ snapshot,
+ DELETION_VECTORS_INDEX,
+                        partitionBuckets.stream().map(Pair::getLeft).collect(Collectors.toSet()));
+ partitionFileMetas.forEach(
+ (entry, indexFileMetas) -> {
+ if (partitionBuckets.contains(entry)) {
+ if (indexFileMetas != null) {
+ indexFileMetas.forEach(
+ indexFileMeta -> {
+ Map<String, DeletionFile> dvMetas =
+ extractDeletionFileByMeta(
+ entry.getLeft(),
+ entry.getRight(),
+ indexFileMeta);
+ if (dvMetas != null) {
+                                result.computeIfAbsent(entry, k -> new HashMap<>())
+ .putAll(dvMetas);
+ }
+ });
+ }
+ }
+ });
+ return result;
+ }
+
+    // Scan the DV meta cache first; on a miss, scan the DV index file. Returns
+    // the exact deletion files of the specified partition/bucket.
+ @VisibleForTesting
+ Map<String, DeletionFile> scanDVIndexWithCache(
+ Snapshot snapshot, BinaryRow partition, Integer bucket) {
+ // read from cache
+ String indexManifestName = snapshot.indexManifest();
+        Path indexManifestPath = this.indexManifestFile.indexManifestFilePath(indexManifestName);
+ Map<String, DeletionFile> result =
+ this.dvMetaCache.read(indexManifestPath, partition, bucket);
+ if (result != null) {
+ if (cacheMetrics != null) {
+ cacheMetrics.increaseHitObject();
+ }
+ return result;
+ }
+ if (cacheMetrics != null) {
+ cacheMetrics.increaseMissedObject();
+ }
+ // If miss, read the whole partition's deletion files
+ Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> partitionFileMetas =
+ scan(
+ snapshot,
+ DELETION_VECTORS_INDEX,
+ new HashSet<>(Collections.singletonList(partition)));
+ // for each bucket, extract deletion files, and fill meta cache
+ for (Map.Entry<Pair<BinaryRow, Integer>, List<IndexFileMeta>> entry :
+ partitionFileMetas.entrySet()) {
+ Pair<BinaryRow, Integer> partitionBucket = entry.getKey();
+ List<IndexFileMeta> fileMetas = entry.getValue();
+ if (entry.getValue() != null) {
+                Map<String, DeletionFile> bucketDeletionFiles = new HashMap<>();
+ fileMetas.forEach(
+ meta -> {
+ Map<String, DeletionFile> bucketDVMetas =
+ extractDeletionFileByMeta(
+ partitionBucket.getLeft(),
+ partitionBucket.getRight(),
+ meta);
+ if (bucketDVMetas != null) {
+ bucketDeletionFiles.putAll(bucketDVMetas);
+ }
+ });
+ // bucketDeletionFiles can be empty
+ this.dvMetaCache.put(
+ indexManifestPath,
+ partitionBucket.getLeft(),
+ partitionBucket.getRight(),
+ bucketDeletionFiles);
+ if (partitionBucket.getRight() != null
+ && partitionBucket.getLeft() != null
+ && partitionBucket.getRight().equals(bucket)
+ && partitionBucket.getLeft().equals(partition)) {
+ result = bucketDeletionFiles;
+ }
+ }
+ }
+ return result;
+ }
+
public List<IndexManifestEntry> scan(String indexType) {
return scan(snapshotManager.latestSnapshot(), indexType);
}
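
To illustrate the call pattern of the new read path above: callers pass a snapshot plus the exact (partition, bucket) pairs they are about to read, and a single-pair request goes through the cache. A hypothetical helper under that assumption (SnapshotReaderImpl below builds the same arguments from its grouped manifest entries):

    // Hypothetical helper, for illustration only; assumes an IndexFileHandler
    // constructed with a non-null DVMetaCache.
    static Map<String, DeletionFile> deletionFilesOf(
            IndexFileHandler handler, Snapshot snapshot, BinaryRow partition, int bucket) {
        Set<Pair<BinaryRow, Integer>> partitionBuckets =
                Collections.singleton(Pair.of(partition, bucket));
        // A single (partition, bucket) pair means scanDVIndex consults the
        // DVMetaCache before reading any deletion vector index metadata.
        return handler.scanDVIndex(snapshot, partitionBuckets)
                .getOrDefault(Pair.of(partition, bucket), Collections.emptyMap());
    }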
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java
index 077d1a45aa..21086eadf3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java
@@ -25,6 +25,7 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.DVMetaCache;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.ObjectsFile;
import org.apache.paimon.utils.PathFactory;
@@ -39,6 +40,8 @@ import java.util.List;
/** Index manifest file. */
public class IndexManifestFile extends ObjectsFile<IndexManifestEntry> {
+ private final DVMetaCache dvMetaCache;
+
private IndexManifestFile(
FileIO fileIO,
RowType schema,
@@ -46,7 +49,8 @@ public class IndexManifestFile extends ObjectsFile<IndexManifestEntry> {
FormatWriterFactory writerFactory,
String compression,
PathFactory pathFactory,
- @Nullable SegmentsCache<Path> cache) {
+ @Nullable SegmentsCache<Path> cache,
+ @Nullable DVMetaCache dvMetaCache) {
super(
fileIO,
new IndexManifestEntrySerializer(),
@@ -56,6 +60,11 @@ public class IndexManifestFile extends ObjectsFile<IndexManifestEntry> {
compression,
pathFactory,
cache);
+ this.dvMetaCache = dvMetaCache;
+ }
+
+ public Path indexManifestFilePath(String fileName) {
+ return pathFactory.toPath(fileName);
}
/** Write new index files to index manifest. */
@@ -79,18 +88,21 @@ public class IndexManifestFile extends ObjectsFile<IndexManifestEntry> {
private final String compression;
private final FileStorePathFactory pathFactory;
@Nullable private final SegmentsCache<Path> cache;
+ @Nullable private final DVMetaCache dvMetaCache;
public Factory(
FileIO fileIO,
FileFormat fileFormat,
String compression,
FileStorePathFactory pathFactory,
- @Nullable SegmentsCache<Path> cache) {
+ @Nullable SegmentsCache<Path> cache,
+ @Nullable DVMetaCache dvMetaCache) {
this.fileIO = fileIO;
this.fileFormat = fileFormat;
this.compression = compression;
this.pathFactory = pathFactory;
this.cache = cache;
+ this.dvMetaCache = dvMetaCache;
}
public IndexManifestFile create() {
@@ -102,7 +114,8 @@ public class IndexManifestFile extends ObjectsFile<IndexManifestEntry> {
fileFormat.createWriterFactory(schema),
compression,
pathFactory.indexManifestFileFactory(),
- cache);
+ cache,
+ dvMetaCache);
}
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java
index 280f53a101..10246907ef 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java
@@ -35,10 +35,13 @@ public class ScanMetrics {
    public static final String LAST_SCAN_RESULTED_TABLE_FILES = "lastScanResultedTableFiles";
public static final String MANIFEST_HIT_CACHE = "manifestHitCache";
public static final String MANIFEST_MISSED_CACHE = "manifestMissedCache";
+ public static final String DVMETA_HIT_CACHE = "dvMetaHitCache";
+ public static final String DVMETA_MISSED_CACHE = "dvMetaMissedCache";
private final MetricGroup metricGroup;
private final Histogram durationHistogram;
private final CacheMetrics cacheMetrics;
+ private final CacheMetrics dvMetaCacheMetrics;
private ScanStats latestScan;
@@ -48,6 +51,7 @@ public class ScanMetrics {
                LAST_SCAN_DURATION, () -> latestScan == null ? 0L : latestScan.getDuration());
        durationHistogram = metricGroup.histogram(SCAN_DURATION, HISTOGRAM_WINDOW_SIZE);
cacheMetrics = new CacheMetrics();
+ dvMetaCacheMetrics = new CacheMetrics();
metricGroup.gauge(
LAST_SCANNED_MANIFESTS,
                () -> latestScan == null ? 0L : latestScan.getScannedManifests());
@@ -59,6 +63,8 @@
                () -> latestScan == null ? 0L : latestScan.getResultedTableFiles());
        metricGroup.gauge(MANIFEST_HIT_CACHE, () -> cacheMetrics.getHitObject().get());
        metricGroup.gauge(MANIFEST_MISSED_CACHE, () -> cacheMetrics.getMissedObject().get());
+        metricGroup.gauge(DVMETA_HIT_CACHE, () -> dvMetaCacheMetrics.getHitObject().get());
+        metricGroup.gauge(DVMETA_MISSED_CACHE, () -> dvMetaCacheMetrics.getMissedObject().get());
}
@VisibleForTesting
@@ -74,4 +80,8 @@ public class ScanMetrics {
public CacheMetrics getCacheMetrics() {
return cacheMetrics;
}
+
+ public CacheMetrics getDvMetaCacheMetrics() {
+ return dvMetaCacheMetrics;
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
index 99069ddc31..b683174cd4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
@@ -44,6 +44,7 @@ import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ChangelogManager;
+import org.apache.paimon.utils.DVMetaCache;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.SnapshotManager;
@@ -232,4 +233,9 @@ public class PrivilegedFileStore<T> implements FileStore<T> {
public void setSnapshotCache(Cache<Path, Snapshot> cache) {
wrapped.setSnapshotCache(cache);
}
+
+ @Override
+ public void setDVMetaCache(DVMetaCache dvMetaCache) {
+ wrapped.setDVMetaCache(dvMetaCache);
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index aa5b8bb70d..aeb223aaf5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -55,6 +55,7 @@ import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.CatalogBranchManager;
import org.apache.paimon.utils.ChangelogManager;
+import org.apache.paimon.utils.DVMetaCache;
import org.apache.paimon.utils.FileSystemBranchManager;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SegmentsCache;
@@ -93,6 +94,7 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
@Nullable protected transient SegmentsCache<Path> manifestCache;
@Nullable protected transient Cache<Path, Snapshot> snapshotCache;
@Nullable protected transient Cache<String, Statistics> statsCache;
+ @Nullable protected transient DVMetaCache dvmetaCache;
protected AbstractFileStoreTable(
FileIO fileIO,
@@ -138,6 +140,12 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
this.statsCache = cache;
}
+ @Override
+ public void setDVMetaCache(DVMetaCache cache) {
+ this.dvmetaCache = cache;
+ store().setDVMetaCache(cache);
+ }
+
@Override
public Optional<Snapshot> latestSnapshot() {
Snapshot snapshot = store().snapshotManager().latestSnapshot();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
index e8898a1cd1..3a0d4ecc1c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
@@ -42,6 +42,7 @@ import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.ChangelogManager;
+import org.apache.paimon.utils.DVMetaCache;
import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.SimpleFileReader;
import org.apache.paimon.utils.SnapshotManager;
@@ -154,6 +155,11 @@ public abstract class DelegatedFileStoreTable implements FileStoreTable {
wrapped.setStatsCache(cache);
}
+ @Override
+ public void setDVMetaCache(DVMetaCache cache) {
+ wrapped.setDVMetaCache(cache);
+ }
+
@Override
public TableSchema schema() {
return wrapped.schema();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
index 0ed51cb852..b07465a258 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
@@ -36,6 +36,7 @@ import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.ChangelogManager;
+import org.apache.paimon.utils.DVMetaCache;
import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.TagManager;
@@ -63,6 +64,8 @@ public interface FileStoreTable extends DataTable {
void setStatsCache(Cache<String, Statistics> cache);
+ void setDVMetaCache(DVMetaCache cache);
+
@Override
default RowType rowType() {
return schema().logicalRowType();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 81c015a9d4..9157737915 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -286,7 +286,9 @@ public class SnapshotReaderImpl implements SnapshotReader {
@Override
public SnapshotReader withMetricRegistry(MetricRegistry registry) {
- scan.withMetrics(new ScanMetrics(registry, tableName));
+ ScanMetrics scanMetrics = new ScanMetrics(registry, tableName);
+ scan.withMetrics(scanMetrics);
+ indexFileHandler.withCacheMetrics(scanMetrics.getDvMetaCacheMetrics());
return this;
}
@@ -349,14 +351,18 @@ public class SnapshotReaderImpl implements SnapshotReader {
            Map<BinaryRow, Map<Integer, List<ManifestEntry>>> groupedManifestEntries) {
List<DataSplit> splits = new ArrayList<>();
// Read deletion indexes at once to reduce file IO
-        Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> deletionIndexFilesMap = null;
+        Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> deletionFilesMap = null;
if (!isStreaming) {
- deletionIndexFilesMap =
+ Set<Pair<BinaryRow, Integer>> partitionBuckets =
+ groupedManifestEntries.entrySet().stream()
+ .flatMap(
+ e ->
+ e.getValue().keySet().stream()
+                                        .map(bucket -> Pair.of(e.getKey(), bucket)))
+ .collect(Collectors.toSet());
+ deletionFilesMap =
deletionVectors && snapshot != null
- ? indexFileHandler.scan(
- snapshot,
- DELETION_VECTORS_INDEX,
- groupedManifestEntries.keySet())
+                        ? indexFileHandler.scanDVIndex(snapshot, partitionBuckets)
: Collections.emptyMap();
}
for (Map.Entry<BinaryRow, Map<Integer, List<ManifestEntry>>> entry :
@@ -387,16 +393,14 @@ public class SnapshotReaderImpl implements SnapshotReader {
builder.withDataFiles(dataFiles)
.rawConvertible(splitGroup.rawConvertible)
.withBucketPath(bucketPath);
- if (deletionVectors && deletionIndexFilesMap != null) {
+ if (deletionVectors && deletionFilesMap != null) {
builder.withDataDeletionFiles(
getDeletionFiles(
-                                    indexFileHandler.dvIndex(partition, bucket),
dataFiles,
- deletionIndexFilesMap.getOrDefault(
+ deletionFilesMap.getOrDefault(
Pair.of(partition, bucket),
- Collections.emptyList())));
+ Collections.emptyMap())));
}
-
splits.add(builder.build());
}
}
@@ -588,4 +592,14 @@ public class SnapshotReaderImpl implements SnapshotReader {
return deletionFiles;
}
+
+    public List<DeletionFile> getDeletionFiles(
+            List<DataFileMeta> dataFiles, Map<String, DeletionFile> deletionFilesMap) {
+ List<DeletionFile> deletionFiles = new ArrayList<>(dataFiles.size());
+ dataFiles.stream()
+ .map(DataFileMeta::fileName)
+                .map(f -> deletionFilesMap == null ? null : deletionFilesMap.get(f))
+ .forEach(deletionFiles::add);
+ return deletionFiles;
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/DVMetaCache.java b/paimon-core/src/main/java/org/apache/paimon/utils/DVMetaCache.java
new file mode 100644
index 0000000000..d06c563187
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/DVMetaCache.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.source.DeletionFile;
+
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/** Cache for deletion vector meta. */
+public class DVMetaCache {
+ private final Cache<DVMetaCacheKey, List<DVMetaCacheValue>> cache;
+
+ public DVMetaCache(long maxElementSize) {
+ this.cache =
+ Caffeine.newBuilder()
+ .maximumSize(maxElementSize)
+ .softValues()
+ .executor(Runnable::run)
+ .build();
+ }
+
+ @Nullable
+    public Map<String, DeletionFile> read(Path path, BinaryRow partition, int bucket) {
+ DVMetaCacheKey cacheKey = new DVMetaCacheKey(path, partition, bucket);
+ List<DVMetaCacheValue> cacheValue = this.cache.getIfPresent(cacheKey);
+ if (cacheValue == null) {
+ return null;
+ }
+ // If the bucket doesn't have dv metas, return empty set.
+ Map<String, DeletionFile> dvFilesMap = new HashMap<>();
+ cacheValue.forEach(
+ dvMeta ->
+ dvFilesMap.put(
+ dvMeta.getDataFileName(),
+ new DeletionFile(
+ dvMeta.getDeletionFilePath(),
+ dvMeta.getOffset(),
+ dvMeta.getLength(),
+ dvMeta.getCardinality())));
+ return dvFilesMap;
+ }
+
+ public void put(
+            Path path, BinaryRow partition, int bucket, Map<String, DeletionFile> dvFilesMap) {
+ DVMetaCacheKey key = new DVMetaCacheKey(path, partition, bucket);
+ List<DVMetaCacheValue> cacheValue = new ArrayList<>();
+ dvFilesMap.forEach(
+ (dataFileName, deletionFile) -> {
+ DVMetaCacheValue dvMetaCacheValue =
+ new DVMetaCacheValue(
+ dataFileName,
+ deletionFile.path(),
+ (int) deletionFile.offset(),
+ (int) deletionFile.length(),
+ deletionFile.cardinality());
+ cacheValue.add(dvMetaCacheValue);
+ });
+ this.cache.put(key, cacheValue);
+ }
+
+ private static class DVMetaCacheValue {
+ private final String dataFileName;
+ private final String deletionFilePath;
+ private final int offset;
+ private final int length;
+ @Nullable private final Long cardinality;
+
+ public DVMetaCacheValue(
+ String dataFileName,
+ String deletionFilePath,
+ int start,
+ int length,
+ @Nullable Long cardinality) {
+ this.dataFileName = dataFileName;
+ this.deletionFilePath = deletionFilePath;
+ this.offset = start;
+ this.length = length;
+ this.cardinality = cardinality;
+ }
+
+ public String getDataFileName() {
+ return dataFileName;
+ }
+
+ public String getDeletionFilePath() {
+ return deletionFilePath;
+ }
+
+ public int getOffset() {
+ return offset;
+ }
+
+ public int getLength() {
+ return length;
+ }
+
+ @Nullable
+ public Long getCardinality() {
+ return cardinality;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ DVMetaCacheValue that = (DVMetaCacheValue) o;
+ return offset == that.offset
+ && length == that.length
+ && Objects.equals(dataFileName, that.dataFileName)
+ && Objects.equals(deletionFilePath, that.deletionFilePath)
+ && Objects.equals(cardinality, that.cardinality);
+ }
+
+ @Override
+ public int hashCode() {
+            return Objects.hash(dataFileName, deletionFilePath, offset, length, cardinality);
+ }
+ }
+
+ /** Cache key for deletion vector meta at bucket level. */
+ private static final class DVMetaCacheKey {
+ private final Path path;
+ private final BinaryRow row;
+ private final int bucket;
+
+ public DVMetaCacheKey(Path path, BinaryRow row, int bucket) {
+ this.path = path;
+ this.row = row;
+ this.bucket = bucket;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof DVMetaCacheKey)) {
+ return false;
+ }
+ DVMetaCacheKey that = (DVMetaCacheKey) o;
+ return bucket == that.bucket
+ && Objects.equals(path, that.path)
+ && Objects.equals(row, that.row);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(path, row, bucket);
+ }
+ }
+}
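
To make the cache contract concrete, a minimal usage sketch (mirroring DVMetaCacheTest at the end of this commit; the manifest path and file names are invented):

    // Illustrative sketch only; values are invented.
    DVMetaCache cache = new DVMetaCache(20000);
    Path indexManifest = new Path("manifest/index-manifest-00001");
    BinaryRow partition = BinaryRow.EMPTY_ROW; // e.g. an unpartitioned table

    Map<String, DeletionFile> dvFiles = new HashMap<>();
    dvFiles.put("data-1.parquet", new DeletionFile("index-1", 0L, 100L, 42L));
    cache.put(indexManifest, partition, 0, dvFiles);

    // Keys are (index manifest path, partition, bucket), so a lookup against a
    // newer index manifest simply misses instead of returning stale metadata.
    // null means "not cached"; an empty map means "cached, no deletion vectors".
    Map<String, DeletionFile> cached = cache.read(indexManifest, partition, 0);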
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java
index c2c5baba0a..edc34bc766 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java
@@ -18,18 +18,40 @@
package org.apache.paimon.manifest;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.TestAppendFileStore;
+import org.apache.paimon.TestFileStore;
+import org.apache.paimon.TestKeyValueGenerator;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.index.DeletionVectorMeta;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.paimon.operation.metrics.CacheMetrics;
+import org.apache.paimon.options.MemorySize;
import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.source.DeletionFile;
+import org.apache.paimon.utils.DVMetaCache;
+import org.apache.paimon.utils.IndexFilePathFactories;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.SegmentsCache;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.apache.paimon.index.IndexFileMetaSerializerTest.randomDeletionVectorIndexFile;
import static org.assertj.core.api.Assertions.assertThat;
@@ -49,6 +71,7 @@ public class IndexManifestFileHandlerTest {
FileFormat.manifestFormat(fileStore.options()),
"zstd",
fileStore.pathFactory(),
+ null,
null)
.create();
IndexManifestFileHandler indexManifestFileHandler =
@@ -84,6 +107,7 @@ public class IndexManifestFileHandlerTest {
FileFormat.manifestFormat(fileStore.options()),
"zstd",
fileStore.pathFactory(),
+ null,
null)
.create();
IndexManifestFileHandler indexManifestFileHandler =
@@ -114,4 +138,206 @@ public class IndexManifestFileHandlerTest {
assertThat(entries.contains(entry3)).isTrue();
assertThat(entries.contains(entry4)).isTrue();
}
+
+ @Test
+ public void testDVMetaCache() {
+ TestFileStore fileStore = keyValueFileStore();
+
+ // Setup cache and index-manifest file
+ MemorySize manifestMaxMemory = MemorySize.ofMebiBytes(128);
+ long manifestCacheThreshold = MemorySize.ofMebiBytes(1).getBytes();
+        SegmentsCache<Path> manifestCache =
+                SegmentsCache.create(manifestMaxMemory, manifestCacheThreshold);
+ DVMetaCache dvMetaCache = new DVMetaCache(1000000);
+
+ IndexManifestFile indexManifestFile =
+ new IndexManifestFile.Factory(
+ fileStore.fileIO(),
+ FileFormat.manifestFormat(fileStore.options()),
+ "zstd",
+ fileStore.pathFactory(),
+ manifestCache,
+ dvMetaCache)
+ .create();
+
+ IndexManifestFileHandler indexManifestFileHandler =
+                new IndexManifestFileHandler(indexManifestFile, BucketMode.HASH_FIXED);
+
+ BinaryRow partition1 = partition(1);
+ BinaryRow partition2 = partition(2);
+
+ IndexManifestEntry entry1 =
+ new IndexManifestEntry(
+ FileKind.ADD,
+ partition1,
+ 0,
+                        deletionVectorIndexFile("data1.parquet", "data2.parquet"));
+ IndexManifestEntry entry2 =
+ new IndexManifestEntry(
+                        FileKind.ADD, partition1, 1, deletionVectorIndexFile("data3.parquet"));
+ IndexManifestEntry entry3 =
+ new IndexManifestEntry(
+ FileKind.ADD,
+ partition2,
+ 0,
+                        deletionVectorIndexFile("data4.parquet", "data5.parquet"));
+
+ String indexManifestFileName =
+                indexManifestFileHandler.write(null, Arrays.asList(entry1, entry2, entry3));
+
+ // Create IndexFileHandler with cache enabled
+ IndexFileHandler indexFileHandler =
+ new IndexFileHandler(
+ fileStore.fileIO(),
+ fileStore.snapshotManager(),
+ indexManifestFile,
+ new IndexFilePathFactories(fileStore.pathFactory()),
+ MemorySize.ofMebiBytes(2),
+ dvMetaCache,
+ false);
+
+        Map<String, DeletionFile> expectedPartition1Bucket0Files =
+                indexFileHandler.extractDeletionFileByMeta(partition1, 0, entry1.indexFile());
+        Map<String, DeletionFile> expectedPartition1Bucket1Files =
+                indexFileHandler.extractDeletionFileByMeta(partition1, 1, entry2.indexFile());
+        Map<String, DeletionFile> expectedPartition2Bucket0Files =
+                indexFileHandler.extractDeletionFileByMeta(partition2, 0, entry3.indexFile());
+
+ CacheMetrics cacheMetrics = new CacheMetrics();
+ indexFileHandler.withCacheMetrics(cacheMetrics);
+
+ Snapshot snapshot = snapshot(indexManifestFileName);
+
+ // Test 1: First access should miss cache
+ Set<Pair<BinaryRow, Integer>> partitionBuckets = new HashSet<>();
+ partitionBuckets.add(Pair.of(partition1, 0));
+        Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> deletionFiles1 =
+                indexFileHandler.scanDVIndex(snapshot, partitionBuckets);
+        assertThat(deletionFiles1).isNotNull();
+        assertThat(deletionFiles1.containsKey(Pair.of(partition1, 0))).isTrue();
+        assertThat(cacheMetrics.getMissedObject().get()).isEqualTo(1);
+        assertThat(cacheMetrics.getHitObject().get()).isEqualTo(0);
+        Map<String, DeletionFile> actualPartition1Bucket0Files =
+                deletionFiles1.get(Pair.of(partition1, 0));
+        assertThat(actualPartition1Bucket0Files).isEqualTo(expectedPartition1Bucket0Files);
+
+ // Test 2: Second access to same partition/bucket should hit cache
+        Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> deletionFiles2 =
+                indexFileHandler.scanDVIndex(snapshot, partitionBuckets);
+        assertThat(deletionFiles2).isNotNull();
+        assertThat(deletionFiles1).isEqualTo(deletionFiles2);
+        assertThat(cacheMetrics.getHitObject().get()).isEqualTo(1);
+        Map<String, DeletionFile> cachedPartition1Bucket0Files =
+                deletionFiles2.get(Pair.of(partition1, 0));
+        assertThat(cachedPartition1Bucket0Files).isEqualTo(expectedPartition1Bucket0Files);
+
+ // Test 3: Access different bucket in same partition should hit cache
+ partitionBuckets.clear();
+ partitionBuckets.add(Pair.of(partition1, 1));
+        Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> deletionFiles3 =
+                indexFileHandler.scanDVIndex(snapshot, partitionBuckets);
+        assertThat(deletionFiles3).isNotNull();
+        assertThat(cacheMetrics.getHitObject().get()).isEqualTo(2);
+        assertThat(cacheMetrics.getMissedObject().get()).isEqualTo(1);
+        Map<String, DeletionFile> actualPartition1Bucket1Files =
+                deletionFiles3.get(Pair.of(partition1, 1));
+        assertThat(actualPartition1Bucket1Files).isEqualTo(expectedPartition1Bucket1Files);
+
+ // Test 4: Access different partition should miss cache
+ partitionBuckets.clear();
+ partitionBuckets.add(Pair.of(partition2, 0));
+        Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> deletionFiles4 =
+                indexFileHandler.scanDVIndex(snapshot, partitionBuckets);
+        assertThat(deletionFiles4).isNotNull();
+        assertThat(cacheMetrics.getMissedObject().get()).isEqualTo(2); // Now 2 misses total
+        Map<String, DeletionFile> actualPartition2Bucket0Files =
+                deletionFiles4.get(Pair.of(partition2, 0));
+        assertThat(actualPartition2Bucket0Files).isEqualTo(expectedPartition2Bucket0Files);
+
+ // Test 5: Test non-cache path by requesting multiple partition buckets
+ partitionBuckets.clear();
+ partitionBuckets.add(Pair.of(partition1, 0));
+        partitionBuckets.add(Pair.of(partition2, 0)); // Multiple buckets to avoid cache path
+        Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> deletionFiles5 =
+                indexFileHandler.scanDVIndex(snapshot, partitionBuckets);
+        assertThat(deletionFiles5).isNotNull();
+        assertThat(deletionFiles5.containsKey(Pair.of(partition1, 0))).isTrue();
+        assertThat(deletionFiles5.containsKey(Pair.of(partition2, 0))).isTrue();
+
+        Map<String, DeletionFile> nonCachePartition1Bucket0Files =
+                deletionFiles5.get(Pair.of(partition1, 0));
+        Map<String, DeletionFile> nonCachePartition2Bucket0Files =
+                deletionFiles5.get(Pair.of(partition2, 0));
+        assertThat(nonCachePartition1Bucket0Files).isEqualTo(expectedPartition1Bucket0Files);
+        assertThat(nonCachePartition2Bucket0Files).isEqualTo(expectedPartition2Bucket0Files);
+ }
+
+    // ============================ Test utils ===================================
+
+ private BinaryRow partition(int partitionValue) {
+ BinaryRow partition = new BinaryRow(1);
+ BinaryRowWriter writer = new BinaryRowWriter(partition);
+ writer.writeInt(0, partitionValue);
+ writer.complete();
+ return partition;
+ }
+
+ private IndexFileMeta deletionVectorIndexFile(String... dataFileNames) {
+        LinkedHashMap<String, DeletionVectorMeta> dvRanges = new LinkedHashMap<>();
+ int offset = 0;
+ for (String dataFileName : dataFileNames) {
+ dvRanges.put(
+ dataFileName,
+                    new DeletionVectorMeta(
+                            dataFileName, offset, 100 + offset, (long) (10 + offset)));
+ offset += 150;
+ }
+ return new IndexFileMeta(
+ DELETION_VECTORS_INDEX,
+ "dv_index_" + UUID.randomUUID().toString(),
+ 1024,
+ 512,
+ dvRanges,
+ null);
+ }
+
+ private Snapshot snapshot(String indexManifestFileName) {
+ String json =
+ "{\n"
+ + " \"version\" : 3,\n"
+ + " \"id\" : 1,\n"
+ + " \"schemaId\" : 0,\n"
+ + " \"baseManifestList\" : null,\n"
+ + " \"baseManifestListSize\" : 0,\n"
+ + " \"deltaManifestList\" : null,\n"
+ + " \"deltaManifestListSize\" : 0,\n"
+ + " \"changelogManifestListSize\" : 0,\n"
+ + " \"indexManifest\" : \""
+ + indexManifestFileName
+ + "\",\n"
+ + " \"commitUser\" : \"test\",\n"
+ + " \"commitIdentifier\" : 1,\n"
+ + " \"commitKind\" : \"APPEND\",\n"
+ + " \"timeMillis\" : "
+ + System.currentTimeMillis()
+ + ",\n"
+ + " \"totalRecordCount\" : null,\n"
+ + " \"deltaRecordCount\" : null\n"
+ + "}";
+ return Snapshot.fromJson(json);
+ }
+
+ private TestFileStore keyValueFileStore() {
+ return new TestFileStore.Builder(
+ "avro",
+ tempDir.toString(),
+ 2, // 2 buckets for testing
+ TestKeyValueGenerator.DEFAULT_PART_TYPE,
+ TestKeyValueGenerator.KEY_TYPE,
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+                        TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
+ DeduplicateMergeFunction.factory(),
+ null)
+ .build();
+ }
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
index 78045d9440..142ebc6a47 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
@@ -57,6 +57,7 @@ public class ManifestFileTest {
public void testWriteAndReadManifestFile() {
List<ManifestEntry> entries = generateData();
ManifestFileMeta meta = gen.createManifestFileMeta(entries);
+ System.out.println(tempDir.toString());
ManifestFile manifestFile = createManifestFile(tempDir.toString());
List<ManifestFileMeta> actualMetas = manifestFile.write(entries);
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java
index 7ea86a2800..25b1b21f61 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java
@@ -50,7 +50,9 @@ public class ScanMetricsTest {
ScanMetrics.LAST_SCAN_SKIPPED_TABLE_FILES,
ScanMetrics.LAST_SCAN_RESULTED_TABLE_FILES,
ScanMetrics.MANIFEST_HIT_CACHE,
- ScanMetrics.MANIFEST_MISSED_CACHE);
+ ScanMetrics.MANIFEST_MISSED_CACHE,
+ ScanMetrics.DVMETA_HIT_CACHE,
+ ScanMetrics.DVMETA_MISSED_CACHE);
}
/** Tests that the metrics are updated properly. */
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/DVMetaCacheTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/DVMetaCacheTest.java
new file mode 100644
index 0000000000..5b1c9611bb
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/DVMetaCacheTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.source.DeletionFile;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DVMetaCache}. */
+public class DVMetaCacheTest {
+
+ @Test
+ public void testPutAndRead() {
+ DVMetaCache cache = new DVMetaCache(100);
+ Path path = new Path("manifest/index-manifest-00001");
+ BinaryRow partition = partition("year=2023/month=12");
+
+ // Put data for bucket 1 with multiple files
+ Map<String, DeletionFile> dvFiles1 = new HashMap<>();
+ dvFiles1.put(
+ "data-a1b2c3d4-e5f6-7890-abcd-ef1234567890-1.parquet",
+                new DeletionFile("index-a1b2c3d4-e5f6-7890-abcd-ef1234567890-1", 0L, 100L, 42L));
+        dvFiles1.put(
+                "data-a1b2c3d4-e5f6-7890-abcd-ef1234567890-2.parquet",
+                new DeletionFile("index-a1b2c3d4-e5f6-7890-abcd-ef1234567890-1", 100L, 500L, null));
+ cache.put(path, partition, 1, dvFiles1);
+
+ // Put data for bucket 2 with single file
+ Map<String, DeletionFile> dvFiles2 = new HashMap<>();
+ dvFiles2.put(
+ "data-b2c3d4e5-f6g7-8901-bcde-f23456789012-1.parquet",
+                new DeletionFile("index-b2c3d4e5-f6g7-8901-bcde-f23456789012-1", 0L, 300L, 12L));
+ cache.put(path, partition, 2, dvFiles2);
+
+ // Read bucket 1 - verify multiple files
+ Map<String, DeletionFile> result1 = cache.read(path, partition, 1);
+ assertThat(result1).isNotNull().hasSize(2);
+ assertThat(result1)
+ .containsKeys(
+ "data-a1b2c3d4-e5f6-7890-abcd-ef1234567890-1.parquet",
+ "data-a1b2c3d4-e5f6-7890-abcd-ef1234567890-2.parquet");
+
+        DeletionFile file1 = result1.get("data-a1b2c3d4-e5f6-7890-abcd-ef1234567890-1.parquet");
+        assertThat(file1.path()).isEqualTo("index-a1b2c3d4-e5f6-7890-abcd-ef1234567890-1");
+ assertThat(file1.offset()).isEqualTo(0L);
+ assertThat(file1.length()).isEqualTo(100L);
+ assertThat(file1.cardinality()).isEqualTo(42L);
+
+        DeletionFile file2 = result1.get("data-a1b2c3d4-e5f6-7890-abcd-ef1234567890-2.parquet");
+        assertThat(file2.path()).isEqualTo("index-a1b2c3d4-e5f6-7890-abcd-ef1234567890-1");
+ assertThat(file2.cardinality()).isNull();
+
+ // Read bucket 2 - verify single file
+ Map<String, DeletionFile> result2 = cache.read(path, partition, 2);
+ assertThat(result2).isNotNull().hasSize(1);
+        assertThat(result2).containsKey("data-b2c3d4e5-f6g7-8901-bcde-f23456789012-1.parquet");
+
+ // Read non-existent key
+        assertThat(cache.read(path, partition("year=2024/month=01"), 0)).isNull();
+ assertThat(cache.read(path, partition, 999)).isNull();
+ }
+
+ @Test
+ public void testEmptyMap() {
+ DVMetaCache cache = new DVMetaCache(100);
+ Path path = new Path("manifest/index-manifest-00002");
+ BinaryRow partition = partition("year=2023/month=11");
+ cache.put(path, partition, 1, new HashMap<>());
+
+ // Should return empty map, not null
+ Map<String, DeletionFile> result = cache.read(path, partition, 1);
+ assertThat(result).isNotNull().isEmpty();
+ }
+
+ @Test
+ public void testCacheEviction() {
+ DVMetaCache cache = new DVMetaCache(2);
+ Path path = new Path("manifest/index-manifest-00004");
+ BinaryRow partition = partition("year=2023/month=09");
+
+ // Fill cache to capacity
+ Map<String, DeletionFile> dvFiles1 = new HashMap<>();
+ dvFiles1.put(
+ "data-e5f6g7h8-i9j0-1234-efgh-567890123456-1.parquet",
+                new DeletionFile("index-e5f6g7h8-i9j0-1234-efgh-567890123456-1", 0L, 100L, 1L));
+        dvFiles1.put(
+                "data-e5f6g7h8-i9j0-1234-efgh-567890123456-2.parquet",
+                new DeletionFile("index-e5f6g7h8-i9j0-1234-efgh-567890123456-1", 100L, 200L, 2L));
+ cache.put(path, partition, 1, dvFiles1);
+
+ Map<String, DeletionFile> dvFiles2 = new HashMap<>();
+ dvFiles2.put(
+ "data-f6g7h8i9-j0k1-2345-fghi-678901234567-1.parquet",
+                new DeletionFile("index-f6g7h8i9-j0k1-2345-fghi-678901234567-1", 0L, 100L, 2L));
+ cache.put(path, partition, 2, dvFiles2);
+
+ // Verify both buckets are cached
+ assertThat(cache.read(path, partition, 1)).isNotNull();
+ assertThat(cache.read(path, partition, 2)).isNotNull();
+
+ // Add third entry, should evict first one
+ Map<String, DeletionFile> dvFiles3 = new HashMap<>();
+ dvFiles3.put(
+ "data-g7h8i9j0-k1l2-3456-ghij-789012345678-1.parquet",
+                new DeletionFile("index-g7h8i9j0-k1l2-3456-ghij-789012345678-1", 0L, 100L, 3L));
+ cache.put(path, partition, 3, dvFiles3);
+
+ // First entry should be evicted
+ assertThat(cache.read(path, partition, 1)).isNull();
+ assertThat(cache.read(path, partition, 3)).isNotNull();
+ }
+
+    // ============================ Test utils ===================================
+
+ private BinaryRow partition(String partitionValue) {
+ InternalRowSerializer serializer =
+ new InternalRowSerializer(RowType.of(DataTypes.STRING()));
+ return serializer
+                .toBinaryRow(GenericRow.of(BinaryString.fromString(partitionValue)))
+ .copy();
+ }
+}