This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
     new c3f665707 [FLINK-38818][pipeline-connector][postgresql] Build createTableEventCache using TableSchemas from split (#4194)
c3f665707 is described below
commit c3f66570739f4f4bfc76c17c14fa02ef1d90a333
Author: ouyangwulin <[email protected]>
AuthorDate: Thu Dec 25 16:22:34 2025 +0800
    [FLINK-38818][pipeline-connector][postgresql] Build createTableEventCache using TableSchemas from split (#4194)
---
.../reader/PostgresPipelineRecordEmitter.java | 135 +++++++++++--
.../source/PostgresPipelineITCaseTest.java | 225 ++++++++++++++++++++-
.../reader/IncrementalSourceRecordEmitter.java | 13 ++
.../source/reader/PostgresSourceReader.java | 15 ++
4 files changed, 372 insertions(+), 16 deletions(-)
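For context, the gist of the change: the Postgres pipeline emitter now keys its CreateTableEvent cache by table id and seeds it from the TableSchemas carried by each assigned split, with a one-off schema lookup as a fallback when a table is missing from the cache. The self-contained sketch below illustrates that pattern under simplified assumptions; the generic TABLE/EVENT parameters, schemaFetcher, and emitSchemaIfNeeded are illustrative stand-ins, not the connector's actual API.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;

/** Illustrative only: generic stand-ins for flink-cdc's TableId and CreateTableEvent. */
final class CreateTableEventCacheSketch<TABLE, EVENT> {

    private final Map<TABLE, EVENT> createTableEventCache = new HashMap<>();
    private final Set<TABLE> alreadySentTables = new HashSet<>();
    private final Function<TABLE, EVENT> schemaFetcher; // fallback, e.g. a JDBC schema lookup

    CreateTableEventCacheSketch(Function<TABLE, EVENT> schemaFetcher) {
        this.schemaFetcher = schemaFetcher;
    }

    /** Called when a split is assigned: seed the cache from the split's table schemas. */
    void applySplit(Map<TABLE, EVENT> tableSchemasOfSplit) {
        createTableEventCache.putAll(tableSchemasOfSplit);
    }

    /** Before the first change record of a table is emitted, emit its schema exactly once. */
    void emitSchemaIfNeeded(TABLE table, Consumer<EVENT> output) {
        if (alreadySentTables.add(table)) {
            EVENT cached = createTableEventCache.get(table);
            if (cached != null) {
                output.accept(cached);
            }
        }
        // Rare case: the table was never cached; fetch its schema once, emit it, and remember it.
        createTableEventCache.computeIfAbsent(
                table,
                t -> {
                    EVENT fetched = schemaFetcher.apply(t);
                    output.accept(fetched);
                    return fetched;
                });
    }
}

Keying the cache by table id turns the per-record schema emission into a single lookup instead of a scan over a flat list of events, which is what the emitter did before this change.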
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
index 3faddcf94..02761d8f3 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
@@ -21,8 +21,11 @@ import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;
import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter;
@@ -30,21 +33,32 @@ import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
import org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils;
import org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils;
+import org.apache.flink.cdc.connectors.postgres.utils.PostgresTypeUtils;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import io.debezium.connector.postgresql.connection.PostgresConnection;
+import io.debezium.data.Envelope;
+import io.debezium.relational.Column;
+import io.debezium.relational.Table;
import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.sql.SQLException;
-import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
import java.util.Set;
+import static io.debezium.connector.AbstractSourceInfo.SCHEMA_NAME_KEY;
+import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
import static org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isLowWatermarkEvent;
-import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.getTableId;
import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isDataChangeRecord;
import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isSchemaChangeEvent;
@@ -60,7 +74,7 @@ public class PostgresPipelineRecordEmitter<T> extends IncrementalSourceRecordEmi
private boolean shouldEmitAllCreateTableEventsInSnapshotMode = true;
private boolean isBounded = false;
-    private final List<CreateTableEvent> createTableEventCache = new ArrayList<>();
+ private final Map<TableId, CreateTableEvent> createTableEventCache;
public PostgresPipelineRecordEmitter(
DebeziumDeserializationSchema debeziumDeserializationSchema,
@@ -76,19 +90,40 @@ public class PostgresPipelineRecordEmitter<T> extends IncrementalSourceRecordEmi
this.sourceConfig = sourceConfig;
this.postgresDialect = postgresDialect;
this.alreadySendCreateTableTables = new HashSet<>();
+        this.createTableEventCache =
+                ((DebeziumEventDeserializationSchema) debeziumDeserializationSchema)
+                        .getCreateTableEventCache();
        generateCreateTableEvent(sourceConfig);
        this.isBounded = StartupOptions.snapshot().equals(sourceConfig.getStartupOptions());
}
+ @Override
+ public void applySplit(SourceSplitBase split) {
+        if ((isBounded) && createTableEventCache.isEmpty() && split instanceof SnapshotSplit) {
+            // TableSchemas in SnapshotSplit only contains one table.
+            createTableEventCache.putAll(generateCreateTableEvent(sourceConfig));
+        } else {
+            for (TableChanges.TableChange tableChange : split.getTableSchemas().values()) {
+                CreateTableEvent createTableEvent =
+                        new CreateTableEvent(
+                                toCdcTableId(tableChange.getId()),
+                                buildSchemaFromTable(tableChange.getTable()));
+                ((DebeziumEventDeserializationSchema) debeziumDeserializationSchema)
+                        .applyChangeEvent(createTableEvent);
+            }
+        }
+    }
+
@Override
protected void processElement(
            SourceRecord element, SourceOutput<T> output, SourceSplitState splitState)
throws Exception {
if (shouldEmitAllCreateTableEventsInSnapshotMode && isBounded) {
// In snapshot mode, we simply emit all schemas at once.
- for (CreateTableEvent createTableEvent : createTableEventCache) {
- output.collect((T) createTableEvent);
- }
+ createTableEventCache.forEach(
+ (tableId, createTableEvent) -> {
+ output.collect((T) createTableEvent);
+ });
shouldEmitAllCreateTableEventsInSnapshotMode = false;
        } else if (isLowWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
            TableId tableId = splitState.asSnapshotSplitState().toSourceSplit().getTableId();
@@ -99,21 +134,61 @@ public class PostgresPipelineRecordEmitter<T> extends IncrementalSourceRecordEmi
}
}
} else {
- if (isDataChangeRecord(element) || isSchemaChangeEvent(element)) {
+ boolean isDataChangeRecord = isDataChangeRecord(element);
+ if (isDataChangeRecord || isSchemaChangeEvent(element)) {
TableId tableId = getTableId(element);
if (!alreadySendCreateTableTables.contains(tableId)) {
-                    for (CreateTableEvent createTableEvent : createTableEventCache) {
-                        if (createTableEvent != null) {
-                            output.collect((T) createTableEvent);
-                        }
+                    CreateTableEvent createTableEvent = createTableEventCache.get(tableId);
+                    if (createTableEvent != null) {
+                        output.collect((T) createTableEvent);
                    }
                    alreadySendCreateTableTables.add(tableId);
                }
+                // In rare cases, we may miss some CreateTableEvents before DataChangeEvents.
+                // Don't send CreateTableEvent for SchemaChangeEvents as it's the latest schema.
+                if (isDataChangeRecord && !createTableEventCache.containsKey(tableId)) {
+                    CreateTableEvent createTableEvent = getCreateTableEvent(sourceConfig, tableId);
+ output.collect((T) createTableEvent);
+ createTableEventCache.put(tableId, createTableEvent);
+ }
}
}
super.processElement(element, output, splitState);
}
+ private Schema buildSchemaFromTable(Table table) {
+ List<Column> columns = table.columns();
+ Schema.Builder tableBuilder = Schema.newBuilder();
+ for (int i = 0; i < columns.size(); i++) {
+ Column column = columns.get(i);
+
+ String colName = column.name();
+ DataType dataType;
+            try (PostgresConnection jdbc = postgresDialect.openJdbcConnection()) {
+ dataType =
+ PostgresTypeUtils.fromDbzColumn(
+ column,
+ this.sourceConfig.getDbzConnectorConfig(),
+ jdbc.getTypeRegistry());
+ }
+ if (!column.isOptional()) {
+ dataType = dataType.notNull();
+ }
+ tableBuilder.physicalColumn(
+ colName,
+ dataType,
+ column.comment(),
+ column.defaultValueExpression().orElse(null));
+ }
+ tableBuilder.comment(table.comment());
+
+ List<String> primaryKey = table.primaryKeyColumnNames();
+ if (Objects.nonNull(primaryKey) && !primaryKey.isEmpty()) {
+ tableBuilder.primaryKey(primaryKey);
+ }
+ return tableBuilder.build();
+ }
+
private void sendCreateTableEvent(
            PostgresConnection jdbc, TableId tableId, SourceOutput<Event> output) {
        Schema schema = PostgresSchemaUtils.getTableSchema(tableId, sourceConfig, jdbc);
@@ -124,8 +199,40 @@ public class PostgresPipelineRecordEmitter<T> extends IncrementalSourceRecordEmi
schema));
}
- private void generateCreateTableEvent(PostgresSourceConfig sourceConfig) {
+ private org.apache.flink.cdc.common.event.TableId toCdcTableId(
+ io.debezium.relational.TableId dbzTableId) {
+ String schemaName =
+                dbzTableId.catalog() == null ? dbzTableId.schema() : dbzTableId.catalog();
+        return org.apache.flink.cdc.common.event.TableId.tableId(schemaName, dbzTableId.table());
+ }
+
+ private CreateTableEvent getCreateTableEvent(
+ PostgresSourceConfig sourceConfig, TableId tableId) {
+ try (PostgresConnection jdbc = postgresDialect.openJdbcConnection()) {
+            Schema schema = PostgresSchemaUtils.getTableSchema(tableId, sourceConfig, jdbc);
+ return new CreateTableEvent(
+ org.apache.flink.cdc.common.event.TableId.tableId(
+ tableId.schema(), tableId.table()),
+ schema);
+ }
+ }
+
+ private TableId getTableId(SourceRecord dataRecord) {
+ Struct value = (Struct) dataRecord.value();
+ Struct source = value.getStruct(Envelope.FieldName.SOURCE);
+ Field field = source.schema().field(SCHEMA_NAME_KEY);
+ String schemaName = null;
+ if (field != null) {
+ schemaName = source.getString(SCHEMA_NAME_KEY);
+ }
+ String tableName = source.getString(TABLE_NAME_KEY);
+ return new TableId(null, schemaName, tableName);
+ }
+
+ private Map<TableId, CreateTableEvent> generateCreateTableEvent(
+ PostgresSourceConfig sourceConfig) {
try (PostgresConnection jdbc = postgresDialect.openJdbcConnection()) {
+            Map<TableId, CreateTableEvent> createTableEventCache = new HashMap<>();
List<TableId> capturedTableIds =
TableDiscoveryUtils.listTables(
sourceConfig.getDatabaseList().get(0),
@@ -134,12 +241,14 @@ public class PostgresPipelineRecordEmitter<T> extends IncrementalSourceRecordEmi
sourceConfig.includePartitionedTables());
for (TableId tableId : capturedTableIds) {
                Schema schema = PostgresSchemaUtils.getTableSchema(tableId, sourceConfig, jdbc);
- createTableEventCache.add(
+ createTableEventCache.put(
+ tableId,
new CreateTableEvent(
org.apache.flink.cdc.common.event.TableId.tableId(
tableId.schema(), tableId.table()),
schema));
}
+ return createTableEventCache;
} catch (SQLException e) {
            throw new RuntimeException("Cannot start emitter to fetch table schema.", e);
}
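The new buildSchemaFromTable helper above walks the Debezium table metadata column by column, maps each column to a CDC DataType, marks non-optional columns as NOT NULL, and finally records the primary key. A condensed sketch of that flow follows; ColumnSketch and the string-based schema are simplified stand-ins for illustration, not the real Debezium or CDC types.

import java.util.ArrayList;
import java.util.List;

/** Simplified mirror of the buildSchemaFromTable flow; the types below are illustrative only. */
final class SchemaFromTableSketch {

    static final class ColumnSketch {
        final String name;
        final String typeName;
        final boolean optional;

        ColumnSketch(String name, String typeName, boolean optional) {
            this.name = name;
            this.typeName = typeName;
            this.optional = optional;
        }
    }

    /** Maps each column, applies NOT NULL to non-optional columns, then appends the primary key. */
    static String buildSchema(List<ColumnSketch> columns, List<String> primaryKey) {
        List<String> physicalColumns = new ArrayList<>();
        for (ColumnSketch column : columns) {
            String type = column.optional ? column.typeName : column.typeName + " NOT NULL";
            physicalColumns.add(column.name + " " + type);
        }
        String schema = "columns=" + physicalColumns;
        if (!primaryKey.isEmpty()) {
            schema += ", primaryKey=" + primaryKey;
        }
        return schema;
    }
}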
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java
index cdaae9eaa..bd6c46e25 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java
@@ -19,7 +19,7 @@ package org.apache.flink.cdc.connectors.postgres.source;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
@@ -40,7 +40,16 @@ import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConf
import org.apache.flink.cdc.connectors.postgres.testutils.UniqueDatabase;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.collect.AbstractCollectResultBuffer;
+import org.apache.flink.streaming.api.operators.collect.CheckpointedCollectResultBuffer;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.util.CloseableIterator;
@@ -52,6 +61,8 @@ import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
@@ -61,6 +72,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
@@ -135,11 +147,206 @@ public class PostgresPipelineITCaseTest extends PostgresTestBase {
assertThat(inventoryDatabase.checkSlot(slotName)).isEqualTo(slotName);
}
+ @Test
+ public void testLatestOffsetStartupMode() throws Exception {
+ inventoryDatabase.createAndInitialize();
+ PostgresSourceConfigFactory configFactory =
+ (PostgresSourceConfigFactory)
+ new PostgresSourceConfigFactory()
+ .hostname(POSTGRES_CONTAINER.getHost())
+                                .port(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT))
+                                .username(TEST_USER)
+                                .password(TEST_PASSWORD)
+                                .databaseList(inventoryDatabase.getDatabaseName())
+ .tableList("inventory.products")
+ .startupOptions(StartupOptions.latest())
+ .serverTimeZone("UTC");
+ configFactory.database(inventoryDatabase.getDatabaseName());
+ configFactory.slotName(slotName);
+ configFactory.decodingPluginName("pgoutput");
+
+ // Create a temporary directory for savepoint
+        Path savepointDir = Files.createTempDirectory("postgres-savepoint-test");
+        final String savepointDirectory = savepointDir.toAbsolutePath().toString();
+ String finishedSavePointPath = null;
+
+ // Listen to tables first time
+        StreamExecutionEnvironment env = getStreamExecutionEnvironment(finishedSavePointPath, 4);
+ FlinkSourceProvider sourceProvider =
+ (FlinkSourceProvider)
+                        new PostgresDataSource(configFactory).getEventSourceProvider();
+
+ DataStreamSource<Event> source =
+ env.fromSource(
+ sourceProvider.getSource(),
+ WatermarkStrategy.noWatermarks(),
+ PostgresDataSourceFactory.IDENTIFIER,
+ new EventTypeInfo());
+
+ TypeSerializer<Event> serializer =
+                source.getTransformation().getOutputType().createSerializer(env.getConfig());
+ CheckpointedCollectResultBuffer<Event> resultBuffer =
+ new CheckpointedCollectResultBuffer<>(serializer);
+ String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+ CollectResultIterator<Event> iterator =
+                addCollector(env, source, resultBuffer, serializer, accumulatorName);
+
+ JobClient jobClient = env.executeAsync("beforeSavepoint");
+ iterator.setJobClient(jobClient);
+
+ // Insert two records while the pipeline is running
+ try (Connection conn =
+                getJdbcConnection(POSTGRES_CONTAINER, inventoryDatabase.getDatabaseName());
+ Statement stmt = conn.createStatement()) {
+ stmt.execute(
+                    "INSERT INTO inventory.products (name, description, weight) "
+                            + "VALUES ('scooter', 'Small 2-wheel scooter', 3.14)");
+            stmt.execute(
+                    "INSERT INTO inventory.products (name, description, weight) "
+                            + "VALUES ('football', 'A leather football', 0.45)");
+ }
+
+ // Wait for the pipeline to process the insert events
+ Thread.sleep(5000);
+
+ // Trigger a savepoint and cancel the job
+ LOG.info("Triggering savepoint");
+        finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
+ LOG.info("Savepoint created at: {}", finishedSavePointPath);
+ jobClient.cancel().get();
+ iterator.close();
+
+ // Restore from savepoint
+ LOG.info("Restoring from savepoint: {}", finishedSavePointPath);
+ StreamExecutionEnvironment restoredEnv =
+ getStreamExecutionEnvironment(finishedSavePointPath, 4);
+ FlinkSourceProvider restoredSourceProvider =
+ (FlinkSourceProvider)
+                        new PostgresDataSource(configFactory).getEventSourceProvider();
+
+ DataStreamSource<Event> restoredSource =
+ restoredEnv.fromSource(
+ restoredSourceProvider.getSource(),
+ WatermarkStrategy.noWatermarks(),
+ PostgresDataSourceFactory.IDENTIFIER,
+ new EventTypeInfo());
+
+ TypeSerializer<Event> restoredSerializer =
+ restoredSource
+ .getTransformation()
+ .getOutputType()
+ .createSerializer(restoredEnv.getConfig());
+ CheckpointedCollectResultBuffer<Event> restoredResultBuffer =
+ new CheckpointedCollectResultBuffer<>(restoredSerializer);
+        String restoredAccumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+ CollectResultIterator<Event> restoredIterator =
+ addCollector(
+ restoredEnv,
+ restoredSource,
+ restoredResultBuffer,
+ restoredSerializer,
+ restoredAccumulatorName);
+
+        JobClient restoredJobClient = restoredEnv.executeAsync("afterSavepoint");
+ restoredIterator.setJobClient(restoredJobClient);
+
+ // Insert data into the table after restoration
+ try (Connection conn =
+                getJdbcConnection(POSTGRES_CONTAINER, inventoryDatabase.getDatabaseName());
+ Statement stmt = conn.createStatement()) {
+ stmt.execute(
+                    "INSERT INTO inventory.products (name, description, weight) "
+                            + "VALUES ('new_product_1', 'New product description', 1.0)");
+ }
+
+ // Wait for the pipeline to stabilize and process events
+ Thread.sleep(10000);
+
+ // Fetch results and check for CreateTableEvent and data change events
+ List<Event> restoreAfterEvents = new ArrayList<>();
+ while (restoreAfterEvents.size() < 2 && restoredIterator.hasNext()) {
+ restoreAfterEvents.add(restoredIterator.next());
+ }
+ restoredIterator.close();
+ restoredJobClient.cancel().get();
+
+ // Check if CreateTableEvent for new_products is present
+ boolean hasCreateTableEvent =
+                restoreAfterEvents.stream().anyMatch(event -> event instanceof CreateTableEvent);
+ assertThat(hasCreateTableEvent).isTrue();
+
+ // Check if data change event for new_products is present
+ boolean hasProductDataEvent =
+                restoreAfterEvents.stream().anyMatch(event -> event instanceof DataChangeEvent);
+ assertThat(hasProductDataEvent).isTrue();
+ }
+
+ // Helper method to trigger a savepoint with retry mechanism
+    private String triggerSavepointWithRetry(JobClient jobClient, String savepointDirectory)
+ throws Exception {
+ int retryCount = 0;
+ final int maxRetries = 600;
+ while (retryCount < maxRetries) {
+ try {
+                return jobClient.stopWithSavepoint(true, savepointDirectory).get();
+ } catch (Exception e) {
+ retryCount++;
+ LOG.error(
+ "Retry {}/{}: Failed to trigger savepoint: {}",
+ retryCount,
+ maxRetries,
+ e.getMessage());
+ if (retryCount >= maxRetries) {
+ throw e;
+ }
+ Thread.sleep(100);
+ }
+ }
+        throw new Exception("Failed to trigger savepoint after " + maxRetries + " retries");
+ }
+
+ // Helper method to get a configured StreamExecutionEnvironment
+ private StreamExecutionEnvironment getStreamExecutionEnvironment(
+ String finishedSavePointPath, int parallelism) {
+ org.apache.flink.configuration.Configuration configuration =
+ new org.apache.flink.configuration.Configuration();
+ if (finishedSavePointPath != null) {
+            configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, finishedSavePointPath);
+ }
+ StreamExecutionEnvironment env =
+        StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+ env.setParallelism(parallelism);
+ env.enableCheckpointing(500L);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ return env;
+ }
+
+ // Helper method to add a collector sink and get the iterator
+ private <T> CollectResultIterator<T> addCollector(
+ StreamExecutionEnvironment env,
+ DataStreamSource<T> source,
+ AbstractCollectResultBuffer<T> buffer,
+ TypeSerializer<T> serializer,
+ String accumulatorName) {
+ CollectSinkOperatorFactory<T> sinkFactory =
+ new CollectSinkOperatorFactory<>(serializer, accumulatorName);
+        CollectSinkOperator<T> operator = (CollectSinkOperator<T>) sinkFactory.getOperator();
+        CollectResultIterator<T> iterator =
+                new CollectResultIterator<>(
+                        buffer, operator.getOperatorIdFuture(), accumulatorName, 0);
+        CollectStreamSink<T> sink = new CollectStreamSink<>(source, sinkFactory);
+ sink.name("Data stream collect sink");
+ env.addOperator(sink.getTransformation());
+ env.registerCollectIterator(iterator);
+ return iterator;
+ }
+
@ParameterizedTest(name = "unboundedChunkFirst: {0}")
@ValueSource(booleans = {true, false})
    public void testInitialStartupModeWithOpts(boolean unboundedChunkFirst) throws Exception {
inventoryDatabase.createAndInitialize();
- Configuration sourceConfiguration = new Configuration();
+        org.apache.flink.cdc.common.configuration.Configuration sourceConfiguration =
+ new org.apache.flink.cdc.common.configuration.Configuration();
        sourceConfiguration.set(PostgresDataSourceOptions.HOSTNAME, POSTGRES_CONTAINER.getHost());
sourceConfiguration.set(
PostgresDataSourceOptions.PG_PORT,
@@ -161,7 +368,9 @@ public class PostgresPipelineITCaseTest extends PostgresTestBase {
Factory.Context context =
new FactoryHelper.DefaultContext(
-                        sourceConfiguration, new Configuration(), this.getClass().getClassLoader());
+ sourceConfiguration,
+                        new org.apache.flink.cdc.common.configuration.Configuration(),
+ this.getClass().getClassLoader());
FlinkSourceProvider sourceProvider =
(FlinkSourceProvider)
new PostgresDataSourceFactory()
@@ -373,6 +582,16 @@ public class PostgresPipelineITCaseTest extends PostgresTestBase {
return result;
}
+ // Helper method to create a temporary directory for savepoint
+ private Path createTempSavepointDir() throws Exception {
+ return Files.createTempDirectory("postgres-savepoint");
+ }
+
+ // Helper method to execute the job and create a savepoint
+    private String createSavepoint(JobClient jobClient, Path savepointDir) throws Exception {
+        return jobClient.stopWithSavepoint(true, savepointDir.toAbsolutePath().toString()).get();
+ }
+
private List<Event> getSnapshotExpected(TableId tableId) {
RowType rowType =
RowType.of(
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java
index 25ebb8aaa..26c788330 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;
import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
@@ -160,6 +161,18 @@ public class IncrementalSourceRecordEmitter<T>
debeziumDeserializationSchema.deserialize(element, outputCollector);
}
+ /**
+     * Applies the given split to the record emitter.
+     *
+     * <p>This method is called when a new split is assigned to the reader. It allows the record
+     * emitter to perform any initialization or setup that depends on the characteristics of the
+     * assigned split, such as split-specific configuration or state initialization. The default
+     * implementation is a no-op.
+ *
+ * @param split the split to apply
+ */
+ public void applySplit(SourceSplitBase split) {}
+
protected void reportMetrics(SourceRecord element) {
Long messageTimestamp = getMessageTimestamp(element);
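The javadoc above introduces applySplit as an extension point: the base implementation is a no-op, and a connector-specific emitter can override it to initialize per-split state before any record of that split is emitted, which is what the Postgres emitter in this commit does. A minimal sketch of such an override is shown below, using simplified stand-in classes in place of SourceSplitBase and IncrementalSourceRecordEmitter.

import java.util.HashMap;
import java.util.Map;

/** Simplified stand-in for a split that carries per-table schemas, like getTableSchemas(). */
class SplitSketch {
    private final Map<String, String> tableSchemas;

    SplitSketch(Map<String, String> tableSchemas) {
        this.tableSchemas = tableSchemas;
    }

    Map<String, String> getTableSchemas() {
        return tableSchemas;
    }
}

/** Simplified stand-in for the base emitter; the default hook is a no-op, as in the diff. */
class BaseRecordEmitterSketch {
    public void applySplit(SplitSketch split) {}
}

/** A connector-specific emitter overrides the hook to seed caches for the assigned split. */
class CachingRecordEmitterSketch extends BaseRecordEmitterSketch {
    private final Map<String, String> schemaCache = new HashMap<>();

    @Override
    public void applySplit(SplitSketch split) {
        // Seed the per-table schema cache before any record of this split is emitted.
        schemaCache.putAll(split.getTableSchemas());
    }
}

The PostgresSourceReader change below wires the hook in by calling applySplit from initializedState whenever a new split is assigned to the reader.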
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReader.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReader.java
index ed954d77f..30f77ed25 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReader.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReader.java
@@ -23,12 +23,15 @@ import org.apache.flink.cdc.connectors.base.config.SourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.DataSourceDialect;
import org.apache.flink.cdc.connectors.base.source.meta.events.LatestFinishedSplitsNumberEvent;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplitState;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitSerializer;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
+import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplitState;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReaderContext;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReaderWithCommit;
+import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter;
import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
import org.apache.flink.cdc.connectors.postgres.source.events.OffsetCommitAckEvent;
@@ -95,6 +98,18 @@ public class PostgresSourceReader extends IncrementalSourceReaderWithCommit {
}
}
+ @Override
+ protected SourceSplitState initializedState(SourceSplitBase split) {
+ if (recordEmitter instanceof IncrementalSourceRecordEmitter) {
+ ((IncrementalSourceRecordEmitter) recordEmitter).applySplit(split);
+ }
+ if (split.isSnapshotSplit()) {
+ return new SnapshotSplitState(split.asSnapshotSplit());
+ } else {
+ return new StreamSplitState(split.asStreamSplit());
+ }
+ }
+
@Override
protected void updateStreamSplitFinishedSplitsSize(
LatestFinishedSplitsNumberEvent sourceEvent) {