This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e576aa91 [FLINK-38959][postgres] Update split state's table schemas 
info and infer schema change event based on pgoutput plugin's relation message. 
(#4316)
8e576aa91 is described below

commit 8e576aa91587abcfa8addd78254d5b6158c19875
Author: Hongshun Wang <[email protected]>
AuthorDate: Sat Mar 21 09:20:17 2026 +0800

    [FLINK-38959][postgres] Update split state's table schemas info and infer 
schema change event based on pgoutput plugin's relation message. (#4316)
---
 .../factory/PostgresDataSourceFactory.java         |   4 +
 .../postgres/source/PostgresDataSourceOptions.java |   8 +
 .../reader/PostgresPipelineRecordEmitter.java      | 138 +++++----
 .../postgres/utils/SchemaChangeUtil.java           | 318 +++++++++++++++++++++
 .../postgres/source/PostgresPipelineITCase.java    | 316 +++++++++++++++++++-
 .../source/meta/split/SourceSplitSerializer.java   |   3 +-
 .../reader/IncrementalSourceRecordEmitter.java     |  23 +-
 .../connector/postgresql/PostgresObjectUtils.java  |  28 +-
 .../postgres/source/PostgresSourceBuilder.java     |  20 ++
 .../source/config/PostgresSourceConfig.java        |  16 +-
 .../source/config/PostgresSourceConfigFactory.java |  10 +-
 .../source/fetch/CDCPostgresDispatcher.java        |  16 +-
 .../fetch/PostgresSourceFetchTaskContext.java      |  17 +-
 .../source/reader/PostgresSourceRecordEmitter.java |  56 ++++
 .../source/schema/PostgresSchemaRecord.java        |  72 +++++
 .../source/schema/RelationAwarePostgresSchema.java |  62 ++++
 .../postgres/source/schema/SchemaDispatcher.java   |  25 ++
 .../cdc/connectors/postgres/PostgresTestBase.java  |   9 +-
 .../fetch/IncrementalSourceStreamFetcherTest.java  |  99 ++++++-
 .../source/fetch/PostgresScanFetchTaskTest.java    |   5 +-
 .../source/reader/PostgresSourceReaderTest.java    | 140 ++++++++-
 21 files changed, 1264 insertions(+), 121 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java
index 578b6e0f8..93d8657cf 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java
@@ -74,6 +74,7 @@ import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSource
 import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY;
 import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
 import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCAN_STARTUP_MODE;
+import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SCHEMA_CHANGE_ENABLED;
 import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SERVER_TIME_ZONE;
 import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SLOT_NAME;
 import static 
org.apache.flink.cdc.connectors.postgres.source.PostgresDataSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
@@ -131,6 +132,7 @@ public class PostgresDataSourceFactory implements 
DataSourceFactory {
         boolean skipSnapshotBackfill = 
config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
         int lsnCommitCheckpointsDelay = 
config.get(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
         boolean tableIdIncludeDatabase = config.get(TABLE_ID_INCLUDE_DATABASE);
+        boolean includeSchemaChanges = config.get(SCHEMA_CHANGE_ENABLED);
 
         validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 
1);
         validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -172,6 +174,7 @@ public class PostgresDataSourceFactory implements 
DataSourceFactory {
                         .lsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay)
                         .assignUnboundedChunkFirst(isAssignUnboundedChunkFirst)
                         .includeDatabaseInTableId(tableIdIncludeDatabase)
+                        .includeSchemaChanges(includeSchemaChanges)
                         .getConfigFactory();
 
         List<TableId> tableIds = 
PostgresSchemaUtils.listTables(configFactory.create(0), null);
@@ -262,6 +265,7 @@ public class PostgresDataSourceFactory implements 
DataSourceFactory {
         options.add(METADATA_LIST);
         options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
         options.add(TABLE_ID_INCLUDE_DATABASE);
+        options.add(SCHEMA_CHANGE_ENABLED);
         return options;
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java
index 7e9ac4b0c..95cc823d2 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSourceOptions.java
@@ -273,4 +273,12 @@ public class PostgresDataSourceOptions {
                             "Whether to include database in the generated 
Table ID. "
                                     + "If set to true, the Table ID will be in 
the format (database, schema, table). "
                                     + "If set to false, the Table ID will be 
in the format (schema, table). Defaults to false.");
+
+    @Experimental
+    public static final ConfigOption<Boolean> SCHEMA_CHANGE_ENABLED =
+            ConfigOptions.key("schema-change.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to infer CDC column types when processing 
pgoutput Relation messages.");
 }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
index dd862354c..18c1d9b99 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
@@ -19,41 +19,39 @@ package 
org.apache.flink.cdc.connectors.postgres.source.reader;
 
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
-import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.schema.Schema;
-import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.connectors.base.options.StartupOptions;
 import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;
 import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
-import 
org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter;
 import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
 import 
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
+import 
org.apache.flink.cdc.connectors.postgres.source.schema.PostgresSchemaRecord;
 import 
org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils;
 import org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils;
-import org.apache.flink.cdc.connectors.postgres.utils.PostgresTypeUtils;
 import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
 import org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema;
 import org.apache.flink.connector.base.source.reader.RecordEmitter;
 
 import io.debezium.connector.postgresql.connection.PostgresConnection;
 import io.debezium.data.Envelope;
-import io.debezium.relational.Column;
 import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
 import io.debezium.relational.history.TableChanges;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.sql.SQLException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 
 import static io.debezium.connector.AbstractSourceInfo.SCHEMA_NAME_KEY;
@@ -62,24 +60,26 @@ import static 
org.apache.flink.cdc.connectors.base.source.meta.wartermark.Waterm
 import static 
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isDataChangeRecord;
 import static 
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isSchemaChangeEvent;
 import static 
org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils.toCdcTableId;
+import static 
org.apache.flink.cdc.connectors.postgres.utils.SchemaChangeUtil.inferSchemaChangeEvent;
+import static 
org.apache.flink.cdc.connectors.postgres.utils.SchemaChangeUtil.toCreateTableEvent;
 
 /** The {@link RecordEmitter} implementation for PostgreSQL pipeline 
connector. */
-public class PostgresPipelineRecordEmitter<T> extends 
IncrementalSourceRecordEmitter<T> {
+public class PostgresPipelineRecordEmitter<T> extends 
PostgresSourceRecordEmitter<T> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PostgresPipelineRecordEmitter.class);
     private final PostgresSourceConfig sourceConfig;
     private final PostgresDialect postgresDialect;
 
     // Used when startup mode is initial
-    private Set<TableId> alreadySendCreateTableTables;
+    private final Set<TableId> alreadySendCreateTableTables;
+    private final boolean isBounded;
+    private final boolean includeDatabaseInTableId;
+    private final Map<TableId, CreateTableEvent> createTableEventCache;
 
     // Used when startup mode is not initial
     private boolean shouldEmitAllCreateTableEventsInSnapshotMode = true;
-    private boolean isBounded = false;
-    private boolean includeDatabaseInTableId = false;
-
-    private final Map<TableId, CreateTableEvent> createTableEventCache;
 
     public PostgresPipelineRecordEmitter(
-            DebeziumDeserializationSchema debeziumDeserializationSchema,
+            DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
             SourceReaderMetrics sourceReaderMetrics,
             PostgresSourceConfig sourceConfig,
             OffsetFactory offsetFactory,
@@ -108,16 +108,11 @@ public class PostgresPipelineRecordEmitter<T> extends 
IncrementalSourceRecordEmi
         } else {
             for (Map.Entry<TableId, TableChanges.TableChange> entry :
                     split.getTableSchemas().entrySet()) {
-                TableId tableId =
-                        entry.getKey(); // Use the TableId from the map key 
which contains full info
                 TableChanges.TableChange tableChange = entry.getValue();
+
+                Table table = tableChange.getTable();
                 CreateTableEvent createTableEvent =
-                        new CreateTableEvent(
-                                toCdcTableId(
-                                        tableId,
-                                        sourceConfig.getDatabaseList().get(0),
-                                        includeDatabaseInTableId),
-                                buildSchemaFromTable(tableChange.getTable()));
+                        toCreateTableEvent(table, sourceConfig, 
postgresDialect);
                 ((DebeziumEventDeserializationSchema) 
debeziumDeserializationSchema)
                         .applyChangeEvent(createTableEvent);
             }
@@ -137,68 +132,63 @@ public class PostgresPipelineRecordEmitter<T> extends 
IncrementalSourceRecordEmi
             shouldEmitAllCreateTableEventsInSnapshotMode = false;
         } else if (isLowWatermarkEvent(element) && 
splitState.isSnapshotSplitState()) {
             TableId tableId = 
splitState.asSnapshotSplitState().toSourceSplit().getTableId();
-            if (!alreadySendCreateTableTables.contains(tableId)) {
-                sendCreateTableEvent(tableId, (SourceOutput<Event>) output);
-                alreadySendCreateTableTables.add(tableId);
-            }
-        } else {
-            boolean isDataChangeRecord = isDataChangeRecord(element);
-            if (isDataChangeRecord || isSchemaChangeEvent(element)) {
-                TableId tableId = getTableId(element);
-                if (!alreadySendCreateTableTables.contains(tableId)) {
-                    CreateTableEvent createTableEvent = 
createTableEventCache.get(tableId);
-                    if (createTableEvent != null) {
-                        output.collect((T) createTableEvent);
-                    }
-                    alreadySendCreateTableTables.add(tableId);
-                }
-                // In rare case, we may miss some CreateTableEvents before 
DataChangeEvents.
-                // Don't send CreateTableEvent for SchemaChangeEvents as it's 
the latest schema.
-                if (isDataChangeRecord && 
!createTableEventCache.containsKey(tableId)) {
-                    CreateTableEvent createTableEvent = 
getCreateTableEvent(sourceConfig, tableId);
-                    output.collect((T) createTableEvent);
-                    createTableEventCache.put(tableId, createTableEvent);
-                }
-            }
+            maybeSendCreateTableEventFromCache(tableId, output);
+        } else if (isDataChangeRecord(element)) {
+            handleDataChangeRecord(element, output);
+        } else if (isSchemaChangeEvent(element) && 
sourceConfig.isSchemaChangeEnabled()) {
+            handleSchemaChangeRecord(element, output, splitState);
         }
         super.processElement(element, output, splitState);
     }
 
-    private Schema buildSchemaFromTable(Table table) {
-        List<Column> columns = table.columns();
-        Schema.Builder tableBuilder = Schema.newBuilder();
-        for (int i = 0; i < columns.size(); i++) {
-            Column column = columns.get(i);
-
-            String colName = column.name();
-            DataType dataType;
-            try (PostgresConnection jdbc = 
postgresDialect.openJdbcConnection()) {
-                dataType =
-                        PostgresTypeUtils.fromDbzColumn(
-                                column,
-                                this.sourceConfig.getDbzConnectorConfig(),
-                                jdbc.getTypeRegistry());
-            }
-            if (!column.isOptional()) {
-                dataType = dataType.notNull();
-            }
-            tableBuilder.physicalColumn(
-                    colName,
-                    dataType,
-                    column.comment(),
-                    column.defaultValueExpression().orElse(null));
+    private void handleDataChangeRecord(SourceRecord element, SourceOutput<T> 
output) {
+        TableId tableId = getTableId(element);
+        maybeSendCreateTableEventFromCache(tableId, output);
+        // In rare case, we may miss some CreateTableEvents before 
DataChangeEvents.
+        // Don't send CreateTableEvent for SchemaChangeEvents as it's the 
latest schema.
+        if (!createTableEventCache.containsKey(tableId)) {
+            CreateTableEvent createTableEvent = 
getCreateTableEvent(sourceConfig, tableId);
+            sendCreateTableEvent(createTableEvent, output);
+            createTableEventCache.put(tableId, createTableEvent);
+        }
+    }
+
+    private void handleSchemaChangeRecord(
+            SourceRecord element, SourceOutput<T> output, SourceSplitState 
splitState) {
+        if (!(element instanceof PostgresSchemaRecord)) {
+            // Ignore non-Postgres schema change records; they may represent 
non-relation
+            // schema changes that are not handled via PostgresSchemaRecord.
+            LOG.warn("Ignoring non-PostgresSchemaRecord schema change event: 
{}", element);
+            return;
+        }
+        Map<TableId, TableChanges.TableChange> existedTableSchemas =
+                splitState.toSourceSplit().getTableSchemas();
+        PostgresSchemaRecord schemaRecord = (PostgresSchemaRecord) element;
+        Table schemaAfter = schemaRecord.getTable();
+        maybeSendCreateTableEventFromCache(schemaAfter.id(), output);
+        Table schemaBefore = null;
+        if (existedTableSchemas.containsKey(schemaAfter.id())) {
+            schemaBefore = 
existedTableSchemas.get(schemaAfter.id()).getTable();
         }
-        tableBuilder.comment(table.comment());
+        List<SchemaChangeEvent> schemaChangeEvents =
+                inferSchemaChangeEvent(
+                        schemaAfter.id(), schemaBefore, schemaAfter, 
sourceConfig, postgresDialect);
+        LOG.info("Inferred Schema change events: {}", schemaChangeEvents);
+        schemaChangeEvents.forEach(schemaChangeEvent -> output.collect((T) 
schemaChangeEvent));
+    }
 
-        List<String> primaryKey = table.primaryKeyColumnNames();
-        if (Objects.nonNull(primaryKey) && !primaryKey.isEmpty()) {
-            tableBuilder.primaryKey(primaryKey);
+    private void maybeSendCreateTableEventFromCache(TableId tableId, 
SourceOutput<T> output) {
+        if (!alreadySendCreateTableTables.contains(tableId)) {
+            CreateTableEvent createTableEvent = 
createTableEventCache.get(tableId);
+            if (createTableEvent != null) {
+                sendCreateTableEvent(createTableEvent, output);
+            }
+            alreadySendCreateTableTables.add(tableId);
         }
-        return tableBuilder.build();
     }
 
-    private void sendCreateTableEvent(TableId tableId, SourceOutput<Event> 
output) {
-        output.collect(getCreateTableEvent(sourceConfig, tableId));
+    private void sendCreateTableEvent(CreateTableEvent createTableEvent, 
SourceOutput<T> output) {
+        output.collect((T) createTableEvent);
     }
 
     private CreateTableEvent getCreateTableEvent(
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/SchemaChangeUtil.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/SchemaChangeUtil.java
new file mode 100644
index 000000000..c92c83f0b
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/SchemaChangeUtil.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.utils;
+
+import org.apache.flink.cdc.common.event.AddColumnEvent;
+import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DropColumnEvent;
+import org.apache.flink.cdc.common.event.RenameColumnEvent;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
+import 
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
+
+import io.debezium.connector.postgresql.PostgresConnectorConfig;
+import io.debezium.connector.postgresql.TypeRegistry;
+import io.debezium.connector.postgresql.connection.PostgresConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.Table;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils.toCdcTableId;
+import static 
org.apache.flink.cdc.connectors.postgres.utils.PostgresSchemaUtils.toSchema;
+
+/**
+ * Utilities for inferring CDC schema change events by comparing before/after 
Debezium table
+ * schemas.
+ *
+ * <p>PostgreSQL DDL has exactly four operations that structurally change a 
table schema:
+ *
+ * <ul>
+ *   <li><b>Add column</b> — always appended at the end (PostgreSQL does not 
support adding a column
+ *       at an arbitrary position).
+ *   <li><b>Drop column</b> — removes a column by name.
+ *   <li><b>Rename column</b> — changes a column's name while preserving its 
position and type.
+ *   <li><b>Alter column type</b> — changes a column's type while preserving 
its name and position.
+ * </ul>
+ *
+ * <p>Ideally, PostgreSQL's {@code pg_attribute.attnum} would be the best way 
to distinguish whether
+ * a column was dropped-and-recreated or simply renamed. However, the pgoutput 
relation message does
+ * not include {@code attnum}. Even querying the database for the current 
attnum after the fact is
+ * unreliable due to temporal mismatch — for example, if column {@code C} is 
renamed to {@code D}
+ * and a new column {@code C} is added between two relation messages, a 
retroactive attnum query
+ * would see the <em>new</em> {@code C}'s attnum, not the original one, 
leading to incorrect
+ * inference.
+ *
+ * <p>Instead, this utility infers schema changes by comparing the 
before/after column lists and
+ * computing the <b>minimum edit</b> using the four operations above. The main 
inherent ambiguity is
+ * the last-column scenario: renaming the last column looks identical to 
dropping it and adding a
+ * new column with the same name at the end, since both produce the same 
before/after column list.
+ * The algorithm resolves this by always preferring the interpretation with 
the fewest operations
+ * (rename costs 1; drop + add costs 2), so a last-column name change is 
inferred as a rename.
+ */
+public class SchemaChangeUtil {
+    /**
+     * Infers the minimum list of schema change events that transform {@code 
tableBefore} into
+     * {@code tableAfter}. Returns a {@link CreateTableEvent} if {@code 
tableBefore} is null.
+     */
+    public static List<SchemaChangeEvent> inferSchemaChangeEvent(
+            io.debezium.relational.TableId dbzTableId,
+            @Nullable Table tableBefore,
+            Table tableAfter,
+            PostgresSourceConfig sourceConfig,
+            PostgresDialect dialect) {
+
+        if (tableBefore == null) {
+            return Collections.singletonList(toCreateTableEvent(tableAfter, 
sourceConfig, dialect));
+        }
+
+        TableId cdcTableId =
+                toCdcTableId(
+                        dbzTableId,
+                        sourceConfig.getDatabaseList().get(0),
+                        sourceConfig.isIncludeDatabaseInTableId());
+        PostgresConnectorConfig dbzConfig = 
sourceConfig.getDbzConnectorConfig();
+
+        try (PostgresConnection connection = dialect.openJdbcConnection()) {
+            TypeRegistry typeRegistry = connection.getTypeRegistry();
+            return inferMinimalSchemaChanges(
+                    cdcTableId,
+                    tableBefore.columns(),
+                    tableAfter.columns(),
+                    dbzConfig,
+                    typeRegistry);
+        }
+    }
+
+    public static CreateTableEvent toCreateTableEvent(
+            Table table, PostgresSourceConfig sourceConfig, PostgresDialect 
dialect) {
+        try (PostgresConnection connection = dialect.openJdbcConnection()) {
+            return toCreateTableEvent(table, sourceConfig, 
connection.getTypeRegistry());
+        }
+    }
+
+    private static CreateTableEvent toCreateTableEvent(
+            Table table, PostgresSourceConfig sourceConfig, TypeRegistry 
typeRegistry) {
+        return new CreateTableEvent(
+                toCdcTableId(
+                        table.id(),
+                        sourceConfig.getDatabaseList().get(0),
+                        sourceConfig.isIncludeDatabaseInTableId()),
+                toSchema(table, sourceConfig.getDbzConnectorConfig(), 
typeRegistry));
+    }
+
+    /**
+     * Finds the minimum schema change events to transform beforeCols into 
afterCols using recursion
+     * with memoization. Available operations: rename column, add column at 
last, drop column, alter
+     * column type. Recursion depth bounded by total column count.
+     */
+    private static List<SchemaChangeEvent> inferMinimalSchemaChanges(
+            TableId cdcTableId,
+            List<Column> beforeCols,
+            List<Column> afterCols,
+            PostgresConnectorConfig dbzConfig,
+            TypeRegistry typeRegistry) {
+
+        int n = beforeCols.size();
+        int m = afterCols.size();
+
+        // memo[i][j] = min cost from state (i, j), -1 means unvisited
+        int[][] memo = new int[n + 1][m + 1];
+        for (int[] row : memo) {
+            Arrays.fill(row, -1);
+        }
+
+        // Fill memoization table via recursion
+        minCost(0, 0, n, m, beforeCols, afterCols, memo, dbzConfig, 
typeRegistry);
+
+        // Traceback to build schema change events
+        return tracebackEvents(
+                cdcTableId, n, m, beforeCols, afterCols, memo, dbzConfig, 
typeRegistry);
+    }
+
+    /**
+     * Recursively computes the minimum number of individual column operations 
to align
+     * before[i..n-1] with after[j..m-1], where {@code i} is the current index 
into {@code
+     * beforeCols} (0..n) and {@code j} is the current index into {@code 
afterCols} (0..m). Boundary
+     * cases ({@code i == n} or {@code j == m}) return immediately without 
list access. At each
+     * non-boundary state, either drop before[i] (cost 1) or match before[i] 
to after[j] (cost =
+     * rename(0/1) + alterType(0/1)). Unmatched after columns are added at the 
end.
+     */
+    private static int minCost(
+            int i,
+            int j,
+            int n,
+            int m,
+            List<Column> beforeCols,
+            List<Column> afterCols,
+            int[][] memo,
+            PostgresConnectorConfig dbzConfig,
+            TypeRegistry typeRegistry) {
+
+        if (i == n) {
+            return m - j; // add remaining after columns
+        }
+        if (j == m) {
+            return n - i; // drop remaining before columns
+        }
+        if (memo[i][j] != -1) {
+            return memo[i][j];
+        }
+
+        // Option 1: drop beforeCols[i]
+        int dropCost =
+                1 + minCost(i + 1, j, n, m, beforeCols, afterCols, memo, 
dbzConfig, typeRegistry);
+
+        // Option 2: match beforeCols[i] to afterCols[j]
+        int matchCost =
+                columnMatchCost(beforeCols.get(i), afterCols.get(j), 
dbzConfig, typeRegistry)
+                        + minCost(
+                                i + 1,
+                                j + 1,
+                                n,
+                                m,
+                                beforeCols,
+                                afterCols,
+                                memo,
+                                dbzConfig,
+                                typeRegistry);
+
+        memo[i][j] = Math.min(dropCost, matchCost);
+        return memo[i][j];
+    }
+
+    /**
+     * Computes the min cost of matching a before column to an after column 
(0, 1, or 2). if column
+     * name is same and type is same, cost = 0; if column name is same and 
type is different, cost =
+     * 1(alter type); if column name is different and type is same, cost = 
1(rename); if column name
+     * is different and type is different, cost = 2(rename + alter type);
+     */
+    private static int columnMatchCost(
+            Column before,
+            Column after,
+            PostgresConnectorConfig dbzConfig,
+            TypeRegistry typeRegistry) {
+        int cost = 0;
+        if (!before.name().equals(after.name())) {
+            cost++;
+        }
+        DataType beforeType = PostgresTypeUtils.fromDbzColumn(before, 
dbzConfig, typeRegistry);
+        DataType afterType = PostgresTypeUtils.fromDbzColumn(after, dbzConfig, 
typeRegistry);
+        if (!beforeType.equals(afterType)) {
+            cost++;
+        }
+        return cost;
+    }
+
+    /**
+     * Traces back through the memoization table to reconstruct the optimal 
sequence of schema
+     * change events. Events are emitted per-column in the natural 
left-to-right scan order.
+     */
+    private static List<SchemaChangeEvent> tracebackEvents(
+            TableId cdcTableId,
+            int n,
+            int m,
+            List<Column> beforeCols,
+            List<Column> afterCols,
+            int[][] memo,
+            PostgresConnectorConfig dbzConfig,
+            TypeRegistry typeRegistry) {
+
+        List<SchemaChangeEvent> events = new ArrayList<>();
+
+        int i = 0;
+        int j = 0;
+        while (i < n && j < m) {
+            int dropCost = 1 + memoOrBase(i + 1, j, n, m, memo);
+            int matchCost =
+                    columnMatchCost(beforeCols.get(i), afterCols.get(j), 
dbzConfig, typeRegistry)
+                            + memoOrBase(i + 1, j + 1, n, m, memo);
+
+            if (dropCost <= matchCost) {
+                events.add(
+                        new DropColumnEvent(
+                                cdcTableId, 
Collections.singletonList(beforeCols.get(i).name())));
+                i++;
+            } else {
+                Column bc = beforeCols.get(i);
+                Column ac = afterCols.get(j);
+
+                if (!bc.name().equals(ac.name())) {
+                    events.add(
+                            new RenameColumnEvent(
+                                    cdcTableId, 
Collections.singletonMap(bc.name(), ac.name())));
+                }
+
+                DataType beforeType = PostgresTypeUtils.fromDbzColumn(bc, 
dbzConfig, typeRegistry);
+                DataType afterType = PostgresTypeUtils.fromDbzColumn(ac, 
dbzConfig, typeRegistry);
+                if (!beforeType.equals(afterType)) {
+                    events.add(
+                            new AlterColumnTypeEvent(
+                                    cdcTableId, 
Collections.singletonMap(ac.name(), afterType)));
+                }
+
+                i++;
+                j++;
+            }
+        }
+
+        // Drop remaining before columns
+        while (i < n) {
+            events.add(
+                    new DropColumnEvent(
+                            cdcTableId, 
Collections.singletonList(beforeCols.get(i).name())));
+            i++;
+        }
+
+        // Add remaining after columns at last. Adds are always trailing 
because the only
+        // available operation is "add column at last" (PostgreSQL ALTER TABLE 
ADD COLUMN always
+        // appends). Surviving before-columns (after 
drops/renames/alter-types) map to a prefix
+        // of the after-columns; the unmatched suffix can only be fulfilled by 
appending.
+        while (j < m) {
+            events.add(
+                    new AddColumnEvent(
+                            cdcTableId,
+                            Collections.singletonList(
+                                    AddColumnEvent.last(
+                                            PostgresSchemaUtils.toColumn(
+                                                    afterCols.get(j), 
dbzConfig, typeRegistry)))));
+            j++;
+        }
+
+        return events;
+    }
+
+    /** Returns memoized cost or computes base case for boundary conditions. */
+    private static int memoOrBase(int i, int j, int n, int m, int[][] memo) {
+        if (i == n) {
+            return m - j;
+        }
+        if (j == m) {
+            return n - i;
+        }
+        return memo[i][j];
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCase.java
index 9ebd21c29..51de6a3b4 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCase.java
@@ -19,11 +19,16 @@ package org.apache.flink.cdc.connectors.postgres.source;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.cdc.common.data.DecimalData;
 import org.apache.flink.cdc.common.data.RecordData;
 import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.AddColumnEvent;
+import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.DropColumnEvent;
 import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.RenameColumnEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.factories.Factory;
@@ -78,6 +83,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -105,7 +111,9 @@ public class PostgresPipelineITCase extends 
PostgresTestBase {
     }
 
     @AfterEach
-    public void after() throws SQLException {
+    public void after() throws Exception {
+        // sleep 1000ms to wait until connections are closed.
+        Thread.sleep(1000L);
         inventoryDatabase.removeSlot(slotName);
     }
 
@@ -595,6 +603,300 @@ public class PostgresPipelineITCase extends 
PostgresTestBase {
                 .isEqualTo(String.format("Replication slot \"%s\" does not 
exist", slotName));
     }
 
+    @Test
+    public void testSchemaChangeWithDataInserts() throws Exception {
+        inventoryDatabase.createAndInitialize();
+        PostgresSourceConfigFactory configFactory =
+                (PostgresSourceConfigFactory)
+                        new PostgresSourceConfigFactory()
+                                .hostname(POSTGRES_CONTAINER.getHost())
+                                
.port(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT))
+                                .username(TEST_USER)
+                                .password(TEST_PASSWORD)
+                                
.databaseList(inventoryDatabase.getDatabaseName())
+                                .tableList("inventory.products")
+                                .startupOptions(StartupOptions.initial())
+                                .serverTimeZone("UTC");
+        configFactory.database(inventoryDatabase.getDatabaseName());
+        configFactory.slotName(slotName);
+        configFactory.decodingPluginName("pgoutput");
+        configFactory.enableSchemaChange(true);
+
+        FlinkSourceProvider sourceProvider =
+                (FlinkSourceProvider)
+                        new 
PostgresDataSource(configFactory).getEventSourceProvider();
+
+        StreamExecutionEnvironment testEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        testEnv.setParallelism(1);
+        testEnv.enableCheckpointing(1000);
+
+        DataStreamSource<Event> source =
+                testEnv.fromSource(
+                        sourceProvider.getSource(),
+                        WatermarkStrategy.noWatermarks(),
+                        PostgresDataSourceFactory.IDENTIFIER,
+                        new EventTypeInfo());
+
+        TypeSerializer<Event> serializer =
+                source.getTransformation()
+                        .getOutputType()
+                        
.createSerializer(testEnv.getConfig().getSerializerConfig());
+        CheckpointedCollectResultBuffer<Event> resultBuffer =
+                new CheckpointedCollectResultBuffer<>(serializer);
+        String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+        CollectResultIterator<Event> iterator =
+                addCollector(testEnv, source, resultBuffer, serializer, 
accumulatorName);
+
+        JobClient jobClient = 
testEnv.executeAsync("testSchemaChangeWithDataInserts");
+        iterator.setJobClient(jobClient);
+
+        try {
+            // Wait for snapshot phase to complete (9 rows in products)
+            List<Event> snapshotEvents =
+                    fetchEvent(iterator, 9, (event) -> !(event instanceof 
CreateTableEvent));
+            assertThat(snapshotEvents).hasSize(9);
+
+            // Wait for stream phase to stabilize
+            Thread.sleep(1000);
+            TableId tableId = TableId.tableId("inventory", "products");
+
+            // Build expected event list in order
+            List<Event> expectedLog = new ArrayList<>();
+
+            // --- Original schema: (id, name, description, weight) ---
+            RowType rowType1 =
+                    RowType.of(
+                            new DataType[] {
+                                DataTypes.INT().notNull(),
+                                DataTypes.VARCHAR(255).notNull(),
+                                DataTypes.VARCHAR(512),
+                                DataTypes.DOUBLE()
+                            },
+                            new String[] {"id", "name", "description", 
"weight"});
+            BinaryRecordDataGenerator gen1 = new 
BinaryRecordDataGenerator(rowType1);
+
+            // --- After ADD COLUMN: (id, name, description, weight, category) 
---
+            RowType rowType2 =
+                    RowType.of(
+                            new DataType[] {
+                                DataTypes.INT().notNull(),
+                                DataTypes.VARCHAR(255).notNull(),
+                                DataTypes.VARCHAR(512),
+                                DataTypes.DOUBLE(),
+                                DataTypes.VARCHAR(100)
+                            },
+                            new String[] {"id", "name", "description", 
"weight", "category"});
+            BinaryRecordDataGenerator gen2 = new 
BinaryRecordDataGenerator(rowType2);
+
+            // --- After DROP COLUMN 'description': (id, name, weight, 
category) ---
+            RowType rowType3 =
+                    RowType.of(
+                            new DataType[] {
+                                DataTypes.INT().notNull(),
+                                DataTypes.VARCHAR(255).notNull(),
+                                DataTypes.DOUBLE(),
+                                DataTypes.VARCHAR(100)
+                            },
+                            new String[] {"id", "name", "weight", "category"});
+            BinaryRecordDataGenerator gen3 = new 
BinaryRecordDataGenerator(rowType3);
+
+            // --- After ALTER COLUMN TYPE weight -> DECIMAL(10,2): (id, name, 
weight, category) ---
+            RowType rowType4 =
+                    RowType.of(
+                            new DataType[] {
+                                DataTypes.INT().notNull(),
+                                DataTypes.VARCHAR(255).notNull(),
+                                DataTypes.DECIMAL(10, 2),
+                                DataTypes.VARCHAR(100)
+                            },
+                            new String[] {"id", "name", "weight", "category"});
+            BinaryRecordDataGenerator gen4 = new 
BinaryRecordDataGenerator(rowType4);
+
+            // --- After RENAME COLUMN category -> product_category: (id, 
name, weight,
+            // product_category) ---
+            RowType rowType5 =
+                    RowType.of(
+                            new DataType[] {
+                                DataTypes.INT().notNull(),
+                                DataTypes.VARCHAR(255).notNull(),
+                                DataTypes.DECIMAL(10, 2),
+                                DataTypes.VARCHAR(100)
+                            },
+                            new String[] {"id", "name", "weight", 
"product_category"});
+            BinaryRecordDataGenerator gen5 = new 
BinaryRecordDataGenerator(rowType5);
+
+            try (Connection conn =
+                            getJdbcConnection(
+                                    POSTGRES_CONTAINER, 
inventoryDatabase.getDatabaseName());
+                    Statement stmt = conn.createStatement()) {
+
+                // Insert before ADD COLUMN (id=110)
+                stmt.execute(
+                        "INSERT INTO inventory.products VALUES (default, 
'before_add_col', 'desc1', 1.0)");
+                expectedLog.add(
+                        DataChangeEvent.insertEvent(
+                                tableId,
+                                gen1.generate(
+                                        new Object[] {
+                                            110,
+                                            
BinaryStringData.fromString("before_add_col"),
+                                            
BinaryStringData.fromString("desc1"),
+                                            1.0
+                                        })));
+
+                // ADD COLUMN category
+                stmt.execute("ALTER TABLE inventory.products ADD COLUMN 
category VARCHAR(100)");
+                expectedLog.add(
+                        new AddColumnEvent(
+                                tableId,
+                                Collections.singletonList(
+                                        new AddColumnEvent.ColumnWithPosition(
+                                                
org.apache.flink.cdc.common.schema.Column
+                                                        .physicalColumn(
+                                                                "category",
+                                                                
DataTypes.VARCHAR(100),
+                                                                null,
+                                                                null)))));
+
+                // Insert after ADD COLUMN (id=111)
+                stmt.execute(
+                        "INSERT INTO inventory.products VALUES (default, 
'after_add_col', 'desc2', 2.0, 'electronics')");
+                expectedLog.add(
+                        DataChangeEvent.insertEvent(
+                                tableId,
+                                gen2.generate(
+                                        new Object[] {
+                                            111,
+                                            
BinaryStringData.fromString("after_add_col"),
+                                            
BinaryStringData.fromString("desc2"),
+                                            2.0,
+                                            
BinaryStringData.fromString("electronics")
+                                        })));
+
+                // Insert before DROP COLUMN (id=112)
+                stmt.execute(
+                        "INSERT INTO inventory.products VALUES (default, 
'before_drop_col', 'desc3', 3.0, 'books')");
+                expectedLog.add(
+                        DataChangeEvent.insertEvent(
+                                tableId,
+                                gen2.generate(
+                                        new Object[] {
+                                            112,
+                                            
BinaryStringData.fromString("before_drop_col"),
+                                            
BinaryStringData.fromString("desc3"),
+                                            3.0,
+                                            
BinaryStringData.fromString("books")
+                                        })));
+
+                // DROP COLUMN description
+                stmt.execute("ALTER TABLE inventory.products DROP COLUMN 
description");
+                expectedLog.add(
+                        new DropColumnEvent(tableId, 
Collections.singletonList("description")));
+
+                // Insert after DROP COLUMN (id=113)
+                stmt.execute(
+                        "INSERT INTO inventory.products VALUES (default, 
'after_drop_col', 4.0, 'toys')");
+                expectedLog.add(
+                        DataChangeEvent.insertEvent(
+                                tableId,
+                                gen3.generate(
+                                        new Object[] {
+                                            113,
+                                            
BinaryStringData.fromString("after_drop_col"),
+                                            4.0,
+                                            BinaryStringData.fromString("toys")
+                                        })));
+
+                // Insert before ALTER COLUMN TYPE (id=114)
+                stmt.execute(
+                        "INSERT INTO inventory.products VALUES (default, 
'before_alter_type', 5.0, 'food')");
+                expectedLog.add(
+                        DataChangeEvent.insertEvent(
+                                tableId,
+                                gen3.generate(
+                                        new Object[] {
+                                            114,
+                                            
BinaryStringData.fromString("before_alter_type"),
+                                            5.0,
+                                            BinaryStringData.fromString("food")
+                                        })));
+
+                // ALTER COLUMN TYPE weight -> DECIMAL(10,2)
+                stmt.execute(
+                        "ALTER TABLE inventory.products ALTER COLUMN weight 
TYPE DECIMAL(10,2)");
+                expectedLog.add(
+                        new AlterColumnTypeEvent(
+                                tableId,
+                                Collections.singletonMap("weight", 
DataTypes.DECIMAL(10, 2))));
+
+                // Insert after ALTER COLUMN TYPE (id=115)
+                stmt.execute(
+                        "INSERT INTO inventory.products VALUES (default, 
'after_alter_type', 6.00, 'sports')");
+                expectedLog.add(
+                        DataChangeEvent.insertEvent(
+                                tableId,
+                                gen4.generate(
+                                        new Object[] {
+                                            115,
+                                            
BinaryStringData.fromString("after_alter_type"),
+                                            DecimalData.fromBigDecimal(
+                                                    new 
java.math.BigDecimal("6.00"), 10, 2),
+                                            
BinaryStringData.fromString("sports")
+                                        })));
+
+                // Insert before RENAME COLUMN (id=116)
+                stmt.execute(
+                        "INSERT INTO inventory.products VALUES (default, 
'before_rename_col', 7.00, 'clothing')");
+                expectedLog.add(
+                        DataChangeEvent.insertEvent(
+                                tableId,
+                                gen4.generate(
+                                        new Object[] {
+                                            116,
+                                            
BinaryStringData.fromString("before_rename_col"),
+                                            DecimalData.fromBigDecimal(
+                                                    new 
java.math.BigDecimal("7.00"), 10, 2),
+                                            
BinaryStringData.fromString("clothing")
+                                        })));
+
+                // RENAME COLUMN category -> product_category
+                stmt.execute(
+                        "ALTER TABLE inventory.products RENAME COLUMN category 
TO product_category");
+                expectedLog.add(
+                        new RenameColumnEvent(
+                                tableId, Collections.singletonMap("category", 
"product_category")));
+
+                // Insert after RENAME COLUMN (id=117)
+                stmt.execute(
+                        "INSERT INTO inventory.products VALUES (default, 
'after_rename_col', 8.00, 'garden')");
+                expectedLog.add(
+                        DataChangeEvent.insertEvent(
+                                tableId,
+                                gen5.generate(
+                                        new Object[] {
+                                            117,
+                                            
BinaryStringData.fromString("after_rename_col"),
+                                            DecimalData.fromBigDecimal(
+                                                    new 
java.math.BigDecimal("8.00"), 10, 2),
+                                            
BinaryStringData.fromString("garden")
+                                        })));
+            }
+
+            // Collect streaming events (filter out CreateTableEvents)
+            List<Event> actualLog = fetchEvent(iterator, expectedLog.size(), 
(event) -> true);
+
+            assertThat(actualLog).isEqualTo(expectedLog);
+
+        } finally {
+            try {
+                iterator.close();
+                jobClient.cancel().get();
+            } catch (Exception e) {
+                LOG.warn("Failed to cancel job: {}", e.getMessage());
+            }
+        }
+    }
+
     @Test
     public void testDatabaseNameWithHyphenEndToEnd() throws Exception {
         // Create a real database with hyphen to verify full CDC sync works
@@ -776,6 +1078,18 @@ public class PostgresPipelineITCase extends 
PostgresTestBase {
         return result;
     }
 
+    private List<Event> fetchEvent(Iterator<Event> iter, int size, 
Predicate<Event> predicate) {
+        List<Event> result = new ArrayList<>(size);
+        while (size > 0 && iter.hasNext()) {
+            Event event = iter.next();
+            if (predicate.test(event)) {
+                result.add(event);
+                size--;
+            }
+        }
+        return result;
+    }
+
     private List<Event> getSnapshotExpected(TableId tableId) {
         RowType rowType =
                 RowType.of(
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/SourceSplitSerializer.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/SourceSplitSerializer.java
index 446df1dbf..51f6aac59 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/SourceSplitSerializer.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/meta/split/SourceSplitSerializer.java
@@ -247,7 +247,8 @@ public abstract class SourceSplitSerializer
                     throw new IOException("Unknown version: " + version);
             }
             Document document = documentReader.read(tableChangeStr);
-            TableChange tableChange = 
FlinkJsonTableChangeSerializer.fromDocument(document, true);
+            TableChange tableChange =
+                    FlinkJsonTableChangeSerializer.fromDocument(document, 
useCatalogBeforeSchema);
             tableSchemas.put(tableId, tableChange);
         }
         return tableSchemas;
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java
index 26c788330..984472803 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java
@@ -36,6 +36,7 @@ import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -102,10 +103,7 @@ public class IncrementalSourceRecordEmitter<T>
             }
         } else if (isSchemaChangeEvent(element) && 
splitState.isStreamSplitState()) {
             LOG.trace("Process SchemaChangeEvent: {}; splitState = {}", 
element, splitState);
-            HistoryRecord historyRecord = getHistoryRecord(element);
-            Array tableChanges =
-                    
historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES);
-            TableChanges changes = 
TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
+            TableChanges changes = getTableChangeRecord(element);
             for (TableChanges.TableChange tableChange : changes) {
                 
splitState.asStreamSplitState().recordSchema(tableChange.getId(), tableChange);
             }
@@ -128,6 +126,12 @@ public class IncrementalSourceRecordEmitter<T>
         }
     }
 
+    protected TableChanges getTableChangeRecord(SourceRecord element) throws 
IOException {
+        HistoryRecord historyRecord = getHistoryRecord(element);
+        Array tableChanges = 
historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES);
+        return TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
+    }
+
     private void updateStreamSplitState(SourceSplitState splitState, 
SourceRecord element) {
         if (splitState.isStreamSplitState()) {
             Offset position = getOffsetPosition(element);
@@ -161,16 +165,7 @@ public class IncrementalSourceRecordEmitter<T>
         debeziumDeserializationSchema.deserialize(element, outputCollector);
     }
 
-    /**
-     * Apply the split to the record emitter.
-     *
-     * <p>This method is called when a new split is assigned to the record 
emitter. It allows the
-     * record emitter to perform any necessary initialization or setup based 
on the characteristics
-     * of the assigned split. In this implementation, we may need to handle 
split-specific
-     * configurations or state initialization.
-     *
-     * @param split the split to apply
-     */
+    /** Called when a new split is assigned. Subclasses may override for 
split-specific setup. */
     public void applySplit(SourceSplitBase split) {}
 
     protected void reportMetrics(SourceRecord element) {
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresObjectUtils.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresObjectUtils.java
index c50db0435..f5132bffd 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresObjectUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresObjectUtils.java
@@ -17,12 +17,14 @@
 
 package io.debezium.connector.postgresql;
 
+import 
org.apache.flink.cdc.connectors.postgres.source.schema.RelationAwarePostgresSchema;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import io.debezium.connector.postgresql.connection.PostgresConnection;
 import io.debezium.connector.postgresql.connection.ReplicationConnection;
 import io.debezium.connector.postgresql.spi.SlotState;
 import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
 import io.debezium.schema.TopicSelector;
 import io.debezium.util.Clock;
 import io.debezium.util.Metronome;
@@ -32,6 +34,7 @@ import org.slf4j.LoggerFactory;
 import java.nio.charset.StandardCharsets;
 import java.sql.SQLException;
 import java.time.Duration;
+import java.util.Collection;
 
 /**
  * A factory for creating various Debezium objects
@@ -42,15 +45,15 @@ public class PostgresObjectUtils {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(PostgresObjectUtils.class);
 
     /** Create a new PostgresSchema and initialize the content of the schema. 
*/
-    public static PostgresSchema newSchema(
+    public static RelationAwarePostgresSchema newSchema(
             PostgresConnection connection,
             PostgresConnectorConfig config,
             TypeRegistry typeRegistry,
             TopicSelector<TableId> topicSelector,
             PostgresValueConverter valueConverter)
             throws SQLException {
-        PostgresSchema schema =
-                new PostgresSchema(
+        RelationAwarePostgresSchema schema =
+                new RelationAwarePostgresSchema(
                         config,
                         typeRegistry,
                         connection.getDefaultValueConverter(),
@@ -60,6 +63,25 @@ public class PostgresObjectUtils {
         return schema;
     }
 
+    public static RelationAwarePostgresSchema newSchema(
+            PostgresConnection connection,
+            PostgresConnectorConfig config,
+            TypeRegistry typeRegistry,
+            TopicSelector<TableId> topicSelector,
+            PostgresValueConverter valueConverter,
+            Collection<TableChanges.TableChange> tableChanges)
+            throws SQLException {
+        RelationAwarePostgresSchema schema =
+                new RelationAwarePostgresSchema(
+                        config,
+                        typeRegistry,
+                        connection.getDefaultValueConverter(),
+                        topicSelector,
+                        valueConverter);
+        tableChanges.forEach(tableChange -> 
schema.buildAndRegisterSchema(tableChange.getTable()));
+        return schema;
+    }
+
     public static PostgresTaskContext newTaskContext(
             PostgresConnectorConfig connectorConfig,
             PostgresSchema schema,
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
index 0f550ca96..53bb066b5 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
@@ -21,6 +21,7 @@ import 
org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.cdc.common.annotation.Experimental;
 import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
+import org.apache.flink.cdc.connectors.base.config.SourceConfig;
 import org.apache.flink.cdc.connectors.base.options.StartupOptions;
 import 
org.apache.flink.cdc.connectors.base.source.assigner.HybridSplitAssigner;
 import org.apache.flink.cdc.connectors.base.source.assigner.SplitAssigner;
@@ -31,6 +32,7 @@ import 
org.apache.flink.cdc.connectors.base.source.assigner.state.StreamPendingS
 import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;
 import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
 import 
org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReaderContext;
 import 
org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader;
@@ -39,7 +41,9 @@ import 
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConf
 import 
org.apache.flink.cdc.connectors.postgres.source.enumerator.PostgresSourceEnumerator;
 import 
org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
 import 
org.apache.flink.cdc.connectors.postgres.source.reader.PostgresSourceReader;
+import 
org.apache.flink.cdc.connectors.postgres.source.reader.PostgresSourceRecordEmitter;
 import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -311,6 +315,12 @@ public class PostgresSourceBuilder<T> {
         return this;
     }
 
+    /** Whether to infer schema change event on relation message. */
+    public PostgresSourceBuilder<T> includeSchemaChanges(boolean 
includeSchemaChanges) {
+        this.configFactory.includeSchemaChanges(includeSchemaChanges);
+        return this;
+    }
+
     /**
      * Build the {@link PostgresIncrementalSource}.
      *
@@ -445,6 +455,16 @@ public class PostgresSourceBuilder<T> {
                     dataSourceDialect);
         }
 
+        @Override
+        protected RecordEmitter<SourceRecords, T, SourceSplitState> 
createRecordEmitter(
+                SourceConfig sourceConfig, SourceReaderMetrics 
sourceReaderMetrics) {
+            return new PostgresSourceRecordEmitter<>(
+                    deserializationSchema,
+                    sourceReaderMetrics,
+                    sourceConfig.isIncludeSchemaChanges(),
+                    offsetFactory);
+        }
+
         public static <T> PostgresSourceBuilder<T> builder() {
             return new PostgresSourceBuilder<>();
         }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
index 4402219a6..9807bb2dc 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
@@ -40,6 +40,7 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
     private final int lsnCommitCheckpointsDelay;
     private final boolean includePartitionedTables;
     private final boolean includeDatabaseInTableId;
+    private final boolean schemaChangeEnabled;
 
     public PostgresSourceConfig(
             int subtaskId,
@@ -71,7 +72,8 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
             int lsnCommitCheckpointsDelay,
             boolean assignUnboundedChunkFirst,
             boolean includePartitionedTables,
-            boolean includeDatabaseInTableId) {
+            boolean includeDatabaseInTableId,
+            boolean schemaChangeEnabled) {
         super(
                 startupOptions,
                 databaseList,
@@ -103,6 +105,7 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
         this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
         this.includePartitionedTables = includePartitionedTables;
         this.includeDatabaseInTableId = includeDatabaseInTableId;
+        this.schemaChangeEnabled = schemaChangeEnabled;
     }
 
     /**
@@ -152,12 +155,13 @@ public class PostgresSourceConfig extends 
JdbcSourceConfig {
         return new PostgresConnectorConfig(getDbzConfiguration());
     }
 
-    /**
-     * Returns whether to include database in the generated Table ID.
-     *
-     * @return whether to include database in the generated Table ID
-     */
+    /** Returns whether to include database in the generated Table ID. */
     public boolean isIncludeDatabaseInTableId() {
         return includeDatabaseInTableId;
     }
+
+    /** Returns whether to infer column types via JDBC TypeRegistry on schema 
change events. */
+    public boolean isSchemaChangeEnabled() {
+        return schemaChangeEnabled;
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
index 847b15474..20e1800d4 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
@@ -57,6 +57,8 @@ public class PostgresSourceConfigFactory extends 
JdbcSourceConfigFactory {
     private boolean includeDatabaseInTableId =
             PostgresSourceOptions.TABLE_ID_INCLUDE_DATABASE.defaultValue();
 
+    private boolean schemaChangeEnabled = false;
+
     /** Creates a new {@link PostgresSourceConfig} for the given subtask 
{@code subtaskId}. */
     @Override
     public PostgresSourceConfig create(int subtaskId) {
@@ -140,7 +142,8 @@ public class PostgresSourceConfigFactory extends 
JdbcSourceConfigFactory {
                 lsnCommitCheckpointsDelay,
                 assignUnboundedChunkFirst,
                 includePartitionedTables,
-                includeDatabaseInTableId);
+                includeDatabaseInTableId,
+                schemaChangeEnabled);
     }
 
     /**
@@ -198,4 +201,9 @@ public class PostgresSourceConfigFactory extends 
JdbcSourceConfigFactory {
     public void setIncludeDatabaseInTableId(boolean includeDatabaseInTableId) {
         this.includeDatabaseInTableId = includeDatabaseInTableId;
     }
+
+    /** Set whether to infer schema change event on relation message. */
+    public void enableSchemaChange(boolean schemaChangeEnabled) {
+        this.schemaChangeEnabled = schemaChangeEnabled;
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/CDCPostgresDispatcher.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/CDCPostgresDispatcher.java
index e2db160e1..ce9183af0 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/CDCPostgresDispatcher.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/CDCPostgresDispatcher.java
@@ -22,6 +22,8 @@ import 
org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
 import 
org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent;
 import 
org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
+import 
org.apache.flink.cdc.connectors.postgres.source.schema.PostgresSchemaRecord;
+import org.apache.flink.cdc.connectors.postgres.source.schema.SchemaDispatcher;
 
 import io.debezium.connector.base.ChangeEventQueue;
 import io.debezium.connector.postgresql.PostgresConnectorConfig;
@@ -30,6 +32,7 @@ import io.debezium.heartbeat.HeartbeatFactory;
 import io.debezium.pipeline.DataChangeEvent;
 import io.debezium.pipeline.source.spi.EventMetadataProvider;
 import io.debezium.pipeline.spi.ChangeEventCreator;
+import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
 import io.debezium.schema.DataCollectionFilters;
 import io.debezium.schema.DatabaseSchema;
@@ -41,7 +44,7 @@ import java.util.Map;
 
 /** Postgres Dispatcher for cdc source with watermark. */
 public class CDCPostgresDispatcher extends PostgresEventDispatcher<TableId>
-        implements WatermarkDispatcher {
+        implements WatermarkDispatcher, SchemaDispatcher {
     private final String topic;
     private final ChangeEventQueue<DataChangeEvent> queue;
 
@@ -81,4 +84,15 @@ public class CDCPostgresDispatcher extends 
PostgresEventDispatcher<TableId>
                         sourcePartition, topic, sourceSplit.splitId(), 
watermarkKind, watermark);
         queue.enqueue(new DataChangeEvent(sourceRecord));
     }
+
+    @Override
+    public void dispatch(Table table) {
+        PostgresSchemaRecord schemaRecord = new PostgresSchemaRecord(table);
+        try {
+            queue.enqueue(new DataChangeEvent(schemaRecord));
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
index 49b62cb39..3a275ff6b 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
@@ -19,7 +19,6 @@ package org.apache.flink.cdc.connectors.postgres.source.fetch;
 
 import org.apache.flink.cdc.connectors.base.WatermarkDispatcher;
 import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
-import 
org.apache.flink.cdc.connectors.base.source.EmbeddedFlinkDatabaseHistory;
 import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
@@ -30,6 +29,8 @@ import 
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConf
 import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffset;
 import 
org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
 import 
org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetUtils;
+import 
org.apache.flink.cdc.connectors.postgres.source.schema.PostgresSchemaRecord;
+import 
org.apache.flink.cdc.connectors.postgres.source.schema.RelationAwarePostgresSchema;
 import org.apache.flink.cdc.connectors.postgres.source.utils.ChunkUtils;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -92,7 +93,7 @@ public class PostgresSourceFetchTaskContext extends 
JdbcSourceFetchTaskContext {
     private ReplicationConnection replicationConnection;
     private PostgresOffsetContext offsetContext;
     private PostgresPartition partition;
-    private PostgresSchema schema;
+    private RelationAwarePostgresSchema schema;
     private ErrorHandler errorHandler;
     private CDCPostgresDispatcher postgresDispatcher;
     private EventMetadataProvider metadataProvider;
@@ -179,11 +180,6 @@ public class PostgresSourceFetchTaskContext extends 
JdbcSourceFetchTaskContext {
                         dbzConfig.getJdbcConfig(), valueConverterBuilder, 
CONNECTION_NAME);
 
         TopicSelector<TableId> topicSelector = 
PostgresTopicSelector.create(dbzConfig);
-        EmbeddedFlinkDatabaseHistory.registerHistory(
-                sourceConfig
-                        .getDbzConfiguration()
-                        
.getString(EmbeddedFlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
-                sourceSplitBase.getTableSchemas().values());
 
         try {
             this.schema =
@@ -192,7 +188,8 @@ public class PostgresSourceFetchTaskContext extends 
JdbcSourceFetchTaskContext {
                             dbzConfig,
                             jdbcConnection.getTypeRegistry(),
                             topicSelector,
-                            
valueConverterBuilder.build(jdbcConnection.getTypeRegistry()));
+                            
valueConverterBuilder.build(jdbcConnection.getTypeRegistry()),
+                            sourceSplitBase.getTableSchemas().values());
         } catch (SQLException e) {
             throw new RuntimeException("Failed to initialize PostgresSchema", 
e);
         }
@@ -271,6 +268,7 @@ public class PostgresSourceFetchTaskContext extends 
JdbcSourceFetchTaskContext {
                                     }
                                 }),
                         schemaNameAdjuster);
+        schema.setDispatcher(postgresDispatcher);
 
         ChangeEventSourceMetricsFactory<PostgresPartition> metricsFactory =
                 new DefaultChangeEventSourceMetricsFactory<>();
@@ -326,6 +324,9 @@ public class PostgresSourceFetchTaskContext extends 
JdbcSourceFetchTaskContext {
 
     @Override
     public TableId getTableId(SourceRecord record) {
+        if (record instanceof PostgresSchemaRecord) {
+            return ((PostgresSchemaRecord) record).getTable().id();
+        }
         Struct value = (Struct) record.value();
         Struct source = value.getStruct(Envelope.FieldName.SOURCE);
         String schemaName = source.getString(SCHEMA_NAME_KEY);
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceRecordEmitter.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceRecordEmitter.java
new file mode 100644
index 000000000..7689eccf7
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceRecordEmitter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source.reader;
+
+import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
+import 
org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter;
+import 
org.apache.flink.cdc.connectors.postgres.source.schema.PostgresSchemaRecord;
+import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
+
+import io.debezium.relational.Table;
+import io.debezium.relational.history.TableChanges;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.io.IOException;
+
+/** Record emitter that recognizes {@link PostgresSchemaRecord} as schema 
change events. */
+public class PostgresSourceRecordEmitter<T> extends 
IncrementalSourceRecordEmitter<T> {
+    public PostgresSourceRecordEmitter(
+            DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
+            SourceReaderMetrics sourceReaderMetrics,
+            boolean includeSchemaChanges,
+            OffsetFactory offsetFactory) {
+        super(
+                debeziumDeserializationSchema,
+                sourceReaderMetrics,
+                includeSchemaChanges,
+                offsetFactory);
+    }
+
+    @Override
+    protected TableChanges getTableChangeRecord(SourceRecord element) throws 
IOException {
+        if (element instanceof PostgresSchemaRecord) {
+            PostgresSchemaRecord schemaRecord = (PostgresSchemaRecord) element;
+            Table table = schemaRecord.getTable();
+            return new TableChanges().create(table);
+        } else {
+            return super.getTableChangeRecord(element);
+        }
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/schema/PostgresSchemaRecord.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/schema/PostgresSchemaRecord.java
new file mode 100644
index 000000000..53edb9d04
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/schema/PostgresSchemaRecord.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source.schema;
+
+import io.debezium.relational.Table;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/** A SourceRecord wrapper carrying a Debezium Table for schema change 
propagation. */
+public class PostgresSchemaRecord extends SourceRecord {
+    // Use `postgres-cdc` rather than `postgres` to avoid conflict if debezium 
support later.
+    private static final String SCHEMA_CHANGE_EVENT_KEY_NAME =
+            "io.debezium.connector.postgres-cdc.SchemaChangeKey";
+    private static final String SCHEMA_CHANGE_EVENT_VALUE_NAME =
+            "io.debezium.connector.postgres-cdc.SchemaChangeValue";
+
+    private static final Schema KEY_SCHEMA =
+            SchemaBuilder.struct()
+                    .name(SCHEMA_CHANGE_EVENT_KEY_NAME)
+                    .field("table_id", Schema.STRING_SCHEMA)
+                    .build();
+
+    /** Minimal value schema to avoid null valueSchema/value NPEs downstream. 
*/
+    private static final Schema VALUE_SCHEMA =
+            
SchemaBuilder.struct().name(SCHEMA_CHANGE_EVENT_VALUE_NAME).build();
+
+    private final Table table;
+
+    public PostgresSchemaRecord(Table table) {
+        super(
+                null,
+                null,
+                null,
+                KEY_SCHEMA,
+                buildKey(table),
+                VALUE_SCHEMA,
+                new Struct(VALUE_SCHEMA));
+        this.table = table;
+    }
+
+    private static Struct buildKey(Table table) {
+        Struct struct = new Struct(KEY_SCHEMA);
+        struct.put("table_id", table.id().toString());
+        return struct;
+    }
+
+    public Table getTable() {
+        return table;
+    }
+
+    @Override
+    public String toString() {
+        return "PostgresSchemaRecord{" + "table=" + table + '}';
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/schema/RelationAwarePostgresSchema.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/schema/RelationAwarePostgresSchema.java
new file mode 100644
index 000000000..e07cbe443
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/schema/RelationAwarePostgresSchema.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source.schema;
+
+import io.debezium.connector.postgresql.PostgresConnectorConfig;
+import io.debezium.connector.postgresql.PostgresSchema;
+import io.debezium.connector.postgresql.PostgresValueConverter;
+import io.debezium.connector.postgresql.TypeRegistry;
+import 
io.debezium.connector.postgresql.connection.PostgresDefaultValueConverter;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
+import io.debezium.schema.TopicSelector;
+
+/**
+ * Extends PostgresSchema to dispatch Relation messages as schema change 
events via the event queue
+ * and expose buildAndRegisterSchema as public.
+ */
+public class RelationAwarePostgresSchema extends PostgresSchema {
+
+    private SchemaDispatcher dispatcher;
+
+    public RelationAwarePostgresSchema(
+            PostgresConnectorConfig config,
+            TypeRegistry typeRegistry,
+            PostgresDefaultValueConverter defaultValueConverter,
+            TopicSelector<TableId> topicSelector,
+            PostgresValueConverter valueConverter) {
+        super(config, typeRegistry, defaultValueConverter, topicSelector, 
valueConverter);
+    }
+
+    public void setDispatcher(SchemaDispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    @Override
+    public void applySchemaChangesForTable(int relationId, Table table) {
+        super.applySchemaChangesForTable(relationId, table);
+        if (dispatcher != null && !isFilteredOut(table.id())) {
+            dispatcher.dispatch(table);
+        }
+    }
+
+    @Override
+    public void buildAndRegisterSchema(Table table) {
+        super.buildAndRegisterSchema(table);
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/schema/SchemaDispatcher.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/schema/SchemaDispatcher.java
new file mode 100644
index 000000000..01b46b870
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/schema/SchemaDispatcher.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.source.schema;
+
+import io.debezium.relational.Table;
+
+/** Dispatches Debezium table schema changes into the change event queue. */
+public interface SchemaDispatcher {
+    void dispatch(Table table);
+}
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java
index d2518efbb..b0ba6c4e3 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/PostgresTestBase.java
@@ -38,6 +38,8 @@ import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
 import org.testcontainers.utility.DockerImageName;
 
+import javax.annotation.Nullable;
+
 import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Paths;
@@ -248,13 +250,14 @@ public abstract class PostgresTestBase extends 
AbstractTestBase {
     protected PostgresSourceConfigFactory getMockPostgresSourceConfigFactory(
             UniqueDatabase database, String schemaName, String tableName, int 
splitSize) {
         return getMockPostgresSourceConfigFactory(
-                database, schemaName, tableName, splitSize, false);
+                database, schemaName, tableName, null, splitSize, false);
     }
 
     protected PostgresSourceConfigFactory getMockPostgresSourceConfigFactory(
             UniqueDatabase database,
             String schemaName,
             String tableName,
+            @Nullable String slotName,
             int splitSize,
             boolean skipSnapshotBackfill) {
 
@@ -270,6 +273,10 @@ public abstract class PostgresTestBase extends 
AbstractTestBase {
         postgresSourceConfigFactory.skipSnapshotBackfill(skipSnapshotBackfill);
         postgresSourceConfigFactory.setLsnCommitCheckpointsDelay(1);
         postgresSourceConfigFactory.decodingPluginName("pgoutput");
+        if (slotName != null) {
+            postgresSourceConfigFactory.slotName(slotName);
+        }
+
         return postgresSourceConfigFactory;
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/IncrementalSourceStreamFetcherTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/IncrementalSourceStreamFetcherTest.java
index fa0fe8773..419c2be88 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/IncrementalSourceStreamFetcherTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/IncrementalSourceStreamFetcherTest.java
@@ -34,14 +34,21 @@ import 
org.apache.flink.cdc.connectors.postgres.testutils.UniqueDatabase;
 import io.debezium.relational.TableId;
 import io.debezium.relational.history.TableChanges;
 import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Predicate;
 
+import static 
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isHeartbeatEvent;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link IncrementalSourceStreamFetcher }. */
@@ -58,11 +65,26 @@ public class IncrementalSourceStreamFetcherTest extends 
PostgresTestBase {
                     POSTGRES_CONTAINER.getUsername(),
                     POSTGRES_CONTAINER.getPassword());
 
+    private String slotName;
+
+    @BeforeEach
+    public void before() throws SQLException {
+        customDatabase.createAndInitialize();
+        this.slotName = getSlotName();
+    }
+
+    @AfterEach
+    public void after() throws Exception {
+        // sleep 1000ms to wait until connections are closed.
+        Thread.sleep(1000L);
+        customDatabase.removeSlot(slotName);
+    }
+
     @Test
     void testReadStreamSplitWithException() throws Exception {
-        customDatabase.createAndInitialize();
         PostgresSourceConfigFactory sourceConfigFactory =
-                getMockPostgresSourceConfigFactory(customDatabase, schemaName, 
tableName, 10, true);
+                getMockPostgresSourceConfigFactory(
+                        customDatabase, schemaName, tableName, slotName, 10, 
true);
         sourceConfigFactory.startupOptions(StartupOptions.latest());
         PostgresSourceConfig sourceConfig = sourceConfigFactory.create(0);
         PostgresDialect dialect = new 
PostgresDialect(sourceConfigFactory.create(0));
@@ -96,6 +118,79 @@ public class IncrementalSourceStreamFetcherTest extends 
PostgresTestBase {
         fetcher.close();
     }
 
+    @Test
+    void testSchemaChangeRecordInQueue() throws Exception {
+        PostgresSourceConfigFactory sourceConfigFactory =
+                getMockPostgresSourceConfigFactory(
+                        customDatabase, schemaName, tableName, slotName, 10, 
true);
+        sourceConfigFactory.startupOptions(StartupOptions.latest());
+        PostgresSourceConfig sourceConfig = sourceConfigFactory.create(0);
+        PostgresDialect dialect = new 
PostgresDialect(sourceConfigFactory.create(0));
+
+        PostgresSourceFetchTaskContext taskContext =
+                new PostgresSourceFetchTaskContext(sourceConfig, dialect);
+        IncrementalSourceStreamFetcher fetcher = new 
IncrementalSourceStreamFetcher(taskContext, 0);
+        StreamSplit split = createStreamSplit(sourceConfig, dialect);
+        PostgresStreamFetchTask fetchTask =
+                (PostgresStreamFetchTask) dialect.createFetchTask(split);
+
+        fetcher.submitTask(fetchTask);
+        // Wait for the stream reader to start consuming WAL
+        Thread.sleep(1000L);
+
+        try (Connection conn =
+                        getJdbcConnection(POSTGRES_CONTAINER, 
customDatabase.getDatabaseName());
+                Statement stmt = conn.createStatement()) {
+            // Insert a record BEFORE the DDL change
+            stmt.execute(
+                    "INSERT INTO customer.\"Customers\" VALUES (3001, 
'before_ddl', 'Beijing', '111')");
+
+            // Perform a DDL change to trigger schema change event
+            stmt.execute(
+                    "ALTER TABLE customer.\"Customers\" ADD COLUMN email 
VARCHAR(255) DEFAULT '[email protected]'");
+
+            // Insert a record AFTER the DDL change
+            stmt.execute(
+                    "INSERT INTO customer.\"Customers\" VALUES (3002, 
'after_ddl', 'Shanghai', '222', '[email protected]')");
+        }
+
+        // Wait for all events to be enqueued
+        Thread.sleep(1000L);
+
+        // Poll all records (data changes + schema changes) to verify ordering
+        List<SourceRecord> allRecords = pollRecordsFromReader(fetcher, r -> 
!isHeartbeatEvent(r));
+        assertThat(allRecords).hasSize(4);
+        assertThat(allRecords.get(0).toString())
+                .contains(
+                        "PostgresSchemaRecord{table=columns: {\n"
+                                + "  Id int4(10, 0) NOT NULL\n"
+                                + "  Name varchar(255, 0) NOT NULL DEFAULT 
VALUE 'flink'::character varying\n"
+                                + "  address varchar(1024, 0) DEFAULT VALUE 
NULL\n"
+                                + "  phone_number varchar(512, 0) DEFAULT 
VALUE NULL\n"
+                                + "}\n"
+                                + "primary key: [Id]\n"
+                                + "default charset: null\n"
+                                + "comment: null\n"
+                                + "}");
+        assertThat(allRecords.get(1).toString()).contains("before_ddl");
+        assertThat(allRecords.get(2).toString())
+                .contains(
+                        "PostgresSchemaRecord{table=columns: {\n"
+                                + "  Id int4(10, 0) NOT NULL\n"
+                                + "  Name varchar(255, 0) NOT NULL DEFAULT 
VALUE 'flink'::character varying\n"
+                                + "  address varchar(1024, 0) DEFAULT VALUE 
NULL\n"
+                                + "  phone_number varchar(512, 0) DEFAULT 
VALUE NULL\n"
+                                + "  email varchar(255, 0) DEFAULT VALUE 
'[email protected]'::character varying\n"
+                                + "}\n"
+                                + "primary key: [Id]\n"
+                                + "default charset: null\n"
+                                + "comment: null\n"
+                                + "}");
+        assertThat(allRecords.get(3).toString()).contains("after_ddl");
+
+        fetcher.close();
+    }
+
     private StreamSplit createStreamSplit(
             PostgresSourceConfig sourceConfig, PostgresDialect dialect) throws 
Exception {
         StreamSplitAssigner streamSplitAssigner =
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java
index a0ff12c4f..a40f70296 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java
@@ -247,7 +247,8 @@ class PostgresScanFetchTaskTest extends PostgresTestBase {
     void testSnapshotFetchSize() throws Exception {
         customDatabase.createAndInitialize();
         PostgresSourceConfigFactory sourceConfigFactory =
-                getMockPostgresSourceConfigFactory(customDatabase, schemaName, 
tableName, 10, true);
+                getMockPostgresSourceConfigFactory(
+                        customDatabase, schemaName, tableName, null, 10, true);
         Properties properties = new Properties();
         properties.setProperty("snapshot.fetch.size", "2");
         sourceConfigFactory.debeziumProperties(properties);
@@ -291,7 +292,7 @@ class PostgresScanFetchTaskTest extends PostgresTestBase {
             throws Exception {
         PostgresSourceConfigFactory sourceConfigFactory =
                 getMockPostgresSourceConfigFactory(
-                        customDatabase, schemaName, tableName, 10, 
skipSnapshotBackfill);
+                        customDatabase, schemaName, tableName, null, 10, 
skipSnapshotBackfill);
         PostgresSourceConfig sourceConfig = sourceConfigFactory.create(0);
         PostgresDialect postgresDialect = new 
PostgresDialect(sourceConfigFactory.create(0));
         SnapshotPhaseHooks hooks = new SnapshotPhaseHooks();
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
index 68ef4690e..77b845074 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
@@ -21,10 +21,15 @@ import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
+import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
 import org.apache.flink.cdc.connectors.postgres.PostgresTestBase;
 import org.apache.flink.cdc.connectors.postgres.source.MockPostgresDialect;
+import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
 import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder;
+import 
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
 import 
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
 import 
org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
 import org.apache.flink.cdc.connectors.postgres.testutils.RecordsFormatter;
@@ -39,6 +44,7 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Collector;
 
 import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -46,11 +52,14 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.sql.Connection;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.flink.core.io.InputStatus.END_OF_INPUT;
 import static org.apache.flink.core.io.InputStatus.MORE_AVAILABLE;
@@ -186,6 +195,106 @@ public class PostgresSourceReaderTest extends 
PostgresTestBase {
         assertEqualsInAnyOrder(Arrays.asList(expectedRecords), actualRecords);
     }
 
+    @Test
+    void testSchemaChangeUpdatesSnapshotState() throws Exception {
+        PostgresSourceReader reader = createStreamReader();
+        reader.start();
+
+        // Discover table schemas and create a stream split
+        PostgresSourceConfigFactory configFactory = createConfigFactory();
+        PostgresSourceConfig sourceConfig = configFactory.create(0);
+        PostgresDialect dialect = new PostgresDialect(sourceConfig);
+        Map<TableId, TableChanges.TableChange> tableSchemas =
+                dialect.discoverDataCollectionSchemas(sourceConfig);
+
+        TableId tableId = new TableId(null, SCHEMA_NAME, "Customers");
+        // Verify original schema has 4 columns (Id, Name, address, 
phone_number)
+        assertThat(tableSchemas.get(tableId).getTable().columns()).hasSize(4);
+        
assertThat(tableSchemas.get(tableId).getTable().columnWithName("email")).isNull();
+
+        PostgresOffsetFactory offsetFactory = new PostgresOffsetFactory();
+        StreamSplit streamSplit =
+                new StreamSplit(
+                        StreamSplit.STREAM_SPLIT_ID,
+                        offsetFactory.createInitialOffset(),
+                        offsetFactory.createNoStoppingOffset(),
+                        Collections.emptyList(),
+                        tableSchemas,
+                        0);
+        reader.addSplits(Collections.singletonList(streamSplit));
+
+        // Wait for the reader to start consuming
+        Thread.sleep(1000L);
+
+        try (Connection conn =
+                        getJdbcConnection(POSTGRES_CONTAINER, 
customDatabase.getDatabaseName());
+                Statement stmt = conn.createStatement()) {
+            // Insert a record BEFORE the DDL change
+            stmt.execute(
+                    "INSERT INTO customer.\"Customers\" VALUES (3001, 
'before_ddl', 'Beijing', '111')");
+
+            // Perform a DDL change
+            stmt.execute(
+                    "ALTER TABLE customer.\"Customers\" ADD COLUMN email 
VARCHAR(255) DEFAULT '[email protected]'");
+
+            // Insert a record AFTER the DDL change
+            stmt.execute(
+                    "INSERT INTO customer.\"Customers\" VALUES (3002, 
'after_ddl', 'Shanghai', '222', '[email protected]')");
+        }
+
+        // Wait for the schema change event to be processed
+        Thread.sleep(1000L);
+
+        // Poll records so the emitter processes all events
+        final SimpleReaderOutput output = new SimpleReaderOutput();
+        for (int i = 0; i < 10; i++) {
+            reader.pollNext(output);
+        }
+
+        // Verify the emitted records contain data before and after DDL in 
correct order
+        List<SourceRecord> results = output.getResults();
+        int beforeDdlPos = -1;
+        int afterDdlPos = -1;
+        for (int i = 0; i < results.size(); i++) {
+            SourceRecord record = results.get(i);
+            if (record.value() != null) {
+                String value = record.value().toString();
+                if (value.contains("before_ddl")) {
+                    beforeDdlPos = i;
+                } else if (value.contains("after_ddl")) {
+                    afterDdlPos = i;
+                }
+            }
+        }
+        assertThat(beforeDdlPos)
+                .as("Should capture the INSERT before DDL")
+                .isGreaterThanOrEqualTo(0);
+        assertThat(afterDdlPos).as("Should capture the INSERT after 
DDL").isGreaterThanOrEqualTo(0);
+        assertThat(beforeDdlPos)
+                .as("INSERT before DDL should appear before INSERT after DDL")
+                .isLessThan(afterDdlPos);
+
+        // Verify that snapshotState returns splits with updated table schema
+        List<SourceSplitBase> splits = reader.snapshotState(1L);
+        assertThat(splits).isNotEmpty();
+
+        boolean foundUpdatedSchema = false;
+        for (SourceSplitBase split : splits) {
+            if (split.isStreamSplit()) {
+                Map<TableId, TableChanges.TableChange> schemas =
+                        split.asStreamSplit().getTableSchemas();
+                if (schemas.containsKey(tableId)
+                        && 
schemas.get(tableId).getTable().columnWithName("email") != null) {
+                    foundUpdatedSchema = true;
+                    break;
+                }
+            }
+        }
+        assertThat(foundUpdatedSchema)
+                .as("The snapshotState should contain the updated table schema 
with 'email' column")
+                .isTrue();
+    }
+
     private List<String> consumeSnapshotRecords(
             PostgresSourceReader sourceReader, DataType recordType) throws 
Exception {
         // Poll all the  records of the multiple assigned snapshot split.
@@ -207,6 +316,29 @@ public class PostgresSourceReaderTest extends 
PostgresTestBase {
     private PostgresSourceReader createReader(
             final int lsnCommitCheckpointsDelay, boolean skipBackFill) throws 
Exception {
         final PostgresOffsetFactory offsetFactory = new 
PostgresOffsetFactory();
+        final PostgresSourceConfigFactory configFactory = 
createConfigFactory();
+        configFactory.setLsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay);
+        configFactory.skipSnapshotBackfill(skipBackFill);
+        MockPostgresDialect dialect = new 
MockPostgresDialect(configFactory.create(0));
+        final PostgresSourceBuilder.PostgresIncrementalSource<?> source =
+                new PostgresSourceBuilder.PostgresIncrementalSource<>(
+                        configFactory, new ForwardDeserializeSchema(), 
offsetFactory, dialect);
+        return source.createReader(new TestingReaderContext());
+    }
+
+    private PostgresSourceReader createStreamReader() throws Exception {
+        final PostgresOffsetFactory offsetFactory = new 
PostgresOffsetFactory();
+        final PostgresSourceConfigFactory configFactory = 
createConfigFactory();
+        configFactory.startupOptions(StartupOptions.latest());
+        configFactory.setLsnCommitCheckpointsDelay(1);
+        PostgresDialect dialect = new PostgresDialect(configFactory.create(0));
+        final PostgresSourceBuilder.PostgresIncrementalSource<?> source =
+                new PostgresSourceBuilder.PostgresIncrementalSource<>(
+                        configFactory, new ForwardDeserializeSchema(), 
offsetFactory, dialect);
+        return source.createReader(new TestingReaderContext());
+    }
+
+    private PostgresSourceConfigFactory createConfigFactory() {
         final PostgresSourceConfigFactory configFactory = new 
PostgresSourceConfigFactory();
         configFactory.hostname(customDatabase.getHost());
         configFactory.port(customDatabase.getDatabasePort());
@@ -214,14 +346,8 @@ public class PostgresSourceReaderTest extends 
PostgresTestBase {
         configFactory.tableList(SCHEMA_NAME + ".Customers");
         configFactory.username(customDatabase.getUsername());
         configFactory.password(customDatabase.getPassword());
-        configFactory.setLsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay);
-        configFactory.skipSnapshotBackfill(skipBackFill);
         configFactory.decodingPluginName("pgoutput");
-        MockPostgresDialect dialect = new 
MockPostgresDialect(configFactory.create(0));
-        final PostgresSourceBuilder.PostgresIncrementalSource<?> source =
-                new PostgresSourceBuilder.PostgresIncrementalSource<>(
-                        configFactory, new ForwardDeserializeSchema(), 
offsetFactory, dialect);
-        return source.createReader(new TestingReaderContext());
+        return configFactory;
     }
 
     // ------------------------------------------------------------------------

Reply via email to