This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new cfdbd73395 [core] Refactor IndexFileHandler to use dvIndex and hashIndex
cfdbd73395 is described below

commit cfdbd73395e533fd1e5016ffee376ae80c5666aa
Author: JingsongLi <[email protected]>
AuthorDate: Fri Aug 22 21:38:19 2025 +0800

    [core] Refactor IndexFileHandler to use dvIndex and hashIndex
---
 .../apache/paimon/compact/CompactDeletionFile.java | 12 ++---
 .../deletionvectors/BucketedDvMaintainer.java      | 21 +++-----
 .../append/BaseAppendDeleteFileMaintainer.java     |  8 +--
 .../paimon/iceberg/IcebergCommitCallback.java      | 11 ++--
 .../paimon/index/DynamicBucketIndexMaintainer.java | 22 ++++----
 .../org/apache/paimon/index/HashIndexFile.java     | 14 +++++
 .../java/org/apache/paimon/index/IndexFile.java    | 11 ++--
 .../org/apache/paimon/index/IndexFileHandler.java  | 61 ++++++----------------
 .../org/apache/paimon/index/PartitionIndex.java    |  3 +-
 .../table/source/snapshot/SnapshotReaderImpl.java  | 10 +++-
 .../deletionvectors/BucketedDvMaintainerTest.java  |  4 +-
 .../append/AppendDeletionFileMaintainerHelper.java |  2 +-
 .../index/DynamicBucketIndexMaintainerTest.java    |  6 ++-
 .../paimon/index/HashBucketAssignerTest.java       | 26 ++++-----
 .../paimon/operation/FileStoreCommitTest.java      | 29 +++++-----
 .../apache/paimon/flink/copy/CopyFilesUtil.java    |  1 -
 .../paimon/flink/copy/CopyMetaFilesFunction.java   |  1 -
 17 files changed, 115 insertions(+), 127 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/compact/CompactDeletionFile.java 
b/paimon-core/src/main/java/org/apache/paimon/compact/CompactDeletionFile.java
index f52c3a41d8..18a3169330 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/compact/CompactDeletionFile.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/compact/CompactDeletionFile.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.compact;
 
 import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
-import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
 import org.apache.paimon.index.IndexFileMeta;
 
 import javax.annotation.Nullable;
