This is an automated email from the ASF dual-hosted git repository. kunni pushed a commit to branch release-3.6 in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
commit adec8c121ecce8e5a05a7237afd95c9f396606d3 Author: Hongshun Wang <[email protected]> AuthorDate: Wed Mar 25 09:41:55 2026 +0800 [hotfix] [pipeline-connector/postgres] Fix wrong SchemaChangeEnabled option during delivery This closes #4333. (cherry picked from commit b58561a081a63953b58344a4b69e7164a3d5687d) --- .../reader/PostgresPipelineRecordEmitter.java | 2 +- .../postgres/source/PostgresPipelineITCase.java | 2 +- .../source/config/PostgresSourceConfig.java | 10 +---- .../source/config/PostgresSourceConfigFactory.java | 10 +---- .../cdc/pipeline/tests/PostgresE2eITCase.java | 46 +++++++++++++++++++++- 5 files changed, 49 insertions(+), 21 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java index 18c1d9b99..49b20e324 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java @@ -135,7 +135,7 @@ public class PostgresPipelineRecordEmitter<T> extends PostgresSourceRecordEmitte maybeSendCreateTableEventFromCache(tableId, output); } else if (isDataChangeRecord(element)) { handleDataChangeRecord(element, output); - } else if (isSchemaChangeEvent(element) && sourceConfig.isSchemaChangeEnabled()) { + } else if (isSchemaChangeEvent(element) && sourceConfig.isIncludeSchemaChanges()) { handleSchemaChangeRecord(element, output, splitState); } super.processElement(element, output, splitState); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCase.java index 51de6a3b4..f45209169 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCase.java @@ -620,7 +620,7 @@ public class PostgresPipelineITCase extends PostgresTestBase { configFactory.database(inventoryDatabase.getDatabaseName()); configFactory.slotName(slotName); configFactory.decodingPluginName("pgoutput"); - configFactory.enableSchemaChange(true); + configFactory.includeSchemaChanges(true); FlinkSourceProvider sourceProvider = (FlinkSourceProvider) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java index 9807bb2dc..4ad4b3e7e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java @@ -40,7 +40,6 @@ public class PostgresSourceConfig extends JdbcSourceConfig { private final int lsnCommitCheckpointsDelay; private final boolean includePartitionedTables; private final boolean includeDatabaseInTableId; - private final boolean schemaChangeEnabled; public PostgresSourceConfig( int subtaskId, @@ -72,8 +71,7 @@ public class PostgresSourceConfig extends JdbcSourceConfig { int lsnCommitCheckpointsDelay, boolean assignUnboundedChunkFirst, boolean includePartitionedTables, - boolean includeDatabaseInTableId, - boolean schemaChangeEnabled) { + boolean includeDatabaseInTableId) { super( startupOptions, databaseList, @@ -105,7 +103,6 @@ public class PostgresSourceConfig extends JdbcSourceConfig { this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay; this.includePartitionedTables = includePartitionedTables; this.includeDatabaseInTableId = includeDatabaseInTableId; - this.schemaChangeEnabled = schemaChangeEnabled; } /** @@ -159,9 +156,4 @@ public class PostgresSourceConfig extends JdbcSourceConfig { public boolean isIncludeDatabaseInTableId() { return includeDatabaseInTableId; } - - /** Returns whether to infer column types via JDBC TypeRegistry on schema change events. */ - public boolean isSchemaChangeEnabled() { - return schemaChangeEnabled; - } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java index 20e1800d4..847b15474 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java @@ -57,8 +57,6 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory { private boolean includeDatabaseInTableId = PostgresSourceOptions.TABLE_ID_INCLUDE_DATABASE.defaultValue(); - private boolean schemaChangeEnabled = false; - /** Creates a new {@link PostgresSourceConfig} for the given subtask {@code subtaskId}. */ @Override public PostgresSourceConfig create(int subtaskId) { @@ -142,8 +140,7 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory { lsnCommitCheckpointsDelay, assignUnboundedChunkFirst, includePartitionedTables, - includeDatabaseInTableId, - schemaChangeEnabled); + includeDatabaseInTableId); } /** @@ -201,9 +198,4 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory { public void setIncludeDatabaseInTableId(boolean includeDatabaseInTableId) { this.includeDatabaseInTableId = includeDatabaseInTableId; } - - /** Set whether to infer schema change event on relation message. */ - public void enableSchemaChange(boolean schemaChangeEnabled) { - this.schemaChangeEnabled = schemaChangeEnabled; - } } diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/PostgresE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/PostgresE2eITCase.java index e4b33b3ff..bcc70aff2 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/PostgresE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/PostgresE2eITCase.java @@ -116,12 +116,14 @@ public class PostgresE2eITCase extends PipelineTestEnvironment { + " scan.startup.mode: initial\n" + " server-time-zone: UTC\n" + " connect.timeout: 120s\n" + + " schema-change.enabled: true\n" + "\n" + "sink:\n" + " type: values\n" + "\n" + "pipeline:\n" - + " parallelism: %d", + + " parallelism: %d\n" + + " schema.change.behavior: evolve", INTER_CONTAINER_POSTGRES_ALIAS, 5432, POSTGRES_TEST_USER, @@ -172,5 +174,47 @@ public class PostgresE2eITCase extends PipelineTestEnvironment { LOG.error("Update table for CDC failed.", e); throw new RuntimeException(e); } + + LOG.info("Begin schema change stage."); + + try (Connection conn = + getJdbcConnection( + POSTGRES_CONTAINER, postgresInventoryDatabase.getDatabaseName()); + Statement stat = conn.createStatement()) { + // Test ADD COLUMN + stat.execute("ALTER TABLE inventory.products ADD COLUMN category VARCHAR(255);"); + stat.execute( + "INSERT INTO inventory.products VALUES (default, 'widget', 'A small widget', 1.5, 'tools');"); + + // Test DROP COLUMN + stat.execute("ALTER TABLE inventory.products DROP COLUMN weight;"); + stat.execute( + "INSERT INTO inventory.products VALUES (default, 'gadget', 'A useful gadget', 'electronics');"); + + // Test RENAME COLUMN + stat.execute( + "ALTER TABLE inventory.products RENAME COLUMN category TO product_category;"); + stat.execute( + "INSERT INTO inventory.products VALUES (default, 'gizmo', 'A fancy gizmo', 'gadgets');"); + } catch (Exception e) { + LOG.error("Schema change test failed.", e); + throw new RuntimeException(e); + } + + // Validate schema change events and corresponding data + waitUntilSpecificEvent( + "AddColumnEvent{tableId=inventory.products, addedColumns=[ColumnWithPosition{column=`category` VARCHAR(255), position=LAST, existedColumnName=null}]}"); + waitUntilSpecificEvent( + "DataChangeEvent{tableId=inventory.products, before=[], after=[110, widget, A small widget, 1.5, tools], op=INSERT, meta=()}"); + + waitUntilSpecificEvent( + "DropColumnEvent{tableId=inventory.products, droppedColumnNames=[weight]}"); + waitUntilSpecificEvent( + "DataChangeEvent{tableId=inventory.products, before=[], after=[111, gadget, A useful gadget, electronics], op=INSERT, meta=()}"); + + waitUntilSpecificEvent( + "RenameColumnEvent{tableId=inventory.products, nameMapping={category=product_category}}"); + waitUntilSpecificEvent( + "DataChangeEvent{tableId=inventory.products, before=[], after=[112, gizmo, A fancy gizmo, gadgets], op=INSERT, meta=()}"); } }
