This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a78a912aa9 [core] Refactor dv index cache in SnapshotReaderImpl
a78a912aa9 is described below
commit a78a912aa908025c402e0eb5e77b2063accbd6e2
Author: JingsongLi <[email protected]>
AuthorDate: Mon Oct 27 18:59:29 2025 +0800
[core] Refactor dv index cache in SnapshotReaderImpl
---
.../generated/catalog_configuration.html | 6 +-
.../org/apache/paimon/options/CatalogOptions.java | 6 +-
.../java/org/apache/paimon/AbstractFileStore.java | 11 +-
.../src/main/java/org/apache/paimon/FileStore.java | 3 -
.../org/apache/paimon/index/IndexFileHandler.java | 151 +-------------
.../apache/paimon/manifest/IndexManifestFile.java | 15 +-
.../paimon/privilege/PrivilegedFileStore.java | 6 -
.../paimon/table/AbstractFileStoreTable.java | 4 +-
.../table/source/snapshot/SnapshotReaderImpl.java | 180 ++++++++++------
.../java/org/apache/paimon/utils/DVMetaCache.java | 121 ++---------
.../manifest/IndexManifestFileHandlerTest.java | 226 ---------------------
.../org/apache/paimon/utils/DVMetaCacheTest.java | 15 +-
12 files changed, 161 insertions(+), 583 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html b/docs/layouts/shortcodes/generated/catalog_configuration.html
index eaeea12346..49641d340e 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -33,10 +33,10 @@ under the License.
<td>Controls whether the catalog will cache databases, tables, manifests and partitions.</td>
</tr>
<tr>
- <td><h5>cache.deletion-vectors.max-bucket-num</h5></td>
- <td style="word-wrap: break-word;">20000</td>
+ <td><h5>cache.deletion-vectors.max-num</h5></td>
+ <td style="word-wrap: break-word;">500000</td>
<td>Integer</td>
- <td>Controls the maximum number of bucket-level deletion vector meta that can be cached.</td>
+ <td>Controls the maximum number of deletion vector meta that can be cached.</td>
</tr>
<tr>
<td><h5>cache.expire-after-access</h5></td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/options/CatalogOptions.java b/paimon-api/src/main/java/org/apache/paimon/options/CatalogOptions.java
index 72674bdd7b..8cd80106a8 100644
--- a/paimon-api/src/main/java/org/apache/paimon/options/CatalogOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/options/CatalogOptions.java
@@ -137,11 +137,11 @@ public class CatalogOptions {
"Controls the max number for snapshots per table
in the catalog are cached.");
public static final ConfigOption<Integer> CACHE_DV_MAX_NUM =
- key("cache.deletion-vectors.max-bucket-num")
+ key("cache.deletion-vectors.max-num")
.intType()
- .defaultValue(20000)
+ .defaultValue(500_000)
.withDescription(
- "Controls the maximum number of bucket-level
deletion vector meta that can be cached.");
+ "Controls the maximum number of deletion vector
meta that can be cached.");
public static final ConfigOption<Boolean> CASE_SENSITIVE =
ConfigOptions.key("case-sensitive")
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 2a5dbd7766..2e209fba45 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -59,7 +59,6 @@ import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.tag.TagPreview;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ChangelogManager;
-import org.apache.paimon.utils.DVMetaCache;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.IndexFilePathFactories;
import org.apache.paimon.utils.InternalRowPartitionComputer;
@@ -98,7 +97,6 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
@Nullable private SegmentsCache<Path> readManifestCache;
@Nullable private Cache<Path, Snapshot> snapshotCache;
- @Nullable private DVMetaCache dvMetaCache;
protected AbstractFileStore(
FileIO fileIO,
@@ -218,8 +216,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
FileFormat.manifestFormat(options),
options.manifestCompression(),
pathFactory(),
- readManifestCache,
- dvMetaCache);
+ readManifestCache);
}
@Override
@@ -230,7 +227,6 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
indexManifestFileFactory().create(),
new IndexFilePathFactories(pathFactory()),
options.dvIndexFileTargetSize(),
- this.dvMetaCache,
options.deletionVectorBitmap64());
}
@@ -492,9 +488,4 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
public void setSnapshotCache(Cache<Path, Snapshot> cache) {
this.snapshotCache = cache;
}
-
- @Override
- public void setDVMetaCache(DVMetaCache dvMetaCache) {
- this.dvMetaCache = dvMetaCache;
- }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index 62bc65c744..929302673d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -40,7 +40,6 @@ import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ChangelogManager;
-import org.apache.paimon.utils.DVMetaCache;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.SnapshotManager;
@@ -122,6 +121,4 @@ public interface FileStore<T> {
void setManifestCache(SegmentsCache<Path> manifestCache);
void setSnapshotCache(Cache<Path, Snapshot> cache);
-
- void setDVMetaCache(DVMetaCache dvMetaCache);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
index 83a41161f3..efa20fccdc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -19,7 +19,6 @@
package org.apache.paimon.index;
import org.apache.paimon.Snapshot;
-import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
@@ -27,27 +26,19 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.IndexManifestFile;
-import org.apache.paimon.operation.metrics.CacheMetrics;
import org.apache.paimon.options.MemorySize;
-import org.apache.paimon.table.source.DeletionFile;
-import org.apache.paimon.utils.DVMetaCache;
import org.apache.paimon.utils.IndexFilePathFactories;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;
-import javax.annotation.Nullable;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.stream.Collectors;
import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
@@ -61,8 +52,6 @@ public class IndexFileHandler {
private final IndexFilePathFactories pathFactories;
private final MemorySize dvTargetFileSize;
private final boolean dvBitmap64;
- @Nullable private CacheMetrics cacheMetrics;
- @Nullable private final DVMetaCache dvMetaCache;
public IndexFileHandler(
FileIO fileIO,
@@ -70,7 +59,6 @@ public class IndexFileHandler {
IndexManifestFile indexManifestFile,
IndexFilePathFactories pathFactories,
MemorySize dvTargetFileSize,
- DVMetaCache dvMetaCache,
boolean dvBitmap64) {
this.fileIO = fileIO;
this.snapshotManager = snapshotManager;
@@ -78,7 +66,6 @@ public class IndexFileHandler {
this.indexManifestFile = indexManifestFile;
this.dvTargetFileSize = dvTargetFileSize;
this.dvBitmap64 = dvBitmap64;
- this.dvMetaCache = dvMetaCache;
}
public HashIndexFile hashIndex(BinaryRow partition, int bucket) {
@@ -100,140 +87,6 @@ public class IndexFileHandler {
return result.isEmpty() ? Optional.empty() : Optional.of(result.get(0));
}
- public void withCacheMetrics(@Nullable CacheMetrics cacheMetrics) {
- this.cacheMetrics = cacheMetrics;
- }
-
- // Construct DataFile -> DeletionFile based on IndexFileMeta
- @Nullable
- @VisibleForTesting
- public Map<String, DeletionFile> extractDeletionFileByMeta(
- BinaryRow partition, Integer bucket, IndexFileMeta fileMeta) {
- LinkedHashMap<String, DeletionVectorMeta> dvRanges = fileMeta.dvRanges();
- String dvFilePath = dvIndex(partition, bucket).path(fileMeta).toString();
- if (dvRanges != null && !dvRanges.isEmpty()) {
- Map<String, DeletionFile> result = new HashMap<>();
- for (DeletionVectorMeta dvMeta : dvRanges.values()) {
- result.put(
- dvMeta.dataFileName(),
- new DeletionFile(
- dvFilePath,
- dvMeta.offset(),
- dvMeta.length(),
- dvMeta.cardinality()));
- }
- return result;
- }
- return null;
- }
-
- // Scan DV index file of given partition buckets
- // returns <DataFile: DeletionFile> map grouped by partition and bucket
- public Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> scanDVIndex(
- Snapshot snapshot, Set<Pair<BinaryRow, Integer>> partitionBuckets) {
- if (snapshot == null || snapshot.indexManifest() == null) {
- return Collections.emptyMap();
- }
- Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> result = new HashMap<>();
- // to avoid cache being frequently evicted,
- // currently we only read from cache when bucket number is 1
- if (this.dvMetaCache != null && partitionBuckets.size() == 1) {
- Pair<BinaryRow, Integer> partitionBucket = partitionBuckets.iterator().next();
- Map<String, DeletionFile> deletionFiles =
- this.scanDVIndexWithCache(
- snapshot, partitionBucket.getLeft(), partitionBucket.getRight());
- if (deletionFiles != null && deletionFiles.size() > 0) {
- result.put(partitionBucket, deletionFiles);
- }
- return result;
- }
- Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> partitionFileMetas =
- scan(
- snapshot,
- DELETION_VECTORS_INDEX,
- partitionBuckets.stream().map(Pair::getLeft).collect(Collectors.toSet()));
- partitionFileMetas.forEach(
- (entry, indexFileMetas) -> {
- if (partitionBuckets.contains(entry)) {
- if (indexFileMetas != null) {
- indexFileMetas.forEach(
- indexFileMeta -> {
- Map<String, DeletionFile> dvMetas =
- extractDeletionFileByMeta(
- entry.getLeft(),
- entry.getRight(),
- indexFileMeta);
- if (dvMetas != null) {
- result.computeIfAbsent(entry, k -> new HashMap<>())
- .putAll(dvMetas);
- }
- });
- }
- }
- });
- return result;
- }
-
- // Scan DV Meta Cache first, if not exist, scan DV index file, returns the exact deletion file
- // of the specified partition/bucket
- @VisibleForTesting
- Map<String, DeletionFile> scanDVIndexWithCache(
- Snapshot snapshot, BinaryRow partition, Integer bucket) {
- // read from cache
- String indexManifestName = snapshot.indexManifest();
- Path indexManifestPath = this.indexManifestFile.indexManifestFilePath(indexManifestName);
- Map<String, DeletionFile> result =
- this.dvMetaCache.read(indexManifestPath, partition, bucket);
- if (result != null) {
- if (cacheMetrics != null) {
- cacheMetrics.increaseHitObject();
- }
- return result;
- }
- if (cacheMetrics != null) {
- cacheMetrics.increaseMissedObject();
- }
- // If miss, read the whole partition's deletion files
- Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> partitionFileMetas =
- scan(
- snapshot,
- DELETION_VECTORS_INDEX,
- new HashSet<>(Collections.singletonList(partition)));
- // for each bucket, extract deletion files, and fill meta cache
- for (Map.Entry<Pair<BinaryRow, Integer>, List<IndexFileMeta>> entry :
- partitionFileMetas.entrySet()) {
- Pair<BinaryRow, Integer> partitionBucket = entry.getKey();
- List<IndexFileMeta> fileMetas = entry.getValue();
- if (entry.getValue() != null) {
- Map<String, DeletionFile> bucketDeletionFiles = new HashMap<>();
- fileMetas.forEach(
- meta -> {
- Map<String, DeletionFile> bucketDVMetas =
- extractDeletionFileByMeta(
- partitionBucket.getLeft(),
- partitionBucket.getRight(),
- meta);
- if (bucketDVMetas != null) {
- bucketDeletionFiles.putAll(bucketDVMetas);
- }
- });
- // bucketDeletionFiles can be empty
- this.dvMetaCache.put(
- indexManifestPath,
- partitionBucket.getLeft(),
- partitionBucket.getRight(),
- bucketDeletionFiles);
- if (partitionBucket.getRight() != null
- && partitionBucket.getLeft() != null
- && partitionBucket.getRight().equals(bucket)
- && partitionBucket.getLeft().equals(partition)) {
- result = bucketDeletionFiles;
- }
- }
- }
- return result;
- }
-
public List<IndexManifestEntry> scan(String indexType) {
return scan(snapshotManager.latestSnapshot(), indexType);
}
@@ -317,6 +170,10 @@ public class IndexFileHandler {
return result;
}
+ public Path indexManifestFilePath(String indexManifest) {
+ return indexManifestFile.indexManifestFilePath(indexManifest);
+ }
+
public Path filePath(IndexManifestEntry entry) {
return pathFactories.get(entry.partition(), entry.bucket()).toPath(entry.indexFile());
}
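With the caching logic removed, IndexFileHandler is back to a thin file-level API.
A caller that wants bucket-level deletion files now combines scan(...) with the new
indexManifestFilePath(...) helper, roughly as follows (a sketch of the pattern
SnapshotReaderImpl adopts below; the local variables are assumed to be in scope):

    // assume: Snapshot snapshot; IndexFileHandler handler; Set<BinaryRow> partitions
    Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> metas =
            handler.scan(snapshot, DELETION_VECTORS_INDEX, partitions);
    Path cacheKeyPath = handler.indexManifestFilePath(snapshot.indexManifest());
    // metas are then flattened into DataFile -> DeletionFile maps per bucket
    // and cached under (cacheKeyPath, partition, bucket)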
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java
index 21086eadf3..a46762c1af 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java
@@ -25,7 +25,6 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.DVMetaCache;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.ObjectsFile;
import org.apache.paimon.utils.PathFactory;
@@ -40,8 +39,6 @@ import java.util.List;
/** Index manifest file. */
public class IndexManifestFile extends ObjectsFile<IndexManifestEntry> {
- private final DVMetaCache dvMetaCache;
-
private IndexManifestFile(
FileIO fileIO,
RowType schema,
@@ -49,8 +46,7 @@ public class IndexManifestFile extends ObjectsFile<IndexManifestEntry> {
FormatWriterFactory writerFactory,
String compression,
PathFactory pathFactory,
- @Nullable SegmentsCache<Path> cache,
- @Nullable DVMetaCache dvMetaCache) {
+ @Nullable SegmentsCache<Path> cache) {
super(
fileIO,
new IndexManifestEntrySerializer(),
@@ -60,7 +56,6 @@ public class IndexManifestFile extends ObjectsFile<IndexManifestEntry> {
compression,
pathFactory,
cache);
- this.dvMetaCache = dvMetaCache;
}
public Path indexManifestFilePath(String fileName) {
@@ -88,21 +83,18 @@ public class IndexManifestFile extends ObjectsFile<IndexManifestEntry> {
private final String compression;
private final FileStorePathFactory pathFactory;
@Nullable private final SegmentsCache<Path> cache;
- @Nullable private final DVMetaCache dvMetaCache;
public Factory(
FileIO fileIO,
FileFormat fileFormat,
String compression,
FileStorePathFactory pathFactory,
- @Nullable SegmentsCache<Path> cache,
- @Nullable DVMetaCache dvMetaCache) {
+ @Nullable SegmentsCache<Path> cache) {
this.fileIO = fileIO;
this.fileFormat = fileFormat;
this.compression = compression;
this.pathFactory = pathFactory;
this.cache = cache;
- this.dvMetaCache = dvMetaCache;
}
public IndexManifestFile create() {
@@ -114,8 +106,7 @@ public class IndexManifestFile extends ObjectsFile<IndexManifestEntry> {
fileFormat.createWriterFactory(schema),
compression,
pathFactory.indexManifestFileFactory(),
- cache,
- dvMetaCache);
+ cache);
}
}
}
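After this change the Factory takes five arguments instead of six. Construction now
looks like the sketch below (fileIO, options and pathFactory stand in for whatever
the caller has at hand; the test below uses the same shape):

    IndexManifestFile indexManifestFile =
            new IndexManifestFile.Factory(
                            fileIO,
                            FileFormat.manifestFormat(options),
                            "zstd",
                            pathFactory,
                            null) // @Nullable SegmentsCache<Path>
                    .create();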
diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
index b683174cd4..99069ddc31 100644
--- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
@@ -44,7 +44,6 @@ import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ChangelogManager;
-import org.apache.paimon.utils.DVMetaCache;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.SnapshotManager;
@@ -233,9 +232,4 @@ public class PrivilegedFileStore<T> implements FileStore<T> {
public void setSnapshotCache(Cache<Path, Snapshot> cache) {
wrapped.setSnapshotCache(cache);
}
-
- @Override
- public void setDVMetaCache(DVMetaCache dvMetaCache) {
- wrapped.setDVMetaCache(dvMetaCache);
- }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index aeb223aaf5..55892e25ee 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -143,7 +143,6 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
@Override
public void setDVMetaCache(DVMetaCache cache) {
this.dvmetaCache = cache;
- store().setDVMetaCache(cache);
}
@Override
@@ -269,7 +268,8 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
nonPartitionFilterConsumer(),
store().pathFactory(),
name(),
- store().newIndexFileHandler());
+ store().newIndexFileHandler(),
+ dvmetaCache);
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 9157737915..942c5bc41c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -25,6 +25,7 @@ import org.apache.paimon.codegen.RecordComparator;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.index.DeletionVectorMeta;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
@@ -37,6 +38,7 @@ import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.ManifestsReader;
+import org.apache.paimon.operation.metrics.CacheMetrics;
import org.apache.paimon.operation.metrics.ScanMetrics;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
@@ -48,6 +50,7 @@ import org.apache.paimon.table.source.PlanImpl;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.utils.ChangelogManager;
+import org.apache.paimon.utils.DVMetaCache;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.LazyField;
@@ -89,9 +92,11 @@ public class SnapshotReaderImpl implements SnapshotReader {
private final FileStorePathFactory pathFactory;
private final String tableName;
private final IndexFileHandler indexFileHandler;
+ @Nullable private final DVMetaCache dvMetaCache;
private ScanMode scanMode = ScanMode.ALL;
private RecordComparator lazyPartitionComparator;
+ private CacheMetrics dvMetaCacheMetrics;
public SnapshotReaderImpl(
FileStoreScan scan,
@@ -103,7 +108,8 @@ public class SnapshotReaderImpl implements SnapshotReader {
BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer,
FileStorePathFactory pathFactory,
String tableName,
- IndexFileHandler indexFileHandler) {
+ IndexFileHandler indexFileHandler,
+ @Nullable DVMetaCache dvMetaCache) {
this.scan = scan;
this.tableSchema = tableSchema;
this.options = options;
@@ -121,6 +127,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
this.tableName = tableName;
this.indexFileHandler = indexFileHandler;
+ this.dvMetaCache = dvMetaCache;
}
@Override
@@ -287,8 +294,8 @@ public class SnapshotReaderImpl implements SnapshotReader {
@Override
public SnapshotReader withMetricRegistry(MetricRegistry registry) {
ScanMetrics scanMetrics = new ScanMetrics(registry, tableName);
+ dvMetaCacheMetrics = scanMetrics.getDvMetaCacheMetrics();
scan.withMetrics(scanMetrics);
- indexFileHandler.withCacheMetrics(scanMetrics.getDvMetaCacheMetrics());
return this;
}
@@ -348,25 +355,17 @@ public class SnapshotReaderImpl implements SnapshotReader {
@Nullable Snapshot snapshot,
boolean isStreaming,
SplitGenerator splitGenerator,
- Map<BinaryRow, Map<Integer, List<ManifestEntry>>> groupedManifestEntries) {
+ Map<BinaryRow, Map<Integer, List<ManifestEntry>>> entries) {
List<DataSplit> splits = new ArrayList<>();
// Read deletion indexes at once to reduce file IO
Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> deletionFilesMap = null;
if (!isStreaming) {
- Set<Pair<BinaryRow, Integer>> partitionBuckets =
- groupedManifestEntries.entrySet().stream()
- .flatMap(
- e ->
- e.getValue().keySet().stream()
- .map(bucket -> Pair.of(e.getKey(), bucket)))
- .collect(Collectors.toSet());
deletionFilesMap =
deletionVectors && snapshot != null
- ? indexFileHandler.scanDVIndex(snapshot, partitionBuckets)
+ ? scanDvIndex(snapshot, toPartBuckets(entries))
: Collections.emptyMap();
}
- for (Map.Entry<BinaryRow, Map<Integer, List<ManifestEntry>>> entry :
- groupedManifestEntries.entrySet()) {
+ for (Map.Entry<BinaryRow, Map<Integer, List<ManifestEntry>>> entry : entries.entrySet()) {
BinaryRow partition = entry.getKey();
Map<Integer, List<ManifestEntry>> buckets = entry.getValue();
for (Map.Entry<Integer, List<ManifestEntry>> bucketEntry : buckets.entrySet()) {
@@ -460,20 +459,16 @@ public class SnapshotReaderImpl implements SnapshotReader {
buckets.computeIfAbsent(part, k -> new HashSet<>())
.addAll(bucketMap.keySet()));
// Read deletion indexes at once to reduce file IO
- Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> beforDeletionIndexFilesMap = null;
- Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> deletionIndexFilesMap = null;
- if (!isStreaming) {
- beforDeletionIndexFilesMap =
- deletionVectors
- ? indexFileHandler.scan(
- beforeSnapshot.get(),
- DELETION_VECTORS_INDEX,
- beforeFiles.keySet())
+ Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> beforeDeletionFilesMap = null;
+ Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> deletionFilesMap = null;
+ if (!isStreaming && deletionVectors) {
+ beforeDeletionFilesMap =
+ beforeSnapshot.get() != null
+ ? scanDvIndex(beforeSnapshot.get(), toPartBuckets(beforeFiles))
: Collections.emptyMap();
- deletionIndexFilesMap =
- deletionVectors
- ? indexFileHandler.scan(
- snapshot, DELETION_VECTORS_INDEX, dataFiles.keySet())
+ deletionFilesMap =
+ snapshot != null
+ ? scanDvIndex(snapshot, toPartBuckets(dataFiles))
: Collections.emptyMap();
}
@@ -516,21 +511,19 @@ public class SnapshotReaderImpl implements SnapshotReader {
.withDataFiles(data)
.isStreaming(isStreaming)
.withBucketPath(pathFactory.bucketPath(part, bucket).toString());
- if (deletionVectors
- && beforDeletionIndexFilesMap != null
- && deletionIndexFilesMap != null) {
+ if (deletionVectors && beforeDeletionFilesMap != null) {
builder.withBeforeDeletionFiles(
getDeletionFiles(
- indexFileHandler.dvIndex(part, bucket),
before,
- beforDeletionIndexFilesMap.getOrDefault(
- Pair.of(part, bucket), Collections.emptyList())));
+ beforeDeletionFilesMap.getOrDefault(
+ Pair.of(part, bucket), Collections.emptyMap())));
+ }
+ if (deletionVectors && deletionFilesMap != null) {
builder.withDataDeletionFiles(
getDeletionFiles(
- indexFileHandler.dvIndex(part, bucket),
data,
- deletionIndexFilesMap.getOrDefault(
- Pair.of(part, bucket), Collections.emptyList())));
+ deletionFilesMap.getOrDefault(
+ Pair.of(part, bucket), Collections.emptyMap())));
}
splits.add(builder.build());
}
@@ -561,45 +554,100 @@ public class SnapshotReaderImpl implements SnapshotReader {
}
private List<DeletionFile> getDeletionFiles(
- DeletionVectorsIndexFile indexFile,
- List<DataFileMeta> dataFiles,
- List<IndexFileMeta> indexFileMetas) {
+ List<DataFileMeta> dataFiles, Map<String, DeletionFile> deletionFilesMap) {
List<DeletionFile> deletionFiles = new ArrayList<>(dataFiles.size());
- Map<String, IndexFileMeta> dataFileToIndexFileMeta = new HashMap<>();
- for (IndexFileMeta indexFileMeta : indexFileMetas) {
- if (indexFileMeta.dvRanges() != null) {
- for (DeletionVectorMeta dvMeta : indexFileMeta.dvRanges().values()) {
- dataFileToIndexFileMeta.put(dvMeta.dataFileName(), indexFileMeta);
+ dataFiles.stream()
+ .map(DataFileMeta::fileName)
+ .map(f -> deletionFilesMap == null ? null : deletionFilesMap.get(f))
+ .forEach(deletionFiles::add);
+ return deletionFiles;
+ }
+
+ private Set<Pair<BinaryRow, Integer>> toPartBuckets(
+ Map<BinaryRow, Map<Integer, List<ManifestEntry>>> entries) {
+ return entries.entrySet().stream()
+ .flatMap(
+ e ->
+ e.getValue().keySet().stream()
+ .map(bucket -> Pair.of(e.getKey(), bucket)))
+ .collect(Collectors.toSet());
+ }
+
+ private Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> scanDvIndex(
+ @Nullable Snapshot snapshot, Set<Pair<BinaryRow, Integer>> buckets) {
+ if (snapshot == null || snapshot.indexManifest() == null) {
+ return Collections.emptyMap();
+ }
+ Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> result = new HashMap<>();
+ Path indexManifestPath = indexFileHandler.indexManifestFilePath(snapshot.indexManifest());
+
+ // 1. read from cache
+ if (dvMetaCache != null) {
+ Iterator<Pair<BinaryRow, Integer>> iterator = buckets.iterator();
+ while (iterator.hasNext()) {
+ Pair<BinaryRow, Integer> next = iterator.next();
+ BinaryRow partition = next.getLeft();
+ int bucket = next.getRight();
+ Map<String, DeletionFile> fromCache =
+ dvMetaCache.read(indexManifestPath, partition, bucket);
+ if (fromCache != null) {
+ result.put(next, fromCache);
+ iterator.remove();
+ if (dvMetaCacheMetrics != null) {
+ dvMetaCacheMetrics.increaseHitObject();
+ }
+ } else {
+ if (dvMetaCacheMetrics != null) {
+ dvMetaCacheMetrics.increaseMissedObject();
+ }
}
}
}
- for (DataFileMeta file : dataFiles) {
- IndexFileMeta indexFileMeta = dataFileToIndexFileMeta.get(file.fileName());
- if (indexFileMeta != null) {
- LinkedHashMap<String, DeletionVectorMeta> dvMetas = indexFileMeta.dvRanges();
- if (dvMetas != null && dvMetas.containsKey(file.fileName())) {
- deletionFiles.add(
+
+ // 2. read from file system
+ Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> partitionFileMetas =
+ indexFileHandler.scan(
+ snapshot,
+ DELETION_VECTORS_INDEX,
+ buckets.stream().map(Pair::getLeft).collect(Collectors.toSet()));
+ partitionFileMetas.forEach(
+ (entry, indexFileMetas) -> {
+ Map<String, DeletionFile> deletionFiles =
+ toDeletionFiles(entry, indexFileMetas);
+ if (dvMetaCache != null) {
+ dvMetaCache.put(
+ indexManifestPath,
+ entry.getLeft(),
+ entry.getRight(),
+ deletionFiles);
+ }
+ if (buckets.contains(entry)) {
+ result.put(entry, deletionFiles);
+ }
+ });
+ return result;
+ }
+
+ private Map<String, DeletionFile> toDeletionFiles(
+ Pair<BinaryRow, Integer> partitionBucket, List<IndexFileMeta> fileMetas) {
+ Map<String, DeletionFile> deletionFiles = new HashMap<>();
+ DeletionVectorsIndexFile dvIndex =
+ indexFileHandler.dvIndex(partitionBucket.getLeft(), partitionBucket.getRight());
+ for (IndexFileMeta indexFile : fileMetas) {
+ LinkedHashMap<String, DeletionVectorMeta> dvRanges = indexFile.dvRanges();
+ String dvFilePath = dvIndex.path(indexFile).toString();
+ if (dvRanges != null && !dvRanges.isEmpty()) {
+ for (DeletionVectorMeta dvMeta : dvRanges.values()) {
+ deletionFiles.put(
+ dvMeta.dataFileName(),
new DeletionFile(
- indexFile.path(indexFileMeta).toString(),
- dvMetas.get(file.fileName()).offset(),
- dvMetas.get(file.fileName()).length(),
- dvMetas.get(file.fileName()).cardinality()));
- continue;
+ dvFilePath,
+ dvMeta.offset(),
+ dvMeta.length(),
+ dvMeta.cardinality()));
}
}
- deletionFiles.add(null);
}
-
- return deletionFiles;
- }
-
- public List<DeletionFile> getDeletionFiles(
- List<DataFileMeta> dataFiles, Map<String, DeletionFile> deletionFilesMap) {
- List<DeletionFile> deletionFiles = new ArrayList<>(dataFiles.size());
- dataFiles.stream()
- .map(DataFileMeta::fileName)
- .map(f -> deletionFilesMap == null ? null : deletionFilesMap.get(f))
- .forEach(deletionFiles::add);
return deletionFiles;
}
}
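Note that the cache key includes the index manifest path, so a new commit (which
writes a new index manifest) simply misses the old entries instead of serving stale
deletion files. Compressed, the per-bucket lookup above behaves like this (same
method names as in the diff; the surrounding setup is assumed):

    Path manifestPath = indexFileHandler.indexManifestFilePath(snapshot.indexManifest());
    Map<String, DeletionFile> cached = dvMetaCache.read(manifestPath, partition, bucket);
    if (cached == null) {
        // miss: scan the DV index once for the whole partition,
        // then dvMetaCache.put(...) every bucket that was read
    }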
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/DVMetaCache.java b/paimon-core/src/main/java/org/apache/paimon/utils/DVMetaCache.java
index d06c563187..34b92f9bc3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/DVMetaCache.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/DVMetaCache.java
@@ -27,132 +27,49 @@ import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caff
import javax.annotation.Nullable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
/** Cache for deletion vector meta. */
public class DVMetaCache {
- private final Cache<DVMetaCacheKey, List<DVMetaCacheValue>> cache;
- public DVMetaCache(long maxElementSize) {
+ private final Cache<DVMetaCacheKey, Map<String, DeletionFile>> cache;
+
+ public DVMetaCache(long maxValueNumber) {
this.cache =
Caffeine.newBuilder()
- .maximumSize(maxElementSize)
+ .weigher(DVMetaCache::weigh)
+ .maximumWeight(maxValueNumber)
.softValues()
.executor(Runnable::run)
.build();
}
+ private static int weigh(DVMetaCacheKey cacheKey, Map<String, DeletionFile> cacheValue) {
+ return cacheValue.size() + 1;
+ }
+
@Nullable
- public Map<String, DeletionFile> read(Path path, BinaryRow partition, int bucket) {
- DVMetaCacheKey cacheKey = new DVMetaCacheKey(path, partition, bucket);
- List<DVMetaCacheValue> cacheValue = this.cache.getIfPresent(cacheKey);
- if (cacheValue == null) {
- return null;
- }
- // If the bucket doesn't have dv metas, return empty set.
- Map<String, DeletionFile> dvFilesMap = new HashMap<>();
- cacheValue.forEach(
- dvMeta ->
- dvFilesMap.put(
- dvMeta.getDataFileName(),
- new DeletionFile(
- dvMeta.getDeletionFilePath(),
- dvMeta.getOffset(),
- dvMeta.getLength(),
- dvMeta.getCardinality())));
- return dvFilesMap;
+ public Map<String, DeletionFile> read(Path manifestPath, BinaryRow partition, int bucket) {
+ DVMetaCacheKey cacheKey = new DVMetaCacheKey(manifestPath, partition, bucket);
+ return this.cache.getIfPresent(cacheKey);
}
public void put(
Path path, BinaryRow partition, int bucket, Map<String, DeletionFile> dvFilesMap) {
DVMetaCacheKey key = new DVMetaCacheKey(path, partition, bucket);
- List<DVMetaCacheValue> cacheValue = new ArrayList<>();
- dvFilesMap.forEach(
- (dataFileName, deletionFile) -> {
- DVMetaCacheValue dvMetaCacheValue =
- new DVMetaCacheValue(
- dataFileName,
- deletionFile.path(),
- (int) deletionFile.offset(),
- (int) deletionFile.length(),
- deletionFile.cardinality());
- cacheValue.add(dvMetaCacheValue);
- });
- this.cache.put(key, cacheValue);
- }
-
- private static class DVMetaCacheValue {
- private final String dataFileName;
- private final String deletionFilePath;
- private final int offset;
- private final int length;
- @Nullable private final Long cardinality;
-
- public DVMetaCacheValue(
- String dataFileName,
- String deletionFilePath,
- int start,
- int length,
- @Nullable Long cardinality) {
- this.dataFileName = dataFileName;
- this.deletionFilePath = deletionFilePath;
- this.offset = start;
- this.length = length;
- this.cardinality = cardinality;
- }
-
- public String getDataFileName() {
- return dataFileName;
- }
-
- public String getDeletionFilePath() {
- return deletionFilePath;
- }
-
- public int getOffset() {
- return offset;
- }
-
- public int getLength() {
- return length;
- }
-
- @Nullable
- public Long getCardinality() {
- return cardinality;
- }
-
- @Override
- public boolean equals(Object o) {
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- DVMetaCacheValue that = (DVMetaCacheValue) o;
- return offset == that.offset
- && length == that.length
- && Objects.equals(dataFileName, that.dataFileName)
- && Objects.equals(deletionFilePath, that.deletionFilePath)
- && Objects.equals(cardinality, that.cardinality);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(dataFileName, deletionFilePath, offset, length, cardinality);
- }
+ this.cache.put(key, dvFilesMap);
}
/** Cache key for deletion vector meta at bucket level. */
private static final class DVMetaCacheKey {
- private final Path path;
+
+ private final Path manifestPath;
private final BinaryRow row;
private final int bucket;
- public DVMetaCacheKey(Path path, BinaryRow row, int bucket) {
- this.path = path;
+ public DVMetaCacheKey(Path manifestPath, BinaryRow row, int bucket) {
+ this.manifestPath = manifestPath;
this.row = row;
this.bucket = bucket;
}
@@ -167,13 +84,13 @@ public class DVMetaCache {
}
DVMetaCacheKey that = (DVMetaCacheKey) o;
return bucket == that.bucket
- && Objects.equals(path, that.path)
+ && Objects.equals(manifestPath, that.manifestPath)
&& Objects.equals(row, that.row);
}
@Override
public int hashCode() {
- return Objects.hash(path, row, bucket);
+ return Objects.hash(manifestPath, row, bucket);
}
}
}
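The switch from maximumSize to a weigher means the limit now bounds the number of
cached DeletionFile metas rather than the number of cache entries. A self-contained
illustration of the same Caffeine semantics (plain Caffeine here; Paimon uses its
shaded copy, and the keys/values are simplified):

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;
    import java.util.Arrays;
    import java.util.List;

    Cache<String, List<String>> cache =
            Caffeine.newBuilder()
                    .weigher((String k, List<String> v) -> v.size() + 1)
                    .maximumWeight(5) // bounds total weight, not entry count
                    .executor(Runnable::run) // run maintenance on the caller thread
                    .build();
    cache.put("bucket-0", Arrays.asList("f1", "f2")); // weight 3
    cache.put("bucket-1", Arrays.asList("f3"));       // weight 2, total 5
    cache.put("bucket-2", Arrays.asList("f4"));       // weight 2, exceeds 5 -> eviction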
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java
index edc34bc766..c2c5baba0a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java
@@ -18,40 +18,18 @@
package org.apache.paimon.manifest;
-import org.apache.paimon.Snapshot;
import org.apache.paimon.TestAppendFileStore;
-import org.apache.paimon.TestFileStore;
-import org.apache.paimon.TestKeyValueGenerator;
import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.format.FileFormat;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.index.DeletionVectorMeta;
-import org.apache.paimon.index.IndexFileHandler;
-import org.apache.paimon.index.IndexFileMeta;
-import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
-import org.apache.paimon.operation.metrics.CacheMetrics;
-import org.apache.paimon.options.MemorySize;
import org.apache.paimon.table.BucketMode;
-import org.apache.paimon.table.source.DeletionFile;
-import org.apache.paimon.utils.DVMetaCache;
-import org.apache.paimon.utils.IndexFilePathFactories;
-import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.SegmentsCache;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.apache.paimon.index.IndexFileMetaSerializerTest.randomDeletionVectorIndexFile;
import static org.assertj.core.api.Assertions.assertThat;
@@ -71,7 +49,6 @@ public class IndexManifestFileHandlerTest {
FileFormat.manifestFormat(fileStore.options()),
"zstd",
fileStore.pathFactory(),
- null,
null)
.create();
IndexManifestFileHandler indexManifestFileHandler =
@@ -107,7 +84,6 @@ public class IndexManifestFileHandlerTest {
FileFormat.manifestFormat(fileStore.options()),
"zstd",
fileStore.pathFactory(),
- null,
null)
.create();
IndexManifestFileHandler indexManifestFileHandler =
@@ -138,206 +114,4 @@ public class IndexManifestFileHandlerTest {
assertThat(entries.contains(entry3)).isTrue();
assertThat(entries.contains(entry4)).isTrue();
}
-
- @Test
- public void testDVMetaCache() {
- TestFileStore fileStore = keyValueFileStore();
-
- // Setup cache and index-manifest file
- MemorySize manifestMaxMemory = MemorySize.ofMebiBytes(128);
- long manifestCacheThreshold = MemorySize.ofMebiBytes(1).getBytes();
- SegmentsCache<Path> manifestCache =
- SegmentsCache.create(manifestMaxMemory, manifestCacheThreshold);
- DVMetaCache dvMetaCache = new DVMetaCache(1000000);
-
- IndexManifestFile indexManifestFile =
- new IndexManifestFile.Factory(
- fileStore.fileIO(),
- FileFormat.manifestFormat(fileStore.options()),
- "zstd",
- fileStore.pathFactory(),
- manifestCache,
- dvMetaCache)
- .create();
-
- IndexManifestFileHandler indexManifestFileHandler =
- new IndexManifestFileHandler(indexManifestFile, BucketMode.HASH_FIXED);
-
- BinaryRow partition1 = partition(1);
- BinaryRow partition2 = partition(2);
-
- IndexManifestEntry entry1 =
- new IndexManifestEntry(
- FileKind.ADD,
- partition1,
- 0,
- deletionVectorIndexFile("data1.parquet",
"data2.parquet"));
- IndexManifestEntry entry2 =
- new IndexManifestEntry(
- FileKind.ADD, partition1, 1, deletionVectorIndexFile("data3.parquet"));
- IndexManifestEntry entry3 =
- new IndexManifestEntry(
- FileKind.ADD,
- partition2,
- 0,
- deletionVectorIndexFile("data4.parquet",
"data5.parquet"));
-
- String indexManifestFileName =
- indexManifestFileHandler.write(null, Arrays.asList(entry1, entry2, entry3));
-
- // Create IndexFileHandler with cache enabled
- IndexFileHandler indexFileHandler =
- new IndexFileHandler(
- fileStore.fileIO(),
- fileStore.snapshotManager(),
- indexManifestFile,
- new IndexFilePathFactories(fileStore.pathFactory()),
- MemorySize.ofMebiBytes(2),
- dvMetaCache,
- false);
-
- Map<String, DeletionFile> expectedPartition1Bucket0Files =
- indexFileHandler.extractDeletionFileByMeta(partition1, 0, entry1.indexFile());
- Map<String, DeletionFile> expectedPartition1Bucket1Files =
- indexFileHandler.extractDeletionFileByMeta(partition1, 1, entry2.indexFile());
- Map<String, DeletionFile> expectedPartition2Bucket0Files =
- indexFileHandler.extractDeletionFileByMeta(partition2, 0, entry3.indexFile());
-
- CacheMetrics cacheMetrics = new CacheMetrics();
- indexFileHandler.withCacheMetrics(cacheMetrics);
-
- Snapshot snapshot = snapshot(indexManifestFileName);
-
- // Test 1: First access should miss cache
- Set<Pair<BinaryRow, Integer>> partitionBuckets = new HashSet<>();
- partitionBuckets.add(Pair.of(partition1, 0));
- Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> deletionFiles1 =
- indexFileHandler.scanDVIndex(snapshot, partitionBuckets);
- assertThat(deletionFiles1).isNotNull();
- assertThat(deletionFiles1.containsKey(Pair.of(partition1, 0))).isTrue();
- assertThat(cacheMetrics.getMissedObject().get()).isEqualTo(1);
- assertThat(cacheMetrics.getHitObject().get()).isEqualTo(0);
- Map<String, DeletionFile> actualPartition1Bucket0Files =
- deletionFiles1.get(Pair.of(partition1, 0));
- assertThat(actualPartition1Bucket0Files).isEqualTo(expectedPartition1Bucket0Files);
-
- // Test 2: Second access to same partition/bucket should hit cache
- Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> deletionFiles2 =
- indexFileHandler.scanDVIndex(snapshot, partitionBuckets);
- assertThat(deletionFiles2).isNotNull();
- assertThat(deletionFiles1).isEqualTo(deletionFiles2);
- assertThat(cacheMetrics.getHitObject().get()).isEqualTo(1);
- Map<String, DeletionFile> cachedPartition1Bucket0Files =
- deletionFiles2.get(Pair.of(partition1, 0));
- assertThat(cachedPartition1Bucket0Files).isEqualTo(expectedPartition1Bucket0Files);
-
- // Test 3: Access different bucket in same partition should hit cache
- partitionBuckets.clear();
- partitionBuckets.add(Pair.of(partition1, 1));
- Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> deletionFiles3 =
- indexFileHandler.scanDVIndex(snapshot, partitionBuckets);
- assertThat(deletionFiles3).isNotNull();
- assertThat(cacheMetrics.getHitObject().get()).isEqualTo(2);
- assertThat(cacheMetrics.getMissedObject().get()).isEqualTo(1);
- Map<String, DeletionFile> actualPartition1Bucket1Files =
- deletionFiles3.get(Pair.of(partition1, 1));
- assertThat(actualPartition1Bucket1Files).isEqualTo(expectedPartition1Bucket1Files);
-
- // Test 4: Access different partition should miss cache
- partitionBuckets.clear();
- partitionBuckets.add(Pair.of(partition2, 0));
- Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> deletionFiles4 =
- indexFileHandler.scanDVIndex(snapshot, partitionBuckets);
- assertThat(deletionFiles4).isNotNull();
- assertThat(cacheMetrics.getMissedObject().get()).isEqualTo(2); // Now 2 misses total
- Map<String, DeletionFile> actualPartition2Bucket0Files =
- deletionFiles4.get(Pair.of(partition2, 0));
- assertThat(actualPartition2Bucket0Files).isEqualTo(expectedPartition2Bucket0Files);
-
- // Test 5: Test non-cache path by requesting multiple partition buckets
- partitionBuckets.clear();
- partitionBuckets.add(Pair.of(partition1, 0));
- partitionBuckets.add(Pair.of(partition2, 0)); // Multiple buckets to avoid cache path
- Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> deletionFiles5 =
- indexFileHandler.scanDVIndex(snapshot, partitionBuckets);
- assertThat(deletionFiles5).isNotNull();
- assertThat(deletionFiles5.containsKey(Pair.of(partition1, 0))).isTrue();
- assertThat(deletionFiles5.containsKey(Pair.of(partition2, 0))).isTrue();
-
- Map<String, DeletionFile> nonCachePartition1Bucket0Files =
- deletionFiles5.get(Pair.of(partition1, 0));
- Map<String, DeletionFile> nonCachePartition2Bucket0Files =
- deletionFiles5.get(Pair.of(partition2, 0));
- assertThat(nonCachePartition1Bucket0Files).isEqualTo(expectedPartition1Bucket0Files);
- assertThat(nonCachePartition2Bucket0Files).isEqualTo(expectedPartition2Bucket0Files);
- }
-
- // ============================ Test utils ===================================
-
- private BinaryRow partition(int partitionValue) {
- BinaryRow partition = new BinaryRow(1);
- BinaryRowWriter writer = new BinaryRowWriter(partition);
- writer.writeInt(0, partitionValue);
- writer.complete();
- return partition;
- }
-
- private IndexFileMeta deletionVectorIndexFile(String... dataFileNames) {
- LinkedHashMap<String, DeletionVectorMeta> dvRanges = new LinkedHashMap<>();
- int offset = 0;
- for (String dataFileName : dataFileNames) {
- dvRanges.put(
- dataFileName,
- new DeletionVectorMeta(
- dataFileName, offset, 100 + offset, (long) (10 + offset)));
- offset += 150;
- }
- return new IndexFileMeta(
- DELETION_VECTORS_INDEX,
- "dv_index_" + UUID.randomUUID().toString(),
- 1024,
- 512,
- dvRanges,
- null);
- }
-
- private Snapshot snapshot(String indexManifestFileName) {
- String json =
- "{\n"
- + " \"version\" : 3,\n"
- + " \"id\" : 1,\n"
- + " \"schemaId\" : 0,\n"
- + " \"baseManifestList\" : null,\n"
- + " \"baseManifestListSize\" : 0,\n"
- + " \"deltaManifestList\" : null,\n"
- + " \"deltaManifestListSize\" : 0,\n"
- + " \"changelogManifestListSize\" : 0,\n"
- + " \"indexManifest\" : \""
- + indexManifestFileName
- + "\",\n"
- + " \"commitUser\" : \"test\",\n"
- + " \"commitIdentifier\" : 1,\n"
- + " \"commitKind\" : \"APPEND\",\n"
- + " \"timeMillis\" : "
- + System.currentTimeMillis()
- + ",\n"
- + " \"totalRecordCount\" : null,\n"
- + " \"deltaRecordCount\" : null\n"
- + "}";
- return Snapshot.fromJson(json);
- }
-
- private TestFileStore keyValueFileStore() {
- return new TestFileStore.Builder(
- "avro",
- tempDir.toString(),
- 2, // 2 buckets for testing
- TestKeyValueGenerator.DEFAULT_PART_TYPE,
- TestKeyValueGenerator.KEY_TYPE,
- TestKeyValueGenerator.DEFAULT_ROW_TYPE,
- TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
- DeduplicateMergeFunction.factory(),
- null)
- .build();
- }
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/DVMetaCacheTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/DVMetaCacheTest.java
index 5b1c9611bb..c0c28ce3ab 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/DVMetaCacheTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/DVMetaCacheTest.java
@@ -102,7 +102,7 @@ public class DVMetaCacheTest {
@Test
public void testCacheEviction() {
- DVMetaCache cache = new DVMetaCache(2);
+ DVMetaCache cache = new DVMetaCache(5);
Path path = new Path("manifest/index-manifest-00004");
BinaryRow partition = partition("year=2023/month=09");
@@ -126,16 +126,25 @@ public class DVMetaCacheTest {
assertThat(cache.read(path, partition, 1)).isNotNull();
assertThat(cache.read(path, partition, 2)).isNotNull();
- // Add third entry, should evict first one
+ // Add third entry, should evict itself
Map<String, DeletionFile> dvFiles3 = new HashMap<>();
dvFiles3.put(
"data-g7h8i9j0-k1l2-3456-ghij-789012345678-1.parquet",
new DeletionFile("index-g7h8i9j0-k1l2-3456-ghij-789012345678-1", 0L, 100L, 3L));
cache.put(path, partition, 3, dvFiles3);
+ // Add fourth entry, should evict first one
+ Map<String, DeletionFile> dvFiles4 = new HashMap<>();
+ dvFiles4.put(
+ "data-g7h8i9j0-k1l2-311456-ghij-789012345678-1.parquet",
+ new DeletionFile("index-g117h8i9j0-k1l2-3456-ghij-789012345678-1", 0L, 100L, 3L));
+ cache.put(path, partition, 4, dvFiles4);
+
// First entry should be evicted
assertThat(cache.read(path, partition, 1)).isNull();
- assertThat(cache.read(path, partition, 3)).isNotNull();
+ assertThat(cache.read(path, partition, 2)).isNotNull();
+ assertThat(cache.read(path, partition, 3)).isNull();
+ assertThat(cache.read(path, partition, 4)).isNotNull();
}
// ============================ Test utils ===================================