This is an automated email from the ASF dual-hosted git repository.

xuyangzhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f49981e0efe [FLINK-38224][table-planner] Do not convert to delta join 
when the sink is a retract sink (#26907)
f49981e0efe is described below

commit f49981e0efebd1fbc8fd926656198147bd949c71
Author: Xuyang <[email protected]>
AuthorDate: Tue Sep 23 14:53:22 2025 +0800

    [FLINK-38224][table-planner] Do not convert to delta join when the sink is 
a retract sink (#26907)
---
 .../physical/stream/DuplicateChangesInferRule.java |   4 +-
 .../stream/DuplicateChangesInferRuleTest.java      | 102 +++++++------
 .../stream/DuplicateChangesInferRuleTest.xml       | 161 ++++++++++++---------
 .../planner/plan/stream/sql/DeltaJoinTest.xml      | 144 ++++++++++++++----
 .../testJsonPlanWithTableHints.out                 |  10 +-
 .../planner/plan/stream/sql/DeltaJoinTest.scala    |  75 +++++++++-
 6 files changed, 341 insertions(+), 155 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRule.java
index 9dda578de6e..7092c4c0385 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRule.java
@@ -166,7 +166,9 @@ public class DuplicateChangesInferRule extends 
RelRule<DuplicateChangesInferRule
         try {
             ChangelogMode sinkProvidedChangelogMode =
                     sink.tableSink().getChangelogMode(ChangelogMode.all());
-            if (sinkProvidedChangelogMode.containsOnly(RowKind.INSERT)) {
+            boolean sinkIsAppend = 
sinkProvidedChangelogMode.containsOnly(RowKind.INSERT);
+            boolean sinkIsRetract = 
sinkProvidedChangelogMode.contains(RowKind.UPDATE_BEFORE);
+            if (sinkIsAppend || sinkIsRetract) {
                 acceptUpdates = false;
             }
         } catch (Throwable t) {
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRuleTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRuleTest.java
index 6827e1403ba..f1a4dc34712 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRuleTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRuleTest.java
@@ -111,7 +111,7 @@ public class DuplicateChangesInferRuleTest extends 
TableTestBase {
 
         util.tableEnv()
                 .executeSql(
-                        "CREATE TABLE pk_snk (\n"
+                        "CREATE TABLE pk_upsert_snk (\n"
                                 + "   a int not null,\n"
                                 + "   b string not null,\n"
                                 + "   c bigint not null,\n"
@@ -120,14 +120,22 @@ public class DuplicateChangesInferRuleTest extends 
TableTestBase {
                                 + " with (\n"
                                 + "   'connector' = 'values',\n"
                                 + "   'sink-insert-only' = 'false',\n"
-                                + "   'sink-changelog-mode-enforced' = 
'I,UA,UB,D'\n"
+                                + "   'sink-changelog-mode-enforced' = 
'I,UA,D'\n"
+                                + ")");
+
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE pk_retract_snk WITH (\n"
+                                + "  'sink-changelog-mode-enforced' = 
'I,UA,UB,D'"
+                                + ") LIKE pk_upsert_snk (\n"
+                                + "  OVERWRITING OPTIONS\n"
                                 + ")");
     }
 
     @TestTemplate
     void testCalc() {
         String sql =
-                String.format("insert into %s select a,b,c from append_src1", 
getSinkTableName());
+                String.format("insert into %s select a, b, c from 
append_src1", getSinkTableName());
         verifyRelPlanInsert(sql);
     }
 
@@ -135,7 +143,7 @@ public class DuplicateChangesInferRuleTest extends 
TableTestBase {
     void testCalcWithNonDeterministicFilter1() {
         String sql =
                 String.format(
-                        "insert into %s select a,b,c from append_src1 where c 
< cast(now() as bigint)",
+                        "insert into %s select a, b, c from append_src1 where 
c < cast(now() as bigint)",
                         getSinkTableName());
         verifyRelPlanInsert(sql);
     }
@@ -144,7 +152,7 @@ public class DuplicateChangesInferRuleTest extends 
TableTestBase {
     void testCalcWithNonDeterministicFilter2() {
         String sql =
                 String.format(
-                        "insert into %s select a,b,c from append_src1 where a 
<> 1 and c < cast(now() as bigint)",
+                        "insert into %s select a, b, c from append_src1 where 
a <> 1 and c < cast(now() as bigint)",
                         getSinkTableName());
         verifyRelPlanInsert(sql);
     }
@@ -153,7 +161,7 @@ public class DuplicateChangesInferRuleTest extends 
TableTestBase {
     void testCalcWithNestedNonDeterministicFilter() {
         String sql =
                 String.format(
-                        "insert into %s select a,b,c from append_src1 where c 
< cast(cast(now() as int) as bigint)",
+                        "insert into %s select a, b, c from append_src1 where 
c < cast(cast(now() as int) as bigint)",
                         getSinkTableName());
         verifyRelPlanInsert(sql);
     }
@@ -179,7 +187,7 @@ public class DuplicateChangesInferRuleTest extends 
TableTestBase {
     @TestTemplate
     void testAggregate() {
         assumeTrue(testSinkWithPk);
-        String sql = "insert into pk_snk select a,max(b),sum(c) from 
append_src1 group by a";
+        String sql = "insert into pk_upsert_snk select a,max(b),sum(c) from 
append_src1 group by a";
         verifyRelPlanInsert(sql);
     }
 
@@ -217,12 +225,12 @@ public class DuplicateChangesInferRuleTest extends 
TableTestBase {
 
         util.tableEnv()
                 .executeSql(
-                        "CREATE TABLE pk_snk_with_time_col (\n"
+                        "CREATE TABLE pk_upsert_snk_with_time_col (\n"
                                 + "  w_start timestamp(3),\n"
                                 + "  w_end timestamp(3)\n"
-                                + ") LIKE pk_snk");
+                                + ") LIKE pk_upsert_snk");
         String sql =
-                "insert into pk_snk_with_time_col select a, b, c, 
window_start, window_end "
+                "insert into pk_upsert_snk_with_time_col select a, b, c, 
window_start, window_end "
                         + "from TABLE(TUMBLE(TABLE append_src1, 
DESCRIPTOR(rt), INTERVAL '1' MINUTE))";
         verifyRelPlanInsert(sql);
     }
@@ -232,7 +240,7 @@ public class DuplicateChangesInferRuleTest extends 
TableTestBase {
         assumeTrue(testSinkWithPk);
 
         String sql =
-                "insert into pk_snk select a, b, c "
+                "insert into pk_upsert_snk select a, b, c "
                         + "from ( "
                         + "  select a, b, c, "
                         + "    ROW_NUMBER() OVER (PARTITION BY a ORDER BY c 
DESC) as rank_num "
@@ -247,12 +255,12 @@ public class DuplicateChangesInferRuleTest extends 
TableTestBase {
 
         util.tableEnv()
                 .executeSql(
-                        "CREATE TABLE pk_snk_with_time_col (\n"
+                        "CREATE TABLE pk_upsert_snk_with_time_col (\n"
                                 + "  w_start timestamp(3),\n"
                                 + "  w_end timestamp(3)\n"
-                                + ") LIKE pk_snk");
+                                + ") LIKE pk_upsert_snk");
         String sql =
-                "insert into pk_snk_with_time_col select a, b, c, 
window_start, window_end "
+                "insert into pk_upsert_snk_with_time_col select a, b, c, 
window_start, window_end "
                         + "from ( "
                         + "  select a, b, c, window_start, window_end, "
                         + "    ROW_NUMBER() OVER (PARTITION BY a, 
window_start, window_end ORDER BY c DESC) as rank_num "
@@ -267,7 +275,7 @@ public class DuplicateChangesInferRuleTest extends 
TableTestBase {
 
         enableMiniBatch();
 
-        String sql = "insert into pk_snk select a,max(b),sum(c) from 
append_src1 group by a";
+        String sql = "insert into pk_upsert_snk select a,max(b),sum(c) from 
append_src1 group by a";
         verifyRelPlanInsert(sql);
     }
 
@@ -284,7 +292,7 @@ public class DuplicateChangesInferRuleTest extends 
TableTestBase {
                 
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
 
         String sql =
-                "insert into pk_snk select a, max(b), count(distinct c) from 
append_src1 group by a";
+                "insert into pk_upsert_snk select a, max(b), count(distinct c) 
from append_src1 group by a";
         verifyRelPlanInsert(sql);
     }
 
@@ -292,15 +300,7 @@ public class DuplicateChangesInferRuleTest extends 
TableTestBase {
     void testDropUpdateBefore() {
         assumeTrue(testSinkWithPk);
 
-        util.tableEnv()
-                .executeSql(
-                        "CREATE TABLE upsert_snk WITH (\n"
-                                + "  'sink-changelog-mode-enforced' = 'I,UA,D'"
-                                + ") LIKE pk_snk (\n"
-                                + "  OVERWRITING OPTIONS\n"
-                                + ")");
-
-        String sql = "insert into pk_snk select a,b,c from retract_src";
+        String sql = "insert into pk_upsert_snk select a, b, c from 
retract_src";
         verifyRelPlanInsert(sql);
     }
 
@@ -310,13 +310,21 @@ public class DuplicateChangesInferRuleTest extends 
TableTestBase {
 
         util.tableEnv()
                 .executeSql(
-                        "CREATE TABLE another_pk_snk (\n"
+                        "CREATE TABLE another_pk_upsert_snk (\n"
                                 + "  primary key (b) not enforced\n"
-                                + ") LIKE pk_snk (\n"
+                                + ") LIKE pk_upsert_snk (\n"
                                 + "  EXCLUDING CONSTRAINTS\n"
                                 + ")");
 
-        String sql = "insert into another_pk_snk select a,b,c from 
retract_src";
+        String sql = "insert into another_pk_upsert_snk select a, b, c from 
retract_src";
+        verifyRelPlanInsert(sql);
+    }
+
+    @TestTemplate
+    void testRetractSink() {
+        assumeTrue(testSinkWithPk);
+
+        String sql = "insert into pk_retract_snk select a, b, c from 
retract_src";
         verifyRelPlanInsert(sql);
     }
 
@@ -324,7 +332,7 @@ public class DuplicateChangesInferRuleTest extends 
TableTestBase {
     void testChangelogNormalize() {
         assumeTrue(testSinkWithPk);
 
-        String sql = "insert into pk_snk select a,b,c from upsert_src";
+        String sql = "insert into pk_retract_snk select a, b, c from 
upsert_src";
         verifyRelPlanInsert(sql);
     }
 
@@ -375,7 +383,7 @@ public class DuplicateChangesInferRuleTest extends 
TableTestBase {
     void testLimit() {
         String sql =
                 String.format(
-                        "insert into %s select a,b,c from append_src1 limit 
10",
+                        "insert into %s select a, b, c from append_src1 limit 
10",
                         getSinkTableName());
         verifyRelPlanInsert(sql);
     }
@@ -429,7 +437,7 @@ public class DuplicateChangesInferRuleTest extends 
TableTestBase {
     void testUnion() {
         String sql =
                 String.format(
-                        "insert into %s select a,b,c from append_src1 union 
all select a,b,c from append_src2",
+                        "insert into %s select a, b, c from append_src1 union 
all select a, b, c from append_src2",
                         getSinkTableName());
         verifyRelPlanInsert(sql);
     }
@@ -445,14 +453,14 @@ public class DuplicateChangesInferRuleTest extends 
TableTestBase {
                         true);
 
         util.tableEnv().executeSql("CREATE VIEW my_view as select a, b, c+1 as 
c from append_src1");
-        util.tableEnv().executeSql("CREATE TABLE pk_snk2 LIKE pk_snk");
+        util.tableEnv().executeSql("CREATE TABLE pk_upsert_snk2 LIKE 
pk_upsert_snk");
         // left: allow
         // right: disallow
         // merged: disallow
         StatementSet stmtSet = util.tableEnv().createStatementSet();
-        stmtSet.addInsertSql("insert into pk_snk select a, b, c/2 from 
my_view");
+        stmtSet.addInsertSql("insert into pk_upsert_snk select a, b, c/2 from 
my_view");
         stmtSet.addInsertSql(
-                "insert into pk_snk2 select a, max(b), sum(c) from my_view 
group by a");
+                "insert into pk_upsert_snk2 select a, max(b), sum(c) from 
my_view group by a");
         verifyRelPlanInsert(stmtSet);
     }
 
@@ -467,14 +475,15 @@ public class DuplicateChangesInferRuleTest extends 
TableTestBase {
                         true);
 
         util.tableEnv().executeSql("CREATE VIEW my_view as select a, b, c+1 as 
c from append_src1");
-        util.tableEnv().executeSql("CREATE TABLE pk_snk2 LIKE pk_snk");
+        util.tableEnv().executeSql("CREATE TABLE pk_upsert_snk2 LIKE 
pk_upsert_snk");
 
         // left: disallow
         // right: allow
         // merged: disallow
         StatementSet stmtSet = util.tableEnv().createStatementSet();
-        stmtSet.addInsertSql("insert into pk_snk select a, max(b), sum(c) from 
my_view group by a");
-        stmtSet.addInsertSql("insert into pk_snk2 select a, b, c/2 from 
my_view");
+        stmtSet.addInsertSql(
+                "insert into pk_upsert_snk select a, max(b), sum(c) from 
my_view group by a");
+        stmtSet.addInsertSql("insert into pk_upsert_snk2 select a, b, c/2 from 
my_view");
         verifyRelPlanInsert(stmtSet);
     }
 
@@ -489,14 +498,14 @@ public class DuplicateChangesInferRuleTest extends 
TableTestBase {
                         true);
 
         util.tableEnv().executeSql("CREATE VIEW my_view as select a, b, c+1 as 
c from append_src1");
-        util.tableEnv().executeSql("CREATE TABLE pk_snk2 LIKE pk_snk");
+        util.tableEnv().executeSql("CREATE TABLE pk_upsert_snk2 LIKE 
pk_upsert_snk");
 
         // left: allow
         // right: allow
         // merged: allow
         StatementSet stmtSet = util.tableEnv().createStatementSet();
-        stmtSet.addInsertSql("insert into pk_snk select a, b, c/3 from 
my_view");
-        stmtSet.addInsertSql("insert into pk_snk2 select a, b, c/2 from 
my_view");
+        stmtSet.addInsertSql("insert into pk_upsert_snk select a, b, c/3 from 
my_view");
+        stmtSet.addInsertSql("insert into pk_upsert_snk2 select a, b, c/2 from 
my_view");
         verifyRelPlanInsert(stmtSet);
     }
 
@@ -511,15 +520,16 @@ public class DuplicateChangesInferRuleTest extends 
TableTestBase {
                         true);
 
         util.tableEnv().executeSql("CREATE VIEW my_view as select a, b, c+1 as 
c from append_src1");
-        util.tableEnv().executeSql("CREATE TABLE pk_snk2 LIKE pk_snk");
+        util.tableEnv().executeSql("CREATE TABLE pk_upsert_snk2 LIKE 
pk_upsert_snk");
 
         // left: disallow
         // right: disallow
         // merged: disallow
         StatementSet stmtSet = util.tableEnv().createStatementSet();
-        stmtSet.addInsertSql("insert into pk_snk select a, max(b), sum(c) from 
my_view group by a");
         stmtSet.addInsertSql(
-                "insert into pk_snk2 select a, min(b), max(c) from my_view 
group by a");
+                "insert into pk_upsert_snk select a, max(b), sum(c) from 
my_view group by a");
+        stmtSet.addInsertSql(
+                "insert into pk_upsert_snk2 select a, min(b), max(c) from 
my_view group by a");
         verifyRelPlanInsert(stmtSet);
     }
 
@@ -529,13 +539,13 @@ public class DuplicateChangesInferRuleTest extends 
TableTestBase {
 
         util.tableEnv()
                 .executeSql(
-                        "CREATE TABLE append_only_sink_with_pk (\n"
+                        "CREATE TABLE pk_append_sink (\n"
                                 + "   primary key (a) not enforced\n"
                                 + ") LIKE no_pk_snk (\n"
                                 + "   OVERWRITING OPTIONS\n"
                                 + ")");
 
-        String sql = "insert into append_only_sink_with_pk select a, b, c from 
append_src1";
+        String sql = "insert into pk_append_sink select a, b, c from 
append_src1";
         verifyRelPlanInsert(sql);
     }
 
@@ -557,7 +567,7 @@ public class DuplicateChangesInferRuleTest extends 
TableTestBase {
     }
 
     private String getSinkTableName() {
-        return testSinkWithPk ? "pk_snk" : "no_pk_snk";
+        return testSinkWithPk ? "pk_upsert_snk" : "no_pk_snk";
     }
 
     private void enableMiniBatch() {
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRuleTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRuleTest.xml
index b2fccab63ef..e762799c559 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRuleTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRuleTest.xml
@@ -19,7 +19,7 @@ limitations under the License.
   <TestCase name="testAggregate[testSinkWithPk = true]">
     <Resource name="ast">
       <![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, 
EXPR$1, EXPR$2])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, 
EXPR$1, EXPR$2])
 +- LogicalAggregate(group=[{0}], EXPR$1=[MAX($1)], EXPR$2=[SUM($2)])
    +- LogicalProject(a=[$0], b=[$1], c=[$2])
       +- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL 
SECOND)])
@@ -28,7 +28,7 @@ LogicalSink(table=[default_catalog.default_database.pk_snk], 
fields=[a, EXPR$1,
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, EXPR$1, 
EXPR$2], duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, 
EXPR$1, EXPR$2], duplicateChanges=[NONE])
 +- GroupAggregate(groupBy=[a], select=[a, MAX(b) AS EXPR$1, SUM(c) AS EXPR$2], 
duplicateChanges=[ALLOW])
    +- Exchange(distribution=[hash[a]], duplicateChanges=[DISALLOW])
       +- Calc(select=[a, b, c], duplicateChanges=[DISALLOW])
@@ -40,7 +40,7 @@ Sink(table=[default_catalog.default_database.pk_snk], 
fields=[a, EXPR$1, EXPR$2]
   <TestCase name="testAppendOnlySinkWithPk[testSinkWithPk = false]">
     <Resource name="ast">
       <![CDATA[
-LogicalSink(table=[default_catalog.default_database.append_only_sink_with_pk], 
fields=[a, b, c])
+LogicalSink(table=[default_catalog.default_database.pk_append_sink], 
fields=[a, b, c])
 +- LogicalProject(a=[$0], b=[$1], c=[$2])
    +- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL 
SECOND)])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
append_src1]])
@@ -48,7 +48,7 @@ 
LogicalSink(table=[default_catalog.default_database.append_only_sink_with_pk], f
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Sink(table=[default_catalog.default_database.append_only_sink_with_pk], 
fields=[a, b, c], duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_append_sink], fields=[a, b, 
c], duplicateChanges=[NONE])
 +- Calc(select=[a, b, c], duplicateChanges=[DISALLOW])
    +- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL SECOND)], 
duplicateChanges=[DISALLOW])
       +- TableSourceScan(table=[[default_catalog, default_database, 
append_src1]], fields=[a, b, c, rt], duplicateChanges=[DISALLOW])
@@ -76,7 +76,7 @@ Sink(table=[default_catalog.default_database.no_pk_snk], 
fields=[a, b, c], dupli
   <TestCase name="testCalc[testSinkWithPk = true]">
     <Resource name="ast">
       <![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, 
b, c])
 +- LogicalProject(a=[$0], b=[$1], c=[$2])
    +- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL 
SECOND)])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
append_src1]])
@@ -84,7 +84,7 @@ LogicalSink(table=[default_catalog.default_database.pk_snk], 
fields=[a, b, c])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c], 
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b, c], 
duplicateChanges=[NONE])
 +- Calc(select=[a, b, c], duplicateChanges=[ALLOW])
    +- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL SECOND)], 
duplicateChanges=[ALLOW])
       +- TableSourceScan(table=[[default_catalog, default_database, 
append_src1]], fields=[a, b, c, rt], duplicateChanges=[ALLOW])
@@ -113,7 +113,7 @@ Sink(table=[default_catalog.default_database.no_pk_snk], 
fields=[a, b, c], dupli
   <TestCase name="testCalcWithNestedNonDeterministicFilter[testSinkWithPk = 
true]">
     <Resource name="ast">
       <![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, 
b, c])
 +- LogicalProject(a=[$0], b=[$1], c=[$2])
    +- LogicalFilter(condition=[<($2, CAST(CAST(NOW()):INTEGER NOT NULL):BIGINT 
NOT NULL)])
       +- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL 
SECOND)])
@@ -122,7 +122,7 @@ 
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c], 
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b, c], 
duplicateChanges=[NONE])
 +- Calc(select=[a, b, c], where=[<(c, CAST(CAST(NOW() AS INTEGER) AS 
BIGINT))], duplicateChanges=[ALLOW])
    +- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL SECOND)], 
