loserwang1024 commented on code in PR #3349:
URL: https://github.com/apache/flink-cdc/pull/3349#discussion_r1610861124


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java:
##########
@@ -217,6 +220,11 @@ public JdbcSourceFetchTaskContext createFetchTaskContext(JdbcSourceConfig taskSo
 
     @Override
     public void notifyCheckpointComplete(long checkpointId, Offset offset) throws Exception {

Review Comment:
   What about doing this in
   IncrementalSourceReaderWithCommit#notifyCheckpointComplete and
   PostgresSourceReader#notifyCheckpointComplete instead? The reader controls
   when and whether to commit the offset, while the dialect only provides the
   ability to do it.
   
   Also, once this lives in the reader, a plain long can be used rather than an
   AtomicLong, so we can set checkpointCount = (checkpointCount+1)%3
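
A minimal sketch of that suggestion, not the actual Flink CDC code: the class `CheckpointCycleSketch` and the `OffsetCommitter` callback are hypothetical stand-ins for the reader and for the dialect's commit ability. It assumes the reader's `notifyCheckpointComplete` is only ever called from a single thread, which is what would make a plain `long` sufficient.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a reader that commits the offset on every third checkpoint. */
public class CheckpointCycleSketch {

    /** Hypothetical callback standing in for the dialect's ability to commit an offset. */
    public interface OffsetCommitter {
        void commit(long checkpointId);
    }

    // The cycle length 3 comes from the "% 3" in the review comment.
    private static final int CYCLE = 3;

    private final OffsetCommitter committer;

    // Plain long: assumed to be touched by one thread only, so no AtomicLong.
    private long checkpointCount = 0;

    public CheckpointCycleSketch(OffsetCommitter committer) {
        this.committer = committer;
    }

    /** The reader decides when to commit; the committer only performs the commit. */
    public void notifyCheckpointComplete(long checkpointId) {
        checkpointCount = (checkpointCount + 1) % CYCLE;
        if (checkpointCount == 0) {
            committer.commit(checkpointId);
        }
    }

    public static void main(String[] args) {
        List<Long> committed = new ArrayList<>();
        CheckpointCycleSketch reader = new CheckpointCycleSketch(committed::add);
        for (long id = 1; id <= 7; id++) {
            reader.notifyCheckpointComplete(id);
        }
        System.out.println(committed); // prints [3, 6]
    }
}
```

Keeping the counter in the reader leaves the dialect stateless: it only exposes the commit operation, and the policy of how often to commit sits in one place.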



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java:
##########
@@ -100,7 +102,9 @@ public PostgresSourceConfig create(int subtaskId) {
 
         // The PostgresSource will do snapshot according to its StartupMode.
         // Do not need debezium to do the snapshot work.
-        props.put("snapshot.mode", "never");
+        props.setProperty("snapshot.mode", "never");
+
+        props.setProperty("checkpoint.cycle", String.valueOf(checkpointCycle));

Review Comment:
   What does "checkpoint.cycle" do? Is it Debezium's offset commit cycle?
   Flink CDC is already responsible for committing offsets, so please don't let
   Debezium do it again (turn it off).


