This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3521f8cc3 [core] Read deletion indexes at once to reduce file IO in
splits generation (#3646)
3521f8cc3 is described below
commit 3521f8cc310a7e3cfcdd7758697c413b73db1f9a
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Jul 1 20:31:03 2024 +0800
[core] Read deletion indexes at once to reduce file IO in splits generation
(#3646)
---
.../DeletionVectorIndexFileMaintainer.java | 2 +-
.../deletionvectors/DeletionVectorsMaintainer.java | 3 +-
.../org/apache/paimon/index/IndexFileHandler.java | 51 ++++++++++++++--------
.../org/apache/paimon/index/PartitionIndex.java | 2 +-
.../table/source/snapshot/SnapshotReaderImpl.java | 41 +++++++++++------
.../paimon/operation/FileStoreCommitTest.java | 6 +--
.../paimon/flink/DynamicBucketTableITCase.java | 2 +-
7 files changed, 70 insertions(+), 37 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java
index a9c52fb22..eb8202155 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java
@@ -52,7 +52,7 @@ public class DeletionVectorIndexFileMaintainer {
.map(deletionFile -> new
Path(deletionFile.path()).getName())
.distinct()
.collect(Collectors.toList());
- indexFileHandler.scan().stream()
+ indexFileHandler.scanEntries().stream()
.filter(
indexManifestEntry ->
touchedIndexFileNames.contains(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
index 8cc563994..8079d977c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
@@ -148,7 +148,8 @@ public class DeletionVectorsMaintainer {
List<IndexFileMeta> indexFiles =
snapshotId == null
? Collections.emptyList()
- : handler.scan(snapshotId, DELETION_VECTORS_INDEX,
partition).stream()
+ : handler.scanEntries(snapshotId,
DELETION_VECTORS_INDEX, partition)
+ .stream()
.map(IndexManifestEntry::indexFile)
.collect(Collectors.toList());
Map<String, DeletionVector> deletionVectors =
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
index ae4c6f51b..afafb42ec 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -28,6 +28,7 @@ import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.IndexManifestFile;
import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.utils.IntIterator;
+import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.PathFactory;
import org.apache.paimon.utils.SnapshotManager;
@@ -35,9 +36,11 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
@@ -69,14 +72,6 @@ public class IndexFileHandler {
return this.deletionVectorsIndex;
}
- public List<IndexManifestEntry> scan() {
- Snapshot snapshot = snapshotManager.latestSnapshot();
- if (snapshot == null || snapshot.indexManifest() == null) {
- return Collections.emptyList();
- }
- return indexManifestFile.read(snapshot.indexManifest());
- }
-
public Optional<IndexFileMeta> scanHashIndex(long snapshotId, BinaryRow
partition, int bucket) {
List<IndexFileMeta> result = scan(snapshotId, HASH_INDEX, partition,
bucket);
if (result.size() > 1) {
@@ -88,9 +83,8 @@ public class IndexFileHandler {
public List<IndexFileMeta> scan(
long snapshotId, String indexType, BinaryRow partition, int
bucket) {
- List<IndexManifestEntry> entries = scan(snapshotId, indexType,
partition);
List<IndexFileMeta> result = new ArrayList<>();
- for (IndexManifestEntry file : entries) {
+ for (IndexManifestEntry file : scanEntries(snapshotId, indexType,
partition)) {
if (file.bucket() == bucket) {
result.add(file.indexFile());
}
@@ -98,31 +92,72 @@ public class IndexFileHandler {
return result;
}
- public List<IndexManifestEntry> scan(String indexType, BinaryRow
partition) {
+ /**
+ * Scans index files of {@code indexType} in {@code partitions} of snapshot {@code snapshotId},
+ * grouped by (partition, bucket). The index manifest is read only once, so prefer this over
+ * calling {@link #scan(long, String, BinaryRow, int)} once per bucket.
+ */
+ public Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> scan(
+ long snapshotId, String indexType, Set<BinaryRow> partitions) {
+ Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> result = new
HashMap<>();
+ for (IndexManifestEntry file : scanEntries(snapshotId, indexType,
partitions)) {
+ result.computeIfAbsent(Pair.of(file.partition(), file.bucket()), k
-> new ArrayList<>())
+ .add(file.indexFile());
+ }
+ return result;
+ }
+
+ /**
+ * Returns all index manifest entries of the latest snapshot, or an empty list if there is no
+ * snapshot or the snapshot has no index manifest.
+ */
+ public List<IndexManifestEntry> scanEntries() {
+ Snapshot snapshot = snapshotManager.latestSnapshot();
+ if (snapshot == null || snapshot.indexManifest() == null) {
+ return Collections.emptyList();
+ }
+
+ return indexManifestFile.read(snapshot.indexManifest());
+ }
+
+ public List<IndexManifestEntry> scanEntries(String indexType, BinaryRow
partition) {
Long snapshot = snapshotManager.latestSnapshotId();
if (snapshot == null) {
return Collections.emptyList();
}
- return scan(snapshot, indexType, partition);
+ return scanEntries(snapshot, indexType, partition);
}
- public List<IndexManifestEntry> scan(long snapshotId, String indexType,
BinaryRow partition) {
+ public List<IndexManifestEntry> scanEntries(
+ long snapshotId, String indexType, BinaryRow partition) {
+ return scanEntries(snapshotId, indexType,
Collections.singleton(partition));
+ }
+
+ /**
+ * Returns the entries of {@code indexType} whose partition is contained in {@code partitions},
+ * read from the index manifest of snapshot {@code snapshotId}. Returns an empty list if {@code
+ * partitions} is empty or the snapshot has no index manifest.
+ */
+ public List<IndexManifestEntry> scanEntries(
+ long snapshotId, String indexType, Set<BinaryRow> partitions) {
+ if (partitions.isEmpty()) {
+ // Nothing can match: skip the snapshot lookup and the index manifest read.
+ return Collections.emptyList();
+ }
Snapshot snapshot = snapshotManager.snapshot(snapshotId);
String indexManifest = snapshot.indexManifest();
if (indexManifest == null) {
return Collections.emptyList();
}
- List<IndexManifestEntry> allFiles =
indexManifestFile.read(indexManifest);
List<IndexManifestEntry> result = new ArrayList<>();
- for (IndexManifestEntry file : allFiles) {
+ for (IndexManifestEntry file : indexManifestFile.read(indexManifest)) {
if (file.indexFile().indexType().equals(indexType)
- && file.partition().equals(partition)) {
+ && partitions.contains(file.partition())) {
result.add(file);
}
}
-
return result;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
b/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
index 79ff72656..bace2c1ac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
@@ -111,7 +111,7 @@ public class PartitionIndex {
long targetBucketRowNumber,
IntPredicate loadFilter,
IntPredicate bucketFilter) {
- List<IndexManifestEntry> files = indexFileHandler.scan(HASH_INDEX,
partition);
+ List<IndexManifestEntry> files =
indexFileHandler.scanEntries(HASH_INDEX, partition);
Int2ShortHashMap.Builder mapBuilder = Int2ShortHashMap.builder();
Map<Integer, Long> buckets = new HashMap<>();
for (IndexManifestEntry file : files) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 6e27a2480..bd9e48e4c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -270,6 +270,12 @@ public class SnapshotReaderImpl implements SnapshotReader {
SplitGenerator splitGenerator,
Map<BinaryRow, Map<Integer, List<DataFileMeta>>> groupedDataFiles)
{
List<DataSplit> splits = new ArrayList<>();
+ // Read deletion indexes at once to reduce file IO
+ Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>>
deletionIndexFilesMap =
+ deletionVectors
+ ? indexFileHandler.scan(
+ snapshotId, DELETION_VECTORS_INDEX,
groupedDataFiles.keySet())
+ : Collections.emptyMap();
for (Map.Entry<BinaryRow, Map<Integer, List<DataFileMeta>>> entry :
groupedDataFiles.entrySet()) {
BinaryRow partition = entry.getKey();
@@ -287,12 +293,9 @@ public class SnapshotReaderImpl implements SnapshotReader {
isStreaming
? splitGenerator.splitForStreaming(bucketFiles)
: splitGenerator.splitForBatch(bucketFiles);
-
List<IndexFileMeta> deletionIndexFiles =
- deletionVectors
- ? indexFileHandler.scan(
- snapshotId, DELETION_VECTORS_INDEX,
partition, bucket)
- : Collections.emptyList();
+ deletionIndexFilesMap.getOrDefault(
+ Pair.of(partition, bucket),
Collections.emptyList());
for (SplitGenerator.SplitGroup splitGroup : splitGroups) {
List<DataFileMeta> dataFiles = splitGroup.files;
String bucketPath = pathFactory.bucketPath(partition,
bucket).toString();
@@ -350,6 +353,17 @@ public class SnapshotReaderImpl implements SnapshotReader {
(part, bucketMap) ->
buckets.computeIfAbsent(part, k -> new HashSet<>())
.addAll(bucketMap.keySet()));
+ // Read deletion indexes at once to reduce file IO
+ Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>>
beforeDeletionIndexFilesMap =
+ deletionVectors
+ ? indexFileHandler.scan(
+ beforeSnapshotId, DELETION_VECTORS_INDEX,
beforeFiles.keySet())
+ : Collections.emptyMap();
+ Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>>
deletionIndexFilesMap =
+ deletionVectors
+ ? indexFileHandler.scan(
+ plan.snapshotId(), DELETION_VECTORS_INDEX,
dataFiles.keySet())
+ : Collections.emptyMap();
for (Map.Entry<BinaryRow, Set<Integer>> entry : buckets.entrySet()) {
BinaryRow part = entry.getKey();
@@ -376,15 +390,16 @@ public class SnapshotReaderImpl implements SnapshotReader
{
.isStreaming(isStreaming)
.withBucketPath(pathFactory.bucketPath(part,
bucket).toString());
if (deletionVectors) {
- List<IndexFileMeta> beforeDeletionIndexes =
- indexFileHandler.scan(
- beforeSnapshotId, DELETION_VECTORS_INDEX,
part, bucket);
- List<IndexFileMeta> deletionIndexes =
- indexFileHandler.scan(
- plan.snapshotId(), DELETION_VECTORS_INDEX,
part, bucket);
builder.withBeforeDeletionFiles(
- getDeletionFiles(before, beforeDeletionIndexes));
- builder.withDataDeletionFiles(getDeletionFiles(data,
deletionIndexes));
+ getDeletionFiles(
+ before,
+ beforeDeletionIndexFilesMap.getOrDefault(
+ Pair.of(part, bucket),
Collections.emptyList())));
+ builder.withDataDeletionFiles(
+ getDeletionFiles(
+ data,
+ deletionIndexFilesMap.getOrDefault(
+ Pair.of(part, bucket),
Collections.emptyList())));
}
splits.add(builder.build());
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index 695b023aa..a60554db2 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -725,7 +725,7 @@ public class FileStoreCommitTest {
// assert part1
List<IndexManifestEntry> part1Index =
- indexFileHandler.scan(snapshot.id(), HASH_INDEX, part1);
+ indexFileHandler.scanEntries(snapshot.id(), HASH_INDEX, part1);
assertThat(part1Index.size()).isEqualTo(2);
IndexManifestEntry indexManifestEntry =
@@ -740,7 +740,7 @@ public class FileStoreCommitTest {
// assert part2
List<IndexManifestEntry> part2Index =
- indexFileHandler.scan(snapshot.id(), HASH_INDEX, part2);
+ indexFileHandler.scanEntries(snapshot.id(), HASH_INDEX, part2);
assertThat(part2Index.size()).isEqualTo(1);
assertThat(part2Index.get(0).bucket()).isEqualTo(2);
assertThat(indexFileHandler.readHashIndexList(part2Index.get(0).indexFile()))
@@ -752,7 +752,7 @@ public class FileStoreCommitTest {
snapshot = store.snapshotManager().latestSnapshot();
// assert update part1
- part1Index = indexFileHandler.scan(snapshot.id(), HASH_INDEX, part1);
+ part1Index = indexFileHandler.scanEntries(snapshot.id(), HASH_INDEX,
part1);
assertThat(part1Index.size()).isEqualTo(2);
indexManifestEntry =
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
index bb27851f1..30506495f 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
@@ -122,7 +122,7 @@ public class DynamicBucketTableITCase extends
CatalogITCaseBase {
IndexFileHandler indexFileHandler =
table.store().newIndexFileHandler();
List<BinaryRow> partitions = table.newScan().listPartitions();
List<IndexManifestEntry> entries = new ArrayList<>();
- partitions.forEach(p ->
entries.addAll(indexFileHandler.scan(HASH_INDEX, p)));
+ partitions.forEach(p ->
entries.addAll(indexFileHandler.scanEntries(HASH_INDEX, p)));
Long records =
entries.stream().map(entry ->
entry.indexFile().rowCount()).reduce(Long::sum).get();