Flink CDC Issue Import created FLINK-34826:
----------------------------------------------

             Summary: flink-connector-mysql-cdc-2.4.2: when a multi-column 
primary key index starts with a varchar column, a snapshot chunk can 
return a very large number of rows and cause the JVM to OOM.
                 Key: FLINK-34826
                 URL: https://issues.apache.org/jira/browse/FLINK-34826
             Project: Flink
          Issue Type: Improvement
          Components: Flink CDC
            Reporter: Flink CDC Issue Import


### Search before asking

- [X] I searched in the 
[issues|https://github.com/ververica/flink-cdc-connectors/issues] and found 
nothing similar.


### Flink version

1.17.1

### Flink CDC version

flink-connector-mysql-cdc-2.4.2

### Database and its version

source: MySQL 8.0.19
destination: Doris 2.0.2

### Minimal reproduce step

1. Create a MySQL table with a multi-column primary key whose first column is 
a varchar

```
CREATE DATABASE `mydb`;
USE `mydb`;
CREATE TABLE `t` (
  `col1` varchar(64) NOT NULL,
  `col2` varchar(64) NOT NULL,
  `col3` datetime(6) NOT NULL,
  `col4` varchar(200) DEFAULT NULL,
  `col5` varchar(500) DEFAULT NULL,
  PRIMARY KEY (`col1`,`col2`,`col3`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
```

2. Insert a large number of rows into the table
```
use mydb;
# execute the statement below 5000000 times
insert into t values('aaa', repeat('a', 64), now(6), repeat('a', 200), repeat('a', 500));
insert into t values('aaa', repeat('a', 64), now(6), repeat('a', 200), repeat('a', 500));
insert into t values('aaa', repeat('a', 64), now(6), repeat('a', 200), repeat('a', 500));
...

# execute the statement below 5000000 times
insert into t values('bbb', repeat('a', 64), now(6), repeat('a', 200), repeat('a', 500));
insert into t values('bbb', repeat('a', 64), now(6), repeat('a', 200), repeat('a', 500));
insert into t values('bbb', repeat('a', 64), now(6), repeat('a', 200), repeat('a', 500));
...

# execute the statement below 5000000 times
insert into t values('ccc', repeat('a', 64), now(6), repeat('a', 200), repeat('a', 500));
insert into t values('ccc', repeat('a', 64), now(6), repeat('a', 200), repeat('a', 500));
insert into t values('ccc', repeat('a', 64), now(6), repeat('a', 200), repeat('a', 500));
...
```
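
Running the same statement millions of times by hand is impractical, so the 
statements can be generated instead. The following is a minimal sketch, not 
part of the original report: the helper name `insert_statements` and its 
parameters are hypothetical, and it only produces SQL text for the table `t` 
defined in step 1.

```python
def insert_statements(prefixes=("aaa", "bbb", "ccc"), rows_per_prefix=5_000_000):
    """Yield one INSERT statement per row, grouped by first-column value.

    Hypothetical helper: it only builds SQL text and never talks to MySQL.
    """
    template = (
        "INSERT INTO t VALUES ('{p}', REPEAT('a', 64), NOW(6), "
        "REPEAT('a', 200), REPEAT('a', 500));"
    )
    for p in prefixes:
        for _ in range(rows_per_prefix):
            yield template.format(p=p)


# Small demonstration with 2 rows per prefix instead of 5000000.
sample = list(insert_statements(("aaa", "bbb"), rows_per_prefix=2))
```

The generated lines can be written to a file and fed to the `mysql` client 
after a `use mydb;` line. The primary key relies on `NOW(6)` differing between 
rows, as in the original statements.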

3. Create the database in Doris
```
CREATE DATABASE `mydb`;
```

4. Create a user in MySQL and Doris

In MySQL:
```
create user flink identified by 'pass';
grant all on *.* to flink;
```

In Doris:
```
create user flink identified by 'pass';
grant all on *.* to flink;
```

5. Download the flink-doris and flink-cdc jars and put them into the lib 
directory of flink-1.17.1

```
flink-doris-connector-1.17-1.4.0.jar
flink-sql-connector-mysql-cdc-2.4.2.jar
```

6. Use Flink to sync the data from MySQL to Doris
```
bin/flink run -d \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.17-1.4.0.jar \
    mysql-sync-database \
    --database mydb \
    --job-name flink_sync_mysql_to_doris \
    --mysql-conf hostname=<MYSQL_IP> \
    --mysql-conf port=3306 \
    --mysql-conf username=flink \
    --mysql-conf password=pass \
    --mysql-conf database-name=mydb \
    --including-tables "t" \
    --sink-conf fenodes=<DORIS_FE_IP>:8030 \
    --sink-conf username=flink \
    --sink-conf password=pass \
    --sink-conf jdbc-url=jdbc:mysql://<DORIS_FE_IP>:9030 \
    --sink-conf sink.label-prefix=label1 \
    --table-conf replication_num=3
```

### What did you expect to see?

The table should be split into chunks of about 8096 rows each.

### What did you see instead?

The table was split into chunks of 500M rows each, and the Flink job 
eventually failed with an OOM.

The chunk-split SQL returns 500M rows when run in MySQL:
```
SELECT * FROM `mydb`.`t` WHERE `col1` <= 'bbb' AND NOT (`col1` = 'bbb');
```

Flink job OOM:
```
Caused by: java.lang.OutOfMemoryError: Java heap space
```

### Anything else?

When all of the conditions below are met, the bad case above occurs:

1. the table has a multi-column primary key;
2. the first column of the primary key is varchar;
3. many rows share the same value in the first column.
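
Condition 3 can be checked before starting a sync. The following is a rough 
sketch under stated assumptions: the function name 
`skewed_first_column_values` is hypothetical, the first-column values are 
assumed to be already loaded into a Python iterable, and 8096 is the chunk 
size mentioned in this report.

```python
from collections import Counter


def skewed_first_column_values(first_col_values, chunk_size=8096):
    """Return {value: row_count} for first-column values that alone
    hold more rows than one chunk is supposed to contain."""
    counts = Counter(first_col_values)
    return {value: n for value, n in counts.items() if n > chunk_size}


# Toy data: 'aaa' has 10 rows, which exceeds a chunk size of 5.
toy = ["aaa"] * 10 + ["bbb"] * 3 + ["ccc"]
skewed = skewed_first_column_values(toy, chunk_size=5)
```

Any value reported here would end up entirely inside one chunk when chunks are 
bounded by the first column only.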

Root cause:

flink-mysql-cdc uses the first column of the primary key to split the table 
into chunks, and each chunk should contain approximately 8096 rows. But this 
is the ideal case. In a table with a multi-column primary key, many rows can 
share the same first-column value. Because the start and end of a chunk are 
marked only by the first column, the connector cannot stop after 8096 rows. 
Instead it reads up to the next distinct value of the first column, and this 
can result in a very large number of rows in one chunk.
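
The effect described above can be reproduced with a small simulation. This is 
a simplified model written for illustration, not the connector's code: the 
function names are hypothetical, the table is reduced to a sorted list of 
first-column values, and the rule "if the candidate end equals the previous 
end, jump to the next distinct value" is an assumption taken from the 
description in this report.

```python
from bisect import bisect_left, bisect_right


def next_chunk_end(keys, prev_end, chunk_size):
    """Return the next boundary after prev_end, or None for the last chunk.

    keys must be sorted and prev_end must be one of its values.
    """
    lo = bisect_left(keys, prev_end)
    window = keys[lo:lo + chunk_size]
    end = window[-1]                      # largest value in the window
    if end == prev_end:
        # The whole window holds one value: jump to the next distinct one.
        nxt = bisect_right(keys, end)
        if nxt == len(keys):
            return None
        end = keys[nxt]
    return None if end >= keys[-1] else end


def split_by_first_column(keys, chunk_size):
    """Return (start, end) pairs meaning start <= key < end; None = unbounded."""
    keys = sorted(keys)
    if not keys:
        return [(None, None)]
    chunks, start, prev_end = [], None, keys[0]
    while True:
        end = next_chunk_end(keys, prev_end, chunk_size)
        chunks.append((start, end))
        if end is None:
            return chunks
        start = prev_end = end


def rows_in_chunk(sorted_keys, start, end):
    """Count rows with start <= key < end in a sorted key list."""
    lo = 0 if start is None else bisect_left(sorted_keys, start)
    hi = len(sorted_keys) if end is None else bisect_left(sorted_keys, end)
    return hi - lo


# Toy version of the table in this report: 5 rows per first-column value.
keys = ["aaa"] * 5 + ["bbb"] * 5 + ["ccc"] * 5
chunks = split_by_first_column(keys, chunk_size=2)
sizes = [rows_in_chunk(keys, s, e) for s, e in chunks]
# chunks == [(None, 'bbb'), ('bbb', None)], sizes == [5, 10]
```

With a requested chunk size of 2, the first chunk still holds all 5 `aaa` 
rows, which mirrors the `col1 < 'bbb'` chunk shown earlier.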

Suggested fix:

Use all columns of the primary key to split the table into chunks.
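
One way to picture this suggestion is to use whole primary-key tuples as 
chunk boundaries. The sketch below is only an illustration under assumptions, 
not a proposed patch: the function names are hypothetical, the rows are 
assumed to fit in memory as tuples, and the predicate uses MySQL row 
comparison with `%s` placeholders as one possible shape.

```python
from bisect import bisect_left


def split_by_full_key(rows, chunk_size):
    """Return (start, end) pairs of full-key tuples: start <= key < end."""
    rows = sorted(rows)
    cuts = [rows[i] for i in range(chunk_size, len(rows), chunk_size)]
    return list(zip([None] + cuts, cuts + [None]))


def chunk_predicate(pk_columns, start, end):
    """Build a parameterized WHERE fragment using row comparison."""
    cols = "(" + ", ".join(f"`{c}`" for c in pk_columns) + ")"
    marks = "(" + ", ".join(["%s"] * len(pk_columns)) + ")"
    parts, params = [], []
    if start is not None:
        parts.append(f"{cols} >= {marks}")
        params.extend(start)
    if end is not None:
        parts.append(f"{cols} < {marks}")
        params.extend(end)
    return " AND ".join(parts) or "1 = 1", params


def rows_in_chunk(sorted_rows, start, end):
    """Count rows with start <= key < end in a sorted list of key tuples."""
    lo = 0 if start is None else bisect_left(sorted_rows, start)
    hi = len(sorted_rows) if end is None else bisect_left(sorted_rows, end)
    return hi - lo


# Toy table: 5 rows per first-column value, second column makes keys unique.
rows = sorted((p, i) for p in ("aaa", "bbb", "ccc") for i in range(5))
full_chunks = split_by_full_key(rows, chunk_size=2)
full_sizes = [rows_in_chunk(rows, s, e) for s, e in full_chunks]
```

Because the boundaries carry every key column, no chunk in this toy example 
holds more than the requested 2 rows, even though each first-column value 
covers 5 rows.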

I am willing to submit a PR but I am new to Java and I don't know how much time 
it will take.

### Are you willing to submit a PR?

- [X] I'm willing to submit a PR!

---------------- Imported from GitHub ----------------
Url: https://github.com/apache/flink-cdc/issues/2676
Created by: [trikker|https://github.com/trikker]
Labels: enhancement
Created at: Thu Nov 09 21:01:34 CST 2023
State: open




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
