This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b20b78e020 [core] Avoid the unnecessary key value copy (#5116)
b20b78e020 is described below
commit b20b78e02076bdc27a85570d8997435322d58919
Author: WenjunMin <[email protected]>
AuthorDate: Thu Feb 20 10:59:56 2025 +0800
[core] Avoid the unnecessary key value copy (#5116)
---
.../paimon/benchmark/TableWriterBenchmark.java | 18 ++++++++++
.../java/org/apache/paimon/KeyValueSerializer.java | 13 ++++++-
.../paimon/mergetree/SortBufferWriteBuffer.java | 8 +++--
.../compact/DeduplicateMergeFunction.java | 5 +++
.../mergetree/compact/FirstRowMergeFunction.java | 29 ++++++----------
.../LookupChangelogMergeFunctionWrapper.java | 8 +----
.../mergetree/compact/LookupMergeFunction.java | 40 ++++++----------------
.../paimon/mergetree/compact/MergeFunction.java | 2 ++
.../compact/PartialUpdateMergeFunction.java | 5 +++
.../compact/aggregate/AggregateMergeFunction.java | 5 +++
.../paimon/table/PrimaryKeyFileStoreTable.java | 4 +--
.../apache/paimon/table/PrimaryKeyTableUtils.java | 3 +-
.../java/org/apache/paimon/utils/OffsetRow.java | 4 +++
.../mergetree/SortBufferWriteBufferTestBase.java | 13 ++-----
.../LookupChangelogMergeFunctionWrapperTest.java | 30 +++-------------
.../mergetree/compact/SortMergeReaderTestBase.java | 10 +-----
.../paimon/operation/MergeFileSplitReadTest.java | 5 +++
.../paimon/flink/LookupChangelogWithAggITCase.java | 10 +++++-
18 files changed, 104 insertions(+), 108 deletions(-)
diff --git a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableWriterBenchmark.java b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableWriterBenchmark.java
index d3624f9330..19349d4eee 100644
--- a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableWriterBenchmark.java
+++ b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableWriterBenchmark.java
@@ -19,6 +19,7 @@
package org.apache.paimon.benchmark;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
@@ -80,6 +81,7 @@ public class TableWriterBenchmark extends TableBenchmark {
public void testParquet() throws Exception {
Options options = new Options();
options.set(CoreOptions.FILE_FORMAT, CoreOptions.FILE_FORMAT_PARQUET);
+ options.set(CoreOptions.CHANGELOG_PRODUCER,
CoreOptions.ChangelogProducer.LOOKUP);
innerTest("parquet", options);
/*
* Java HotSpot(TM) 64-Bit Server VM 1.8.0_301-b09 on Mac OS X 10.16
@@ -104,6 +106,21 @@ public class TableWriterBenchmark extends TableBenchmark {
*/
}
+ @Test
+ public void testParquetLookupCompaction() throws Exception {
+ Options options = new Options();
+ options.set(CoreOptions.FILE_FORMAT, CoreOptions.FILE_FORMAT_PARQUET);
+ options.set(CoreOptions.CHANGELOG_PRODUCER,
CoreOptions.ChangelogProducer.LOOKUP);
+ innerTest("parquet", options);
+ /*
+ * OpenJDK 64-Bit Server VM 11.0.24+0 on Mac OS X 14.5
+ * Apple M3 Pro
+ * parquet: Best/Avg Time(ms) Row Rate(K/s) Per
Row(ns) Relative
+ *
-------------------------------------------------------------------------------
+ * parquet_write 31918 / 32666 94.0 10639.3
1.0X
+ */
+ }
+
public void innerTest(String name, Options options) throws Exception {
options.set(CoreOptions.BUCKET, 1);
Table table = createTable(options, "T");
@@ -119,6 +136,7 @@ public class TableWriterBenchmark extends TableBenchmark {
() -> {
BatchWriteBuilder writeBuilder =
table.newBatchWriteBuilder();
BatchTableWrite write = writeBuilder.newWrite();
+ write.withIOManager(new
IOManagerImpl(tempFile.toString()));
BatchTableCommit commit = writeBuilder.newCommit();
for (int i = 0; i < valuesPerIteration; i++) {
try {
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueSerializer.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueSerializer.java
index 34fd9b1405..13a172e49a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueSerializer.java
@@ -36,6 +36,7 @@ public class KeyValueSerializer extends ObjectSerializer<KeyValue> {
private static final long serialVersionUID = 1L;
private final int keyArity;
+ private final int valueArity;
private final GenericRow reusedMeta;
private final JoinedRow reusedKeyWithMeta;
@@ -49,7 +50,7 @@ public class KeyValueSerializer extends ObjectSerializer<KeyValue> {
super(KeyValue.schema(keyType, valueType));
this.keyArity = keyType.getFieldCount();
- int valueArity = valueType.getFieldCount();
+ this.valueArity = valueType.getFieldCount();
this.reusedMeta = new GenericRow(2);
this.reusedKeyWithMeta = new JoinedRow();
@@ -85,4 +86,14 @@ public class KeyValueSerializer extends ObjectSerializer<KeyValue> {
public KeyValue getReusedKv() {
return reusedKv;
}
+
+ public KeyValue getCopiedKv() {
+ InternalRow row = rowSerializer.copy(reusedKey.getOriginalRow());
+ return new KeyValue()
+ .replace(
+ new OffsetRow(keyArity, 0).replace(row),
+ reusedKv.sequenceNumber(),
+ reusedKv.valueKind(),
+ new OffsetRow(valueArity, keyArity + 2).replace(row));
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java
index 433ff2158b..3c60c10bb9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java
@@ -178,6 +178,7 @@ public class SortBufferWriteBuffer implements WriteBuffer {
private final MutableObjectIterator<BinaryRow> kvIter;
private final Comparator<InternalRow> keyComparator;
private final ReducerMergeFunctionWrapper mergeFunctionWrapper;
+ private final boolean requireCopy;
// previously read kv
private KeyValueSerializer previous;
@@ -199,6 +200,7 @@ public class SortBufferWriteBuffer implements WriteBuffer {
this.kvIter = kvIter;
this.keyComparator = keyComparator;
this.mergeFunctionWrapper = new
ReducerMergeFunctionWrapper(mergeFunction);
+ this.requireCopy = mergeFunction.requireCopy();
int totalFieldCount = keyType.getFieldCount() + 2 +
valueType.getFieldCount();
this.previous = new KeyValueSerializer(keyType, valueType);
@@ -235,7 +237,8 @@ public class SortBufferWriteBuffer implements WriteBuffer {
return;
}
mergeFunctionWrapper.reset();
- mergeFunctionWrapper.add(previous.getReusedKv());
+ mergeFunctionWrapper.add(
+ requireCopy ? previous.getCopiedKv() :
previous.getReusedKv());
while (readOnce()) {
if (keyComparator.compare(
@@ -243,7 +246,8 @@ public class SortBufferWriteBuffer implements WriteBuffer {
!= 0) {
break;
}
- mergeFunctionWrapper.add(current.getReusedKv());
+ mergeFunctionWrapper.add(
+ requireCopy ? current.getCopiedKv() :
current.getReusedKv());
swapSerializers();
}
result = mergeFunctionWrapper.getResult();
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java
index bad9b29160..5422f5469e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java
@@ -58,6 +58,11 @@ public class DeduplicateMergeFunction implements MergeFunction<KeyValue> {
return latestKv;
}
+ @Override
+ public boolean requireCopy() {
+ return false;
+ }
+
public static MergeFunctionFactory<KeyValue> factory() {
return new Factory(false);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
index b3d9b8661e..d795f16bbd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
@@ -20,9 +20,7 @@ package org.apache.paimon.mergetree.compact;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
-import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.options.Options;
-import org.apache.paimon.types.RowType;
import javax.annotation.Nullable;
@@ -32,15 +30,11 @@ import javax.annotation.Nullable;
*/
public class FirstRowMergeFunction implements MergeFunction<KeyValue> {
- private final InternalRowSerializer keySerializer;
- private final InternalRowSerializer valueSerializer;
private KeyValue first;
public boolean containsHighLevel;
private final boolean ignoreDelete;
- protected FirstRowMergeFunction(RowType keyType, RowType valueType,
boolean ignoreDelete) {
- this.keySerializer = new InternalRowSerializer(keyType);
- this.valueSerializer = new InternalRowSerializer(valueType);
+ protected FirstRowMergeFunction(boolean ignoreDelete) {
this.ignoreDelete = ignoreDelete;
}
@@ -65,7 +59,7 @@ public class FirstRowMergeFunction implements MergeFunction<KeyValue> {
}
if (first == null) {
- this.first = kv.copy(keySerializer, valueSerializer);
+ this.first = kv;
}
if (kv.level() > 0) {
containsHighLevel = true;
@@ -77,28 +71,27 @@ public class FirstRowMergeFunction implements MergeFunction<KeyValue> {
return first;
}
- public static MergeFunctionFactory<KeyValue> factory(
- Options options, RowType keyType, RowType valueType) {
- return new FirstRowMergeFunction.Factory(
- keyType, valueType, options.get(CoreOptions.IGNORE_DELETE));
+ @Override
+ public boolean requireCopy() {
+ return true;
+ }
+
+ public static MergeFunctionFactory<KeyValue> factory(Options options) {
+ return new
FirstRowMergeFunction.Factory(options.get(CoreOptions.IGNORE_DELETE));
}
private static class Factory implements MergeFunctionFactory<KeyValue> {
private static final long serialVersionUID = 1L;
- private final RowType keyType;
- private final RowType valueType;
private final boolean ignoreDelete;
- public Factory(RowType keyType, RowType valueType, boolean
ignoreDelete) {
- this.keyType = keyType;
- this.valueType = valueType;
+ public Factory(boolean ignoreDelete) {
this.ignoreDelete = ignoreDelete;
}
@Override
public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
- return new FirstRowMergeFunction(keyType, valueType, ignoreDelete);
+ return new FirstRowMergeFunction(ignoreDelete);
}
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
index 450df52314..c327474fa8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
@@ -21,7 +21,6 @@ package org.apache.paimon.mergetree.compact;
import org.apache.paimon.KeyValue;
import org.apache.paimon.codegen.RecordEqualiser;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.lookup.LookupStrategy;
import org.apache.paimon.mergetree.LookupLevels.PositionedKeyValue;
@@ -69,8 +68,6 @@ public class LookupChangelogMergeFunctionWrapper<T>
private final Comparator<KeyValue> comparator;
private final LinkedList<KeyValue> candidates = new LinkedList<>();
- private final InternalRowSerializer keySerializer;
- private final InternalRowSerializer valueSerializer;
public LookupChangelogMergeFunctionWrapper(
MergeFunctionFactory<KeyValue> mergeFunctionFactory,
@@ -89,9 +86,6 @@ public class LookupChangelogMergeFunctionWrapper<T>
deletionVectorsMaintainer != null,
"deletionVectorsMaintainer should not be null, there is a
bug.");
}
- LookupMergeFunction lookupMergeFunction = (LookupMergeFunction)
mergeFunction;
- this.keySerializer = lookupMergeFunction.getKeySerializer();
- this.valueSerializer = lookupMergeFunction.getValueSerializer();
this.mergeFunction = mergeFunctionFactory.create();
this.lookup = lookup;
this.valueEqualiser = valueEqualiser;
@@ -107,7 +101,7 @@ public class LookupChangelogMergeFunctionWrapper<T>
@Override
public void add(KeyValue kv) {
- candidates.add(kv.copy(keySerializer, valueSerializer));
+ candidates.add(kv);
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
index a3a6af23cb..6f999297aa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
@@ -19,9 +19,6 @@
package org.apache.paimon.mergetree.compact;
import org.apache.paimon.KeyValue;
-import org.apache.paimon.data.serializer.InternalRowSerializer;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.Projection;
import javax.annotation.Nullable;
@@ -37,14 +34,9 @@ public class LookupMergeFunction implements MergeFunction<KeyValue> {
private final MergeFunction<KeyValue> mergeFunction;
private final LinkedList<KeyValue> candidates = new LinkedList<>();
- private final InternalRowSerializer keySerializer;
- private final InternalRowSerializer valueSerializer;
- public LookupMergeFunction(
- MergeFunction<KeyValue> mergeFunction, RowType keyType, RowType
valueType) {
+ public LookupMergeFunction(MergeFunction<KeyValue> mergeFunction) {
this.mergeFunction = mergeFunction;
- this.keySerializer = new InternalRowSerializer(keyType);
- this.valueSerializer = new InternalRowSerializer(valueType);
}
@Override
@@ -54,15 +46,7 @@ public class LookupMergeFunction implements MergeFunction<KeyValue> {
@Override
public void add(KeyValue kv) {
- candidates.add(kv.copy(keySerializer, valueSerializer));
- }
-
- public InternalRowSerializer getKeySerializer() {
- return keySerializer;
- }
-
- public InternalRowSerializer getValueSerializer() {
- return valueSerializer;
+ candidates.add(kv);
}
@Override
@@ -87,14 +71,18 @@ public class LookupMergeFunction implements MergeFunction<KeyValue> {
return mergeFunction.getResult();
}
- public static MergeFunctionFactory<KeyValue> wrap(
- MergeFunctionFactory<KeyValue> wrapped, RowType keyType, RowType
valueType) {
+ @Override
+ public boolean requireCopy() {
+ return true;
+ }
+
+ public static MergeFunctionFactory<KeyValue>
wrap(MergeFunctionFactory<KeyValue> wrapped) {
if (wrapped.create() instanceof FirstRowMergeFunction) {
// don't wrap first row, it is already OK
return wrapped;
}
- return new Factory(wrapped, keyType, valueType);
+ return new Factory(wrapped);
}
private static class Factory implements MergeFunctionFactory<KeyValue> {
@@ -102,20 +90,14 @@ public class LookupMergeFunction implements MergeFunction<KeyValue> {
private static final long serialVersionUID = 1L;
private final MergeFunctionFactory<KeyValue> wrapped;
- private final RowType keyType;
- private final RowType rowType;
- private Factory(MergeFunctionFactory<KeyValue> wrapped, RowType
keyType, RowType rowType) {
+ private Factory(MergeFunctionFactory<KeyValue> wrapped) {
this.wrapped = wrapped;
- this.keyType = keyType;
- this.rowType = rowType;
}
@Override
public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
- RowType valueType =
- projection == null ? rowType :
Projection.of(projection).project(rowType);
- return new LookupMergeFunction(wrapped.create(projection),
keyType, valueType);
+ return new LookupMergeFunction(wrapped.create(projection));
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeFunction.java
index dc2c9e4580..46240b038a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeFunction.java
@@ -44,4 +44,6 @@ public interface MergeFunction<T> {
/** Get current merged value. */
T getResult();
+
+ boolean requireCopy();
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index 3ce51127b1..2497de0893 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -273,6 +273,11 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
return reused.replace(currentKey, latestSequenceNumber, rowKind, row);
}
+ @Override
+ public boolean requireCopy() {
+ return false;
+ }
+
public static MergeFunctionFactory<KeyValue> factory(
Options options, RowType rowType, List<String> primaryKeys) {
return new Factory(options, rowType, primaryKeys);
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
index 9af786ae6c..7b18ac77f3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
@@ -96,6 +96,11 @@ public class AggregateMergeFunction implements MergeFunction<KeyValue> {
return reused.replace(latestKv.key(), latestKv.sequenceNumber(),
RowKind.INSERT, row);
}
+ @Override
+ public boolean requireCopy() {
+ return false;
+ }
+
public static MergeFunctionFactory<KeyValue> factory(
Options conf,
List<String> tableNames,
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index 516ae766ce..ea71204dc9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -79,9 +79,7 @@ class PrimaryKeyFileStoreTable extends AbstractFileStoreTable {
MergeFunctionFactory<KeyValue> mfFactory =
PrimaryKeyTableUtils.createMergeFunctionFactory(tableSchema, extractor);
if (options.needLookup()) {
- mfFactory =
- LookupMergeFunction.wrap(
- mfFactory, new
RowType(extractor.keyFields(tableSchema)), rowType);
+ mfFactory = LookupMergeFunction.wrap(mfFactory);
}
lazyStore =
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
index d156d23a91..a47e44718e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
@@ -69,8 +69,7 @@ public class PrimaryKeyTableUtils {
rowType.getFieldTypes(),
tableSchema.primaryKeys());
case FIRST_ROW:
- return FirstRowMergeFunction.factory(
- conf, new RowType(extractor.keyFields(tableSchema)),
rowType);
+ return FirstRowMergeFunction.factory(conf);
default:
throw new UnsupportedOperationException("Unsupported merge
engine: " + mergeEngine);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/OffsetRow.java b/paimon-core/src/main/java/org/apache/paimon/utils/OffsetRow.java
index 822488f271..02da5a15be 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/OffsetRow.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/OffsetRow.java
@@ -41,6 +41,10 @@ public class OffsetRow implements InternalRow {
this.offset = offset;
}
+ public InternalRow getOriginalRow() {
+ return row;
+ }
+
public OffsetRow replace(InternalRow row) {
this.row = row;
return this;
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
index 27606ee33f..8169d6a840 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
@@ -43,7 +43,6 @@ import org.apache.paimon.utils.ReusingTestData;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
-import org.assertj.core.util.Lists;
import org.junit.jupiter.api.Test;
import java.io.EOFException;
@@ -234,11 +233,7 @@ public abstract class SortBufferWriteBufferTestBase {
Collections.singletonList("value"),
Collections.singletonList(DataTypes.BIGINT()),
Collections.emptyList());
- return LookupMergeFunction.wrap(
- aggMergeFunction,
- RowType.of(DataTypes.INT()),
- RowType.of(DataTypes.BIGINT()))
- .create();
+ return LookupMergeFunction.wrap(aggMergeFunction).create();
}
}
@@ -257,11 +252,7 @@ public abstract class SortBufferWriteBufferTestBase {
@Override
protected MergeFunction<KeyValue> createMergeFunction() {
- return FirstRowMergeFunction.factory(
- new Options(),
- new RowType(Lists.list(new DataField(0, "f0", new
IntType()))),
- new RowType(Lists.list(new DataField(1, "f1", new
BigIntType()))))
- .create();
+ return FirstRowMergeFunction.factory(new Options()).create();
}
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
index 28cb4c099a..d44264c6f9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
@@ -28,17 +28,14 @@ import org.apache.paimon.mergetree.compact.aggregate.AggregateMergeFunction;
import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastValueAggFactory;
import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldSumAggFactory;
-import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.UserDefinedSeqComparator;
import org.apache.paimon.utils.ValueEqualiserSupplier;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
-import org.assertj.core.util.Lists;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -69,10 +66,7 @@ public class LookupChangelogMergeFunctionWrapperTest {
Map<InternalRow, KeyValue> highLevel = new HashMap<>();
LookupChangelogMergeFunctionWrapper function =
new LookupChangelogMergeFunctionWrapper(
- LookupMergeFunction.wrap(
- DeduplicateMergeFunction.factory(),
- RowType.of(DataTypes.INT()),
- RowType.of(DataTypes.INT())),
+
LookupMergeFunction.wrap(DeduplicateMergeFunction.factory()),
highLevel::get,
changelogRowDeduplicate ? EQUALISER : null,
LookupStrategy.from(false, true, false, false),
@@ -234,10 +228,7 @@ public class LookupChangelogMergeFunctionWrapperTest {
ValueEqualiserSupplier.fromIgnoreFields(valueType,
ignoreFields);
LookupChangelogMergeFunctionWrapper function =
new LookupChangelogMergeFunctionWrapper(
- LookupMergeFunction.wrap(
- DeduplicateMergeFunction.factory(),
- RowType.of(DataTypes.INT()),
- valueType),
+
LookupMergeFunction.wrap(DeduplicateMergeFunction.factory()),
highLevel::get,
logDedupEqualSupplier.get(),
LookupStrategy.from(false, true, false, false),
@@ -295,9 +286,7 @@ public class LookupChangelogMergeFunctionWrapperTest {
new FieldAggregator[] {
new FieldSumAggFactory()
.create(DataTypes.INT(), null, null)
- }),
- RowType.of(DataTypes.INT()),
- RowType.of(DataTypes.INT())),
+ })),
key -> null,
changelogRowDeduplicate ? EQUALISER : null,
LookupStrategy.from(false, true, false, false),
@@ -384,9 +373,7 @@ public class LookupChangelogMergeFunctionWrapperTest {
new FieldAggregator[] {
new
FieldLastValueAggFactory()
.create(DataTypes.INT(), null, null)
- }),
- RowType.of(DataTypes.INT()),
- RowType.of(DataTypes.INT())),
+ })),
highLevel::get,
null,
LookupStrategy.from(false, true, false, false),
@@ -454,14 +441,7 @@ public class LookupChangelogMergeFunctionWrapperTest {
Set<InternalRow> highLevel = new HashSet<>();
FirstRowMergeFunctionWrapper function =
new FirstRowMergeFunctionWrapper(
- projection ->
- new FirstRowMergeFunction(
- new RowType(
- Lists.list(new DataField(0,
"f0", new IntType()))),
- new RowType(
- Lists.list(new DataField(1,
"f1", new IntType()))),
- false),
- highLevel::contains);
+ projection -> new FirstRowMergeFunction(false),
highLevel::contains);
// Without level-0
function.reset();
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
index 81e49b81d2..50eebd5007 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
@@ -22,14 +22,9 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.SortEngine;
import org.apache.paimon.KeyValue;
import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.types.BigIntType;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.IntType;
-import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ReusingTestData;
import org.apache.paimon.utils.TestReusingRecordReader;
-import org.assertj.core.util.Lists;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
@@ -127,10 +122,7 @@ public abstract class SortMergeReaderTestBase extends CombiningRecordReaderTestB
@Override
protected MergeFunction<KeyValue> createMergeFunction() {
- RowType keyType = new RowType(Lists.list(new DataField(0, "f0",
new IntType())));
- RowType valueType = new RowType(Lists.list(new DataField(1, "f1",
new BigIntType())));
- return new LookupMergeFunction(
- new FirstRowMergeFunction(keyType, valueType, false),
keyType, valueType);
+ return new LookupMergeFunction(new FirstRowMergeFunction(false));
}
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
index 59f848a296..44a02699e4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
@@ -344,6 +344,11 @@ public class MergeFileSplitReadTest {
GenericRow.of(total));
}
+ @Override
+ public boolean requireCopy() {
+ return false;
+ }
+
private long count(InternalRow value) {
checkArgument(!value.isNullAt(0), "Value count should not be
null.");
return value.getLong(0);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupChangelogWithAggITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupChangelogWithAggITCase.java
index f4c554aa1c..e288f8bcbf 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupChangelogWithAggITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/LookupChangelogWithAggITCase.java
@@ -90,10 +90,18 @@ public class LookupChangelogWithAggITCase extends CatalogITCaseBase {
+ "'fields.v.aggregate-function'='sum')");
BlockingIterator<Row, Row> iterator = streamSqlBlockIter("SELECT *
FROM T");
+ // merge by sort buffer
sql("INSERT INTO T VALUES (1, 1), (2, 2), (1, 3), (1, 4), (1, 5)");
assertThat(iterator.collect(2)).containsExactlyInAnyOrder(Row.of(1,
13), Row.of(2, 2));
-
iterator.close();
+
+ // merge by compaction
+ sql("INSERT INTO T VALUES (1, 3), (2, 2), (3, 1)");
+ sql("INSERT INTO T VALUES (1, 2), (2, 2), (3, 2)");
+ sql("INSERT INTO T VALUES (1, 1), (2, 2), (3, 3)");
+
+ assertThat(sql("SELECT * FROM T"))
+ .containsExactlyInAnyOrder(Row.of(1, 19), Row.of(2, 8),
Row.of(3, 6));
}
@Test