loserwang1024 commented on code in PR #4233:
URL: https://github.com/apache/flink-cdc/pull/4233#discussion_r2909675532
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresEventDeserializer.java:
##########
@@ -31,63 +42,375 @@
import io.debezium.data.geometry.Geography;
import io.debezium.data.geometry.Geometry;
import io.debezium.data.geometry.Point;
+import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.io.WKBReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
/** Event deserializer for {@link PostgresDataSource}. */
@Internal
public class PostgresEventDeserializer extends DebeziumEventDeserializationSchema {
-
+ private static final Logger LOG = LoggerFactory.getLogger(PostgresEventDeserializer.class);
private static final long serialVersionUID = 1L;
private List<PostgreSQLReadableMetadata> readableMetadataList;
private final boolean includeDatabaseInTableId;
private final String databaseName;
+ private Map<TableId, Schema> schemaMap = new HashMap<>();
Review Comment:
---
On job restart, the `schemaMap` can be restored from the `tableSchemas` field
of the `StreamSplit`. For this to be correct, the `tableSchemas` in the
`StreamSplit` should be updated during each checkpoint to reflect any changes
in the schema.
Otherwise, after a job restart, the `schemaMap` will be empty, and the
previous schema will be `null`. In your current design, it seems that even if
the schema of newly arriving data changes, no schema change event will be
generated.
You can address this by passing the schema information through
`org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema#applyChangeEvent`.
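To make the restart concern concrete, here is a minimal sketch of the snapshot/restore round trip. It does not use the real Flink CDC classes: `StreamSplitSketch`, `RestoringDeserializerSketch`, and all of their methods are hypothetical stand-ins, and schemas are simplified to plain strings.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the checkpointed stream split; NOT the real StreamSplit class.
final class StreamSplitSketch {
    // Plays the role of the split's tableSchemas field (schemas simplified to strings).
    private final Map<String, String> tableSchemas = new HashMap<>();

    Map<String, String> getTableSchemas() {
        return tableSchemas;
    }
}

// Hypothetical stand-in for the deserializer's schema tracking.
final class RestoringDeserializerSketch {
    private final Map<String, String> schemaMap = new HashMap<>();

    // Record the latest schema seen for a table.
    void observe(String tableId, String schema) {
        schemaMap.put(tableId, schema);
    }

    // At checkpoint time: copy the current schemas into the split state.
    void snapshotTo(StreamSplitSketch split) {
        split.getTableSchemas().clear();
        split.getTableSchemas().putAll(schemaMap);
    }

    // On restart: rebuild schemaMap from the split state.
    void restoreFrom(StreamSplitSketch split) {
        schemaMap.clear();
        schemaMap.putAll(split.getTableSchemas());
    }

    // Returns null if no schema is known for the table.
    String previousSchema(String tableId) {
        return schemaMap.get(tableId);
    }
}
```

Without the `restoreFrom` step, a freshly created instance returns `null` from `previousSchema`, which is the situation described above.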
---
To be honest, I believe that
`DebeziumEventDeserializationSchema#createTableEventCache` is not necessary for
PostgreSQL CDC. The `schemaMap` can provide the same functionality, and more.
### ✅ My Proposed Design for CreateTableEvent
1. **Replace `PostgresPipelineRecordEmitter#createTableEventCache` with
`schemaMap`**:
Use `schemaMap` as the source of truth for table schemas instead of
maintaining a separate cache.
2. **Remove the logic in `PostgresPipelineRecordEmitter` that generates
`CreateTableEvent` from the table ID**:
```java
// In rare cases, we may miss some CreateTableEvents before DataChangeEvents.
// Don't send CreateTableEvent for SchemaChangeEvents as it's the latest schema.
if (isDataChangeRecord && !createTableEventCache.containsKey(tableId)) {
    CreateTableEvent createTableEvent = getCreateTableEvent(sourceConfig, tableId);
    output.collect((T) createTableEvent);
    createTableEventCache.put(tableId, createTableEvent);
}
```
3. **Move schema change handling to
`PostgresEventDeserializer#handleSchemaChange`**:
If the previous schema is `null`, emit a `CreateTableEvent`. This ensures
that all schema change events are centralized within the deserializer rather
than being scattered across different components.
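A rough sketch of what step 3 could look like is below. It is illustrative only: `SchemaChangeSketch` is a hypothetical class, the method signature is an assumption, schemas are reduced to lists of column names, and events are represented as strings rather than real `CreateTableEvent` or schema change event objects.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; names and signatures are assumptions, not the PR's actual code.
final class SchemaChangeSketch {
    // Table id -> last known column list (a simplified stand-in for Map<TableId, Schema>).
    private final Map<String, List<String>> schemaMap = new HashMap<>();

    // Compares the incoming schema with the previous one and returns the events to emit.
    List<String> handleSchemaChange(String tableId, List<String> newColumns) {
        List<String> events = new ArrayList<>();
        List<String> previous = schemaMap.get(tableId);
        if (previous == null) {
            // No previous schema: this is the first time the table is seen.
            events.add("CreateTableEvent(" + tableId + ")");
        } else if (!previous.equals(newColumns)) {
            // Schema differs from the last known one.
            events.add("SchemaChangeEvent(" + tableId + ")");
        }
        // Always remember the latest schema (defensive copy).
        schemaMap.put(tableId, new ArrayList<>(newColumns));
        return events;
    }
}
```

With this shape, the first record for a table yields a create-table event, a record with an unchanged schema yields nothing, and a record with a different schema yields a schema change event, all from one place.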
---
### ✅ Summary
By centralizing schema change handling in `PostgresEventDeserializer` and
leveraging `schemaMap` via `StreamSplit`, we can simplify the architecture,
reduce redundancy, and improve the robustness of schema tracking in PostgreSQL
CDC scenarios.