Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-12-25 文章 Storm☀️
在测试环境:
关闭增量chk,全量的state大小大约在:100M左右;
之前开启:我观察了一段时间,膨胀到5G,而且还一直在增长;
sql:
select sum(xx) group by 1 分钟窗口

过期时间设置的为:5-30min



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-12-23 文章 Yun Tang
Hi @Storm

checkpoint的增量模式目前仅对RocksDB生效,这里的增量是指上传新产生的DB 
sst文件。而RocksDB的全量模式是将DB的有效kv进行序列化写出,除非有大量的数据没有compaction清理掉,否则不可能出现增量checkpoint 
size无限膨胀,而全量checkpoint正常的问题,你这里的无限膨胀的size范围是多大呢?

祝好
唐云

From: Storm☀️ 
Sent: Tuesday, December 22, 2020 19:52
To: user-zh@flink.apache.org 
Subject: Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

唐云大佬好,
我关闭了chk的增量模式之后,chkstate确实不会再无线膨胀了。这个是我配置的不准确,还是一个已知问题呢



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-12-22 文章 Storm☀️
唐云大佬好,
我关闭了chk的增量模式之后,chkstate确实不会再无线膨胀了。这个是我配置的不准确,还是一个已知问题呢



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-12-22 文章 Storm☀️
"计算机的解决方案是 出现问题,大都是保护现场,等问题解决后,释放现场 "
 那么解决问题的方法是?生产上state还在不断膨胀。
简单一个问题,生产上发生OOM了,短时间内无法排查出原因,请问如何处理?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-12-18 文章 r pp
一般性的推断是,模式 是属于配置项,若出现问题了,系统读取 或者 改变 配置项,能解决问题么?
之前的学习经验,计算机的解决方案是 出现问题,大都是保护现场,等问题解决后,释放现场。
状态 可以 类比是现场,当问题出现的时候,重点在状态的保护是怎么实现的,和配置没有太大关系,因为完全可以不读取配置。
配置项是面向用户选择 state 的方式,不是解决问题的方式

Storm☀️  于2020年12月18日周五 上午11:50写道:

> state.backend.incremental 出现问题的时候增量模式是开启的吗?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-12-18 文章 r pp
一般性的推断是,模式 是属于配置项,若出现问题了,系统读取 或者 改变 配置项,能解决问题么?
之前的学习经验,计算机的解决方案是 出现问题,大都是保护现场,等问题解决后,释放现场。
状态 可以 类比是现场,当问题出现的时候,重点在状态的保护是怎么实现的,和配置没有太大关系,因为完全可以不读取配置。
配置项是面向用户选择 state 的方式,不是解决问题的方式。

Storm☀️  于2020年12月18日周五 上午11:50写道:

> state.backend.incremental 出现问题的时候增量模式是开启的吗?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-12-17 文章 Storm☀️
state.backend.incremental 出现问题的时候增量模式是开启的吗?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-08-30 文章 Danny Chan
能否提供下完整的 query,方便追踪和排查 ~

Best,
Danny Chan
在 2020年8月31日 +0800 AM10:58,zhuyuping <1050316...@qq.com>,写道:
> 同样出现了这个问题,SQL 使用中,请问是什么原因,翻转tumble窗口当使用mapview 进行操作时候,状态不断的增长
> 好像不能清理一样,因为正常的window 窗口 窗口结束后会清理状态,现在的情况是1秒的翻转tumble窗口,满满的从最开始的1m 过一个小时变成了1g
> 不断的无限增长下去
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-08-30 文章 zhuyuping
同样出现了这个问题,SQL 使用中,请问是什么原因,翻转tumble窗口当使用mapview 进行操作时候,状态不断的增长
好像不能清理一样,因为正常的window 窗口 窗口结束后会清理状态,现在的情况是1秒的翻转tumble窗口,满满的从最开始的1m 过一个小时变成了1g
不断的无限增长下去



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-08-30 文章 zhuyuping
我这边出现同样的问题,我换成了filesystem 发现state 还是一样缓慢增大,所以应该跟rocksdb 无关



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-08-30 文章 zhuyuping

 

 

我也出现了这个问题, 我使用的是窗口函数进行group by 

发现state 不会清空,还是10m 到后面 几G 缓慢增长,大概每3个checkpoint 增长
任务没有反压。为了测试我使用discardSink 

先后 换了 1 second 1分钟,还有proctime rowtime模式 来窗口统计都一样 ,state缓慢增大

CREATE VIEW cpd_expo_feature_collect_view as select
imei,incrmentFeatureCollect(CAST(serverTime AS INT),adId) as feature from
dwd_oth_ads_appstore_exposure_process_view
  group by TUMBLE (proctime, INTERVAL '10' SECOND),imei;



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-08-09 文章 Yun Tang
Hi

RocksDB 的文件更新策略是依赖于level-0的文件数目以及level-1 ~ 
level-N的每层文件总size,可以参照RocksDB社区关于conpaction的图文描述[1]。默认情况下level-1的target size 
是256MB [2],也就是说level-1的总size在256MB以下时,应该是没有触发compaction来降低文件大小的。

从你的UI截图看,实际上checkpoint size很小,建议达到一定数据规模之后,再观察是否“状态越来越大”

[1] 
https://github.com/facebook/rocksdb/wiki/Leveled-Compaction#structure-of-the-files
[2] http://www.leviathan.vip/2018/03/05/Rocksdb%E7%9A%84Compact/

祝好
唐云


From: chengyanan1...@foxmail.com 
Sent: Friday, August 7, 2020 10:32
To: user-zh 
Subject: Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

HI:
请问这个问题有合理的解释吗,持续关注中。。。

发件人: 鱼子酱
发送时间: 2020-08-03 13:50
收件人: user-zh
主题: Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大
hi,您好:
我改回增量模式重新收集了一些数据:
1、数据处理速度:3000条每秒,是测试环境的,压力比较稳定,几乎没有波动
2、checkpoint是interval设置的是5秒
3、目前这个作业是每分钟一个窗口
4、并行度设置的1,使用on-yarn模式

刚启动的时候,如下:
<http://apache-flink.147419.n8.nabble.com/file/t793/6.png>

18分钟后,如下:
<http://apache-flink.147419.n8.nabble.com/file/t793/9.png>

checkpoints设置:
<http://apache-flink.147419.n8.nabble.com/file/t793/conf.png>

hdfs上面大小:
<http://apache-flink.147419.n8.nabble.com/file/t793/hdfs.png>

页面上看到的大小:
<http://apache-flink.147419.n8.nabble.com/file/t793/checkpoinsts1.png>


Congxian Qiu wrote
> Hi   鱼子酱
> 能否把在使用增量 checkpoint 的模式下,截图看一下 checkpoint size 的走势呢?另外可以的话,也麻烦你在每次
> checkpoint 做完之后,到 hdfs 上 ls 一下 checkpoint 目录的大小。
> 另外有一个问题还需要回答一下,你的处理速度大概是多少,state 的更新频率能否评估一下呢?
>
> Best,
> Congxian
>
>
> 鱼子酱 <

> 384939718@

>> 于2020年7月30日周四 上午10:43写道:
>
>> 感谢!
>>
>> flink1.11.1版本里面,我尝试了下面两种backend,目前运行了20多个小时,
>> 能够看到状态的大小在一个区间内波动,没有发现一直增长的情况了。
>> StateBackend backend =new
>>
>> RocksDBStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);
>> StateBackend backend =new
>>
>> FsStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);
>>
>>
>> 这样看,有可能是RocksDBStateBackend增量模式这边可能存在一些问题。
>> RocksDBStateBackend:
>> http://apache-flink.147419.n8.nabble.com/file/t793/444.png>;
>> FsStateBackend:
>> http://apache-flink.147419.n8.nabble.com/file/t793/555.png>;
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>





--
Sent from: http://apache-flink.147419.n8.nabble.com/



Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-08-09 文章 Congxian Qiu
Hi op
或许你可以把现在的问题整理一下,单独发一个邮件,看你的描述这个问题和 sql 中的 minibatch 有关系
Best,
Congxian


op <520075...@qq.com> 于2020年8月7日周五 下午2:13写道:

