lvyanquan commented on code in PR #4239:
URL: https://github.com/apache/flink-cdc/pull/4239#discussion_r2745193685
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java:
##########
@@ -696,6 +696,15 @@ public Optional<Column> readColumnForDecoder(
private Optional<ColumnEditor> doReadTableColumn(
ResultSet columnMetadata, TableId tableId, Tables.ColumnNameFilter columnFilter)
throws SQLException {
+ // FLINK-38965: Filter out columns from other tables that might be returned due to
+ // PostgreSQL LIKE wildcard matching (underscore '_' matches any single character).
+ // For example, when querying 'ndi_pg_user_sink_1', the LIKE pattern may also match
+ // 'ndi_pg_userbsink_1' because '_' acts as a wildcard.
+ final String resultTableName = columnMetadata.getString(3);
Review Comment:
Since this code was adapted from Debezium, please document the specific
modifications in the class-level comment so that they are easy to track and
maintain in the future.
For an example, see the class-level comment in PostgresReplicationConnection:
https://github.com/apache/flink-cdc/blob/79683cf226acd58a40c952d260c22d78abbc66c4/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java#L57
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java:
##########
@@ -696,6 +696,15 @@ public Optional<Column> readColumnForDecoder(
private Optional<ColumnEditor> doReadTableColumn(
ResultSet columnMetadata, TableId tableId, Tables.ColumnNameFilter columnFilter)
throws SQLException {
+ // FLINK-38965: Filter out columns from other tables that might be returned due to
+ // PostgreSQL LIKE wildcard matching (underscore '_' matches any single character).
+ // For example, when querying 'ndi_pg_user_sink_1', the LIKE pattern may also match
+ // 'ndi_pg_userbsink_1' because '_' acts as a wildcard.
+ final String resultTableName = columnMetadata.getString(3);
+ if (!tableId.table().equals(resultTableName)) {
Review Comment:
Newer versions of Debezium provide a generic fix for this in JdbcConnection;
see:
https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java#L1327
However, I believe that approach introduces too many changes; your solution is
simpler and more straightforward.
Additionally, Debezium's fix notes that '%' is also a wildcard character, so
you may want to add test scenarios that cover table names containing '%' as
well.
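To make the two wildcard cases concrete, here is a minimal, self-contained sketch. It is not the connector's or Debezium's code: the class `LikeWildcardDemo` and all of its methods are hypothetical names, and the LIKE matching is only simulated with a regex rather than run against PostgreSQL. It shows why a table name passed as a LIKE-style pattern (as the `tableNamePattern` argument of `DatabaseMetaData.getColumns` is) can return rows for other tables, how an exact-name comparison on the returned rows filters them out, and how escaping '_' and '%' would avoid the extra matches in the first place. The backslash escape character is an assumption; with a real driver it would come from `DatabaseMetaData.getSearchStringEscape()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class LikeWildcardDemo {

    /** Simulates SQL LIKE: '_' matches one character, '%' matches any run, '\' escapes. */
    public static boolean likeMatches(String likePattern, String candidate) {
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < likePattern.length(); i++) {
            char c = likePattern.charAt(i);
            if (c == '\\' && i + 1 < likePattern.length()) {
                // Escaped character: treat the next character literally.
                regex.append(Pattern.quote(String.valueOf(likePattern.charAt(++i))));
            } else if (c == '_') {
                regex.append('.');
            } else if (c == '%') {
                regex.append(".*");
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(regex.toString(), Pattern.DOTALL).matcher(candidate).matches();
    }

    /** Post-filter in the spirit of the PR: keep only rows whose table name is exactly equal. */
    public static List<String> exactMatches(String requested, List<String> returnedTableNames) {
        List<String> kept = new ArrayList<>();
        for (String name : returnedTableNames) {
            if (requested.equals(name)) {
                kept.add(name);
            }
        }
        return kept;
    }

    /** Alternative: escape the escape character itself, then '_' and '%', before querying. */
    public static String escapeLikeWildcards(String name, String escape) {
        return name.replace(escape, escape + escape)
                .replace("_", escape + "_")
                .replace("%", escape + "%");
    }

    public static void main(String[] args) {
        String requested = "ndi_pg_user_sink_1";
        String other = "ndi_pg_userbsink_1";

        // Unescaped, the requested name also matches the other table.
        System.out.println(likeMatches(requested, other));
        // The exact-name filter drops the unrelated table.
        System.out.println(exactMatches(requested, List.of(requested, other)));
        // Escaped, the pattern no longer matches the other table.
        System.out.println(likeMatches(escapeLikeWildcards(requested, "\\"), other));
        // A table name containing '%' over-matches in the same way.
        System.out.println(likeMatches("user%sink", "user_other_sink"));
    }
}
```

A test for the '%' case could follow the same shape as the '_' case: create one table whose name contains '%' and a second table that the unescaped pattern would also match, then assert that only the first table's columns are read.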
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.