This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch release-3.6
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git

commit adec8c121ecce8e5a05a7237afd95c9f396606d3
Author: Hongshun Wang <[email protected]>
AuthorDate: Wed Mar 25 09:41:55 2026 +0800

    [hotfix] [pipeline-connector/postgres] Fix wrong SchemaChangeEnabled option 
during delivery
    
    This closes #4333.
    
    (cherry picked from commit b58561a081a63953b58344a4b69e7164a3d5687d)
---
 .../reader/PostgresPipelineRecordEmitter.java      |  2 +-
 .../postgres/source/PostgresPipelineITCase.java    |  2 +-
 .../source/config/PostgresSourceConfig.java        | 10 +----
 .../source/config/PostgresSourceConfigFactory.java | 10 +----
 .../cdc/pipeline/tests/PostgresE2eITCase.java      | 46 +++++++++++++++++++++-
 5 files changed, 49 insertions(+), 21 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
index 18c1d9b99..49b20e324 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
@@ -135,7 +135,7 @@ public class PostgresPipelineRecordEmitter<T> extends 
PostgresSourceRecordEmitte
             maybeSendCreateTableEventFromCache(tableId, output);
         } else if (isDataChangeRecord(element)) {
             handleDataChangeRecord(element, output);
-        } else if (isSchemaChangeEvent(element) && 
sourceConfig.isSchemaChangeEnabled()) {
+        } else if (isSchemaChangeEvent(element) && 
sourceConfig.isIncludeSchemaChanges()) {
             handleSchemaChangeRecord(element, output, splitState);
         }
         super.processElement(element, output, splitState);
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCase.java
index 51de6a3b4..f45209169 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCase.java
@@ -620,7 +620,7 @@ public class PostgresPipelineITCase extends 
PostgresTestBase {
         configFactory.database(inventoryDatabase.getDatabaseName());
         configFactory.slotName(slotName);
         configFactory.decodingPluginName("pgoutput");
-        configFactory.enableSchemaChange(true);
+        configFactory.includeSchemaChanges(true);
 
         FlinkSourceProvider sourceProvider =
                 (FlinkSourceProvider)
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
index 9807bb2dc..4ad4b3e7e 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
@@ -40,7 +40,6 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
     private final int lsnCommitCheckpointsDelay;
     private final boolean includePartitionedTables;
     private final boolean includeDatabaseInTableId;
-    private final boolean schemaChangeEnabled;
 
     public PostgresSourceConfig(
             int subtaskId,
@@ -72,8 +71,7 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
             int lsnCommitCheckpointsDelay,
             boolean assignUnboundedChunkFirst,
             boolean includePartitionedTables,
-            boolean includeDatabaseInTableId,
-            boolean schemaChangeEnabled) {
+            boolean includeDatabaseInTableId) {
         super(
                 startupOptions,
                 databaseList,
@@ -105,7 +103,6 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
         this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
         this.includePartitionedTables = includePartitionedTables;
         this.includeDatabaseInTableId = includeDatabaseInTableId;
-        this.schemaChangeEnabled = schemaChangeEnabled;
     }
 
     /**
@@ -159,9 +156,4 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
     public boolean isIncludeDatabaseInTableId() {
         return includeDatabaseInTableId;
     }
-
-    /** Returns whether to infer column types via JDBC TypeRegistry on schema 
change events. */
-    public boolean isSchemaChangeEnabled() {
-        return schemaChangeEnabled;
-    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
index 20e1800d4..847b15474 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
@@ -57,8 +57,6 @@ public class PostgresSourceConfigFactory extends 
JdbcSourceConfigFactory {
     private boolean includeDatabaseInTableId =
             PostgresSourceOptions.TABLE_ID_INCLUDE_DATABASE.defaultValue();
 
-    private boolean schemaChangeEnabled = false;
-
     /** Creates a new {@link PostgresSourceConfig} for the given subtask 
{@code subtaskId}. */
     @Override
     public PostgresSourceConfig create(int subtaskId) {
@@ -142,8 +140,7 @@ public class PostgresSourceConfigFactory extends 
JdbcSourceConfigFactory {
                 lsnCommitCheckpointsDelay,
                 assignUnboundedChunkFirst,
                 includePartitionedTables,
-                includeDatabaseInTableId,
-                schemaChangeEnabled);
+                includeDatabaseInTableId);
     }
 
     /**
@@ -201,9 +198,4 @@ public class PostgresSourceConfigFactory extends 
JdbcSourceConfigFactory {
     public void setIncludeDatabaseInTableId(boolean includeDatabaseInTableId) {
         this.includeDatabaseInTableId = includeDatabaseInTableId;
     }
-
-    /** Set whether to infer schema change event on relation message. */
-    public void enableSchemaChange(boolean schemaChangeEnabled) {
-        this.schemaChangeEnabled = schemaChangeEnabled;
-    }
 }
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/PostgresE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/PostgresE2eITCase.java
index e4b33b3ff..bcc70aff2 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/PostgresE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/PostgresE2eITCase.java
@@ -116,12 +116,14 @@ public class PostgresE2eITCase extends 
PipelineTestEnvironment {
                                 + "  scan.startup.mode: initial\n"
                                 + "  server-time-zone: UTC\n"
                                 + "  connect.timeout: 120s\n"
+                                + "  schema-change.enabled: true\n"
                                 + "\n"
                                 + "sink:\n"
                                 + "  type: values\n"
                                 + "\n"
                                 + "pipeline:\n"
-                                + "  parallelism: %d",
+                                + "  parallelism: %d\n"
+                                + "  schema.change.behavior: evolve",
                         INTER_CONTAINER_POSTGRES_ALIAS,
                         5432,
                         POSTGRES_TEST_USER,
@@ -172,5 +174,47 @@ public class PostgresE2eITCase extends 
PipelineTestEnvironment {
             LOG.error("Update table for CDC failed.", e);
             throw new RuntimeException(e);
         }
+
+        LOG.info("Begin schema change stage.");
+
+        try (Connection conn =
+                        getJdbcConnection(
+                                POSTGRES_CONTAINER, 
postgresInventoryDatabase.getDatabaseName());
+                Statement stat = conn.createStatement()) {
+            // Test ADD COLUMN
+            stat.execute("ALTER TABLE inventory.products ADD COLUMN category 
VARCHAR(255);");
+            stat.execute(
+                    "INSERT INTO inventory.products VALUES (default, 'widget', 
'A small widget', 1.5, 'tools');");
+
+            // Test DROP COLUMN
+            stat.execute("ALTER TABLE inventory.products DROP COLUMN weight;");
+            stat.execute(
+                    "INSERT INTO inventory.products VALUES (default, 'gadget', 
'A useful gadget', 'electronics');");
+
+            // Test RENAME COLUMN
+            stat.execute(
+                    "ALTER TABLE inventory.products RENAME COLUMN category TO 
product_category;");
+            stat.execute(
+                    "INSERT INTO inventory.products VALUES (default, 'gizmo', 
'A fancy gizmo', 'gadgets');");
+        } catch (Exception e) {
+            LOG.error("Schema change test failed.", e);
+            throw new RuntimeException(e);
+        }
+
+        // Validate schema change events and corresponding data
+        waitUntilSpecificEvent(
+                "AddColumnEvent{tableId=inventory.products, 
addedColumns=[ColumnWithPosition{column=`category` VARCHAR(255), position=LAST, 
existedColumnName=null}]}");
+        waitUntilSpecificEvent(
+                "DataChangeEvent{tableId=inventory.products, before=[], 
after=[110, widget, A small widget, 1.5, tools], op=INSERT, meta=()}");
+
+        waitUntilSpecificEvent(
+                "DropColumnEvent{tableId=inventory.products, 
droppedColumnNames=[weight]}");
+        waitUntilSpecificEvent(
+                "DataChangeEvent{tableId=inventory.products, before=[], 
after=[111, gadget, A useful gadget, electronics], op=INSERT, meta=()}");
+
+        waitUntilSpecificEvent(
+                "RenameColumnEvent{tableId=inventory.products, 
nameMapping={category=product_category}}");
+        waitUntilSpecificEvent(
+                "DataChangeEvent{tableId=inventory.products, before=[], 
after=[112, gizmo, A fancy gizmo, gadgets], op=INSERT, meta=()}");
     }
 }

Reply via email to