duplicateChanges=[DISALLOW])
       +- TableSourceScan(table=[[default_catalog, default_database, 
append_src1]], fields=[a, b, c, rt], duplicateChanges=[DISALLOW])
@@ -150,7 +150,7 @@ Sink(table=[default_catalog.default_database.no_pk_snk], 
fields=[a, b, EXPR$2],
   <TestCase name="testCalcWithNestedNonDeterministicProjection[testSinkWithPk 
= true]">
     <Resource name="ast">
       <![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, 
EXPR$2])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, 
b, EXPR$2])
 +- LogicalProject(a=[$0], b=[$1], EXPR$2=[CAST(CAST(NOW()):INTEGER NOT 
NULL):BIGINT NOT NULL])
    +- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL 
SECOND)])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
append_src1]])
@@ -158,7 +158,7 @@ 
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, EXPR$
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, b, EXPR$2], 
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b, 
EXPR$2], duplicateChanges=[NONE])
 +- Calc(select=[a, b, CAST(CAST(NOW() AS INTEGER) AS BIGINT) AS EXPR$2], 
duplicateChanges=[ALLOW])
    +- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL SECOND)], 
duplicateChanges=[DISALLOW])
       +- TableSourceScan(table=[[default_catalog, default_database, 
append_src1, project=[a, b, rt], metadata=[]]], fields=[a, b, rt], 
duplicateChanges=[DISALLOW])
@@ -187,7 +187,7 @@ Sink(table=[default_catalog.default_database.no_pk_snk], 
fields=[a, b, c], dupli
   <TestCase name="testCalcWithNonDeterministicFilter1[testSinkWithPk = true]">
     <Resource name="ast">
       <![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, 
b, c])
 +- LogicalProject(a=[$0], b=[$1], c=[$2])
    +- LogicalFilter(condition=[<($2, CAST(NOW()):BIGINT NOT NULL)])
       +- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL 