> Hi
>  1. 
>  我将原来程序里面的minibatch相关的配置删掉,现在使用FsStateBackend空闲状态能定期清除了,不知道这是不是一个bug,删掉的是以下配置
> val config = tConfig.getConfiguration()
> config.setString("table.exec.mini-batch.enabled", "true")
> config.setString("table.exec.mini-batch.allow-latency", "3s")
> config.setString("table.exec.mini-batch.size", "10")
> 2.使用RocksDBStateBackend时上面的配置不影响空闲状态清理
>
>
> -- 原始邮件 --
> 发件人:
>   "user-zh"
>                     <
> qcx978132...@gmail.com>;
> 发送时间: 2020年8月6日(星期四) 中午1:51
> 收件人: "user-zh"
> 主题: Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大
>
>
>
> Hi
>     我这边没有看到相关的附件,不确定是邮件客户端的问题还是其他什么,你那边能否再确认下 附件 的发送情况呢?
>
> Best,
> Congxian
>
>
> op <520075...@qq.com> 于2020年8月6日周四 上午10:36写道:
>
> >    感谢 ,  截图和配置在附件里面
> >   我试试配置  RocksDB StateBackend
> >
> >
> > -- 原始邮件 --
> > *发件人:* "user-zh"  > *发送时间:* 2020年8月5日(星期三) 下午5:43
> > *收件人:* "user-zh" > *主题:* Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大
> >
> > Hi
> >   RocksDB StateBackend 只需要在 flink-conf 中进行一下配置就行了[1].
> >
> >   另外从你前面两份邮件看,我有些信息比较疑惑,你能否贴一下现在使用的 flink-conf,以及
> checkpoint UI 的截图,以及 HDFS
> > 上 checkpoint 目录的截图
> >
> > [1]
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E8%AE%BE%E7%BD%AE-state-backend
> >
> <https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E8%AE%BE%E7%BD%AE-state-backend>>
> ;
> > Best,
> > Congxian
> >
> >
> > op <520075...@qq.com> 于2020年8月5日周三 下午4:03写道:
> >
> > > 你好,ttl配置是
> > > val settings =
> > EnvironmentSettings.newInstance().inStreamingMode().build()
> > > val tableEnv = StreamTableEnvironment.create(bsEnv, settings)
> > > val tConfig = tableEnv.getConfig
> > > tConfig.setIdleStateRetentionTime(Time.minutes(1440),
> Time.minutes(1450))
> > >
> > >
> > > &nbsp; &nbsp; 1)目前是有3个任务都是这种情况
> > > &nbsp; &nbsp; 2)目前集群没有RocksDB环境
> > > 谢谢
> > > --&nbsp;原始邮件&nbsp;--
> > > 发件人:
> >
> >  
> "user-zh"
> >
> >
> <
> > > qcx978132...@gmail.com&gt;;
> > > 发送时间:&nbsp;2020年8月5日(星期三) 下午3:30
> > > 收件人:&nbsp;"user-zh" > >
> > > 主题:&nbsp;Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后
> 状态越来越大
> > >
> > >
> > >
> > > Hi op
> > > &nbsp;&nbsp; 这个情况比较奇怪。我想确认下:
> > > &nbsp;&nbsp; 1)你所有作业都遇到 checkpoint size
> 不断变大的情况,还是只有这个类型的作业遇到这个问题呢?
> > > &nbsp;&nbsp; 2)是否尝试过 RocksDBStateBackend 呢(全量和增量)?情况如何呢
> > >
> > > &nbsp;&nbsp; 另外,你 TTL 其他的配置是怎么设置的呢?
> > >
> > > 从原理上来说,checkpoint 就是 state 的一个快照,如果 checkpoint 越来越大,那么就是 state
> 越来越多。
> > > Best,
> > > Congxian
> > >
> > >
> > > op <520075...@qq.com&gt; 于2020年8月5日周三 下午2:46写道:
> > >
> > > &gt; &amp;nbsp; &amp;nbsp;
> > > &gt;
> > >
> >
> 你好,我使用的是FsStateBackend&amp;nbsp;状态后端,调到5分钟也是一样,看了下checkpoint花费的时间都在300ms左右,我们的业务数据量每天基本一样,
> > > &gt; &amp;nbsp;
> > > &amp;nbsp;设置空闲状态清理时间为1440minute,按道理运行一天以后状态大小会趋于平稳,但是目前运行了5天,
> > > &gt; &amp;nbsp; &amp;nbsp;观察到的checkpoint shared
> 目录大小一直在增加,也确认过group
> > > &gt; by的key只会在处理当天出现,就是说这天的状态当天过后就会处于空闲状态,
> > > &gt; &amp;nbsp; &amp;nbsp;运行5天能满足清理条件
> > > &gt;
> > > &gt;
> > > &gt;
> > > &gt;
> > > &gt; ------ 原始邮件 --
> > > &gt; 发件人:
> > >
> >
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nb

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-08-06 文章 chengyanan1...@foxmail.com
HI:
请问这个问题有合理的解释吗,持续关注中。。。
 
发件人: 鱼子酱
发送时间: 2020-08-03 13:50
收件人: user-zh
主题: Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大
hi,您好:
我改回增量模式重新收集了一些数据:
1、数据处理速度:3000条每秒,是测试环境的,压力比较稳定,几乎没有波动
2、checkpoint是interval设置的是5秒
3、目前这个作业是每分钟一个窗口
4、并行度设置的1,使用on-yarn模式
 
刚启动的时候,如下:
<http://apache-flink.147419.n8.nabble.com/file/t793/6.png> 
 
18分钟后,如下:
<http://apache-flink.147419.n8.nabble.com/file/t793/9.png> 
 
checkpoints设置:
<http://apache-flink.147419.n8.nabble.com/file/t793/conf.png> 
 
hdfs上面大小:
<http://apache-flink.147419.n8.nabble.com/file/t793/hdfs.png> 
 
页面上看到的大小:
<http://apache-flink.147419.n8.nabble.com/file/t793/checkpoinsts1.png> 
 
 
Congxian Qiu wrote
> Hi   鱼子酱
> 能否把在使用增量 checkpoint 的模式下,截图看一下 checkpoint size 的走势呢?另外可以的话,也麻烦你在每次
> checkpoint 做完之后,到 hdfs 上 ls 一下 checkpoint 目录的大小。
> 另外有一个问题还需要回答一下,你的处理速度大概是多少,state 的更新频率能否评估一下呢?
> 
> Best,
> Congxian
> 
> 
> 鱼子酱 <
 
> 384939718@
 
>> 于2020年7月30日周四 上午10:43写道:
> 
>> 感谢!
>>
>> flink1.11.1版本里面,我尝试了下面两种backend,目前运行了20多个小时,
>> 能够看到状态的大小在一个区间内波动,没有发现一直增长的情况了。
>> StateBackend backend =new
>>
>> RocksDBStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);
>> StateBackend backend =new
>>
>> FsStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);
>>
>>
>> 这样看,有可能是RocksDBStateBackend增量模式这边可能存在一些问题。
>> RocksDBStateBackend:
>> http://apache-flink.147419.n8.nabble.com/file/t793/444.png>;
>> FsStateBackend:
>> http://apache-flink.147419.n8.nabble.com/file/t793/555.png>;
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>
 
 
 
 
 
--
Sent from: http://apache-flink.147419.n8.nabble.com/
 


Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-08-06 文章 Yu Li
看到生产上使用的还是1.8.2版本,请问同样的作业使用1.8.2的表现是怎样的?

Best Regards,
Yu


On Thu, 6 Aug 2020 at 16:29, op <520075...@qq.com> wrote:

> 感谢回答 
> 我之前用1.10也有同样的问题
>
>
>
>
> -- 原始邮件 --
> 发件人:
>   "user-zh"
> <
> car...@gmail.com>;
> 发送时间: 2020年8月6日(星期四) 下午4:01
> 收件人: "user-zh"
> 主题: Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大
>
>
>
> @鱼子酱
>
> 请问同样的作业,都使用RocksDB增量checkpoint,在1.8.2版本和1.11.1版本下的表现是否一致?还是说只有1.11.1版本下增量大小会单调增加?
>
> @op 类似的问题,请问使用FsStateBackend,是否在不同Flink版本下测试过?表现是否一致?
>
> 上述问题主要想确认一下新版本的表现和旧版本是否一致,如果不一致则有可能是新版本中引入的bug。谢谢。
>
> Best Regards,
> Yu
>
>
> On Thu, 6 Aug 2020 at 13:52, Congxian Qiu  wrote:
>
> > Hi
> > 我这边没有看到相关的附件,不确定是邮件客户端的问题还是其他什么,你那边能否再确认下 附件
> 的发送情况呢?
> >
> > Best,
> > Congxian
> >
> >
> > op <520075...@qq.com> 于2020年8月6日周四 上午10:36写道:
> >
> > >    感谢 ,  截图和配置在附件里面
> > >   我试试配置  RocksDB StateBackend
> > >
> > >
> > > -- 原始邮件 --
> > > *发件人:* "user-zh"  > > *发送时间:* 2020年8月5日(星期三) 下午5:43
> > > *收件人:* "user-zh" > > *主题:* Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大
> > >
> > > Hi
> > >   RocksDB StateBackend 只需要在 flink-conf 中进行一下配置就行了[1].
> > >
> > >   另外从你前面两份邮件看,我有些信息比较疑惑,你能否贴一下现在使用的 flink-conf,以及
> checkpoint UI 的截图,以及
> > HDFS
> > > 上 checkpoint 目录的截图
> > >
> > > [1]
> > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E8%AE%BE%E7%BD%AE-state-backend
> >
> <https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E8%AE%BE%E7%BD%AE-state-backend>>;
> >
> > > Best,
> > > Congxian
> > >
> > >
> > > op <520075...@qq.com> 于2020年8月5日周三 下午4:03写道:
> > >
> > > > 你好,ttl配置是
> > > > val settings =
> > > EnvironmentSettings.newInstance().inStreamingMode().build()
> > > > val tableEnv = StreamTableEnvironment.create(bsEnv,
> settings)
> > > > val tConfig = tableEnv.getConfig
> > > > tConfig.setIdleStateRetentionTime(Time.minutes(1440),
> > Time.minutes(1450))
> > > >
> > > >
> > > > &nbsp; &nbsp; 1)目前是有3个任务都是这种情况
> > > > &nbsp; &nbsp; 2)目前集群没有RocksDB环境
> > > > 谢谢
> > > > --&nbsp;原始邮件&nbsp;--
> > > > 发件人:
> > >
> >  
> "user-zh"
> > >
> >
> <
> > > > qcx978132...@gmail.com&gt;;
> > > > 发送时间:&nbsp;2020年8月5日(星期三) 下午3:30
> > > > 收件人:&nbsp;"user-zh" > > >
> > > > 主题:&nbsp;Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口
> 操作后 状态越来越大
> > > >
> > > >
> > > >
> > > > Hi op
> > > > &nbsp;&nbsp; 这个情况比较奇怪。我想确认下:
> > > > &nbsp;&nbsp; 1)你所有作业都遇到 checkpoint size
> 不断变大的情况,还是只有这个类型的作业遇到这个问题呢?
> > > > &nbsp;&nbsp; 2)是否尝试过 RocksDBStateBackend
> 呢(全量和增量)?情况如何呢
> > > >
> > > > &nbsp;&nbsp; 另外,你 TTL 其他的配置是怎么设置的呢?
> > > >
> > > > 从原理上来说,checkpoint 就是 state 的一个快照,如果 checkpoint 越来越大,那么就是
> state 越来越多。
> > > > Best,
> > > > Congxian
> > > >
> > > >
> > > > op <520075...@qq.com&gt; 于2020年8月5日周三 下午2:46写道:
> > > >
> > > > &gt; &amp;nbsp; &amp;nbsp;
> > > > &gt;
> > > >
> > >
> >
> 你好,我使用的是FsStateBackend&amp;nbsp;状态后端,调到5分钟也是一样,看了下checkpoint花费的时间都在300ms左右,我们的业务数据量每天基本一样,
> > > > &gt; &amp;nbsp;
> > > >
> &amp;nbsp;设置空闲状态清理时间为1440minute,按道理运行一天以后状态大小会趋于平稳,但是目前运行了5天,
> > > > &gt; &amp;nbsp; &amp;nbsp;观察到的checkpoint shared
> 目录大小一直在增加,也确认过group
> > > > &gt; by的key只会在处理当天出现,就是说这天的状态当天过后就会处于空闲状态,
> > > > &gt; &amp;nbsp; &amp;nbsp;运行5天能满足清理条件
> > > > &gt;
> > > > &gt;
> > > > &gt;
> > > > &gt;
> > > > &gt; -- 原始邮件 --
> > > > &gt; 发件人:
> > > >
> > >

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-08-06 文章 Yu Li
@鱼子酱
请问同样的作业,都使用RocksDB增量checkpoint,在1.8.2版本和1.11.1版本下的表现是否一致?还是说只有1.11.1版本下增量大小会单调增加?

