Flink CDC Issue Import created FLINK-34772:
----------------------------------------------
Summary: [Bug] [Critical] cdc-myql-connector snapshot/binlog phase
MySQL -> MySQL temporal related field transfer WRONG with/or wrong epoch time
across timezone
Key: FLINK-34772
URL: https://issues.apache.org/jira/browse/FLINK-34772
Project: Flink
Issue Type: Bug
Components: Flink CDC
Reporter: Flink CDC Issue Import
### Search before asking
- [X] I searched in the
[issues|https://github.com/ververica/flink-cdc-connectors/issues] and found
nothing similar.
### Flink version
1.14.3
### Flink CDC version
2.3.0
### Database and its version
mysql 8.0.23
### Minimal reproduce step
source db DDL:
```
CREATE TABLE `t_cdc_bench` (
`order_id` bigint NOT NULL,
`order_date` date DEFAULT NULL,
`order_time` datetime DEFAULT NULL,
`dt_3` datetime(3) DEFAULT CURRENT_TIMESTAMP(3),
`ts_3` timestamp(3) NULL DEFAULT CURRENT_TIMESTAMP(3),
`quantity` int DEFAULT NULL,
`product_id` int DEFAULT NULL,
`purchaser` varchar(32) COLLATE utf8mb4_bin DEFAULT NULL,
`acct` varchar(3) COLLATE utf8mb4_bin DEFAULT '911',
PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
```
sink db DDL:
```
CREATE TABLE `t_cdc_bench_sink` (
`order_id` bigint NOT NULL,
`order_date` date DEFAULT NULL,
`order_time` datetime DEFAULT NULL,
`dt_3` datetime(3) DEFAULT NULL,
`ts_3` timestamp(3) NULL DEFAULT NULL,
`quantity` int DEFAULT NULL,
`product_id` int DEFAULT NULL,
`purchaser` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT
NULL,
PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
```
source DML:
```
insert into t_cdc_bench values(1001, curdate(), now(), current_timestamp(3),
current_timestamp(3), 1, 33, 'scala', 911);
```
run table cdc process:
```
public class MySqlToJdbcTableCDC {
public static void main(String[] args) throws Exception{
JSONObject json = JSONObject.parseObject("{\"logConfDirection\": \"\\/Users\\/nirvana.xu\\/IdeaProjects\\/Prod\\/realtime-ingestion\\/flink-cdc-jobs\\/src\\/main\\/resources\\/log4j2.xml\" }");
SourceConfigUtil.doSetupLogConfigDir(json, MySqlToJdbcTableCDC.class);
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
env.setParallelism(1);
// note: incremental sync requires checkpointing to be enabled
env.enableCheckpointing(3000);
StreamTableEnvironment tableEnvironment =
StreamTableEnvironment.create(env, envSettings);
tableEnvironment.executeSql(" CREATE TABLE demoOrders (\n" +
" `order_id` INTEGER ,\n" +
" `order_date` DATE ,\n" +
" `order_time` TIMESTAMP(3),\n" +
" `dt_3` TIMESTAMP(3),\n" +
" `ts_3` TIMESTAMP(3),\n" +
" `quantity` INT ,\n" +
" `product_id` INT ,\n" +
" `purchaser` STRING,\n" +
" primary key(order_id) NOT ENFORCED" +
" ) WITH (\n" +
" 'connector' = 'mysql-cdc',\n" +
" 'hostname' = '10.162.34.110',\n" +
" 'port' = '6606',\n" +
" 'username' = 'root',\n" +
" 'password' = 'root',\n" +
" 'database-name' = 'testdb',\n" +
" 'table-name' = 't_cdc_bench', " +
" 'server-time-zone' = 'UTC+7', " +
// full snapshot + incremental sync
" 'scan.startup.mode' = 'initial' " +
" )");
tableEnvironment.executeSql("CREATE TABLE sink (\n" +
" `order_id` INTEGER ,\n" +
" `order_date` DATE ,\n" +
" `order_time` TIMESTAMP(3),\n" +
" `dt_3` TIMESTAMP(3),\n" +
" `ts_3` TIMESTAMP(3),\n" +
" `quantity` INT ,\n" +
" `product_id` INT ,\n" +
" `purchaser` STRING,\n" +
" primary key (order_id) NOT ENFORCED " +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://localhost:3306/testdb?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&GMT%2b8',\n" +
" 'username' = 'root',\n" +
" 'password' = 'root',\n" +
" 'table-name' = 't_cdc_bench_sink',\n" +
" 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
" 'sink.buffer-flush.interval' = '3s',\n" +
" 'sink.buffer-flush.max-rows' = '1',\n" +
" 'sink.max-retries' = '5') ");
tableEnvironment.executeSql("insert into sink select * from demoOrders");
}
}
```
### What did you expect to see?
Presumably, once CDC switches to the binlog phase, we expect to see:
**source**:
```
select * from t_cdc_bench where order_id = 1001 limit 1\G
```
*************************** 1. row ***************************
order_id: 1001
order_date: 2023-02-10
order_time: 2023-02-10 17:15:33
dt_3: 2023-02-10 17:15:33.312
ts_3: 2023-02-10 17:15:33.312
quantity: 1
product_id: 33
purchaser: scala
acct: 911
**sink**:
```
select * from t_cdc_bench_sink where order_id = 1001 limit 1\G
```
*************************** 1. row ***************************
order_id: 1001
order_date: 2023-02-10
order_time: 2023-02-10 18:15:33
dt_3: 2023-02-10 18:15:33.312
ts_3: 2023-02-10 18:15:33.312
quantity: 1
product_id: 33
purchaser: scala
_**The datetime and timestamp fields are converted correctly according to the
configured source/sink server time zones.**_
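The expected sink values above follow from simple offset arithmetic. A minimal sketch, assuming the source server runs at UTC+7 (as set by `server-time-zone`) and the sink at GMT+8 (as the JDBC URL suggests); the class and method names are hypothetical and only illustrate the conversion:

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class ExpectedSinkTime {
    // Treat a wall-clock value as recorded in the `from` offset and
    // return the wall-clock value of the same instant in the `to` offset.
    static LocalDateTime shift(LocalDateTime wallClock, ZoneOffset from, ZoneOffset to) {
        return wallClock.atOffset(from).withOffsetSameInstant(to).toLocalDateTime();
    }

    public static void main(String[] args) {
        // dt_3 / ts_3 as shown in the source table
        LocalDateTime source = LocalDateTime.parse("2023-02-10T17:15:33.312");
        // Assumed offsets: source UTC+7, sink GMT+8
        LocalDateTime sink = shift(source, ZoneOffset.ofHours(7), ZoneOffset.ofHours(8));
        System.out.println(sink); // 2023-02-10T18:15:33.312
    }
}
```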
### What did you see instead?
Actually, after CDC switches to the binlog phase, querying the source and sink
tables returns the following for the row loaded during the snapshot phase:
**source**:
```
select * from t_cdc_bench where order_id = 1001 limit 1\G
```
*************************** 1. row ***************************
order_id: 1001
order_date: 2023-02-10
order_time: 2023-02-10 17:15:33
dt_3: 2023-02-10 17:15:33.312
ts_3: 2023-02-10 17:15:33.312
quantity: 1
product_id: 33
purchaser: scala
acct: 911
**sink**:
```
select * from t_cdc_bench_sink where order_id = 1001 limit 1\G
```
*************************** 1. row ***************************
order_id: 1001
order_date: 2023-02-10
order_time: 2023-02-10 17:15:33
dt_3: 2023-02-10 17:15:33.312
ts_3: 2023-02-11 00:15:33.312
quantity: 1
product_id: 33
purchaser: scala
**_Not only is the local time zone (LTZ) conversion not applied, but `dt_3` and
`ts_3`, which were written at the same moment, now contradict each other._**
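The size of the discrepancy can be read directly off the two result sets. A small sketch (hypothetical class name, values copied from the output above) that computes how far each sink field drifted from its source value:

```java
import java.time.Duration;
import java.time.LocalDateTime;

public class ObservedDrift {
    // Difference between the sink and source wall-clock values.
    static Duration drift(String source, String sink) {
        return Duration.between(LocalDateTime.parse(source), LocalDateTime.parse(sink));
    }

    public static void main(String[] args) {
        // dt_3: identical on both sides, so no conversion was applied
        System.out.println(drift("2023-02-10T17:15:33.312", "2023-02-10T17:15:33.312").toHours()); // 0
        // ts_3: the sink is 7 hours ahead of the source, not the expected 1 hour
        System.out.println(drift("2023-02-10T17:15:33.312", "2023-02-11T00:15:33.312").toHours()); // 7
    }
}
```

So `dt_3` was copied unchanged, while `ts_3` moved by 7 hours, which matches neither the source value nor the expected one-hour shift.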
### Anything else?
We first hit this issue in an internal build of CDC 2.3-SNAPSHOT, using the
stream API in our production real-time computing services: the epoch time of
MySQL binlog datetime values read from the CDC source was wrong. We fixed it
there with a patched temporal deserializer. I then wondered whether this had
already been fixed for mysql-cdc tables in 2.3.0, but no luck: it is still a
bug.
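To illustrate the kind of epoch handling involved, here is a minimal plain-Java sketch. It assumes, as Debezium documents for `io.debezium.time.Timestamp`, that a DATETIME value arrives as epoch milliseconds of the wall-clock value with no time zone attached. The class and method names are hypothetical; this is not the connector's code or the internal fix mentioned above, and it is not a diagnosis of the root cause.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class DatetimeEpochDecode {
    // Decode zone-less epoch millis back to the original wall-clock value.
    static LocalDateTime decodeWallClock(long epochMillis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC);
    }

    // Decoding the same millis in another offset shifts the wall clock by that offset.
    static LocalDateTime decodeInZone(long epochMillis, ZoneOffset zone) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), zone);
    }

    public static void main(String[] args) {
        // Encode the source wall-clock value the way a zone-less epoch field would carry it
        long millis = LocalDateTime.parse("2023-02-10T17:15:33.312")
                .toInstant(ZoneOffset.UTC).toEpochMilli();
        System.out.println(decodeWallClock(millis));                      // 2023-02-10T17:15:33.312
        System.out.println(decodeInZone(millis, ZoneOffset.ofHours(7))); // 2023-02-11T00:15:33.312
    }
}
```

The second call only shows how applying a zone offset at the wrong step moves a value by a whole offset; whether that is what happens inside the connector is not established here.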
Note that after we update the `ts_3` and `dt_3` fields, the contradiction
between them goes away, but the local time zone conversion is still not applied.
So the main issues are:
1. For historical data that is read only in the snapshot phase, the `ts_3`
and `dt_3` fields are definitely wrong unless the rows are later updated;
2. When the source and sink are in different regions, the temporal fields are
not converted correctly.
### Are you willing to submit a PR?
- [X] I'm willing to submit a PR!
---------------- Imported from GitHub ----------------
Url: https://github.com/apache/flink-cdc/issues/1906
Created by: [Capri0110|https://github.com/Capri0110]
Labels: bug,
Created at: Fri Feb 10 18:35:08 CST 2023
State: open