This looks like it is caused by this jdbc sink bug: https://issues.apache.org/jira/browse/FLINK-19423. The bug causes the wrong primary-key index to be used on delete, which can lead to an index exception or to deleting the wrong rows. It will be fixed in the upcoming 1.11.3 release.
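For context, the general class of bug described in FLINK-19423 (deriving the delete key from the wrong field index) can be sketched as follows. This is an illustrative simulation only, not the actual Flink code; all names (`delete_by_pk`, `pk_index`) are hypothetical:

```python
# Hypothetical sketch: a sink that extracts the primary key from a row
# by field index before issuing a delete. If the index is computed
# against the wrong schema, the lookup uses the wrong value and the
# intended row is never deleted (or an IndexError is raised).

def delete_by_pk(table, row, pk_index):
    """Delete the entry whose key equals row[pk_index]."""
    key = row[pk_index]          # a wrong pk_index yields a wrong key
    table.pop(key, None)

row_to_delete = (1, "name1")     # fields: (id, name); the PK is `id`

# Correct PK index (0 -> the `id` field): the row with id=1 is removed.
table = {1: (1, "name1"), 2: (2, "name2")}
delete_by_pk(table, row_to_delete, pk_index=0)
assert 1 not in table and 2 in table

# Wrong PK index (1 -> the `name` field): "name1" is used as the key,
# nothing matches, and the stale row survives in the sink.
table2 = {1: (1, "name1"), 2: (2, "name2")}
delete_by_pk(table2, row_to_delete, pk_index=1)
assert 1 in table2               # the intended delete silently did nothing
```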
Best,
Jark

On Fri, 13 Nov 2020 at 13:12, jindy_liu <286729...@qq.com> wrote:
> Source table test:
> CREATE TABLE test (
>   `id` INT,
>   `name` VARCHAR(255),
>   `time` TIMESTAMP(3),
>   `status` INT,
>   PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
>   'connector' = 'mysql-cdc',
>   'hostname' = 'localhost',
>   'port' = '3306',
>   'username' = 'root',
>   'password' = '1',
>   'database-name' = 'ai_audio_lyric_task',
>   'table-name' = 'test'
> );
>
> Source table status:
> CREATE TABLE status (
>   `id` INT,
>   `name` VARCHAR(255),
>   PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
>   'connector' = 'mysql-cdc',
>   'hostname' = 'localhost',
>   'port' = '3306',
>   'username' = 'root',
>   'password' = '1',
>   'database-name' = 'ai_audio_lyric_task',
>   'table-name' = 'status'
> );
>
> Sink table:
> CREATE TABLE test_status (
>   `id` INT,
>   `name` VARCHAR(255),
>   `time` TIMESTAMP(3),
>   `status` INT,
>   `status_name` VARCHAR(255),
>   PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
>   'connector' = 'elasticsearch-7',
>   'hosts' = 'xxx',
>   'index' = 'xxx',
>   'username' = 'xxx',
>   'password' = 'xxx',
>   'sink.bulk-flush.backoff.max-retries' = '100000',
>   'sink.bulk-flush.backoff.strategy' = 'CONSTANT',
>   'sink.bulk-flush.max-actions' = '5000',
>   'sink.bulk-flush.max-size' = '10mb',
>   'sink.bulk-flush.interval' = '1s'
> );
>
> Insert statement:
> INSERT INTO test_status
> SELECT t.*, s.name
> FROM test AS t
> LEFT JOIN status AS s ON t.status = s.id;
>
> The MySQL tables already contain data:
> test:
> 0, name0, 2020-07-06 00:00:00, 0
> 1, name1, 2020-07-06 00:00:00, 1
> 2, name2, 2020-07-06 00:00:00, 1
> .....
>
> status:
> 0, status0
> 1, status1
> 2, status2
> .....
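As background on what this join emits: when a right-side row of the LEFT JOIN is updated, the streaming join produces an update-before (-U, retracting the old joined result) followed by an update-after (+U) for every matching left-side row. A rough simulation of the expected changelog, with simplified semantics and hypothetical helper names:

```python
# Simplified simulation (not Flink code): the changelog a streaming
# LEFT JOIN emits when a right-side (status) row changes. Each left
# row matching the old right row yields a -U (old joined result)
# followed by a +U (new joined result).

test = {0: ("name0", 0), 1: ("name1", 1), 2: ("name2", 1)}  # id -> (name, status)

def right_side_update(status_id, old_name, new_name):
    changelog = []
    for tid, (tname, tstatus) in sorted(test.items()):
        if tstatus == status_id:
            changelog.append(("-U", tid, tname, tstatus, old_name))
            changelog.append(("+U", tid, tname, tstatus, new_name))
    return changelog

log = right_side_update(1, "status1", "status1_modify")
# Rows id=1 and id=2 each produce a retraction followed by the new row.
assert log == [
    ("-U", 1, "name1", 1, "status1"),
    ("+U", 1, "name1", 1, "status1_modify"),
    ("-U", 2, "name2", 1, "status1"),
    ("+U", 2, "name2", 1, "status1_modify"),
]
```

The final sink state is only correct if, per key, the -U is applied before the +U.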
>
> Steps to reproduce:
>
> 1. Start the job with parallelism 40. After the existing data has been
> processed, take a savepoint with
> /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink savepoint, then cancel
> the job from the web UI.
> ==> The data in test_status is correct:
> 0, name0, 2020-07-06 00:00:00, 0, status0
> 1, name1, 2020-07-06 00:00:00, 1, status1
> 2, name2, 2020-07-06 00:00:00, 1, status1
>
> 2. In MySQL, change the status row with id=1 to status1_modify.
>
> 3. Restart the job from the savepoint with different parallelism. With
> parallelism 1 the result matches expectations; with parallelism greater
> than 1 it does not.
> With /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink -s savepoint -p 1
> job:
> ==> The data in test_status is correct:
> 0, name0, 2020-07-06 00:00:00, 0, status0
> 1, name1, 2020-07-06 00:00:00, 1, status1_modify
> 2, name2, 2020-07-06 00:00:00, 1, status1_modify
> With /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink -s savepoint -p 40
> job:
> ==> The data in test_status is wrong; the two rows with id = 1 and id = 2
> are missing:
> 0, name0, 2020-07-06 00:00:00, 0, status0
>
> I suspect that when the CDC changelog is written out by multiple parallel
> sink subtasks, the insert for id=1 is executed before the delete for id=1,
> which corrupts the data!
>
> Is this a bug? Or is the operator state broken when restoring from the
> savepoint?
> If so, is it possible to set the parallelism of just the sink to 1?
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
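The reordering suspected in the quoted message can be simulated. If the retraction (-U) and the new row (+U) for the same document id end up on different parallel sink subtasks, the flush order across subtasks is not coordinated, so the delete can land after the upsert and the document disappears. A minimal sketch under that assumption (hypothetical, not Flink or Elasticsearch code):

```python
# Hypothetical simulation of uncoordinated parallel sink subtasks.
# Each subtask buffers its own records; if the retraction and the new
# row for the same doc id land on different subtasks, whichever
# subtask flushes last determines the final state of the document.

es_index = {1: "status1"}        # doc id -> status_name, before the update

subtask_a = [("+U", 1, "status1_modify")]   # received the new row
subtask_b = [("-U", 1, "status1")]          # received the retraction

def flush(buffer):
    for op, doc_id, value in buffer:
        if op == "+U":
            es_index[doc_id] = value        # upsert the document
        else:
            es_index.pop(doc_id, None)      # delete the document

# If subtask A happens to flush before subtask B, the delete wins and
# the document for id=1 vanishes, matching the missing rows observed
# with parallelism 40.
flush(subtask_a)
flush(subtask_b)
assert 1 not in es_index
```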