HOP窗口较短导致checkpoint失败

2021-09-17 Thread xiaohui zhang
FLink:1.12.1

源: kafka
create table dev_log (
devid,
ip,
op_ts
) with (
connector = kafka
)

sink: Hbase connect 2.2

目前用flink sql的hop window开发一个指标,统计近24小时的设备关联ip数。设置30min一次checkpoint,超时时间30min。
执行SQL如下
insert into h_table
select
  devid as rowkey
  row(hop_end, ip_cnt)
from (
  select
 devid,
 hop_end(op_ts, INTERVAL '1' MINUTE, INTERVAL '24' HOUR) as hop_end,
 count(distinct(ip)) as ip_cnt
from
  dev_logs
group by
   hop(op_ts, INTERVAL '1' MINUTE, INTERVAL '24' HOUR),
  devid
)

测试中发现任务运行大约3个小时后,就会出现checkpoint失败,任务反复重启。
实际上数据量并不大,测试数据是1s/条输入,一个窗口输出大约只有4000条,成功的checkpoint不超过50M。
修改为10分钟的滑动步长就可以正常执行,但是延迟就比较高了。
请问有什么办法可以排查是哪里出的问题?有什么优化的方法呢


Re: HOP窗口较短导致checkpoint失败

2021-09-21 Thread Caizhi Weng
Hi!

24 小时且步长 1 分钟的 window 会由于数据不断累积而导致 cp 越来越大,越来越慢,最终超时。当然如果运算太慢导致 cp 被 back
pressure 也有可能导致 cp 超时。开启 mini batch 可以加快 window 的运算速度,但这么长时间而且这么频繁的 window
目前确实没有什么很好的优化方法,仍然建议扩大并发以分担计算以及 cp 的压力。

xiaohui zhang  于2021年9月18日周六 上午9:54写道:

> FLink:1.12.1
>
> 源: kafka
> create table dev_log (
> devid,
> ip,
> op_ts
> ) with (
> connector = kafka
> )
>
> sink: Hbase connect 2.2
>
> 目前用flink sql的hop
> window开发一个指标,统计近24小时的设备关联ip数。设置30min一次checkpoint,超时时间30min。
> 执行SQL如下
> insert into h_table
> select
>   devid as rowkey
>   row(hop_end, ip_cnt)
> from (
>   select
>  devid,
>  hop_end(op_ts, INTERVAL '1' MINUTE, INTERVAL '24' HOUR) as hop_end,
>  count(distinct(ip)) as ip_cnt
> from
>   dev_logs
> group by
>hop(op_ts, INTERVAL '1' MINUTE, INTERVAL '24' HOUR),
>   devid
> )
>
> 测试中发现任务运行大约3个小时后,就会出现checkpoint失败,任务反复重启。
> 实际上数据量并不大,测试数据是1s/条输入,一个窗口输出大约只有4000条,成功的checkpoint不超过50M。
> 修改为10分钟的滑动步长就可以正常执行,但是延迟就比较高了。
> 请问有什么办法可以排查是哪里出的问题?有什么优化的方法呢
>


Re: HOP窗口较短导致checkpoint失败

2021-09-21 Thread xiaohui zhang
checkpoint的状态大约只有50M左右就会开始出现cp失败的问题。如果失败了,尝试停止任务生成savepoint基本也不能成功。但同时运行的其他任务,cp在300M左右,
save point 1G左右的就很顺利,基本不会出问题。
因为实际的数据压力并不是很大,如果单纯增加并行度,是否能在窗口多的情况下有比较明显的改善呢?

Caizhi Weng  于2021年9月22日周三 上午11:27写道:

> Hi!
>
> 24 小时且步长 1 分钟的 window 会由于数据不断累积而导致 cp 越来越大,越来越慢,最终超时。当然如果运算太慢导致 cp 被 back
> pressure 也有可能导致 cp 超时。开启 mini batch 可以加快 window 的运算速度,但这么长时间而且这么频繁的 window
> 目前确实没有什么很好的优化方法,仍然建议扩大并发以分担计算以及 cp 的压力。
>
> xiaohui zhang  于2021年9月18日周六 上午9:54写道:
>
> > FLink:1.12.1
> >
> > 源: kafka
> > create table dev_log (
> > devid,
> > ip,
> > op_ts
> > ) with (
> > connector = kafka
> > )
> >
> > sink: Hbase connect 2.2
> >
> > 目前用flink sql的hop
> > window开发一个指标,统计近24小时的设备关联ip数。设置30min一次checkpoint,超时时间30min。
> > 执行SQL如下
> > insert into h_table
> > select
> >   devid as rowkey
> >   row(hop_end, ip_cnt)
> > from (
> >   select
> >  devid,
> >  hop_end(op_ts, INTERVAL '1' MINUTE, INTERVAL '24' HOUR) as hop_end,
> >  count(distinct(ip)) as ip_cnt
> > from
> >   dev_logs
> > group by
> >hop(op_ts, INTERVAL '1' MINUTE, INTERVAL '24' HOUR),
> >   devid
> > )
> >
> > 测试中发现任务运行大约3个小时后,就会出现checkpoint失败,任务反复重启。
> > 实际上数据量并不大,测试数据是1s/条输入,一个窗口输出大约只有4000条,成功的checkpoint不超过50M。
> > 修改为10分钟的滑动步长就可以正常执行,但是延迟就比较高了。
> > 请问有什么办法可以排查是哪里出的问题?有什么优化的方法呢
> >
>