loserwang1024 commented on code in PR #3349: URL: https://github.com/apache/flink-cdc/pull/3349#discussion_r1610861124
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java:
##########
@@ -217,6 +220,11 @@ public JdbcSourceFetchTaskContext createFetchTaskContext(JdbcSourceConfig taskSo

     @Override
     public void notifyCheckpointComplete(long checkpointId, Offset offset) throws Exception {

Review Comment:
   How about doing this in IncrementalSourceReaderWithCommit#notifyCheckpointComplete and PostgresSourceReader#notifyCheckpointComplete instead? The reader controls when and whether to commit the offset, while the dialect only provides the ability to do it. Once the counter lives in the reader, it can be a plain long rather than an AtomicLong, so we can simply set checkpointCount = (checkpointCount + 1) % 3.

##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java:
##########
@@ -100,7 +102,9 @@ public PostgresSourceConfig create(int subtaskId) {

         // The PostgresSource will do snapshot according to its StartupMode.
         // Do not need debezium to do the snapshot work.
-        props.put("snapshot.mode", "never");
+        props.setProperty("snapshot.mode", "never");
+
+        props.setProperty("checkpoint.cycle", String.valueOf(checkpointCycle));

Review Comment:
   I don't understand what "checkpoint.cycle" does. Is it Debezium's offset commit cycle? Flink CDC is already responsible for committing offsets, so please don't let Debezium do it again (turn it off).
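
To make the suggestion on PostgresDialect.java more concrete, here is a rough, self-contained sketch of a reader-side counter. It is not the actual flink-cdc code: `OffsetCommitter` and `CommitEveryNthCheckpoint` are hypothetical stand-ins for the dialect and the reader, and the cycle length is passed in as an assumption (the comment uses 3).

```java
/** Hypothetical stand-in for the dialect: it only offers the ability to commit. */
interface OffsetCommitter {
    void commit(long checkpointId);
}

/** Hypothetical stand-in for the reader: it decides when and whether to commit. */
class CommitEveryNthCheckpoint {
    private final OffsetCommitter committer;
    private final int cycle;

    // A plain long, assuming the reader's callback is not invoked concurrently.
    private long checkpointCount = 0;

    CommitEveryNthCheckpoint(OffsetCommitter committer, int cycle) {
        if (cycle <= 0) {
            throw new IllegalArgumentException("cycle must be positive");
        }
        this.committer = committer;
        this.cycle = cycle;
    }

    /** Commits only on every cycle-th completed checkpoint. */
    void notifyCheckpointComplete(long checkpointId) {
        checkpointCount = (checkpointCount + 1) % cycle;
        if (checkpointCount == 0) {
            committer.commit(checkpointId);
        }
    }
}
```

With a cycle of 3, the committer is only called for the 3rd, 6th, 9th, ... completed checkpoint. The dialect keeps a plain "commit this offset" method and no counting logic.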