SECOND)])
@@ -196,7 +196,7 @@ 
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c], 
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b, c], 
duplicateChanges=[NONE])
 +- Calc(select=[a, b, c], where=[<(c, CAST(NOW() AS BIGINT))], 
duplicateChanges=[ALLOW])
    +- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL SECOND)], 
duplicateChanges=[DISALLOW])
       +- TableSourceScan(table=[[default_catalog, default_database, 
append_src1]], fields=[a, b, c, rt], duplicateChanges=[DISALLOW])
@@ -225,7 +225,7 @@ Sink(table=[default_catalog.default_database.no_pk_snk], 
fields=[a, b, c], dupli
   <TestCase name="testCalcWithNonDeterministicFilter2[testSinkWithPk = true]">
     <Resource name="ast">
       <![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, 
b, c])
 +- LogicalProject(a=[$0], b=[$1], c=[$2])
    +- LogicalFilter(condition=[AND(<>($0, 1), <($2, CAST(NOW()):BIGINT NOT 
NULL))])
       +- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL 
SECOND)])
@@ -234,7 +234,7 @@ 
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c], 
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b, c], 
duplicateChanges=[NONE])
 +- Calc(select=[a, b, c], where=[AND(<>(a, 1), <(c, CAST(NOW() AS BIGINT)))], 
duplicateChanges=[ALLOW])
    +- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL SECOND)], 
duplicateChanges=[DISALLOW])
       +- TableSourceScan(table=[[default_catalog, default_database, 
append_src1]], fields=[a, b, c, rt], duplicateChanges=[DISALLOW])
@@ -262,7 +262,7 @@ Sink(table=[default_catalog.default_database.no_pk_snk], 
fields=[a, b, EXPR$2],
   <TestCase name="testCalcWithNonDeterministicProjection[testSinkWithPk = 
true]">
     <Resource name="ast">
       <![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, 
EXPR$2])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, 
b, EXPR$2])
 +- LogicalProject(a=[$0], b=[$1], EXPR$2=[CAST(NOW()):BIGINT NOT NULL])
    +- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL 
SECOND)])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
append_src1]])
@@ -270,7 +270,7 @@ 
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, EXPR$
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, b, EXPR$2], 
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b, 
EXPR$2], duplicateChanges=[NONE])
 +- Calc(select=[a, b, CAST(NOW() AS BIGINT) AS EXPR$2], 
duplicateChanges=[ALLOW])
    +- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL SECOND)], 
duplicateChanges=[DISALLOW])
       +- TableSourceScan(table=[[default_catalog, default_database, 
append_src1, project=[a, b, rt], metadata=[]]], fields=[a, b, rt], 
duplicateChanges=[DISALLOW])
@@ -280,7 +280,7 @@ Sink(table=[default_catalog.default_database.pk_snk], 
fields=[a, b, EXPR$2], dup
   <TestCase name="testChangelogNormalize[testSinkWithPk = true]">
     <Resource name="ast">
       <![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
+LogicalSink(table=[default_catalog.default_database.pk_retract_snk], 
fields=[a, b, c])
 +- LogicalProject(a=[$0], b=[$1], c=[$2])
    +- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL 
SECOND)])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
upsert_src]])
@@ -288,8 +288,8 @@ 
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c], 
duplicateChanges=[NONE])
-+- ChangelogNormalize(key=[a], duplicateChanges=[ALLOW])
+Sink(table=[default_catalog.default_database.pk_retract_snk], fields=[a, b, 
c], duplicateChanges=[NONE])
++- ChangelogNormalize(key=[a], duplicateChanges=[DISALLOW])
    +- Exchange(distribution=[hash[a]], duplicateChanges=[DISALLOW])
       +- Calc(select=[a, b, c], duplicateChanges=[DISALLOW])
          +- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL 
SECOND)], duplicateChanges=[DISALLOW])
@@ -300,7 +300,7 @@ Sink(table=[default_catalog.default_database.pk_snk], 
fields=[a, b, c], duplicat
   <TestCase name="testDropUpdateBefore[testSinkWithPk = true]">
     <Resource name="ast">
       <![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, 
b, c])
 +- LogicalProject(a=[$0], b=[$1], c=[$2])
    +- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL 
