loserwang1024 commented on code in PR #4233:
URL: https://github.com/apache/flink-cdc/pull/4233#discussion_r2909675532


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java:
##########
@@ -31,63 +42,375 @@
 import io.debezium.data.geometry.Geography;
 import io.debezium.data.geometry.Geometry;
 import io.debezium.data.geometry.Point;
+import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.locationtech.jts.geom.Coordinate;
 import org.locationtech.jts.io.WKBReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /** Event deserializer for {@link PostgresDataSource}. */
 @Internal
 public class PostgresEventDeserializer extends DebeziumEventDeserializationSchema {
-
+    private static final Logger LOG = LoggerFactory.getLogger(PostgresEventDeserializer.class);
     private static final long serialVersionUID = 1L;
     private List<PostgreSQLReadableMetadata> readableMetadataList;
     private final boolean includeDatabaseInTableId;
     private final String databaseName;
+    private Map<TableId, Schema> schemaMap = new HashMap<>();

Review Comment:
   ---
   
   ### ✅ Improved Version
   
   The `schemaMap` can be restored from the `StreamSplit`, specifically from 
its `tableSchemas` field upon job restart. To ensure correctness, the 
`tableSchemas` in the `StreamSplit` should be updated during each checkpoint to 
reflect any changes in the schema.
   
   Otherwise, after a job restart, the `schemaMap` will be empty, and the 
previous schema will be `null`. In your current design, it seems that even if 
the schema of newly arriving data changes, no schema change event will be 
generated.
   
   You can address this by passing the schema information through `org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema#applyChangeEvent`.
   
   ---
   To be honest, I believe that `DebeziumEventDeserializationSchema#createTableEventCache` is not necessary for PostgreSQL CDC. We can achieve the same functionality, and more, using the `schemaMap`.
   
   ### ✅ My Proposed Design for CreateTableEvent
   
   1. **Replace `PostgresPipelineRecordEmitter#createTableEventCache` with 
`schemaMap`**:  
      Use `schemaMap` as the source of truth for table schemas instead of 
maintaining a separate cache.
   
   2. **Remove the logic in `PostgresPipelineRecordEmitter` that generates 
`CreateTableEvent` from the table ID**:
      ```java
      // In rare cases, we may miss some CreateTableEvents before DataChangeEvents.
      // Don't send CreateTableEvent for SchemaChangeEvents as it's the latest schema.
      if (isDataChangeRecord && !createTableEventCache.containsKey(tableId)) {
          CreateTableEvent createTableEvent = getCreateTableEvent(sourceConfig, tableId);
          output.collect((T) createTableEvent);
          createTableEventCache.put(tableId, createTableEvent);
      }
      ```
   
   3. **Move schema change handling to 
`PostgresEventDeserializer#handleSchemaChange`**:  
      If the previous schema is `null`, emit a `CreateTableEvent`. This ensures 
that all schema change events are centralized within the deserializer rather 
than being scattered across different components.
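   
   The three steps above can be sketched with a small, self-contained model. Plain strings stand in for `TableId`, `Schema`, and the event classes here, and `CentralizedSchemaTracker` / `handleSchemaChange` are hypothetical names for illustration, not the actual connector API:
   
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.Objects;
   
   /** Sketch: schemaMap as the single source of truth for create/change events. */
   public class CentralizedSchemaTracker {
       private final Map<String, String> schemaMap = new HashMap<>();
   
       /**
        * Called for every record's table. Emits a "CREATE" the first time a table
        * is seen (previous schema is null), a "CHANGE" when the schema drifted,
        * and nothing otherwise.
        */
       public List<String> handleSchemaChange(String tableId, String currentSchema) {
           List<String> events = new ArrayList<>();
           String previous = schemaMap.get(tableId);
           if (previous == null) {
               events.add("CREATE " + tableId); // step 3: first sighting -> CreateTableEvent
           } else if (!Objects.equals(previous, currentSchema)) {
               events.add("CHANGE " + tableId); // schema drifted -> SchemaChangeEvent
           }
           schemaMap.put(tableId, currentSchema); // step 1: schemaMap is the only cache
           return events;
       }
   }
   ```
   
   A real implementation would emit `CreateTableEvent` / `SchemaChangeEvent` objects instead of strings, and would restore `schemaMap` from the `StreamSplit` on recovery.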
   
   ---
   
   ### ✅ Summary
   
   By centralizing schema change handling in `PostgresEventDeserializer` and 
leveraging `schemaMap` via `StreamSplit`, we can simplify the architecture, 
reduce redundancy, and improve the robustness of schema tracking in PostgreSQL 
CDC scenarios.
   



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java:
##########
@@ -31,63 +42,375 @@
 import io.debezium.data.geometry.Geography;
 import io.debezium.data.geometry.Geometry;
 import io.debezium.data.geometry.Point;
+import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.locationtech.jts.geom.Coordinate;
 import org.locationtech.jts.io.WKBReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /** Event deserializer for {@link PostgresDataSource}. */
 @Internal
 public class PostgresEventDeserializer extends DebeziumEventDeserializationSchema {
-
+    private static final Logger LOG = LoggerFactory.getLogger(PostgresEventDeserializer.class);
     private static final long serialVersionUID = 1L;
     private List<PostgreSQLReadableMetadata> readableMetadataList;
     private final boolean includeDatabaseInTableId;
     private final String databaseName;
+    private Map<TableId, Schema> schemaMap = new HashMap<>();
+    private final PostgresSourceConfig postgresSourceConfig;
 
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private Map<TableId, Map<String, Integer>> beforeTableColumnsOidMaps;
 
     public PostgresEventDeserializer(DebeziumChangelogMode changelogMode) {
-        this(changelogMode, new ArrayList<>(), false, null);
+        this(changelogMode, new ArrayList<>(), false, null, null, null);
     }
 
     public PostgresEventDeserializer(
             DebeziumChangelogMode changelogMode,
             List<PostgreSQLReadableMetadata> readableMetadataList) {
-        this(changelogMode, readableMetadataList, false, null);
+        this(changelogMode, readableMetadataList, false, null, null, null);
     }
 
     public PostgresEventDeserializer(
             DebeziumChangelogMode changelogMode,
             List<PostgreSQLReadableMetadata> readableMetadataList,
             boolean includeDatabaseInTableId) {
-        this(changelogMode, readableMetadataList, includeDatabaseInTableId, null);
+        this(changelogMode, readableMetadataList, includeDatabaseInTableId, null, null, null);
     }
 
     public PostgresEventDeserializer(
             DebeziumChangelogMode changelogMode,
             List<PostgreSQLReadableMetadata> readableMetadataList,
             boolean includeDatabaseInTableId,
-            String databaseName) {
+            String databaseName,
+            PostgresSourceConfig postgresSourceConfig,
+            Map<TableId, Map<String, Integer>> beforeTableColumnsOidMaps) {
         super(new PostgresSchemaDataTypeInference(), changelogMode);
         this.readableMetadataList = readableMetadataList;
         this.includeDatabaseInTableId = includeDatabaseInTableId;
         this.databaseName = databaseName;
+        this.postgresSourceConfig = postgresSourceConfig;
+        this.beforeTableColumnsOidMaps = beforeTableColumnsOidMaps;
     }
 
     @Override
     protected List<SchemaChangeEvent> deserializeSchemaChangeRecord(SourceRecord record) {
         return Collections.emptyList();
     }
 
+    @Override
+    public List<? extends Event> deserialize(SourceRecord record) throws Exception {
+        List<Event> result = new ArrayList<>();
+        if (postgresSourceConfig.isIncludeSchemaChanges()) {
+            handleSchemaChange(record, result);
+        }
+        if (isDataChangeRecord(record)) {
+            LOG.trace("Process data change record: {}", record);
+            result.addAll(deserializeDataChangeRecord(record));
+        } else if (isSchemaChangeRecord(record)) {
+            LOG.trace("Process schema change record: {}", record);
+        } else {
+            LOG.trace("Ignored other record: {}", record);
+            return Collections.emptyList();
+        }
+        return result;
+    }
+
+    private void handleSchemaChange(SourceRecord record, List<Event> result) {
+        TableId tableId = getTableId(record);
+        Schema valueSchema = record.valueSchema();
+        Schema beforeSchema = schemaMap.get(tableId);
+        List<String> beforeColumnNames;
+        List<String> afterColumnNames;
+        Schema afterSchema = fieldSchema(valueSchema, Envelope.FieldName.AFTER);
+        List<Field> afterFields = afterSchema.fields();
+        org.apache.flink.cdc.common.schema.Schema schema =

Review Comment:
   ```java
    org.apache.flink.cdc.common.schema.Schema schema =
                   PostgresSchemaUtils.getTableSchema(postgresSourceConfig, tableId);
   ```
   
   Looking up the current schema from the database for every record is too heavy. Why not just use `afterSchema` and `beforeSchema`? Why do we still need to look up the current schema?
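   
   For illustration, deriving the change from the record's own schemas could look like the sketch below, with plain name-to-type maps standing in for the Kafka Connect `Schema` / `Field` API and a hypothetical `SchemaDiff` helper:
   
   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Objects;
   import java.util.Set;
   
   /** Sketch: derive column changes from before/after schemas, no DB round trip. */
   public class SchemaDiff {
       public static Set<String> addedColumns(Map<String, String> before, Map<String, String> after) {
           Set<String> added = new HashSet<>(after.keySet());
           added.removeAll(before.keySet());
           return added;
       }
   
       public static Set<String> droppedColumns(Map<String, String> before, Map<String, String> after) {
           Set<String> dropped = new HashSet<>(before.keySet());
           dropped.removeAll(after.keySet());
           return dropped;
       }
   
       public static Set<String> retypedColumns(Map<String, String> before, Map<String, String> after) {
           Set<String> retyped = new HashSet<>();
           for (Map.Entry<String, String> e : after.entrySet()) {
               String oldType = before.get(e.getKey());
               if (oldType != null && !Objects.equals(oldType, e.getValue())) {
                   retyped.add(e.getKey()); // same name, different type
               }
           }
           return retyped;
       }
   }
   ```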



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresPipelineITCaseTest.java:
##########
@@ -157,6 +163,278 @@ public void testInitialStartupMode() throws Exception {
         assertThat(inventoryDatabase.checkSlot(slotName)).isEqualTo(slotName);
     }
 
+    @ParameterizedTest(name = "testType: {0}")
+    @ValueSource(
+            strings = {
+                "modifyType",
+                "drop,add",
+                "rename",
+                "drop,add,rename",
+                "add,rename",
+                "add2column,rename2column",
+                "add,rename,add2column,rename2column",
+                "add,rename,drop,add"
+            })
+    public void testPostgresSchemaEvolution(String testType) throws Exception {
+        inventoryDatabase.createAndInitialize();
+        PostgresSourceConfigFactory configFactory =
+                (PostgresSourceConfigFactory)
+                        new PostgresSourceConfigFactory()
+                                .hostname(POSTGRES_CONTAINER.getHost())
+                                .port(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT))
+                                .username(TEST_USER)
+                                .password(TEST_PASSWORD)
+                                .databaseList(inventoryDatabase.getDatabaseName())
+                                .tableList("inventory.products")
+                                .startupOptions(StartupOptions.initial())
+                                .serverTimeZone("UTC");
+        configFactory.database(inventoryDatabase.getDatabaseName());
+        configFactory.slotName(slotName);
+        configFactory.decodingPluginName("pgoutput");
+        configFactory.includeSchemaChanges(true);
+
+        FlinkSourceProvider sourceProvider =
+                (FlinkSourceProvider)
+                        new PostgresDataSource(configFactory).getEventSourceProvider();
+        CloseableIterator<Event> events =
+                env.fromSource(
+                                sourceProvider.getSource(),
+                                WatermarkStrategy.noWatermarks(),
+                                PostgresDataSourceFactory.IDENTIFIER,
+                                new EventTypeInfo())
+                        .executeAndCollect();
+
+        TableId tableId = TableId.tableId("inventory", "products");
+        CreateTableEvent createTableEvent = getProductsCreateTableEvent(tableId);
+
+        // generate snapshot data
+        List<Event> expectedSnapshot = getSnapshotExpected(tableId);
+
+        // In this configuration, several subtasks might emit their corresponding CreateTableEvent
+        // to downstream. Since it is not possible to predict how many CreateTableEvents we should
+        // expect, we simply filter them out from expected sets, and assert there's at least one.
+        fetchResultsExcept(events, expectedSnapshot.size(), createTableEvent);
+        try (Connection conn =
+                        getJdbcConnection(POSTGRES_CONTAINER, inventoryDatabase.getDatabaseName());
+                Statement stmt = conn.createStatement()) {
+            stmt.execute(
+                    "INSERT INTO inventory.products (name, description, weight) "
+                            + "VALUES ('scooter', 'Small 2-wheel scooter', 3.14)");
+            List<Event> actual;
+            List<Event> expected;
+            Map<String, String> renameMap = new HashMap<>();
+            switch (testType) {
+                case "modifyType":

Review Comment:
   Your code also detects changes in `oldField.schema().defaultValue()` and `oldField.schema().parameters()`; please add tests for those cases as well.
   



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java:
##########
@@ -31,63 +42,375 @@
 import io.debezium.data.geometry.Geography;
 import io.debezium.data.geometry.Geometry;
 import io.debezium.data.geometry.Point;
+import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.locationtech.jts.geom.Coordinate;
 import org.locationtech.jts.io.WKBReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /** Event deserializer for {@link PostgresDataSource}. */
 @Internal
 public class PostgresEventDeserializer extends DebeziumEventDeserializationSchema {
-
+    private static final Logger LOG = LoggerFactory.getLogger(PostgresEventDeserializer.class);
     private static final long serialVersionUID = 1L;
     private List<PostgreSQLReadableMetadata> readableMetadataList;
     private final boolean includeDatabaseInTableId;
     private final String databaseName;
+    private Map<TableId, Schema> schemaMap = new HashMap<>();
+    private final PostgresSourceConfig postgresSourceConfig;
 
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private Map<TableId, Map<String, Integer>> beforeTableColumnsOidMaps;
 
     public PostgresEventDeserializer(DebeziumChangelogMode changelogMode) {
-        this(changelogMode, new ArrayList<>(), false, null);
+        this(changelogMode, new ArrayList<>(), false, null, null, null);
     }
 
     public PostgresEventDeserializer(
             DebeziumChangelogMode changelogMode,
             List<PostgreSQLReadableMetadata> readableMetadataList) {
-        this(changelogMode, readableMetadataList, false, null);
+        this(changelogMode, readableMetadataList, false, null, null, null);
     }
 
     public PostgresEventDeserializer(
             DebeziumChangelogMode changelogMode,
             List<PostgreSQLReadableMetadata> readableMetadataList,
             boolean includeDatabaseInTableId) {
-        this(changelogMode, readableMetadataList, includeDatabaseInTableId, null);
+        this(changelogMode, readableMetadataList, includeDatabaseInTableId, null, null, null);
     }
 
     public PostgresEventDeserializer(
             DebeziumChangelogMode changelogMode,
             List<PostgreSQLReadableMetadata> readableMetadataList,
             boolean includeDatabaseInTableId,
-            String databaseName) {
+            String databaseName,
+            PostgresSourceConfig postgresSourceConfig,
+            Map<TableId, Map<String, Integer>> beforeTableColumnsOidMaps) {
         super(new PostgresSchemaDataTypeInference(), changelogMode);
         this.readableMetadataList = readableMetadataList;
         this.includeDatabaseInTableId = includeDatabaseInTableId;
         this.databaseName = databaseName;
+        this.postgresSourceConfig = postgresSourceConfig;
+        this.beforeTableColumnsOidMaps = beforeTableColumnsOidMaps;
     }
 
     @Override
     protected List<SchemaChangeEvent> deserializeSchemaChangeRecord(SourceRecord record) {
         return Collections.emptyList();
     }
 
+    @Override
+    public List<? extends Event> deserialize(SourceRecord record) throws Exception {
+        List<Event> result = new ArrayList<>();
+        if (postgresSourceConfig.isIncludeSchemaChanges()) {
+            handleSchemaChange(record, result);

Review Comment:
   The record here is not a schema change record; we already check `isSchemaChangeRecord(record)` below. I think a name like `inferSchemaChangeByRecord` would be better.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java:
##########
@@ -31,63 +42,375 @@
 import io.debezium.data.geometry.Geography;
 import io.debezium.data.geometry.Geometry;
 import io.debezium.data.geometry.Point;
+import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.locationtech.jts.geom.Coordinate;
 import org.locationtech.jts.io.WKBReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /** Event deserializer for {@link PostgresDataSource}. */
 @Internal
 public class PostgresEventDeserializer extends 
DebeziumEventDeserializationSchema {
-
+    private static final Logger LOG = 
LoggerFactory.getLogger(PostgresEventDeserializer.class);
     private static final long serialVersionUID = 1L;
     private List<PostgreSQLReadableMetadata> readableMetadataList;
     private final boolean includeDatabaseInTableId;
     private final String databaseName;
+    private Map<TableId, Schema> schemaMap = new HashMap<>();
+    private final PostgresSourceConfig postgresSourceConfig;
 
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private Map<TableId, Map<String, Integer>> beforeTableColumnsOidMaps;

Review Comment:
   ### Problem
   `beforeTableColumnsOidMaps` is only populated during the YAML compilation phase. When the job is restarted without recompilation, such as after a failover or any other non-recompile restart, modifications to this map inside `PostgresEventDeserializer` will not take effect.
   
   As a result, two potential issues may arise:
   * If the job has significant read lag and is later recompiled and restarted, `beforeTableColumnsOidMaps` could be newer than the actual data being processed.
   * If the job is simply restarted without recompilation, `PostgresPipelineSource` recovers from its serialized state and retrieves an outdated initial value that lags significantly behind the current log position.
   
   ### Suggestion
   However, I question whether comparing OIDs is strictly necessary to determine whether a column has been renamed or dropped and recreated. In the default lenient mode:
   
   * Column rename: downstream, a renamed column is typically handled by omitting the old column and introducing a new one. This is functionally the same as dropping the column (which lenient mode ignores) and then adding a new column under the new name.
   * Drop and recreate with the same column name: in lenient mode, this scenario behaves identically to no change at all.
   
   Therefore, I recommend not using `beforeTableColumnsOidMaps` for now, especially since the evolve mode is still immature and large-scale data scenarios generally avoid dropping or modifying downstream data structures. This would significantly simplify the current design.
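   
   To illustrate with a sketch (plain name-to-type maps, hypothetical `NameBasedDiff` helper): a diff computed purely by column name reports a rename as a drop plus an add, and reports a drop-and-recreate with the same name and type as no change, which matches lenient-mode behavior without any OID comparison:
   
   ```java
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;
   
   /** Sketch: name-based schema diffing, with no OID comparison at all. */
   public class NameBasedDiff {
       /** Returns "-col" for dropped column names and "+col" for added ones. */
       public static Set<String> diffByName(Map<String, String> before, Map<String, String> after) {
           Set<String> changes = new HashSet<>();
           for (String col : before.keySet()) {
               if (!after.containsKey(col)) {
                   changes.add("-" + col); // dropped (a rename loses the old name)
               }
           }
           for (String col : after.keySet()) {
               if (!before.containsKey(col)) {
                   changes.add("+" + col); // added (a rename introduces the new name)
               }
           }
           return changes;
       }
   }
   ```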
   
   @leonardBang , WDYT?



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDataSource.java:
##########
@@ -50,6 +54,7 @@ public class PostgresDataSource implements DataSource {
     private final PostgresSourceConfig postgresSourceConfig;
 
     private final List<PostgreSQLReadableMetadata> readableMetadataList;
+    Map<TableId, Map<String, Integer>> beforeTableColumnsOidMaps = new HashMap<>();

Review Comment:
   typo: private final



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
