[
https://issues.apache.org/jira/browse/FLINK-39844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-39844:
-----------------------------------
Labels: pull-request-available (was: )
> PostgreSQL CDC: filter table columns by schema name to fix LIKE-wildcard
> cross-schema column leakage
> ------------------------------------------------------------------------------------------------------
>
> Key: FLINK-39844
> URL: https://issues.apache.org/jira/browse/FLINK-39844
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Affects Versions: cdc-3.6.0
> Reporter: Di Wu
> Priority: Major
> Labels: pull-request-available
>
> *Background*
>
> During the snapshot phase, the PostgreSQL connector reads the column
> structure of captured tables
> via JDBC DatabaseMetaData#getColumns(catalog, schemaPattern,
> tableNamePattern,
> columnNamePattern).
>
> Per the JDBC spec, *both* *schemaPattern* *and* *tableNamePattern* *are*
> *LIKE* {*}patterns{*}:
> - _ matches any single character
> - % matches any sequence of characters
>
> So when a schema name or table name contains _ / % (both are legal
> identifier characters in
> PostgreSQL), getColumns may return columns from *other schemas / tables
> that were not meant to*
> {*}match{*}.
>
> *Current State*
>
> An exact filter on the *table* *name* is already in place: for each column
> returned by getColumns,
> the result-set column 3 TABLE_NAME is compared for equality against the
> target TableId.table(),
> and the column is dropped on mismatch.
>
> However, the *schema* *dimension* *is* {*}missing{*}. Result-set column 2
> TABLE_SCHEM is never validated.
>
> *Problem*
>
> Suppose a database has two schemas whose names differ by only a single
> character:
>
> - sch_test (the target)
> - schxtest (when _ acts as a wildcard, the pattern sch_test also matches it)
>
> Each schema contains a table with the same name, cross_schema_tbl. When the
> user only wants to
> capture sch_test.cross_schema_tbl:
>
> 1. The connector calls getColumns(catalog, "sch_test", "cross_schema_tbl",
> null).
> 2. PostgreSQL treats sch_test as a LIKE pattern, so schxtest matches too,
> and columns of the
> same-named table from *both* schemas are returned together.
> 3. The existing table-name filter cannot stop this — both tables have
> TABLE_NAME equal to
> cross_schema_tbl.
> 4. The same-named columns are loaded into a single table schema, and the
> snapshot phase throws
> IllegalStateException: Duplicate key Optional.empty, failing the job.
>
> The % case is analogous (when a schema name contains a literal %).
>
> *Impact*
>
> - {*}Trigger condition{*}: a single PostgreSQL instance contains multiple
> schemas whose names are
> LIKE-wildcard matches of one another, and they hold same-named tables.
> - {*}Consequence{*}: the job fails outright during the snapshot phase, or
> (when column names differ)
> columns from another schema are mistakenly merged in, producing an
> incorrect schema / dirty data.
>
> *Proposed* *Change*
>
> Add an *exact comparison on the schema name* next to the existing
> table-name comparison: read
> result-set column 2 TABLE_SCHEM and compare it for equality against
> TableId.schema().
>
> final String resultSchemaName = columnMetadata.getString(2);
> final String resultTableName = columnMetadata.getString(3);
> if (!tableId.table().equals(resultTableName)
> || (tableId.schema() != null &&
> !tableId.schema().equals(resultSchemaName))) {
> return Optional.empty();
> }
>
> Edge case: when TableId.schema() is null (no schema specified), the schema
> check is skipped, to
> avoid treating the non-null schema name returned by the metadata as unequal
> and erroneously
> dropping all columns.
>
> *Compatibility*
>
> This is a pure after-the-fact filter. It only tightens the decision of
> "which columns belong to
> the target table"; it does not change the query, and it does not affect
> normal schemas/tables
> (those without wildcard characters). Existing test behavior is unchanged.
>
> *Testing*
>
> Add a cross-schema case to SimilarTableNamesITCase: create two schemas
> sch_test / schxtest that
> are wildcard matches of each other, each holding a same-named table
> cross_schema_tbl, and verify
> that only the target schema's snapshot and incremental data are captured,
> with no leakage from
> the look-alike schema.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)