@op 类似的问题,请问使用FsStateBackend,是否在不同Flink版本下测试过?表现是否一致?

上述问题主要想确认一下新版本的表现和旧版本是否一致,如果不一致则有可能是新版本中引入的bug。谢谢。

Best Regards,
Yu


On Thu, 6 Aug 2020 at 13:52, Congxian Qiu  wrote:

> Hi
> 我这边没有看到相关的附件,不确定是邮件客户端的问题还是其他什么,你那边能否再确认下 附件 的发送情况呢?
>
> Best,
> Congxian
>
>
> op <520075...@qq.com> 于2020年8月6日周四 上午10:36写道:
>
> >感谢 ,  截图和配置在附件里面
> >   我试试配置  RocksDB StateBackend
> >
> >
> > -- 原始邮件 --
> > *发件人:* "user-zh" ;
> > *发送时间:* 2020年8月5日(星期三) 下午5:43
> > *收件人:* "user-zh";
> > *主题:* Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大
> >
> > Hi
> >   RocksDB StateBackend 只需要在 flink-conf 中进行一下配置就行了[1].
> >
> >   另外从你前面两份邮件看,我有些信息比较疑惑,你能否贴一下现在使用的 flink-conf,以及 checkpoint UI 的截图,以及
> HDFS
> > 上 checkpoint 目录的截图
> >
> > [1]
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E8%AE%BE%E7%BD%AE-state-backend
> >
> > Best,
> > Congxian
> >
> >
> > op <520075...@qq.com> 于2020年8月5日周三 下午4:03写道:
> >
> > > 你好,ttl配置是
> > > val settings =
> > EnvironmentSettings.newInstance().inStreamingMode().build()
> > > val tableEnv = StreamTableEnvironment.create(bsEnv, settings)
> > > val tConfig = tableEnv.getConfig
> > > tConfig.setIdleStateRetentionTime(Time.minutes(1440),
> Time.minutes(1450))
> > >
> > >
> > >     1)目前是有3个任务都是这种情况
> > >     2)目前集群没有RocksDB环境
> > > 谢谢
> > > ------------------ 原始邮件 --
> > > 发件人:
> > >   "user-zh"
> > > <
> > > qcx978132...@gmail.com>;
> > > 发送时间: 2020年8月5日(星期三) 下午3:30
> > > 收件人: "user-zh" > >
> > > 主题: Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大
> > >
> > >
> > >
> > > Hi op
> > >    这个情况比较奇怪。我想确认下:
> > >    1)你所有作业都遇到 checkpoint size 不断变大的情况,还是只有这个类型的作业遇到这个问题呢?
> > >    2)是否尝试过 RocksDBStateBackend 呢(全量和增量)?情况如何呢
> > >
> > >    另外,你 TTL 其他的配置是怎么设置的呢?
> > >
> > > 从原理上来说,checkpoint 就是 state 的一个快照,如果 checkpoint 越来越大,那么就是 state 越来越多。
> > > Best,
> > > Congxian
> > >
> > >
> > > op <520075...@qq.com> 于2020年8月5日周三 下午2:46写道:
> > >
> > > > &nbsp; &nbsp;
> > > >
> > >
> >
> 你好,我使用的是FsStateBackend&nbsp;状态后端,调到5分钟也是一样,看了下checkpoint花费的时间都在300ms左右,我们的业务数据量每天基本一样,
> > > > &nbsp;
> > > &nbsp;设置空闲状态清理时间为1440minute,按道理运行一天以后状态大小会趋于平稳,但是目前运行了5天,
> > > > &nbsp; &nbsp;观察到的checkpoint shared 目录大小一直在增加,也确认过group
> > > > by的key只会在处理当天出现,就是说这天的状态当天过后就会处于空闲状态,
> > > > &nbsp; &nbsp;运行5天能满足清理条件
> > > >
> > > >
> > > >
> > > >
> > > > -- 原始邮件 --
> > > > 发件人:
> > >
> >
> >  
> > > "user-zh"
> > >
> >
> >
> > > <
> > > > qcx978132...@gmail.com&gt;;
> > > > 发送时间:&nbsp;2020年8月3日(星期一) 下午5:50
> > > > 收件人:&nbsp;"user-zh" > > >
> > > > 主题:&nbsp;Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后
> 状态越来越大
> > > >
> > > >
> > > >
> > > > Hi
> > > > &nbsp;&nbsp; 能否把 checkpoint 的 interval 调长一点再看看是否稳定呢?从
> shared
> > > > 目录的数据量看,有增长,后续基本持平。现在
> > > > Checkpointed Data Size 是增量的大小[1],而不是整个 checkpoint 的数据量的大小,如果
> > > checkpoint
> > > > 之间,数据改动很多的话,这个值会变大
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7
> > > >
> > > <
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7>
> > >;
> > > Best,
> > > > Congxian
> > > >
> > > >
> > > > op <520075...@qq.com&gt; 于2020年8月3日周一 下午2:18写道:
> > > >
> > > > &gt; &amp;nbsp; &amp;nbsp;
&

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-08-05 文章 Congxian Qiu
Hi
我这边没有看到相关的附件,不确定是邮件客户端的问题还是其他什么,你那边能否再确认下 附件 的发送情况呢?

Best,
Congxian


op <520075...@qq.com> 于2020年8月6日周四 上午10:36写道:

>感谢 ,  截图和配置在附件里面
>   我试试配置  RocksDB StateBackend
>
>
> -- 原始邮件 --
> *发件人:* "user-zh" ;
> *发送时间:* 2020年8月5日(星期三) 下午5:43
> *收件人:* "user-zh";
> *主题:* Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大
>
> Hi
>   RocksDB StateBackend 只需要在 flink-conf 中进行一下配置就行了[1].
>
>   另外从你前面两份邮件看,我有些信息比较疑惑,你能否贴一下现在使用的 flink-conf,以及 checkpoint UI 的截图,以及 HDFS
> 上 checkpoint 目录的截图
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E8%AE%BE%E7%BD%AE-state-backend
>
> Best,
> Congxian
>
>
> op <520075...@qq.com> 于2020年8月5日周三 下午4:03写道:
>
> > 你好,ttl配置是
> > val settings =
> EnvironmentSettings.newInstance().inStreamingMode().build()
> > val tableEnv = StreamTableEnvironment.create(bsEnv, settings)
> > val tConfig = tableEnv.getConfig
> > tConfig.setIdleStateRetentionTime(Time.minutes(1440), Time.minutes(1450))
> >
> >
> >     1)目前是有3个任务都是这种情况
> >     2)目前集群没有RocksDB环境
> > 谢谢
> > -- 原始邮件 --
> > 发件人:
> >           "user-zh"
> >         <
> > qcx978132...@gmail.com>;
> > 发送时间: 2020年8月5日(星期三) 下午3:30
> > 收件人: "user-zh" >
> > 主题: Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大
> >
> >
> >
> > Hi op
> >    这个情况比较奇怪。我想确认下:
> >    1)你所有作业都遇到 checkpoint size 不断变大的情况,还是只有这个类型的作业遇到这个问题呢?
> >    2)是否尝试过 RocksDBStateBackend 呢(全量和增量)?情况如何呢
> >
> >    另外,你 TTL 其他的配置是怎么设置的呢?
> >
> > 从原理上来说,checkpoint 就是 state 的一个快照,如果 checkpoint 越来越大,那么就是 state 越来越多。
> > Best,
> > Congxian
> >
> >
> > op <520075...@qq.com> 于2020年8月5日周三 下午2:46写道:
> >
> > > &nbsp; &nbsp;
> > >
> >
> 你好,我使用的是FsStateBackend&nbsp;状态后端,调到5分钟也是一样,看了下checkpoint花费的时间都在300ms左右,我们的业务数据量每天基本一样,
> > > &nbsp;
> > &nbsp;设置空闲状态清理时间为1440minute,按道理运行一天以后状态大小会趋于平稳,但是目前运行了5天,
> > > &nbsp; &nbsp;观察到的checkpoint shared 目录大小一直在增加,也确认过group
> > > by的key只会在处理当天出现,就是说这天的状态当天过后就会处于空闲状态,
> > > &nbsp; &nbsp;运行5天能满足清理条件
> > >
> > >
> > >
> > >
> > > -- 原始邮件 --
> > > 发件人:
> >
> >  
> > "user-zh"
> >
> >
> > <
> > > qcx978132...@gmail.com&gt;;
> > > 发送时间:&nbsp;2020年8月3日(星期一) 下午5:50
> > > 收件人:&nbsp;"user-zh" > >
> > > 主题:&nbsp;Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大
> > >
> > >
> > >
> > > Hi
> > > &nbsp;&nbsp; 能否把 checkpoint 的 interval 调长一点再看看是否稳定呢?从 shared
> > > 目录的数据量看,有增长,后续基本持平。现在
> > > Checkpointed Data Size 是增量的大小[1],而不是整个 checkpoint 的数据量的大小,如果
> > checkpoint
> > > 之间,数据改动很多的话,这个值会变大
> > >
> > > [1]
> > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7
> > >
> > <
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7>
> >;
> > Best,
> > > Congxian
> > >
> > >
> > > op <520075...@qq.com&gt; 于2020年8月3日周一 下午2:18写道:
> > >
> > > &gt; &amp;nbsp; &amp;nbsp;
> > > &gt;
> > 同问,我也遇到了状态越来越大的情况,使用的是1.11.0版本,用hdfs保存checkpoint,checkpoint间隔3分钟,
> > > &gt; 逻辑是按照 事件day 和 id 进行groupby
> > > &gt; 然后有十几个聚合指标,运行了7天左右,状态一直在增加,设置了失效时间,然后watermark看着也正常在走
> > > &gt; tConfig.setIdleStateRetentionTime(Time.minutes(1440),
> > > &gt; Time.minutes(1440+10))
> > > &gt;
> > > &gt;
> > > &gt;
> > > &gt;
> > > &gt;
> > --------------&amp;nbsp;原始邮件&amp;nbsp;--
> > > &gt; 发件人:
> > >
> >
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&n

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-08-05 文章 Congxian Qiu
Hi
  RocksDB StateBackend 只需要在 flink-conf 中进行一下配置就行了[1].

  另外从你前面两份邮件看,我有些信息比较疑惑,你能否贴一下现在使用的 flink-conf,以及 checkpoint UI 的截图,以及 HDFS
