This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c933bfd337 [core] keep lowest high-level kv for lookup compaction. (#5632)
c933bfd337 is described below
commit c933bfd337fee24dfedcea9469eaf1d21f8dbdb5
Author: zhoulii <[email protected]>
AuthorDate: Wed May 21 13:44:00 2025 +0800
[core] keep lowest high-level kv for lookup compaction. (#5632)
---
.../LookupChangelogMergeFunctionWrapper.java | 2 +-
.../mergetree/compact/LookupMergeFunction.java | 21 +++++++-----
.../LookupChangelogMergeFunctionWrapperTest.java | 39 ++++++++++++++++++++++
3 files changed, 52 insertions(+), 10 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
index c327474fa8..bbaa354401 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
@@ -114,7 +114,7 @@ public class LookupChangelogMergeFunctionWrapper<T>
KeyValue kv = descending.next();
if (kv.level() > 0) {
descending.remove();
- if (highLevel == null) {
+ if (highLevel == null || kv.level() < highLevel.level()) {
highLevel = kv;
}
} else {
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
index 6f999297aa..2aadbff49a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
@@ -22,7 +22,6 @@ import org.apache.paimon.KeyValue;
import javax.annotation.Nullable;
-import java.util.Iterator;
import java.util.LinkedList;
/**
@@ -52,19 +51,23 @@ public class LookupMergeFunction implements MergeFunction<KeyValue> {
@Override
public KeyValue getResult() {
// 1. Find the latest high level record
- Iterator<KeyValue> descending = candidates.descendingIterator();
- KeyValue highLevel = null;
- while (descending.hasNext()) {
- KeyValue kv = descending.next();
+ KeyValue keptHighLevel = null;
+ LinkedList<KeyValue> highLevels = new LinkedList<>();
+
+ for (KeyValue kv : candidates) {
if (kv.level() > 0) {
- if (highLevel != null) {
- descending.remove();
- } else {
- highLevel = kv;
+ highLevels.add(kv);
+ if (keptHighLevel == null || kv.level() <
keptHighLevel.level()) {
+ keptHighLevel = kv;
}
}
}
+ if (highLevels.size() > 1) {
+ highLevels.remove(keptHighLevel);
+ candidates.removeAll(highLevels);
+ }
+
// 2. Do the merge for inputs
mergeFunction.reset();
candidates.forEach(mergeFunction::add);
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
index 9c7f32589a..5341c6db69 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
@@ -508,4 +508,43 @@ public class LookupChangelogMergeFunctionWrapperTest {
kv = result.result();
assertThat(kv).isNull();
}
+
+ @Test
+ public void testKeepLowestHighLevel() {
+ Map<InternalRow, KeyValue> highLevel = new HashMap<>();
+ LookupChangelogMergeFunctionWrapper function =
+ new LookupChangelogMergeFunctionWrapper(
+
LookupMergeFunction.wrap(DeduplicateMergeFunction.factory()),
+ highLevel::get,
+ null,
+ LookupStrategy.from(false, true, false, false),
+ null,
+ null);
+
+ // Without level-0
+ function.reset();
+ function.add(new KeyValue().replace(row(1), 1, INSERT,
row(2)).setLevel(1));
+ function.add(new KeyValue().replace(row(1), 1, INSERT,
row(1)).setLevel(2));
+ ChangelogResult result = function.getResult();
+ assertThat(result).isNotNull();
+ assertThat(result.changelogs()).isEmpty();
+ KeyValue kv = result.result();
+ assertThat(kv).isNotNull();
+ assertThat(kv.value().getInt(0)).isEqualTo(2);
+
+ // With level-0 record, with multiple level-x (x > 0) record
+ function.reset();
+ function.add(new KeyValue().replace(row(1), 1, INSERT,
row(1)).setLevel(1));
+ function.add(new KeyValue().replace(row(1), 1, INSERT,
row(2)).setLevel(2));
+ function.add(new KeyValue().replace(row(1), 2, INSERT,
row(3)).setLevel(0));
+ result = function.getResult();
+ List<KeyValue> changelogs = result.changelogs();
+ assertThat(changelogs).hasSize(2);
+ assertThat(changelogs.get(0).valueKind()).isEqualTo(UPDATE_BEFORE);
+ assertThat(changelogs.get(0).value().getInt(0)).isEqualTo(1);
+ assertThat(changelogs.get(1).valueKind()).isEqualTo(UPDATE_AFTER);
+ assertThat(changelogs.get(1).value().getInt(0)).isEqualTo(3);
+ kv = result.result();
+ assertThat(kv.value().getInt(0)).isEqualTo(3);
+ }
}