This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f720642e9 [core] Refactor MergeFunctionFactory to use read rowType (#7059)
6f720642e9 is described below

commit 6f720642e97ea83e2320e7d3812b104b20925639
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Jan 16 13:07:07 2026 +0800

    [core] Refactor MergeFunctionFactory to use read rowType (#7059)
---
 .../java/org/apache/paimon/utils/ProjectedRow.java |  39 ------
 .../compact/DeduplicateMergeFunction.java          |   5 +-
 .../mergetree/compact/FirstRowMergeFunction.java   |   5 +-
 .../mergetree/compact/LookupMergeFunction.java     |  10 +-
 .../mergetree/compact/MergeFunctionFactory.java    |  23 +--
 .../compact/PartialUpdateMergeFunction.java        | 131 ++++++++----------
 .../compact/aggregate/AggregateMergeFunction.java  |  35 ++---
 .../paimon/operation/MergeFileSplitRead.java       |  99 ++++++-------
 .../paimon/table/PrimaryKeyFileStoreTable.java     |   2 +-
 .../apache/paimon/table/PrimaryKeyTableUtils.java  |   8 +-
 .../mergetree/SortBufferWriteBufferTestBase.java   |  20 ++-
 .../compact/PartialUpdateMergeFunctionTest.java    | 104 +++++---------
 .../aggregate/AggregateMergeFunctionTest.java      |  39 ++++--
 .../paimon/operation/MergeFileSplitReadTest.java   |   2 +-
 .../paimon/flink/sink/LocalMergeOperator.java      |  22 +--
 .../apache/paimon/spark/sql/VariantTestBase.scala  | 154 +++++++++++++++++++++
 16 files changed, 358 insertions(+), 340 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedRow.java b/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedRow.java
index f9b78a8bc6..18c7e5db79 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ProjectedRow.java
@@ -188,32 +188,6 @@ public class ProjectedRow implements InternalRow {
                 + '}';
     }
 
-    /**
-     * Like {@link #from(int[])}, but throws {@link IllegalArgumentException} 
if the provided {@code
-     * projection} array contains nested projections, which are not supported 
by {@link
-     * ProjectedRow}.
-     *
-     * <p>The array represents the mapping of the fields of the original 
{@link DataType}, including
-     * nested rows. For example, {@code [[0, 2, 1], ...]} specifies to include 
the 2nd field of the
-     * 3rd field of the 1st field in the top-level row.
-     *
-     * @see Projection
-     * @see ProjectedRow
-     */
-    public static ProjectedRow from(int[][] projection) throws 
IllegalArgumentException {
-        return new ProjectedRow(
-                Arrays.stream(projection)
-                        .mapToInt(
-                                arr -> {
-                                    if (arr.length != 1) {
-                                        throw new IllegalArgumentException(
-                                                "ProjectedRowData doesn't 
support nested projections");
-                                    }
-                                    return arr[0];
-                                })
-                        .toArray());
-    }
-
     /**
      * Create an empty {@link ProjectedRow} starting from a {@code projection} 
array.
      *
@@ -234,17 +208,4 @@ public class ProjectedRow implements InternalRow {
                         .mapToInt(field -> 
tableType.getFieldIndexByFieldId(field.id()))
                         .toArray());
     }
-
-    /**
-     * Create an empty {@link ProjectedRow} starting from a {@link Projection}.
-     *
-     * <p>Throws {@link IllegalStateException} if the provided {@code 
projection} array contains
-     * nested projections, which are not supported by {@link ProjectedRow}.
-     *
-     * @see Projection
-     * @see ProjectedRow
-     */
-    public static ProjectedRow from(Projection projection) {
-        return new ProjectedRow(projection.toTopLevelIndexes());
-    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java
index 5422f5469e..50fdec2ad9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java
@@ -21,6 +21,7 @@ package org.apache.paimon.mergetree.compact;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.types.RowType;
 
 import javax.annotation.Nullable;
 
@@ -73,7 +74,7 @@ public class DeduplicateMergeFunction implements 
MergeFunction<KeyValue> {
 
     private static class Factory implements MergeFunctionFactory<KeyValue> {
 
-        private static final long serialVersionUID = 1L;
+        private static final long serialVersionUID = 2L;
 
         private final boolean ignoreDelete;
 
@@ -82,7 +83,7 @@ public class DeduplicateMergeFunction implements 
MergeFunction<KeyValue> {
         }
 
         @Override
-        public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
+        public MergeFunction<KeyValue> create(@Nullable RowType readType) {
             return new DeduplicateMergeFunction(ignoreDelete);
         }
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
index 4b9e4a6ff1..1815d1f5df 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
@@ -21,6 +21,7 @@ package org.apache.paimon.mergetree.compact;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.types.RowType;
 
 import javax.annotation.Nullable;
 
@@ -82,7 +83,7 @@ public class FirstRowMergeFunction implements 
MergeFunction<KeyValue> {
 
     private static class Factory implements MergeFunctionFactory<KeyValue> {
 
-        private static final long serialVersionUID = 1L;
+        private static final long serialVersionUID = 2L;
         private final boolean ignoreDelete;
 
         public Factory(boolean ignoreDelete) {
@@ -90,7 +91,7 @@ public class FirstRowMergeFunction implements 
MergeFunction<KeyValue> {
         }
 
         @Override
-        public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
+        public MergeFunction<KeyValue> create(@Nullable RowType readType) {
             return new FirstRowMergeFunction(ignoreDelete);
         }
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
index 1bd9aaa843..cb494b86d0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
@@ -143,7 +143,7 @@ public class LookupMergeFunction implements 
MergeFunction<KeyValue> {
     /** Factory to create {@link LookupMergeFunction}. */
     public static class Factory implements MergeFunctionFactory<KeyValue> {
 
-        private static final long serialVersionUID = 1L;
+        private static final long serialVersionUID = 2L;
 
         private final MergeFunctionFactory<KeyValue> wrapped;
         private final CoreOptions options;
@@ -168,14 +168,14 @@ public class LookupMergeFunction implements 
MergeFunction<KeyValue> {
         }
 
         @Override
-        public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
+        public MergeFunction<KeyValue> create(@Nullable RowType readType) {
             return new LookupMergeFunction(
-                    wrapped.create(projection), options, keyType, valueType, 
ioManager);
+                    wrapped.create(readType), options, keyType, valueType, 
ioManager);
         }
 
         @Override
-        public AdjustedProjection adjustProjection(@Nullable int[][] 
projection) {
-            return wrapped.adjustProjection(projection);
+        public RowType adjustReadType(RowType readType) {
+            return wrapped.adjustReadType(readType);
         }
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeFunctionFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeFunctionFactory.java
index 8b0d3909ae..d05b5f73ea 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeFunctionFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeFunctionFactory.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.mergetree.compact;
 
+import org.apache.paimon.types.RowType;
+
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
@@ -30,23 +32,10 @@ public interface MergeFunctionFactory<T> extends 
Serializable {
         return create(null);
     }
 
-    MergeFunction<T> create(@Nullable int[][] projection);
-
-    // todo: replace projection with rowType
-    default AdjustedProjection adjustProjection(@Nullable int[][] projection) {
-        return new AdjustedProjection(projection, null);
-    }
-
-    /** Result of adjusted projection. */
-    class AdjustedProjection {
-
-        @Nullable public final int[][] pushdownProjection;
-
-        @Nullable public final int[][] outerProjection;
+    MergeFunction<T> create(@Nullable RowType readType);
 
-        public AdjustedProjection(int[][] pushdownProjection, int[][] 
outerProjection) {
-            this.pushdownProjection = pushdownProjection;
-            this.outerProjection = outerProjection;
-        }
+    /** Adjust read type, if no need to adjust, return the original read type. 
*/
+    default RowType adjustReadType(RowType readType) {
+        return readType;
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index 3b6a89d664..2f429191c4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -34,7 +34,6 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.ArrayUtils;
 import org.apache.paimon.utils.FieldsComparator;
 import org.apache.paimon.utils.Preconditions;
-import org.apache.paimon.utils.Projection;
 import org.apache.paimon.utils.UserDefinedSeqComparator;
 
 import javax.annotation.Nullable;
@@ -51,8 +50,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import java.util.stream.Stream;
 
 import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
 import static org.apache.paimon.CoreOptions.FIELDS_SEPARATOR;
@@ -379,13 +376,11 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
 
     private static class Factory implements MergeFunctionFactory<KeyValue> {
 
-        private static final long serialVersionUID = 1L;
+        private static final long serialVersionUID = 2L;
 
         private final boolean ignoreDelete;
         private final RowType rowType;
 
-        private final List<DataType> tableTypes;
-
         private final Map<Integer, Supplier<FieldsComparator>> 
fieldSeqComparators;
 
         private final Map<Integer, Supplier<FieldAggregator>> fieldAggregators;
@@ -397,7 +392,6 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
         private Factory(Options options, RowType rowType, List<String> 
primaryKeys) {
             this.ignoreDelete = options.get(CoreOptions.IGNORE_DELETE);
             this.rowType = rowType;
-            this.tableTypes = rowType.getFieldTypes();
             this.removeRecordOnDelete = 
options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE);
             String removeRecordOnSequenceGroup =
                     
options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP);
@@ -498,26 +492,29 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
         }
 
         @Override
-        public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
-            if (projection != null) {
-                Map<Integer, FieldsComparator> projectedSeqComparators = new 
HashMap<>();
-                Map<Integer, FieldAggregator> projectedAggregators = new 
HashMap<>();
-                int[] projects = Projection.of(projection).toTopLevelIndexes();
+        public MergeFunction<KeyValue> create(@Nullable RowType readType) {
+            RowType targetType = readType != null ? readType : rowType;
+            Map<Integer, FieldsComparator> projectedSeqComparators = new 
HashMap<>();
+            Map<Integer, FieldAggregator> projectedAggregators = new 
HashMap<>();
+
+            if (readType != null) {
+                // Build index mapping from table schema to read schema
+                List<String> readFieldNames = readType.getFieldNames();
                 Map<Integer, Integer> indexMap = new HashMap<>();
-                List<DataField> dataFields = rowType.getFields();
-                List<DataType> newDataTypes = new ArrayList<>();
-
-                for (int i = 0; i < projects.length; i++) {
-                    indexMap.put(projects[i], i);
-                    newDataTypes.add(dataFields.get(projects[i]).type());
+                for (int i = 0; i < readType.getFieldCount(); i++) {
+                    String fieldName = readFieldNames.get(i);
+                    int oldIndex = rowType.getFieldIndex(fieldName);
+                    if (oldIndex >= 0) {
+                        indexMap.put(oldIndex, i);
+                    }
                 }
-                RowType newRowType = 
RowType.builder().fields(newDataTypes).build();
 
+                // Remap sequence comparators
                 fieldSeqComparators.forEach(
                         (field, comparatorSupplier) -> {
-                            FieldsComparator comparator = 
comparatorSupplier.get();
                             int newField = indexMap.getOrDefault(field, -1);
                             if (newField != -1) {
+                                FieldsComparator comparator = 
comparatorSupplier.get();
                                 int[] newSequenceFields =
                                         
Arrays.stream(comparator.compareFields())
                                                 .map(
@@ -532,94 +529,76 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
                                                                                
         + "for new field. new field "
                                                                                
         + "index is %s",
                                                                                
 newField));
-                                                            } else {
-                                                                return 
newIndex;
                                                             }
+                                                            return newIndex;
                                                         })
                                                 .toArray();
                                 projectedSeqComparators.put(
                                         newField,
                                         UserDefinedSeqComparator.create(
-                                                newRowType, newSequenceFields, 
true));
+                                                readType, newSequenceFields, 
true));
                             }
                         });
-                for (int i = 0; i < projects.length; i++) {
-                    if (fieldAggregators.containsKey(projects[i])) {
-                        projectedAggregators.put(i, 
fieldAggregators.get(projects[i]).get());
+
+                // Remap field aggregators
+                for (int oldIndex : indexMap.keySet()) {
+                    if (fieldAggregators.containsKey(oldIndex)) {
+                        int newIndex = indexMap.get(oldIndex);
+                        projectedAggregators.put(newIndex, 
fieldAggregators.get(oldIndex).get());
                     }
                 }
-
-                List<DataType> projectedTypes = 
Projection.of(projection).project(tableTypes);
-                return new PartialUpdateMergeFunction(
-                        createFieldGetters(projectedTypes),
-                        ignoreDelete,
-                        projectedSeqComparators,
-                        projectedAggregators,
-                        !fieldSeqComparators.isEmpty(),
-                        removeRecordOnDelete,
-                        sequenceGroupPartialDelete,
-                        ArrayUtils.toPrimitiveBoolean(
-                                projectedTypes.stream()
-                                        .map(DataType::isNullable)
-                                        .toArray(Boolean[]::new)));
             } else {
-                Map<Integer, FieldsComparator> fieldSeqComparators = new 
HashMap<>();
+                // Use original mappings
                 this.fieldSeqComparators.forEach(
-                        (f, supplier) -> fieldSeqComparators.put(f, 
supplier.get()));
-                Map<Integer, FieldAggregator> fieldAggregators = new 
HashMap<>();
+                        (f, supplier) -> projectedSeqComparators.put(f, 
supplier.get()));
                 this.fieldAggregators.forEach(
-                        (f, supplier) -> fieldAggregators.put(f, 
supplier.get()));
-                return new PartialUpdateMergeFunction(
-                        createFieldGetters(tableTypes),
-                        ignoreDelete,
-                        fieldSeqComparators,
-                        fieldAggregators,
-                        !fieldSeqComparators.isEmpty(),
-                        removeRecordOnDelete,
-                        sequenceGroupPartialDelete,
-                        ArrayUtils.toPrimitiveBoolean(
-                                rowType.getFieldTypes().stream()
-                                        .map(DataType::isNullable)
-                                        .toArray(Boolean[]::new)));
+                        (f, supplier) -> projectedAggregators.put(f, 
supplier.get()));
             }
+
+            List<DataType> fieldTypes = targetType.getFieldTypes();
+            return new PartialUpdateMergeFunction(
+                    createFieldGetters(fieldTypes),
+                    ignoreDelete,
+                    projectedSeqComparators,
+                    projectedAggregators,
+                    !fieldSeqComparators.isEmpty(),
+                    removeRecordOnDelete,
+                    sequenceGroupPartialDelete,
+                    ArrayUtils.toPrimitiveBoolean(
+                            
fieldTypes.stream().map(DataType::isNullable).toArray(Boolean[]::new)));
         }
 
         @Override
-        public AdjustedProjection adjustProjection(@Nullable int[][] 
projection) {
+        public RowType adjustReadType(RowType readType) {
             if (fieldSeqComparators.isEmpty()) {
-                return new AdjustedProjection(projection, null);
+                return readType;
             }
 
-            if (projection == null) {
-                return new AdjustedProjection(null, null);
-            }
-            LinkedHashSet<Integer> extraFields = new LinkedHashSet<>();
-            int[] topProjects = Projection.of(projection).toTopLevelIndexes();
-            Set<Integer> indexSet = 
Arrays.stream(topProjects).boxed().collect(Collectors.toSet());
-            for (int index : topProjects) {
+            LinkedHashSet<DataField> extraFields = new LinkedHashSet<>();
+            List<String> readFieldNames = readType.getFieldNames();
+            for (DataField readField : readType.getFields()) {
+                int index = rowType.getFieldIndex(readField.name());
                 Supplier<FieldsComparator> comparatorSupplier = 
fieldSeqComparators.get(index);
                 if (comparatorSupplier == null) {
                     continue;
                 }
 
                 FieldsComparator comparator = comparatorSupplier.get();
-                for (int field : comparator.compareFields()) {
-                    if (!indexSet.contains(field)) {
+                for (int fieldIndex : comparator.compareFields()) {
+                    DataField field = rowType.getFields().get(fieldIndex);
+                    if (!readFieldNames.contains(field.name())) {
                         extraFields.add(field);
                     }
                 }
             }
 
-            int[] allProjects =
-                    Stream.concat(Arrays.stream(topProjects).boxed(), 
extraFields.stream())
-                            .mapToInt(Integer::intValue)
-                            .toArray();
+            if (extraFields.isEmpty()) {
+                return readType;
+            }
 
-            int[][] pushDown = Projection.of(allProjects).toNestedIndexes();
-            int[][] outer =
-                    Projection.of(IntStream.range(0, 
topProjects.length).toArray())
-                            .toNestedIndexes();
-            return new AdjustedProjection(pushDown, outer);
+            List<DataField> allFields = new ArrayList<>(readType.getFields());
+            allFields.addAll(extraFields);
+            return new RowType(allFields);
         }
 
         private int requireField(String fieldName, List<String> fieldNames) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
index 5f3fa3b4b3..eea3c7e499 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
@@ -31,8 +31,8 @@ import 
org.apache.paimon.mergetree.compact.aggregate.factory.FieldPrimaryKeyAggF
 import org.apache.paimon.options.Options;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.ArrayUtils;
-import org.apache.paimon.utils.Projection;
 
 import javax.annotation.Nullable;
 
@@ -133,44 +133,31 @@ public class AggregateMergeFunction implements 
MergeFunction<KeyValue> {
     }
 
     public static MergeFunctionFactory<KeyValue> factory(
-            Options conf,
-            List<String> fieldNames,
-            List<DataType> fieldTypes,
-            List<String> primaryKeys) {
-        return new Factory(conf, fieldNames, fieldTypes, primaryKeys);
+            Options conf, RowType rowType, List<String> primaryKeys) {
+        return new Factory(conf, rowType, primaryKeys);
     }
 
     private static class Factory implements MergeFunctionFactory<KeyValue> {
 
-        private static final long serialVersionUID = 1L;
+        private static final long serialVersionUID = 2L;
 
         private final CoreOptions options;
-        private final List<String> fieldNames;
-        private final List<DataType> fieldTypes;
+        private final RowType rowType;
         private final List<String> primaryKeys;
         private final boolean removeRecordOnDelete;
 
-        private Factory(
-                Options conf,
-                List<String> fieldNames,
-                List<DataType> fieldTypes,
-                List<String> primaryKeys) {
+        private Factory(Options conf, RowType rowType, List<String> 
primaryKeys) {
             this.options = new CoreOptions(conf);
-            this.fieldNames = fieldNames;
-            this.fieldTypes = fieldTypes;
+            this.rowType = rowType;
             this.primaryKeys = primaryKeys;
             this.removeRecordOnDelete = 
options.aggregationRemoveRecordOnDelete();
         }
 
         @Override
-        public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
-            List<String> fieldNames = this.fieldNames;
-            List<DataType> fieldTypes = this.fieldTypes;
-            if (projection != null) {
-                Projection project = Projection.of(projection);
-                fieldNames = project.project(fieldNames);
-                fieldTypes = project.project(fieldTypes);
-            }
+        public MergeFunction<KeyValue> create(@Nullable RowType readType) {
+            RowType targetType = readType != null ? readType : rowType;
+            List<String> fieldNames = targetType.getFieldNames();
+            List<DataType> fieldTypes = targetType.getFieldTypes();
 
             FieldAggregator[] fieldAggregators = new 
FieldAggregator[fieldNames.size()];
             List<String> sequenceFields = options.sequenceField();
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
index 2cd3a5322e..f51f592889 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
@@ -24,6 +24,7 @@ import org.apache.paimon.KeyValueFileStore;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.variant.VariantAccessInfo;
+import org.apache.paimon.data.variant.VariantAccessInfoUtils;
 import org.apache.paimon.deletionvectors.DeletionVector;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIO;
@@ -39,7 +40,6 @@ import org.apache.paimon.mergetree.compact.ConcatRecordReader;
 import org.apache.paimon.mergetree.compact.IntervalPartition;
 import org.apache.paimon.mergetree.compact.LookupMergeFunction;
 import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
-import 
org.apache.paimon.mergetree.compact.MergeFunctionFactory.AdjustedProjection;
 import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
 import org.apache.paimon.mergetree.compact.ReducerMergeFunctionWrapper;
 import org.apache.paimon.predicate.Predicate;
@@ -54,14 +54,12 @@ import org.apache.paimon.table.source.Split;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.ProjectedRow;
-import org.apache.paimon.utils.Projection;
 import org.apache.paimon.utils.UserDefinedSeqComparator;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Set;
@@ -88,15 +86,11 @@ public class MergeFileSplitRead implements 
SplitRead<KeyValue> {
     private final boolean sequenceOrder;
 
     @Nullable private RowType readKeyType;
-
+    @Nullable private RowType outerReadType;
+    @Nullable private VariantAccessInfo[] variantAccess;
     @Nullable private List<Predicate> filtersForKeys;
-
     @Nullable private List<Predicate> filtersForAll;
 
-    @Nullable private int[][] pushdownProjection;
-    @Nullable private int[][] outerProjection;
-    @Nullable private VariantAccessInfo[] variantAccess;
-
     private boolean forceKeepDelete = false;
 
     public MergeFileSplitRead(
@@ -139,58 +133,32 @@ public class MergeFileSplitRead implements 
SplitRead<KeyValue> {
 
     @Override
     public MergeFileSplitRead withReadType(RowType readType) {
-        // todo: replace projectedFields with readType
         RowType tableRowType = tableSchema.logicalRowType();
-        int[][] projectedFields =
-                
Arrays.stream(tableRowType.getFieldIndices(readType.getFieldNames()))
-                        .mapToObj(i -> new int[] {i})
-                        .toArray(int[][]::new);
-        int[][] newProjectedFields = projectedFields;
-        if (sequenceFields.size() > 0) {
-            // make sure projection contains sequence fields
-            List<String> fieldNames = tableSchema.fieldNames();
-            List<String> projectedNames = 
Projection.of(projectedFields).project(fieldNames);
-            int[] lackFields =
-                    sequenceFields.stream()
-                            .filter(f -> !projectedNames.contains(f))
-                            .mapToInt(fieldNames::indexOf)
-                            .toArray();
-            if (lackFields.length > 0) {
-                newProjectedFields =
-                        Arrays.copyOf(projectedFields, projectedFields.length 
+ lackFields.length);
-                for (int i = 0; i < lackFields.length; i++) {
-                    newProjectedFields[projectedFields.length + i] = new int[] 
{lackFields[i]};
+        RowType adjustedReadType = readType;
+
+        if (!sequenceFields.isEmpty()) {
+            // make sure actual readType contains sequence fields
+            List<String> readFieldNames = readType.getFieldNames();
+            List<DataField> extraFields = new ArrayList<>();
+            for (String seqField : sequenceFields) {
+                if (!readFieldNames.contains(seqField)) {
+                    extraFields.add(tableRowType.getField(seqField));
                 }
             }
-        }
-
-        AdjustedProjection projection = 
mfFactory.adjustProjection(newProjectedFields);
-        this.pushdownProjection = projection.pushdownProjection;
-        this.outerProjection = projection.outerProjection;
-        if (pushdownProjection != null) {
-            List<DataField> tableFields = tableRowType.getFields();
-            List<DataField> readFields = readType.getFields();
-            List<DataField> finalReadFields = new ArrayList<>();
-            for (int i : Arrays.stream(pushdownProjection).mapToInt(arr -> 
arr[0]).toArray()) {
-                DataField requiredField = tableFields.get(i);
-                finalReadFields.add(
-                        readFields.stream()
-                                .filter(x -> 
x.name().equals(requiredField.name()))
-                                .findFirst()
-                                .orElse(requiredField));
+            if (!extraFields.isEmpty()) {
+                List<DataField> allFields = new 
ArrayList<>(readType.getFields());
+                allFields.addAll(extraFields);
+                adjustedReadType = new RowType(allFields);
             }
-            RowType pushdownRowType = new RowType(finalReadFields);
-            readerFactoryBuilder.withReadValueType(pushdownRowType);
-            mergeSorter.setProjectedValueType(pushdownRowType);
         }
+        adjustedReadType = mfFactory.adjustReadType(adjustedReadType);
 
-        if (newProjectedFields != projectedFields) {
-            // Discard the completed sequence fields
-            if (outerProjection == null) {
-                outerProjection = Projection.range(0, 
projectedFields.length).toNestedIndexes();
-            } else {
-                outerProjection = Arrays.copyOf(outerProjection, 
projectedFields.length);
-            }
+        readerFactoryBuilder.withReadValueType(adjustedReadType);
+        mergeSorter.setProjectedValueType(adjustedReadType);
+
+        // When finalReadType != readType, need to project the outer read type
+        if (adjustedReadType != readType) {
+            outerReadType = readType;
         }
 
         return this;
@@ -339,8 +307,12 @@ public class MergeFileSplitRead implements 
SplitRead<KeyValue> {
             boolean keepDelete)
             throws IOException {
         List<ReaderSupplier<KeyValue>> sectionReaders = new ArrayList<>();
+        RowType mfReadType =
+                variantAccess != null
+                        ? 
VariantAccessInfoUtils.buildReadRowType(actualReadType(), variantAccess)
+                        : actualReadType();
         MergeFunctionWrapper<KeyValue> mergeFuncWrapper =
-                new 
ReducerMergeFunctionWrapper(mfFactory.create(pushdownProjection));
+                new ReducerMergeFunctionWrapper(mfFactory.create(mfReadType));
         for (List<SortedRun> section : new IntervalPartition(files, 
keyComparator).partition()) {
             sectionReaders.add(
                     () ->
@@ -386,6 +358,14 @@ public class MergeFileSplitRead implements 
SplitRead<KeyValue> {
         return projectOuter(ConcatRecordReader.create(suppliers));
     }
 
+    /**
+     * Returns the pushed read type if {@link #withReadType(RowType)} was 
called, else the default
+     * read type.
+     */
+    private RowType actualReadType() {
+        return readerFactoryBuilder.readValueType();
+    }
+
     private RecordReader<KeyValue> projectKey(RecordReader<KeyValue> reader) {
         if (readKeyType == null) {
             return reader;
@@ -396,8 +376,8 @@ public class MergeFileSplitRead implements 
SplitRead<KeyValue> {
     }
 
     private RecordReader<KeyValue> projectOuter(RecordReader<KeyValue> reader) 
{
-        if (outerProjection != null) {
-            ProjectedRow projectedRow = ProjectedRow.from(outerProjection);
+        if (outerReadType != null) {
+            ProjectedRow projectedRow = ProjectedRow.from(outerReadType, 
actualReadType());
             reader = reader.transform(kv -> 
kv.replaceValue(projectedRow.replaceRow(kv.value())));
         }
         return reader;
@@ -405,7 +385,6 @@ public class MergeFileSplitRead implements 
SplitRead<KeyValue> {
 
     @Nullable
     public UserDefinedSeqComparator createUdsComparator() {
-        return UserDefinedSeqComparator.create(
-                readerFactoryBuilder.readValueType(), sequenceFields, 
sequenceOrder);
+        return UserDefinedSeqComparator.create(actualReadType(), 
sequenceFields, sequenceOrder);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index 62631aaf80..5e4cffcbf6 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -80,7 +80,7 @@ public class PrimaryKeyFileStoreTable extends 
AbstractFileStoreTable {
             RowType keyType = new RowType(extractor.keyFields(tableSchema));
 
             MergeFunctionFactory<KeyValue> mfFactory =
-                    
PrimaryKeyTableUtils.createMergeFunctionFactory(tableSchema, extractor);
+                    
PrimaryKeyTableUtils.createMergeFunctionFactory(tableSchema);
             if (options.needLookup()) {
                 mfFactory = LookupMergeFunction.wrap(mfFactory, options, 
keyType, rowType);
             }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
index de320d78f9..904ca51de7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
@@ -56,7 +56,7 @@ public class PrimaryKeyTableUtils {
     }
 
     public static MergeFunctionFactory<KeyValue> createMergeFunctionFactory(
-            TableSchema tableSchema, KeyValueFieldsExtractor extractor) {
+            TableSchema tableSchema) {
         RowType rowType = tableSchema.logicalRowType();
         Options conf = Options.fromMap(tableSchema.options());
         CoreOptions options = new CoreOptions(conf);
@@ -68,11 +68,7 @@ public class PrimaryKeyTableUtils {
             case PARTIAL_UPDATE:
                 return PartialUpdateMergeFunction.factory(conf, rowType, 
tableSchema.primaryKeys());
             case AGGREGATE:
-                return AggregateMergeFunction.factory(
-                        conf,
-                        tableSchema.fieldNames(),
-                        rowType.getFieldTypes(),
-                        tableSchema.primaryKeys());
+                return AggregateMergeFunction.factory(conf, rowType, 
tableSchema.primaryKeys());
             case FIRST_ROW:
                 return FirstRowMergeFunction.factory(conf);
             default:
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
index 19af07dda6..ff8249c221 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
@@ -34,6 +34,7 @@ import 
org.apache.paimon.mergetree.compact.aggregate.AggregateMergeFunction;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.sort.BinaryInMemorySortBuffer;
+import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.ReusingKeyValue;
@@ -45,7 +46,6 @@ import org.junit.jupiter.api.Test;
 
 import java.io.EOFException;
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
@@ -225,8 +225,13 @@ public abstract class SortBufferWriteBufferTestBase {
             options.set(CoreOptions.AGGREGATION_REMOVE_RECORD_ON_DELETE, 
removeRecordOnDelete);
             return AggregateMergeFunction.factory(
                             options,
-                            Arrays.asList("f0", "f1"),
-                            Arrays.asList(DataTypes.INT().notNull(), 
DataTypes.BIGINT()),
+                            RowType.builder()
+                                    .fields(
+                                            new DataType[] {
+                                                DataTypes.INT().notNull(), 
DataTypes.BIGINT()
+                                            },
+                                            new String[] {"f0", "f1"})
+                                    .build(),
                             Collections.singletonList("f0"))
                     .create();
         }
@@ -263,8 +268,13 @@ public abstract class SortBufferWriteBufferTestBase {
             MergeFunctionFactory<KeyValue> aggMergeFunction =
                     AggregateMergeFunction.factory(
                             options,
-                            Arrays.asList("f0", "f1"),
-                            Arrays.asList(DataTypes.INT().notNull(), 
DataTypes.BIGINT()),
+                            RowType.builder()
+                                    .fields(
+                                            new DataType[] {
+                                                DataTypes.INT().notNull(), 
DataTypes.BIGINT()
+                                            },
+                                            new String[] {"f0", "f1"})
+                                    .build(),
                             Collections.singletonList("f0"));
             return LookupMergeFunction.wrap(aggMergeFunction, null, null, 
null).create();
         }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
index 5e88d2758e..846734b555 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
@@ -25,7 +25,6 @@ import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.Projection;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 
@@ -332,15 +331,15 @@ public class PartialUpdateMergeFunctionTest {
                         DataTypes.INT(),
                         DataTypes.INT());
         // the sequence field 'f4' is projected too
-        int[][] projection = new int[][] {{1}, {4}, {3}, {7}};
+        RowType readType = rowType.project("f1", "f4", "f3", "f7");
         MergeFunctionFactory<KeyValue> factory =
                 PartialUpdateMergeFunction.factory(options, rowType, 
ImmutableList.of("f0"));
-        MergeFunctionFactory.AdjustedProjection adjustedProjection =
-                factory.adjustProjection(projection);
+        RowType pushdownType = factory.adjustReadType(readType);
 
-        validate(adjustedProjection, new int[] {1, 4, 3, 7, 5}, new int[] {0, 
1, 2, 3});
+        assertThat(pushdownType).isNotNull();
+        assertThat(pushdownType.getFieldNames()).containsExactly("f1", "f4", 
"f3", "f7", "f5");
 
-        MergeFunction<KeyValue> func = 
factory.create(adjustedProjection.pushdownProjection);
+        MergeFunction<KeyValue> func = factory.create(pushdownType);
         func.reset();
         // if sequence field is null, the related fields should not be updated
         add(func, 1, 1, 1, 1, 1);
@@ -364,15 +363,16 @@ public class PartialUpdateMergeFunctionTest {
                         DataTypes.INT(),
                         DataTypes.INT());
         // the sequence field 'f4' is projected too
-        int[][] projection = new int[][] {{1}, {4}, {3}, {7}};
+        RowType readType = rowType.project("f1", "f4", "f3", "f7");
         MergeFunctionFactory<KeyValue> factory =
                 PartialUpdateMergeFunction.factory(options, rowType, 
ImmutableList.of("f0"));
-        MergeFunctionFactory.AdjustedProjection adjustedProjection =
-                factory.adjustProjection(projection);
+        RowType pushdownType = factory.adjustReadType(readType);
 
-        validate(adjustedProjection, new int[] {1, 4, 3, 7, 2, 5, 6}, new 
int[] {0, 1, 2, 3});
+        assertThat(pushdownType).isNotNull();
+        assertThat(pushdownType.getFieldNames())
+                .containsExactly("f1", "f4", "f3", "f7", "f2", "f5", "f6");
 
-        MergeFunction<KeyValue> func = 
factory.create(adjustedProjection.pushdownProjection);
+        MergeFunction<KeyValue> func = factory.create(pushdownType);
         func.reset();
         // if sequence field is null, the related fields should not be updated
         add(func, 1, 1, 1, 1, 1, 1, 1);
@@ -396,18 +396,16 @@ public class PartialUpdateMergeFunctionTest {
                         DataTypes.INT(),
                         DataTypes.INT());
         // all fields are projected
-        int[][] projection = new int[][] {{0}, {1}, {2}, {3}, {4}, {5}, {6}, 
{7}};
+        RowType readType = rowType;
         MergeFunctionFactory<KeyValue> factory =
                 PartialUpdateMergeFunction.factory(options, rowType, 
ImmutableList.of("f0"));
-        MergeFunctionFactory.AdjustedProjection adjustedProjection =
-                factory.adjustProjection(projection);
+        RowType pushdownType = factory.adjustReadType(readType);
 
-        validate(
-                adjustedProjection,
-                new int[] {0, 1, 2, 3, 4, 5, 6, 7},
-                new int[] {0, 1, 2, 3, 4, 5, 6, 7});
+        assertThat(pushdownType).isNotNull();
+        assertThat(pushdownType.getFieldNames())
+                .containsExactly("f0", "f1", "f2", "f3", "f4", "f5", "f6", 
"f7");
 
-        MergeFunction<KeyValue> func = 
factory.create(adjustedProjection.pushdownProjection);
+        MergeFunction<KeyValue> func = factory.create(pushdownType);
         func.reset();
         // 'f6' has no sequence group, it should not be updated by null
         add(func, 1, 1, 1, 1, 1, 1, 1, 1);
@@ -431,18 +429,16 @@ public class PartialUpdateMergeFunctionTest {
                         DataTypes.INT(),
                         DataTypes.INT());
         // all fields are projected
-        int[][] projection = new int[][] {{0}, {1}, {2}, {3}, {4}, {5}, {6}, 
{7}};
+        RowType readType = rowType;
         MergeFunctionFactory<KeyValue> factory =
                 PartialUpdateMergeFunction.factory(options, rowType, 
ImmutableList.of("f0"));
-        MergeFunctionFactory.AdjustedProjection adjustedProjection =
-                factory.adjustProjection(projection);
+        RowType pushdownType = factory.adjustReadType(readType);
 
-        validate(
-                adjustedProjection,
-                new int[] {0, 1, 2, 3, 4, 5, 6, 7},
-                new int[] {0, 1, 2, 3, 4, 5, 6, 7});
+        assertThat(pushdownType).isNotNull();
+        assertThat(pushdownType.getFieldNames())
+                .containsExactly("f0", "f1", "f2", "f3", "f4", "f5", "f6", 
"f7");
 
-        MergeFunction<KeyValue> func = 
factory.create(adjustedProjection.pushdownProjection);
+        MergeFunction<KeyValue> func = factory.create(pushdownType);
         func.reset();
         // 'f6' has no sequence group, it should not be updated by null
         add(func, 1, 1, 1, 1, 1, 1, 1, 1);
@@ -465,16 +461,11 @@ public class PartialUpdateMergeFunctionTest {
                         DataTypes.INT(),
                         DataTypes.INT(),
                         DataTypes.INT());
-        // set the projection = null
         MergeFunctionFactory<KeyValue> factory =
                 PartialUpdateMergeFunction.factory(options, rowType, 
ImmutableList.of("f0"));
-        MergeFunctionFactory.AdjustedProjection adjustedProjection = 
factory.adjustProjection(null);
 
-        validate(adjustedProjection, null, null);
-
-        MergeFunction<KeyValue> func = 
factory.create(adjustedProjection.pushdownProjection);
+        MergeFunction<KeyValue> func = factory.create(rowType);
         func.reset();
-        // Setting projection with null is similar with projecting all fields
         add(func, 1, 1, 1, 1, 1, 1, 1, 1);
         add(func, 4, 2, 4, 2, 2, 0, null, 3);
         validate(func, 4, 2, 4, 2, 2, 1, 1, 1);
@@ -493,15 +484,14 @@ public class PartialUpdateMergeFunctionTest {
                         DataTypes.INT(),
                         DataTypes.INT(),
                         DataTypes.INT());
-        int[][] projection = new int[][] {{0}, {1}, {3}, {4}, {7}};
+        RowType readType = rowType.project("f0", "f1", "f3", "f4", "f7");
         MergeFunctionFactory<KeyValue> factory =
                 PartialUpdateMergeFunction.factory(options, rowType, 
ImmutableList.of("f0"));
-        MergeFunctionFactory.AdjustedProjection adjustedProjection =
-                factory.adjustProjection(projection);
+        RowType pushdownType = factory.adjustReadType(readType);
 
-        validate(adjustedProjection, new int[] {0, 1, 3, 4, 7}, null);
+        assertThat(pushdownType).isEqualTo(readType);
 
-        MergeFunction<KeyValue> func = 
factory.create(adjustedProjection.pushdownProjection);
+        MergeFunction<KeyValue> func = factory.create(pushdownType);
         func.reset();
         // Without sequence group, all the fields should not be updated by null
         add(func, 1, 1, 1, 1, 1);
@@ -526,12 +516,12 @@ public class PartialUpdateMergeFunctionTest {
                         DataTypes.INT(),
                         DataTypes.INT(),
                         DataTypes.INT());
-        int[][] projection = new int[][] {{1}, {7}};
+        RowType readType = rowType.project("f1", "f7");
         assertThatThrownBy(
                         () ->
                                 PartialUpdateMergeFunction.factory(
                                                 options, rowType, 
ImmutableList.of("f0"))
-                                        .create(projection))
+                                        .create(readType))
                 .hasMessageContaining("Can not find new sequence field for new 
field.");
     }
 
@@ -734,12 +724,12 @@ public class PartialUpdateMergeFunctionTest {
         MergeFunctionFactory<KeyValue> factory =
                 PartialUpdateMergeFunction.factory(options, rowType, 
ImmutableList.of("f0"));
 
-        MergeFunctionFactory.AdjustedProjection adjustedProjection =
-                factory.adjustProjection(new int[][] {{3}, {2}, {5}});
+        RowType pushdownType = factory.adjustReadType(rowType.project("f3", 
"f2", "f5"));
 
-        validate(adjustedProjection, new int[] {3, 2, 5, 1}, new int[] {0, 1, 
2});
+        assertThat(pushdownType).isNotNull();
+        assertThat(pushdownType.getFieldNames()).containsExactly("f3", "f2", 
"f5", "f1");
 
-        MergeFunction<KeyValue> func = 
factory.create(adjustedProjection.pushdownProjection);
+        MergeFunction<KeyValue> func = factory.create(pushdownType);
 
         func.reset();
         // f0 pk
@@ -790,12 +780,12 @@ public class PartialUpdateMergeFunctionTest {
         MergeFunctionFactory<KeyValue> factory =
                 PartialUpdateMergeFunction.factory(options, rowType, 
ImmutableList.of("f0"));
 
-        MergeFunctionFactory.AdjustedProjection adjustedProjection =
-                factory.adjustProjection(new int[][] {{3}, {2}, {5}});
+        RowType pushdownType = factory.adjustReadType(rowType.project("f3", 
"f2", "f5"));
 
-        validate(adjustedProjection, new int[] {3, 2, 5, 1, 8}, new int[] {0, 
1, 2});
+        assertThat(pushdownType).isNotNull();
+        assertThat(pushdownType.getFieldNames()).containsExactly("f3", "f2", 
"f5", "f1", "f8");
 
-        MergeFunction<KeyValue> func = 
factory.create(adjustedProjection.pushdownProjection);
+        MergeFunction<KeyValue> func = factory.create(pushdownType);
 
         func.reset();
         // f0 pk
@@ -894,22 +884,4 @@ public class PartialUpdateMergeFunctionTest {
     private void validate(MergeFunction<KeyValue> function, Integer... f) {
         assertThat(function.getResult().value()).isEqualTo(GenericRow.of(f));
     }
-
-    private void validate(
-            MergeFunctionFactory.AdjustedProjection projection, int[] 
pushdown, int[] outer) {
-        if (projection.pushdownProjection == null) {
-            assertThat(pushdown).isNull();
-        } else {
-            assertThat(pushdown)
-                    .containsExactly(
-                            
Projection.of(projection.pushdownProjection).toTopLevelIndexes());
-        }
-
-        if (projection.outerProjection == null) {
-            assertThat(outer).isNull();
-        } else {
-            assertThat(outer)
-                    
.containsExactly(Projection.of(projection.outerProjection).toTopLevelIndexes());
-        }
-    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunctionTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunctionTest.java
index 70cdbe3103..c1b2e9dcd8 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunctionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunctionTest.java
@@ -23,12 +23,13 @@ import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.mergetree.compact.MergeFunction;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
 
 import org.junit.jupiter.api.Test;
 
-import java.util.Arrays;
 import java.util.Collections;
 
 import static org.apache.paimon.CoreOptions.FIELDS_DEFAULT_AGG_FUNC;
@@ -45,13 +46,17 @@ class AggregateMergeFunctionTest {
         MergeFunction<KeyValue> aggregateFunction =
                 AggregateMergeFunction.factory(
                                 options,
-                                Arrays.asList("k", "a", "b", "c", "d"),
-                                Arrays.asList(
-                                        DataTypes.INT(),
-                                        DataTypes.INT(),
-                                        DataTypes.INT(),
-                                        DataTypes.INT(),
-                                        DataTypes.INT()),
+                                RowType.builder()
+                                        .fields(
+                                                new DataType[] {
+                                                    DataTypes.INT(),
+                                                    DataTypes.INT(),
+                                                    DataTypes.INT(),
+                                                    DataTypes.INT(),
+                                                    DataTypes.INT()
+                                                },
+                                                new String[] {"k", "a", "b", 
"c", "d"})
+                                        .build(),
                                 Collections.singletonList("k"))
                         .create();
         aggregateFunction.reset();
@@ -76,13 +81,17 @@ class AggregateMergeFunctionTest {
         MergeFunction<KeyValue> aggregateFunction =
                 AggregateMergeFunction.factory(
                                 options,
-                                Arrays.asList("k", "a", "b", "c", "d"),
-                                Arrays.asList(
-                                        DataTypes.INT(),
-                                        DataTypes.STRING(),
-                                        DataTypes.STRING(),
-                                        DataTypes.INT(),
-                                        DataTypes.STRING()),
+                                RowType.builder()
+                                        .fields(
+                                                new DataType[] {
+                                                    DataTypes.INT(),
+                                                    DataTypes.STRING(),
+                                                    DataTypes.STRING(),
+                                                    DataTypes.INT(),
+                                                    DataTypes.STRING()
+                                                },
+                                                new String[] {"k", "a", "b", 
"c", "d"})
+                                        .build(),
                                 Collections.singletonList("k"))
                         .create();
         aggregateFunction.reset();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
index 44a02699e4..db65eb8d16 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
@@ -363,7 +363,7 @@ public class MergeFileSplitReadTest {
             private static final long serialVersionUID = 1L;
 
             @Override
-            public MergeFunction<KeyValue> create(@Nullable int[][] 
projection) {
+            public MergeFunction<KeyValue> create(@Nullable RowType readType) {
                 return new TestValueCountMergeFunction();
             }
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
index bb48dbf363..401ca43682 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
@@ -32,7 +32,6 @@ import 
org.apache.paimon.mergetree.localmerge.HashMapLocalMerger;
 import org.apache.paimon.mergetree.localmerge.LocalMerger;
 import org.apache.paimon.mergetree.localmerge.SortBufferLocalMerger;
 import org.apache.paimon.options.MemorySize;
-import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.PrimaryKeyTableUtils;
 import org.apache.paimon.table.sink.RowKindGenerator;
@@ -61,8 +60,6 @@ import javax.annotation.Nullable;
 
 import java.util.List;
 
-import static org.apache.paimon.table.PrimaryKeyTableUtils.addKeyNamePrefix;
-
 /**
  * {@link AbstractStreamOperator} which buffer input record and apply merge 
function when the buffer
  * is full. Mainly to resolve data skew on primary keys.
@@ -106,24 +103,7 @@ public class LocalMergeOperator extends 
AbstractStreamOperator<InternalRow>
 
         rowKindGenerator = RowKindGenerator.create(schema, options);
         MergeFunction<KeyValue> mergeFunction =
-                PrimaryKeyTableUtils.createMergeFunctionFactory(
-                                schema,
-                                new KeyValueFieldsExtractor() {
-                                    private static final long serialVersionUID 
= 1L;
-
-                                    // At local merge operator, the key 
extractor should include
-                                    // partition fields.
-                                    @Override
-                                    public List<DataField> 
keyFields(TableSchema schema) {
-                                        return 
addKeyNamePrefix(schema.primaryKeysFields());
-                                    }
-
-                                    @Override
-                                    public List<DataField> 
valueFields(TableSchema schema) {
-                                        return schema.fields();
-                                    }
-                                })
-                        .create();
+                
PrimaryKeyTableUtils.createMergeFunctionFactory(schema).create();
 
         boolean canHashMerger = true;
         for (DataField field : valueType.getFields()) {
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala
index 9fddb2f64e..5e4c074dbf 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/VariantTestBase.scala
@@ -715,4 +715,158 @@ abstract class VariantTestBase extends 
PaimonSparkTestBase {
       )
     )
   }
+
+  test("Paimon Variant: partial update with variant") {
+    withTable("t") {
+      sql("""
+            |CREATE table t (
+            |  id INT,
+            |  ts INT,
+            |  dt INT,
+            |  v VARIANT
+            |)
+            |TBLPROPERTIES (
+            |  'primary-key' = 'id',
+            |  'bucket' = '1',
+            |  'changelog-producer' = 'lookup',
+            |  'merge-engine' = 'partial-update',
+            |  'fields.dt.sequence-group' = 'ts',
+            |  'fields.ts.aggregate-function' = 'max',
+            |  'write-only' = 'true'
+            |)
+            |""".stripMargin)
+
+      sql("""
+            |INSERT INTO t VALUES
+            | (1, 1, 1, parse_json('{"c":{"a1":1,"a2":2}}'))
+            | """.stripMargin)
+
+      sql("""
+            |INSERT INTO t VALUES
+            | (1, 2, 2, parse_json('{"c":{"a1":3,"a2":4}}'))
+            | """.stripMargin)
+
+      checkAnswer(
+        sql("SELECT * FROM t"),
+        sql("""SELECT 1, 2, 2, parse_json('{"c":{"a1":3,"a2":4}}')""")
+      )
+      checkAnswer(
+        sql("SELECT variant_get(v, '$.c', 'string') FROM t"),
+        Seq(
+          Row("{\"a1\":3,\"a2\":4}")
+        )
+      )
+    }
+  }
+
+  test("Paimon Variant: deduplicate with variant") {
+    withTable("t_dedup") {
+      sql("""
+            |CREATE table t_dedup (
+            |  id INT,
+            |  name STRING,
+            |  v VARIANT
+            |) TBLPROPERTIES
+            |(
+            |  'primary-key' = 'id',
+            |  'bucket' = '1',
+            |  'merge-engine' = 'deduplicate',
+            |  'write-only' = 'true'
+            |)
+            |""".stripMargin)
+
+      sql("""
+            |INSERT INTO t_dedup VALUES
+            | (1, 'Alice', parse_json('{"age":30,"city":"NYC"}'))
+            | """.stripMargin)
+
+      sql("""
+            |INSERT INTO t_dedup VALUES
+            | (1, 'Bob', parse_json('{"age":25,"city":"LA"}'))
+            | """.stripMargin)
+
+      checkAnswer(
+        sql("SELECT * FROM t_dedup"),
+        sql("""SELECT 1, 'Bob', parse_json('{"age":25,"city":"LA"}')""")
+      )
+      checkAnswer(
+        sql("SELECT variant_get(v, '$.age', 'int') FROM t_dedup"),
+        Seq(Row(25))
+      )
+    }
+  }
+
+  test("Paimon Variant: aggregate with variant") {
+    withTable("t_agg") {
+      sql("""
+            |CREATE table t_agg (
+            |  id INT,
+            |  cnt INT,
+            |  v VARIANT
+            |) TBLPROPERTIES
+            |(
+            |  'primary-key' = 'id',
+            |  'bucket' = '1',
+            |  'merge-engine' = 'aggregation',
+            |  'fields.cnt.aggregate-function' = 'sum',
+            |  'write-only' = 'true'
+            |)
+            |""".stripMargin)
+
+      sql("""
+            |INSERT INTO t_agg VALUES
+            | (1, 10, parse_json('{"data":{"x":1,"y":2}}'))
+            | """.stripMargin)
+
+      sql("""
+            |INSERT INTO t_agg VALUES
+            | (1, 20, parse_json('{"data":{"x":3,"y":4}}'))
+            | """.stripMargin)
+
+      checkAnswer(
+        sql("SELECT * FROM t_agg"),
+        sql("""SELECT 1, 30, parse_json('{"data":{"x":3,"y":4}}')""")
+      )
+      checkAnswer(
+        sql("SELECT variant_get(v, '$.data.x', 'int') FROM t_agg"),
+        Seq(Row(3))
+      )
+    }
+  }
+
+  test("Paimon Variant: first-row with variant") {
+    withTable("t_first") {
+      sql("""
+            |CREATE table t_first (
+            |  id INT,
+            |  seq INT,
+            |  v VARIANT
+            |) TBLPROPERTIES
+            |(
+            |  'primary-key' = 'id',
+            |  'bucket' = '1',
+            |  'merge-engine' = 'first-row'
+            |)
+            |""".stripMargin)
+
+      sql("""
+            |INSERT INTO t_first VALUES
+            | (1, 100, parse_json('{"status":"active"}'))
+            | """.stripMargin)
+
+      sql("""
+            |INSERT INTO t_first VALUES
+            | (1, 200, parse_json('{"status":"inactive"}'))
+            | """.stripMargin)
+
+      checkAnswer(
+        sql("SELECT * FROM t_first"),
+        sql("""SELECT 1, 100, parse_json('{"status":"active"}')""")
+      )
+      checkAnswer(
+        sql("SELECT variant_get(v, '$.status', 'string') FROM t_first"),
+        Seq(Row("active"))
+      )
+    }
+  }
 }

Reply via email to