SECOND)])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
retract_src]])
@@ -308,17 +308,18 @@ 
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c], 
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b, c], 
duplicateChanges=[NONE])
 +- Calc(select=[a, b, c], duplicateChanges=[ALLOW])
    +- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL SECOND)], 
duplicateChanges=[ALLOW])
-      +- TableSourceScan(table=[[default_catalog, default_database, 
retract_src]], fields=[a, b, c, rt], duplicateChanges=[ALLOW])
+      +- DropUpdateBefore(duplicateChanges=[ALLOW])
+         +- TableSourceScan(table=[[default_catalog, default_database, 
retract_src]], fields=[a, b, c, rt], duplicateChanges=[ALLOW])
 ]]>
     </Resource>
   </TestCase>
   <TestCase name="testExpandAndIncrementalAggregate[testSinkWithPk = true]">
     <Resource name="ast">
       <![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, 
EXPR$1, EXPR$2])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, 
EXPR$1, EXPR$2])
 +- LogicalAggregate(group=[{0}], EXPR$1=[MAX($1)], EXPR$2=[COUNT(DISTINCT $2)])
    +- LogicalProject(a=[$0], b=[$1], c=[$2])
       +- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL 
SECOND)])
@@ -327,7 +328,7 @@ 
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, EXPR$1,
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, $f1, $f2], 
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, $f1, 
$f2], duplicateChanges=[NONE])
 +- Calc(select=[a, CAST($f1 AS VARCHAR(2147483647)) AS $f1, $f2], 
duplicateChanges=[ALLOW])
    +- GlobalGroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, 
MAX(max$0) AS $f1, $SUM0(count$1) AS $f2], duplicateChanges=[ALLOW])
       +- Exchange(distribution=[hash[a]], duplicateChanges=[DISALLOW])
@@ -346,7 +347,7 @@ Sink(table=[default_catalog.default_database.pk_snk], 
fields=[a, $f1, $f2], dupl
   <TestCase name="testIntervalJoin[testSinkWithPk = true]">
     <Resource name="ast">
       <![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, 
b, c])
 +- LogicalProject(a=[$0], b=[$5], c=[$2])
    +- LogicalJoin(condition=[AND(=($0, $4), >($3, -($7, 60000:INTERVAL 
MINUTE)), <($3, +($7, 60000:INTERVAL MINUTE)))], joinType=[inner])
       :- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL 
SECOND)])
@@ -357,7 +358,7 @@ 
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c], 
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b, c], 
duplicateChanges=[NONE])
 +- Calc(select=[a, b, c], duplicateChanges=[ALLOW])
    +- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, 
leftLowerBound=-59999, leftUpperBound=59999, leftTimeIndex=2, 
rightTimeIndex=2], where=[AND(=(a, a0), >(rt, -(rt0, 60000:INTERVAL MINUTE)), 
<(rt, +(rt0, 60000:INTERVAL MINUTE)))], select=[a, c, rt, a0, b, rt0], 
duplicateChanges=[ALLOW])
       :- Exchange(distribution=[hash[a]], duplicateChanges=[DISALLOW])
@@ -400,7 +401,7 @@ Sink(table=[default_catalog.default_database.no_pk_snk], 
fields=[a, b, c], dupli
   <TestCase name="testJoin[testSinkWithPk = true]">
     <Resource name="ast">
       <![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, 
b, c])
 +- LogicalProject(a=[$0], b=[$5], c=[$2])
    +- LogicalJoin(condition=[=($0, $4)], joinType=[inner])
       :- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL 
SECOND)])
@@ -411,7 +412,7 @@ 
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c], 
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b, c], 
duplicateChanges=[NONE])
 +- Calc(select=[a, b, c], duplicateChanges=[ALLOW])
    +- Join(joinType=[InnerJoin], where=[=(a, a0)], select=[a, c, a0, b], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey], 
duplicateChanges=[ALLOW])
       :- Exchange(distribution=[hash[a]], duplicateChanges=[DISALLOW])
@@ -449,7 +450,7 @@ Sink(table=[default_catalog.default_database.no_pk_snk], 
fields=[a, b, c], dupli
   <TestCase name="testLimit[testSinkWithPk = true]">
     <Resource name="ast">
       <![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, 
b, c])
 +- LogicalSort(fetch=[10])
    +- LogicalProject(a=[$0], b=[$1], c=[$2])
       +- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL 
SECOND)])
@@ -458,7 +459,7 @@ 
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c], 
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b, c], 
duplicateChanges=[NONE])
 +- Calc(select=[a, b, c], duplicateChanges=[ALLOW])
    +- Limit(offset=[0], fetch=[10], duplicateChanges=[ALLOW])
       +- Exchange(distribution=[single], duplicateChanges=[DISALLOW])
@@ -495,7 +496,7 @@ Sink(table=[default_catalog.default_database.no_pk_snk], 
fields=[a, b, c], dupli
   <TestCase name="testLookupJoin[testSinkWithPk = true]">
     <Resource name="ast">
       <![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, 
b, c])
 +- LogicalProject(a=[$0], b=[$1], c=[$2])
    +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0, 4}])
       :- LogicalProject(a=[$0], b=[$1], c=[$2], rt=[$3], pt=[PROCTIME()])
@@ -508,7 +509,7 @@ 
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c], 
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b, c], 
duplicateChanges=[NONE])
 +- Calc(select=[a, b, c], duplicateChanges=[ALLOW])
    +- LookupJoin(table=[default_catalog.default_database.dim_src], 
joinType=[InnerJoin], lookup=[a=a], select=[a, b, c, a0], 
duplicateChanges=[ALLOW])
       +- Calc(select=[a, b, c], duplicateChanges=[DISALLOW])
@@ -520,7 +521,7 @@ Sink(table=[default_catalog.default_database.pk_snk], 
fields=[a, b, c], duplicat
   <TestCase name="testMiniBatchTwoStageAggregate[testSinkWithPk = true]">
     <Resource name="ast">
       <![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, 
EXPR$1, EXPR$2])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, 
EXPR$1, EXPR$2])
 +- LogicalAggregate(group=[{0}], EXPR$1=[MAX($1)], EXPR$2=[SUM($2)])
    +- LogicalProject(a=[$0], b=[$1], c=[$2])
       +- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL 
SECOND)])
@@ -529,7 +530,7 @@ 
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, EXPR$1,
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, EXPR$1, 
EXPR$2], duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, 
EXPR$1, EXPR$2], duplicateChanges=[NONE])
 +- GlobalGroupAggregate(groupBy=[a], select=[a, MAX(max$0) AS EXPR$1, 
SUM(sum$1) AS EXPR$2], duplicateChanges=[ALLOW])
    +- Exchange(distribution=[hash[a]], duplicateChanges=[DISALLOW])
       +- LocalGroupAggregate(groupBy=[a], select=[a, MAX(b) AS max$0, SUM(c) 
AS sum$1], duplicateChanges=[DISALLOW])
@@ -543,13 +544,13 @@ Sink(table=[default_catalog.default_database.pk_snk], 
fields=[a, EXPR$1, EXPR$2]
   <TestCase name="testMultiSink1[testSinkWithPk = true]">
     <Resource name="ast">
       <![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, 
EXPR$2])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, 
b, EXPR$2])
 +- LogicalProject(a=[$0], b=[$1], EXPR$2=[/($2, 2)])
    +- LogicalProject(a=[$0], b=[$1], c=[+($2, 1)])
       +- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL 
SECOND)])
          +- LogicalTableScan(table=[[default_catalog, default_database, 
append_src1]])
 
-LogicalSink(table=[default_catalog.default_database.pk_snk2], fields=[a, 
EXPR$1, EXPR$2])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk2], 
fields=[a, EXPR$1, EXPR$2])
 +- LogicalAggregate(group=[{0}], EXPR$1=[MAX($1)], EXPR$2=[SUM($2)])
    +- LogicalProject(a=[$0], b=[$1], c=[+($2, 1)])
       +- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL 
SECOND)])
@@ -558,13 +559,13 @@ 
LogicalSink(table=[default_catalog.default_database.pk_snk2], fields=[a, EXPR$1,
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, b, EXPR$2], 
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b, 
EXPR$2], duplicateChanges=[NONE])
 +- Calc(select=[a, b, /(c, 2) AS EXPR$2], duplicateChanges=[ALLOW])
    +- Calc(select=[a, b, +(c, 1) AS c], duplicateChanges=[DISALLOW])
       +- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL 
SECOND)], duplicateChanges=[DISALLOW])
          +- TableSourceScan(table=[[default_catalog, default_database, 
append_src1]], fields=[a, b, c, rt], duplicateChanges=[DISALLOW])
 
-Sink(table=[default_catalog.default_database.pk_snk2], fields=[a, EXPR$1, 
EXPR$2], duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk2], fields=[a, 
EXPR$1, EXPR$2], duplicateChanges=[NONE])
 +- GroupAggregate(groupBy=[a], select=[a, MAX(b) AS EXPR$1, SUM(c) AS EXPR$2], 
duplicateChanges=[ALLOW])
    +- Exchange(distribution=[hash[a]], duplicateChanges=[DISALLOW])
       +- Calc(select=[a, b, +(c, 1) AS c], duplicateChanges=[DISALLOW])
