?????? flink-1.12.0 ?????? ???? lag????

2021-09-21 文章 kcz
tks??




----
??: 
   "user-zh"

https://issues.apache.org/jira/browse/FLINK-19449

kcz <573693...@qq.com.invalid ??2021??9??22?? 11:41??

 
 behavior,next_bv 


 ??
 {
 nbsp;nbsp;"user_id":nbsp;1,
 nbsp;nbsp;"item_id":nbsp;1,
 nbsp;nbsp;"behavior":"pv1"
 }
 {
 nbsp;nbsp;"user_id":nbsp;1,
 nbsp;nbsp;"item_id":nbsp;1,
 nbsp;nbsp;"behavior":"pv2"
 }






 CREATE TABLE KafkaTable (
 nbsp; `user_id` BIGINT,
 nbsp; `item_id` BIGINT,
 nbsp; `behavior` STRING,
 nbsp; proctime as PROCTIME()
 ) WITH (
 nbsp; 'connector' = 'kafka',
 nbsp; 'topic' = 'user_behavior',
 nbsp; 'properties.bootstrap.servers' = '',
 nbsp; 'properties.group.id' = 'testGroup',
 nbsp; 'scan.startup.mode' = 'earliest-offset',
 nbsp; 'format' = 'json'
 );



 SELECT
 user_id,
 item_id,
 behavior,
 next_bvnbsp;
 FROM
 ( SELECT *, lag( behavior, 
1 ) over ( PARTITION BY user_id ORDER
 BY proctime ) AS next_bv FROM KafkaTable ) t;



-- 

Best,
Benchao Li

Re: flink-1.12.0 流模式 使用 lag问题

2021-09-21 文章 Benchao Li
Hi,这个是一个已知的bug[1],已经在1.13.1以及1.4版本修复了。
可以使用一下1.13.1试一下,1.4版本现在也正在投票中了,应该很快就会发布出来了。

[1] https://issues.apache.org/jira/browse/FLINK-19449

kcz <573693...@qq.com.invalid> 于2021年9月22日周三 上午11:41写道:

> 如何使用才是正确的,求大佬帮看看
> behavior,next_bv 字段内容一直是保持一致的,无法得到自己想要的结果
>
>
> 发送的数据
> {
> "user_id":1,
> "item_id":1,
> "behavior":"pv1"
> }
> {
> "user_id":1,
> "item_id":1,
> "behavior":"pv2"
> }
>
>
>
>
>
>
> CREATE TABLE KafkaTable (
>  `user_id` BIGINT,
>  `item_id` BIGINT,
>  `behavior` STRING,
>  proctime as PROCTIME()
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'user_behavior',
>  'properties.bootstrap.servers' = '',
>  'properties.group.id' = 'testGroup',
>  'scan.startup.mode' = 'earliest-offset',
>  'format' = 'json'
> );
>
>
>
> SELECT
> user_id,
> item_id,
> behavior,
> next_bv
> FROM
> ( SELECT *, lag( behavior, 1 ) over ( PARTITION BY user_id ORDER
> BY proctime ) AS next_bv FROM KafkaTable ) t;



-- 

Best,
Benchao Li


flink-1.12.0 ?????? ???? lag????

2021-09-21 文章 kcz

behavior,next_bv 


??
{
"user_id":1,
"item_id":1,
"behavior":"pv1"
}
{
"user_id":1,
"item_id":1,
"behavior":"pv2"
}






CREATE TABLE KafkaTable (
 `user_id` BIGINT,
 `item_id` BIGINT,
 `behavior` STRING,
 proctime as PROCTIME()
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior',
 'properties.bootstrap.servers' = '',
 'properties.group.id' = 'testGroup',
 'scan.startup.mode' = 'earliest-offset',
 'format' = 'json'
);



SELECT
user_id,
item_id,
behavior,
next_bv
FROM
( SELECT *, lag( behavior, 1 ) over ( PARTITION BY user_id ORDER BY 
proctime ) AS next_bv FROM KafkaTable ) t;

Re:Flinksql关联维表数据延迟咨询

