This is an automated email from the ASF dual-hosted git repository.

yux pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
     new 0c9c113dc  [FLINK-38157][connect/postgres] Supports assigning unbounded chunk first (#4103)
0c9c113dc is described below

commit 0c9c113dc23956d470b218a151de8a34107c7c06
Author: ouyangwulin <mr.art...@gmail.com>
AuthorDate: Fri Aug 22 21:19:43 2025 +0800

    [FLINK-38157][connect/postgres] Supports assigning unbounded chunk first (#4103)

    By assigning the unbounded chunk first, we can reduce the chance of OOM
    caused by reading the unbounded chunk last. This feature has already been
    implemented in the common incremental snapshotting framework.
---
 .../docs/connectors/pipeline-connectors/postgres.md          | 13 ++++++++++++-
 .../content/docs/connectors/pipeline-connectors/postgres.md  | 11 +++++++++++
 .../postgres/factory/PostgresDataSourceFactory.java          |  5 +++++
 .../postgres/source/PostgresDataSourceOptions.java           |  9 +++++++++
 .../postgres/source/PostgresPipelineITCaseTest.java          | 10 ++++++++--
 5 files changed, 45 insertions(+), 3 deletions(-)

diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
index ca76905ae..d783efe78 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/postgres.md
@@ -245,7 +245,7 @@ pipeline:
       分块元数据的组大小，如果元数据大小超过该组大小，则元数据将被划分为多个组。
     </td>
 </tr>
-<tr>
+  <tr>
     <td>metadata.list</td>
     <td>optional</td>
     <td style="word-wrap: break-word;">false</td>
@@ -252,7 +252,18 @@ pipeline:
     <td>
       源记录中可读取的元数据列表，将传递给下游并在转换模块中使用，各字段以逗号分隔。可用的可读元数据包括：op_ts。
     </td>
   </tr>
+  <tr>
+    <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td>
+    <td>optional</td>
+    <td style="word-wrap: break-word;">false</td>
+    <td>Boolean</td>
+    <td>
+      在快照读取阶段，是否优先分配无界分块。<br>
+      这有助于降低在对最大无界分块进行快照时，TaskManager 发生内存溢出（OOM）错误的风险。<br>
+      此为实验性选项，默认值为 false。
+    </td>
+  </tr>
 </tbody>
 </table>
 </div>
diff --git a/docs/content/docs/connectors/pipeline-connectors/postgres.md b/docs/content/docs/connectors/pipeline-connectors/postgres.md
index e52fa34bb..120d95935 100644
--- a/docs/content/docs/connectors/pipeline-connectors/postgres.md
+++ b/docs/content/docs/connectors/pipeline-connectors/postgres.md
@@ -246,6 +246,17 @@ pipeline:
       List of readable metadata from SourceRecord to be passed to downstream and could be used in transform module, split by `,`. Available readable metadata are: op_ts.
     </td>
   </tr>
+  <tr>
+    <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td>
+    <td>optional</td>
+    <td style="word-wrap: break-word;">false</td>
+    <td>Boolean</td>
+    <td>
+      Whether to assign the unbounded chunk first during the snapshot reading phase.<br>
+      This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br>
+      Experimental option, defaults to false.
+    </td>
+  </tr>
 </tbody>
 </table>
 </div>
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java
index 61c9766c7..918d479f6 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java
@@ -70,6 +70,7 @@ import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSource
 import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
 import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
 import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
+import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
 import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY;
 import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
 import static org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_STARTUP_MODE;
@@ -120,6 +121,8 @@ public class PostgresDataSourceFactory implements DataSourceFactory {
         double distributionFactorLower =
                 config.get(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
         boolean closeIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
+        boolean isAssignUnboundedChunkFirst =
+                config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
 
         Duration connectTimeout = config.get(CONNECT_TIMEOUT);
         int connectMaxRetries = config.get(CONNECT_MAX_RETRIES);
@@ -165,6 +168,7 @@ public class PostgresDataSourceFactory implements DataSourceFactory {
                         .closeIdleReaders(closeIdleReaders)
                         .skipSnapshotBackfill(skipSnapshotBackfill)
                         .lsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay)
+                        .assignUnboundedChunkFirst(isAssignUnboundedChunkFirst)
                         .getConfigFactory();
 
         List<TableId> tableIds = PostgresSchemaUtils.listTables(configFactory.create(0), null);
@@ -252,6 +256,7 @@ public class PostgresDataSourceFactory implements DataSourceFactory {
         options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
         options.add(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
         options.add(METADATA_LIST);
+        options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
         return options;
     }
 
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java
index 6d86bea30..ec51de8b6 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java
@@ -255,4 +255,13 @@ public class PostgresDataSourceOptions {
                     .withDescription(
                             "List of readable metadata from SourceRecord to be passed to downstream, split by `,`. "
                                     + "Available readable metadata are: op_ts.");
+
+    @Experimental
+    public static final ConfigOption<Boolean>
+            SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED =
+                    ConfigOptions.key("scan.incremental.snapshot.unbounded-chunk-first.enabled")
+                            .booleanType()
+                            .defaultValue(false)
+                            .withDescription(
+                                    "Whether to assign the unbounded chunk first during the snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk. Defaults to false.");
 }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java
index 5f3c74ae7..a57e1fed5 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java
@@ -47,6 +47,8 @@ import org.apache.flink.util.CloseableIterator;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -132,8 +134,9 @@ public class PostgresPipelineITCaseTest extends PostgresTestBase {
                 .containsExactlyInAnyOrder(expectedSnapshot.toArray(new Event[0]));
     }
 
-    @Test
-    public void testInitialStartupModeWithOpts() throws Exception {
+    @ParameterizedTest(name = "unboundedChunkFirst: {0}")
+    @ValueSource(booleans = {true, false})
+    public void testInitialStartupModeWithOpts(boolean unboundedChunkFirst) throws Exception {
         inventoryDatabase.createAndInitialize();
         Configuration sourceConfiguration = new Configuration();
         sourceConfiguration.set(PostgresDataSourceOptions.HOSTNAME, POSTGRES_CONTAINER.getHost());
@@ -151,6 +154,9 @@
                 inventoryDatabase.getDatabaseName() + ".inventory.products");
         sourceConfiguration.set(PostgresDataSourceOptions.SERVER_TIME_ZONE, "UTC");
         sourceConfiguration.set(PostgresDataSourceOptions.METADATA_LIST, "op_ts");
+        sourceConfiguration.set(
+                PostgresDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED,
+                unboundedChunkFirst);
         Factory.Context context =
                 new FactoryHelper.DefaultContext(
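
For readers who want to try the new option, a minimal sketch of enabling it
through the pipeline source configuration, mirroring the parameterized test
change above. Only the option constant
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED and its Boolean type
come from this commit; the Configuration import path (from the flink-cdc
common module) and the standalone class are assumptions for illustration.

    import org.apache.flink.cdc.common.configuration.Configuration;
    import org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions;

    // Illustrative sketch: enable unbounded-chunk-first assignment on a
    // Postgres pipeline source configuration, as the test above does.
    public class UnboundedChunkFirstSketch {
        public static void main(String[] args) {
            Configuration sourceConfiguration = new Configuration();
            // Connection options (hostname, port, tables, ...) elided; see the test.
            sourceConfiguration.set(
                    PostgresDataSourceOptions
                            .SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED,
                    true); // experimental; defaults to false
        }
    }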
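
The assignment mechanism itself lives in the common incremental snapshotting
framework and is not part of this diff. As a rough, hypothetical model of what
"assigning the unbounded chunk first" means (none of these types are flink-cdc
APIs): the splitter produces bounded chunks plus one final chunk with no upper
key bound, and the option moves that chunk to the head of the assignment queue.

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.List;

    // Hypothetical model, not flink-cdc code: reorder splits so the single
    // unbounded chunk (no upper key bound, often the largest) is assigned
    // first, reducing the chance of an OOM from reading it last.
    final class ChunkOrderSketch {
        record Chunk(Object lowBound, Object highBound) {
            boolean isUnbounded() {
                return highBound == null; // open-ended tail of the table
            }
        }

        static Deque<Chunk> assignmentOrder(List<Chunk> splits, boolean unboundedFirst) {
            Deque<Chunk> queue = new ArrayDeque<>();
            for (Chunk chunk : splits) {
                if (unboundedFirst && chunk.isUnbounded()) {
                    queue.addFirst(chunk); // assign the unbounded chunk first
                } else {
                    queue.addLast(chunk);
                }
            }
            return queue;
        }
    }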