leonardBang commented on a change in pull request #14160:
URL: https://github.com/apache/flink/pull/14160#discussion_r530101454



##########
File path: docs/dev/table/connectors/formats/canal.zh.md
##########
@@ -219,6 +219,17 @@ Format 参数
     </tbody>
 </table>
 
+注意事项
+----------------
+
+### 重复的变更事件
+
+在正常的操作环境下,Canal 应用能以 **exactly-once** 的语义投递每条变更事件。在这种情况下,Flink 消费 Canal 
产生的变更事件能够工作地很好。

Review comment:
       ```suggestion
   在正常的操作环境下,Canal 应用能以 **exactly-once** 的语义投递每条变更事件。在这种情况下,Flink 消费 Canal 
产生的变更事件能够工作得很好。
   ```

##########
File path: docs/dev/table/connectors/formats/debezium.md
##########
@@ -282,6 +282,14 @@ Use format `debezium-avro-confluent` to interpret Debezium Avro messages and for
 Caveats
 ----------------
 
+### Duplicate change events
+
+Under normal operating scenarios, the Debezium application delivers every 
change event **exactly-once**. This works pretty well when Flink consumes 
Debezium produced events in this situation.

Review comment:
       ```suggestion
   Under normal operating scenarios, the Debezium application delivers every 
change event **exactly-once**. Flink works pretty well when it consumes 
Debezium-produced events in this situation.
   ```

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sources/DynamicSourceUtils.java
##########
@@ -267,6 +302,20 @@ private static void validateScanSourceForStreaming(
                                        scanSource.getClass().getName()
                                )
                        );
+               } else if (!changelogMode.containsOnly(RowKind.INSERT)) {

Review comment:
       The condition `!changelogMode.containsOnly(RowKind.INSERT)` does not 
indicate that the source is a `CDC source which is non-upsert`.
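
       One reading of this comment is that an upsert changelog would also pass 
the check. A minimal, self-contained sketch of that reading follows. It does 
not use Flink classes: `ChangelogModeSketch`, its `RowKind` enum, the three 
example sets and `hasUpdateBefore` are hypothetical stand-ins, and treating the 
presence of `UPDATE_BEFORE` as the distinguishing feature of a non-upsert CDC 
source is an assumption, not the planner's actual logic.

```java
import java.util.EnumSet;
import java.util.Set;

public class ChangelogModeSketch {

    /** Stand-in for Flink's RowKind, using the same four constant names. */
    public enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Hypothetical example modes, modelled as plain sets of row kinds.
    public static final Set<RowKind> INSERT_ONLY = EnumSet.of(RowKind.INSERT);
    public static final Set<RowKind> UPSERT =
            EnumSet.of(RowKind.INSERT, RowKind.UPDATE_AFTER, RowKind.DELETE);
    public static final Set<RowKind> FULL_CDC = EnumSet.allOf(RowKind.class);

    /** True when the mode holds exactly one row kind and it is the given one. */
    public static boolean containsOnly(Set<RowKind> mode, RowKind kind) {
        return mode.size() == 1 && mode.contains(kind);
    }

    /** Assumed discriminator: only a non-upsert changelog carries UPDATE_BEFORE. */
    public static boolean hasUpdateBefore(Set<RowKind> mode) {
        return mode.contains(RowKind.UPDATE_BEFORE);
    }

    public static void main(String[] args) {
        // The reviewed condition holds for both the upsert and the full CDC mode,
        // so on its own it cannot tell the two apart.
        System.out.println("upsert passes check:   " + !containsOnly(UPSERT, RowKind.INSERT));
        System.out.println("full CDC passes check: " + !containsOnly(FULL_CDC, RowKind.INSERT));

        // The assumed discriminator separates them.
        System.out.println("upsert has UPDATE_BEFORE:   " + hasUpdateBefore(UPSERT));
        System.out.println("full CDC has UPDATE_BEFORE: " + hasUpdateBefore(FULL_CDC));
    }
}
```

       Under these assumptions, a stricter condition would have to look at which 
row kinds are present instead of only ruling out the insert-only case.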

##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalJoinITCase.scala
##########
@@ -455,15 +455,12 @@ class TemporalJoinITCase(state: StateBackendMode)
       " ON o.currency = r.currency"
 
     tEnv.executeSql(sql).await()
-    val rawResult = getRawResults("rowtime_default_sink")
     val expected = List(
-      "+I(1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01)",
-      "+I(2,US Dollar,1,2020-08-15T00:02,102,2020-08-15T00:00:02)",
-      "+I(3,RMB,40,2020-08-15T00:03,702,2020-08-15T00:00:04)",
-      "+I(4,Euro,14,2020-08-16T00:04,118,2020-08-16T00:01)",
-      "-U(2,US Dollar,1,2020-08-16T00:03,106,2020-08-16T00:02)",
-      "+U(2,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02)")
-    assertEquals(expected.sorted, rawResult.sorted)
+      "1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01",
+      "3,RMB,40,2020-08-15T00:03,702,2020-08-15T00:00:04",
+      "4,Euro,14,2020-08-16T00:04,118,2020-08-16T00:01",
+      "2,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02")

Review comment:
       ```suggestion
         "1,Euro,12,2020-08-15T00:01,114,2020-08-15T00:00:01",
         "2,US Dollar,18,2020-08-16T00:03,106,2020-08-16T00:02",
         "3,RMB,40,2020-08-15T00:03,702,2020-08-15T00:00:04",
         "4,Euro,14,2020-08-16T00:04,118,2020-08-16T00:01")
   ```
   minor: we can use a more readable order that matches the order of the left 
table input.



