Au-Miner commented on code in PR #26907:
URL: https://github.com/apache/flink/pull/26907#discussion_r2278400147


##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala:
##########
@@ -545,6 +571,39 @@ class DeltaJoinTest extends TableTestBase {
     util.verifyRelPlanInsert("insert into tmp_snk select a0, a1 from src1")
   }
 

Review Comment:
   Another thing I'm curious about: why does TABLE_EXEC_SINK_UPSERT_MATERIALIZE 
need to be set to NONE? I have observed that other tests also show 
`upsertMaterialize=[true]` in DeltaJoinTest.xml.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRuleTest.java:
##########
@@ -310,21 +310,29 @@ void testSinkWithMaterialize() {
 
         util.tableEnv()
                 .executeSql(
-                        "CREATE TABLE another_pk_snk (\n"
+                        "CREATE TABLE another_pk_upsert_snk (\n"
                                 + "  primary key (b) not enforced\n"
-                                + ") LIKE pk_snk (\n"
+                                + ") LIKE pk_upsert_snk (\n"
                                 + "  EXCLUDING CONSTRAINTS\n"
                                 + ")");
 
-        String sql = "insert into another_pk_snk select a,b,c from retract_src";
+        String sql = "insert into another_pk_upsert_snk select a,b,c from retract_src";
+        verifyRelPlanInsert(sql);
+    }
+
+    @TestTemplate
+    void testRetractSink() {
+        assumeTrue(testSinkWithPk);
+
+        String sql = "insert into pk_retract_snk select a,b,c from retract_src";
         verifyRelPlanInsert(sql);
     }
 
     @TestTemplate
     void testChangelogNormalize() {
         assumeTrue(testSinkWithPk);
 
-        String sql = "insert into pk_snk select a,b,c from upsert_src";
+        String sql = "insert into pk_retract_snk select a,b,c from upsert_src";

Review Comment:
   `upsert_src` is only used here. Can we move its DDL into this test?



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala:
##########
@@ -545,6 +571,39 @@ class DeltaJoinTest extends TableTestBase {
     util.verifyRelPlanInsert("insert into tmp_snk select a0, a1 from src1")
   }
 
+  @Test
+  def testRetractSink(): Unit = {
+    util.tableEnv
+      .executeSql(
+        "CREATE TABLE retract_snk WITH (\n"
+          + "  'sink-changelog-mode-enforced' = 'I,UA,UB,D'"
+          + ") LIKE snk (\n"
+          + "  OVERWRITING OPTIONS\n"
+          + ")")
+
+    util.verifyRelPlanInsert(
+      "insert into retract_snk select * from src1 join src2 " +
+        "on src1.a1 = src2.b1 " +
+        "and src1.a2 = src2.b2")
+  }

Review Comment:
   Can we set TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY to FORCE and provide 
distinct exception messages that explain why this case is not supported?



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRuleTest.java:
##########
@@ -284,23 +292,15 @@ void testExpandAndIncrementalAggregate() {
                 .set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
 
         String sql =
-                "insert into pk_snk select a, max(b), count(distinct c) from append_src1 group by a";
+                "insert into pk_upsert_snk select a, max(b), count(distinct c) from append_src1 group by a";
         verifyRelPlanInsert(sql);
     }
 
     @TestTemplate
     void testDropUpdateBefore() {
         assumeTrue(testSinkWithPk);
 
-        util.tableEnv()
-                .executeSql(
-                        "CREATE TABLE upsert_snk WITH (\n"
-                                + "  'sink-changelog-mode-enforced' = 'I,UA,D'"
-                                + ") LIKE pk_snk (\n"
-                                + "  OVERWRITING OPTIONS\n"
-                                + ")");
-
-        String sql = "insert into pk_snk select a,b,c from retract_src";
+        String sql = "insert into pk_upsert_snk select a,b,c from retract_src";

Review Comment:
   Some statements use `a,b,c` while others use `a, b, c`. Can we unify the 
spacing style?



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRuleTest.java:
##########
@@ -310,21 +310,29 @@ void testSinkWithMaterialize() {
 
         util.tableEnv()
                 .executeSql(
-                        "CREATE TABLE another_pk_snk (\n"
+                        "CREATE TABLE another_pk_upsert_snk (\n"
                                 + "  primary key (b) not enforced\n"
-                                + ") LIKE pk_snk (\n"
+                                + ") LIKE pk_upsert_snk (\n"
                                 + "  EXCLUDING CONSTRAINTS\n"
                                 + ")");
 
-        String sql = "insert into another_pk_snk select a,b,c from retract_src";
+        String sql = "insert into another_pk_upsert_snk select a,b,c from retract_src";
+        verifyRelPlanInsert(sql);
+    }
+
+    @TestTemplate
+    void testRetractSink() {
+        assumeTrue(testSinkWithPk);
+
+        String sql = "insert into pk_retract_snk select a,b,c from retract_src";

Review Comment:
   There may be essentially no difference between testRetractSink and 
testChangelogNormalize: in both, pk_retract_snk causes duplicateChanges to be 
set to disallow. They could therefore be merged into one test, and the DDL of 
pk_retract_snk could be moved here.



