回复: 求助:FLINKSQL1.10实时统计累计UV

2020-07-08 文章 seeksst
你好,我刚刚切换到1.10.1版本,还是可以设置的,这个接口是在StreamTableEnvironment里面,使用的是flink-table-api-java-bridge,如果你使用的是scala版本,这个我不是很了解,理论应该差不多。
在1.10版本中,我一般是这么写的:
EnvironmentSettings settings = 
EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
也不需要进行sqlUpdate传入,直接tableEnv.getConfig().setIdleStateRetentionTime() 设置就可以了。
另外,你指的状态越来越大需要描述的更详细一点,具体现象和状态后端的选择。之前也提到了,使用rocksdb做状态后端,需要开启ttl配置,
否则现象就是checkpoint的文件大小会不断增大。


原始邮件
发件人:x35907...@qq.com
收件人:user-zhuser...@flink.apache.org
发送时间:2020年7月8日(周三) 11:08
主题:回复: 求助:FLINKSQL1.10实时统计累计UV


您说的这种方式,V1.10.1 不支持吧,我看参数只有一个String类型的 void sqlUpdate(String stmt); 
--nbsp;原始邮件nbsp;-- 
发件人:nbsp;"seeksst"seeksst@163.comgt;; 发送时间:nbsp;2020年7月7日(星期二) 中午11:35 
收件人:nbsp;"user-zh"user-zh@flink.apache.orggt;; 主题:nbsp;回复: 
求助:FLINKSQL1.10实时统计累计UV 
我看你代码上是sqlUpdate,tableConfig是另外设置的,需要作为入参一同放入sqlUpdate中, 使用方法sqlUpdate(str, 
config) 另外如果你使用的是rocksdb,需要开启rocksdb的ttl 
state.backend.rocksdb.ttl.compaction.filter.enabled设置成true 低版本这个参数默认是false 原始邮件 
发件人:x35907...@qq.com 收件人:user-zhuser...@flink.apache.org 发送时间:2020年7月7日(周二) 
10:46 主题:回复: 求助:FLINKSQL1.10实时统计累计UV 是blinkval setttings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() 
--nbsp;原始邮件nbsp;-- 发件人:nbsp;"Benchao 
Li"libenchao@apache.orggt;; 发送时间:nbsp;2020年7月6日(星期一) 晚上11:11 
收件人:nbsp;"user-zh"user-zh@flink.apache.orggt;; 主题:nbsp;Re: 
求助:FLINKSQL1.10实时统计累计UV 感觉不太应该有这种情况,你用的是blink planner么? x 35907418@qq.comgt; 
于2020年7月6日周一 下午1:24写道: gt; sorry,我说错了,确实没有,都是group agg. gt; gt; 
我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),但是状态还是越来越大,没有按既定配置自动清理.
 gt; gt; gt; --amp;nbsp;原始邮件amp;nbsp;-- gt; 
发件人:amp;nbsp;"Benchao Li"libenchao@apache.orgamp;gt;; gt; 
发送时间:amp;nbsp;2020年7月6日(星期一) 中午12:52 gt; 
收件人:amp;nbsp;"user-zh"user-zh@flink.apache.orgamp;gt;; gt; gt; 主题:amp;nbsp;Re: 
求助:FLINKSQL1.10实时统计累计UV gt; gt; gt; gt; 我看你的SQL里面并没有用到窗口呀,只是一个普通的聚合。 gt; 
这种聚合需要设置合理的state retention[1]时间的,要不然状态默认是永远不清理的。 gt; gt; [1] gt; gt; 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time
 gt; gt; x 35907418@qq.comamp;gt; 于2020年7月6日周一 上午11:15写道: gt; gt; amp;gt; 
版本是1.10.1,最后sink的时候确实是一个window里面做count gt; amp;gt; 
distinct操作。请问是只要计算过程中含有一个window里面做count gt; amp;gt; gt; 
distinct操作,就会造成所有状态过期不自动清理吗?实际我window这步的状态很小,groupamp;amp;nbsp;DATE_FORMAT(rowtm,
 gt; amp;gt; '-MM-dd') 这个sql对应的状态很大。代码如下: gt; amp;gt; val rt_totaluv_view : 
