Re: flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

2020-11-14 文章 Jark Wu
抱歉... 题目没有看仔细,才发现你说的是 es sink,那和我上面说的 bug 不是一个问题。

不过从理论分析,不应该出现这个现象。
我在本地1.11分支上,用你给的数据和 sql,也没有复现你说的问题。
是不是 sql 给的不对?我看你 test_status 表的定义在 pk 之前少了一个逗号..

Best,
Jark

On Sat, 14 Nov 2020 at 17:48, Jark Wu  wrote:

> 看起来是这个 jdbc sink bug 导致的 https://issues.apache.org/jira/browse/FLINK-19423
> 这个 bug 会导致删的时候,取的 pk 索引不对,所以可能导致 index 异常,或是删错数据。
> 这个bug 会在即将发布的 1.11.3 中修复。
>
> Best,
> Jark
>
>
>
>
> On Fri, 13 Nov 2020 at 13:12, jindy_liu <286729...@qq.com> wrote:
>
>> 源表test:
>> CREATE TABLE test (
>> `id` INT,
>> `name` VARCHAR(255),
>> `time` TIMESTAMP(3),
>> `status` INT,
>> PRIMARY KEY(id) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'mysql-cdc',
>>   'hostname' = 'localhost',
>>   'port' = '3306',
>>   'username' = 'root',
>>   'password' = '1',
>>   'database-name' = 'ai_audio_lyric_task',
>>   'table-name' = 'test'
>> )
>> 源表status
>> CREATE TABLE status (
>> `id` INT,
>> `name` VARCHAR(255),
>> PRIMARY KEY(id) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'mysql-cdc',
>>   'hostname' = 'localhost',
>>   'port' = '3306',
>>   'username' = 'root',
>>   'password' = '1',
>>   'database-name' = 'ai_audio_lyric_task',
>>   'table-name' = 'status'
>> );
>>
>> 输出表
>> CREATE TABLE test_status (
>> `id` INT,
>> `name` VARCHAR(255),
>> `time` TIMESTAMP(3),
>> `status` INT,
>> `status_name` VARCHAR(255)
>> PRIMARY KEY(id) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'elasticsearch-7',
>>   'hosts' = 'xxx',
>>   'index' = 'xxx',
>>   'username' = 'xxx',
>>   'password' = 'xxx',
>>   'sink.bulk-flush.backoff.max-retries' = '10',
>>   'sink.bulk-flush.backoff.strategy' = 'CONSTANT',
>>   'sink.bulk-flush.max-actions' = '5000',
>>   'sink.bulk-flush.max-size' = '10mb',
>>   'sink.bulk-flush.interval' = '1s'
>> );
>>
>>
>> 输出语句:
>> INSERT into test_status
>> SELECT t.*, s.name
>> FROM test AS t
>> LEFT JOIN status AS s ON t.status = s.id;
>>
>> mysql表中已经有数据
>> test:
>> 0, name0, 2020-07-06 00:00:00 , 0
>> 1, name1, 2020-07-06 00:00:00 , 1
>> 2, name2, 2020-07-06 00:00:00 , 1
>> .
>>
>> status
>> 0, status0
>> 1, status1
>> 2, status2
>> .
>>
>> 操作顺序与复现:
>> 1、启动任务,设置并行度为40,
>> 表中数据算完后。/data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink
>> savepoint保存,然后web ui上取消任务。
>>   ==> test_status中的数据正常:
>> 0, name0, 2020-07-06 00:00:00 , 0, status0
>> 1, name1, 2020-07-06 00:00:00 , 1, status1
>> 2, name2, 2020-07-06 00:00:00 , 1, status1
>>
>> 2、操作mysql, 将status中id=1数据变更为 status1_modify
>>
>> 3、接下来的重启上面的任务不同并行度下,1和大于1的情况下,在并行度大于1的情况下,结果跟预期不相符。
>> /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink  -s savepoint  -p  1
>> job  下,
>>   ==> test_status中的数据正常:
>> 0, name0, 2020-07-06 00:00:00 , 0, status0
>> 1, name1, 2020-07-06 00:00:00 , 1, status1_modify
>> 2, name2, 2020-07-06 00:00:00 , 1, status1_modify
>> /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink  -s savepoint  -p 40
>> job  下
>>   ==> test_status中的数据不正常, id = 1,2的两条数据缺失:
>> 0, name0, 2020-07-06 00:00:00 , 0, status0
>>
>>
>> 怀疑与cdc的变化流在多个并行度下sink输出流的时候,先执行了 insert id=1再执行delete id=1的动作,导致数据有问题!!!
>>
>> 这里是不是bug?还是从save point里恢复的时候,算子的状态有问题?
>> 如果是,能不能在sink的时候,只把sink这里的并行度设置为1??
>>
>>
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>
>