上 checkpoint 目录的截图

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E8%AE%BE%E7%BD%AE-state-backend

Best,
Congxian


op <520075...@qq.com> 于2020年8月5日周三 下午4:03写道:

> 你好,ttl配置是
> val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
> val tableEnv = StreamTableEnvironment.create(bsEnv, settings)
> val tConfig = tableEnv.getConfig
> tConfig.setIdleStateRetentionTime(Time.minutes(1440), Time.minutes(1450))
>
>
>     1)目前是有3个任务都是这种情况
>     2)目前集群没有RocksDB环境
> 谢谢
> -- 原始邮件 --
> 发件人:
>   "user-zh"
> <
> qcx978132...@gmail.com>;
> 发送时间: 2020年8月5日(星期三) 下午3:30
> 收件人: "user-zh"
> 主题: Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大
>
>
>
> Hi op
>    这个情况比较奇怪。我想确认下:
>    1)你所有作业都遇到 checkpoint size 不断变大的情况,还是只有这个类型的作业遇到这个问题呢?
>    2)是否尝试过 RocksDBStateBackend 呢(全量和增量)?情况如何呢
>
>    另外,你 TTL 其他的配置是怎么设置的呢?
>
> 从原理上来说,checkpoint 就是 state 的一个快照,如果 checkpoint 越来越大,那么就是 state 越来越多。
> Best,
> Congxian
>
>
> op <520075...@qq.com> 于2020年8月5日周三 下午2:46写道:
>
> > &nbsp; &nbsp;
> >
> 你好,我使用的是FsStateBackend&nbsp;状态后端,调到5分钟也是一样,看了下checkpoint花费的时间都在300ms左右,我们的业务数据量每天基本一样,
> > &nbsp;
> &nbsp;设置空闲状态清理时间为1440minute,按道理运行一天以后状态大小会趋于平稳,但是目前运行了5天,
> > &nbsp; &nbsp;观察到的checkpoint shared 目录大小一直在增加,也确认过group
> > by的key只会在处理当天出现,就是说这天的状态当天过后就会处于空闲状态,
> > &nbsp; &nbsp;运行5天能满足清理条件
> >
> >
> >
> >
> > -- 原始邮件 --
> > 发件人:
> >                              
> "user-zh"
> >
> <
> > qcx978132...@gmail.com&gt;;
> > 发送时间:&nbsp;2020年8月3日(星期一) 下午5:50
> > 收件人:&nbsp;"user-zh" >
> > 主题:&nbsp;Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大
> >
> >
> >
> > Hi
> > &nbsp;&nbsp; 能否把 checkpoint 的 interval 调长一点再看看是否稳定呢?从 shared
> > 目录的数据量看,有增长,后续基本持平。现在
> > Checkpointed Data Size 是增量的大小[1],而不是整个 checkpoint 的数据量的大小,如果
> checkpoint
> > 之间,数据改动很多的话,这个值会变大
> >
> > [1]
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7
> >
> <https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7>>;
> Best,
> > Congxian
> >
> >
> > op <520075...@qq.com&gt; 于2020年8月3日周一 下午2:18写道:
> >
> > &gt; &amp;nbsp; &amp;nbsp;
> > &gt;
> 同问,我也遇到了状态越来越大的情况,使用的是1.11.0版本,用hdfs保存checkpoint,checkpoint间隔3分钟,
> > &gt; 逻辑是按照 事件day 和 id 进行groupby
> > &gt; 然后有十几个聚合指标,运行了7天左右,状态一直在增加,设置了失效时间,然后watermark看着也正常在走
> > &gt; tConfig.setIdleStateRetentionTime(Time.minutes(1440),
> > &gt; Time.minutes(1440+10))
> > &gt;
> > &gt;
> > &gt;
> > &gt;
> > &gt;
> --&amp;nbsp;原始邮件&amp;nbsp;--
> > &gt; 发件人:
> >
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> > &nbsp; "user-zh"
> >
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> > &nbsp; <
> > &gt; 384939...@qq.com&amp;gt;;
> > &gt; 发送时间:&amp;nbsp;2020年8月3日(星期一

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-08-05 文章 Congxian Qiu
Hi op
   这个情况比较奇怪。我想确认下:
   1)你所有作业都遇到 checkpoint size 不断变大的情况,还是只有这个类型的作业遇到这个问题呢?
   2)是否尝试过 RocksDBStateBackend 呢(全量和增量)?情况如何呢

   另外,你 TTL 其他的配置是怎么设置的呢?

从原理上来说,checkpoint 就是 state 的一个快照,如果 checkpoint 越来越大,那么就是 state 越来越多。
Best,
Congxian


op <520075...@qq.com> 于2020年8月5日周三 下午2:46写道:

>    
> 你好,我使用的是FsStateBackend 状态后端,调到5分钟也是一样,看了下checkpoint花费的时间都在300ms左右,我们的业务数据量每天基本一样,
>    设置空闲状态清理时间为1440minute,按道理运行一天以后状态大小会趋于平稳,但是目前运行了5天,
>    观察到的checkpoint shared 目录大小一直在增加,也确认过group
> by的key只会在处理当天出现,就是说这天的状态当天过后就会处于空闲状态,
>    运行5天能满足清理条件
>
>
>
>
> -- 原始邮件 --
> 发件人:
>   "user-zh"
> <
> qcx978132...@gmail.com>;
> 发送时间: 2020年8月3日(星期一) 下午5:50
> 收件人: "user-zh"
> 主题: Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大
>
>
>
> Hi
>    能否把 checkpoint 的 interval 调长一点再看看是否稳定呢?从 shared
> 目录的数据量看,有增长,后续基本持平。现在
> Checkpointed Data Size 是增量的大小[1],而不是整个 checkpoint 的数据量的大小,如果 checkpoint
> 之间,数据改动很多的话,这个值会变大
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7
> Best,
> Congxian
>
>
> op <520075...@qq.com> 于2020年8月3日周一 下午2:18写道:
>
> > &nbsp; &nbsp;
> > 同问,我也遇到了状态越来越大的情况,使用的是1.11.0版本,用hdfs保存checkpoint,checkpoint间隔3分钟,
> > 逻辑是按照 事件day 和 id 进行groupby
> > 然后有十几个聚合指标,运行了7天左右,状态一直在增加,设置了失效时间,然后watermark看着也正常在走
> > tConfig.setIdleStateRetentionTime(Time.minutes(1440),
> > Time.minutes(1440+10))
> >
> >
> >
> >
> > --&nbsp;原始邮件&nbsp;--
> > 发件人:
> >                                
>   "user-zh"
> >  
>   <
> > 384939...@qq.com&gt;;
> > 发送时间:&nbsp;2020年8月3日(星期一) 中午1:50
> > 收件人:&nbsp;"user-zh" >
> > 主题:&nbsp;Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大
> >
> >
> >
> > hi,您好:
> > 我改回增量模式重新收集了一些数据:
> > 1、数据处理速度:3000条每秒,是测试环境的,压力比较稳定,几乎没有波动
> > 2、checkpoint是interval设置的是5秒
> > 3、目前这个作业是每分钟一个窗口
> > 4、并行度设置的1,使用on-yarn模式
> >
> > 刚启动的时候,如下:
> > <http://apache-flink.147419.n8.nabble.com/file/t793/6.png&gt;
> >
> > 18分钟后,如下:
> > <http://apache-flink.147419.n8.nabble.com/file/t793/9.png&gt;
> >
> > checkpoints设置:
> > <http://apache-flink.147419.n8.nabble.com/file/t793/conf.png&gt;
> >
> > hdfs上面大小:
> > <http://apache-flink.147419.n8.nabble.com/file/t793/hdfs.png&gt;
> >
> > 页面上看到的大小:
> > <
> http://apache-flink.147419.n8.nabble.com/file/t793/checkpoinsts1.png&gt
> ;
> >
> >
> > Congxian Qiu wrote
> > &gt; Hi&nbsp;&nbsp; 鱼子酱
> > &gt;&nbsp;&nbsp;&nbsp;&nbsp; 能否把在使用增量 checkpoint
> 的模式下,截图看一下 checkpoint
> > size 的走势呢?另外可以的话,也麻烦你在每次
> > &gt; checkpoint 做完之后,到 hdfs 上 ls 一下 checkpoint 目录的大小。
> > &gt;&nbsp;&nbsp;&nbsp;&nbsp;
> 另外有一个问题还需要回答一下,你的处理速度大概是多少,state 的更新频率能否评估一下呢?
> > &gt;
> > &gt; Best,
> > &gt; Congxian
> > &gt;
> > &gt;
> > &gt; 鱼子酱 <
> >
> > &gt; 384939718@
> >
> > &gt;&gt; 于2020年7月30日周四 上午10:43写道:
> > &gt;
> > &gt;&gt; 感谢!
> > &gt;&gt;
> > &gt;&gt; flink1.11.1版本里面,我尝试了下面两种backend,目前运行了20多个小时,
> > &gt;&gt; 能够看到状态的大小在一个区间内波动,没有发现一直增长的情况了。
> > &gt;&gt; StateBackend backend =new
> > &gt;&gt;
> > &gt;&gt;
> >
> RocksDBStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);
> > &gt;&gt; StateBackend backend =new
> > &gt;&gt;
> > &gt;&gt;
> >
> FsStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);
> > &gt;&gt;
> > &gt;&gt;
> > &gt;&gt; 这样看,有可能是RocksDBStateBackend增量模式这边可能存在一些问题。
> > &gt;&gt; RocksDBStateBackend:
> > &gt;&gt; &amp;lt;
> > http://apache-flink.147419.n8.nabble.com/file/t793/444.png&amp;gt
> ;
> > &gt;&gt; FsStateBackend:
> > &gt;&gt; &amp;lt;
> > http://apache-flink.147419.n8.nabble.com/file/t793/555.png&amp;gt
> ;
> > &gt;&gt;
> > &gt;&gt;
> > &gt;&gt;
> > &gt;&gt;
> > &gt;&gt; --
> > &gt;&gt; Sent from: http://apache-flink.147419.n8.nabble.com/
> > <http://apache-flink.147419.n8.nabble.com/>>; &gt;&gt;
> >
> >
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-08-03 文章 Congxian Qiu
Hi
   能否把 checkpoint 的 interval 调长一点再看看是否稳定呢?从 shared 目录的数据量看,有增长,后续基本持平。现在
