This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 8e576aa91 [FLINK-38959][postgres] Update split state's table schemas
info and infer schema change event based on pgoutput plugin's relation message.
(#4316)
8e576aa91 is described below
commit 8e576aa91587abcfa8addd78254d5b6158c19875
Author: Hongshun Wang <[email protected]>
AuthorDate: Sat Mar 21 09:20:17 2026 +0800
[FLINK-38959][postgres] Update split state's table schemas info and infer
schema change event based on pgoutput plugin's relation message. (#4316)
---
.../factory/PostgresDataSourceFactory.java | 4 +
.../postgres/source/PostgresDataSourceOptions.java | 8 +
.../reader/PostgresPipelineRecordEmitter.java | 138 +++++----
.../postgres/utils/SchemaChangeUtil.java | 318 +++++++++++++++++++++
.../postgres/source/PostgresPipelineITCase.java | 316 +++++++++++++++++++-
.../source/meta/split/SourceSplitSerializer.java | 3 +-
.../reader/IncrementalSourceRecordEmitter.java | 23 +-
.../connector/postgresql/PostgresObjectUtils.java | 28 +-
.../postgres/source/PostgresSourceBuilder.java | 20 ++
.../source/config/PostgresSourceConfig.java | 16 +-
.../source/config/PostgresSourceConfigFactory.java | 10 +-
.../source/fetch/CDCPostgresDispatcher.java | 16 +-
.../fetch/PostgresSourceFetchTaskContext.java | 17 +-
.../source/reader/PostgresSourceRecordEmitter.java | 56 ++++
.../source/schema/PostgresSchemaRecord.java | 72 +++++
.../source/schema/RelationAwarePostgresSchema.java | 62 ++++
.../postgres/source/schema/SchemaDispatcher.java | 25 ++
.../cdc/connectors/postgres/PostgresTestBase.java | 9 +-
.../fetch/IncrementalSourceStreamFetcherTest.java | 99 ++++++-
.../source/fetch/PostgresScanFetchTaskTest.java | 5 +-
.../source/reader/PostgresSourceReaderTest.java | 140 ++++++++-
21 files changed, 1264 insertions(+), 121 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java
index 578b6e0f8..93d8657cf 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java
@@ -74,6 +74,7 @@ import static
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSource
import static
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY;
import static
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_STARTUP_MODE;
+import static
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCHEMA_CHANGE_ENABLED;
import static
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SERVER_TIME_ZONE;
import static
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SLOT_NAME;
import static
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
@@ -131,6 +132,7 @@ public class PostgresDataSourceFactory implements
DataSourceFactory {
boolean skipSnapshotBackfill =
config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
int lsnCommitCheckpointsDelay =
config.get(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
boolean tableIdIncludeDatabase = config.get(TABLE_ID_INCLUDE_DATABASE);
+ boolean includeSchemaChanges = config.get(SCHEMA_CHANGE_ENABLED);
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize,
1);
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -172,6 +174,7 @@ public class PostgresDataSourceFactory implements
DataSourceFactory {
.lsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay)
.assignUnboundedChunkFirst(isAssignUnboundedChunkFirst)
.includeDatabaseInTableId(tableIdIncludeDatabase)
+ .includeSchemaChanges(includeSchemaChanges)
.getConfigFactory();
List<TableId> tableIds =
PostgresSchemaUtils.listTables(configFactory.create(0), null);
@@ -262,6 +265,7 @@ public class PostgresDataSourceFactory implements
DataSourceFactory {
options.add(METADATA_LIST);
options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
options.add(TABLE_ID_INCLUDE_DATABASE);
+ options.add(SCHEMA_CHANGE_ENABLED);
return options;
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java
index 7e9ac4b0c..95cc823d2 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java
@@ -273,4 +273,12 @@ public class PostgresDataSourceOptions {
"Whether to include database in the generated
Table ID. "
+ "If set to true, the Table ID will be in
the format (database, schema, table). "
+ "If set to false, the Table ID will be
in the format (schema, table). Defaults to false.");
+
+    /**
+     * Enables emitting schema change events inferred from pgoutput Relation messages. The source
+     * factory forwards this value as {@code includeSchemaChanges}; the pipeline record emitter only
+     * handles schema change records while it is enabled.
+     */
+    @Experimental
+    public static final ConfigOption<Boolean> SCHEMA_CHANGE_ENABLED =
+            ConfigOptions.key("schema-change.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to infer schema change events (add column, drop column, "
+                                    + "rename column, alter column type) from pgoutput Relation "
+                                    + "messages and emit them downstream. "
+                                    + "Only supported with the pgoutput decoding plugin.");
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
index dd862354c..18c1d9b99 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
@@ -19,41 +19,39 @@ package
org.apache.flink.cdc.connectors.postgres.source.reader;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.cdc.common.event.CreateTableEvent;
-import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.schema.Schema;
-import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;
import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
-import
org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter;
import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
import
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
+import
org.apache.flink.cdc.connectors.postgres.source.schema.PostgresSchemaRecord;
import
org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils;
import org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils;
-import org.apache.flink.cdc.connectors.postgres.utils.PostgresTypeUtils;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.data.Envelope;
-import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
import static io.debezium.connector.AbstractSourceInfo.SCHEMA_NAME_KEY;
@@ -62,24 +60,26 @@ import static
org.apache.flink.cdc.connectors.base.source.meta.wartermark.Waterm
import static
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isDataChangeRecord;
import static
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isSchemaChangeEvent;
import static
org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils.toCdcTableId;
+import static
org.apache.flink.cdc.connectors.postgres.utils.SchemaChangeUtil.inferSchemaChangeEvent;
+import static
org.apache.flink.cdc.connectors.postgres.utils.SchemaChangeUtil.toCreateTableEvent;
/** The {@link RecordEmitter} implementation for PostgreSQL pipeline
connector. */
-public class PostgresPipelineRecordEmitter<T> extends
IncrementalSourceRecordEmitter<T> {
+public class PostgresPipelineRecordEmitter<T> extends
PostgresSourceRecordEmitter<T> {
+ private static final Logger LOG =
LoggerFactory.getLogger(PostgresPipelineRecordEmitter.class);
private final PostgresSourceConfig sourceConfig;
private final PostgresDialect postgresDialect;
// Used when startup mode is initial
- private Set<TableId> alreadySendCreateTableTables;
+ private final Set<TableId> alreadySendCreateTableTables;
+ private final boolean isBounded;
+ private final boolean includeDatabaseInTableId;
+ private final Map<TableId, CreateTableEvent> createTableEventCache;
// Used when startup mode is not initial
private boolean shouldEmitAllCreateTableEventsInSnapshotMode = true;
- private boolean isBounded = false;
- private boolean includeDatabaseInTableId = false;
-
- private final Map<TableId, CreateTableEvent> createTableEventCache;
public PostgresPipelineRecordEmitter(
- DebeziumDeserializationSchema debeziumDeserializationSchema,
+ DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
SourceReaderMetrics sourceReaderMetrics,
PostgresSourceConfig sourceConfig,
OffsetFactory offsetFactory,
@@ -108,16 +108,11 @@ public class PostgresPipelineRecordEmitter<T> extends
IncrementalSourceRecordEmi
} else {
for (Map.Entry<TableId, TableChanges.TableChange> entry :
split.getTableSchemas().entrySet()) {
- TableId tableId =
- entry.getKey(); // Use the TableId from the map key
which contains full info
TableChanges.TableChange tableChange = entry.getValue();
+
+ Table table = tableChange.getTable();
CreateTableEvent createTableEvent =
- new CreateTableEvent(
- toCdcTableId(
- tableId,
- sourceConfig.getDatabaseList().get(0),
- includeDatabaseInTableId),
- buildSchemaFromTable(tableChange.getTable()));
+ toCreateTableEvent(table, sourceConfig,
postgresDialect);
((DebeziumEventDeserializationSchema)
debeziumDeserializationSchema)
.applyChangeEvent(createTableEvent);
}
@@ -137,68 +132,63 @@ public class PostgresPipelineRecordEmitter<T> extends
IncrementalSourceRecordEmi
shouldEmitAllCreateTableEventsInSnapshotMode = false;
} else if (isLowWatermarkEvent(element) &&
splitState.isSnapshotSplitState()) {
TableId tableId =
splitState.asSnapshotSplitState().toSourceSplit().getTableId();
- if (!alreadySendCreateTableTables.contains(tableId)) {
- sendCreateTableEvent(tableId, (SourceOutput<Event>) output);
- alreadySendCreateTableTables.add(tableId);
- }
- } else {
- boolean isDataChangeRecord = isDataChangeRecord(element);
- if (isDataChangeRecord || isSchemaChangeEvent(element)) {
- TableId tableId = getTableId(element);
- if (!alreadySendCreateTableTables.contains(tableId)) {
- CreateTableEvent createTableEvent =
createTableEventCache.get(tableId);
- if (createTableEvent != null) {
- output.collect((T) createTableEvent);
- }
- alreadySendCreateTableTables.add(tableId);
- }
- // In rare case, we may miss some CreateTableEvents before
DataChangeEvents.
- // Don't send CreateTableEvent for SchemaChangeEvents as it's
the latest schema.
- if (isDataChangeRecord &&
!createTableEventCache.containsKey(tableId)) {
- CreateTableEvent createTableEvent =
getCreateTableEvent(sourceConfig, tableId);
- output.collect((T) createTableEvent);
- createTableEventCache.put(tableId, createTableEvent);
- }
- }
+ maybeSendCreateTableEventFromCache(tableId, output);
+ } else if (isDataChangeRecord(element)) {
+ handleDataChangeRecord(element, output);
+ } else if (isSchemaChangeEvent(element) &&
sourceConfig.isSchemaChangeEnabled()) {
+ handleSchemaChangeRecord(element, output, splitState);
}
super.processElement(element, output, splitState);
}
- private Schema buildSchemaFromTable(Table table) {
- List<Column> columns = table.columns();
- Schema.Builder tableBuilder = Schema.newBuilder();
- for (int i = 0; i < columns.size(); i++) {
- Column column = columns.get(i);
-
- String colName = column.name();
- DataType dataType;
- try (PostgresConnection jdbc =
postgresDialect.openJdbcConnection()) {
- dataType =
- PostgresTypeUtils.fromDbzColumn(
- column,
- this.sourceConfig.getDbzConnectorConfig(),
- jdbc.getTypeRegistry());
- }
- if (!column.isOptional()) {
- dataType = dataType.notNull();
- }
- tableBuilder.physicalColumn(
- colName,
- dataType,
- column.comment(),
- column.defaultValueExpression().orElse(null));
+ private void handleDataChangeRecord(SourceRecord element, SourceOutput<T>
output) {
+ TableId tableId = getTableId(element);
+ maybeSendCreateTableEventFromCache(tableId, output);
+ // In rare case, we may miss some CreateTableEvents before
DataChangeEvents.
+ // Don't send CreateTableEvent for SchemaChangeEvents as it's the
latest schema.
+ if (!createTableEventCache.containsKey(tableId)) {
+ CreateTableEvent createTableEvent =
getCreateTableEvent(sourceConfig, tableId);
+ sendCreateTableEvent(createTableEvent, output);
+ createTableEventCache.put(tableId, createTableEvent);
+ }
+ }
+
+ private void handleSchemaChangeRecord(
+ SourceRecord element, SourceOutput<T> output, SourceSplitState
splitState) {
+ if (!(element instanceof PostgresSchemaRecord)) {
+ // Ignore non-Postgres schema change records; they may represent
non-relation
+ // schema changes that are not handled via PostgresSchemaRecord.
+ LOG.warn("Ignoring non-PostgresSchemaRecord schema change event:
{}", element);
+ return;
+ }
+ Map<TableId, TableChanges.TableChange> existedTableSchemas =
+ splitState.toSourceSplit().getTableSchemas();
+ PostgresSchemaRecord schemaRecord = (PostgresSchemaRecord) element;
+ Table schemaAfter = schemaRecord.getTable();
+ maybeSendCreateTableEventFromCache(schemaAfter.id(), output);
+ Table schemaBefore = null;
+ if (existedTableSchemas.containsKey(schemaAfter.id())) {
+ schemaBefore =
existedTableSchemas.get(schemaAfter.id()).getTable();
}
- tableBuilder.comment(table.comment());
+ List<SchemaChangeEvent> schemaChangeEvents =
+ inferSchemaChangeEvent(
+ schemaAfter.id(), schemaBefore, schemaAfter,
sourceConfig, postgresDialect);
+ LOG.info("Inferred Schema change events: {}", schemaChangeEvents);
+ schemaChangeEvents.forEach(schemaChangeEvent -> output.collect((T)
schemaChangeEvent));
+ }
- List<String> primaryKey = table.primaryKeyColumnNames();
- if (Objects.nonNull(primaryKey) && !primaryKey.isEmpty()) {
- tableBuilder.primaryKey(primaryKey);
+ private void maybeSendCreateTableEventFromCache(TableId tableId,
SourceOutput<T> output) {
+ if (!alreadySendCreateTableTables.contains(tableId)) {
+ CreateTableEvent createTableEvent =
createTableEventCache.get(tableId);
+ if (createTableEvent != null) {
+ sendCreateTableEvent(createTableEvent, output);
+ }
+ alreadySendCreateTableTables.add(tableId);
}
- return tableBuilder.build();
}
- private void sendCreateTableEvent(TableId tableId, SourceOutput<Event>
output) {
- output.collect(getCreateTableEvent(sourceConfig, tableId));
+ private void sendCreateTableEvent(CreateTableEvent createTableEvent,
SourceOutput<T> output) {
+ output.collect((T) createTableEvent);
}
private CreateTableEvent getCreateTableEvent(
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/SchemaChangeUtil.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/SchemaChangeUtil.java
new file mode 100644
index 000000000..c92c83f0b
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/SchemaChangeUtil.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.utils;
+
+import org.apache.flink.cdc.common.event.AddColumnEvent;
+import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DropColumnEvent;
+import org.apache.flink.cdc.common.event.RenameColumnEvent;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
+import
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
+
+import io.debezium.connector.postgresql.PostgresConnectorConfig;
+import io.debezium.connector.postgresql.TypeRegistry;
+import io.debezium.connector.postgresql.connection.PostgresConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.Table;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils.toCdcTableId;
+import static
org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils.toSchema;
+
+/**
+ * Utilities for inferring CDC schema change events by comparing before/after Debezium table
+ * schemas.
+ *
+ * <p>PostgreSQL DDL has exactly four operations that structurally change a table schema:
+ *
+ * <ul>
+ *   <li><b>Add column</b> — always appended at the end (PostgreSQL does not support adding a column
+ *       at an arbitrary position).
+ *   <li><b>Drop column</b> — removes a column by name.
+ *   <li><b>Rename column</b> — changes a column's name while preserving its position and type.
+ *   <li><b>Alter column type</b> — changes a column's type while preserving its name and position.
+ * </ul>
+ *
+ * <p>Ideally, PostgreSQL's {@code pg_attribute.attnum} would be the best way to distinguish whether
+ * a column was dropped-and-recreated or simply renamed. However, the pgoutput relation message does
+ * not include {@code attnum}. Even querying the database for the current attnum after the fact is
+ * unreliable due to temporal mismatch — for example, if column {@code C} is renamed to {@code D}
+ * and a new column {@code C} is added between two relation messages, a retroactive attnum query
+ * would see the <em>new</em> {@code C}'s attnum, not the original one, leading to incorrect
+ * inference.
+ *
+ * <p>Instead, this utility infers schema changes by comparing the before/after column lists and
+ * computing the <b>minimum edit</b> using the four operations above. The main inherent ambiguity is
+ * the last-column scenario: renaming the last column looks identical to dropping it and adding a
+ * new column with the same name at the end, since both produce the same before/after column list.
+ * The algorithm resolves this by always preferring the interpretation with the fewest operations
+ * (rename costs 1; drop + add costs 2), so a last-column name change is inferred as a rename.
+ *
+ * <p>The returned events are intended to be applied in list order. To keep every intermediate
+ * schema free of duplicate column names they are grouped as: drops, renames (ordered so that
+ * chained renames do not collide), type changes, and finally appended columns.
+ */
+public class SchemaChangeUtil {
+
+    private SchemaChangeUtil() {
+        // static utility class
+    }
+
+    /**
+     * Infers the minimum list of schema change events that transform {@code tableBefore} into
+     * {@code tableAfter}. Returns a {@link CreateTableEvent} if {@code tableBefore} is null.
+     *
+     * @param dbzTableId Debezium id of the table, converted to the CDC {@link TableId}
+     * @param tableBefore previously known schema, or {@code null} if none is known
+     * @param tableAfter newly observed schema
+     * @param sourceConfig source configuration (database name, table-id format, Debezium config)
+     * @param dialect used to open a JDBC connection for the type registry
+     * @return the events to apply, in order; empty if the column lists are equivalent
+     */
+    public static List<SchemaChangeEvent> inferSchemaChangeEvent(
+            io.debezium.relational.TableId dbzTableId,
+            @Nullable Table tableBefore,
+            Table tableAfter,
+            PostgresSourceConfig sourceConfig,
+            PostgresDialect dialect) {
+
+        if (tableBefore == null) {
+            return Collections.singletonList(toCreateTableEvent(tableAfter, sourceConfig, dialect));
+        }
+
+        TableId cdcTableId =
+                toCdcTableId(
+                        dbzTableId,
+                        sourceConfig.getDatabaseList().get(0),
+                        sourceConfig.isIncludeDatabaseInTableId());
+        PostgresConnectorConfig dbzConfig = sourceConfig.getDbzConnectorConfig();
+
+        try (PostgresConnection connection = dialect.openJdbcConnection()) {
+            TypeRegistry typeRegistry = connection.getTypeRegistry();
+            return inferMinimalSchemaChanges(
+                    cdcTableId,
+                    tableBefore.columns(),
+                    tableAfter.columns(),
+                    dbzConfig,
+                    typeRegistry);
+        }
+    }
+
+    /** Builds a {@link CreateTableEvent} for {@code table}, opening a JDBC connection for types. */
+    public static CreateTableEvent toCreateTableEvent(
+            Table table, PostgresSourceConfig sourceConfig, PostgresDialect dialect) {
+        try (PostgresConnection connection = dialect.openJdbcConnection()) {
+            return toCreateTableEvent(table, sourceConfig, connection.getTypeRegistry());
+        }
+    }
+
+    private static CreateTableEvent toCreateTableEvent(
+            Table table, PostgresSourceConfig sourceConfig, TypeRegistry typeRegistry) {
+        return new CreateTableEvent(
+                toCdcTableId(
+                        table.id(),
+                        sourceConfig.getDatabaseList().get(0),
+                        sourceConfig.isIncludeDatabaseInTableId()),
+                toSchema(table, sourceConfig.getDbzConnectorConfig(), typeRegistry));
+    }
+
+    /**
+     * Finds the minimum schema change events to transform beforeCols into afterCols using recursion
+     * with memoization. Available operations: rename column, add column at last, drop column, alter
+     * column type. Recursion depth is bounded by the number of before columns.
+     */
+    private static List<SchemaChangeEvent> inferMinimalSchemaChanges(
+            TableId cdcTableId,
+            List<Column> beforeCols,
+            List<Column> afterCols,
+            PostgresConnectorConfig dbzConfig,
+            TypeRegistry typeRegistry) {
+
+        int n = beforeCols.size();
+        int m = afterCols.size();
+
+        // memo[i][j] = min cost from state (i, j), -1 means unvisited
+        int[][] memo = new int[n + 1][m + 1];
+        for (int[] row : memo) {
+            Arrays.fill(row, -1);
+        }
+
+        // Fill memoization table via recursion
+        minCost(0, 0, n, m, beforeCols, afterCols, memo, dbzConfig, typeRegistry);
+
+        // Traceback to build schema change events
+        return tracebackEvents(
+                cdcTableId, n, m, beforeCols, afterCols, memo, dbzConfig, typeRegistry);
+    }
+
+    /**
+     * Recursively computes the minimum number of individual column operations to align
+     * before[i..n-1] with after[j..m-1], where {@code i} is the current index into {@code
+     * beforeCols} (0..n) and {@code j} is the current index into {@code afterCols} (0..m). Boundary
+     * cases ({@code i == n} or {@code j == m}) return immediately without list access. At each
+     * non-boundary state, either drop before[i] (cost 1) or match before[i] to after[j] (cost =
+     * rename(0/1) + alterType(0/1)). Unmatched after columns are added at the end.
+     */
+    private static int minCost(
+            int i,
+            int j,
+            int n,
+            int m,
+            List<Column> beforeCols,
+            List<Column> afterCols,
+            int[][] memo,
+            PostgresConnectorConfig dbzConfig,
+            TypeRegistry typeRegistry) {
+
+        if (i == n) {
+            return m - j; // add remaining after columns
+        }
+        if (j == m) {
+            return n - i; // drop remaining before columns
+        }
+        if (memo[i][j] != -1) {
+            return memo[i][j];
+        }
+
+        // Option 1: drop beforeCols[i]
+        int dropCost =
+                1 + minCost(i + 1, j, n, m, beforeCols, afterCols, memo, dbzConfig, typeRegistry);
+
+        // Option 2: match beforeCols[i] to afterCols[j]
+        int matchCost =
+                columnMatchCost(beforeCols.get(i), afterCols.get(j), dbzConfig, typeRegistry)
+                        + minCost(
+                                i + 1,
+                                j + 1,
+                                n,
+                                m,
+                                beforeCols,
+                                afterCols,
+                                memo,
+                                dbzConfig,
+                                typeRegistry);
+
+        memo[i][j] = Math.min(dropCost, matchCost);
+        return memo[i][j];
+    }
+
+    /**
+     * Computes the cost of matching a before column to an after column (0, 1, or 2): one unit if the
+     * names differ (rename) plus one unit if the converted CDC data types differ (alter type).
+     */
+    private static int columnMatchCost(
+            Column before,
+            Column after,
+            PostgresConnectorConfig dbzConfig,
+            TypeRegistry typeRegistry) {
+        int cost = 0;
+        if (!before.name().equals(after.name())) {
+            cost++;
+        }
+        DataType beforeType = PostgresTypeUtils.fromDbzColumn(before, dbzConfig, typeRegistry);
+        DataType afterType = PostgresTypeUtils.fromDbzColumn(after, dbzConfig, typeRegistry);
+        if (!beforeType.equals(afterType)) {
+            cost++;
+        }
+        return cost;
+    }
+
+    /**
+     * Traces back through the memoization table to reconstruct the optimal alignment and converts
+     * it into schema change events.
+     *
+     * <p>The alignment is walked left to right, but the events are grouped into phases so that
+     * applying them one after another never refers to a column name that is ambiguous at that
+     * point:
+     *
+     * <ol>
+     *   <li>drop columns, by their before-names;
+     *   <li>rename columns, ordered by {@link #orderRenames} so that a chain such as {@code a -> b,
+     *       b -> c} is emitted as {@code b -> c} before {@code a -> b};
+     *   <li>alter column types, by their after-names (all renames are applied by then);
+     *   <li>append the unmatched trailing after-columns.
+     * </ol>
+     */
+    private static List<SchemaChangeEvent> tracebackEvents(
+            TableId cdcTableId,
+            int n,
+            int m,
+            List<Column> beforeCols,
+            List<Column> afterCols,
+            int[][] memo,
+            PostgresConnectorConfig dbzConfig,
+            TypeRegistry typeRegistry) {
+
+        List<SchemaChangeEvent> dropEvents = new ArrayList<>();
+        // before-name -> after-name, in left-to-right column order
+        Map<String, String> renames = new LinkedHashMap<>();
+        List<SchemaChangeEvent> alterTypeEvents = new ArrayList<>();
+
+        int i = 0;
+        int j = 0;
+        while (i < n && j < m) {
+            int dropCost = 1 + memoOrBase(i + 1, j, n, m, memo);
+            int matchCost =
+                    columnMatchCost(beforeCols.get(i), afterCols.get(j), dbzConfig, typeRegistry)
+                            + memoOrBase(i + 1, j + 1, n, m, memo);
+
+            // On a tie the drop interpretation is chosen.
+            if (dropCost <= matchCost) {
+                dropEvents.add(
+                        new DropColumnEvent(
+                                cdcTableId, Collections.singletonList(beforeCols.get(i).name())));
+                i++;
+                continue;
+            }
+
+            Column bc = beforeCols.get(i);
+            Column ac = afterCols.get(j);
+
+            if (!bc.name().equals(ac.name())) {
+                renames.put(bc.name(), ac.name());
+            }
+
+            DataType beforeType = PostgresTypeUtils.fromDbzColumn(bc, dbzConfig, typeRegistry);
+            DataType afterType = PostgresTypeUtils.fromDbzColumn(ac, dbzConfig, typeRegistry);
+            if (!beforeType.equals(afterType)) {
+                // Uses the after-name because type changes are emitted after all renames.
+                alterTypeEvents.add(
+                        new AlterColumnTypeEvent(
+                                cdcTableId, Collections.singletonMap(ac.name(), afterType)));
+            }
+
+            i++;
+            j++;
+        }
+
+        // Drop remaining before columns
+        while (i < n) {
+            dropEvents.add(
+                    new DropColumnEvent(
+                            cdcTableId, Collections.singletonList(beforeCols.get(i).name())));
+            i++;
+        }
+
+        List<SchemaChangeEvent> events = new ArrayList<>(dropEvents);
+        events.addAll(orderRenames(cdcTableId, renames));
+        events.addAll(alterTypeEvents);
+
+        // Add remaining after columns at last. Adds are always trailing because the only
+        // available operation is "add column at last" (PostgreSQL ALTER TABLE ADD COLUMN always
+        // appends). Surviving before-columns (after drops/renames/alter-types) map to a prefix
+        // of the after-columns; the unmatched suffix can only be fulfilled by appending.
+        while (j < m) {
+            events.add(
+                    new AddColumnEvent(
+                            cdcTableId,
+                            Collections.singletonList(
+                                    AddColumnEvent.last(
+                                            PostgresSchemaUtils.toColumn(
+                                                    afterCols.get(j), dbzConfig, typeRegistry)))));
+            j++;
+        }
+
+        return events;
+    }
+
+    /**
+     * Orders renames so that they can be applied one at a time without two columns ever sharing a
+     * name.
+     *
+     * <p>Renames are called only after all drops have been emitted, and the after-names are unique.
+     * So a rename target can only still be in use by another pending rename's source column. A
+     * rename {@code s -> t} is therefore emitted, as its own single-entry {@link
+     * RenameColumnEvent}, once no pending rename has {@code t} as its source. Without conflicts
+     * this keeps the original left-to-right order.
+     *
+     * <p>Renames that form cycles (e.g. two columns swapping names) cannot be ordered this way.
+     * They are emitted together as one multi-entry {@link RenameColumnEvent}.
+     *
+     * <p>NOTE(review): the cycle fallback assumes the consumer applies a multi-entry name mapping
+     * atomically rather than entry by entry. Confirm this for downstream sinks.
+     */
+    private static List<SchemaChangeEvent> orderRenames(
+            TableId cdcTableId, Map<String, String> renames) {
+        Map<String, String> pending = new LinkedHashMap<>(renames);
+        List<SchemaChangeEvent> events = new ArrayList<>();
+
+        boolean progressed = true;
+        while (!pending.isEmpty() && progressed) {
+            progressed = false;
+            for (String source : new ArrayList<>(pending.keySet())) {
+                String target = pending.get(source);
+                if (pending.containsKey(target)) {
+                    // target is still held by a column whose own rename has not been emitted yet
+                    continue;
+                }
+                pending.remove(source);
+                events.add(
+                        new RenameColumnEvent(
+                                cdcTableId, Collections.singletonMap(source, target)));
+                progressed = true;
+            }
+        }
+
+        if (!pending.isEmpty()) {
+            // Each target is the name of a distinct after-column, so the mapping is injective.
+            // An injective mapping cannot have a chain leading into a cycle, so only pure cycles
+            // remain here.
+            events.add(new RenameColumnEvent(cdcTableId, pending));
+        }
+        return events;
+    }
+
+    /** Returns memoized cost or computes base case for boundary conditions. */
+    private static int memoOrBase(int i, int j, int n, int m, int[][] memo) {
+        if (i == n) {
+            return m - j;
+        }
+        if (j == m) {
+            return n - i;
+        }
+        return memo[i][j];
+    }
+}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCase.java
index 9ebd21c29..51de6a3b4 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCase.java
@@ -19,11 +19,16 @@ package org.apache.flink.cdc.connectors.postgres.source;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.cdc.common.data.DecimalData;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.AddColumnEvent;
+import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.factories.Factory;
@@ -78,6 +83,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -105,7 +111,9 @@ public class PostgresPipelineITCase extends
PostgresTestBase {
}
@AfterEach
- public void after() throws SQLException {
+ public void after() throws Exception {
+ // sleep 1000ms to wait until connections are closed.
+ Thread.sleep(1000L);
inventoryDatabase.removeSlot(slotName);
}
@@ -595,6 +603,300 @@ public class PostgresPipelineITCase extends
PostgresTestBase {
.isEqualTo(String.format("Replication slot \"%s\" does not
exist", slotName));
}
+ @Test
+ public void testSchemaChangeWithDataInserts() throws Exception {
+ inventoryDatabase.createAndInitialize();
+ PostgresSourceConfigFactory configFactory =
+ (PostgresSourceConfigFactory)
+ new PostgresSourceConfigFactory()
+ .hostname(POSTGRES_CONTAINER.getHost())
+
.port(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT))
+ .username(TEST_USER)
+ .password(TEST_PASSWORD)
+
.databaseList(inventoryDatabase.getDatabaseName())
+ .tableList("inventory.products")
+ .startupOptions(StartupOptions.initial())
+ .serverTimeZone("UTC");
+ configFactory.database(inventoryDatabase.getDatabaseName());
+ configFactory.slotName(slotName);
+ configFactory.decodingPluginName("pgoutput");
+ configFactory.enableSchemaChange(true);
+
+ FlinkSourceProvider sourceProvider =
+ (FlinkSourceProvider)
+ new
PostgresDataSource(configFactory).getEventSourceProvider();
+
+ StreamExecutionEnvironment testEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
+ testEnv.setParallelism(1);
+ testEnv.enableCheckpointing(1000);
+
+ DataStreamSource<Event> source =
+ testEnv.fromSource(
+ sourceProvider.getSource(),
+ WatermarkStrategy.noWatermarks(),
+ PostgresDataSourceFactory.IDENTIFIER,
+ new EventTypeInfo());
+
+ TypeSerializer<Event> serializer =
+ source.getTransformation()
+ .getOutputType()
+
.createSerializer(testEnv.getConfig().getSerializerConfig());
+ CheckpointedCollectResultBuffer<Event> resultBuffer =
+ new CheckpointedCollectResultBuffer<>(serializer);
+ String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+ CollectResultIterator<Event> iterator =
+ addCollector(testEnv, source, resultBuffer, serializer,
accumulatorName);
+
+ JobClient jobClient =
testEnv.executeAsync("testSchemaChangeWithDataInserts");
+ iterator.setJobClient(jobClient);
+
+ try {
+ // Wait for snapshot phase to complete (9 rows in products)
+ List<Event> snapshotEvents =
+ fetchEvent(iterator, 9, (event) -> !(event instanceof
CreateTableEvent));
+ assertThat(snapshotEvents).hasSize(9);
+
+ // Wait for stream phase to stabilize
+ Thread.sleep(1000);
+ TableId tableId = TableId.tableId("inventory", "products");
+
+ // Build expected event list in order
+ List<Event> expectedLog = new ArrayList<>();
+
+ // --- Original schema: (id, name, description, weight) ---
+ RowType rowType1 =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(),
+ DataTypes.VARCHAR(255).notNull(),
+ DataTypes.VARCHAR(512),
+ DataTypes.DOUBLE()
+ },
+ new String[] {"id", "name", "description",
"weight"});
+ BinaryRecordDataGenerator gen1 = new
BinaryRecordDataGenerator(rowType1);
+
+ // --- After ADD COLUMN: (id, name, description, weight, category)
---
+ RowType rowType2 =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(),
+ DataTypes.VARCHAR(255).notNull(),
+ DataTypes.VARCHAR(512),
+ DataTypes.DOUBLE(),
+ DataTypes.VARCHAR(100)
+ },
+ new String[] {"id", "name", "description",
"weight", "category"});
+ BinaryRecordDataGenerator gen2 = new
BinaryRecordDataGenerator(rowType2);
+
+ // --- After DROP COLUMN 'description': (id, name, weight,
category) ---
+ RowType rowType3 =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(),
+ DataTypes.VARCHAR(255).notNull(),
+ DataTypes.DOUBLE(),
+ DataTypes.VARCHAR(100)
+ },
+ new String[] {"id", "name", "weight", "category"});
+ BinaryRecordDataGenerator gen3 = new
BinaryRecordDataGenerator(rowType3);
+
+ // --- After ALTER COLUMN TYPE weight -> DECIMAL(10,2): (id, name,
weight, category) ---
+ RowType rowType4 =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(),
+ DataTypes.VARCHAR(255).notNull(),
+ DataTypes.DECIMAL(10, 2),
+ DataTypes.VARCHAR(100)
+ },
+ new String[] {"id", "name", "weight", "category"});
+ BinaryRecordDataGenerator gen4 = new
BinaryRecordDataGenerator(rowType4);
+
+ // --- After RENAME COLUMN category -> product_category: (id,
name, weight,
+ // product_category) ---
+ RowType rowType5 =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(),
+ DataTypes.VARCHAR(255).notNull(),
+ DataTypes.DECIMAL(10, 2),
+ DataTypes.VARCHAR(100)
+ },
+ new String[] {"id", "name", "weight",
"product_category"});
+ BinaryRecordDataGenerator gen5 = new
BinaryRecordDataGenerator(rowType5);
+
+ try (Connection conn =
+ getJdbcConnection(
+ POSTGRES_CONTAINER,
inventoryDatabase.getDatabaseName());
+ Statement stmt = conn.createStatement()) {
+
+ // Insert before ADD COLUMN (id=110)
+ stmt.execute(
+ "INSERT INTO inventory.products VALUES (default,
'before_add_col', 'desc1', 1.0)");
+ expectedLog.add(
+ DataChangeEvent.insertEvent(
+ tableId,
+ gen1.generate(
+ new Object[] {
+ 110,
+
BinaryStringData.fromString("before_add_col"),
+
BinaryStringData.fromString("desc1"),
+ 1.0
+ })));
+
+ // ADD COLUMN category
+ stmt.execute("ALTER TABLE inventory.products ADD COLUMN
category VARCHAR(100)");
+ expectedLog.add(
+ new AddColumnEvent(
+ tableId,
+ Collections.singletonList(
+ new AddColumnEvent.ColumnWithPosition(
+
org.apache.flink.cdc.common.schema.Column
+ .physicalColumn(
+ "category",
+
DataTypes.VARCHAR(100),
+ null,
+ null)))));
+
+ // Insert after ADD COLUMN (id=111)
+ stmt.execute(
+ "INSERT INTO inventory.products VALUES (default,
'after_add_col', 'desc2', 2.0, 'electronics')");
+ expectedLog.add(
+ DataChangeEvent.insertEvent(
+ tableId,
+ gen2.generate(
+ new Object[] {
+ 111,
+
BinaryStringData.fromString("after_add_col"),
+
BinaryStringData.fromString("desc2"),
+ 2.0,
+
BinaryStringData.fromString("electronics")
+ })));
+
+ // Insert before DROP COLUMN (id=112)
+ stmt.execute(
+ "INSERT INTO inventory.products VALUES (default,
'before_drop_col', 'desc3', 3.0, 'books')");
+ expectedLog.add(
+ DataChangeEvent.insertEvent(
+ tableId,
+ gen2.generate(
+ new Object[] {
+ 112,
+
BinaryStringData.fromString("before_drop_col"),
+
BinaryStringData.fromString("desc3"),
+ 3.0,
+
BinaryStringData.fromString("books")
+ })));
+
+ // DROP COLUMN description
+ stmt.execute("ALTER TABLE inventory.products DROP COLUMN
description");
+ expectedLog.add(
+ new DropColumnEvent(tableId,
Collections.singletonList("description")));
+
+ // Insert after DROP COLUMN (id=113)
+ stmt.execute(
+ "INSERT INTO inventory.products VALUES (default,
'after_drop_col', 4.0, 'toys')");
+ expectedLog.add(
+ DataChangeEvent.insertEvent(
+ tableId,
+ gen3.generate(
+ new Object[] {
+ 113,
+
BinaryStringData.fromString("after_drop_col"),
+ 4.0,
+ BinaryStringData.fromString("toys")
+ })));
+
+ // Insert before ALTER COLUMN TYPE (id=114)
+ stmt.execute(
+ "INSERT INTO inventory.products VALUES (default,
'before_alter_type', 5.0, 'food')");
+ expectedLog.add(
+ DataChangeEvent.insertEvent(
+ tableId,
+ gen3.generate(
+ new Object[] {
+ 114,
+
BinaryStringData.fromString("before_alter_type"),
+ 5.0,
+ BinaryStringData.fromString("food")
+ })));
+
+ // ALTER COLUMN TYPE weight -> DECIMAL(10,2)
+ stmt.execute(
+ "ALTER TABLE inventory.products ALTER COLUMN weight
TYPE DECIMAL(10,2)");
+ expectedLog.add(
+ new AlterColumnTypeEvent(
+ tableId,
+ Collections.singletonMap("weight",
DataTypes.DECIMAL(10, 2))));
+
+ // Insert after ALTER COLUMN TYPE (id=115)
+ stmt.execute(
+ "INSERT INTO inventory.products VALUES (default,
'after_alter_type', 6.00, 'sports')");
+ expectedLog.add(
+ DataChangeEvent.insertEvent(
+ tableId,
+ gen4.generate(
+ new Object[] {
+ 115,
+
BinaryStringData.fromString("after_alter_type"),
+ DecimalData.fromBigDecimal(
+ new
java.math.BigDecimal("6.00"), 10, 2),
+
BinaryStringData.fromString("sports")
+ })));
+
+ // Insert before RENAME COLUMN (id=116)
+ stmt.execute(
+ "INSERT INTO inventory.products VALUES (default,
'before_rename_col', 7.00, 'clothing')");
+ expectedLog.add(
+ DataChangeEvent.insertEvent(
+ tableId,
+ gen4.generate(
+ new Object[] {
+ 116,
+
BinaryStringData.fromString("before_rename_col"),
+ DecimalData.fromBigDecimal(
+ new
java.math.BigDecimal("7.00"), 10, 2),
+
BinaryStringData.fromString("clothing")
+ })));
+
+ // RENAME COLUMN category -> product_category
+ stmt.execute(
+ "ALTER TABLE inventory.products RENAME COLUMN category
TO product_category");
+ expectedLog.add(
+ new RenameColumnEvent(
+ tableId, Collections.singletonMap("category",
"product_category")));
+
+ // Insert after RENAME COLUMN (id=117)
+ stmt.execute(
+ "INSERT INTO inventory.products VALUES (default,
'after_rename_col', 8.00, 'garden')");
+ expectedLog.add(
+ DataChangeEvent.insertEvent(
+ tableId,
+ gen5.generate(
+ new Object[] {
+ 117,
+
BinaryStringData.fromString("after_rename_col"),
+ DecimalData.fromBigDecimal(
+ new
java.math.BigDecimal("8.00"), 10, 2),
+
BinaryStringData.fromString("garden")
+ })));
+ }
+
+ // Collect streaming events (filter out CreateTableEvents)
+ List<Event> actualLog = fetchEvent(iterator, expectedLog.size(),
(event) -> true);
+
+ assertThat(actualLog).isEqualTo(expectedLog);
+
+ } finally {
+ try {
+ iterator.close();
+ jobClient.cancel().get();
+ } catch (Exception e) {
+ LOG.warn("Failed to cancel job: {}", e.getMessage());
+ }
+ }
+ }
+
@Test
public void testDatabaseNameWithHyphenEndToEnd() throws Exception {
// Create a real database with hyphen to verify full CDC sync works
@@ -776,6 +1078,18 @@ public class PostgresPipelineITCase extends
PostgresTestBase {
return result;
}
+ private List<Event> fetchEvent(Iterator<Event> iter, int size,
Predicate<Event> predicate) {
+ List<Event> result = new ArrayList<>(size);
+ while (size > 0 && iter.hasNext()) {
+ Event event = iter.next();
+ if (predicate.test(event)) {
+ result.add(event);
+ size--;
+ }
+ }
+ return result;
+ }
+
private List<Event> getSnapshotExpected(TableId tableId) {
RowType rowType =
RowType.of(
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/SourceSplitSerializer.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/SourceSplitSerializer.java
index 446df1dbf..51f6aac59 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/SourceSplitSerializer.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/SourceSplitSerializer.java
@@ -247,7 +247,8 @@ public abstract class SourceSplitSerializer
throw new IOException("Unknown version: " + version);
}
Document document = documentReader.read(tableChangeStr);
- TableChange tableChange =
FlinkJsonTableChangeSerializer.fromDocument(document, true);
+ TableChange tableChange =
+ FlinkJsonTableChangeSerializer.fromDocument(document,
useCatalogBeforeSchema);
tableSchemas.put(tableId, tableChange);
}
return tableSchemas;
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java
index 26c788330..984472803 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java
@@ -36,6 +36,7 @@ import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -102,10 +103,7 @@ public class IncrementalSourceRecordEmitter<T>
}
} else if (isSchemaChangeEvent(element) &&
splitState.isStreamSplitState()) {
LOG.trace("Process SchemaChangeEvent: {}; splitState = {}",
element, splitState);
- HistoryRecord historyRecord = getHistoryRecord(element);
- Array tableChanges =
-
historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES);
- TableChanges changes =
TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
+ TableChanges changes = getTableChangeRecord(element);
for (TableChanges.TableChange tableChange : changes) {
splitState.asStreamSplitState().recordSchema(tableChange.getId(), tableChange);
}
@@ -128,6 +126,12 @@ public class IncrementalSourceRecordEmitter<T>
}
}
+ protected TableChanges getTableChangeRecord(SourceRecord element) throws
IOException {
+ HistoryRecord historyRecord = getHistoryRecord(element);
+ Array tableChanges =
historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES);
+ return TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
+ }
+
private void updateStreamSplitState(SourceSplitState splitState,
SourceRecord element) {
if (splitState.isStreamSplitState()) {
Offset position = getOffsetPosition(element);
@@ -161,16 +165,7 @@ public class IncrementalSourceRecordEmitter<T>
debeziumDeserializationSchema.deserialize(element, outputCollector);
}
- /**
- * Apply the split to the record emitter.
- *
- * <p>This method is called when a new split is assigned to the record
emitter. It allows the
- * record emitter to perform any necessary initialization or setup based
on the characteristics
- * of the assigned split. In this implementation, we may need to handle
split-specific
- * configurations or state initialization.
- *
- * @param split the split to apply
- */
+ /** Called when a new split is assigned. Subclasses may override for
split-specific setup. */
public void applySplit(SourceSplitBase split) {}
protected void reportMetrics(SourceRecord element) {
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresObjectUtils.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresObjectUtils.java
index c50db0435..f5132bffd 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresObjectUtils.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresObjectUtils.java
@@ -17,12 +17,14 @@
package io.debezium.connector.postgresql;
+import
org.apache.flink.cdc.connectors.postgres.source.schema.RelationAwarePostgresSchema;
import org.apache.flink.util.FlinkRuntimeException;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
@@ -32,6 +34,7 @@ import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.time.Duration;
+import java.util.Collection;
/**
* A factory for creating various Debezium objects
@@ -42,15 +45,15 @@ public class PostgresObjectUtils {
private static final Logger LOGGER =
LoggerFactory.getLogger(PostgresObjectUtils.class);
/** Create a new PostgresSchema and initialize the content of the schema.
*/
- public static PostgresSchema newSchema(
+ public static RelationAwarePostgresSchema newSchema(
PostgresConnection connection,
PostgresConnectorConfig config,
TypeRegistry typeRegistry,
TopicSelector<TableId> topicSelector,
PostgresValueConverter valueConverter)
throws SQLException {
- PostgresSchema schema =
- new PostgresSchema(
+ RelationAwarePostgresSchema schema =
+ new RelationAwarePostgresSchema(
config,
typeRegistry,
connection.getDefaultValueConverter(),
@@ -60,6 +63,25 @@ public class PostgresObjectUtils {
return schema;
}
+ public static RelationAwarePostgresSchema newSchema(
+ PostgresConnection connection,
+ PostgresConnectorConfig config,
+ TypeRegistry typeRegistry,
+ TopicSelector<TableId> topicSelector,
+ PostgresValueConverter valueConverter,
+ Collection<TableChanges.TableChange> tableChanges)
+ throws SQLException {
+ RelationAwarePostgresSchema schema =
+ new RelationAwarePostgresSchema(
+ config,
+ typeRegistry,
+ connection.getDefaultValueConverter(),
+ topicSelector,
+ valueConverter);
+ tableChanges.forEach(tableChange ->
schema.buildAndRegisterSchema(tableChange.getTable()));
+ return schema;
+ }
+
public static PostgresTaskContext newTaskContext(
PostgresConnectorConfig connectorConfig,
PostgresSchema schema,
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
index 0f550ca96..53bb066b5 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
@@ -21,6 +21,7 @@ import
org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.cdc.common.annotation.Experimental;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
+import org.apache.flink.cdc.connectors.base.config.SourceConfig;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import
org.apache.flink.cdc.connectors.base.source.assigner.HybridSplitAssigner;
import org.apache.flink.cdc.connectors.base.source.assigner.SplitAssigner;
@@ -31,6 +32,7 @@ import
org.apache.flink.cdc.connectors.base.source.assigner.state.StreamPendingS
import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;
import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
import
org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReaderContext;
import
org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader;
@@ -39,7 +41,9 @@ import
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConf
import
org.apache.flink.cdc.connectors.postgres.source.enumerator.PostgresSourceEnumerator;
import
org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
import
org.apache.flink.cdc.connectors.postgres.source.reader.PostgresSourceReader;
+import
org.apache.flink.cdc.connectors.postgres.source.reader.PostgresSourceRecordEmitter;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.util.FlinkRuntimeException;
@@ -311,6 +315,12 @@ public class PostgresSourceBuilder<T> {
return this;
}
+ /** Whether to infer schema change event on relation message. */
+ public PostgresSourceBuilder<T> includeSchemaChanges(boolean
includeSchemaChanges) {
+ this.configFactory.includeSchemaChanges(includeSchemaChanges);
+ return this;
+ }
+
/**
* Build the {@link PostgresIncrementalSource}.
*
@@ -445,6 +455,16 @@ public class PostgresSourceBuilder<T> {
dataSourceDialect);
}
+ @Override
+ protected RecordEmitter<SourceRecords, T, SourceSplitState>
createRecordEmitter(
+ SourceConfig sourceConfig, SourceReaderMetrics
sourceReaderMetrics) {
+ return new PostgresSourceRecordEmitter<>(
+ deserializationSchema,
+ sourceReaderMetrics,
+ sourceConfig.isIncludeSchemaChanges(),
+ offsetFactory);
+ }
+
public static <T> PostgresSourceBuilder<T> builder() {
return new PostgresSourceBuilder<>();
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
index 4402219a6..9807bb2dc 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
@@ -40,6 +40,7 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
private final int lsnCommitCheckpointsDelay;
private final boolean includePartitionedTables;
private final boolean includeDatabaseInTableId;
+ private final boolean schemaChangeEnabled;
public PostgresSourceConfig(
int subtaskId,
@@ -71,7 +72,8 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
int lsnCommitCheckpointsDelay,
boolean assignUnboundedChunkFirst,
boolean includePartitionedTables,
- boolean includeDatabaseInTableId) {
+ boolean includeDatabaseInTableId,
+ boolean schemaChangeEnabled) {
super(
startupOptions,
databaseList,
@@ -103,6 +105,7 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
this.includePartitionedTables = includePartitionedTables;
this.includeDatabaseInTableId = includeDatabaseInTableId;
+ this.schemaChangeEnabled = schemaChangeEnabled;
}
/**
@@ -152,12 +155,13 @@ public class PostgresSourceConfig extends
JdbcSourceConfig {
return new PostgresConnectorConfig(getDbzConfiguration());
}
- /**
- * Returns whether to include database in the generated Table ID.
- *
- * @return whether to include database in the generated Table ID
- */
+ /** Returns whether to include database in the generated Table ID. */
public boolean isIncludeDatabaseInTableId() {
return includeDatabaseInTableId;
}
+
+ /** Returns whether to infer column types via JDBC TypeRegistry on schema
change events. */
+ public boolean isSchemaChangeEnabled() {
+ return schemaChangeEnabled;
+ }
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
index 847b15474..20e1800d4 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
@@ -57,6 +57,8 @@ public class PostgresSourceConfigFactory extends
JdbcSourceConfigFactory {
private boolean includeDatabaseInTableId =
PostgresSourceOptions.TABLE_ID_INCLUDE_DATABASE.defaultValue();
+ private boolean schemaChangeEnabled = false;
+
/** Creates a new {@link PostgresSourceConfig} for the given subtask
{@code subtaskId}. */
@Override
public PostgresSourceConfig create(int subtaskId) {
@@ -140,7 +142,8 @@ public class PostgresSourceConfigFactory extends
JdbcSourceConfigFactory {
lsnCommitCheckpointsDelay,
assignUnboundedChunkFirst,
includePartitionedTables,
- includeDatabaseInTableId);
+ includeDatabaseInTableId,
+ schemaChangeEnabled);
}
/**
@@ -198,4 +201,9 @@ public class PostgresSourceConfigFactory extends
JdbcSourceConfigFactory {
public void setIncludeDatabaseInTableId(boolean includeDatabaseInTableId) {
this.includeDatabaseInTableId = includeDatabaseInTableId;
}
+
+ /** Set whether to infer schema change event on relation message. */
+ public void enableSchemaChange(boolean schemaChangeEnabled) {
+ this.schemaChangeEnabled = schemaChangeEnabled;
+ }
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/CDCPostgresDispatcher.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/CDCPostgresDispatcher.java
index e2db160e1..ce9183af0 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/CDCPostgresDispatcher.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/CDCPostgresDispatcher.java
@@ -22,6 +22,8 @@ import
org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import
org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent;
import
org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
+import
org.apache.flink.cdc.connectors.postgres.source.schema.PostgresSchemaRecord;
+import org.apache.flink.cdc.connectors.postgres.source.schema.SchemaDispatcher;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
@@ -30,6 +32,7 @@ import io.debezium.heartbeat.HeartbeatFactory;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.ChangeEventCreator;
+import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionFilters;
import io.debezium.schema.DatabaseSchema;
@@ -41,7 +44,7 @@ import java.util.Map;
/** Postgres Dispatcher for cdc source with watermark. */
public class CDCPostgresDispatcher extends PostgresEventDispatcher<TableId>
- implements WatermarkDispatcher {
+ implements WatermarkDispatcher, SchemaDispatcher {
private final String topic;
private final ChangeEventQueue<DataChangeEvent> queue;
@@ -81,4 +84,15 @@ public class CDCPostgresDispatcher extends
PostgresEventDispatcher<TableId>
sourcePartition, topic, sourceSplit.splitId(),
watermarkKind, watermark);
queue.enqueue(new DataChangeEvent(sourceRecord));
}
+
+ @Override
+ public void dispatch(Table table) {
+ PostgresSchemaRecord schemaRecord = new PostgresSchemaRecord(table);
+ try {
+ queue.enqueue(new DataChangeEvent(schemaRecord));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
index 49b62cb39..3a275ff6b 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
@@ -19,7 +19,6 @@ package org.apache.flink.cdc.connectors.postgres.source.fetch;
import org.apache.flink.cdc.connectors.base.WatermarkDispatcher;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
-import
org.apache.flink.cdc.connectors.base.source.EmbeddedFlinkDatabaseHistory;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
@@ -30,6 +29,8 @@ import
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConf
import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffset;
import
org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
import
org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetUtils;
+import
org.apache.flink.cdc.connectors.postgres.source.schema.PostgresSchemaRecord;
+import
org.apache.flink.cdc.connectors.postgres.source.schema.RelationAwarePostgresSchema;
import org.apache.flink.cdc.connectors.postgres.source.utils.ChunkUtils;
import org.apache.flink.table.types.logical.RowType;
@@ -92,7 +93,7 @@ public class PostgresSourceFetchTaskContext extends
JdbcSourceFetchTaskContext {
private ReplicationConnection replicationConnection;
private PostgresOffsetContext offsetContext;
private PostgresPartition partition;
- private PostgresSchema schema;
+ private RelationAwarePostgresSchema schema;
private ErrorHandler errorHandler;
private CDCPostgresDispatcher postgresDispatcher;
private EventMetadataProvider metadataProvider;
@@ -179,11 +180,6 @@ public class PostgresSourceFetchTaskContext extends
JdbcSourceFetchTaskContext {
dbzConfig.getJdbcConfig(), valueConverterBuilder,
CONNECTION_NAME);
TopicSelector<TableId> topicSelector =
PostgresTopicSelector.create(dbzConfig);
- EmbeddedFlinkDatabaseHistory.registerHistory(
- sourceConfig
- .getDbzConfiguration()
-
.getString(EmbeddedFlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
- sourceSplitBase.getTableSchemas().values());
try {
this.schema =
@@ -192,7 +188,8 @@ public class PostgresSourceFetchTaskContext extends
JdbcSourceFetchTaskContext {
dbzConfig,
jdbcConnection.getTypeRegistry(),
topicSelector,
-
valueConverterBuilder.build(jdbcConnection.getTypeRegistry()));
+
valueConverterBuilder.build(jdbcConnection.getTypeRegistry()),
+ sourceSplitBase.getTableSchemas().values());
} catch (SQLException e) {
throw new RuntimeException("Failed to initialize PostgresSchema",
e);
}
@@ -271,6 +268,7 @@ public class PostgresSourceFetchTaskContext extends
JdbcSourceFetchTaskContext {
}
}),
schemaNameAdjuster);
+ schema.setDispatcher(postgresDispatcher);
ChangeEventSourceMetricsFactory<PostgresPartition> metricsFactory =
new DefaultChangeEventSourceMetricsFactory<>();
@@ -326,6 +324,9 @@ public class PostgresSourceFetchTaskContext extends
JdbcSourceFetchTaskContext {
@Override
public TableId getTableId(SourceRecord record) {
+ if (record instanceof PostgresSchemaRecord) {
+ return ((PostgresSchemaRecord) record).getTable().id();
+ }
Struct value = (Struct) record.value();
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
String schemaName = source.getString(SCHEMA_NAME_KEY);
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceRecordEmitter.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceRecordEmitter.java
new file mode 100644
index 000000000..7689eccf7
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceRecordEmitter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source.reader;
+
+import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
+import
org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter;
+import
org.apache.flink.cdc.connectors.postgres.source.schema.PostgresSchemaRecord;
+import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
+
+import io.debezium.relational.Table;
+import io.debezium.relational.history.TableChanges;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.io.IOException;
+
+/**
+ * {@link IncrementalSourceRecordEmitter} for Postgres that additionally recognizes {@link
+ * PostgresSchemaRecord} elements and converts the table they carry into {@link TableChanges}.
+ */
+public class PostgresSourceRecordEmitter<T> extends IncrementalSourceRecordEmitter<T> {
+
+    public PostgresSourceRecordEmitter(
+            DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
+            SourceReaderMetrics sourceReaderMetrics,
+            boolean includeSchemaChanges,
+            OffsetFactory offsetFactory) {
+        super(
+                debeziumDeserializationSchema,
+                sourceReaderMetrics,
+                includeSchemaChanges,
+                offsetFactory);
+    }
+
+    /**
+     * Returns the table changes described by {@code element}.
+     *
+     * <p>A {@link PostgresSchemaRecord} yields a single {@code create} change for its table; every
+     * other record is handled by the parent emitter.
+     */
+    @Override
+    protected TableChanges getTableChangeRecord(SourceRecord element) throws IOException {
+        if (!(element instanceof PostgresSchemaRecord)) {
+            return super.getTableChangeRecord(element);
+        }
+        Table changedTable = ((PostgresSchemaRecord) element).getTable();
+        return new TableChanges().create(changedTable);
+    }
+}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/schema/PostgresSchemaRecord.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/schema/PostgresSchemaRecord.java
new file mode 100644
index 000000000..53edb9d04
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/schema/PostgresSchemaRecord.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source.schema;
+
+import io.debezium.relational.Table;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.Objects;
+
+/**
+ * A {@link SourceRecord} that carries a Debezium {@link Table} so a table schema change can be
+ * propagated alongside data change records.
+ *
+ * <p>The key is a struct holding the table id as a string; the value is an empty struct so that
+ * consumers reading the value schema or value do not see {@code null}. Source partition, source
+ * offset and topic are {@code null}.
+ */
+public class PostgresSchemaRecord extends SourceRecord {
+    // Use `postgres-cdc` rather than `postgres` to avoid a name clash should Debezium add its own
+    // support for such records later.
+    private static final String SCHEMA_CHANGE_EVENT_KEY_NAME =
+            "io.debezium.connector.postgres-cdc.SchemaChangeKey";
+    private static final String SCHEMA_CHANGE_EVENT_VALUE_NAME =
+            "io.debezium.connector.postgres-cdc.SchemaChangeValue";
+
+    private static final Schema KEY_SCHEMA =
+            SchemaBuilder.struct()
+                    .name(SCHEMA_CHANGE_EVENT_KEY_NAME)
+                    .field("table_id", Schema.STRING_SCHEMA)
+                    .build();
+
+    /** Minimal value schema to avoid null valueSchema/value NPEs downstream. */
+    private static final Schema VALUE_SCHEMA =
+            SchemaBuilder.struct().name(SCHEMA_CHANGE_EVENT_VALUE_NAME).build();
+
+    private final Table table;
+
+    /**
+     * Creates a schema record for {@code table}.
+     *
+     * @param table the table definition to propagate
+     * @throws NullPointerException if {@code table} is {@code null}
+     */
+    public PostgresSchemaRecord(Table table) {
+        super(
+                null,
+                null,
+                null,
+                KEY_SCHEMA,
+                buildKey(table),
+                VALUE_SCHEMA,
+                new Struct(VALUE_SCHEMA));
+        this.table = table;
+    }
+
+    /** Builds the key struct containing the string form of the table id. */
+    private static Struct buildKey(Table table) {
+        // Runs before the field is assigned (it is a super(...) argument), so validate here.
+        Objects.requireNonNull(table, "table");
+        Struct struct = new Struct(KEY_SCHEMA);
+        struct.put("table_id", table.id().toString());
+        return struct;
+    }
+
+    /** Returns the table definition carried by this record. */
+    public Table getTable() {
+        return table;
+    }
+
+    /**
+     * Compares the inherited record state and the carried table. The key only identifies the
+     * table id, so without comparing {@code table} two records describing different column sets
+     * of the same table would be considered equal.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        return Objects.equals(table, ((PostgresSchemaRecord) o).table);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), table);
+    }
+
+    @Override
+    public String toString() {
+        return "PostgresSchemaRecord{" + "table=" + table + '}';
+    }
+}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/schema/RelationAwarePostgresSchema.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/schema/RelationAwarePostgresSchema.java
new file mode 100644
index 000000000..e07cbe443
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/schema/RelationAwarePostgresSchema.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source.schema;
+
+import io.debezium.connector.postgresql.PostgresConnectorConfig;
+import io.debezium.connector.postgresql.PostgresSchema;
+import io.debezium.connector.postgresql.PostgresValueConverter;
+import io.debezium.connector.postgresql.TypeRegistry;
+import
io.debezium.connector.postgresql.connection.PostgresDefaultValueConverter;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
+import io.debezium.schema.TopicSelector;
+
+/**
+ * A {@link PostgresSchema} that, after applying a schema change for a relation, hands the changed
+ * table to a {@link SchemaDispatcher} so it can be emitted as a schema change event. It also
+ * exposes {@code buildAndRegisterSchema} as {@code public}.
+ */
+public class RelationAwarePostgresSchema extends PostgresSchema {
+
+    /** Receiver of changed tables; stays {@code null} until {@link #setDispatcher} is called. */
+    private SchemaDispatcher dispatcher;
+
+    public RelationAwarePostgresSchema(
+            PostgresConnectorConfig config,
+            TypeRegistry typeRegistry,
+            PostgresDefaultValueConverter defaultValueConverter,
+            TopicSelector<TableId> topicSelector,
+            PostgresValueConverter valueConverter) {
+        super(config, typeRegistry, defaultValueConverter, topicSelector, valueConverter);
+    }
+
+    /** Installs the dispatcher that receives tables whose schema was applied. */
+    public void setDispatcher(SchemaDispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    /**
+     * Applies the change via the parent implementation, then forwards {@code table} to the
+     * dispatcher unless no dispatcher is installed or the table is filtered out.
+     */
+    @Override
+    public void applySchemaChangesForTable(int relationId, Table table) {
+        super.applySchemaChangesForTable(relationId, table);
+        if (dispatcher == null || isFilteredOut(table.id())) {
+            return;
+        }
+        dispatcher.dispatch(table);
+    }
+
+    /** Delegates to the parent implementation; overridden only to make it {@code public}. */
+    @Override
+    public void buildAndRegisterSchema(Table table) {
+        super.buildAndRegisterSchema(table);
+    }
+}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/schema/SchemaDispatcher.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/schema/SchemaDispatcher.java
new file mode 100644
index 000000000..01b46b870
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/schema/SchemaDispatcher.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source.schema;
+
+import io.debezium.relational.Table;
+
+/** Dispatches Debezium table schema changes into the change event queue. */
+@FunctionalInterface
+public interface SchemaDispatcher {
+
+    /**
+     * Dispatches the given table definition as a schema change.
+     *
+     * @param table the table whose schema changed
+     */
+    void dispatch(Table table);
+}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java
index d2518efbb..b0ba6c4e3 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java
@@ -38,6 +38,8 @@ import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;
+import javax.annotation.Nullable;
+
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
@@ -248,13 +250,14 @@ public abstract class PostgresTestBase extends
AbstractTestBase {
+    /**
+     * Convenience overload: no explicit slot name and {@code skipSnapshotBackfill = false}.
+     */
protected PostgresSourceConfigFactory getMockPostgresSourceConfigFactory(
UniqueDatabase database, String schemaName, String tableName, int
splitSize) {
return getMockPostgresSourceConfigFactory(
- database, schemaName, tableName, splitSize, false);
+                // null slotName: the full overload then does not set a slot name on the factory.
+ database, schemaName, tableName, null, splitSize, false);
}
protected PostgresSourceConfigFactory getMockPostgresSourceConfigFactory(
UniqueDatabase database,
String schemaName,
String tableName,
+ @Nullable String slotName,
int splitSize,
boolean skipSnapshotBackfill) {
@@ -270,6 +273,10 @@ public abstract class PostgresTestBase extends
AbstractTestBase {
postgresSourceConfigFactory.skipSnapshotBackfill(skipSnapshotBackfill);
postgresSourceConfigFactory.setLsnCommitCheckpointsDelay(1);
postgresSourceConfigFactory.decodingPluginName("pgoutput");
+ if (slotName != null) {
+ postgresSourceConfigFactory.slotName(slotName);
+ }
+
return postgresSourceConfigFactory;
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/IncrementalSourceStreamFetcherTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/IncrementalSourceStreamFetcherTest.java
index fa0fe8773..419c2be88 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/IncrementalSourceStreamFetcherTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/IncrementalSourceStreamFetcherTest.java
@@ -34,14 +34,21 @@ import
org.apache.flink.cdc.connectors.postgres.testutils.UniqueDatabase;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
+import static
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isHeartbeatEvent;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for {@link IncrementalSourceStreamFetcher }. */
@@ -58,11 +65,26 @@ public class IncrementalSourceStreamFetcherTest extends
PostgresTestBase {
POSTGRES_CONTAINER.getUsername(),
POSTGRES_CONTAINER.getPassword());
+    /** Replication slot name for the current test; removed again in {@link #after()}. */
+    private String slotName;
+
+    /** Initializes {@code customDatabase} and resolves the slot name used by the test. */
+    @BeforeEach
+    public void before() throws SQLException {
+        customDatabase.createAndInitialize();
+        slotName = getSlotName();
+    }
+
+    /** Removes the replication slot used by the test. */
+    @AfterEach
+    public void after() throws Exception {
+        // Pause 1s so connections opened by the test can close before the slot is removed.
+        Thread.sleep(1000L);
+        customDatabase.removeSlot(slotName);
+    }
+
@Test
void testReadStreamSplitWithException() throws Exception {
- customDatabase.createAndInitialize();
PostgresSourceConfigFactory sourceConfigFactory =
- getMockPostgresSourceConfigFactory(customDatabase, schemaName,
tableName, 10, true);
+ getMockPostgresSourceConfigFactory(
+ customDatabase, schemaName, tableName, slotName, 10,
true);
sourceConfigFactory.startupOptions(StartupOptions.latest());
PostgresSourceConfig sourceConfig = sourceConfigFactory.create(0);
PostgresDialect dialect = new
PostgresDialect(sourceConfigFactory.create(0));
@@ -96,6 +118,79 @@ public class IncrementalSourceStreamFetcherTest extends
PostgresTestBase {
fetcher.close();
}
+    /**
+     * Verifies that schema change records are enqueued in WAL order around the data changes:
+     * schema, insert before DDL, updated schema, insert after DDL.
+     */
+    @Test
+    void testSchemaChangeRecordInQueue() throws Exception {
+        PostgresSourceConfigFactory sourceConfigFactory =
+                getMockPostgresSourceConfigFactory(
+                        customDatabase, schemaName, tableName, slotName, 10, true);
+        sourceConfigFactory.startupOptions(StartupOptions.latest());
+        PostgresSourceConfig sourceConfig = sourceConfigFactory.create(0);
+        PostgresDialect dialect = new PostgresDialect(sourceConfigFactory.create(0));
+
+        PostgresSourceFetchTaskContext taskContext =
+                new PostgresSourceFetchTaskContext(sourceConfig, dialect);
+        IncrementalSourceStreamFetcher fetcher = new IncrementalSourceStreamFetcher(taskContext, 0);
+        // Close the fetcher in finally so cleanup also runs when an assertion fails, before
+        // after() removes the replication slot.
+        try {
+            StreamSplit split = createStreamSplit(sourceConfig, dialect);
+            PostgresStreamFetchTask fetchTask =
+                    (PostgresStreamFetchTask) dialect.createFetchTask(split);
+
+            fetcher.submitTask(fetchTask);
+            // Wait for the stream reader to start consuming WAL
+            Thread.sleep(1000L);
+
+            try (Connection conn =
+                            getJdbcConnection(POSTGRES_CONTAINER, customDatabase.getDatabaseName());
+                    Statement stmt = conn.createStatement()) {
+                // Insert a record BEFORE the DDL change
+                stmt.execute(
+                        "INSERT INTO customer.\"Customers\" VALUES (3001, 'before_ddl', 'Beijing', '111')");
+
+                // Perform a DDL change to trigger schema change event
+                stmt.execute(
+                        "ALTER TABLE customer.\"Customers\" ADD COLUMN email VARCHAR(255) DEFAULT '[email protected]'");
+
+                // Insert a record AFTER the DDL change
+                stmt.execute(
+                        "INSERT INTO customer.\"Customers\" VALUES (3002, 'after_ddl', 'Shanghai', '222', '[email protected]')");
+            }
+
+            // Wait for all events to be enqueued
+            Thread.sleep(1000L);
+
+            // Poll all records (data changes + schema changes) to verify ordering
+            List<SourceRecord> allRecords =
+                    pollRecordsFromReader(fetcher, r -> !isHeartbeatEvent(r));
+            assertThat(allRecords).hasSize(4);
+            assertThat(allRecords.get(0).toString())
+                    .contains(
+                            "PostgresSchemaRecord{table=columns: {\n"
+                                    + " Id int4(10, 0) NOT NULL\n"
+                                    + " Name varchar(255, 0) NOT NULL DEFAULT VALUE 'flink'::character varying\n"
+                                    + " address varchar(1024, 0) DEFAULT VALUE NULL\n"
+                                    + " phone_number varchar(512, 0) DEFAULT VALUE NULL\n"
+                                    + "}\n"
+                                    + "primary key: [Id]\n"
+                                    + "default charset: null\n"
+                                    + "comment: null\n"
+                                    + "}");
+            assertThat(allRecords.get(1).toString()).contains("before_ddl");
+            assertThat(allRecords.get(2).toString())
+                    .contains(
+                            "PostgresSchemaRecord{table=columns: {\n"
+                                    + " Id int4(10, 0) NOT NULL\n"
+                                    + " Name varchar(255, 0) NOT NULL DEFAULT VALUE 'flink'::character varying\n"
+                                    + " address varchar(1024, 0) DEFAULT VALUE NULL\n"
+                                    + " phone_number varchar(512, 0) DEFAULT VALUE NULL\n"
+                                    + " email varchar(255, 0) DEFAULT VALUE '[email protected]'::character varying\n"
+                                    + "}\n"
+                                    + "primary key: [Id]\n"
+                                    + "default charset: null\n"
+                                    + "comment: null\n"
+                                    + "}");
+            assertThat(allRecords.get(3).toString()).contains("after_ddl");
+        } finally {
+            fetcher.close();
+        }
+    }
+
private StreamSplit createStreamSplit(
PostgresSourceConfig sourceConfig, PostgresDialect dialect) throws
Exception {
StreamSplitAssigner streamSplitAssigner =
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java
index a0ff12c4f..a40f70296 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java
@@ -247,7 +247,8 @@ class PostgresScanFetchTaskTest extends PostgresTestBase {
void testSnapshotFetchSize() throws Exception {
customDatabase.createAndInitialize();
PostgresSourceConfigFactory sourceConfigFactory =
- getMockPostgresSourceConfigFactory(customDatabase, schemaName,
tableName, 10, true);
+ getMockPostgresSourceConfigFactory(
+ customDatabase, schemaName, tableName, null, 10, true);
Properties properties = new Properties();
properties.setProperty("snapshot.fetch.size", "2");
sourceConfigFactory.debeziumProperties(properties);
@@ -291,7 +292,7 @@ class PostgresScanFetchTaskTest extends PostgresTestBase {
throws Exception {
PostgresSourceConfigFactory sourceConfigFactory =
getMockPostgresSourceConfigFactory(
- customDatabase, schemaName, tableName, 10,
skipSnapshotBackfill);
+ customDatabase, schemaName, tableName, null, 10,
skipSnapshotBackfill);
PostgresSourceConfig sourceConfig = sourceConfigFactory.create(0);
PostgresDialect postgresDialect = new
PostgresDialect(sourceConfigFactory.create(0));
SnapshotPhaseHooks hooks = new SnapshotPhaseHooks();
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
index 68ef4690e..77b845074 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
@@ -21,10 +21,15 @@ import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
+import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.cdc.connectors.postgres.PostgresTestBase;
import org.apache.flink.cdc.connectors.postgres.source.MockPostgresDialect;
+import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder;
+import
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
import
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
import
org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
import org.apache.flink.cdc.connectors.postgres.testutils.RecordsFormatter;
@@ -39,6 +44,7 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -46,11 +52,14 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import java.sql.Connection;
import java.sql.SQLException;
+import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import static org.apache.flink.core.io.InputStatus.END_OF_INPUT;
import static org.apache.flink.core.io.InputStatus.MORE_AVAILABLE;
@@ -186,6 +195,106 @@ public class PostgresSourceReaderTest extends
PostgresTestBase {
assertEqualsInAnyOrder(Arrays.asList(expectedRecords), actualRecords);
}
+    /**
+     * Verifies that data before and after a DDL change is emitted in order, and that {@code
+     * snapshotState} returns a stream split whose table schema includes the added column.
+     */
+    @Test
+    void testSchemaChangeUpdatesSnapshotState() throws Exception {
+        PostgresSourceReader reader = createStreamReader();
+        // Close the reader in finally so its resources are released even if an assertion fails.
+        try {
+            reader.start();
+
+            // Discover table schemas and create a stream split
+            PostgresSourceConfigFactory configFactory = createConfigFactory();
+            PostgresSourceConfig sourceConfig = configFactory.create(0);
+            PostgresDialect dialect = new PostgresDialect(sourceConfig);
+            Map<TableId, TableChanges.TableChange> tableSchemas =
+                    dialect.discoverDataCollectionSchemas(sourceConfig);
+
+            TableId tableId = new TableId(null, SCHEMA_NAME, "Customers");
+            // Verify original schema has 4 columns (Id, Name, address, phone_number)
+            assertThat(tableSchemas.get(tableId).getTable().columns()).hasSize(4);
+            assertThat(tableSchemas.get(tableId).getTable().columnWithName("email")).isNull();
+
+            PostgresOffsetFactory offsetFactory = new PostgresOffsetFactory();
+            StreamSplit streamSplit =
+                    new StreamSplit(
+                            StreamSplit.STREAM_SPLIT_ID,
+                            offsetFactory.createInitialOffset(),
+                            offsetFactory.createNoStoppingOffset(),
+                            Collections.emptyList(),
+                            tableSchemas,
+                            0);
+            reader.addSplits(Collections.singletonList(streamSplit));
+
+            // Wait for the reader to start consuming
+            Thread.sleep(1000L);
+
+            try (Connection conn =
+                            getJdbcConnection(POSTGRES_CONTAINER, customDatabase.getDatabaseName());
+                    Statement stmt = conn.createStatement()) {
+                // Insert a record BEFORE the DDL change
+                stmt.execute(
+                        "INSERT INTO customer.\"Customers\" VALUES (3001, 'before_ddl', 'Beijing', '111')");
+
+                // Perform a DDL change
+                stmt.execute(
+                        "ALTER TABLE customer.\"Customers\" ADD COLUMN email VARCHAR(255) DEFAULT '[email protected]'");
+
+                // Insert a record AFTER the DDL change
+                stmt.execute(
+                        "INSERT INTO customer.\"Customers\" VALUES (3002, 'after_ddl', 'Shanghai', '222', '[email protected]')");
+            }
+
+            // Wait for the schema change event to be processed
+            Thread.sleep(1000L);
+
+            // Poll records so the emitter processes all events
+            final SimpleReaderOutput output = new SimpleReaderOutput();
+            for (int i = 0; i < 10; i++) {
+                reader.pollNext(output);
+            }
+
+            // Verify the emitted records contain data before and after DDL in correct order
+            List<SourceRecord> results = output.getResults();
+            int beforeDdlPos = -1;
+            int afterDdlPos = -1;
+            for (int i = 0; i < results.size(); i++) {
+                SourceRecord record = results.get(i);
+                if (record.value() != null) {
+                    String value = record.value().toString();
+                    if (value.contains("before_ddl")) {
+                        beforeDdlPos = i;
+                    } else if (value.contains("after_ddl")) {
+                        afterDdlPos = i;
+                    }
+                }
+            }
+            assertThat(beforeDdlPos)
+                    .as("Should capture the INSERT before DDL")
+                    .isGreaterThanOrEqualTo(0);
+            assertThat(afterDdlPos)
+                    .as("Should capture the INSERT after DDL")
+                    .isGreaterThanOrEqualTo(0);
+            assertThat(beforeDdlPos)
+                    .as("INSERT before DDL should appear before INSERT after DDL")
+                    .isLessThan(afterDdlPos);
+
+            // Verify that snapshotState returns splits with updated table schema
+            List<SourceSplitBase> splits = reader.snapshotState(1L);
+            assertThat(splits).isNotEmpty();
+
+            boolean foundUpdatedSchema = false;
+            for (SourceSplitBase split : splits) {
+                if (split.isStreamSplit()) {
+                    Map<TableId, TableChanges.TableChange> schemas =
+                            split.asStreamSplit().getTableSchemas();
+                    if (schemas.containsKey(tableId)
+                            && schemas.get(tableId).getTable().columnWithName("email") != null) {
+                        foundUpdatedSchema = true;
+                        break;
+                    }
+                }
+            }
+            assertThat(foundUpdatedSchema)
+                    .as("The snapshotState should contain the updated table schema with 'email' column")
+                    .isTrue();
+        } finally {
+            reader.close();
+        }
+    }
+
private List<String> consumeSnapshotRecords(
PostgresSourceReader sourceReader, DataType recordType) throws
Exception {
// Poll all the records of the multiple assigned snapshot split.
@@ -207,6 +316,29 @@ public class PostgresSourceReaderTest extends
PostgresTestBase {
+    /**
+     * Creates a reader backed by {@link MockPostgresDialect}, applying the given LSN commit
+     * checkpoint delay and snapshot-backfill setting on top of {@code createConfigFactory()}.
+     */
private PostgresSourceReader createReader(
final int lsnCommitCheckpointsDelay, boolean skipBackFill) throws
Exception {
final PostgresOffsetFactory offsetFactory = new
PostgresOffsetFactory();
+ final PostgresSourceConfigFactory configFactory =
createConfigFactory();
+ configFactory.setLsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay);
+ configFactory.skipSnapshotBackfill(skipBackFill);
+ MockPostgresDialect dialect = new
MockPostgresDialect(configFactory.create(0));
+ final PostgresSourceBuilder.PostgresIncrementalSource<?> source =
+ new PostgresSourceBuilder.PostgresIncrementalSource<>(
+ configFactory, new ForwardDeserializeSchema(),
offsetFactory, dialect);
+ return source.createReader(new TestingReaderContext());
+ }
+
+    /**
+     * Creates a reader backed by {@link PostgresDialect} (not the mock) that starts from the
+     * latest offset and uses an LSN commit checkpoint delay of 1.
+     */
+    private PostgresSourceReader createStreamReader() throws Exception {
+        PostgresSourceConfigFactory streamConfigFactory = createConfigFactory();
+        streamConfigFactory.startupOptions(StartupOptions.latest());
+        streamConfigFactory.setLsnCommitCheckpointsDelay(1);
+
+        PostgresSourceBuilder.PostgresIncrementalSource<?> streamSource =
+                new PostgresSourceBuilder.PostgresIncrementalSource<>(
+                        streamConfigFactory,
+                        new ForwardDeserializeSchema(),
+                        new PostgresOffsetFactory(),
+                        new PostgresDialect(streamConfigFactory.create(0)));
+        return streamSource.createReader(new TestingReaderContext());
+    }
+
+ private PostgresSourceConfigFactory createConfigFactory() {
final PostgresSourceConfigFactory configFactory = new
PostgresSourceConfigFactory();
configFactory.hostname(customDatabase.getHost());
configFactory.port(customDatabase.getDatabasePort());
@@ -214,14 +346,8 @@ public class PostgresSourceReaderTest extends
PostgresTestBase {
configFactory.tableList(SCHEMA_NAME + ".Customers");
configFactory.username(customDatabase.getUsername());
configFactory.password(customDatabase.getPassword());
- configFactory.setLsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay);
- configFactory.skipSnapshotBackfill(skipBackFill);
configFactory.decodingPluginName("pgoutput");
- MockPostgresDialect dialect = new
MockPostgresDialect(configFactory.create(0));
- final PostgresSourceBuilder.PostgresIncrementalSource<?> source =
- new PostgresSourceBuilder.PostgresIncrementalSource<>(
- configFactory, new ForwardDeserializeSchema(),
offsetFactory, dialect);
- return source.createReader(new TestingReaderContext());
+ return configFactory;
}
// ------------------------------------------------------------------------