Re: 求助:FLINKSQL1.10实时统计累计UV
感觉不太应该有这种情况,你用的是blink planner么? x <35907...@qq.com> 于2020年7月6日周一 下午1:24写道: > sorry,我说错了,确实没有,都是group agg. > > 我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),但是状态还是越来越大,没有按既定配置自动清理. > > > --原始邮件-- > 发件人:"Benchao Li" 发送时间:2020年7月6日(星期一) 中午12:52 > 收件人:"user-zh" > 主题:Re: 求助:FLINKSQL1.10实时统计累计UV > > > > 我看你的SQL里面并没有用到窗口呀,只是一个普通的聚合。 > 这种聚合需要设置合理的state retention[1]时间的,要不然状态默认是永远不清理的。 > > [1] > > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time > > x <35907...@qq.com 于2020年7月6日周一 上午11:15写道: > > 版本是1.10.1,最后sink的时候确实是一个window里面做count > distinct操作。请问是只要计算过程中含有一个window里面做count > > distinct操作,就会造成所有状态过期不自动清理吗?实际我window这步的状态很小,groupnbsp;DATE_FORMAT(rowtm, > '-MM-dd') 这个sql对应的状态很大。代码如下: > val rt_totaluv_view : Table = tabEnv.sqlQuery( > """ > SELECT MAX(DATE_FORMAT(rowtm, '-MM-dd > HH:mm:00')) > time_str,COUNT(DISTINCT userkey) uv > FROM source > GROUP BY DATE_FORMAT(rowtm, '-MM-dd') > """) > tabEnv.createTemporaryView("rt_totaluv_view",rt_totaluv_view) > > val totaluvTmp = > tabEnv.toRetractStream[(String,Long)](rt_totaluv_view) > .filter( line =gt; line._1 == true ).map( line > =gt; line._2 ) > > val totaluvTabTmp = tabEnv.fromDataStream( totaluvTmp ) > > tabEnv.sqlUpdate( > s""" > INSERT INTO mysql_totaluv > SELECT _1,MAX(_2) > FROM $totaluvTabTmp > GROUP BY _1 > """) > --nbsp;原始邮件nbsp;-- > 发件人:nbsp;"Benchao Li" 发送时间:nbsp;2020年7月3日(星期五) 晚上9:47 > 收件人:nbsp;"user-zh" > 主题:nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV > > > > 你用的是哪个版本?之前是存在一个类似问题的[1],是在window里面做count distinct会有这个问题, > 这个已经在1.11中修复了。 > > [1] https://issues.apache.org/jira/browse/FLINK-17942 > > x <35907...@qq.comgt; 于2020年7月3日周五 下午4:34写道: > > gt; 您好,我程序运行一段时间后,发现checkpoint文件总在增长,应该是状态没有过期, > gt; > gt; > > 我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),按理说,日期是前一天的key对应的状态会在第二天过期的。 > gt; > gt; > gt; > gt; > gt; > --amp;nbsp;原始邮件amp;nbsp;-- > gt; 发件人:amp;nbsp;"Jark Wu" gt; 发送时间:amp;nbsp;2020年6月18日(星期四) 中午12:16 > gt; 收件人:amp;nbsp;"user-zh" amp;gt;; > gt; > gt; 主题:amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV > gt; > gt; > gt; > gt; 是的,我觉得这样子是能绕过的。 > gt; > gt; On Thu, 18 Jun 2020 at 10:34, x <35907...@qq.comamp;gt; > wrote: > gt; > gt; amp;gt; 如果是1.10的话,我通过表转流,再转表的方式实现了,您看合理吗? > gt; amp;gt; val resTmpTab: Table = tabEnv.sqlQuery( > gt; amp;gt;amp;nbsp;amp;nbsp; """ > gt; > amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; SELECT > MAX(DATE_FORMAT(ts, '-MM-dd > gt; HH:mm:00')) > gt; amp;gt; time_str,COUNT(DISTINCT userkey) uv > gt; > amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; FROM > user_behavioramp;nbsp;amp;nbsp;amp;nbsp; GROUP BY > gt; DATE_FORMAT(ts, > '-MM-dd')amp;nbsp;amp;nbsp;amp;nbsp; """) > gt; amp;gt; > gt; amp;gt; val > resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab) > gt; amp;gt;amp;nbsp;amp;nbsp; > gt; > .filter(line=amp;amp;gt;line._1==true).map(line=amp;amp;gt;line._2) > gt; amp;gt; > gt; amp;gt; val res= tabEnv.fromDataStream(resTmpStream) > gt; amp;gt; tabEnv.sqlUpdate( > gt; amp;gt;amp;nbsp;amp;nbsp; s""" > gt; > amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; INSERT > INTO > rt_totaluv > gt; > amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; SELECT > _1,MAX(_2) > gt; > amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; FROM > $res > gt; > amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; GROUP > BY _1 > gt; > amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; """) > gt; amp;gt; > gt; amp;gt; > gt; amp;gt; > > --amp;amp;nbsp;原始邮件amp;amp;nbsp;-- > gt; amp;gt; 发件人:amp;amp;nbsp;"Jark Wu"< > imj...@gmail.comamp;amp;gt;; > gt; amp;gt; 发送时间:amp;amp;nbsp;2020年6月17日(星期三) 中午1:55 > gt; amp;gt; 收件人:amp;amp;nbsp;"user-zh"< > user-zh@flink.apache.org > amp;amp;gt;; > gt; amp;gt; > gt; amp;gt; 主题:amp;amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV > gt; amp;gt; > gt; amp;gt; > gt; amp;gt; > gt; amp;gt; 在 Flink 1.11 中,你可以尝试这样: > gt; amp;gt; > gt; amp;gt; CREATE TABLE mysql ( > gt; amp;gt; amp;amp;nbsp;amp;amp;nbsp; time_str > STRING, > gt; amp;gt; amp;amp;nbsp;amp;amp;nbsp; uv BIGINT, > gt; amp;gt; amp;amp;nbsp;amp;amp;nbsp; PRIMARY > KEY (ts) NOT ENFORCED >
Re: 求助:FLINKSQL1.10实时统计累计UV
我看你的SQL里面并没有用到窗口呀,只是一个普通的聚合。 这种聚合需要设置合理的state retention[1]时间的,要不然状态默认是永远不清理的。 [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time x <35907...@qq.com> 于2020年7月6日周一 上午11:15写道: > 版本是1.10.1,最后sink的时候确实是一个window里面做count > distinct操作。请问是只要计算过程中含有一个window里面做count > distinct操作,就会造成所有状态过期不自动清理吗?实际我window这步的状态很小,groupDATE_FORMAT(rowtm, > '-MM-dd') 这个sql对应的状态很大。代码如下: > val rt_totaluv_view : Table = tabEnv.sqlQuery( > """ > SELECT MAX(DATE_FORMAT(rowtm, '-MM-dd HH:mm:00')) > time_str,COUNT(DISTINCT userkey) uv > FROM source > GROUP BY DATE_FORMAT(rowtm, '-MM-dd') > """) > tabEnv.createTemporaryView("rt_totaluv_view",rt_totaluv_view) > > val totaluvTmp = tabEnv.toRetractStream[(String,Long)](rt_totaluv_view) > .filter( line = line._1 == true ).map( line = line._2 ) > > val totaluvTabTmp = tabEnv.fromDataStream( totaluvTmp ) > > tabEnv.sqlUpdate( > s""" > INSERT INTO mysql_totaluv > SELECT _1,MAX(_2) > FROM $totaluvTabTmp > GROUP BY _1 > """) > --原始邮件-- > 发件人:"Benchao Li" 发送时间:2020年7月3日(星期五) 晚上9:47 > 收件人:"user-zh" > 主题:Re: 求助:FLINKSQL1.10实时统计累计UV > > > > 你用的是哪个版本?之前是存在一个类似问题的[1],是在window里面做count distinct会有这个问题, > 这个已经在1.11中修复了。 > > [1] https://issues.apache.org/jira/browse/FLINK-17942 > > x <35907...@qq.com 于2020年7月3日周五 下午4:34写道: > > 您好,我程序运行一段时间后,发现checkpoint文件总在增长,应该是状态没有过期, > > > 我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),按理说,日期是前一天的key对应的状态会在第二天过期的。 > > > > > --nbsp;原始邮件nbsp;-- > 发件人:nbsp;"Jark Wu" 发送时间:nbsp;2020年6月18日(星期四) 中午12:16 > 收件人:nbsp;"user-zh" > 主题:nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV > > > > 是的,我觉得这样子是能绕过的。 > > On Thu, 18 Jun 2020 at 10:34, x <35907...@qq.comgt; wrote: > > gt; 如果是1.10的话,我通过表转流,再转表的方式实现了,您看合理吗? > gt; val resTmpTab: Table = tabEnv.sqlQuery( > gt;nbsp;nbsp; """ > gt;nbsp;nbsp;nbsp;nbsp; SELECT > MAX(DATE_FORMAT(ts, '-MM-dd > HH:mm:00')) > gt; time_str,COUNT(DISTINCT userkey) uv > gt;nbsp;nbsp;nbsp;nbsp; FROM > user_behaviornbsp;nbsp;nbsp; GROUP BY > DATE_FORMAT(ts, '-MM-dd')nbsp;nbsp;nbsp; """) > gt; > gt; val > resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab) > gt;nbsp;nbsp; > .filter(line=amp;gt;line._1==true).map(line=amp;gt;line._2) > gt; > gt; val res= tabEnv.fromDataStream(resTmpStream) > gt; tabEnv.sqlUpdate( > gt;nbsp;nbsp; s""" > gt;nbsp;nbsp;nbsp;nbsp; INSERT INTO > rt_totaluv > gt;nbsp;nbsp;nbsp;nbsp; SELECT _1,MAX(_2) > gt;nbsp;nbsp;nbsp;nbsp; FROM $res > gt;nbsp;nbsp;nbsp;nbsp; GROUP BY _1 > gt;nbsp;nbsp;nbsp;nbsp; """) > gt; > gt; > gt; > --amp;nbsp;原始邮件amp;nbsp;-- > gt; 发件人:amp;nbsp;"Jark Wu" gt; 发送时间:amp;nbsp;2020年6月17日(星期三) 中午1:55 > gt; 收件人:amp;nbsp;"user-zh" amp;gt;; > gt; > gt; 主题:amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV > gt; > gt; > gt; > gt; 在 Flink 1.11 中,你可以尝试这样: > gt; > gt; CREATE TABLE mysql ( > gt; amp;nbsp;amp;nbsp; time_str STRING, > gt; amp;nbsp;amp;nbsp; uv BIGINT, > gt; amp;nbsp;amp;nbsp; PRIMARY KEY (ts) NOT ENFORCED > gt; ) WITH ( > gt; amp;nbsp;amp;nbsp; 'connector' = 'jdbc', > gt; amp;nbsp;amp;nbsp; 'url' = > 'jdbc:mysql://localhost:3306/mydatabase', > gt; amp;nbsp;amp;nbsp; 'table-name' = 'myuv' > gt; ); > gt; > gt; INSERT INTO mysql > gt; SELECT MAX(DATE_FORMAT(ts, '-MM-dd HH:mm:00')), > COUNT(DISTINCTamp;nbsp; > gt; user_id) > gt; FROM user_behavior; > gt; > gt; On Wed, 17 Jun 2020 at 13:49, x <35907...@qq.comamp;gt; > wrote: > gt; > gt; amp;gt; 感谢您的回复,您提到的"方法一"我还是有点不太理解,我需要每分钟输出一个累计UV, > gt; amp;gt; sink表这个样式 > gt; amp;gt; tm uv > gt; amp;gt; 2020/06/17 13:46:00 1 > gt; amp;gt; 2020/06/17 13:47:00 2 > gt; amp;gt; 2020/06/17 13:48:00 3 > gt; amp;gt; > gt; amp;gt; > gt; amp;gt; group by 日期的话,分钟如何获取 > gt; amp;gt; > gt; amp;gt; > gt; amp;gt; > > --amp;amp;nbsp;原始邮件amp;amp;nbsp;-- > gt; amp;gt; 发件人:amp;amp;nbsp;"Benchao Li"< > libenc...@apache.org > amp;amp;gt;; > gt; amp;gt; 发送时间:amp;amp;nbsp;2020年6月17日(星期三) 中午11:46 > gt; amp;gt; 收件人:amp;amp;nbsp;"user-zh"< > user-zh@flink.apache.org > amp;amp;gt;; > gt; amp;gt; > gt; amp;gt; 主题:amp;amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV > gt; amp;gt; > gt; amp;
Re: 求助:FLINKSQL1.10实时统计累计UV
你用的是哪个版本?之前是存在一个类似问题的[1],是在window里面做count distinct会有这个问题, 这个已经在1.11中修复了。 [1] https://issues.apache.org/jira/browse/FLINK-17942 x <35907...@qq.com> 于2020年7月3日周五 下午4:34写道: > 您好,我程序运行一段时间后,发现checkpoint文件总在增长,应该是状态没有过期, > > 我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),按理说,日期是前一天的key对应的状态会在第二天过期的。 > > > > > --原始邮件-- > 发件人:"Jark Wu" 发送时间:2020年6月18日(星期四) 中午12:16 > 收件人:"user-zh" > 主题:Re: 求助:FLINKSQL1.10实时统计累计UV > > > > 是的,我觉得这样子是能绕过的。 > > On Thu, 18 Jun 2020 at 10:34, x <35907...@qq.com wrote: > > 如果是1.10的话,我通过表转流,再转表的方式实现了,您看合理吗? > val resTmpTab: Table = tabEnv.sqlQuery( > """ > SELECT MAX(DATE_FORMAT(ts, '-MM-dd > HH:mm:00')) > time_str,COUNT(DISTINCT userkey) uv > FROM user_behavior GROUP BY > DATE_FORMAT(ts, '-MM-dd') """) > > val resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab) > > .filter(line=gt;line._1==true).map(line=gt;line._2) > > val res= tabEnv.fromDataStream(resTmpStream) > tabEnv.sqlUpdate( > s""" > INSERT INTO rt_totaluv > SELECT _1,MAX(_2) > FROM $res > GROUP BY _1 > """) > > > --nbsp;原始邮件nbsp;-- > 发件人:nbsp;"Jark Wu" 发送时间:nbsp;2020年6月17日(星期三) 中午1:55 > 收件人:nbsp;"user-zh" > 主题:nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV > > > > 在 Flink 1.11 中,你可以尝试这样: > > CREATE TABLE mysql ( > nbsp;nbsp; time_str STRING, > nbsp;nbsp; uv BIGINT, > nbsp;nbsp; PRIMARY KEY (ts) NOT ENFORCED > ) WITH ( > nbsp;nbsp; 'connector' = 'jdbc', > nbsp;nbsp; 'url' = 'jdbc:mysql://localhost:3306/mydatabase', > nbsp;nbsp; 'table-name' = 'myuv' > ); > > INSERT INTO mysql > SELECT MAX(DATE_FORMAT(ts, '-MM-dd HH:mm:00')), > COUNT(DISTINCTnbsp; > user_id) > FROM user_behavior; > > On Wed, 17 Jun 2020 at 13:49, x <35907...@qq.comgt; wrote: > > gt; 感谢您的回复,您提到的"方法一"我还是有点不太理解,我需要每分钟输出一个累计UV, > gt; sink表这个样式 > gt; tm uv > gt; 2020/06/17 13:46:00 1 > gt; 2020/06/17 13:47:00 2 > gt; 2020/06/17 13:48:00 3 > gt; > gt; > gt; group by 日期的话,分钟如何获取 > gt; > gt; > gt; > --amp;nbsp;原始邮件amp;nbsp;-- > gt; 发件人:amp;nbsp;"Benchao Li" amp;gt;; > gt; 发送时间:amp;nbsp;2020年6月17日(星期三) 中午11:46 > gt; 收件人:amp;nbsp;"user-zh" amp;gt;; > gt; > gt; 主题:amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV > gt; > gt; > gt; > gt; Hi, > gt; 我感觉这种场景可以有两种方式, > gt; 1. 可以直接用group by + mini batch > gt; 2. window聚合 + fast emit > gt; > gt; 对于#1,group by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm, > '-MM-dd')。 > gt; 这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] 。同时,mini > batch的开启也需要 > gt; 用参数[2] 来打开。 > gt; > gt; 对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。 > gt; fast > emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下: > gt; table.exec.emit.early-fire.enabled = true > gt; table.exec.emit.early-fire.delay = 60 s > gt; > gt; [1] > gt; > gt; > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html > gt; [2] > gt; > gt; > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html > gt; > gt; x <35907...@qq.comamp;gt; 于2020年6月17日周三 上午11:14写道: > gt; > gt; amp;gt; > 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计 > gt; amp;gt; CREATE VIEW uv_per_10min AS > gt; amp;gt; SELECTamp;amp;nbsp; > gt; amp;gt; amp;amp;nbsp; > MAX(DATE_FORMAT(proctimeamp;amp;nbsp;, > '-MM-dd > gt; HH:mm:00'))amp;amp;nbsp;OVER w > gt; amp;gt; AS time_str,amp;amp;nbsp; > gt; amp;gt; amp;amp;nbsp; COUNT(DISTINCT user_id) OVER > w AS uv > gt; amp;gt; FROM user_behavior > gt; amp;gt; WINDOW w AS (ORDER BY proctime ROWS BETWEEN > UNBOUNDED > PRECEDING AND > gt; amp;gt; CURRENT ROW); > gt; amp;gt; > gt; amp;gt; > gt; amp;gt; 想请教一下,应该如何处理? > gt; amp;gt; PARTITION BY DATE_FORMAT(rowtm, '-MM-dd') > 这样可以吗,另外状态应该如何清理? > gt; amp;gt; PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧 > gt; amp;gt; 多谢 -- Best, Benchao Li
Re: 求助:FLINKSQL1.10实时统计累计UV
是的,我觉得这样子是能绕过的。 On Thu, 18 Jun 2020 at 10:34, x <35907...@qq.com> wrote: > 如果是1.10的话,我通过表转流,再转表的方式实现了,您看合理吗? > val resTmpTab: Table = tabEnv.sqlQuery( > """ > SELECT MAX(DATE_FORMAT(ts, '-MM-dd HH:mm:00')) > time_str,COUNT(DISTINCT userkey) uv > FROM user_behaviorGROUP BY DATE_FORMAT(ts, '-MM-dd')""") > > val resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab) > .filter(line=line._1==true).map(line=line._2) > > val res= tabEnv.fromDataStream(resTmpStream) > tabEnv.sqlUpdate( > s""" > INSERT INTO rt_totaluv > SELECT _1,MAX(_2) > FROM $res > GROUP BY _1 > """) > > > ------原始邮件-- > 发件人:"Jark Wu" 发送时间:2020年6月17日(星期三) 中午1:55 > 收件人:"user-zh" > 主题:Re: 求助:FLINKSQL1.10实时统计累计UV > > > > 在 Flink 1.11 中,你可以尝试这样: > > CREATE TABLE mysql ( > time_str STRING, > uv BIGINT, > PRIMARY KEY (ts) NOT ENFORCED > ) WITH ( > 'connector' = 'jdbc', > 'url' = 'jdbc:mysql://localhost:3306/mydatabase', > 'table-name' = 'myuv' > ); > > INSERT INTO mysql > SELECT MAX(DATE_FORMAT(ts, '-MM-dd HH:mm:00')), COUNT(DISTINCT > user_id) > FROM user_behavior; > > On Wed, 17 Jun 2020 at 13:49, x <35907...@qq.com wrote: > > 感谢您的回复,您提到的"方法一"我还是有点不太理解,我需要每分钟输出一个累计UV, > sink表这个样式 > tm uv > 2020/06/17 13:46:00 10000 > 2020/06/17 13:47:00 2 > 2020/06/17 13:48:00 3 > > > group by 日期的话,分钟如何获取 > > > --nbsp;原始邮件nbsp;-- > 发件人:nbsp;"Benchao Li" 发送时间:nbsp;2020年6月17日(星期三) 中午11:46 > 收件人:nbsp;"user-zh" > 主题:nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV > > > > Hi, > 我感觉这种场景可以有两种方式, > 1. 可以直接用group by + mini batch > 2. window聚合 + fast emit > > 对于#1,group by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm, '-MM-dd')。 > 这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] 。同时,mini batch的开启也需要 > 用参数[2] 来打开。 > > 对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。 > fast emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下: > table.exec.emit.early-fire.enabled = true > table.exec.emit.early-fire.delay = 60 s > > [1] > > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html > [2] > > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html > > x <35907...@qq.comgt; 于2020年6月17日周三 上午11:14写道: > > gt; 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计 > gt; CREATE VIEW uv_per_10min AS > gt; SELECTamp;nbsp; > gt; amp;nbsp; MAX(DATE_FORMAT(proctimeamp;nbsp;, > '-MM-dd > HH:mm:00'))amp;nbsp;OVER w > gt; AS time_str,amp;nbsp; > gt; amp;nbsp; COUNT(DISTINCT user_id) OVER w AS uv > gt; FROM user_behavior > gt; WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED > PRECEDING AND > gt; CURRENT ROW); > gt; > gt; > gt; 想请教一下,应该如何处理? > gt; PARTITION BY DATE_FORMAT(rowtm, '-MM-dd') > 这样可以吗,另外状态应该如何清理? > gt; PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧 > gt; 多谢
Re: 求助:FLINKSQL1.10实时统计累计UV
在 Flink 1.11 中,你可以尝试这样: CREATE TABLE mysql ( time_str STRING, uv BIGINT, PRIMARY KEY (ts) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/mydatabase', 'table-name' = 'myuv' ); INSERT INTO mysql SELECT MAX(DATE_FORMAT(ts, '-MM-dd HH:mm:00')), COUNT(DISTINCT user_id) FROM user_behavior; On Wed, 17 Jun 2020 at 13:49, x <35907...@qq.com> wrote: > 感谢您的回复,您提到的"方法一"我还是有点不太理解,我需要每分钟输出一个累计UV, > sink表这个样式 > tm uv > 2020/06/17 13:46:00 1 > 2020/06/17 13:47:00 2 > 2020/06/17 13:48:00 3 > > > group by 日期的话,分钟如何获取 > > > --原始邮件-- > 发件人:"Benchao Li" 发送时间:2020年6月17日(星期三) 中午11:46 > 收件人:"user-zh" > 主题:Re: 求助:FLINKSQL1.10实时统计累计UV > > > > Hi, > 我感觉这种场景可以有两种方式, > 1. 可以直接用group by + mini batch > 2. window聚合 + fast emit > > 对于#1,group by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm, '-MM-dd')。 > 这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] 。同时,mini batch的开启也需要 > 用参数[2] 来打开。 > > 对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。 > fast emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下: > table.exec.emit.early-fire.enabled = true > table.exec.emit.early-fire.delay = 60 s > > [1] > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html > [2] > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html > > x <35907...@qq.com 于2020年6月17日周三 上午11:14写道: > > 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计 > CREATE VIEW uv_per_10min AS > SELECTnbsp; > nbsp; MAX(DATE_FORMAT(proctimenbsp;, '-MM-dd > HH:mm:00'))nbsp;OVER w > AS time_str,nbsp; > nbsp; COUNT(DISTINCT user_id) OVER w AS uv > FROM user_behavior > WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND > CURRENT ROW); > > > 想请教一下,应该如何处理? > PARTITION BY DATE_FORMAT(rowtm, '-MM-dd') 这样可以吗,另外状态应该如何清理? > PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧 > 多谢
Re: 求助:FLINKSQL1.10实时统计累计UV
本超提的两个方案也是阿里内部解决这个问题最常用的方式,但是 1.10 会有 primary key 的限制,要等到 1.11 才行。 另外这两个方案在追数据时,都可能会有毛刺现象(有几分钟没有值,因为数据追太快,跳过了)。 On Wed, 17 Jun 2020 at 11:46, Benchao Li wrote: > Hi, > 我感觉这种场景可以有两种方式, > 1. 可以直接用group by + mini batch > 2. window聚合 + fast emit > > 对于#1,group by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm, '-MM-dd')。 > 这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] 。同时,mini batch的开启也需要 > 用参数[2] 来打开。 > > 对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。 > fast emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下: > table.exec.emit.early-fire.enabled = true > table.exec.emit.early-fire.delay = 60 s > > [1] > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html > [2] > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html > > x <35907...@qq.com> 于2020年6月17日周三 上午11:14写道: > > > 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计 > > CREATE VIEW uv_per_10min AS > > SELECT > > MAX(DATE_FORMAT(proctime, '-MM-dd HH:mm:00'))OVER > w > > AS time_str, > > COUNT(DISTINCT user_id) OVER w AS uv > > FROM user_behavior > > WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND > > CURRENT ROW); > > > > > > 想请教一下,应该如何处理? > > PARTITION BY DATE_FORMAT(rowtm, '-MM-dd') 这样可以吗,另外状态应该如何清理? > > PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧 > > 多谢 >
Re: 求助:FLINKSQL1.10实时统计累计UV
Hi, 我感觉这种场景可以有两种方式, 1. 可以直接用group by + mini batch 2. window聚合 + fast emit 对于#1,group by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm, '-MM-dd')。 这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] 。同时,mini batch的开启也需要 用参数[2] 来打开。 对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。 fast emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下: table.exec.emit.early-fire.enabled = true table.exec.emit.early-fire.delay = 60 s [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html x <35907...@qq.com> 于2020年6月17日周三 上午11:14写道: > 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计 > CREATE VIEW uv_per_10min AS > SELECT > MAX(DATE_FORMAT(proctime, '-MM-dd HH:mm:00'))OVER w > AS time_str, > COUNT(DISTINCT user_id) OVER w AS uv > FROM user_behavior > WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND > CURRENT ROW); > > > 想请教一下,应该如何处理? > PARTITION BY DATE_FORMAT(rowtm, '-MM-dd') 这样可以吗,另外状态应该如何清理? > PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧 > 多谢