This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6f720642e9 [core] Refactor MergeFunctionFactory to use read rowType (#7059)
6f720642e9 is described below
commit 6f720642e97ea83e2320e7d3812b104b20925639
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Jan 16 13:07:07 2026 +0800
[core] Refactor MergeFunctionFactory to use read rowType (#7059)
---
.../java/org/apache/paimon/utils/ProjectedRow.java | 39 ------
.../compact/DeduplicateMergeFunction.java | 5 +-
.../mergetree/compact/FirstRowMergeFunction.java | 5 +-
.../mergetree/compact/LookupMergeFunction.java | 10 +-
.../mergetree/compact/MergeFunctionFactory.java | 23 +--
.../compact/PartialUpdateMergeFunction.java | 131 ++++++++----------
.../compact/aggregate/AggregateMergeFunction.java | 35 ++---
.../paimon/operation/MergeFileSplitRead.java | 99 ++++++-------
.../paimon/table/PrimaryKeyFileStoreTable.java | 2 +-
.../apache/paimon/table/PrimaryKeyTableUtils.java | 8 +-
.../mergetree/SortBufferWriteBufferTestBase.java | 20 ++-
.../compact/PartialUpdateMergeFunctionTest.java | 104 +++++---------
.../aggregate/AggregateMergeFunctionTest.java | 39 ++++--
.../paimon/operation/MergeFileSplitReadTest.java | 2 +-
.../paimon/flink/sink/LocalMergeOperator.java | 22 +--
.../apache/paimon/spark/sql/VariantTestBase.scala | 154 +++++++++++++++++++++
16 files changed, 358 insertions(+), 340 deletions(-)
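
For context before the diff: MergeFunctionFactory#create now takes the read RowType directly instead of an int[][] projection, and adjustProjection/AdjustedProjection is replaced by adjustReadType. A rough usage sketch based only on the signatures visible in this diff; the field names, option values and surrounding setup below are illustrative assumptions, not part of the commit:

    // Illustrative sketch only: schema and options are assumed, imports omitted.
    Options options = new Options();
    RowType tableType =
            RowType.builder()
                    .fields(
                            new DataType[] {DataTypes.INT().notNull(), DataTypes.INT(), DataTypes.INT()},
                            new String[] {"f0", "f1", "f2"})
                    .build();
    MergeFunctionFactory<KeyValue> factory =
            PartialUpdateMergeFunction.factory(options, tableType, Collections.singletonList("f0"));
    // Narrow the read schema; the factory may widen it again (e.g. to pull
    // sequence-group fields back in) before the merge function is created.
    RowType readType = tableType.project("f0", "f1");
    RowType adjusted = factory.adjustReadType(readType);
    MergeFunction<KeyValue> mergeFunction = factory.create(adjusted);
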
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedRow.java b/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedRow.java
index f9b78a8bc6..18c7e5db79 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedRow.java
@@ -188,32 +188,6 @@ public class ProjectedRow implements InternalRow {
+ '}';
}
- /**
- * Like {@link #from(int[])}, but throws {@link IllegalArgumentException} if the provided {@code
- * projection} array contains nested projections, which are not supported by {@link
- * ProjectedRow}.
- *
- * <p>The array represents the mapping of the fields of the original {@link DataType}, including
- * nested rows. For example, {@code [[0, 2, 1], ...]} specifies to include the 2nd field of the
- * 3rd field of the 1st field in the top-level row.
- *
- * @see Projection
- * @see ProjectedRow
- */
- public static ProjectedRow from(int[][] projection) throws IllegalArgumentException {
- return new ProjectedRow(
- Arrays.stream(projection)
- .mapToInt(
- arr -> {
- if (arr.length != 1) {
- throw new IllegalArgumentException(
- "ProjectedRowData doesn't support nested projections");
- }
- return arr[0];
- })
- .toArray());
- }
-
/**
* Create an empty {@link ProjectedRow} starting from a {@code projection} array.
*
@@ -234,17 +208,4 @@ public class ProjectedRow implements InternalRow {
.mapToInt(field -> tableType.getFieldIndexByFieldId(field.id()))
.toArray());
}
-
- /**
- * Create an empty {@link ProjectedRow} starting from a {@link Projection}.
- *
- * <p>Throws {@link IllegalStateException} if the provided {@code projection} array contains
- * nested projections, which are not supported by {@link ProjectedRow}.
- *
- * @see Projection
- * @see ProjectedRow
- */
- public static ProjectedRow from(Projection projection) {
- return new ProjectedRow(projection.toTopLevelIndexes());
- }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java
index 5422f5469e..50fdec2ad9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java
@@ -21,6 +21,7 @@ package org.apache.paimon.mergetree.compact;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.options.Options;
+import org.apache.paimon.types.RowType;
import javax.annotation.Nullable;
@@ -73,7 +74,7 @@ public class DeduplicateMergeFunction implements MergeFunction<KeyValue> {
private static class Factory implements MergeFunctionFactory<KeyValue> {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private final boolean ignoreDelete;
@@ -82,7 +83,7 @@ public class DeduplicateMergeFunction implements MergeFunction<KeyValue> {
}
@Override
- public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
+ public MergeFunction<KeyValue> create(@Nullable RowType readType) {
return new DeduplicateMergeFunction(ignoreDelete);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
index 4b9e4a6ff1..1815d1f5df 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
@@ -21,6 +21,7 @@ package org.apache.paimon.mergetree.compact;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.options.Options;
+import org.apache.paimon.types.RowType;
import javax.annotation.Nullable;
@@ -82,7 +83,7 @@ public class FirstRowMergeFunction implements MergeFunction<KeyValue> {
private static class Factory implements MergeFunctionFactory<KeyValue> {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private final boolean ignoreDelete;
public Factory(boolean ignoreDelete) {
@@ -90,7 +91,7 @@ public class FirstRowMergeFunction implements MergeFunction<KeyValue> {
}
@Override
- public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
+ public MergeFunction<KeyValue> create(@Nullable RowType readType) {
return new FirstRowMergeFunction(ignoreDelete);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
index 1bd9aaa843..cb494b86d0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
@@ -143,7 +143,7 @@ public class LookupMergeFunction implements MergeFunction<KeyValue> {
/** Factory to create {@link LookupMergeFunction}. */
public static class Factory implements MergeFunctionFactory<KeyValue> {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private final MergeFunctionFactory<KeyValue> wrapped;
private final CoreOptions options;
@@ -168,14 +168,14 @@ public class LookupMergeFunction implements MergeFunction<KeyValue> {
}
@Override
- public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
+ public MergeFunction<KeyValue> create(@Nullable RowType readType) {
return new LookupMergeFunction(
- wrapped.create(projection), options, keyType, valueType, ioManager);
+ wrapped.create(readType), options, keyType, valueType, ioManager);
}
@Override
- public AdjustedProjection adjustProjection(@Nullable int[][] projection) {
- return wrapped.adjustProjection(projection);
+ public RowType adjustReadType(RowType readType) {
+ return wrapped.adjustReadType(readType);
}
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeFunctionFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeFunctionFactory.java
index 8b0d3909ae..d05b5f73ea 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeFunctionFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeFunctionFactory.java
@@ -18,6 +18,8 @@
package org.apache.paimon.mergetree.compact;
+import org.apache.paimon.types.RowType;
+
import javax.annotation.Nullable;
import java.io.Serializable;
@@ -30,23 +32,10 @@ public interface MergeFunctionFactory<T> extends Serializable {
return create(null);
}
- MergeFunction<T> create(@Nullable int[][] projection);
-
- // todo: replace projection with rowType
- default AdjustedProjection adjustProjection(@Nullable int[][] projection) {
- return new AdjustedProjection(projection, null);
- }
-
- /** Result of adjusted projection. */
- class AdjustedProjection {
-
- @Nullable public final int[][] pushdownProjection;
-
- @Nullable public final int[][] outerProjection;
+ MergeFunction<T> create(@Nullable RowType readType);
- public AdjustedProjection(int[][] pushdownProjection, int[][] outerProjection) {
- this.pushdownProjection = pushdownProjection;
- this.outerProjection = outerProjection;
- }
+ /** Adjust read type, if no need to adjust, return the original read type. */
+ default RowType adjustReadType(RowType readType) {
+ return readType;
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index 3b6a89d664..2f429191c4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -34,7 +34,6 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ArrayUtils;
import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.Preconditions;
-import org.apache.paimon.utils.Projection;
import org.apache.paimon.utils.UserDefinedSeqComparator;
import javax.annotation.Nullable;
@@ -51,8 +50,6 @@ import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import java.util.stream.Stream;
import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
import static org.apache.paimon.CoreOptions.FIELDS_SEPARATOR;
@@ -379,13 +376,11 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
private static class Factory implements MergeFunctionFactory<KeyValue> {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private final boolean ignoreDelete;
private final RowType rowType;
- private final List<DataType> tableTypes;
-
private final Map<Integer, Supplier<FieldsComparator>> fieldSeqComparators;
private final Map<Integer, Supplier<FieldAggregator>> fieldAggregators;
@@ -397,7 +392,6 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
private Factory(Options options, RowType rowType, List<String> primaryKeys) {
this.ignoreDelete = options.get(CoreOptions.IGNORE_DELETE);
this.rowType = rowType;
- this.tableTypes = rowType.getFieldTypes();
this.removeRecordOnDelete = options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE);
String removeRecordOnSequenceGroup =
options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP);
@@ -498,26 +492,29 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
}
@Override
- public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
- if (projection != null) {
- Map<Integer, FieldsComparator> projectedSeqComparators = new HashMap<>();
- Map<Integer, FieldAggregator> projectedAggregators = new HashMap<>();
- int[] projects = Projection.of(projection).toTopLevelIndexes();
+ public MergeFunction<KeyValue> create(@Nullable RowType readType) {
+ RowType targetType = readType != null ? readType : rowType;
+ Map<Integer, FieldsComparator> projectedSeqComparators = new HashMap<>();
+ Map<Integer, FieldAggregator> projectedAggregators = new HashMap<>();
+
+ if (readType != null) {
+ // Build index mapping from table schema to read schema
+ List<String> readFieldNames = readType.getFieldNames();
Map<Integer, Integer> indexMap = new HashMap<>();
- List<DataField> dataFields = rowType.getFields();
- List<DataType> newDataTypes = new ArrayList<>();
-
- for (int i = 0; i < projects.length; i++) {
- indexMap.put(projects[i], i);
- newDataTypes.add(dataFields.get(projects[i]).type());
+ for (int i = 0; i < readType.getFieldCount(); i++) {
+ String fieldName = readFieldNames.get(i);
+ int oldIndex = rowType.getFieldIndex(fieldName);
+ if (oldIndex >= 0) {
+ indexMap.put(oldIndex, i);
+ }
}
- RowType newRowType = RowType.builder().fields(newDataTypes).build();
+ // Remap sequence comparators
fieldSeqComparators.forEach(
(field, comparatorSupplier) -> {
- FieldsComparator comparator = comparatorSupplier.get();
int newField = indexMap.getOrDefault(field, -1);
if (newField != -1) {
+ FieldsComparator comparator = comparatorSupplier.get();
int[] newSequenceFields =
Arrays.stream(comparator.compareFields())
.map(
@@ -532,94 +529,76 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
+ "for new field. new field "
+ "index is %s",
newField));
- } else {
- return newIndex;
}
+ return newIndex;
})
.toArray();
projectedSeqComparators.put(
newField,
UserDefinedSeqComparator.create(
- newRowType, newSequenceFields, true));
+ readType, newSequenceFields, true));
}
});
- for (int i = 0; i < projects.length; i++) {
- if (fieldAggregators.containsKey(projects[i])) {
- projectedAggregators.put(i, fieldAggregators.get(projects[i]).get());
+
+ // Remap field aggregators
+ for (int oldIndex : indexMap.keySet()) {
+ if (fieldAggregators.containsKey(oldIndex)) {
+ int newIndex = indexMap.get(oldIndex);
+ projectedAggregators.put(newIndex, fieldAggregators.get(oldIndex).get());
}
}
-
- List<DataType> projectedTypes = Projection.of(projection).project(tableTypes);
- return new PartialUpdateMergeFunction(
- createFieldGetters(projectedTypes),
- ignoreDelete,
- projectedSeqComparators,
- projectedAggregators,
- !fieldSeqComparators.isEmpty(),
- removeRecordOnDelete,
- sequenceGroupPartialDelete,
- ArrayUtils.toPrimitiveBoolean(
- projectedTypes.stream()
- .map(DataType::isNullable)
- .toArray(Boolean[]::new)));
} else {
- Map<Integer, FieldsComparator> fieldSeqComparators = new HashMap<>();
+ // Use original mappings
this.fieldSeqComparators.forEach(
- (f, supplier) -> fieldSeqComparators.put(f, supplier.get()));
- Map<Integer, FieldAggregator> fieldAggregators = new HashMap<>();
+ (f, supplier) -> projectedSeqComparators.put(f, supplier.get()));
this.fieldAggregators.forEach(
- (f, supplier) -> fieldAggregators.put(f, supplier.get()));
- return new PartialUpdateMergeFunction(
- createFieldGetters(tableTypes),
- ignoreDelete,
- fieldSeqComparators,
- fieldAggregators,
- !fieldSeqComparators.isEmpty(),
- removeRecordOnDelete,
- sequenceGroupPartialDelete,
- ArrayUtils.toPrimitiveBoolean(
- rowType.getFieldTypes().stream()
- .map(DataType::isNullable)
- .toArray(Boolean[]::new)));
+ (f, supplier) -> projectedAggregators.put(f, supplier.get()));
}
+
+ List<DataType> fieldTypes = targetType.getFieldTypes();
+ return new PartialUpdateMergeFunction(
+ createFieldGetters(fieldTypes),
+ ignoreDelete,
+ projectedSeqComparators,
+ projectedAggregators,
+ !fieldSeqComparators.isEmpty(),
+ removeRecordOnDelete,
+ sequenceGroupPartialDelete,
+ ArrayUtils.toPrimitiveBoolean(
+ fieldTypes.stream().map(DataType::isNullable).toArray(Boolean[]::new)));
}
@Override
- public AdjustedProjection adjustProjection(@Nullable int[][] projection) {
+ public RowType adjustReadType(RowType readType) {
if (fieldSeqComparators.isEmpty()) {
- return new AdjustedProjection(projection, null);
+ return readType;
}
- if (projection == null) {
- return new AdjustedProjection(null, null);
- }
- LinkedHashSet<Integer> extraFields = new LinkedHashSet<>();
- int[] topProjects = Projection.of(projection).toTopLevelIndexes();
- Set<Integer> indexSet = Arrays.stream(topProjects).boxed().collect(Collectors.toSet());
- for (int index : topProjects) {
+ LinkedHashSet<DataField> extraFields = new LinkedHashSet<>();
+ List<String> readFieldNames = readType.getFieldNames();
+ for (DataField readField : readType.getFields()) {
+ int index = rowType.getFieldIndex(readField.name());
Supplier<FieldsComparator> comparatorSupplier = fieldSeqComparators.get(index);
if (comparatorSupplier == null) {
continue;
}
FieldsComparator comparator = comparatorSupplier.get();
- for (int field : comparator.compareFields()) {
- if (!indexSet.contains(field)) {
+ for (int fieldIndex : comparator.compareFields()) {
+ DataField field = rowType.getFields().get(fieldIndex);
+ if (!readFieldNames.contains(field.name())) {
extraFields.add(field);
}
}
}
- int[] allProjects =
- Stream.concat(Arrays.stream(topProjects).boxed(), extraFields.stream())
- .mapToInt(Integer::intValue)
- .toArray();
+ if (extraFields.isEmpty()) {
+ return readType;
+ }
- int[][] pushDown = Projection.of(allProjects).toNestedIndexes();
- int[][] outer =
- Projection.of(IntStream.range(0, topProjects.length).toArray())
- .toNestedIndexes();
- return new AdjustedProjection(pushDown, outer);
+ List<DataField> allFields = new ArrayList<>(readType.getFields());
+ allFields.addAll(extraFields);
+ return new RowType(allFields);
}
private int requireField(String fieldName, List<String> fieldNames) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
index 5f3fa3b4b3..eea3c7e499 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
@@ -31,8 +31,8 @@ import org.apache.paimon.mergetree.compact.aggregate.factory.FieldPrimaryKeyAggF
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ArrayUtils;
-import org.apache.paimon.utils.Projection;
import javax.annotation.Nullable;
@@ -133,44 +133,31 @@ public class AggregateMergeFunction implements MergeFunction<KeyValue> {
}
public static MergeFunctionFactory<KeyValue> factory(
- Options conf,
- List<String> fieldNames,
- List<DataType> fieldTypes,
- List<String> primaryKeys) {
- return new Factory(conf, fieldNames, fieldTypes, primaryKeys);
+ Options conf, RowType rowType, List<String> primaryKeys) {
+ return new Factory(conf, rowType, primaryKeys);
}
private static class Factory implements MergeFunctionFactory<KeyValue> {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private final CoreOptions options;
- private final List<String> fieldNames;
- private final List<DataType> fieldTypes;
+ private final RowType rowType;
private final List<String> primaryKeys;
private final boolean removeRecordOnDelete;
- private Factory(
- Options conf,
- List<String> fieldNames,
- List<DataType> fieldTypes,
- List<String> primaryKeys) {
+ private Factory(Options conf, RowType rowType, List<String> primaryKeys) {
this.options = new CoreOptions(conf);
- this.fieldNames = fieldNames;
- this.fieldTypes = fieldTypes;
+ this.rowType = rowType;
this.primaryKeys = primaryKeys;
this.removeRecordOnDelete = options.aggregationRemoveRecordOnDelete();
}
@Override
- public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
- List<String> fieldNames = this.fieldNames;
- List<DataType> fieldTypes = this.fieldTypes;
- if (projection != null) {
- Projection project = Projection.of(projection);
- fieldNames = project.project(fieldNames);
- fieldTypes = project.project(fieldTypes);
- }
+ public MergeFunction<KeyValue> create(@Nullable RowType readType) {
+ RowType targetType = readType != null ? readType : rowType;
+ List<String> fieldNames = targetType.getFieldNames();
+ List<DataType> fieldTypes = targetType.getFieldTypes();
FieldAggregator[] fieldAggregators = new FieldAggregator[fieldNames.size()];
List<String> sequenceFields = options.sequenceField();
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
index 2cd3a5322e..f51f592889 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
@@ -24,6 +24,7 @@ import org.apache.paimon.KeyValueFileStore;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.variant.VariantAccessInfo;
+import org.apache.paimon.data.variant.VariantAccessInfoUtils;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
@@ -39,7 +40,6 @@ import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.mergetree.compact.IntervalPartition;
import org.apache.paimon.mergetree.compact.LookupMergeFunction;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
-import org.apache.paimon.mergetree.compact.MergeFunctionFactory.AdjustedProjection;
import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
import org.apache.paimon.mergetree.compact.ReducerMergeFunctionWrapper;
import org.apache.paimon.predicate.Predicate;
@@ -54,14 +54,12 @@ import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ProjectedRow;
-import org.apache.paimon.utils.Projection;
import org.apache.paimon.utils.UserDefinedSeqComparator;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
@@ -88,15 +86,11 @@ public class MergeFileSplitRead implements SplitRead<KeyValue> {
private final boolean sequenceOrder;
@Nullable private RowType readKeyType;
-
+ @Nullable private RowType outerReadType;
+ @Nullable private VariantAccessInfo[] variantAccess;
@Nullable private List<Predicate> filtersForKeys;
-
@Nullable private List<Predicate> filtersForAll;
- @Nullable private int[][] pushdownProjection;
- @Nullable private int[][] outerProjection;
- @Nullable private VariantAccessInfo[] variantAccess;
-
private boolean forceKeepDelete = false;
public MergeFileSplitRead(
@@ -139,58 +133,32 @@ public class MergeFileSplitRead implements SplitRead<KeyValue> {
@Override
public MergeFileSplitRead withReadType(RowType readType) {
- // todo: replace projectedFields with readType
RowType tableRowType = tableSchema.logicalRowType();
- int[][] projectedFields =
- Arrays.stream(tableRowType.getFieldIndices(readType.getFieldNames()))
- .mapToObj(i -> new int[] {i})
- .toArray(int[][]::new);
- int[][] newProjectedFields = projectedFields;
- if (sequenceFields.size() > 0) {
- // make sure projection contains sequence fields
- List<String> fieldNames = tableSchema.fieldNames();
- List<String> projectedNames = Projection.of(projectedFields).project(fieldNames);
- int[] lackFields =
- sequenceFields.stream()
- .filter(f -> !projectedNames.contains(f))
- .mapToInt(fieldNames::indexOf)
- .toArray();
- if (lackFields.length > 0) {
- newProjectedFields =
- Arrays.copyOf(projectedFields, projectedFields.length + lackFields.length);
- for (int i = 0; i < lackFields.length; i++) {
- newProjectedFields[projectedFields.length + i] = new int[] {lackFields[i]};
+ RowType adjustedReadType = readType;
+
+ if (!sequenceFields.isEmpty()) {
+ // make sure actual readType contains sequence fields
+ List<String> readFieldNames = readType.getFieldNames();
+ List<DataField> extraFields = new ArrayList<>();
+ for (String seqField : sequenceFields) {
+ if (!readFieldNames.contains(seqField)) {
+ extraFields.add(tableRowType.getField(seqField));
}
}
- }
-
- AdjustedProjection projection = mfFactory.adjustProjection(newProjectedFields);
- this.pushdownProjection = projection.pushdownProjection;
- this.outerProjection = projection.outerProjection;
- if (pushdownProjection != null) {
- List<DataField> tableFields = tableRowType.getFields();
- List<DataField> readFields = readType.getFields();
- List<DataField> finalReadFields = new ArrayList<>();
- for (int i : Arrays.stream(pushdownProjection).mapToInt(arr -> arr[0]).toArray()) {
- DataField requiredField = tableFields.get(i);
- finalReadFields.add(
- readFields.stream()
- .filter(x -> x.name().equals(requiredField.name()))
- .findFirst()
- .orElse(requiredField));
+ if (!extraFields.isEmpty()) {
+ List<DataField> allFields = new ArrayList<>(readType.getFields());
+ allFields.addAll(extraFields);
+ adjustedReadType = new RowType(allFields);
}
- RowType pushdownRowType = new RowType(finalReadFields);
- readerFactoryBuilder.withReadValueType(pushdownRowType);
- mergeSorter.setProjectedValueType(pushdownRowType);
}
+ adjustedReadType = mfFactory.adjustReadType(adjustedReadType);
- if (newProjectedFields != projectedFields) {
- // Discard the completed sequence fields
- if (outerProjection == null) {
- outerProjection = Projection.range(0, projectedFields.length).toNestedIndexes();
- } else {
- outerProjection = Arrays.copyOf(outerProjection, projectedFields.length);
- }
+ readerFactoryBuilder.withReadValueType(adjustedReadType);
+ mergeSorter.setProjectedValueType(adjustedReadType);
+
+ // When finalReadType != readType, need to project the outer read type
+ if (adjustedReadType != readType) {
+ outerReadType = readType;
}
return this;
@@ -339,8 +307,12 @@ public class MergeFileSplitRead implements SplitRead<KeyValue> {
boolean keepDelete)
throws IOException {
List<ReaderSupplier<KeyValue>> sectionReaders = new ArrayList<>();
+ RowType mfReadType =
+ variantAccess != null
+ ? VariantAccessInfoUtils.buildReadRowType(actualReadType(), variantAccess)
+ : actualReadType();
MergeFunctionWrapper<KeyValue> mergeFuncWrapper =
- new ReducerMergeFunctionWrapper(mfFactory.create(pushdownProjection));
+ new ReducerMergeFunctionWrapper(mfFactory.create(mfReadType));
for (List<SortedRun> section : new IntervalPartition(files, keyComparator).partition()) {
sectionReaders.add(
() ->
@@ -386,6 +358,14 @@ public class MergeFileSplitRead implements SplitRead<KeyValue> {
return projectOuter(ConcatRecordReader.create(suppliers));
}
+ /**
+ * Returns the pushed read type if {@link #withReadType(RowType)} was called, else the default
+ * read type.
+ */
+ private RowType actualReadType() {
+ return readerFactoryBuilder.readValueType();
+ }
+
private RecordReader<KeyValue> projectKey(RecordReader<KeyValue> reader) {
if (readKeyType == null) {
return reader;
@@ -396,8 +376,8 @@ public class MergeFileSplitRead implements SplitRead<KeyValue> {
}
private RecordReader<KeyValue> projectOuter(RecordReader<KeyValue> reader) {
- if (outerProjection != null) {
- ProjectedRow projectedRow = ProjectedRow.from(outerProjection);
+ if (outerReadType != null) {
+ ProjectedRow projectedRow = ProjectedRow.from(outerReadType, actualReadType());
reader = reader.transform(kv -> kv.replaceValue(projectedRow.replaceRow(kv.value())));
}
return reader;
@@ -405,7 +385,6 @@ public class MergeFileSplitRead implements SplitRead<KeyValue> {
@Nullable
public UserDefinedSeqComparator createUdsComparator() {
- return UserDefinedSeqComparator.create(
- readerFactoryBuilder.readValueType(), sequenceFields, sequenceOrder);
+ return UserDefinedSeqComparator.create(actualReadType(), sequenceFields, sequenceOrder);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index 62631aaf80..5e4cffcbf6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -80,7 +80,7 @@ public class PrimaryKeyFileStoreTable extends AbstractFileStoreTable {
RowType keyType = new RowType(extractor.keyFields(tableSchema));
MergeFunctionFactory<KeyValue> mfFactory =
- PrimaryKeyTableUtils.createMergeFunctionFactory(tableSchema, extractor);
+ PrimaryKeyTableUtils.createMergeFunctionFactory(tableSchema);
if (options.needLookup()) {
mfFactory = LookupMergeFunction.wrap(mfFactory, options, keyType, rowType);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
index de320d78f9..904ca51de7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
@@ -56,7 +56,7 @@ public class PrimaryKeyTableUtils {
}
public static MergeFunctionFactory<KeyValue> createMergeFunctionFactory(
- TableSchema tableSchema, KeyValueFieldsExtractor extractor) {
+ TableSchema tableSchema) {
RowType rowType = tableSchema.logicalRowType();
Options conf = Options.fromMap(tableSchema.options());
CoreOptions options = new CoreOptions(conf);
@@ -68,11 +68,7 @@ public class PrimaryKeyTableUtils {
case PARTIAL_UPDATE:
return PartialUpdateMergeFunction.factory(conf, rowType, tableSchema.primaryKeys());
case AGGREGATE:
- return AggregateMergeFunction.factory(
- conf,
- tableSchema.fieldNames(),
- rowType.getFieldTypes(),
- tableSchema.primaryKeys());
+ return AggregateMergeFunction.factory(conf, rowType, tableSchema.primaryKeys());
case FIRST_ROW:
return FirstRowMergeFunction.factory(conf);
default:
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
index 19af07dda6..ff8249c221 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
@@ -34,6 +34,7 @@ import org.apache.paimon.mergetree.compact.aggregate.AggregateMergeFunction;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.sort.BinaryInMemorySortBuffer;
+import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ReusingKeyValue;
@@ -45,7 +46,6 @@ import org.junit.jupiter.api.Test;
import java.io.EOFException;
import java.io.IOException;
-import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@@ -225,8 +225,13 @@ public abstract class SortBufferWriteBufferTestBase {
options.set(CoreOptions.AGGREGATION_REMOVE_RECORD_ON_DELETE, removeRecordOnDelete);
return AggregateMergeFunction.factory(
options,
- Arrays.asList("f0", "f1"),
- Arrays.asList(DataTypes.INT().notNull(), DataTypes.BIGINT()),
+ RowType.builder()
+ .fields(
+ new DataType[] {
+ DataTypes.INT().notNull(), DataTypes.BIGINT()
+ },
+ new String[] {"f0", "f1"})
+ .build(),
Collections.singletonList("f0"))
.create();
}
@@ -263,8 +268,13 @@ public abstract class SortBufferWriteBufferTestBase {
MergeFunctionFactory<KeyValue> aggMergeFunction =
AggregateMergeFunction.factory(
options,
- Arrays.asList("f0", "f1"),
- Arrays.asList(DataTypes.INT().notNull(), DataTypes.BIGINT()),
+ RowType.builder()
+ .fields(
+ new DataType[] {
+ DataTypes.INT().notNull(), DataTypes.BIGINT()
+ },
+ new String[] {"f0", "f1"})
+ .build(),
Collections.singletonList("f0"));
return LookupMergeFunction.wrap(aggMergeFunction, null, null, null).create();
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
index 5e88d2758e..846734b555 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
@@ -25,7 +25,6 @@ import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.Projection;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
@@ -332,15 +331,15 @@ public class PartialUpdateMergeFunctionTest {
DataTypes.INT(),
DataTypes.INT());
// the sequence field 'f4' is projected too
- int[][] projection = new int[][] {{1}, {4}, {3}, {7}};
+ RowType readType = rowType.project("f1", "f4", "f3", "f7");
MergeFunctionFactory<KeyValue> factory =
PartialUpdateMergeFunction.factory(options, rowType, ImmutableList.of("f0"));
- MergeFunctionFactory.AdjustedProjection adjustedProjection =
- factory.adjustProjection(projection);
+ RowType pushdownType = factory.adjustReadType(readType);
- validate(adjustedProjection, new int[] {1, 4, 3, 7, 5}, new int[] {0, 1, 2, 3});
+ assertThat(pushdownType).isNotNull();
+ assertThat(pushdownType.getFieldNames()).containsExactly("f1", "f4", "f3", "f7", "f5");
- MergeFunction<KeyValue> func = factory.create(adjustedProjection.pushdownProjection);
+ MergeFunction<KeyValue> func = factory.create(pushdownType);
func.reset();
// if sequence field is null, the related fields should not be updated
add(func, 1, 1, 1, 1, 1);
@@ -364,15 +363,16 @@ public class PartialUpdateMergeFunctionTest {
DataTypes.INT(),
DataTypes.INT());
// the sequence field 'f4' is projected too
- int[][] projection = new int[][] {{1}, {4}, {3}, {7}};
+ RowType readType = rowType.project("f1", "f4", "f3", "f7");
MergeFunctionFactory<KeyValue> factory =
PartialUpdateMergeFunction.factory(options, rowType, ImmutableList.of("f0"));
- MergeFunctionFactory.AdjustedProjection adjustedProjection =
- factory.adjustProjection(projection);
+ RowType pushdownType = factory.adjustReadType(readType);
- validate(adjustedProjection, new int[] {1, 4, 3, 7, 2, 5, 6}, new int[] {0, 1, 2, 3});
+ assertThat(pushdownType).isNotNull();
+ assertThat(pushdownType.getFieldNames())
+ .containsExactly("f1", "f4", "f3", "f7", "f2", "f5", "f6");
- MergeFunction<KeyValue> func = factory.create(adjustedProjection.pushdownProjection);
+ MergeFunction<KeyValue> func = factory.create(pushdownType);
func.reset();
// if sequence field is null, the related fields should not be updated
add(func, 1, 1, 1, 1, 1, 1, 1);
@@ -396,18 +396,16 @@ public class PartialUpdateMergeFunctionTest {
DataTypes.INT(),
DataTypes.INT());
// all fields are projected
- int[][] projection = new int[][] {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}};
+ RowType readType = rowType;
MergeFunctionFactory<KeyValue> factory =
PartialUpdateMergeFunction.factory(options, rowType, ImmutableList.of("f0"));
- MergeFunctionFactory.AdjustedProjection adjustedProjection =
- factory.adjustProjection(projection);
+ RowType pushdownType = factory.adjustReadType(readType);
- validate(
- adjustedProjection,
- new int[] {0, 1, 2, 3, 4, 5, 6, 7},
- new int[] {0, 1, 2, 3, 4, 5, 6, 7});
+ assertThat(pushdownType).isNotNull();
+ assertThat(pushdownType.getFieldNames())
+ .containsExactly("f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7");
- MergeFunction<KeyValue> func = factory.create(adjustedProjection.pushdownProjection);
+ MergeFunction<KeyValue> func = factory.create(pushdownType);
func.reset();
// 'f6' has no sequence group, it should not be updated by null
add(func, 1, 1, 1, 1, 1, 1, 1, 1);
@@ -431,18 +429,16 @@ public class PartialUpdateMergeFunctionTest {
DataTypes.INT(),
DataTypes.INT());
// all fields are projected
- int[][] projection = new int[][] {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}};
+ RowType readType = rowType;
MergeFunctionFactory<KeyValue> factory =
PartialUpdateMergeFunction.factory(options, rowType, ImmutableList.of("f0"));
- MergeFunctionFactory.AdjustedProjection adjustedProjection =
- factory.adjustProjection(projection);
+ RowType pushdownType = factory.adjustReadType(readType);
- validate(
- adjustedProjection,
- new int[] {0, 1, 2, 3, 4, 5, 6, 7},
- new int[] {0, 1, 2, 3, 4, 5, 6, 7});
+ assertThat(pushdownType).isNotNull();
+ assertThat(pushdownType.getFieldNames())
+ .containsExactly("f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7");
- MergeFunction<KeyValue> func = factory.create(adjustedProjection.pushdownProjection);
+ MergeFunction<KeyValue> func = factory.create(pushdownType);
func.reset();
// 'f6' has no sequence group, it should not be updated by null
add(func, 1, 1, 1, 1, 1, 1, 1, 1);
@@ -465,16 +461,11 @@ public class PartialUpdateMergeFunctionTest {
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT());
- // set the projection = null
MergeFunctionFactory<KeyValue> factory =
PartialUpdateMergeFunction.factory(options, rowType, ImmutableList.of("f0"));
- MergeFunctionFactory.AdjustedProjection adjustedProjection = factory.adjustProjection(null);
- validate(adjustedProjection, null, null);
-
- MergeFunction<KeyValue> func = factory.create(adjustedProjection.pushdownProjection);
+ MergeFunction<KeyValue> func = factory.create(rowType);
func.reset();
- // Setting projection with null is similar with projecting all fields
add(func, 1, 1, 1, 1, 1, 1, 1, 1);
add(func, 4, 2, 4, 2, 2, 0, null, 3);
validate(func, 4, 2, 4, 2, 2, 1, 1, 1);
@@ -493,15 +484,14 @@ public class PartialUpdateMergeFunctionTest {
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT());
- int[][] projection = new int[][] {{0}, {1}, {3}, {4}, {7}};
+ RowType readType = rowType.project("f0", "f1", "f3", "f4", "f7");
MergeFunctionFactory<KeyValue> factory =
PartialUpdateMergeFunction.factory(options, rowType, ImmutableList.of("f0"));
- MergeFunctionFactory.AdjustedProjection adjustedProjection =
- factory.adjustProjection(projection);
+ RowType pushdownType = factory.adjustReadType(readType);
- validate(adjustedProjection, new int[] {0, 1, 3, 4, 7}, null);
+ assertThat(pushdownType).isEqualTo(readType);
- MergeFunction<KeyValue> func = factory.create(adjustedProjection.pushdownProjection);
+ MergeFunction<KeyValue> func = factory.create(pushdownType);
func.reset();
// Without sequence group, all the fields should not be updated by null
add(func, 1, 1, 1, 1, 1);
@@ -526,12 +516,12 @@ public class PartialUpdateMergeFunctionTest {
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT());
- int[][] projection = new int[][] {{1}, {7}};
+ RowType readType = rowType.project("f1", "f7");
assertThatThrownBy(
() ->
PartialUpdateMergeFunction.factory(
options, rowType, ImmutableList.of("f0"))
- .create(projection))
+ .create(readType))
.hasMessageContaining("Can not find new sequence field for new field.");
}
@@ -734,12 +724,12 @@ public class PartialUpdateMergeFunctionTest {
MergeFunctionFactory<KeyValue> factory =
PartialUpdateMergeFunction.factory(options, rowType, ImmutableList.of("f0"));
- MergeFunctionFactory.AdjustedProjection adjustedProjection =
- factory.adjustProjection(new int[][] {{3}, {2}, {5}});
+ RowType pushdownType = factory.adjustReadType(rowType.project("f3", "f2", "f5"));
- validate(adjustedProjection, new int[] {3, 2, 5, 1}, new int[] {0, 1, 2});
+ assertThat(pushdownType).isNotNull();
+ assertThat(pushdownType.getFieldNames()).containsExactly("f3", "f2", "f5", "f1");
- MergeFunction<KeyValue> func = factory.create(adjustedProjection.pushdownProjection);
+ MergeFunction<KeyValue> func = factory.create(pushdownType);
func.reset();
// f0 pk
@@ -790,12 +780,12 @@ public class PartialUpdateMergeFunctionTest {
MergeFunctionFactory<KeyValue> factory =
PartialUpdateMergeFunction.factory(options, rowType, ImmutableList.of("f0"));
- MergeFunctionFactory.AdjustedProjection adjustedProjection =
- factory.adjustProjection(new int[][] {{3}, {2}, {5}});
+ RowType pushdownType = factory.adjustReadType(rowType.project("f3", "f2", "f5"));
- validate(adjustedProjection, new int[] {3, 2, 5, 1, 8}, new int[] {0, 1, 2});
+ assertThat(pushdownType).isNotNull();
+ assertThat(pushdownType.getFieldNames()).containsExactly("f3", "f2", "f5", "f1", "f8");
- MergeFunction<KeyValue> func = factory.create(adjustedProjection.pushdownProjection);
+ MergeFunction<KeyValue> func = factory.create(pushdownType);
func.reset();
// f0 pk
@@ -894,22 +884,4 @@ public class PartialUpdateMergeFunctionTest {
private void validate(MergeFunction<KeyValue> function, Integer... f) {
assertThat(function.getResult().value()).isEqualTo(GenericRow.of(f));
}
-
- private void validate(
- MergeFunctionFactory.AdjustedProjection projection, int[] pushdown, int[] outer) {
- if (projection.pushdownProjection == null) {
- assertThat(pushdown).isNull();
- } else {
- assertThat(pushdown)
- .containsExactly(
- Projection.of(projection.pushdownProjection).toTopLevelIndexes());
- }
-
- if (projection.outerProjection == null) {
- assertThat(outer).isNull();
- } else {
- assertThat(outer)
- .containsExactly(Projection.of(projection.outerProjection).toTopLevelIndexes());
- }
- }
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunctionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunctionTest.java
index 70cdbe3103..c1b2e9dcd8 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunctionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunctionTest.java
@@ -23,12 +23,13 @@ import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.mergetree.compact.MergeFunction;
import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
import org.junit.jupiter.api.Test;
-import java.util.Arrays;
import java.util.Collections;
import static org.apache.paimon.CoreOptions.FIELDS_DEFAULT_AGG_FUNC;
@@ -45,13 +46,17 @@ class AggregateMergeFunctionTest {
MergeFunction<KeyValue> aggregateFunction =
AggregateMergeFunction.factory(
options,
- Arrays.asList("k", "a", "b", "c", "d"),
- Arrays.asList(
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.INT()),
+ RowType.builder()
+ .fields(
+ new DataType[] {
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT()
+ },
+ new String[] {"k", "a", "b", "c", "d"})
+ .build(),
Collections.singletonList("k"))
.create();
aggregateFunction.reset();
@@ -76,13 +81,17 @@ class AggregateMergeFunctionTest {
MergeFunction<KeyValue> aggregateFunction =
AggregateMergeFunction.factory(
options,
- Arrays.asList("k", "a", "b", "c", "d"),
- Arrays.asList(
- DataTypes.INT(),
- DataTypes.STRING(),
- DataTypes.STRING(),
- DataTypes.INT(),
- DataTypes.STRING()),
+ RowType.builder()
+ .fields(
+ new DataType[] {
+ DataTypes.INT(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.INT(),
+ DataTypes.STRING()
+ },
+ new String[] {"k", "a", "b", "c", "d"})
+ .build(),
Collections.singletonList("k"))
.create();
aggregateFunction.reset();
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
index 44a02699e4..db65eb8d16 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
@@ -363,7 +363,7 @@ public class MergeFileSplitReadTest {
private static final long serialVersionUID = 1L;
@Override
- public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
+ public MergeFunction<KeyValue> create(@Nullable RowType readType) {
return new TestValueCountMergeFunction();
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
index bb48dbf363..401ca43682 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
@@ -32,7 +32,6 @@ import org.apache.paimon.mergetree.localmerge.HashMapLocalMerger;
import org.apache.paimon.mergetree.localmerge.LocalMerger;
import org.apache.paimon.mergetree.localmerge.SortBufferLocalMerger;
import org.apache.paimon.options.MemorySize;
-import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.PrimaryKeyTableUtils;
import org.apache.paimon.table.sink.RowKindGenerator;
@@ -61,8 +60,6 @@ import javax.annotation.Nullable;
import java.util.List;
-import static org.apache.paimon.table.PrimaryKeyTableUtils.addKeyNamePrefix;
-
/**
* {@link AbstractStreamOperator} which buffer input record and apply merge function when the buffer
* is full. Mainly to resolve data skew on primary keys.
@@ -106,24 +103,7 @@ public class LocalMergeOperator extends AbstractStreamOperator<InternalRow>
rowKindGenerator = RowKindGenerator.create(schema, options);
MergeFunction<KeyValue> mergeFunction =
- PrimaryKeyTableUtils.createMergeFunctionFactory(
- schema,
- new KeyValueFieldsExtractor() {
- private static final long serialVersionUID = 1L;
-
- // At local merge operator, the key extractor should include
- // partition fields.
- @Override
- public List<DataField> keyFields(TableSchema schema) {
- return addKeyNamePrefix(schema.primaryKeysFields());
- }
-
- @Override
- public List<DataField> valueFields(TableSchema schema) {
- return schema.fields();
- }
- })
- .create();
+ PrimaryKeyTableUtils.createMergeFunctionFactory(schema).create();
boolean canHashMerger = true;
for (DataField field : valueType.getFields()) {
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala
index 9fddb2f64e..5e4c074dbf 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala
@@ -715,4 +715,158 @@ abstract class VariantTestBase extends PaimonSparkTestBase {
)
)
}
+
+ test("Paimon Variant: partial update with variant") {
+ withTable("t") {
+ sql("""
+ |CREATE table t (
+ | id INT,
+ | ts INT,
+ | dt INT,
+ | v VARIANT
+ |)
+ |TBLPROPERTIES (
+ | 'primary-key' = 'id',
+ | 'bucket' = '1',
+ | 'changelog-producer' = 'lookup',
+ | 'merge-engine' = 'partial-update',
+ | 'fields.dt.sequence-group' = 'ts',
+ | 'fields.ts.aggregate-function' = 'max',
+ | 'write-only' = 'true'
+ |)
+ |""".stripMargin)
+
+ sql("""
+ |INSERT INTO t VALUES
+ | (1, 1, 1, parse_json('{"c":{"a1":1,"a2":2}}'))
+ | """.stripMargin)
+
+ sql("""
+ |INSERT INTO t VALUES
+ | (1, 2, 2, parse_json('{"c":{"a1":3,"a2":4}}'))
+ | """.stripMargin)
+
+ checkAnswer(
+ sql("SELECT * FROM t"),
+ sql("""SELECT 1, 2, 2, parse_json('{"c":{"a1":3,"a2":4}}')""")
+ )
+ checkAnswer(
+ sql("SELECT variant_get(v, '$.c', 'string') FROM t"),
+ Seq(
+ Row("{\"a1\":3,\"a2\":4}")
+ )
+ )
+ }
+ }
+
+ test("Paimon Variant: deduplicate with variant") {
+ withTable("t_dedup") {
+ sql("""
+ |CREATE table t_dedup (
+ | id INT,
+ | name STRING,
+ | v VARIANT
+ |) TBLPROPERTIES
+ |(
+ | 'primary-key' = 'id',
+ | 'bucket' = '1',
+ | 'merge-engine' = 'deduplicate',
+ | 'write-only' = 'true'
+ |)
+ |""".stripMargin)
+
+ sql("""
+ |INSERT INTO t_dedup VALUES
+ | (1, 'Alice', parse_json('{"age":30,"city":"NYC"}'))
+ | """.stripMargin)
+
+ sql("""
+ |INSERT INTO t_dedup VALUES
+ | (1, 'Bob', parse_json('{"age":25,"city":"LA"}'))
+ | """.stripMargin)
+
+ checkAnswer(
+ sql("SELECT * FROM t_dedup"),
+ sql("""SELECT 1, 'Bob', parse_json('{"age":25,"city":"LA"}')""")
+ )
+ checkAnswer(
+ sql("SELECT variant_get(v, '$.age', 'int') FROM t_dedup"),
+ Seq(Row(25))
+ )
+ }
+ }
+
+ test("Paimon Variant: aggregate with variant") {
+ withTable("t_agg") {
+ sql("""
+ |CREATE table t_agg (
+ | id INT,
+ | cnt INT,
+ | v VARIANT
+ |) TBLPROPERTIES
+ |(
+ | 'primary-key' = 'id',
+ | 'bucket' = '1',
+ | 'merge-engine' = 'aggregation',
+ | 'fields.cnt.aggregate-function' = 'sum',
+ | 'write-only' = 'true'
+ |)
+ |""".stripMargin)
+
+ sql("""
+ |INSERT INTO t_agg VALUES
+ | (1, 10, parse_json('{"data":{"x":1,"y":2}}'))
+ | """.stripMargin)
+
+ sql("""
+ |INSERT INTO t_agg VALUES
+ | (1, 20, parse_json('{"data":{"x":3,"y":4}}'))
+ | """.stripMargin)
+
+ checkAnswer(
+ sql("SELECT * FROM t_agg"),
+ sql("""SELECT 1, 30, parse_json('{"data":{"x":3,"y":4}}')""")
+ )
+ checkAnswer(
+ sql("SELECT variant_get(v, '$.data.x', 'int') FROM t_agg"),
+ Seq(Row(3))
+ )
+ }
+ }
+
+ test("Paimon Variant: first-row with variant") {
+ withTable("t_first") {
+ sql("""
+ |CREATE table t_first (
+ | id INT,
+ | seq INT,
+ | v VARIANT
+ |) TBLPROPERTIES
+ |(
+ | 'primary-key' = 'id',
+ | 'bucket' = '1',
+ | 'merge-engine' = 'first-row'
+ |)
+ |""".stripMargin)
+
+ sql("""
+ |INSERT INTO t_first VALUES
+ | (1, 100, parse_json('{"status":"active"}'))
+ | """.stripMargin)
+
+ sql("""
+ |INSERT INTO t_first VALUES
+ | (1, 200, parse_json('{"status":"inactive"}'))
+ | """.stripMargin)
+
+ checkAnswer(
+ sql("SELECT * FROM t_first"),
+ sql("""SELECT 1, 100, parse_json('{"status":"active"}')""")
+ )
+ checkAnswer(
+ sql("SELECT variant_get(v, '$.status', 'string') FROM t_first"),
+ Seq(Row("active"))
+ )
+ }
+ }
}