@@ -576,13 +577,13 @@ Sink(table=[default_catalog.default_database.pk_snk2], 
fields=[a, EXPR$1, EXPR$2
   <TestCase name="testMultiSink2[testSinkWithPk = true]">
     <Resource name="ast">
       <![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, 
EXPR$1, EXPR$2])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, 
EXPR$1, EXPR$2])
 +- LogicalAggregate(group=[{0}], EXPR$1=[MAX($1)], EXPR$2=[SUM($2)])
    +- LogicalProject(a=[$0], b=[$1], c=[+($2, 1)])
       +- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL 
SECOND)])
          +- LogicalTableScan(table=[[default_catalog, default_database, 
append_src1]])
 
-LogicalSink(table=[default_catalog.default_database.pk_snk2], fields=[a, b, 
EXPR$2])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk2], 
fields=[a, b, EXPR$2])
 +- LogicalProject(a=[$0], b=[$1], EXPR$2=[/($2, 2)])
    +- LogicalProject(a=[$0], b=[$1], c=[+($2, 1)])
       +- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL 
SECOND)])
@@ -591,14 +592,14 @@ 
LogicalSink(table=[default_catalog.default_database.pk_snk2], fields=[a, b, EXPR
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, EXPR$1, 
EXPR$2], duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, 
EXPR$1, EXPR$2], duplicateChanges=[NONE])
 +- GroupAggregate(groupBy=[a], select=[a, MAX(b) AS EXPR$1, SUM(c) AS EXPR$2], 
duplicateChanges=[ALLOW])
    +- Exchange(distribution=[hash[a]], duplicateChanges=[DISALLOW])
       +- Calc(select=[a, b, +(c, 1) AS c], duplicateChanges=[DISALLOW])
          +- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL 
SECOND)], duplicateChanges=[DISALLOW])
             +- TableSourceScan(table=[[default_catalog, default_database, 
append_src1]], fields=[a, b, c, rt], duplicateChanges=[DISALLOW])
 
-Sink(table=[default_catalog.default_database.pk_snk2], fields=[a, b, EXPR$2], 
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk2], fields=[a, b, 
EXPR$2], duplicateChanges=[NONE])
 +- Calc(select=[a, b, /(c, 2) AS EXPR$2], duplicateChanges=[ALLOW])
    +- Calc(select=[a, b, +(c, 1) AS c], duplicateChanges=[DISALLOW])
       +- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL 
SECOND)], duplicateChanges=[DISALLOW])
@@ -609,13 +610,13 @@ Sink(table=[default_catalog.default_database.pk_snk2], 
fields=[a, b, EXPR$2], du
   <TestCase name="testMultiSink3[testSinkWithPk = true]">
     <Resource name="ast">
       <![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, 
EXPR$2])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, 
b, EXPR$2])
 +- LogicalProject(a=[$0], b=[$1], EXPR$2=[/($2, 3)])
    +- LogicalProject(a=[$0], b=[$1], c=[+($2, 1)])
       +- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL 
SECOND)])
          +- LogicalTableScan(table=[[default_catalog, default_database, 
append_src1]])
 
-LogicalSink(table=[default_catalog.default_database.pk_snk2], fields=[a, b, 
EXPR$2])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk2], 
fields=[a, b, EXPR$2])
 +- LogicalProject(a=[$0], b=[$1], EXPR$2=[/($2, 2)])
    +- LogicalProject(a=[$0], b=[$1], c=[+($2, 1)])
       +- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL 
SECOND)])
@@ -624,13 +625,13 @@ 
LogicalSink(table=[default_catalog.default_database.pk_snk2], fields=[a, b, EXPR
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, b, EXPR$2], 
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b, 
EXPR$2], duplicateChanges=[NONE])
 +- Calc(select=[a, b, /(c, 3) AS EXPR$2], duplicateChanges=[ALLOW])
    +- Calc(select=[a, b, +(c, 1) AS c], duplicateChanges=[ALLOW])
       +- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL 
SECOND)], duplicateChanges=[ALLOW])
          +- TableSourceScan(table=[[default_catalog, default_database, 
append_src1]], fields=[a, b, c, rt], duplicateChanges=[ALLOW])
 
-Sink(table=[default_catalog.default_database.pk_snk2], fields=[a, b, EXPR$2], 
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk2], fields=[a, b, 
EXPR$2], duplicateChanges=[NONE])
 +- Calc(select=[a, b, /(c, 2) AS EXPR$2], duplicateChanges=[ALLOW])
    +- Calc(select=[a, b, +(c, 1) AS c], duplicateChanges=[ALLOW])
       +- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL 
SECOND)], duplicateChanges=[ALLOW])
@@ -641,13 +642,13 @@ Sink(table=[default_catalog.default_database.pk_snk2], 
fields=[a, b, EXPR$2], du
   <TestCase name="testMultiSink4[testSinkWithPk = true]">
     <Resource name="ast">
       <![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, 
EXPR$1, EXPR$2])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, 
EXPR$1, EXPR$2])
 +- LogicalAggregate(group=[{0}], EXPR$1=[MAX($1)], EXPR$2=[SUM($2)])
    +- LogicalProject(a=[$0], b=[$1], c=[+($2, 1)])
       +- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL 
SECOND)])
          +- LogicalTableScan(table=[[default_catalog, default_database, 
append_src1]])
 
-LogicalSink(table=[default_catalog.default_database.pk_snk2], fields=[a, 
EXPR$1, EXPR$2])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk2], 
fields=[a, EXPR$1, EXPR$2])
 +- LogicalAggregate(group=[{0}], EXPR$1=[MIN($1)], EXPR$2=[MAX($2)])
    +- LogicalProject(a=[$0], b=[$1], c=[+($2, 1)])
       +- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL 
SECOND)])
@@ -656,14 +657,14 @@ 
LogicalSink(table=[default_catalog.default_database.pk_snk2], fields=[a, EXPR$1,
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, EXPR$1, 
EXPR$2], duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, 
EXPR$1, EXPR$2], duplicateChanges=[NONE])
 +- GroupAggregate(groupBy=[a], select=[a, MAX(b) AS EXPR$1, SUM(c) AS EXPR$2], 
duplicateChanges=[ALLOW])
    +- Exchange(distribution=[hash[a]], duplicateChanges=[DISALLOW])
       +- Calc(select=[a, b, +(c, 1) AS c], duplicateChanges=[DISALLOW])
          +- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL 
SECOND)], duplicateChanges=[DISALLOW])
             +- TableSourceScan(table=[[default_catalog, default_database, 
append_src1]], fields=[a, b, c, rt], duplicateChanges=[DISALLOW])
 
-Sink(table=[default_catalog.default_database.pk_snk2], fields=[a, EXPR$1, 
EXPR$2], duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk2], fields=[a, 
EXPR$1, EXPR$2], duplicateChanges=[NONE])
 +- GroupAggregate(groupBy=[a], select=[a, MIN(b) AS EXPR$1, MAX(c) AS EXPR$2], 
duplicateChanges=[ALLOW])
    +- Exchange(distribution=[hash[a]], duplicateChanges=[DISALLOW])
       +- Calc(select=[a, b, +(c, 1) AS c], duplicateChanges=[DISALLOW])
@@ -699,7 +700,7 @@ Sink(table=[default_catalog.default_database.no_pk_snk], 
fields=[a, EXPR$1, EXPR
   <TestCase name="testOneStageWindowAggregate[testSinkWithPk = true]">
     <Resource name="ast">
       <![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, 
EXPR$1, EXPR$2])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, 
EXPR$1, EXPR$2])
 +- LogicalProject(a=[$0], EXPR$1=[$3], EXPR$2=[$4])
    +- LogicalAggregate(group=[{0, 1, 2}], EXPR$1=[MAX($3)], EXPR$2=[MAX($4)])
       +- LogicalProject(a=[$0], window_start=[$4], window_end=[$5], b=[$1], 
c=[$2])
@@ -711,7 +712,7 @@ 
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, EXPR$1,
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, EXPR$1, 
EXPR$2], duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, 
EXPR$1, EXPR$2], duplicateChanges=[NONE])
 +- Calc(select=[a, EXPR$1, EXPR$2], duplicateChanges=[ALLOW])
    +- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[rt], size=[1 
min])], select=[a, MAX(b) AS EXPR$1, MAX(c) AS EXPR$2, start('w$) AS 
window_start, end('w$) AS window_end], duplicateChanges=[ALLOW])
       +- Exchange(distribution=[hash[a]], duplicateChanges=[DISALLOW])
@@ -723,7 +724,7 @@ Sink(table=[default_catalog.default_database.pk_snk], 
fields=[a, EXPR$1, EXPR$2]
   <TestCase name="testRank[testSinkWithPk = true]">
     <Resource name="ast">
       <![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, 
b, c])
 +- LogicalProject(a=[$0], b=[$1], c=[$2])
    +- LogicalFilter(condition=[<=($3, 1)])
       +- LogicalProject(a=[$0], b=[$1], c=[$2], rank_num=[ROW_NUMBER() OVER 
(PARTITION BY $0 ORDER BY $2 DESC NULLS LAST)])
@@ -733,19 +734,37 @@ 
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c], 
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b, c], 
duplicateChanges=[NONE])
 +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
