JingsongLi commented on code in PR #540:
URL: https://github.com/apache/flink-table-store/pull/540#discussion_r1122671042
##########
docs/content/docs/how-to/writing-tables.md:
##########
@@ -268,3 +268,146 @@ For more information of 'delete', see
{{< /tab >}}
{{< /tabs >}}
+
+## Merging into table
+
+Table Store supports "MERGE INTO" via submitting the 'merge-into' job through `flink run`. The design referenced such syntax:
Review Comment:
Please document that only primary key tables are supported. You can refer to
https://nightlies.apache.org/flink/flink-table-store-docs-master/docs/concepts/primary-key-table/
##########
docs/content/docs/how-to/writing-tables.md:
##########
@@ -268,3 +268,146 @@ For more information of 'delete', see
{{< /tab >}}
{{< /tabs >}}
+
+## Merging into table
+
+Table Store supports "MERGE INTO" via submitting the 'merge-into' job through `flink run`. The design referenced such syntax:
+```sql
+MERGE INTO target-table
+ USING source-table | source-expr AS source-alias
+ ON merge-condition
+ WHEN MATCHED [AND matched-condition]
+ THEN UPDATE SET xxx
+ WHEN MATCHED [AND matched-condition]
+ THEN DELETE
+ WHEN NOT MATCHED [AND not-matched-condition]
+ THEN INSERT VALUES (xxx)
+ WHEN NOT MATCHED BY SOURCE [AND not-matched-by-source-condition]
+ THEN UPDATE SET xxx
+ WHEN NOT MATCHED BY SOURCE [AND not-matched-by-source-condition]
+ THEN DELETE
+```
+
+{{< tabs "merge-into" >}}
+
+{{< tab "Flink Job" >}}
+
+Run the following command to submit a 'merge-into' job for the table.
+
+```bash
+<FLINK_HOME>/bin/flink run \
+ -c org.apache.flink.table.store.connector.action.FlinkActions \
+ /path/to/flink-table-store-flink-**-{{< version >}}.jar \
+ merge-into \
+ --warehouse <warehouse-path> \
+ --database <database-name> \
+ --table <target-table>
+ --using-table <source-table>
+ --on <merge-condition>
+ --merge-actions <matched-upsert,matched-delete,not-matched-insert,not-matched-by-source-upsert,not-matched-by-source-delete>
+ --matched-upsert-condition <matched-condition>
+ --matched-upsert-set <upsert-changes>
+ --matched-delete-condition <matched-condition>
+ --not-matched-insert-condition <not-matched-condition>
+ --not-matched-insert-values <insert-values>
+ --not-matched-by-source-upsert-condition <not-matched-by-source-condition>
+ --not-matched-by-source-upsert-set <not-matched-upsert-changes>
+ --not-matched-by-source-delete-condition <not-matched-by-source-condition>
+
+An example:
+-- Target table T (pk (k, dt)) is:
++----+------+-------------+-------+
+| k | v | last_action | dt |
++----+------+-------------+-------+
+| 1 | v_1 | creation | 02-27 |
+| 2 | v_2 | creation | 02-27 |
+| 3 | v_3 | creation | 02-27 |
+| 4 | v_4 | creation | 02-27 |
+| 5 | v_5 | creation | 02-28 |
+| 6 | v_6 | creation | 02-28 |
+| 7 | v_7 | creation | 02-28 |
+| 8 | v_8 | creation | 02-28 |
+| 9 | v_9 | creation | 02-28 |
+| 10 | v_10 | creation | 02-28 |
++----+------+-------------+-------+
+
+-- Source table S is:
++----+--------+-------+
+| k | v | dt |
++----+--------+-------+
+| 1 | v_1 | 02-27 |
+| 4 | <NULL> | 02-27 |
+| 7 | Seven | 02-28 |
+| 8 | <NULL> | 02-28 |
+| 8 | v_8 | 02-29 |
+| 11 | v_11 | 02-29 |
+| 12 | v_12 | 02-29 |
++----+--------+-------+
+
+-- Supposed SQL is:
+MERGE INTO T
Review Comment:
Can we provide a simpler example?
##########
flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DataTypeCasts.java:
##########
@@ -180,6 +201,32 @@ public static boolean supportsExplicitCast(DataType sourceType, DataType targetT
return supportsCasting(sourceType, targetType, true);
}
+ /**
+ * Returns whether the source type can be compatibly cast to the target type.
+ *
+ * <p>If two types are compatible, they should have the same underlying data structure. For
+ * example, {@link CharType} and {@link VarCharType} are both in the {@link
+ * DataTypeFamily#CHARACTER_STRING} family, meaning they both represent a character string. But
+ * the rest types are only compatible with themselves. For example, although {@link IntType} and
+ * {@link BigIntType} are both in the {@link DataTypeFamily#NUMERIC} family, they are not
+ * compatible because IntType represents a 4-byte signed integer while BigIntType represents an
+ * 8-byte signed integer. Especially, two {@link DecimalType}s are compatible only when they
+ * have the same {@code precision} and {@code scale}.
+ */
+ public static boolean supportsCompatibleCast(DataType sourceType, DataType targetType) {
+ if (sourceType.isNullable() && !targetType.isNullable()) {
Review Comment:
Remove this? The NOT NULL check is very difficult to use.
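For illustration only, here is a minimal sketch of the compatibility rule as the Javadoc above describes it, with the nullability check left out. The `Kind` enum and `Type` class are hypothetical stand-ins, not the Table Store `DataType` classes, and the real method may need to cover more type families than the three shown here.

```java
import java.util.EnumSet;
import java.util.Set;

public class CompatibleCastSketch {

    // Hypothetical stand-in for the real type roots; not the Table Store API.
    enum Kind { CHAR, VARCHAR, INT, BIGINT, DECIMAL }

    // Hypothetical minimal type: precision and scale only matter for DECIMAL.
    static final class Type {
        final Kind kind;
        final int precision;
        final int scale;

        private Type(Kind kind, int precision, int scale) {
            this.kind = kind;
            this.precision = precision;
            this.scale = scale;
        }

        static Type of(Kind kind) {
            return new Type(kind, 0, 0);
        }

        static Type decimal(int precision, int scale) {
            return new Type(Kind.DECIMAL, precision, scale);
        }
    }

    // Assumption: only CHAR and VARCHAR share an underlying representation here.
    private static final Set<Kind> CHARACTER_STRING = EnumSet.of(Kind.CHAR, Kind.VARCHAR);

    // Note: no nullability check; nullable and NOT NULL types are treated alike.
    static boolean supportsCompatibleCast(Type source, Type target) {
        // Character strings are compatible with each other.
        if (CHARACTER_STRING.contains(source.kind) && CHARACTER_STRING.contains(target.kind)) {
            return true;
        }
        // Decimals are compatible only with identical precision and scale.
        if (source.kind == Kind.DECIMAL && target.kind == Kind.DECIMAL) {
            return source.precision == target.precision && source.scale == target.scale;
        }
        // Every other type is compatible only with itself.
        return source.kind == target.kind;
    }

    public static void main(String[] args) {
        System.out.println(supportsCompatibleCast(Type.of(Kind.CHAR), Type.of(Kind.VARCHAR)));
        System.out.println(supportsCompatibleCast(Type.of(Kind.INT), Type.of(Kind.BIGINT)));
        System.out.println(supportsCompatibleCast(Type.decimal(10, 2), Type.decimal(12, 2)));
    }
}
```

With the guard removed, the result depends only on the underlying data structure of the two types, which seems to be what the Javadoc intends.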
##########
docs/content/docs/how-to/writing-tables.md:
##########
@@ -268,3 +268,146 @@ For more information of 'delete', see
{{< /tab >}}
{{< /tabs >}}
+
+## Merging into table
+
+Table Store supports "MERGE INTO" via submitting the 'merge-into' job through `flink run`. The design referenced such syntax:
+```sql
+MERGE INTO target-table
+ USING source-table | source-expr AS source-alias
+ ON merge-condition
+ WHEN MATCHED [AND matched-condition]
+ THEN UPDATE SET xxx
+ WHEN MATCHED [AND matched-condition]
+ THEN DELETE
+ WHEN NOT MATCHED [AND not-matched-condition]
+ THEN INSERT VALUES (xxx)
+ WHEN NOT MATCHED BY SOURCE [AND not-matched-by-source-condition]
+ THEN UPDATE SET xxx
+ WHEN NOT MATCHED BY SOURCE [AND not-matched-by-source-condition]
+ THEN DELETE
+```
+
+{{< tabs "merge-into" >}}
+
+{{< tab "Flink Job" >}}
+
+Run the following command to submit a 'merge-into' job for the table.
Review Comment:
Please document the difference between upsert and update.