twalthr commented on code in PR #26893:
URL: https://github.com/apache/flink/pull/26893#discussion_r2275826167


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogNormalizeSemanticTestPrograms.java:
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+/**
+ * {@link TableTestProgram} definitions for semantic testing {@link 
StreamExecChangelogNormalize}.
+ */
+public class ChangelogNormalizeSemanticTestPrograms {
+
+    private static final String[] SINK_SCHEMA = {
+        "a STRING", "b INT", "c STRING", "PRIMARY KEY(a) NOT ENFORCED"
+    };
+    private static final String[] SOURCE_SCHEMA = {
+        "a STRING", "b INT", "c STRING", "PRIMARY KEY(a) NOT ENFORCED"
+    };
+
+    static final TableTestProgram UPSERT_SOURCE_WITH_FILTER =
+            TableTestProgram.of("upsert-with-filter", "validates upsert with 
filter")

Review Comment:
   Synchronize the constant name with the test ID and description.



##########
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestStep.java:
##########
@@ -151,5 +151,42 @@ public SpecificBuilder addPartitionKeys(String... 
partitionKeys) {
             this.partitionKeys.addAll(Arrays.asList(partitionKeys));
             return (SpecificBuilder) this;
         }
+
+        public SpecificBuilder enforceAppendOnly() {
+            this.options.put("connector", "values");
+            this.options.put("changelog-mode", "I");
+            this.options.put("sink-changelog-mode-enforced", "I");
+            return (SpecificBuilder) this;
+        }
+
+        public SpecificBuilder enforceUpsert() {
+            return enforceUpsert(false);
+        }
+
+        public SpecificBuilder enforceUpsert(boolean withPartialDeleteOnly) {
+            this.options.put("connector", "values");
+            this.options.put("changelog-mode", "I,UA,D");
+            this.options.put("sink-changelog-mode-enforced", "I,UA,D");
+            final String withPartialDeleteOnlyStr =
+                    Boolean.valueOf(withPartialDeleteOnly).toString();
+            this.options.put("source.produces-delete-by-key", 
withPartialDeleteOnlyStr);
+            this.options.put("sink.supports-delete-by-key", 
withPartialDeleteOnlyStr);
+            return (SpecificBuilder) this;
+        }
+
+        public SpecificBuilder enforceRetract() {
+            return enforceRetract(false);
+        }
+
+        public SpecificBuilder enforceRetract(boolean withPartialDeleteOnly) {

Review Comment:
   Partial deletes are a pure upsert property; they don't exist in retract mode.



##########
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestStep.java:
##########
@@ -151,5 +151,42 @@ public SpecificBuilder addPartitionKeys(String... 
partitionKeys) {
             this.partitionKeys.addAll(Arrays.asList(partitionKeys));
             return (SpecificBuilder) this;
         }
+
+        public SpecificBuilder enforceAppendOnly() {
+            this.options.put("connector", "values");
+            this.options.put("changelog-mode", "I");
+            this.options.put("sink-changelog-mode-enforced", "I");
+            return (SpecificBuilder) this;
+        }
+
+        public SpecificBuilder enforceUpsert() {

Review Comment:
   How about we change these methods to `inMode(ChangelogMode)`? Then one can use
all the prepared methods from ChangelogMode, e.g.
`builder.inMode(ChangelogMode.all())`.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogNormalizeSemanticTestPrograms.java:
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+/**
+ * {@link TableTestProgram} definitions for semantic testing {@link 
StreamExecChangelogNormalize}.
+ */
+public class ChangelogNormalizeSemanticTestPrograms {
+
+    private static final String[] SINK_SCHEMA = {
+        "a STRING", "b INT", "c STRING", "PRIMARY KEY(a) NOT ENFORCED"
+    };
+    private static final String[] SOURCE_SCHEMA = {
+        "a STRING", "b INT", "c STRING", "PRIMARY KEY(a) NOT ENFORCED"
+    };
+
+    static final TableTestProgram UPSERT_SOURCE_WITH_FILTER =
+            TableTestProgram.of("upsert-with-filter", "validates upsert with 
filter")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .enforceUpsert()
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, "one", 
1, "a"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 2, "bb"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"two", 0, "dd"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"two", 1, "dd"),
+                                            Row.ofKind(RowKind.INSERT, "one", 
4, "aaaa"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 4, "aaaa"),
+                                            Row.ofKind(RowKind.DELETE, 
"three", 5, "ccc"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .enforceRetract()
+                                    .addSchema(SINK_SCHEMA)
+                                    .consumedValues(
+                                            "+I[one, 1, a]",
+                                            "-U[one, 1, a]",
+                                            "+U[one, 2, bb]",
+                                            "-D[one, 2, bb]",
+                                            "+I[two, 0, dd]",
+                                            "-U[two, 0, dd]",
+                                            "+U[two, 1, dd]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t 
WHERE b < 3")
+                    .build();
+
+    static final TableTestProgram UPSERT_SOURCE_WITH_KEY_FILTER =
+            TableTestProgram.of("upsert-key-filter", "validates upsert with 
key filter")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .enforceUpsert()
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, "one", 
1, "a"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 2, "bb"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.INSERT, "one", 
4, "aaaa"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 4, "aaaa"),
+                                            Row.ofKind(RowKind.INSERT, "two", 
1, "d"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"two", 1, "d"),
+                                            Row.ofKind(RowKind.DELETE, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.DELETE, "two", 
1, "d"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .enforceRetract()
+                                    .addSchema(SINK_SCHEMA)
+                                    .consumedValues(
+                                            "+I[one, 1, a]",
+                                            "-U[one, 1, a]",
+                                            "+U[one, 2, bb]",
+                                            "-U[one, 2, bb]",
+                                            "+U[one, 4, aaaa]",
+                                            "+I[two, 1, d]",
+                                            "-D[two, 1, d]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t 
WHERE a <> 'three'")
+                    .build();
+
+    static final TableTestProgram UPSERT_SOURCE_WITH_RETRACT_SINK =
+            TableTestProgram.of("upsert-with-retract-sink", "validates 
changelog normalize")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .enforceUpsert()
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, "one", 
1, "a"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 2, "bb"),
+                                            Row.ofKind(RowKind.INSERT, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.INSERT, "one", 
4, "aaaa"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 4, "aaaa"),
+                                            Row.ofKind(RowKind.DELETE, 
"three", 3, "cc"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .enforceRetract()
+                                    .addSchema(SINK_SCHEMA)
+                                    .consumedValues(
+                                            "+I[one, 1, a]",
+                                            "-U[one, 1, a]",
+                                            "+U[one, 2, bb]",
+                                            "-U[one, 2, bb]",
+                                            "+U[one, 4, aaaa]",
+                                            "+I[three, 3, ccc]",
+                                            "-D[three, 3, ccc]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t")
+                    .build();
+
+    static final TableTestProgram CHANGELOG_NORMALIZE_WITH_FILTER =
+            TableTestProgram.of(
+                            "changelog-normalize-filter",
+                            "validates changelog normalize with filter")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .enforceRetract()
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, "one", 
1, "a"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 2, "bb"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.INSERT, "one", 
4, "aaaa"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 4, "aaaa"),
+                                            Row.ofKind(RowKind.DELETE, 
"three", 3, "ccc"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(SINK_SCHEMA)
+                                    .enforceRetract()
+                                    .consumedValues(
+                                            "+I[one, 1, a]",
+                                            "+U[one, 2, bb]",
+                                            "+U[three, 3, ccc]",
+                                            "+U[three, 3, ccc]",
+                                            "+I[one, 4, aaaa]",
+                                            "+U[one, 4, aaaa]",
+                                            "-D[three, 3, ccc]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t 
WHERE b < 10")
+                    .build();
+
+    static final TableTestProgram CHANGELOG_NORMALIZE_WITH_KEY_FILTER =
+            TableTestProgram.of(
+                            "changelog-normalize-key-filter",
+                            "validates changelog normalize with key filter")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .enforceRetract()
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, "one", 
1, "a"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 2, "bb"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.INSERT, "one", 
4, "aaaa"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 4, "aaaa"),
+                                            Row.ofKind(RowKind.INSERT, "two", 
1, "d"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"two", 1, "d"),
+                                            Row.ofKind(RowKind.DELETE, 
"three", 3, "cc"),
+                                            Row.ofKind(RowKind.DELETE, "two", 
1, "d"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(SINK_SCHEMA)
+                                    .enforceRetract()
+                                    .consumedValues(
+                                            "+I[one, 1, a]",
+                                            "+U[one, 2, bb]",
+                                            "+I[one, 4, aaaa]",
+                                            "+U[one, 4, aaaa]",
+                                            "+I[two, 1, d]",
+                                            "+U[two, 1, d]",
+                                            "-D[two, 1, d]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t 
WHERE a <> 'three'")
+                    .build();
+
+    static final TableTestProgram CHANGELOG_NORMALIZE =
+            TableTestProgram.of("changelog-normalize", "validates changelog 
normalize")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .enforceRetract()
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, "one", 
1, "a"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 2, "bb"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.INSERT, "one", 
4, "aaaa"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 4, "aaaa"),
+                                            Row.ofKind(RowKind.DELETE, 
"three", 3, "ccc"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .enforceRetract()
+                                    .addSchema(SINK_SCHEMA)
+                                    .consumedValues(
+                                            "+I[one, 1, a]",
+                                            "+U[one, 2, bb]",
+                                            "+U[three, 3, ccc]",
+                                            "+I[one, 4, aaaa]",
+                                            "+U[one, 4, aaaa]",
+                                            "+U[three, 3, ccc]",
+                                            "-D[three, 3, ccc]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t")

Review Comment:
   Test cases that are still missing:
   - An upsert source with partial deletes and UA, D changes. This corresponds to Kafka.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogNormalizeSemanticTestPrograms.java:
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+/**
+ * {@link TableTestProgram} definitions for semantic testing {@link 
StreamExecChangelogNormalize}.
+ */
+public class ChangelogNormalizeSemanticTestPrograms {
+
+    private static final String[] SINK_SCHEMA = {
+        "a STRING", "b INT", "c STRING", "PRIMARY KEY(a) NOT ENFORCED"
+    };
+    private static final String[] SOURCE_SCHEMA = {
+        "a STRING", "b INT", "c STRING", "PRIMARY KEY(a) NOT ENFORCED"
+    };
+
+    static final TableTestProgram UPSERT_SOURCE_WITH_FILTER =
+            TableTestProgram.of("upsert-with-filter", "validates upsert with 
filter")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .enforceUpsert()
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, "one", 
1, "a"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 2, "bb"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"two", 0, "dd"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"two", 1, "dd"),
+                                            Row.ofKind(RowKind.INSERT, "one", 
4, "aaaa"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 4, "aaaa"),
+                                            Row.ofKind(RowKind.DELETE, 
"three", 5, "ccc"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .enforceRetract()
+                                    .addSchema(SINK_SCHEMA)
+                                    .consumedValues(
+                                            "+I[one, 1, a]",
+                                            "-U[one, 1, a]",
+                                            "+U[one, 2, bb]",
+                                            "-D[one, 2, bb]",
+                                            "+I[two, 0, dd]",
+                                            "-U[two, 0, dd]",
+                                            "+U[two, 1, dd]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t 
WHERE b < 3")
+                    .build();
+
+    static final TableTestProgram UPSERT_SOURCE_WITH_KEY_FILTER =
+            TableTestProgram.of("upsert-key-filter", "validates upsert with 
key filter")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .enforceUpsert()
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, "one", 
1, "a"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 2, "bb"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.INSERT, "one", 
4, "aaaa"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 4, "aaaa"),
+                                            Row.ofKind(RowKind.INSERT, "two", 
1, "d"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"two", 1, "d"),
+                                            Row.ofKind(RowKind.DELETE, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.DELETE, "two", 
1, "d"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .enforceRetract()
+                                    .addSchema(SINK_SCHEMA)
+                                    .consumedValues(
+                                            "+I[one, 1, a]",
+                                            "-U[one, 1, a]",
+                                            "+U[one, 2, bb]",
+                                            "-U[one, 2, bb]",
+                                            "+U[one, 4, aaaa]",
+                                            "+I[two, 1, d]",
+                                            "-D[two, 1, d]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t 
WHERE a <> 'three'")
+                    .build();
+
+    static final TableTestProgram UPSERT_SOURCE_WITH_RETRACT_SINK =
+            TableTestProgram.of("upsert-with-retract-sink", "validates 
changelog normalize")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .enforceUpsert()
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, "one", 
1, "a"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 2, "bb"),
+                                            Row.ofKind(RowKind.INSERT, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.INSERT, "one", 
4, "aaaa"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 4, "aaaa"),
+                                            Row.ofKind(RowKind.DELETE, 
"three", 3, "cc"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .enforceRetract()
+                                    .addSchema(SINK_SCHEMA)
+                                    .consumedValues(
+                                            "+I[one, 1, a]",
+                                            "-U[one, 1, a]",
+                                            "+U[one, 2, bb]",
+                                            "-U[one, 2, bb]",
+                                            "+U[one, 4, aaaa]",
+                                            "+I[three, 3, ccc]",
+                                            "-D[three, 3, ccc]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t")
+                    .build();
+
+    static final TableTestProgram CHANGELOG_NORMALIZE_WITH_FILTER =
+            TableTestProgram.of(
+                            "changelog-normalize-filter",

Review Comment:
   Are you sure ChangelogNormalize is added to the plan in this case?



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogNormalizeSemanticTestPrograms.java:
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+/**
+ * {@link TableTestProgram} definitions for semantic testing {@link 
StreamExecChangelogNormalize}.
+ */
+public class ChangelogNormalizeSemanticTestPrograms {
+
+    private static final String[] SINK_SCHEMA = {
+        "a STRING", "b INT", "c STRING", "PRIMARY KEY(a) NOT ENFORCED"
+    };
+    private static final String[] SOURCE_SCHEMA = {
+        "a STRING", "b INT", "c STRING", "PRIMARY KEY(a) NOT ENFORCED"
+    };
+
+    static final TableTestProgram UPSERT_SOURCE_WITH_FILTER =
+            TableTestProgram.of("upsert-with-filter", "validates upsert with 
filter")

Review Comment:
   ```suggestion
               TableTestProgram.of("upsert-non-key-filter", "validates upsert 
with filter")
   ```



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/utils/DeduplicateFunctionHelper.java:
##########
@@ -292,6 +292,19 @@ public static boolean shouldKeepCurrentRow(
         }
     }
 
+    private static boolean areRowsWithSameContent(
+            RecordEqualiser equaliser, RowData prevRow, RowData currentRow) {
+        final RowKind currentRowKind = currentRow.getRowKind();
+        if (currentRowKind == RowKind.UPDATE_AFTER) {
+            // setting row kind to prevRowKind to check whether the row 
content is the same
+            currentRow.setRowKind(RowKind.INSERT);
+            final boolean result = equaliser.equals(prevRow, currentRow);
+            currentRow.setRowKind(currentRowKind);
+            return result;
+        }
+        return equaliser.equals(prevRow, currentRow);

Review Comment:
   But why can't we immediately return false when the row kind is different? The
equaliser would return false in that case anyway.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogNormalizeSemanticTestPrograms.java:
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+/**
+ * {@link TableTestProgram} definitions for semantic testing {@link 
StreamExecChangelogNormalize}.
+ */
+public class ChangelogNormalizeSemanticTestPrograms {
+
+    private static final String[] SINK_SCHEMA = {
+        "a STRING", "b INT", "c STRING", "PRIMARY KEY(a) NOT ENFORCED"
+    };
+    private static final String[] SOURCE_SCHEMA = {
+        "a STRING", "b INT", "c STRING", "PRIMARY KEY(a) NOT ENFORCED"
+    };
+
+    static final TableTestProgram UPSERT_SOURCE_WITH_FILTER =
+            TableTestProgram.of("upsert-with-filter", "validates upsert with 
filter")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .enforceUpsert()
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, "one", 
1, "a"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 2, "bb"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"two", 0, "dd"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"two", 1, "dd"),
+                                            Row.ofKind(RowKind.INSERT, "one", 
4, "aaaa"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 4, "aaaa"),
+                                            Row.ofKind(RowKind.DELETE, 
"three", 5, "ccc"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .enforceRetract()
+                                    .addSchema(SINK_SCHEMA)
+                                    .consumedValues(
+                                            "+I[one, 1, a]",
+                                            "-U[one, 1, a]",
+                                            "+U[one, 2, bb]",
+                                            "-D[one, 2, bb]",
+                                            "+I[two, 0, dd]",
+                                            "-U[two, 0, dd]",
+                                            "+U[two, 1, dd]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t 
WHERE b < 3")
+                    .build();
+
+    static final TableTestProgram UPSERT_SOURCE_WITH_KEY_FILTER =
+            TableTestProgram.of("upsert-key-filter", "validates upsert with 
key filter")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .enforceUpsert()
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, "one", 
1, "a"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 2, "bb"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.INSERT, "one", 
4, "aaaa"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 4, "aaaa"),
+                                            Row.ofKind(RowKind.INSERT, "two", 
1, "d"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"two", 1, "d"),
+                                            Row.ofKind(RowKind.DELETE, 
"three", 3, "ccc"),
+                                            Row.ofKind(RowKind.DELETE, "two", 
1, "d"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .enforceRetract()
+                                    .addSchema(SINK_SCHEMA)
+                                    .consumedValues(
+                                            "+I[one, 1, a]",
+                                            "-U[one, 1, a]",
+                                            "+U[one, 2, bb]",
+                                            "-U[one, 2, bb]",
+                                            "+U[one, 4, aaaa]",
+                                            "+I[two, 1, d]",
+                                            "-D[two, 1, d]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t 
WHERE a <> 'three'")
+                    .build();
+
+    static final TableTestProgram UPSERT_SOURCE_WITH_RETRACT_SINK =
+            TableTestProgram.of("upsert-with-retract-sink", "validates 
changelog normalize")

Review Comment:
   Update the name of this test. All tests use a retract sink now. What is so
special about this test compared to the others? I guess it has no filter?
   ```suggestion
               TableTestProgram.of("upsert-no-filter", "validates changelog 
normalize")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to