Table = tabEnv.sqlQuery( gt; amp;gt;amp;nbsp;amp;nbsp; """ gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; SELECT MAX(DATE_FORMAT(rowtm, 
'-MM-dd gt; HH:mm:00')) gt; amp;gt; time_str,COUNT(DISTINCT userkey) uv gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; FROM source gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; GROUP BY DATE_FORMAT(rowtm, 
'-MM-dd') gt; amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; """) gt; amp;gt; 
tabEnv.createTemporaryView("rt_totaluv_view",rt_totaluv_view) gt; amp;gt; gt; 
amp;gt; val totaluvTmp = gt; 
tabEnv.toRetractStream[(String,Long)](rt_totaluv_view) gt; 
amp;gt;amp;nbsp;amp;nbsp; .filter( line =amp;amp;gt; line._1 == true ).map( 
line gt; =amp;amp;gt; line._2 ) gt; amp;gt; gt; amp;gt; val totaluvTabTmp = 
tabEnv.fromDataStream( totaluvTmp ) gt; amp;gt; gt; amp;gt; tabEnv.sqlUpdate( 
gt; amp;gt;amp;nbsp;amp;nbsp; s""" gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; INSERT INTO mysql_totaluv gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; SELECT _1,MAX(_2) gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; FROM $totaluvTabTmp gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; GROUP BY _1 gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; """) gt; amp;gt; 
--amp;amp;nbsp;原始邮件amp;amp;nbsp;-- gt; amp;gt; 
发件人:amp;amp;nbsp;"Benchao Li"libenchao@apache.orgamp;amp;gt;; gt; amp;gt; 
发送时间:amp;amp;nbsp;2020年7月3日(星期五) 晚上9:47 gt; amp;gt; 
收件人:amp;amp;nbsp;"user-zh"user-zh@flink.apache.orgamp;amp;gt;; gt; amp;gt; gt; 
amp;gt; 主题:amp;amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV gt; amp;gt; gt; amp;gt; gt; 
amp;gt; gt; amp;gt; 你用的是哪个版本?之前是存在一个类似问题的[1],是在window里面做count distinct会有这个问题, 
gt; amp;gt; 这个已经在1.11中修复了。 gt; amp;gt; gt; amp;gt; [1] 
https://issues.apache.org/jira/browse/FLINK-17942 gt; amp;gt; gt; amp;gt; x 
35907418@qq.comamp;amp;gt; 于2020年7月3日周五 下午4:34写道: gt; amp;gt; gt; amp;gt; 
amp;amp;gt; 您好,我程序运行一段时间后,发现checkpoint文件总在增长,应该是状态没有过期, gt; amp;gt; amp;amp;gt; 
gt; amp;gt; amp;amp;gt; gt; amp;gt; gt; 
我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),按理说,日期是前一天的key对应的状态会在第二天过期的。
 gt; amp;gt; amp;amp;gt; gt; amp

回复: 求助:FLINKSQL1.10实时统计累计UV

2020-07-06 文章 seeksst
我看你代码上是sqlUpdate,tableConfig是另外设置的,需要作为入参一同放入sqlUpdate中,
使用方法sqlUpdate(str, config)
另外如果你使用的是rocksdb,需要开启rocksdb的ttl
state.backend.rocksdb.ttl.compaction.filter.enabled设置成true
低版本这个参数默认是false




原始邮件
发件人:x35907...@qq.com
收件人:user-zhuser...@flink.apache.org
发送时间:2020年7月7日(周二) 10:46
主题:回复: 求助:FLINKSQL1.10实时统计累计UV


是blinkval setttings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() 
--nbsp;原始邮件nbsp;-- 发件人:nbsp;"Benchao 
Li"libenchao@apache.orggt;; 发送时间:nbsp;2020年7月6日(星期一) 晚上11:11 
收件人:nbsp;"user-zh"user-zh@flink.apache.orggt;; 主题:nbsp;Re: 
求助:FLINKSQL1.10实时统计累计UV 感觉不太应该有这种情况,你用的是blink planner么? x 35907418@qq.comgt; 
于2020年7月6日周一 下午1:24写道: gt; sorry,我说错了,确实没有,都是group agg. gt; gt; 
我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),但是状态还是越来越大,没有按既定配置自动清理.
 gt; gt; gt; --amp;nbsp;原始邮件amp;nbsp;-- gt; 
