This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new b58561a08 [hotfix] [pipeline-connector/postgres] Fix wrong
SchemaChangeEnabled option during delivery
b58561a08 is described below
commit b58561a081a63953b58344a4b69e7164a3d5687d
Author: Hongshun Wang <[email protected]>
AuthorDate: Wed Mar 25 09:41:55 2026 +0800
[hotfix] [pipeline-connector/postgres] Fix wrong SchemaChangeEnabled option
during delivery
This closes #4333.
---
.../reader/PostgresPipelineRecordEmitter.java | 2 +-
.../postgres/source/PostgresPipelineITCase.java | 2 +-
.../source/config/PostgresSourceConfig.java | 10 +----
.../source/config/PostgresSourceConfigFactory.java | 10 +----
.../cdc/pipeline/tests/PostgresE2eITCase.java | 46 +++++++++++++++++++++-
5 files changed, 49 insertions(+), 21 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
index 18c1d9b99..49b20e324 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
@@ -135,7 +135,7 @@ public class PostgresPipelineRecordEmitter<T> extends
PostgresSourceRecordEmitte
maybeSendCreateTableEventFromCache(tableId, output);
} else if (isDataChangeRecord(element)) {
handleDataChangeRecord(element, output);
- } else if (isSchemaChangeEvent(element) &&
sourceConfig.isSchemaChangeEnabled()) {
+ } else if (isSchemaChangeEvent(element) &&
sourceConfig.isIncludeSchemaChanges()) {
handleSchemaChangeRecord(element, output, splitState);
}
super.processElement(element, output, splitState);
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCase.java
index 51de6a3b4..f45209169 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCase.java
@@ -620,7 +620,7 @@ public class PostgresPipelineITCase extends
PostgresTestBase {
configFactory.database(inventoryDatabase.getDatabaseName());
configFactory.slotName(slotName);
configFactory.decodingPluginName("pgoutput");
- configFactory.enableSchemaChange(true);
+ configFactory.includeSchemaChanges(true);
FlinkSourceProvider sourceProvider =
(FlinkSourceProvider)
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
index 9807bb2dc..4ad4b3e7e 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
@@ -40,7 +40,6 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
private final int lsnCommitCheckpointsDelay;
private final boolean includePartitionedTables;
private final boolean includeDatabaseInTableId;
- private final boolean schemaChangeEnabled;
public PostgresSourceConfig(
int subtaskId,
@@ -72,8 +71,7 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
int lsnCommitCheckpointsDelay,
boolean assignUnboundedChunkFirst,
boolean includePartitionedTables,
- boolean includeDatabaseInTableId,
- boolean schemaChangeEnabled) {
+ boolean includeDatabaseInTableId) {
super(
startupOptions,
databaseList,
@@ -105,7 +103,6 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
this.includePartitionedTables = includePartitionedTables;
this.includeDatabaseInTableId = includeDatabaseInTableId;
- this.schemaChangeEnabled = schemaChangeEnabled;
}
/**
@@ -159,9 +156,4 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
public boolean isIncludeDatabaseInTableId() {
return includeDatabaseInTableId;
}
-
- /** Returns whether to infer column types via JDBC TypeRegistry on schema
change events. */
- public boolean isSchemaChangeEnabled() {
- return schemaChangeEnabled;
- }
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
index 20e1800d4..847b15474 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
@@ -57,8 +57,6 @@ public class PostgresSourceConfigFactory extends
JdbcSourceConfigFactory {
private boolean includeDatabaseInTableId =
PostgresSourceOptions.TABLE_ID_INCLUDE_DATABASE.defaultValue();
- private boolean schemaChangeEnabled = false;
-
/** Creates a new {@link PostgresSourceConfig} for the given subtask
{@code subtaskId}. */
@Override
public PostgresSourceConfig create(int subtaskId) {
@@ -142,8 +140,7 @@ public class PostgresSourceConfigFactory extends
JdbcSourceConfigFactory {
lsnCommitCheckpointsDelay,
assignUnboundedChunkFirst,
includePartitionedTables,
- includeDatabaseInTableId,
- schemaChangeEnabled);
+ includeDatabaseInTableId);
}
/**
@@ -201,9 +198,4 @@ public class PostgresSourceConfigFactory extends
JdbcSourceConfigFactory {
public void setIncludeDatabaseInTableId(boolean includeDatabaseInTableId) {
this.includeDatabaseInTableId = includeDatabaseInTableId;
}
-
- /** Set whether to infer schema change event on relation message. */
- public void enableSchemaChange(boolean schemaChangeEnabled) {
- this.schemaChangeEnabled = schemaChangeEnabled;
- }
}
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/PostgresE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/PostgresE2eITCase.java
index e4b33b3ff..bcc70aff2 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/PostgresE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/PostgresE2eITCase.java
@@ -116,12 +116,14 @@ public class PostgresE2eITCase extends
PipelineTestEnvironment {
+ " scan.startup.mode: initial\n"
+ " server-time-zone: UTC\n"
+ " connect.timeout: 120s\n"
+ + " schema-change.enabled: true\n"
+ "\n"
+ "sink:\n"
+ " type: values\n"
+ "\n"
+ "pipeline:\n"
- + " parallelism: %d",
+ + " parallelism: %d\n"
+ + " schema.change.behavior: evolve",
INTER_CONTAINER_POSTGRES_ALIAS,
5432,
POSTGRES_TEST_USER,
@@ -172,5 +174,47 @@ public class PostgresE2eITCase extends
PipelineTestEnvironment {
LOG.error("Update table for CDC failed.", e);
throw new RuntimeException(e);
}
+
+ LOG.info("Begin schema change stage.");
+
+ try (Connection conn =
+ getJdbcConnection(
+ POSTGRES_CONTAINER,
postgresInventoryDatabase.getDatabaseName());
+ Statement stat = conn.createStatement()) {
+ // Test ADD COLUMN
+ stat.execute("ALTER TABLE inventory.products ADD COLUMN category
VARCHAR(255);");
+ stat.execute(
+ "INSERT INTO inventory.products VALUES (default, 'widget',
'A small widget', 1.5, 'tools');");
+
+ // Test DROP COLUMN
+ stat.execute("ALTER TABLE inventory.products DROP COLUMN weight;");
+ stat.execute(
+ "INSERT INTO inventory.products VALUES (default, 'gadget',
'A useful gadget', 'electronics');");
+
+ // Test RENAME COLUMN
+ stat.execute(
+ "ALTER TABLE inventory.products RENAME COLUMN category TO
product_category;");
+ stat.execute(
+ "INSERT INTO inventory.products VALUES (default, 'gizmo',
'A fancy gizmo', 'gadgets');");
+ } catch (Exception e) {
+ LOG.error("Schema change test failed.", e);
+ throw new RuntimeException(e);
+ }
+
+ // Validate schema change events and corresponding data
+ waitUntilSpecificEvent(
+ "AddColumnEvent{tableId=inventory.products,
addedColumns=[ColumnWithPosition{column=`category` VARCHAR(255), position=LAST,
existedColumnName=null}]}");
+ waitUntilSpecificEvent(
+ "DataChangeEvent{tableId=inventory.products, before=[],
after=[110, widget, A small widget, 1.5, tools], op=INSERT, meta=()}");
+
+ waitUntilSpecificEvent(
+ "DropColumnEvent{tableId=inventory.products,
droppedColumnNames=[weight]}");
+ waitUntilSpecificEvent(
+ "DataChangeEvent{tableId=inventory.products, before=[],
after=[111, gadget, A useful gadget, electronics], op=INSERT, meta=()}");
+
+ waitUntilSpecificEvent(
+ "RenameColumnEvent{tableId=inventory.products,
nameMapping={category=product_category}}");
+ waitUntilSpecificEvent(
+ "DataChangeEvent{tableId=inventory.products, before=[],
after=[112, gizmo, A fancy gizmo, gadgets], op=INSERT, meta=()}");
}
}