Re: flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!
抱歉... 题目没有看仔细,才发现你说的是 es sink,那和我上面说的 bug 不是一个问题。 不过从理论分析,不应该出现这个现象。 我在本地1.11分支上,用你给的数据和 sql,也没有复现你说的问题。 是不是 sql 给的不对?我看你 test_status 表的定义在 pk 之前少了一个逗号.. Best, Jark On Sat, 14 Nov 2020 at 17:48, Jark Wu wrote: > 看起来是这个 jdbc sink bug 导致的 https://issues.apache.org/jira/browse/FLINK-19423 > 这个 bug 会导致删的时候,取的 pk 索引不对,所以可能导致 index 异常,或是删错数据。 > 这个bug 会在即将发布的 1.11.3 中修复。 > > Best, > Jark > > > > > On Fri, 13 Nov 2020 at 13:12, jindy_liu <286729...@qq.com> wrote: > >> 源表test: >> CREATE TABLE test ( >> `id` INT, >> `name` VARCHAR(255), >> `time` TIMESTAMP(3), >> `status` INT, >> PRIMARY KEY(id) NOT ENFORCED >> ) WITH ( >> 'connector' = 'mysql-cdc', >> 'hostname' = 'localhost', >> 'port' = '3306', >> 'username' = 'root', >> 'password' = '1', >> 'database-name' = 'ai_audio_lyric_task', >> 'table-name' = 'test' >> ) >> 源表status >> CREATE TABLE status ( >> `id` INT, >> `name` VARCHAR(255), >> PRIMARY KEY(id) NOT ENFORCED >> ) WITH ( >> 'connector' = 'mysql-cdc', >> 'hostname' = 'localhost', >> 'port' = '3306', >> 'username' = 'root', >> 'password' = '1', >> 'database-name' = 'ai_audio_lyric_task', >> 'table-name' = 'status' >> ); >> >> 输出表 >> CREATE TABLE test_status ( >> `id` INT, >> `name` VARCHAR(255), >> `time` TIMESTAMP(3), >> `status` INT, >> `status_name` VARCHAR(255) >> PRIMARY KEY(id) NOT ENFORCED >> ) WITH ( >> 'connector' = 'elasticsearch-7', >> 'hosts' = 'xxx', >> 'index' = 'xxx', >> 'username' = 'xxx', >> 'password' = 'xxx', >> 'sink.bulk-flush.backoff.max-retries' = '10', >> 'sink.bulk-flush.backoff.strategy' = 'CONSTANT', >> 'sink.bulk-flush.max-actions' = '5000', >> 'sink.bulk-flush.max-size' = '10mb', >> 'sink.bulk-flush.interval' = '1s' >> ); >> >> >> 输出语句: >> INSERT into test_status >> SELECT t.*, s.name >> FROM test AS t >> LEFT JOIN status AS s ON t.status = s.id; >> >> mysql表中已经有数据 >> test: >> 0, name0, 2020-07-06 00:00:00 , 0 >> 1, name1, 2020-07-06 00:00:00 , 1 >> 2, name2, 2020-07-06 00:00:00 , 1 >> . >> >> status >> 0, status0 >> 1, status1 >> 2, status2 >> . >> >> 操作顺序与复现: >> 1、启动任务,设置并行度为40, >> 表中数据算完后。/data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink >> savepoint保存,然后web ui上取消任务。 >> ==> test_status中的数据正常: >> 0, name0, 2020-07-06 00:00:00 , 0, status0 >> 1, name1, 2020-07-06 00:00:00 , 1, status1 >> 2, name2, 2020-07-06 00:00:00 , 1, status1 >> >> 2、操作mysql, 将status中id=1数据变更为 status1_modify >> >> 3、接下来的重启上面的任务不同并行度下,1和大于1的情况下,在并行度大于1的情况下,结果跟预期不相符。 >> /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink -s savepoint -p 1 >> job 下, >> ==> test_status中的数据正常: >> 0, name0, 2020-07-06 00:00:00 , 0, status0 >> 1, name1, 2020-07-06 00:00:00 , 1, status1_modify >> 2, name2, 2020-07-06 00:00:00 , 1, status1_modify >> /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink -s savepoint -p 40 >> job 下 >> ==> test_status中的数据不正常, id = 1,2的两条数据缺失: >> 0, name0, 2020-07-06 00:00:00 , 0, status0 >> >> >> 怀疑与cdc的变化流在多个并行度下sink输出流的时候,先执行了 insert id=1再执行delete id=1的动作,导致数据有问题!!! >> >> 这里是不是bug?还是从save point里恢复的时候,算子的状态有问题? >> 如果是,能不能在sink的时候,只把sink这里的并行度设置为1?? >> >> >> >> >> >> >> >> -- >> Sent from: http://apache-flink.147419.n8.nabble.com/ >> >
Re: flink 1.11.2 如何配置时区
1. 现在 proctime() 在设计上确实有问题,目前返回类型是 timestamp, 而不是 timestamp with local time zone, 所以不会考虑 session time zone,转成 string 会用 utc 时区。这个问题会在 FLINK-20162 [1] 中修复。 2. 可以看下这个文档[2]. Best, Jark [1]: https://issues.apache.org/jira/browse/FLINK-20162 [2]: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/json.html#json-timestamp-format-standard On Sat, 14 Nov 2020 at 17:21, Asahi Lee <978466...@qq.com> wrote: > 我使用的是format=json的格式,而flink使用的是jackson,那如何设置jackson的日期格式化的时区配置呢? > > > > > --原始邮件-- > 发件人: > "Asahi Lee" > < > 978466...@qq.com; > 发送时间:2020年11月14日(星期六) 下午4:58 > 收件人:"user-zh" > 主题:flink 1.11.2 如何配置时区 > > > > 你好! 我使用的是flink sql > 1.11.2版本,通过proctime()在源上添加处理时间,发现生成的时间为UTC时间,而我需要的是+08的时间;而我通过设计env.java.opts参数设计jvm的时区参数也没有解决,请问我如何配置才可以拿到+08的时间? > 我的程序的数据是json格式输出
Re: flink1.11 sql ddl 连接kafka读取数据,使用事件时间窗口,无法触发,相同的sql使用系统时间触发没有问题
重复的问题。我将刚刚的回答也贴在这里。 如果要测试事件时间窗口,请保证以下几点,否则窗口不会触发: 1. 保证所有 partition 都有数据。 2. 且每个 partition 数据的 event time 都在前进 3. 且 event time 前进的距离要超过 window size + watermark offset, 即你的例子中的 10s+1s = 11s 以上如果不满足,则系统不会认为窗口结束,所以窗口就不会触发。 Best, Jark On Sat, 14 Nov 2020 at 16:35, 李世钰 wrote: > 您好,请教您一个问题 > flink1.11 sql ddl连接kafka,使用事件事件,无法触发窗口,使用process_time系统时间就可以正常触发 > create table kafka_table ( > `log_id` string, > event_date timestamp(3), > process_time as PROCTIME(), > ts as event_date, > watermark for ts as ts - interval '1' second > ) with ( > 'connector' = 'kafka', > 'topic' = 'kafka_table', > 'properties.bootstrap.servers' = '10.2.12.3:9092', > 'properties.group.id' = 'tmp-log-consumer003', > 'format' = 'json', > 'scan.startup.mode' = 'latest-offset' > ) > 执行的sql是 > select TUMBLE_START(kafka_table.event_date, INTERVAL '10' > SECOND),TUMBLE_END(kafka_table.event_date, INTERVAL '10' > SECOND),src_ip,count(dest_ip) from kafka_table group by > TUMBLE(kafka_table.event_date, INTERVAL '10' SECOND),kafka_table.src_ip > > > > > select log_id,process_time,ts from kafka_table查询的表结构如下 > 表结构为 > root > |-- log_id: STRING > |-- process_time: TIMESTAMP(3) NOT NULL *PROCTIME* > |-- ts: TIMESTAMP(3) *ROWTIME* > > > 输入数据为 > log_id,process_time,ts > 13547876357,2020-11-14T08:22:08.699,2020-11-07T08:23:09.806 > 13547876358,2020-11-14T08:22:08.857,2020-11-07T08:23:09.806 > 13547876359,2020-11-14T08:22:09.061,2020-11-07T08:23:09.806 > 13547876360,2020-11-14T08:22:09.310,2020-11-07T08:23:09.806 > 13547876361,2020-11-14T08:22:09.526,2020-11-07T08:23:09.806 > 13552070656,2020-11-14T08:22:09.772,2020-11-07T08:23:09.806
Re: flink sql ddl连接kafka,flink sql使用事件时间无法正常触发时间窗口
如果要测试事件时间窗口,请保证以下几点,否则窗口不会触发: 1. 保证所有 partition 都有数据。 2. 且每个 partition 数据的 event time 都在前进 3. 且 event time 前进的距离要超过 window size + watermark offset, 即你的例子中的 10s+1s = 11s 以上如果不满足,则系统不会认为窗口结束,所以窗口就不会触发。 Best, Jark On Sat, 14 Nov 2020 at 15:11, 李世钰 wrote: > flink版本 flink1.11 > > > flink sql连接kafka > create table kafka_table ( > log_id string, > event_time bigint, > process_time as PROCTIME(), > ts as TO_TIMESTAMP(FROM_UNIXTIME(event_time)), > watermark for ts as ts - interval '1' second > ) with ( > 'connector' = 'kafka', > 'topic' = 'kafka_table', > 'properties.bootstrap.servers' = '10.2.12.3:9092', > 'properties.group.id' = 'tmp-log-consumer003', > 'format' = 'json', > 'scan.startup.mode' = 'latest-offset' > ) > > > > > > 使用窗口聚合的代码 > val tmp = tableEnv.sqlQuery("select HOP_START(kafka_table.ts, INTERVAL > '10' SECOND, INTERVAL '5' SECOND),HOP_END(kafka_table.ts, INTERVAL '10' > SECOND, INTERVAL '5' SECOND),src_ip,count(dest_ip) from kafka_table group > by HOP(kafka_table.ts, INTERVAL '10' SECOND, INTERVAL '5' > SECOND),kafka_table.src_ip") > > > 相同的sql使用process_time系统时间就可以成功触发,但是使用事件时间不能触发, > 系统时间是11月14日当前时间,事件时间是11月6日,我读取的历史数据往kafka中打数据测试的 > 求问是什么原因不能触发窗口或者我的用法有什么问题吗
Re: Flink cdc 多表关联处理延迟很大
能展示下你的代码吗?是用的维表关联的语法 (FOR SYSTEM TIME AS OF)? 需要明确下,到底是什么节点慢了。 On Fri, 13 Nov 2020 at 19:02, 丁浩浩 <18579099...@163.com> wrote: > 我用flink cdc 对mysql的表进行关联查询,发现flink只能两张表关联之后再跟下一张表关联,导致最后落库的延迟非常大。 > 有没有比较好的优化方案能缓解这样的问题?
Re: flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!
看起来是这个 jdbc sink bug 导致的 https://issues.apache.org/jira/browse/FLINK-19423 这个 bug 会导致删的时候,取的 pk 索引不对,所以可能导致 index 异常,或是删错数据。 这个bug 会在即将发布的 1.11.3 中修复。 Best, Jark On Fri, 13 Nov 2020 at 13:12, jindy_liu <286729...@qq.com> wrote: > 源表test: > CREATE TABLE test ( > `id` INT, > `name` VARCHAR(255), > `time` TIMESTAMP(3), > `status` INT, > PRIMARY KEY(id) NOT ENFORCED > ) WITH ( > 'connector' = 'mysql-cdc', > 'hostname' = 'localhost', > 'port' = '3306', > 'username' = 'root', > 'password' = '1', > 'database-name' = 'ai_audio_lyric_task', > 'table-name' = 'test' > ) > 源表status > CREATE TABLE status ( > `id` INT, > `name` VARCHAR(255), > PRIMARY KEY(id) NOT ENFORCED > ) WITH ( > 'connector' = 'mysql-cdc', > 'hostname' = 'localhost', > 'port' = '3306', > 'username' = 'root', > 'password' = '1', > 'database-name' = 'ai_audio_lyric_task', > 'table-name' = 'status' > ); > > 输出表 > CREATE TABLE test_status ( > `id` INT, > `name` VARCHAR(255), > `time` TIMESTAMP(3), > `status` INT, > `status_name` VARCHAR(255) > PRIMARY KEY(id) NOT ENFORCED > ) WITH ( > 'connector' = 'elasticsearch-7', > 'hosts' = 'xxx', > 'index' = 'xxx', > 'username' = 'xxx', > 'password' = 'xxx', > 'sink.bulk-flush.backoff.max-retries' = '10', > 'sink.bulk-flush.backoff.strategy' = 'CONSTANT', > 'sink.bulk-flush.max-actions' = '5000', > 'sink.bulk-flush.max-size' = '10mb', > 'sink.bulk-flush.interval' = '1s' > ); > > > 输出语句: > INSERT into test_status > SELECT t.*, s.name > FROM test AS t > LEFT JOIN status AS s ON t.status = s.id; > > mysql表中已经有数据 > test: > 0, name0, 2020-07-06 00:00:00 , 0 > 1, name1, 2020-07-06 00:00:00 , 1 > 2, name2, 2020-07-06 00:00:00 , 1 > . > > status > 0, status0 > 1, status1 > 2, status2 > . > > 操作顺序与复现: > 1、启动任务,设置并行度为40, > 表中数据算完后。/data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink > savepoint保存,然后web ui上取消任务。 > ==> test_status中的数据正常: > 0, name0, 2020-07-06 00:00:00 , 0, status0 > 1, name1, 2020-07-06 00:00:00 , 1, status1 > 2, name2, 2020-07-06 00:00:00 , 1, status1 > > 2、操作mysql, 将status中id=1数据变更为 status1_modify > > 3、接下来的重启上面的任务不同并行度下,1和大于1的情况下,在并行度大于1的情况下,结果跟预期不相符。 > /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink -s savepoint -p 1 > job 下, > ==> test_status中的数据正常: > 0, name0, 2020-07-06 00:00:00 , 0, status0 > 1, name1, 2020-07-06 00:00:00 , 1, status1_modify > 2, name2, 2020-07-06 00:00:00 , 1, status1_modify > /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink -s savepoint -p 40 > job 下 > ==> test_status中的数据不正常, id = 1,2的两条数据缺失: > 0, name0, 2020-07-06 00:00:00 , 0, status0 > > > 怀疑与cdc的变化流在多个并行度下sink输出流的时候,先执行了 insert id=1再执行delete id=1的动作,导致数据有问题!!! > > 这里是不是bug?还是从save point里恢复的时候,算子的状态有问题? > 如果是,能不能在sink的时候,只把sink这里的并行度设置为1?? > > > > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >
??????flink 1.11.2 ????????????
??format=json??flinkjacksonjackson?? ---- ??: "Asahi Lee" <978466...@qq.com; :2020??11??14??(??) 4:58 ??:"user-zh"
flink 1.11.2 ????????????
?? ??flink sql 1.11.2??proctime()UTC??+08env.java.optsjvm??+08 json
flink1.11 sql ddl 连接kafka读取数据,使用事件时间窗口,无法触发,相同的sql使用系统时间触发没有问题
您好,请教您一个问题 flink1.11 sql ddl连接kafka,使用事件事件,无法触发窗口,使用process_time系统时间就可以正常触发 create table kafka_table ( `log_id` string, event_date timestamp(3), process_time as PROCTIME(), ts as event_date, watermark for ts as ts - interval '1' second ) with ( 'connector' = 'kafka', 'topic' = 'kafka_table', 'properties.bootstrap.servers' = '10.2.12.3:9092', 'properties.group.id' = 'tmp-log-consumer003', 'format' = 'json', 'scan.startup.mode' = 'latest-offset' ) 执行的sql是 select TUMBLE_START(kafka_table.event_date, INTERVAL '10' SECOND),TUMBLE_END(kafka_table.event_date, INTERVAL '10' SECOND),src_ip,count(dest_ip) from kafka_table group by TUMBLE(kafka_table.event_date, INTERVAL '10' SECOND),kafka_table.src_ip select log_id,process_time,ts from kafka_table查询的表结构如下 表结构为 root |-- log_id: STRING |-- process_time: TIMESTAMP(3) NOT NULL *PROCTIME* |-- ts: TIMESTAMP(3) *ROWTIME* 输入数据为 log_id,process_time,ts 13547876357,2020-11-14T08:22:08.699,2020-11-07T08:23:09.806 13547876358,2020-11-14T08:22:08.857,2020-11-07T08:23:09.806 13547876359,2020-11-14T08:22:09.061,2020-11-07T08:23:09.806 13547876360,2020-11-14T08:22:09.310,2020-11-07T08:23:09.806 13547876361,2020-11-14T08:22:09.526,2020-11-07T08:23:09.806 13552070656,2020-11-14T08:22:09.772,2020-11-07T08:23:09.806