[ 
https://issues.apache.org/jira/browse/FLINK-39613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ran Tao updated FLINK-39613:
----------------------------
    Description: 
*Description*

In a Flink CDC YAML pipeline, the transform rule has three closely related 
string fields:
 * projection — parsed by Calcite with the MySQL dialect ( Lex.JAVA + MYSQL_5 
), which strips the surrounding MySQL-style backticks and yields the unquoted 
identifier.
 * primary-keys / partition-keys — parsed in SchemaMetadataTransform by a plain 
String#split(",") + String#trim() , which keeps the backticks verbatim as part 
of the column name.

Because of the asymmetry, any column name that has to be wrapped in MySQL-style 
backticks (upstream-native column or projection-derived column — it does not 
matter which) cannot be referenced from primary-keys / partition-keys : the 
stored literal `order-id` is matched byte-for-byte against the schema column 
order-id , and the match fails.

This is specifically a defect of primary-keys / partition-keys parsing; 
projection and filter on the same rule already accept backticks correctly 
through Calcite.

*How to reproduce*
To keep the repro focused on the CDC bug and avoid a well-known YAML 1.2 
constraint — a plain (unquoted) YAML scalar may not start with the ` indicator, 
so:
 * uses a column name that contains a special character ( - ), so SQL quoting 
is unambiguously required;
 * puts a plain identifier first in each list, so the YAML scalar itself does 
not start with ` , and no YAML-side workaround (flow sequence, '...' , "..." ) 
is needed.

*Upstream DDL (MySQL)*
{code:java}
CREATE TABLE mydb.web_order (
  id            BIGINT NOT NULL,
  `order-id`    VARCHAR(64) NOT NULL,
  region        VARCHAR(32) NOT NULL,
  payload       VARCHAR(128),
  PRIMARY KEY (id, `order-id`)
); {code}
 

The column name order-id is a perfectly legal MySQL identifier but contains a - 
, so MySQL requires it to be quoted with backticks in any SQL statement. Flink 
CDC carries the raw column name order-id (without backticks) through its 
internal Schema .

*Pipeline YAML*
{code:java}
source:
  type: mysql
  # ...
sink:
  type: paimon            # any sink reproduces
  # ...
transform:
  - source-table: mydb.web_order
    projection: \*
    primary-keys: id, `order-id`
    partition-keys: region, `order-id` 
{code}
 * Expected: the pipeline starts and uses (id, order-id) as primary keys, 
(region, order-id) as partition keys.
 * Actual: the pipeline aborts at startup because the literal string `order-id` 
is not a column in the (already unquoted) schema [id, order-id, region, 
payload] . Only id and region match; the backtick-quoted entries are treated as 
unknown columns.

 

  was:
*Description*

In a Flink CDC YAML pipeline, the transform rule has three closely related 
string fields:
 * projection — parsed by Calcite with the MySQL dialect ( Lex.JAVA + MYSQL_5 
), which strips the surrounding MySQL-style backticks and yields the unquoted 
identifier.
 * primary-keys / partition-keys — parsed in SchemaMetadataTransform by a plain 
String#split(",") + String#trim() , which keeps the backticks verbatim as part 
of the column name.

Because of the asymmetry, any column name that has to be wrapped in MySQL-style 
backticks (upstream-native column or projection-derived column — it does not 
matter which) cannot be referenced from primary-keys / partition-keys : the 
stored literal `order-id` is matched byte-for-byte against the schema column 
order-id , and the match fails.

This is specifically a defect of primary-keys / partition-keys parsing; 
projection and filter on the same rule already accept backticks correctly 
through Calcite.

*How to reproduce*
To keep the repro focused on the CDC bug and avoid a well-known YAML 1.2 
constraint — a plain (unquoted) YAML scalar may not start with the ` indicator, 
so:
 * uses a column name that contains a special character ( - ), so SQL quoting 
is unambiguously required;
 * puts a plain identifier first in each list, so the YAML scalar itself does 
not start with ` , and no YAML-side workaround (flow sequence, '...' , "..." ) 
is needed.


*Upstream DDL (MySQL)*

{code:java}
CREATE TABLE mydb.web_order (
  id            BIGINT NOT NULL,
  `order-id`    VARCHAR(64) NOT NULL,
  region        VARCHAR(32) NOT NULL,
  payload       VARCHAR(128),
  PRIMARY KEY (id, `order-id`)
); {code}
 

The column name order-id is a perfectly legal MySQL identifier but contains a - 
, so MySQL requires it to be quoted with backticks in any SQL statement. Flink 
CDC carries the raw column name order-id (without backticks) through its 
internal Schema .

*Pipeline YAML*

{code:java}
source:
  type: mysql
  # ...
sink:
  type: paimon            # any sink reproduces
  # ...
transform:
  - source-table: mydb.web_order
    # No projection needed to reproduce — the column comes straight from the 
upstream schema.
    # The list starts with a plain identifier, so the YAML scalar does not 
begin with '`'.
    primary-keys: id, `order-id`
    partition-keys: region, `order-id` 
{code}
 * Expected: the pipeline starts and uses (id, order-id) as primary keys, 
(region, order-id) as partition keys.
 * Actual: the pipeline aborts at startup because the literal string `order-id` 
is not a column in the (already unquoted) schema [id, order-id, region, 
payload] . Only id and region match; the backtick-quoted entries are treated as 
unknown columns.

 


> Flink cdc pipeline primary-keys / partition-keys keep backticks verbatim and 
> fail to match column names that require SQL quoting
> --------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39613
>                 URL: https://issues.apache.org/jira/browse/FLINK-39613
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.4.0, cdc-3.5.0
>            Reporter: Ran Tao
>            Priority: Major
>
> *Description*
> In a Flink CDC YAML pipeline, the transform rule has three closely related 
> string fields:
>  * projection — parsed by Calcite with the MySQL dialect ( Lex.JAVA + MYSQL_5 
> ), which strips the surrounding MySQL-style backticks and yields the unquoted 
> identifier.
>  * primary-keys / partition-keys — parsed in SchemaMetadataTransform by a 
> plain String#split(",") + String#trim() , which keeps the backticks verbatim 
> as part of the column name.
> Because of the asymmetry, any column name that has to be wrapped in 
> MySQL-style backticks (upstream-native column or projection-derived column — 
> it does not matter which) cannot be referenced from primary-keys / 
> partition-keys : the stored literal `order-id` is matched byte-for-byte 
> against the schema column order-id , and the match fails.
> This is specifically a defect of primary-keys / partition-keys parsing; 
> projection and filter on the same rule already accept backticks correctly 
> through Calcite.
> *How to reproduce*
> To keep the repro focused on the CDC bug and avoid a well-known YAML 1.2 
> constraint — a plain (unquoted) YAML scalar may not start with the ` 
> indicator, so:
>  * uses a column name that contains a special character ( - ), so SQL quoting 
> is unambiguously required;
>  * puts a plain identifier first in each list, so the YAML scalar itself does 
> not start with ` , and no YAML-side workaround (flow sequence, '...' , "..." 
> ) is needed.
> *Upstream DDL (MySQL)*
> {code:java}
> CREATE TABLE mydb.web_order (
>   id            BIGINT NOT NULL,
>   `order-id`    VARCHAR(64) NOT NULL,
>   region        VARCHAR(32) NOT NULL,
>   payload       VARCHAR(128),
>   PRIMARY KEY (id, `order-id`)
> ); {code}
>  
> The column name order-id is a perfectly legal MySQL identifier but contains a 
> - , so MySQL requires it to be quoted with backticks in any SQL statement. 
> Flink CDC carries the raw column name order-id (without backticks) through 
> its internal Schema .
> *Pipeline YAML*
> {code:java}
> source:
>   type: mysql
>   # ...
> sink:
>   type: paimon            # any sink reproduces
>   # ...
> transform:
>   - source-table: mydb.web_order
>     projection: \*
>     primary-keys: id, `order-id`
>     partition-keys: region, `order-id` 
> {code}
>  * Expected: the pipeline starts and uses (id, order-id) as primary keys, 
> (region, order-id) as partition keys.
>  * Actual: the pipeline aborts at startup because the literal string 
> `order-id` is not a column in the (already unquoted) schema [id, order-id, 
> region, payload] . Only id and region match; the backtick-quoted entries are 
> treated as unknown columns.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to