loserwang1024 commented on code in PR #4233:
URL: https://github.com/apache/flink-cdc/pull/4233#discussion_r2909675532


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java:
##########
@@ -31,63 +42,375 @@
 import io.debezium.data.geometry.Geography;
 import io.debezium.data.geometry.Geometry;
 import io.debezium.data.geometry.Point;
+import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.locationtech.jts.geom.Coordinate;
 import org.locationtech.jts.io.WKBReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /** Event deserializer for {@link PostgresDataSource}. */
 @Internal
 public class PostgresEventDeserializer extends DebeziumEventDeserializationSchema {
-
+    private static final Logger LOG = LoggerFactory.getLogger(PostgresEventDeserializer.class);
     private static final long serialVersionUID = 1L;
     private List<PostgreSQLReadableMetadata> readableMetadataList;
     private final boolean includeDatabaseInTableId;
     private final String databaseName;
+    private Map<TableId, Schema> schemaMap = new HashMap<>();

Review Comment:
   ---
   The `schemaMap` can be restored on job restart from the `StreamSplit`, specifically from its `tableSchemas` field. For this to be correct, the `tableSchemas` in the `StreamSplit` must be updated at every checkpoint so that it reflects any schema changes.
   
   Otherwise, after a job restart the `schemaMap` will be empty and the previous schema will be `null`. With the current design, this means that even if the schema of newly arriving data has changed, no schema change event will be generated.
   
   You can address this by passing the schema information through `org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema#applyChangeEvent`.
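   A minimal sketch of the restore-and-snapshot round trip, under simplifying assumptions: `CheckpointedSchemaMap`, `restoreFrom`, `toSplitTableSchemas` and `recordAndDetectChange` are hypothetical names, table ids are plain strings, and a schema is reduced to its column names. In the connector these would be `TableId`, the Kafka Connect `Schema`, and the `tableSchemas` of the `StreamSplit`. `recordAndDetectChange` models the behaviour described above, where a change is only reported when a previous schema is known.

   ```java
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   /**
    * Hypothetical model of the deserializer's schemaMap and its round trip through
    * checkpointed split state. Strings stand in for TableId and column-name lists
    * stand in for the Kafka Connect Schema / StreamSplit#tableSchemas entries.
    */
   class CheckpointedSchemaMap {
       private final Map<String, List<String>> schemaMap = new HashMap<>();

       /** On restart: seed the map from the schemas stored in the restored split. */
       void restoreFrom(Map<String, List<String>> splitTableSchemas) {
           schemaMap.clear();
           schemaMap.putAll(splitTableSchemas);
       }

       /** At each checkpoint: copy the latest schemas back into the split state. */
       Map<String, List<String>> toSplitTableSchemas() {
           return new HashMap<>(schemaMap);
       }

       /**
        * Mirrors the behaviour under review: a change is only reported when a
        * previous schema is known, so an empty map silently swallows the change.
        */
       boolean recordAndDetectChange(String tableId, List<String> newSchema) {
           List<String> previous = schemaMap.put(tableId, newSchema);
           return previous != null && !previous.equals(newSchema);
       }
   }
   ```

   Without `restoreFrom`, a table that gained a column while the job was down looks like a first sighting (`previous == null`), so no change is reported; once the map is restored from the checkpointed schemas, the same record is detected as a change.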
   
   ---
   To be honest, I don't think `DebeziumEventDeserializationSchema#createTableEventCache` is necessary for PostgreSQL CDC. We can achieve the same, and more powerful, functionality with the `schemaMap`.
   
   ### ✅ My Proposed Design for CreateTableEvent
   
   1. **Replace `PostgresPipelineRecordEmitter#createTableEventCache` with 
`schemaMap`**:  
      Use `schemaMap` as the source of truth for table schemas instead of 
maintaining a separate cache.
   
   2. **Remove the logic in `PostgresPipelineRecordEmitter` that generates 
`CreateTableEvent` from the table ID**:
      ```java
      // In rare cases, we may miss some CreateTableEvents before DataChangeEvents.
      // Don't send CreateTableEvent for SchemaChangeEvents as it's the latest schema.
      if (isDataChangeRecord && !createTableEventCache.containsKey(tableId)) {
          CreateTableEvent createTableEvent = getCreateTableEvent(sourceConfig, tableId);
          output.collect((T) createTableEvent);
          createTableEventCache.put(tableId, createTableEvent);
      }
      ```
   
   3. **Move schema change handling to 
`PostgresEventDeserializer#handleSchemaChange`**:  
      If the previous schema is `null`, emit a `CreateTableEvent`. This ensures 
that all schema change events are centralized within the deserializer rather 
than being scattered across different components.
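   The three steps above could look roughly like the following sketch. It is not the PR's code: `SchemaChangeHandler` and `Action` are hypothetical, and strings and column-name lists stand in for `TableId`, the Kafka Connect `Schema`, and Flink CDC's `CreateTableEvent` / `SchemaChangeEvent`.

   ```java
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.Optional;

   /**
    * Hypothetical sketch of a centralized handleSchemaChange: the deserializer compares
    * each record's schema with schemaMap and decides which schema event to emit.
    */
   class SchemaChangeHandler {
       /** Simplified stand-ins for CreateTableEvent and the column-level schema change events. */
       enum Action { CREATE_TABLE, SCHEMA_CHANGE }

       // Single source of truth for the last schema seen per table (replaces createTableEventCache).
       private final Map<String, List<String>> schemaMap = new HashMap<>();

       Optional<Action> handleSchemaChange(String tableId, List<String> newSchema) {
           List<String> previous = schemaMap.put(tableId, newSchema);
           if (previous == null) {
               // No known schema for this table yet: announce it with a CreateTableEvent.
               return Optional.of(Action.CREATE_TABLE);
           }
           if (!previous.equals(newSchema)) {
               // Schema drifted: the real code would diff previous vs. new and emit
               // e.g. AddColumnEvent / DropColumnEvent; collapsed into one action here.
               return Optional.of(Action.SCHEMA_CHANGE);
           }
           // Unchanged schema: only the data change event itself is emitted.
           return Optional.empty();
       }
   }
   ```

   With this shape, `PostgresPipelineRecordEmitter` no longer needs `createTableEventCache` or the `isDataChangeRecord` check: it forwards whatever the deserializer emits, and a `null` previous schema yields a `CreateTableEvent`.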
   
   ---
   
   ### ✅ Summary
   
   By centralizing schema change handling in `PostgresEventDeserializer` and 
leveraging `schemaMap` via `StreamSplit`, we can simplify the architecture, 
reduce redundancy, and improve the robustness of schema tracking in PostgreSQL 
CDC scenarios.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