Re: flink 1.11.2 如何配置时区

2020-11-14 文章 Jark Wu
1. 现在 proctime() 在设计上确实有问题,目前返回类型是 timestamp, 而不是 timestamp with local time
zone, 所以不会考虑 session time zone,转成 string 会用 utc 时区。这个问题会在 FLINK-20162 [1]
中修复。

2. 可以看下这个文档[2].

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-20162
[2]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/json.html#json-timestamp-format-standard

On Sat, 14 Nov 2020 at 17:21, Asahi Lee <978466...@qq.com> wrote:

> 我使用的是format=json的格式,而flink使用的是jackson,那如何设置jackson的日期格式化的时区配置呢?
>
>
>
>
> --原始邮件--
> 发件人:
>   "Asahi Lee"
>   <
> 978466...@qq.com;
> 发送时间:2020年11月14日(星期六) 下午4:58
> 收件人:"user-zh"
> 主题:flink 1.11.2 如何配置时区
>
>
>
> 你好!   我使用的是flink sql
> 1.11.2版本,通过proctime()在源上添加处理时间,发现生成的时间为UTC时间,而我需要的是+08的时间;而我通过设计env.java.opts参数设计jvm的时区参数也没有解决,请问我如何配置才可以拿到+08的时间?
> 我的程序的数据是json格式输出


Re: flink1.11 sql ddl 连接kafka读取数据,使用事件时间窗口,无法触发,相同的sql使用系统时间触发没有问题

2020-11-14 文章 Jark Wu
重复的问题。我将刚刚的回答也贴在这里。

如果要测试事件时间窗口,请保证以下几点,否则窗口不会触发:
1. 保证所有 partition 都有数据。
2. 且每个 partition 数据的 event time 都在前进
3. 且 event time 前进的距离要超过 window size + watermark offset, 即你的例子中的 10s+1s =
11s

以上如果不满足,则系统不会认为窗口结束,所以窗口就不会触发。

Best,
Jark

On Sat, 14 Nov 2020 at 16:35, 李世钰  wrote:

> 您好,请教您一个问题
> flink1.11 sql ddl连接kafka,使用事件事件,无法触发窗口,使用process_time系统时间就可以正常触发
> create table kafka_table (
> `log_id` string,
> event_date timestamp(3),
> process_time as PROCTIME(),
> ts as event_date,
> watermark for ts as ts - interval '1' second
> ) with (
> 'connector' = 'kafka',
> 'topic' = 'kafka_table',
> 'properties.bootstrap.servers' = '10.2.12.3:9092',
> 'properties.group.id' = 'tmp-log-consumer003',
> 'format' = 'json',
> 'scan.startup.mode' = 'latest-offset'
> )
> 执行的sql是
> select TUMBLE_START(kafka_table.event_date, INTERVAL '10'
> SECOND),TUMBLE_END(kafka_table.event_date, INTERVAL '10'
> SECOND),src_ip,count(dest_ip) from kafka_table group by
> TUMBLE(kafka_table.event_date, INTERVAL '10' SECOND),kafka_table.src_ip
>
>
>
>
> select log_id,process_time,ts from kafka_table查询的表结构如下
> 表结构为
> root
> |-- log_id: STRING
> |-- process_time: TIMESTAMP(3) NOT NULL *PROCTIME*
> |-- ts: TIMESTAMP(3) *ROWTIME*
>
>
> 输入数据为
> log_id,process_time,ts
> 13547876357,2020-11-14T08:22:08.699,2020-11-07T08:23:09.806
> 13547876358,2020-11-14T08:22:08.857,2020-11-07T08:23:09.806
> 13547876359,2020-11-14T08:22:09.061,2020-11-07T08:23:09.806
> 13547876360,2020-11-14T08:22:09.310,2020-11-07T08:23:09.806
> 13547876361,2020-11-14T08:22:09.526,2020-11-07T08:23:09.806
> 13552070656,2020-11-14T08:22:09.772,2020-11-07T08:23:09.806


Re: flink sql ddl连接kafka,flink sql使用事件时间无法正常触发时间窗口