Checkpointed Data Size 是增量的大小[1],而不是整个 checkpoint 的数据量的大小,如果 checkpoint
之间,数据改动很多的话,这个值会变大

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/state/state_backends.html#%E5%A2%9E%E9%87%8F%E5%BF%AB%E7%85%A7
Best,
Congxian


op <520075...@qq.com> 于2020年8月3日周一 下午2:18写道:

>    
> 同问,我也遇到了状态越来越大的情况,使用的是1.11.0版本,用hdfs保存checkpoint,checkpoint间隔3分钟,
> 逻辑是按照 事件day 和 id 进行groupby
> 然后有十几个聚合指标,运行了7天左右,状态一直在增加,设置了失效时间,然后watermark看着也正常在走
> tConfig.setIdleStateRetentionTime(Time.minutes(1440),
> Time.minutes(1440+10))
>
>
>
>
> -- 原始邮件 --
> 发件人:
>   "user-zh"
> <
> 384939...@qq.com>;
> 发送时间: 2020年8月3日(星期一) 中午1:50
> 收件人: "user-zh"
> 主题: Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大
>
>
>
> hi,您好:
> 我改回增量模式重新收集了一些数据:
> 1、数据处理速度:3000条每秒,是测试环境的,压力比较稳定,几乎没有波动
> 2、checkpoint是interval设置的是5秒
> 3、目前这个作业是每分钟一个窗口
> 4、并行度设置的1,使用on-yarn模式
>
> 刚启动的时候,如下:
> <http://apache-flink.147419.n8.nabble.com/file/t793/6.png>;
>
> 18分钟后,如下:
> <http://apache-flink.147419.n8.nabble.com/file/t793/9.png>;
>
> checkpoints设置:
> <http://apache-flink.147419.n8.nabble.com/file/t793/conf.png>;
>
> hdfs上面大小:
> <http://apache-flink.147419.n8.nabble.com/file/t793/hdfs.png>;
>
> 页面上看到的大小:
> <http://apache-flink.147419.n8.nabble.com/file/t793/checkpoinsts1.png>;
>
>
> Congxian Qiu wrote
> > Hi   鱼子酱
> > 能否把在使用增量 checkpoint 的模式下,截图看一下 checkpoint
> size 的走势呢?另外可以的话,也麻烦你在每次
> > checkpoint 做完之后,到 hdfs 上 ls 一下 checkpoint 目录的大小。
> > 另外有一个问题还需要回答一下,你的处理速度大概是多少,state 的更新频率能否评估一下呢?
> >
> > Best,
> > Congxian
> >
> >
> > 鱼子酱 <
>
> > 384939718@
>
> >> 于2020年7月30日周四 上午10:43写道:
> >
> >> 感谢!
> >>
> >> flink1.11.1版本里面,我尝试了下面两种backend,目前运行了20多个小时,
> >> 能够看到状态的大小在一个区间内波动,没有发现一直增长的情况了。
> >> StateBackend backend =new
> >>
> >>
> RocksDBStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);
> >> StateBackend backend =new
> >>
> >>
> FsStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);
> >>
> >>
> >> 这样看,有可能是RocksDBStateBackend增量模式这边可能存在一些问题。
> >> RocksDBStateBackend:
> >> &lt;
> http://apache-flink.147419.n8.nabble.com/file/t793/444.png&gt;
> >> FsStateBackend:
> >> &lt;
> http://apache-flink.147419.n8.nabble.com/file/t793/555.png&gt;
> >>
> >>
> >>
> >>
> >> --
> >> Sent from: http://apache-flink.147419.n8.nabble.com/
> >>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-08-02 文章 鱼子酱
hi,您好:
我改回增量模式重新收集了一些数据:
1、数据处理速度:3000条每秒,是测试环境的,压力比较稳定,几乎没有波动
2、checkpoint是interval设置的是5秒
3、目前这个作业是每分钟一个窗口
4、并行度设置的1,使用on-yarn模式

刚启动的时候,如下:
 

18分钟后,如下:
 

checkpoints设置:
 

hdfs上面大小:
 

页面上看到的大小:
 


Congxian Qiu wrote
> Hi   鱼子酱
> 能否把在使用增量 checkpoint 的模式下,截图看一下 checkpoint size 的走势呢?另外可以的话,也麻烦你在每次
> checkpoint 做完之后,到 hdfs 上 ls 一下 checkpoint 目录的大小。
> 另外有一个问题还需要回答一下,你的处理速度大概是多少,state 的更新频率能否评估一下呢?
> 
> Best,
> Congxian
> 
> 
> 鱼子酱 <

> 384939718@

>> 于2020年7月30日周四 上午10:43写道:
> 
>> 感谢!
>>
>> flink1.11.1版本里面,我尝试了下面两种backend,目前运行了20多个小时,
>> 能够看到状态的大小在一个区间内波动,没有发现一直增长的情况了。
>> StateBackend backend =new
>>
>> RocksDBStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);
>> StateBackend backend =new
>>
>> FsStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);
>>
>>
>> 这样看,有可能是RocksDBStateBackend增量模式这边可能存在一些问题。
>> RocksDBStateBackend:
>> ;
>> FsStateBackend:
>> ;
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-07-30 文章 Congxian Qiu
Hi   鱼子酱
能否把在使用增量 checkpoint 的模式下,截图看一下 checkpoint size 的走势呢?另外可以的话,也麻烦你在每次
checkpoint 做完之后,到 hdfs 上 ls 一下 checkpoint 目录的大小。
另外有一个问题还需要回答一下,你的处理速度大概是多少,state 的更新频率能否评估一下呢?

Best,
Congxian


鱼子酱 <384939...@qq.com> 于2020年7月30日周四 上午10:43写道:

> 感谢!
>
> flink1.11.1版本里面,我尝试了下面两种backend,目前运行了20多个小时,
> 能够看到状态的大小在一个区间内波动,没有发现一直增长的情况了。
> StateBackend backend =new
>
> RocksDBStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);
> StateBackend backend =new
>
> FsStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);
>
>
> 这样看,有可能是RocksDBStateBackend增量模式这边可能存在一些问题。
> RocksDBStateBackend:
> 
> FsStateBackend:
> 
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-07-29 文章 鱼子酱
感谢!

flink1.11.1版本里面,我尝试了下面两种backend,目前运行了20多个小时,
能够看到状态的大小在一个区间内波动,没有发现一直增长的情况了。
StateBackend backend =new
RocksDBStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);
StateBackend backend =new
FsStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);


这样看,有可能是RocksDBStateBackend增量模式这边可能存在一些问题。
RocksDBStateBackend:
 
FsStateBackend:
 




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-07-29 文章 鱼子酱
感谢!

flink1.11.1版本里面,我尝试了下面两种backend,目前运行了20多个小时,
能够看到状态的大小在一个区间内波动,没有发现一直增长的情况了。
StateBackend backend =new
RocksDBStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);

StateBackend backend =new
FsStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);

这样看,目前是RocksDBStateBackend*增量模式*这边可能存在一些问题。

下面两种都能成功清理
RocksDBStateBackend:
 

FsStateBackend:
 





Benchao Li-2 wrote
> 这个问题我建议先区分下是SQL operator里面没有清理state,还是state backend本身没有清理state。
> 这样你是否可以尝试下其他的state backend,以及非增量模式的rocksdb等?如果在所有state backend场景下,
> state都是一直上涨的,那有可能某个SQL operator里面对state的清理可能有些问题。
> 
> 鱼子酱 <

> 384939718@

