This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new bc6d341b20 [flink] introduce a simplified MERGE INTO procedure on data-evolution-table for flink (#7128)
bc6d341b20 is described below

commit bc6d341b20935b8baca57ec64eb3bab8109283b7
Author: Faiz <[email protected]>
AuthorDate: Thu Jan 29 10:46:02 2026 +0800

    [flink] introduce a simplified MERGE INTO procedure on data-evolution-table for flink (#7128)
---
 docs/content/append-table/data-evolution.md        |  31 +-
 .../flink/action/DataEvolutionMergeIntoAction.java | 518 +++++++++++++++++++
 .../DataEvolutionMergeIntoActionFactory.java       | 130 +++++
 .../paimon/flink/action/TableActionBase.java       |   2 +-
 .../DataEvolutionPartialWriteOperator.java         | 340 ++++++++++++
 .../flink/dataevolution/FirstRowIdAssigner.java    |  81 +++
 .../flink/dataevolution/FirstRowIdLookup.java      |  52 ++
 .../procedure/DataEvolutionMergeIntoProcedure.java | 139 +++++
 .../services/org.apache.paimon.factories.Factory   |   4 +-
 .../action/DataEvolutionMergeIntoActionITCase.java | 574 +++++++++++++++++++++
 10 files changed, 1868 insertions(+), 3 deletions(-)

diff --git a/docs/content/append-table/data-evolution.md b/docs/content/append-table/data-evolution.md
index 87d7a3390b..cb3b72597a 100644
--- a/docs/content/append-table/data-evolution.md
+++ b/docs/content/append-table/data-evolution.md
@@ -57,7 +57,9 @@ CREATE TABLE target_table (id INT, b INT, c INT) TBLPROPERTIES (
 INSERT INTO target_table VALUES (1, 1, 1), (2, 2, 2);
 ```
 
-Now we could only support spark 'MERGE INTO' statement to update partial columns.
+Now we can update partial columns with the Spark 'MERGE INTO' statement or the Flink 'data_evolution_merge_into' procedure:
+
+### Spark
 
 ```sql
 CREATE TABLE source_table (id INT, b INT);
@@ -85,6 +87,33 @@ Note that:
 * Data Evolution Table does not support 'Delete', 'Update', or 'Compact' statement yet.
 * Merge Into for Data Evolution Table does not support 'WHEN NOT MATCHED BY SOURCE' clause.
 
+### Flink
+Since Flink does not currently support the MERGE INTO syntax, we simulate the merge-into process using the data_evolution_merge_into procedure, as shown below:
+
+```sql
+CREATE TABLE source_table (id INT, b INT);
+INSERT INTO source_table VALUES (1, 11), (2, 22), (3, 33);
+
+CALL sys.data_evolution_merge_into(
+    'my_db.target_table', 
+    '',   /* Optional target alias */
+    '',   /* Optional source sqls */
+    'source_table',
+    'source_table.id=target_table.id',
+    'b=source_table.b',
+    2     /* Specify sink parallelism */
+);
+
+SELECT * FROM target_table;
++----+----+----+
+| id | b  | c  |
++----+----+----+
+| 1  | 11 | 1  |
+| 2  | 22 | 2  |
+```
+Note that:
+* Compared to the Spark implementation, the Flink data_evolution_merge_into procedure only supports updating/inserting new columns for now; inserting new rows is not supported yet.
+
 ## File Group Spec
 
 Through the RowId metadata, files are organized into a file group.
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
new file mode 100644
index 0000000000..bec57cd7f4
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.FlinkRowWrapper;
+import org.apache.paimon.flink.LogicalTypeConversion;
+import org.apache.paimon.flink.dataevolution.DataEvolutionPartialWriteOperator;
+import org.apache.paimon.flink.dataevolution.FirstRowIdAssigner;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.CommittableTypeInfo;
+import org.apache.paimon.flink.sink.CommitterOperatorFactory;
+import org.apache.paimon.flink.sink.NoopCommittableStateManager;
+import org.apache.paimon.flink.sink.StoreCommitter;
+import org.apache.paimon.flink.sorter.SortOperator;
+import org.apache.paimon.flink.utils.InternalTypeInfo;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeCasts;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
+import static 
org.apache.paimon.utils.ParameterUtils.parseCommaSeparatedKeyValues;
+
+/**
+ * The Flink action for 'MERGE INTO' on a Data-Evolution Table. This action is specially
+ * implemented for the Data-Evolution pattern, which can batch insert and update columns without
+ * rewriting existing data files. This is a simplified version of the standard 'MERGE INTO':
+ * deleting and appending data are not supported yet.
+ *
+ * <pre><code>
+ *  MERGE INTO target-table
+ *  USING source-table | source-expr AS source-alias
+ *  ON merge-condition
+ *  WHEN MATCHED
+ *    THEN UPDATE SET xxx
+ * </code></pre>
+ */
+public class DataEvolutionMergeIntoAction extends TableActionBase {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DataEvolutionMergeIntoAction.class);
+    public static final String IDENTIFIER_QUOTE = "`";
+
+    private final CoreOptions coreOptions;
+
+    // field names of target table
+    private final List<String> targetFieldNames;
+
+    /**
+     * Target Table's alias. The alias is implemented through rewriting the merge condition. For
+     * example, if the original condition is `TempT.id=S.id`, it will be rewritten to `RT.id=S.id`.
+     * `RT` means 'row-tracking target'. The reason is that the _ROW_ID metadata field is exposed
+     * via a system table like `T$row_tracking`, so we have to rewrite the merge condition.
+     * Moreover, if we created a temporary view such as `viewT`, then `viewT$row_tracking` would
+     * not be a valid table.
+     */
+    @Nullable private String targetAlias;
+
+    // source table name
+    private String sourceTable;
+
+    // sqls to config environment and create source table
+    @Nullable private String[] sourceSqls;
+
+    // merge condition
+    private String mergeCondition;
+
+    // set statement
+    private String matchedUpdateSet;
+
+    private int sinkParallelism;
+
+    public DataEvolutionMergeIntoAction(
+            String databaseName, String tableName, Map<String, String> 
catalogConfig) {
+        super(databaseName, tableName, catalogConfig);
+
+        if (!(table instanceof FileStoreTable)) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Only FileStoreTable supports merge-into action. 
The table type is '%s'.",
+                            table.getClass().getName()));
+        }
+
+        this.coreOptions = ((FileStoreTable) table).coreOptions();
+
+        if (!coreOptions.dataEvolutionEnabled()) {
+            throw new UnsupportedOperationException(
+                    "Only DataEvolutionTable supports data-evolution 
merge-into action.");
+        }
+
+        // init field names of target table
+        targetFieldNames =
+                table.rowType().getFields().stream()
+                        .map(DataField::name)
+                        .collect(Collectors.toList());
+    }
+
+    public DataEvolutionMergeIntoAction withSourceTable(String sourceTable) {
+        this.sourceTable = sourceTable;
+        return this;
+    }
+
+    public DataEvolutionMergeIntoAction withSourceSqls(String... sourceSqls) {
+        this.sourceSqls = sourceSqls;
+        return this;
+    }
+
+    public DataEvolutionMergeIntoAction withTargetAlias(String targetAlias) {
+        this.targetAlias = targetAlias;
+        return this;
+    }
+
+    public DataEvolutionMergeIntoAction withMergeCondition(String 
mergeCondition) {
+        this.mergeCondition = mergeCondition;
+        return this;
+    }
+
+    public DataEvolutionMergeIntoAction withMatchedUpdateSet(String 
matchedUpdateSet) {
+        this.matchedUpdateSet = matchedUpdateSet;
+        return this;
+    }
+
+    public DataEvolutionMergeIntoAction withSinkParallelism(int 
sinkParallelism) {
+        this.sinkParallelism = sinkParallelism;
+        return this;
+    }
+
+    @Override
+    public void run() throws Exception {
+        runInternal().await();
+    }
+
+    public TableResult runInternal() {
+        // 1. build source
+        Tuple2<DataStream<RowData>, RowType> sourceWithType = buildSource();
+        // 2. shuffle by firstRowId
+        DataStream<Tuple2<Long, RowData>> shuffled =
+                shuffleByFirstRowId(sourceWithType.f0, sourceWithType.f1);
+        // 3. write partial columns
+        DataStream<Committable> written =
+                writePartialColumns(shuffled, sourceWithType.f1, 
sinkParallelism);
+        // 4. commit
+        DataStream<?> committed = commit(written);
+
+        // execute internal
+        Transformation<?> transformations =
+                committed
+                        .sinkTo(new DiscardingSink<>())
+                        .name("END")
+                        .setParallelism(1)
+                        .getTransformation();
+
+        return executeInternal(
+                Collections.singletonList(transformations),
+                Collections.singletonList(identifier.getFullName()));
+    }
+
+    public Tuple2<DataStream<RowData>, RowType> buildSource() {
+        // handle sqls
+        handleSqls();
+
+        // assign row id for each source row
+        List<String> project;
+        if (matchedUpdateSet.equals("*")) {
+            // if sourceName is qualified like 'default.S', we should build a 
project like S.*
+            String[] splits = sourceTable.split("\\.");
+            project = Collections.singletonList(splits[splits.length - 1] + 
".*");
+        } else {
+            // validate upsert changes
+            Map<String, String> changes = 
parseCommaSeparatedKeyValues(matchedUpdateSet);
+            for (String targetField : changes.keySet()) {
+                if (!targetFieldNames.contains(extractFieldName(targetField))) 
{
+                    throw new RuntimeException(
+                            String.format(
+                                    "Invalid column reference '%s' of table 
'%s' at matched-upsert action.",
+                                    targetField, identifier.getFullName()));
+                }
+            }
+
+            // rename source table's selected columns according to SET 
statement
+            project =
+                    changes.entrySet().stream()
+                            .map(
+                                    entry ->
+                                            String.format(
+                                                    "%s AS `%s`",
+                                                    entry.getValue(),
+                                                    
extractFieldName(entry.getKey())))
+                            .collect(Collectors.toList());
+        }
+
+        // use join to find matched rows and assign row id for each source row.
+        // _ROW_ID is the first field of joined table.
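+        // For example (illustrative), with matched_update_set 'b=S.b', source table `S` and
+        // merge condition 'T.id = S.id', the generated query looks like:
+        //   SELECT `RT`.`_ROW_ID` as `_ROW_ID`, S.b AS `b`
+        //   FROM `S` INNER JOIN `<catalog>`.`<db>`.`T$row_tracking` AS RT ON `RT`.id = S.id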
+        String query =
+                String.format(
+                        "SELECT %s, %s FROM %s INNER JOIN %s AS RT ON %s",
+                        "`RT`.`_ROW_ID` as `_ROW_ID`",
+                        String.join(",", project),
+                        escapedSourceName(),
+                        escapedRowTrackingTargetName(),
+                        rewriteMergeCondition(mergeCondition));
+
+        LOG.info("Source query: {}", query);
+
+        Table source = batchTEnv.sqlQuery(query);
+
+        checkSchema(source);
+        RowType sourceType =
+                SpecialFields.rowTypeWithRowId(table.rowType())
+                        .project(source.getResolvedSchema().getColumnNames());
+
+        return Tuple2.of(toDataStream(source), sourceType);
+    }
+
+    public DataStream<Tuple2<Long, RowData>> shuffleByFirstRowId(
+            DataStream<RowData> source, RowType sourceType) {
+        Transformation<RowData> sourceTransformation = 
source.getTransformation();
+        List<Long> firstRowIds =
+                ((FileStoreTable) table)
+                        .store().newScan()
+                                .withManifestEntryFilter(
+                                        entry ->
+                                                entry.file().firstRowId() != 
null
+                                                        && 
!isBlobFile(entry.file().fileName()))
+                                .plan().files().stream()
+                                .map(entry -> entry.file().nonNullFirstRowId())
+                                .sorted()
+                                .collect(Collectors.toList());
+
+        Preconditions.checkState(
+                !firstRowIds.isEmpty(), "Should not MERGE INTO an empty target 
table.");
+
+        OneInputTransformation<RowData, Tuple2<Long, RowData>> 
assignedFirstRowId =
+                new OneInputTransformation<>(
+                        sourceTransformation,
+                        "ASSIGN FIRST_ROW_ID",
+                        new StreamMap<>(new FirstRowIdAssigner(firstRowIds, 
sourceType)),
+                        new TupleTypeInfo<>(
+                                BasicTypeInfo.LONG_TYPE_INFO, 
sourceTransformation.getOutputType()),
+                        sourceTransformation.getParallelism(),
+                        sourceTransformation.isParallelismConfigured());
+
+        // shuffle by firstRowId
+        return new DataStream<>(source.getExecutionEnvironment(), 
assignedFirstRowId)
+                .partitionCustom(
+                        new FirstRowIdAssigner.FirstRowIdPartitioner(),
+                        new FirstRowIdAssigner.FirstRowIdKeySelector());
+    }
+
+    public DataStream<Committable> writePartialColumns(
+            DataStream<Tuple2<Long, RowData>> shuffled, RowType rowType, int 
sinkParallelism) {
+        // 1. sort data by row id
+        InternalTypeInfo<InternalRow> typeInfo = 
InternalTypeInfo.fromRowType(rowType);
+        RowType sortType = rowType.project(SpecialFields.ROW_ID.name());
+        DataStream<InternalRow> sorted =
+                shuffled.map(t -> new FlinkRowWrapper(t.f1), typeInfo)
+                        .setParallelism(sinkParallelism)
+                        .transform(
+                                "SORT BY _ROW_ID",
+                                typeInfo,
+                                new SortOperator(
+                                        sortType,
+                                        rowType,
+                                        coreOptions.writeBufferSize(),
+                                        coreOptions.pageSize(),
+                                        
coreOptions.localSortMaxNumFileHandles(),
+                                        coreOptions.spillCompressOptions(),
+                                        sinkParallelism,
+                                        coreOptions.writeBufferSpillDiskSize(),
+                                        
coreOptions.sequenceFieldSortOrderIsAscending()))
+                        .setParallelism(sinkParallelism);
+
+        // 2. write partial columns
+        return sorted.transform(
+                        "PARTIAL WRITE COLUMNS",
+                        new CommittableTypeInfo(),
+                        new DataEvolutionPartialWriteOperator((FileStoreTable) 
table, rowType))
+                .setParallelism(sinkParallelism);
+    }
+
+    public DataStream<Committable> commit(DataStream<Committable> written) {
+        FileStoreTable storeTable = (FileStoreTable) table;
+        OneInputStreamOperatorFactory<Committable, Committable> 
committerOperator =
+                new CommitterOperatorFactory<>(
+                        false,
+                        true,
+                        "DataEvolutionMergeInto",
+                        context ->
+                                new StoreCommitter(
+                                        storeTable,
+                                        
storeTable.newCommit(context.commitUser()),
+                                        context),
+                        new NoopCommittableStateManager());
+
+        return written.transform("COMMIT OPERATOR", new CommittableTypeInfo(), 
committerOperator)
+                .setParallelism(1)
+                .setMaxParallelism(1);
+    }
+
+    private DataStream<RowData> toDataStream(Table source) {
+        List<DataStructureConverter<Object, Object>> converters =
+                source.getResolvedSchema().getColumns().stream()
+                        .map(Column::getDataType)
+                        .map(DataStructureConverters::getConverter)
+                        .collect(Collectors.toList());
+
+        return batchTEnv
+                .toDataStream(source)
+                .map(
+                        row -> {
+                            int arity = row.getArity();
+                            GenericRowData rowData = new 
GenericRowData(row.getKind(), arity);
+                            for (int i = 0; i < arity; i++) {
+                                rowData.setField(
+                                        i, 
converters.get(i).toInternalOrNull(row.getField(i)));
+                            }
+                            return rowData;
+                        });
+    }
+
+    /**
+     * Rewrite the merge condition, replacing all references to the target table with the alias
+     * 'RT'. This is necessary because in Flink, row-tracking metadata columns (e.g. _ROW_ID,
+     * SEQUENCE_NUMBER) are exposed through a system table (i.e. {@code SELECT * FROM
+     * T$row_tracking}); we use 'RT' to simplify its representation.
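+     *
+     * <p>For example (illustrative), if the target table is named {@code T}, the condition
+     * {@code "T.id = S.id AND T.dt = 'T.x'"} is rewritten to
+     * {@code "`RT`.id = S.id AND `RT`.dt = 'T.x'"}; the quoted literal {@code 'T.x'} is left
+     * untouched.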
+     */
+    @VisibleForTesting
+    public String rewriteMergeCondition(String mergeCondition) {
+        // skip single and double-quoted chunks
+        String skipQuoted = "'(?:''|[^'])*'" + "|\"(?:\"\"|[^\"])*\"";
+        String targetTableRegex =
+                "(?i)(?:\\b"
+                        + Pattern.quote(targetTableName())
+                        + "\\b|`"
+                        + Pattern.quote(targetTableName())
+                        + "`)\\s*\\.";
+
+        Pattern pattern = Pattern.compile(skipQuoted + "|(" + targetTableRegex 
+ ")");
+        Matcher matcher = pattern.matcher(mergeCondition);
+
+        StringBuffer sb = new StringBuffer();
+        while (matcher.find()) {
+            if (matcher.group(1) != null) {
+                matcher.appendReplacement(sb, 
Matcher.quoteReplacement("`RT`."));
+            } else {
+                matcher.appendReplacement(sb, 
Matcher.quoteReplacement(matcher.group(0)));
+            }
+        }
+        matcher.appendTail(sb);
+        return sb.toString();
+    }
+
+    /**
+     * Check the schema of the generated source data. All columns of the source table should be
+     * present in the target table, and it should also contain a _ROW_ID column.
+     *
+     * @param source source table
+     */
+    private void checkSchema(Table source) {
+        Map<String, DataField> targetFields =
+                table.rowType().getFields().stream()
+                        .collect(Collectors.toMap(DataField::name, 
Function.identity()));
+        List<String> partitionKeys = ((FileStoreTable) 
table).schema().partitionKeys();
+
+        List<Column> flinkColumns = source.getResolvedSchema().getColumns();
+        boolean foundRowIdColumn = false;
+        for (Column flinkColumn : flinkColumns) {
+            if (partitionKeys.contains(flinkColumn.getName())) {
+                throw new IllegalStateException(
+                        "User should not update partition columns: " + 
flinkColumn.getName());
+            }
+            if (flinkColumn.getName().equals("_ROW_ID")) {
+                foundRowIdColumn = true;
+                Preconditions.checkState(
+                        
flinkColumn.getDataType().getLogicalType().getTypeRoot()
+                                == LogicalTypeRoot.BIGINT);
+            } else {
+                DataField targetField = 
targetFields.get(flinkColumn.getName());
+                if (targetField == null) {
+                    throw new IllegalStateException(
+                            "Column not found in target table: " + 
flinkColumn.getName());
+                }
+                if (targetField.type().getTypeRoot() == DataTypeRoot.BLOB) {
+                    throw new IllegalStateException(
+                            "Should not append/update new BLOB column through 
MERGE INTO.");
+                }
+
+                DataType paimonType =
+                        LogicalTypeConversion.toDataType(
+                                flinkColumn.getDataType().getLogicalType());
+                if (!DataTypeCasts.supportsCompatibleCast(paimonType, 
targetField.type())) {
+                    throw new IllegalStateException(
+                            String.format(
+                                    "DataType incompatible of field %s: %s is 
not compatible with %s",
+                                    flinkColumn.getName(), paimonType, 
targetField.type()));
+                }
+            }
+        }
+        if (!foundRowIdColumn) {
+            throw new IllegalStateException("_ROW_ID column not found in 
generated source.");
+        }
+    }
+
+    private void handleSqls() {
+        // NOTE: sql may change current catalog and database
+        if (sourceSqls != null) {
+            for (String sql : sourceSqls) {
+                try {
+                    batchTEnv.executeSql(sql).await();
+                } catch (Throwable t) {
+                    String errMsg = "Error occurs when executing sql:\n%s";
+                    LOG.error(String.format(errMsg, sql), t);
+                    throw new RuntimeException(String.format(errMsg, sql), t);
+                }
+            }
+        }
+    }
+
+    private String targetTableName() {
+        return targetAlias == null ? identifier.getObjectName() : targetAlias;
+    }
+
+    private String escapedSourceName() {
+        return Arrays.stream(sourceTable.split("\\."))
+                .map(s -> String.format("`%s`", s))
+                .collect(Collectors.joining("."));
+    }
+
+    private String extractFieldName(String sourceField) {
+        String[] fieldPath = sourceField.split("\\.");
+        return fieldPath[fieldPath.length - 1];
+    }
+
+    private String escapedRowTrackingTargetName() {
+        return String.format(
+                "`%s`.`%s`.`%s$row_tracking`",
+                catalogName, identifier.getDatabaseName(), 
identifier.getObjectName());
+    }
+
+    private List<String> normalizeFieldName(List<String> fieldNames) {
+        return 
fieldNames.stream().map(this::normalizeFieldName).collect(Collectors.toList());
+    }
+
+    private String normalizeFieldName(String fieldName) {
+        if (StringUtils.isNullOrWhitespaceOnly(fieldName) || 
fieldName.endsWith(IDENTIFIER_QUOTE)) {
+            return fieldName;
+        }
+
+        String[] splitFieldNames = fieldName.split("\\.");
+        if (!targetFieldNames.contains(splitFieldNames[splitFieldNames.length 
- 1])) {
+            return fieldName;
+        }
+
+        return String.join(
+                ".",
+                Arrays.stream(splitFieldNames)
+                        .map(
+                                part ->
+                                        part.endsWith(IDENTIFIER_QUOTE)
+                                                ? part
+                                                : IDENTIFIER_QUOTE + part + 
IDENTIFIER_QUOTE)
+                        .toArray(String[]::new));
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionFactory.java
new file mode 100644
index 0000000000..6d921a984e
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionFactory.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import java.util.Collection;
+import java.util.Optional;
+
+/** The {@link ActionFactory} for {@link DataEvolutionMergeIntoAction}. */
+public class DataEvolutionMergeIntoActionFactory implements ActionFactory {
+
+    public static final String IDENTIFIER = "data_evolution_merge_into";
+
+    private static final String TARGET_AS = "target_as";
+    private static final String SOURCE_SQL = "source_sql";
+    private static final String SOURCE_TABLE = "source_table";
+    private static final String ON = "on";
+    private static final String MATCHED_UPDATE_SET = "matched_update_set";
+    private static final String SINK_PARALLELISM = "sink_parallelism";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Optional<Action> create(MultipleParameterToolAdapter params) {
+        DataEvolutionMergeIntoAction action =
+                new DataEvolutionMergeIntoAction(
+                        params.getRequired(DATABASE),
+                        params.getRequired(TABLE),
+                        catalogConfigMap(params));
+
+        // optional params
+        if (params.has(TARGET_AS)) {
+            action.withTargetAlias(params.getRequired(TARGET_AS));
+        }
+
+        if (params.has(SOURCE_SQL)) {
+            Collection<String> sourceSqls = 
params.getMultiParameter(SOURCE_SQL);
+            action.withSourceSqls(sourceSqls.toArray(new String[0]));
+        }
+
+        // required params
+        action.withSourceTable(params.getRequired(SOURCE_TABLE));
+        action.withMergeCondition(params.getRequired(ON));
+        action.withMatchedUpdateSet(params.getRequired(MATCHED_UPDATE_SET));
+
+        int sinkParallelism;
+        try {
+            sinkParallelism = 
Integer.parseInt(params.getRequired(SINK_PARALLELISM));
+        } catch (NumberFormatException e) {
+            throw new IllegalArgumentException("Invalid sink parallelism, must 
be an integer", e);
+        }
+        action.withSinkParallelism(sinkParallelism);
+
+        return Optional.of(action);
+    }
+
+    @Override
+    public void printHelp() {
+        System.out.println("Action \"merge_into\" specially implemented for 
DataEvolutionTables.");
+        System.out.println();
+
+        System.out.println("Syntax:");
+        System.out.println(
+                "  data_evolution_merge_into \\\n"
+                        + "--warehouse <warehouse_path> \\\n"
+                        + "--database <database_name> \\\n"
+                        + "--table <target_table_name> \\\n"
+                        + "[--target_as <target_table_alias>] \\\n"
+                        + "[--source_sql <sql> ...] \\\n"
+                        + "--source_table <source_table_name> \\\n"
+                        + "--on <merge_condition> \\\n"
+                        + "--matched_update_set <update_changes> \\\n"
+                        + "--sink_parallelism <sink_parallelism>");
+
+        System.out.println("  matched_update_set format:");
+        System.out.println(
+                "    col=<source_table>.col | expression [, ...] (do not add 
'<target_table>.' before 'col')");
+        System.out.println(
+                "    * (update with all source cols; require target table's 
schema is a projection of source's)");
+
+        System.out.println("  alternative arguments:");
+        System.out.println("    --path <table_path> to represent the table 
path.");
+        System.out.println();
+
+        System.out.println("Note: ");
+        System.out.println("  1. Target table must be a data-evolution 
table.");
+        System.out.println(
+                "  2. This is a simplified merge-into action, specially 
implemented for DataEvolutionTables:\n"
+                        + "       (1) Only supports matched update action.\n"
+                        + "       (2) Only generates new data files without 
rewriting existing files.\n"
+                        + "       (3) Nulls will also override existing 
values.");
+        System.out.println(
+                "  2. All conditions, set changes and values should use Flink 
SQL syntax. Please quote them with \" to escape special characters.");
+        System.out.println(
+                "  3. You can pass sqls by --source_sql to config environment 
and create source table at runtime");
+        System.out.println("  4. Target alias cannot be duplicated with 
existed table name.");
+        System.out.println(
+                "  5. If the source table is not in the current catalog and 
current database, "
+                        + "the source_table_name must be qualified 
(database.table or catalog.database.table if in different catalog).");
+
+        System.out.println("Examples:");
+        System.out.println(
+                "  data_evolution_merge_into \\\n"
+                        + "--path hdfs:///path/to/T \\\n"
+                        + "--source_table S \\\n"
+                        + "--on \"T.id = S.id\" \\\n"
+                        + "--matched_upsert_set \"value = S.`value`\"");
+        System.out.println(
+                "  It will find matched rows of target table that meet 
condition (T.id = S.id), then write new files which only contain"
+                        + " the updated column `value`.");
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
index 870d22012b..aa977a9e2b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
@@ -69,7 +69,7 @@ public abstract class TableActionBase extends ActionBase {
      * Invoke {@code 
TableEnvironmentImpl#executeInternal(List<Transformation<?>>, List<String>)}
      * from a {@link StreamTableEnvironment} instance through reflecting.
      */
-    private TableResult executeInternal(
+    protected TableResult executeInternal(
             List<Transformation<?>> transformations, List<String> 
sinkIdentifierNames) {
         Class<?> clazz = batchTEnv.getClass().getSuperclass().getSuperclass();
         try {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java
new file mode 100644
index 0000000000..4f9ec8c643
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.dataevolution;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.utils.BoundedOneInputOperator;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.operation.AbstractFileStoreWrite;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.CommitIncrement;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.ProjectedRow;
+import org.apache.paimon.utils.RecordWriter;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
+
+/**
+ * The Flink Batch Operator to process sorted new rows for data-evolution partial write. It assumes
+ * that input data has already been shuffled by firstRowId and sorted by rowId.
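+ *
+ * <p>(Illustrative) Each input row is routed to the {@code Writer} of the file group whose row id
+ * range contains its _ROW_ID; when an incoming _ROW_ID falls outside the current range, the
+ * current writer is finished and a new one is created for the next file group.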
+ */
+public class DataEvolutionPartialWriteOperator
+        extends BoundedOneInputOperator<InternalRow, Committable> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DataEvolutionPartialWriteOperator.class);
+
+    private final FileStoreTable table;
+
+    // dataType
+    private final RowType dataType;
+    private final InternalRow.FieldGetter[] fieldGetters;
+    private final int rowIdIndex;
+
+    // data type excluding the _ROW_ID field.
+    private final RowType writeType;
+
+    private List<Committable> committables = new ArrayList<>();
+
+    // --------------------- transient fields ---------------------------
+
+    // first-row-id related fields
+    private transient FirstRowIdLookup firstRowIdLookup;
+    private transient Map<Long, BinaryRow> firstIdToPartition;
+    private transient Map<Long, List<DataFileMeta>> firstIdToFiles;
+
+    private transient InnerTableRead tableRead;
+    private transient AbstractFileStoreWrite<InternalRow> tableWrite;
+    private transient Writer writer;
+
+    public DataEvolutionPartialWriteOperator(FileStoreTable table, RowType 
dataType) {
+        this.table = table;
+        List<String> fieldNames =
+                dataType.getFieldNames().stream()
+                        .filter(name -> 
!SpecialFields.ROW_ID.name().equals(name))
+                        .collect(Collectors.toList());
+        this.writeType = table.rowType().project(fieldNames);
+        this.dataType =
+                
SpecialFields.rowTypeWithRowId(table.rowType()).project(dataType.getFieldNames());
+        this.rowIdIndex = 
this.dataType.getFieldIndex(SpecialFields.ROW_ID.name());
+        this.fieldGetters = new 
InternalRow.FieldGetter[dataType.getFieldCount()];
+        List<DataField> fields = this.dataType.getFields();
+        for (int i = 0; i < fields.size(); i++) {
+            this.fieldGetters[i] = 
InternalRow.createFieldGetter(fields.get(i).type(), i);
+        }
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+
+        // initialize row-id-related data structures
+        TreeSet<Long> rowIdSet = new TreeSet<>();
+        firstIdToPartition = new HashMap<>();
+        firstIdToFiles = new HashMap<>();
+
+        List<ManifestEntry> entries =
+                table.store()
+                        .newScan()
+                        .withManifestEntryFilter(
+                                entry ->
+                                        entry.file().firstRowId() != null
+                                                && 
!isBlobFile(entry.file().fileName()))
+                        .plan()
+                        .files();
+
+        for (ManifestEntry entry : entries) {
+            DataFileMeta fileMeta = entry.file();
+            long firstRowId = fileMeta.nonNullFirstRowId();
+
+            rowIdSet.add(firstRowId);
+            firstIdToFiles.computeIfAbsent(firstRowId, k -> new 
ArrayList<>()).add(fileMeta);
+            firstIdToPartition.put(firstRowId, entry.partition());
+        }
+        firstRowIdLookup = new FirstRowIdLookup(new ArrayList<>(rowIdSet));
+
+        // initialize table read & table write
+        tableRead = table.newRead().withReadType(dataType);
+        @SuppressWarnings("unchecked")
+        TableWriteImpl<InternalRow> writeImpl =
+                (TableWriteImpl<InternalRow>)
+                        
table.newBatchWriteBuilder().newWrite().withWriteType(writeType);
+        tableWrite = (AbstractFileStoreWrite<InternalRow>) 
(writeImpl.getWrite());
+    }
+
+    @Override
+    public void endInput() throws Exception {
+        finishWriter();
+
+        for (Committable committable : committables) {
+            output.collect(new StreamRecord<>(committable));
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<InternalRow> element) throws 
Exception {
+        InternalRow row = element.getValue();
+        long rowId = row.getLong(rowIdIndex);
+
+        if (writer == null || !writer.contains(rowId)) {
+            finishWriter();
+            writer = createWriter(firstRowIdLookup.lookup(rowId));
+        }
+
+        writer.write(row);
+    }
+
+    private void finishWriter() throws Exception {
+        if (writer != null) {
+            committables.add(writer.finish());
+        }
+        writer = null;
+    }
+
+    private Writer createWriter(long firstRowId) throws IOException {
+        LOG.debug("Creating writer for row id {}", firstRowId);
+
+        BinaryRow partition = firstIdToPartition.get(firstRowId);
+        Preconditions.checkNotNull(
+                partition,
+                String.format("Cannot find the partition for firstRowId: %s ", 
firstRowId));
+
+        List<DataFileMeta> files = firstIdToFiles.get(firstRowId);
+        Preconditions.checkState(
+                files != null && !files.isEmpty(),
+                String.format("Cannot find files for firstRowId: %s", 
firstRowId));
+
+        long rowCount = files.get(0).rowCount();
+
+        DataSplit dataSplit =
+                DataSplit.builder()
+                        .withPartition(partition)
+                        .withBucket(0)
+                        .withDataFiles(files)
+                        .withBucketPath(
+                                
table.store().pathFactory().bucketPath(partition, 0).toString())
+                        .rawConvertible(false)
+                        .build();
+
+        return new Writer(
+                tableRead.createReader(dataSplit),
+                tableWrite.createWriter(partition, 0),
+                firstRowId,
+                rowCount);
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (tableWrite != null) {
+            tableWrite.close();
+        }
+    }
+
+    /**
+     * The writer to write partial columns for a single file. The written new file should be
+     * aligned with existing ones.
+     */
+    private class Writer {
+        // reader for original data
+        private final CloseableIterator<InternalRow> reader;
+
+        // writer to write new columns
+        private final RecordWriter<InternalRow> writer;
+
+        private final ProjectedRow reusedRow;
+        private final long firstRowId;
+        private final long rowCount;
+        private final BinaryRow partition;
+
+        private long writtenNum = 0;
+        private long prevRowId = -1;
+
+        Writer(
+                RecordReader<InternalRow> reader,
+                RecordWriter<InternalRow> writer,
+                long firstRowId,
+                long rowCount) {
+            this.reader = reader.toCloseableIterator();
+            this.writer = writer;
+            this.reusedRow = ProjectedRow.from(writeType, dataType);
+            this.firstRowId = firstRowId;
+            this.rowCount = rowCount;
+            this.partition = firstIdToPartition.get(firstRowId);
+        }
+
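+        /**
+         * Writes one new partial row, copying the original values of rows that precede it. For
+         * example (illustrative), if this file covers row ids 0..9 and the new rows carry ids 3
+         * and 7: originals 0-2 are re-written with their old values, then new row 3, then
+         * originals 4-6, then new row 7; {@code finish()} re-writes the remaining originals 8-9.
+         */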
+        void write(InternalRow row) throws Exception {
+            long currentRowId = row.getLong(rowIdIndex);
+
+            // for rows with duplicated _ROW_ID, just choose the first one.
+            if (checkDuplication(currentRowId)) {
+                return;
+            }
+
+            if (!reader.hasNext()) {
+                throw new IllegalStateException(
+                        "New file should be aligned with original file, it's a 
bug.");
+            }
+
+            InternalRow originalRow;
+            while (reader.hasNext()) {
+                originalRow = reader.next();
+                long originalRowId = originalRow.getLong(rowIdIndex);
+
+                if (originalRowId < currentRowId) {
+                    // new row is absent, we should use the original row
+                    reusedRow.replaceRow(originalRow);
+                    writer.write(reusedRow);
+                    writtenNum++;
+                } else if (originalRowId == currentRowId) {
+                    // new row is present, we should use the new row
+                    reusedRow.replaceRow(row);
+                    writer.write(reusedRow);
+                    writtenNum++;
+                    break;
+                } else {
+                    // original row id > new row id, this means there are 
duplicated row ids
+                    // in the input rows, it cannot happen here.
+                    throw new IllegalStateException("Duplicated row id " + 
currentRowId);
+                }
+            }
+        }
+
+        private Committable finish() throws Exception {
+            // 1. write remaining original rows
+            try {
+                InternalRow row;
+                while (reader.hasNext()) {
+                    row = reader.next();
+                    reusedRow.replaceRow(row);
+                    writer.write(reusedRow);
+                    writtenNum++;
+                }
+            } finally {
+                reader.close();
+            }
+
+            Preconditions.checkState(
+                    writtenNum == rowCount,
+                    String.format(
+                            "Written num %s not equal to original row num %s, 
it's a bug.",
+                            writtenNum, rowCount));
+
+            // 2. finish writer
+            CommitIncrement written = writer.prepareCommit(false);
+            List<DataFileMeta> fileMetas = 
written.newFilesIncrement().newFiles();
+            Preconditions.checkState(
+                    fileMetas.size() == 1, "This is a bug: Writer should produce exactly one file");
+            DataFileMeta fileMeta = 
fileMetas.get(0).assignFirstRowId(firstRowId);
+
+            CommitMessage commitMessage =
+                    new CommitMessageImpl(
+                            partition,
+                            0,
+                            null,
+                            new DataIncrement(
+                                    Collections.singletonList(fileMeta),
+                                    Collections.emptyList(),
+                                    Collections.emptyList()),
+                            CompactIncrement.emptyIncrement());
+
+            return new Committable(Long.MAX_VALUE, commitMessage);
+        }
+
+        private boolean contains(long rowId) {
+            return rowId >= firstRowId && rowId < firstRowId + rowCount;
+        }
+
+        private boolean checkDuplication(long rowId) {
+            if (prevRowId == rowId) {
+                return true;
+            }
+            prevRowId = rowId;
+            return false;
+        }
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/FirstRowIdAssigner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/FirstRowIdAssigner.java
new file mode 100644
index 0000000000..ad87c5b016
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/FirstRowIdAssigner.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.dataevolution;
+
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.MurmurHashUtils;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.data.RowData;
+
+import java.util.List;
+
+/** Assign first row id for each row through binary search. */
+public class FirstRowIdAssigner extends RichMapFunction<RowData, Tuple2<Long, 
RowData>> {
+
+    private final FirstRowIdLookup firstRowIdLookup;
+
+    private final int rowIdFieldIndex;
+
+    public FirstRowIdAssigner(List<Long> firstRowIds, RowType rowType) {
+        this.firstRowIdLookup = new FirstRowIdLookup(firstRowIds);
+        this.rowIdFieldIndex = 
rowType.getFieldNames().indexOf(SpecialFields.ROW_ID.name());
+        Preconditions.checkState(this.rowIdFieldIndex >= 0, "Did not find _ROW_ID column.");
+    }
+
+    @Override
+    public Tuple2<Long, RowData> map(RowData value) throws Exception {
+        long rowId = value.getLong(rowIdFieldIndex);
+        return new Tuple2<>(firstRowIdLookup.lookup(rowId), value);
+    }
+
+    /** The Key Selector to get firstRowId from tuple2. */
+    public static class FirstRowIdKeySelector implements 
KeySelector<Tuple2<Long, RowData>, Long> {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Long getKey(Tuple2<Long, RowData> value) throws Exception {
+            return value.f0;
+        }
+    }
+
+    /** The Partitioner to partition rows by their firstRowId. */
+    public static class FirstRowIdPartitioner implements Partitioner<Long> {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public int partition(Long firstRowId, int numPartitions) {
+            Preconditions.checkNotNull(firstRowId, "FirstRowId should not be 
null.");
+            // Now we just simply floorMod the hash result of the firstRowId.
+            // We could make it more balanced by considering the number of records of each
+            // row id range.
+            return floorMod(MurmurHashUtils.fmix(firstRowId), numPartitions);
+        }
+
+        /** For compatibility with Java 8. */
+        private int floorMod(long x, int y) {
+            return (int) (x - Math.floorDiv(x, (long) y) * y);
+        }
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/FirstRowIdLookup.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/FirstRowIdLookup.java
new file mode 100644
index 0000000000..9c3a80bdda
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/FirstRowIdLookup.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.dataevolution;
+
+import org.apache.paimon.utils.Preconditions;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+/** Look up the related first row id for each row id through binary search. */
+public class FirstRowIdLookup implements Serializable {
+    // a sorted list of first row ids
+    private final List<Long> firstRowIds;
+
+    public FirstRowIdLookup(List<Long> firstRowIds) {
+        this.firstRowIds = firstRowIds;
+    }
+
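+    /**
+     * Returns the greatest first row id that is less than or equal to the given row id. For
+     * example (illustrative), with firstRowIds [0, 100, 250], {@code lookup(120)} returns 100.
+     */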
+    public long lookup(long rowId) {
+        int index = Collections.binarySearch(firstRowIds, rowId);
+        long firstRowId;
+        if (index >= 0) {
+            firstRowId = firstRowIds.get(index);
+        } else {
+            // (-index - 1) is the position of the first element greater than the rowId
+            Preconditions.checkState(
+                    -index - 2 >= 0,
+                    String.format(
+                            "Unexpected RowID: %s which is smaller than the 
smallest FirstRowId: %s",
+                            rowId, firstRowIds.get(0)));
+            firstRowId = firstRowIds.get(-index - 2);
+        }
+        return firstRowId;
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DataEvolutionMergeIntoProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DataEvolutionMergeIntoProcedure.java
new file mode 100644
index 0000000000..66423c2a64
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DataEvolutionMergeIntoProcedure.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.DataEvolutionMergeIntoAction;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.util.Map;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * The MergeInto Procedure specially implemented for data evolution tables. Please see {@code
+ * DataEvolutionMergeIntoAction} for more information.
+ *
+ * <pre><code>
+ *  -- NOTE: use '' as placeholder for optional arguments
+ *  CALL sys.data_evolution_merge_into(
+ *      'targetTableId',     --required
+ *      'targetAlias',       --optional
+ *      'sourceSqls',        --optional
+ *      'sourceTable',       --required
+ *      'mergeCondition',    --required
+ *      'matchedUpdateSet',  --required
+ *      'sinkParallelism'    --required
+ *  )
+ * </code></pre>
+ *
+ * <p>This procedure is forced to run in a batch environment.
+ */
+public class DataEvolutionMergeIntoProcedure extends ProcedureBase {
+
+    public static final String IDENTIFIER = "data_evolution_merge_into";
+
+    @ProcedureHint(
+            argument = {
+                @ArgumentHint(name = "target_table", type = 
@DataTypeHint("STRING")),
+                @ArgumentHint(
+                        name = "target_alias",
+                        type = @DataTypeHint("STRING"),
+                        isOptional = true),
+                @ArgumentHint(
+                        name = "source_sqls",
+                        type = @DataTypeHint("STRING"),
+                        isOptional = true),
+                @ArgumentHint(
+                        name = "source_table",
+                        type = @DataTypeHint("STRING"),
+                        isOptional = true),
+                @ArgumentHint(
+                        name = "merge_condition",
+                        type = @DataTypeHint("STRING"),
+                        isOptional = true),
+                @ArgumentHint(
+                        name = "matched_update_set",
+                        type = @DataTypeHint("STRING"),
+                        isOptional = true),
+                @ArgumentHint(
+                        name = "sink_parallelism",
+                        type = @DataTypeHint("INTEGER"),
+                        isOptional = true)
+            })
+    public String[] call(
+            ProcedureContext procedureContext,
+            String targetTableId,
+            String targetAlias,
+            String sourceSqls,
+            String sourceTable,
+            String mergeCondition,
+            String matchedUpdateSet,
+            Integer sinkParallelism) {
+        targetTableId = notnull(targetTableId);
+        targetAlias = notnull(targetAlias);
+        sourceSqls = notnull(sourceSqls);
+        sourceTable = notnull(sourceTable);
+        mergeCondition = notnull(mergeCondition);
+        matchedUpdateSet = notnull(matchedUpdateSet);
+        Preconditions.checkArgument(sinkParallelism != null && sinkParallelism > 0);
+
+        Map<String, String> catalogOptions = catalog.options();
+        Identifier identifier = Identifier.fromString(targetTableId);
+        DataEvolutionMergeIntoAction action =
+                new DataEvolutionMergeIntoAction(
+                        identifier.getDatabaseName(), identifier.getObjectName(), catalogOptions);
+
+        action.withTargetAlias(nullable(targetAlias));
+
+        if (!sourceSqls.isEmpty()) {
+            action.withSourceSqls(sourceSqls.split(";"));
+        }
+
+        checkArgument(!sourceTable.isEmpty(), "Must specify source table.");
+        action.withSourceTable(sourceTable);
+
+        checkArgument(!mergeCondition.isEmpty(), "Must specify merge condition.");
+        action.withMergeCondition(mergeCondition);
+
+        checkArgument(!matchedUpdateSet.isEmpty(), "Must specify matched update set.");
+        action.withMatchedUpdateSet(matchedUpdateSet);
+
+        action.withSinkParallelism(sinkParallelism);
+
+        action.withStreamExecutionEnvironment(procedureContext.getExecutionEnvironment());
+
+        TableResult result = action.runInternal();
+        JobClient jobClient = result.getJobClient().get();
+
+        return execute(procedureContext, jobClient);
+    }
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index aef64a43d2..09474a3110 100644
--- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -48,6 +48,7 @@ org.apache.paimon.flink.action.RemoveUnexistingManifestsActionFactory
 org.apache.paimon.flink.action.ClearConsumerActionFactory
 org.apache.paimon.flink.action.RescaleActionFactory
 org.apache.paimon.flink.action.CloneActionFactory
+org.apache.paimon.flink.action.DataEvolutionMergeIntoActionFactory
 
 ### procedure factories
 org.apache.paimon.flink.procedure.CompactDatabaseProcedure
@@ -97,4 +98,5 @@ org.apache.paimon.flink.procedure.DropFunctionProcedure
 org.apache.paimon.flink.procedure.AlterFunctionProcedure
 org.apache.paimon.flink.procedure.AlterColumnDefaultValueProcedure
 org.apache.paimon.flink.procedure.TriggerTagAutomaticCreationProcedure
-org.apache.paimon.flink.procedure.RemoveUnexistingManifestsProcedure
\ No newline at end of file
+org.apache.paimon.flink.procedure.RemoveUnexistingManifestsProcedure
+org.apache.paimon.flink.procedure.DataEvolutionMergeIntoProcedure
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
new file mode 100644
index 0000000000..0aa0941270
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
@@ -0,0 +1,574 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
+import static org.apache.paimon.CoreOptions.DATA_EVOLUTION_ENABLED;
+import static org.apache.paimon.CoreOptions.ROW_TRACKING_ENABLED;
+import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.bEnv;
+import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildDdl;
+import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
+import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.insertInto;
+import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.sEnv;
+import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.testBatchRead;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** ITCase for {@link DataEvolutionMergeIntoAction}. */
+public class DataEvolutionMergeIntoActionITCase extends ActionITCaseBase {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DataEvolutionMergeIntoActionITCase.class);
+
+    private static Stream<Arguments> testArguments() {
+        return Stream.of(
+                Arguments.of(true, "action"),
+                Arguments.of(false, "action"),
+                Arguments.of(true, "procedure"),
+                Arguments.of(false, "procedure"));
+    }
+
+    @BeforeEach
+    public void setup() throws Exception {
+        init(warehouse);
+
+        prepareTargetTable();
+
+        prepareSourceTable();
+    }
+
+    @ParameterizedTest(name = "use default db = {0}, invoker - {1}")
+    @MethodSource("testArguments")
+    public void testUpdateSingleColumn(boolean inDefault, String invoker) throws Exception {
+        String targetDb = inDefault ? database : "test_db";
+        if (!inDefault) {
+            // create target table in a new database
+            sEnv.executeSql("DROP TABLE T");
+            sEnv.executeSql("CREATE DATABASE test_db");
+            sEnv.executeSql("USE test_db");
+            bEnv.executeSql("USE test_db");
+            prepareTargetTable();
+        }
+
+        List<Row> expected =
+                Arrays.asList(
+                        changelogRow("+I", 1, "new_name1"),
+                        changelogRow("+I", 2, "name2"),
+                        changelogRow("+I", 3, "name3"),
+                        changelogRow("+I", 7, "new_name7"),
+                        changelogRow("+I", 8, "name8"),
+                        changelogRow("+I", 11, "new_name11"),
+                        changelogRow("+I", 15, null),
+                        changelogRow("+I", 18, "new_name18"));
+
+        if (invoker.equals("action")) {
+            DataEvolutionMergeIntoActionBuilder builder =
+                    builder(warehouse, targetDb, "T")
+                            .withMergeCondition("T.id=S.id")
+                            .withMatchedUpdateSet("T.name=S.name")
+                            .withSourceTable("S")
+                            .withSinkParallelism(2);
+
+            builder.build().run();
+        } else {
+            String procedureStatement =
+                    String.format(
+                            "CALL sys.data_evolution_merge_into('%s.T', '', 
'', 'S', 'T.id=S.id', 'name=S.name', 2)",
+                            targetDb);
+
+            executeSQL(procedureStatement, false, true);
+        }
+
+        testBatchRead("SELECT id, name FROM T where id in (1, 2, 3, 7, 8, 11, 
15, 18)", expected);
+    }
+
+    @ParameterizedTest(name = "use default db = {0}, invoker - {1}")
+    @MethodSource("testArguments")
+    public void testUpdateMultipleColumns(boolean inDefault, String invoker) throws Exception {
+        String targetDb = inDefault ? database : "test_db";
+        if (!inDefault) {
+            // create target table in a new database
+            sEnv.executeSql("DROP TABLE T");
+            sEnv.executeSql("CREATE DATABASE test_db");
+            sEnv.executeSql("USE test_db");
+            bEnv.executeSql("USE test_db");
+            prepareTargetTable();
+        }
+
+        List<Row> expected =
+                Arrays.asList(
+                        changelogRow("+I", 1, "new_name1", 100.1),
+                        changelogRow("+I", 2, "name2", 0.2),
+                        changelogRow("+I", 3, "name3", 0.3),
+                        changelogRow("+I", 7, "new_name7", null),
+                        changelogRow("+I", 8, "name8", 0.8),
+                        changelogRow("+I", 11, "new_name11", 101.1),
+                        changelogRow("+I", 15, null, 101.1),
+                        changelogRow("+I", 18, "new_name18", 101.8));
+
+        if (invoker.equals("action")) {
+            DataEvolutionMergeIntoActionBuilder builder =
+                    builder(warehouse, targetDb, "T")
+                            .withMergeCondition("T.id=S.id")
+                            .withMatchedUpdateSet("T.name=S.name,T.value=S.`value`")
+                            .withSourceTable("S")
+                            .withSinkParallelism(2);
+
+            builder.build().run();
+        } else {
+            String procedureStatement =
+                    String.format(
+                            "CALL sys.data_evolution_merge_into('%s.T', '', 
'', 'S', 'T.id=S.id', 'name=S.name,value=S.`value`', 2)",
+                            targetDb);
+
+            executeSQL(procedureStatement, false, true);
+        }
+
+        testBatchRead(
+                "SELECT id, name, `value` FROM T where id in (1, 2, 3, 7, 8, 
11, 15, 18)",
+                expected);
+    }
+
+    @ParameterizedTest(name = "use default db = {0}, invoker - {1}")
+    @MethodSource("testArguments")
+    public void testSetLiterals(boolean inDefault, String invoker) throws Exception {
+        String targetDb = inDefault ? database : "test_db";
+        if (!inDefault) {
+            // create target table in a new database
+            sEnv.executeSql("DROP TABLE T");
+            sEnv.executeSql("CREATE DATABASE test_db");
+            sEnv.executeSql("USE test_db");
+            bEnv.executeSql("USE test_db");
+            prepareTargetTable();
+        }
+
+        List<Row> expected =
+                Arrays.asList(
+                        changelogRow("+I", 1, "testName", 0.0),
+                        changelogRow("+I", 2, "name2", 0.2),
+                        changelogRow("+I", 3, "name3", 0.3),
+                        changelogRow("+I", 7, "testName", 0.0),
+                        changelogRow("+I", 8, "name8", 0.8),
+                        changelogRow("+I", 11, "testName", 0.0),
+                        changelogRow("+I", 15, "testName", 0.0),
+                        changelogRow("+I", 18, "testName", 0.0));
+
+        if (invoker.equals("action")) {
+            DataEvolutionMergeIntoActionBuilder builder =
+                    builder(warehouse, targetDb, "T")
+                            .withMergeCondition("T.id=S.id")
+                            .withMatchedUpdateSet("T.name='testName',T.value=CAST(0.0 as DOUBLE)")
+                            .withSourceTable("S")
+                            .withSinkParallelism(2);
+
+            builder.build().run();
+        } else {
+            String procedureStatement =
+                    String.format(
+                            "CALL sys.data_evolution_merge_into('%s.T', '', 
'', 'S', 'T.id=S.id', 'name=''testName'',value=CAST(0.0 as DOUBLE)', 2)",
+                            targetDb);
+
+            executeSQL(procedureStatement, false, true);
+        }
+
+        testBatchRead(
+                "SELECT id, name, `value` FROM T where id in (1, 2, 3, 7, 8, 
11, 15, 18)",
+                expected);
+    }
+
+    @ParameterizedTest(name = "use default db = {0}, invoker - {1}")
+    @MethodSource("testArguments")
+    public void testUpdatePartitionColumnThrowsError(boolean inDefault, String invoker)
+            throws Exception {
+        Throwable t;
+        if (invoker.equals("action")) {
+            DataEvolutionMergeIntoActionBuilder builder =
+                    builder(warehouse, database, "T")
+                            .withMergeCondition("T.id=S.id")
+                            .withMatchedUpdateSet("T.dt=S.id")
+                            .withSourceTable("S")
+                            .withSinkParallelism(2);
+            t = Assertions.assertThrows(IllegalStateException.class, () -> builder.build().run());
+            Assertions.assertTrue(
+                    t.getMessage().startsWith("User should not update partition columns:"));
+        } else {
+            String procedureStatement =
+                    "CALL sys.data_evolution_merge_into('default.T', '', '', 
'S', 'T.id=S.id', 'dt=S.id', 2)";
+            t =
+                    Assertions.assertThrows(
+                            Exception.class, () -> executeSQL(procedureStatement, false, true));
+            org.assertj.core.api.Assertions.assertThat(t)
+                    .hasRootCauseInstanceOf(IllegalStateException.class)
+                    .message()
+                    .contains("User should not update partition columns:");
+        }
+    }
+
+    @ParameterizedTest(name = "use default db = {0}, invoker - {1}")
+    @MethodSource("testArguments")
+    public void testOneToManyUpdate(boolean inDefault, String invoker) throws Exception {
+        // A single row in the target table may map to multiple rows in the source table.
+        // For example:
+        // In the target table we have: (id=1, value='val', _ROW_ID=0)
+        // In the source table we have: (id=1, value='val1'), (id=1, value='val2')
+        // If we execute MERGE INTO T SET T.`value`=S.`value` ON T.id=S.id
+        // there would be 2 rows with the same _ROW_ID but different new values:
+        // (id=1, value='val1', _ROW_ID=0) and (id=1, value='val2', _ROW_ID=0)
+        // In that case, a random one of them is chosen as the final result.
+        insertInto("S", "(1, 'dup_new_name1', 200.1)");
+
+        String targetDb = inDefault ? database : "test_db";
+        if (!inDefault) {
+            // create target table in a new database
+            sEnv.executeSql("DROP TABLE T");
+            sEnv.executeSql("CREATE DATABASE test_db");
+            sEnv.executeSql("USE test_db");
+            bEnv.executeSql("USE test_db");
+            prepareTargetTable();
+        }
+
+        // First validate the results except for the id=1 row.
+        List<Row> expected =
+                Arrays.asList(
+                        changelogRow("+I", 2, "name2", 0.2),
+                        changelogRow("+I", 3, "name3", 0.3),
+                        changelogRow("+I", 7, "new_name7", null),
+                        changelogRow("+I", 8, "name8", 0.8),
+                        changelogRow("+I", 11, "new_name11", 101.1),
+                        changelogRow("+I", 15, null, 101.1),
+                        changelogRow("+I", 18, "new_name18", 101.8));
+
+        if (invoker.equals("action")) {
+            DataEvolutionMergeIntoActionBuilder builder =
+                    builder(warehouse, targetDb, "T")
+                            .withMergeCondition("T.id=S.id")
+                            .withMatchedUpdateSet("T.name=S.name,T.value=S.`value`")
+                            .withSourceTable("S")
+                            .withSinkParallelism(2);
+
+            builder.build().run();
+        } else {
+            String procedureStatement =
+                    String.format(
+                            "CALL sys.data_evolution_merge_into('%s.T', '', 
'', 'S', 'T.id=S.id', 'name=S.name,value=S.`value`', 2)",
+                            targetDb);
+
+            executeSQL(procedureStatement, false, true);
+        }
+
+        testBatchRead(
+                "SELECT id, name, `value` FROM T where id in (2, 3, 7, 8, 11, 
15, 18)", expected);
+
+        // then validate id=1 row
+        List<Row> possibleRows =
+                Arrays.asList(
+                        changelogRow("+I", 1, "dup_new_name1", 200.1),
+                        changelogRow("+I", 1, "new_name1", 100.1));
+        boolean passed = false;
+        for (Row row : possibleRows) {
+            try {
+                testBatchRead(
+                        "SELECT id, name, `value` FROM T where id = 1",
+                        Collections.singletonList(row));
+                passed = true;
+                break;
+            } catch (Throwable e) {
+                // This candidate did not match; log the failure and try the next one.
+                LOG.info("Error happens in testing one-to-many merge into.", e);
+            }
+        }
+
+        Assertions.assertTrue(
+                passed,
+                "one-to-many merge into test fails, please check log for more 
information.");
+    }
+
+    @ParameterizedTest(name = "use default db = {0}, invoker - {1}")
+    @MethodSource("testArguments")
+    public void testAlias(boolean inDefault, String invoker) throws Exception {
+        String targetDb = inDefault ? database : "test_db";
+        if (!inDefault) {
+            // create target table in a new database
+            sEnv.executeSql("DROP TABLE T");
+            sEnv.executeSql("CREATE DATABASE test_db");
+            sEnv.executeSql("USE test_db");
+            bEnv.executeSql("USE test_db");
+            prepareTargetTable();
+        }
+
+        List<Row> expected =
+                Arrays.asList(
+                        changelogRow("+I", 1, "new_name1", 100.1),
+                        changelogRow("+I", 2, "name2", 0.2),
+                        changelogRow("+I", 3, "name3", 0.3),
+                        changelogRow("+I", 7, "new_name7", null),
+                        changelogRow("+I", 8, "name8", 0.8),
+                        changelogRow("+I", 11, "new_name11", 101.1),
+                        changelogRow("+I", 15, null, 101.1),
+                        changelogRow("+I", 18, "new_name18", 101.8));
+
+        if (invoker.equals("action")) {
+            DataEvolutionMergeIntoActionBuilder builder =
+                    builder(warehouse, targetDb, "T")
+                            .withMergeCondition("TempT.id=S.id")
+                            .withMatchedUpdateSet("TempT.name=S.name,TempT.value=S.`value`")
+                            .withSourceTable("S")
+                            .withTargetAlias("TempT")
+                            .withSinkParallelism(2);
+
+            builder.build().run();
+        } else {
+            String procedureStatement =
+                    String.format(
+                            "CALL sys.data_evolution_merge_into('%s.T', 
'TempT', '', 'S', 'TempT.id=S.id', 'name=S.name,value=S.`value`', 2)",
+                            targetDb);
+
+            executeSQL(procedureStatement, false, true);
+        }
+
+        testBatchRead(
+                "SELECT id, name, `value` FROM T where id in (1, 2, 3, 7, 8, 
11, 15, 18)",
+                expected);
+    }
+
+    @ParameterizedTest(name = "use default db = {0}, invoker - {1}")
+    @MethodSource("testArguments")
+    public void testSqls(boolean inDefault, String invoker) throws Exception {
+        String targetDb = inDefault ? database : "test_db";
+        if (!inDefault) {
+            // create target table in a new database
+            sEnv.executeSql("DROP TABLE T");
+            sEnv.executeSql("CREATE DATABASE test_db");
+            sEnv.executeSql("USE test_db");
+            bEnv.executeSql("USE test_db");
+            prepareTargetTable();
+        }
+
+        List<Row> expected =
+                Arrays.asList(
+                        changelogRow("+I", 1, "new_name1", 100.1),
+                        changelogRow("+I", 2, "name2", 0.2),
+                        changelogRow("+I", 3, "name3", 0.3),
+                        changelogRow("+I", 7, "new_name7", null),
+                        changelogRow("+I", 8, "name8", 0.8),
+                        changelogRow("+I", 11, "new_name11", 101.1),
+                        changelogRow("+I", 15, null, 101.1),
+                        changelogRow("+I", 18, "new_name18", 101.8));
+
+        if (invoker.equals("action")) {
+            DataEvolutionMergeIntoActionBuilder builder =
+                    builder(warehouse, targetDb, "T")
+                            .withMergeCondition("TempT.id=SS.id")
+                            .withMatchedUpdateSet("TempT.name=SS.name,TempT.value=SS.`value`")
+                            .withSourceTable("SS")
+                            .withTargetAlias("TempT")
+                            .withSourceSqls(
+                                    "CREATE TEMPORARY VIEW SS AS SELECT id, 
name, `value` FROM S")
+                            .withSinkParallelism(2);
+
+            builder.build().run();
+        } else {
+            String procedureStatement =
+                    String.format(
+                            "CALL sys.data_evolution_merge_into('%s.T', 
'TempT', 'CREATE TEMPORARY VIEW SS AS SELECT id, name, `value` FROM S',"
+                                    + " 'SS', 'TempT.id=SS.id', 
'name=SS.name,value=SS.`value`', 2)",
+                            targetDb);
+
+            executeSQL(procedureStatement, false, true);
+        }
+
+        testBatchRead(
+                "SELECT id, name, `value` FROM T where id in (1, 2, 3, 7, 8, 
11, 15, 18)",
+                expected);
+    }
+
+    @Test
+    public void testRewriteMergeCondition() throws Exception {
+        Map<String, String> config = new HashMap<>();
+        config.put("warehouse", warehouse);
+        DataEvolutionMergeIntoAction action =
+                new DataEvolutionMergeIntoAction(database, "T", config);
+
+        String mergeCondition = "T.id=S.id";
+        assertEquals("`RT`.id=S.id", 
action.rewriteMergeCondition(mergeCondition));
+
+        mergeCondition = "`T`.id=S.id";
+        assertEquals("`RT`.id=S.id", 
action.rewriteMergeCondition(mergeCondition));
+
+        mergeCondition = "t.id = s.id AND T.pt = s.pt";
+        assertEquals(
+                "`RT`.id = s.id AND `RT`.pt = s.pt", 
action.rewriteMergeCondition(mergeCondition));
+
+        mergeCondition = "TT.id = 1 AND T.id = 2";
+        assertEquals("TT.id = 1 AND `RT`.id = 2", 
action.rewriteMergeCondition(mergeCondition));
+
+        mergeCondition = "TT.id = 'T.id' AND T.id = \"T.id\"";
+        assertEquals(
+                "TT.id = 'T.id' AND `RT`.id = \"T.id\"",
+                action.rewriteMergeCondition(mergeCondition));
+    }
+
+    private void prepareTargetTable() throws Exception {
+        sEnv.executeSql(
+                buildDdl(
+                        "T",
+                        Arrays.asList("id INT", "name STRING", "`value` 
DOUBLE", "dt STRING"),
+                        Collections.emptyList(),
+                        Collections.singletonList("dt"),
+                        new HashMap<String, String>() {
+                            {
+                                put(ROW_TRACKING_ENABLED.key(), "true");
+                                put(DATA_EVOLUTION_ENABLED.key(), "true");
+                            }
+                        }));
+        insertInto(
+                "T",
+                "(1, 'name1', 0.1, '01-22')",
+                "(2, 'name2', 0.2, '01-22')",
+                "(3, 'name3', 0.3, '01-22')",
+                "(4, 'name4', 0.4, '01-22')",
+                "(5, 'name5', 0.5, '01-22')",
+                "(6, 'name6', 0.6, '01-22')",
+                "(7, 'name7', 0.7, '01-22')",
+                "(8, 'name8', 0.8, '01-22')",
+                "(9, 'name9', 0.9, '01-22')",
+                "(10, 'name10', 1.0, '01-22')");
+
+        insertInto(
+                "T",
+                "(11, 'name11', 1.1, '01-22')",
+                "(12, 'name12', 1.2, '01-22')",
+                "(13, 'name13', 1.3, '01-22')",
+                "(14, 'name14', 1.4, '01-22')",
+                "(15, 'name15', 1.5, '01-22')",
+                "(16, 'name16', 1.6, '01-22')",
+                "(17, 'name17', 1.7, '01-22')",
+                "(18, 'name18', 1.8, '01-22')",
+                "(19, 'name19', 1.9, '01-22')",
+                "(20, 'name20', 2.0, '01-22')");
+    }
+
+    private void prepareSourceTable() throws Exception {
+        sEnv.executeSql(
+                buildDdl(
+                        "S",
+                        Arrays.asList("id INT", "name STRING", "`value` 
DOUBLE"),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        new HashMap<String, String>() {
+                            {
+                                put(ROW_TRACKING_ENABLED.key(), "true");
+                                put(DATA_EVOLUTION_ENABLED.key(), "true");
+                            }
+                        }));
+        insertInto(
+                "S",
+                "(1, 'new_name1', 100.1)",
+                "(7, 'new_name7', CAST(NULL AS DOUBLE))",
+                "(11, 'new_name11', 101.1)",
+                "(15, CAST(NULL AS STRING), 101.1)",
+                "(18, 'new_name18', 101.8)",
+                "(21, 'new_name21', 102.1)");
+    }
+
+    private DataEvolutionMergeIntoActionBuilder builder(
+            String warehouse, String database, String table) {
+        return new DataEvolutionMergeIntoActionBuilder(warehouse, database, table);
+    }
+
+    private class DataEvolutionMergeIntoActionBuilder {
+        private final List<String> args;
+
+        DataEvolutionMergeIntoActionBuilder(String warehouse, String database, String table) {
+            this.args =
+                    new ArrayList<>(
+                            Arrays.asList(
+                                    "data_evolution_merge_into",
+                                    "--warehouse",
+                                    warehouse,
+                                    "--database",
+                                    database,
+                                    "--table",
+                                    table));
+        }
+
+        public DataEvolutionMergeIntoActionBuilder withTargetAlias(String targetAlias) {
+            if (targetAlias != null) {
+                args.add("--target_as");
+                args.add(targetAlias);
+            }
+            return this;
+        }
+
+        public DataEvolutionMergeIntoActionBuilder withSourceTable(String sourceTable) {
+            args.add("--source_table");
+            args.add(sourceTable);
+            return this;
+        }
+
+        public DataEvolutionMergeIntoActionBuilder withSourceSqls(String... sourceSqls) {
+            if (sourceSqls != null) {
+                for (String sql : sourceSqls) {
+                    args.add("--source_sql");
+                    args.add(sql);
+                }
+            }
+            return this;
+        }
+
+        public DataEvolutionMergeIntoActionBuilder withMergeCondition(String mergeCondition) {
+            args.add("--on");
+            args.add(mergeCondition);
+            return this;
+        }
+
+        public DataEvolutionMergeIntoActionBuilder withMatchedUpdateSet(String matchedUpdateSet) {
+            args.add("--matched_update_set");
+            args.add(matchedUpdateSet);
+            return this;
+        }
+
+        public DataEvolutionMergeIntoActionBuilder withSinkParallelism(int sinkParallelism) {
+            args.add("--sink_parallelism");
+            args.add(String.valueOf(sinkParallelism));
+            return this;
+        }
+
+        public DataEvolutionMergeIntoAction build() {
+            return createAction(DataEvolutionMergeIntoAction.class, args);
+        }
+    }
+}
