This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new bc6d341b20 [flink] introduce a simplified MERGE INTO procedure on data-evolution-table for flink (#7128)
bc6d341b20 is described below
commit bc6d341b20935b8baca57ec64eb3bab8109283b7
Author: Faiz <[email protected]>
AuthorDate: Thu Jan 29 10:46:02 2026 +0800
[flink] introduce a simplified MERGE INTO procedure on data-evolution-table for flink (#7128)
---
docs/content/append-table/data-evolution.md | 31 +-
.../flink/action/DataEvolutionMergeIntoAction.java | 518 +++++++++++++++++++
.../DataEvolutionMergeIntoActionFactory.java | 130 +++++
.../paimon/flink/action/TableActionBase.java | 2 +-
.../DataEvolutionPartialWriteOperator.java | 340 ++++++++++++
.../flink/dataevolution/FirstRowIdAssigner.java | 81 +++
.../flink/dataevolution/FirstRowIdLookup.java | 52 ++
.../procedure/DataEvolutionMergeIntoProcedure.java | 139 +++++
.../services/org.apache.paimon.factories.Factory | 4 +-
.../action/DataEvolutionMergeIntoActionITCase.java | 574 +++++++++++++++++++++
10 files changed, 1868 insertions(+), 3 deletions(-)
diff --git a/docs/content/append-table/data-evolution.md b/docs/content/append-table/data-evolution.md
index 87d7a3390b..cb3b72597a 100644
--- a/docs/content/append-table/data-evolution.md
+++ b/docs/content/append-table/data-evolution.md
@@ -57,7 +57,9 @@ CREATE TABLE target_table (id INT, b INT, c INT)
TBLPROPERTIES (
INSERT INTO target_table VALUES (1, 1, 1), (2, 2, 2);
```
-Now we could only support spark 'MERGE INTO' statement to update partial columns.
+Now we can update partial columns through the Spark 'MERGE INTO' statement or the Flink 'data_evolution_merge_into' procedure:
+
+### Spark
```sql
CREATE TABLE source_table (id INT, b INT);
@@ -85,6 +87,33 @@ Note that:
* Data Evolution Table does not support 'Delete', 'Update', or 'Compact' statement yet.
* Merge Into for Data Evolution Table does not support 'WHEN NOT MATCHED BY SOURCE' clause.
+### Flink
+Since Flink does not currently support the MERGE INTO syntax, we simulate the merge-into process using the data_evolution_merge_into procedure, as shown below:
+
+```sql
+CREATE TABLE source_table (id INT, b INT);
+INSERT INTO source_table VALUES (1, 11), (2, 22), (3, 33);
+
+CALL sys.data_evolution_merge_into(
+ 'my_db.target_table',
+ '', /* Optional target alias */
+ '', /* Optional source sqls */
+ 'source_table',
+ 'source_table.id=target_table.id',
+ 'b=source_table.b',
+ 2 /* Specify sink parallelism */
+);
+
+SELECT * FROM target_table;
++----+----+----+
+| id | b  | c  |
++----+----+----+
+|  1 | 11 |  1 |
+|  2 | 22 |  2 |
++----+----+----+
+```
+Note that:
+* Compared to the Spark implementation, the Flink data_evolution_merge_into procedure currently only supports updating existing columns or inserting new columns. Inserting new rows is not supported yet.
+
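+The optional `source sqls` argument can set up the source at runtime. A sketch (the temporary view `SRC` is illustrative; any statements accepted by Flink SQL can be passed, separated by `;`):
+
+```sql
+CALL sys.data_evolution_merge_into(
+    'my_db.target_table',
+    '',
+    'CREATE TEMPORARY VIEW SRC AS SELECT id, b * 10 AS b FROM source_table',
+    'SRC',
+    'SRC.id=target_table.id',
+    'b=SRC.b',
+    2
+);
+```
+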
## File Group Spec
Through the RowId metadata, files are organized into a file group.
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
new file mode 100644
index 0000000000..bec57cd7f4
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoAction.java
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.FlinkRowWrapper;
+import org.apache.paimon.flink.LogicalTypeConversion;
+import org.apache.paimon.flink.dataevolution.DataEvolutionPartialWriteOperator;
+import org.apache.paimon.flink.dataevolution.FirstRowIdAssigner;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.CommittableTypeInfo;
+import org.apache.paimon.flink.sink.CommitterOperatorFactory;
+import org.apache.paimon.flink.sink.NoopCommittableStateManager;
+import org.apache.paimon.flink.sink.StoreCommitter;
+import org.apache.paimon.flink.sorter.SortOperator;
+import org.apache.paimon.flink.utils.InternalTypeInfo;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeCasts;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
+import static
org.apache.paimon.utils.ParameterUtils.parseCommaSeparatedKeyValues;
+
+/**
+ * The flink action for 'MERGE INTO' on Data-Evolution Table. This action is specially implemented
+ * for the Data-Evolution pattern which can batch insert and update columns without rewriting
+ * existing data files. This is a simplified version of standard 'MERGE INTO': we do not support
+ * deleting or appending data now.
+ *
+ * <pre><code>
+ * MERGE INTO target-table
+ * USING source-table | source-expr AS source-alias
+ * ON merge-condition
+ * WHEN MATCHED
+ * THEN UPDATE SET xxx
+ * </code></pre>
+ */
+public class DataEvolutionMergeIntoAction extends TableActionBase {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DataEvolutionMergeIntoAction.class);
+ public static final String IDENTIFIER_QUOTE = "`";
+
+ private final CoreOptions coreOptions;
+
+ // field names of target table
+ private final List<String> targetFieldNames;
+
+ /**
+     * Target Table's alias. The alias is implemented through rewriting the merge-condition. For
+     * example, if the original condition is `TempT.id=S.id`, it will be rewritten to `RT.id=S.id`.
+     * `RT` means 'row-tracking target'. The reason is that the _ROW_ID metadata field is exposed
+     * via a system table like `T$row_tracking`, so we have to rewrite the merge condition.
+     * Moreover, if we still create a temporary view such as `viewT`, then `viewT$row_tracking` is
+     * not a valid table.
+ */
+ @Nullable private String targetAlias;
+
+ // source table name
+ private String sourceTable;
+
+ // sqls to config environment and create source table
+ @Nullable private String[] sourceSqls;
+
+ // merge condition
+ private String mergeCondition;
+
+ // set statement
+ private String matchedUpdateSet;
+
+ private int sinkParallelism;
+
+ public DataEvolutionMergeIntoAction(
+ String databaseName, String tableName, Map<String, String>
catalogConfig) {
+ super(databaseName, tableName, catalogConfig);
+
+ if (!(table instanceof FileStoreTable)) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Only FileStoreTable supports merge-into action.
The table type is '%s'.",
+ table.getClass().getName()));
+ }
+
+ this.coreOptions = ((FileStoreTable) table).coreOptions();
+
+ if (!coreOptions.dataEvolutionEnabled()) {
+ throw new UnsupportedOperationException(
+ "Only DataEvolutionTable supports data-evolution
merge-into action.");
+ }
+
+ // init field names of target table
+ targetFieldNames =
+ table.rowType().getFields().stream()
+ .map(DataField::name)
+ .collect(Collectors.toList());
+ }
+
+ public DataEvolutionMergeIntoAction withSourceTable(String sourceTable) {
+ this.sourceTable = sourceTable;
+ return this;
+ }
+
+ public DataEvolutionMergeIntoAction withSourceSqls(String... sourceSqls) {
+ this.sourceSqls = sourceSqls;
+ return this;
+ }
+
+ public DataEvolutionMergeIntoAction withTargetAlias(String targetAlias) {
+ this.targetAlias = targetAlias;
+ return this;
+ }
+
+ public DataEvolutionMergeIntoAction withMergeCondition(String
mergeCondition) {
+ this.mergeCondition = mergeCondition;
+ return this;
+ }
+
+ public DataEvolutionMergeIntoAction withMatchedUpdateSet(String
matchedUpdateSet) {
+ this.matchedUpdateSet = matchedUpdateSet;
+ return this;
+ }
+
+ public DataEvolutionMergeIntoAction withSinkParallelism(int
sinkParallelism) {
+ this.sinkParallelism = sinkParallelism;
+ return this;
+ }
+
+ @Override
+ public void run() throws Exception {
+ runInternal().await();
+ }
+
+ public TableResult runInternal() {
+ // 1. build source
+ Tuple2<DataStream<RowData>, RowType> sourceWithType = buildSource();
+ // 2. shuffle by firstRowId
+ DataStream<Tuple2<Long, RowData>> shuffled =
+ shuffleByFirstRowId(sourceWithType.f0, sourceWithType.f1);
+ // 3. write partial columns
+ DataStream<Committable> written =
+ writePartialColumns(shuffled, sourceWithType.f1,
sinkParallelism);
+ // 4. commit
+ DataStream<?> committed = commit(written);
+
+ // execute internal
+ Transformation<?> transformations =
+ committed
+ .sinkTo(new DiscardingSink<>())
+ .name("END")
+ .setParallelism(1)
+ .getTransformation();
+
+ return executeInternal(
+ Collections.singletonList(transformations),
+ Collections.singletonList(identifier.getFullName()));
+ }
+
+ public Tuple2<DataStream<RowData>, RowType> buildSource() {
+ // handle sqls
+ handleSqls();
+
+ // assign row id for each source row
+ List<String> project;
+ if (matchedUpdateSet.equals("*")) {
+ // if sourceName is qualified like 'default.S', we should build a
project like S.*
+ String[] splits = sourceTable.split("\\.");
+ project = Collections.singletonList(splits[splits.length - 1] +
".*");
+ } else {
+ // validate upsert changes
+ Map<String, String> changes =
parseCommaSeparatedKeyValues(matchedUpdateSet);
+ for (String targetField : changes.keySet()) {
+ if (!targetFieldNames.contains(extractFieldName(targetField)))
{
+ throw new RuntimeException(
+ String.format(
+ "Invalid column reference '%s' of table
'%s' at matched-upsert action.",
+ targetField, identifier.getFullName()));
+ }
+ }
+
+ // rename source table's selected columns according to SET
statement
+ project =
+ changes.entrySet().stream()
+ .map(
+ entry ->
+ String.format(
+ "%s AS `%s`",
+ entry.getValue(),
+
extractFieldName(entry.getKey())))
+ .collect(Collectors.toList());
+ }
+
+ // use join to find matched rows and assign row id for each source row.
+ // _ROW_ID is the first field of joined table.
+ String query =
+ String.format(
+ "SELECT %s, %s FROM %s INNER JOIN %s AS RT ON %s",
+ "`RT`.`_ROW_ID` as `_ROW_ID`",
+ String.join(",", project),
+ escapedSourceName(),
+ escapedRowTrackingTargetName(),
+ rewriteMergeCondition(mergeCondition));
+
+ LOG.info("Source query: {}", query);
+
+ Table source = batchTEnv.sqlQuery(query);
+
+ checkSchema(source);
+ RowType sourceType =
+ SpecialFields.rowTypeWithRowId(table.rowType())
+ .project(source.getResolvedSchema().getColumnNames());
+
+ return Tuple2.of(toDataStream(source), sourceType);
+ }
+
+ public DataStream<Tuple2<Long, RowData>> shuffleByFirstRowId(
+ DataStream<RowData> source, RowType sourceType) {
+ Transformation<RowData> sourceTransformation =
source.getTransformation();
+ List<Long> firstRowIds =
+ ((FileStoreTable) table)
+ .store().newScan()
+ .withManifestEntryFilter(
+ entry ->
+ entry.file().firstRowId() !=
null
+ &&
!isBlobFile(entry.file().fileName()))
+ .plan().files().stream()
+ .map(entry -> entry.file().nonNullFirstRowId())
+ .sorted()
+ .collect(Collectors.toList());
+
+ Preconditions.checkState(
+ !firstRowIds.isEmpty(), "Should not MERGE INTO an empty target
table.");
+
+ OneInputTransformation<RowData, Tuple2<Long, RowData>>
assignedFirstRowId =
+ new OneInputTransformation<>(
+ sourceTransformation,
+ "ASSIGN FIRST_ROW_ID",
+ new StreamMap<>(new FirstRowIdAssigner(firstRowIds,
sourceType)),
+ new TupleTypeInfo<>(
+ BasicTypeInfo.LONG_TYPE_INFO,
sourceTransformation.getOutputType()),
+ sourceTransformation.getParallelism(),
+ sourceTransformation.isParallelismConfigured());
+
+ // shuffle by firstRowId
+ return new DataStream<>(source.getExecutionEnvironment(),
assignedFirstRowId)
+ .partitionCustom(
+ new FirstRowIdAssigner.FirstRowIdPartitioner(),
+ new FirstRowIdAssigner.FirstRowIdKeySelector());
+ }
+
+ public DataStream<Committable> writePartialColumns(
+ DataStream<Tuple2<Long, RowData>> shuffled, RowType rowType, int
sinkParallelism) {
+ // 1. sort data by row id
+ InternalTypeInfo<InternalRow> typeInfo =
InternalTypeInfo.fromRowType(rowType);
+ RowType sortType = rowType.project(SpecialFields.ROW_ID.name());
+ DataStream<InternalRow> sorted =
+ shuffled.map(t -> new FlinkRowWrapper(t.f1), typeInfo)
+ .setParallelism(sinkParallelism)
+ .transform(
+ "SORT BY _ROW_ID",
+ typeInfo,
+ new SortOperator(
+ sortType,
+ rowType,
+ coreOptions.writeBufferSize(),
+ coreOptions.pageSize(),
+
coreOptions.localSortMaxNumFileHandles(),
+ coreOptions.spillCompressOptions(),
+ sinkParallelism,
+ coreOptions.writeBufferSpillDiskSize(),
+
coreOptions.sequenceFieldSortOrderIsAscending()))
+ .setParallelism(sinkParallelism);
+
+ // 2. write partial columns
+ return sorted.transform(
+ "PARTIAL WRITE COLUMNS",
+ new CommittableTypeInfo(),
+ new DataEvolutionPartialWriteOperator((FileStoreTable)
table, rowType))
+ .setParallelism(sinkParallelism);
+ }
+
+ public DataStream<Committable> commit(DataStream<Committable> written) {
+ FileStoreTable storeTable = (FileStoreTable) table;
+ OneInputStreamOperatorFactory<Committable, Committable>
committerOperator =
+ new CommitterOperatorFactory<>(
+ false,
+ true,
+ "DataEvolutionMergeInto",
+ context ->
+ new StoreCommitter(
+ storeTable,
+
storeTable.newCommit(context.commitUser()),
+ context),
+ new NoopCommittableStateManager());
+
+ return written.transform("COMMIT OPERATOR", new CommittableTypeInfo(),
committerOperator)
+ .setParallelism(1)
+ .setMaxParallelism(1);
+ }
+
+ private DataStream<RowData> toDataStream(Table source) {
+ List<DataStructureConverter<Object, Object>> converters =
+ source.getResolvedSchema().getColumns().stream()
+ .map(Column::getDataType)
+ .map(DataStructureConverters::getConverter)
+ .collect(Collectors.toList());
+
+ return batchTEnv
+ .toDataStream(source)
+ .map(
+ row -> {
+ int arity = row.getArity();
+ GenericRowData rowData = new
GenericRowData(row.getKind(), arity);
+ for (int i = 0; i < arity; i++) {
+ rowData.setField(
+ i,
converters.get(i).toInternalOrNull(row.getField(i)));
+ }
+ return rowData;
+ });
+ }
+
+ /**
+     * Rewrite merge condition, replacing all references to the target table with the alias 'RT'.
+     * This is necessary because in Flink, row-tracking metadata columns (e.g. _ROW_ID,
+     * SEQUENCE_NUMBER) are exposed through a system table (i.e. {@code SELECT * FROM
+     * T$row_tracking}), so we use 'RT' to simplify its representation.
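+     *
+     * <p>For example, with target table {@code T}, the condition {@code T.id = S.id AND T.dt = '0101'}
+     * is rewritten to {@code `RT`.id = S.id AND `RT`.dt = '0101'}; occurrences of the target table
+     * name inside quoted string literals are left untouched.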
+ */
+ @VisibleForTesting
+ public String rewriteMergeCondition(String mergeCondition) {
+ // skip single and double-quoted chunks
+ String skipQuoted = "'(?:''|[^'])*'" + "|\"(?:\"\"|[^\"])*\"";
+ String targetTableRegex =
+ "(?i)(?:\\b"
+ + Pattern.quote(targetTableName())
+ + "\\b|`"
+ + Pattern.quote(targetTableName())
+ + "`)\\s*\\.";
+
+ Pattern pattern = Pattern.compile(skipQuoted + "|(" + targetTableRegex
+ ")");
+ Matcher matcher = pattern.matcher(mergeCondition);
+
+ StringBuffer sb = new StringBuffer();
+ while (matcher.find()) {
+ if (matcher.group(1) != null) {
+ matcher.appendReplacement(sb,
Matcher.quoteReplacement("`RT`."));
+ } else {
+ matcher.appendReplacement(sb,
Matcher.quoteReplacement(matcher.group(0)));
+ }
+ }
+ matcher.appendTail(sb);
+ return sb.toString();
+ }
+
+ /**
+     * Check the schema of generated source data. All columns of source table should be present in
+     * target table, and it should also contain a _ROW_ID column.
+ *
+ * @param source source table
+ */
+ private void checkSchema(Table source) {
+ Map<String, DataField> targetFields =
+ table.rowType().getFields().stream()
+ .collect(Collectors.toMap(DataField::name,
Function.identity()));
+ List<String> partitionKeys = ((FileStoreTable)
table).schema().partitionKeys();
+
+ List<Column> flinkColumns = source.getResolvedSchema().getColumns();
+ boolean foundRowIdColumn = false;
+ for (Column flinkColumn : flinkColumns) {
+ if (partitionKeys.contains(flinkColumn.getName())) {
+ throw new IllegalStateException(
+ "User should not update partition columns: " +
flinkColumn.getName());
+ }
+ if (flinkColumn.getName().equals("_ROW_ID")) {
+ foundRowIdColumn = true;
+ Preconditions.checkState(
+
flinkColumn.getDataType().getLogicalType().getTypeRoot()
+ == LogicalTypeRoot.BIGINT);
+ } else {
+ DataField targetField =
targetFields.get(flinkColumn.getName());
+ if (targetField == null) {
+ throw new IllegalStateException(
+ "Column not found in target table: " +
flinkColumn.getName());
+ }
+ if (targetField.type().getTypeRoot() == DataTypeRoot.BLOB) {
+ throw new IllegalStateException(
+ "Should not append/update new BLOB column through
MERGE INTO.");
+ }
+
+ DataType paimonType =
+ LogicalTypeConversion.toDataType(
+ flinkColumn.getDataType().getLogicalType());
+ if (!DataTypeCasts.supportsCompatibleCast(paimonType,
targetField.type())) {
+ throw new IllegalStateException(
+ String.format(
+ "DataType incompatible of field %s: %s is
not compatible with %s",
+ flinkColumn.getName(), paimonType,
targetField.type()));
+ }
+ }
+ }
+ if (!foundRowIdColumn) {
+ throw new IllegalStateException("_ROW_ID column not found in
generated source.");
+ }
+ }
+
+ private void handleSqls() {
+ // NOTE: sql may change current catalog and database
+ if (sourceSqls != null) {
+ for (String sql : sourceSqls) {
+ try {
+ batchTEnv.executeSql(sql).await();
+ } catch (Throwable t) {
+ String errMsg = "Error occurs when executing sql:\n%s";
+ LOG.error(String.format(errMsg, sql), t);
+ throw new RuntimeException(String.format(errMsg, sql), t);
+ }
+ }
+ }
+ }
+
+ private String targetTableName() {
+ return targetAlias == null ? identifier.getObjectName() : targetAlias;
+ }
+
+ private String escapedSourceName() {
+ return Arrays.stream(sourceTable.split("\\."))
+ .map(s -> String.format("`%s`", s))
+ .collect(Collectors.joining("."));
+ }
+
+ private String extractFieldName(String sourceField) {
+ String[] fieldPath = sourceField.split("\\.");
+ return fieldPath[fieldPath.length - 1];
+ }
+
+ private String escapedRowTrackingTargetName() {
+ return String.format(
+ "`%s`.`%s`.`%s$row_tracking`",
+ catalogName, identifier.getDatabaseName(),
identifier.getObjectName());
+ }
+
+ private List<String> normalizeFieldName(List<String> fieldNames) {
+ return
fieldNames.stream().map(this::normalizeFieldName).collect(Collectors.toList());
+ }
+
+ private String normalizeFieldName(String fieldName) {
+ if (StringUtils.isNullOrWhitespaceOnly(fieldName) ||
fieldName.endsWith(IDENTIFIER_QUOTE)) {
+ return fieldName;
+ }
+
+ String[] splitFieldNames = fieldName.split("\\.");
+ if (!targetFieldNames.contains(splitFieldNames[splitFieldNames.length
- 1])) {
+ return fieldName;
+ }
+
+ return String.join(
+ ".",
+ Arrays.stream(splitFieldNames)
+ .map(
+ part ->
+ part.endsWith(IDENTIFIER_QUOTE)
+ ? part
+ : IDENTIFIER_QUOTE + part +
IDENTIFIER_QUOTE)
+ .toArray(String[]::new));
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionFactory.java
new file mode 100644
index 0000000000..6d921a984e
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionFactory.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import java.util.Collection;
+import java.util.Optional;
+
+/** The {@link ActionFactory} for {@link DataEvolutionMergeIntoAction}. */
+public class DataEvolutionMergeIntoActionFactory implements ActionFactory {
+
+ public static final String IDENTIFIER = "data_evolution_merge_into";
+
+ private static final String TARGET_AS = "target_as";
+ private static final String SOURCE_SQL = "source_sql";
+ private static final String SOURCE_TABLE = "source_table";
+ private static final String ON = "on";
+ private static final String MATCHED_UPDATE_SET = "matched_update_set";
+ private static final String SINK_PARALLELISM = "sink_parallelism";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Optional<Action> create(MultipleParameterToolAdapter params) {
+ DataEvolutionMergeIntoAction action =
+ new DataEvolutionMergeIntoAction(
+ params.getRequired(DATABASE),
+ params.getRequired(TABLE),
+ catalogConfigMap(params));
+
+ // optional params
+ if (params.has(TARGET_AS)) {
+ action.withTargetAlias(params.getRequired(TARGET_AS));
+ }
+
+ if (params.has(SOURCE_SQL)) {
+ Collection<String> sourceSqls =
params.getMultiParameter(SOURCE_SQL);
+ action.withSourceSqls(sourceSqls.toArray(new String[0]));
+ }
+
+ // required params
+ action.withSourceTable(params.getRequired(SOURCE_TABLE));
+ action.withMergeCondition(params.getRequired(ON));
+ action.withMatchedUpdateSet(params.getRequired(MATCHED_UPDATE_SET));
+
+ int sinkParallelism;
+ try {
+ sinkParallelism =
Integer.parseInt(params.getRequired(SINK_PARALLELISM));
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid sink parallelism, must
be an integer", e);
+ }
+ action.withSinkParallelism(sinkParallelism);
+
+ return Optional.of(action);
+ }
+
+ @Override
+ public void printHelp() {
+        System.out.println(
+                "Action \"data_evolution_merge_into\" specially implemented for DataEvolutionTables.");
+ System.out.println();
+
+ System.out.println("Syntax:");
+ System.out.println(
+ " data_evolution_merge_into \\\n"
+ + "--warehouse <warehouse_path> \\\n"
+ + "--database <database_name> \\\n"
+ + "--table <target_table_name> \\\n"
+ + "[--target_as <target_table_alias>] \\\n"
+ + "[--source_sql <sql> ...] \\\n"
+ + "--source_table <source_table_name> \\\n"
+ + "--on <merge_condition> \\\n"
+ + "--matched_update_set <update_changes> \\\n"
+ + "--sink_parallelism <sink_parallelism>");
+
+ System.out.println(" matched_update_set format:");
+ System.out.println(
+                "  col=<source_table>.col | expression [, ...] (do not add '<target_table>.' before 'col')");
+        System.out.println(
+                "  * (update with all source cols; requires that the target table's schema is a projection of the source's)");
+
+        System.out.println("  alternative arguments:");
+        System.out.println("  --path <table_path> to represent the table path.");
+ System.out.println();
+
+ System.out.println("Note: ");
+        System.out.println("  1. Target table must be a data-evolution table.");
+        System.out.println(
+                "  2. This is a simplified merge-into action, specially implemented for DataEvolutionTables:\n"
+                        + "     (1) Only supports matched update action.\n"
+                        + "     (2) Only generates new data files without rewriting existing files.\n"
+                        + "     (3) Nulls will also override existing values.");
+        System.out.println(
+                "  3. All conditions, set changes and values should use Flink SQL syntax. Please quote them with \" to escape special characters.");
+        System.out.println(
+                "  4. You can pass sqls by --source_sql to config environment and create source table at runtime.");
+        System.out.println("  5. Target alias cannot be duplicated with an existing table name.");
+        System.out.println(
+                "  6. If the source table is not in the current catalog and current database, "
+                        + "the source_table_name must be qualified (database.table or catalog.database.table if in different catalog).");
+
+ System.out.println("Examples:");
+ System.out.println(
+ " data_evolution_merge_into \\\n"
+ + "--path hdfs:///path/to/T \\\n"
+ + "--source_table S \\\n"
+                        + "--on \"T.id = S.id\" \\\n"
+                        + "--matched_update_set \"value = S.`value`\" \\\n"
+                        + "--sink_parallelism 2");
+        System.out.println(
+                " It will find matched rows of the target table that meet the condition (T.id = S.id), then write new files which only contain"
+                        + " the updated column `value`.");
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
index 870d22012b..aa977a9e2b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
@@ -69,7 +69,7 @@ public abstract class TableActionBase extends ActionBase {
* Invoke {@code
TableEnvironmentImpl#executeInternal(List<Transformation<?>>, List<String>)}
* from a {@link StreamTableEnvironment} instance through reflecting.
*/
- private TableResult executeInternal(
+ protected TableResult executeInternal(
List<Transformation<?>> transformations, List<String>
sinkIdentifierNames) {
Class<?> clazz = batchTEnv.getClass().getSuperclass().getSuperclass();
try {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java
new file mode 100644
index 0000000000..4f9ec8c643
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/DataEvolutionPartialWriteOperator.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.dataevolution;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.utils.BoundedOneInputOperator;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.operation.AbstractFileStoreWrite;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.CommitIncrement;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.ProjectedRow;
+import org.apache.paimon.utils.RecordWriter;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
+
+/**
+ * The Flink Batch Operator to process sorted new rows for data-evolution partial write. It assumes
+ * that input data has already been shuffled by firstRowId and sorted by rowId.
+ */
+public class DataEvolutionPartialWriteOperator
+ extends BoundedOneInputOperator<InternalRow, Committable> {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DataEvolutionPartialWriteOperator.class);
+
+ private final FileStoreTable table;
+
+ // dataType
+ private final RowType dataType;
+ private final InternalRow.FieldGetter[] fieldGetters;
+ private final int rowIdIndex;
+
+ // data type excludes of _ROW_ID field.
+ private final RowType writeType;
+
+ private List<Committable> committables = new ArrayList<>();
+
+ // --------------------- transient fields ---------------------------
+
+ // first-row-id related fields
+ private transient FirstRowIdLookup firstRowIdLookup;
+ private transient Map<Long, BinaryRow> firstIdToPartition;
+ private transient Map<Long, List<DataFileMeta>> firstIdToFiles;
+
+ private transient InnerTableRead tableRead;
+ private transient AbstractFileStoreWrite<InternalRow> tableWrite;
+ private transient Writer writer;
+
+ public DataEvolutionPartialWriteOperator(FileStoreTable table, RowType
dataType) {
+ this.table = table;
+ List<String> fieldNames =
+ dataType.getFieldNames().stream()
+ .filter(name ->
!SpecialFields.ROW_ID.name().equals(name))
+ .collect(Collectors.toList());
+ this.writeType = table.rowType().project(fieldNames);
+ this.dataType =
+
SpecialFields.rowTypeWithRowId(table.rowType()).project(dataType.getFieldNames());
+ this.rowIdIndex =
this.dataType.getFieldIndex(SpecialFields.ROW_ID.name());
+ this.fieldGetters = new
InternalRow.FieldGetter[dataType.getFieldCount()];
+ List<DataField> fields = this.dataType.getFields();
+ for (int i = 0; i < fields.size(); i++) {
+ this.fieldGetters[i] =
InternalRow.createFieldGetter(fields.get(i).type(), i);
+ }
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+
+ // initialize row-id-related data structures
+ TreeSet<Long> rowIdSet = new TreeSet<>();
+ firstIdToPartition = new HashMap<>();
+ firstIdToFiles = new HashMap<>();
+
+ List<ManifestEntry> entries =
+ table.store()
+ .newScan()
+ .withManifestEntryFilter(
+ entry ->
+ entry.file().firstRowId() != null
+ &&
!isBlobFile(entry.file().fileName()))
+ .plan()
+ .files();
+
+ for (ManifestEntry entry : entries) {
+ DataFileMeta fileMeta = entry.file();
+ long firstRowId = fileMeta.nonNullFirstRowId();
+
+ rowIdSet.add(firstRowId);
+ firstIdToFiles.computeIfAbsent(firstRowId, k -> new
ArrayList<>()).add(fileMeta);
+ firstIdToPartition.put(firstRowId, entry.partition());
+ }
+ firstRowIdLookup = new FirstRowIdLookup(new ArrayList<>(rowIdSet));
+
+ // initialize table read & table write
+ tableRead = table.newRead().withReadType(dataType);
+ @SuppressWarnings("unchecked")
+ TableWriteImpl<InternalRow> writeImpl =
+ (TableWriteImpl<InternalRow>)
+
table.newBatchWriteBuilder().newWrite().withWriteType(writeType);
+ tableWrite = (AbstractFileStoreWrite<InternalRow>)
(writeImpl.getWrite());
+ }
+
+ @Override
+ public void endInput() throws Exception {
+ finishWriter();
+
+ for (Committable committable : committables) {
+ output.collect(new StreamRecord<>(committable));
+ }
+ }
+
+ @Override
+ public void processElement(StreamRecord<InternalRow> element) throws
Exception {
+ InternalRow row = element.getValue();
+ long rowId = row.getLong(rowIdIndex);
+
+ if (writer == null || !writer.contains(rowId)) {
+ finishWriter();
+ writer = createWriter(firstRowIdLookup.lookup(rowId));
+ }
+
+ writer.write(row);
+ }
+
+ private void finishWriter() throws Exception {
+ if (writer != null) {
+ committables.add(writer.finish());
+ }
+ writer = null;
+ }
+
+ private Writer createWriter(long firstRowId) throws IOException {
+ LOG.debug("Creating writer for row id {}", firstRowId);
+
+ BinaryRow partition = firstIdToPartition.get(firstRowId);
+ Preconditions.checkNotNull(
+ partition,
+ String.format("Cannot find the partition for firstRowId: %s ",
firstRowId));
+
+ List<DataFileMeta> files = firstIdToFiles.get(firstRowId);
+ Preconditions.checkState(
+ files != null && !files.isEmpty(),
+ String.format("Cannot find files for firstRowId: %s",
firstRowId));
+
+ long rowCount = files.get(0).rowCount();
+
+ DataSplit dataSplit =
+ DataSplit.builder()
+ .withPartition(partition)
+ .withBucket(0)
+ .withDataFiles(files)
+ .withBucketPath(
+
table.store().pathFactory().bucketPath(partition, 0).toString())
+ .rawConvertible(false)
+ .build();
+
+ return new Writer(
+ tableRead.createReader(dataSplit),
+ tableWrite.createWriter(partition, 0),
+ firstRowId,
+ rowCount);
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ if (tableWrite != null) {
+ tableWrite.close();
+ }
+ }
+
+ /**
+     * The writer to write partial columns for a single file. The written new file should be
+     * aligned with existing ones.
+ */
+ private class Writer {
+ // reader for original data
+ private final CloseableIterator<InternalRow> reader;
+
+ // writer to write new columns
+ private final RecordWriter<InternalRow> writer;
+
+ private final ProjectedRow reusedRow;
+ private final long firstRowId;
+ private final long rowCount;
+ private final BinaryRow partition;
+
+ private long writtenNum = 0;
+ private long prevRowId = -1;
+
+ Writer(
+ RecordReader<InternalRow> reader,
+ RecordWriter<InternalRow> writer,
+ long firstRowId,
+ long rowCount) {
+ this.reader = reader.toCloseableIterator();
+ this.writer = writer;
+ this.reusedRow = ProjectedRow.from(writeType, dataType);
+ this.firstRowId = firstRowId;
+ this.rowCount = rowCount;
+ this.partition = firstIdToPartition.get(firstRowId);
+ }
+
+ void write(InternalRow row) throws Exception {
+ long currentRowId = row.getLong(rowIdIndex);
+
+ // for rows with duplicated _ROW_ID, just choose the first one.
+ if (checkDuplication(currentRowId)) {
+ return;
+ }
+
+ if (!reader.hasNext()) {
+ throw new IllegalStateException(
+ "New file should be aligned with original file, it's a
bug.");
+ }
+
+ InternalRow originalRow;
+ while (reader.hasNext()) {
+ originalRow = reader.next();
+ long originalRowId = originalRow.getLong(rowIdIndex);
+
+ if (originalRowId < currentRowId) {
+ // new row is absent, we should use the original row
+ reusedRow.replaceRow(originalRow);
+ writer.write(reusedRow);
+ writtenNum++;
+ } else if (originalRowId == currentRowId) {
+ // new row is present, we should use the new row
+ reusedRow.replaceRow(row);
+ writer.write(reusedRow);
+ writtenNum++;
+ break;
+ } else {
+ // original row id > new row id, this means there are
duplicated row ids
+ // in the input rows, it cannot happen here.
+ throw new IllegalStateException("Duplicated row id " +
currentRowId);
+ }
+ }
+ }
+
+ private Committable finish() throws Exception {
+ // 1. write remaining original rows
+ try {
+ InternalRow row;
+ while (reader.hasNext()) {
+ row = reader.next();
+ reusedRow.replaceRow(row);
+ writer.write(reusedRow);
+ writtenNum++;
+ }
+ } finally {
+ reader.close();
+ }
+
+ Preconditions.checkState(
+ writtenNum == rowCount,
+ String.format(
+ "Written num %s not equal to original row num %s,
it's a bug.",
+ writtenNum, rowCount));
+
+ // 2. finish writer
+ CommitIncrement written = writer.prepareCommit(false);
+ List<DataFileMeta> fileMetas =
written.newFilesIncrement().newFiles();
+ Preconditions.checkState(
+                    fileMetas.size() == 1, "This is a bug, Writer should produce exactly one file");
+ DataFileMeta fileMeta =
fileMetas.get(0).assignFirstRowId(firstRowId);
+
+ CommitMessage commitMessage =
+ new CommitMessageImpl(
+ partition,
+ 0,
+ null,
+ new DataIncrement(
+ Collections.singletonList(fileMeta),
+ Collections.emptyList(),
+ Collections.emptyList()),
+ CompactIncrement.emptyIncrement());
+
+ return new Committable(Long.MAX_VALUE, commitMessage);
+ }
+
+ private boolean contains(long rowId) {
+ return rowId >= firstRowId && rowId < firstRowId + rowCount;
+ }
+
+ private boolean checkDuplication(long rowId) {
+ if (prevRowId == rowId) {
+ return true;
+ }
+ prevRowId = rowId;
+ return false;
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/FirstRowIdAssigner.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/FirstRowIdAssigner.java
new file mode 100644
index 0000000000..ad87c5b016
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/FirstRowIdAssigner.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.dataevolution;
+
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.MurmurHashUtils;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.data.RowData;
+
+import java.util.List;
+
+/** Assign first row id for each row through binary search. */
+public class FirstRowIdAssigner extends RichMapFunction<RowData, Tuple2<Long,
RowData>> {
+
+ private final FirstRowIdLookup firstRowIdLookup;
+
+ private final int rowIdFieldIndex;
+
+ public FirstRowIdAssigner(List<Long> firstRowIds, RowType rowType) {
+ this.firstRowIdLookup = new FirstRowIdLookup(firstRowIds);
+ this.rowIdFieldIndex =
rowType.getFieldNames().indexOf(SpecialFields.ROW_ID.name());
+        Preconditions.checkState(this.rowIdFieldIndex >= 0, "_ROW_ID column not found.");
+ }
+
+ @Override
+ public Tuple2<Long, RowData> map(RowData value) throws Exception {
+ long rowId = value.getLong(rowIdFieldIndex);
+ return new Tuple2<>(firstRowIdLookup.lookup(rowId), value);
+ }
+
+ /** The Key Selector to get firstRowId from tuple2. */
+ public static class FirstRowIdKeySelector implements
KeySelector<Tuple2<Long, RowData>, Long> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Long getKey(Tuple2<Long, RowData> value) throws Exception {
+ return value.f0;
+ }
+ }
+
+ /** The Partitioner to partition rows by their firstRowId. */
+ public static class FirstRowIdPartitioner implements Partitioner<Long> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public int partition(Long firstRowId, int numPartitions) {
+            Preconditions.checkNotNull(firstRowId, "FirstRowId should not be null.");
+            // Now we just simply floorMod the hash result of the firstRowId.
+            // We could make it more balanced by considering the number of records of each row id
+            // range.
+ return floorMod(MurmurHashUtils.fmix(firstRowId), numPartitions);
+ }
+
+        /** For compatibility with Java 8, where {@code Math.floorMod(long, int)} is not available. */
+ private int floorMod(long x, int y) {
+ return (int) (x - Math.floorDiv(x, (long) y) * y);
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/FirstRowIdLookup.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/FirstRowIdLookup.java
new file mode 100644
index 0000000000..9c3a80bdda
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/FirstRowIdLookup.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.dataevolution;
+
+import org.apache.paimon.utils.Preconditions;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+/** Look up the related first row id for a given row id through binary search. */
+public class FirstRowIdLookup implements Serializable {
+ // a sorted list of first row ids
+ private final List<Long> firstRowIds;
+
+ public FirstRowIdLookup(List<Long> firstRowIds) {
+ this.firstRowIds = firstRowIds;
+ }
+
+ public long lookup(long rowId) {
+ int index = Collections.binarySearch(firstRowIds, rowId);
+ long firstRowId;
+ if (index >= 0) {
+ firstRowId = firstRowIds.get(index);
+ } else {
+            // (-index - 1) is the position of the first element greater than the rowId
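+            // e.g. with firstRowIds [0, 100, 250], lookup(120) gives index = -3, so we return
+            // firstRowIds.get(1) = 100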
+ Preconditions.checkState(
+ -index - 2 >= 0,
+ String.format(
+                            "Unexpected RowID: %s which is smaller than the smallest FirstRowId: %s",
+ rowId, firstRowIds.get(0)));
+ firstRowId = firstRowIds.get(-index - 2);
+ }
+ return firstRowId;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DataEvolutionMergeIntoProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DataEvolutionMergeIntoProcedure.java
new file mode 100644
index 0000000000..66423c2a64
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DataEvolutionMergeIntoProcedure.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.DataEvolutionMergeIntoAction;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.util.Map;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * The MergeInto Procedure specially implemented for data evolution tables. Please see {@code
+ * DataEvolutionMergeIntoAction} for more information.
+ *
+ * <pre><code>
+ * -- NOTE: use '' as placeholder for optional arguments
+ * CALL sys.data_evolution_merge_into(
+ * 'targetTableId', --required
+ * 'targetAlias', --optional
+ * 'sourceSqls', --optional
+ * 'sourceTable', --required
+ * 'mergeCondition', --required
+ * 'matchedUpdateSet', --required
+ * 'sinkParallelism' --required
+ * )
+ * </code></pre>
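+ *
+ * <p>A concrete call (the table names are illustrative and mirror the documentation example):
+ *
+ * <pre><code>
+ * CALL sys.data_evolution_merge_into('default.T', '', '', 'S', 'T.id = S.id', 'b = S.b', 2)
+ * </code></pre>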
+ *
+ * <p>This procedure will be forced to use batch environments.
+ */
+public class DataEvolutionMergeIntoProcedure extends ProcedureBase {
+
+ public static final String IDENTIFIER = "data_evolution_merge_into";
+
+ @ProcedureHint(
+ argument = {
+ @ArgumentHint(name = "target_table", type =
@DataTypeHint("STRING")),
+ @ArgumentHint(
+ name = "target_alias",
+ type = @DataTypeHint("STRING"),
+ isOptional = true),
+ @ArgumentHint(
+ name = "source_sqls",
+ type = @DataTypeHint("STRING"),
+ isOptional = true),
+ @ArgumentHint(
+ name = "source_table",
+ type = @DataTypeHint("STRING"),
+ isOptional = true),
+ @ArgumentHint(
+ name = "merge_condition",
+ type = @DataTypeHint("STRING"),
+ isOptional = true),
+ @ArgumentHint(
+ name = "matched_update_set",
+ type = @DataTypeHint("STRING"),
+ isOptional = true),
+ @ArgumentHint(
+ name = "sink_parallelism",
+ type = @DataTypeHint("INTEGER"),
+ isOptional = true)
+ })
+ public String[] call(
+ ProcedureContext procedureContext,
+ String targetTableId,
+ String targetAlias,
+ String sourceSqls,
+ String sourceTable,
+ String mergeCondition,
+ String matchedUpdateSet,
+ Integer sinkParallelism) {
+ targetTableId = notnull(targetTableId);
+ targetAlias = notnull(targetAlias);
+ sourceSqls = notnull(sourceSqls);
+ sourceTable = notnull(sourceTable);
+ mergeCondition = notnull(mergeCondition);
+ matchedUpdateSet = notnull(matchedUpdateSet);
+ Preconditions.checkArgument(sinkParallelism != null && sinkParallelism
> 0);
+
+ Map<String, String> catalogOptions = catalog.options();
+ Identifier identifier = Identifier.fromString(targetTableId);
+ DataEvolutionMergeIntoAction action =
+ new DataEvolutionMergeIntoAction(
+ identifier.getDatabaseName(),
identifier.getObjectName(), catalogOptions);
+
+ action.withTargetAlias(nullable(targetAlias));
+
+ if (!sourceSqls.isEmpty()) {
+ action.withSourceSqls(sourceSqls.split(";"));
+ }
+
+ checkArgument(!sourceTable.isEmpty(), "Must specify source table.");
+ action.withSourceTable(sourceTable);
+
+ checkArgument(!mergeCondition.isEmpty(), "Must specify merge
condition.");
+ action.withMergeCondition(mergeCondition);
+
+ checkArgument(!matchedUpdateSet.isEmpty(), "Must specify matched
update set.");
+ action.withMatchedUpdateSet(matchedUpdateSet);
+
+ action.withSinkParallelism(sinkParallelism);
+
+
action.withStreamExecutionEnvironment(procedureContext.getExecutionEnvironment());
+
+ TableResult result = action.runInternal();
+ JobClient jobClient = result.getJobClient().get();
+
+ return execute(procedureContext, jobClient);
+ }
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index aef64a43d2..09474a3110 100644
---
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -48,6 +48,7 @@
org.apache.paimon.flink.action.RemoveUnexistingManifestsActionFactory
org.apache.paimon.flink.action.ClearConsumerActionFactory
org.apache.paimon.flink.action.RescaleActionFactory
org.apache.paimon.flink.action.CloneActionFactory
+org.apache.paimon.flink.action.DataEvolutionMergeIntoActionFactory
### procedure factories
org.apache.paimon.flink.procedure.CompactDatabaseProcedure
@@ -97,4 +98,5 @@ org.apache.paimon.flink.procedure.DropFunctionProcedure
org.apache.paimon.flink.procedure.AlterFunctionProcedure
org.apache.paimon.flink.procedure.AlterColumnDefaultValueProcedure
org.apache.paimon.flink.procedure.TriggerTagAutomaticCreationProcedure
-org.apache.paimon.flink.procedure.RemoveUnexistingManifestsProcedure
\ No newline at end of file
+org.apache.paimon.flink.procedure.RemoveUnexistingManifestsProcedure
+org.apache.paimon.flink.procedure.DataEvolutionMergeIntoProcedure
\ No newline at end of file
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
new file mode 100644
index 0000000000..0aa0941270
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DataEvolutionMergeIntoActionITCase.java
@@ -0,0 +1,574 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
+import static org.apache.paimon.CoreOptions.DATA_EVOLUTION_ENABLED;
+import static org.apache.paimon.CoreOptions.ROW_TRACKING_ENABLED;
+import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.bEnv;
+import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildDdl;
+import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
+import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.insertInto;
+import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.sEnv;
+import static
org.apache.paimon.flink.util.ReadWriteTableTestUtil.testBatchRead;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** ITCase for {@link DataEvolutionMergeIntoAction}. */
+public class DataEvolutionMergeIntoActionITCase extends ActionITCaseBase {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DataEvolutionMergeIntoActionITCase.class);
+
+ private static Stream<Arguments> testArguments() {
+ return Stream.of(
+ Arguments.of(true, "action"),
+ Arguments.of(false, "action"),
+ Arguments.of(true, "procedure"),
+ Arguments.of(false, "procedure"));
+ }
+
+ @BeforeEach
+ public void setup() throws Exception {
+ init(warehouse);
+
+ prepareTargetTable();
+
+ prepareSourceTable();
+ }
+
+ @ParameterizedTest(name = "use default db = {0}, invoker - {1}")
+ @MethodSource("testArguments")
+ public void testUpdateSingleColumn(boolean inDefault, String invoker)
throws Exception {
+ String targetDb = inDefault ? database : "test_db";
+ if (!inDefault) {
+ // create target table in a new database
+ sEnv.executeSql("DROP TABLE T");
+ sEnv.executeSql("CREATE DATABASE test_db");
+ sEnv.executeSql("USE test_db");
+ bEnv.executeSql("USE test_db");
+ prepareTargetTable();
+ }
+
+ List<Row> expected =
+ Arrays.asList(
+ changelogRow("+I", 1, "new_name1"),
+ changelogRow("+I", 2, "name2"),
+ changelogRow("+I", 3, "name3"),
+ changelogRow("+I", 7, "new_name7"),
+ changelogRow("+I", 8, "name8"),
+ changelogRow("+I", 11, "new_name11"),
+ changelogRow("+I", 15, null),
+ changelogRow("+I", 18, "new_name18"));
+
+ if (invoker.equals("action")) {
+ DataEvolutionMergeIntoActionBuilder builder =
+ builder(warehouse, targetDb, "T")
+ .withMergeCondition("T.id=S.id")
+ .withMatchedUpdateSet("T.name=S.name")
+ .withSourceTable("S")
+ .withSinkParallelism(2);
+
+ builder.build().run();
+ } else {
+ String procedureStatement =
+ String.format(
+ "CALL sys.data_evolution_merge_into('%s.T', '',
'', 'S', 'T.id=S.id', 'name=S.name', 2)",
+ targetDb);
+
+ executeSQL(procedureStatement, false, true);
+ }
+
+ testBatchRead("SELECT id, name FROM T where id in (1, 2, 3, 7, 8, 11,
15, 18)", expected);
+ }
+
+ @ParameterizedTest(name = "use default db = {0}, invoker - {1}")
+ @MethodSource("testArguments")
+ public void testUpdateMultipleColumns(boolean inDefault, String invoker)
throws Exception {
+ String targetDb = inDefault ? database : "test_db";
+ if (!inDefault) {
+ // create target table in a new database
+ sEnv.executeSql("DROP TABLE T");
+ sEnv.executeSql("CREATE DATABASE test_db");
+ sEnv.executeSql("USE test_db");
+ bEnv.executeSql("USE test_db");
+ prepareTargetTable();
+ }
+
+ List<Row> expected =
+ Arrays.asList(
+ changelogRow("+I", 1, "new_name1", 100.1),
+ changelogRow("+I", 2, "name2", 0.2),
+ changelogRow("+I", 3, "name3", 0.3),
+ changelogRow("+I", 7, "new_name7", null),
+ changelogRow("+I", 8, "name8", 0.8),
+ changelogRow("+I", 11, "new_name11", 101.1),
+ changelogRow("+I", 15, null, 101.1),
+ changelogRow("+I", 18, "new_name18", 101.8));
+
+ if (invoker.equals("action")) {
+ DataEvolutionMergeIntoActionBuilder builder =
+ builder(warehouse, targetDb, "T")
+ .withMergeCondition("T.id=S.id")
+
.withMatchedUpdateSet("T.name=S.name,T.value=S.`value`")
+ .withSourceTable("S")
+ .withSinkParallelism(2);
+
+ builder.build().run();
+ } else {
+ String procedureStatement =
+ String.format(
+                            "CALL sys.data_evolution_merge_into('%s.T', '', '', 'S', 'T.id=S.id', 'name=S.name,value=S.`value`', 2)",
+ targetDb);
+
+ executeSQL(procedureStatement, false, true);
+ }
+
+ testBatchRead(
+                "SELECT id, name, `value` FROM T where id in (1, 2, 3, 7, 8, 11, 15, 18)",
+ expected);
+ }
+
+ @ParameterizedTest(name = "use default db = {0}, invoker - {1}")
+ @MethodSource("testArguments")
+    public void testSetLiterals(boolean inDefault, String invoker) throws Exception {
+ String targetDb = inDefault ? database : "test_db";
+ if (!inDefault) {
+ // create target table in a new database
+ sEnv.executeSql("DROP TABLE T");
+ sEnv.executeSql("CREATE DATABASE test_db");
+ sEnv.executeSql("USE test_db");
+ bEnv.executeSql("USE test_db");
+ prepareTargetTable();
+ }
+
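+        // Literal values in the update set are expected to apply to every matched row (source
+        // ids 1, 7, 11, 15 and 18); unmatched target rows keep their original name and value.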
+ List<Row> expected =
+ Arrays.asList(
+ changelogRow("+I", 1, "testName", 0.0),
+ changelogRow("+I", 2, "name2", 0.2),
+ changelogRow("+I", 3, "name3", 0.3),
+ changelogRow("+I", 7, "testName", 0.0),
+ changelogRow("+I", 8, "name8", 0.8),
+ changelogRow("+I", 11, "testName", 0.0),
+ changelogRow("+I", 15, "testName", 0.0),
+ changelogRow("+I", 18, "testName", 0.0));
+
+ if (invoker.equals("action")) {
+ DataEvolutionMergeIntoActionBuilder builder =
+ builder(warehouse, targetDb, "T")
+ .withMergeCondition("T.id=S.id")
+                            .withMatchedUpdateSet("T.name='testName',T.value=CAST(0.0 as DOUBLE)")
+ .withSourceTable("S")
+ .withSinkParallelism(2);
+
+ builder.build().run();
+ } else {
+ String procedureStatement =
+ String.format(
+                            "CALL sys.data_evolution_merge_into('%s.T', '', '', 'S', 'T.id=S.id', 'name=''testName'',value=CAST(0.0 as DOUBLE)', 2)",
+ targetDb);
+
+ executeSQL(procedureStatement, false, true);
+ }
+
+ testBatchRead(
+                "SELECT id, name, `value` FROM T where id in (1, 2, 3, 7, 8, 11, 15, 18)",
+ expected);
+ }
+
+ @ParameterizedTest(name = "use default db = {0}, invoker - {1}")
+ @MethodSource("testArguments")
+    public void testUpdatePartitionColumnThrowsError(boolean inDefault, String invoker)
+ throws Exception {
+ Throwable t;
+ if (invoker.equals("action")) {
+ DataEvolutionMergeIntoActionBuilder builder =
+ builder(warehouse, database, "T")
+ .withMergeCondition("T.id=S.id")
+ .withMatchedUpdateSet("T.dt=S.id")
+ .withSourceTable("S")
+ .withSinkParallelism(2);
+            t = Assertions.assertThrows(IllegalStateException.class, () -> builder.build().run());
+ Assertions.assertTrue(
+                    t.getMessage().startsWith("User should not update partition columns:"));
+ } else {
+ String procedureStatement =
+                    "CALL sys.data_evolution_merge_into('default.T', '', '', 'S', 'T.id=S.id', 'dt=S.id', 2)";
+ t =
+ Assertions.assertThrows(
+                            Exception.class, () -> executeSQL(procedureStatement, false, true));
+ org.assertj.core.api.Assertions.assertThat(t)
+ .hasRootCauseInstanceOf(IllegalStateException.class)
+ .message()
+ .contains("User should not update partition columns:");
+ }
+ }
+
+ @ParameterizedTest(name = "use default db = {0}, invoker - {1}")
+ @MethodSource("testArguments")
+    public void testOneToManyUpdate(boolean inDefault, String invoker) throws Exception {
+        // A single row in the target table may map to multiple rows in the source table.
+ // For example:
+ // In target table we have: (id=1, value='val', _ROW_ID=0)
+ // In source table we have: (id=1, value='val1'), (id=1, value='val2')
+        // If we execute MERGE INTO T ON T.id=S.id ... UPDATE SET T.`value`=S.`value`,
+        // there would be 2 rows with the same _ROW_ID but different new values:
+ // (id=1, value='val1', _ROW_ID=0) and (id=1, value='val2', _ROW_ID=0)
+        // In that case, an arbitrary row is chosen as the final result.
+ insertInto("S", "(1, 'dup_new_name1', 200.1)");
+
+ String targetDb = inDefault ? database : "test_db";
+ if (!inDefault) {
+ // create target table in a new database
+ sEnv.executeSql("DROP TABLE T");
+ sEnv.executeSql("CREATE DATABASE test_db");
+ sEnv.executeSql("USE test_db");
+ bEnv.executeSql("USE test_db");
+ prepareTargetTable();
+ }
+
+        // First validate the results for all rows except id=1.
+ List<Row> expected =
+ Arrays.asList(
+ changelogRow("+I", 2, "name2", 0.2),
+ changelogRow("+I", 3, "name3", 0.3),
+ changelogRow("+I", 7, "new_name7", null),
+ changelogRow("+I", 8, "name8", 0.8),
+ changelogRow("+I", 11, "new_name11", 101.1),
+ changelogRow("+I", 15, null, 101.1),
+ changelogRow("+I", 18, "new_name18", 101.8));
+
+ if (invoker.equals("action")) {
+ DataEvolutionMergeIntoActionBuilder builder =
+ builder(warehouse, targetDb, "T")
+ .withMergeCondition("T.id=S.id")
+                            .withMatchedUpdateSet("T.name=S.name,T.value=S.`value`")
+ .withSourceTable("S")
+ .withSinkParallelism(2);
+
+ builder.build().run();
+ } else {
+ String procedureStatement =
+ String.format(
+                            "CALL sys.data_evolution_merge_into('%s.T', '', '', 'S', 'T.id=S.id', 'name=S.name,value=S.`value`', 2)",
+ targetDb);
+
+ executeSQL(procedureStatement, false, true);
+ }
+
+ testBatchRead(
+                "SELECT id, name, `value` FROM T where id in (2, 3, 7, 8, 11, 15, 18)", expected);
+
+        // Then validate the id=1 row, which may end up with either candidate update.
+ List<Row> possibleRows =
+ Arrays.asList(
+ changelogRow("+I", 1, "dup_new_name1", 200.1),
+ changelogRow("+I", 1, "new_name1", 100.1));
+ boolean passed = false;
+ for (Row row : possibleRows) {
+ try {
+ testBatchRead(
+ "SELECT id, name, `value` FROM T where id = 1",
+ Collections.singletonList(row));
+ passed = true;
+ break;
+ } catch (Throwable e) {
+                // Mismatch for this candidate row; log it and try the next one.
+                LOG.info("Error happens in testing one-to-many merge into.", e);
+ }
+ }
+
+ Assertions.assertTrue(
+ passed,
+                "one-to-many merge into test failed, please check the log for more information.");
+ }
+
+ @ParameterizedTest(name = "use default db = {0}, invoker - {1}")
+ @MethodSource("testArguments")
+ public void testAlias(boolean inDefault, String invoker) throws Exception {
+ String targetDb = inDefault ? database : "test_db";
+ if (!inDefault) {
+ // create target table in a new database
+ sEnv.executeSql("DROP TABLE T");
+ sEnv.executeSql("CREATE DATABASE test_db");
+ sEnv.executeSql("USE test_db");
+ bEnv.executeSql("USE test_db");
+ prepareTargetTable();
+ }
+
+ List<Row> expected =
+ Arrays.asList(
+ changelogRow("+I", 1, "new_name1", 100.1),
+ changelogRow("+I", 2, "name2", 0.2),
+ changelogRow("+I", 3, "name3", 0.3),
+ changelogRow("+I", 7, "new_name7", null),
+ changelogRow("+I", 8, "name8", 0.8),
+ changelogRow("+I", 11, "new_name11", 101.1),
+ changelogRow("+I", 15, null, 101.1),
+ changelogRow("+I", 18, "new_name18", 101.8));
+
+ if (invoker.equals("action")) {
+ DataEvolutionMergeIntoActionBuilder builder =
+ builder(warehouse, targetDb, "T")
+ .withMergeCondition("TempT.id=S.id")
+                            .withMatchedUpdateSet("TempT.name=S.name,TempT.value=S.`value`")
+ .withSourceTable("S")
+ .withTargetAlias("TempT")
+ .withSinkParallelism(2);
+
+ builder.build().run();
+ } else {
+ String procedureStatement =
+ String.format(
+                            "CALL sys.data_evolution_merge_into('%s.T', 'TempT', '', 'S', 'TempT.id=S.id', 'name=S.name,value=S.`value`', 2)",
+ targetDb);
+
+ executeSQL(procedureStatement, false, true);
+ }
+
+ testBatchRead(
+                "SELECT id, name, `value` FROM T where id in (1, 2, 3, 7, 8, 11, 15, 18)",
+ expected);
+ }
+
+ @ParameterizedTest(name = "use default db = {0}, invoker - {1}")
+ @MethodSource("testArguments")
+ public void testSqls(boolean inDefault, String invoker) throws Exception {
+ String targetDb = inDefault ? database : "test_db";
+ if (!inDefault) {
+ // create target table in a new database
+ sEnv.executeSql("DROP TABLE T");
+ sEnv.executeSql("CREATE DATABASE test_db");
+ sEnv.executeSql("USE test_db");
+ bEnv.executeSql("USE test_db");
+ prepareTargetTable();
+ }
+
+ List<Row> expected =
+ Arrays.asList(
+ changelogRow("+I", 1, "new_name1", 100.1),
+ changelogRow("+I", 2, "name2", 0.2),
+ changelogRow("+I", 3, "name3", 0.3),
+ changelogRow("+I", 7, "new_name7", null),
+ changelogRow("+I", 8, "name8", 0.8),
+ changelogRow("+I", 11, "new_name11", 101.1),
+ changelogRow("+I", 15, null, 101.1),
+ changelogRow("+I", 18, "new_name18", 101.8));
+
+ if (invoker.equals("action")) {
+ DataEvolutionMergeIntoActionBuilder builder =
+ builder(warehouse, targetDb, "T")
+ .withMergeCondition("TempT.id=SS.id")
+                            .withMatchedUpdateSet("TempT.name=SS.name,TempT.value=SS.`value`")
+ .withSourceTable("SS")
+ .withTargetAlias("TempT")
+ .withSourceSqls(
+                                    "CREATE TEMPORARY VIEW SS AS SELECT id, name, `value` FROM S")
+ .withSinkParallelism(2);
+
+ builder.build().run();
+ } else {
+ String procedureStatement =
+ String.format(
+                            "CALL sys.data_evolution_merge_into('%s.T', 'TempT', 'CREATE TEMPORARY VIEW SS AS SELECT id, name, `value` FROM S',"
+                                    + " 'SS', 'TempT.id=SS.id', 'name=SS.name,value=SS.`value`', 2)",
+ targetDb);
+
+ executeSQL(procedureStatement, false, true);
+ }
+
+ testBatchRead(
+                "SELECT id, name, `value` FROM T where id in (1, 2, 3, 7, 8, 11, 15, 18)",
+ expected);
+ }
+
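+    // rewriteMergeCondition should replace references to the target table name (case-insensitive,
+    // with or without backticks) with the `RT` alias, while leaving other identifiers and quoted
+    // string literals untouched.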
+ @Test
+ public void testRewriteMergeCondition() throws Exception {
+ Map<String, String> config = new HashMap<>();
+ config.put("warehouse", warehouse);
+ DataEvolutionMergeIntoAction action =
+ new DataEvolutionMergeIntoAction(database, "T", config);
+
+ String mergeCondition = "T.id=S.id";
+        assertEquals("`RT`.id=S.id", action.rewriteMergeCondition(mergeCondition));
+
+ mergeCondition = "`T`.id=S.id";
+        assertEquals("`RT`.id=S.id", action.rewriteMergeCondition(mergeCondition));
+
+ mergeCondition = "t.id = s.id AND T.pt = s.pt";
+ assertEquals(
+                "`RT`.id = s.id AND `RT`.pt = s.pt", action.rewriteMergeCondition(mergeCondition));
+
+ mergeCondition = "TT.id = 1 AND T.id = 2";
+        assertEquals("TT.id = 1 AND `RT`.id = 2", action.rewriteMergeCondition(mergeCondition));
+
+ mergeCondition = "TT.id = 'T.id' AND T.id = \"T.id\"";
+ assertEquals(
+ "TT.id = 'T.id' AND `RT`.id = \"T.id\"",
+ action.rewriteMergeCondition(mergeCondition));
+ }
+
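+    // The target table enables row tracking and data evolution (which the data-evolution
+    // merge-into relies on) and is partitioned by `dt`.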
+ private void prepareTargetTable() throws Exception {
+ sEnv.executeSql(
+ buildDdl(
+ "T",
+                        Arrays.asList("id INT", "name STRING", "`value` DOUBLE", "dt STRING"),
+ Collections.emptyList(),
+ Collections.singletonList("dt"),
+ new HashMap<String, String>() {
+ {
+ put(ROW_TRACKING_ENABLED.key(), "true");
+ put(DATA_EVOLUTION_ENABLED.key(), "true");
+ }
+ }));
+ insertInto(
+ "T",
+ "(1, 'name1', 0.1, '01-22')",
+ "(2, 'name2', 0.2, '01-22')",
+ "(3, 'name3', 0.3, '01-22')",
+ "(4, 'name4', 0.4, '01-22')",
+ "(5, 'name5', 0.5, '01-22')",
+ "(6, 'name6', 0.6, '01-22')",
+ "(7, 'name7', 0.7, '01-22')",
+ "(8, 'name8', 0.8, '01-22')",
+ "(9, 'name9', 0.9, '01-22')",
+ "(10, 'name10', 1.0, '01-22')");
+
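+        // A second, separate write so the target data spans more than one file (and presumably
+        // more than one row-id range) before merging.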
+ insertInto(
+ "T",
+ "(11, 'name11', 1.1, '01-22')",
+ "(12, 'name12', 1.2, '01-22')",
+ "(13, 'name13', 1.3, '01-22')",
+ "(14, 'name14', 1.4, '01-22')",
+ "(15, 'name15', 1.5, '01-22')",
+ "(16, 'name16', 1.6, '01-22')",
+ "(17, 'name17', 1.7, '01-22')",
+ "(18, 'name18', 1.8, '01-22')",
+ "(19, 'name19', 1.9, '01-22')",
+ "(20, 'name20', 2.0, '01-22')");
+ }
+
+ private void prepareSourceTable() throws Exception {
+ sEnv.executeSql(
+ buildDdl(
+ "S",
+                        Arrays.asList("id INT", "name STRING", "`value` DOUBLE"),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ new HashMap<String, String>() {
+ {
+ put(ROW_TRACKING_ENABLED.key(), "true");
+ put(DATA_EVOLUTION_ENABLED.key(), "true");
+ }
+ }));
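+        // Source rows: matched ids carrying NULL name/value to check NULL propagation, plus
+        // id 21 which has no matching row in the target table.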
+ insertInto(
+ "S",
+ "(1, 'new_name1', 100.1)",
+ "(7, 'new_name7', CAST(NULL AS DOUBLE))",
+ "(11, 'new_name11', 101.1)",
+ "(15, CAST(NULL AS STRING), 101.1)",
+ "(18, 'new_name18', 101.8)",
+ "(21, 'new_name21', 102.1)");
+ }
+
+ private DataEvolutionMergeIntoActionBuilder builder(
+ String warehouse, String database, String table) {
+        return new DataEvolutionMergeIntoActionBuilder(warehouse, database, table);
+ }
+
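+    /**
+     * Test helper that assembles the command-line style arguments used to build a {@link
+     * DataEvolutionMergeIntoAction} through {@code createAction}.
+     */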
+ private class DataEvolutionMergeIntoActionBuilder {
+ private final List<String> args;
+
+        DataEvolutionMergeIntoActionBuilder(String warehouse, String database, String table) {
+ this.args =
+ new ArrayList<>(
+ Arrays.asList(
+ "data_evolution_merge_into",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ table));
+ }
+
+        public DataEvolutionMergeIntoActionBuilder withTargetAlias(String targetAlias) {
+ if (targetAlias != null) {
+ args.add("--target_as");
+ args.add(targetAlias);
+ }
+ return this;
+ }
+
+        public DataEvolutionMergeIntoActionBuilder withSourceTable(String sourceTable) {
+ args.add("--source_table");
+ args.add(sourceTable);
+ return this;
+ }
+
+        public DataEvolutionMergeIntoActionBuilder withSourceSqls(String... sourceSqls) {
+ if (sourceSqls != null) {
+ for (String sql : sourceSqls) {
+ args.add("--source_sql");
+ args.add(sql);
+ }
+ }
+ return this;
+ }
+
+        public DataEvolutionMergeIntoActionBuilder withMergeCondition(String mergeCondition) {
+ args.add("--on");
+ args.add(mergeCondition);
+ return this;
+ }
+
+        public DataEvolutionMergeIntoActionBuilder withMatchedUpdateSet(String matchedUpdateSet) {
+ args.add("--matched_update_set");
+ args.add(matchedUpdateSet);
+ return this;
+ }
+
+        public DataEvolutionMergeIntoActionBuilder withSinkParallelism(int sinkParallelism) {
+ args.add("--sink_parallelism");
+ args.add(String.valueOf(sinkParallelism));
+ return this;
+ }
+
+ public DataEvolutionMergeIntoAction build() {
+ return createAction(DataEvolutionMergeIntoAction.class, args);
+ }
+ }
+}