This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new c3f665707 [FLINK-38818][pipeline-connector][posgtresql]Build createTableEventCache using TableSchemas from split  (#4194)
c3f665707 is described below

commit c3f66570739f4f4bfc76c17c14fa02ef1d90a333
Author: ouyangwulin <[email protected]>
AuthorDate: Thu Dec 25 16:22:34 2025 +0800

    [FLINK-38818][pipeline-connector][posgtresql]Build createTableEventCache using TableSchemas from split  (#4194)
---
 .../reader/PostgresPipelineRecordEmitter.java      | 135 +++++++++++--
 .../source/PostgresPipelineITCaseTest.java         | 225 ++++++++++++++++++++-
 .../reader/IncrementalSourceRecordEmitter.java     |  13 ++
 .../source/reader/PostgresSourceReader.java        |  15 ++
 4 files changed, 372 insertions(+), 16 deletions(-)
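
For readers skimming the patch: the emitter previously kept a flat List<CreateTableEvent> and replayed the whole list before change records; it now keys the cache by TableId so that each table's schema event is emitted exactly once, right before that table's first change record. A minimal standalone sketch of that pattern, using String stand-ins instead of the connector's TableId/CreateTableEvent/Event types:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import java.util.function.Consumer;

    // Simplified illustration of the per-table schema-event cache; not connector code.
    final class CreateTableEventCacheSketch {
        private final Map<String, String> createTableEventCache = new HashMap<>(); // tableId -> schema event
        private final Set<String> alreadySentTables = new HashSet<>();

        void cacheSchema(String tableId, String createTableEvent) {
            createTableEventCache.put(tableId, createTableEvent);
        }

        // Emit the cached schema event once, immediately before the first change record of a table.
        void onChangeRecord(String tableId, String changeRecord, Consumer<String> output) {
            if (alreadySentTables.add(tableId)) {
                String createTableEvent = createTableEventCache.get(tableId);
                if (createTableEvent != null) {
                    output.accept(createTableEvent);
                }
            }
            output.accept(changeRecord);
        }
    }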

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
index 3faddcf94..02761d8f3 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
@@ -21,8 +21,11 @@ import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.connectors.base.options.StartupOptions;
 import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;
 import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
 import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter;
@@ -30,21 +33,32 @@ import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
 import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
 import org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils;
 import org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils;
+import org.apache.flink.cdc.connectors.postgres.utils.PostgresTypeUtils;
 import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema;
 import org.apache.flink.connector.base.source.reader.RecordEmitter;
 
 import io.debezium.connector.postgresql.connection.PostgresConnection;
+import io.debezium.data.Envelope;
+import io.debezium.relational.Column;
+import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 
 import java.sql.SQLException;
-import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 
+import static io.debezium.connector.AbstractSourceInfo.SCHEMA_NAME_KEY;
+import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
 import static org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isLowWatermarkEvent;
-import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.getTableId;
 import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isDataChangeRecord;
 import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isSchemaChangeEvent;
 
@@ -60,7 +74,7 @@ public class PostgresPipelineRecordEmitter<T> extends IncrementalSourceRecordEmi
     private boolean shouldEmitAllCreateTableEventsInSnapshotMode = true;
     private boolean isBounded = false;
 
-    private final List<CreateTableEvent> createTableEventCache = new ArrayList<>();
+    private final Map<TableId, CreateTableEvent> createTableEventCache;
 
     public PostgresPipelineRecordEmitter(
             DebeziumDeserializationSchema debeziumDeserializationSchema,
@@ -76,19 +90,40 @@ public class PostgresPipelineRecordEmitter<T> extends IncrementalSourceRecordEmi
         this.sourceConfig = sourceConfig;
         this.postgresDialect = postgresDialect;
         this.alreadySendCreateTableTables = new HashSet<>();
+        this.createTableEventCache =
+                ((DebeziumEventDeserializationSchema) debeziumDeserializationSchema)
+                        .getCreateTableEventCache();
         generateCreateTableEvent(sourceConfig);
+        this.isBounded = StartupOptions.snapshot().equals(sourceConfig.getStartupOptions());
     }
 
+    @Override
+    public void applySplit(SourceSplitBase split) {
+        if ((isBounded) && createTableEventCache.isEmpty() && split instanceof SnapshotSplit) {
+            // TableSchemas in SnapshotSplit only contains one table.
+            createTableEventCache.putAll(generateCreateTableEvent(sourceConfig));
+        } else {
+            for (TableChanges.TableChange tableChange : split.getTableSchemas().values()) {
+                CreateTableEvent createTableEvent =
+                        new CreateTableEvent(
+                                toCdcTableId(tableChange.getId()),
+                                buildSchemaFromTable(tableChange.getTable()));
+                ((DebeziumEventDeserializationSchema) debeziumDeserializationSchema)
+                        .applyChangeEvent(createTableEvent);
+            }
+        }
+    }
+
     @Override
     protected void processElement(
             SourceRecord element, SourceOutput<T> output, SourceSplitState splitState)
             throws Exception {
         if (shouldEmitAllCreateTableEventsInSnapshotMode && isBounded) {
             // In snapshot mode, we simply emit all schemas at once.
-            for (CreateTableEvent createTableEvent : createTableEventCache) {
-                output.collect((T) createTableEvent);
-            }
+            createTableEventCache.forEach(
+                    (tableId, createTableEvent) -> {
+                        output.collect((T) createTableEvent);
+                    });
             shouldEmitAllCreateTableEventsInSnapshotMode = false;
         } else if (isLowWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
             TableId tableId = splitState.asSnapshotSplitState().toSourceSplit().getTableId();
@@ -99,21 +134,61 @@ public class PostgresPipelineRecordEmitter<T> extends IncrementalSourceRecordEmi
                 }
             }
         } else {
-            if (isDataChangeRecord(element) || isSchemaChangeEvent(element)) {
+            boolean isDataChangeRecord = isDataChangeRecord(element);
+            if (isDataChangeRecord || isSchemaChangeEvent(element)) {
                 TableId tableId = getTableId(element);
                 if (!alreadySendCreateTableTables.contains(tableId)) {
-                    for (CreateTableEvent createTableEvent : createTableEventCache) {
-                        if (createTableEvent != null) {
-                            output.collect((T) createTableEvent);
-                        }
+                    CreateTableEvent createTableEvent = createTableEventCache.get(tableId);
+                    if (createTableEvent != null) {
+                        output.collect((T) createTableEvent);
                     }
                     alreadySendCreateTableTables.add(tableId);
                 }
+                // In rare case, we may miss some CreateTableEvents before DataChangeEvents.
+                // Don't send CreateTableEvent for SchemaChangeEvents as it's the latest schema.
+                if (isDataChangeRecord && !createTableEventCache.containsKey(tableId)) {
+                    CreateTableEvent createTableEvent = getCreateTableEvent(sourceConfig, tableId);
+                    output.collect((T) createTableEvent);
+                    createTableEventCache.put(tableId, createTableEvent);
+                }
             }
         }
         super.processElement(element, output, splitState);
     }
 
+    private Schema buildSchemaFromTable(Table table) {
+        List<Column> columns = table.columns();
+        Schema.Builder tableBuilder = Schema.newBuilder();
+        for (int i = 0; i < columns.size(); i++) {
+            Column column = columns.get(i);
+
+            String colName = column.name();
+            DataType dataType;
+            try (PostgresConnection jdbc = postgresDialect.openJdbcConnection()) {
+                dataType =
+                        PostgresTypeUtils.fromDbzColumn(
+                                column,
+                                this.sourceConfig.getDbzConnectorConfig(),
+                                jdbc.getTypeRegistry());
+            }
+            if (!column.isOptional()) {
+                dataType = dataType.notNull();
+            }
+            tableBuilder.physicalColumn(
+                    colName,
+                    dataType,
+                    column.comment(),
+                    column.defaultValueExpression().orElse(null));
+        }
+        tableBuilder.comment(table.comment());
+
+        List<String> primaryKey = table.primaryKeyColumnNames();
+        if (Objects.nonNull(primaryKey) && !primaryKey.isEmpty()) {
+            tableBuilder.primaryKey(primaryKey);
+        }
+        return tableBuilder.build();
+    }
+
     private void sendCreateTableEvent(
             PostgresConnection jdbc, TableId tableId, SourceOutput<Event> output) {
         Schema schema = PostgresSchemaUtils.getTableSchema(tableId, sourceConfig, jdbc);
@@ -124,8 +199,40 @@ public class PostgresPipelineRecordEmitter<T> extends IncrementalSourceRecordEmi
                         schema));
     }
 
-    private void generateCreateTableEvent(PostgresSourceConfig sourceConfig) {
+    private org.apache.flink.cdc.common.event.TableId toCdcTableId(
+            io.debezium.relational.TableId dbzTableId) {
+        String schemaName =
+                dbzTableId.catalog() == null ? dbzTableId.schema() : dbzTableId.catalog();
+        return org.apache.flink.cdc.common.event.TableId.tableId(schemaName, dbzTableId.table());
+    }
+
+    private CreateTableEvent getCreateTableEvent(
+            PostgresSourceConfig sourceConfig, TableId tableId) {
+        try (PostgresConnection jdbc = postgresDialect.openJdbcConnection()) {
+            Schema schema = PostgresSchemaUtils.getTableSchema(tableId, sourceConfig, jdbc);
+            return new CreateTableEvent(
+                    org.apache.flink.cdc.common.event.TableId.tableId(
+                            tableId.schema(), tableId.table()),
+                    schema);
+        }
+    }
+
+    private TableId getTableId(SourceRecord dataRecord) {
+        Struct value = (Struct) dataRecord.value();
+        Struct source = value.getStruct(Envelope.FieldName.SOURCE);
+        Field field = source.schema().field(SCHEMA_NAME_KEY);
+        String schemaName = null;
+        if (field != null) {
+            schemaName = source.getString(SCHEMA_NAME_KEY);
+        }
+        String tableName = source.getString(TABLE_NAME_KEY);
+        return new TableId(null, schemaName, tableName);
+    }
+
+    private Map<TableId, CreateTableEvent> generateCreateTableEvent(
+            PostgresSourceConfig sourceConfig) {
         try (PostgresConnection jdbc = postgresDialect.openJdbcConnection()) {
+            Map<TableId, CreateTableEvent> createTableEventCache = new HashMap<>();
             List<TableId> capturedTableIds =
                     TableDiscoveryUtils.listTables(
                             sourceConfig.getDatabaseList().get(0),
@@ -134,12 +241,14 @@ public class PostgresPipelineRecordEmitter<T> extends IncrementalSourceRecordEmi
                             sourceConfig.includePartitionedTables());
             for (TableId tableId : capturedTableIds) {
                 Schema schema = PostgresSchemaUtils.getTableSchema(tableId, sourceConfig, jdbc);
-                createTableEventCache.add(
+                createTableEventCache.put(
+                        tableId,
                         new CreateTableEvent(
                                 org.apache.flink.cdc.common.event.TableId.tableId(
                                         tableId.schema(), tableId.table()),
                                 schema));
             }
+            return createTableEventCache;
         } catch (SQLException e) {
             throw new RuntimeException("Cannot start emitter to fetch table schema.", e);
         }
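
The applySplit(...) override above is the core of this change: when a split already carries Debezium table schemas, the emitter can derive CreateTableEvents from them instead of re-querying the database. A rough sketch of that conversion; the DataTypes.STRING() placeholder stands in for the real PostgresTypeUtils type mapping, which needs a live type registry:

    import io.debezium.relational.Column;
    import io.debezium.relational.Table;
    import io.debezium.relational.history.TableChanges;
    import org.apache.flink.cdc.common.event.CreateTableEvent;
    import org.apache.flink.cdc.common.event.TableId;
    import org.apache.flink.cdc.common.schema.Schema;
    import org.apache.flink.cdc.common.types.DataTypes;

    // Illustrative only; the patch maps column types via PostgresTypeUtils.fromDbzColumn(...).
    final class SplitSchemaSketch {
        static CreateTableEvent toCreateTableEvent(TableChanges.TableChange change) {
            Table table = change.getTable();
            Schema.Builder builder = Schema.newBuilder();
            for (Column column : table.columns()) {
                builder.physicalColumn(
                        column.name(),
                        DataTypes.STRING(), // placeholder for the real type mapping
                        column.comment(),
                        column.defaultValueExpression().orElse(null));
            }
            if (table.primaryKeyColumnNames() != null && !table.primaryKeyColumnNames().isEmpty()) {
                builder.primaryKey(table.primaryKeyColumnNames());
            }
            io.debezium.relational.TableId dbzId = change.getId();
            return new CreateTableEvent(
                    TableId.tableId(dbzId.schema(), dbzId.table()), builder.build());
        }
    }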
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java
index cdaae9eaa..bd6c46e25 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java
@@ -19,7 +19,7 @@ package org.apache.flink.cdc.connectors.postgres.source;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.cdc.common.data.binary.BinaryStringData;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
@@ -40,7 +40,16 @@ import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConf
 import org.apache.flink.cdc.connectors.postgres.testutils.UniqueDatabase;
 import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
 import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.collect.AbstractCollectResultBuffer;
+import org.apache.flink.streaming.api.operators.collect.CheckpointedCollectResultBuffer;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.util.CloseableIterator;
 
@@ -52,6 +61,8 @@ import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -61,6 +72,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -135,11 +147,206 @@ public class PostgresPipelineITCaseTest extends PostgresTestBase {
         assertThat(inventoryDatabase.checkSlot(slotName)).isEqualTo(slotName);
     }
 
+    @Test
+    public void testLatestOffsetStartupMode() throws Exception {
+        inventoryDatabase.createAndInitialize();
+        PostgresSourceConfigFactory configFactory =
+                (PostgresSourceConfigFactory)
+                        new PostgresSourceConfigFactory()
+                                .hostname(POSTGRES_CONTAINER.getHost())
+                                .port(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT))
+                                .username(TEST_USER)
+                                .password(TEST_PASSWORD)
+                                .databaseList(inventoryDatabase.getDatabaseName())
+                                .tableList("inventory.products")
+                                .startupOptions(StartupOptions.latest())
+                                .serverTimeZone("UTC");
+        configFactory.database(inventoryDatabase.getDatabaseName());
+        configFactory.slotName(slotName);
+        configFactory.decodingPluginName("pgoutput");
+
+        // Create a temporary directory for savepoint
+        Path savepointDir = Files.createTempDirectory("postgres-savepoint-test");
+        final String savepointDirectory = savepointDir.toAbsolutePath().toString();
+        String finishedSavePointPath = null;
+
+        // Listen to tables first time
+        StreamExecutionEnvironment env = getStreamExecutionEnvironment(finishedSavePointPath, 4);
+        FlinkSourceProvider sourceProvider =
+                (FlinkSourceProvider)
+                        new PostgresDataSource(configFactory).getEventSourceProvider();
+
+        DataStreamSource<Event> source =
+                env.fromSource(
+                        sourceProvider.getSource(),
+                        WatermarkStrategy.noWatermarks(),
+                        PostgresDataSourceFactory.IDENTIFIER,
+                        new EventTypeInfo());
+
+        TypeSerializer<Event> serializer =
+                source.getTransformation().getOutputType().createSerializer(env.getConfig());
+        CheckpointedCollectResultBuffer<Event> resultBuffer =
+                new CheckpointedCollectResultBuffer<>(serializer);
+        String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+        CollectResultIterator<Event> iterator =
+                addCollector(env, source, resultBuffer, serializer, accumulatorName);
+
+        JobClient jobClient = env.executeAsync("beforeSavepoint");
+        iterator.setJobClient(jobClient);
+
+        // Insert two records while the pipeline is running
+        try (Connection conn =
+                        getJdbcConnection(POSTGRES_CONTAINER, inventoryDatabase.getDatabaseName());
+                Statement stmt = conn.createStatement()) {
+            stmt.execute(
+                    "INSERT INTO inventory.products (name, description, 
weight) "
+                            + "VALUES ('scooter', 'Small 2-wheel scooter', 
3.14)");
+            stmt.execute(
+                    "INSERT INTO inventory.products (name, description, 
weight) "
+                            + "VALUES ('football', 'A leather football', 
0.45)");
+        }
+
+        // Wait for the pipeline to process the insert events
+        Thread.sleep(5000);
+
+        // Trigger a savepoint and cancel the job
+        LOG.info("Triggering savepoint");
+        finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
+        LOG.info("Savepoint created at: {}", finishedSavePointPath);
+        jobClient.cancel().get();
+        iterator.close();
+
+        // Restore from savepoint
+        LOG.info("Restoring from savepoint: {}", finishedSavePointPath);
+        StreamExecutionEnvironment restoredEnv =
+                getStreamExecutionEnvironment(finishedSavePointPath, 4);
+        FlinkSourceProvider restoredSourceProvider =
+                (FlinkSourceProvider)
+                        new PostgresDataSource(configFactory).getEventSourceProvider();
+
+        DataStreamSource<Event> restoredSource =
+                restoredEnv.fromSource(
+                        restoredSourceProvider.getSource(),
+                        WatermarkStrategy.noWatermarks(),
+                        PostgresDataSourceFactory.IDENTIFIER,
+                        new EventTypeInfo());
+
+        TypeSerializer<Event> restoredSerializer =
+                restoredSource
+                        .getTransformation()
+                        .getOutputType()
+                        .createSerializer(restoredEnv.getConfig());
+        CheckpointedCollectResultBuffer<Event> restoredResultBuffer =
+                new CheckpointedCollectResultBuffer<>(restoredSerializer);
+        String restoredAccumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+        CollectResultIterator<Event> restoredIterator =
+                addCollector(
+                        restoredEnv,
+                        restoredSource,
+                        restoredResultBuffer,
+                        restoredSerializer,
+                        restoredAccumulatorName);
+
+        JobClient restoredJobClient = restoredEnv.executeAsync("afterSavepoint");
+        restoredIterator.setJobClient(restoredJobClient);
+
+        // Insert data into the table after restoration
+        try (Connection conn =
+                        getJdbcConnection(POSTGRES_CONTAINER, inventoryDatabase.getDatabaseName());
+                Statement stmt = conn.createStatement()) {
+            stmt.execute(
+                    "INSERT INTO inventory.products (name, description, 
weight) "
+                            + "VALUES ('new_product_1', 'New product 
description', 1.0)");
+        }
+
+        // Wait for the pipeline to stabilize and process events
+        Thread.sleep(10000);
+
+        // Fetch results and check for CreateTableEvent and data change events
+        List<Event> restoreAfterEvents = new ArrayList<>();
+        while (restoreAfterEvents.size() < 2 && restoredIterator.hasNext()) {
+            restoreAfterEvents.add(restoredIterator.next());
+        }
+        restoredIterator.close();
+        restoredJobClient.cancel().get();
+
+        // Check if CreateTableEvent for new_products is present
+        boolean hasCreateTableEvent =
+                restoreAfterEvents.stream().anyMatch(event -> event instanceof CreateTableEvent);
+        assertThat(hasCreateTableEvent).isTrue();
+
+        // Check if data change event for new_products is present
+        boolean hasProductDataEvent =
+                restoreAfterEvents.stream().anyMatch(event -> event instanceof DataChangeEvent);
+        assertThat(hasProductDataEvent).isTrue();
+    }
+
+    // Helper method to trigger a savepoint with retry mechanism
+    private String triggerSavepointWithRetry(JobClient jobClient, String savepointDirectory)
+            throws Exception {
+        int retryCount = 0;
+        final int maxRetries = 600;
+        while (retryCount < maxRetries) {
+            try {
+                return jobClient.stopWithSavepoint(true, savepointDirectory).get();
+            } catch (Exception e) {
+                retryCount++;
+                LOG.error(
+                        "Retry {}/{}: Failed to trigger savepoint: {}",
+                        retryCount,
+                        maxRetries,
+                        e.getMessage());
+                if (retryCount >= maxRetries) {
+                    throw e;
+                }
+                Thread.sleep(100);
+            }
+        }
+        throw new Exception("Failed to trigger savepoint after " + maxRetries 
+ " retries");
+    }
+
+    // Helper method to get a configured StreamExecutionEnvironment
+    private StreamExecutionEnvironment getStreamExecutionEnvironment(
+            String finishedSavePointPath, int parallelism) {
+        org.apache.flink.configuration.Configuration configuration =
+                new org.apache.flink.configuration.Configuration();
+        if (finishedSavePointPath != null) {
+            configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, finishedSavePointPath);
+        }
+        StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+        env.setParallelism(parallelism);
+        env.enableCheckpointing(500L);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        return env;
+    }
+
+    // Helper method to add a collector sink and get the iterator
+    private <T> CollectResultIterator<T> addCollector(
+            StreamExecutionEnvironment env,
+            DataStreamSource<T> source,
+            AbstractCollectResultBuffer<T> buffer,
+            TypeSerializer<T> serializer,
+            String accumulatorName) {
+        CollectSinkOperatorFactory<T> sinkFactory =
+                new CollectSinkOperatorFactory<>(serializer, accumulatorName);
+        CollectSinkOperator<T> operator = (CollectSinkOperator<T>) sinkFactory.getOperator();
+        CollectResultIterator<T> iterator =
+                new CollectResultIterator<>(
+                        buffer, operator.getOperatorIdFuture(), accumulatorName, 0);
+        CollectStreamSink<T> sink = new CollectStreamSink<>(source, sinkFactory);
+        sink.name("Data stream collect sink");
+        env.addOperator(sink.getTransformation());
+        env.registerCollectIterator(iterator);
+        return iterator;
+    }
+
     @ParameterizedTest(name = "unboundedChunkFirst: {0}")
     @ValueSource(booleans = {true, false})
     public void testInitialStartupModeWithOpts(boolean unboundedChunkFirst) throws Exception {
         inventoryDatabase.createAndInitialize();
-        Configuration sourceConfiguration = new Configuration();
+        org.apache.flink.cdc.common.configuration.Configuration sourceConfiguration =
+                new org.apache.flink.cdc.common.configuration.Configuration();
         sourceConfiguration.set(PostgresDataSourceOptions.HOSTNAME, POSTGRES_CONTAINER.getHost());
         sourceConfiguration.set(
                 PostgresDataSourceOptions.PG_PORT,
@@ -161,7 +368,9 @@ public class PostgresPipelineITCaseTest extends PostgresTestBase {
 
         Factory.Context context =
                 new FactoryHelper.DefaultContext(
-                        sourceConfiguration, new Configuration(), this.getClass().getClassLoader());
+                        sourceConfiguration,
+                        new org.apache.flink.cdc.common.configuration.Configuration(),
+                        this.getClass().getClassLoader());
         FlinkSourceProvider sourceProvider =
                 (FlinkSourceProvider)
                         new PostgresDataSourceFactory()
@@ -373,6 +582,16 @@ public class PostgresPipelineITCaseTest extends PostgresTestBase {
         return result;
     }
 
+    // Helper method to create a temporary directory for savepoint
+    private Path createTempSavepointDir() throws Exception {
+        return Files.createTempDirectory("postgres-savepoint");
+    }
+
+    // Helper method to execute the job and create a savepoint
+    private String createSavepoint(JobClient jobClient, Path savepointDir) throws Exception {
+        return jobClient.stopWithSavepoint(true, savepointDir.toAbsolutePath().toString()).get();
+    }
+
     private List<Event> getSnapshotExpected(TableId tableId) {
         RowType rowType =
                 RowType.of(
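
The new testLatestOffsetStartupMode above drives a full stop-with-savepoint / restore-from-savepoint cycle around the collect sink. Distilled to its essentials (class and method names below are illustrative, not test code):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.execution.JobClient;
    import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // Hypothetical outline of the savepoint round trip used by the test.
    final class SavepointRoundTripSketch {
        // Build an environment; if a savepoint path is given, the job restores from it.
        static StreamExecutionEnvironment newEnvironment(String savepointPath) {
            Configuration conf = new Configuration();
            if (savepointPath != null) {
                conf.set(SavepointConfigOptions.SAVEPOINT_PATH, savepointPath);
            }
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
            env.enableCheckpointing(500L);
            return env;
        }

        // Stop the running job with a savepoint and return the savepoint's path.
        static String stopWithSavepoint(JobClient jobClient, String savepointDirectory) throws Exception {
            return jobClient.stopWithSavepoint(true, savepointDirectory).get();
        }
    }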
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java
index 25ebb8aaa..26c788330 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
 import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;
 import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
 import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
@@ -160,6 +161,18 @@ public class IncrementalSourceRecordEmitter<T>
         debeziumDeserializationSchema.deserialize(element, outputCollector);
     }
 
+    /**
+     * Apply the split to the record emitter.
+     *
+     * <p>This method is called when a new split is assigned to the record emitter. It allows the
+     * record emitter to perform any necessary initialization or setup based on the characteristics
+     * of the assigned split. In this implementation, we may need to handle split-specific
+     * configurations or state initialization.
+     *
+     * @param split the split to apply
+     */
+    public void applySplit(SourceSplitBase split) {}
+
     protected void reportMetrics(SourceRecord element) {
         Long messageTimestamp = getMessageTimestamp(element);
 
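
The Javadoc above describes applySplit as a hook whose default body is empty. For illustration, a connector emitter can override it to inspect the assigned split before any records are emitted. The sketch below assumes the base constructor takes the deserialization schema, reader metrics, an include-schema-changes flag, and an offset factory; that signature is an assumption, not shown in this patch:

    import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
    import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
    import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
    import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter;
    import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;

    // Hypothetical subclass, not part of this patch; constructor parameters assumed from the base class.
    public class LoggingRecordEmitter<T> extends IncrementalSourceRecordEmitter<T> {

        public LoggingRecordEmitter(
                DebeziumDeserializationSchema<T> deserializationSchema,
                SourceReaderMetrics sourceReaderMetrics,
                boolean includeSchemaChanges,
                OffsetFactory offsetFactory) {
            super(deserializationSchema, sourceReaderMetrics, includeSchemaChanges, offsetFactory);
        }

        @Override
        public void applySplit(SourceSplitBase split) {
            // Invoked when a split's state is initialized; a pipeline emitter could, for example,
            // pre-build CreateTableEvents from split.getTableSchemas() here.
            System.out.println("Assigned split: " + split.splitId());
        }
    }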
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReader.java
index ed954d77f..30f77ed25 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReader.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReader.java
@@ -23,12 +23,15 @@ import org.apache.flink.cdc.connectors.base.config.SourceConfig;
 import org.apache.flink.cdc.connectors.base.dialect.DataSourceDialect;
 import org.apache.flink.cdc.connectors.base.source.meta.events.LatestFinishedSplitsNumberEvent;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplitState;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitSerializer;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;
 import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
+import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplitState;
 import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReaderContext;
 import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReaderWithCommit;
+import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter;
 import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
 import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
 import org.apache.flink.cdc.connectors.postgres.source.events.OffsetCommitAckEvent;
@@ -95,6 +98,18 @@ public class PostgresSourceReader extends IncrementalSourceReaderWithCommit {
         }
     }
 
+    @Override
+    protected SourceSplitState initializedState(SourceSplitBase split) {
+        if (recordEmitter instanceof IncrementalSourceRecordEmitter) {
+            ((IncrementalSourceRecordEmitter) recordEmitter).applySplit(split);
+        }
+        if (split.isSnapshotSplit()) {
+            return new SnapshotSplitState(split.asSnapshotSplit());
+        } else {
+            return new StreamSplitState(split.asStreamSplit());
+        }
+    }
+
     @Override
     protected void updateStreamSplitFinishedSplitsSize(
             LatestFinishedSplitsNumberEvent sourceEvent) {
