This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b28d1a57 [FLINK-35129][postgres] Introduce scan.lsn-commit.checkpoints-num-delay option to control LSN offset commit lazily
5b28d1a57 is described below

commit 5b28d1a579919b29acac6acded46d9bee5596bde
Author: Muhammet Orazov <916295+mora...@users.noreply.github.com>
AuthorDate: Tue Jun 4 10:56:03 2024 +0200

    [FLINK-35129][postgres] Introduce scan.lsn-commit.checkpoints-num-delay option to control LSN offset commit lazily
    
    This closes #3349.
---
 .../docs/connectors/flink-sources/postgres-cdc.md  | 40 +++++----
 .../flink-connector-postgres-cdc/pom.xml           |  8 +-
 .../postgres/source/PostgresDialect.java           |  4 +-
 .../postgres/source/PostgresSourceBuilder.java     |  6 ++
 .../source/config/PostgresSourceConfig.java        | 26 +++++-
 .../source/config/PostgresSourceConfigFactory.java | 12 ++-
 .../source/config/PostgresSourceOptions.java       | 10 +++
 .../source/reader/PostgresSourceReader.java        | 21 ++++-
 .../postgres/table/PostgreSQLTableFactory.java     |  6 +-
 .../postgres/table/PostgreSQLTableSource.java      | 11 ++-
 .../cdc/connectors/postgres/PostgresTestBase.java  |  1 +
 .../postgres/source/PostgresSourceITCase.java      |  4 +-
 .../source/reader/PostgresSourceReaderTest.java    | 94 ++++++++++++++++++++++
 .../postgres/table/MockPostgreSQLTableSource.java  |  3 +-
 .../postgres/table/PostgreSQLTableFactoryTest.java | 16 ++--
 15 files changed, 226 insertions(+), 36 deletions(-)

diff --git a/docs/content/docs/connectors/flink-sources/postgres-cdc.md b/docs/content/docs/connectors/flink-sources/postgres-cdc.md
index f839c4380..1e24f31d1 100644
--- a/docs/content/docs/connectors/flink-sources/postgres-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/postgres-cdc.md
@@ -28,8 +28,7 @@ under the License.
 
 The Postgres CDC connector allows for reading snapshot data and incremental data from PostgreSQL database. This document describes how to setup the Postgres CDC connector to run SQL queries against PostgreSQL databases.
 
-Dependencies
-------------
+## Dependencies
 
 In order to setup the Postgres CDC connector, the following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.
 
@@ -45,8 +44,7 @@ Download [flink-sql-connector-postgres-cdc-3.0.1.jar](https://repo1.maven.org/ma
 
 **Note:** Refer to [flink-sql-connector-postgres-cdc](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-postgres-cdc),
 more released versions will be available in the Maven central warehouse.
 
-How to create a Postgres CDC table
-----------------
+## How to create a Postgres CDC table
 
 The Postgres CDC table can be defined as following:
 
@@ -76,8 +74,7 @@ CREATE TABLE shipments (
 SELECT * FROM shipments;
 ```
 
-Connector Options
-----------------
+## Connector Options
 
 <div class="highlight">
 <table class="colwidths-auto docutils">
@@ -236,12 +233,29 @@ Connector Options
           so it does not need to be explicitly configured 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true'
       </td>
     </tr>
+    <tr>
+      <td>scan.lsn-commit.checkpoints-num-delay</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">3</td>
+      <td>Integer</td>
+      <td>The number of checkpoints to wait before committing the LSN offsets. <br>
+          The checkpoint LSN offsets are committed in a rolling fashion: among the delayed checkpoints, the one with the earliest checkpoint identifier is committed first.
+      </td>
+    </tr>
     </tbody>
     </table>
 </div>
 <div>
 
-Note: `slot.name` is recommended to set for different tables to avoid the potential `PSQLException: ERROR: replication slot "flink" is active for PID 974` error. See more [here](https://debezium.io/documentation/reference/2.0/connectors/postgresql.html#postgresql-property-slot-name).
+### Notes
+
+#### `slot.name` option
+
+It is recommended to set a different `slot.name` for each table to avoid the potential `PSQLException: ERROR: replication slot "flink" is active for PID 974` error. See more [here](https://debezium.io/documentation/reference/2.0/connectors/postgresql.html#postgresql-property-slot-name).
+
+#### `scan.lsn-commit.checkpoints-num-delay` option
+
+When consuming PostgreSQL logs, the LSN offset must be committed to trigger the log data cleanup for the corresponding slot. However, once an LSN offset is committed, earlier offsets become invalid. To keep earlier LSN offsets available for job recovery, the LSN commit is delayed by `scan.lsn-commit.checkpoints-num-delay` checkpoints (default `3`). This option is available only when `scan.incremental.snapshot.enabled` is set to `true`.
 
 ### Incremental Snapshot Options
 
@@ -340,8 +354,7 @@ The following options is available only when `scan.incremental.snapshot.enabled=
 </table>
 </div>
 
-Available Metadata
-----------------
+## Available Metadata
 
 The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.
 
@@ -377,8 +390,7 @@ The following format metadata can be exposed as read-only (VIRTUAL) columns in a
   </tbody>
 </table>
 
-Limitation
---------
+## Limitation
 
 ### Can't perform checkpoint during scanning snapshot of tables when incremental snapshot is disabled
 
@@ -417,8 +429,7 @@ CREATE TABLE products (
 );
 ```
 
-Features
---------
+## Features
 
 ### Incremental Snapshot Reading (Experimental)
 
@@ -522,8 +533,7 @@ public class PostgreSQLSourceExample {
 }
 ```
 
-Data Type Mapping
-----------------
+## Data Type Mapping
 
 <div class="wy-table-responsive">
 <table class="colwidths-auto docutils">
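To make the trade-off in the `scan.lsn-commit.checkpoints-num-delay` note concrete, here is a minimal Java sketch under simplifying assumptions that are not taken from the connector: the class `LsnCommitDelayModel` and its methods are hypothetical, checkpoint `k` is assumed to record LSN `k * 100`, and a checkpoint is treated as restorable only if its LSN is not earlier than the last committed LSN.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of why delaying LSN commits keeps older checkpoints usable.
 * Assumptions (not from the connector): checkpoint k records LSN k * 100, and once an LSN is
 * committed the server may recycle WAL before it, so a checkpoint is restorable only if its
 * LSN is not earlier than the last committed LSN.
 */
class LsnCommitDelayModel {

    /** Hypothetical LSN stored in checkpoint {@code checkpointId}. */
    static long lsnOf(long checkpointId) {
        return checkpointId * 100L;
    }

    /**
     * Returns the ids of checkpoints 1..completed that can still be restored when LSN commits
     * lag behind by {@code delay} checkpoints.
     */
    static List<Long> restorableCheckpoints(long completed, int delay) {
        // Checkpoint (completed - delay) is the newest one whose LSN has been committed;
        // if it is below 1, nothing has been committed yet and every checkpoint is restorable.
        long newestCommitted = completed - delay;
        long committedLsn = newestCommitted >= 1 ? lsnOf(newestCommitted) : 0L;
        List<Long> restorable = new ArrayList<>();
        for (long id = 1; id <= completed; id++) {
            if (lsnOf(id) >= committedLsn) {
                restorable.add(id);
            }
        }
        return restorable;
    }

    public static void main(String[] args) {
        // Committing immediately (delay 0) leaves only the latest checkpoint restorable.
        System.out.println("delay=0 -> " + restorableCheckpoints(10, 0));
        // With the default delay of 3, the last four checkpoints remain restorable.
        System.out.println("delay=3 -> " + restorableCheckpoints(10, 3));
    }
}
```

With 10 completed checkpoints, it prints `[10]` for a delay of `0` and `[7, 8, 9, 10]` for a delay of `3`: each extra checkpoint of delay keeps one more older checkpoint usable for recovery, at the cost of the slot retaining more WAL on the server.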
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/pom.xml b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/pom.xml
index b0763b1d7..4fc75d0ca 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/pom.xml
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/pom.xml
@@ -164,10 +164,16 @@ limitations under the License.
             <version>${json-path.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
 
 
         <!-- tests will have log4j as the default logging framework available -->
 
     </dependencies>
 
-</project>
\ No newline at end of file
+</project>
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java
index 5e83f6685..25a5186d1 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java
@@ -63,13 +63,11 @@ import static io.debezium.connector.postgresql.Utils.currentOffset;
 /** The dialect for Postgres. */
 public class PostgresDialect implements JdbcDataSourceDialect {
     private static final long serialVersionUID = 1L;
-
     private static final String CONNECTION_NAME = "postgres-cdc-connector";
+
     private final PostgresSourceConfig sourceConfig;
     private transient Tables.TableFilter filters;
-
     private transient CustomPostgresSchema schema;
-
     @Nullable private PostgresStreamFetchTask streamFetchTask;
 
     public PostgresDialect(PostgresSourceConfig sourceConfig) {
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
index 6fa882f56..ee991a70f 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
@@ -280,6 +280,12 @@ public class PostgresSourceBuilder<T> {
         return this;
     }
 
+    /** Sets the number of checkpoints to delay before committing the {@code LSN} offsets. */
+    public PostgresSourceBuilder<T> lsnCommitCheckpointsDelay(int lsnCommitDelay) {
+        this.configFactory.setLsnCommitCheckpointsDelay(lsnCommitDelay);
+        return this;
+    }
+
     /**
      * Build the {@link PostgresIncrementalSource}.
      *
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
index c20e57fab..a4e30da11 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
@@ -37,6 +37,7 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
     private static final long serialVersionUID = 1L;
 
     private final int subtaskId;
+    private final int lsnCommitCheckpointsDelay;
 
     public PostgresSourceConfig(
             int subtaskId,
@@ -64,7 +65,8 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
             int connectionPoolSize,
             @Nullable String chunkKeyColumn,
             boolean skipSnapshotBackfill,
-            boolean isScanNewlyAddedTableEnabled) {
+            boolean isScanNewlyAddedTableEnabled,
+            int lsnCommitCheckpointsDelay) {
         super(
                 startupOptions,
                 databaseList,
@@ -92,14 +94,34 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
                 skipSnapshotBackfill,
                 isScanNewlyAddedTableEnabled);
         this.subtaskId = subtaskId;
+        this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
     }
 
+    /**
+     * Returns {@code subtaskId} value.
+     *
+     * @return subtask id
+     */
     public int getSubtaskId() {
         return subtaskId;
     }
 
+    /**
+     * Returns {@code lsnCommitCheckpointsDelay} value.
+     *
+     * @return lsn commit checkpoint delay
+     */
+    public int getLsnCommitCheckpointsDelay() {
+        return this.lsnCommitCheckpointsDelay;
+    }
+
+    /**
+     * Returns the slot name for backfill task.
+     *
+     * @return backfill task slot name
+     */
     public String getSlotNameForBackfillTask() {
-        return getDbzProperties().getProperty(SLOT_NAME.name()) + "_" + subtaskId;
+        return getDbzProperties().getProperty(SLOT_NAME.name()) + "_" + getSubtaskId();
     }
 
     @Override
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
index 1490dd400..fb9054bd4 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
@@ -50,6 +50,8 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory {
 
     private List<String> schemaList;
 
+    private int lsnCommitCheckpointsDelay;
+
     /** Creates a new {@link PostgresSourceConfig} for the given subtask {@code subtaskId}. */
     @Override
     public PostgresSourceConfig create(int subtaskId) {
@@ -100,7 +102,7 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory {
 
         // The PostgresSource will do snapshot according to its StartupMode.
         // Do not need debezium to do the snapshot work.
-        props.put("snapshot.mode", "never");
+        props.setProperty("snapshot.mode", "never");
 
         Configuration dbzConfiguration = Configuration.from(props);
         return new PostgresSourceConfig(
@@ -129,7 +131,8 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory {
                 connectionPoolSize,
                 chunkKeyColumn,
                 skipSnapshotBackfill,
-                scanNewlyAddedTableEnabled);
+                scanNewlyAddedTableEnabled,
+                lsnCommitCheckpointsDelay);
     }
 
     /**
@@ -173,4 +176,9 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory {
     public void heartbeatInterval(Duration heartbeatInterval) {
         this.heartbeatInterval = heartbeatInterval;
     }
+
+    /** Sets the number of checkpoints to delay before committing the LSN offsets for Postgres. */
+    public void setLsnCommitCheckpointsDelay(int lsnCommitCheckpointsDelay) {
+        this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
+    }
 }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java
index 391458ded..0443fa2e1 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java
@@ -78,4 +78,14 @@ public class PostgresSourceOptions extends JdbcSourceOptions {
                     .defaultValue(Duration.ofSeconds(30))
                     .withDescription(
                             "Optional interval of sending heartbeat event for tracing the latest available replication slot offsets");
+
+    public static final ConfigOption<Integer> SCAN_LSN_COMMIT_CHECKPOINTS_DELAY =
+            ConfigOptions.key("scan.lsn-commit.checkpoints-num-delay")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription(
+                            "The number of checkpoints to wait before committing the LSN offsets.\n"
+                                    + "With a higher value, the offset consumed by the global slot is "
+                                    + "committed only after several checkpoints instead of after each checkpoint completion.\n"
+                                    + "This allows continuous recycling of log files in the stream phase.");
 }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReader.java
index f6d201005..81bff8488 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReader.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReader.java
@@ -28,6 +28,7 @@ import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitSeriali
 import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
 import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReaderContext;
 import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReaderWithCommit;
+import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
 import org.apache.flink.cdc.connectors.postgres.source.events.OffsetCommitAckEvent;
 import org.apache.flink.cdc.connectors.postgres.source.events.OffsetCommitEvent;
 import org.apache.flink.configuration.Configuration;
@@ -38,6 +39,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.PriorityQueue;
 import java.util.function.Supplier;
 
 /**
@@ -54,6 +56,9 @@ public class PostgresSourceReader extends IncrementalSourceReaderWithCommit {
     /** whether to commit offset. */
     private volatile boolean isCommitOffset = false;
 
+    private final PriorityQueue<Long> minHeap;
+    private final int lsnCommitCheckpointsDelay;
+
     public PostgresSourceReader(
             FutureCompletingBlockingQueue elementQueue,
             Supplier supplier,
@@ -72,6 +77,9 @@ public class PostgresSourceReader extends IncrementalSourceReaderWithCommit {
                 sourceConfig,
                 sourceSplitSerializer,
                 dialect);
+        this.lsnCommitCheckpointsDelay =
+                ((PostgresSourceConfig) sourceConfig).getLsnCommitCheckpointsDelay();
+        this.minHeap = new PriorityQueue<>();
     }
 
     @Override
@@ -104,12 +112,23 @@ public class PostgresSourceReader extends IncrementalSourceReaderWithCommit {
 
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        this.minHeap.add(checkpointId);
+        if (this.minHeap.size() <= this.lsnCommitCheckpointsDelay) {
+            LOG.info("Pending checkpoints '{}'.", this.minHeap);
+            return;
+        }
+        final long checkpointIdToCommit = this.minHeap.poll();
+        LOG.info(
+                "Pending checkpoints '{}', to be committed checkpoint id '{}'.",
+                this.minHeap,
+                checkpointIdToCommit);
+
         // After all snapshot splits are finished, update stream split's metadata and reset start
         // offset, which maybe smaller than before.
         // In case that new start-offset of stream split has been recycled, don't commit offset
         // during new table added phase.
         // during new table added phase.
         if (isCommitOffset()) {
-            super.notifyCheckpointComplete(checkpointId);
+            super.notifyCheckpointComplete(checkpointIdToCommit);
         }
     }
 
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java
index e0e1b6b9b..5674908ae 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java
@@ -57,6 +57,7 @@ import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSou
 import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
 import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
 import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
+import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY;
 import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
 import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_STARTUP_MODE;
 import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SLOT_NAME;
@@ -114,6 +115,7 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
         boolean closeIdlerReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
         boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
         boolean isScanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
+        int lsnCommitCheckpointsDelay = config.get(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
 
         if (enableParallelRead) {
             validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
@@ -158,7 +160,8 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
                 chunkKeyColumn,
                 closeIdlerReaders,
                 skipSnapshotBackfill,
-                isScanNewlyAddedTableEnabled);
+                isScanNewlyAddedTableEnabled,
+                lsnCommitCheckpointsDelay);
     }
 
     @Override
@@ -200,6 +203,7 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
         options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
         options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
         options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
+        options.add(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
         return options;
     }
 
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java
index add3d8f3d..9a9dd822d 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java
@@ -82,10 +82,9 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMe
     private final StartupOptions startupOptions;
     private final String chunkKeyColumn;
     private final boolean closeIdleReaders;
-
     private final boolean skipSnapshotBackfill;
-
     private final boolean scanNewlyAddedTableEnabled;
+    private final int lsnCommitCheckpointsDelay;
 
     // --------------------------------------------------------------------------------------------
     // Mutable attributes
@@ -124,7 +123,8 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMe
             @Nullable String chunkKeyColumn,
             boolean closeIdleReaders,
             boolean skipSnapshotBackfill,
-            boolean isScanNewlyAddedTableEnabled) {
+            boolean isScanNewlyAddedTableEnabled,
+            int lsnCommitCheckpointsDelay) {
         this.physicalSchema = physicalSchema;
         this.port = port;
         this.hostname = checkNotNull(hostname);
@@ -155,6 +155,7 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMe
         this.closeIdleReaders = closeIdleReaders;
         this.skipSnapshotBackfill = skipSnapshotBackfill;
         this.scanNewlyAddedTableEnabled = isScanNewlyAddedTableEnabled;
+        this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
     }
 
     @Override
@@ -216,6 +217,7 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMe
                             .closeIdleReaders(closeIdleReaders)
                             .skipSnapshotBackfill(skipSnapshotBackfill)
                             .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
+                            .lsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay)
                             .build();
             return SourceProvider.of(parallelSource);
         } else {
@@ -283,7 +285,8 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMe
                         chunkKeyColumn,
                         closeIdleReaders,
                         skipSnapshotBackfill,
-                        scanNewlyAddedTableEnabled);
+                        scanNewlyAddedTableEnabled,
+                        lsnCommitCheckpointsDelay);
         source.metadataKeys = metadataKeys;
         source.producedDataType = producedDataType;
         return source;
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java
index 10d64f226..5cc8ba5c5 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java
@@ -220,6 +220,7 @@ public abstract class PostgresTestBase extends AbstractTestBase {
         postgresSourceConfigFactory.tableList(schemaName + "." + tableName);
         postgresSourceConfigFactory.splitSize(splitSize);
         postgresSourceConfigFactory.skipSnapshotBackfill(skipSnapshotBackfill);
+        postgresSourceConfigFactory.setLsnCommitCheckpointsDelay(1);
         return postgresSourceConfigFactory;
     }
 
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
index 96fa103f9..8d1a6d975 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
@@ -707,6 +707,7 @@ public class PostgresSourceITCase extends PostgresTestBase {
                         .tableList(tableId)
                         .startupOptions(startupOptions)
                         .skipSnapshotBackfill(skipSnapshotBackfill)
+                        .lsnCommitCheckpointsDelay(1)
                         .deserializer(customerTable.getDeserializer())
                         .build();
 
@@ -816,7 +817,8 @@ public class PostgresSourceITCase extends PostgresTestBase {
                                 + " 'scan.startup.mode' = '%s',"
                                 + " 'scan.incremental.snapshot.chunk.size' = '100',"
                                 + " 'slot.name' = '%s',"
-                                + " 'scan.incremental.snapshot.backfill.skip' = '%s'"
+                                + " 'scan.incremental.snapshot.backfill.skip' = '%s',"
+                                + " 'scan.lsn-commit.checkpoints-num-delay' = '1'"
                                 + ""
                                 + ")",
                         customDatabase.getHost(),
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
new file mode 100644
index 000000000..1c855cc3f
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source.reader;
+
+import org.apache.flink.cdc.connectors.postgres.source.MockPostgresDialect;
+import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder;
+import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
+import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
+import org.apache.flink.cdc.connectors.postgres.testutils.TestTable;
+import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link PostgresSourceReader}. */
+class PostgresSourceReaderTest {
+
+    @Test
+    void testNotifyCheckpointWindowSizeOne() throws Exception {
+        final PostgresSourceReader reader = createReader(1);
+        final List<Long> completedCheckpointIds = new ArrayList<>();
+        MockPostgresDialect.setNotifyCheckpointCompleteCallback(
+                id -> completedCheckpointIds.add(id));
+        reader.notifyCheckpointComplete(11L);
+        assertThat(completedCheckpointIds).isEmpty();
+        reader.notifyCheckpointComplete(12L);
+        assertThat(completedCheckpointIds).containsExactly(11L);
+        reader.notifyCheckpointComplete(13L);
+        assertThat(completedCheckpointIds).containsExactly(11L, 12L);
+    }
+
+    @Test
+    void testNotifyCheckpointWindowSizeDefault() throws Exception {
+        final PostgresSourceReader reader = createReader(3);
+        final List<Long> completedCheckpointIds = new ArrayList<>();
+        MockPostgresDialect.setNotifyCheckpointCompleteCallback(
+                id -> completedCheckpointIds.add(id));
+        reader.notifyCheckpointComplete(103L);
+        assertThat(completedCheckpointIds).isEmpty();
+        reader.notifyCheckpointComplete(102L);
+        assertThat(completedCheckpointIds).isEmpty();
+        reader.notifyCheckpointComplete(101L);
+        assertThat(completedCheckpointIds).isEmpty();
+        reader.notifyCheckpointComplete(104L);
+        assertThat(completedCheckpointIds).containsExactly(101L);
+    }
+
+    private PostgresSourceReader createReader(final int lsnCommitCheckpointsDelay)
+            throws Exception {
+        final PostgresOffsetFactory offsetFactory = new PostgresOffsetFactory();
+        final PostgresSourceConfigFactory configFactory = new PostgresSourceConfigFactory();
+        configFactory.hostname("host");
+        configFactory.database("pgdb");
+        configFactory.username("username");
+        configFactory.password("password");
+        configFactory.setLsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay);
+        final TestTable customerTable =
+                new TestTable(
+                        "pgdb",
+                        "customer",
+                        "customers",
+                        ResolvedSchema.of(Column.physical("id", BIGINT())));
+        final DebeziumDeserializationSchema<?> deserializer = customerTable.getDeserializer();
+        MockPostgresDialect dialect = new MockPostgresDialect(configFactory.create(0));
+        final PostgresSourceBuilder.PostgresIncrementalSource<?> source =
+                new PostgresSourceBuilder.PostgresIncrementalSource<>(
+                        configFactory, checkNotNull(deserializer), offsetFactory, dialect);
+        return source.createReader(new TestingReaderContext());
+    }
+}
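The two tests above pin down the delayed-commit contract: with a delay of N, completed checkpoint ids are held back, and only the smallest pending id is forwarded once more than N are buffered. That is why out-of-order notifications (103, 102, 101, 104) with a delay of 3 commit 101 first. The following is a minimal plain-Java sketch of that contract, assuming a min-heap of pending ids inferred from the assertions; `DelayedCheckpointCommitter` is a hypothetical name and this is not the actual `PostgresSourceReader` code. In the connector, the forwarded id would be the one whose LSN offset gets committed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.LongConsumer;

/**
 * Hypothetical sketch: buffers completed checkpoint ids and forwards only the
 * smallest pending id once more than {@code delay} ids are buffered.
 */
final class DelayedCheckpointCommitter {
    private final int delay;
    private final LongConsumer commit;
    // Min-heap so the oldest (smallest) checkpoint id is committed first,
    // even when completion notifications arrive out of order.
    private final PriorityQueue<Long> pending = new PriorityQueue<>();

    DelayedCheckpointCommitter(int delay, LongConsumer commit) {
        this.delay = delay;
        this.commit = commit;
    }

    void notifyCheckpointComplete(long checkpointId) {
        pending.add(checkpointId);
        if (pending.size() <= delay) {
            return; // not enough newer checkpoints yet; keep holding the commit back
        }
        commit.accept(pending.poll()); // forward the oldest pending checkpoint id
    }

    public static void main(String[] args) {
        List<Long> committed = new ArrayList<>();
        DelayedCheckpointCommitter committer =
                new DelayedCheckpointCommitter(3, id -> committed.add(id));
        committer.notifyCheckpointComplete(103L);
        committer.notifyCheckpointComplete(102L);
        committer.notifyCheckpointComplete(101L);
        committer.notifyCheckpointComplete(104L);
        System.out.println(committed); // [101]
    }
}
```

A FIFO queue would commit 103 in the second scenario; ordering by id is what makes the out-of-order case behave like `testNotifyCheckpointWindowSizeDefault` expects.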
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/MockPostgreSQLTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/MockPostgreSQLTableSource.java
index 9fe55b18f..199ff24fe 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/MockPostgreSQLTableSource.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/MockPostgreSQLTableSource.java
@@ -64,7 +64,8 @@ public class MockPostgreSQLTableSource extends PostgreSQLTableSource {
                 (String) get(postgreSQLTableSource, "chunkKeyColumn"),
                 (boolean) get(postgreSQLTableSource, "closeIdleReaders"),
                 (boolean) get(postgreSQLTableSource, "skipSnapshotBackfill"),
-                (boolean) get(postgreSQLTableSource, "scanNewlyAddedTableEnabled"));
+                (boolean) get(postgreSQLTableSource, "scanNewlyAddedTableEnabled"),
+                (int) get(postgreSQLTableSource, "lsnCommitCheckpointsDelay"));
     }
 
     @Override
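`MockPostgreSQLTableSource` rebuilds the source by reading each private field of `PostgreSQLTableSource` by name through a `get(...)` helper whose body is not part of this hunk. A minimal sketch of such a reflective reader, assuming it relies on plain `java.lang.reflect.Field` access (`ReflectiveFieldReader` and `FakeSource` are hypothetical names), shows why the string `"lsnCommitCheckpointsDelay"` must match the new field name exactly: a typo surfaces only as a runtime failure in the test, not as a compile error.

```java
import java.lang.reflect.Field;

/** Hypothetical sketch of a reflective field reader like the mock's get(...) helper. */
final class ReflectiveFieldReader {

    /** Reads a (possibly private) field by name, searching superclasses too. */
    static Object get(Object target, String fieldName) {
        Class<?> clazz = target.getClass();
        while (clazz != null) {
            try {
                Field field = clazz.getDeclaredField(fieldName);
                field.setAccessible(true); // allow reading private fields
                return field.get(target);
            } catch (NoSuchFieldException e) {
                clazz = clazz.getSuperclass(); // keep searching up the hierarchy
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
        }
        throw new IllegalArgumentException("No field named " + fieldName);
    }

    /** Hypothetical stand-in for a table source holding the new option as a private field. */
    static class FakeSource {
        private final int lsnCommitCheckpointsDelay = 3;
    }

    public static void main(String[] args) {
        // Same cast pattern as the mock: Object -> int via unboxing.
        int delay = (int) get(new FakeSource(), "lsnCommitCheckpointsDelay");
        System.out.println(delay); // 3
    }
}
```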
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java
index eb7360689..50cd9b7a4 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java
@@ -59,6 +59,7 @@ import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSou
 import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.CONNECT_TIMEOUT;
 import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.HEARTBEAT_INTERVAL;
 import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
+import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY;
 import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
 import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
 import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
@@ -151,7 +152,8 @@ public class PostgreSQLTableFactoryTest {
                         null,
                         SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT,
                         SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
-                        SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue());
+                        SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(),
+                        SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue());
         assertEquals(expectedSource, actualSource);
     }
 
@@ -196,7 +198,8 @@ public class PostgreSQLTableFactoryTest {
                         null,
                         SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT,
                         true,
-                        true);
+                        true,
+                        SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue());
         assertEquals(expectedSource, actualSource);
     }
 
@@ -239,7 +242,8 @@ public class PostgreSQLTableFactoryTest {
                         null,
                         SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT,
                         SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
-                        SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue());
+                        SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(),
+                        SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue());
         expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
         expectedSource.metadataKeys =
                 Arrays.asList("op_ts", "database_name", "schema_name", "table_name");
@@ -292,7 +296,8 @@ public class PostgreSQLTableFactoryTest {
                         null,
                         SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT,
                         SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
-                        SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue());
+                        SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(),
+                        SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue());
         assertEquals(expectedSource, actualSource);
     }
 
@@ -335,7 +340,8 @@ public class PostgreSQLTableFactoryTest {
                         null,
                         SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT,
                         SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(),
-                        SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue());
+                        SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(),
+                        SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue());
         assertEquals(expectedSource, actualSource);
     }
 