It looks like this is caused by this jdbc sink bug: https://issues.apache.org/jira/browse/FLINK-19423
The bug makes the sink pick up the wrong primary-key indices when issuing deletes, which can lead to index exceptions or to deleting the wrong rows.
It will be fixed in the upcoming 1.11.3 release.
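
Regarding the question at the end about setting parallelism to 1 only for the sink: as far as I know, Flink 1.11 SQL does not expose a per-sink parallelism option, so the closest workaround is to lower the parallelism of the whole SQL job. Below is a rough Table API sketch; the option key table.exec.resource.default-parallelism is an assumption on my side, please verify it against the 1.11 configuration docs before relying on it.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SingleParallelismJob {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().inStreamingMode().build());

        // Lower the default parallelism used for all SQL operators (sink included).
        // Note: this affects the whole query, not only the sink operator.
        // The option key below is an assumption; check the Flink 1.11 configuration docs.
        tEnv.getConfig().getConfiguration()
                .setString("table.exec.resource.default-parallelism", "1");

        // Register the mysql-cdc source DDLs and the elasticsearch-7 sink DDL with
        // tEnv.executeSql("CREATE TABLE ..."), then submit the query, e.g.:
        // tEnv.executeSql("INSERT INTO test_status SELECT t.*, s.name "
        //         + "FROM test AS t LEFT JOIN status AS s ON t.status = s.id");
    }
}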

Best,
Jark




On Fri, 13 Nov 2020 at 13:12, jindy_liu <286729...@qq.com> wrote:

> Source table test:
> CREATE TABLE test (
> `id` INT,
> `name` VARCHAR(255),
> `time` TIMESTAMP(3),
> `status` INT,
> PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
>   'connector' = 'mysql-cdc',
>   'hostname' = 'localhost',
>   'port' = '3306',
>   'username' = 'root',
>   'password' = '1',
>   'database-name' = 'ai_audio_lyric_task',
>   'table-name' = 'test'
> );
> Source table status:
> CREATE TABLE status (
> `id` INT,
> `name` VARCHAR(255),
> PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
>   'connector' = 'mysql-cdc',
>   'hostname' = 'localhost',
>   'port' = '3306',
>   'username' = 'root',
>   'password' = '1',
>   'database-name' = 'ai_audio_lyric_task',
>   'table-name' = 'status'
> );
>
> Sink table:
> CREATE TABLE test_status (
> `id` INT,
> `name` VARCHAR(255),
> `time` TIMESTAMP(3),
> `status` INT,
> `status_name` VARCHAR(255),
> PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
>   'connector' = 'elasticsearch-7',
>   'hosts' = 'xxx',
>   'index' = 'xxx',
>   'username' = 'xxx',
>   'password' = 'xxx',
>   'sink.bulk-flush.backoff.max-retries' = '100000',
>   'sink.bulk-flush.backoff.strategy' = 'CONSTANT',
>   'sink.bulk-flush.max-actions' = '5000',
>   'sink.bulk-flush.max-size' = '10mb',
>   'sink.bulk-flush.interval' = '1s'
> );
>
>
> Insert statement:
> INSERT into test_status
> SELECT t.*, s.name
> FROM test AS t
> LEFT JOIN status AS s ON t.status = s.id;
>
> The MySQL tables already contain data:
> test:
> 0, name0, 2020-07-06 00:00:00 , 0
> 1, name1, 2020-07-06 00:00:00 , 1
> 2, name2, 2020-07-06 00:00:00 , 1
> .....
>
> status:
> 0, status0
> 1, status1
> 2, status2
> .....
>
> Steps to reproduce:
> 1. Start the job with parallelism 40.
> After the existing table data has been processed, take a savepoint with
> /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink savepoint, then cancel the job from the web UI.
>   ==> The data in test_status is correct:
>     0, name0, 2020-07-06 00:00:00 , 0, status0
>     1, name1, 2020-07-06 00:00:00 , 1, status1
>     2, name2, 2020-07-06 00:00:00 , 1, status1
>
> 2. In MySQL, update the row with id=1 in status to status1_modify.
>
> 3. Restart the job from the savepoint with different parallelism settings (1 and greater than 1);
> with parallelism greater than 1 the result does not match expectations.
> With /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink -s savepoint -p 1 job:
>   ==> The data in test_status is correct:
>     0, name0, 2020-07-06 00:00:00 , 0, status0
>     1, name1, 2020-07-06 00:00:00 , 1, status1_modify
>     2, name2, 2020-07-06 00:00:00 , 1, status1_modify
> With /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink -s savepoint -p 40 job:
>   ==> The data in test_status is wrong; the two rows with id = 1 and 2 are missing:
>     0, name0, 2020-07-06 00:00:00 , 0, status0
>
>
> My suspicion is that when the CDC changelog is written out by the sink with parallelism greater than 1, the insert for id=1 gets executed before the delete for id=1, which corrupts the data!
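>
> For example (this is just my rough reading of the changelog; the exact record kinds are my guess): the update of status id=1 should reach the sink, for each matching row of test, as an old record to be deleted followed by a new record to be upserted, both targeting the Elasticsearch document whose id is the sink primary key:
>
>   delete  (1, name1, 2020-07-06 00:00:00, 1, status1)
>   upsert  (1, name1, 2020-07-06 00:00:00, 1, status1_modify)
>
> If the upsert for id=1 is applied before the delete for id=1 (e.g. because they go through different parallel sink subtasks), the document for id=1 ends up deleted, which would match the missing rows observed above.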
>
> Is this a bug? Or is the operator state wrong when restoring from the savepoint?
> If so, is it possible to set the parallelism to 1 for the sink alone?
>
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
