xuyangzhong commented on code in PR #26907:
URL: https://github.com/apache/flink/pull/26907#discussion_r2366547180
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRuleTest.java:
##########
@@ -310,21 +310,29 @@ void testSinkWithMaterialize() {
util.tableEnv()
.executeSql(
- "CREATE TABLE another_pk_snk (\n"
+ "CREATE TABLE another_pk_upsert_snk (\n"
+ " primary key (b) not enforced\n"
- + ") LIKE pk_snk (\n"
+ + ") LIKE pk_upsert_snk (\n"
+ " EXCLUDING CONSTRAINTS\n"
+ ")");
- String sql = "insert into another_pk_snk select a,b,c from retract_src";
+ String sql = "insert into another_pk_upsert_snk select a,b,c from retract_src";
+ verifyRelPlanInsert(sql);
+ }
+
+ @TestTemplate
+ void testRetractSink() {
+ assumeTrue(testSinkWithPk);
+
+ String sql = "insert into pk_retract_snk select a,b,c from retract_src";
verifyRelPlanInsert(sql);
}
@TestTemplate
void testChangelogNormalize() {
assumeTrue(testSinkWithPk);
- String sql = "insert into pk_snk select a,b,c from upsert_src";
+ String sql = "insert into pk_retract_snk select a,b,c from upsert_src";
Review Comment:
Since it sits at the same level as `append_src` and `retract_src`, I would
prefer to define it in the global context so that it is easier to reuse later,
especially once we add support for CDC streams in delta join.