>> 于2020年7月29日周三 上午9:47写道:
> 
>> 您好:
>>
>> 我按照您说的试了看了一下watermark,
>> 发现可以 正常更新,相关的计算结果也没发现问题。
>> 1. 刚刚截了图在下面,时间因为时区的问题-8就正常了
>> ;
>> 2. checkpoint里面的信息,能看出大小是线性增长的,然后主要集中在2个窗口和group里面。
>> ;
>> ;
>>
>>
>>
>> Congxian Qiu wrote
>> > Hi
>> > SQL 部分不太熟,根据以往的经验,对于 event time 情况下 window 的某个算子 state
>> 越来越大的情况,或许可以检查下
>> > watermark[1]
>> >
>> > [1]
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/monitoring/debugging_event_time.html
>> >
>> > Best,
>> > Congxian
>> >
>> >
>> > 鱼子酱 <
>>
>> > 384939718@
>>
>> >> 于2020年7月28日周二 下午2:45写道:
>> >
>> >> Hi,社区的各位大家好:
>> >> 我目前生产上面使用的是1.8.2版本,相对稳定
>> >> 为了能够用sql统一所有相关的作业,同时我也一直在跟着flink最新版本进行研究,
>> >> 截止目前先后研究了1.10.1 1.11.1共2个大版本
>> >>
>> >> 在尝试使用的过程中,我发现了通过程序,使用sql进行group操作时,checkpoint中的数据量一直在缓慢增加
>> >> 状态后端使用的是rocksdb 的增量模式
>> >> StateBackend backend =new
>> >> RocksDBStateBackend("hdfs:///checkpoints-data/",true);
>> >> 设置了官网文档中找到的删除策略:
>> >> TableConfig tableConfig = streamTableEnvironment.getConfig();
>> >> tableConfig.setIdleStateRetentionTime(Time.minutes(2),
>> >> Time.minutes(7));
>> >>
>> >> 请问是我使用的方式不对吗?
>> >>
>> >> 通过WebUI查看详细的checkpoint信息,发现状态大的原因主要集中在group这一Operator
>> >>
>> >>
>> >>
>> >> 版本影响:flink1.10.1 flink1.11.1
>> >> planner:blink planner
>> >> source : kafka source
>> >> 时间属性: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> sql:
>> >> insert into  result
>> >> select request_time ,request_id ,request_cnt ,avg_resptime
>> >> ,stddev_resptime ,terminal_cnt
>> >> ,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd HH:mm:ss.SSS'),0,19)
>> >> from
>> >> (   select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
>> >> MINUTE),'-MM-dd HH:mm:ss.SSS'),0,21) as request_time
>> >> ,commandId as request_id
>> >> ,count(*) as request_cnt
>> >> ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
>> >> ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
>> >> stddev_resptime
>> >> from log
>> >> where
>> >> commandId in (104005 ,204005 ,404005)
>> >> and errCode=0 and attr=0
>> >> group by TUMBLE(times, INTERVAL '1' MINUTE),commandId
>> >>
>> >> union all
>> >>
>> >> select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
>> >> MINUTE),'-MM-dd HH:mm:ss.SSS'),0,21) as request_time
>> >> ,
>> >> ,count(*) as request_cnt
>> >> ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
>> >> ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
>> >> stddev_resptime
>> >> from log
>> >> where
>> >> commandId in (104005 ,204005 ,404005)
>> >> and errCode=0 and attr=0
>> >> group by TUMBLE(times, INTERVAL '1' MINUTE)
>> >> )
>> >>
>> >>
>> >> source:
>> >>
>> >> create table log (
>> >>   eventTime bigint
>> >>   ,times timestamp(3)
>> >>   
>> >>   ,commandId integer
>> >>   ,watermark for times as times - interval '5' second
>> >> )
>> >> with(
>> >>  'connector' = 'kafka-0.10',
>> >>  'topic' = '……',
>> >>  'properties.bootstrap.servers' = '……',
>> >>  'properties.group.id' = '……',
>> >>  'scan.startup.mode' = 'latest-offset',
>> >>  'format' = 'json'
>> >> )
>> >>
>> >> sink1:
>> >> create table result (
>> >>   request_time varchar
>> >>   ,request_id integer
>> >>   ,request_cnt bigint
>> >>   ,avg_resptime double
>> >>   ,stddev_resptime double
>> >>   ,insert_time varchar
>> >> ) with (
>> >>   'connector' = 'kafka-0.10',
>> >>   'topic' = '……',
>> >>   'properties.bootstrap.servers' = '……',
>> >>   'properties.group.id' = '……',
>> >>   'scan.startup.mode' = 'latest-offset',
>> >>   'format' = 'json'
>> >> )
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> --
>> >> Sent from: http://apache-flink.147419.n8.nabble.com/
>> >>
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-07-29 文章 鱼子酱
感谢!

flink1.11.1版本里面,我尝试了下面两种backend,目前运行了20多个小时,
能够看到状态的大小在一个区间内波动,没有发现一直增长的情况了。
StateBackend backend =new
RocksDBStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);

StateBackend backend =new
FsStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);

这样看,有可能是RocksDBStateBackend*增量模式*这边可能存在一些问题。

下面两种都能成功清理
RocksDBStateBackend:
 

FsStateBackend:
 


Benchao Li-2 wrote
> 这个问题我建议先区分下是SQL operator里面没有清理state,还是state backend本身没有清理state。
> 这样你是否可以尝试下其他的state backend,以及非增量模式的rocksdb等?如果在所有state backend场景下,
> state都是一直上涨的,那有可能某个SQL operator里面对state的清理可能有些问题。
> 
> 鱼子酱 <

> 384939718@

>> 于2020年7月29日周三 上午9:47写道:
> 
>> 您好:
>>
>> 我按照您说的试了看了一下watermark,
>> 发现可以 正常更新,相关的计算结果也没发现问题。
>> 1. 刚刚截了图在下面,时间因为时区的问题-8就正常了
>> ;
>> 2. checkpoint里面的信息,能看出大小是线性增长的,然后主要集中在2个窗口和group里面。
>> ;
>> ;
>>
>>
>>
>> Congxian Qiu wrote
>> > Hi
>> > SQL 部分不太熟,根据以往的经验,对于 event time 情况下 window 的某个算子 state
>> 越来越大的情况,或许可以检查下
>> > watermark[1]
>> >
>> > [1]
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/monitoring/debugging_event_time.html
>> >
>> > Best,
>> > Congxian
>> >
>> >
>> > 鱼子酱 <
>>
>> > 384939718@
>>
>> >> 于2020年7月28日周二 下午2:45写道:
>> >
>> >> Hi,社区的各位大家好:
>> >> 我目前生产上面使用的是1.8.2版本,相对稳定
>> >> 为了能够用sql统一所有相关的作业,同时我也一直在跟着flink最新版本进行研究,
>> >> 截止目前先后研究了1.10.1 1.11.1共2个大版本
>> >>
>> >> 在尝试使用的过程中,我发现了通过程序,使用sql进行group操作时,checkpoint中的数据量一直在缓慢增加
>> >> 状态后端使用的是rocksdb 的增量模式
>> >> StateBackend backend =new
>> >> RocksDBStateBackend("hdfs:///checkpoints-data/",true);
>> >> 设置了官网文档中找到的删除策略:
>> >> TableConfig tableConfig = streamTableEnvironment.getConfig();
>> >> tableConfig.setIdleStateRetentionTime(Time.minutes(2),
>> >> Time.minutes(7));
>> >>
>> >> 请问是我使用的方式不对吗?
>> >>
>> >> 通过WebUI查看详细的checkpoint信息,发现状态大的原因主要集中在group这一Operator
>> >>
>> >>
>> >>
>> >> 版本影响:flink1.10.1 flink1.11.1
>> >> planner:blink planner
>> >> source : kafka source
>> >> 时间属性: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> sql:
>> >> insert into  result
>> >> select request_time ,request_id ,request_cnt ,avg_resptime
>> >> ,stddev_resptime ,terminal_cnt
>> >> ,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd HH:mm:ss.SSS'),0,19)
>> >> from
>> >> (   select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
>> >> MINUTE),'-MM-dd HH:mm:ss.SSS'),0,21) as request_time
>> >> ,commandId as request_id
>> >> ,count(*) as request_cnt
>> >> ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
>> >> ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
>> >> stddev_resptime
>> >> from log
>> >> where
>> >> commandId in (104005 ,204005 ,404005)
>> >> and errCode=0 and attr=0
>> >> group by TUMBLE(times, INTERVAL '1' MINUTE),commandId
>> >>
>> >> union all
>> >>
>> >> select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
>> >> MINUTE),'-MM-dd HH:mm:ss.SSS'),0,21) as request_time
>> >> ,
>> >> ,count(*) as request_cnt
>> >> ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
>> >> ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
>> >> stddev_resptime
>> >> from log
>> >> where
>> >> commandId in (104005 ,204005 ,404005)
>> >> and errCode=0 and attr=0
>> >> group by TUMBLE(times, INTERVAL '1' MINUTE)
>> >> )
>> >>
>> >>
>> >> source:
>> >>
>> >> create table log (
>> >>   eventTime bigint
>> >>   ,times timestamp(3)
>> >>   
>> >>   ,commandId integer
>> >>   ,watermark for times as times - interval '5' second
>> >> )
>> >> with(
>> >>  'connector' = 'kafka-0.10',
>> >>  'topic' = '……',
>> >>  'properties.bootstrap.servers' = '……',
>> >>  'properties.group.id' = '……',
>> >>  'scan.startup.mode' = 'latest-offset',
>> >>  'format' = 'json'
>> >> )
>> >>
>> >> sink1:
>> >> create table result (
>> >>   request_time varchar
>> >>   ,request_id integer
>> >>   ,request_cnt bigint
>> >>   ,avg_resptime double
>> >>   ,stddev_resptime double
>> >>   ,insert_time varchar
>> >> ) with (
>> >>   'connector' = 'kafka-0.10',
>> >>   'topic' = '……',
>> >>   'properties.bootstrap.servers' = '……',
>> >>   'properties.group.id' = '……',
>> >>   'scan.startup.mode' = 'latest-offset',
>> >>   'format' = 'json'
>> >> )
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> --
>> >> Sent from: http://apache-flink.147419.n8.nabble.com/
>> >>
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.c

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-07-29 文章 鱼子酱
感谢!

flink1.11.1版本里面,我尝试了下面两种backend,目前运行了20多个小时,
能够看到状态的大小在一个区间内波动,没有发现一直增长的情况了。
StateBackend backend =new
RocksDBStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);

StateBackend backend =new
FsStateBackend("hdfs:///checkpoints-data/"+yamlReader.getValueByKey("jobName").toString()+"/",false);

这样看,有可能是RocksDBStateBackend*增量模式*这边可能存在一些问题。

下面两种都能成功清理
RocksDBStateBackend:
 

FsStateBackend:
 


Benchao Li-2 wrote
> 这个问题我建议先区分下是SQL operator里面没有清理state,还是state backend本身没有清理state。
> 这样你是否可以尝试下其他的state backend,以及非增量模式的rocksdb等?如果在所有state backend场景下,
> state都是一直上涨的,那有可能某个SQL operator里面对state的清理可能有些问题。
> 
> 鱼子酱 <

> 384939718@

