Re: 求助:FLINKSQL1.10实时统计累计UV

2020-07-06 文章 Benchao Li
感觉不太应该有这种情况,你用的是blink planner么?

x <35907...@qq.com> 于2020年7月6日周一 下午1:24写道:

> sorry,我说错了,确实没有,都是group agg.
>
> 我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),但是状态还是越来越大,没有按既定配置自动清理.
>
>
> --原始邮件--
> 发件人:"Benchao Li" 发送时间:2020年7月6日(星期一) 中午12:52
> 收件人:"user-zh"
> 主题:Re: 求助:FLINKSQL1.10实时统计累计UV
>
>
>
> 我看你的SQL里面并没有用到窗口呀,只是一个普通的聚合。
> 这种聚合需要设置合理的state retention[1]时间的,要不然状态默认是永远不清理的。
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time
>
> x <35907...@qq.com 于2020年7月6日周一 上午11:15写道:
>
>  版本是1.10.1,最后sink的时候确实是一个window里面做count
>  distinct操作。请问是只要计算过程中含有一个window里面做count
> 
> distinct操作,就会造成所有状态过期不自动清理吗?实际我window这步的状态很小,groupnbsp;DATE_FORMAT(rowtm,
>  '-MM-dd') 这个sql对应的状态很大。代码如下:
>  val rt_totaluv_view : Table = tabEnv.sqlQuery(
>  """
>  SELECT MAX(DATE_FORMAT(rowtm, '-MM-dd
> HH:mm:00'))
>  time_str,COUNT(DISTINCT userkey) uv
>  FROM source
>  GROUP BY DATE_FORMAT(rowtm, '-MM-dd')
>  """)
>  tabEnv.createTemporaryView("rt_totaluv_view",rt_totaluv_view)
> 
>  val totaluvTmp =
> tabEnv.toRetractStream[(String,Long)](rt_totaluv_view)
>  .filter( line =gt; line._1 == true ).map( line
> =gt; line._2 )
> 
>  val totaluvTabTmp = tabEnv.fromDataStream( totaluvTmp )
> 
>  tabEnv.sqlUpdate(
>  s"""
>  INSERT INTO mysql_totaluv
>  SELECT _1,MAX(_2)
>  FROM $totaluvTabTmp
>  GROUP BY _1
>  """)
>  --nbsp;原始邮件nbsp;--
>  发件人:nbsp;"Benchao Li"  发送时间:nbsp;2020年7月3日(星期五) 晚上9:47
>  收件人:nbsp;"user-zh" 
>  主题:nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
> 
> 
> 
>  你用的是哪个版本?之前是存在一个类似问题的[1],是在window里面做count distinct会有这个问题,
>  这个已经在1.11中修复了。
> 
>  [1] https://issues.apache.org/jira/browse/FLINK-17942
> 
>  x <35907...@qq.comgt; 于2020年7月3日周五 下午4:34写道:
> 
>  gt; 您好,我程序运行一段时间后,发现checkpoint文件总在增长,应该是状态没有过期,
>  gt;
>  gt;
> 
> 我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),按理说,日期是前一天的key对应的状态会在第二天过期的。
>  gt;
>  gt;
>  gt;
>  gt;
>  gt;
> --amp;nbsp;原始邮件amp;nbsp;--
>  gt; 发件人:amp;nbsp;"Jark Wu"  gt; 发送时间:amp;nbsp;2020年6月18日(星期四) 中午12:16
>  gt; 收件人:amp;nbsp;"user-zh" amp;gt;;
>  gt;
>  gt; 主题:amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
>  gt;
>  gt;
>  gt;
>  gt; 是的,我觉得这样子是能绕过的。
>  gt;
>  gt; On Thu, 18 Jun 2020 at 10:34, x <35907...@qq.comamp;gt;
> wrote:
>  gt;
>  gt; amp;gt; 如果是1.10的话,我通过表转流,再转表的方式实现了,您看合理吗?
>  gt; amp;gt; val resTmpTab: Table = tabEnv.sqlQuery(
>  gt; amp;gt;amp;nbsp;amp;nbsp; """
>  gt;
> amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; SELECT
>  MAX(DATE_FORMAT(ts, '-MM-dd
>  gt; HH:mm:00'))
>  gt; amp;gt; time_str,COUNT(DISTINCT userkey) uv
>  gt;
> amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; FROM
>  user_behavioramp;nbsp;amp;nbsp;amp;nbsp; GROUP BY
>  gt; DATE_FORMAT(ts,
> '-MM-dd')amp;nbsp;amp;nbsp;amp;nbsp; """)
>  gt; amp;gt;
>  gt; amp;gt; val
>  resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab)
>  gt; amp;gt;amp;nbsp;amp;nbsp;
>  gt;
> .filter(line=amp;amp;gt;line._1==true).map(line=amp;amp;gt;line._2)
>  gt; amp;gt;
>  gt; amp;gt; val res= tabEnv.fromDataStream(resTmpStream)
>  gt; amp;gt; tabEnv.sqlUpdate(
>  gt; amp;gt;amp;nbsp;amp;nbsp; s"""
>  gt;
> amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; INSERT
> INTO
>  rt_totaluv
>  gt;
> amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; SELECT
> _1,MAX(_2)
>  gt;
> amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; FROM
> $res
>  gt;
> amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; GROUP
> BY _1
>  gt;
> amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp; """)
>  gt; amp;gt;
>  gt; amp;gt;
>  gt; amp;gt;
> 
> --amp;amp;nbsp;原始邮件amp;amp;nbsp;--
>  gt; amp;gt; 发件人:amp;amp;nbsp;"Jark Wu"<
> imj...@gmail.comamp;amp;gt;;
>  gt; amp;gt; 发送时间:amp;amp;nbsp;2020年6月17日(星期三) 中午1:55
>  gt; amp;gt; 收件人:amp;amp;nbsp;"user-zh"<
> user-zh@flink.apache.org
>  amp;amp;gt;;
>  gt; amp;gt;
>  gt; amp;gt; 主题:amp;amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
>  gt; amp;gt;
>  gt; amp;gt;
>  gt; amp;gt;
>  gt; amp;gt; 在 Flink 1.11 中,你可以尝试这样:
>  gt; amp;gt;
>  gt; amp;gt; CREATE TABLE mysql (
>  gt; amp;gt; amp;amp;nbsp;amp;amp;nbsp; time_str
> STRING,
>  gt; amp;gt; amp;amp;nbsp;amp;amp;nbsp; uv BIGINT,
>  gt; amp;gt; amp;amp;nbsp;amp;amp;nbsp; PRIMARY
> KEY (ts) NOT ENFORCED
>

Re: 求助:FLINKSQL1.10实时统计累计UV

2020-07-05 文章 Benchao Li
我看你的SQL里面并没有用到窗口呀,只是一个普通的聚合。
这种聚合需要设置合理的state retention[1]时间的,要不然状态默认是永远不清理的。

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time

x <35907...@qq.com> 于2020年7月6日周一 上午11:15写道:

> 版本是1.10.1,最后sink的时候确实是一个window里面做count
> distinct操作。请问是只要计算过程中含有一个window里面做count
> distinct操作,就会造成所有状态过期不自动清理吗?实际我window这步的状态很小,groupDATE_FORMAT(rowtm,
> '-MM-dd') 这个sql对应的状态很大。代码如下:
> val rt_totaluv_view : Table = tabEnv.sqlQuery(
>   """
> SELECT MAX(DATE_FORMAT(rowtm, '-MM-dd HH:mm:00'))
> time_str,COUNT(DISTINCT userkey) uv
> FROM source
> GROUP BY DATE_FORMAT(rowtm, '-MM-dd')
> """)
> tabEnv.createTemporaryView("rt_totaluv_view",rt_totaluv_view)
>
> val totaluvTmp = tabEnv.toRetractStream[(String,Long)](rt_totaluv_view)
>   .filter( line = line._1 == true ).map( line = line._2 )
>
> val totaluvTabTmp = tabEnv.fromDataStream( totaluvTmp )
>
> tabEnv.sqlUpdate(
>   s"""
> INSERT INTO mysql_totaluv
> SELECT _1,MAX(_2)
> FROM $totaluvTabTmp
> GROUP BY _1
>     """)
> --原始邮件--
> 发件人:"Benchao Li" 发送时间:2020年7月3日(星期五) 晚上9:47
> 收件人:"user-zh"
> 主题:Re: 求助:FLINKSQL1.10实时统计累计UV
>
>
>
> 你用的是哪个版本?之前是存在一个类似问题的[1],是在window里面做count distinct会有这个问题,
> 这个已经在1.11中修复了。
>
> [1] https://issues.apache.org/jira/browse/FLINK-17942
>
> x <35907...@qq.com 于2020年7月3日周五 下午4:34写道:
>
>  您好,我程序运行一段时间后,发现checkpoint文件总在增长,应该是状态没有过期,
> 
> 
> 我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),按理说,日期是前一天的key对应的状态会在第二天过期的。
> 
> 
> 
> 
>  --nbsp;原始邮件nbsp;--
>  发件人:nbsp;"Jark Wu"  发送时间:nbsp;2020年6月18日(星期四) 中午12:16
>  收件人:nbsp;"user-zh" 
>  主题:nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
> 
> 
> 
>  是的,我觉得这样子是能绕过的。
> 
>  On Thu, 18 Jun 2020 at 10:34, x <35907...@qq.comgt; wrote:
> 
>  gt; 如果是1.10的话,我通过表转流,再转表的方式实现了,您看合理吗?
>  gt; val resTmpTab: Table = tabEnv.sqlQuery(
>  gt;nbsp;nbsp; """
>  gt;nbsp;nbsp;nbsp;nbsp; SELECT
> MAX(DATE_FORMAT(ts, '-MM-dd
>  HH:mm:00'))
>  gt; time_str,COUNT(DISTINCT userkey) uv
>  gt;nbsp;nbsp;nbsp;nbsp; FROM
> user_behaviornbsp;nbsp;nbsp; GROUP BY
>  DATE_FORMAT(ts, '-MM-dd')nbsp;nbsp;nbsp; """)
>  gt;
>  gt; val
> resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab)
>  gt;nbsp;nbsp;
>  .filter(line=amp;gt;line._1==true).map(line=amp;gt;line._2)
>  gt;
>  gt; val res= tabEnv.fromDataStream(resTmpStream)
>  gt; tabEnv.sqlUpdate(
>  gt;nbsp;nbsp; s"""
>  gt;nbsp;nbsp;nbsp;nbsp; INSERT INTO
> rt_totaluv
>  gt;nbsp;nbsp;nbsp;nbsp; SELECT _1,MAX(_2)
>  gt;nbsp;nbsp;nbsp;nbsp; FROM $res
>  gt;nbsp;nbsp;nbsp;nbsp; GROUP BY _1
>  gt;nbsp;nbsp;nbsp;nbsp; """)
>  gt;
>  gt;
>  gt;
> --amp;nbsp;原始邮件amp;nbsp;--
>  gt; 发件人:amp;nbsp;"Jark Wu"  gt; 发送时间:amp;nbsp;2020年6月17日(星期三) 中午1:55
>  gt; 收件人:amp;nbsp;"user-zh" amp;gt;;
>  gt;
>  gt; 主题:amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
>  gt;
>  gt;
>  gt;
>  gt; 在 Flink 1.11 中,你可以尝试这样:
>  gt;
>  gt; CREATE TABLE mysql (
>  gt; amp;nbsp;amp;nbsp; time_str STRING,
>  gt; amp;nbsp;amp;nbsp; uv BIGINT,
>  gt; amp;nbsp;amp;nbsp; PRIMARY KEY (ts) NOT ENFORCED
>  gt; ) WITH (
>  gt; amp;nbsp;amp;nbsp; 'connector' = 'jdbc',
>  gt; amp;nbsp;amp;nbsp; 'url' =
> 'jdbc:mysql://localhost:3306/mydatabase',
>  gt; amp;nbsp;amp;nbsp; 'table-name' = 'myuv'
>  gt; );
>  gt;
>  gt; INSERT INTO mysql
>  gt; SELECT MAX(DATE_FORMAT(ts, '-MM-dd HH:mm:00')),
>  COUNT(DISTINCTamp;nbsp;
>  gt; user_id)
>  gt; FROM user_behavior;
>  gt;
>  gt; On Wed, 17 Jun 2020 at 13:49, x <35907...@qq.comamp;gt;
> wrote:
>  gt;
>  gt; amp;gt; 感谢您的回复,您提到的"方法一"我还是有点不太理解,我需要每分钟输出一个累计UV,
>  gt; amp;gt; sink表这个样式
>  gt; amp;gt; tm uv
>  gt; amp;gt; 2020/06/17 13:46:00 1
>  gt; amp;gt; 2020/06/17 13:47:00 2
>  gt; amp;gt; 2020/06/17 13:48:00 3
>  gt; amp;gt;
>  gt; amp;gt;
>  gt; amp;gt; group by 日期的话,分钟如何获取
>  gt; amp;gt;
>  gt; amp;gt;
>  gt; amp;gt;
> 
> --amp;amp;nbsp;原始邮件amp;amp;nbsp;--
>  gt; amp;gt; 发件人:amp;amp;nbsp;"Benchao Li"<
> libenc...@apache.org
>  amp;amp;gt;;
>  gt; amp;gt; 发送时间:amp;amp;nbsp;2020年6月17日(星期三) 中午11:46
>  gt; amp;gt; 收件人:amp;amp;nbsp;"user-zh"<
> user-zh@flink.apache.org
>  amp;amp;gt;;
>  gt; amp;gt;
>  gt; amp;gt; 主题:amp;amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
>  gt; amp;gt;
>  gt; amp;