发件人:amp;nbsp;"Benchao Li"libenchao@apache.orgamp;gt;; gt; 
发送时间:amp;nbsp;2020年7月6日(星期一) 中午12:52 gt; 
收件人:amp;nbsp;"user-zh"user-zh@flink.apache.orgamp;gt;; gt; gt; 主题:amp;nbsp;Re: 
求助:FLINKSQL1.10实时统计累计UV gt; gt; gt; gt; 我看你的SQL里面并没有用到窗口呀,只是一个普通的聚合。 gt; 
这种聚合需要设置合理的state retention[1]时间的,要不然状态默认是永远不清理的。 gt; gt; [1] gt; gt; 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time
 gt; gt; x 35907418@qq.comamp;gt; 于2020年7月6日周一 上午11:15写道: gt; gt; amp;gt; 
版本是1.10.1,最后sink的时候确实是一个window里面做count gt; amp;gt; 
distinct操作。请问是只要计算过程中含有一个window里面做count gt; amp;gt; gt; 
distinct操作,就会造成所有状态过期不自动清理吗?实际我window这步的状态很小,groupamp;amp;nbsp;DATE_FORMAT(rowtm,
 gt; amp;gt; '-MM-dd') 这个sql对应的状态很大。代码如下: gt; amp;gt; val rt_totaluv_view : 
Table = tabEnv.sqlQuery( gt; amp;gt;amp;nbsp;amp;nbsp; """ gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; SELECT MAX(DATE_FORMAT(rowtm, 
'-MM-dd gt; HH:mm:00')) gt; amp;gt; time_str,COUNT(DISTINCT userkey) uv gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; FROM source gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; GROUP BY DATE_FORMAT(rowtm, 
'-MM-dd') gt; amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; """) gt; amp;gt; 
tabEnv.createTemporaryView("rt_totaluv_view",rt_totaluv_view) gt; amp;gt; gt; 
amp;gt; val totaluvTmp = gt; 
tabEnv.toRetractStream[(String,Long)](rt_totaluv_view) gt; 
amp;gt;amp;nbsp;amp;nbsp; .filter( line =amp;amp;gt; line._1 == true ).map( 
line gt; =amp;amp;gt; line._2 ) gt; amp;gt; gt; amp;gt; val totaluvTabTmp = 
tabEnv.fromDataStream( totaluvTmp ) gt; amp;gt; gt; amp;gt; tabEnv.sqlUpdate( 
gt; amp;gt;amp;nbsp;amp;nbsp; s""" gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; INSERT INTO mysql_totaluv gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; SELECT _1,MAX(_2) gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; FROM $totaluvTabTmp gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; GROUP BY _1 gt; 
amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; """) gt; amp;gt; 
--amp;amp;nbsp;原始邮件amp;amp;nbsp;-- gt; amp;gt; 
发件人:amp;amp;nbsp;"Benchao Li"libenchao@apache.orgamp;amp;gt;; gt; amp;gt; 
发送时间:amp;amp;nbsp;2020年7月3日(星期五) 晚上9:47 gt; amp;gt; 
收件人:amp;amp;nbsp;"user-zh"user-zh@flink.apache.orgamp;amp;gt;; gt; amp;gt; gt; 
amp;gt; 主题:amp;amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV gt; amp;gt; gt; amp;gt; gt; 
amp;gt; gt; amp;gt; 你用的是哪个版本?之前是存在一个类似问题的[1],是在window里面做count distinct会有这个问题, 
gt; amp;gt; 这个已经在1.11中修复了。 gt; amp;gt; gt; amp;gt; [1] 
https://issues.apache.org/jira/browse/FLINK-17942 gt; amp;gt; gt; amp;gt; x 
35907418@qq.comamp;amp;gt; 于2020年7月3日周五 下午4:34写道: gt; amp;gt; gt; amp;gt; 
amp;amp;gt; 您好,我程序运行一段时间后,发现checkpoint文件总在增长,应该是状态没有过期, gt; amp;gt; amp;amp;gt; 
gt; amp;gt; amp;amp;gt; gt; amp;gt; gt; 
我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),按理说,日期是前一天的key对应的状态会在第二天过期的。
 gt; amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; gt; 
amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; gt; 
--amp;amp;amp;nbsp;原始邮件amp;amp;amp;nbsp;-- gt; 
amp;gt; amp;amp;gt; 发件人:amp;amp;amp;nbsp;"Jark 
Wu"imjark@gmail.comamp;amp;amp;gt;; gt; amp;gt; amp;amp;gt; 
发送时间:amp;amp;amp;nbsp;2020年6月18日(星期四) 中午12:16 gt; amp;gt; amp;amp;gt; 
收件人:amp;amp;amp;nbsp;"user-zh"user-zh@flink.apache.org gt; amp;amp;amp;gt;; gt; 
amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; 主题:amp;amp;amp;nbsp;Re: 
求助:FLINKSQL1.10实时统计累计UV gt; amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; gt; 
amp;gt; amp;amp;gt; gt; amp;gt; amp;amp;gt; 是的,我觉得这样子是能绕过的。 gt; amp;gt; 
amp;amp;gt; gt; amp;gt; amp;amp;gt; On Thu, 18 Jun 2020 at 10:34, x 
35907418@qq.comamp;amp;amp;gt; gt; wrote: gt; amp;gt; amp;amp;gt; gt; amp;gt; 
amp;amp;gt; amp;amp;amp;gt; 如果是1.10的话,我通过表转流,再转表的方式实现了,您看合理吗? gt; amp;gt; 
amp;amp;gt; amp;amp;amp;gt; val resTmpTab: Table = tabEnv.sqlQuery( gt; amp;gt; 
amp;amp;gt; amp;amp;amp;gt;amp;amp;amp;nbsp;amp;amp;amp;nbsp; """ gt; amp;gt; 
amp;amp;gt; gt; 
amp;amp;amp;gt;amp;amp;amp;nbsp;amp;amp;amp;nbsp;amp;amp;amp;nbsp;amp;amp;amp;nbsp;
 SELECT 

Re: flink sql能否显示地创造一列null行

2020-06-29 文章 seeksst
@Jingsong Li 尝试了一下,cast确实可行,解锁新知识,thanks.


原始邮件
发件人:lakeshenshenleifight...@gmail.com
收件人:user-zhuser...@flink.apache.org
发送时间:2020年6月30日(周二) 10:07
主题:Re: flink sql能否显示地创造一列null行


或者补齐一个非 Null ,但是又不影响业务逻辑的数值 Jingsong Li jingsongl...@gmail.com 于2020年6月30日周二 
上午9:58写道:  Hi,   我记得NULL的literal是可以的,不过需要cast成确定的类型,比如 select CAST(null AS 
VARCHAR);  你试试。   Best,  Jingsong   On Tue, Jun 30, 2020 at 9:40 AM seeksst 
seek...@163.com wrote:Hi,   按照你的意思是想将两个不同的数据集进行union,但是由于字段不同需要补充NULL。  
 显示的NULL是不行的,你可以使用更复杂的方式进行对齐:   case when 1 = 2 then 1 end as 字段   
1永远不可能等于2,又没有else分支,所以结果是会返回null.   原始邮件   发件人:naisili 
yuanyuanlong1...@gmail.com   收件人:user-zhuser...@flink.apache.org   
发送时间:2020年6月30日(周二) 09:31   主题:flink sql能否显示地创造一列null行   
由于union的操作,结果集的列数必须一致,我能否加入这一列,语法如下: UNION SELECT NULL , aaa, bbb, NULL   FROM 
() --  Best, Jingsong Lee

回复:flink sql能否显示地创造一列null行

2020-06-29 文章 seeksst
Hi,


  按照你的意思是想将两个不同的数据集进行union,但是由于字段不同需要补充NULL。
  显示的NULL是不行的,你可以使用更复杂的方式进行对齐:
case when 1 = 2 then 1 end as 字段
  1永远不可能等于2,又没有else分支,所以结果是会返回null.


原始邮件
发件人:naisili yuanyuanlong1...@gmail.com
收件人:user-zhuser...@flink.apache.org
发送时间:2020年6月30日(周二) 09:31
主题:flink sql能否显示地创造一列null行


由于union的操作,结果集的列数必须一致,我能否加入这一列,语法如下: UNION SELECT NULL , aaa, bbb, NULL FROM ()