rankRange=[rankStart=1, rankEnd=1], partitionBy=[a], orderBy=[c DESC], 
select=[a, b, c], duplicateChanges=[ALLOW])
    +- Exchange(distribution=[hash[a]], duplicateChanges=[DISALLOW])
       +- Calc(select=[a, b, c], duplicateChanges=[DISALLOW])
          +- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL 
SECOND)], duplicateChanges=[DISALLOW])
             +- TableSourceScan(table=[[default_catalog, default_database, 
append_src1]], fields=[a, b, c, rt], duplicateChanges=[DISALLOW])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRetractSink[testSinkWithPk = true]">
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.pk_retract_snk], 
fields=[a, b, c])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+   +- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL 
SECOND)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
retract_src]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.pk_retract_snk], fields=[a, b, 
c], duplicateChanges=[NONE])
++- Calc(select=[a, b, c], duplicateChanges=[DISALLOW])
+   +- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL SECOND)], 
duplicateChanges=[DISALLOW])
+      +- TableSourceScan(table=[[default_catalog, default_database, 
retract_src]], fields=[a, b, c, rt], duplicateChanges=[DISALLOW])
 ]]>
     </Resource>
   </TestCase>
   <TestCase name="testSinkWithMaterialize[testSinkWithPk = true]">
     <Resource name="ast">
       <![CDATA[
-LogicalSink(table=[default_catalog.default_database.another_pk_snk], 
fields=[a, b, c])
+LogicalSink(table=[default_catalog.default_database.another_pk_upsert_snk], 
fields=[a, b, c])
 +- LogicalProject(a=[$0], b=[$1], c=[$2])
    +- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL 
SECOND)])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
retract_src]])
@@ -753,10 +772,10 @@ 
LogicalSink(table=[default_catalog.default_database.another_pk_snk], fields=[a,
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Sink(table=[default_catalog.default_database.another_pk_snk], fields=[a, b, 
c], duplicateChanges=[NONE])
-+- Calc(select=[a, b, c], duplicateChanges=[ALLOW])
-   +- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL SECOND)], 
duplicateChanges=[ALLOW])
-      +- TableSourceScan(table=[[default_catalog, default_database, 
retract_src]], fields=[a, b, c, rt], duplicateChanges=[ALLOW])
+Sink(table=[default_catalog.default_database.another_pk_upsert_snk], 
fields=[a, b, c], upsertMaterialize=[true], duplicateChanges=[NONE])
++- Calc(select=[a, b, c], duplicateChanges=[DISALLOW])
+   +- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL SECOND)], 
duplicateChanges=[DISALLOW])
+      +- TableSourceScan(table=[[default_catalog, default_database, 
retract_src]], fields=[a, b, c, rt], duplicateChanges=[DISALLOW])
 ]]>
     </Resource>
   </TestCase>
@@ -794,7 +813,7 @@ Sink(table=[default_catalog.default_database.no_pk_snk], 
fields=[a, b, c], dupli
   <TestCase name="testTemporalJoin[testSinkWithPk = true]">
     <Resource name="ast">
       <![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, 
b, c])
 +- LogicalProject(a=[$0], b=[$1], c=[$2])
    +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0, 4}])
       :- LogicalProject(a=[$0], b=[$1], c=[$2], rt=[$3], pt=[PROCTIME()])
@@ -808,7 +827,7 @@ 
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c], 
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b, c], 
duplicateChanges=[NONE])
 +- Calc(select=[a, b, c], duplicateChanges=[ALLOW])
    +- TemporalJoin(joinType=[InnerJoin], where=[AND(=(a, a0), 
__TEMPORAL_JOIN_CONDITION(pt, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(a0), 
__TEMPORAL_JOIN_LEFT_KEY(a), __TEMPORAL_JOIN_RIGHT_KEY(a0)))], select=[a, b, c, 
pt, a0], duplicateChanges=[ALLOW])
       :- Exchange(distribution=[hash[a]], duplicateChanges=[DISALLOW])
@@ -850,7 +869,7 @@ Sink(table=[default_catalog.default_database.no_pk_snk], 
fields=[a, EXPR$1, EXPR
   <TestCase name="testTwoStageWindowAggregate[testSinkWithPk = true]">
     <Resource name="ast">
       <![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, 
EXPR$1, EXPR$2])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, 
EXPR$1, EXPR$2])
 +- LogicalProject(a=[$0], EXPR$1=[$3], EXPR$2=[$4])
    +- LogicalAggregate(group=[{0, 1, 2}], EXPR$1=[MAX($3)], EXPR$2=[MAX($4)])
       +- LogicalProject(a=[$0], window_start=[$4], window_end=[$5], b=[$1], 
c=[$2])
@@ -862,7 +881,7 @@ 
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, EXPR$1,
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, EXPR$1, 
EXPR$2], duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, 
EXPR$1, EXPR$2], duplicateChanges=[NONE])
 +- Calc(select=[a, EXPR$1, EXPR$2], duplicateChanges=[ALLOW])
    +- GlobalWindowAggregate(groupBy=[a], 
window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[a, MAX(max$0) AS 
EXPR$1, MAX(max$1) AS EXPR$2, start('w$) AS window_start, end('w$) AS 
window_end], duplicateChanges=[ALLOW])
       +- Exchange(distribution=[hash[a]], duplicateChanges=[DISALLOW])
@@ -901,7 +920,7 @@ Sink(table=[default_catalog.default_database.no_pk_snk], 
fields=[a, b, c], dupli
   <TestCase name="testUnion[testSinkWithPk = true]">
     <Resource name="ast">
       <![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, 
b, c])
 +- LogicalUnion(all=[true])
    :- LogicalProject(a=[$0], b=[$1], c=[$2])
    :  +- LogicalWatermarkAssigner(rowtime=[rt], watermark=[-($3, 1000:INTERVAL 
SECOND)])
@@ -913,7 +932,7 @@ 
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c], 
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b, c], 
duplicateChanges=[NONE])
 +- Union(all=[true], union=[a, b, c], duplicateChanges=[ALLOW])
    :- Calc(select=[a, b, c], duplicateChanges=[DISALLOW])
    :  +- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL 
SECOND)], duplicateChanges=[DISALLOW])
@@ -927,7 +946,7 @@ Sink(table=[default_catalog.default_database.pk_snk], 
fields=[a, b, c], duplicat
   <TestCase name="testWindowTVF[testSinkWithPk = true]">
     <Resource name="ast">
       <![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk_with_time_col], 
fields=[a, b, c, window_start, window_end])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk_with_time_col],
 fields=[a, b, c, window_start, window_end])
 +- LogicalProject(a=[$0], b=[$1], c=[$2], window_start=[$4], window_end=[$5])
    +- LogicalTableFunctionScan(invocation=[TUMBLE(TABLE(#0), 
DESCRIPTOR(_UTF-16LE'rt'), 60000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER 
a, VARCHAR(2147483647) b, BIGINT c, TIMESTAMP(3) *ROWTIME* rt, TIMESTAMP(3) 
window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
       +- LogicalProject(a=[$0], b=[$1], c=[$2], rt=[$3])
@@ -937,7 +956,7 @@ 
LogicalSink(table=[default_catalog.default_database.pk_snk_with_time_col], field
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk_with_time_col], fields=[a, 
b, c, window_start, window_end], duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk_with_time_col], 
fields=[a, b, c, window_start, window_end], duplicateChanges=[NONE])
 +- Calc(select=[a, b, c, window_start, window_end], duplicateChanges=[ALLOW])
    +- WindowTableFunction(window=[TUMBLE(time_col=[rt], size=[1 min])], 
duplicateChanges=[ALLOW])
       +- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL 
SECOND)], duplicateChanges=[DISALLOW])
@@ -948,7 +967,7 @@ 
Sink(table=[default_catalog.default_database.pk_snk_with_time_col], fields=[a, b
   <TestCase name="testWindowJoin[testSinkWithPk = true]">
     <Resource name="ast">
       <![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, 
b, c])
 +- LogicalProject(a=[$0], b=[$8], c=[$2])
    +- LogicalJoin(condition=[AND(=($0, $7), =($4, $11), =($5, $12))], 
joinType=[inner])
       :- LogicalProject(a=[$0], b=[$1], c=[$2], rt=[$3], window_start=[$4], 
window_end=[$5], window_time=[$6])
@@ -965,7 +984,7 @@ 
LogicalSink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c])
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk], fields=[a, b, c], 
duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b, c], 
duplicateChanges=[NONE])
 +- Calc(select=[a, b, c], duplicateChanges=[ALLOW])
    +- WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], 