Re: 求助:FLINKSQL1.10实时统计累计UV

2020-07-03 文章 Benchao Li
你用的是哪个版本?之前是存在一个类似问题的[1],是在window里面做count distinct会有这个问题,
这个已经在1.11中修复了。

[1] https://issues.apache.org/jira/browse/FLINK-17942

x <35907...@qq.com> 于2020年7月3日周五 下午4:34写道:

> 您好,我程序运行一段时间后,发现checkpoint文件总在增长,应该是状态没有过期,
>
> 我配置了tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),按理说,日期是前一天的key对应的状态会在第二天过期的。
>
>
>
>
> --原始邮件--
> 发件人:"Jark Wu" 发送时间:2020年6月18日(星期四) 中午12:16
> 收件人:"user-zh"
> 主题:Re: 求助:FLINKSQL1.10实时统计累计UV
>
>
>
> 是的,我觉得这样子是能绕过的。
>
> On Thu, 18 Jun 2020 at 10:34, x <35907...@qq.com wrote:
>
>  如果是1.10的话,我通过表转流,再转表的方式实现了,您看合理吗?
>  val resTmpTab: Table = tabEnv.sqlQuery(
>  """
>  SELECT MAX(DATE_FORMAT(ts, '-MM-dd
> HH:mm:00'))
>  time_str,COUNT(DISTINCT userkey) uv
>  FROM user_behavior GROUP BY
> DATE_FORMAT(ts, '-MM-dd') """)
> 
>  val resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab)
> 
> .filter(line=gt;line._1==true).map(line=gt;line._2)
> 
>  val res= tabEnv.fromDataStream(resTmpStream)
>  tabEnv.sqlUpdate(
>  s"""
>  INSERT INTO rt_totaluv
>  SELECT _1,MAX(_2)
>  FROM $res
>  GROUP BY _1
>  """)
> 
> 
>  --nbsp;原始邮件nbsp;--
>  发件人:nbsp;"Jark Wu"  发送时间:nbsp;2020年6月17日(星期三) 中午1:55
>  收件人:nbsp;"user-zh" 
>  主题:nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
> 
> 
> 
>  在 Flink 1.11 中,你可以尝试这样:
> 
>  CREATE TABLE mysql (
>  nbsp;nbsp; time_str STRING,
>  nbsp;nbsp; uv BIGINT,
>  nbsp;nbsp; PRIMARY KEY (ts) NOT ENFORCED
>  ) WITH (
>  nbsp;nbsp; 'connector' = 'jdbc',
>  nbsp;nbsp; 'url' = 'jdbc:mysql://localhost:3306/mydatabase',
>  nbsp;nbsp; 'table-name' = 'myuv'
>  );
> 
>  INSERT INTO mysql
>  SELECT MAX(DATE_FORMAT(ts, '-MM-dd HH:mm:00')),
> COUNT(DISTINCTnbsp;
>  user_id)
>  FROM user_behavior;
> 
>  On Wed, 17 Jun 2020 at 13:49, x <35907...@qq.comgt; wrote:
> 
>  gt; 感谢您的回复,您提到的"方法一"我还是有点不太理解,我需要每分钟输出一个累计UV,
>  gt; sink表这个样式
>  gt; tm uv
>  gt; 2020/06/17 13:46:00 1
>  gt; 2020/06/17 13:47:00 2
>  gt; 2020/06/17 13:48:00 3
>  gt;
>  gt;
>  gt; group by 日期的话,分钟如何获取
>  gt;
>  gt;
>  gt;
> --amp;nbsp;原始邮件amp;nbsp;--
>  gt; 发件人:amp;nbsp;"Benchao Li" amp;gt;;
>  gt; 发送时间:amp;nbsp;2020年6月17日(星期三) 中午11:46
>  gt; 收件人:amp;nbsp;"user-zh" amp;gt;;
>  gt;
>  gt; 主题:amp;nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
>  gt;
>  gt;
>  gt;
>  gt; Hi,
>  gt; 我感觉这种场景可以有两种方式,
>  gt; 1. 可以直接用group by + mini batch
>  gt; 2. window聚合 + fast emit
>  gt;
>  gt; 对于#1,group by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm,
> '-MM-dd')。
>  gt; 这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] 。同时,mini
> batch的开启也需要
>  gt; 用参数[2] 来打开。
>  gt;
>  gt; 对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。
>  gt; fast
> emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下:
>  gt; table.exec.emit.early-fire.enabled = true
>  gt; table.exec.emit.early-fire.delay = 60 s
>  gt;
>  gt; [1]
>  gt;
>  gt;
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
>  gt; [2]
>  gt;
>  gt;
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
>  gt;
>  gt; x <35907...@qq.comamp;gt; 于2020年6月17日周三 上午11:14写道:
>  gt;
>  gt; amp;gt;
> 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计
>  gt; amp;gt; CREATE VIEW uv_per_10min AS
>  gt; amp;gt; SELECTamp;amp;nbsp;
>  gt; amp;gt; amp;amp;nbsp;
> MAX(DATE_FORMAT(proctimeamp;amp;nbsp;,
>  '-MM-dd
>  gt; HH:mm:00'))amp;amp;nbsp;OVER w
>  gt; amp;gt; AS time_str,amp;amp;nbsp;
>  gt; amp;gt; amp;amp;nbsp; COUNT(DISTINCT user_id) OVER
> w AS uv
>  gt; amp;gt; FROM user_behavior
>  gt; amp;gt; WINDOW w AS (ORDER BY proctime ROWS BETWEEN
> UNBOUNDED
>  PRECEDING AND
>  gt; amp;gt; CURRENT ROW);
>  gt; amp;gt;
>  gt; amp;gt;
>  gt; amp;gt; 想请教一下,应该如何处理?
>  gt; amp;gt; PARTITION BY DATE_FORMAT(rowtm, '-MM-dd')
>  这样可以吗,另外状态应该如何清理?
>  gt; amp;gt; PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧
>  gt; amp;gt; 多谢



-- 

Best,
Benchao Li


Re: 求助:FLINKSQL1.10实时统计累计UV

2020-06-17 文章 Jark Wu
是的,我觉得这样子是能绕过的。

On Thu, 18 Jun 2020 at 10:34, x <35907...@qq.com> wrote:

> 如果是1.10的话,我通过表转流,再转表的方式实现了,您看合理吗?
> val resTmpTab: Table = tabEnv.sqlQuery(
>   """
> SELECT MAX(DATE_FORMAT(ts, '-MM-dd HH:mm:00'))
> time_str,COUNT(DISTINCT userkey) uv
> FROM user_behaviorGROUP BY DATE_FORMAT(ts, '-MM-dd')""")
>
> val resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab)
>   .filter(line=line._1==true).map(line=line._2)
>
> val res= tabEnv.fromDataStream(resTmpStream)
> tabEnv.sqlUpdate(
>   s"""
> INSERT INTO rt_totaluv
> SELECT _1,MAX(_2)
> FROM $res
> GROUP BY _1
> """)
>
>
> ------原始邮件--
> 发件人:"Jark Wu" 发送时间:2020年6月17日(星期三) 中午1:55
> 收件人:"user-zh"
> 主题:Re: 求助:FLINKSQL1.10实时统计累计UV
>
>
>
> 在 Flink 1.11 中,你可以尝试这样:
>
> CREATE TABLE mysql (
>  time_str STRING,
>  uv BIGINT,
>  PRIMARY KEY (ts) NOT ENFORCED
> ) WITH (
>  'connector' = 'jdbc',
>  'url' = 'jdbc:mysql://localhost:3306/mydatabase',
>  'table-name' = 'myuv'
> );
>
> INSERT INTO mysql
> SELECT MAX(DATE_FORMAT(ts, '-MM-dd HH:mm:00')), COUNT(DISTINCT
> user_id)
> FROM user_behavior;
>
> On Wed, 17 Jun 2020 at 13:49, x <35907...@qq.com wrote:
>
>  感谢您的回复,您提到的"方法一"我还是有点不太理解,我需要每分钟输出一个累计UV,
>  sink表这个样式
>  tm uv
>  2020/06/17 13:46:00 10000
>  2020/06/17 13:47:00 2
>  2020/06/17 13:48:00 3
> 
> 
>  group by 日期的话,分钟如何获取
> 
> 
>  --nbsp;原始邮件nbsp;--
>  发件人:nbsp;"Benchao Li"  发送时间:nbsp;2020年6月17日(星期三) 中午11:46
>  收件人:nbsp;"user-zh" 
>  主题:nbsp;Re: 求助:FLINKSQL1.10实时统计累计UV
> 
> 
> 
>  Hi,
>  我感觉这种场景可以有两种方式,
>  1. 可以直接用group by + mini batch
>  2. window聚合 + fast emit
> 
>  对于#1,group by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm, '-MM-dd')。
>  这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] 。同时,mini batch的开启也需要
>  用参数[2] 来打开。
> 
>  对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。
>  fast emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下:
>  table.exec.emit.early-fire.enabled = true
>  table.exec.emit.early-fire.delay = 60 s
> 
>  [1]
> 
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
>  [2]
> 
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
> 
>  x <35907...@qq.comgt; 于2020年6月17日周三 上午11:14写道:
> 
>  gt; 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计
>  gt; CREATE VIEW uv_per_10min AS
>  gt; SELECTamp;nbsp;
>  gt; amp;nbsp; MAX(DATE_FORMAT(proctimeamp;nbsp;,
> '-MM-dd
>  HH:mm:00'))amp;nbsp;OVER w
>  gt; AS time_str,amp;nbsp;
>  gt; amp;nbsp; COUNT(DISTINCT user_id) OVER w AS uv
>  gt; FROM user_behavior
>  gt; WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED
> PRECEDING AND
>  gt; CURRENT ROW);
>  gt;
>  gt;
>  gt; 想请教一下,应该如何处理?
>  gt; PARTITION BY DATE_FORMAT(rowtm, '-MM-dd')
> 这样可以吗,另外状态应该如何清理?
>  gt; PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧
>  gt; 多谢


Re: 求助:FLINKSQL1.10实时统计累计UV

2020-06-16 文章 Jark Wu
在 Flink 1.11 中,你可以尝试这样:

CREATE TABLE mysql (
   time_str STRING,
   uv BIGINT,
   PRIMARY KEY (ts) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
   'table-name' = 'myuv'
);

INSERT INTO mysql
SELECT MAX(DATE_FORMAT(ts, '-MM-dd HH:mm:00')), COUNT(DISTINCT  user_id)
FROM user_behavior;

On Wed, 17 Jun 2020 at 13:49, x <35907...@qq.com> wrote:

> 感谢您的回复,您提到的"方法一"我还是有点不太理解,我需要每分钟输出一个累计UV,
> sink表这个样式
> tm uv
> 2020/06/17 13:46:00 1
> 2020/06/17 13:47:00 2
> 2020/06/17 13:48:00 3
>
>
> group by 日期的话,分钟如何获取
>
>
> --原始邮件--
> 发件人:"Benchao Li" 发送时间:2020年6月17日(星期三) 中午11:46
> 收件人:"user-zh"
> 主题:Re: 求助:FLINKSQL1.10实时统计累计UV
>
>
>
> Hi,
> 我感觉这种场景可以有两种方式,
> 1. 可以直接用group by + mini batch
> 2. window聚合 + fast emit
>
> 对于#1,group by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm, '-MM-dd')。
> 这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] 。同时,mini batch的开启也需要
> 用参数[2] 来打开。
>
> 对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。
> fast emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下:
> table.exec.emit.early-fire.enabled = true
> table.exec.emit.early-fire.delay = 60 s
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
> [2]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
>
> x <35907...@qq.com 于2020年6月17日周三 上午11:14写道:
>
>  需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计
>  CREATE VIEW uv_per_10min AS
>  SELECTnbsp;
>  nbsp; MAX(DATE_FORMAT(proctimenbsp;, '-MM-dd
> HH:mm:00'))nbsp;OVER w
>  AS time_str,nbsp;
>  nbsp; COUNT(DISTINCT user_id) OVER w AS uv
>  FROM user_behavior
>  WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND
>  CURRENT ROW);
> 
> 
>  想请教一下,应该如何处理?
>  PARTITION BY DATE_FORMAT(rowtm, '-MM-dd') 这样可以吗,另外状态应该如何清理?
>  PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧
>  多谢


Re: 求助:FLINKSQL1.10实时统计累计UV

2020-06-16 文章 Jark Wu
本超提的两个方案也是阿里内部解决这个问题最常用的方式,但是 1.10 会有 primary key 的限制,要等到 1.11 才行。
另外这两个方案在追数据时,都可能会有毛刺现象(有几分钟没有值,因为数据追太快,跳过了)。



On Wed, 17 Jun 2020 at 11:46, Benchao Li  wrote:

> Hi,
> 我感觉这种场景可以有两种方式,
> 1. 可以直接用group by + mini batch
> 2. window聚合 + fast emit
>
> 对于#1,group by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm, '-MM-dd')。
> 这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] 。同时,mini batch的开启也需要
> 用参数[2] 来打开。
>
> 对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。
> fast emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下:
> table.exec.emit.early-fire.enabled = true
> table.exec.emit.early-fire.delay = 60 s
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
> [2]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
>
> x <35907...@qq.com> 于2020年6月17日周三 上午11:14写道:
>
> > 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计
> > CREATE VIEW uv_per_10min AS
> > SELECT
> >  MAX(DATE_FORMAT(proctime, '-MM-dd HH:mm:00'))OVER
> w
> > AS time_str,
> >  COUNT(DISTINCT user_id) OVER w AS uv
> > FROM user_behavior
> > WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND
> > CURRENT ROW);
> >
> >
> > 想请教一下,应该如何处理?
> > PARTITION BY DATE_FORMAT(rowtm, '-MM-dd') 这样可以吗,另外状态应该如何清理?
> > PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧
> > 多谢
>