2021-09-21 文章 Michael Ran
先看看瓶颈在哪儿? join的话,理论上你cache全表之后就会快起来。sink 瓶颈,可以扩大并发数(sink端压力不大的情况)
在 2021-09-16 14:40:17,"邓 雪昭"  写道:
>各位老师好,
>   
> 我目前使用Flinksql构建了一个应用,数据源是kafka,关联了一张23w数据的维表(存放在Tidb),该维表和流中的数据关联会有一些发散(业务逻辑),使用了lookup.cache.maxprows=25,ttl=3600s,目前输出到kafka,延迟很严重,处理时间会领先事件时间几十分钟并且还会持续扩大,请问有什么好的解决办法吗?
>
>
>从 Windows 版邮件发送
>


Re: HOP窗口较短导致checkpoint失败

2021-09-21 文章 xiaohui zhang
checkpoint的状态大约只有50M左右就会开始出现cp失败的问题。如果失败了,尝试停止任务生成savepoint基本也不能成功。但同时运行的其他任务,cp在300M左右,
save point 1G左右的就很顺利,基本不会出问题。
因为实际的数据压力并不是很大,如果单纯增加并行度,是否能在窗口多的情况下有比较明显的改善呢?

Caizhi Weng  于2021年9月22日周三 上午11:27写道:

> Hi!
>
> 24 小时且步长 1 分钟的 window 会由于数据不断累积而导致 cp 越来越大,越来越慢,最终超时。当然如果运算太慢导致 cp 被 back
> pressure 也有可能导致 cp 超时。开启 mini batch 可以加快 window 的运算速度,但这么长时间而且这么频繁的 window
> 目前确实没有什么很好的优化方法,仍然建议扩大并发以分担计算以及 cp 的压力。
>
> xiaohui zhang  于2021年9月18日周六 上午9:54写道:
>
> > FLink:1.12.1
> >
> > 源: kafka
> > create table dev_log (
> > devid,
> > ip,
> > op_ts
> > ) with (
> > connector = kafka
> > )
> >
> > sink: Hbase connect 2.2
> >
> > 目前用flink sql的hop
> > window开发一个指标,统计近24小时的设备关联ip数。设置30min一次checkpoint,超时时间30min。
> > 执行SQL如下
> > insert into h_table
> > select
> >   devid as rowkey
> >   row(hop_end, ip_cnt)
> > from (
> >   select
> >  devid,
> >  hop_end(op_ts, INTERVAL '1' MINUTE, INTERVAL '24' HOUR) as hop_end,
> >  count(distinct(ip)) as ip_cnt
> > from
> >   dev_logs
> > group by
> >hop(op_ts, INTERVAL '1' MINUTE, INTERVAL '24' HOUR),
> >   devid
> > )
> >
> > 测试中发现任务运行大约3个小时后,就会出现checkpoint失败,任务反复重启。
> > 实际上数据量并不大,测试数据是1s/条输入,一个窗口输出大约只有4000条,成功的checkpoint不超过50M。
> > 修改为10分钟的滑动步长就可以正常执行,但是延迟就比较高了。
> > 请问有什么办法可以排查是哪里出的问题?有什么优化的方法呢
> >
>


回复: flink sql是否支持动态创建sink table?

2021-09-21 文章 酷酷的浑蛋
我也有这个需求,意思就是topic里实时新增了一种日志,然后想动态创建对应新的日志的topic表,并写入到新的topic表,在一个任务中完成


| |
apache22
|
|
apach...@163.com
|
签名由网易邮箱大师定制


在2021年09月22日 11:23,Caizhi Weng 写道:
Hi!

不太明白这个需求,但如果希望发送给不同的 topic,需要给每个 topic 都定义 DDL。

如果是因为各 topic 之间的 schema 重复度比较高,只有些许字段以及 topic 名称等不同,可以看一下 DDL LIKE 语法:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/create/#like

casel.chen  于2021年9月18日周六 上午8:27写道:

上游kafka topic消息带有一个用户类型字段,现在想根据不同用户类型将数据发到不同topic(为了扩展不想写死有哪些类型) ,请问flink
sql支持动态创建sink table吗?


Re: 退订

2021-09-21 文章 Caizhi Weng
Hi!

退订中文邮件列表请发送任意内容的邮件到 user-zh-unsubscr...@flink.apache.org,其他邮件列表退订邮箱参见
https://flink.apache.org/community.html#mailing-lists

Elaiza <1360619...@qq.com.invalid> 于2021年9月19日周日 上午9:14写道:

>


Re: HOP窗口较短导致checkpoint失败

2021-09-21 文章 Caizhi Weng
Hi!