win_end=[window_end], size=[1 min])], 
rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 
min])], joinType=[InnerJoin], where=[=(a, a0)], select=[a, c, window_start, 
window_end, a0, b, window_start0, window_end0], duplicateChanges=[ALLOW])
       :- Exchange(distribution=[hash[a]], duplicateChanges=[DISALLOW])
@@ -984,7 +1003,7 @@ Sink(table=[default_catalog.default_database.pk_snk], 
fields=[a, b, c], duplicat
   <TestCase name="testWindowRank[testSinkWithPk = true]">
     <Resource name="ast">
       <![CDATA[
-LogicalSink(table=[default_catalog.default_database.pk_snk_with_time_col], 
fields=[a, b, c, window_start, window_end])
+LogicalSink(table=[default_catalog.default_database.pk_upsert_snk_with_time_col],
 fields=[a, b, c, window_start, window_end])
 +- LogicalProject(a=[$0], b=[$1], c=[$2], window_start=[$3], window_end=[$4])
    +- LogicalFilter(condition=[<=($5, 1)])
       +- LogicalProject(a=[$0], b=[$1], c=[$2], window_start=[$4], 
window_end=[$5], rank_num=[ROW_NUMBER() OVER (PARTITION BY $0, $4, $5 ORDER BY 
$2 DESC NULLS LAST)])
@@ -996,7 +1015,7 @@ 
LogicalSink(table=[default_catalog.default_database.pk_snk_with_time_col], field
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Sink(table=[default_catalog.default_database.pk_snk_with_time_col], fields=[a, 
b, c, window_start, window_end], duplicateChanges=[NONE])
+Sink(table=[default_catalog.default_database.pk_upsert_snk_with_time_col], 
fields=[a, b, c, window_start, window_end], duplicateChanges=[NONE])
 +- WindowRank(window=[TUMBLE(win_start=[window_start], win_end=[window_end], 
size=[1 min])], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], 
partitionBy=[a], orderBy=[c DESC], select=[a, b, c, window_start, window_end], 
duplicateChanges=[ALLOW])
    +- Exchange(distribution=[hash[a]], duplicateChanges=[DISALLOW])
       +- Calc(select=[a, b, c, window_start, window_end], 
duplicateChanges=[DISALLOW])
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
index 201f4ec63bf..7cd943cb20a 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
@@ -16,6 +16,30 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 -->
 <Root>
+  <TestCase name="testAppendSink">
+    <Resource name="sql">
+      <![CDATA[insert into append_snk select * from src1 join src2 on src1.a1 
= src2.b1 and src1.a2 = src2.b2]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.append_snk], fields=[a0, 
a1, a2, a3, b0, b2, b1])
++- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5], 
b1=[$6])
+   +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, src1]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, src2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.append_snk], fields=[a0, a1, a2, 
a3, b0, b2, b1])
++- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, 
a1, a2, a3, b0, b2, b1], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])
+   :- Exchange(distribution=[hash[a1, a2]])
+   :  +- TableSourceScan(table=[[default_catalog, default_database, src1]], 
fields=[a0, a1, a2, a3])
+   +- Exchange(distribution=[hash[b1, b2]])
+      +- TableSourceScan(table=[[default_catalog, default_database, src2]], 
fields=[b0, b2, b1])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testCdcSource">
     <Resource name="sql">
       <![CDATA[insert into snk select * from src1 join cdc_src on src1.a1 = 
cdc_src.b1 and src1.a2 = cdc_src.b2]]>
@@ -349,7 +373,43 @@ Sink(table=[default_catalog.default_database.snk], 
fields=[a0, a1, a2, a3, b0, b
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testMultiRootsWithoutReusingDeltaJoin">
+  <TestCase name="testMultiRootsWithoutReusingDeltaJoin1">
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, 
a3, b0, b2, b1])
++- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5], 
b1=[$6])
+   +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, src1]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, src2]])
+
+LogicalSink(table=[default_catalog.default_database.snk2], fields=[a0, a1, a2, 
a3, b0, b2, b1])
++- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5], 
b1=[$6])
+   +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, src1]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, src2]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Exchange(distribution=[hash[a1, a2]])(reuse_id=[1])
++- TableSourceScan(table=[[default_catalog, default_database, src1]], 
fields=[a0, a1, a2, a3])
+
+Exchange(distribution=[hash[b1, b2]])(reuse_id=[2])
++- TableSourceScan(table=[[default_catalog, default_database, src2]], 
fields=[b0, b2, b1])
+
+Sink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, a3, b0, 
b2, b1])
++- DeltaJoin(joinType=[InnerJoin], where=[((a1 = b1) AND (a2 = b2))], 
select=[a0, a1, a2, a3, b0, b2, b1])
+   :- Reused(reference_id=[1])
+   +- Reused(reference_id=[2])
+
+Sink(table=[default_catalog.default_database.snk2], fields=[a0, a1, a2, a3, 
b0, b2, b1])
++- Join(joinType=[InnerJoin], where=[((a1 = b1) AND (a2 = b2))], select=[a0, 
a1, a2, a3, b0, b2, b1], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])
+   :- Reused(reference_id=[1])
+   +- Reused(reference_id=[2])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiRootsWithoutReusingDeltaJoin2">
     <Resource name="ast">
       <![CDATA[
 LogicalSink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, 
a3, b0, b2, b1])
@@ -582,35 +642,6 @@ Sink(table=[default_catalog.default_database.snk], 
fields=[a0, a1, a2, a3, b0, b
    :  +- TableSourceScan(table=[[default_catalog, default_database, src1]], 
fields=[a0, a1, a2, a3])
    +- Exchange(distribution=[hash[b1, b2]])
       +- TableSourceScan(table=[[default_catalog, default_database, 
non_index_src]], fields=[b0, b2, b1])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testWithAggregatingAfterJoin">
-    <Resource name="sql">
-      <![CDATA[insert into snk select a0, max(a1), max(a2), max(a3), max(b0), 
max(b2), b1 from src1 join src2 on src1.a1 = src2.b1 and src1.a2 = src2.b2 
group by a0, b1]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalSink(table=[default_catalog.default_database.snk], fields=[a0, EXPR$1, 
EXPR$2, EXPR$3, EXPR$4, EXPR$5, b1])
-+- LogicalProject(a0=[$0], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4], EXPR$4=[$5], 
EXPR$5=[$6], b1=[$1])
-   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[MAX($2)], EXPR$2=[MAX($3)], 
EXPR$3=[MAX($4)], EXPR$4=[MAX($5)], EXPR$5=[MAX($6)])
-      +- LogicalProject(a0=[$0], b1=[$6], a1=[$1], a2=[$2], a3=[$3], b0=[$4], 
b2=[$5])
-         +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], 
joinType=[inner])
-            :- LogicalTableScan(table=[[default_catalog, default_database, 
src1]])
-            +- LogicalTableScan(table=[[default_catalog, default_database, 
src2]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-Sink(table=[default_catalog.default_database.snk], fields=[a0, EXPR$1, EXPR$2, 
EXPR$3, EXPR$4, EXPR$5, b1])
-+- Calc(select=[a0, EXPR$1, EXPR$2, EXPR$3, EXPR$4, EXPR$5, b1])
-   +- GroupAggregate(groupBy=[a0, b1], select=[a0, b1, MAX(a1) AS EXPR$1, 
MAX(a2) AS EXPR$2, MAX(a3) AS EXPR$3, MAX(b0) AS EXPR$4, MAX(b2) AS EXPR$5])
-      +- Exchange(distribution=[hash[a0, b1]])
-         +- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], 
select=[a0, a1, a2, a3, b0, b2, b1], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])
-            :- Exchange(distribution=[hash[a1, a2]])
-            :  +- TableSourceScan(table=[[default_catalog, default_database, 
src1]], fields=[a0, a1, a2, a3])
-            +- Exchange(distribution=[hash[b1, b2]])
-               +- TableSourceScan(table=[[default_catalog, default_database, 
src2]], fields=[b0, b2, b1])
 ]]>
     </Resource>
   </TestCase>
