loserwang1024 commented on code in PR #4233:
URL: https://github.com/apache/flink-cdc/pull/4233#discussion_r2909675532
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java:
##########
@@ -31,63 +42,375 @@
import io.debezium.data.geometry.Geography;
import io.debezium.data.geometry.Geometry;
import io.debezium.data.geometry.Point;
+import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.io.WKBReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
/** Event deserializer for {@link PostgresDataSource}. */
@Internal
public class PostgresEventDeserializer extends
DebeziumEventDeserializationSchema {
-
+ private static final Logger LOG =
LoggerFactory.getLogger(PostgresEventDeserializer.class);
private static final long serialVersionUID = 1L;
private List<PostgreSQLReadableMetadata> readableMetadataList;
private final boolean includeDatabaseInTableId;
private final String databaseName;
+ private Map<TableId, Schema> schemaMap = new HashMap<>();
Review Comment:
---
### ✅ Improved Version
The `schemaMap` can be restored from the `StreamSplit`, specifically from
its `tableSchemas` field upon job restart. To ensure correctness, the
`tableSchemas` in the `StreamSplit` should be updated during each checkpoint to
reflect any changes in the schema.
Otherwise, after a job restart, the `schemaMap` will be empty, and the
previous schema will be `null`. In your current design, it seems that even if
the schema of newly arriving data changes, no schema change event will be
generated.
You can address this by passing the schema information through
`org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema#applyChangeEvent`.
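The checkpoint/restore idea above can be sketched with a framework-free model (all class and method names here are illustrative stand-ins, not the real Flink CDC API; `TableSchemaSplit` plays the role of `StreamSplit#tableSchemas`):

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Simplified model: the deserializer's live schemaMap is written into the
 * split state on each checkpoint and read back on restore, so it is never
 * empty after a restart.
 */
public class SchemaStateModel {
    /** Stand-in for StreamSplit's tableSchemas field: tableId -> schema. */
    public static class TableSchemaSplit {
        public final Map<String, String> tableSchemas = new HashMap<>();
    }

    private final Map<String, String> schemaMap = new HashMap<>();

    /** Track the latest schema seen for each table. */
    public void onRecord(String tableId, String schema) {
        schemaMap.put(tableId, schema);
    }

    /** Checkpoint: push the latest schemas into the split state. */
    public void snapshotState(TableSchemaSplit split) {
        split.tableSchemas.clear();
        split.tableSchemas.putAll(schemaMap);
    }

    /** Restart: rebuild schemaMap from the restored split instead of starting empty. */
    public void restoreState(TableSchemaSplit split) {
        schemaMap.clear();
        schemaMap.putAll(split.tableSchemas);
    }

    public String schemaOf(String tableId) {
        return schemaMap.get(tableId);
    }
}
```

With this shape, the "previous schema is `null` after restart" failure mode disappears, because restore repopulates the map before any record is compared.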
---
To be honest, I believe that
`DebeziumEventDeserializationSchema#createTableEventCache` is not necessary for
PostgreSQL CDC. We can achieve the same — and more powerful — functionality
using the `schemaMap`.
### ✅ My Proposed Design for CreateTableEvent
1. **Replace `PostgresPipelineRecordEmitter#createTableEventCache` with
`schemaMap`**:
Use `schemaMap` as the source of truth for table schemas instead of
maintaining a separate cache.
2. **Remove the logic in `PostgresPipelineRecordEmitter` that generates
`CreateTableEvent` from the table ID**:
```java
// In rare cases, we may miss some CreateTableEvents before DataChangeEvents.
// Don't send CreateTableEvent for SchemaChangeEvents as it's the latest schema.
if (isDataChangeRecord && !createTableEventCache.containsKey(tableId)) {
    CreateTableEvent createTableEvent = getCreateTableEvent(sourceConfig, tableId);
    output.collect((T) createTableEvent);
    createTableEventCache.put(tableId, createTableEvent);
}
```
3. **Move schema change handling to
`PostgresEventDeserializer#handleSchemaChange`**:
If the previous schema is `null`, emit a `CreateTableEvent`. This ensures
that all schema change events are centralized within the deserializer rather
than being scattered across different components.
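The flow described in step 3 can be sketched as follows, with event types modeled as plain strings rather than real CDC event objects (a simplified illustration, not the actual Flink CDC API):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Sketch of centralized handling: consult schemaMap for every record. A
 * missing entry means this is the first record for the table, so a
 * CreateTableEvent is emitted; a differing entry yields a SchemaChangeEvent.
 * No separate createTableEventCache is needed.
 */
public class CentralizedSchemaHandler {
    private final Map<String, String> schemaMap = new HashMap<>();

    public List<String> handleRecord(String tableId, String recordSchema) {
        List<String> events = new ArrayList<>();
        String previous = schemaMap.get(tableId);
        if (previous == null) {
            // First sighting of this table: emit the create event here.
            events.add("CreateTableEvent(" + tableId + ")");
        } else if (!previous.equals(recordSchema)) {
            events.add("SchemaChangeEvent(" + tableId + ")");
        }
        schemaMap.put(tableId, recordSchema);
        return events;
    }
}
```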
---
### ✅ Summary
By centralizing schema change handling in `PostgresEventDeserializer` and
leveraging `schemaMap` via `StreamSplit`, we can simplify the architecture,
reduce redundancy, and improve the robustness of schema tracking in PostgreSQL
CDC scenarios.
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java:
##########
@@ -31,63 +42,375 @@
import io.debezium.data.geometry.Geography;
import io.debezium.data.geometry.Geometry;
import io.debezium.data.geometry.Point;
+import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.io.WKBReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
/** Event deserializer for {@link PostgresDataSource}. */
@Internal
public class PostgresEventDeserializer extends
DebeziumEventDeserializationSchema {
-
+ private static final Logger LOG =
LoggerFactory.getLogger(PostgresEventDeserializer.class);
private static final long serialVersionUID = 1L;
private List<PostgreSQLReadableMetadata> readableMetadataList;
private final boolean includeDatabaseInTableId;
private final String databaseName;
+ private Map<TableId, Schema> schemaMap = new HashMap<>();
+ private final PostgresSourceConfig postgresSourceConfig;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private Map<TableId, Map<String, Integer>> beforeTableColumnsOidMaps;
public PostgresEventDeserializer(DebeziumChangelogMode changelogMode) {
- this(changelogMode, new ArrayList<>(), false, null);
+ this(changelogMode, new ArrayList<>(), false, null, null, null);
}
public PostgresEventDeserializer(
DebeziumChangelogMode changelogMode,
List<PostgreSQLReadableMetadata> readableMetadataList) {
- this(changelogMode, readableMetadataList, false, null);
+ this(changelogMode, readableMetadataList, false, null, null, null);
}
public PostgresEventDeserializer(
DebeziumChangelogMode changelogMode,
List<PostgreSQLReadableMetadata> readableMetadataList,
boolean includeDatabaseInTableId) {
- this(changelogMode, readableMetadataList, includeDatabaseInTableId,
null);
+ this(changelogMode, readableMetadataList, includeDatabaseInTableId,
null, null, null);
}
public PostgresEventDeserializer(
DebeziumChangelogMode changelogMode,
List<PostgreSQLReadableMetadata> readableMetadataList,
boolean includeDatabaseInTableId,
- String databaseName) {
+ String databaseName,
+ PostgresSourceConfig postgresSourceConfig,
+ Map<TableId, Map<String, Integer>> beforeTableColumnsOidMaps) {
super(new PostgresSchemaDataTypeInference(), changelogMode);
this.readableMetadataList = readableMetadataList;
this.includeDatabaseInTableId = includeDatabaseInTableId;
this.databaseName = databaseName;
+ this.postgresSourceConfig = postgresSourceConfig;
+ this.beforeTableColumnsOidMaps = beforeTableColumnsOidMaps;
}
@Override
protected List<SchemaChangeEvent>
deserializeSchemaChangeRecord(SourceRecord record) {
return Collections.emptyList();
}
+ @Override
+ public List<? extends Event> deserialize(SourceRecord record) throws
Exception {
+ List<Event> result = new ArrayList<>();
+ if (postgresSourceConfig.isIncludeSchemaChanges()) {
+ handleSchemaChange(record, result);
+ }
+ if (isDataChangeRecord(record)) {
+ LOG.trace("Process data change record: {}", record);
+ result.addAll(deserializeDataChangeRecord(record));
+ } else if (isSchemaChangeRecord(record)) {
+ LOG.trace("Process schema change record: {}", record);
+ } else {
+ LOG.trace("Ignored other record: {}", record);
+ return Collections.emptyList();
+ }
+ return result;
+ }
+
+ private void handleSchemaChange(SourceRecord record, List<Event> result) {
+ TableId tableId = getTableId(record);
+ Schema valueSchema = record.valueSchema();
+ Schema beforeSchema = schemaMap.get(tableId);
+ List<String> beforeColumnNames;
+ List<String> afterColumnNames;
+ Schema afterSchema = fieldSchema(valueSchema,
Envelope.FieldName.AFTER);
+ List<Field> afterFields = afterSchema.fields();
+ org.apache.flink.cdc.common.schema.Schema schema =
Review Comment:
```java
org.apache.flink.cdc.common.schema.Schema schema =
        PostgresSchemaUtils.getTableSchema(postgresSourceConfig, tableId);
```
Looking up the current schema from the database for every record is too heavy. Why not just use `afterSchema` and `beforeSchema`? Why do we still need to look up the current schema at all?
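A name-based diff over the record's own before/after schemas would be enough to derive the change events without touching the catalog. A minimal, framework-free sketch (column-name -> type maps stand in for Kafka Connect `Field` lists; the event strings are illustrative, not real CDC event classes):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Derives schema change descriptions purely from the record's own schemas. */
public class SchemaDiff {

    /** Compares two column-name -> type maps and lists the changes. */
    public static List<String> diff(Map<String, String> before, Map<String, String> after) {
        List<String> changes = new ArrayList<>();
        for (Map.Entry<String, String> e : after.entrySet()) {
            String oldType = before.get(e.getKey());
            if (oldType == null) {
                changes.add("AddColumnEvent(" + e.getKey() + ")");
            } else if (!oldType.equals(e.getValue())) {
                changes.add("AlterColumnTypeEvent(" + e.getKey() + ": "
                        + oldType + " -> " + e.getValue() + ")");
            }
        }
        for (String name : before.keySet()) {
            if (!after.containsKey(name)) {
                changes.add("DropColumnEvent(" + name + ")");
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        Map<String, String> before = new LinkedHashMap<>();
        before.put("id", "INT");
        before.put("name", "VARCHAR");
        Map<String, String> after = new LinkedHashMap<>();
        after.put("id", "INT");
        after.put("name", "TEXT");
        after.put("weight", "FLOAT");
        // prints [AlterColumnTypeEvent(name: VARCHAR -> TEXT), AddColumnEvent(weight)]
        System.out.println(diff(before, after));
    }
}
```

Everything needed for the comparison is already carried by the record, which is the point of the comment above: the per-record database round-trip adds latency and load without adding information.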
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java:
##########
@@ -157,6 +163,278 @@ public void testInitialStartupMode() throws Exception {
assertThat(inventoryDatabase.checkSlot(slotName)).isEqualTo(slotName);
}
+ @ParameterizedTest(name = "testType: {0}")
+ @ValueSource(
+ strings = {
+ "modifyType",
+ "drop,add",
+ "rename",
+ "drop,add,rename",
+ "add,rename",
+ "add2column,rename2column",
+ "add,rename,add2column,rename2column",
+ "add,rename,drop,add"
+ })
+ public void testPostgresSchemaEvolution(String testType) throws Exception {
+ inventoryDatabase.createAndInitialize();
+ PostgresSourceConfigFactory configFactory =
+ (PostgresSourceConfigFactory)
+ new PostgresSourceConfigFactory()
+ .hostname(POSTGRES_CONTAINER.getHost())
+
.port(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT))
+ .username(TEST_USER)
+ .password(TEST_PASSWORD)
+
.databaseList(inventoryDatabase.getDatabaseName())
+ .tableList("inventory.products")
+ .startupOptions(StartupOptions.initial())
+ .serverTimeZone("UTC");
+ configFactory.database(inventoryDatabase.getDatabaseName());
+ configFactory.slotName(slotName);
+ configFactory.decodingPluginName("pgoutput");
+ configFactory.includeSchemaChanges(true);
+
+ FlinkSourceProvider sourceProvider =
+ (FlinkSourceProvider)
+ new
PostgresDataSource(configFactory).getEventSourceProvider();
+ CloseableIterator<Event> events =
+ env.fromSource(
+ sourceProvider.getSource(),
+ WatermarkStrategy.noWatermarks(),
+ PostgresDataSourceFactory.IDENTIFIER,
+ new EventTypeInfo())
+ .executeAndCollect();
+
+ TableId tableId = TableId.tableId("inventory", "products");
+ CreateTableEvent createTableEvent =
getProductsCreateTableEvent(tableId);
+
+ // generate snapshot data
+ List<Event> expectedSnapshot = getSnapshotExpected(tableId);
+
+ // In this configuration, several subtasks might emit their
corresponding CreateTableEvent
+ // to downstream. Since it is not possible to predict how many
CreateTableEvents should we
+ // expect, we simply filter them out from expected sets, and assert
there's at least one.
+ fetchResultsExcept(events, expectedSnapshot.size(), createTableEvent);
+ try (Connection conn =
+ getJdbcConnection(POSTGRES_CONTAINER,
inventoryDatabase.getDatabaseName());
+ Statement stmt = conn.createStatement()) {
+ stmt.execute(
+ "INSERT INTO inventory.products (name, description,
weight) "
+ + "VALUES ('scooter', 'Small 2-wheel scooter',
3.14)");
+ List<Event> actual;
+ List<Event> expected;
+ Map<String, String> renameMap = new HashMap<>();
+ switch (testType) {
+ case "modifyType":
Review Comment:
Your code also detects changes of `oldField.schema().defaultValue()` and `oldField.schema().parameters()`; please add tests covering those cases as well.
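Since the comparison keys off more than the type, the tests should exercise each component, e.g. an `ALTER COLUMN ... SET DEFAULT` with an unchanged type. A minimal model of what "column changed" means under that comparison (names here are illustrative; the real code compares Kafka Connect `Schema` fields):

```java
import java.util.Map;
import java.util.Objects;

/**
 * A column counts as changed if its type, default value, or parameters
 * differ -- mirroring a comparison of schema().type(), schema().defaultValue()
 * and schema().parameters() on the old and new fields.
 */
public class ColumnMeta {
    final String type;
    final Object defaultValue;
    final Map<String, String> parameters;

    public ColumnMeta(String type, Object defaultValue, Map<String, String> parameters) {
        this.type = type;
        this.defaultValue = defaultValue;
        this.parameters = parameters;
    }

    public static boolean changed(ColumnMeta a, ColumnMeta b) {
        return !a.type.equals(b.type)
                || !Objects.equals(a.defaultValue, b.defaultValue)
                || !Objects.equals(a.parameters, b.parameters);
    }
}
```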
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java:
##########
@@ -31,63 +42,375 @@
import io.debezium.data.geometry.Geography;
import io.debezium.data.geometry.Geometry;
import io.debezium.data.geometry.Point;
+import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.io.WKBReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
/** Event deserializer for {@link PostgresDataSource}. */
@Internal
public class PostgresEventDeserializer extends
DebeziumEventDeserializationSchema {
-
+ private static final Logger LOG =
LoggerFactory.getLogger(PostgresEventDeserializer.class);
private static final long serialVersionUID = 1L;
private List<PostgreSQLReadableMetadata> readableMetadataList;
private final boolean includeDatabaseInTableId;
private final String databaseName;
+ private Map<TableId, Schema> schemaMap = new HashMap<>();
+ private final PostgresSourceConfig postgresSourceConfig;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private Map<TableId, Map<String, Integer>> beforeTableColumnsOidMaps;
public PostgresEventDeserializer(DebeziumChangelogMode changelogMode) {
- this(changelogMode, new ArrayList<>(), false, null);
+ this(changelogMode, new ArrayList<>(), false, null, null, null);
}
public PostgresEventDeserializer(
DebeziumChangelogMode changelogMode,
List<PostgreSQLReadableMetadata> readableMetadataList) {
- this(changelogMode, readableMetadataList, false, null);
+ this(changelogMode, readableMetadataList, false, null, null, null);
}
public PostgresEventDeserializer(
DebeziumChangelogMode changelogMode,
List<PostgreSQLReadableMetadata> readableMetadataList,
boolean includeDatabaseInTableId) {
- this(changelogMode, readableMetadataList, includeDatabaseInTableId,
null);
+ this(changelogMode, readableMetadataList, includeDatabaseInTableId,
null, null, null);
}
public PostgresEventDeserializer(
DebeziumChangelogMode changelogMode,
List<PostgreSQLReadableMetadata> readableMetadataList,
boolean includeDatabaseInTableId,
- String databaseName) {
+ String databaseName,
+ PostgresSourceConfig postgresSourceConfig,
+ Map<TableId, Map<String, Integer>> beforeTableColumnsOidMaps) {
super(new PostgresSchemaDataTypeInference(), changelogMode);
this.readableMetadataList = readableMetadataList;
this.includeDatabaseInTableId = includeDatabaseInTableId;
this.databaseName = databaseName;
+ this.postgresSourceConfig = postgresSourceConfig;
+ this.beforeTableColumnsOidMaps = beforeTableColumnsOidMaps;
}
@Override
protected List<SchemaChangeEvent>
deserializeSchemaChangeRecord(SourceRecord record) {
return Collections.emptyList();
}
+ @Override
+ public List<? extends Event> deserialize(SourceRecord record) throws
Exception {
+ List<Event> result = new ArrayList<>();
+ if (postgresSourceConfig.isIncludeSchemaChanges()) {
+ handleSchemaChange(record, result);
Review Comment:
the record is not schema change , we have isSchemaChangeRecord(record) in
the bellow. I think method named `inferSchemaChangeByRecord` will be better
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java:
##########
@@ -31,63 +42,375 @@
import io.debezium.data.geometry.Geography;
import io.debezium.data.geometry.Geometry;
import io.debezium.data.geometry.Point;
+import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.io.WKBReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
/** Event deserializer for {@link PostgresDataSource}. */
@Internal
public class PostgresEventDeserializer extends
DebeziumEventDeserializationSchema {
-
+ private static final Logger LOG =
LoggerFactory.getLogger(PostgresEventDeserializer.class);
private static final long serialVersionUID = 1L;
private List<PostgreSQLReadableMetadata> readableMetadataList;
private final boolean includeDatabaseInTableId;
private final String databaseName;
+ private Map<TableId, Schema> schemaMap = new HashMap<>();
+ private final PostgresSourceConfig postgresSourceConfig;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private Map<TableId, Map<String, Integer>> beforeTableColumnsOidMaps;
Review Comment:
### problem
The `beforeTableColumnsOidMaps` map is only populated during the YAML compilation phase. When the job is restarted without recompilation (such as a failover or other non-recompile-based restart), any modifications to this map within `PostgresEventDeserializer` will not take effect.
As a result, two potential issues may arise:
* If the job has significant read lag, and it is later recompiled and
restarted, the beforeTableColumnsOidMaps could be newer than the actual data
being processed.
* If the job is simply restarted without recompilation,
PostgresPipelineSource recovers from its serialized state and retrieves an
outdated initial value that lags significantly behind the current log position.
### suggestion
However, I question whether comparing OIDs is strictly necessary to determine whether a column has been renamed or dropped and recreated. In the default lenient mode:
* Column Rename: In downstream systems, a renamed column is typically handled by keeping the old column and introducing a new one. This is functionally the same as dropping the old column (which is ignored in lenient mode) and then adding a column under the new name.
* Drop and Recreate Same Column Name: In lenient mode, this scenario behaves
identically to no change at all.
Therefore, I recommend not using beforeTableColumnsOidMaps for now,
especially since the evolve mode is still immature and large-scale data
scenarios generally avoid dropping or modifying downstream data structures.
This would significantly simplify the current design.
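The two bullets above can be made concrete with a small sketch of lenient-mode application, where drops are no-ops and adds use put-if-absent (a simplified model, not the real evolution code):

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Sketch of lenient-mode schema application: drops are ignored and new
 * columns are appended. Under these rules, a rename lands as "keep old
 * column, append new one", and a drop-then-recreate of the same column name
 * is a no-op -- so OID tracking would not change the downstream outcome.
 */
public class LenientApply {

    /** Lenient drop: the downstream column is kept as-is. */
    public static Map<String, String> dropColumn(Map<String, String> schema, String name) {
        return schema;
    }

    /** Lenient add: append the column only if the name is not already present. */
    public static Map<String, String> addColumn(
            Map<String, String> schema, String name, String type) {
        Map<String, String> result = new LinkedHashMap<>(schema);
        result.putIfAbsent(name, type);
        return result;
    }
}
```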
@leonardBang , WDYT?
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java:
##########
@@ -50,6 +54,7 @@ public class PostgresDataSource implements DataSource {
private final PostgresSourceConfig postgresSourceConfig;
private final List<PostgreSQLReadableMetadata> readableMetadataList;
+ Map<TableId, Map<String, Integer>> beforeTableColumnsOidMaps = new
HashMap<>();
Review Comment:
typo: this field should be `private final`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]