24 小时且步长 1 分钟的 window 会由于数据不断累积而导致 cp 越来越大,越来越慢,最终超时。当然如果运算太慢导致 cp 被 back
pressure 也有可能导致 cp 超时。开启 mini batch 可以加快 window 的运算速度,但这么长时间而且这么频繁的 window
目前确实没有什么很好的优化方法,仍然建议扩大并发以分担计算以及 cp 的压力。

xiaohui zhang  于2021年9月18日周六 上午9:54写道:

> FLink:1.12.1
>
> 源: kafka
> create table dev_log (
> devid,
> ip,
> op_ts
> ) with (
> connector = kafka
> )
>
> sink: Hbase connect 2.2
>
> 目前用flink sql的hop
> window开发一个指标,统计近24小时的设备关联ip数。设置30min一次checkpoint,超时时间30min。
> 执行SQL如下
> insert into h_table
> select
>   devid as rowkey
>   row(hop_end, ip_cnt)
> from (
>   select
>  devid,
>  hop_end(op_ts, INTERVAL '1' MINUTE, INTERVAL '24' HOUR) as hop_end,
>  count(distinct(ip)) as ip_cnt
> from
>   dev_logs
> group by
>hop(op_ts, INTERVAL '1' MINUTE, INTERVAL '24' HOUR),
>   devid
> )
>
> 测试中发现任务运行大约3个小时后,就会出现checkpoint失败,任务反复重启。
> 实际上数据量并不大,测试数据是1s/条输入,一个窗口输出大约只有4000条,成功的checkpoint不超过50M。
> 修改为10分钟的滑动步长就可以正常执行,但是延迟就比较高了。
> 请问有什么办法可以排查是哪里出的问题?有什么优化的方法呢
>


Re: flink sql是否支持动态创建sink table?

2021-09-21 文章 Caizhi Weng
Hi!

不太明白这个需求,但如果希望发送给不同的 topic,需要给每个 topic 都定义 DDL。

如果是因为各 topic 之间的 schema 重复度比较高,只有些许字段以及 topic 名称等不同,可以看一下 DDL LIKE 语法:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/create/#like

casel.chen  于2021年9月18日周六 上午8:27写道:

> 上游kafka topic消息带有一个用户类型字段,现在想根据不同用户类型将数据发到不同topic(为了扩展不想写死有哪些类型) ,请问flink
> sql支持动态创建sink table吗?


Re: Flink SQL是否支持Count Window函数?

2021-09-21 文章 Caizhi Weng
Hi!

据我所知目前暂时没有增加 count window 的打算,以后可能会在最新的 Window TVF 里添加 count window tvf。

不建议在 SQL 中自行实现 count window,因为 SQL 添加 window 较为复杂。但可以考虑先将 SQL 转为
datastream,用 datastream 的 count window 之后再将 datastream 转回 SQL。