Re: 求助:FLINKSQL1.10实时统计累计UV

2020-06-16 文章 Benchao Li
Hi,
我感觉这种场景可以有两种方式,
1. 可以直接用group by + mini batch
2. window聚合 + fast emit

对于#1,group by的字段里面可以有一个日期的字段,例如你上面提到的DATE_FORMAT(rowtm, '-MM-dd')。
这种情况下的状态清理,需要配置state retention时间,配置方法可以参考[1] 。同时,mini batch的开启也需要
用参数[2] 来打开。

对于#2,这种直接开一个天级别的tumble窗口就行。然后状态清理不用特殊配置,默认就可以清理。
fast emit这个配置现在还是一个experimental的feature,所以没有在文档中列出来,我把配置贴到这里,你可以参考一下:
table.exec.emit.early-fire.enabled = true
table.exec.emit.early-fire.delay = 60 s

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html

x <35907...@qq.com> 于2020年6月17日周三 上午11:14写道:

> 需求是,每分钟统计当日0时到当前的累计UV数,用下面的这种方式,跨天的时候,累计UV不会清零,还会一直累计
> CREATE VIEW uv_per_10min AS
> SELECT
>  MAX(DATE_FORMAT(proctime, '-MM-dd HH:mm:00'))OVER w
> AS time_str,
>  COUNT(DISTINCT user_id) OVER w AS uv
> FROM user_behavior
> WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND
> CURRENT ROW);
>
>
> 想请教一下,应该如何处理?
> PARTITION BY DATE_FORMAT(rowtm, '-MM-dd') 这样可以吗,另外状态应该如何清理?
> PS:1.10貌似不支持DDL貌似不支持CREATE VIEW吧
> 多谢