This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9349579d9b [core] Fix aggregate delete bug and refactor
SortBufferWriteBufferTestBase (#5414)
9349579d9b is described below
commit 9349579d9bc787492ee62723b596b1208c33dbd4
Author: yuzelin <[email protected]>
AuthorDate: Tue Apr 8 17:16:09 2025 +0800
[core] Fix aggregate delete bug and refactor SortBufferWriteBufferTestBase
(#5414)
---
.../compact/aggregate/AggregateMergeFunction.java | 40 +++++-------
.../mergetree/SortBufferWriteBufferTestBase.java | 75 ++++++++++++++++------
.../mergetree/compact/MergeFunctionTestUtils.java | 52 +++++++++++++--
.../paimon/table/PrimaryKeySimpleTableTest.java | 4 +-
.../ValueContentRowDataRecordIteratorTest.java | 7 +-
.../org/apache/paimon/utils/ReusingKeyValue.java | 9 ++-
.../org/apache/paimon/utils/ReusingTestData.java | 17 ++++-
7 files changed, 144 insertions(+), 60 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
index f138afb6a9..01e8706168 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
@@ -57,7 +57,6 @@ public class AggregateMergeFunction implements
MergeFunction<KeyValue> {
private KeyValue reused;
private boolean currentDeleteRow;
private final boolean removeRecordOnDelete;
- private boolean notNullColumnFilled;
public AggregateMergeFunction(
InternalRow.FieldGetter[] getters,
@@ -73,7 +72,6 @@ public class AggregateMergeFunction implements
MergeFunction<KeyValue> {
@Override
public void reset() {
this.latestKv = null;
- this.notNullColumnFilled = false;
this.row = new GenericRow(getters.length);
Arrays.stream(aggregators).forEach(FieldAggregator::reset);
this.currentDeleteRow = false;
@@ -82,18 +80,15 @@ public class AggregateMergeFunction implements
MergeFunction<KeyValue> {
@Override
public void add(KeyValue kv) {
latestKv = kv;
- boolean isRetract =
- kv.valueKind() != RowKind.INSERT && kv.valueKind() !=
RowKind.UPDATE_AFTER;
- currentDeleteRow = removeRecordOnDelete && isRetract;
+ currentDeleteRow = removeRecordOnDelete && kv.valueKind() ==
RowKind.DELETE;
if (currentDeleteRow) {
- if (!notNullColumnFilled) {
- initRow(row, kv.value());
- notNullColumnFilled = true;
- }
+ row = new GenericRow(getters.length);
+ initRow(row, kv.value());
return;
}
+ boolean isRetract = kv.valueKind().isRetract();
for (int i = 0; i < getters.length; i++) {
FieldAggregator fieldAggregator = aggregators[i];
Object accumulator = getters[i].getFieldOrNull(row);
@@ -104,7 +99,6 @@ public class AggregateMergeFunction implements
MergeFunction<KeyValue> {
: fieldAggregator.agg(accumulator, inputField);
row.setField(i, mergedField);
}
- notNullColumnFilled = true;
}
private void initRow(GenericRow row, InternalRow value) {
@@ -140,10 +134,10 @@ public class AggregateMergeFunction implements
MergeFunction<KeyValue> {
public static MergeFunctionFactory<KeyValue> factory(
Options conf,
- List<String> tableNames,
- List<DataType> tableTypes,
+ List<String> fieldNames,
+ List<DataType> fieldTypes,
List<String> primaryKeys) {
- return new Factory(conf, tableNames, tableTypes, primaryKeys);
+ return new Factory(conf, fieldNames, fieldTypes, primaryKeys);
}
private static class Factory implements MergeFunctionFactory<KeyValue> {
@@ -151,31 +145,31 @@ public class AggregateMergeFunction implements
MergeFunction<KeyValue> {
private static final long serialVersionUID = 1L;
private final CoreOptions options;
- private final List<String> tableNames;
- private final List<DataType> tableTypes;
+ private final List<String> fieldNames;
+ private final List<DataType> fieldTypes;
private final List<String> primaryKeys;
private final boolean removeRecordOnDelete;
private Factory(
Options conf,
- List<String> tableNames,
- List<DataType> tableTypes,
+ List<String> fieldNames,
+ List<DataType> fieldTypes,
List<String> primaryKeys) {
this.options = new CoreOptions(conf);
- this.tableNames = tableNames;
- this.tableTypes = tableTypes;
+ this.fieldNames = fieldNames;
+ this.fieldTypes = fieldTypes;
this.primaryKeys = primaryKeys;
this.removeRecordOnDelete =
options.aggregationRemoveRecordOnDelete();
}
@Override
public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
- List<String> fieldNames = tableNames;
- List<DataType> fieldTypes = tableTypes;
+ List<String> fieldNames = this.fieldNames;
+ List<DataType> fieldTypes = this.fieldTypes;
if (projection != null) {
Projection project = Projection.of(projection);
- fieldNames = project.project(tableNames);
- fieldTypes = project.project(tableTypes);
+ fieldNames = project.project(fieldNames);
+ fieldTypes = project.project(fieldTypes);
}
FieldAggregator[] fieldAggregators = new
FieldAggregator[fieldNames.size()];
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
index 8169d6a840..ee34ac2e71 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
@@ -18,6 +18,7 @@
package org.apache.paimon.mergetree;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.codegen.RecordComparator;
import org.apache.paimon.compression.CompressOptions;
@@ -33,10 +34,7 @@ import
org.apache.paimon.mergetree.compact.aggregate.AggregateMergeFunction;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.sort.BinaryInMemorySortBuffer;
-import org.apache.paimon.types.BigIntType;
-import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ReusingKeyValue;
import org.apache.paimon.utils.ReusingTestData;
@@ -47,10 +45,12 @@ import org.junit.jupiter.api.Test;
import java.io.EOFException;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
+import java.util.concurrent.ThreadLocalRandom;
import static org.apache.paimon.utils.Preconditions.checkState;
import static org.assertj.core.api.Assertions.assertThat;
@@ -63,9 +63,11 @@ public abstract class SortBufferWriteBufferTestBase {
protected final SortBufferWriteBuffer table =
new SortBufferWriteBuffer(
- new RowType(Collections.singletonList(new DataField(0,
"key", new IntType()))),
- new RowType(
- Collections.singletonList(new DataField(1,
"value", new BigIntType()))),
+ RowType.builder().field("key_f0", DataTypes.INT()).build(),
+ RowType.builder()
+ .field("f0", DataTypes.INT())
+ .field("f1", DataTypes.BIGINT())
+ .build(),
null,
new HeapMemorySegmentPool(32 * 1024 * 3L, 32 * 1024),
false,
@@ -165,21 +167,30 @@ public abstract class SortBufferWriteBufferTestBase {
/** Test for {@link SortBufferWriteBuffer} with {@link
PartialUpdateMergeFunction}. */
public static class WithPartialUpdateMergeFunctionTest extends
SortBufferWriteBufferTestBase {
+ private final boolean addOnly;
+
+ private WithPartialUpdateMergeFunctionTest() {
+ this.addOnly = ThreadLocalRandom.current().nextBoolean();
+ }
+
@Override
protected boolean addOnly() {
- return true;
+ return addOnly;
}
@Override
protected List<ReusingTestData> getExpected(List<ReusingTestData>
input) {
- return MergeFunctionTestUtils.getExpectedForPartialUpdate(input);
+ return MergeFunctionTestUtils.getExpectedForPartialUpdate(input,
addOnly);
}
@Override
protected MergeFunction<KeyValue> createMergeFunction() {
Options options = new Options();
+ options.set(CoreOptions.IGNORE_DELETE, !addOnly);
return PartialUpdateMergeFunction.factory(
- options, RowType.of(DataTypes.BIGINT()),
ImmutableList.of("f0"))
+ options,
+ RowType.of(DataTypes.INT().notNull(),
DataTypes.BIGINT()),
+ ImmutableList.of("f0"))
.create();
}
}
@@ -187,25 +198,36 @@ public abstract class SortBufferWriteBufferTestBase {
/** Test for {@link SortBufferWriteBuffer} with {@link
AggregateMergeFunction}. */
public static class WithAggMergeFunctionTest extends
SortBufferWriteBufferTestBase {
+ private final boolean addOnly;
+ private final boolean removeRecordOnDelete;
+
+ private WithAggMergeFunctionTest() {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ this.addOnly = random.nextBoolean();
+ this.removeRecordOnDelete = !addOnly && random.nextBoolean();
+ }
+
@Override
protected boolean addOnly() {
- return false;
+ return addOnly;
}
@Override
protected List<ReusingTestData> getExpected(List<ReusingTestData>
input) {
- return MergeFunctionTestUtils.getExpectedForAggSum(input);
+ return MergeFunctionTestUtils.getExpectedForAggSum(
+ input, addOnly, removeRecordOnDelete);
}
@Override
protected MergeFunction<KeyValue> createMergeFunction() {
Options options = new Options();
- options.set("fields.value.aggregate-function", "sum");
+ options.set("fields.f1.aggregate-function", "sum");
+ options.set(CoreOptions.AGGREGATION_REMOVE_RECORD_ON_DELETE,
removeRecordOnDelete);
return AggregateMergeFunction.factory(
options,
- Collections.singletonList("value"),
- Collections.singletonList(DataTypes.BIGINT()),
- Collections.emptyList())
+ Arrays.asList("f0", "f1"),
+ Arrays.asList(DataTypes.INT().notNull(),
DataTypes.BIGINT()),
+ Collections.singletonList("f0"))
.create();
}
}
@@ -213,26 +235,37 @@ public abstract class SortBufferWriteBufferTestBase {
/** Test for {@link SortBufferWriteBuffer} with {@link
LookupMergeFunction}. */
public static class WithLookupFunctionTest extends
SortBufferWriteBufferTestBase {
+ private final boolean addOnly;
+ private final boolean removeRecordOnDelete;
+
+ private WithLookupFunctionTest() {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ this.addOnly = random.nextBoolean();
+ this.removeRecordOnDelete = !addOnly && random.nextBoolean();
+ }
+
@Override
protected boolean addOnly() {
- return false;
+ return addOnly;
}
@Override
protected List<ReusingTestData> getExpected(List<ReusingTestData>
input) {
- return MergeFunctionTestUtils.getExpectedForAggSum(input);
+ return MergeFunctionTestUtils.getExpectedForAggSum(
+ input, addOnly, removeRecordOnDelete);
}
@Override
protected MergeFunction<KeyValue> createMergeFunction() {
Options options = new Options();
- options.set("fields.value.aggregate-function", "sum");
+ options.set("fields.f1.aggregate-function", "sum");
+ options.set(CoreOptions.AGGREGATION_REMOVE_RECORD_ON_DELETE,
removeRecordOnDelete);
MergeFunctionFactory<KeyValue> aggMergeFunction =
AggregateMergeFunction.factory(
options,
- Collections.singletonList("value"),
- Collections.singletonList(DataTypes.BIGINT()),
- Collections.emptyList());
+ Arrays.asList("f0", "f1"),
+ Arrays.asList(DataTypes.INT().notNull(),
DataTypes.BIGINT()),
+ Collections.singletonList("f0"));
return LookupMergeFunction.wrap(aggMergeFunction).create();
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeFunctionTestUtils.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeFunctionTestUtils.java
index 97410c7171..d52d1fa1f8 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeFunctionTestUtils.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeFunctionTestUtils.java
@@ -46,7 +46,8 @@ public class MergeFunctionTestUtils {
return expected;
}
- public static List<ReusingTestData>
getExpectedForPartialUpdate(List<ReusingTestData> input) {
+ public static List<ReusingTestData> getExpectedForPartialUpdate(
+ List<ReusingTestData> input, boolean addOnly) {
input = new ArrayList<>(input);
Collections.sort(input);
@@ -60,17 +61,31 @@ public class MergeFunctionTestUtils {
if (group.size() == 1) {
// due to ReducerMergeFunctionWrapper
expected.add(group.get(0));
+ } else if (addOnly) {
+ // get the final value
+ expected.add(group.get(group.size() - 1));
} else {
- group.stream()
- .filter(d -> d.valueKind.isAdd())
- .reduce((first, second) -> second)
- .ifPresent(expected::add);
+ if (group.stream().noneMatch(data -> data.valueKind ==
RowKind.INSERT)) {
+ // No INSERT in this group: expect a DELETE row that keeps the pk, has its
+ // nullable field set to null, and carries the latest sequenceNumber
+ ReusingTestData last = group.get(group.size() - 1);
+ expected.add(
+ new ReusingTestData(
+ last.key, last.sequenceNumber,
RowKind.DELETE, null));
+ } else {
+ // get the last INSERT data because later DELETE data are
ignored
+ group.stream()
+ .filter(d -> d.valueKind.isAdd())
+ .reduce((first, second) -> second)
+ .ifPresent(expected::add);
+ }
}
}
return expected;
}
- public static List<ReusingTestData>
getExpectedForAggSum(List<ReusingTestData> input) {
+ public static List<ReusingTestData> getExpectedForAggSum(
+ List<ReusingTestData> input, boolean addOnly, boolean
removeRecordOndelete) {
input = new ArrayList<>(input);
Collections.sort(input);
@@ -84,7 +99,7 @@ public class MergeFunctionTestUtils {
if (group.size() == 1) {
// due to ReducerMergeFunctionWrapper
expected.add(group.get(0));
- } else {
+ } else if (addOnly || !removeRecordOndelete) {
long sum =
group.stream()
.mapToLong(d -> d.valueKind.isAdd() ? d.value
: -d.value)
@@ -92,6 +107,29 @@ public class MergeFunctionTestUtils {
ReusingTestData last = group.get(group.size() - 1);
expected.add(
new ReusingTestData(last.key, last.sequenceNumber,
RowKind.INSERT, sum));
+ } else {
+ if (group.stream().noneMatch(data -> data.valueKind ==
RowKind.INSERT)) {
+ // No INSERT in this group: expect a DELETE row that keeps the pk, has its
+ // nullable field set to null, and carries the latest sequenceNumber
+ ReusingTestData last = group.get(group.size() - 1);
+ expected.add(
+ new ReusingTestData(
+ last.key, last.sequenceNumber,
RowKind.DELETE, null));
+ } else {
+ RowKind rowKind = null;
+ Long sum = null;
+ for (ReusingTestData data : group) {
+ if (data.valueKind == RowKind.INSERT) {
+ rowKind = RowKind.INSERT;
+ sum = sum == null ? data.value : sum + data.value;
+ } else {
+ rowKind = RowKind.DELETE;
+ sum = null;
+ }
+ }
+ ReusingTestData last = group.get(group.size() - 1);
+ expected.add(new ReusingTestData(last.key,
last.sequenceNumber, rowKind, sum));
+ }
}
}
return expected;
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index c04b20cc6e..cb0767704f 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -1298,11 +1298,11 @@ public class PrimaryKeySimpleTableTest extends
SimpleTableTestBase {
getResult(read, toSplits(snapshotReader.read().dataSplits()),
rowToString);
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 2]");
- // 2. Update Before
+ // 2. Update Before: retract
write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 2, 2));
commit.commit(1, write.prepareCommit(true, 1));
result = getResult(read, toSplits(snapshotReader.read().dataSplits()),
rowToString);
- assertThat(result).isEmpty();
+ assertThat(result).containsExactly("+I[1, 1, NULL, NULL]");
// 3. Update After
write.write(GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 1, 2, 3));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/ValueContentRowDataRecordIteratorTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/ValueContentRowDataRecordIteratorTest.java
index 6b5f3a7419..e1fbafcd15 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/ValueContentRowDataRecordIteratorTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/ValueContentRowDataRecordIteratorTest.java
@@ -35,6 +35,8 @@ public class ValueContentRowDataRecordIteratorTest extends
RowDataRecordIterator
public void testIterator() throws Exception {
List<ReusingTestData> input =
ReusingTestData.parse("1, 1, +, 100 | 2, 2, +, 200 | 1, 3, -,
100 | 2, 4, +, 300");
+ // to check ReusingTestData.key
+ List<Integer> expectedKeys = Arrays.asList(1, 2, 1, 2);
List<Long> expectedValues = Arrays.asList(100L, 200L, 100L, 300L);
List<RowKind> expectedRowKinds =
Arrays.asList(RowKind.INSERT, RowKind.INSERT, RowKind.DELETE,
RowKind.INSERT);
@@ -43,8 +45,9 @@ public class ValueContentRowDataRecordIteratorTest extends
RowDataRecordIterator
input,
ValueContentRowDataRecordIterator::new,
(rowData, idx) -> {
- assertThat(rowData.getFieldCount()).isEqualTo(1);
-
assertThat(rowData.getLong(0)).isEqualTo(expectedValues.get(idx));
+ assertThat(rowData.getFieldCount()).isEqualTo(2);
+
assertThat(rowData.getInt(0)).isEqualTo(expectedKeys.get(idx));
+
assertThat(rowData.getLong(1)).isEqualTo(expectedValues.get(idx));
assertThat(rowData.getRowKind()).isEqualTo(expectedRowKinds.get(idx));
});
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/utils/ReusingKeyValue.java
b/paimon-core/src/test/java/org/apache/paimon/utils/ReusingKeyValue.java
index 2e37c6b9a7..33423cf4c8 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/ReusingKeyValue.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/ReusingKeyValue.java
@@ -37,7 +37,7 @@ public class ReusingKeyValue {
public ReusingKeyValue() {
this.key = new BinaryRow(1);
this.keyWriter = new BinaryRowWriter(key);
- this.value = new BinaryRow(1);
+ this.value = new BinaryRow(2);
this.valueWriter = new BinaryRowWriter(value);
this.kv = new KeyValue();
}
@@ -45,7 +45,12 @@ public class ReusingKeyValue {
public KeyValue update(ReusingTestData data) {
keyWriter.writeInt(0, data.key);
keyWriter.complete();
- valueWriter.writeLong(0, data.value);
+ valueWriter.writeInt(0, data.key);
+ if (data.value == null) {
+ valueWriter.setNullAt(1);
+ } else {
+ valueWriter.writeLong(1, data.value);
+ }
valueWriter.complete();
return kv.replace(key, data.sequenceNumber, data.valueKind, value);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/utils/ReusingTestData.java
b/paimon-core/src/test/java/org/apache/paimon/utils/ReusingTestData.java
index b96eef5bae..2c4b159ebd 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/ReusingTestData.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/ReusingTestData.java
@@ -21,6 +21,8 @@ package org.apache.paimon.utils;
import org.apache.paimon.KeyValue;
import org.apache.paimon.types.RowKind;
+import javax.annotation.Nullable;
+
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -39,9 +41,9 @@ public class ReusingTestData implements
Comparable<ReusingTestData> {
public final int key;
public final long sequenceNumber;
public final RowKind valueKind;
- public final long value;
+ @Nullable public final Long value;
- public ReusingTestData(int key, long sequenceNumber, RowKind valueKind,
long value) {
+ public ReusingTestData(int key, long sequenceNumber, RowKind valueKind,
@Nullable Long value) {
this.key = key;
this.sequenceNumber = sequenceNumber;
this.valueKind = valueKind;
@@ -60,11 +62,20 @@ public class ReusingTestData implements
Comparable<ReusingTestData> {
return result;
}
+ public String toString() {
+ return String.format("%d,%d,%s,%d", key, sequenceNumber, valueKind,
value);
+ }
+
public void assertEquals(KeyValue kv) {
assertThat(kv.key().getInt(0)).isEqualTo(key);
assertThat(kv.sequenceNumber()).isEqualTo(sequenceNumber);
assertThat(kv.valueKind()).isEqualTo(valueKind);
- assertThat(kv.value().getLong(0)).isEqualTo(value);
+ assertThat(kv.value().getInt(0)).isEqualTo(key);
+ if (kv.value().isNullAt(1)) {
+ assertThat(value).isNull();
+ } else {
+ assertThat(kv.value().getLong(1)).isEqualTo(value);
+ }
}
/**