>> 于2020年7月29日周三 上午9:47写道:
> 
>> 您好:
>>
>> 我按照您说的试了看了一下watermark,
>> 发现可以 正常更新,相关的计算结果也没发现问题。
>> 1. 刚刚截了图在下面,时间因为时区的问题-8就正常了
>> ;
>> 2. checkpoint里面的信息,能看出大小是线性增长的,然后主要集中在2个窗口和group里面。
>> ;
>> ;
>>
>>
>>
>> Congxian Qiu wrote
>> > Hi
>> > SQL 部分不太熟,根据以往的经验,对于 event time 情况下 window 的某个算子 state
>> 越来越大的情况,或许可以检查下
>> > watermark[1]
>> >
>> > [1]
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/monitoring/debugging_event_time.html
>> >
>> > Best,
>> > Congxian
>> >
>> >
>> > 鱼子酱 <
>>
>> > 384939718@
>>
>> >> 于2020年7月28日周二 下午2:45写道:
>> >
>> >> Hi,社区的各位大家好:
>> >> 我目前生产上面使用的是1.8.2版本,相对稳定
>> >> 为了能够用sql统一所有相关的作业,同时我也一直在跟着flink最新版本进行研究,
>> >> 截止目前先后研究了1.10.1 1.11.1共2个大版本
>> >>
>> >> 在尝试使用的过程中,我发现了通过程序,使用sql进行group操作时,checkpoint中的数据量一直在缓慢增加
>> >> 状态后端使用的是rocksdb 的增量模式
>> >> StateBackend backend =new
>> >> RocksDBStateBackend("hdfs:///checkpoints-data/",true);
>> >> 设置了官网文档中找到的删除策略:
>> >> TableConfig tableConfig = streamTableEnvironment.getConfig();
>> >> tableConfig.setIdleStateRetentionTime(Time.minutes(2),
>> >> Time.minutes(7));
>> >>
>> >> 请问是我使用的方式不对吗?
>> >>
>> >> 通过WebUI查看详细的checkpoint信息,发现状态大的原因主要集中在group这一Operator
>> >>
>> >>
>> >>
>> >> 版本影响:flink1.10.1 flink1.11.1
>> >> planner:blink planner
>> >> source : kafka source
>> >> 时间属性: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> sql:
>> >> insert into  result
>> >> select request_time ,request_id ,request_cnt ,avg_resptime
>> >> ,stddev_resptime ,terminal_cnt
>> >> ,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd HH:mm:ss.SSS'),0,19)
>> >> from
>> >> (   select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
>> >> MINUTE),'-MM-dd HH:mm:ss.SSS'),0,21) as request_time
>> >> ,commandId as request_id
>> >> ,count(*) as request_cnt
>> >> ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
>> >> ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
>> >> stddev_resptime
>> >> from log
>> >> where
>> >> commandId in (104005 ,204005 ,404005)
>> >> and errCode=0 and attr=0
>> >> group by TUMBLE(times, INTERVAL '1' MINUTE),commandId
>> >>
>> >> union all
>> >>
>> >> select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
>> >> MINUTE),'-MM-dd HH:mm:ss.SSS'),0,21) as request_time
>> >> ,
>> >> ,count(*) as request_cnt
>> >> ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
>> >> ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
>> >> stddev_resptime
>> >> from log
>> >> where
>> >> commandId in (104005 ,204005 ,404005)
>> >> and errCode=0 and attr=0
>> >> group by TUMBLE(times, INTERVAL '1' MINUTE)
>> >> )
>> >>
>> >>
>> >> source:
>> >>
>> >> create table log (
>> >>   eventTime bigint
>> >>   ,times timestamp(3)
>> >>   
>> >>   ,commandId integer
>> >>   ,watermark for times as times - interval '5' second
>> >> )
>> >> with(
>> >>  'connector' = 'kafka-0.10',
>> >>  'topic' = '……',
>> >>  'properties.bootstrap.servers' = '……',
>> >>  'properties.group.id' = '……',
>> >>  'scan.startup.mode' = 'latest-offset',
>> >>  'format' = 'json'
>> >> )
>> >>
>> >> sink1:
>> >> create table result (
>> >>   request_time varchar
>> >>   ,request_id integer
>> >>   ,request_cnt bigint
>> >>   ,avg_resptime double
>> >>   ,stddev_resptime double
>> >>   ,insert_time varchar
>> >> ) with (
>> >>   'connector' = 'kafka-0.10',
>> >>   'topic' = '……',
>> >>   'properties.bootstrap.servers' = '……',
>> >>   'properties.group.id' = '……',
>> >>   'scan.startup.mode' = 'latest-offset',
>> >>   'format' = 'json'
>> >> )
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> --
>> >> Sent from: http://apache-flink.147419.n8.nabble.com/
>> >>
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.c

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-07-28 文章 Dream-底限
hi 鱼子酱、
我当初这样用的时候状态也不清理(子查询+时间窗口+union),后来把时间窗口改成全局group函数,union改成订阅topic列表后,设置状态过期时间状态才清理。。。
后来看资料有的说分区数据不均衡导致水印不推进的话可能导致这种状态不清理的问题,但是我感觉不是水印导致的,水印导致的窗口应该不触发计算吧,感觉这里面有些bug,需要专业人士定位一下

鱼子酱 <384939...@qq.com> 于2020年7月29日周三 上午9:53写道:

> 您好:
>
> 我按照您说的试了看了一下watermark,
> 发现可以 正常更新,相关的计算结果也没发现问题。
> 1. 刚刚截了图在下面,时间因为时区的问题-8就正常了
> 
> 2. checkpoint里面的信息,能看出大小是线性增长的,然后主要集中在2个窗口和group里面。
> 
> 
>
>
>
> Congxian Qiu wrote
> > Hi
> > SQL 部分不太熟,根据以往的经验,对于 event time 情况下 window 的某个算子 state
> 越来越大的情况,或许可以检查下
> > watermark[1]
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/monitoring/debugging_event_time.html
> >
> > Best,
> > Congxian
> >
> >
> > 鱼子酱 <
>
> > 384939718@
>
> >> 于2020年7月28日周二 下午2:45写道:
> >
> >> Hi,社区的各位大家好:
> >> 我目前生产上面使用的是1.8.2版本,相对稳定
> >> 为了能够用sql统一所有相关的作业,同时我也一直在跟着flink最新版本进行研究,
> >> 截止目前先后研究了1.10.1 1.11.1共2个大版本
> >>
> >> 在尝试使用的过程中,我发现了通过程序,使用sql进行group操作时,checkpoint中的数据量一直在缓慢增加
> >> 状态后端使用的是rocksdb 的增量模式
> >> StateBackend backend =new
> >> RocksDBStateBackend("hdfs:///checkpoints-data/",true);
> >> 设置了官网文档中找到的删除策略:
> >> TableConfig tableConfig = streamTableEnvironment.getConfig();
> >> tableConfig.setIdleStateRetentionTime(Time.minutes(2),
> >> Time.minutes(7));
> >>
> >> 请问是我使用的方式不对吗?
> >>
> >> 通过WebUI查看详细的checkpoint信息,发现状态大的原因主要集中在group这一Operator
> >>
> >>
> >>
> >> 版本影响:flink1.10.1 flink1.11.1
> >> planner:blink planner
> >> source : kafka source
> >> 时间属性: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >>
> >>
> >>
> >>
> >>
> >> sql:
> >> insert into  result
> >> select request_time ,request_id ,request_cnt ,avg_resptime
> >> ,stddev_resptime ,terminal_cnt
> >> ,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd HH:mm:ss.SSS'),0,19)
> >> from
> >> (   select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
> >> MINUTE),'-MM-dd HH:mm:ss.SSS'),0,21) as request_time
> >> ,commandId as request_id
> >> ,count(*) as request_cnt
> >> ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
> >> ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
> >> stddev_resptime
> >> from log
> >> where
> >> commandId in (104005 ,204005 ,404005)
> >> and errCode=0 and attr=0
> >> group by TUMBLE(times, INTERVAL '1' MINUTE),commandId
> >>
> >> union all
> >>
> >> select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
> >> MINUTE),'-MM-dd HH:mm:ss.SSS'),0,21) as request_time
> >> ,
> >> ,count(*) as request_cnt
> >> ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
> >> ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
> >> stddev_resptime
> >> from log
> >> where
> >> commandId in (104005 ,204005 ,404005)
> >> and errCode=0 and attr=0
> >> group by TUMBLE(times, INTERVAL '1' MINUTE)
> >> )
> >>
> >>
> >> source:
> >>
> >> create table log (
> >>   eventTime bigint
> >>   ,times timestamp(3)
> >>   
> >>   ,commandId integer
> >>   ,watermark for times as times - interval '5' second
> >> )
> >> with(
> >>  'connector' = 'kafka-0.10',
> >>  'topic' = '……',
> >>  'properties.bootstrap.servers' = '……',
> >>  'properties.group.id' = '……',
> >>  'scan.startup.mode' = 'latest-offset',
> >>  'format' = 'json'
> >> )
> >>
> >> sink1:
> >> create table result (
> >>   request_time varchar
> >>   ,request_id integer
> >>   ,request_cnt bigint
> >>   ,avg_resptime double
> >>   ,stddev_resptime double
> >>   ,insert_time varchar
> >> ) with (
> >>   'connector' = 'kafka-0.10',
> >>   'topic' = '……',
> >>   'properties.bootstrap.servers' = '……',
> >>   'properties.group.id' = '……',
> >>   'scan.startup.mode' = 'latest-offset',
> >>   'format' = 'json'
> >> )
> >>
> >>
> >>
> >>
> >>
> >> --
> >> Sent from: http://apache-flink.147419.n8.nabble.com/
> >>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-07-28 文章 Benchao Li
这个问题我建议先区分下是SQL operator里面没有清理state,还是state backend本身没有清理state。
这样你是否可以尝试下其他的state backend,以及非增量模式的rocksdb等?如果在所有state backend场景下,
state都是一直上涨的,那有可能某个SQL operator里面对state的清理可能有些问题。

鱼子酱 <384939...@qq.com> 于2020年7月29日周三 上午9:47写道:

