[
https://issues.apache.org/jira/browse/FLINK-39613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ran Tao updated FLINK-39613:
----------------------------
Description:
*Description*
In a Flink CDC YAML pipeline, the transform rule has three closely related
string fields:
* projection: parsed by Calcite with the MySQL dialect (Lex.JAVA + MYSQL_5),
which strips the surrounding MySQL-style backticks and yields the unquoted
identifier.
* primary-keys / partition-keys: parsed in SchemaMetadataTransform by a plain
String#split(",") + String#trim(), which keeps the backticks verbatim as part
of the column name.
Because of the asymmetry, any column name that has to be wrapped in MySQL-style
backticks (upstream-native column or projection-derived column; it does not
matter which) cannot be referenced from primary-keys / partition-keys: the
stored literal `order-id` is matched byte-for-byte against the schema column
order-id, and the match fails.
This is specifically a defect of primary-keys / partition-keys parsing;
projection and filter on the same rule already accept backticks correctly
through Calcite.
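The asymmetry can be illustrated with a minimal, self-contained sketch. It does not reproduce the actual SchemaMetadataTransform code: the names KeySplitDemo and naiveSplit are hypothetical, and only the String#split(",") + String#trim() behaviour described above is modelled.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical demo class; not part of Flink CDC. */
final class KeySplitDemo {

    /** Models the described parsing: String#split(",") followed by String#trim(). */
    static List<String> naiveSplit(String keys) {
        List<String> result = new ArrayList<>();
        for (String part : keys.split(",")) {
            result.add(part.trim());
        }
        return result;
    }

    public static void main(String[] args) {
        // Column names as carried in the schema: already unquoted.
        List<String> schemaColumns = List.of("id", "order-id", "region", "payload");

        for (String key : naiveSplit("id, `order-id`")) {
            // The backticks survive the split, so `order-id` never equals order-id.
            System.out.println(key + " -> found in schema: " + schemaColumns.contains(key));
        }
    }
}
```

Under these assumptions, id is found in the schema while `order-id` (backticks included) is not, which is the mismatch reported here.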
*How to reproduce*
To keep the repro focused on the CDC bug and avoid a well-known YAML 1.2
constraint (a plain, unquoted YAML scalar may not start with the ` indicator),
the example below:
* uses a column name that contains a special character (-), so SQL quoting
is unambiguously required;
* puts a plain identifier first in each list, so the YAML scalar itself does
not start with `, and no YAML-side workaround (flow sequence, '...', "...")
is needed.
Upstream DDL (MySQL)
{code:java}
CREATE TABLE mydb.web_order (
  id BIGINT NOT NULL,
  `order-id` VARCHAR(64) NOT NULL,
  region VARCHAR(32) NOT NULL,
  payload VARCHAR(128),
  PRIMARY KEY (id, `order-id`)
);
{code}
The column name order-id is a perfectly legal MySQL identifier but contains a
hyphen (-), so MySQL requires it to be quoted with backticks in any SQL
statement. Flink CDC carries the raw column name order-id (without backticks)
through its internal Schema.
Pipeline YAML
{code:java}
source:
  type: mysql
  # ...
sink:
  type: paimon # any sink reproduces
  # ...
transform:
  - source-table: mydb.web_order
    # No projection needed to reproduce: the column comes straight from the upstream schema.
    # The list starts with a plain identifier, so the YAML scalar does not begin with '`'.
    primary-keys: id, `order-id`
    partition-keys: region, `order-id`
{code}
* Expected: the pipeline starts and uses (id, order-id) as primary keys and
(region, order-id) as partition keys.
* Actual: the pipeline aborts at startup because the literal string `order-id`
is not a column in the (already unquoted) schema [id, order-id, region,
payload]. Only id and region match; the backtick-quoted entries are treated as
unknown columns.
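One possible way to remove the mismatch, shown only as a hedged sketch and not as the fix adopted by the project, is to split the key list on commas that lie outside backticks and to drop the surrounding backticks before matching against the schema. QuotedKeyParser, parseKeys and addKey are hypothetical names. As a simplification, whitespace is trimmed from every key, including whitespace inside quotes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper; not part of Flink CDC. */
final class QuotedKeyParser {

    /** Splits on commas outside backticks and drops the surrounding backticks. */
    static List<String> parseKeys(String keys) {
        List<String> result = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean quoted = false;
        for (int i = 0; i < keys.length(); i++) {
            char c = keys.charAt(i);
            if (c == '`') {
                if (quoted && i + 1 < keys.length() && keys.charAt(i + 1) == '`') {
                    current.append('`'); // "``" inside quotes is an escaped backtick
                    i++;
                } else {
                    quoted = !quoted; // opening or closing quote, not part of the name
                }
            } else if (c == ',' && !quoted) {
                addKey(result, current); // separator only when outside quotes
            } else {
                current.append(c);
            }
        }
        if (quoted) {
            throw new IllegalArgumentException("Unterminated backtick in: " + keys);
        }
        addKey(result, current);
        return result;
    }

    /** Trims the collected text, stores it if non-empty, and resets the buffer. */
    private static void addKey(List<String> result, StringBuilder current) {
        String key = current.toString().trim();
        if (!key.isEmpty()) {
            result.add(key);
        }
        current.setLength(0);
    }

    public static void main(String[] args) {
        System.out.println(parseKeys("id, `order-id`"));
        System.out.println(parseKeys("region, `order-id`"));
    }
}
```

With this normalization, the two lists from the pipeline YAML above become [id, order-id] and [region, order-id], which match the unquoted schema columns. Plain identifiers without backticks pass through unchanged, so existing pipelines would not be affected by such a change.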
was:
In a Flink CDC YAML pipeline, a transform rule accepts three closely related
fields:
* projection: parsed by Calcite with the MySQL dialect (Lex.JAVA + MYSQL_5).
Calcite strips the MySQL-style backticks around identifiers, so the
parsed/propagated column name is the unquoted name.
* primary-keys / partition-keys: currently parsed in SchemaMetadataTransform
by a plain String#split(",") + String#trim(). The backticks are kept verbatim
as part of the column name.
Because the two fields use different parsing strategies, any column name that
has to be wrapped in MySQL-style backticks cannot be referenced from
primary-keys / partition-keys. This is not specific to projection aliases;
the same failure occurs when:
1. The column is a plain upstream column carried through the pipeline (no
projection is defined, or projection: * is used), and the upstream column name
itself is a SQL reserved word (`time`, `user`, `order`, `key`, …) or
contains special characters (spaces, -, non-ASCII, …).
2. The column is produced by a projection alias that has to be backtick-quoted
(because the alias is a reserved word or contains special characters).
In both cases the user must write the backticks on the primary-keys /
partition-keys side to express the quoted identifier semantically, but the
current implementation stores the literal string `time` (with backticks) and
then tries to match it against the downstream schema column time (without
backticks). The match fails and the pipeline aborts at startup/runtime with a
column-not-found / schema-mismatch error.
The issue is surprising to users because:
- The YAML is syntactically valid and passes the YAML/Jackson layer.
- Calcite-based parts of the same rule (projection, filter) already understand
and unquote backticks.
- The failure only surfaces at runtime, and the error message does not hint at
the quoting mismatch.
*How to reproduce*
*Case A* — upstream column whose name is a SQL reserved word.
Assume an upstream MySQL table whose DDL legitimately uses a reserved word as a
column name:
db:
{code:java}
CREATE TABLE mydb.web_order (
  id BIGINT PRIMARY KEY,
  `time` TIMESTAMP,
  payload VARCHAR(128)
);
{code}
pipeline:
{code:java}
source:
  type: mysql
  # ...
sink:
  type: paimon # or values / doris / starrocks; sink is irrelevant
  # ...
transform:
  - source-table: mydb.web_order
    projection: \*
    primary-keys: id, `time`
    partition-keys: `time`
{code}
Expected: the pipeline starts and uses id + time as primary key.
Actual: startup fails because the literal string `time` is not a column in the
upstream/projected schema (which only contains time).
*Case B* — projection alias that has to be backtick-quoted
{code:java}
transform:
  - source-table: mydb.web_order
    projection: \*, DATE_FORMAT(create_at, 'yyyyMMdd') AS `time`
    primary-keys: id, `time`
    partition-keys: `time`
{code}
Same root cause, same failure mode.
Both cases reproduce for any identifier that requires quoting: reserved words
(time, user, order, key, group, select, …), identifiers containing spaces,
hyphens (-), dots (.), or non-ASCII characters, and identifiers that collide
with Calcite keywords.
> Flink cdc pipeline primary-keys / partition-keys keep backticks verbatim and
> fail to match column names that require SQL quoting
> --------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-39613
> URL: https://issues.apache.org/jira/browse/FLINK-39613
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.4.0, cdc-3.5.0
> Reporter: Ran Tao
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)