loserwang1024 commented on code in PR #3808:
URL: https://github.com/apache/flink-cdc/pull/3808#discussion_r1898288855
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/CustomPostgresSchema.java:
##########
@@ -112,7 +112,13 @@ private List<TableChange> readTableSchema(List<TableId> tableIds) throws SQLExce
tables,
dbzConfig.databaseName(),
null,
- dbzConfig.getTableFilters().dataCollectionFilter(),
+ // only check context tableIds
+ (tb) ->
Review Comment:
> each chunk will load a table schema

PostgresDialect already caches table schemas.
> dbzConfig.getTableFilters().dataCollectionFilter(),
But your modification does the same thing. `dbzConfig.getTableFilters().dataCollectionFilter()` is built as follows:
```java
// Accepts tables that match the include list and do not match the exclude list
Predicate<TableId> tablePredicate = eligibleTables
        .includeTables(
                config.getFallbackStringProperty(
                        RelationalDatabaseConnectorConfig.TABLE_INCLUDE_LIST,
                        RelationalDatabaseConnectorConfig.TABLE_WHITELIST),
                tableIdMapper)
        .excludeTables(
                config.getFallbackStringProperty(
                        RelationalDatabaseConnectorConfig.TABLE_EXCLUDE_LIST,
                        RelationalDatabaseConnectorConfig.TABLE_BLACKLIST),
                tableIdMapper)
        .build();
```
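To make the semantics of that predicate concrete, here is a minimal plain-Java sketch. It is a simplification, not Debezium's code: table ids are assumed to be `schema.table` strings, each list is assumed to be comma-separated regexes matched against the whole id, and `TableFilterSketch` / `buildTableFilter` are hypothetical names. An empty include list accepts every table.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TableFilterSketch {

    // Hypothetical simplification of the include/exclude table filter:
    // a table passes if it matches the include list (or the list is empty)
    // and does not match any pattern in the exclude list.
    static Predicate<String> buildTableFilter(String includeList, String excludeList) {
        List<Pattern> includes = parse(includeList);
        List<Pattern> excludes = parse(excludeList);
        Predicate<String> included =
                id -> includes.isEmpty() || includes.stream().anyMatch(p -> p.matcher(id).matches());
        Predicate<String> excluded = id -> excludes.stream().anyMatch(p -> p.matcher(id).matches());
        return included.and(excluded.negate());
    }

    // Splits a comma-separated list of regexes; null or blank means "no patterns".
    private static List<Pattern> parse(String list) {
        if (list == null || list.trim().isEmpty()) {
            return List.of();
        }
        return Arrays.stream(list.split(","))
                .map(String::trim)
                .map(Pattern::compile)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Predicate<String> includeOnly = buildTableFilter("inventory\\..*", null);
        Predicate<String> excludeOnly = buildTableFilter(null, "inventory\\.audit_.*");
        System.out.println(includeOnly.test("inventory.orders")); // true
        System.out.println(includeOnly.test("billing.invoices")); // false
        System.out.println(excludeOnly.test("inventory.audit_log")); // false
        System.out.println(excludeOnly.test("billing.invoices")); // true
    }
}
```

Either way the result is a single `Predicate` over table ids, which is the same shape as a predicate that only checks the context tableIds.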
Maybe what you need to do is:
In the JobManager (JM), use `getTableSchema(List<TableId> tableIds)` to load the schemas of all tables only once (they can also be loaded lazily when needed).
In the TaskManager (TM), use `TableChange getTableSchema(TableId tableId)` to load only the one schema that is currently in use.
After use, clear the schema of the old table.
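A rough sketch of that split, under stated assumptions: `SchemaLoader`, `EnumeratorSchemas` and `ReaderSchemas` are hypothetical names, table ids and schemas are plain strings instead of `TableId` / `TableChange`, and the loader stands in for the batch `getTableSchema(List<TableId>)` call. The JM side loads everything in one batch; the TM side loads a schema on first use, reuses it for later chunks of the same table, and evicts it once that table is done.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SchemaCacheSketch {

    // Hypothetical stand-in for the real schema query: table ids -> schemas.
    interface SchemaLoader {
        Map<String, String> loadSchemas(List<String> tableIds);
    }

    // JM side (hypothetical): load every table's schema in one batch, once.
    static final class EnumeratorSchemas {
        private final Map<String, String> schemas;

        EnumeratorSchemas(SchemaLoader loader, List<String> tableIds) {
            this.schemas = new HashMap<>(loader.loadSchemas(tableIds));
        }

        String get(String tableId) {
            return schemas.get(tableId);
        }
    }

    // TM side (hypothetical): load one schema on first use, evict it after use.
    static final class ReaderSchemas {
        private final SchemaLoader loader;
        private final Map<String, String> cache = new HashMap<>();
        int loads = 0; // number of loader round trips, for illustration

        ReaderSchemas(SchemaLoader loader) {
            this.loader = loader;
        }

        String get(String tableId) {
            return cache.computeIfAbsent(tableId, id -> {
                loads++;
                return loader.loadSchemas(List.of(id)).get(id);
            });
        }

        // Called when all chunks of a table are finished.
        void evict(String tableId) {
            cache.remove(tableId);
        }

        int size() {
            return cache.size();
        }
    }

    public static void main(String[] args) {
        SchemaLoader loader = ids -> {
            Map<String, String> out = new HashMap<>();
            for (String id : ids) {
                out.put(id, "schema-of-" + id);
            }
            return out;
        };
        EnumeratorSchemas jm = new EnumeratorSchemas(loader, List.of("public.a", "public.b"));
        System.out.println(jm.get("public.b")); // schema-of-public.b

        ReaderSchemas tm = new ReaderSchemas(loader);
        tm.get("public.a");
        tm.get("public.a"); // another chunk of the same table: served from the cache
        System.out.println(tm.loads); // 1
        tm.evict("public.a"); // table finished: drop its schema
        System.out.println(tm.size()); // 0
    }
}
```

The point of the TM-side eviction is to keep reader memory bounded by the tables currently being read rather than by every table ever captured.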
--
This is an automated message from the Apache Git Service.