This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 7080cfae [FLINK-31087] Introduce merge-into action
7080cfae is described below

commit 7080cfae9c9cce9db54b75e607a3d43cefc19ca7
Author: yuzelin <33053040+yuze...@users.noreply.github.com>
AuthorDate: Fri Mar 3 13:58:32 2023 +0800

    [FLINK-31087] Introduce merge-into action
    
    This closes #540
---
 docs/content/docs/how-to/writing-tables.md         | 118 ++++
 .../flink/table/store/types/DataTypeCasts.java     |  56 +-
 .../flink/table/store/file/catalog/Identifier.java |   4 +
 .../table/store/tests/FlinkActionsE2eTest.java     | 109 ++++
 .../flink/table/store/connector/action/Action.java |  37 +-
 .../table/store/connector/action/ActionBase.java   |  71 +++
 .../table/store/connector/action/DeleteAction.java |  22 +-
 .../store/connector/action/MergeIntoAction.java    | 677 +++++++++++++++++++++
 ...nITCase.java => DropPartitionActionITCase.java} |   2 +-
 .../connector/action/MergeIntoActionITCase.java    | 383 ++++++++++++
 .../connector/util/ReadWriteTableTestUtil.java     |   2 +-
 11 files changed, 1444 insertions(+), 37 deletions(-)

diff --git a/docs/content/docs/how-to/writing-tables.md b/docs/content/docs/how-to/writing-tables.md
index e77c47a1..2d3d0deb 100644
--- a/docs/content/docs/how-to/writing-tables.md
+++ b/docs/content/docs/how-to/writing-tables.md
@@ -268,3 +268,121 @@ For more information of 'delete', see
 {{< /tab >}}
 
 {{< /tabs >}}
+
+## Merging into table
+
+Table Store supports "MERGE INTO" via submitting the 'merge-into' job through `flink run`.
+
+{{< hint info >}}
+Important table properties setting:
+1. Only [primary key table]({{< ref "docs/concepts/primary-key-table" >}}) supports this feature.
+2. The action won't produce UPDATE_BEFORE, so it's not recommended to set 'changelog-producer' = 'input'.
+{{< /hint >}}
+
+The design references the following syntax:
+```sql
+MERGE INTO target-table
+  USING source-table | source-expr AS source-alias
+  ON merge-condition
+  WHEN MATCHED [AND matched-condition]
+    THEN UPDATE SET xxx
+  WHEN MATCHED [AND matched-condition]
+    THEN DELETE
+  WHEN NOT MATCHED [AND not-matched-condition]
+    THEN INSERT VALUES (xxx)
+  WHEN NOT MATCHED BY SOURCE [AND not-matched-by-source-condition]
+    THEN UPDATE SET xxx
+  WHEN NOT MATCHED BY SOURCE [AND not-matched-by-source-condition]
+    THEN DELETE
+```
+The merge-into action uses "upsert" semantics instead of "update": if the row exists, it is updated;
+otherwise it is inserted. For example, in a non-primary-key table you can update every column, but in
+a primary key table you cannot update the primary keys of an existing row; you have to insert a new
+row whose primary keys differ from those of the rows already in the table. In this scenario, "upsert"
+is useful.
+
+{{< tabs "merge-into" >}}
+
+{{< tab "Flink Job" >}}
+
+Run the following command to submit a 'merge-into' job for the table.
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    -c org.apache.flink.table.store.connector.action.FlinkActions \
+    /path/to/flink-table-store-flink-**-{{< version >}}.jar \
+    merge-into \
+    --warehouse <warehouse-path> \
+    --database <database-name> \
+    --table <target-table> \
+    --using-table <source-table> \
+    --on <merge-condition> \
+    --merge-actions <matched-upsert,matched-delete,not-matched-insert,not-matched-by-source-upsert,not-matched-by-source-delete> \
+    --matched-upsert-condition <matched-condition> \
+    --matched-upsert-set <upsert-changes> \
+    --matched-delete-condition <matched-condition> \
+    --not-matched-insert-condition <not-matched-condition> \
+    --not-matched-insert-values <insert-values> \
+    --not-matched-by-source-upsert-condition <not-matched-by-source-condition> \
+    --not-matched-by-source-upsert-set <not-matched-upsert-changes> \
+    --not-matched-by-source-delete-condition <not-matched-by-source-condition>
+    
+-- Examples:
+-- Find all orders mentioned in the source table, then mark as important if the price is above 100
+-- or delete if the price is under 10.
+./flink run \
+    -c org.apache.flink.table.store.connector.action.FlinkActions \
+    /path/to/flink-table-store-flink-**-{{< version >}}.jar \
+    merge-into \
+    --warehouse <warehouse-path> \
+    --database <database-name> \
+    --table T \
+    --using-table S \
+    --on "T.id = S.order_id" \
+    --merge-actions \
+    matched-upsert,matched-delete \
+    --matched-upsert-condition "T.price > 100" \
+    --matched-upsert-set "id = T.id, price = T.price, mark = 'important'" \
+    --matched-delete-condition "T.price < 10" 
+    
+-- For matched order rows, increase the price, and if there is no match, insert the order from the
+-- source table:
+./flink run \
+    -c org.apache.flink.table.store.connector.action.FlinkActions \
+    /path/to/flink-table-store-flink-**-{{< version >}}.jar \
+    merge-into \
+    --warehouse <warehouse-path> \
+    --database <database-name> \
+    --table T \
+    --using-table S \
+    --on "T.id = S.order_id" \
+    --merge-actions \
+    matched-upsert,not-matched-insert \
+    --matched-upsert-set "id = T.id, price = T.price + 20, mark = T.mark" \
+    --not-matched-insert-values * 
+
+-- For "not matched by source" order rows (rows that are in the target table and do not match any
+-- row in the source table based on the merge-condition), decrease the price, or delete them if the
+-- mark is 'trivial':
+./flink run \
+    -c org.apache.flink.table.store.connector.action.FlinkActions \
+    /path/to/flink-table-store-flink-**-{{< version >}}.jar \
+    merge-into \
+    --warehouse <warehouse-path> \
+    --database <database-name> \
+    --table T \
+    --using-table S \
+    --on "T.id = S.order_id" \
+    --merge-actions \
+    not-matched-by-source-upsert,not-matched-by-source-delete \
+    --not-matched-by-source-upsert-condition "T.mark <> 'trivial'" \
+    --not-matched-by-source-upsert-set "id = T.id, price = T.price - 20, mark = T.mark" \
+    --not-matched-by-source-delete-condition "T.mark = 'trivial'"
+```
+
+For more information about 'merge-into', see
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    -c org.apache.flink.table.store.connector.action.FlinkActions \
+    /path/to/flink-table-store-flink-**-{{< version >}}.jar \
+    merge-into --help
+```
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DataTypeCasts.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DataTypeCasts.java
index 72260d2d..637a6dc1 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DataTypeCasts.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DataTypeCasts.java
@@ -57,9 +57,12 @@ public final class DataTypeCasts {
 
     private static final Map<DataTypeRoot, Set<DataTypeRoot>> explicitCastingRules;
 
+    private static final Map<DataTypeRoot, Set<DataTypeRoot>> compatibleCastingRules;
+
     static {
         implicitCastingRules = new HashMap<>();
         explicitCastingRules = new HashMap<>();
+        compatibleCastingRules = new HashMap<>();
 
         // identity casts
 
@@ -69,28 +72,36 @@ public final class DataTypeCasts {
 
         // cast specification
 
-        castTo(CHAR).implicitFrom(CHAR).explicitFromFamily(PREDEFINED, CONSTRUCTED).build();
+        castTo(CHAR)
+                .implicitFrom(CHAR)
+                .explicitFromFamily(PREDEFINED, CONSTRUCTED)
+                .compatibleFrom(CHAR, VARCHAR)
+                .build();
 
         castTo(VARCHAR)
                 .implicitFromFamily(CHARACTER_STRING)
                 .explicitFromFamily(PREDEFINED, CONSTRUCTED)
+                .compatibleFrom(CHAR, VARCHAR)
                 .build();
 
         castTo(BOOLEAN)
                 .implicitFrom(BOOLEAN)
                 .explicitFromFamily(CHARACTER_STRING, INTEGER_NUMERIC)
+                .compatibleFrom(BOOLEAN)
                 .build();
 
         castTo(BINARY)
                 .implicitFrom(BINARY)
                 .explicitFromFamily(CHARACTER_STRING)
                 .explicitFrom(VARBINARY)
+                .compatibleFrom(BINARY, VARBINARY)
                 .build();
 
         castTo(VARBINARY)
                 .implicitFromFamily(BINARY_STRING)
                 .explicitFromFamily(CHARACTER_STRING)
                 .explicitFrom(BINARY)
+                .compatibleFrom(BINARY, VARBINARY)
                 .build();
 
         castTo(DECIMAL)
@@ -103,61 +114,71 @@ public final class DataTypeCasts {
                 .implicitFrom(TINYINT)
                 .explicitFromFamily(NUMERIC, CHARACTER_STRING)
                 .explicitFrom(BOOLEAN, TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE)
+                .compatibleFrom(TINYINT)
                 .build();
 
         castTo(SMALLINT)
                 .implicitFrom(TINYINT, SMALLINT)
                 .explicitFromFamily(NUMERIC, CHARACTER_STRING)
                 .explicitFrom(BOOLEAN, TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE)
+                .compatibleFrom(SMALLINT)
                 .build();
 
         castTo(INTEGER)
                 .implicitFrom(TINYINT, SMALLINT, INTEGER)
                 .explicitFromFamily(NUMERIC, CHARACTER_STRING)
                 .explicitFrom(BOOLEAN, TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE)
+                .compatibleFrom(INTEGER, DATE, TIME_WITHOUT_TIME_ZONE)
                 .build();
 
         castTo(BIGINT)
                 .implicitFrom(TINYINT, SMALLINT, INTEGER, BIGINT)
                 .explicitFromFamily(NUMERIC, CHARACTER_STRING)
                 .explicitFrom(BOOLEAN, TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE)
+                .compatibleFrom(BIGINT)
                 .build();
 
         castTo(FLOAT)
                 .implicitFrom(TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DECIMAL)
                 .explicitFromFamily(NUMERIC, CHARACTER_STRING)
                 .explicitFrom(BOOLEAN, TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE)
+                .compatibleFrom(FLOAT)
                 .build();
 
         castTo(DOUBLE)
                 .implicitFromFamily(NUMERIC)
                 .explicitFromFamily(CHARACTER_STRING)
                 .explicitFrom(BOOLEAN, TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE)
+                .compatibleFrom(DOUBLE)
                 .build();
 
         castTo(DATE)
                 .implicitFrom(DATE, TIMESTAMP_WITHOUT_TIME_ZONE)
                 .explicitFromFamily(TIMESTAMP, CHARACTER_STRING)
+                .compatibleFrom(INTEGER, DATE, TIME_WITHOUT_TIME_ZONE)
                 .build();
 
         castTo(TIME_WITHOUT_TIME_ZONE)
                 .implicitFrom(TIME_WITHOUT_TIME_ZONE, TIMESTAMP_WITHOUT_TIME_ZONE)
                 .explicitFromFamily(TIME, TIMESTAMP, CHARACTER_STRING)
+                .compatibleFrom(INTEGER, DATE, TIME_WITHOUT_TIME_ZONE)
                 .build();
 
         castTo(TIMESTAMP_WITHOUT_TIME_ZONE)
                 .implicitFrom(TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE)
                 .explicitFromFamily(DATETIME, CHARACTER_STRING, NUMERIC)
+                .compatibleFrom(TIMESTAMP_WITHOUT_TIME_ZONE)
                 .build();
 
         castTo(TIMESTAMP_WITH_LOCAL_TIME_ZONE)
                 .implicitFrom(TIMESTAMP_WITH_LOCAL_TIME_ZONE, TIMESTAMP_WITHOUT_TIME_ZONE)
                 .explicitFromFamily(DATETIME, CHARACTER_STRING, NUMERIC)
+                .compatibleFrom(TIMESTAMP_WITH_LOCAL_TIME_ZONE)
                 .build();
     }
 
     /**
-     * Returns whether the source type can be safely casted to the target type without loosing
+     * Returns whether the source type can be safely cast to the target type without losing
      * information.
      *
      * <p>Implicit casts are used for type widening and type generalization (finding a common
@@ -169,7 +190,7 @@ public final class DataTypeCasts {
     }
 
     /**
-     * Returns whether the source type can be casted to the target type.
+     * Returns whether the source type can be cast to the target type.
      *
      * <p>Explicit casts correspond to the SQL cast specification and represent the logic behind a
      * {@code CAST(sourceType AS targetType)} operation. For example, it allows for converting most
@@ -180,6 +201,28 @@ public final class DataTypeCasts {
         return supportsCasting(sourceType, targetType, true);
     }
 
+    /**
+     * Returns whether the source type can be compatibly cast to the target type.
+     *
+     * <p>If two types are compatible, they should have the same underlying data structure. For
+     * example, {@link CharType} and {@link VarCharType} are both in the {@link
+     * DataTypeFamily#CHARACTER_STRING} family, meaning they both represent a character string. All
+     * other types are only compatible with themselves. For example, although {@link IntType} and
+     * {@link BigIntType} are both in the {@link DataTypeFamily#NUMERIC} family, they are not
+     * compatible because IntType represents a 4-byte signed integer while BigIntType represents an
+     * 8-byte signed integer. In particular, two {@link DecimalType}s are compatible only when they
+     * have the same {@code precision} and {@code scale}.
+     */
+    public static boolean supportsCompatibleCast(DataType sourceType, DataType targetType) {
+        if (sourceType.copy(true).equals(targetType.copy(true))) {
+            return true;
+        }
+
+        return compatibleCastingRules
+                .get(targetType.getTypeRoot())
+                .contains(sourceType.getTypeRoot());
+    }
+
     // --------------------------------------------------------------------------------------------
 
     private static boolean supportsCasting(
@@ -219,6 +262,7 @@ public final class DataTypeCasts {
         private final DataTypeRoot targetType;
         private final Set<DataTypeRoot> implicitSourceTypes = new HashSet<>();
         private final Set<DataTypeRoot> explicitSourceTypes = new HashSet<>();
+        private final Set<DataTypeRoot> compatibleSourceTypes = new HashSet<>();
 
         CastingRuleBuilder(DataTypeRoot targetType) {
             this.targetType = targetType;
@@ -256,6 +300,11 @@ public final class DataTypeCasts {
             return this;
         }
 
+        CastingRuleBuilder compatibleFrom(DataTypeRoot... sourceTypes) {
+            this.compatibleSourceTypes.addAll(Arrays.asList(sourceTypes));
+            return this;
+        }
+
         /**
         * Should be called after {@link #explicitFromFamily(DataTypeFamily...)} to remove
          * previously added types.
@@ -274,6 +323,7 @@ public final class DataTypeCasts {
         void build() {
             implicitCastingRules.put(targetType, implicitSourceTypes);
             explicitCastingRules.put(targetType, explicitSourceTypes);
+            compatibleCastingRules.put(targetType, compatibleSourceTypes);
         }
     }
 
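The compatible-cast rules added above are easiest to see with concrete types. A rough sketch of the expected behavior (not part of the commit; the demo class is hypothetical and the type constructors are assumed to follow the Flink-style API of `org.apache.flink.table.store.types`):

```java
import org.apache.flink.table.store.types.BigIntType;
import org.apache.flink.table.store.types.CharType;
import org.apache.flink.table.store.types.DataTypeCasts;
import org.apache.flink.table.store.types.DateType;
import org.apache.flink.table.store.types.IntType;
import org.apache.flink.table.store.types.VarCharType;

public class CompatibleCastDemo {
    public static void main(String[] args) {
        // CHAR and VARCHAR share the character-string representation -> compatible
        System.out.println(
                DataTypeCasts.supportsCompatibleCast(new CharType(5), new VarCharType(10))); // true
        // INT (4 bytes) and BIGINT (8 bytes) differ in width -> not compatible
        System.out.println(
                DataTypeCasts.supportsCompatibleCast(new IntType(), new BigIntType())); // false
        // INT, DATE and TIME are all backed by a 4-byte int -> compatible per the rule table
        System.out.println(
                DataTypeCasts.supportsCompatibleCast(new IntType(), new DateType())); // true
    }
}
```
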
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Identifier.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Identifier.java
index 397172ad..b7b37be8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Identifier.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Identifier.java
@@ -58,6 +58,10 @@ public class Identifier implements Serializable {
         return String.format("%s.%s", database, table);
     }
 
+    public String getEscapedFullName() {
+        return getEscapedFullName('`');
+    }
+
     public String getEscapedFullName(char escapeChar) {
         return String.format(
                 "%c%s%c.%c%s%c", escapeChar, database, escapeChar, escapeChar, 
table, escapeChar);
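
The new zero-argument overload simply defaults the escape character to a backtick, which is what the Flink SQL queries below need. Expected behavior, as an illustrative (hypothetical) snippet:

```java
import org.apache.flink.table.store.file.catalog.Identifier;

public class IdentifierDemo {
    public static void main(String[] args) {
        Identifier id = Identifier.create("default", "T");
        System.out.println(id.getFullName());           // default.T
        System.out.println(id.getEscapedFullName());    // `default`.`T`
        System.out.println(id.getEscapedFullName('"')); // "default"."T"
    }
}
```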
diff --git a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
index 69ca1ac0..a5a0376f 100644
--- a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
+++ b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
@@ -256,6 +256,115 @@ public class FlinkActionsE2eTest extends E2eTestBase {
                 "2023-01-21, 1, 31");
     }
 
+    @Test
+    public void testMergeInto() throws Exception {
+        String tableTDdl =
+                "CREATE TABLE IF NOT EXISTS T (\n"
+                        + "    k INT,\n"
+                        + "    v STRING,\n"
+                        + "    last_action STRING,\n"
+                        + "    dt STRING,\n"
+                        + "    PRIMARY KEY (k, dt) NOT ENFORCED\n"
+                        + ") PARTITIONED BY (dt);";
+
+        String insertToT =
+                "INSERT INTO T VALUES"
+                        + "(1, 'v_1', 'creation', '02-27'),"
+                        + "(2, 'v_2', 'creation', '02-27'),"
+                        + "(3, 'v_3', 'creation', '02-27'),"
+                        + "(4, 'v_4', 'creation', '02-27'),"
+                        + "(5, 'v_5', 'creation', '02-28'),"
+                        + "(6, 'v_6', 'creation', '02-28'),"
+                        + "(7, 'v_7', 'creation', '02-28'),"
+                        + "(8, 'v_8', 'creation', '02-28'),"
+                        + "(9, 'v_9', 'creation', '02-28'),"
+                        + "(10, 'v_10', 'creation', '02-28');\n";
+
+        String tableSDdl =
+                "CREATE TABLE IF NOT EXISTS S (\n"
+                        + "    k INT,\n"
+                        + "    v STRING,\n"
+                        + "    dt STRING,\n"
+                        + "    PRIMARY KEY (k, dt) NOT ENFORCED\n"
+                        + ") PARTITIONED BY (dt);";
+
+        String insertToS =
+                "INSERT INTO S VALUES"
+                        + "(1, 'v_1', '02-27'),"
+                        + "(4, CAST (NULL AS STRING), '02-27'),"
+                        + "(7, 'Seven', '02-28'),"
+                        + "(8, CAST (NULL AS STRING), '02-28'),"
+                        + "(8, 'v_8', '02-29'),"
+                        + "(11, 'v_11', '02-29'),"
+                        + "(12, 'v_12', '02-29');\n";
+
+        runSql(
+                "SET 'table.dml-sync' = 'true';\n" + insertToT + insertToS,
+                catalogDdl,
+                useCatalogCmd,
+                tableTDdl,
+                tableSDdl);
+
+        // run merge-into job
+        Container.ExecResult execResult =
+                jobManager.execInContainer(
+                        "bin/flink",
+                        "run",
+                        "-p",
+                        "1",
+                        "-c",
+                        "org.apache.flink.table.store.connector.action.FlinkActions",
+                        "--detached",
+                        "lib/flink-table-store.jar",
+                        "merge-into",
+                        "--warehouse",
+                        warehousePath,
+                        "--database",
+                        "default",
+                        "--table",
+                        "T",
+                        "--using-table",
+                        "S",
+                        "--on",
+                        "T.k=S.k AND T.dt=S.dt",
+                        "--merge-actions",
+                        "matched-upsert,matched-delete,not-matched-insert",
+                        "--matched-upsert-condition",
+                        "T.v <> S.v AND S.v IS NOT NULL",
+                        "--matched-upsert-set",
+                        "v = S.v, last_action = 'matched_upsert'",
+                        "--matched-delete-condition",
+                        "S.v IS NULL",
+                        "--not-matched-insert-condition",
+                        "S.k < 12",
+                        "--not-matched-insert-values",
+                        "S.k, S.v, 'insert', S.dt");
+
+        LOG.info(execResult.getStdout());
+        LOG.info(execResult.getStderr());
+
+        // read all data from table store
+        runSql(
+                "INSERT INTO result1 SELECT * FROM T;",
+                catalogDdl,
+                useCatalogCmd,
+                tableTDdl,
+                createResultSink("result1", "k INT, v STRING, last_action STRING, dt STRING"));
+
+        // check the left data
+        checkResult(
+                "1, v_1, creation, 02-27",
+                "2, v_2, creation, 02-27",
+                "3, v_3, creation, 02-27",
+                "5, v_5, creation, 02-28",
+                "6, v_6, creation, 02-28",
+                "7, Seven, matched_upsert, 02-28",
+                "8, v_8, insert, 02-29",
+                "9, v_9, creation, 02-28",
+                "10, v_10, creation, 02-28",
+                "11, v_11, insert, 02-29");
+    }
+
     private void runSql(String sql, String... ddls) throws Exception {
         runSql(String.join("\n", ddls) + "\n" + sql);
     }
diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/Action.java b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/Action.java
index ab758018..cf4ed56c 100644
--- a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/Action.java
+++ b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/Action.java
@@ -79,18 +79,9 @@ public interface Action {
     static List<Map<String, String>> getPartitions(MultipleParameterTool params) {
         List<Map<String, String>> partitions = new ArrayList<>();
         for (String partition : params.getMultiParameter("partition")) {
-            Map<String, String> kvs = new HashMap<>();
-            for (String kvString : partition.split(",")) {
-                String[] kv = kvString.split("=");
-                if (kv.length != 2) {
-                    System.err.print(
-                            "Invalid key-value pair \""
-                                    + kvString
-                                    + "\".\n"
-                                    + "Run <action> --help for help.");
-                    return null;
-                }
-                kvs.put(kv[0], kv[1]);
+            Map<String, String> kvs = parseKeyValues(partition);
+            if (kvs == null) {
+                return null;
             }
             partitions.add(kvs);
         }
@@ -98,6 +89,24 @@ public interface Action {
         return partitions;
     }
 
+    static Map<String, String> parseKeyValues(String keyValues) {
+        Map<String, String> kvs = new HashMap<>();
+        for (String kvString : keyValues.split(",")) {
+            String[] kv = kvString.split("=");
+            if (kv.length != 2) {
+                System.err.print(
+                        "Invalid key-value pair \""
+                                + kvString
+                                + "\".\n"
+                                + "Run <action> --help for help.");
+                return null;
+            }
+            kvs.put(kv[0].trim(), kv[1].trim());
+        }
+
+        return kvs;
+    }
+
     /** Factory to create {@link Action}. */
     class Factory {
 
@@ -105,6 +114,7 @@ public interface Action {
         private static final String COMPACT = "compact";
         private static final String DROP_PARTITION = "drop-partition";
         private static final String DELETE = "delete";
+        private static final String MERGE_INTO = "merge-into";
 
         public static Optional<Action> create(String[] args) {
             String action = args[0].toLowerCase();
@@ -117,6 +127,8 @@ public interface Action {
                     return DropPartitionAction.create(actionArgs);
                 case DELETE:
                     return DeleteAction.create(actionArgs);
+                case MERGE_INTO:
+                    return MergeIntoAction.create(actionArgs);
                 default:
                     System.err.println("Unknown action \"" + action + "\"");
                     printHelp();
@@ -132,6 +144,7 @@ public interface Action {
             System.out.println("  " + COMPACT);
             System.out.println("  " + DROP_PARTITION);
             System.out.println("  " + DELETE);
+            System.out.println("  " + MERGE_INTO);
 
             System.out.println("For detailed options of each action, run 
<action> --help");
         }
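
The extracted `parseKeyValues` helper splits the argument on ',' and then on '=', trims both sides, and returns null (after printing an error) on malformed input. A quick illustration (hypothetical demo class):

```java
import org.apache.flink.table.store.connector.action.Action;

public class ParseKeyValuesDemo {
    public static void main(String[] args) {
        // well-formed pairs; keys and values are trimmed
        System.out.println(Action.parseKeyValues("id = T.id, mark = 'important'"));
        // e.g. {id=T.id, mark='important'} (backed by a HashMap, so order may vary)

        // a token without exactly one '=' fails the whole parse
        System.out.println(Action.parseKeyValues("id = T.id, oops")); // null
    }
}
```

Note that the naive split means a value containing ',' or '=' (say, a comparison expression) cannot be expressed with this format.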
diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/ActionBase.java b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/ActionBase.java
index 3cd36fc0..2e9014c8 100644
--- a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/ActionBase.java
+++ b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/ActionBase.java
@@ -18,17 +18,35 @@
 
 package org.apache.flink.table.store.connector.action;
 
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.catalog.CatalogContext;
+import org.apache.flink.table.store.connector.FlinkCatalog;
+import org.apache.flink.table.store.connector.LogicalTypeConversion;
+import org.apache.flink.table.store.connector.sink.FlinkSinkBuilder;
 import org.apache.flink.table.store.file.catalog.Catalog;
 import org.apache.flink.table.store.file.catalog.CatalogFactory;
 import org.apache.flink.table.store.file.catalog.Identifier;
+import org.apache.flink.table.store.file.operation.Lock;
 import org.apache.flink.table.store.options.CatalogOptions;
 import org.apache.flink.table.store.options.Options;
+import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.Table;
+import org.apache.flink.table.store.types.DataType;
+import org.apache.flink.table.store.types.DataTypeCasts;
+import org.apache.flink.table.types.logical.LogicalType;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.store.file.catalog.Catalog.DEFAULT_DATABASE;
+
 /** Abstract base of {@link Action}. */
 public abstract class ActionBase implements Action {
 
@@ -36,6 +54,12 @@ public abstract class ActionBase implements Action {
 
     protected Catalog catalog;
 
+    protected final FlinkCatalog flinkCatalog;
+
+    protected StreamExecutionEnvironment env;
+
+    protected StreamTableEnvironment tEnv;
+
     protected Identifier identifier;
 
     protected Table table;
@@ -50,6 +74,14 @@ public abstract class ActionBase implements Action {
                 CatalogFactory.createCatalog(
                         CatalogContext.create(
                                 new Options().set(CatalogOptions.WAREHOUSE, warehouse)));
+        flinkCatalog = new FlinkCatalog(catalog, "table-store", DEFAULT_DATABASE);
+
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode());
+
+        // register flink catalog to table environment
+        tEnv.registerCatalog(flinkCatalog.getName(), flinkCatalog);
+        tEnv.useCatalog(flinkCatalog.getName());
 
         try {
             table = catalog.getTable(identifier);
@@ -62,4 +94,43 @@ public abstract class ActionBase implements Action {
             throw new RuntimeException(e);
         }
     }
+
+    /**
+     * Extract {@link LogicalType}s from Flink {@link org.apache.flink.table.types.DataType}s and
+     * convert to Table Store {@link DataType}s.
+     */
+    protected List<DataType> toTableStoreDataTypes(
+            List<org.apache.flink.table.types.DataType> flinkDataTypes) {
+        return flinkDataTypes.stream()
+                .map(org.apache.flink.table.types.DataType::getLogicalType)
+                .map(LogicalTypeConversion::toDataType)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Check whether each {@link DataType} of actualTypes is compatible with that of expectedTypes
+     * respectively.
+     */
+    protected boolean compatibleCheck(List<DataType> actualTypes, List<DataType> expectedTypes) {
+        if (actualTypes.size() != expectedTypes.size()) {
+            return false;
+        }
+
+        for (int i = 0; i < actualTypes.size(); i++) {
+            if (!DataTypeCasts.supportsCompatibleCast(actualTypes.get(i), expectedTypes.get(i))) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    /** Sink {@link DataStream} dataStream to table. */
+    protected void sink(DataStream<RowData> dataStream) throws Exception {
+        new FlinkSinkBuilder((FileStoreTable) table)
+                .withInput(dataStream)
+                .withLockFactory(Lock.factory(catalog.lockFactory().orElse(null), identifier))
+                .build();
+        env.execute();
+    }
 }
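
Together, `toTableStoreDataTypes` and `compatibleCheck` give subclasses a schema guard before a query result is written back to the table. A hedged sketch of the intended call pattern (hypothetical method; it relies on the `table` field initialized by `ActionBase`):

```java
// inside a subclass of ActionBase
protected void validateQuerySchema(org.apache.flink.table.api.Table queriedTable) {
    // convert the Flink schema of the query result to Table Store types
    java.util.List<org.apache.flink.table.store.types.DataType> actualTypes =
            toTableStoreDataTypes(queriedTable.getResolvedSchema().getColumnDataTypes());
    // the target table's own field types
    java.util.List<org.apache.flink.table.store.types.DataType> expectedTypes =
            table.rowType().getFieldTypes();
    if (!compatibleCheck(actualTypes, expectedTypes)) {
        throw new IllegalStateException(
                "Query result schema is not compatible with the target table.");
    }
}
```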
diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/DeleteAction.java b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/DeleteAction.java
index 8f616766..6d18d52d 100644
--- a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/DeleteAction.java
+++ b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/DeleteAction.java
@@ -21,17 +21,11 @@ package org.apache.flink.table.store.connector.action;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.utils.MultipleParameterTool;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.conversion.DataStructureConverter;
 import org.apache.flink.table.data.conversion.DataStructureConverters;
-import org.apache.flink.table.store.connector.FlinkCatalog;
-import org.apache.flink.table.store.connector.sink.FlinkSinkBuilder;
-import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.types.RowKind;
 
 import org.slf4j.Logger;
@@ -42,21 +36,17 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.store.connector.action.Action.getTablePath;
-import static org.apache.flink.table.store.file.catalog.Catalog.DEFAULT_DATABASE;
 
 /** Delete from table action for Flink. */
 public class DeleteAction extends ActionBase {
 
     private static final Logger LOG = LoggerFactory.getLogger(DeleteAction.class);
 
-    private final FlinkCatalog flinkCatalog;
-
     private final String filter;
 
     public DeleteAction(String warehouse, String databaseName, String tableName, String filter) {
         super(warehouse, databaseName, tableName);
         this.filter = filter;
-        flinkCatalog = new FlinkCatalog(catalog, "table-store", DEFAULT_DATABASE);
     }
 
     public static Optional<Action> create(String[] args) {
@@ -111,18 +101,11 @@ public class DeleteAction extends ActionBase {
     public void run() throws Exception {
         LOG.debug("Run delete action with filter '{}'.", filter);
 
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        StreamTableEnvironment tEnv =
-                StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode());
-
-        tEnv.registerCatalog(flinkCatalog.getName(), flinkCatalog);
-        tEnv.useCatalog(flinkCatalog.getName());
-
         Table queriedTable =
                 tEnv.sqlQuery(
                         String.format(
                                 "SELECT * FROM %s WHERE %s",
-                                identifier.getEscapedFullName('`'), filter));
+                                identifier.getEscapedFullName(), filter));
 
         List<DataStructureConverter<Object, Object>> converters =
                 queriedTable.getResolvedSchema().getColumnDataTypes().stream()
@@ -146,7 +129,6 @@ public class DeleteAction extends ActionBase {
                                     return rowData;
                                 });
 
-        new FlinkSinkBuilder((FileStoreTable) table).withInput(dataStream).build();
-        env.execute();
+        sink(dataStream);
     }
 }
diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/MergeIntoAction.java b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/MergeIntoAction.java
new file mode 100644
index 00000000..e131087d
--- /dev/null
+++ b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/MergeIntoAction.java
@@ -0,0 +1,677 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.action;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.store.connector.LogicalTypeConversion;
+import org.apache.flink.table.store.file.catalog.Identifier;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.types.DataField;
+import org.apache.flink.table.store.types.DataType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.RowKind;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.store.connector.action.Action.getTablePath;
+import static org.apache.flink.table.store.connector.action.Action.parseKeyValues;
+
+/**
+ * Flink action for 'MERGE INTO', which references the syntax as follows (we use 'upsert' semantics
+ * instead of 'update'):
+ *
+ * <pre><code>
+ *  MERGE INTO target-table
+ *  USING source-table | source-expr AS source-alias
+ *  ON merge-condition
+ *  WHEN MATCHED [AND matched-condition]
+ *    THEN UPDATE SET xxx
+ *  WHEN MATCHED [AND matched-condition]
+ *    THEN DELETE
+ *  WHEN NOT MATCHED [AND not-matched-condition]
+ *    THEN INSERT VALUES (xxx)
+ *  WHEN NOT MATCHED BY SOURCE [AND not-matched-by-source-condition]
+ *    THEN UPDATE SET xxx
+ *  WHEN NOT MATCHED BY SOURCE [AND not-matched-by-source-condition]
+ *    THEN DELETE
+ * </code></pre>
+ *
+ * <p>It builds a query to find the rows to be changed. INNER JOIN with merge-condition is used to
+ * find MATCHED rows, and NOT EXISTS with merge-condition is used to find NOT MATCHED rows, then the
+ * condition of each action is used to filter the rows.
+ */
+public class MergeIntoAction extends ActionBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MergeIntoAction.class);
+
+    // primary keys of target table
+    private final List<String> primaryKeys;
+
+    // converters for Row to RowData
+    private final List<DataStructureConverter<Object, Object>> converters;
+
+    // field names of target table
+    private final List<String> targetFieldNames;
+
+    // source
+    @Nullable private String sourceTable;
+    private Identifier sourceTableIdentifier;
+    @Nullable private String source;
+    @Nullable private String sourceAlias;
+
+    // merge condition
+    private String mergeCondition;
+
+    // actions to be taken
+    private boolean matchedUpsert;
+    private boolean notMatchedUpsert;
+    private boolean matchedDelete;
+    private boolean notMatchedDelete;
+    private boolean insert;
+
+    // upsert
+    @Nullable private String matchedUpsertCondition;
+    @Nullable private String matchedUpsertSet;
+
+    @Nullable private String notMatchedBySourceUpsertCondition;
+    @Nullable private String notMatchedBySourceUpsertSet;
+
+    // delete
+    @Nullable private String matchedDeleteCondition;
+    @Nullable private String notMatchedBySourceDeleteCondition;
+
+    // insert
+    @Nullable private String notMatchedInsertCondition;
+    @Nullable private String notMatchedInsertValues;
+
+    MergeIntoAction(String warehouse, String database, String tableName) {
+        super(warehouse, database, tableName);
+
+        if (!(table instanceof FileStoreTable)) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Only FileStoreTable supports merge-into action. 
The table type is '%s'.",
+                            table.getClass().getName()));
+        }
+
+        // init primaryKeys of target table
+        primaryKeys = ((FileStoreTable) table).schema().primaryKeys();
+        if (primaryKeys.isEmpty()) {
+            throw new UnsupportedOperationException(
+                    "merge-into action doesn't support table with no primary 
keys defined.");
+        }
+
+        // init DataStructureConverters
+        converters =
+                table.rowType().getFieldTypes().stream()
+                        .map(LogicalTypeConversion::toLogicalType)
+                        .map(TypeConversions::fromLogicalToDataType)
+                        .map(DataStructureConverters::getConverter)
+                        .collect(Collectors.toList());
+
+        // init field names of target table
+        targetFieldNames =
+                table.rowType().getFields().stream()
+                        .map(DataField::name)
+                        .collect(Collectors.toList());
+    }
+
+    public MergeIntoAction withSourceTable(String sourceTable) {
+        this.sourceTable = sourceTable;
+        return this;
+    }
+
+    public MergeIntoAction withSource(String source, String sourceAlias) {
+        this.source = source;
+        this.sourceAlias = sourceAlias;
+        return this;
+    }
+
+    public MergeIntoAction withMergeCondition(String mergeCondition) {
+        this.mergeCondition = mergeCondition;
+        return this;
+    }
+
+    public MergeIntoAction withMatchedUpsert(
+            @Nullable String matchedUpsertCondition, String matchedUpsertSet) {
+        this.matchedUpsert = true;
+        this.matchedUpsertCondition = matchedUpsertCondition;
+        this.matchedUpsertSet = matchedUpsertSet;
+        return this;
+    }
+
+    public MergeIntoAction withNotMatchedBySourceUpsert(
+            @Nullable String notMatchedBySourceUpsertCondition,
+            String notMatchedBySourceUpsertSet) {
+        this.notMatchedUpsert = true;
+        this.notMatchedBySourceUpsertCondition = notMatchedBySourceUpsertCondition;
+        this.notMatchedBySourceUpsertSet = notMatchedBySourceUpsertSet;
+        return this;
+    }
+
+    public MergeIntoAction withMatchedDelete(@Nullable String matchedDeleteCondition) {
+        this.matchedDelete = true;
+        this.matchedDeleteCondition = matchedDeleteCondition;
+        return this;
+    }
+
+    public MergeIntoAction withNotMatchedBySourceDelete(
+            @Nullable String notMatchedBySourceDeleteCondition) {
+        this.notMatchedDelete = true;
+        this.notMatchedBySourceDeleteCondition = notMatchedBySourceDeleteCondition;
+        return this;
+    }
+
+    public MergeIntoAction withNotMatchedInsert(
+            @Nullable String notMatchedInsertCondition, String notMatchedInsertValues) {
+        this.insert = true;
+        this.notMatchedInsertCondition = notMatchedInsertCondition;
+        this.notMatchedInsertValues = notMatchedInsertValues;
+        return this;
+    }
+
+    public static Optional<Action> create(String[] args) {
+        LOG.info("merge-into job args: {}", String.join(" ", args));
+
+        MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
+
+        if (params.has("help")) {
+            printHelp();
+            return Optional.empty();
+        }
+
+        Tuple3<String, String, String> tablePath = getTablePath(params);
+        if (tablePath == null) {
+            return Optional.empty();
+        }
+
+        MergeIntoAction action = new MergeIntoAction(tablePath.f0, tablePath.f1, tablePath.f2);
+
+        if (!initSource(params, action)) {
+            return Optional.empty();
+        }
+
+        if (argumentAbsent(params, "on")) {
+            return Optional.empty();
+        }
+        action.withMergeCondition(params.get("on"));
+
+        List<String> actions =
+                Arrays.stream(params.get("merge-actions").split(","))
+                        .map(String::trim)
+                        .collect(Collectors.toList());
+        if (actions.contains("matched-upsert")) {
+            if (argumentAbsent(params, "matched-upsert-set")) {
+                return Optional.empty();
+            }
+            action.withMatchedUpsert(
+                    params.get("matched-upsert-condition"), 
params.get("matched-upsert-set"));
+        }
+        if (actions.contains("not-matched-by-source-upsert")) {
+            if (argumentAbsent(params, "not-matched-by-source-upsert-set")) {
+                return Optional.empty();
+            }
+            action.withNotMatchedBySourceUpsert(
+                    params.get("not-matched-by-source-upsert-condition"),
+                    params.get("not-matched-by-source-upsert-set"));
+        }
+        if (actions.contains("matched-delete")) {
+            action.withMatchedDelete(params.get("matched-delete-condition"));
+        }
+        if (actions.contains("not-matched-by-source-delete")) {
+            action.withNotMatchedBySourceDelete(
+                    params.get("not-matched-by-source-delete-condition"));
+        }
+        if (actions.contains("not-matched-insert")) {
+            if (argumentAbsent(params, "not-matched-insert-values")) {
+                return Optional.empty();
+            }
+            action.withNotMatchedInsert(
+                    params.get("not-matched-insert-condition"),
+                    params.get("not-matched-insert-values"));
+        }
+
+        if (!validate(action)) {
+            return Optional.empty();
+        }
+
+        return Optional.of(action);
+    }
+
+    private static boolean initSource(MultipleParameterTool params, MergeIntoAction action) {
+        String sourceTable = params.get("using-table");
+        String source = params.get("using-source");
+        String sourceAlias = params.get("as");
+
+        int count = 0;
+        if (sourceTable != null) {
+            action.withSourceTable(params.get("using-table"));
+            count++;
+        }
+
+        if (source != null) {
+            if (sourceAlias == null) {
+                System.err.println(
+                        "The source and its alias must be specified 
together.\n"
+                                + "Run <action> --help for help.");
+                return false;
+            }
+            action.withSource(source, sourceAlias);
+            count++;
+        }
+
+        if (count != 1) {
+            System.err.println(
+                    "Please specify either \"source table\" or \"source, 
source's alias\".\n"
+                            + "Run <action> --help for help.");
+            return false;
+        }
+
+        return true;
+    }
+
+    private static boolean argumentAbsent(MultipleParameterTool params, String key) {
+        if (!params.has(key)) {
+            System.err.println(key + " is absent.\nRun <action> --help for help.");
+            return true;
+        }
+
+        return false;
+    }
+
+    private static boolean validate(MergeIntoAction action) {
+        if (!action.matchedUpsert
+                && !action.notMatchedUpsert
+                && !action.matchedDelete
+                && !action.notMatchedDelete
+                && !action.insert) {
+            System.err.println(
+                    "Must specify at least one merge action.\nRun <action> 
--help for help.");
+            return false;
+        }
+
+        if ((action.matchedUpsert && action.matchedDelete)
+                && (action.matchedUpsertCondition == null
+                        || action.matchedDeleteCondition == null)) {
+            System.err.println(
+                    "If both matched-upsert and matched-delete actions are 
present, their conditions must both be present too.\n"
+                            + "Run <action> --help for help.");
+            return false;
+        }
+
+        if ((action.notMatchedUpsert && action.notMatchedDelete)
+                && (action.notMatchedBySourceUpsertCondition == null
+                        || action.notMatchedBySourceDeleteCondition == null)) {
+            System.err.println(
+                    "If both not-matched-by-source-upsert and 
not-matched-by--source-delete actions are present, their conditions must both 
be present too.\n"
+                            + "Run <action> --help for help.");
+            return false;
+        }
+
+        if (action.notMatchedBySourceUpsertSet != null
+                && action.notMatchedBySourceUpsertSet.equals("*")) {
+            System.err.println("The '*' cannot be used in 
not-matched-by-source-upsert-set");
+            return false;
+        }
+
+        return true;
+    }
+
+    private static void printHelp() {
+        System.out.println("Action \"merge-into\" simulates the \"MERGE INTO\" 
syntax.");
+        System.out.println();
+
+        System.out.println("Syntax:");
+        System.out.println(
+                "  merge-into --warehouse <warehouse-path>\n"
+                        + "             --database <database-name>\n"
+                        + "             --table <table-name>\n"
+                        + "             --using-table <source-table>\n"
+                        + "             --on <merge-condition>\n"
+                        + "             --merge-actions 
<matched-upsert,matched-delete,not-matched-insert,not-matched-by-source-upsert,not-matched-by-source-delete>\n"
+                        + "             --matched-upsert-condition 
<matched-condition>\n"
+                        + "             --matched-upsert-set 
<upsert-changes>\n"
+                        + "             --matched-delete-condition 
<matched-condition>\n"
+                        + "             --not-matched-insert-condition 
<not-matched-condition>\n"
+                        + "             --not-matched-insert-values 
<insert-values>\n"
+                        + "             
--not-matched-by-source-upsert-condition <not-matched-by-source-condition>\n"
+                        + "             --not-matched-by-source-upsert-set 
<not-matched-upsert-changes>\n"
+                        + "             
--not-matched-by-source-delete-condition <not-matched-by-source-condition>");
+        System.out.println("  matched-upsert-changes format:");
+        System.out.println(
+                "    col=<source-table>.col | expression [, ...] (do not add 
'<target-table>.' before 'col')");
+        System.out.println(
+                "    * (upsert with all source cols; require target table's 
schema is equal to source's)");
+        System.out.println("  not-matched-upsert-changes format:");
+        System.out.println("    col=expression (cannot use source table's 
col)");
+        System.out.println("  insert-values format:");
+        System.out.println(
+                "    col1,col2,...,col_end (must specify values of all 
columns; can use <source-table>.col or expression)");
+        System.out.println(
+                "    * (insert with all source cols; require target table's 
schema is equal to source's)");
+        System.out.println(
+                "  not-matched-condition: cannot use target table's columns to 
construct condition expression.");
+        System.out.println(
+                "  not-matched-by-source-condition: cannot use source table's 
columns to construct condition expression.");
+        System.out.println("  alternative arguments:");
+        System.out.println("    --path <table-path> to represent the table 
path.");
+        System.out.println(
+                "    --using-source <source-expression> --as <source-alias> to 
replace --using-table.");
+        System.out.println();
+
+        System.out.println("Note: ");
+        System.out.println("  1. Target table must has primary keys.");
+        System.out.println(
+                "  2. All conditions, set changes and values should use Flink 
SQL syntax. Please quote them with \" to escape special characters.");
+        System.out.println("  3. source-alias cannot be duplicated with 
existed table name.");
+        System.out.println("  4. At least one merge action must be 
specified.");
+        System.out.println("  5. How to determine the changed rows with 
different \"matched\":");
+        System.out.println(
+                "    matched: changed rows are from target table and each can 
match a source table row "
+                        + "based on merge-condition and optional 
matched-condition.");
+        System.out.println(
+                "    not-matched: changed rows are from source table and all 
rows cannot match any target table row "
+                        + "based on merge-condition and optional 
not-matched-condition.");
+        System.out.println(
+                "    not-matched-by-source: changed rows are from target table 
and all row cannot match any source table row "
+                        + "based on merge-condition and optional 
not-matched-by-source-condition.");
+        System.out.println(
+                "  6. If both matched-upsert and matched-delete actions are 
present, their conditions must both be present too "
+                        + "(same to not-matched-by-source-upsert and 
not-matched-by-source-delete). Otherwise, all conditions are optional.");
+        System.out.println("  7. source-alias cannot be duplicated with 
existed table name.");
+        System.out.println();
+
+        System.out.println("Examples:");
+        System.out.println(
+                "  merge-into --path hdfs:///path/to/T\n"
+                        + "             --using-table S\n"
+                        + "             --on \"T.k=S.k\"\n"
+                        + "             --merge-actions matched-upsert\n"
+                        + "             --matched-upsert-condition 
\"T.v<>S.v\"\n"
+                        + "             --matched-upsert-set \"S.k,S.v\"");
+        System.out.println(
+                "  It will find matched rows of target table that meet 
condition (T.k=S.k AND T.v<>S.v) ind '-D' to test_table, then update T.v with 
S.v.");
+    }
+
+    @Override
+    public void run() throws Exception {
+        // prepare source table
+        if (sourceTable != null) {
+            if (sourceTable.contains(".")) {
+                sourceTableIdentifier = Identifier.fromString(sourceTable);
+            } else {
+                sourceTableIdentifier =
+                        Identifier.create(identifier.getDatabaseName(), sourceTable);
+            }
+        } else {
+            tEnv.registerTable(sourceAlias, tEnv.sqlQuery(source));
+            sourceTableIdentifier = Identifier.create(identifier.getDatabaseName(), sourceAlias);
+        }
+
+        List<DataStream<RowData>> dataStreams =
+                Stream.of(
+                                getMatchedUpsertDataStream(),
+                                getNotMatchedUpsertDataStream(),
+                                getMatchedDeleteDataStream(),
+                                getNotMatchedDeleteDataStream(),
+                                getInsertDataStream())
+                        .filter(Optional::isPresent)
+                        .map(Optional::get)
+                        .collect(Collectors.toList());
+
+        DataStream<RowData> firstDs = dataStreams.get(0);
+        sink(firstDs.union(dataStreams.stream().skip(1).toArray(DataStream[]::new)));
+    }
+
+    private Optional<DataStream<RowData>> getMatchedUpsertDataStream() {
+        if (!matchedUpsert) {
+            return Optional.empty();
+        }
+
+        List<String> project;
+        // extract project
+        if (matchedUpsertSet.equals("*")) {
+            project = Collections.singletonList(sourceTableIdentifier.getObjectName() + "." + "*");
+        } else {
+            // validate upsert changes
+            // no need to check primary key changes because the merge condition must contain all pks
+            // of the target table
+            Map<String, String> changes = parseKeyValues(matchedUpsertSet);
+            if (changes == null) {
+                throw new IllegalArgumentException(
+                        "matched-upsert-set is invalid.\nRun <action> --help 
for help.");
+            }
+            for (String targetField : changes.keySet()) {
+                if (!targetFieldNames.contains(targetField)) {
+                    throw new RuntimeException(
+                            String.format(
+                                    "Invalid column reference '%s' of table 
'%s' at matched-upsert action.",
+                                    targetField, identifier.getFullName()));
+                }
+            }
+
+            // replace field names
+            // the table name is added before column name to avoid ambiguous column reference
+            project =
+                    targetFieldNames.stream()
+                            .map(
+                                    name ->
+                                            changes.getOrDefault(
+                                                    name, identifier.getObjectName() + "." + name))
+                            .collect(Collectors.toList());
+        }
+
+        // use inner join to find matched records
+        String query =
+                String.format(
+                        "SELECT %s FROM %s INNER JOIN %s ON %s %s",
+                        String.join(",", project),
+                        identifier.getEscapedFullName(),
+                        sourceTableIdentifier.getEscapedFullName(),
+                        mergeCondition,
+                        matchedUpsertCondition == null ? "" : "WHERE " + matchedUpsertCondition);
+        LOG.info("Query used for matched-upsert:\n{}", query);
+
+        Table source = tEnv.sqlQuery(query);
+        checkSchema("matched-upsert", source);
+
+        return Optional.of(toDataStream(source, RowKind.UPDATE_AFTER, converters));
+    }
+
+    private Optional<DataStream<RowData>> getNotMatchedUpsertDataStream() {
+        if (!notMatchedUpsert) {
+            return Optional.empty();
+        }
+
+        // validate upsert change
+        Map<String, String> changes = parseKeyValues(notMatchedBySourceUpsertSet);
+        if (changes == null) {
+            throw new IllegalArgumentException(
+                    "not-matched-by-source-upsert-set is invalid.\nRun <action> --help for help.");
+        }
+        for (String targetField : changes.keySet()) {
+            if (!targetFieldNames.contains(targetField)) {
+                throw new RuntimeException(
+                        String.format(
+                                "Invalid column reference '%s' of table '%s' at not-matched-by-source-upsert action.\nRun <action> --help for help.",
+                                targetField, identifier.getFullName()));
+            }
+
+            if (primaryKeys.contains(targetField)) {
+                throw new RuntimeException(
+                        "Not allowed to change primary key in not-matched-by-source-upsert-set.\nRun <action> --help for help.");
+            }
+        }
+
+        // replace field names (won't be ambiguous here)
+        List<String> project =
+                targetFieldNames.stream()
+                        .map(name -> changes.getOrDefault(name, name))
+                        .collect(Collectors.toList());
+
+        // use not exists to find not matched records
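+        // the rendered query is effectively an anti join (illustrative shape):
+        //   SELECT ... FROM T WHERE NOT EXISTS (SELECT * FROM S WHERE <merge-condition>) AND (...)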
+        String query =
+                String.format(
+                        "SELECT %s FROM %s WHERE NOT EXISTS (SELECT * FROM %s WHERE %s) %s",
+                        String.join(",", project),
+                        identifier.getEscapedFullName(),
+                        sourceTableIdentifier.getEscapedFullName(),
+                        mergeCondition,
+                        notMatchedBySourceUpsertCondition == null
+                                ? ""
+                                : String.format("AND (%s)", notMatchedBySourceUpsertCondition));
+
+        LOG.info("Query used for not-matched-by-source-upsert:\n{}", query);
+
+        Table source = tEnv.sqlQuery(query);
+        checkSchema("not-matched-by-source-upsert", source);
+
+        return Optional.of(toDataStream(source, RowKind.UPDATE_AFTER, converters));
+    }
+
+    private Optional<DataStream<RowData>> getMatchedDeleteDataStream() {
+        if (!matchedDelete) {
+            return Optional.empty();
+        }
+
+        // the table name is added before column name to avoid ambiguous column reference
+        List<String> project =
+                targetFieldNames.stream()
+                        .map(name -> identifier.getObjectName() + "." + name)
+                        .collect(Collectors.toList());
+
+        // use inner join to find matched records
+        String query =
+                String.format(
+                        "SELECT %s FROM %s INNER JOIN %s ON %s %s",
+                        String.join(",", project),
+                        identifier.getEscapedFullName(),
+                        sourceTableIdentifier.getEscapedFullName(),
+                        mergeCondition,
+                        matchedDeleteCondition == null ? "" : "WHERE " + matchedDeleteCondition);
+        LOG.info("Query used by matched-delete:\n{}", query);
+
+        Table source = tEnv.sqlQuery(query);
+        checkSchema("matched-delete", source);
+
+        return Optional.of(toDataStream(source, RowKind.DELETE, converters));
+    }
+
+    private Optional<DataStream<RowData>> getNotMatchedDeleteDataStream() {
+        if (!notMatchedDelete) {
+            return Optional.empty();
+        }
+
+        // use not exists to find not matched records
+        String query =
+                String.format(
+                        "SELECT %s FROM %s WHERE NOT EXISTS (SELECT * FROM %s WHERE %s) %s",
+                        String.join(",", targetFieldNames),
+                        identifier.getEscapedFullName(),
+                        sourceTableIdentifier.getEscapedFullName(),
+                        mergeCondition,
+                        notMatchedBySourceDeleteCondition == null
+                                ? ""
+                                : String.format("AND (%s)", notMatchedBySourceDeleteCondition));
+        LOG.info("Query used by not-matched-by-source-delete:\n{}", query);
+
+        Table source = tEnv.sqlQuery(query);
+        checkSchema("not-matched-by-source-delete", source);
+
+        return Optional.of(toDataStream(source, RowKind.DELETE, converters));
+    }
+
+    private Optional<DataStream<RowData>> getInsertDataStream() {
+        if (!insert) {
+            return Optional.empty();
+        }
+
+        // use not exists to find rows to insert
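+        // note the roles are swapped here: scan the source table and anti-join against the
+        // target, so only source rows without a matching target row are inserted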
+        String query =
+                String.format(
+                        "SELECT %s FROM %s WHERE NOT EXISTS (SELECT * FROM %s WHERE %s) %s",
+                        notMatchedInsertValues,
+                        sourceTableIdentifier.getEscapedFullName(),
+                        identifier.getEscapedFullName(),
+                        mergeCondition,
+                        notMatchedInsertCondition == null
+                                ? ""
+                                : String.format("AND (%s)", notMatchedInsertCondition));
+        LOG.info("Query used by not-matched-insert:\n{}", query);
+
+        Table source = tEnv.sqlQuery(query);
+        checkSchema("not-matched-insert", source);
+
+        return Optional.of(toDataStream(source, RowKind.INSERT, converters));
+    }
+
+    private void checkSchema(String action, Table source) {
+        List<DataType> actualTypes =
+                toTableStoreDataTypes(source.getResolvedSchema().getColumnDataTypes());
+        List<DataType> expectedTypes = this.table.rowType().getFieldTypes();
+        if (!compatibleCheck(actualTypes, expectedTypes)) {
+            throw new IllegalStateException(
+                    String.format(
+                            "The schema of result in action '%s' is invalid.\n"
+                                    + "Result schema:   [%s]\n"
+                                    + "Expected schema: [%s]",
+                            action,
+                            actualTypes.stream()
+                                    .map(DataType::asSQLString)
+                                    .collect(Collectors.joining(", ")),
+                            expectedTypes.stream()
+                                    .map(DataType::asSQLString)
+                                    .collect(Collectors.joining(", "))));
+        }
+    }
+
+    // pass converters to avoid "not serializable" exception
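+    // (resolving the converters eagerly lets the map lambda capture only this local list
+    // instead of the enclosing, non-serializable action instance)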
+    private DataStream<RowData> toDataStream(
+            Table source, RowKind kind, List<DataStructureConverter<Object, Object>> converters) {
+        return tEnv.toChangelogStream(source)
+                .map(
+                        row -> {
+                            int arity = row.getArity();
+                            GenericRowData rowData = new GenericRowData(kind, arity);
+                            for (int i = 0; i < arity; i++) {
+                                rowData.setField(
+                                        i, converters.get(i).toInternalOrNull(row.getField(i)));
+                            }
+                            return rowData;
+                        });
+    }
+}
diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/DropPartitionITCase.java b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/DropPartitionActionITCase.java
similarity index 99%
rename from flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/DropPartitionITCase.java
rename to flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/DropPartitionActionITCase.java
index 8b941fdc..28da14c4 100644
--- a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/DropPartitionITCase.java
+++ b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/DropPartitionActionITCase.java
@@ -38,7 +38,7 @@ import java.util.Map;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** IT cases for {@link DropPartitionAction}. */
-public class DropPartitionITCase extends ActionITCaseBase {
+public class DropPartitionActionITCase extends ActionITCaseBase {
 
     private static final DataType[] FIELD_TYPES =
             new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.STRING(), DataTypes.INT()};
diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/MergeIntoActionITCase.java b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/MergeIntoActionITCase.java
new file mode 100644
index 00000000..0cefe77f
--- /dev/null
+++ b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/MergeIntoActionITCase.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.action;
+
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.utils.BlockingIterator;
+import org.apache.flink.types.Row;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
+import static org.apache.flink.table.store.CoreOptions.CHANGELOG_PRODUCER;
+import static org.apache.flink.table.store.connector.util.ReadWriteTableTestUtil.buildDdl;
+import static org.apache.flink.table.store.connector.util.ReadWriteTableTestUtil.buildSimpleQuery;
+import static org.apache.flink.table.store.connector.util.ReadWriteTableTestUtil.createTable;
+import static org.apache.flink.table.store.connector.util.ReadWriteTableTestUtil.init;
+import static org.apache.flink.table.store.connector.util.ReadWriteTableTestUtil.insertInto;
+import static org.apache.flink.table.store.connector.util.ReadWriteTableTestUtil.sEnv;
+import static org.apache.flink.table.store.connector.util.ReadWriteTableTestUtil.testBatchRead;
+import static org.apache.flink.table.store.connector.util.ReadWriteTableTestUtil.testStreamingRead;
+import static org.apache.flink.table.store.connector.util.ReadWriteTableTestUtil.validateStreamingReadResult;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
+
+/** IT cases for {@link MergeIntoAction}. */
+public class MergeIntoActionITCase extends ActionITCaseBase {
+
+    private static final List<Row> initialRecords =
+            Arrays.asList(
+                    changelogRow("+I", 1, "v_1", "creation", "02-27"),
+                    changelogRow("+I", 2, "v_2", "creation", "02-27"),
+                    changelogRow("+I", 3, "v_3", "creation", "02-27"),
+                    changelogRow("+I", 4, "v_4", "creation", "02-27"),
+                    changelogRow("+I", 5, "v_5", "creation", "02-28"),
+                    changelogRow("+I", 6, "v_6", "creation", "02-28"),
+                    changelogRow("+I", 7, "v_7", "creation", "02-28"),
+                    changelogRow("+I", 8, "v_8", "creation", "02-28"),
+                    changelogRow("+I", 9, "v_9", "creation", "02-28"),
+                    changelogRow("+I", 10, "v_10", "creation", "02-28"));
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        init(warehouse);
+
+        // prepare table S
+        sEnv.executeSql(
+                buildDdl(
+                        "S",
+                        Arrays.asList("k INT", "v STRING", "dt STRING"),
+                        Arrays.asList("k", "dt"),
+                        Collections.singletonList("dt"),
+                        new HashMap<>()));
+
+        insertInto(
+                "S",
+                "(1, 'v_1', '02-27')",
+                "(4, CAST (NULL AS STRING), '02-27')",
+                "(7, 'Seven', '02-28')",
+                "(8, CAST (NULL AS STRING), '02-28')",
+                "(8, 'v_8', '02-29')",
+                "(11, 'v_11', '02-29')",
+                "(12, 'v_12', '02-29')");
+    }
+
+    @ParameterizedTest(name = "changelog-producer = {0}")
+    @MethodSource("producerTestData")
+    public void testVariousChangelogProducer(
+            CoreOptions.ChangelogProducer producer, List<Row> expected) throws Exception {
+        // prepare table T
+        prepareTable(producer);
+
+        // similar to:
+        // MERGE INTO T
+        // USING S
+        // ON T.k = S.k AND T.dt = S.dt
+        // WHEN MATCHED AND (T.v <> S.v AND S.v IS NOT NULL) THEN UPDATE
+        //   SET v = S.v, last_action = 'matched_upsert'
+        // WHEN MATCHED AND S.v IS NULL THEN DELETE
+        // WHEN NOT MATCHED THEN INSERT VALUES (S.k, S.v, 'insert', S.dt)
+        // WHEN NOT MATCHED BY SOURCE AND (dt < '02-28') THEN UPDATE
+        //   SET v = v || '_nmu', last_action = 'not_matched_upsert'
+        // WHEN NOT MATCHED BY SOURCE AND (dt >= '02-28') THEN DELETE
+        MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
+        action.withSourceTable("S")
+                .withMergeCondition("T.k = S.k AND T.dt = S.dt")
+                .withMatchedUpsert(
+                        "T.v <> S.v AND S.v IS NOT NULL", "v = S.v, last_action = 'matched_upsert'")
+                .withMatchedDelete("S.v IS NULL")
+                .withNotMatchedInsert(null, "S.k, S.v, 'insert', S.dt")
+                .withNotMatchedBySourceUpsert(
+                        "dt < '02-28'", "v = v || '_nmu', last_action = 'not_matched_upsert'")
+                .withNotMatchedBySourceDelete("dt >= '02-28'");
+
+        validateActionRunResult(
+                action,
+                expected,
+                Arrays.asList(
+                        changelogRow("+I", 1, "v_1", "creation", "02-27"),
+                        changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert", "02-27"),
+                        changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert", "02-27"),
+                        changelogRow("+U", 7, "Seven", "matched_upsert", "02-28"),
+                        changelogRow("+I", 8, "v_8", "insert", "02-29"),
+                        changelogRow("+I", 11, "v_11", "insert", "02-29"),
+                        changelogRow("+I", 12, "v_12", "insert", "02-29")));
+    }
+
+    @Test
+    public void testUsingSource() throws Exception {
+        // prepare table T
+        prepareTable(CoreOptions.ChangelogProducer.NONE);
+
+        // similar to:
+        // MERGE INTO T
+        // USING (SELECT * FROM S WHERE k < 12) AS SS
+        // ON T.k = SS.k AND T.dt = SS.dt
+        // WHEN MATCHED AND (T.v <> SS.v AND SS.v IS NOT NULL) THEN UPDATE
+        //   SET v = SS.v, last_action = 'matched_upsert'
+        // WHEN MATCHED AND SS.v IS NULL THEN DELETE
+        // WHEN NOT MATCHED THEN INSERT VALUES (SS.k, SS.v, 'insert', SS.dt)
+        // WHEN NOT MATCHED BY SOURCE AND (dt < '02-28') THEN UPDATE
+        //   SET v = v || '_nmu', last_action = 'not_matched_upsert'
+        // WHEN NOT MATCHED BY SOURCE AND (dt >= '02-28') THEN DELETE
+        MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
+        action.withSource("SELECT * FROM S WHERE k < 12", "SS")
+                .withMergeCondition("T.k = SS.k AND T.dt = SS.dt")
+                .withMatchedUpsert(
+                        "T.v <> SS.v AND SS.v IS NOT NULL",
+                        "v = SS.v, last_action = 'matched_upsert'")
+                .withMatchedDelete("SS.v IS NULL")
+                .withNotMatchedInsert(null, "SS.k, SS.v, 'insert', SS.dt")
+                .withNotMatchedBySourceUpsert(
+                        "dt < '02-28'", "v = v || '_nmu', last_action = 'not_matched_upsert'")
+                .withNotMatchedBySourceDelete("dt >= '02-28'");
+
+        validateActionRunResult(
+                action,
+                Arrays.asList(
+                        changelogRow("-U", 7, "v_7", "creation", "02-28"),
+                        changelogRow("+U", 7, "Seven", "matched_upsert", "02-28"),
+                        changelogRow("-D", 4, "v_4", "creation", "02-27"),
+                        changelogRow("-D", 8, "v_8", "creation", "02-28"),
+                        changelogRow("+I", 8, "v_8", "insert", "02-29"),
+                        changelogRow("+I", 11, "v_11", "insert", "02-29"),
+                        changelogRow("-U", 2, "v_2", "creation", "02-27"),
+                        changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert", "02-27"),
+                        changelogRow("-U", 3, "v_3", "creation", "02-27"),
+                        changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert", "02-27"),
+                        changelogRow("-D", 5, "v_5", "creation", "02-28"),
+                        changelogRow("-D", 6, "v_6", "creation", "02-28"),
+                        changelogRow("-D", 9, "v_9", "creation", "02-28"),
+                        changelogRow("-D", 10, "v_10", "creation", "02-28")),
+                Arrays.asList(
+                        changelogRow("+I", 1, "v_1", "creation", "02-27"),
+                        changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert", "02-27"),
+                        changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert", "02-27"),
+                        changelogRow("+U", 7, "Seven", "matched_upsert", "02-28"),
+                        changelogRow("+I", 8, "v_8", "insert", "02-29"),
+                        changelogRow("+I", 11, "v_11", "insert", "02-29")));
+    }
+
+    @Test
+    public void testMatchedUpsertSetAll() throws Exception {
+        // prepare table T
+        prepareTable(CoreOptions.ChangelogProducer.NONE);
+
+        // build MergeIntoAction
+        MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
+        action.withSource("SELECT k, v, 'unknown', dt FROM S", "SS")
+                .withMergeCondition("T.k = SS.k AND T.dt = SS.dt")
+                .withMatchedUpsert(null, "*");
+
+        validateActionRunResult(
+                action,
+                Arrays.asList(
+                        changelogRow("-U", 1, "v_1", "creation", "02-27"),
+                        changelogRow("+U", 1, "v_1", "unknown", "02-27"),
+                        changelogRow("-U", 4, "v_4", "creation", "02-27"),
+                        changelogRow("+U", 4, null, "unknown", "02-27"),
+                        changelogRow("-U", 7, "v_7", "creation", "02-28"),
+                        changelogRow("+U", 7, "Seven", "unknown", "02-28"),
+                        changelogRow("-U", 8, "v_8", "creation", "02-28"),
+                        changelogRow("+U", 8, null, "unknown", "02-28")),
+                Arrays.asList(
+                        changelogRow("+U", 1, "v_1", "unknown", "02-27"),
+                        changelogRow("+I", 2, "v_2", "creation", "02-27"),
+                        changelogRow("+I", 3, "v_3", "creation", "02-27"),
+                        changelogRow("+U", 4, null, "unknown", "02-27"),
+                        changelogRow("+I", 5, "v_5", "creation", "02-28"),
+                        changelogRow("+I", 6, "v_6", "creation", "02-28"),
+                        changelogRow("+U", 7, "Seven", "unknown", "02-28"),
+                        changelogRow("+U", 8, null, "unknown", "02-28"),
+                        changelogRow("+I", 9, "v_9", "creation", "02-28"),
+                        changelogRow("+I", 10, "v_10", "creation", "02-28")));
+    }
+
+    @Test
+    public void testNotMatchedInsertAll() throws Exception {
+        // prepare table T
+        prepareTable(CoreOptions.ChangelogProducer.NONE);
+
+        // build MergeIntoAction
+        MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
+        action.withSource("SELECT k, v, 'unknown', dt FROM S", "SS")
+                .withMergeCondition("T.k = SS.k AND T.dt = SS.dt")
+                .withNotMatchedInsert("SS.k < 12", "*");
+
+        validateActionRunResult(
+                action,
+                Arrays.asList(
+                        changelogRow("+I", 8, "v_8", "unknown", "02-29"),
+                        changelogRow("+I", 11, "v_11", "unknown", "02-29")),
+                Arrays.asList(
+                        changelogRow("+I", 1, "v_1", "creation", "02-27"),
+                        changelogRow("+I", 2, "v_2", "creation", "02-27"),
+                        changelogRow("+I", 3, "v_3", "creation", "02-27"),
+                        changelogRow("+I", 4, "v_4", "creation", "02-27"),
+                        changelogRow("+I", 5, "v_5", "creation", "02-28"),
+                        changelogRow("+I", 6, "v_6", "creation", "02-28"),
+                        changelogRow("+I", 7, "v_7", "creation", "02-28"),
+                        changelogRow("+I", 8, "v_8", "creation", "02-28"),
+                        changelogRow("+I", 9, "v_9", "creation", "02-28"),
+                        changelogRow("+I", 10, "v_10", "creation", "02-28"),
+                        changelogRow("+I", 8, "v_8", "unknown", "02-29"),
+                        changelogRow("+I", 11, "v_11", "unknown", "02-29")));
+    }
+
+    // ----------------------------------------------------------------------------------------------------------------
+    // Negative tests
+    // ----------------------------------------------------------------------------------------------------------------
+
+    @Test
+    public void testInsertChangesActionWithNonPkTable() {
+        String nonPkTable =
+                createTable(
+                        Collections.singletonList("k int"),
+                        Collections.emptyList(),
+                        Collections.emptyList());
+
+        assertThatThrownBy(() -> new MergeIntoAction(warehouse, database, nonPkTable))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage(
+                        "merge-into action doesn't support table with no primary keys defined.");
+    }
+
+    @Test
+    public void testIncompatibleSchema() throws Exception {
+        // prepare table T
+        prepareTable(CoreOptions.ChangelogProducer.NONE);
+
+        // build MergeIntoAction
+        MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
+        action.withSourceTable("S")
+                .withMergeCondition("T.k = S.k AND T.dt = S.dt")
+                .withNotMatchedInsert(null, "S.k, S.v, 0, S.dt");
+
+        assertThatThrownBy(action::run)
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage(
+                        "The schema of result in action 'not-matched-insert' is invalid.\n"
+                                + "Result schema:   [INT NOT NULL, STRING, INT NOT NULL, STRING NOT NULL]\n"
+                                + "Expected schema: [INT NOT NULL, STRING, STRING, STRING NOT NULL]");
+    }
+
+    private void validateActionRunResult(
+            MergeIntoAction action, List<Row> streamingExpected, List<Row> batchExpected)
+            throws Exception {
+        BlockingIterator<Row, Row> iterator =
+                testStreamingRead(buildSimpleQuery("T"), initialRecords);
+        action.run();
+        // test streaming read
+        validateStreamingReadResult(iterator, streamingExpected);
+        iterator.close();
+        // test batch read
+        testBatchRead(buildSimpleQuery("T"), batchExpected);
+    }
+
+    private void prepareTable(CoreOptions.ChangelogProducer producer) throws Exception {
+        sEnv.executeSql(
+                buildDdl(
+                        "T",
+                        Arrays.asList("k INT", "v STRING", "last_action STRING", "dt STRING"),
+                        Arrays.asList("k", "dt"),
+                        Collections.singletonList("dt"),
+                        new HashMap<String, String>() {
+                            {
+                                put(CHANGELOG_PRODUCER.key(), producer.toString());
+                            }
+                        }));
+
+        insertInto(
+                "T",
+                "(1, 'v_1', 'creation', '02-27')",
+                "(2, 'v_2', 'creation', '02-27')",
+                "(3, 'v_3', 'creation', '02-27')",
+                "(4, 'v_4', 'creation', '02-27')",
+                "(5, 'v_5', 'creation', '02-28')",
+                "(6, 'v_6', 'creation', '02-28')",
+                "(7, 'v_7', 'creation', '02-28')",
+                "(8, 'v_8', 'creation', '02-28')",
+                "(9, 'v_9', 'creation', '02-28')",
+                "(10, 'v_10', 'creation', '02-28')");
+    }
+
+    private static List<Arguments> producerTestData() {
+        return Arrays.asList(
+                arguments(
+                        CoreOptions.ChangelogProducer.NONE,
+                        Arrays.asList(
+                                changelogRow("-U", 7, "v_7", "creation", "02-28"),
+                                changelogRow("+U", 7, "Seven", "matched_upsert", "02-28"),
+                                changelogRow("-D", 4, "v_4", "creation", "02-27"),
+                                changelogRow("-D", 8, "v_8", "creation", "02-28"),
+                                changelogRow("+I", 8, "v_8", "insert", "02-29"),
+                                changelogRow("+I", 11, "v_11", "insert", "02-29"),
+                                changelogRow("+I", 12, "v_12", "insert", "02-29"),
+                                changelogRow("-U", 2, "v_2", "creation", "02-27"),
+                                changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert", "02-27"),
+                                changelogRow("-U", 3, "v_3", "creation", "02-27"),
+                                changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert", "02-27"),
+                                changelogRow("-D", 5, "v_5", "creation", "02-28"),
+                                changelogRow("-D", 6, "v_6", "creation", "02-28"),
+                                changelogRow("-D", 9, "v_9", "creation", "02-28"),
+                                changelogRow("-D", 10, "v_10", "creation", "02-28"))),
+                arguments(
+                        CoreOptions.ChangelogProducer.INPUT,
+                        Arrays.asList(
+                                changelogRow("+U", 7, "Seven", "matched_upsert", "02-28"),
+                                changelogRow("-D", 4, "v_4", "creation", "02-27"),
+                                changelogRow("-D", 8, "v_8", "creation", "02-28"),
+                                changelogRow("+I", 8, "v_8", "insert", "02-29"),
+                                changelogRow("+I", 11, "v_11", "insert", "02-29"),
+                                changelogRow("+I", 12, "v_12", "insert", "02-29"),
+                                changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert", "02-27"),
+                                changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert", "02-27"),
+                                changelogRow("-D", 5, "v_5", "creation", "02-28"),
+                                changelogRow("-D", 6, "v_6", "creation", "02-28"),
+                                changelogRow("-D", 9, "v_9", "creation", "02-28"),
+                                changelogRow("-D", 10, "v_10", "creation", "02-28"))),
+                arguments(
+                        CoreOptions.ChangelogProducer.FULL_COMPACTION,
+                        Arrays.asList(
+                                changelogRow("-U", 7, "v_7", "creation", "02-28"),
+                                changelogRow("+U", 7, "Seven", "matched_upsert", "02-28"),
+                                changelogRow("-D", 4, "v_4", "creation", "02-27"),
+                                changelogRow("-D", 8, "v_8", "creation", "02-28"),
+                                changelogRow("+I", 8, "v_8", "insert", "02-29"),
+                                changelogRow("+I", 11, "v_11", "insert", "02-29"),
+                                changelogRow("+I", 12, "v_12", "insert", "02-29"),
+                                changelogRow("-U", 2, "v_2", "creation", "02-27"),
+                                changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert", "02-27"),
+                                changelogRow("-U", 3, "v_3", "creation", "02-27"),
+                                changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert", "02-27"),
+                                changelogRow("-D", 5, "v_5", "creation", "02-28"),
+                                changelogRow("-D", 6, "v_6", "creation", "02-28"),
+                                changelogRow("-D", 9, "v_9", "creation", "02-28"),
+                                changelogRow("-D", 10, "v_10", "creation", "02-28"))));
+    }
+}
diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/util/ReadWriteTableTestUtil.java b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/util/ReadWriteTableTestUtil.java
index 92d72b6e..69841a59 100644
--- a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/util/ReadWriteTableTestUtil.java
+++ b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/util/ReadWriteTableTestUtil.java
@@ -317,7 +317,7 @@ public class ReadWriteTableTestUtil {
         assertThat(expectedRecords).isEmpty();
     }
 
-    private static String buildDdl(
+    public static String buildDdl(
             String table,
             List<String> fieldsSpec,
             List<String> primaryKeys,
