Re: [PR] [FLINK-35129][cdc][postgres] Add checkpoint cycle option for commit offsets [flink-cdc]

2024-06-04 Thread via GitHub


leonardBang merged PR #3349:
URL: https://github.com/apache/flink-cdc/pull/3349


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35129][cdc][postgres] Add checkpoint cycle option for commit offsets [flink-cdc]

2024-06-03 Thread via GitHub


morazow commented on code in PR #3349:
URL: https://github.com/apache/flink-cdc/pull/3349#discussion_r1624166875


##
docs/content/docs/connectors/flink-sources/postgres-cdc.md:
##
@@ -236,6 +236,17 @@ Connector Options
   so it does not need to be explicitly configured 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true'
   
 
+
+  scan.lsn-commit.checkpoints-num-delay
+  optional
+  3
+  Integer
+  The number of checkpoint delays before starting to commit the LSN offsets. 
+  The checkpoint LSN offsets will be committed in rolling fashion, the earliest checkpoint identifier will be committed first from the delayed checkpoints.
+  This will enable continuous recycling of log files, preventing disk space issues. 
+  This feature is not available in `PostgreSQLSource` since it is deprecated.

Review Comment:
   Hey @leonardBang, I have committed what I meant, please have another look






Re: [PR] [FLINK-35129][cdc][postgres] Add checkpoint cycle option for commit offsets [flink-cdc]

2024-06-03 Thread via GitHub


morazow commented on code in PR #3349:
URL: https://github.com/apache/flink-cdc/pull/3349#discussion_r1624123074


##
docs/content/docs/connectors/flink-sources/postgres-cdc.md:
##
@@ -236,6 +236,17 @@ Connector Options
   so it does not need to be explicitly configured 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true'
   
 
+
+  scan.lsn-commit.checkpoints-num-delay
+  optional
+  3
+  Integer
+  The number of checkpoint delays before starting to commit the LSN offsets. 
+  The checkpoint LSN offsets will be committed in rolling fashion, the earliest checkpoint identifier will be committed first from the delayed checkpoints.
+  This will enable continuous recycling of log files, preventing disk space issues. 
+  This feature is not available in `PostgreSQLSource` since it is deprecated.

Review Comment:
   Great, would it be fine to add this to another `Remarks` section below?
   
   I could revert to the previous version and add this note below. Or should 
I add it to the option's description?






Re: [PR] [FLINK-35129][cdc][postgres] Add checkpoint cycle option for commit offsets [flink-cdc]

2024-06-03 Thread via GitHub


leonardBang commented on code in PR #3349:
URL: https://github.com/apache/flink-cdc/pull/3349#discussion_r1623938661


##
docs/content/docs/connectors/flink-sources/postgres-cdc.md:
##
@@ -236,6 +236,17 @@ Connector Options
   so it does not need to be explicitly configured 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true'
   
 
+
+  scan.lsn-commit.checkpoints-num-delay
+  optional
+  3
+  Integer
+  The number of checkpoint delays before starting to commit the LSN offsets. 
+  The checkpoint LSN offsets will be committed in rolling fashion, the earliest checkpoint identifier will be committed first from the delayed checkpoints.
+  This will enable continuous recycling of log files, preventing disk space issues. 
+  This feature is not available in `PostgreSQLSource` since it is deprecated.

Review Comment:
   When consuming PostgreSQL logs, the LSN offset must be committed to trigger 
the log data cleanup for the corresponding slot. However, once the LSN offset 
is committed, earlier offsets become invalid. To ensure access to earlier LSN 
offsets for job recovery, we delay the LSN commit by 3 checkpoints by default. 
This feature is available when the config option 
`scan.incremental.snapshot.enabled` is set to true.
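
The reasoning above can be illustrated with a minimal, self-contained Java sketch. It is a toy model and not the connector's code: `LsnCommitDelayDemo`, `ToyReplicationSlot`, and the LSN values are hypothetical, and the slot is simply assumed to reject any resume position below the last committed LSN.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical driver: completes checkpoints and commits their LSNs with a delay. */
class LsnCommitDelayDemo {
    /** Assumption for illustration: checkpoint i holds LSN i * 100. */
    static ToyReplicationSlot run(int checkpoints, int delay) {
        ToyReplicationSlot slot = new ToyReplicationSlot();
        Deque<Long> pending = new ArrayDeque<>();
        for (int i = 1; i <= checkpoints; i++) {
            pending.addLast(i * 100L);
            // Keep the newest `delay` checkpoint offsets uncommitted.
            while (pending.size() > delay) {
                slot.commit(pending.pollFirst());
            }
        }
        return slot;
    }

    public static void main(String[] args) {
        ToyReplicationSlot eager = run(5, 0);
        ToyReplicationSlot delayed = run(5, 3);
        System.out.println("no delay, resume from checkpoint 4: " + eager.canResumeFrom(400L));
        System.out.println("delay 3, resume from checkpoint 3: " + delayed.canResumeFrom(300L));
    }
}

/** Hypothetical toy slot: offsets below the committed LSN are treated as cleaned up. */
class ToyReplicationSlot {
    private long committedLsn = 0L;

    /** Committing an LSN allows everything before it to be cleaned up. */
    void commit(long lsn) {
        committedLsn = Math.max(committedLsn, lsn);
    }

    /** A job can only resume from an offset that has not been cleaned up yet. */
    boolean canResumeFrom(long lsn) {
        return lsn >= committedLsn;
    }
}
```

In this model, committing without any delay moves the slot past the offset of checkpoint 4 as soon as checkpoint 5 completes, whereas a delay of 3 keeps the offsets of checkpoints 3 to 5 available for recovery.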






Re: [PR] [FLINK-35129][cdc][postgres] Add checkpoint cycle option for commit offsets [flink-cdc]

2024-06-03 Thread via GitHub


morazow commented on code in PR #3349:
URL: https://github.com/apache/flink-cdc/pull/3349#discussion_r1623914704


##
docs/content/docs/connectors/flink-sources/postgres-cdc.md:
##
@@ -236,6 +236,15 @@ Connector Options
   so it does not need to be explicitly configured 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true'
   
 
+
+  scan.lsn-commit.checkpoints-num-delay
+  optional
+  3
+  Integer
+  The number of checkpoint delays before starting to commit the LSN offsets. 
+  The checkpoint LSN offsets will be committed in rolling fashion, the earliest checkpoint identifier will be committed first from the delayed checkpoints.

Review Comment:
   Done, please have a look @leonardBang, thanks 






Re: [PR] [FLINK-35129][cdc][postgres] Add checkpoint cycle option for commit offsets [flink-cdc]

2024-06-03 Thread via GitHub


morazow commented on code in PR #3349:
URL: https://github.com/apache/flink-cdc/pull/3349#discussion_r1623904431


##
docs/content/docs/connectors/flink-sources/postgres-cdc.md:
##
@@ -236,6 +236,15 @@ Connector Options
   so it does not need to be explicitly configured 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true'
   
 
+
+  scan.lsn-commit.checkpoints-num-delay
+  optional
+  3
+  Integer
+  The number of checkpoint delays before starting to commit the LSN offsets. 
+  The checkpoint LSN offsets will be committed in rolling fashion, the earliest checkpoint identifier will be committed first from the delayed checkpoints.

Review Comment:
   Sure, I think the previous version had a reason, I will add this:
   
   ```
   This will enable continuous recycling of log files, preventing disk space issues.
   ```






Re: [PR] [FLINK-35129][cdc][postgres] Add checkpoint cycle option for commit offsets [flink-cdc]

2024-06-02 Thread via GitHub


leonardBang commented on code in PR #3349:
URL: https://github.com/apache/flink-cdc/pull/3349#discussion_r1623742652


##
docs/content/docs/connectors/flink-sources/postgres-cdc.md:
##
@@ -236,6 +236,15 @@ Connector Options
   so it does not need to be explicitly configured 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true'
   
 
+
+  scan.lsn-commit.checkpoints-num-delay
+  optional
+  3
+  Integer
+  The number of checkpoint delays before starting to commit the LSN offsets. 
+  The checkpoint LSN offsets will be committed in rolling fashion, the earliest checkpoint identifier will be committed first from the delayed checkpoints.

Review Comment:
   Could you explain to users why we need to delay the offset commit?






Re: [PR] [FLINK-35129][cdc][postgres] Add checkpoint cycle option for commit offsets [flink-cdc]

2024-06-02 Thread via GitHub


leonardBang commented on PR #3349:
URL: https://github.com/apache/flink-cdc/pull/3349#issuecomment-2144201121

   Adding a note to the docs makes sense to me, as the old API is marked as 
`@Deprecated`.





Re: [PR] [FLINK-35129][cdc][postgres] Add checkpoint cycle option for commit offsets [flink-cdc]

2024-05-31 Thread via GitHub


loserwang1024 commented on PR #3349:
URL: https://github.com/apache/flink-cdc/pull/3349#issuecomment-2143266042

   > Should I still add it to `PostgreSQLSource`, or would adding a note to the 
docs be fine?
   
   Hi @leonardBang, WDYT?
   
   





Re: [PR] [FLINK-35129][cdc][postgres] Add checkpoint cycle option for commit offsets [flink-cdc]

2024-05-30 Thread via GitHub


morazow commented on PR #3349:
URL: https://github.com/apache/flink-cdc/pull/3349#issuecomment-2139025700

   Hey @loserwang1024,
   
   Thanks for the suggestions! I was actually wondering about it. 
   
   Since this feature is only for Postgres, I initially considered adding it to 
`PostgreSQLSource`, but that class is `@Deprecated` now.
   
   Should I still add it to `PostgreSQLSource`, or would adding a note to the 
docs be fine?





Re: [PR] [FLINK-35129][cdc][postgres] Add checkpoint cycle option for commit offsets [flink-cdc]

2024-05-29 Thread via GitHub


loserwang1024 commented on PR #3349:
URL: https://github.com/apache/flink-cdc/pull/3349#issuecomment-2138531074

   One final piece of advice: you can either do the same thing in 
`org.apache.flink.cdc.debezium.DebeziumSourceFunction#notifyCheckpointComplete`, 
or mention in the configuration documentation that this option only takes 
effect when `scan.incremental.snapshot.enabled = true`.





Re: [PR] [FLINK-35129][cdc][postgres] Add checkpoint cycle option for commit offsets [flink-cdc]

2024-05-28 Thread via GitHub


morazow commented on PR #3349:
URL: https://github.com/apache/flink-cdc/pull/3349#issuecomment-2135981748

   Thanks all, applied your suggestions. Please have a look  





Re: [PR] [FLINK-35129][cdc][postgres] Add checkpoint cycle option for commit offsets [flink-cdc]

2024-05-28 Thread via GitHub


morazow commented on code in PR #3349:
URL: https://github.com/apache/flink-cdc/pull/3349#discussion_r1617755311


##
docs/content/docs/connectors/flink-sources/postgres-cdc.md:
##
@@ -236,6 +236,13 @@ Connector Options
   so it does not need to be explicitly configured 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true'
   
 
+
+  checkpoint.cycle

Review Comment:
   Thanks @leonardBang, it is clearer and describes the intent. I'll refactor 
it accordingly






Re: [PR] [FLINK-35129][cdc][postgres] Add checkpoint cycle option for commit offsets [flink-cdc]

2024-05-28 Thread via GitHub


leonardBang commented on code in PR #3349:
URL: https://github.com/apache/flink-cdc/pull/3349#discussion_r1616779002


##
docs/content/docs/connectors/flink-sources/postgres-cdc.md:
##
@@ -236,6 +236,13 @@ Connector Options
   so it does not need to be explicitly configured 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true'
   
 
+
+  checkpoint.cycle

Review Comment:
   Naming is hard, how about `scan.lsn-commit.checkpoints-num-delay` ?






Re: [PR] [FLINK-35129][cdc][postgres] Add checkpoint cycle option for commit offsets [flink-cdc]

2024-05-28 Thread via GitHub


morazow commented on PR #3349:
URL: https://github.com/apache/flink-cdc/pull/3349#issuecomment-2134514762

   I don’t like it :)) How about `checkpoint.rolling-window.size`? This seems 
to express the config option more clearly.





Re: [PR] [FLINK-35129][cdc][postgres] Add checkpoint cycle option for commit offsets [flink-cdc]

2024-05-27 Thread via GitHub


loserwang1024 commented on PR #3349:
URL: https://github.com/apache/flink-cdc/pull/3349#issuecomment-2134261272

   LGTM! @leonardBang @ruanhang1993, CC: WDYT of the option name 
`checkpoint.cycle`?





Re: [PR] [FLINK-35129][cdc][postgres] Add checkpoint cycle option for commit offsets [flink-cdc]

2024-05-27 Thread via GitHub


morazow commented on code in PR #3349:
URL: https://github.com/apache/flink-cdc/pull/3349#discussion_r1615606254


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java:
##
@@ -0,0 +1,75 @@
+package org.apache.flink.cdc.connectors.postgres.source.reader;

Review Comment:
   Yeap, added  






Re: [PR] [FLINK-35129][cdc][postgres] Add checkpoint cycle option for commit offsets [flink-cdc]

2024-05-26 Thread via GitHub


loserwang1024 commented on code in PR #3349:
URL: https://github.com/apache/flink-cdc/pull/3349#discussion_r1615396296


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java:
##
@@ -0,0 +1,75 @@
+package org.apache.flink.cdc.connectors.postgres.source.reader;

Review Comment:
   Please add the license header and a class-level Javadoc comment, otherwise 
the compile will fail.






Re: [PR] [FLINK-35129][cdc][postgres] Add checkpoint cycle option for commit offsets [flink-cdc]

2024-05-24 Thread via GitHub


morazow commented on code in PR #3349:
URL: https://github.com/apache/flink-cdc/pull/3349#discussion_r1613394340


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReader.java:
##
@@ -104,6 +108,10 @@ public List snapshotState(long checkpointId) {
 
 @Override
 public void notifyCheckpointComplete(long checkpointId) throws Exception {
+this.checkpointCount = (this.checkpointCount + 1) % this.checkpointCycle;

Review Comment:
   Thanks guys for the feedback, I am going to check it  






Re: [PR] [FLINK-35129][cdc][postgres] Add checkpoint cycle option for commit offsets [flink-cdc]

2024-05-24 Thread via GitHub


leonardBang commented on code in PR #3349:
URL: https://github.com/apache/flink-cdc/pull/3349#discussion_r1613201109


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReader.java:
##
@@ -104,6 +108,10 @@ public List snapshotState(long checkpointId) {
 
 @Override
 public void notifyCheckpointComplete(long checkpointId) throws Exception {
+this.checkpointCount = (this.checkpointCount + 1) % this.checkpointCycle;

Review Comment:
   +1 for @loserwang1024's comment






Re: [PR] [FLINK-35129][cdc][postgres] Add checkpoint cycle option for commit offsets [flink-cdc]

2024-05-24 Thread via GitHub


loserwang1024 commented on code in PR #3349:
URL: https://github.com/apache/flink-cdc/pull/3349#discussion_r1613185257


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReader.java:
##
@@ -104,6 +108,10 @@ public List snapshotState(long checkpointId) {
 
 @Override
 public void notifyCheckpointComplete(long checkpointId) throws Exception {
+this.checkpointCount = (this.checkpointCount + 1) % this.checkpointCycle;

Review Comment:
   @morazow, great job. I generally agree with your approach. However, I 
currently have a different perspective. Instead of committing at every third 
checkpoint (rolling window), I would prefer to commit the offsets of the 
checkpoint that is three checkpoints behind the current one (sliding window).
   
   For a detailed design, we can store successful checkpoint IDs in a min heap, 
whose size is three (as decided by the configuration). When a checkpoint is 
successfully performed, we can push its ID into the heap and take the minimum 
checkpoint ID value, then commit it. By doing this, we always have three 
checkpoints whose offsets have not been recycled.
   
   (P.S.: Let's log the heap at each checkpoint, so users can know from which 
checkpoint IDs they can restore.)
   
   @leonardBang , @ruanhang1993 , CC, WDYT?
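
The sliding-window proposal above could be sketched roughly as follows. This is only an illustration under stated assumptions, not the reader code from the PR: `DelayedLsnCommitter` and its methods are hypothetical names, the real LSN commit is replaced by appending to a list, and the sketch assumes a checkpoint is committed only once more than `delay` completed checkpoints are waiting in the heap.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical sketch of the min-heap based sliding window; names are illustrative. */
class DelayedLsnCommitter {
    private final int delay;
    // Min heap of completed checkpoint IDs whose offsets are not committed yet.
    private final PriorityQueue<Long> pending = new PriorityQueue<>();
    // Stand-in for the real offset commit: records which checkpoint IDs were committed.
    private final List<Long> committed = new ArrayList<>();

    DelayedLsnCommitter(int delay) {
        if (delay < 0) {
            throw new IllegalArgumentException("delay must not be negative");
        }
        this.delay = delay;
    }

    /** Called once for every successfully completed checkpoint. */
    void notifyCheckpointComplete(long checkpointId) {
        pending.add(checkpointId);
        // Commit the smallest pending ID until only `delay` checkpoints remain.
        while (pending.size() > delay) {
            committed.add(pending.poll());
        }
    }

    List<Long> committedCheckpoints() {
        return new ArrayList<>(committed);
    }

    /** Checkpoint IDs whose offsets are still uncommitted, in ascending order. */
    List<Long> restorableCheckpoints() {
        List<Long> ids = new ArrayList<>(pending);
        Collections.sort(ids);
        return ids;
    }

    public static void main(String[] args) {
        DelayedLsnCommitter committer = new DelayedLsnCommitter(3);
        for (long id = 1; id <= 5; id++) {
            committer.notifyCheckpointComplete(id);
            System.out.println("checkpoint " + id
                    + ": committed=" + committer.committedCheckpoints()
                    + ", restorable=" + committer.restorableCheckpoints());
        }
    }
}
```

Printing the restorable IDs on every checkpoint corresponds to the P.S. about logging the heap, so that users can see which checkpoints are still safe to restore from.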






Re: [PR] [FLINK-35129][cdc][postgres] Add checkpoint cycle option for commit offsets [flink-cdc]

2024-05-23 Thread via GitHub


morazow commented on code in PR #3349:
URL: https://github.com/apache/flink-cdc/pull/3349#discussion_r1612232449


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java:
##
@@ -217,6 +220,11 @@ public JdbcSourceFetchTaskContext createFetchTaskContext(JdbcSourceConfig taskSo
 
 @Override
 public void notifyCheckpointComplete(long checkpointId, Offset offset) throws Exception {

Review Comment:
   Yes, maybe `PostgresSourceReader` is a good option.
   I didn't want to have it in `IncrementalSourceReaderWithCommit`, since this 
is a PG-only feature.






Re: [PR] [FLINK-35129][cdc][postgres] Add checkpoint cycle option for commit offsets [flink-cdc]

2024-05-23 Thread via GitHub


morazow commented on code in PR #3349:
URL: https://github.com/apache/flink-cdc/pull/3349#discussion_r1612231333


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java:
##
@@ -100,7 +102,9 @@ public PostgresSourceConfig create(int subtaskId) {
 
 // The PostgresSource will do snapshot according to its StartupMode.
 // Do not need debezium to do the snapshot work.
-props.put("snapshot.mode", "never");
+props.setProperty("snapshot.mode", "never");
+
+props.setProperty("checkpoint.cycle", String.valueOf(checkpointCycle));

Review Comment:
   Ahh yes, this was my poor attempt to pass the parameter to the dialect via 
properties.
   
   I just realized that these properties are for Debezium, so I am removing it 
and will approach it differently.






Re: [PR] [FLINK-35129][cdc][postgres] Add checkpoint cycle option for commit offsets [flink-cdc]

2024-05-22 Thread via GitHub


loserwang1024 commented on code in PR #3349:
URL: https://github.com/apache/flink-cdc/pull/3349#discussion_r1610861124


##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java:
##
@@ -217,6 +220,11 @@ public JdbcSourceFetchTaskContext createFetchTaskContext(JdbcSourceConfig taskSo
 
 @Override
 public void notifyCheckpointComplete(long checkpointId, Offset offset) throws Exception {

Review Comment:
   What about doing it in 
`IncrementalSourceReaderWithCommit#notifyCheckpointComplete` and 
`PostgresSourceReader#notifyCheckpointComplete`? The reader controls when and 
whether to commit the offset, while the dialect just provides the ability to do it.
   
   And when it is put into the reader, we can just use a `long` rather than an 
`AtomicLong`, so we can set `checkpointCount = (checkpointCount + 1) % 3`.
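
A minimal sketch of the counter suggested above, under stated assumptions: `CyclicCommitCounter` and `onCheckpointComplete` are hypothetical names, the sketch assumes the offset is committed whenever the counter wraps around to zero, and it assumes the callback is only ever invoked from a single thread, which is why a plain `long` is used.

```java
/** Hypothetical sketch of the cyclic checkpoint counter; names are illustrative. */
class CyclicCommitCounter {
    private final int cycle;
    // Plain long: this sketch assumes single-threaded invocation of the callback.
    private long checkpointCount = 0L;

    CyclicCommitCounter(int cycle) {
        if (cycle <= 0) {
            throw new IllegalArgumentException("cycle must be positive");
        }
        this.cycle = cycle;
    }

    /** Returns true when this completed checkpoint closes a cycle and a commit is due. */
    boolean onCheckpointComplete() {
        checkpointCount = (checkpointCount + 1) % cycle;
        return checkpointCount == 0;
    }

    public static void main(String[] args) {
        CyclicCommitCounter counter = new CyclicCommitCounter(3);
        for (int i = 1; i <= 6; i++) {
            System.out.println("checkpoint " + i + " commits: " + counter.onCheckpointComplete());
        }
    }
}
```

With a cycle of 3, a commit is due on every third completed checkpoint and on no other.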



##
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java:
##
@@ -100,7 +102,9 @@ public PostgresSourceConfig create(int subtaskId) {
 
 // The PostgresSource will do snapshot according to its StartupMode.
 // Do not need debezium to do the snapshot work.
-props.put("snapshot.mode", "never");
+props.setProperty("snapshot.mode", "never");
+
+props.setProperty("checkpoint.cycle", String.valueOf(checkpointCycle));

Review Comment:
   What does `checkpoint.cycle` do? Is it Debezium's offset commit cycle? 
Flink CDC is already responsible for committing offsets, so please don't let 
Debezium do it again (turn it off).