EnvironmentSettings settings = EnvironmentSettings.newInstance().
inStreamingMode().build();
StreamTableEnvironment tEnv =
StreamTableEnvironment.create(
StreamExecutionEnvironment.getExecutionEnvironment(), settings);
tEnv.executeSql(
"CREATE TABLE T ( a INT, b INT, key AS abs(a) % 3, val AS abs(b) % 3 ) WITH
( 'connector' = 'datagen' )");
Table table = tEnv.sqlQuery("SELECT key, val FROM T");
DataStream dataStream = tEnv.toDataStream(table);
DataStream> summedStream =
dataStream
.keyBy(row -> (int) row.getField(0))
.countWindow(100)
.apply(
(WindowFunction<
Row,
Tuple2,
Integer,
GlobalWindow>)
(key, window, input, out) -> {
int sum = 0;
for (Row row : input) {
Integer field = (Integer) row.getField(1);
if (field != null) {
sum += field;
}
}
out.collect(Tuple2.of(key, sum));
})
.returns(
new TupleTypeInfo<>(
BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO));
Table summedTable = tEnv.fromDataStream(summedStream);
tEnv.registerTable("S", summedTable);
tEnv.executeSql("SELECT f0, f1 FROM S").print();

casel.chen  于2021年9月17日周五 下午6:05写道:

> 今天遇到一个业务场景用到count window,查了下Flink官网目前Flink SQL只支持time
> window,问一下官方是否打算sql支持count window呢?
> 如果没有计划的话,自己要如何实现?是否可以像flink 1.13引入的cumulate window写一个自定义窗口函数呢?谢谢!


Re: 回复:Flink SQL官方何时能支持redis和mongodb连接器?

2021-09-21 文章 Yun Tang
Hi,

其实目前Flink社区并不是那么欢迎新增官方支持的connector,主要原因就是社区的开发人员有限,没有精力维护太多的connector,尤其是一些connector的实现需要一定的相关背景,但很难保证review代码的开发人员具有相关背景,毕竟大家都需要为自己approve的代码负责。
你可以在 flink-packages [1] 里面找一下,或者考虑自己实现并维护(基础实现应该是复杂度不高的)。


[1] https://flink-packages.org/


祝好
唐云


From: 黑色 
Sent: Saturday, September 18, 2021 17:17
To: user-zh@flink.apache.org 
Subject: 回复:Flink SQL官方何时能支持redis和mongodb连接器?

这个可以自已定义一个,参考源码写一个,自己写出来的才是自己的,直接用别人的还是别人的




--原始邮件--
发件人:
"user-zh"   
 


flink消费kafka分区消息不均衡问题

2021-09-21 文章 casel.chen
kafka topic有32个分区,实时作业开了32个并行度消费kafka 
topic,现在监控发现部分分区消息积压严重(如下图所示),请问会有哪些原因造成的?有什么解决办法吗?扩大分区数是不是也不能根治这种情况?
PS: 
每个分区消息数的确有所不均,但是同样消息数的几个分区也会出现积压不同情况(如15,16,17,18)。会是因为节点带宽受限造成的吗?当前numberOfSlots=8,改成numberOfSlots=1会有效果么?


|
分区 ID
|
客户端
|
最大位点
|
消费位点
|
堆积量
|
|
0
|
n/a
|
14,131,397
|
14,130,923
|
474
|
|
1
|
n/a
|
14,191,455
|
14,189,396
|
2,059
|
|
2
|
n/a
|
14,611,826
|
14,610,262
|
1,564
|
|
3
|
n/a
|
15,340,150
|
15,335,944
|
4,206
|
|
4
|
n/a
|
16,379,487
|
16,372,237
|
7,250
|
|
5
|
n/a
|
17,696,565
|
17,639,308
|
57,257
|
|
6
|
n/a
|
19,200,829
|
19,129,856
|
70,973
|
|
7
|
n/a
|
20,889,954
|
20,888,652
|
1,302
|
|
8
|
n/a
|
22,643,539
|
22,536,468
|
107,071
|
|
9
|
n/a
|
24,440,881
|
24,439,357
|
1,524
|
|
10
|
n/a
|
26,178,250
|
26,073,197
|
105,053
|
|
11
|
n/a
|
27,828,497
|
27,670,732
|
157,765
|
|
12
|
n/a
|
29,284,463
|
29,283,105
|
1,358
|
|
13
|
n/a
|
30,526,020
|
29,781,704
|
744,316
|
|
14
|
n/a
|
31,468,482
|
31,467,243
|
1,239
|
|
15
|
n/a
|
32,084,198
|
31,467,610
|
616,588
|
|
16
|
n/a
|
32,393,752
|
32,019,836
|
373,916
|
|
17
|
n/a
|
32,302,065
|
32,141,999
|
160,066
|
|
18
|
n/a
|
31,875,063
|
31,874,452
|
611
|
|
19
|
n/a
|
31,137,894
|
31,002,867
|
135,027
|
|
20
|
n/a
|
30,098,926
|
29,930,855
|
168,071
|
|
21
|
n/a
|
28,739,235
|
28,603,509
|
135,726
|
|
22
|
n/a
|
27,221,026
|
27,220,821
|
205
|
|
23
|
n/a
|
25,514,265
|
25,382,536
|
131,729
|
|
24
|
n/a
|
23,779,714
|
23,689,296
|
90,418
|
|
25
|
n/a
|
21,981,307
|
21,981,267
|
40
|
|
26
|
n/a
|
20,237,925
|
20,223,880
|
14,045
|
|
27
|
n/a
|
18,606,490
|
18,606,387
|
103
|
|
28
|
n/a
|
17,178,098
|
17,177,971
|
127
|
|
29
|
n/a
|
15,972,292
|
15,972,105
|
187
|
|
30
|
n/a
|
15,032,355
|
15,032,138
|
217
|
|
31
|
n/a
|
14,426,366
|
14,425,462
|
904
|