This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f970245100 [core] Add partition and bucket arguments to
FileStorePathFactory.indexFileFactory
f970245100 is described below
commit f9702451006b876b5f3455eeb86bed782bd97f74
Author: JingsongLi <[email protected]>
AuthorDate: Sat Aug 23 11:50:53 2025 +0800
[core] Add partition and bucket arguments to
FileStorePathFactory.indexFileFactory
---
.../java/org/apache/paimon/AbstractFileStore.java | 14 ++---
.../deletionvectors/BucketedDvMaintainer.java | 13 +++--
.../append/BaseAppendDeleteFileMaintainer.java | 9 ++-
.../paimon/index/DynamicBucketIndexMaintainer.java | 6 +-
.../org/apache/paimon/index/IndexFileHandler.java | 65 ++++++++++++----------
.../org/apache/paimon/index/PartitionIndex.java | 6 +-
.../paimon/operation/AbstractFileStoreWrite.java | 6 +-
.../apache/paimon/table/sink/TableCommitImpl.java | 5 +-
.../table/source/snapshot/SnapshotReaderImpl.java | 6 +-
.../apache/paimon/utils/FileStorePathFactory.java | 2 +-
.../paimon/utils/IndexFilePathFactories.java | 41 ++++++++++++++
.../org/apache/paimon/TestAppendFileStore.java | 2 +-
.../deletionvectors/BucketedDvMaintainerTest.java | 46 +++++++--------
.../append/AppendDeletionFileMaintainerHelper.java | 7 ++-
.../append/AppendDeletionFileMaintainerTest.java | 2 +-
.../index/DynamicBucketIndexMaintainerTest.java | 3 +-
.../paimon/index/HashBucketAssignerTest.java | 50 ++++++++++++-----
.../paimon/operation/FileStoreCommitTest.java | 56 +++++++++++++++----
.../apache/paimon/flink/BatchFileStoreITCase.java | 4 +-
19 files changed, 230 insertions(+), 113 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 4d2b7726de..1c12877e75 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -22,13 +22,11 @@ import org.apache.paimon.CoreOptions.ExternalPathStrategy;
import org.apache.paimon.catalog.RenamingSnapshotCommit;
import org.apache.paimon.catalog.SnapshotCommit;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.iceberg.IcebergCommitCallback;
import org.apache.paimon.iceberg.IcebergOptions;
-import org.apache.paimon.index.HashIndexFile;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.manifest.IndexManifestFile;
import org.apache.paimon.manifest.ManifestFile;
@@ -61,6 +59,7 @@ import org.apache.paimon.tag.TagPreview;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ChangelogManager;
import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.IndexFilePathFactories;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.SnapshotManager;
@@ -221,15 +220,12 @@ abstract class AbstractFileStore<T> implements
FileStore<T> {
@Override
public IndexFileHandler newIndexFileHandler() {
return new IndexFileHandler(
+ fileIO,
snapshotManager(),
- pathFactory().indexFileFactory(),
indexManifestFileFactory().create(),
- new HashIndexFile(fileIO, pathFactory().indexFileFactory()),
- new DeletionVectorsIndexFile(
- fileIO,
- pathFactory().indexFileFactory(),
- options.dvIndexFileTargetSize(),
- options.deletionVectorBitmap64()));
+ new IndexFilePathFactories(pathFactory()),
+ options.dvIndexFileTargetSize(),
+ options.deletionVectorBitmap64());
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BucketedDvMaintainer.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BucketedDvMaintainer.java
index eef21f3b54..788ea1cce8 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BucketedDvMaintainer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BucketedDvMaintainer.java
@@ -19,6 +19,7 @@
package org.apache.paimon.deletionvectors;
import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
@@ -160,17 +161,19 @@ public class BucketedDvMaintainer {
return handler;
}
- public BucketedDvMaintainer create(@Nullable List<IndexFileMeta>
restoredFiles) {
+ public BucketedDvMaintainer create(
+ BinaryRow partition, int bucket, @Nullable List<IndexFileMeta>
restoredFiles) {
if (restoredFiles == null) {
restoredFiles = Collections.emptyList();
}
Map<String, DeletionVector> deletionVectors =
- new
HashMap<>(handler.readAllDeletionVectors(restoredFiles));
- return create(deletionVectors);
+ new HashMap<>(handler.readAllDeletionVectors(partition,
bucket, restoredFiles));
+ return create(partition, bucket, deletionVectors);
}
- public BucketedDvMaintainer create(Map<String, DeletionVector>
deletionVectors) {
- return new BucketedDvMaintainer(handler.dvIndex(),
deletionVectors);
+ public BucketedDvMaintainer create(
+ BinaryRow partition, int bucket, Map<String, DeletionVector>
deletionVectors) {
+ return new BucketedDvMaintainer(handler.dvIndex(partition,
bucket), deletionVectors);
}
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BaseAppendDeleteFileMaintainer.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BaseAppendDeleteFileMaintainer.java
index 729f660177..21d1870b76 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BaseAppendDeleteFileMaintainer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BaseAppendDeleteFileMaintainer.java
@@ -37,6 +37,7 @@ import java.util.Map;
import java.util.stream.Collectors;
import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
+import static org.apache.paimon.table.BucketMode.UNAWARE_BUCKET;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
/**
@@ -68,7 +69,8 @@ public interface BaseAppendDeleteFileMaintainer {
List<IndexFileMeta> indexFiles =
indexFileHandler.scan(snapshot, DELETION_VECTORS_INDEX,
partition, bucket);
BucketedDvMaintainer maintainer =
-
BucketedDvMaintainer.factory(indexFileHandler).create(indexFiles);
+ BucketedDvMaintainer.factory(indexFileHandler)
+ .create(partition, bucket, indexFiles);
return new BucketedAppendDeleteFileMaintainer(partition, bucket,
maintainer);
}
@@ -94,6 +96,9 @@ public interface BaseAppendDeleteFileMaintainer {
}
}
return new AppendDeleteFileMaintainer(
- indexFileHandler.dvIndex(), partition, manifestEntries,
deletionFiles);
+ indexFileHandler.dvIndex(partition, UNAWARE_BUCKET),
+ partition,
+ manifestEntries,
+ deletionFiles);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/DynamicBucketIndexMaintainer.java
b/paimon-core/src/main/java/org/apache/paimon/index/DynamicBucketIndexMaintainer.java
index fbf95d7a14..50397a7a85 100644
---
a/paimon-core/src/main/java/org/apache/paimon/index/DynamicBucketIndexMaintainer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/index/DynamicBucketIndexMaintainer.java
@@ -110,8 +110,10 @@ public class DynamicBucketIndexMaintainer {
return handler;
}
- public DynamicBucketIndexMaintainer create(@Nullable IndexFileMeta
restoredFile) {
- return new DynamicBucketIndexMaintainer(handler.hashIndex(),
restoredFile);
+ public DynamicBucketIndexMaintainer create(
+ BinaryRow partition, int bucket, @Nullable IndexFileMeta
restoredFile) {
+ return new DynamicBucketIndexMaintainer(
+ handler.hashIndex(partition, bucket), restoredFile);
}
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
index 188efe9348..186f375633 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -22,11 +22,13 @@ import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
+import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.IndexManifestFile;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.utils.IndexFilePathFactories;
import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.PathFactory;
import org.apache.paimon.utils.SnapshotManager;
import java.io.IOException;
@@ -40,36 +42,39 @@ import java.util.Set;
import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Handle index files. */
public class IndexFileHandler {
+ private final FileIO fileIO;
private final SnapshotManager snapshotManager;
- private final PathFactory pathFactory;
private final IndexManifestFile indexManifestFile;
- private final HashIndexFile hashIndex;
- private final DeletionVectorsIndexFile dvIndex;
+ private final IndexFilePathFactories pathFactories;
+ private final MemorySize dvTargetFileSize;
+ private final boolean dvBitmap64;
public IndexFileHandler(
+ FileIO fileIO,
SnapshotManager snapshotManager,
- PathFactory pathFactory,
IndexManifestFile indexManifestFile,
- HashIndexFile hashIndex,
- DeletionVectorsIndexFile dvIndex) {
+ IndexFilePathFactories pathFactories,
+ MemorySize dvTargetFileSize,
+ boolean dvBitmap64) {
+ this.fileIO = fileIO;
this.snapshotManager = snapshotManager;
- this.pathFactory = pathFactory;
+ this.pathFactories = pathFactories;
this.indexManifestFile = indexManifestFile;
- this.hashIndex = hashIndex;
- this.dvIndex = dvIndex;
+ this.dvTargetFileSize = dvTargetFileSize;
+ this.dvBitmap64 = dvBitmap64;
}
- public HashIndexFile hashIndex() {
- return this.hashIndex;
+ public HashIndexFile hashIndex(BinaryRow partition, int bucket) {
+ return new HashIndexFile(fileIO, pathFactories.get(partition, bucket));
}
- public DeletionVectorsIndexFile dvIndex() {
- return this.dvIndex;
+ public DeletionVectorsIndexFile dvIndex(BinaryRow partition, int bucket) {
+ return new DeletionVectorsIndexFile(
+ fileIO, pathFactories.get(partition, bucket),
dvTargetFileSize, dvBitmap64);
}
public Optional<IndexFileMeta> scanHashIndex(
@@ -166,7 +171,9 @@ public class IndexFileHandler {
}
public Path filePath(IndexManifestEntry entry) {
- return pathFactory.toPath(entry.indexFile().fileName());
+ return pathFactories
+ .get(entry.partition(), entry.bucket())
+ .toPath(entry.indexFile().fileName());
}
public boolean existsManifest(String indexManifest) {
@@ -182,36 +189,36 @@ public class IndexFileHandler {
return indexManifestFile.readWithIOException(indexManifest);
}
- private IndexFile indexFile(IndexFileMeta file) {
+ private IndexFile indexFile(IndexManifestEntry entry) {
+ IndexFileMeta file = entry.indexFile();
switch (file.indexType()) {
case HASH_INDEX:
- return hashIndex;
+ return hashIndex(entry.partition(), entry.bucket());
case DELETION_VECTORS_INDEX:
- return dvIndex;
+ return dvIndex(entry.partition(), entry.bucket());
default:
throw new IllegalArgumentException("Unknown index type: " +
file.indexType());
}
}
public boolean existsIndexFile(IndexManifestEntry file) {
- return indexFile(file.indexFile()).exists(file.indexFile().fileName());
+ return indexFile(file).exists(file.indexFile().fileName());
}
public void deleteIndexFile(IndexManifestEntry entry) {
- IndexFileMeta file = entry.indexFile();
- indexFile(file).delete(file.fileName());
+ indexFile(entry).delete(entry.indexFile().fileName());
}
public void deleteManifest(String indexManifest) {
indexManifestFile.delete(indexManifest);
}
- public Map<String, DeletionVector>
readAllDeletionVectors(List<IndexFileMeta> fileMetas) {
- for (IndexFileMeta indexFile : fileMetas) {
- checkArgument(
- indexFile.indexType().equals(DELETION_VECTORS_INDEX),
- "Input file is not deletion vectors index " +
indexFile.indexType());
- }
- return dvIndex.readAllDeletionVectors(fileMetas);
+ public Map<String, DeletionVector> readAllDeletionVectors(
+ BinaryRow partition, int bucket, List<IndexFileMeta> fileMetas) {
+ return dvIndex(partition, bucket).readAllDeletionVectors(fileMetas);
+ }
+
+ public Map<String, DeletionVector>
readAllDeletionVectors(IndexManifestEntry entry) {
+ return dvIndex(entry.partition(),
entry.bucket()).readAllDeletionVectors(entry.indexFile());
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
b/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
index 958dcfcdee..1e525ea4e0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
@@ -124,11 +124,13 @@ public class PartitionIndex {
IntPredicate loadFilter,
IntPredicate bucketFilter) {
List<IndexManifestEntry> files =
indexFileHandler.scanEntries(HASH_INDEX, partition);
- HashIndexFile hashIndex = indexFileHandler.hashIndex();
Int2ShortHashMap.Builder mapBuilder = Int2ShortHashMap.builder();
Map<Integer, Long> buckets = new HashMap<>();
for (IndexManifestEntry file : files) {
- try (IntIterator iterator =
hashIndex.read(file.indexFile().fileName())) {
+ try (IntIterator iterator =
+ indexFileHandler
+ .hashIndex(file.partition(), file.bucket())
+ .read(file.indexFile().fileName())) {
while (true) {
try {
int hash = iterator.next();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 15af7eb4fd..7b77fb179f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -429,11 +429,13 @@ public abstract class AbstractFileStoreWrite<T>
implements FileStoreWrite<T> {
DynamicBucketIndexMaintainer indexMaintainer =
dbMaintainerFactory == null
? null
- :
dbMaintainerFactory.create(restored.dynamicBucketIndex());
+ : dbMaintainerFactory.create(
+ partition, bucket,
restored.dynamicBucketIndex());
BucketedDvMaintainer dvMaintainer =
dvMaintainerFactory == null
? null
- :
dvMaintainerFactory.create(restored.deleteVectorsIndex());
+ : dvMaintainerFactory.create(
+ partition, bucket,
restored.deleteVectorsIndex());
List<DataFileMeta> restoreFiles = restored.dataFiles();
if (restoreFiles == null) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 40c69289a5..fd2aa13f36 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -35,6 +35,7 @@ import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.tag.TagTimeExpire;
import org.apache.paimon.utils.DataFilePathFactories;
import org.apache.paimon.utils.ExecutorThreadFactory;
+import org.apache.paimon.utils.IndexFilePathFactories;
import org.apache.paimon.utils.PathFactory;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
@@ -278,12 +279,14 @@ public class TableCommitImpl implements InnerTableCommit {
private void checkFilesExistence(List<ManifestCommittable> committables) {
List<Path> files = new ArrayList<>();
DataFilePathFactories factories = new
DataFilePathFactories(commit.pathFactory());
- PathFactory indexFileFactory = commit.pathFactory().indexFileFactory();
+ IndexFilePathFactories indexFactories = new
IndexFilePathFactories(commit.pathFactory());
for (ManifestCommittable committable : committables) {
for (CommitMessage message : committable.fileCommittables()) {
CommitMessageImpl msg = (CommitMessageImpl) message;
DataFilePathFactory pathFactory =
factories.get(message.partition(), message.bucket());
+ PathFactory indexFileFactory =
+ indexFactories.get(message.partition(),
message.bucket());
Consumer<DataFileMeta> collector = f ->
files.addAll(f.collectFiles(pathFactory));
msg.newFilesIncrement().newFiles().forEach(collector);
msg.newFilesIncrement().changelogFiles().forEach(collector);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index ce3f2e9c06..e7c8366372 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -393,7 +393,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
if (deletionVectors && deletionIndexFilesMap != null) {
builder.withDataDeletionFiles(
getDeletionFiles(
- indexFileHandler.dvIndex(),
+ indexFileHandler.dvIndex(partition,
bucket),
dataFiles,
deletionIndexFilesMap.getOrDefault(
Pair.of(partition, bucket),
@@ -520,13 +520,13 @@ public class SnapshotReaderImpl implements SnapshotReader
{
&& deletionIndexFilesMap != null) {
builder.withBeforeDeletionFiles(
getDeletionFiles(
- indexFileHandler.dvIndex(),
+ indexFileHandler.dvIndex(part, bucket),
before,
beforDeletionIndexFilesMap.getOrDefault(
Pair.of(part, bucket),
Collections.emptyList())));
builder.withDataDeletionFiles(
getDeletionFiles(
- indexFileHandler.dvIndex(),
+ indexFileHandler.dvIndex(part, bucket),
data,
deletionIndexFilesMap.getOrDefault(
Pair.of(part, bucket),
Collections.emptyList())));
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index b63023653a..63d2b06dc0 100644
---
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -268,7 +268,7 @@ public class FileStorePathFactory {
};
}
- public PathFactory indexFileFactory() {
+ public PathFactory indexFileFactory(BinaryRow partition, int bucket) {
return new PathFactory() {
@Override
public Path newPath() {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/IndexFilePathFactories.java
b/paimon-core/src/main/java/org/apache/paimon/utils/IndexFilePathFactories.java
new file mode 100644
index 0000000000..934e6b01f2
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/utils/IndexFilePathFactories.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.BinaryRow;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Caches one index-file {@link PathFactory} per (partition, bucket) pair, created lazily via {@link FileStorePathFactory#indexFileFactory}; backed by an unsynchronized HashMap, so not thread-safe. */
+public class IndexFilePathFactories {
+
+ private final Map<Pair<BinaryRow, Integer>, PathFactory> cache = new
HashMap<>();
+ private final FileStorePathFactory pathFactory;
+
+ public IndexFilePathFactories(FileStorePathFactory pathFactory) {
+ this.pathFactory = pathFactory;
+ }
+
+ public PathFactory get(BinaryRow partition, int bucket) {
+ return cache.computeIfAbsent(
+ Pair.of(partition, bucket),
+ k -> pathFactory.indexFileFactory(k.getKey(), k.getValue()));
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
index 55ffdcbe53..9caa379884 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
@@ -132,7 +132,7 @@ public class TestAppendFileStore extends
AppendOnlyFileStore {
BucketedDvMaintainer.Factory factory =
BucketedDvMaintainer.factory(fileHandler);
List<IndexFileMeta> indexFiles =
fileHandler.scan(latestSnapshot, DELETION_VECTORS_INDEX,
partition, bucket);
- return factory.create(indexFiles);
+ return factory.create(partition, bucket, indexFiles);
}
public CommitMessageImpl writeDVIndexFiles(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java
index 5d3b1594ee..c195517731 100644
---
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java
@@ -48,6 +48,7 @@ import java.util.Map;
import java.util.stream.Collectors;
import static java.util.Collections.emptyList;
+import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.assertj.core.api.Assertions.assertThat;
@@ -61,7 +62,7 @@ public class BucketedDvMaintainerTest extends
PrimaryKeyTableTestBase {
initIndexHandler(bitmap64);
BucketedDvMaintainer.Factory factory =
BucketedDvMaintainer.factory(fileHandler);
- BucketedDvMaintainer dvMaintainer = factory.create(emptyList());
+ BucketedDvMaintainer dvMaintainer = factory.create(EMPTY_ROW, 0,
emptyList());
assertThat(dvMaintainer.bitmap64).isEqualTo(bitmap64);
dvMaintainer.notifyNewDeletion("f1", 1);
@@ -74,7 +75,7 @@ public class BucketedDvMaintainerTest extends
PrimaryKeyTableTestBase {
IndexFileMeta file = dvMaintainer.writeDeletionVectorsIndex().get();
Map<String, DeletionVector> deletionVectors =
-
fileHandler.readAllDeletionVectors(Collections.singletonList(file));
+ fileHandler.readAllDeletionVectors(EMPTY_ROW, 0,
Collections.singletonList(file));
assertThat(deletionVectors.get("f1").isDeleted(1)).isTrue();
assertThat(deletionVectors.get("f1").isDeleted(2)).isFalse();
assertThat(deletionVectors.get("f2").isDeleted(1)).isFalse();
@@ -89,7 +90,7 @@ public class BucketedDvMaintainerTest extends
PrimaryKeyTableTestBase {
BucketedDvMaintainer.Factory factory =
BucketedDvMaintainer.factory(fileHandler);
- BucketedDvMaintainer dvMaintainer = factory.create(new HashMap<>());
+ BucketedDvMaintainer dvMaintainer = factory.create(EMPTY_ROW, 0, new
HashMap<>());
DeletionVector deletionVector1 = createDeletionVector(bitmap64);
deletionVector1.delete(1);
deletionVector1.delete(3);
@@ -100,7 +101,7 @@ public class BucketedDvMaintainerTest extends
PrimaryKeyTableTestBase {
IndexFileMeta file = dvMaintainer.writeDeletionVectorsIndex().get();
CommitMessage commitMessage =
new CommitMessageImpl(
- BinaryRow.EMPTY_ROW,
+ EMPTY_ROW,
0,
1,
DataIncrement.emptyIncrement(),
@@ -111,8 +112,8 @@ public class BucketedDvMaintainerTest extends
PrimaryKeyTableTestBase {
Snapshot latestSnapshot = table.snapshotManager().latestSnapshot();
List<IndexFileMeta> indexFiles =
- fileHandler.scan(latestSnapshot, DELETION_VECTORS_INDEX,
BinaryRow.EMPTY_ROW, 0);
- dvMaintainer = factory.create(indexFiles);
+ fileHandler.scan(latestSnapshot, DELETION_VECTORS_INDEX,
EMPTY_ROW, 0);
+ dvMaintainer = factory.create(EMPTY_ROW, 0, indexFiles);
DeletionVector deletionVector2 =
dvMaintainer.deletionVectorOf("f1").get();
assertThat(deletionVector2.isDeleted(1)).isTrue();
assertThat(deletionVector2.isDeleted(2)).isFalse();
@@ -123,7 +124,7 @@ public class BucketedDvMaintainerTest extends
PrimaryKeyTableTestBase {
file = dvMaintainer.writeDeletionVectorsIndex().get();
commitMessage =
new CommitMessageImpl(
- BinaryRow.EMPTY_ROW,
+ EMPTY_ROW,
0,
1,
DataIncrement.emptyIncrement(),
@@ -133,9 +134,8 @@ public class BucketedDvMaintainerTest extends
PrimaryKeyTableTestBase {
commit.commit(Collections.singletonList(commitMessage));
latestSnapshot = table.snapshotManager().latestSnapshot();
- indexFiles =
- fileHandler.scan(latestSnapshot, DELETION_VECTORS_INDEX,
BinaryRow.EMPTY_ROW, 0);
- dvMaintainer = factory.create(indexFiles);
+ indexFiles = fileHandler.scan(latestSnapshot, DELETION_VECTORS_INDEX,
EMPTY_ROW, 0);
+ dvMaintainer = factory.create(EMPTY_ROW, 0, indexFiles);
DeletionVector deletionVector3 =
dvMaintainer.deletionVectorOf("f1").get();
assertThat(deletionVector3.isDeleted(1)).isTrue();
assertThat(deletionVector3.isDeleted(2)).isTrue();
@@ -147,7 +147,7 @@ public class BucketedDvMaintainerTest extends
PrimaryKeyTableTestBase {
initIndexHandler(bitmap64);
BucketedDvMaintainer.Factory factory =
BucketedDvMaintainer.factory(fileHandler);
- BucketedDvMaintainer dvMaintainer = factory.create(emptyList());
+ BucketedDvMaintainer dvMaintainer = factory.create(EMPTY_ROW, 0,
emptyList());
File indexDir = new File(tempPath.toFile(), "/default.db/T/index");
@@ -188,7 +188,7 @@ public class BucketedDvMaintainerTest extends
PrimaryKeyTableTestBase {
// write first kind dv
initIndexHandler(bitmap64);
BucketedDvMaintainer.Factory factory1 =
BucketedDvMaintainer.factory(fileHandler);
- BucketedDvMaintainer dvMaintainer1 = factory1.create(new HashMap<>());
+ BucketedDvMaintainer dvMaintainer1 = factory1.create(EMPTY_ROW, 0, new
HashMap<>());
dvMaintainer1.notifyNewDeletion("f1", 1);
dvMaintainer1.notifyNewDeletion("f1", 3);
dvMaintainer1.notifyNewDeletion("f2", 1);
@@ -198,7 +198,7 @@ public class BucketedDvMaintainerTest extends
PrimaryKeyTableTestBase {
IndexFileMeta file = dvMaintainer1.writeDeletionVectorsIndex().get();
CommitMessage commitMessage1 =
new CommitMessageImpl(
- BinaryRow.EMPTY_ROW,
+ EMPTY_ROW,
0,
1,
DataIncrement.emptyIncrement(),
@@ -212,11 +212,8 @@ public class BucketedDvMaintainerTest extends
PrimaryKeyTableTestBase {
BucketedDvMaintainer.Factory factory2 =
BucketedDvMaintainer.factory(fileHandler);
List<IndexFileMeta> indexFiles =
fileHandler.scan(
- table.latestSnapshot().get(),
- DELETION_VECTORS_INDEX,
- BinaryRow.EMPTY_ROW,
- 0);
- BucketedDvMaintainer dvMaintainer2 = factory2.create(indexFiles);
+ table.latestSnapshot().get(), DELETION_VECTORS_INDEX,
EMPTY_ROW, 0);
+ BucketedDvMaintainer dvMaintainer2 = factory2.create(EMPTY_ROW, 0,
indexFiles);
dvMaintainer2.notifyNewDeletion("f1", 10);
dvMaintainer2.notifyNewDeletion("f3", 1);
dvMaintainer2.notifyNewDeletion("f3", 3);
@@ -234,7 +231,7 @@ public class BucketedDvMaintainerTest extends
PrimaryKeyTableTestBase {
file = dvMaintainer2.writeDeletionVectorsIndex().get();
CommitMessage commitMessage2 =
new CommitMessageImpl(
- BinaryRow.EMPTY_ROW,
+ EMPTY_ROW,
0,
1,
DataIncrement.emptyIncrement(),
@@ -246,11 +243,10 @@ public class BucketedDvMaintainerTest extends
PrimaryKeyTableTestBase {
// test read dv index file which contains two kinds of dv
Map<String, DeletionVector> readDvs =
fileHandler.readAllDeletionVectors(
+ EMPTY_ROW,
+ 0,
fileHandler.scan(
- table.latestSnapshot().get(),
- "DELETION_VECTORS",
- BinaryRow.EMPTY_ROW,
- 0));
+ table.latestSnapshot().get(),
"DELETION_VECTORS", EMPTY_ROW, 0));
assertThat(readDvs.size()).isEqualTo(3);
assertThat(dvs.get("f1").getCardinality()).isEqualTo(3);
assertThat(dvs.get("f2").getCardinality()).isEqualTo(2);
@@ -282,7 +278,7 @@ public class BucketedDvMaintainerTest extends
PrimaryKeyTableTestBase {
.map(IndexManifestEntry::indexFile)
.collect(Collectors.toList());
Map<String, DeletionVector> deletionVectors =
- new HashMap<>(handler.readAllDeletionVectors(indexFiles));
- return factory.create(deletionVectors);
+ new HashMap<>(handler.readAllDeletionVectors(EMPTY_ROW, 0,
indexFiles));
+ return factory.create(EMPTY_ROW, 0, deletionVectors);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerHelper.java
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerHelper.java
index 98adf5e8fb..e84d09ae72 100644
---
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerHelper.java
+++
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerHelper.java
@@ -28,6 +28,8 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import static org.apache.paimon.table.BucketMode.UNAWARE_BUCKET;
+
/** Helper for {@link BaseAppendDeleteFileMaintainer}. */
public class AppendDeletionFileMaintainerHelper {
@@ -48,6 +50,9 @@ public class AppendDeletionFileMaintainerHelper {
indexManifestEntry.indexFile().fileName()))
.collect(Collectors.toList());
return new AppendDeleteFileMaintainer(
- indexFileHandler.dvIndex(), partition, manifests,
deletionFiles);
+ indexFileHandler.dvIndex(partition, UNAWARE_BUCKET),
+ partition,
+ manifests,
+ deletionFiles);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
index 3b81c8478e..ab0376095c 100644
---
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
@@ -66,7 +66,7 @@ class AppendDeletionFileMaintainerTest {
Collections.singletonMap("f3", Arrays.asList(1, 2,
3)));
store.commit(commitMessage1, commitMessage2);
- PathFactory indexPathFactory = store.pathFactory().indexFileFactory();
+ PathFactory indexPathFactory =
store.pathFactory().indexFileFactory(BinaryRow.EMPTY_ROW, 0);
Map<String, DeletionFile> dataFileToDeletionFiles = new HashMap<>();
dataFileToDeletionFiles.putAll(
createDeletionFileMapFromIndexFileMetas(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/index/DynamicBucketIndexMaintainerTest.java
b/paimon-core/src/test/java/org/apache/paimon/index/DynamicBucketIndexMaintainerTest.java
index 4c6c79d17e..769817d63b 100644
---
a/paimon-core/src/test/java/org/apache/paimon/index/DynamicBucketIndexMaintainerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/index/DynamicBucketIndexMaintainerTest.java
@@ -80,7 +80,8 @@ public class DynamicBucketIndexMaintainerTest extends
PrimaryKeyTableTestBase {
continue;
}
int[] ints =
-
fileHandler.hashIndex().readList(files.get(0).fileName()).stream()
+ fileHandler.hashIndex(message.partition(),
message.bucket())
+ .readList(files.get(0).fileName()).stream()
.mapToInt(Integer::intValue)
.toArray();
index.computeIfAbsent(message.partition(), k -> new HashMap<>())
diff --git
a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
index 6ece642aa7..01eca82c2e 100644
---
a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
@@ -45,13 +45,11 @@ import static
org.assertj.core.api.Assertions.assertThatThrownBy;
public class HashBucketAssignerTest extends PrimaryKeyTableTestBase {
private IndexFileHandler fileHandler;
- private HashIndexFile indexFile;
private StreamTableCommit commit;
@BeforeEach
public void beforeEach() throws Exception {
fileHandler = table.store().newIndexFileHandler();
- indexFile = fileHandler.hashIndex();
commit =
table.newStreamWriteBuilder().withCommitUser(commitUser).newCommit();
}
@@ -226,13 +224,19 @@ public class HashBucketAssignerTest extends
PrimaryKeyTableTestBase {
@Test
public void testAssignRestore() throws IOException {
- IndexFileMeta bucket0 = indexFile.write(new int[] {2, 5});
- IndexFileMeta bucket2 = indexFile.write(new int[] {4, 7});
commit.commit(
0,
Arrays.asList(
- createCommitMessage(row(1), 0, 3, bucket0),
- createCommitMessage(row(1), 2, 3, bucket2)));
+ createCommitMessage(
+ row(1),
+ 0,
+ 3,
+ fileHandler.hashIndex(row(1), 0).write(new
int[] {2, 5})),
+ createCommitMessage(
+ row(1),
+ 2,
+ 3,
+ fileHandler.hashIndex(row(1), 2).write(new
int[] {4, 7}))));
HashBucketAssigner assigner0 = createAssigner(3, 3, 0);
HashBucketAssigner assigner2 = createAssigner(3, 3, 2);
@@ -252,13 +256,19 @@ public class HashBucketAssignerTest extends
PrimaryKeyTableTestBase {
@Test
public void testAssignRestoreWithUpperBound() throws IOException {
- IndexFileMeta bucket0 = indexFile.write(new int[] {2, 5});
- IndexFileMeta bucket2 = indexFile.write(new int[] {4, 7});
commit.commit(
0,
Arrays.asList(
- createCommitMessage(row(1), 0, 3, bucket0),
- createCommitMessage(row(1), 2, 3, bucket2)));
+ createCommitMessage(
+ row(1),
+ 0,
+ 3,
+ fileHandler.hashIndex(row(1), 0).write(new
int[] {2, 5})),
+ createCommitMessage(
+ row(1),
+ 2,
+ 3,
+ fileHandler.hashIndex(row(1), 2).write(new
int[] {4, 7}))));
HashBucketAssigner assigner0 = createAssigner(3, 3, 0, 1);
HashBucketAssigner assigner2 = createAssigner(3, 3, 2, 1);
@@ -316,8 +326,17 @@ public class HashBucketAssignerTest extends
PrimaryKeyTableTestBase {
commit.commit(
0,
Arrays.asList(
- createCommitMessage(row(1), 0, 1, indexFile.write(new
int[] {0})),
- createCommitMessage(row(2), 0, 1, indexFile.write(new
int[] {0}))));
+ createCommitMessage(
+ row(1),
+ 0,
+ 1,
+ fileHandler.hashIndex(row(1), 0).write(new
int[] {0})),
+ createCommitMessage(
+ row(2),
+ 0,
+ 1,
+ fileHandler.hashIndex(row(2), 0).write(new
int[] {0}))));
+
assertThat(assigner.currentPartitions()).containsExactlyInAnyOrder(row(1),
row(2));
// checkpoint 1, but no commit
@@ -333,7 +352,12 @@ public class HashBucketAssignerTest extends
PrimaryKeyTableTestBase {
commit.commit(
1,
Collections.singletonList(
- createCommitMessage(row(1), 0, 1, indexFile.write(new
int[] {1}))));
+ createCommitMessage(
+ row(1),
+ 0,
+ 1,
+ fileHandler.hashIndex(row(1), 0).write(new
int[] {1}))));
+
assigner.prepareCommit(3);
assertThat(assigner.currentPartitions()).isEmpty();
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index f9a584b508..f8a9309a09 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -29,7 +29,6 @@ import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.index.HashIndexFile;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.manifest.FileKind;
@@ -715,7 +714,6 @@ public class FileStoreCommitTest {
public void testIndexFiles() throws Exception {
TestFileStore store = createStore(false, 2);
IndexFileHandler indexFileHandler = store.newIndexFileHandler();
- HashIndexFile hashIndex = indexFileHandler.hashIndex();
KeyValue record1 = gen.next();
BinaryRow part1 = gen.getPartition(record1);
@@ -727,9 +725,23 @@ public class FileStoreCommitTest {
}
// init write
- store.commitDataIndex(record1, gen::getPartition, 0,
hashIndex.write(new int[] {1, 2, 5}));
- store.commitDataIndex(record1, gen::getPartition, 1,
hashIndex.write(new int[] {6, 8}));
- store.commitDataIndex(record2, gen::getPartition, 2,
hashIndex.write(new int[] {3, 5}));
+ store.commitDataIndex(
+ record1,
+ gen::getPartition,
+ 0,
+ indexFileHandler
+ .hashIndex(gen.getPartition(record1), 0)
+ .write(new int[] {1, 2, 5}));
+ store.commitDataIndex(
+ record1,
+ gen::getPartition,
+ 1,
+ indexFileHandler.hashIndex(gen.getPartition(record1),
1).write(new int[] {6, 8}));
+ store.commitDataIndex(
+ record2,
+ gen::getPartition,
+ 2,
+ indexFileHandler.hashIndex(gen.getPartition(record2),
2).write(new int[] {3, 5}));
Snapshot snapshot = store.snapshotManager().latestSnapshot();
@@ -740,12 +752,18 @@ public class FileStoreCommitTest {
IndexManifestEntry indexManifestEntry =
part1Index.stream().filter(entry -> entry.bucket() ==
0).findAny().get();
-
assertThat(hashIndex.readList(indexManifestEntry.indexFile().fileName()))
+ assertThat(
+ indexFileHandler
+ .hashIndex(part1, 0)
+
.readList(indexManifestEntry.indexFile().fileName()))
.containsExactlyInAnyOrder(1, 2, 5);
indexManifestEntry =
part1Index.stream().filter(entry -> entry.bucket() ==
1).findAny().get();
-
assertThat(hashIndex.readList(indexManifestEntry.indexFile().fileName()))
+ assertThat(
+ indexFileHandler
+ .hashIndex(part1, 1)
+
.readList(indexManifestEntry.indexFile().fileName()))
.containsExactlyInAnyOrder(6, 8);
// assert part2
@@ -753,11 +771,18 @@ public class FileStoreCommitTest {
indexFileHandler.scanEntries(snapshot, HASH_INDEX, part2);
assertThat(part2Index.size()).isEqualTo(1);
assertThat(part2Index.get(0).bucket()).isEqualTo(2);
-
assertThat(hashIndex.readList(part2Index.get(0).indexFile().fileName()))
+ assertThat(
+ indexFileHandler
+ .hashIndex(part2, 2)
+
.readList(part2Index.get(0).indexFile().fileName()))
.containsExactlyInAnyOrder(3, 5);
// update part1
- store.commitDataIndex(record1, gen::getPartition, 0,
hashIndex.write(new int[] {1, 4}));
+ store.commitDataIndex(
+ record1,
+ gen::getPartition,
+ 0,
+ indexFileHandler.hashIndex(gen.getPartition(record1),
0).write(new int[] {1, 4}));
snapshot = store.snapshotManager().latestSnapshot();
// assert update part1
@@ -766,18 +791,25 @@ public class FileStoreCommitTest {
indexManifestEntry =
part1Index.stream().filter(entry -> entry.bucket() ==
0).findAny().get();
-
assertThat(hashIndex.readList(indexManifestEntry.indexFile().fileName()))
+ assertThat(
+ indexFileHandler
+ .hashIndex(part1, 0)
+
.readList(indexManifestEntry.indexFile().fileName()))
.containsExactlyInAnyOrder(1, 4);
indexManifestEntry =
part1Index.stream().filter(entry -> entry.bucket() ==
1).findAny().get();
-
assertThat(hashIndex.readList(indexManifestEntry.indexFile().fileName()))
+ assertThat(
+ indexFileHandler
+ .hashIndex(part1, 1)
+
.readList(indexManifestEntry.indexFile().fileName()))
.containsExactlyInAnyOrder(6, 8);
// assert scan one bucket
Optional<IndexFileMeta> file =
indexFileHandler.scanHashIndex(snapshot, part1, 0);
assertThat(file).isPresent();
-
assertThat(hashIndex.readList(file.get().fileName())).containsExactlyInAnyOrder(1,
4);
+ assertThat(indexFileHandler.hashIndex(part1,
0).readList(file.get().fileName()))
+ .containsExactlyInAnyOrder(1, 4);
// overwrite one partition
store.options().toConfiguration().set(CoreOptions.DYNAMIC_PARTITION_OVERWRITE,
true);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 44c7ebc754..f61f1fa1d7 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -23,7 +23,6 @@ import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.flink.util.AbstractTestBase;
-import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
@@ -121,10 +120,9 @@ public class BatchFileStoreITCase extends
CatalogITCaseBase {
List<IndexManifestEntry> indexManifestEntries =
table.indexManifestFileReader().read(snapshot.indexManifest());
assertThat(indexManifestEntries.size()).isEqualTo(1);
- IndexFileMeta indexFileMeta = indexManifestEntries.get(0).indexFile();
return table.store()
.newIndexFileHandler()
- .readAllDeletionVectors(singletonList(indexFileMeta));
+ .readAllDeletionVectors(indexManifestEntries.get(0));
}
@Test