This is an automated email from the ASF dual-hosted git repository.

yux pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c9c113dc [FLINK-38157][connect/postgres] Supports assigning unbounded chunk first (#4103)
0c9c113dc is described below

commit 0c9c113dc23956d470b218a151de8a34107c7c06
Author: ouyangwulin <mr.art...@gmail.com>
AuthorDate: Fri Aug 22 21:19:43 2025 +0800

    [FLINK-38157][connect/postgres] Supports assigning unbounded chunk first (#4103)
    
    By assigning the unbounded chunk first, we can reduce the chance of an OOM caused by reading the unbounded chunk last. This feature has already been implemented in the common incremental snapshotting framework.
---
 .../docs/connectors/pipeline-connectors/postgres.md         | 13 ++++++++++++-
 .../content/docs/connectors/pipeline-connectors/postgres.md | 11 +++++++++++
 .../postgres/factory/PostgresDataSourceFactory.java         |  5 +++++
 .../postgres/source/PostgresDataSourceOptions.java          |  9 +++++++++
 .../postgres/source/PostgresPipelineITCaseTest.java         | 10 ++++++++--
 5 files changed, 45 insertions(+), 3 deletions(-)
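
A minimal sketch of the idea behind this change, in plain Java with a
hypothetical Chunk type (this is not the framework's actual splitter code):
chunks are normally assigned in key order, so the open-ended tail chunk is
snapshotted last; enabling the option moves it to the front of the
assignment queue.

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.List;

    public class UnboundedChunkFirstSketch {

        // Hypothetical chunk: a null end means "no upper bound", i.e. [start, +inf).
        record Chunk(Integer start, Integer end) {
            boolean isUnbounded() {
                return end == null;
            }
        }

        static Deque<Chunk> orderChunks(List<Chunk> chunks, boolean unboundedFirst) {
            Deque<Chunk> queue = new ArrayDeque<>();
            for (Chunk chunk : chunks) {
                if (unboundedFirst && chunk.isUnbounded()) {
                    queue.addFirst(chunk); // assign the unbounded chunk first
                } else {
                    queue.addLast(chunk);  // bounded chunks keep their key order
                }
            }
            return queue;
        }

        public static void main(String[] args) {
            List<Chunk> chunks =
                    List.of(new Chunk(0, 100), new Chunk(100, 200), new Chunk(200, null));
            // Prints the [200, +inf) chunk at the head of the queue instead of the tail.
            System.out.println(orderChunks(chunks, true));
        }
    }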

diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
index ca76905ae..d783efe78 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
@@ -245,7 +245,7 @@ pipeline:
         The group size of chunk metadata; if the metadata size exceeds the group size, the metadata will be divided into multiple groups.
       </td>
     </tr>
-<tr>
+    <tr>
       <td>metadata.list</td>
       <td>optional</td>
       <td style="word-wrap: break-word;">false</td>
@@ -254,6 +254,17 @@ pipeline:
         List of readable metadata from SourceRecord to be passed to downstream and could be used in transform module, split by `,`. Available readable metadata are: op_ts.
       </td>
     </tr>
+    <tr>
+     <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td>
+     <td>optional</td>
+     <td style="word-wrap: break-word;">false</td>
+     <td>Boolean</td>
+     <td>
+        Whether to assign the unbounded chunks first during the snapshot reading phase.<br>
+        This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br>
+        Experimental option, defaults to false.
+      </td>
+    </tr>
     </tbody>
 </table>
 </div>
diff --git a/docs/content/docs/connectors/pipeline-connectors/postgres.md b/docs/content/docs/connectors/pipeline-connectors/postgres.md
index e52fa34bb..120d95935 100644
--- a/docs/content/docs/connectors/pipeline-connectors/postgres.md
+++ b/docs/content/docs/connectors/pipeline-connectors/postgres.md
@@ -246,6 +246,17 @@ pipeline:
         List of readable metadata from SourceRecord to be passed to downstream and could be used in transform module, split by `,`. Available readable metadata are: op_ts.
       </td>
     </tr>
+    <tr>
+     <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td>
+     <td>optional</td>
+     <td style="word-wrap: break-word;">false</td>
+     <td>Boolean</td>
+     <td>
+        Whether to assign the unbounded chunks first during the snapshot reading phase.<br>
+        This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br>
+        Experimental option, defaults to false.
+      </td>
+    </tr>
     </tbody>
 </table>
 </div>
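
For reference, the option can also be set programmatically on the source
configuration, mirroring the integration test further down in this commit.
A hedged sketch (the hostname is a placeholder, and the Configuration import
is assumed to match the one used by PostgresPipelineITCaseTest):

    import org.apache.flink.cdc.common.configuration.Configuration;
    import org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions;

    public class EnableUnboundedChunkFirst {
        public static void main(String[] args) {
            Configuration sourceConfiguration = new Configuration();
            // Placeholder connection setting; a real pipeline needs the full set
            // of connection options (port, username, password, tables, ...).
            sourceConfiguration.set(PostgresDataSourceOptions.HOSTNAME, "localhost");
            // Opt in to assigning the unbounded chunk first (defaults to false).
            sourceConfiguration.set(
                    PostgresDataSourceOptions
                            .SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED,
                    true);
        }
    }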
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java
index 61c9766c7..918d479f6 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java
@@ -70,6 +70,7 @@ import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSource
 import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
 import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
 import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
+import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
 import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY;
 import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
 import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_STARTUP_MODE;
@@ -120,6 +121,8 @@ public class PostgresDataSourceFactory implements DataSourceFactory {
         double distributionFactorLower = config.get(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
 
         boolean closeIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
+        boolean isAssignUnboundedChunkFirst =
+                config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
 
         Duration connectTimeout = config.get(CONNECT_TIMEOUT);
         int connectMaxRetries = config.get(CONNECT_MAX_RETRIES);
@@ -165,6 +168,7 @@ public class PostgresDataSourceFactory implements DataSourceFactory {
                         .closeIdleReaders(closeIdleReaders)
                         .skipSnapshotBackfill(skipSnapshotBackfill)
                         .lsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay)
+                        .assignUnboundedChunkFirst(isAssignUnboundedChunkFirst)
                         .getConfigFactory();
 
         List<TableId> tableIds = PostgresSchemaUtils.listTables(configFactory.create(0), null);
@@ -252,6 +256,7 @@ public class PostgresDataSourceFactory implements DataSourceFactory {
         options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
         options.add(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
         options.add(METADATA_LIST);
+        options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
         return options;
     }
 
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java
index 6d86bea30..ec51de8b6 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java
@@ -255,4 +255,13 @@ public class PostgresDataSourceOptions {
                     .withDescription(
                             "List of readable metadata from SourceRecord to be 
passed to downstream, split by `,`. "
                                     + "Available readable metadata are: 
op_ts.");
+
+    @Experimental
+    public static final ConfigOption<Boolean>
+            SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED =
+                    ConfigOptions.key("scan.incremental.snapshot.unbounded-chunk-first.enabled")
+                            .booleanType()
+                            .defaultValue(false)
+                            .withDescription(
+                                    "Whether to assign the unbounded chunks first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.  Defaults to false.");
 }
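
The option uses the standard ConfigOptions builder, so the declared default
applies whenever the key is absent. A self-contained toy illustrating that
behavior (the package names are assumed from flink-cdc's common configuration
module, and demo.enabled is a made-up key):

    import org.apache.flink.cdc.common.configuration.ConfigOption;
    import org.apache.flink.cdc.common.configuration.ConfigOptions;
    import org.apache.flink.cdc.common.configuration.Configuration;

    public class ConfigOptionDefaultDemo {
        // Same builder chain as the new option above, on a made-up key.
        static final ConfigOption<Boolean> DEMO =
                ConfigOptions.key("demo.enabled")
                        .booleanType()
                        .defaultValue(false)
                        .withDescription("Toy option for illustration only.");

        public static void main(String[] args) {
            Configuration conf = new Configuration();
            System.out.println(conf.get(DEMO)); // false: the default applies when unset
            conf.set(DEMO, true);
            System.out.println(conf.get(DEMO)); // true: an explicit value wins
        }
    }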
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java
index 5f3c74ae7..a57e1fed5 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java
@@ -47,6 +47,8 @@ import org.apache.flink.util.CloseableIterator;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -132,8 +134,9 @@ public class PostgresPipelineITCaseTest extends PostgresTestBase {
                 .containsExactlyInAnyOrder(expectedSnapshot.toArray(new Event[0]));
     }
 
-    @Test
-    public void testInitialStartupModeWithOpts() throws Exception {
+    @ParameterizedTest(name = "unboundedChunkFirst: {0}")
+    @ValueSource(booleans = {true, false})
+    public void testInitialStartupModeWithOpts(boolean unboundedChunkFirst) throws Exception {
         inventoryDatabase.createAndInitialize();
         Configuration sourceConfiguration = new Configuration();
         sourceConfiguration.set(PostgresDataSourceOptions.HOSTNAME, POSTGRES_CONTAINER.getHost());
@@ -151,6 +154,9 @@ public class PostgresPipelineITCaseTest extends PostgresTestBase {
                 inventoryDatabase.getDatabaseName() + ".inventory.products");
         sourceConfiguration.set(PostgresDataSourceOptions.SERVER_TIME_ZONE, "UTC");
         sourceConfiguration.set(PostgresDataSourceOptions.METADATA_LIST, "op_ts");
+        sourceConfiguration.set(
+                PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED,
+                unboundedChunkFirst);
 
         Factory.Context context =
                 new FactoryHelper.DefaultContext(
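
The test change swaps @Test for JUnit 5 parameterization so the same scenario
runs with the flag both on and off. A stripped-down, self-contained
illustration of the pattern (not the actual test body):

    import org.junit.jupiter.params.ParameterizedTest;
    import org.junit.jupiter.params.provider.ValueSource;

    import static org.junit.jupiter.api.Assertions.assertTrue;

    class UnboundedChunkFirstPatternTest {

        // Runs once with true and once with false, like
        // testInitialStartupModeWithOpts above.
        @ParameterizedTest(name = "unboundedChunkFirst: {0}")
        @ValueSource(booleans = {true, false})
        void runsForBothFlagValues(boolean unboundedChunkFirst) {
            // Stand-in assertion; the real test builds a Postgres source with
            // the flag set and verifies the snapshot events it produces.
            assertTrue(unboundedChunkFirst || !unboundedChunkFirst);
        }
    }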
