This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3521f8cc3 [core] Read deletion indexes at once to reduce file IO in splits generation (#3646)
3521f8cc3 is described below

commit 3521f8cc310a7e3cfcdd7758697c413b73db1f9a
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Jul 1 20:31:03 2024 +0800

    [core] Read deletion indexes at once to reduce file IO in splits generation (#3646)
---
 .../DeletionVectorIndexFileMaintainer.java         |  2 +-
 .../deletionvectors/DeletionVectorsMaintainer.java |  3 +-
 .../org/apache/paimon/index/IndexFileHandler.java  | 51 ++++++++++++++--------
 .../org/apache/paimon/index/PartitionIndex.java    |  2 +-
 .../table/source/snapshot/SnapshotReaderImpl.java  | 41 +++++++++++------
 .../paimon/operation/FileStoreCommitTest.java      |  6 +--
 .../paimon/flink/DynamicBucketTableITCase.java     |  2 +-
 7 files changed, 70 insertions(+), 37 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java
index a9c52fb22..eb8202155 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java
@@ -52,7 +52,7 @@ public class DeletionVectorIndexFileMaintainer {
                         .map(deletionFile -> new 
Path(deletionFile.path()).getName())
                         .distinct()
                         .collect(Collectors.toList());
-        indexFileHandler.scan().stream()
+        indexFileHandler.scanEntries().stream()
                 .filter(
                         indexManifestEntry ->
                                 touchedIndexFileNames.contains(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
index 8cc563994..8079d977c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
@@ -148,7 +148,8 @@ public class DeletionVectorsMaintainer {
             List<IndexFileMeta> indexFiles =
                     snapshotId == null
                             ? Collections.emptyList()
-                            : handler.scan(snapshotId, DELETION_VECTORS_INDEX, 
partition).stream()
+                            : handler.scanEntries(snapshotId, 
DELETION_VECTORS_INDEX, partition)
+                                    .stream()
                                     .map(IndexManifestEntry::indexFile)
                                     .collect(Collectors.toList());
             Map<String, DeletionVector> deletionVectors =
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java 
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
index ae4c6f51b..afafb42ec 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -28,6 +28,7 @@ import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.IndexManifestFile;
 import org.apache.paimon.table.source.DeletionFile;
 import org.apache.paimon.utils.IntIterator;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.PathFactory;
 import org.apache.paimon.utils.SnapshotManager;
 
@@ -35,9 +36,11 @@ import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 
 import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
 import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
@@ -69,14 +72,6 @@ public class IndexFileHandler {
         return this.deletionVectorsIndex;
     }
 
-    public List<IndexManifestEntry> scan() {
-        Snapshot snapshot = snapshotManager.latestSnapshot();
-        if (snapshot == null || snapshot.indexManifest() == null) {
-            return Collections.emptyList();
-        }
-        return indexManifestFile.read(snapshot.indexManifest());
-    }
-
     public Optional<IndexFileMeta> scanHashIndex(long snapshotId, BinaryRow 
partition, int bucket) {
         List<IndexFileMeta> result = scan(snapshotId, HASH_INDEX, partition, 
bucket);
         if (result.size() > 1) {
@@ -88,9 +83,8 @@ public class IndexFileHandler {
 
     public List<IndexFileMeta> scan(
             long snapshotId, String indexType, BinaryRow partition, int 
bucket) {
-        List<IndexManifestEntry> entries = scan(snapshotId, indexType, 
partition);
         List<IndexFileMeta> result = new ArrayList<>();
-        for (IndexManifestEntry file : entries) {
+        for (IndexManifestEntry file : scanEntries(snapshotId, indexType, 
partition)) {
             if (file.bucket() == bucket) {
                 result.add(file.indexFile());
             }
@@ -98,31 +92,54 @@ public class IndexFileHandler {
         return result;
     }
 
-    public List<IndexManifestEntry> scan(String indexType, BinaryRow 
partition) {
+    public Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> scan(
+            long snapshotId, String indexType, Set<BinaryRow> partitions) {
+        Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> result = new 
HashMap<>();
+        for (IndexManifestEntry file : scanEntries(snapshotId, indexType, 
partitions)) {
+            result.computeIfAbsent(Pair.of(file.partition(), file.bucket()), k 
-> new ArrayList<>())
+                    .add(file.indexFile());
+        }
+        return result;
+    }
+
+    public List<IndexManifestEntry> scanEntries() {
+        Snapshot snapshot = snapshotManager.latestSnapshot();
+        if (snapshot == null || snapshot.indexManifest() == null) {
+            return Collections.emptyList();
+        }
+
+        return indexManifestFile.read(snapshot.indexManifest());
+    }
+
+    public List<IndexManifestEntry> scanEntries(String indexType, BinaryRow 
partition) {
         Long snapshot = snapshotManager.latestSnapshotId();
         if (snapshot == null) {
             return Collections.emptyList();
         }
 
-        return scan(snapshot, indexType, partition);
+        return scanEntries(snapshot, indexType, partition);
     }
 
-    public List<IndexManifestEntry> scan(long snapshotId, String indexType, 
BinaryRow partition) {
+    public List<IndexManifestEntry> scanEntries(
+            long snapshotId, String indexType, BinaryRow partition) {
+        return scanEntries(snapshotId, indexType, 
Collections.singleton(partition));
+    }
+
+    public List<IndexManifestEntry> scanEntries(
+            long snapshotId, String indexType, Set<BinaryRow> partitions) {
         Snapshot snapshot = snapshotManager.snapshot(snapshotId);
         String indexManifest = snapshot.indexManifest();
         if (indexManifest == null) {
             return Collections.emptyList();
         }
 
-        List<IndexManifestEntry> allFiles = 
indexManifestFile.read(indexManifest);
         List<IndexManifestEntry> result = new ArrayList<>();
-        for (IndexManifestEntry file : allFiles) {
+        for (IndexManifestEntry file : indexManifestFile.read(indexManifest)) {
             if (file.indexFile().indexType().equals(indexType)
-                    && file.partition().equals(partition)) {
+                    && partitions.contains(file.partition())) {
                 result.add(file);
             }
         }
-
         return result;
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java 
b/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
index 79ff72656..bace2c1ac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
@@ -111,7 +111,7 @@ public class PartitionIndex {
             long targetBucketRowNumber,
             IntPredicate loadFilter,
             IntPredicate bucketFilter) {
-        List<IndexManifestEntry> files = indexFileHandler.scan(HASH_INDEX, 
partition);
+        List<IndexManifestEntry> files = 
indexFileHandler.scanEntries(HASH_INDEX, partition);
         Int2ShortHashMap.Builder mapBuilder = Int2ShortHashMap.builder();
         Map<Integer, Long> buckets = new HashMap<>();
         for (IndexManifestEntry file : files) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 6e27a2480..bd9e48e4c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -270,6 +270,12 @@ public class SnapshotReaderImpl implements SnapshotReader {
             SplitGenerator splitGenerator,
             Map<BinaryRow, Map<Integer, List<DataFileMeta>>> groupedDataFiles) 
{
         List<DataSplit> splits = new ArrayList<>();
+        // Read deletion indexes at once to reduce file IO
+        Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> 
deletionIndexFilesMap =
+                deletionVectors
+                        ? indexFileHandler.scan(
+                                snapshotId, DELETION_VECTORS_INDEX, 
groupedDataFiles.keySet())
+                        : Collections.emptyMap();
         for (Map.Entry<BinaryRow, Map<Integer, List<DataFileMeta>>> entry :
                 groupedDataFiles.entrySet()) {
             BinaryRow partition = entry.getKey();
@@ -287,12 +293,9 @@ public class SnapshotReaderImpl implements SnapshotReader {
                         isStreaming
                                 ? splitGenerator.splitForStreaming(bucketFiles)
                                 : splitGenerator.splitForBatch(bucketFiles);
-
                 List<IndexFileMeta> deletionIndexFiles =
-                        deletionVectors
-                                ? indexFileHandler.scan(
-                                        snapshotId, DELETION_VECTORS_INDEX, 
partition, bucket)
-                                : Collections.emptyList();
+                        deletionIndexFilesMap.getOrDefault(
+                                Pair.of(partition, bucket), 
Collections.emptyList());
                 for (SplitGenerator.SplitGroup splitGroup : splitGroups) {
                     List<DataFileMeta> dataFiles = splitGroup.files;
                     String bucketPath = pathFactory.bucketPath(partition, 
bucket).toString();
@@ -350,6 +353,17 @@ public class SnapshotReaderImpl implements SnapshotReader {
                 (part, bucketMap) ->
                         buckets.computeIfAbsent(part, k -> new HashSet<>())
                                 .addAll(bucketMap.keySet()));
+        // Read deletion indexes at once to reduce file IO
+        Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> 
beforDeletionIndexFilesMap =
+                deletionVectors
+                        ? indexFileHandler.scan(
+                                beforeSnapshotId, DELETION_VECTORS_INDEX, 
beforeFiles.keySet())
+                        : Collections.emptyMap();
+        Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> 
deletionIndexFilesMap =
+                deletionVectors
+                        ? indexFileHandler.scan(
+                                plan.snapshotId(), DELETION_VECTORS_INDEX, 
dataFiles.keySet())
+                        : Collections.emptyMap();
 
         for (Map.Entry<BinaryRow, Set<Integer>> entry : buckets.entrySet()) {
             BinaryRow part = entry.getKey();
@@ -376,15 +390,16 @@ public class SnapshotReaderImpl implements SnapshotReader 
{
                                 .isStreaming(isStreaming)
                                 .withBucketPath(pathFactory.bucketPath(part, 
bucket).toString());
                 if (deletionVectors) {
-                    List<IndexFileMeta> beforeDeletionIndexes =
-                            indexFileHandler.scan(
-                                    beforeSnapshotId, DELETION_VECTORS_INDEX, 
part, bucket);
-                    List<IndexFileMeta> deletionIndexes =
-                            indexFileHandler.scan(
-                                    plan.snapshotId(), DELETION_VECTORS_INDEX, 
part, bucket);
                     builder.withBeforeDeletionFiles(
-                            getDeletionFiles(before, beforeDeletionIndexes));
-                    builder.withDataDeletionFiles(getDeletionFiles(data, 
deletionIndexes));
+                            getDeletionFiles(
+                                    before,
+                                    beforDeletionIndexFilesMap.getOrDefault(
+                                            Pair.of(part, bucket), 
Collections.emptyList())));
+                    builder.withDataDeletionFiles(
+                            getDeletionFiles(
+                                    data,
+                                    deletionIndexFilesMap.getOrDefault(
+                                            Pair.of(part, bucket), 
Collections.emptyList())));
                 }
                 splits.add(builder.build());
             }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index 695b023aa..a60554db2 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -725,7 +725,7 @@ public class FileStoreCommitTest {
 
         // assert part1
         List<IndexManifestEntry> part1Index =
-                indexFileHandler.scan(snapshot.id(), HASH_INDEX, part1);
+                indexFileHandler.scanEntries(snapshot.id(), HASH_INDEX, part1);
         assertThat(part1Index.size()).isEqualTo(2);
 
         IndexManifestEntry indexManifestEntry =
@@ -740,7 +740,7 @@ public class FileStoreCommitTest {
 
         // assert part2
         List<IndexManifestEntry> part2Index =
-                indexFileHandler.scan(snapshot.id(), HASH_INDEX, part2);
+                indexFileHandler.scanEntries(snapshot.id(), HASH_INDEX, part2);
         assertThat(part2Index.size()).isEqualTo(1);
         assertThat(part2Index.get(0).bucket()).isEqualTo(2);
         
assertThat(indexFileHandler.readHashIndexList(part2Index.get(0).indexFile()))
@@ -752,7 +752,7 @@ public class FileStoreCommitTest {
         snapshot = store.snapshotManager().latestSnapshot();
 
         // assert update part1
-        part1Index = indexFileHandler.scan(snapshot.id(), HASH_INDEX, part1);
+        part1Index = indexFileHandler.scanEntries(snapshot.id(), HASH_INDEX, 
part1);
         assertThat(part1Index.size()).isEqualTo(2);
 
         indexManifestEntry =
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
index bb27851f1..30506495f 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
@@ -122,7 +122,7 @@ public class DynamicBucketTableITCase extends 
CatalogITCaseBase {
         IndexFileHandler indexFileHandler = 
table.store().newIndexFileHandler();
         List<BinaryRow> partitions = table.newScan().listPartitions();
         List<IndexManifestEntry> entries = new ArrayList<>();
-        partitions.forEach(p -> 
entries.addAll(indexFileHandler.scan(HASH_INDEX, p)));
+        partitions.forEach(p -> 
entries.addAll(indexFileHandler.scanEntries(HASH_INDEX, p)));
 
         Long records =
                 entries.stream().map(entry -> 
entry.indexFile().rowCount()).reduce(Long::sum).get();

Reply via email to