This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 91a89b4da9 [core] Refactor LookupChangelogMergeFunctionWrapper for
better readability (#5645)
91a89b4da9 is described below
commit 91a89b4da99bc58f980f158c33c1f3c3b5e16e7b
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed May 21 22:44:53 2025 +0800
[core] Refactor LookupChangelogMergeFunctionWrapper for better readability
(#5645)
---
.../LookupChangelogMergeFunctionWrapper.java | 62 +++++++++++-----------
.../mergetree/compact/LookupMergeFunction.java | 49 +++++++++++------
.../mergetree/compact/LookupMergeFunctionTest.java | 56 +++++++++++++++++++
3 files changed, 118 insertions(+), 49 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
index bbaa354401..ba427f7e92 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
@@ -30,8 +30,8 @@ import org.apache.paimon.utils.UserDefinedSeqComparator;
import javax.annotation.Nullable;
+import java.util.ArrayList;
import java.util.Comparator;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Function;
@@ -56,7 +56,7 @@ import static
org.apache.paimon.utils.Preconditions.checkArgument;
public class LookupChangelogMergeFunctionWrapper<T>
implements MergeFunctionWrapper<ChangelogResult> {
- private final MergeFunction<KeyValue> mergeFunction;
+ private final LookupMergeFunction mergeFunction;
private final Function<InternalRow, T> lookup;
private final ChangelogResult reusedResult = new ChangelogResult();
@@ -67,8 +67,6 @@ public class LookupChangelogMergeFunctionWrapper<T>
private final @Nullable DeletionVectorsMaintainer
deletionVectorsMaintainer;
private final Comparator<KeyValue> comparator;
- private final LinkedList<KeyValue> candidates = new LinkedList<>();
-
public LookupChangelogMergeFunctionWrapper(
MergeFunctionFactory<KeyValue> mergeFunctionFactory,
Function<InternalRow, T> lookup,
@@ -86,7 +84,7 @@ public class LookupChangelogMergeFunctionWrapper<T>
deletionVectorsMaintainer != null,
"deletionVectorsMaintainer should not be null, there is a
bug.");
}
- this.mergeFunction = mergeFunctionFactory.create();
+ this.mergeFunction = (LookupMergeFunction) mergeFunction;
this.lookup = lookup;
this.valueEqualiser = valueEqualiser;
this.lookupStrategy = lookupStrategy;
@@ -96,36 +94,23 @@ public class LookupChangelogMergeFunctionWrapper<T>
@Override
public void reset() {
- candidates.clear();
+ mergeFunction.reset();
}
@Override
public void add(KeyValue kv) {
- candidates.add(kv);
+ mergeFunction.add(kv);
}
@Override
public ChangelogResult getResult() {
- // 1. Compute the latest high level record and containLevel0 of
candidates
- Iterator<KeyValue> descending = candidates.descendingIterator();
- KeyValue highLevel = null;
- boolean containLevel0 = false;
- while (descending.hasNext()) {
- KeyValue kv = descending.next();
- if (kv.level() > 0) {
- descending.remove();
- if (highLevel == null || kv.level() < highLevel.level()) {
- highLevel = kv;
- }
- } else {
- containLevel0 = true;
- }
- }
+ // 1. Find the latest high level record and compute containLevel0
+ KeyValue highLevel = mergeFunction.pickHighLevel();
+ boolean containLevel0 = containLevel0();
// 2. Lookup if latest high level record is absent
if (highLevel == null) {
- InternalRow lookupKey = candidates.get(0).key();
- T lookupResult = lookup.apply(lookupKey);
+ T lookupResult = lookup.apply(mergeFunction.key());
if (lookupResult != null) {
if (lookupStrategy.deletionVector) {
PositionedKeyValue positionedKeyValue =
(PositionedKeyValue) lookupResult;
@@ -136,10 +121,13 @@ public class LookupChangelogMergeFunctionWrapper<T>
highLevel = (KeyValue) lookupResult;
}
}
+ if (highLevel != null) {
+ insertInto(mergeFunction.candidates(), highLevel);
+ }
}
// 3. Calculate result
- KeyValue result = calculateResult(candidates, highLevel);
+ KeyValue result = mergeFunction.getResult();
// 4. Set changelog when there's level-0 records
reusedResult.reset();
@@ -150,21 +138,31 @@ public class LookupChangelogMergeFunctionWrapper<T>
return reusedResult.setResult(result);
}
- private KeyValue calculateResult(List<KeyValue> candidates, @Nullable
KeyValue highLevel) {
- mergeFunction.reset();
+ public boolean containLevel0() {
+ for (KeyValue kv : mergeFunction.candidates()) {
+ if (kv.level() == 0) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private void insertInto(LinkedList<KeyValue> candidates, KeyValue
highLevel) {
+ List<KeyValue> newCandidates = new ArrayList<>();
for (KeyValue candidate : candidates) {
if (highLevel != null && comparator.compare(highLevel, candidate)
< 0) {
- mergeFunction.add(highLevel);
- mergeFunction.add(candidate);
+ newCandidates.add(highLevel);
+ newCandidates.add(candidate);
highLevel = null;
} else {
- mergeFunction.add(candidate);
+ newCandidates.add(candidate);
}
}
if (highLevel != null) {
- mergeFunction.add(highLevel);
+ newCandidates.add(highLevel);
}
- return mergeFunction.getResult();
+ candidates.clear();
+ candidates.addAll(newCandidates);
}
private void setChangelog(@Nullable KeyValue before, KeyValue after) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
index 2aadbff49a..1537677ffc 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
@@ -19,6 +19,7 @@
package org.apache.paimon.mergetree.compact;
import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.InternalRow;
import javax.annotation.Nullable;
@@ -48,29 +49,43 @@ public class LookupMergeFunction implements
MergeFunction<KeyValue> {
candidates.add(kv);
}
- @Override
- public KeyValue getResult() {
- // 1. Find the latest high level record
- KeyValue keptHighLevel = null;
- LinkedList<KeyValue> highLevels = new LinkedList<>();
-
+ @Nullable
+ public KeyValue pickHighLevel() {
+ KeyValue highLevel = null;
for (KeyValue kv : candidates) {
- if (kv.level() > 0) {
- highLevels.add(kv);
- if (keptHighLevel == null || kv.level() <
keptHighLevel.level()) {
- keptHighLevel = kv;
- }
+ // records that has not been stored on the disk yet, such as the
data in the write
+ // buffer being at level -1
+ if (kv.level() <= 0) {
+ continue;
+ }
+ // For high-level comparison logic (not involving Level 0), only
the value of the
+ // minimum Level should be selected
+ if (highLevel == null || kv.level() < highLevel.level()) {
+ highLevel = kv;
}
}
+ return highLevel;
+ }
- if (highLevels.size() > 1) {
- highLevels.remove(keptHighLevel);
- candidates.removeAll(highLevels);
- }
+ public InternalRow key() {
+ return candidates.get(0).key();
+ }
- // 2. Do the merge for inputs
+ public LinkedList<KeyValue> candidates() {
+ return candidates;
+ }
+
+ @Override
+ public KeyValue getResult() {
mergeFunction.reset();
- candidates.forEach(mergeFunction::add);
+ KeyValue highLevel = pickHighLevel();
+ for (KeyValue kv : candidates) {
+ // records that has not been stored on the disk yet, such as the
data in the write
+ // buffer being at level -1
+ if (kv.level() <= 0 || kv == highLevel) {
+ mergeFunction.add(kv);
+ }
+ }
return mergeFunction.getResult();
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupMergeFunctionTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupMergeFunctionTest.java
new file mode 100644
index 0000000000..03b201ca2f
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupMergeFunctionTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.KeyValue;
+
+import org.junit.jupiter.api.Test;
+
+import static org.apache.paimon.io.DataFileTestUtils.row;
+import static org.apache.paimon.types.RowKind.INSERT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+class LookupMergeFunctionTest {
+
+ @Test
+ public void testKeepLowestHighLevel() {
+ LookupMergeFunction function =
+ (LookupMergeFunction)
+
LookupMergeFunction.wrap(DeduplicateMergeFunction.factory()).create();
+ function.reset();
+ function.add(new KeyValue().replace(row(1), 1, INSERT,
row(2)).setLevel(1));
+ function.add(new KeyValue().replace(row(1), 1, INSERT,
row(1)).setLevel(2));
+ KeyValue kv = function.getResult();
+ assertThat(kv).isNotNull();
+ assertThat(kv.value().getInt(0)).isEqualTo(2);
+ }
+
+ @Test
+ public void testLevelNegative() {
+ LookupMergeFunction function =
+ (LookupMergeFunction)
+
LookupMergeFunction.wrap(DeduplicateMergeFunction.factory()).create();
+ function.reset();
+ function.add(new KeyValue().replace(row(1), 1, INSERT,
row(2)).setLevel(-1));
+ function.add(new KeyValue().replace(row(1), 1, INSERT,
row(1)).setLevel(-1));
+ KeyValue kv = function.getResult();
+ assertThat(kv).isNotNull();
+ assertThat(kv.value().getInt(0)).isEqualTo(1);
+ }
+}