This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new abe5b4b496 [core] Improve performance of PartialUpdateMergeFunction with sequence group (#5481)
abe5b4b496 is described below
commit abe5b4b496581b0a37a11965d4a72398e1695eb4
Author: Xuannan <[email protected]>
AuthorDate: Tue Apr 22 10:59:00 2025 +0800
[core] Improve performance of PartialUpdateMergeFunction with sequence group (#5481)
---
.../PartialUpdateMergeFunctionBenchmark.java | 142 +++++++++++++++++++++
.../compact/PartialUpdateMergeFunction.java | 110 +++++++++++++---
2 files changed, 237 insertions(+), 15 deletions(-)
diff --git a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/compact/PartialUpdateMergeFunctionBenchmark.java b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/compact/PartialUpdateMergeFunctionBenchmark.java
new file mode 100644
index 0000000000..c0aab4b2c8
--- /dev/null
+++ b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/compact/PartialUpdateMergeFunctionBenchmark.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.benchmark.compact;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.benchmark.Benchmark;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.mergetree.compact.MergeFunction;
+import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
+import org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import org.junit.jupiter.api.Test;
+
+/** Benchmark for measuring the performance of {@link PartialUpdateMergeFunction}. */
+public class PartialUpdateMergeFunctionBenchmark {
+
+    private static final int ROW_COUNT = 40_000_000;
+
+    @Test
+    public void testUpdateWithSequenceGroup() {
+        Options options = new Options();
+        options.set("fields.f1.sequence-group", "f2,f3,f4,f5");
+
+        MergeFunctionFactory<KeyValue> factory =
+                PartialUpdateMergeFunction.factory(options, getRowType(6), ImmutableList.of("f0"));
+
+        MergeFunction<KeyValue> func = factory.create();
+
+        Benchmark benchmark =
+                new Benchmark("partial-update-benchmark", ROW_COUNT)
+                        .setNumWarmupIters(1)
+                        .setOutputPerIteration(true);
+
+        benchmark.addCase(
+                "updateWithSequenceGroup",
+                5,
+                () -> {
+                    func.reset();
+                    for (int i = 0; i < ROW_COUNT; i++) {
+                        add(func, i, RowKind.INSERT, 1, i, 1, 1, 1, 1);
+                    }
+                });
+
+        benchmark.run();
+    }
+
+    @Test
+    public void testRetractWithSequenceGroup() {
+        Options options = new Options();
+        options.set("fields.f1.sequence-group", "f2,f3,f4,f5");
+
+        MergeFunctionFactory<KeyValue> factory =
+                PartialUpdateMergeFunction.factory(options, getRowType(6), ImmutableList.of("f0"));
+
+        MergeFunction<KeyValue> func = factory.create();
+
+        Benchmark benchmark =
+                new Benchmark("partial-update-benchmark", ROW_COUNT)
+                        .setNumWarmupIters(1)
+                        .setOutputPerIteration(true);
+
+        benchmark.addCase(
+                "retractWithSequenceGroup",
+                5,
+                () -> {
+                    func.reset();
+                    for (int i = 0; i < ROW_COUNT; i++) {
+                        add(func, i, RowKind.DELETE, 1, i, 1, 1, 1, 1);
+                    }
+                });
+
+        benchmark.run();
+    }
+
+    @Test
+    public void testUpdateWithEmptySequenceGroup() {
+        Options options = new Options();
+        options.set("fields.f1.sequence-group", "f2");
+        options.set("fields.f3.sequence-group", "f4");
+        options.set("fields.f5.sequence-group", "f6,f7,f8");
+
+        MergeFunctionFactory<KeyValue> factory =
+                PartialUpdateMergeFunction.factory(options, getRowType(9), ImmutableList.of("f0"));
+
+        MergeFunction<KeyValue> func = factory.create();
+
+        Benchmark benchmark =
+                new Benchmark("partial-update-benchmark", ROW_COUNT)
+                        .setNumWarmupIters(1)
+                        .setOutputPerIteration(true);
+
+        benchmark.addCase(
+                "updateWithEmptySequenceGroup",
+                5,
+                () -> {
+                    func.reset();
+                    for (int i = 0; i < ROW_COUNT; i++) {
+                        add(func, i, RowKind.INSERT, 1, i, 1, null, null, null, null, null, null);
+                    }
+                });
+
+        benchmark.run();
+    }
+
+    /** Row type of {@code numFields} INT fields f0, f1, ...; "f0" matches the key list. */
+    private RowType getRowType(int numFields) {
+        DataField[] fields = new DataField[numFields];
+        for (int i = 0; i < numFields; i++) {
+            fields[i] = new DataField(i, "f" + i, DataTypes.INT());
+        }
+        return RowType.of(fields);
+    }
+
+    /** Adds one record with key 1, the given sequence number, row kind and value fields. */
+    private void add(
+            MergeFunction<KeyValue> function, int sequence, RowKind rowKind, Integer... f) {
+        function.add(new KeyValue().replace(GenericRow.of(1), sequence, rowKind, GenericRow.of(f)));
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index b24cf1edeb..f061192979 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -41,8 +41,10 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -69,9 +71,9 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
private final InternalRow.FieldGetter[] getters;
private final boolean ignoreDelete;
- private final Map<Integer, FieldsComparator> fieldSeqComparators;
+ private final List<WrapperWithFieldIndex<FieldsComparator>>
fieldSeqComparators;
private final boolean fieldSequenceEnabled;
- private final Map<Integer, FieldAggregator> fieldAggregators;
+ private final List<WrapperWithFieldIndex<FieldAggregator>>
fieldAggregators;
private final boolean removeRecordOnDelete;
private final Set<Integer> sequenceGroupPartialDelete;
private final boolean[] nullables;
@@ -100,8 +102,8 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
boolean[] nullables) {
this.getters = getters;
this.ignoreDelete = ignoreDelete;
- this.fieldSeqComparators = fieldSeqComparators;
- this.fieldAggregators = fieldAggregators;
+ this.fieldSeqComparators =
getKeySortedListFromMap(fieldSeqComparators);
+ this.fieldAggregators = getKeySortedListFromMap(fieldAggregators);
this.fieldSequenceEnabled = fieldSequenceEnabled;
this.removeRecordOnDelete = removeRecordOnDelete;
this.sequenceGroupPartialDelete = sequenceGroupPartialDelete;
@@ -115,7 +117,7 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
this.notNullColumnFilled = false;
this.row = new GenericRow(getters.length);
this.latestSequenceNumber = 0;
- fieldAggregators.values().forEach(FieldAggregator::reset);
+ fieldAggregators.forEach(w -> w.getValue().reset());
}
@Override
@@ -188,23 +190,43 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
}
private void updateWithSequenceGroup(KeyValue kv) {
+
+ Iterator<WrapperWithFieldIndex<FieldsComparator>> comparatorIter =
+ fieldSeqComparators.iterator();
+ WrapperWithFieldIndex<FieldsComparator> curComparator =
+ comparatorIter.hasNext() ? comparatorIter.next() : null;
+ Iterator<WrapperWithFieldIndex<FieldAggregator>> aggIter =
fieldAggregators.iterator();
+ WrapperWithFieldIndex<FieldAggregator> curAgg = aggIter.hasNext() ?
aggIter.next() : null;
+
+ boolean[] isEmptySequenceGroup = new boolean[getters.length];
for (int i = 0; i < getters.length; i++) {
- Object field = getters[i].getFieldOrNull(kv.value());
- FieldsComparator seqComparator = fieldSeqComparators.get(i);
- FieldAggregator aggregator = fieldAggregators.get(i);
- Object accumulator = getters[i].getFieldOrNull(row);
+ FieldsComparator seqComparator = null;
+ if (curComparator != null && curComparator.fieldIndex == i) {
+ seqComparator = curComparator.getValue();
+ curComparator = comparatorIter.hasNext() ?
comparatorIter.next() : null;
+ }
+
+ FieldAggregator aggregator = null;
+ if (curAgg != null && curAgg.fieldIndex == i) {
+ aggregator = curAgg.getValue();
+ curAgg = aggIter.hasNext() ? aggIter.next() : null;
+ }
+
+ Object accumulator = row.getField(i);
if (seqComparator == null) {
+ Object field = getters[i].getFieldOrNull(kv.value());
if (aggregator != null) {
row.setField(i, aggregator.agg(accumulator, field));
} else if (field != null) {
row.setField(i, field);
}
} else {
- if (isEmptySequenceGroup(kv, seqComparator)) {
+ if (isEmptySequenceGroup(kv, seqComparator,
isEmptySequenceGroup)) {
// skip null sequence group
continue;
}
+ Object field = getters[i].getFieldOrNull(kv.value());
if (seqComparator.compare(kv.value(), row) >= 0) {
int index = i;
@@ -226,24 +248,53 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
}
}
-    private boolean isEmptySequenceGroup(KeyValue kv, FieldsComparator comparator) {
+    /** Returns true if all sequence fields are null; memoizes that in {@code emptyGroupFlags}. */
+    private boolean isEmptySequenceGroup(
+            KeyValue kv, FieldsComparator comparator, boolean[] emptyGroupFlags) {
+        // Flags are set for all fields of a group together, so checking the first one suffices.
+        if (emptyGroupFlags[comparator.compareFields()[0]]) {
+            return true;
+        }
+
         for (int fieldIndex : comparator.compareFields()) {
             if (getters[fieldIndex].getFieldOrNull(kv.value()) != null) {
                 return false;
             }
         }
+        // Cache the result by flagging every sequence field of this (empty) group.
+        for (int fieldIndex : comparator.compareFields()) {
+            emptyGroupFlags[fieldIndex] = true;
+        }
+
         return true;
     }
private void retractWithSequenceGroup(KeyValue kv) {
Set<Integer> updatedSequenceFields = new HashSet<>();
-
+ Iterator<WrapperWithFieldIndex<FieldsComparator>> comparatorIter =
+ fieldSeqComparators.iterator();
+ WrapperWithFieldIndex<FieldsComparator> curComparator =
+ comparatorIter.hasNext() ? comparatorIter.next() : null;
+ Iterator<WrapperWithFieldIndex<FieldAggregator>> aggIter =
fieldAggregators.iterator();
+ WrapperWithFieldIndex<FieldAggregator> curAgg = aggIter.hasNext() ?
aggIter.next() : null;
+
+ boolean[] isEmptySequenceGroup = new boolean[getters.length];
for (int i = 0; i < getters.length; i++) {
- FieldsComparator seqComparator = fieldSeqComparators.get(i);
+ FieldsComparator seqComparator = null;
+ if (curComparator != null && curComparator.fieldIndex == i) {
+ seqComparator = curComparator.getValue();
+ curComparator = comparatorIter.hasNext() ?
comparatorIter.next() : null;
+ }
+
+ FieldAggregator aggregator = null;
+ if (curAgg != null && curAgg.fieldIndex == i) {
+ aggregator = curAgg.getValue();
+ curAgg = aggIter.hasNext() ? aggIter.next() : null;
+ }
+
if (seqComparator != null) {
- FieldAggregator aggregator = fieldAggregators.get(i);
- if (isEmptySequenceGroup(kv, seqComparator)) {
+ if (isEmptySequenceGroup(kv, seqComparator,
isEmptySequenceGroup)) {
// skip null sequence group
continue;
}
@@ -628,4 +679,33 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
return aggFunc == null ? options.fieldsDefaultFunc() : aggFunc;
}
}
+
+    /** Returns the map's values wrapped with their keys, sorted by ascending field index. */
+    private static <T> List<WrapperWithFieldIndex<T>> getKeySortedListFromMap(
+            Map<Integer, T> map) {
+        List<WrapperWithFieldIndex<T>> res = new ArrayList<>(map.size());
+        map.forEach((index, value) -> res.add(new WrapperWithFieldIndex<>(value, index)));
+        // Sorted order lets callers advance an iterator in step with the field index loop.
+        Collections.sort(res);
+        return res;
+    }
+
+    /** Pairs a per-field value (e.g. a comparator or aggregator) with its field index. */
+    private static class WrapperWithFieldIndex<T> implements Comparable<WrapperWithFieldIndex<T>> {
+        private final T value;
+        private final int fieldIndex;
+        WrapperWithFieldIndex(T value, int fieldIndex) {
+            this.value = value;
+            this.fieldIndex = fieldIndex;
+        }
+
+        @Override
+        public int compareTo(WrapperWithFieldIndex<T> o) {
+            return Integer.compare(fieldIndex, o.fieldIndex);
+        }
+
+        public T getValue() {
+            return value;
+        }
+    }
}