@@ -641,6 +672,59 @@ Sink(table=[default_catalog.default_database.tmp_snk], 
fields=[a0, a1, a2, a3, b
    :        +- TableSourceScan(table=[[default_catalog, default_database, 
src2]], fields=[b0, b2, b1])
    +- Exchange(distribution=[hash[b1, b2]])
       +- TableSourceScan(table=[[default_catalog, default_database, src3]], 
fields=[b0, b2, b1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRetractSink">
+    <Resource name="sql">
+      <![CDATA[insert into retract_snk select * from src1 join src2 on src1.a1 
= src2.b1 and src1.a2 = src2.b2]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.retract_snk], fields=[a0, 
a1, a2, a3, b0, b2, b1])
++- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5], 
b1=[$6])
+   +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, src1]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, src2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.retract_snk], fields=[a0, a1, a2, 
a3, b0, b2, b1])
++- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, 
a1, a2, a3, b0, b2, b1], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])
+   :- Exchange(distribution=[hash[a1, a2]])
+   :  +- TableSourceScan(table=[[default_catalog, default_database, src1]], 
fields=[a0, a1, a2, a3])
+   +- Exchange(distribution=[hash[b1, b2]])
+      +- TableSourceScan(table=[[default_catalog, default_database, src2]], 
fields=[b0, b2, b1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testWithAggregatingAfterJoin">
+    <Resource name="sql">
+      <![CDATA[insert into snk select a0, max(a1), max(a2), max(a3), max(b0), 
max(b2), b1 from src1 join src2 on src1.a1 = src2.b1 and src1.a2 = src2.b2 
group by a0, b1]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.snk], fields=[a0, EXPR$1, 
EXPR$2, EXPR$3, EXPR$4, EXPR$5, b1])
++- LogicalProject(a0=[$0], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4], EXPR$4=[$5], 
EXPR$5=[$6], b1=[$1])
+   +- LogicalAggregate(group=[{0, 1}], EXPR$1=[MAX($2)], EXPR$2=[MAX($3)], 
EXPR$3=[MAX($4)], EXPR$4=[MAX($5)], EXPR$5=[MAX($6)])
+      +- LogicalProject(a0=[$0], b1=[$6], a1=[$1], a2=[$2], a3=[$3], b0=[$4], 
b2=[$5])
+         +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], 
joinType=[inner])
+            :- LogicalTableScan(table=[[default_catalog, default_database, 
src1]])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
src2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.snk], fields=[a0, EXPR$1, EXPR$2, 
EXPR$3, EXPR$4, EXPR$5, b1])
++- Calc(select=[a0, EXPR$1, EXPR$2, EXPR$3, EXPR$4, EXPR$5, b1])
+   +- GroupAggregate(groupBy=[a0, b1], select=[a0, b1, MAX(a1) AS EXPR$1, 
MAX(a2) AS EXPR$2, MAX(a3) AS EXPR$3, MAX(b0) AS EXPR$4, MAX(b2) AS EXPR$5])
+      +- Exchange(distribution=[hash[a0, b1]])
+         +- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], 
select=[a0, a1, a2, a3, b0, b2, b1], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])
+            :- Exchange(distribution=[hash[a1, a2]])
+            :  +- TableSourceScan(table=[[default_catalog, default_database, 
src1]], fields=[a0, a1, a2, a3])
+            +- Exchange(distribution=[hash[b1, b2]])
+               +- TableSourceScan(table=[[default_catalog, default_database, 
src2]], fields=[b0, b2, b1])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest_jsonplan/testJsonPlanWithTableHints.out
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest_jsonplan/testJsonPlanWithTableHints.out
index 7822ff82dea..a285abe7cad 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest_jsonplan/testJsonPlanWithTableHints.out
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest_jsonplan/testJsonPlanWithTableHints.out
@@ -32,7 +32,7 @@
             "bounded" : "false",
             "connector" : "values",
             "failing-source" : "true",
-            "sink-changelog-mode-enforced" : "I,UA,UB,D",
+            "sink-changelog-mode-enforced" : "I,UA,D",
             "sink-insert-only" : "false"
           }
         }
@@ -82,7 +82,7 @@
             "bounded" : "false",
             "connector" : "values",
             "failing-source" : "true",
-            "sink-changelog-mode-enforced" : "I,UA,UB,D",
+            "sink-changelog-mode-enforced" : "I,UA,D",
             "sink-insert-only" : "false"
           }
         }
@@ -140,7 +140,7 @@
                 "bounded" : "false",
                 "connector" : "values",
                 "failing-source" : "true",
-                "sink-changelog-mode-enforced" : "I,UA,UB,D",
+                "sink-changelog-mode-enforced" : "I,UA,D",
                 "sink-insert-only" : "false"
               }
             }
@@ -206,7 +206,7 @@
                 "bounded" : "false",
                 "connector" : "values",
                 "failing-source" : "true",
-                "sink-changelog-mode-enforced" : "I,UA,UB,D",
+                "sink-changelog-mode-enforced" : "I,UA,D",
                 "sink-insert-only" : "false"
               }
             }
@@ -309,7 +309,7 @@
             "async" : "true",
             "bounded" : "false",
             "connector" : "values",
-            "sink-changelog-mode-enforced" : "I,UA,UB,D",
+            "sink-changelog-mode-enforced" : "I,UA,D",
             "sink-insert-only" : "false"
           }
         }
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala
index 8c92844dec2..50347c42915 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala
@@ -42,7 +42,7 @@ class DeltaJoinTest extends TableTestBase {
     options.put("connector", "values")
     options.put("bounded", "false")
     options.put("sink-insert-only", "false")
-    options.put("sink-changelog-mode-enforced", "I,UA,UB,D")
+    options.put("sink-changelog-mode-enforced", "I,UA,D")
     options.put("async", "true")
     options
   }
@@ -254,7 +254,8 @@ class DeltaJoinTest extends TableTestBase {
   }
 
   @Test
-  def testMultiRootsWithoutReusingDeltaJoin(): Unit = {
+  def testMultiRootsWithoutReusingDeltaJoin1(): Unit = {
+    // one sink has pk but another doesn't
     util.tableEnv.executeSql(
       "create table snk2 like snk(" +
         "  EXCLUDING CONSTRAINTS" +
@@ -274,6 +275,31 @@ class DeltaJoinTest extends TableTestBase {
     util.verifyExecPlan(stmt)
   }
 
+  @Test
+  def testMultiRootsWithoutReusingDeltaJoin2(): Unit = {
+    // one sink is an upsert sink but another is a retract sink
+    util.tableEnv
+      .executeSql(
+        "CREATE TABLE snk2 WITH (\n"
+          + "  'sink-changelog-mode-enforced' = 'I,UA,UB,D'"
+          + ") LIKE snk (\n"
+          + "  OVERWRITING OPTIONS\n"
+          + ")")
+
+    val stmt = tEnv.createStatementSet()
+    stmt.addInsertSql(
+      "insert into snk select * from src1 join src2 " +
+        "on src1.a1 = src2.b1 " +
+        "and src1.a2 = src2.b2")
+
+    stmt.addInsertSql(
+      "insert into snk2 select * from src1 join src2 " +
+        "on src1.a1 = src2.b1 " +
+        "and src1.a2 = src2.b2")
+
+    util.verifyExecPlan(stmt)
+  }
+
   @Test
   def testExplainPlanAdvice(): Unit = {
     util.verifyExplainInsert(
@@ -347,6 +373,10 @@ class DeltaJoinTest extends TableTestBase {
 
   @Test
   def testCdcSource(): Unit = {
+    util.tableConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE,
+      UpsertMaterialize.NONE)
+
     util.tableEnv.executeSql(
       "create table cdc_src with ('changelog-mode' = 'I,UA,UB,D') " +
         "like src2 (OVERWRITING OPTIONS)")
@@ -372,6 +402,10 @@ class DeltaJoinTest extends TableTestBase {
 
   @Test
   def testWithAggregatingSourceTableBeforeJoin(): Unit = {
+    util.tableConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE,
+      UpsertMaterialize.NONE)
+
     util.verifyRelPlanInsert(
       "insert into snk select * from ( " +
         "  select distinct max(a0) as a0, a1, max(a2) as a2, max(a3) as a3 
from src1 group by a1" +
@@ -382,6 +416,10 @@ class DeltaJoinTest extends TableTestBase {
 
   @Test
   def testWithAggregatingAfterJoin(): Unit = {
+    util.tableConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE,
+      UpsertMaterialize.NONE)
+
     util.verifyRelPlanInsert(
       "insert into snk " +
         "select a0, max(a1), max(a2), max(a3), max(b0), max(b2), b1 from src1 
join src2 " +
@@ -545,6 +583,39 @@ class DeltaJoinTest extends TableTestBase {
     util.verifyRelPlanInsert("insert into tmp_snk select a0, a1 from src1")
   }
 
+  @Test
+  def testRetractSink(): Unit = {
+    util.tableEnv
+      .executeSql(
+        "CREATE TABLE retract_snk WITH (\n"
+          + "  'sink-changelog-mode-enforced' = 'I,UA,UB,D'"
+          + ") LIKE snk (\n"
+          + "  OVERWRITING OPTIONS\n"
+          + ")")
+
+    util.verifyRelPlanInsert(
+      "insert into retract_snk select * from src1 join src2 " +
+        "on src1.a1 = src2.b1 " +
+        "and src1.a2 = src2.b2")
+  }
+
+  @Test
+  def testAppendSink(): Unit = {
+    util.tableEnv
+      .executeSql(
+        "CREATE TABLE append_snk WITH (\n"
+          + "  'sink-insert-only' = 'true',\n"
+          + "  'sink-changelog-mode-enforced' = 'I'\n"
+          + ") LIKE snk (\n"
+          + "  OVERWRITING OPTIONS\n"
+          + ")")
+
+    util.verifyRelPlanInsert(
+      "insert into append_snk select * from src1 join src2 " +
+        "on src1.a1 = src2.b1 " +
+        "and src1.a2 = src2.b2")
+  }
+
   private def addTable(
       tableName: String,
       schema: Schema,

Reply via email to