This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new cfdbd73395 [core] Refactor IndexFileHandler to use dvIndex and hashIndex
cfdbd73395 is described below
commit cfdbd73395e533fd1e5016ffee376ae80c5666aa
Author: JingsongLi <[email protected]>
AuthorDate: Fri Aug 22 21:38:19 2025 +0800
[core] Refactor IndexFileHandler to use dvIndex and hashIndex
---
.../apache/paimon/compact/CompactDeletionFile.java | 12 ++---
.../deletionvectors/BucketedDvMaintainer.java | 21 +++-----
.../append/BaseAppendDeleteFileMaintainer.java | 8 +--
.../paimon/iceberg/IcebergCommitCallback.java | 11 ++--
.../paimon/index/DynamicBucketIndexMaintainer.java | 22 ++++----
.../org/apache/paimon/index/HashIndexFile.java | 14 +++++
.../java/org/apache/paimon/index/IndexFile.java | 11 ++--
.../org/apache/paimon/index/IndexFileHandler.java | 61 ++++++----------------
.../org/apache/paimon/index/PartitionIndex.java | 3 +-
.../table/source/snapshot/SnapshotReaderImpl.java | 10 +++-
.../deletionvectors/BucketedDvMaintainerTest.java | 4 +-
.../append/AppendDeletionFileMaintainerHelper.java | 2 +-
.../index/DynamicBucketIndexMaintainerTest.java | 6 ++-
.../paimon/index/HashBucketAssignerTest.java | 26 ++++-----
.../paimon/operation/FileStoreCommitTest.java | 29 +++++-----
.../apache/paimon/flink/copy/CopyFilesUtil.java | 1 -
.../paimon/flink/copy/CopyMetaFilesFunction.java | 1 -
17 files changed, 115 insertions(+), 127 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/compact/CompactDeletionFile.java
b/paimon-core/src/main/java/org/apache/paimon/compact/CompactDeletionFile.java
index f52c3a41d8..18a3169330 100644
---
a/paimon-core/src/main/java/org/apache/paimon/compact/CompactDeletionFile.java
+++
b/paimon-core/src/main/java/org/apache/paimon/compact/CompactDeletionFile.java
@@ -19,7 +19,7 @@
package org.apache.paimon.compact;
import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
-import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
import org.apache.paimon.index.IndexFileMeta;
import javax.annotation.Nullable;
@@ -42,7 +42,7 @@ public interface CompactDeletionFile {
*/
static CompactDeletionFile generateFiles(BucketedDvMaintainer maintainer) {
Optional<IndexFileMeta> file = maintainer.writeDeletionVectorsIndex();
- return new GeneratedDeletionFile(file.orElse(null),
maintainer.indexFileHandler());
+ return new GeneratedDeletionFile(file.orElse(null),
maintainer.dvIndexFile());
}
/** For sync compaction, only create deletion files when prepareCommit. */
@@ -54,14 +54,14 @@ public interface CompactDeletionFile {
class GeneratedDeletionFile implements CompactDeletionFile {
@Nullable private final IndexFileMeta deletionFile;
- private final IndexFileHandler fileHandler;
+ private final DeletionVectorsIndexFile dvIndexFile;
private boolean getInvoked = false;
public GeneratedDeletionFile(
- @Nullable IndexFileMeta deletionFile, IndexFileHandler
fileHandler) {
+ @Nullable IndexFileMeta deletionFile, DeletionVectorsIndexFile
dvIndexFile) {
this.deletionFile = deletionFile;
- this.fileHandler = fileHandler;
+ this.dvIndexFile = dvIndexFile;
}
@Override
@@ -92,7 +92,7 @@ public interface CompactDeletionFile {
@Override
public void clean() {
if (deletionFile != null) {
- fileHandler.deleteIndexFile(deletionFile);
+ dvIndexFile.delete(deletionFile.fileName());
}
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BucketedDvMaintainer.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BucketedDvMaintainer.java
index 1feb135efc..eef21f3b54 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BucketedDvMaintainer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BucketedDvMaintainer.java
@@ -33,16 +33,16 @@ import java.util.Optional;
/** Maintainer of deletionVectors index. */
public class BucketedDvMaintainer {
- private final IndexFileHandler indexFileHandler;
+ private final DeletionVectorsIndexFile dvIndexFile;
private final Map<String, DeletionVector> deletionVectors;
protected final boolean bitmap64;
private boolean modified;
private BucketedDvMaintainer(
- IndexFileHandler fileHandler, Map<String, DeletionVector>
deletionVectors) {
- this.indexFileHandler = fileHandler;
+ DeletionVectorsIndexFile dvIndexFile, Map<String, DeletionVector>
deletionVectors) {
+ this.dvIndexFile = dvIndexFile;
this.deletionVectors = deletionVectors;
- this.bitmap64 = indexFileHandler.deletionVectorsIndex().bitmap64();
+ this.bitmap64 = dvIndexFile.bitmap64();
this.modified = false;
}
@@ -114,8 +114,7 @@ public class BucketedDvMaintainer {
public Optional<IndexFileMeta> writeDeletionVectorsIndex() {
if (modified) {
modified = false;
- return Optional.of(
-
indexFileHandler.deletionVectorsIndex().writeSingleFile(deletionVectors));
+ return Optional.of(dvIndexFile.writeSingleFile(deletionVectors));
}
return Optional.empty();
}
@@ -131,8 +130,8 @@ public class BucketedDvMaintainer {
return Optional.ofNullable(deletionVectors.get(fileName));
}
- public IndexFileHandler indexFileHandler() {
- return indexFileHandler;
+ public DeletionVectorsIndexFile dvIndexFile() {
+ return dvIndexFile;
}
@VisibleForTesting
@@ -170,12 +169,8 @@ public class BucketedDvMaintainer {
return create(deletionVectors);
}
- public BucketedDvMaintainer create() {
- return create(new HashMap<>());
- }
-
public BucketedDvMaintainer create(Map<String, DeletionVector>
deletionVectors) {
- return new BucketedDvMaintainer(handler, deletionVectors);
+ return new BucketedDvMaintainer(handler.dvIndex(),
deletionVectors);
}
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BaseAppendDeleteFileMaintainer.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BaseAppendDeleteFileMaintainer.java
index ae3d3e8a97..729f660177 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BaseAppendDeleteFileMaintainer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BaseAppendDeleteFileMaintainer.java
@@ -80,20 +80,20 @@ public interface BaseAppendDeleteFileMaintainer {
.collect(Collectors.toList());
Map<String, DeletionFile> deletionFiles = new HashMap<>();
for (IndexManifestEntry file : manifestEntries) {
- IndexFileMeta meta = file.indexFile();
- LinkedHashMap<String, DeletionVectorMeta> dvMetas =
meta.deletionVectorMetas();
+ LinkedHashMap<String, DeletionVectorMeta> dvMetas =
+ file.indexFile().deletionVectorMetas();
checkNotNull(dvMetas);
for (DeletionVectorMeta dvMeta : dvMetas.values()) {
deletionFiles.put(
dvMeta.dataFileName(),
new DeletionFile(
- indexFileHandler.filePath(meta).toString(),
+ indexFileHandler.filePath(file).toString(),
dvMeta.offset(),
dvMeta.length(),
dvMeta.cardinality()));
}
}
return new AppendDeleteFileMaintainer(
- indexFileHandler.deletionVectorsIndex(), partition,
manifestEntries, deletionFiles);
+ indexFileHandler.dvIndex(), partition, manifestEntries,
deletionFiles);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
index 273daef49f..f25c39e06b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
@@ -43,7 +43,6 @@ import org.apache.paimon.iceberg.metadata.IcebergSnapshot;
import org.apache.paimon.iceberg.metadata.IcebergSnapshotSummary;
import org.apache.paimon.index.DeletionVectorMeta;
import org.apache.paimon.index.IndexFileHandler;
-import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.manifest.IndexManifestEntry;
@@ -1148,8 +1147,8 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
return Collections.emptyList();
}
for (IndexManifestEntry entry : newIndexes) {
- IndexFileMeta indexFileMeta = entry.indexFile();
- LinkedHashMap<String, DeletionVectorMeta> dvMetas =
indexFileMeta.deletionVectorMetas();
+ LinkedHashMap<String, DeletionVectorMeta> dvMetas =
+ entry.indexFile().deletionVectorMetas();
Path bucketPath =
fileStorePathFactory.bucketPath(entry.partition(), entry.bucket());
if (dvMetas != null) {
for (DeletionVectorMeta dvMeta : dvMetas.values()) {
@@ -1162,16 +1161,16 @@ public class IcebergCommitCallback implements
CommitCallback, TagCallback {
"cardinality in DeletionVector is null, stop
generate dv for iceberg. "
+ "dataFile path is {}, indexFile path is
{}",
new Path(bucketPath, dvMeta.dataFileName()),
-
indexFileHandler.filePath(indexFileMeta).toString());
+ indexFileHandler.filePath(entry).toString());
IcebergDataFileMeta deleteFileMeta =
IcebergDataFileMeta.createForDeleteFile(
IcebergDataFileMeta.Content.POSITION_DELETES,
-
indexFileHandler.filePath(indexFileMeta).toString(),
+
indexFileHandler.filePath(entry).toString(),
PUFFIN_FORMAT,
entry.partition(),
dvMeta.cardinality(),
- indexFileMeta.fileSize(),
+ entry.indexFile().fileSize(),
new Path(bucketPath,
dvMeta.dataFileName()).toString(),
(long) dvMeta.offset(),
(long) dvMeta.length());
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/DynamicBucketIndexMaintainer.java
b/paimon-core/src/main/java/org/apache/paimon/index/DynamicBucketIndexMaintainer.java
index 1d0ca8e4a1..fbf95d7a14 100644
---
a/paimon-core/src/main/java/org/apache/paimon/index/DynamicBucketIndexMaintainer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/index/DynamicBucketIndexMaintainer.java
@@ -36,25 +36,25 @@ import java.util.List;
/** An Index Maintainer for dynamic bucket to maintain key hashcode in a
bucket. */
public class DynamicBucketIndexMaintainer {
- private final IndexFileHandler fileHandler;
+ private final HashIndexFile indexFile;
private final IntHashSet hashcode;
private boolean modified;
private DynamicBucketIndexMaintainer(
- IndexFileHandler fileHandler, @Nullable IndexFileMeta
restoredFile) {
- this.fileHandler = fileHandler;
+ HashIndexFile indexFile, @Nullable IndexFileMeta restoredFile) {
+ this.indexFile = indexFile;
IntHashSet hashcode = new IntHashSet();
if (restoredFile != null) {
hashcode = new IntHashSet((int) restoredFile.rowCount());
- restore(fileHandler, hashcode, restoredFile);
+ restore(indexFile, hashcode, restoredFile);
}
this.hashcode = hashcode;
this.modified = false;
}
- private void restore(IndexFileHandler fileHandler, IntHashSet hashcode,
IndexFileMeta file) {
- try (IntIterator iterator = fileHandler.readHashIndex(file)) {
+ private void restore(HashIndexFile indexFile, IntHashSet hashcode,
IndexFileMeta file) {
+ try (IntIterator iterator = indexFile.read(file.fileName())) {
while (true) {
try {
hashcode.add(iterator.next());
@@ -80,8 +80,12 @@ public class DynamicBucketIndexMaintainer {
public List<IndexFileMeta> prepareCommit() {
if (modified) {
- IndexFileMeta entry =
- fileHandler.writeHashIndex(hashcode.size(),
hashcode.toIntIterator());
+ IndexFileMeta entry;
+ try {
+ entry = indexFile.write(hashcode.size(),
hashcode.toIntIterator());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
modified = false;
return Collections.singletonList(entry);
}
@@ -107,7 +111,7 @@ public class DynamicBucketIndexMaintainer {
}
public DynamicBucketIndexMaintainer create(@Nullable IndexFileMeta
restoredFile) {
- return new DynamicBucketIndexMaintainer(handler, restoredFile);
+ return new DynamicBucketIndexMaintainer(handler.hashIndex(),
restoredFile);
}
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/HashIndexFile.java
b/paimon-core/src/main/java/org/apache/paimon/index/HashIndexFile.java
index 91efb9b654..33deb96a62 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/HashIndexFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/HashIndexFile.java
@@ -24,6 +24,7 @@ import org.apache.paimon.utils.IntIterator;
import org.apache.paimon.utils.PathFactory;
import java.io.IOException;
+import java.util.List;
import static org.apache.paimon.utils.IntFileUtils.readInts;
import static org.apache.paimon.utils.IntFileUtils.writeInts;
@@ -45,9 +46,22 @@ public class HashIndexFile extends IndexFile {
return readInts(fileIO, pathFactory.toPath(fileName));
}
+ public List<Integer> readList(String fileName) throws IOException {
+ return IntIterator.toIntList(read(fileName));
+ }
+
public String write(IntIterator input) throws IOException {
Path path = pathFactory.newPath();
writeInts(fileIO, path, input);
return path.getName();
}
+
+ public IndexFileMeta write(int size, IntIterator input) throws IOException
{
+ String fileName = write(input);
+ return new IndexFileMeta(HASH_INDEX, fileName, fileSize(fileName),
size);
+ }
+
+ public IndexFileMeta write(int[] ints) throws IOException {
+ return write(ints.length, IntIterator.create(ints));
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexFile.java
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFile.java
index 2106113eb7..441cdd80ac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFile.java
@@ -19,6 +19,7 @@
package org.apache.paimon.index;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.utils.PathFactory;
import java.io.IOException;
@@ -36,21 +37,25 @@ public abstract class IndexFile {
this.pathFactory = pathFactory;
}
+ public Path path(String fileName) {
+ return pathFactory.toPath(fileName);
+ }
+
public long fileSize(String fileName) {
try {
- return fileIO.getFileSize(pathFactory.toPath(fileName));
+ return fileIO.getFileSize(path(fileName));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
public void delete(String fileName) {
- fileIO.deleteQuietly(pathFactory.toPath(fileName));
+ fileIO.deleteQuietly(path(fileName));
}
public boolean exists(String fileName) {
try {
- return fileIO.exists(pathFactory.toPath(fileName));
+ return fileIO.exists(path(fileName));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
index 38c4df0ae9..188efe9348 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -25,13 +25,11 @@ import
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.IndexManifestFile;
-import org.apache.paimon.utils.IntIterator;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.PathFactory;
import org.apache.paimon.utils.SnapshotManager;
import java.io.IOException;
-import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -51,23 +49,27 @@ public class IndexFileHandler {
private final PathFactory pathFactory;
private final IndexManifestFile indexManifestFile;
private final HashIndexFile hashIndex;
- private final DeletionVectorsIndexFile deletionVectorsIndex;
+ private final DeletionVectorsIndexFile dvIndex;
public IndexFileHandler(
SnapshotManager snapshotManager,
PathFactory pathFactory,
IndexManifestFile indexManifestFile,
HashIndexFile hashIndex,
- DeletionVectorsIndexFile deletionVectorsIndex) {
+ DeletionVectorsIndexFile dvIndex) {
this.snapshotManager = snapshotManager;
this.pathFactory = pathFactory;
this.indexManifestFile = indexManifestFile;
this.hashIndex = hashIndex;
- this.deletionVectorsIndex = deletionVectorsIndex;
+ this.dvIndex = dvIndex;
}
- public DeletionVectorsIndexFile deletionVectorsIndex() {
- return this.deletionVectorsIndex;
+ public HashIndexFile hashIndex() {
+ return this.hashIndex;
+ }
+
+ public DeletionVectorsIndexFile dvIndex() {
+ return this.dvIndex;
}
public Optional<IndexFileMeta> scanHashIndex(
@@ -163,38 +165,8 @@ public class IndexFileHandler {
return result;
}
- public Path filePath(IndexFileMeta file) {
- return pathFactory.toPath(file.fileName());
- }
-
- public List<Integer> readHashIndexList(IndexFileMeta file) {
- return IntIterator.toIntList(readHashIndex(file));
- }
-
- public IntIterator readHashIndex(IndexFileMeta file) {
- if (!file.indexType().equals(HASH_INDEX)) {
- throw new IllegalArgumentException("Input file is not hash index:
" + file.indexType());
- }
-
- try {
- return hashIndex.read(file.fileName());
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
-
- public IndexFileMeta writeHashIndex(int[] ints) {
- return writeHashIndex(ints.length, IntIterator.create(ints));
- }
-
- public IndexFileMeta writeHashIndex(int size, IntIterator iterator) {
- String file;
- try {
- file = hashIndex.write(iterator);
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- return new IndexFileMeta(HASH_INDEX, file, hashIndex.fileSize(file),
size);
+ public Path filePath(IndexManifestEntry entry) {
+ return pathFactory.toPath(entry.indexFile().fileName());
}
public boolean existsManifest(String indexManifest) {
@@ -215,7 +187,7 @@ public class IndexFileHandler {
case HASH_INDEX:
return hashIndex;
case DELETION_VECTORS_INDEX:
- return deletionVectorsIndex;
+ return dvIndex;
default:
throw new IllegalArgumentException("Unknown index type: " +
file.indexType());
}
@@ -225,11 +197,8 @@ public class IndexFileHandler {
return indexFile(file.indexFile()).exists(file.indexFile().fileName());
}
- public void deleteIndexFile(IndexManifestEntry file) {
- deleteIndexFile(file.indexFile());
- }
-
- public void deleteIndexFile(IndexFileMeta file) {
+ public void deleteIndexFile(IndexManifestEntry entry) {
+ IndexFileMeta file = entry.indexFile();
indexFile(file).delete(file.fileName());
}
@@ -243,6 +212,6 @@ public class IndexFileHandler {
indexFile.indexType().equals(DELETION_VECTORS_INDEX),
"Input file is not deletion vectors index " +
indexFile.indexType());
}
- return deletionVectorsIndex.readAllDeletionVectors(fileMetas);
+ return dvIndex.readAllDeletionVectors(fileMetas);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
b/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
index 8241270a87..958dcfcdee 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
@@ -124,10 +124,11 @@ public class PartitionIndex {
IntPredicate loadFilter,
IntPredicate bucketFilter) {
List<IndexManifestEntry> files =
indexFileHandler.scanEntries(HASH_INDEX, partition);
+ HashIndexFile hashIndex = indexFileHandler.hashIndex();
Int2ShortHashMap.Builder mapBuilder = Int2ShortHashMap.builder();
Map<Integer, Long> buckets = new HashMap<>();
for (IndexManifestEntry file : files) {
- try (IntIterator iterator =
indexFileHandler.readHashIndex(file.indexFile())) {
+ try (IntIterator iterator =
hashIndex.read(file.indexFile().fileName())) {
while (true) {
try {
int hash = iterator.next();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 858d75ceca..ce3f2e9c06 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -24,6 +24,7 @@ import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.RecordComparator;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
import org.apache.paimon.index.DeletionVectorMeta;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
@@ -392,6 +393,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
if (deletionVectors && deletionIndexFilesMap != null) {
builder.withDataDeletionFiles(
getDeletionFiles(
+ indexFileHandler.dvIndex(),
dataFiles,
deletionIndexFilesMap.getOrDefault(
Pair.of(partition, bucket),
@@ -518,11 +520,13 @@ public class SnapshotReaderImpl implements SnapshotReader
{
&& deletionIndexFilesMap != null) {
builder.withBeforeDeletionFiles(
getDeletionFiles(
+ indexFileHandler.dvIndex(),
before,
beforDeletionIndexFilesMap.getOrDefault(
Pair.of(part, bucket),
Collections.emptyList())));
builder.withDataDeletionFiles(
getDeletionFiles(
+ indexFileHandler.dvIndex(),
data,
deletionIndexFilesMap.getOrDefault(
Pair.of(part, bucket),
Collections.emptyList())));
@@ -556,7 +560,9 @@ public class SnapshotReaderImpl implements SnapshotReader {
}
private List<DeletionFile> getDeletionFiles(
- List<DataFileMeta> dataFiles, List<IndexFileMeta> indexFileMetas) {
+ DeletionVectorsIndexFile indexFile,
+ List<DataFileMeta> dataFiles,
+ List<IndexFileMeta> indexFileMetas) {
List<DeletionFile> deletionFiles = new ArrayList<>(dataFiles.size());
Map<String, IndexFileMeta> dataFileToIndexFileMeta = new HashMap<>();
for (IndexFileMeta indexFileMeta : indexFileMetas) {
@@ -574,7 +580,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
if (dvMetas != null && dvMetas.containsKey(file.fileName())) {
deletionFiles.add(
new DeletionFile(
-
indexFileHandler.filePath(indexFileMeta).toString(),
+
indexFile.path(indexFileMeta.fileName()).toString(),
dvMetas.get(file.fileName()).offset(),
dvMetas.get(file.fileName()).length(),
dvMetas.get(file.fileName()).cardinality()));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java
index 2128ae8714..5d3b1594ee 100644
---
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java
@@ -89,7 +89,7 @@ public class BucketedDvMaintainerTest extends
PrimaryKeyTableTestBase {
BucketedDvMaintainer.Factory factory =
BucketedDvMaintainer.factory(fileHandler);
- BucketedDvMaintainer dvMaintainer = factory.create();
+ BucketedDvMaintainer dvMaintainer = factory.create(new HashMap<>());
DeletionVector deletionVector1 = createDeletionVector(bitmap64);
deletionVector1.delete(1);
deletionVector1.delete(3);
@@ -188,7 +188,7 @@ public class BucketedDvMaintainerTest extends
PrimaryKeyTableTestBase {
// write first kind dv
initIndexHandler(bitmap64);
BucketedDvMaintainer.Factory factory1 =
BucketedDvMaintainer.factory(fileHandler);
- BucketedDvMaintainer dvMaintainer1 = factory1.create();
+ BucketedDvMaintainer dvMaintainer1 = factory1.create(new HashMap<>());
dvMaintainer1.notifyNewDeletion("f1", 1);
dvMaintainer1.notifyNewDeletion("f1", 3);
dvMaintainer1.notifyNewDeletion("f2", 1);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerHelper.java
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerHelper.java
index 892d219278..98adf5e8fb 100644
---
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerHelper.java
+++
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerHelper.java
@@ -48,6 +48,6 @@ public class AppendDeletionFileMaintainerHelper {
indexManifestEntry.indexFile().fileName()))
.collect(Collectors.toList());
return new AppendDeleteFileMaintainer(
- indexFileHandler.deletionVectorsIndex(), partition, manifests,
deletionFiles);
+ indexFileHandler.dvIndex(), partition, manifests,
deletionFiles);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/index/DynamicBucketIndexMaintainerTest.java
b/paimon-core/src/test/java/org/apache/paimon/index/DynamicBucketIndexMaintainerTest.java
index 7575851496..4c6c79d17e 100644
---
a/paimon-core/src/test/java/org/apache/paimon/index/DynamicBucketIndexMaintainerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/index/DynamicBucketIndexMaintainerTest.java
@@ -34,6 +34,7 @@ import org.apache.paimon.utils.Pair;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -69,7 +70,8 @@ public class DynamicBucketIndexMaintainerTest extends
PrimaryKeyTableTestBase {
return Pair.of(GenericRow.of(partition, key, value), bucket);
}
- private Map<BinaryRow, Map<Integer, int[]>> readIndex(List<CommitMessage>
messages) {
+ private Map<BinaryRow, Map<Integer, int[]>> readIndex(List<CommitMessage>
messages)
+ throws IOException {
Map<BinaryRow, Map<Integer, int[]>> index = new HashMap<>();
for (CommitMessage commitMessage : messages) {
CommitMessageImpl message = (CommitMessageImpl) commitMessage;
@@ -78,7 +80,7 @@ public class DynamicBucketIndexMaintainerTest extends
PrimaryKeyTableTestBase {
continue;
}
int[] ints =
- fileHandler.readHashIndexList(files.get(0)).stream()
+
fileHandler.hashIndex().readList(files.get(0).fileName()).stream()
.mapToInt(Integer::intValue)
.toArray();
index.computeIfAbsent(message.partition(), k -> new HashMap<>())
diff --git
a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
index 43f594073f..6ece642aa7 100644
---
a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
@@ -33,6 +33,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
@@ -44,11 +45,13 @@ import static
org.assertj.core.api.Assertions.assertThatThrownBy;
public class HashBucketAssignerTest extends PrimaryKeyTableTestBase {
private IndexFileHandler fileHandler;
+ private HashIndexFile indexFile;
private StreamTableCommit commit;
@BeforeEach
public void beforeEach() throws Exception {
fileHandler = table.store().newIndexFileHandler();
+ indexFile = fileHandler.hashIndex();
commit =
table.newStreamWriteBuilder().withCommitUser(commitUser).newCommit();
}
@@ -222,9 +225,9 @@ public class HashBucketAssignerTest extends
PrimaryKeyTableTestBase {
}
@Test
- public void testAssignRestore() {
- IndexFileMeta bucket0 = fileHandler.writeHashIndex(new int[] {2, 5});
- IndexFileMeta bucket2 = fileHandler.writeHashIndex(new int[] {4, 7});
+ public void testAssignRestore() throws IOException {
+ IndexFileMeta bucket0 = indexFile.write(new int[] {2, 5});
+ IndexFileMeta bucket2 = indexFile.write(new int[] {4, 7});
commit.commit(
0,
Arrays.asList(
@@ -248,9 +251,9 @@ public class HashBucketAssignerTest extends
PrimaryKeyTableTestBase {
}
@Test
- public void testAssignRestoreWithUpperBound() {
- IndexFileMeta bucket0 = fileHandler.writeHashIndex(new int[] {2, 5});
- IndexFileMeta bucket2 = fileHandler.writeHashIndex(new int[] {4, 7});
+ public void testAssignRestoreWithUpperBound() throws IOException {
+ IndexFileMeta bucket0 = indexFile.write(new int[] {2, 5});
+ IndexFileMeta bucket2 = indexFile.write(new int[] {4, 7});
commit.commit(
0,
Arrays.asList(
@@ -303,7 +306,7 @@ public class HashBucketAssignerTest extends
PrimaryKeyTableTestBase {
}
@Test
- public void testIndexEliminate() {
+ public void testIndexEliminate() throws IOException {
HashBucketAssigner assigner = createAssigner(1, 1, 0);
// checkpoint 0
@@ -313,10 +316,8 @@ public class HashBucketAssignerTest extends
PrimaryKeyTableTestBase {
commit.commit(
0,
Arrays.asList(
- createCommitMessage(
- row(1), 0, 1, fileHandler.writeHashIndex(new
int[] {0})),
- createCommitMessage(
- row(2), 0, 1, fileHandler.writeHashIndex(new
int[] {0}))));
+ createCommitMessage(row(1), 0, 1, indexFile.write(new
int[] {0})),
+ createCommitMessage(row(2), 0, 1, indexFile.write(new
int[] {0}))));
assertThat(assigner.currentPartitions()).containsExactlyInAnyOrder(row(1),
row(2));
// checkpoint 1, but no commit
@@ -332,8 +333,7 @@ public class HashBucketAssignerTest extends
PrimaryKeyTableTestBase {
commit.commit(
1,
Collections.singletonList(
- createCommitMessage(
- row(1), 0, 1, fileHandler.writeHashIndex(new
int[] {1}))));
+ createCommitMessage(row(1), 0, 1, indexFile.write(new
int[] {1}))));
assigner.prepareCommit(3);
assertThat(assigner.currentPartitions()).isEmpty();
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index 5701106be4..f9a584b508 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -29,6 +29,7 @@ import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.index.HashIndexFile;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.manifest.FileKind;
@@ -714,6 +715,7 @@ public class FileStoreCommitTest {
public void testIndexFiles() throws Exception {
TestFileStore store = createStore(false, 2);
IndexFileHandler indexFileHandler = store.newIndexFileHandler();
+ HashIndexFile hashIndex = indexFileHandler.hashIndex();
KeyValue record1 = gen.next();
BinaryRow part1 = gen.getPartition(record1);
@@ -725,15 +727,9 @@ public class FileStoreCommitTest {
}
// init write
- store.commitDataIndex(
- record1,
- gen::getPartition,
- 0,
- indexFileHandler.writeHashIndex(new int[] {1, 2, 5}));
- store.commitDataIndex(
- record1, gen::getPartition, 1,
indexFileHandler.writeHashIndex(new int[] {6, 8}));
- store.commitDataIndex(
- record2, gen::getPartition, 2,
indexFileHandler.writeHashIndex(new int[] {3, 5}));
+ store.commitDataIndex(record1, gen::getPartition, 0,
hashIndex.write(new int[] {1, 2, 5}));
+ store.commitDataIndex(record1, gen::getPartition, 1,
hashIndex.write(new int[] {6, 8}));
+ store.commitDataIndex(record2, gen::getPartition, 2,
hashIndex.write(new int[] {3, 5}));
Snapshot snapshot = store.snapshotManager().latestSnapshot();
@@ -744,12 +740,12 @@ public class FileStoreCommitTest {
IndexManifestEntry indexManifestEntry =
part1Index.stream().filter(entry -> entry.bucket() ==
0).findAny().get();
-
assertThat(indexFileHandler.readHashIndexList(indexManifestEntry.indexFile()))
+
assertThat(hashIndex.readList(indexManifestEntry.indexFile().fileName()))
.containsExactlyInAnyOrder(1, 2, 5);
indexManifestEntry =
part1Index.stream().filter(entry -> entry.bucket() ==
1).findAny().get();
-
assertThat(indexFileHandler.readHashIndexList(indexManifestEntry.indexFile()))
+
assertThat(hashIndex.readList(indexManifestEntry.indexFile().fileName()))
.containsExactlyInAnyOrder(6, 8);
// assert part2
@@ -757,12 +753,11 @@ public class FileStoreCommitTest {
indexFileHandler.scanEntries(snapshot, HASH_INDEX, part2);
assertThat(part2Index.size()).isEqualTo(1);
assertThat(part2Index.get(0).bucket()).isEqualTo(2);
-
assertThat(indexFileHandler.readHashIndexList(part2Index.get(0).indexFile()))
+
assertThat(hashIndex.readList(part2Index.get(0).indexFile().fileName()))
.containsExactlyInAnyOrder(3, 5);
// update part1
- store.commitDataIndex(
- record1, gen::getPartition, 0,
indexFileHandler.writeHashIndex(new int[] {1, 4}));
+ store.commitDataIndex(record1, gen::getPartition, 0,
hashIndex.write(new int[] {1, 4}));
snapshot = store.snapshotManager().latestSnapshot();
// assert update part1
@@ -771,18 +766,18 @@ public class FileStoreCommitTest {
indexManifestEntry =
part1Index.stream().filter(entry -> entry.bucket() ==
0).findAny().get();
-
assertThat(indexFileHandler.readHashIndexList(indexManifestEntry.indexFile()))
+
assertThat(hashIndex.readList(indexManifestEntry.indexFile().fileName()))
.containsExactlyInAnyOrder(1, 4);
indexManifestEntry =
part1Index.stream().filter(entry -> entry.bucket() ==
1).findAny().get();
-
assertThat(indexFileHandler.readHashIndexList(indexManifestEntry.indexFile()))
+
assertThat(hashIndex.readList(indexManifestEntry.indexFile().fileName()))
.containsExactlyInAnyOrder(6, 8);
// assert scan one bucket
Optional<IndexFileMeta> file =
indexFileHandler.scanHashIndex(snapshot, part1, 0);
assertThat(file).isPresent();
-
assertThat(indexFileHandler.readHashIndexList(file.get())).containsExactlyInAnyOrder(1,
4);
+
assertThat(hashIndex.readList(file.get().fileName())).containsExactlyInAnyOrder(1,
4);
// overwrite one partition
store.options().toConfiguration().set(CoreOptions.DYNAMIC_PARTITION_OVERWRITE,
true);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyFilesUtil.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyFilesUtil.java
index 3c1879eec0..a78eddab21 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyFilesUtil.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyFilesUtil.java
@@ -91,7 +91,6 @@ public class CopyFilesUtil {
() ->
indexFileHandler.readManifestWithIOException(indexManifest));
if (indexManifestEntries != null) {
indexManifestEntries.stream()
- .map(IndexManifestEntry::indexFile)
.map(indexFileHandler::filePath)
.forEach(fileList::add);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyMetaFilesFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyMetaFilesFunction.java
index 6ffe5c57ee..732d1db0d3 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyMetaFilesFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyMetaFilesFunction.java
@@ -186,7 +186,6 @@ public class CopyMetaFilesFunction extends
ProcessFunction<Tuple2<String, String
List<Path> indexFileList = new ArrayList<>();
if (indexManifestEntries != null) {
indexManifestEntries.stream()
- .map(IndexManifestEntry::indexFile)
.map(indexFileHandler::filePath)
.forEach(indexFileList::add);
}