Di Wu created FLINK-39844:
-----------------------------

             Summary: PostgreSQL CDC: filter table columns by schema name to 
fix LIKE-wildcard cross-schema column leakage
                 Key: FLINK-39844
                 URL: https://issues.apache.org/jira/browse/FLINK-39844
             Project: Flink
          Issue Type: Improvement
          Components: Flink CDC
    Affects Versions: cdc-3.6.0
            Reporter: Di Wu


*Background*

 

  During the snapshot phase, the PostgreSQL connector reads the column
  structure of captured tables via JDBC
  DatabaseMetaData#getColumns(catalog, schemaPattern, tableNamePattern,
  columnNamePattern).

 

  Per the JDBC spec, *both schemaPattern and tableNamePattern are LIKE
  patterns*:

  - _ matches any single character
  - % matches any sequence of zero or more characters

  

  So when a schema name or table name contains _ or % (both are legal
  identifier characters in PostgreSQL), getColumns may return columns from
  *other schemas / tables that were not meant to match*.
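
  The wildcard behavior described above can be illustrated with a small,
  self-contained sketch. It is not the driver's implementation: the class
  LikePatternDemo and its helpers are hypothetical names, and the
  translation ignores LIKE escape characters. It only shows that the
  pattern sch_test accepts schxtest as well as the literal name.

```java
import java.util.regex.Pattern;

public class LikePatternDemo {

    /** Translates a SQL LIKE pattern into an anchored regex (escape characters are ignored). */
    static Pattern likeToRegex(String like) {
        StringBuilder regex = new StringBuilder();
        for (char c : like.toCharArray()) {
            if (c == '_') {
                regex.append('.');          // _ matches exactly one character
            } else if (c == '%') {
                regex.append(".*");         // % matches zero or more characters
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(regex.toString(), Pattern.DOTALL);
    }

    /** Returns true if the candidate name matches the LIKE pattern as a whole. */
    static boolean likeMatches(String pattern, String candidate) {
        return likeToRegex(pattern).matcher(candidate).matches();
    }

    public static void main(String[] args) {
        System.out.println(likeMatches("sch_test", "sch_test")); // the intended schema
        System.out.println(likeMatches("sch_test", "schxtest")); // the look-alike schema
        System.out.println(likeMatches("sch_test", "schtest"));  // _ must consume one character
    }
}
```

  The first two calls return true and the third returns false, which is
  why a schema name containing _ can pull in a look-alike schema.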

 

  *Current State*

 

  An exact filter on the *table name* is already in place: for each column
  returned by getColumns, result-set column 3 (TABLE_NAME) is compared for
  equality against the target TableId.table(), and the column is dropped on
  mismatch.

 

  However, the *schema dimension is missing*: result-set column 2
  (TABLE_SCHEM) is never validated.

 

  *Problem*

 

  Suppose a database has two schemas whose names differ by only a single
  character:

 

  - sch_test (the target)

  - schxtest (when _ acts as a wildcard, the pattern sch_test also matches it)

 

  Each schema contains a table with the same name, cross_schema_tbl. When
  the user only wants to capture sch_test.cross_schema_tbl:

 

  1. The connector calls getColumns(catalog, "sch_test",
     "cross_schema_tbl", null).
  2. PostgreSQL treats sch_test as a LIKE pattern, so schxtest matches too,
     and columns of the same-named table from *both* schemas are returned
     together.
  3. The existing table-name filter cannot stop this, because both tables
     have TABLE_NAME equal to cross_schema_tbl.
  4. The same-named columns are loaded into a single table schema, and the
     snapshot phase throws IllegalStateException: Duplicate key
     Optional.empty, failing the job.

 

  The % case is analogous (when a schema name contains a literal %).

  

  *Impact*

 

  - *Trigger condition*: a single PostgreSQL instance contains multiple
    schemas whose names are LIKE-wildcard matches of one another, and they
    hold same-named tables.
  - *Consequence*: the job fails outright during the snapshot phase, or
    (when column names differ) columns from another schema are mistakenly
    merged in, producing an incorrect schema / dirty data.

 

  *Proposed Change*

 

  Add an *exact comparison on the schema name* next to the existing
  table-name comparison: read result-set column 2 (TABLE_SCHEM) and compare
  it for equality against TableId.schema().

 

  final String resultSchemaName = columnMetadata.getString(2);
  final String resultTableName = columnMetadata.getString(3);
  if (!tableId.table().equals(resultTableName)
          || (tableId.schema() != null
                  && !tableId.schema().equals(resultSchemaName))) {
      return Optional.empty();
  }

  

  Edge case: when TableId.schema() is null (no schema specified), the
  schema check is skipped. Otherwise the non-null schema name returned by
  the metadata would compare as unequal and all columns would be
  erroneously dropped.
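
  The filter and its null-schema edge case can be exercised without a
  database. The following is a minimal sketch, not the connector's code:
  SchemaFilterDemo, ColumnRow, belongsToTarget, and keptColumns are
  hypothetical names, and each ColumnRow stands in for one getColumns
  result row (TABLE_SCHEM, TABLE_NAME, COLUMN_NAME).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SchemaFilterDemo {

    /** Hypothetical stand-in for one getColumns row: TABLE_SCHEM, TABLE_NAME, COLUMN_NAME. */
    static final class ColumnRow {
        final String schema;
        final String table;
        final String column;

        ColumnRow(String schema, String table, String column) {
            this.schema = schema;
            this.table = table;
            this.column = column;
        }
    }

    /** Exact-match check on table name, plus schema name when a target schema is given. */
    static boolean belongsToTarget(String targetSchema, String targetTable, ColumnRow row) {
        if (!targetTable.equals(row.table)) {
            return false;
        }
        // A null target schema means "no schema specified": skip the schema check.
        return targetSchema == null || targetSchema.equals(row.schema);
    }

    /** Returns the surviving columns as "schema.column" strings, in input order. */
    static List<String> keptColumns(String targetSchema, String targetTable, List<ColumnRow> rows) {
        List<String> kept = new ArrayList<>();
        for (ColumnRow row : rows) {
            if (belongsToTarget(targetSchema, targetTable, row)) {
                kept.add(row.schema + "." + row.column);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Rows as a LIKE-based lookup for "sch_test" could return them.
        List<ColumnRow> rows = Arrays.asList(
                new ColumnRow("sch_test", "cross_schema_tbl", "id"),
                new ColumnRow("schxtest", "cross_schema_tbl", "id"));

        System.out.println(keptColumns("sch_test", "cross_schema_tbl", rows));
        System.out.println(keptColumns(null, "cross_schema_tbl", rows));
    }
}
```

  With the target schema set, only the sch_test row survives; with a null
  target schema, both rows are kept, matching the pre-change behavior.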

  

  *Compatibility*

 

  This is a pure after-the-fact filter. It only tightens the decision of
  "which columns belong to the target table"; it does not change the query,
  and it does not affect normal schemas/tables (those without wildcard
  characters). Existing test behavior is unchanged.

  

  *Testing*

 

  Add a cross-schema case to SimilarTableNamesITCase: create two schemas,
  sch_test and schxtest, that are wildcard matches of each other, each
  holding a same-named table cross_schema_tbl, and verify that only the
  target schema's snapshot and incremental data are captured, with no
  leakage from the look-alike schema.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