> 您好:
>
> 我按照您说的试了看了一下watermark,
> 发现可以 正常更新,相关的计算结果也没发现问题。
> 1. 刚刚截了图在下面,时间因为时区的问题-8就正常了
> 
> 2. checkpoint里面的信息,能看出大小是线性增长的,然后主要集中在2个窗口和group里面。
> 
> 
>
>
>
> Congxian Qiu wrote
> > Hi
> > SQL 部分不太熟,根据以往的经验,对于 event time 情况下 window 的某个算子 state
> 越来越大的情况,或许可以检查下
> > watermark[1]
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/monitoring/debugging_event_time.html
> >
> > Best,
> > Congxian
> >
> >
> > 鱼子酱 <
>
> > 384939718@
>
> >> 于2020年7月28日周二 下午2:45写道:
> >
> >> Hi,社区的各位大家好:
> >> 我目前生产上面使用的是1.8.2版本,相对稳定
> >> 为了能够用sql统一所有相关的作业,同时我也一直在跟着flink最新版本进行研究,
> >> 截止目前先后研究了1.10.1 1.11.1共2个大版本
> >>
> >> 在尝试使用的过程中,我发现了通过程序,使用sql进行group操作时,checkpoint中的数据量一直在缓慢增加
> >> 状态后端使用的是rocksdb 的增量模式
> >> StateBackend backend =new
> >> RocksDBStateBackend("hdfs:///checkpoints-data/",true);
> >> 设置了官网文档中找到的删除策略:
> >> TableConfig tableConfig = streamTableEnvironment.getConfig();
> >> tableConfig.setIdleStateRetentionTime(Time.minutes(2),
> >> Time.minutes(7));
> >>
> >> 请问是我使用的方式不对吗?
> >>
> >> 通过WebUI查看详细的checkpoint信息,发现状态大的原因主要集中在group这一Operator
> >>
> >>
> >>
> >> 版本影响:flink1.10.1 flink1.11.1
> >> planner:blink planner
> >> source : kafka source
> >> 时间属性: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >>
> >>
> >>
> >>
> >>
> >> sql:
> >> insert into  result
> >> select request_time ,request_id ,request_cnt ,avg_resptime
> >> ,stddev_resptime ,terminal_cnt
> >> ,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd HH:mm:ss.SSS'),0,19)
> >> from
> >> (   select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
> >> MINUTE),'-MM-dd HH:mm:ss.SSS'),0,21) as request_time
> >> ,commandId as request_id
> >> ,count(*) as request_cnt
> >> ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
> >> ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
> >> stddev_resptime
> >> from log
> >> where
> >> commandId in (104005 ,204005 ,404005)
> >> and errCode=0 and attr=0
> >> group by TUMBLE(times, INTERVAL '1' MINUTE),commandId
> >>
> >> union all
> >>
> >> select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
> >> MINUTE),'-MM-dd HH:mm:ss.SSS'),0,21) as request_time
> >> ,
> >> ,count(*) as request_cnt
> >> ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
> >> ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
> >> stddev_resptime
> >> from log
> >> where
> >> commandId in (104005 ,204005 ,404005)
> >> and errCode=0 and attr=0
> >> group by TUMBLE(times, INTERVAL '1' MINUTE)
> >> )
> >>
> >>
> >> source:
> >>
> >> create table log (
> >>   eventTime bigint
> >>   ,times timestamp(3)
> >>   
> >>   ,commandId integer
> >>   ,watermark for times as times - interval '5' second
> >> )
> >> with(
> >>  'connector' = 'kafka-0.10',
> >>  'topic' = '……',
> >>  'properties.bootstrap.servers' = '……',
> >>  'properties.group.id' = '……',
> >>  'scan.startup.mode' = 'latest-offset',
> >>  'format' = 'json'
> >> )
> >>
> >> sink1:
> >> create table result (
> >>   request_time varchar
> >>   ,request_id integer
> >>   ,request_cnt bigint
> >>   ,avg_resptime double
> >>   ,stddev_resptime double
> >>   ,insert_time varchar
> >> ) with (
> >>   'connector' = 'kafka-0.10',
> >>   'topic' = '……',
> >>   'properties.bootstrap.servers' = '……',
> >>   'properties.group.id' = '……',
> >>   'scan.startup.mode' = 'latest-offset',
> >>   'format' = 'json'
> >> )
> >>
> >>
> >>
> >>
> >>
> >> --
> >> Sent from: http://apache-flink.147419.n8.nabble.com/
> >>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 

Best,
Benchao Li


Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-07-28 文章 鱼子酱
您好:

我按照您说的试了看了一下watermark,
发现可以 正常更新,相关的计算结果也没发现问题。
1. 刚刚截了图在下面,时间因为时区的问题-8就正常了
 
2. checkpoint里面的信息,能看出大小是线性增长的,然后主要集中在2个窗口和group里面。
 
 



Congxian Qiu wrote
> Hi
> SQL 部分不太熟,根据以往的经验,对于 event time 情况下 window 的某个算子 state 越来越大的情况,或许可以检查下
> watermark[1]
> 
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/monitoring/debugging_event_time.html
> 
> Best,
> Congxian
> 
> 
> 鱼子酱 <

> 384939718@

>> 于2020年7月28日周二 下午2:45写道:
> 
>> Hi,社区的各位大家好:
>> 我目前生产上面使用的是1.8.2版本,相对稳定
>> 为了能够用sql统一所有相关的作业,同时我也一直在跟着flink最新版本进行研究,
>> 截止目前先后研究了1.10.1 1.11.1共2个大版本
>>
>> 在尝试使用的过程中,我发现了通过程序,使用sql进行group操作时,checkpoint中的数据量一直在缓慢增加
>> 状态后端使用的是rocksdb 的增量模式
>> StateBackend backend =new
>> RocksDBStateBackend("hdfs:///checkpoints-data/",true);
>> 设置了官网文档中找到的删除策略:
>> TableConfig tableConfig = streamTableEnvironment.getConfig();
>> tableConfig.setIdleStateRetentionTime(Time.minutes(2),
>> Time.minutes(7));
>>
>> 请问是我使用的方式不对吗?
>>
>> 通过WebUI查看详细的checkpoint信息,发现状态大的原因主要集中在group这一Operator
>>
>>
>>
>> 版本影响:flink1.10.1 flink1.11.1
>> planner:blink planner
>> source : kafka source
>> 时间属性: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>
>>
>>
>>
>>
>> sql:
>> insert into  result
>> select request_time ,request_id ,request_cnt ,avg_resptime
>> ,stddev_resptime ,terminal_cnt
>> ,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd HH:mm:ss.SSS'),0,19)
>> from
>> (   select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
>> MINUTE),'-MM-dd HH:mm:ss.SSS'),0,21) as request_time
>> ,commandId as request_id
>> ,count(*) as request_cnt
>> ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
>> ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
>> stddev_resptime
>> from log
>> where
>> commandId in (104005 ,204005 ,404005)
>> and errCode=0 and attr=0
>> group by TUMBLE(times, INTERVAL '1' MINUTE),commandId
>>
>> union all
>>
>> select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
>> MINUTE),'-MM-dd HH:mm:ss.SSS'),0,21) as request_time
>> ,
>> ,count(*) as request_cnt
>> ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
>> ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
>> stddev_resptime
>> from log
>> where
>> commandId in (104005 ,204005 ,404005)
>> and errCode=0 and attr=0
>> group by TUMBLE(times, INTERVAL '1' MINUTE)
>> )
>>
>>
>> source:
>>
>> create table log (
>>   eventTime bigint
>>   ,times timestamp(3)
>>   
>>   ,commandId integer
>>   ,watermark for times as times - interval '5' second
>> )
>> with(
>>  'connector' = 'kafka-0.10',
>>  'topic' = '……',
>>  'properties.bootstrap.servers' = '……',
>>  'properties.group.id' = '……',
>>  'scan.startup.mode' = 'latest-offset',
>>  'format' = 'json'
>> )
>>
>> sink1:
>> create table result (
>>   request_time varchar
>>   ,request_id integer
>>   ,request_cnt bigint
>>   ,avg_resptime double
>>   ,stddev_resptime double
>>   ,insert_time varchar
>> ) with (
>>   'connector' = 'kafka-0.10',
>>   'topic' = '……',
>>   'properties.bootstrap.servers' = '……',
>>   'properties.group.id' = '……',
>>   'scan.startup.mode' = 'latest-offset',
>>   'format' = 'json'
>> )
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-07-28 文章 Congxian Qiu
Hi
SQL 部分不太熟,根据以往的经验,对于 event time 情况下 window 的某个算子 state 越来越大的情况,或许可以检查下
watermark[1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/monitoring/debugging_event_time.html

Best,
Congxian


鱼子酱 <384939...@qq.com> 于2020年7月28日周二 下午2:45写道:

> Hi,社区的各位大家好:
> 我目前生产上面使用的是1.8.2版本,相对稳定
> 为了能够用sql统一所有相关的作业,同时我也一直在跟着flink最新版本进行研究,
> 截止目前先后研究了1.10.1 1.11.1共2个大版本
>
> 在尝试使用的过程中,我发现了通过程序,使用sql进行group操作时,checkpoint中的数据量一直在缓慢增加
> 状态后端使用的是rocksdb 的增量模式
> StateBackend backend =new
> RocksDBStateBackend("hdfs:///checkpoints-data/",true);
> 设置了官网文档中找到的删除策略:
> TableConfig tableConfig = streamTableEnvironment.getConfig();
> tableConfig.setIdleStateRetentionTime(Time.minutes(2),
> Time.minutes(7));
>
> 请问是我使用的方式不对吗?
>
> 通过WebUI查看详细的checkpoint信息,发现状态大的原因主要集中在group这一Operator
>
>
>
> 版本影响:flink1.10.1 flink1.11.1
> planner:blink planner
> source : kafka source
> 时间属性: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>
>
>
>
> sql:
> insert into  result
> select request_time ,request_id ,request_cnt ,avg_resptime
> ,stddev_resptime ,terminal_cnt
> ,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd HH:mm:ss.SSS'),0,19) from
> (   select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
> MINUTE),'-MM-dd HH:mm:ss.SSS'),0,21) as request_time
> ,commandId as request_id
> ,count(*) as request_cnt
> ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
> ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as stddev_resptime
> from log
> where
> commandId in (104005 ,204005 ,404005)
> and errCode=0 and attr=0
> group by TUMBLE(times, INTERVAL '1' MINUTE),commandId
>
> union all
>
> select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
> MINUTE),'-MM-dd HH:mm:ss.SSS'),0,21) as request_time
> ,
> ,count(*) as request_cnt
> ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
> ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as stddev_resptime
> from log
> where
> commandId in (104005 ,204005 ,404005)
> and errCode=0 and attr=0
> group by TUMBLE(times, INTERVAL '1' MINUTE)
> )
>
>
> source:
>
> create table log (
>   eventTime bigint
>   ,times timestamp(3)
>   
>   ,commandId integer
>   ,watermark for times as times - interval '5' second
> )
> with(
>  'connector' = 'kafka-0.10',
>  'topic' = '……',
>  'properties.bootstrap.servers' = '……',
>  'properties.group.id' = '……',
>  'scan.startup.mode' = 'latest-offset',
>  'format' = 'json'
> )
>
> sink1:
> create table result (
>   request_time varchar
>   ,request_id integer
>   ,request_cnt bigint
>   ,avg_resptime double
>   ,stddev_resptime double
>   ,insert_time varchar
> ) with (
>   'connector' = 'kafka-0.10',
>   'topic' = '……',
>   'properties.bootstrap.servers' = '……',
>   'properties.group.id' = '……',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'json'
> )
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>