This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
     new 7080cfae [FLINK-31087] Introduce merge-into action
7080cfae is described below

commit 7080cfae9c9cce9db54b75e607a3d43cefc19ca7
Author: yuzelin <33053040+yuze...@users.noreply.github.com>
AuthorDate: Fri Mar 3 13:58:32 2023 +0800

    [FLINK-31087] Introduce merge-into action

    This closes #540
---
 docs/content/docs/how-to/writing-tables.md         | 118 ++++
 .../flink/table/store/types/DataTypeCasts.java     |  56 +-
 .../flink/table/store/file/catalog/Identifier.java |   4 +
 .../table/store/tests/FlinkActionsE2eTest.java     | 109 ++++
 .../flink/table/store/connector/action/Action.java |  37 +-
 .../table/store/connector/action/ActionBase.java   |  71 +++
 .../table/store/connector/action/DeleteAction.java |  22 +-
 .../store/connector/action/MergeIntoAction.java    | 677 +++++++++++++++++++++
 ...nITCase.java => DropPartitionActionITCase.java} |   2 +-
 .../connector/action/MergeIntoActionITCase.java    | 383 ++++++++++++
 .../connector/util/ReadWriteTableTestUtil.java     |   2 +-
 11 files changed, 1444 insertions(+), 37 deletions(-)

diff --git a/docs/content/docs/how-to/writing-tables.md b/docs/content/docs/how-to/writing-tables.md
index e77c47a1..2d3d0deb 100644
--- a/docs/content/docs/how-to/writing-tables.md
+++ b/docs/content/docs/how-to/writing-tables.md
@@ -268,3 +268,121 @@ For more information of 'delete', see
 {{< /tab >}}
 
 {{< /tabs >}}
+
+## Merging into table
+
+Table Store supports "MERGE INTO" by submitting the 'merge-into' job through `flink run`.
+
+{{< hint info >}}
+Important table property settings:
+1. Only [primary key table]({{< ref "docs/concepts/primary-key-table" >}}) supports this feature.
+2. The action won't produce UPDATE_BEFORE, so it's not recommended to set 'changelog-producer' = 'input'.
+{{< /hint >}}
+
+The design references the following syntax:
+```sql
+MERGE INTO target-table
+    USING source-table | source-expr AS source-alias
+    ON merge-condition
+    WHEN MATCHED [AND matched-condition]
+        THEN UPDATE SET xxx
+    WHEN MATCHED [AND matched-condition]
+        THEN DELETE
+    WHEN NOT MATCHED [AND not-matched-condition]
+        THEN INSERT VALUES (xxx)
+    WHEN NOT MATCHED BY SOURCE [AND not-matched-by-source-condition]
+        THEN UPDATE SET xxx
+    WHEN NOT MATCHED BY SOURCE [AND not-matched-by-source-condition]
+        THEN DELETE
+```
+The merge-into action uses "upsert" semantics instead of "update": if the row exists, it is updated;
+otherwise it is inserted. For example, for a non-primary-key table you can update every column, but
+for a primary-key table, if you want to update the primary keys, you have to insert a new row whose
+primary keys differ from those of the rows already in the table. In this scenario, "upsert" is useful.
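+For example, with primary key `k`, writing a row (1, 'b') when the table already holds (1, 'a')
+overwrites that row, while writing (2, 'b') when no row with k=2 exists creates a new row.
+
+{{< tabs "merge-into" >}}
+
+{{< tab "Flink Job" >}}
+
+Run the following command to submit a 'merge-into' job for the table.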
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    -c org.apache.flink.table.store.connector.action.FlinkActions \
+    /path/to/flink-table-store-flink-**-{{< version >}}.jar \
+    merge-into \
+    --warehouse <warehouse-path> \
+    --database <database-name> \
+    --table <target-table> \
+    --using-table <source-table> \
+    --on <merge-condition> \
+    --merge-actions <matched-upsert,matched-delete,not-matched-insert,not-matched-by-source-upsert,not-matched-by-source-delete> \
+    --matched-upsert-condition <matched-condition> \
+    --matched-upsert-set <upsert-changes> \
+    --matched-delete-condition <matched-condition> \
+    --not-matched-insert-condition <not-matched-condition> \
+    --not-matched-insert-values <insert-values> \
+    --not-matched-by-source-upsert-condition <not-matched-by-source-condition> \
+    --not-matched-by-source-upsert-set <not-matched-upsert-changes> \
+    --not-matched-by-source-delete-condition <not-matched-by-source-condition>
+
+# Examples:
+# Find all orders mentioned in the source table, then mark as important if the price is above 100
+# or delete if the price is under 10.
+./flink run \
+    -c org.apache.flink.table.store.connector.action.FlinkActions \
+    /path/to/flink-table-store-flink-**-{{< version >}}.jar \
+    merge-into \
+    --warehouse <warehouse-path> \
+    --database <database-name> \
+    --table T \
+    --using-table S \
+    --on "T.id = S.order_id" \
+    --merge-actions \
+    matched-upsert,matched-delete \
+    --matched-upsert-condition "T.price > 100" \
+    --matched-upsert-set "id = T.id, price = T.price, mark = 'important'" \
+    --matched-delete-condition "T.price < 10"
+
+# For matched order rows, increase the price, and if there is no match, insert the order from the
+# source table:
+./flink run \
+    -c org.apache.flink.table.store.connector.action.FlinkActions \
+    /path/to/flink-table-store-flink-**-{{< version >}}.jar \
+    merge-into \
+    --warehouse <warehouse-path> \
+    --database <database-name> \
+    --table T \
+    --using-table S \
+    --on "T.id = S.order_id" \
+    --merge-actions \
+    matched-upsert,not-matched-insert \
+    --matched-upsert-set "id = T.id, price = T.price + 20, mark = T.mark" \
+    --not-matched-insert-values *
+
+# For not-matched-by-source order rows (which are in the target table and do not match any row in the
+# source table based on the merge-condition), decrease the price, or delete them if the mark is 'trivial':
+./flink run \
+    -c org.apache.flink.table.store.connector.action.FlinkActions \
+    /path/to/flink-table-store-flink-**-{{< version >}}.jar \
+    merge-into \
+    --warehouse <warehouse-path> \
+    --database <database-name> \
+    --table T \
+    --using-table S \
+    --on "T.id = S.order_id" \
+    --merge-actions \
+    not-matched-by-source-upsert,not-matched-by-source-delete \
+    --not-matched-by-source-upsert-condition "T.mark <> 'trivial'" \
+    --not-matched-by-source-upsert-set "id = T.id, price = T.price - 20, mark = T.mark" \
+    --not-matched-by-source-delete-condition "T.mark = 'trivial'"
+```
+
+For more information of 'merge-into', see
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    -c org.apache.flink.table.store.connector.action.FlinkActions \
+    /path/to/flink-table-store-flink-**-{{< version >}}.jar \
+    merge-into --help
+```
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DataTypeCasts.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DataTypeCasts.java
index 72260d2d..637a6dc1 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DataTypeCasts.java
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DataTypeCasts.java @@ -57,9 +57,12 @@ public final class DataTypeCasts { private static final Map<DataTypeRoot, Set<DataTypeRoot>> explicitCastingRules; + private static final Map<DataTypeRoot, Set<DataTypeRoot>> compatibleCastingRules; + static { implicitCastingRules = new HashMap<>(); explicitCastingRules = new HashMap<>(); + compatibleCastingRules = new HashMap<>(); // identity casts @@ -69,28 +72,36 @@ public final class DataTypeCasts { // cast specification - castTo(CHAR).implicitFrom(CHAR).explicitFromFamily(PREDEFINED, CONSTRUCTED).build(); + castTo(CHAR) + .implicitFrom(CHAR) + .explicitFromFamily(PREDEFINED, CONSTRUCTED) + .compatibleFrom(CHAR, VARCHAR) + .build(); castTo(VARCHAR) .implicitFromFamily(CHARACTER_STRING) .explicitFromFamily(PREDEFINED, CONSTRUCTED) + .compatibleFrom(CHAR, VARCHAR) .build(); castTo(BOOLEAN) .implicitFrom(BOOLEAN) .explicitFromFamily(CHARACTER_STRING, INTEGER_NUMERIC) + .compatibleFrom(BOOLEAN) .build(); castTo(BINARY) .implicitFrom(BINARY) .explicitFromFamily(CHARACTER_STRING) .explicitFrom(VARBINARY) + .compatibleFrom(BINARY, VARBINARY) .build(); castTo(VARBINARY) .implicitFromFamily(BINARY_STRING) .explicitFromFamily(CHARACTER_STRING) .explicitFrom(BINARY) + .compatibleFrom(BINARY, VARBINARY) .build(); castTo(DECIMAL) @@ -103,61 +114,71 @@ public final class DataTypeCasts { .implicitFrom(TINYINT) .explicitFromFamily(NUMERIC, CHARACTER_STRING) .explicitFrom(BOOLEAN, TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE) + .compatibleFrom(TINYINT) .build(); castTo(SMALLINT) .implicitFrom(TINYINT, SMALLINT) .explicitFromFamily(NUMERIC, CHARACTER_STRING) .explicitFrom(BOOLEAN, TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE) + .compatibleFrom(SMALLINT) .build(); castTo(INTEGER) .implicitFrom(TINYINT, SMALLINT, INTEGER) .explicitFromFamily(NUMERIC, CHARACTER_STRING) .explicitFrom(BOOLEAN, TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE) + .compatibleFrom(INTEGER, DATE, TIME_WITHOUT_TIME_ZONE) .build(); castTo(BIGINT) .implicitFrom(TINYINT, SMALLINT, INTEGER, BIGINT) .explicitFromFamily(NUMERIC, CHARACTER_STRING) .explicitFrom(BOOLEAN, TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE) + .compatibleFrom(BIGINT) .build(); castTo(FLOAT) .implicitFrom(TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DECIMAL) .explicitFromFamily(NUMERIC, CHARACTER_STRING) .explicitFrom(BOOLEAN, TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE) + .compatibleFrom(FLOAT) .build(); castTo(DOUBLE) .implicitFromFamily(NUMERIC) .explicitFromFamily(CHARACTER_STRING) .explicitFrom(BOOLEAN, TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE) + .compatibleFrom(DOUBLE) .build(); castTo(DATE) .implicitFrom(DATE, TIMESTAMP_WITHOUT_TIME_ZONE) .explicitFromFamily(TIMESTAMP, CHARACTER_STRING) + .compatibleFrom(INTEGER, DATE, TIME_WITHOUT_TIME_ZONE) .build(); castTo(TIME_WITHOUT_TIME_ZONE) .implicitFrom(TIME_WITHOUT_TIME_ZONE, TIMESTAMP_WITHOUT_TIME_ZONE) .explicitFromFamily(TIME, TIMESTAMP, CHARACTER_STRING) + .compatibleFrom(INTEGER, DATE, TIME_WITHOUT_TIME_ZONE) .build(); castTo(TIMESTAMP_WITHOUT_TIME_ZONE) .implicitFrom(TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE) .explicitFromFamily(DATETIME, CHARACTER_STRING, NUMERIC) + .compatibleFrom(TIMESTAMP_WITHOUT_TIME_ZONE) .build(); castTo(TIMESTAMP_WITH_LOCAL_TIME_ZONE) .implicitFrom(TIMESTAMP_WITH_LOCAL_TIME_ZONE, TIMESTAMP_WITHOUT_TIME_ZONE) .explicitFromFamily(DATETIME, CHARACTER_STRING, NUMERIC) + 
.compatibleFrom(TIMESTAMP_WITH_LOCAL_TIME_ZONE)
+                .build();
     }
 
     /**
-     * Returns whether the source type can be safely casted to the target type without loosing
+     * Returns whether the source type can be safely cast to the target type without losing
      * information.
      *
      * <p>Implicit casts are used for type widening and type generalization (finding a common
@@ -169,7 +190,7 @@ public final class DataTypeCasts {
     }
 
     /**
-     * Returns whether the source type can be casted to the target type.
+     * Returns whether the source type can be cast to the target type.
      *
      * <p>Explicit casts correspond to the SQL cast specification and represent the logic behind a
     * {@code CAST(sourceType AS targetType)} operation. For example, it allows for converting most
@@ -180,6 +201,28 @@ public final class DataTypeCasts {
         return supportsCasting(sourceType, targetType, true);
     }
 
+    /**
+     * Returns whether the source type can be compatibly cast to the target type.
+     *
+     * <p>If two types are compatible, they should have the same underlying data structure. For
+     * example, {@link CharType} and {@link VarCharType} are both in the {@link
+     * DataTypeFamily#CHARACTER_STRING} family, meaning they both represent a character string. But
+     * all other types are only compatible with themselves. For example, although {@link IntType} and
+     * {@link BigIntType} are both in the {@link DataTypeFamily#NUMERIC} family, they are not
+     * compatible because IntType represents a 4-byte signed integer while BigIntType represents an
+     * 8-byte signed integer. In particular, two {@link DecimalType}s are compatible only when they
+     * have the same {@code precision} and {@code scale}.
+     */
+    public static boolean supportsCompatibleCast(DataType sourceType, DataType targetType) {
+        if (sourceType.copy(true).equals(targetType.copy(true))) {
+            return true;
+        }
+
+        return compatibleCastingRules
+                .get(targetType.getTypeRoot())
+                .contains(sourceType.getTypeRoot());
+    }
+
     // --------------------------------------------------------------------------------------------
 
     private static boolean supportsCasting(
@@ -219,6 +262,7 @@ public final class DataTypeCasts {
         private final DataTypeRoot targetType;
         private final Set<DataTypeRoot> implicitSourceTypes = new HashSet<>();
         private final Set<DataTypeRoot> explicitSourceTypes = new HashSet<>();
+        private final Set<DataTypeRoot> compatibleSourceTypes = new HashSet<>();
 
         CastingRuleBuilder(DataTypeRoot targetType) {
             this.targetType = targetType;
@@ -256,6 +300,11 @@ public final class DataTypeCasts {
             return this;
         }
 
+        CastingRuleBuilder compatibleFrom(DataTypeRoot... sourceTypes) {
+            this.compatibleSourceTypes.addAll(Arrays.asList(sourceTypes));
+            return this;
+        }
+
         /**
          * Should be called after {@link #explicitFromFamily(DataTypeFamily...)} to remove
         * previously added types.
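To make the new rule set concrete, here is a minimal sketch of how `supportsCompatibleCast` behaves, assuming the `org.apache.flink.table.store.types` classes expose the same constructors as their Flink counterparts (`CharType(int)`, `VarCharType(int)`, `IntType()`, `BigIntType()`, `DecimalType(int, int)`):

```java
import org.apache.flink.table.store.types.BigIntType;
import org.apache.flink.table.store.types.CharType;
import org.apache.flink.table.store.types.DataTypeCasts;
import org.apache.flink.table.store.types.DecimalType;
import org.apache.flink.table.store.types.IntType;
import org.apache.flink.table.store.types.VarCharType;

public class CompatibleCastSketch {
    public static void main(String[] args) {
        // CHAR and VARCHAR share the character-string data structure, so they are compatible.
        System.out.println(
                DataTypeCasts.supportsCompatibleCast(new CharType(10), new VarCharType(20))); // true
        // INT and BIGINT are both NUMERIC but differ in byte width, so they are not compatible.
        System.out.println(
                DataTypeCasts.supportsCompatibleCast(new IntType(), new BigIntType())); // false
        // DECIMALs are compatible only with identical precision and scale.
        System.out.println(
                DataTypeCasts.supportsCompatibleCast(
                        new DecimalType(10, 2), new DecimalType(10, 3))); // false
    }
}
```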
@@ -274,6 +323,7 @@ public final class DataTypeCasts { void build() { implicitCastingRules.put(targetType, implicitSourceTypes); explicitCastingRules.put(targetType, explicitSourceTypes); + compatibleCastingRules.put(targetType, compatibleSourceTypes); } } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Identifier.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Identifier.java index 397172ad..b7b37be8 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Identifier.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Identifier.java @@ -58,6 +58,10 @@ public class Identifier implements Serializable { return String.format("%s.%s", database, table); } + public String getEscapedFullName() { + return getEscapedFullName('`'); + } + public String getEscapedFullName(char escapeChar) { return String.format( "%c%s%c.%c%s%c", escapeChar, database, escapeChar, escapeChar, table, escapeChar); diff --git a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java index 69ca1ac0..a5a0376f 100644 --- a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java +++ b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java @@ -256,6 +256,115 @@ public class FlinkActionsE2eTest extends E2eTestBase { "2023-01-21, 1, 31"); } + @Test + public void testMergeInto() throws Exception { + String tableTDdl = + "CREATE TABLE IF NOT EXISTS T (\n" + + " k INT,\n" + + " v STRING,\n" + + " last_action STRING,\n" + + " dt STRING,\n" + + " PRIMARY KEY (k, dt) NOT ENFORCED\n" + + ") PARTITIONED BY (dt);"; + + String insertToT = + "INSERT INTO T VALUES" + + "(1, 'v_1', 'creation', '02-27')," + + "(2, 'v_2', 'creation', '02-27')," + + "(3, 'v_3', 'creation', '02-27')," + + "(4, 'v_4', 'creation', '02-27')," + + "(5, 'v_5', 'creation', '02-28')," + + "(6, 'v_6', 'creation', '02-28')," + + "(7, 'v_7', 'creation', '02-28')," + + "(8, 'v_8', 'creation', '02-28')," + + "(9, 'v_9', 'creation', '02-28')," + + "(10, 'v_10', 'creation', '02-28');\n"; + + String tableSDdl = + "CREATE TABLE IF NOT EXISTS S (\n" + + " k INT,\n" + + " v STRING,\n" + + " dt STRING,\n" + + " PRIMARY KEY (k, dt) NOT ENFORCED\n" + + ") PARTITIONED BY (dt);"; + + String insertToS = + "INSERT INTO S VALUES" + + "(1, 'v_1', '02-27')," + + "(4, CAST (NULL AS STRING), '02-27')," + + "(7, 'Seven', '02-28')," + + "(8, CAST (NULL AS STRING), '02-28')," + + "(8, 'v_8', '02-29')," + + "(11, 'v_11', '02-29')," + + "(12, 'v_12', '02-29');\n"; + + runSql( + "SET 'table.dml-sync' = 'true';\n" + insertToT + insertToS, + catalogDdl, + useCatalogCmd, + tableTDdl, + tableSDdl); + + // run merge-into job + Container.ExecResult execResult = + jobManager.execInContainer( + "bin/flink", + "run", + "-p", + "1", + "-c", + "org.apache.flink.table.store.connector.action.FlinkActions", + "--detached", + "lib/flink-table-store.jar", + "merge-into", + "--warehouse", + warehousePath, + "--database", + "default", + "--table", + "T", + "--using-table", + "S", + "--on", + "T.k=S.k AND T.dt=S.dt", + "--merge-actions", + "matched-upsert,matched-delete,not-matched-insert", + "--matched-upsert-condition", + "T.v <> S.v AND S.v IS NOT NULL", + "--matched-upsert-set", + "v = S.v, last_action = 
'matched_upsert'", + "--matched-delete-condition", + "S.v IS NULL", + "--not-matched-insert-condition", + "S.k < 12", + "--not-matched-insert-values", + "S.k, S.v, 'insert', S.dt"); + + LOG.info(execResult.getStdout()); + LOG.info(execResult.getStderr()); + + // read all data from table store + runSql( + "INSERT INTO result1 SELECT * FROM T;", + catalogDdl, + useCatalogCmd, + tableTDdl, + createResultSink("result1", "k INT, v STRING, last_action STRING, dt STRING")); + + // check the left data + checkResult( + "1, v_1, creation, 02-27", + "2, v_2, creation, 02-27", + "3, v_3, creation, 02-27", + "5, v_5, creation, 02-28", + "6, v_6, creation, 02-28", + "7, Seven, matched_upsert, 02-28", + "8, v_8, insert, 02-29", + "9, v_9, creation, 02-28", + "10, v_10, creation, 02-28", + "11, v_11, insert, 02-29"); + } + private void runSql(String sql, String... ddls) throws Exception { runSql(String.join("\n", ddls) + "\n" + sql); } diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/Action.java b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/Action.java index ab758018..cf4ed56c 100644 --- a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/Action.java +++ b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/Action.java @@ -79,18 +79,9 @@ public interface Action { static List<Map<String, String>> getPartitions(MultipleParameterTool params) { List<Map<String, String>> partitions = new ArrayList<>(); for (String partition : params.getMultiParameter("partition")) { - Map<String, String> kvs = new HashMap<>(); - for (String kvString : partition.split(",")) { - String[] kv = kvString.split("="); - if (kv.length != 2) { - System.err.print( - "Invalid key-value pair \"" - + kvString - + "\".\n" - + "Run <action> --help for help."); - return null; - } - kvs.put(kv[0], kv[1]); + Map<String, String> kvs = parseKeyValues(partition); + if (kvs == null) { + return null; } partitions.add(kvs); } @@ -98,6 +89,24 @@ public interface Action { return partitions; } + static Map<String, String> parseKeyValues(String keyValues) { + Map<String, String> kvs = new HashMap<>(); + for (String kvString : keyValues.split(",")) { + String[] kv = kvString.split("="); + if (kv.length != 2) { + System.err.print( + "Invalid key-value pair \"" + + kvString + + "\".\n" + + "Run <action> --help for help."); + return null; + } + kvs.put(kv[0].trim(), kv[1].trim()); + } + + return kvs; + } + /** Factory to create {@link Action}. 
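 *
 * <p>A sketch of the typical dispatch from an entry point (the entry point itself is not part of
 * this diff):
 *
 * <pre>{@code
 * Optional<Action> action = Action.Factory.create(args);
 * action.ifPresent(
 *         a -> {
 *             try {
 *                 a.run();
 *             } catch (Exception e) {
 *                 throw new RuntimeException(e);
 *             }
 *         });
 * }</pre>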
*/ class Factory { @@ -105,6 +114,7 @@ public interface Action { private static final String COMPACT = "compact"; private static final String DROP_PARTITION = "drop-partition"; private static final String DELETE = "delete"; + private static final String MERGE_INTO = "merge-into"; public static Optional<Action> create(String[] args) { String action = args[0].toLowerCase(); @@ -117,6 +127,8 @@ public interface Action { return DropPartitionAction.create(actionArgs); case DELETE: return DeleteAction.create(actionArgs); + case MERGE_INTO: + return MergeIntoAction.create(actionArgs); default: System.err.println("Unknown action \"" + action + "\""); printHelp(); @@ -132,6 +144,7 @@ public interface Action { System.out.println(" " + COMPACT); System.out.println(" " + DROP_PARTITION); System.out.println(" " + DELETE); + System.out.println(" " + MERGE_INTO); System.out.println("For detailed options of each action, run <action> --help"); } diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/ActionBase.java b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/ActionBase.java index 3cd36fc0..2e9014c8 100644 --- a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/ActionBase.java +++ b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/ActionBase.java @@ -18,17 +18,35 @@ package org.apache.flink.table.store.connector.action; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.data.RowData; import org.apache.flink.table.store.catalog.CatalogContext; +import org.apache.flink.table.store.connector.FlinkCatalog; +import org.apache.flink.table.store.connector.LogicalTypeConversion; +import org.apache.flink.table.store.connector.sink.FlinkSinkBuilder; import org.apache.flink.table.store.file.catalog.Catalog; import org.apache.flink.table.store.file.catalog.CatalogFactory; import org.apache.flink.table.store.file.catalog.Identifier; +import org.apache.flink.table.store.file.operation.Lock; import org.apache.flink.table.store.options.CatalogOptions; import org.apache.flink.table.store.options.Options; +import org.apache.flink.table.store.table.FileStoreTable; import org.apache.flink.table.store.table.Table; +import org.apache.flink.table.store.types.DataType; +import org.apache.flink.table.store.types.DataTypeCasts; +import org.apache.flink.table.types.logical.LogicalType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.flink.table.store.file.catalog.Catalog.DEFAULT_DATABASE; + /** Abstract base of {@link Action}. 
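 *
 * <p>In addition to the Table Store catalog, this base prepares a Flink batch environment: it
 * wraps the catalog in a {@code FlinkCatalog} and registers it in a {@code StreamTableEnvironment},
 * so subclasses can define their changes with SQL queries.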
*/
public abstract class ActionBase implements Action {
@@ -36,6 +54,12 @@ public abstract class ActionBase implements Action {
 
     protected Catalog catalog;
 
+    protected final FlinkCatalog flinkCatalog;
+
+    protected StreamExecutionEnvironment env;
+
+    protected StreamTableEnvironment tEnv;
+
     protected Identifier identifier;
 
     protected Table table;
@@ -50,6 +74,14 @@ public abstract class ActionBase implements Action {
                 CatalogFactory.createCatalog(
                         CatalogContext.create(
                                 new Options().set(CatalogOptions.WAREHOUSE, warehouse)));
+        flinkCatalog = new FlinkCatalog(catalog, "table-store", DEFAULT_DATABASE);
+
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode());
+
+        // register flink catalog to table environment
+        tEnv.registerCatalog(flinkCatalog.getName(), flinkCatalog);
+        tEnv.useCatalog(flinkCatalog.getName());
 
         try {
             table = catalog.getTable(identifier);
@@ -62,4 +94,43 @@ public abstract class ActionBase implements Action {
             throw new RuntimeException(e);
         }
     }
+
+    /**
+     * Extract {@link LogicalType}s from Flink {@link org.apache.flink.table.types.DataType}s and
+     * convert to Table Store {@link DataType}s.
+     */
+    protected List<DataType> toTableStoreDataTypes(
+            List<org.apache.flink.table.types.DataType> flinkDataTypes) {
+        return flinkDataTypes.stream()
+                .map(org.apache.flink.table.types.DataType::getLogicalType)
+                .map(LogicalTypeConversion::toDataType)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Check whether each {@link DataType} in actualTypes is compatible with the corresponding
+     * type in expectedTypes.
+     */
+    protected boolean compatibleCheck(List<DataType> actualTypes, List<DataType> expectedTypes) {
+        if (actualTypes.size() != expectedTypes.size()) {
+            return false;
+        }
+
+        for (int i = 0; i < actualTypes.size(); i++) {
+            if (!DataTypeCasts.supportsCompatibleCast(actualTypes.get(i), expectedTypes.get(i))) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    /** Sink the given {@link DataStream} to the table.
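+     * <p>Input rows must already carry the intended {@code RowKind} (for example, {@code
+     * DeleteAction} emits DELETE rows); this method only wires the {@code FlinkSinkBuilder}
+     * with the catalog lock and executes the job.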
*/ + protected void sink(DataStream<RowData> dataStream) throws Exception { + new FlinkSinkBuilder((FileStoreTable) table) + .withInput(dataStream) + .withLockFactory(Lock.factory(catalog.lockFactory().orElse(null), identifier)) + .build(); + env.execute(); + } } diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/DeleteAction.java b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/DeleteAction.java index 8f616766..6d18d52d 100644 --- a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/DeleteAction.java +++ b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/DeleteAction.java @@ -21,17 +21,11 @@ package org.apache.flink.table.store.connector.action; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.utils.MultipleParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.conversion.DataStructureConverter; import org.apache.flink.table.data.conversion.DataStructureConverters; -import org.apache.flink.table.store.connector.FlinkCatalog; -import org.apache.flink.table.store.connector.sink.FlinkSinkBuilder; -import org.apache.flink.table.store.table.FileStoreTable; import org.apache.flink.types.RowKind; import org.slf4j.Logger; @@ -42,21 +36,17 @@ import java.util.Optional; import java.util.stream.Collectors; import static org.apache.flink.table.store.connector.action.Action.getTablePath; -import static org.apache.flink.table.store.file.catalog.Catalog.DEFAULT_DATABASE; /** Delete from table action for Flink. 
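 *
 * <p>A usage sketch, equivalent to {@code DELETE FROM T WHERE dt = '02-27'}:
 *
 * <pre>{@code
 * new DeleteAction("/path/to/warehouse", "default", "T", "dt = '02-27'").run();
 * }</pre>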
*/ public class DeleteAction extends ActionBase { private static final Logger LOG = LoggerFactory.getLogger(DeleteAction.class); - private final FlinkCatalog flinkCatalog; - private final String filter; public DeleteAction(String warehouse, String databaseName, String tableName, String filter) { super(warehouse, databaseName, tableName); this.filter = filter; - flinkCatalog = new FlinkCatalog(catalog, "table-store", DEFAULT_DATABASE); } public static Optional<Action> create(String[] args) { @@ -111,18 +101,11 @@ public class DeleteAction extends ActionBase { public void run() throws Exception { LOG.debug("Run delete action with filter '{}'.", filter); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - StreamTableEnvironment tEnv = - StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode()); - - tEnv.registerCatalog(flinkCatalog.getName(), flinkCatalog); - tEnv.useCatalog(flinkCatalog.getName()); - Table queriedTable = tEnv.sqlQuery( String.format( "SELECT * FROM %s WHERE %s", - identifier.getEscapedFullName('`'), filter)); + identifier.getEscapedFullName(), filter)); List<DataStructureConverter<Object, Object>> converters = queriedTable.getResolvedSchema().getColumnDataTypes().stream() @@ -146,7 +129,6 @@ public class DeleteAction extends ActionBase { return rowData; }); - new FlinkSinkBuilder((FileStoreTable) table).withInput(dataStream).build(); - env.execute(); + sink(dataStream); } } diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/MergeIntoAction.java b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/MergeIntoAction.java new file mode 100644 index 00000000..e131087d --- /dev/null +++ b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/MergeIntoAction.java @@ -0,0 +1,677 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.store.connector.action; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.utils.MultipleParameterTool; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.conversion.DataStructureConverter; +import org.apache.flink.table.data.conversion.DataStructureConverters; +import org.apache.flink.table.store.connector.LogicalTypeConversion; +import org.apache.flink.table.store.file.catalog.Identifier; +import org.apache.flink.table.store.table.FileStoreTable; +import org.apache.flink.table.store.types.DataField; +import org.apache.flink.table.store.types.DataType; +import org.apache.flink.table.types.utils.TypeConversions; +import org.apache.flink.types.RowKind; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.table.store.connector.action.Action.getTablePath; +import static org.apache.flink.table.store.connector.action.Action.parseKeyValues; + +/** + * Flink action for 'MERGE INTO', which references the syntax as follows (we use 'upsert' semantics + * instead of 'update'): + * + * <pre><code> + * MERGE INTO target-table + * USING source-table | source-expr AS source-alias + * ON merge-condition + * WHEN MATCHED [AND matched-condition] + * THEN UPDATE SET xxx + * WHEN MATCHED [AND matched-condition] + * THEN DELETE + * WHEN NOT MATCHED [AND not-matched-condition] + * THEN INSERT VALUES (xxx) + * WHEN NOT MATCHED BY SOURCE [AND not-matched-by-source-condition] + * THEN UPDATE SET xxx + * WHEN NOT MATCHED BY SOURCE [AND not-matched-by-source-condition] + * THEN DELETE + * </code></pre> + * + * <p>It builds a query to find the rows to be changed. INNER JOIN with merge-condition is used to + * find MATCHED rows, and NOT EXISTS with merge-condition is used to find NOT MATCHED rows, then the + * condition of each action is used to filter the rows. 
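+ *
+ * <p>A usage sketch, mirroring the IT cases (the constructor is package-private, so the CLI via
+ * {@code Action.Factory} is the public entry point):
+ *
+ * <pre>{@code
+ * MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
+ * action.withSourceTable("S")
+ *         .withMergeCondition("T.k = S.k AND T.dt = S.dt")
+ *         .withMatchedUpsert("T.v <> S.v", "v = S.v, last_action = 'matched_upsert'")
+ *         .withMatchedDelete("S.v IS NULL");
+ * action.run();
+ * }</pre>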
+ */ +public class MergeIntoAction extends ActionBase { + + private static final Logger LOG = LoggerFactory.getLogger(MergeIntoAction.class); + + // primary keys of target table + private final List<String> primaryKeys; + + // converters for Row to RowData + private final List<DataStructureConverter<Object, Object>> converters; + + // field names of target table + private final List<String> targetFieldNames; + + // source + @Nullable private String sourceTable; + private Identifier sourceTableIdentifier; + @Nullable private String source; + @Nullable private String sourceAlias; + + // merge condition + private String mergeCondition; + + // actions to be taken + private boolean matchedUpsert; + private boolean notMatchedUpsert; + private boolean matchedDelete; + private boolean notMatchedDelete; + private boolean insert; + + // upsert + @Nullable private String matchedUpsertCondition; + @Nullable private String matchedUpsertSet; + + @Nullable private String notMatchedBySourceUpsertCondition; + @Nullable private String notMatchedBySourceUpsertSet; + + // delete + @Nullable private String matchedDeleteCondition; + @Nullable private String notMatchedBySourceDeleteCondition; + + // insert + @Nullable private String notMatchedInsertCondition; + @Nullable private String notMatchedInsertValues; + + MergeIntoAction(String warehouse, String database, String tableName) { + super(warehouse, database, tableName); + + if (!(table instanceof FileStoreTable)) { + throw new UnsupportedOperationException( + String.format( + "Only FileStoreTable supports merge-into action. The table type is '%s'.", + table.getClass().getName())); + } + + // init primaryKeys of target table + primaryKeys = ((FileStoreTable) table).schema().primaryKeys(); + if (primaryKeys.isEmpty()) { + throw new UnsupportedOperationException( + "merge-into action doesn't support table with no primary keys defined."); + } + + // init DataStructureConverters + converters = + table.rowType().getFieldTypes().stream() + .map(LogicalTypeConversion::toLogicalType) + .map(TypeConversions::fromLogicalToDataType) + .map(DataStructureConverters::getConverter) + .collect(Collectors.toList()); + + // init field names of target table + targetFieldNames = + table.rowType().getFields().stream() + .map(DataField::name) + .collect(Collectors.toList()); + } + + public MergeIntoAction withSourceTable(String sourceTable) { + this.sourceTable = sourceTable; + return this; + } + + public MergeIntoAction withSource(String source, String sourceAlias) { + this.source = source; + this.sourceAlias = sourceAlias; + return this; + } + + public MergeIntoAction withMergeCondition(String mergeCondition) { + this.mergeCondition = mergeCondition; + return this; + } + + public MergeIntoAction withMatchedUpsert( + @Nullable String matchedUpsertCondition, String matchedUpsertSet) { + this.matchedUpsert = true; + this.matchedUpsertCondition = matchedUpsertCondition; + this.matchedUpsertSet = matchedUpsertSet; + return this; + } + + public MergeIntoAction withNotMatchedBySourceUpsert( + @Nullable String notMatchedBySourceUpsertCondition, + String notMatchedBySourceUpsertSet) { + this.notMatchedUpsert = true; + this.notMatchedBySourceUpsertCondition = notMatchedBySourceUpsertCondition; + this.notMatchedBySourceUpsertSet = notMatchedBySourceUpsertSet; + return this; + } + + public MergeIntoAction withMatchedDelete(@Nullable String matchedDeleteCondition) { + this.matchedDelete = true; + this.matchedDeleteCondition = matchedDeleteCondition; + return this; + } + + public 
MergeIntoAction withNotMatchedBySourceDelete( + @Nullable String notMatchedBySourceDeleteCondition) { + this.notMatchedDelete = true; + this.notMatchedBySourceDeleteCondition = notMatchedBySourceDeleteCondition; + return this; + } + + public MergeIntoAction withNotMatchedInsert( + @Nullable String notMatchedInsertCondition, String notMatchedInsertValues) { + this.insert = true; + this.notMatchedInsertCondition = notMatchedInsertCondition; + this.notMatchedInsertValues = notMatchedInsertValues; + return this; + } + + public static Optional<Action> create(String[] args) { + LOG.info("merge-into job args: {}", String.join(" ", args)); + + MultipleParameterTool params = MultipleParameterTool.fromArgs(args); + + if (params.has("help")) { + printHelp(); + return Optional.empty(); + } + + Tuple3<String, String, String> tablePath = getTablePath(params); + if (tablePath == null) { + return Optional.empty(); + } + + MergeIntoAction action = new MergeIntoAction(tablePath.f0, tablePath.f1, tablePath.f2); + + if (!initSource(params, action)) { + return Optional.empty(); + } + + if (argumentAbsent(params, "on")) { + return Optional.empty(); + } + action.withMergeCondition(params.get("on")); + + List<String> actions = + Arrays.stream(params.get("merge-actions").split(",")) + .map(String::trim) + .collect(Collectors.toList()); + if (actions.contains("matched-upsert")) { + if (argumentAbsent(params, "matched-upsert-set")) { + return Optional.empty(); + } + action.withMatchedUpsert( + params.get("matched-upsert-condition"), params.get("matched-upsert-set")); + } + if (actions.contains("not-matched-by-source-upsert")) { + if (argumentAbsent(params, "not-matched-by-source-upsert-set")) { + return Optional.empty(); + } + action.withNotMatchedBySourceUpsert( + params.get("not-matched-by-source-upsert-condition"), + params.get("not-matched-by-source-upsert-set")); + } + if (actions.contains("matched-delete")) { + action.withMatchedDelete(params.get("matched-delete-condition")); + } + if (actions.contains("not-matched-by-source-delete")) { + action.withNotMatchedBySourceDelete( + params.get("not-matched-by-source-delete-condition")); + } + if (actions.contains("not-matched-insert")) { + if (argumentAbsent(params, "not-matched-insert-values")) { + return Optional.empty(); + } + action.withNotMatchedInsert( + params.get("not-matched-insert-condition"), + params.get("not-matched-insert-values")); + } + + if (!validate(action)) { + return Optional.empty(); + } + + return Optional.of(action); + } + + private static boolean initSource(MultipleParameterTool params, MergeIntoAction action) { + String sourceTable = params.get("using-table"); + String source = params.get("using-source"); + String sourceAlias = params.get("as"); + + int count = 0; + if (sourceTable != null) { + action.withSourceTable(params.get("using-table")); + count++; + } + + if (source != null) { + if (sourceAlias == null) { + System.err.println( + "The source and its alias must be specified together.\n" + + "Run <action> --help for help."); + return false; + } + action.withSource(source, sourceAlias); + count++; + } + + if (count != 1) { + System.err.println( + "Please specify either \"source table\" or \"source, source's alias\".\n" + + "Run <action> --help for help."); + return false; + } + + return true; + } + + private static boolean argumentAbsent(MultipleParameterTool params, String key) { + if (!params.has(key)) { + System.err.println(key + " is absent.\nRun <action> --help for help."); + return true; + } + + return false; + } + + private 
static boolean validate(MergeIntoAction action) {
+        if (!action.matchedUpsert
+                && !action.notMatchedUpsert
+                && !action.matchedDelete
+                && !action.notMatchedDelete
+                && !action.insert) {
+            System.err.println(
+                    "Must specify at least one merge action.\nRun <action> --help for help.");
+            return false;
+        }
+
+        if ((action.matchedUpsert && action.matchedDelete)
+                && (action.matchedUpsertCondition == null
+                        || action.matchedDeleteCondition == null)) {
+            System.err.println(
+                    "If both matched-upsert and matched-delete actions are present, their conditions must both be present too.\n"
+                            + "Run <action> --help for help.");
+            return false;
+        }
+
+        if ((action.notMatchedUpsert && action.notMatchedDelete)
+                && (action.notMatchedBySourceUpsertCondition == null
+                        || action.notMatchedBySourceDeleteCondition == null)) {
+            System.err.println(
+                    "If both not-matched-by-source-upsert and not-matched-by-source-delete actions are present, their conditions must both be present too.\n"
+                            + "Run <action> --help for help.");
+            return false;
+        }
+
+        if (action.notMatchedBySourceUpsertSet != null
+                && action.notMatchedBySourceUpsertSet.equals("*")) {
+            System.err.println("The '*' cannot be used in not-matched-by-source-upsert-set");
+            return false;
+        }
+
+        return true;
+    }
+
+    private static void printHelp() {
+        System.out.println("Action \"merge-into\" simulates the \"MERGE INTO\" syntax.");
+        System.out.println();
+
+        System.out.println("Syntax:");
+        System.out.println(
+                " merge-into --warehouse <warehouse-path>\n"
+                        + " --database <database-name>\n"
+                        + " --table <table-name>\n"
+                        + " --using-table <source-table>\n"
+                        + " --on <merge-condition>\n"
+                        + " --merge-actions <matched-upsert,matched-delete,not-matched-insert,not-matched-by-source-upsert,not-matched-by-source-delete>\n"
+                        + " --matched-upsert-condition <matched-condition>\n"
+                        + " --matched-upsert-set <upsert-changes>\n"
+                        + " --matched-delete-condition <matched-condition>\n"
+                        + " --not-matched-insert-condition <not-matched-condition>\n"
+                        + " --not-matched-insert-values <insert-values>\n"
+                        + " --not-matched-by-source-upsert-condition <not-matched-by-source-condition>\n"
+                        + " --not-matched-by-source-upsert-set <not-matched-upsert-changes>\n"
+                        + " --not-matched-by-source-delete-condition <not-matched-by-source-condition>");
+        System.out.println(" matched-upsert-changes format:");
+        System.out.println(
before 'col')"); + System.out.println( + " * (upsert with all source cols; require target table's schema is equal to source's)"); + System.out.println(" not-matched-upsert-changes format:"); + System.out.println(" col=expression (cannot use source table's col)"); + System.out.println(" insert-values format:"); + System.out.println( + " col1,col2,...,col_end (must specify values of all columns; can use <source-table>.col or expression)"); + System.out.println( + " * (insert with all source cols; require target table's schema is equal to source's)"); + System.out.println( + " not-matched-condition: cannot use target table's columns to construct condition expression."); + System.out.println( + " not-matched-by-source-condition: cannot use source table's columns to construct condition expression."); + System.out.println(" alternative arguments:"); + System.out.println(" --path <table-path> to represent the table path."); + System.out.println( + " --using-source <source-expression> --as <source-alias> to replace --using-table."); + System.out.println(); + + System.out.println("Note: "); + System.out.println(" 1. Target table must has primary keys."); + System.out.println( + " 2. All conditions, set changes and values should use Flink SQL syntax. Please quote them with \" to escape special characters."); + System.out.println(" 3. source-alias cannot be duplicated with existed table name."); + System.out.println(" 4. At least one merge action must be specified."); + System.out.println(" 5. How to determine the changed rows with different \"matched\":"); + System.out.println( + " matched: changed rows are from target table and each can match a source table row " + + "based on merge-condition and optional matched-condition."); + System.out.println( + " not-matched: changed rows are from source table and all rows cannot match any target table row " + + "based on merge-condition and optional not-matched-condition."); + System.out.println( + " not-matched-by-source: changed rows are from target table and all row cannot match any source table row " + + "based on merge-condition and optional not-matched-by-source-condition."); + System.out.println( + " 6. If both matched-upsert and matched-delete actions are present, their conditions must both be present too " + + "(same to not-matched-by-source-upsert and not-matched-by-source-delete). Otherwise, all conditions are optional."); + System.out.println(" 7. 
                " 7. source-alias must not duplicate an existing table name.");
+        System.out.println();
+
+        System.out.println("Examples:");
+        System.out.println(
+                " merge-into --path hdfs:///path/to/T\n"
+                        + " --using-table S\n"
+                        + " --on \"T.k=S.k\"\n"
+                        + " --merge-actions matched-upsert\n"
+                        + " --matched-upsert-condition \"T.v<>S.v\"\n"
+                        + " --matched-upsert-set \"v = S.v\"");
+        System.out.println(
+                " It will find matched rows of the target table that meet the condition (T.k=S.k AND T.v<>S.v), then update T.v with S.v.");
+    }
+
+    @Override
+    public void run() throws Exception {
+        // prepare source table
+        if (sourceTable != null) {
+            if (sourceTable.contains(".")) {
+                sourceTableIdentifier = Identifier.fromString(sourceTable);
+            } else {
+                sourceTableIdentifier =
+                        Identifier.create(identifier.getDatabaseName(), sourceTable);
+            }
+        } else {
+            tEnv.registerTable(sourceAlias, tEnv.sqlQuery(source));
+            sourceTableIdentifier = Identifier.create(identifier.getDatabaseName(), sourceAlias);
+        }
+
+        List<DataStream<RowData>> dataStreams =
+                Stream.of(
+                                getMatchedUpsertDataStream(),
+                                getNotMatchedUpsertDataStream(),
+                                getMatchedDeleteDataStream(),
+                                getNotMatchedDeleteDataStream(),
+                                getInsertDataStream())
+                        .filter(Optional::isPresent)
+                        .map(Optional::get)
+                        .collect(Collectors.toList());
+
+        DataStream<RowData> firstDs = dataStreams.get(0);
+        sink(firstDs.union(dataStreams.stream().skip(1).toArray(DataStream[]::new)));
+    }
+
+    private Optional<DataStream<RowData>> getMatchedUpsertDataStream() {
+        if (!matchedUpsert) {
+            return Optional.empty();
+        }
+
+        List<String> project;
+        // extract project
+        if (matchedUpsertSet.equals("*")) {
+            project = Collections.singletonList(sourceTableIdentifier.getObjectName() + "." + "*");
+        } else {
+            // validate upsert changes
+            // no need to check primary keys changes because merge condition must contain all pks
+            // of the target table
+            Map<String, String> changes = parseKeyValues(matchedUpsertSet);
+            if (changes == null) {
+                throw new IllegalArgumentException(
+                        "matched-upsert-set is invalid.\nRun <action> --help for help.");
+            }
+            for (String targetField : changes.keySet()) {
+                if (!targetFieldNames.contains(targetField)) {
+                    throw new RuntimeException(
+                            String.format(
+                                    "Invalid column reference '%s' of table '%s' at matched-upsert action.",
+                                    targetField, identifier.getFullName()));
+                }
+            }
+
+            // replace field names
+            // the table name is added before column name to avoid ambiguous column reference
+            project =
+                    targetFieldNames.stream()
+                            .map(
+                                    name ->
+                                            changes.getOrDefault(
+                                                    name, identifier.getObjectName() + "." + name))
+                            .collect(Collectors.toList());
+        }
+
+        // use inner join to find matched records
+        String query =
+                String.format(
+                        "SELECT %s FROM %s INNER JOIN %s ON %s %s",
+                        String.join(",", project),
+                        identifier.getEscapedFullName(),
+                        sourceTableIdentifier.getEscapedFullName(),
+                        mergeCondition,
+                        matchedUpsertCondition == null ? "" : "WHERE " + matchedUpsertCondition);
"" : "WHERE " + matchedUpsertCondition); + LOG.info("Query used for matched-update:\n{}", query); + + Table source = tEnv.sqlQuery(query); + checkSchema("matched-upsert", source); + + return Optional.of(toDataStream(source, RowKind.UPDATE_AFTER, converters)); + } + + private Optional<DataStream<RowData>> getNotMatchedUpsertDataStream() { + if (!notMatchedUpsert) { + return Optional.empty(); + } + + // validate upsert change + Map<String, String> changes = parseKeyValues(notMatchedBySourceUpsertSet); + if (changes == null) { + throw new IllegalArgumentException( + "matched-upsert-set is invalid.\nRun <action> --help for help."); + } + for (String targetField : changes.keySet()) { + if (!targetFieldNames.contains(targetField)) { + throw new RuntimeException( + String.format( + "Invalid column reference '%s' of table '%s' at not-matched-by-source-upsert action.\nRun <action> --help for help.", + targetField, identifier.getFullName())); + } + + if (primaryKeys.contains(targetField)) { + throw new RuntimeException( + "Not allowed to change primary key in not-matched-by-source-upsert-set.\nRun <action> --help for help."); + } + } + + // replace field names (won't be ambiguous here) + List<String> project = + targetFieldNames.stream() + .map(name -> changes.getOrDefault(name, name)) + .collect(Collectors.toList()); + + // use not exists to find not matched records + String query = + String.format( + "SELECT %s FROM %s WHERE NOT EXISTS (SELECT * FROM %s WHERE %s) %s", + String.join(",", project), + identifier.getEscapedFullName(), + sourceTableIdentifier.getEscapedFullName(), + mergeCondition, + notMatchedBySourceUpsertCondition == null + ? "" + : String.format("AND (%s)", notMatchedBySourceUpsertCondition)); + + LOG.info("Query used for not-matched-by-source-upsert:\n{}", query); + + Table source = tEnv.sqlQuery(query); + checkSchema("not-matched-by-source-upsert", source); + + return Optional.of(toDataStream(source, RowKind.UPDATE_AFTER, converters)); + } + + private Optional<DataStream<RowData>> getMatchedDeleteDataStream() { + if (!matchedDelete) { + return Optional.empty(); + } + + // the table name is added before column name to avoid ambiguous column reference + List<String> project = + targetFieldNames.stream() + .map(name -> identifier.getObjectName() + "." + name) + .collect(Collectors.toList()); + + // use inner join to find matched records + String query = + String.format( + "SELECT %s FROM %s INNER JOIN %s ON %s %s", + String.join(",", project), + identifier.getEscapedFullName(), + sourceTableIdentifier.getEscapedFullName(), + mergeCondition, + matchedDeleteCondition == null ? "" : "WHERE " + matchedDeleteCondition); + LOG.info("Query used by matched-delete:\n{}", query); + + Table source = tEnv.sqlQuery(query); + checkSchema("matched-delete", source); + + return Optional.of(toDataStream(source, RowKind.DELETE, converters)); + } + + private Optional<DataStream<RowData>> getNotMatchedDeleteDataStream() { + if (!notMatchedDelete) { + return Optional.empty(); + } + + // use not exists to find not matched records + String query = + String.format( + "SELECT %s FROM %s WHERE NOT EXISTS (SELECT * FROM %s WHERE %s) %s", + String.join(",", targetFieldNames), + identifier.getEscapedFullName(), + sourceTableIdentifier.getEscapedFullName(), + mergeCondition, + notMatchedBySourceDeleteCondition == null + ? 
"" + : String.format("AND (%s)", notMatchedBySourceDeleteCondition)); + LOG.info("Query used by not-matched-by-source-delete:\n{}", query); + + Table source = tEnv.sqlQuery(query); + checkSchema("not-matched-by-source-delete", source); + + return Optional.of(toDataStream(source, RowKind.DELETE, converters)); + } + + private Optional<DataStream<RowData>> getInsertDataStream() { + if (!insert) { + return Optional.empty(); + } + + // use not exist to find rows to insert + String query = + String.format( + "SELECT %s FROM %s WHERE NOT EXISTS (SELECT * FROM %s WHERE %s) %s", + notMatchedInsertValues, + sourceTableIdentifier.getEscapedFullName(), + identifier.getEscapedFullName(), + mergeCondition, + notMatchedInsertCondition == null + ? "" + : String.format("AND (%s)", notMatchedInsertCondition)); + LOG.info("Query used by not-matched-insert:\n{}", query); + + Table source = tEnv.sqlQuery(query); + checkSchema("not-matched-insert", source); + + return Optional.of(toDataStream(source, RowKind.INSERT, converters)); + } + + private void checkSchema(String action, Table source) { + List<DataType> actualTypes = + toTableStoreDataTypes(source.getResolvedSchema().getColumnDataTypes()); + List<DataType> expectedTypes = this.table.rowType().getFieldTypes(); + if (!compatibleCheck(actualTypes, expectedTypes)) { + throw new IllegalStateException( + String.format( + "The schema of result in action '%s' is invalid.\n" + + "Result schema: [%s]\n" + + "Expected schema: [%s]", + action, + actualTypes.stream() + .map(DataType::asSQLString) + .collect(Collectors.joining(", ")), + expectedTypes.stream() + .map(DataType::asSQLString) + .collect(Collectors.joining(", ")))); + } + } + + // pass converters to avoid "not serializable" exception + private DataStream<RowData> toDataStream( + Table source, RowKind kind, List<DataStructureConverter<Object, Object>> converters) { + return tEnv.toChangelogStream(source) + .map( + row -> { + int arity = row.getArity(); + GenericRowData rowData = new GenericRowData(kind, arity); + for (int i = 0; i < arity; i++) { + rowData.setField( + i, converters.get(i).toInternalOrNull(row.getField(i))); + } + return rowData; + }); + } +} diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/DropPartitionITCase.java b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/DropPartitionActionITCase.java similarity index 99% rename from flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/DropPartitionITCase.java rename to flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/DropPartitionActionITCase.java index 8b941fdc..28da14c4 100644 --- a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/DropPartitionITCase.java +++ b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/DropPartitionActionITCase.java @@ -38,7 +38,7 @@ import java.util.Map; import static org.assertj.core.api.Assertions.assertThat; /** IT cases for {@link DropPartitionAction}. 
*/ -public class DropPartitionITCase extends ActionITCaseBase { +public class DropPartitionActionITCase extends ActionITCaseBase { private static final DataType[] FIELD_TYPES = new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.STRING(), DataTypes.INT()}; diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/MergeIntoActionITCase.java b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/MergeIntoActionITCase.java new file mode 100644 index 00000000..0cefe77f --- /dev/null +++ b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/MergeIntoActionITCase.java @@ -0,0 +1,383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.connector.action; + +import org.apache.flink.table.store.CoreOptions; +import org.apache.flink.table.store.file.utils.BlockingIterator; +import org.apache.flink.types.Row; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow; +import static org.apache.flink.table.store.CoreOptions.CHANGELOG_PRODUCER; +import static org.apache.flink.table.store.connector.util.ReadWriteTableTestUtil.buildDdl; +import static org.apache.flink.table.store.connector.util.ReadWriteTableTestUtil.buildSimpleQuery; +import static org.apache.flink.table.store.connector.util.ReadWriteTableTestUtil.createTable; +import static org.apache.flink.table.store.connector.util.ReadWriteTableTestUtil.init; +import static org.apache.flink.table.store.connector.util.ReadWriteTableTestUtil.insertInto; +import static org.apache.flink.table.store.connector.util.ReadWriteTableTestUtil.sEnv; +import static org.apache.flink.table.store.connector.util.ReadWriteTableTestUtil.testBatchRead; +import static org.apache.flink.table.store.connector.util.ReadWriteTableTestUtil.testStreamingRead; +import static org.apache.flink.table.store.connector.util.ReadWriteTableTestUtil.validateStreamingReadResult; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.jupiter.params.provider.Arguments.arguments; + +/** IT cases for {@link MergeIntoAction}. 
*/ +public class MergeIntoActionITCase extends ActionITCaseBase { + + private static final List<Row> initialRecords = + Arrays.asList( + changelogRow("+I", 1, "v_1", "creation", "02-27"), + changelogRow("+I", 2, "v_2", "creation", "02-27"), + changelogRow("+I", 3, "v_3", "creation", "02-27"), + changelogRow("+I", 4, "v_4", "creation", "02-27"), + changelogRow("+I", 5, "v_5", "creation", "02-28"), + changelogRow("+I", 6, "v_6", "creation", "02-28"), + changelogRow("+I", 7, "v_7", "creation", "02-28"), + changelogRow("+I", 8, "v_8", "creation", "02-28"), + changelogRow("+I", 9, "v_9", "creation", "02-28"), + changelogRow("+I", 10, "v_10", "creation", "02-28")); + + @BeforeEach + public void setUp() throws Exception { + init(warehouse); + + // prepare table S + sEnv.executeSql( + buildDdl( + "S", + Arrays.asList("k INT", "v STRING", "dt STRING"), + Arrays.asList("k", "dt"), + Collections.singletonList("dt"), + new HashMap<>())); + + insertInto( + "S", + "(1, 'v_1', '02-27')", + "(4, CAST (NULL AS STRING), '02-27')", + "(7, 'Seven', '02-28')", + "(8, CAST (NULL AS STRING), '02-28')", + "(8, 'v_8', '02-29')", + "(11, 'v_11', '02-29')", + "(12, 'v_12', '02-29')"); + } + + @ParameterizedTest(name = "changelog-producer = {0}") + @MethodSource("producerTestData") + public void testVariousChangelogProducer( + CoreOptions.ChangelogProducer producer, List<Row> expected) throws Exception { + // prepare table T + prepareTable(producer); + + // similar to: + // MERGE INTO T + // USING S + // ON T.k = S.k AND T.dt = S.dt + // WHEN MATCHED AND (T.v <> S.v AND S.v IS NOT NULL) THEN UPDATE + // SET v = S.v, last_action = 'matched_upsert' + // WHEN MATCHED AND S.v IS NULL THEN DELETE + // WHEN NOT MATCHED THEN INSERT VALUES (S.k, S.v, 'insert', S.dt) + // WHEN NOT MATCHED BY SOURCE AND (dt < '02-28') THEN UPDATE + // SET v = v || '_nmu', last_action = 'not_matched_upsert' + // WHEN NOT MATCHED BY SOURCE AND (dt >= '02-28') THEN DELETE + MergeIntoAction action = new MergeIntoAction(warehouse, database, "T"); + action.withSourceTable("S") + .withMergeCondition("T.k = S.k AND T.dt = S.dt") + .withMatchedUpsert( + "T.v <> S.v AND S.v IS NOT NULL", "v = S.v, last_action = 'matched_upsert'") + .withMatchedDelete("S.v IS NULL") + .withNotMatchedInsert(null, "S.k, S.v, 'insert', S.dt") + .withNotMatchedBySourceUpsert( + "dt < '02-28'", "v = v || '_nmu', last_action = 'not_matched_upsert'") + .withNotMatchedBySourceDelete("dt >= '02-28'"); + + validateActionRunResult( + action, + expected, + Arrays.asList( + changelogRow("+I", 1, "v_1", "creation", "02-27"), + changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert", "02-27"), + changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert", "02-27"), + changelogRow("+U", 7, "Seven", "matched_upsert", "02-28"), + changelogRow("+I", 8, "v_8", "insert", "02-29"), + changelogRow("+I", 11, "v_11", "insert", "02-29"), + changelogRow("+I", 12, "v_12", "insert", "02-29"))); + } + + @Test + public void testUsingSource() throws Exception { + // prepare table T + prepareTable(CoreOptions.ChangelogProducer.NONE); + + // similar to: + // MERGE INTO T + // USING (SELECT * FROM S WHERE k < 12) AS SS + // ON T.k = SS.k AND T.dt = SS.dt + // WHEN MATCHED AND (T.v <> SS.v AND SS.v IS NOT NULL) THEN UPDATE + // SET v = SS.v, last_action = 'matched_upsert' + // WHEN MATCHED AND SS.v IS NULL THEN DELETE + // WHEN NOT MATCHED THEN INSERT VALUES (SS.k, SS.v, 'insert', SS.dt) + // WHEN NOT MATCHED BY SOURCE AND (dt < '02-28') THEN UPDATE + // SET v = v || '_nmu', last_action = 'not_matched_upsert' + // 
+    @Test
+    public void testUsingSource() throws Exception {
+        // prepare table T
+        prepareTable(CoreOptions.ChangelogProducer.NONE);
+
+        // similar to:
+        // MERGE INTO T
+        // USING (SELECT * FROM S WHERE k < 12) AS SS
+        // ON T.k = SS.k AND T.dt = SS.dt
+        // WHEN MATCHED AND (T.v <> SS.v AND SS.v IS NOT NULL) THEN UPDATE
+        //   SET v = SS.v, last_action = 'matched_upsert'
+        // WHEN MATCHED AND SS.v IS NULL THEN DELETE
+        // WHEN NOT MATCHED THEN INSERT VALUES (SS.k, SS.v, 'insert', SS.dt)
+        // WHEN NOT MATCHED BY SOURCE AND (dt < '02-28') THEN UPDATE
+        //   SET v = v || '_nmu', last_action = 'not_matched_upsert'
+        // WHEN NOT MATCHED BY SOURCE AND (dt >= '02-28') THEN DELETE
+        MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
+        action.withSource("SELECT * FROM S WHERE k < 12", "SS")
+                .withMergeCondition("T.k = SS.k AND T.dt = SS.dt")
+                .withMatchedUpsert(
+                        "T.v <> SS.v AND SS.v IS NOT NULL",
+                        "v = SS.v, last_action = 'matched_upsert'")
+                .withMatchedDelete("SS.v IS NULL")
+                .withNotMatchedInsert(null, "SS.k, SS.v, 'insert', SS.dt")
+                .withNotMatchedBySourceUpsert(
+                        "dt < '02-28'", "v = v || '_nmu', last_action = 'not_matched_upsert'")
+                .withNotMatchedBySourceDelete("dt >= '02-28'");
+
+        validateActionRunResult(
+                action,
+                Arrays.asList(
+                        changelogRow("-U", 7, "v_7", "creation", "02-28"),
+                        changelogRow("+U", 7, "Seven", "matched_upsert", "02-28"),
+                        changelogRow("-D", 4, "v_4", "creation", "02-27"),
+                        changelogRow("-D", 8, "v_8", "creation", "02-28"),
+                        changelogRow("+I", 8, "v_8", "insert", "02-29"),
+                        changelogRow("+I", 11, "v_11", "insert", "02-29"),
+                        changelogRow("-U", 2, "v_2", "creation", "02-27"),
+                        changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert", "02-27"),
+                        changelogRow("-U", 3, "v_3", "creation", "02-27"),
+                        changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert", "02-27"),
+                        changelogRow("-D", 5, "v_5", "creation", "02-28"),
+                        changelogRow("-D", 6, "v_6", "creation", "02-28"),
+                        changelogRow("-D", 9, "v_9", "creation", "02-28"),
+                        changelogRow("-D", 10, "v_10", "creation", "02-28")),
+                Arrays.asList(
+                        changelogRow("+I", 1, "v_1", "creation", "02-27"),
+                        changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert", "02-27"),
+                        changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert", "02-27"),
+                        changelogRow("+U", 7, "Seven", "matched_upsert", "02-28"),
+                        changelogRow("+I", 8, "v_8", "insert", "02-29"),
+                        changelogRow("+I", 11, "v_11", "insert", "02-29")));
+    }
+
+    @Test
+    public void testMatchedUpsertSetAll() throws Exception {
+        // prepare table T
+        prepareTable(CoreOptions.ChangelogProducer.NONE);
+
+        // build MergeIntoAction
+        MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
+        action.withSource("SELECT k, v, 'unknown', dt FROM S", "SS")
+                .withMergeCondition("T.k = SS.k AND T.dt = SS.dt")
+                .withMatchedUpsert(null, "*");
+
+        validateActionRunResult(
+                action,
+                Arrays.asList(
+                        changelogRow("-U", 1, "v_1", "creation", "02-27"),
+                        changelogRow("+U", 1, "v_1", "unknown", "02-27"),
+                        changelogRow("-U", 4, "v_4", "creation", "02-27"),
+                        changelogRow("+U", 4, null, "unknown", "02-27"),
+                        changelogRow("-U", 7, "v_7", "creation", "02-28"),
+                        changelogRow("+U", 7, "Seven", "unknown", "02-28"),
+                        changelogRow("-U", 8, "v_8", "creation", "02-28"),
+                        changelogRow("+U", 8, null, "unknown", "02-28")),
+                Arrays.asList(
+                        changelogRow("+U", 1, "v_1", "unknown", "02-27"),
+                        changelogRow("+I", 2, "v_2", "creation", "02-27"),
+                        changelogRow("+I", 3, "v_3", "creation", "02-27"),
+                        changelogRow("+U", 4, null, "unknown", "02-27"),
+                        changelogRow("+I", 5, "v_5", "creation", "02-28"),
+                        changelogRow("+I", 6, "v_6", "creation", "02-28"),
+                        changelogRow("+U", 7, "Seven", "unknown", "02-28"),
+                        changelogRow("+U", 8, null, "unknown", "02-28"),
+                        changelogRow("+I", 9, "v_9", "creation", "02-28"),
+                        changelogRow("+I", 10, "v_10", "creation", "02-28")));
+    }
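+
+    // In the next test, "*" as the insert values means the unmatched source row is inserted
+    // as a whole, in column order.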
.withMergeCondition("T.k = SS.k AND T.dt = SS.dt") + .withNotMatchedInsert("SS.k < 12", "*"); + + validateActionRunResult( + action, + Arrays.asList( + changelogRow("+I", 8, "v_8", "unknown", "02-29"), + changelogRow("+I", 11, "v_11", "unknown", "02-29")), + Arrays.asList( + changelogRow("+I", 1, "v_1", "creation", "02-27"), + changelogRow("+I", 2, "v_2", "creation", "02-27"), + changelogRow("+I", 3, "v_3", "creation", "02-27"), + changelogRow("+I", 4, "v_4", "creation", "02-27"), + changelogRow("+I", 5, "v_5", "creation", "02-28"), + changelogRow("+I", 6, "v_6", "creation", "02-28"), + changelogRow("+I", 7, "v_7", "creation", "02-28"), + changelogRow("+I", 8, "v_8", "creation", "02-28"), + changelogRow("+I", 9, "v_9", "creation", "02-28"), + changelogRow("+I", 10, "v_10", "creation", "02-28"), + changelogRow("+I", 8, "v_8", "unknown", "02-29"), + changelogRow("+I", 11, "v_11", "unknown", "02-29"))); + } + + // ---------------------------------------------------------------------------------------------------------------- + // Negative tests + // ---------------------------------------------------------------------------------------------------------------- + + @Test + public void testInsertChangesActionWithNonPkTable() { + String nonPkTable = + createTable( + Collections.singletonList("k int"), + Collections.emptyList(), + Collections.emptyList()); + + assertThatThrownBy(() -> new MergeIntoAction(warehouse, database, nonPkTable)) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage( + "merge-into action doesn't support table with no primary keys defined."); + } + + @Test + public void testIncompatibleSchema() throws Exception { + // prepare table T + prepareTable(CoreOptions.ChangelogProducer.NONE); + + // build MergeIntoAction + MergeIntoAction action = new MergeIntoAction(warehouse, database, "T"); + action.withSourceTable("S") + .withMergeCondition("T.k = S.k AND T.dt = S.dt") + .withNotMatchedInsert(null, "S.k, S.v, 0, S.dt"); + + assertThatThrownBy(action::run) + .isInstanceOf(IllegalStateException.class) + .hasMessage( + "The schema of result in action 'not-matched-insert' is invalid.\n" + + "Result schema: [INT NOT NULL, STRING, INT NOT NULL, STRING NOT NULL]\n" + + "Expected schema: [INT NOT NULL, STRING, STRING, STRING NOT NULL]"); + } + + private void validateActionRunResult( + MergeIntoAction action, List<Row> streamingExpected, List<Row> batchExpected) + throws Exception { + BlockingIterator<Row, Row> iterator = + testStreamingRead(buildSimpleQuery("T"), initialRecords); + action.run(); + // test streaming read + validateStreamingReadResult(iterator, streamingExpected); + iterator.close(); + // test batch read + testBatchRead(buildSimpleQuery("T"), batchExpected); + } + + private void prepareTable(CoreOptions.ChangelogProducer producer) throws Exception { + sEnv.executeSql( + buildDdl( + "T", + Arrays.asList("k INT", "v STRING", "last_action STRING", "dt STRING"), + Arrays.asList("k", "dt"), + Collections.singletonList("dt"), + new HashMap<String, String>() { + { + put(CHANGELOG_PRODUCER.key(), producer.toString()); + } + })); + + insertInto( + "T", + "(1, 'v_1', 'creation', '02-27')", + "(2, 'v_2', 'creation', '02-27')", + "(3, 'v_3', 'creation', '02-27')", + "(4, 'v_4', 'creation', '02-27')", + "(5, 'v_5', 'creation', '02-28')", + "(6, 'v_6', 'creation', '02-28')", + "(7, 'v_7', 'creation', '02-28')", + "(8, 'v_8', 'creation', '02-28')", + "(9, 'v_9', 'creation', '02-28')", + "(10, 'v_10', 'creation', '02-28')"); + } + + private static List<Arguments> 
+    private static List<Arguments> producerTestData() {
+        return Arrays.asList(
+                arguments(
+                        CoreOptions.ChangelogProducer.NONE,
+                        Arrays.asList(
+                                changelogRow("-U", 7, "v_7", "creation", "02-28"),
+                                changelogRow("+U", 7, "Seven", "matched_upsert", "02-28"),
+                                changelogRow("-D", 4, "v_4", "creation", "02-27"),
+                                changelogRow("-D", 8, "v_8", "creation", "02-28"),
+                                changelogRow("+I", 8, "v_8", "insert", "02-29"),
+                                changelogRow("+I", 11, "v_11", "insert", "02-29"),
+                                changelogRow("+I", 12, "v_12", "insert", "02-29"),
+                                changelogRow("-U", 2, "v_2", "creation", "02-27"),
+                                changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert", "02-27"),
+                                changelogRow("-U", 3, "v_3", "creation", "02-27"),
+                                changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert", "02-27"),
+                                changelogRow("-D", 5, "v_5", "creation", "02-28"),
+                                changelogRow("-D", 6, "v_6", "creation", "02-28"),
+                                changelogRow("-D", 9, "v_9", "creation", "02-28"),
+                                changelogRow("-D", 10, "v_10", "creation", "02-28"))),
+                arguments(
+                        CoreOptions.ChangelogProducer.INPUT,
+                        Arrays.asList(
+                                changelogRow("+U", 7, "Seven", "matched_upsert", "02-28"),
+                                changelogRow("-D", 4, "v_4", "creation", "02-27"),
+                                changelogRow("-D", 8, "v_8", "creation", "02-28"),
+                                changelogRow("+I", 8, "v_8", "insert", "02-29"),
+                                changelogRow("+I", 11, "v_11", "insert", "02-29"),
+                                changelogRow("+I", 12, "v_12", "insert", "02-29"),
+                                changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert", "02-27"),
+                                changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert", "02-27"),
+                                changelogRow("-D", 5, "v_5", "creation", "02-28"),
+                                changelogRow("-D", 6, "v_6", "creation", "02-28"),
+                                changelogRow("-D", 9, "v_9", "creation", "02-28"),
+                                changelogRow("-D", 10, "v_10", "creation", "02-28"))),
+                arguments(
+                        CoreOptions.ChangelogProducer.FULL_COMPACTION,
+                        Arrays.asList(
+                                changelogRow("-U", 7, "v_7", "creation", "02-28"),
+                                changelogRow("+U", 7, "Seven", "matched_upsert", "02-28"),
+                                changelogRow("-D", 4, "v_4", "creation", "02-27"),
+                                changelogRow("-D", 8, "v_8", "creation", "02-28"),
+                                changelogRow("+I", 8, "v_8", "insert", "02-29"),
+                                changelogRow("+I", 11, "v_11", "insert", "02-29"),
+                                changelogRow("+I", 12, "v_12", "insert", "02-29"),
+                                changelogRow("-U", 2, "v_2", "creation", "02-27"),
+                                changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert", "02-27"),
+                                changelogRow("-U", 3, "v_3", "creation", "02-27"),
+                                changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert", "02-27"),
+                                changelogRow("-D", 5, "v_5", "creation", "02-28"),
+                                changelogRow("-D", 6, "v_6", "creation", "02-28"),
+                                changelogRow("-D", 9, "v_9", "creation", "02-28"),
+                                changelogRow("-D", 10, "v_10", "creation", "02-28")))));
+    }
+}
diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/util/ReadWriteTableTestUtil.java b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/util/ReadWriteTableTestUtil.java
index 92d72b6e..69841a59 100644
--- a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/util/ReadWriteTableTestUtil.java
+++ b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/util/ReadWriteTableTestUtil.java
@@ -317,7 +317,7 @@ public class ReadWriteTableTestUtil {
         assertThat(expectedRecords).isEmpty();
     }
 
-    private static String buildDdl(
+    public static String buildDdl(
             String table,
             List<String> fieldsSpec,
             List<String> primaryKeys,
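
The hunk above widens `buildDdl` from `private` to `public` so that the new `MergeIntoActionITCase` can reuse it to create its tables. A minimal sketch of the call as the test's `setUp` uses it; the argument roles are inferred from the call sites in the test, not from the full signature (which is elided here), so treat them as assumptions:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;

// Sketch only: argument roles inferred from MergeIntoActionITCase#setUp.
String ddl =
        ReadWriteTableTestUtil.buildDdl(
                "S",                                             // table name
                Arrays.asList("k INT", "v STRING", "dt STRING"), // field specs
                Arrays.asList("k", "dt"),                        // primary keys
                Collections.singletonList("dt"),                 // partition keys (assumed)
                new HashMap<>());                                // extra table options
sEnv.executeSql(ddl);
```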