2020-11-14 文章 Jark Wu
如果要测试事件时间窗口,请保证以下几点,否则窗口不会触发:
1. 保证所有 partition 都有数据。
2. 且每个 partition 数据的 event time 都在前进
3. 且 event time 前进的距离要超过 window size + watermark offset, 即你的例子中的 10s+1s =
11s

以上如果不满足,则系统不会认为窗口结束,所以窗口就不会触发。

Best,
Jark

On Sat, 14 Nov 2020 at 15:11, 李世钰  wrote:

> flink版本 flink1.11
>
>
> flink sql连接kafka
> create table kafka_table (
> log_id string,
> event_time bigint,
> process_time as PROCTIME(),
> ts as TO_TIMESTAMP(FROM_UNIXTIME(event_time)),
> watermark for ts as ts - interval '1' second
> ) with (
> 'connector' = 'kafka',
> 'topic' = 'kafka_table',
> 'properties.bootstrap.servers' = '10.2.12.3:9092',
> 'properties.group.id' = 'tmp-log-consumer003',
> 'format' = 'json',
> 'scan.startup.mode' = 'latest-offset'
> )
>
>
>
>
>
> 使用窗口聚合的代码
> val tmp = tableEnv.sqlQuery("select HOP_START(kafka_table.ts, INTERVAL
> '10' SECOND, INTERVAL '5' SECOND),HOP_END(kafka_table.ts, INTERVAL '10'
> SECOND, INTERVAL '5' SECOND),src_ip,count(dest_ip) from kafka_table group
> by HOP(kafka_table.ts, INTERVAL '10' SECOND, INTERVAL '5'
> SECOND),kafka_table.src_ip")
>
>
> 相同的sql使用process_time系统时间就可以成功触发,但是使用事件时间不能触发,
> 系统时间是11月14日当前时间,事件时间是11月6日,我读取的历史数据往kafka中打数据测试的
> 求问是什么原因不能触发窗口或者我的用法有什么问题吗


Re: Flink cdc 多表关联处理延迟很大

2020-11-14 文章 Jark Wu
能展示下你的代码吗?是用的维表关联的语法 (FOR SYSTEM TIME AS OF)?
需要明确下,到底是什么节点慢了。

On Fri, 13 Nov 2020 at 19:02, 丁浩浩 <18579099...@163.com> wrote:

> 我用flink cdc 对mysql的表进行关联查询,发现flink只能两张表关联之后再跟下一张表关联,导致最后落库的延迟非常大。
> 有没有比较好的优化方案能缓解这样的问题?


Re: flink 1.11.2 cdc: cdc sql 结果表的sink顺序问题, 不同并行度下从save point中恢复job时,会导致sink结果不对!!

2020-11-14 文章 Jark Wu
看起来是这个 jdbc sink bug 导致的 https://issues.apache.org/jira/browse/FLINK-19423
这个 bug 会导致删的时候,取的 pk 索引不对,所以可能导致 index 异常,或是删错数据。
这个bug 会在即将发布的 1.11.3 中修复。

Best,
Jark




On Fri, 13 Nov 2020 at 13:12, jindy_liu <286729...@qq.com> wrote:

> 源表test:
> CREATE TABLE test (
> `id` INT,
> `name` VARCHAR(255),
> `time` TIMESTAMP(3),
> `status` INT,
> PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
>   'connector' = 'mysql-cdc',
>   'hostname' = 'localhost',
>   'port' = '3306',
>   'username' = 'root',
>   'password' = '1',
>   'database-name' = 'ai_audio_lyric_task',
>   'table-name' = 'test'
> )
> 源表status
> CREATE TABLE status (
> `id` INT,
> `name` VARCHAR(255),
> PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
>   'connector' = 'mysql-cdc',
>   'hostname' = 'localhost',
>   'port' = '3306',
>   'username' = 'root',
>   'password' = '1',
>   'database-name' = 'ai_audio_lyric_task',
>   'table-name' = 'status'
> );
>
> 输出表
> CREATE TABLE test_status (
> `id` INT,
> `name` VARCHAR(255),
> `time` TIMESTAMP(3),
> `status` INT,
> `status_name` VARCHAR(255)
> PRIMARY KEY(id) NOT ENFORCED
> ) WITH (
>   'connector' = 'elasticsearch-7',
>   'hosts' = 'xxx',
>   'index' = 'xxx',
>   'username' = 'xxx',
>   'password' = 'xxx',
>   'sink.bulk-flush.backoff.max-retries' = '10',
>   'sink.bulk-flush.backoff.strategy' = 'CONSTANT',
>   'sink.bulk-flush.max-actions' = '5000',
>   'sink.bulk-flush.max-size' = '10mb',
>   'sink.bulk-flush.interval' = '1s'
> );
>
>
> 输出语句:
> INSERT into test_status
> SELECT t.*, s.name
> FROM test AS t
> LEFT JOIN status AS s ON t.status = s.id;
>
> mysql表中已经有数据
> test:
> 0, name0, 2020-07-06 00:00:00 , 0
> 1, name1, 2020-07-06 00:00:00 , 1
> 2, name2, 2020-07-06 00:00:00 , 1
> .
>
> status
> 0, status0
> 1, status1
> 2, status2
> .
>
> 操作顺序与复现:
> 1、启动任务,设置并行度为40,
> 表中数据算完后。/data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink
> savepoint保存,然后web ui上取消任务。
>   ==> test_status中的数据正常:
> 0, name0, 2020-07-06 00:00:00 , 0, status0
> 1, name1, 2020-07-06 00:00:00 , 1, status1
> 2, name2, 2020-07-06 00:00:00 , 1, status1
>
> 2、操作mysql, 将status中id=1数据变更为 status1_modify
>
> 3、接下来的重启上面的任务不同并行度下,1和大于1的情况下,在并行度大于1的情况下,结果跟预期不相符。
> /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink  -s savepoint  -p  1
> job  下,
>   ==> test_status中的数据正常:
> 0, name0, 2020-07-06 00:00:00 , 0, status0
> 1, name1, 2020-07-06 00:00:00 , 1, status1_modify
> 2, name2, 2020-07-06 00:00:00 , 1, status1_modify
> /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink  -s savepoint  -p 40
> job  下
>   ==> test_status中的数据不正常, id = 1,2的两条数据缺失:
> 0, name0, 2020-07-06 00:00:00 , 0, status0
>
>
> 怀疑与cdc的变化流在多个并行度下sink输出流的时候,先执行了 insert id=1再执行delete id=1的动作,导致数据有问题!!!
>
> 这里是不是bug?还是从save point里恢复的时候,算子的状态有问题?
> 如果是,能不能在sink的时候,只把sink这里的并行度设置为1??
>
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


??????flink 1.11.2 ????????????

2020-11-14 文章 Asahi Lee
??format=json??flinkjacksonjackson??




----
??: 
   "Asahi Lee"  
  
<978466...@qq.com;
:2020??11??14??(??) 4:58
??:"user-zh"

flink 1.11.2 ????????????

2020-11-14 文章 Asahi Lee
??   ??flink sql 
1.11.2??proctime()UTC??+08env.java.optsjvm??+08
json

flink1.11 sql ddl 连接kafka读取数据,使用事件时间窗口,无法触发,相同的sql使用系统时间触发没有问题

2020-11-14 文章 李世钰
您好,请教您一个问题
flink1.11 sql ddl连接kafka,使用事件事件,无法触发窗口,使用process_time系统时间就可以正常触发
create table kafka_table (
`log_id` string,
event_date timestamp(3),
process_time as PROCTIME(),
ts as event_date,
watermark for ts as ts - interval '1' second
) with (
'connector' = 'kafka',
'topic' = 'kafka_table',
'properties.bootstrap.servers' = '10.2.12.3:9092',
'properties.group.id' = 'tmp-log-consumer003',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
)
执行的sql是
select TUMBLE_START(kafka_table.event_date, INTERVAL '10' 
SECOND),TUMBLE_END(kafka_table.event_date, INTERVAL '10' 
SECOND),src_ip,count(dest_ip) from kafka_table group by 
TUMBLE(kafka_table.event_date, INTERVAL '10' SECOND),kafka_table.src_ip




select log_id,process_time,ts from kafka_table查询的表结构如下
表结构为
root
|-- log_id: STRING
|-- process_time: TIMESTAMP(3) NOT NULL *PROCTIME*
|-- ts: TIMESTAMP(3) *ROWTIME*


输入数据为
log_id,process_time,ts
13547876357,2020-11-14T08:22:08.699,2020-11-07T08:23:09.806
13547876358,2020-11-14T08:22:08.857,2020-11-07T08:23:09.806
13547876359,2020-11-14T08:22:09.061,2020-11-07T08:23:09.806
13547876360,2020-11-14T08:22:09.310,2020-11-07T08:23:09.806
13547876361,2020-11-14T08:22:09.526,2020-11-07T08:23:09.806
13552070656,2020-11-14T08:22:09.772,2020-11-07T08:23:09.806