@@ -42,7 +42,7 @@ public interface CompactDeletionFile {
      */
     static CompactDeletionFile generateFiles(BucketedDvMaintainer maintainer) {
         Optional<IndexFileMeta> file = maintainer.writeDeletionVectorsIndex();
-        return new GeneratedDeletionFile(file.orElse(null), 
maintainer.indexFileHandler());
+        return new GeneratedDeletionFile(file.orElse(null), 
maintainer.dvIndexFile());
     }
 
     /** For sync compaction, only create deletion files when prepareCommit. */
@@ -54,14 +54,14 @@ public interface CompactDeletionFile {
     class GeneratedDeletionFile implements CompactDeletionFile {
 
         @Nullable private final IndexFileMeta deletionFile;
-        private final IndexFileHandler fileHandler;
+        private final DeletionVectorsIndexFile dvIndexFile;
 
         private boolean getInvoked = false;
 
         public GeneratedDeletionFile(
-                @Nullable IndexFileMeta deletionFile, IndexFileHandler 
fileHandler) {
+                @Nullable IndexFileMeta deletionFile, DeletionVectorsIndexFile 
dvIndexFile) {
             this.deletionFile = deletionFile;
-            this.fileHandler = fileHandler;
+            this.dvIndexFile = dvIndexFile;
         }
 
         @Override
@@ -92,7 +92,7 @@ public interface CompactDeletionFile {
         @Override
         public void clean() {
             if (deletionFile != null) {
-                fileHandler.deleteIndexFile(deletionFile);
+                dvIndexFile.delete(deletionFile.fileName());
             }
         }
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BucketedDvMaintainer.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BucketedDvMaintainer.java
index 1feb135efc..eef21f3b54 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BucketedDvMaintainer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BucketedDvMaintainer.java
@@ -33,16 +33,16 @@ import java.util.Optional;
 /** Maintainer of deletionVectors index. */
 public class BucketedDvMaintainer {
 
-    private final IndexFileHandler indexFileHandler;
+    private final DeletionVectorsIndexFile dvIndexFile;
     private final Map<String, DeletionVector> deletionVectors;
     protected final boolean bitmap64;
     private boolean modified;
 
     private BucketedDvMaintainer(
-            IndexFileHandler fileHandler, Map<String, DeletionVector> 
deletionVectors) {
-        this.indexFileHandler = fileHandler;
+            DeletionVectorsIndexFile dvIndexFile, Map<String, DeletionVector> 
deletionVectors) {
+        this.dvIndexFile = dvIndexFile;
         this.deletionVectors = deletionVectors;
-        this.bitmap64 = indexFileHandler.deletionVectorsIndex().bitmap64();
+        this.bitmap64 = dvIndexFile.bitmap64();
         this.modified = false;
     }
 
@@ -114,8 +114,7 @@ public class BucketedDvMaintainer {
     public Optional<IndexFileMeta> writeDeletionVectorsIndex() {
         if (modified) {
             modified = false;
-            return Optional.of(
-                    
indexFileHandler.deletionVectorsIndex().writeSingleFile(deletionVectors));
+            return Optional.of(dvIndexFile.writeSingleFile(deletionVectors));
         }
         return Optional.empty();
     }
@@ -131,8 +130,8 @@ public class BucketedDvMaintainer {
         return Optional.ofNullable(deletionVectors.get(fileName));
     }
 
-    public IndexFileHandler indexFileHandler() {
-        return indexFileHandler;
+    public DeletionVectorsIndexFile dvIndexFile() {
+        return dvIndexFile;
     }
 
     @VisibleForTesting
@@ -170,12 +169,8 @@ public class BucketedDvMaintainer {
             return create(deletionVectors);
         }
 
-        public BucketedDvMaintainer create() {
-            return create(new HashMap<>());
-        }
-
         public BucketedDvMaintainer create(Map<String, DeletionVector> 
deletionVectors) {
-            return new BucketedDvMaintainer(handler, deletionVectors);
+            return new BucketedDvMaintainer(handler.dvIndex(), 
deletionVectors);
         }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BaseAppendDeleteFileMaintainer.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BaseAppendDeleteFileMaintainer.java
index ae3d3e8a97..729f660177 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BaseAppendDeleteFileMaintainer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BaseAppendDeleteFileMaintainer.java
@@ -80,20 +80,20 @@ public interface BaseAppendDeleteFileMaintainer {
                         .collect(Collectors.toList());
         Map<String, DeletionFile> deletionFiles = new HashMap<>();
         for (IndexManifestEntry file : manifestEntries) {
-            IndexFileMeta meta = file.indexFile();
-            LinkedHashMap<String, DeletionVectorMeta> dvMetas = 
meta.deletionVectorMetas();
+            LinkedHashMap<String, DeletionVectorMeta> dvMetas =
+                    file.indexFile().deletionVectorMetas();
             checkNotNull(dvMetas);
             for (DeletionVectorMeta dvMeta : dvMetas.values()) {
                 deletionFiles.put(
                         dvMeta.dataFileName(),
                         new DeletionFile(
-                                indexFileHandler.filePath(meta).toString(),
+                                indexFileHandler.filePath(file).toString(),
                                 dvMeta.offset(),
                                 dvMeta.length(),
                                 dvMeta.cardinality()));
             }
         }
         return new AppendDeleteFileMaintainer(
-                indexFileHandler.deletionVectorsIndex(), partition, 
manifestEntries, deletionFiles);
+                indexFileHandler.dvIndex(), partition, manifestEntries, 
deletionFiles);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
index 273daef49f..f25c39e06b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
@@ -43,7 +43,6 @@ import org.apache.paimon.iceberg.metadata.IcebergSnapshot;
 import org.apache.paimon.iceberg.metadata.IcebergSnapshotSummary;
 import org.apache.paimon.index.DeletionVectorMeta;
 import org.apache.paimon.index.IndexFileHandler;
-import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.manifest.IndexManifestEntry;
@@ -1148,8 +1147,8 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
             return Collections.emptyList();
         }
         for (IndexManifestEntry entry : newIndexes) {
-            IndexFileMeta indexFileMeta = entry.indexFile();
-            LinkedHashMap<String, DeletionVectorMeta> dvMetas = 
indexFileMeta.deletionVectorMetas();
+            LinkedHashMap<String, DeletionVectorMeta> dvMetas =
+                    entry.indexFile().deletionVectorMetas();
             Path bucketPath = 
fileStorePathFactory.bucketPath(entry.partition(), entry.bucket());
             if (dvMetas != null) {
                 for (DeletionVectorMeta dvMeta : dvMetas.values()) {
@@ -1162,16 +1161,16 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
                             "cardinality in DeletionVector is null, stop 
generate dv for iceberg. "
                                     + "dataFile path is {}, indexFile path is 
{}",
                             new Path(bucketPath, dvMeta.dataFileName()),
-                            
indexFileHandler.filePath(indexFileMeta).toString());
+                            indexFileHandler.filePath(entry).toString());
 
                     IcebergDataFileMeta deleteFileMeta =
                             IcebergDataFileMeta.createForDeleteFile(
                                     
IcebergDataFileMeta.Content.POSITION_DELETES,
-                                    
indexFileHandler.filePath(indexFileMeta).toString(),
+                                    
indexFileHandler.filePath(entry).toString(),
                                     PUFFIN_FORMAT,
                                     entry.partition(),
                                     dvMeta.cardinality(),
-                                    indexFileMeta.fileSize(),
+                                    entry.indexFile().fileSize(),
                                     new Path(bucketPath, 
dvMeta.dataFileName()).toString(),
                                     (long) dvMeta.offset(),
                                     (long) dvMeta.length());
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/index/DynamicBucketIndexMaintainer.java
 
b/paimon-core/src/main/java/org/apache/paimon/index/DynamicBucketIndexMaintainer.java
index 1d0ca8e4a1..fbf95d7a14 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/index/DynamicBucketIndexMaintainer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/index/DynamicBucketIndexMaintainer.java
@@ -36,25 +36,25 @@ import java.util.List;
 /** An Index Maintainer for dynamic bucket to maintain key hashcode in a 
bucket. */
 public class DynamicBucketIndexMaintainer {
 
-    private final IndexFileHandler fileHandler;
+    private final HashIndexFile indexFile;
     private final IntHashSet hashcode;
 
     private boolean modified;
 
     private DynamicBucketIndexMaintainer(
-            IndexFileHandler fileHandler, @Nullable IndexFileMeta 
restoredFile) {
-        this.fileHandler = fileHandler;
+            HashIndexFile indexFile, @Nullable IndexFileMeta restoredFile) {
+        this.indexFile = indexFile;
         IntHashSet hashcode = new IntHashSet();
         if (restoredFile != null) {
             hashcode = new IntHashSet((int) restoredFile.rowCount());
-            restore(fileHandler, hashcode, restoredFile);
+            restore(indexFile, hashcode, restoredFile);
         }
         this.hashcode = hashcode;
         this.modified = false;
     }
 
-    private void restore(IndexFileHandler fileHandler, IntHashSet hashcode, 
IndexFileMeta file) {
-        try (IntIterator iterator = fileHandler.readHashIndex(file)) {
+    private void restore(HashIndexFile indexFile, IntHashSet hashcode, 
IndexFileMeta file) {
+        try (IntIterator iterator = indexFile.read(file.fileName())) {
             while (true) {
                 try {
                     hashcode.add(iterator.next());
@@ -80,8 +80,12 @@ public class DynamicBucketIndexMaintainer {
 
     public List<IndexFileMeta> prepareCommit() {
         if (modified) {
-            IndexFileMeta entry =
-                    fileHandler.writeHashIndex(hashcode.size(), 
hashcode.toIntIterator());
+            IndexFileMeta entry;
+            try {
+                entry = indexFile.write(hashcode.size(), 
hashcode.toIntIterator());
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
             modified = false;
             return Collections.singletonList(entry);
         }
@@ -107,7 +111,7 @@ public class DynamicBucketIndexMaintainer {
         }
 
         public DynamicBucketIndexMaintainer create(@Nullable IndexFileMeta 
restoredFile) {
-            return new DynamicBucketIndexMaintainer(handler, restoredFile);
+            return new DynamicBucketIndexMaintainer(handler.hashIndex(), 
restoredFile);
         }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/index/HashIndexFile.java 
b/paimon-core/src/main/java/org/apache/paimon/index/HashIndexFile.java
index 91efb9b654..33deb96a62 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/HashIndexFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/HashIndexFile.java
@@ -24,6 +24,7 @@ import org.apache.paimon.utils.IntIterator;
 import org.apache.paimon.utils.PathFactory;
 
 import java.io.IOException;
+import java.util.List;
 
 import static org.apache.paimon.utils.IntFileUtils.readInts;
 import static org.apache.paimon.utils.IntFileUtils.writeInts;
@@ -45,9 +46,22 @@ public class HashIndexFile extends IndexFile {
         return readInts(fileIO, pathFactory.toPath(fileName));
     }
 
+    public List<Integer> readList(String fileName) throws IOException {
+        return IntIterator.toIntList(read(fileName));
+    }
+
     public String write(IntIterator input) throws IOException {
         Path path = pathFactory.newPath();
         writeInts(fileIO, path, input);
         return path.getName();
     }
+
+    public IndexFileMeta write(int size, IntIterator input) throws IOException 
{
+        String fileName = write(input);
+        return new IndexFileMeta(HASH_INDEX, fileName, fileSize(fileName), 
size);
+    }
+
+    public IndexFileMeta write(int[] ints) throws IOException {
+        return write(ints.length, IntIterator.create(ints));
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexFile.java 
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFile.java
index 2106113eb7..441cdd80ac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFile.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.index;
 
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.utils.PathFactory;
 
 import java.io.IOException;
@@ -36,21 +37,25 @@ public abstract class IndexFile {
         this.pathFactory = pathFactory;
     }
 
+    public Path path(String fileName) {
+        return pathFactory.toPath(fileName);
+    }
+
     public long fileSize(String fileName) {
         try {
-            return fileIO.getFileSize(pathFactory.toPath(fileName));
+            return fileIO.getFileSize(path(fileName));
         } catch (IOException e) {
             throw new UncheckedIOException(e);
         }
     }
 
     public void delete(String fileName) {
-        fileIO.deleteQuietly(pathFactory.toPath(fileName));
+        fileIO.deleteQuietly(path(fileName));
     }
 
     public boolean exists(String fileName) {
         try {
-            return fileIO.exists(pathFactory.toPath(fileName));
+            return fileIO.exists(path(fileName));
         } catch (IOException e) {
             throw new UncheckedIOException(e);
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java 
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
index 38c4df0ae9..188efe9348 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -25,13 +25,11 @@ import 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.IndexManifestFile;
-import org.apache.paimon.utils.IntIterator;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.PathFactory;
 import org.apache.paimon.utils.SnapshotManager;
 
 import java.io.IOException;
-import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -51,23 +49,27 @@ public class IndexFileHandler {
     private final PathFactory pathFactory;
     private final IndexManifestFile indexManifestFile;
     private final HashIndexFile hashIndex;
-    private final DeletionVectorsIndexFile deletionVectorsIndex;
+    private final DeletionVectorsIndexFile dvIndex;
 
     public IndexFileHandler(
             SnapshotManager snapshotManager,
             PathFactory pathFactory,
             IndexManifestFile indexManifestFile,
             HashIndexFile hashIndex,
-            DeletionVectorsIndexFile deletionVectorsIndex) {
+            DeletionVectorsIndexFile dvIndex) {
         this.snapshotManager = snapshotManager;
         this.pathFactory = pathFactory;
         this.indexManifestFile = indexManifestFile;
         this.hashIndex = hashIndex;
-        this.deletionVectorsIndex = deletionVectorsIndex;
+        this.dvIndex = dvIndex;
     }
 
-    public DeletionVectorsIndexFile deletionVectorsIndex() {
-        return this.deletionVectorsIndex;
+    public HashIndexFile hashIndex() {
+        return this.hashIndex;
+    }
+
+    public DeletionVectorsIndexFile dvIndex() {
+        return this.dvIndex;
     }
 
     public Optional<IndexFileMeta> scanHashIndex(
@@ -163,38 +165,8 @@ public class IndexFileHandler {
         return result;
     }
 
-    public Path filePath(IndexFileMeta file) {
-        return pathFactory.toPath(file.fileName());
-    }
-
-    public List<Integer> readHashIndexList(IndexFileMeta file) {
-        return IntIterator.toIntList(readHashIndex(file));
-    }
-
-    public IntIterator readHashIndex(IndexFileMeta file) {
-        if (!file.indexType().equals(HASH_INDEX)) {
-            throw new IllegalArgumentException("Input file is not hash index: 
" + file.indexType());
-        }
-
-        try {
-            return hashIndex.read(file.fileName());
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
-    }
-
-    public IndexFileMeta writeHashIndex(int[] ints) {
-        return writeHashIndex(ints.length, IntIterator.create(ints));
-    }
-
-    public IndexFileMeta writeHashIndex(int size, IntIterator iterator) {
-        String file;
-        try {
-            file = hashIndex.write(iterator);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
-        return new IndexFileMeta(HASH_INDEX, file, hashIndex.fileSize(file), 
size);
+    public Path filePath(IndexManifestEntry entry) {
+        return pathFactory.toPath(entry.indexFile().fileName());
     }
 
     public boolean existsManifest(String indexManifest) {
@@ -215,7 +187,7 @@ public class IndexFileHandler {
             case HASH_INDEX:
                 return hashIndex;
             case DELETION_VECTORS_INDEX:
-                return deletionVectorsIndex;
+                return dvIndex;
             default:
                 throw new IllegalArgumentException("Unknown index type: " + 
file.indexType());
         }
@@ -225,11 +197,8 @@ public class IndexFileHandler {
         return indexFile(file.indexFile()).exists(file.indexFile().fileName());
     }
 
-    public void deleteIndexFile(IndexManifestEntry file) {
-        deleteIndexFile(file.indexFile());
-    }
-
-    public void deleteIndexFile(IndexFileMeta file) {
+    public void deleteIndexFile(IndexManifestEntry entry) {
+        IndexFileMeta file = entry.indexFile();
         indexFile(file).delete(file.fileName());
     }
 
@@ -243,6 +212,6 @@ public class IndexFileHandler {
                     indexFile.indexType().equals(DELETION_VECTORS_INDEX),
                     "Input file is not deletion vectors index " + 
indexFile.indexType());
         }
-        return deletionVectorsIndex.readAllDeletionVectors(fileMetas);
+        return dvIndex.readAllDeletionVectors(fileMetas);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java 
b/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
index 8241270a87..958dcfcdee 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
@@ -124,10 +124,11 @@ public class PartitionIndex {
             IntPredicate loadFilter,
             IntPredicate bucketFilter) {
         List<IndexManifestEntry> files = 
indexFileHandler.scanEntries(HASH_INDEX, partition);
+        HashIndexFile hashIndex = indexFileHandler.hashIndex();
         Int2ShortHashMap.Builder mapBuilder = Int2ShortHashMap.builder();
         Map<Integer, Long> buckets = new HashMap<>();
         for (IndexManifestEntry file : files) {
-            try (IntIterator iterator = 
indexFileHandler.readHashIndex(file.indexFile())) {
+            try (IntIterator iterator = 
hashIndex.read(file.indexFile().fileName())) {
                 while (true) {
                     try {
                         int hash = iterator.next();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 858d75ceca..ce3f2e9c06 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -24,6 +24,7 @@ import org.apache.paimon.codegen.CodeGenUtils;
 import org.apache.paimon.codegen.RecordComparator;
 import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
 import org.apache.paimon.index.DeletionVectorMeta;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
@@ -392,6 +393,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
                     if (deletionVectors && deletionIndexFilesMap != null) {
                         builder.withDataDeletionFiles(
                                 getDeletionFiles(
+                                        indexFileHandler.dvIndex(),
                                         dataFiles,
                                         deletionIndexFilesMap.getOrDefault(
                                                 Pair.of(partition, bucket),
@@ -518,11 +520,13 @@ public class SnapshotReaderImpl implements SnapshotReader 
{
                         && deletionIndexFilesMap != null) {
                     builder.withBeforeDeletionFiles(
                             getDeletionFiles(
+                                    indexFileHandler.dvIndex(),
                                     before,
                                     beforDeletionIndexFilesMap.getOrDefault(
                                             Pair.of(part, bucket), 
Collections.emptyList())));
                     builder.withDataDeletionFiles(
                             getDeletionFiles(
+                                    indexFileHandler.dvIndex(),
                                     data,
                                     deletionIndexFilesMap.getOrDefault(
                                             Pair.of(part, bucket), 
Collections.emptyList())));
@@ -556,7 +560,9 @@ public class SnapshotReaderImpl implements SnapshotReader {
     }
 
     private List<DeletionFile> getDeletionFiles(
-            List<DataFileMeta> dataFiles, List<IndexFileMeta> indexFileMetas) {
+            DeletionVectorsIndexFile indexFile,
+            List<DataFileMeta> dataFiles,
+            List<IndexFileMeta> indexFileMetas) {
         List<DeletionFile> deletionFiles = new ArrayList<>(dataFiles.size());
         Map<String, IndexFileMeta> dataFileToIndexFileMeta = new HashMap<>();
         for (IndexFileMeta indexFileMeta : indexFileMetas) {
@@ -574,7 +580,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
                 if (dvMetas != null && dvMetas.containsKey(file.fileName())) {
                     deletionFiles.add(
                             new DeletionFile(
-                                    
indexFileHandler.filePath(indexFileMeta).toString(),
+                                    
indexFile.path(indexFileMeta.fileName()).toString(),
                                     dvMetas.get(file.fileName()).offset(),
                                     dvMetas.get(file.fileName()).length(),
                                     
dvMetas.get(file.fileName()).cardinality()));
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java
index 2128ae8714..5d3b1594ee 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java
@@ -89,7 +89,7 @@ public class BucketedDvMaintainerTest extends 
PrimaryKeyTableTestBase {
 
         BucketedDvMaintainer.Factory factory = 
BucketedDvMaintainer.factory(fileHandler);
 
-        BucketedDvMaintainer dvMaintainer = factory.create();
+        BucketedDvMaintainer dvMaintainer = factory.create(new HashMap<>());
         DeletionVector deletionVector1 = createDeletionVector(bitmap64);
         deletionVector1.delete(1);
         deletionVector1.delete(3);
@@ -188,7 +188,7 @@ public class BucketedDvMaintainerTest extends 
PrimaryKeyTableTestBase {
         // write first kind dv
         initIndexHandler(bitmap64);
         BucketedDvMaintainer.Factory factory1 = 
BucketedDvMaintainer.factory(fileHandler);
-        BucketedDvMaintainer dvMaintainer1 = factory1.create();
+        BucketedDvMaintainer dvMaintainer1 = factory1.create(new HashMap<>());
         dvMaintainer1.notifyNewDeletion("f1", 1);
         dvMaintainer1.notifyNewDeletion("f1", 3);
         dvMaintainer1.notifyNewDeletion("f2", 1);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerHelper.java
 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerHelper.java
index 892d219278..98adf5e8fb 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerHelper.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerHelper.java
@@ -48,6 +48,6 @@ public class AppendDeletionFileMaintainerHelper {
                                                 
indexManifestEntry.indexFile().fileName()))
                         .collect(Collectors.toList());
         return new AppendDeleteFileMaintainer(
-                indexFileHandler.deletionVectorsIndex(), partition, manifests, 
deletionFiles);
+                indexFileHandler.dvIndex(), partition, manifests, 
deletionFiles);
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/index/DynamicBucketIndexMaintainerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/index/DynamicBucketIndexMaintainerTest.java
index 7575851496..4c6c79d17e 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/index/DynamicBucketIndexMaintainerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/index/DynamicBucketIndexMaintainerTest.java
@@ -34,6 +34,7 @@ import org.apache.paimon.utils.Pair;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -69,7 +70,8 @@ public class DynamicBucketIndexMaintainerTest extends 
PrimaryKeyTableTestBase {
         return Pair.of(GenericRow.of(partition, key, value), bucket);
     }
 
-    private Map<BinaryRow, Map<Integer, int[]>> readIndex(List<CommitMessage> 
messages) {
+    private Map<BinaryRow, Map<Integer, int[]>> readIndex(List<CommitMessage> 
messages)
+            throws IOException {
         Map<BinaryRow, Map<Integer, int[]>> index = new HashMap<>();
         for (CommitMessage commitMessage : messages) {
             CommitMessageImpl message = (CommitMessageImpl) commitMessage;
@@ -78,7 +80,7 @@ public class DynamicBucketIndexMaintainerTest extends 
PrimaryKeyTableTestBase {
                 continue;
             }
             int[] ints =
-                    fileHandler.readHashIndexList(files.get(0)).stream()
+                    
fileHandler.hashIndex().readList(files.get(0).fileName()).stream()
                             .mapToInt(Integer::intValue)
                             .toArray();
             index.computeIfAbsent(message.partition(), k -> new HashMap<>())
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java 
b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
index 43f594073f..6ece642aa7 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
@@ -33,6 +33,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 
@@ -44,11 +45,13 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
 public class HashBucketAssignerTest extends PrimaryKeyTableTestBase {
 
     private IndexFileHandler fileHandler;
+    private HashIndexFile indexFile;
     private StreamTableCommit commit;
 
     @BeforeEach
     public void beforeEach() throws Exception {
         fileHandler = table.store().newIndexFileHandler();
+        indexFile = fileHandler.hashIndex();
         commit = 
table.newStreamWriteBuilder().withCommitUser(commitUser).newCommit();
     }
 
@@ -222,9 +225,9 @@ public class HashBucketAssignerTest extends 
PrimaryKeyTableTestBase {
     }
 
     @Test
-    public void testAssignRestore() {
-        IndexFileMeta bucket0 = fileHandler.writeHashIndex(new int[] {2, 5});
-        IndexFileMeta bucket2 = fileHandler.writeHashIndex(new int[] {4, 7});
+    public void testAssignRestore() throws IOException {
+        IndexFileMeta bucket0 = indexFile.write(new int[] {2, 5});
+        IndexFileMeta bucket2 = indexFile.write(new int[] {4, 7});
         commit.commit(
                 0,
                 Arrays.asList(
@@ -248,9 +251,9 @@ public class HashBucketAssignerTest extends PrimaryKeyTableTestBase {
     }
 
     @Test
-    public void testAssignRestoreWithUpperBound() {
-        IndexFileMeta bucket0 = fileHandler.writeHashIndex(new int[] {2, 5});
-        IndexFileMeta bucket2 = fileHandler.writeHashIndex(new int[] {4, 7});
+    public void testAssignRestoreWithUpperBound() throws IOException {
+        IndexFileMeta bucket0 = indexFile.write(new int[] {2, 5});
+        IndexFileMeta bucket2 = indexFile.write(new int[] {4, 7});
         commit.commit(
                 0,
                 Arrays.asList(
@@ -303,7 +306,7 @@ public class HashBucketAssignerTest extends PrimaryKeyTableTestBase {
     }
 
     @Test
-    public void testIndexEliminate() {
+    public void testIndexEliminate() throws IOException {
         HashBucketAssigner assigner = createAssigner(1, 1, 0);
 
         // checkpoint 0
@@ -313,10 +316,8 @@ public class HashBucketAssignerTest extends PrimaryKeyTableTestBase {
         commit.commit(
                 0,
                 Arrays.asList(
-                        createCommitMessage(
-                                row(1), 0, 1, fileHandler.writeHashIndex(new int[] {0})),
-                        createCommitMessage(
-                                row(2), 0, 1, fileHandler.writeHashIndex(new int[] {0}))));
+                        createCommitMessage(row(1), 0, 1, indexFile.write(new int[] {0})),
+                        createCommitMessage(row(2), 0, 1, indexFile.write(new int[] {0}))));
         assertThat(assigner.currentPartitions()).containsExactlyInAnyOrder(row(1), row(2));
 
         // checkpoint 1, but no commit
@@ -332,8 +333,7 @@ public class HashBucketAssignerTest extends PrimaryKeyTableTestBase {
         commit.commit(
                 1,
                 Collections.singletonList(
-                        createCommitMessage(
-                                row(1), 0, 1, fileHandler.writeHashIndex(new int[] {1}))));
+                        createCommitMessage(row(1), 0, 1, indexFile.write(new int[] {1}))));
         assigner.prepareCommit(3);
         assertThat(assigner.currentPartitions()).isEmpty();
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index 5701106be4..f9a584b508 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -29,6 +29,7 @@ import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
 import org.apache.paimon.deletionvectors.DeletionVector;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.index.HashIndexFile;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.manifest.FileKind;
@@ -714,6 +715,7 @@ public class FileStoreCommitTest {
     public void testIndexFiles() throws Exception {
         TestFileStore store = createStore(false, 2);
         IndexFileHandler indexFileHandler = store.newIndexFileHandler();
+        HashIndexFile hashIndex = indexFileHandler.hashIndex();
 
         KeyValue record1 = gen.next();
         BinaryRow part1 = gen.getPartition(record1);
@@ -725,15 +727,9 @@ public class FileStoreCommitTest {
         }
 
         // init write
-        store.commitDataIndex(
-                record1,
-                gen::getPartition,
-                0,
-                indexFileHandler.writeHashIndex(new int[] {1, 2, 5}));
-        store.commitDataIndex(
-                record1, gen::getPartition, 1, indexFileHandler.writeHashIndex(new int[] {6, 8}));
-        store.commitDataIndex(
-                record2, gen::getPartition, 2, indexFileHandler.writeHashIndex(new int[] {3, 5}));
+        store.commitDataIndex(record1, gen::getPartition, 0, hashIndex.write(new int[] {1, 2, 5}));
+        store.commitDataIndex(record1, gen::getPartition, 1, hashIndex.write(new int[] {6, 8}));
+        store.commitDataIndex(record2, gen::getPartition, 2, hashIndex.write(new int[] {3, 5}));
 
         Snapshot snapshot = store.snapshotManager().latestSnapshot();
 
@@ -744,12 +740,12 @@ public class FileStoreCommitTest {
 
         IndexManifestEntry indexManifestEntry =
                 part1Index.stream().filter(entry -> entry.bucket() == 0).findAny().get();
-        assertThat(indexFileHandler.readHashIndexList(indexManifestEntry.indexFile()))
+        assertThat(hashIndex.readList(indexManifestEntry.indexFile().fileName()))
                 .containsExactlyInAnyOrder(1, 2, 5);
 
         indexManifestEntry =
                 part1Index.stream().filter(entry -> entry.bucket() == 1).findAny().get();
-        assertThat(indexFileHandler.readHashIndexList(indexManifestEntry.indexFile()))
+        assertThat(hashIndex.readList(indexManifestEntry.indexFile().fileName()))
                 .containsExactlyInAnyOrder(6, 8);
 
         // assert part2
@@ -757,12 +753,11 @@ public class FileStoreCommitTest {
                 indexFileHandler.scanEntries(snapshot, HASH_INDEX, part2);
         assertThat(part2Index.size()).isEqualTo(1);
         assertThat(part2Index.get(0).bucket()).isEqualTo(2);
-        assertThat(indexFileHandler.readHashIndexList(part2Index.get(0).indexFile()))
+        assertThat(hashIndex.readList(part2Index.get(0).indexFile().fileName()))
                 .containsExactlyInAnyOrder(3, 5);
 
         // update part1
-        store.commitDataIndex(
-                record1, gen::getPartition, 0, indexFileHandler.writeHashIndex(new int[] {1, 4}));
+        store.commitDataIndex(record1, gen::getPartition, 0, hashIndex.write(new int[] {1, 4}));
         snapshot = store.snapshotManager().latestSnapshot();
 
         // assert update part1
@@ -771,18 +766,18 @@ public class FileStoreCommitTest {
 
         indexManifestEntry =
                 part1Index.stream().filter(entry -> entry.bucket() == 0).findAny().get();
-        assertThat(indexFileHandler.readHashIndexList(indexManifestEntry.indexFile()))
+        assertThat(hashIndex.readList(indexManifestEntry.indexFile().fileName()))
                 .containsExactlyInAnyOrder(1, 4);
 
         indexManifestEntry =
                 part1Index.stream().filter(entry -> entry.bucket() == 1).findAny().get();
-        assertThat(indexFileHandler.readHashIndexList(indexManifestEntry.indexFile()))
+        assertThat(hashIndex.readList(indexManifestEntry.indexFile().fileName()))
                 .containsExactlyInAnyOrder(6, 8);
 
         // assert scan one bucket
         Optional<IndexFileMeta> file = indexFileHandler.scanHashIndex(snapshot, part1, 0);
         assertThat(file).isPresent();
-        assertThat(indexFileHandler.readHashIndexList(file.get())).containsExactlyInAnyOrder(1, 4);
+        assertThat(hashIndex.readList(file.get().fileName())).containsExactlyInAnyOrder(1, 4);
 
         // overwrite one partition
         store.options().toConfiguration().set(CoreOptions.DYNAMIC_PARTITION_OVERWRITE, true);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyFilesUtil.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyFilesUtil.java
index 3c1879eec0..a78eddab21 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyFilesUtil.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyFilesUtil.java
@@ -91,7 +91,6 @@ public class CopyFilesUtil {
                                 () -> indexFileHandler.readManifestWithIOException(indexManifest));
                 if (indexManifestEntries != null) {
                     indexManifestEntries.stream()
-                            .map(IndexManifestEntry::indexFile)
                             .map(indexFileHandler::filePath)
                             .forEach(fileList::add);
                 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyMetaFilesFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyMetaFilesFunction.java
index 6ffe5c57ee..732d1db0d3 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyMetaFilesFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyMetaFilesFunction.java
@@ -186,7 +186,6 @@ public class CopyMetaFilesFunction extends ProcessFunction<Tuple2<String, String
                 List<Path> indexFileList = new ArrayList<>();
                 if (indexManifestEntries != null) {
                     indexManifestEntries.stream()
-                            .map(IndexManifestEntry::indexFile)
                             .map(indexFileHandler::filePath)
                             .forEach(indexFileList::add);
                 }


Reply via email to