This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3e911f647b [core] Avoid involve all the manifest when full compaction
manifest without delete partition (#5618)
3e911f647b is described below
commit 3e911f647bc11304e0c5c91176081617e8aec948
Author: WenjunMin <[email protected]>
AuthorDate: Mon May 19 11:16:37 2025 +0800
[core] Avoid involve all the manifest when full compaction manifest without
delete partition (#5618)
---
.../paimon/operation/ManifestFileMerger.java | 46 +++++++++++++---------
.../paimon/partition/PartitionPredicate.java | 36 +++++++++++++++++
.../paimon/manifest/ManifestFileMetaTest.java | 43 ++++++++++++++++++++
.../manifest/NoPartitionManifestFileMetaTest.java | 46 ++++++++++++++++++++++
4 files changed, 152 insertions(+), 19 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java
b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java
index 8b43b6408b..51c7081916 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java
@@ -198,27 +198,35 @@ public class ManifestFileMerger {
// 2.2. try to skip base files by partition filter
+ PartitionPredicate predicate;
+ if (deleteEntries.isEmpty()) {
+ predicate = PartitionPredicate.alwaysFalse();
+ } else {
+ if (partitionType.getFieldCount() > 0) {
+ Set<BinaryRow> deletePartitions =
computeDeletePartitions(deleteEntries);
+ predicate = PartitionPredicate.fromMultiple(partitionType,
deletePartitions);
+ } else {
+ predicate = PartitionPredicate.alwaysTrue();
+ }
+ }
+
List<ManifestFileMeta> result = new ArrayList<>();
List<ManifestFileMeta> toBeMerged = new LinkedList<>(inputs);
- if (partitionType.getFieldCount() > 0) {
- Set<BinaryRow> deletePartitions =
computeDeletePartitions(deleteEntries);
- PartitionPredicate predicate =
- PartitionPredicate.fromMultiple(partitionType,
deletePartitions);
- if (predicate != null) {
- Iterator<ManifestFileMeta> iterator = toBeMerged.iterator();
- while (iterator.hasNext()) {
- ManifestFileMeta file = iterator.next();
- if (mustChange.test(file)) {
- continue;
- }
- if (!predicate.test(
- file.numAddedFiles() + file.numDeletedFiles(),
- file.partitionStats().minValues(),
- file.partitionStats().maxValues(),
- file.partitionStats().nullCounts())) {
- iterator.remove();
- result.add(file);
- }
+
+ if (predicate != null) {
+ Iterator<ManifestFileMeta> iterator = toBeMerged.iterator();
+ while (iterator.hasNext()) {
+ ManifestFileMeta file = iterator.next();
+ if (mustChange.test(file)) {
+ continue;
+ }
+ if (!predicate.test(
+ file.numAddedFiles() + file.numDeletedFiles(),
+ file.partitionStats().minValues(),
+ file.partitionStats().maxValues(),
+ file.partitionStats().nullCounts())) {
+ iterator.remove();
+ result.add(file);
}
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
index 1f6c2cfe45..bfc241d62f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
+++
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
@@ -83,6 +83,42 @@ public interface PartitionPredicate {
new RowDataToObjectArrayConverter(partitionType), partitions);
}
+ /**
+ * Returns a {@link PartitionPredicate} that matches nothing: both the per-partition and the
+ * stats-range {@code test} overloads always return {@code false}, whatever their arguments.
+ */
+ static PartitionPredicate alwaysFalse() {
+ return new PartitionPredicate() {
+ @Override
+ public boolean test(BinaryRow part) {
+ return false;
+ }
+
+ // Stats-based overload: arguments are ignored, nothing ever matches.
+ @Override
+ public boolean test(
+ long rowCount,
+ InternalRow minValues,
+ InternalRow maxValues,
+ InternalArray nullCounts) {
+ return false;
+ }
+ };
+ }
+
+ /**
+ * Returns a {@link PartitionPredicate} that matches everything: both the per-partition and the
+ * stats-range {@code test} overloads always return {@code true}, whatever their arguments.
+ */
+ static PartitionPredicate alwaysTrue() {
+ return new PartitionPredicate() {
+ @Override
+ public boolean test(BinaryRow part) {
+ return true;
+ }
+
+ // Stats-based overload: arguments are ignored, everything matches.
+ @Override
+ public boolean test(
+ long rowCount,
+ InternalRow minValues,
+ InternalRow maxValues,
+ InternalArray nullCounts) {
+ return true;
+ }
+ };
+ }
+
/** A {@link PartitionPredicate} using {@link Predicate}. */
class DefaultPartitionPredicate implements PartitionPredicate {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
index 1be5993fb0..4dee7f0372 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
@@ -452,6 +452,49 @@ public class ManifestFileMetaTest extends
ManifestFileMetaTestBase {
containSameIdentifyEntryFile(fullCompacted, entryIdentifierExpected);
}
+ /** Full compaction with no delete entries must not need to read the base manifests. */
+ @Test
+ public void testMergeFullCompactionWithoutDeleteFile() {
+ // Every entry built in this test is an ADD entry (makeEntry(true, ...)); there are no deletes.
+ List<ManifestFileMeta> input = new ArrayList<>();
+ // Base: 6 manifests with 49 ADD entries each; the last makeEntry argument is j
+ // (presumably the partition value — confirm against ManifestFileMetaTestBase.makeEntry).
+ for (int j = 0; j < 6; j++) {
+ List<ManifestEntry> entrys = new ArrayList<>();
+ for (int i = 1; i < 50; i++) {
+ entrys.add(makeEntry(true,
String.format(manifestFileNameTemplate, j, i), j));
+ }
+ input.add(makeManifest(entrys.toArray(new ManifestEntry[0])));
+ }
+ // Use the smallest base manifest size as the size threshold, so every base manifest is at
+ // least as large as the threshold (computed before the delta manifests are appended).
+ long threshold =
input.stream().mapToLong(ManifestFileMeta::fileSize).min().getAsLong();
+ Set<String> baseFiles =
+
input.stream().map(ManifestFileMeta::fileName).collect(Collectors.toSet());
+
+ // Delete the base manifest files. If merge() tried to read them it would presumably fail,
+ // so a passing test indicates the base manifests were skipped rather than rewritten.
+ for (String baseFile : baseFiles) {
+ manifestFile.delete(baseFile);
+ }
+
+ // Delta: 7 small manifests with a single ADD entry each.
+ input.add(makeManifest(makeEntry(true, "A")));
+ input.add(makeManifest(makeEntry(true, "B")));
+ input.add(makeManifest(makeEntry(true, "C")));
+ input.add(makeManifest(makeEntry(true, "D")));
+ input.add(makeManifest(makeEntry(true, "E")));
+ input.add(makeManifest(makeEntry(true, "F")));
+ input.add(makeManifest(makeEntry(true, "G")));
+
+ // NOTE(review): 3 and 200 fill merge()'s remaining count/size parameters; their exact
+ // meaning is not visible here — confirm against ManifestFileMerger.merge's signature.
+ List<ManifestFileMeta> merged =
+ ManifestFileMerger.merge(
+ input, manifestFile, threshold, 3, 200,
getPartitionType(), null);
+ // Base manifests are excluded on both sides; the delta entries must survive the merge.
+ assertEquivalentEntries(
+ input.stream()
+ .filter(f -> !baseFiles.contains(f.fileName()))
+ .collect(Collectors.toList()),
+ merged.stream()
+ .filter(f -> !baseFiles.contains(f.fileName()))
+ .collect(Collectors.toList()));
+ }
+
@RepeatedTest(10)
public void testRandomFullCompaction() throws Exception {
List<ManifestFileMeta> input = new ArrayList<>();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/NoPartitionManifestFileMetaTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/NoPartitionManifestFileMetaTest.java
index 9978f9ee30..591b320651 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/NoPartitionManifestFileMetaTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/NoPartitionManifestFileMetaTest.java
@@ -25,7 +25,10 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
@@ -55,6 +58,49 @@ public class NoPartitionManifestFileMetaTest extends
ManifestFileMetaTestBase {
assertThat(merged.get(0)).isSameAs(input.get(0));
}
+ /**
+ * Full compaction of a non-partitioned table with no delete entries must not need to read the
+ * base manifests.
+ */
+ @Test
+ public void testMergeFullCompactionWithoutDeleteFile() {
+ // Every entry built in this test is an ADD entry (makeEntry(true, ...)); there are no deletes.
+ List<ManifestFileMeta> input = new ArrayList<>();
+ // Base: 6 manifests with 49 ADD entries each, all passing a null partition argument.
+ for (int j = 0; j < 6; j++) {
+ List<ManifestEntry> entrys = new ArrayList<>();
+ for (int i = 1; i < 50; i++) {
+ entrys.add(makeEntry(true,
String.format(manifestFileNameTemplate, j, i), null));
+ }
+ input.add(makeManifest(entrys.toArray(new ManifestEntry[0])));
+ }
+ // Use the smallest base manifest size as the size threshold, so every base manifest is at
+ // least as large as the threshold (computed before the delta manifests are appended).
+ long threshold =
input.stream().mapToLong(ManifestFileMeta::fileSize).min().getAsLong();
+ Set<String> baseFiles =
+
input.stream().map(ManifestFileMeta::fileName).collect(Collectors.toSet());
+
+ // Delete the base manifest files. If merge() tried to read them it would presumably fail,
+ // so a passing test indicates the base manifests were skipped rather than rewritten.
+ for (String baseFile : baseFiles) {
+ manifestFile.delete(baseFile);
+ }
+
+ // Delta: 7 small manifests with a single ADD entry each.
+ input.add(makeManifest(makeEntry(true, "A", null)));
+ input.add(makeManifest(makeEntry(true, "B", null)));
+ input.add(makeManifest(makeEntry(true, "C", null)));
+ input.add(makeManifest(makeEntry(true, "D", null)));
+ input.add(makeManifest(makeEntry(true, "E", null)));
+ input.add(makeManifest(makeEntry(true, "F", null)));
+ input.add(makeManifest(makeEntry(true, "G", null)));
+
+ // NOTE(review): 3 and 200 fill merge()'s remaining count/size parameters; their exact
+ // meaning is not visible here — confirm against ManifestFileMerger.merge's signature.
+ List<ManifestFileMeta> merged =
+ ManifestFileMerger.merge(
+ input, manifestFile, threshold, 3, 200,
getPartitionType(), null);
+ // Base manifests are excluded on both sides; the delta entries must survive the merge.
+ assertEquivalentEntries(
+ input.stream()
+ .filter(f -> !baseFiles.contains(f.fileName()))
+ .collect(Collectors.toList()),
+ merged.stream()
+ .filter(f -> !baseFiles.contains(f.fileName()))
+ .collect(Collectors.toList()));
+ }
+
@Override
public ManifestFile getManifestFile() {
return manifestFile;