?????? ??????FLINKSQL1.10????????????UV
??blinkval setttings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() -- -- ??: "Benchao Li"https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time > > x <35907...@qq.com> ??2020??7??6?? 11:15?? > > > ??1.10.1??sinkwindow??count > > distinct??window??count > > > distinct??windowgroup DATE_FORMAT(rowtm, > > '-MM-dd') sql?? > > val rt_totaluv_view : Table = tabEnv.sqlQuery( > > """ > > SELECT MAX(DATE_FORMAT(rowtm, '-MM-dd > HH:mm:00')) > > time_str,COUNT(DISTINCT userkey) uv > > FROM source > > GROUP BY DATE_FORMAT(rowtm, '-MM-dd') > > """) > > tabEnv.createTemporaryView("rt_totaluv_view",rt_totaluv_view) > > > > val totaluvTmp = > tabEnv.toRetractStream[(String,Long)](rt_totaluv_view) > > .filter( line => line._1 == true ).map( line > => line._2 ) > > > > val totaluvTabTmp = tabEnv.fromDataStream( totaluvTmp ) > > > > tabEnv.sqlUpdate( > > s""" > > INSERT INTO mysql_totaluv > > SELECT _1,MAX(_2) > > FROM $totaluvTabTmp > > GROUP BY _1 > > """) > > -- -- > > ??: "Benchao Li"https://issues.apache.org/jira/browse/FLINK-17942 > > > > x <35907...@qq.com> ??2020??7??3?? 4:34?? > > > > > checkpoint?? > > > > > > > > > tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),??key > > > > > > > > > > > > > > > > --&nbsp;&nbsp;-- > > > ??:&nbsp;"Jark Wu"https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html > > > &gt; &amp;gt; [2] > > > &gt; &amp;gt; > > > &gt; &amp;gt; > > > &gt; > > > > > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html > > > &gt; &amp;gt; > > > &gt; &amp;gt; x <35907...@qq.com > &amp;amp;gt; > > ??2020??6??17?? 11:14?? > > > &gt; &amp;gt; > > > &gt; &amp;gt; &amp;amp;gt; > > > ??0??UV??UV?? 
> > > > > > CREATE > VIEW uv_per_10min AS > > > > > > > SELECT > > > > > > > > > > MAX(DATE_FORMAT(proctime , > > > > 'yyyy-MM-dd > > > > > > HH:mm:00')) OVER w > > > > > > AS > > time_str, > > > > > > > > > COUNT(DISTINCT user_id) OVER > > > w AS uv > > > > > > FROM > user_behavior > > > > > > WINDOW w > AS (ORDER BY proctime > > ROWS BETWEEN > > > UNBOUNDED > > > > PRECEDING AND > > > > > > CURRENT > ROW); > > > > > > > > > > > > > > > > > > > ?? > > > > > > PARTITION > BY > > DATE_FORMAT(rowtm, 'yyyy-MM-dd') > > > > ?? > > > > > > > PS??1.10??DDL??CREATE > > VIEW?? > > > > > > > > > > > > > > -- > > > > Best, > > Benchao Li > > > > -- > > Best, > Benc
??????FLINKSQL1.10????????????UV
??0??UV??UV?? CREATE VIEW uv_per_10min AS SELECT MAX(DATE_FORMAT(proctime , 'yyyy-MM-dd HH:mm:00')) OVER w AS time_str, COUNT(DISTINCT user_id) OVER w AS uv FROM user_behavior WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW); ?? PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') ?? PS??1.10??DDL??CREATE VIEW??
?????? ??????FLINKSQL1.10????????????UV
??"??"UV?? sink?? tm uv 2020/06/17 13:46:00 1 2020/06/17 13:47:00 2 2020/06/17 13:48:00 3 group by ?? -- -- ??: "Benchao Li" https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html x <35907...@qq.com> ??2020??6??17?? 11:14?? > ??0??UV??UV?? > CREATE VIEW uv_per_10min AS > SELECT > MAX(DATE_FORMAT(proctime , 'yyyy-MM-dd HH:mm:00')) OVER w > AS time_str, > COUNT(DISTINCT user_id) OVER w AS uv > FROM user_behavior > WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND > CURRENT ROW); > > > ?? > PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') ?? > PS??1.10??DDL??CREATE VIEW?? >
?????? ??????FLINKSQL1.10????????????UV
-- -- ??: "Jark Wu" https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html > [2] > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html > > x <35907...@qq.com> ??2020??6??17?? 11:14?? > > > ??0??UV??UV?? > > CREATE VIEW uv_per_10min AS > > SELECT > > MAX(DATE_FORMAT(proctime , 'yyyy-MM-dd > HH:mm:00')) OVER w > > AS time_str, > > COUNT(DISTINCT user_id) OVER w AS uv > > FROM user_behavior > > WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND > > CURRENT ROW); > > > > > > ?? > > PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') ?? > > PS??1.10??DDL??CREATE VIEW?? > >
?????? ??????FLINKSQL1.10????????????UV
??1.10??,??? val resTmpTab: Table = tabEnv.sqlQuery( """ SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')) time_str,COUNT(DISTINCT userkey) uv FROM user_behavior GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd')""") val resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab) .filter(line=>line._1==true).map(line=>line._2) val res= tabEnv.fromDataStream(resTmpStream) tabEnv.sqlUpdate( s""" INSERT INTO rt_totaluv SELECT _1,MAX(_2) FROM $res GROUP BY _1 """) -- -- ??: "Jark Wu" https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html > [2] > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html > > x <35907...@qq.com> ??2020??6??17?? 11:14?? > > > ??0??UV??UV?? > > CREATE VIEW uv_per_10min AS > > SELECT > > MAX(DATE_FORMAT(proctime , 'yyyy-MM-dd > HH:mm:00')) OVER w > > AS time_str, > > COUNT(DISTINCT user_id) OVER w AS uv > > FROM user_behavior > > WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND > > CURRENT ROW); > > > > > > ?? > > PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') ?? > > PS??1.10??DDL??CREATE VIEW?? > >
?????? ??????FLINKSQL1.10????????????UV
checkpoint?? tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),??key -- -- ??: "Jark Wu" https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html > > [2] > > > > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html > > > > x <35907...@qq.com> ??2020??6??17?? 11:14?? > > > > > ??0??UV??UV?? > > > CREATE VIEW uv_per_10min AS > > > SELECT > > > MAX(DATE_FORMAT(proctime , > 'yyyy-MM-dd > > HH:mm:00')) OVER w > > > AS time_str, > > > COUNT(DISTINCT user_id) OVER w AS uv > > > FROM user_behavior > > > WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED > PRECEDING AND > > > CURRENT ROW); > > > > > > > > > ?? > > > PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') > ?? > > > PS??1.10??DDL??CREATE VIEW?? > > >
?????? ??????FLINKSQL1.10????????????UV
??1.10.1??sinkwindow??count distinct??window??count distinct??windowgroup DATE_FORMAT(rowtm, 'yyyy-MM-dd') sql?? val rt_totaluv_view : Table = tabEnv.sqlQuery( """ SELECT MAX(DATE_FORMAT(rowtm, 'yyyy-MM-dd HH:mm:00')) time_str,COUNT(DISTINCT userkey) uv FROM source GROUP BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') """) tabEnv.createTemporaryView("rt_totaluv_view",rt_totaluv_view) val totaluvTmp = tabEnv.toRetractStream[(String,Long)](rt_totaluv_view) .filter( line => line._1 == true ).map( line => line._2 ) val totaluvTabTmp = tabEnv.fromDataStream( totaluvTmp ) tabEnv.sqlUpdate( s""" INSERT INTO mysql_totaluv SELECT _1,MAX(_2) FROM $totaluvTabTmp GROUP BY _1 """) -- -- ??: "Benchao Li" https://issues.apache.org/jira/browse/FLINK-17942 x <35907...@qq.com> ??2020??7??3?? 4:34?? > checkpoint?? > > tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),??key > > > > > -- -- > ??: "Jark Wu" https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html > > > [2] > > > > > > > > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html > > > > > > x <35907...@qq.com> ??2020??6??17?? 11:14?? > > > > > > > > ??0??UV??UV?? > > > > CREATE VIEW uv_per_10min AS > > > > SELECT > > > > > MAX(DATE_FORMAT(proctime , > > 'yyyy-MM-dd > > > HH:mm:00')) OVER w > > > > AS time_str, > > > > COUNT(DISTINCT user_id) OVER > w AS uv > > > > FROM user_behavior > > > > WINDOW w AS (ORDER BY proctime ROWS BETWEEN > UNBOUNDED > > PRECEDING AND > > > > CURRENT ROW); > > > > > > > > > > > > ?? > > > > PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') > > ?? > > > > PS??1.10??DDL??CREATE VIEW?? > > > > -- Best, Benchao Li
?????? ??????FLINKSQL1.10????????????UV
sorry,group agg. tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7))??. -- -- ??: "Benchao Li"https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time x <35907...@qq.com> ??2020??7??6?? 11:15?? > ??1.10.1??sinkwindow??count > distinct??window??count > distinct??windowgroup DATE_FORMAT(rowtm, > '-MM-dd') sql?? > val rt_totaluv_view : Table = tabEnv.sqlQuery( > """ > SELECT MAX(DATE_FORMAT(rowtm, '-MM-dd HH:mm:00')) > time_str,COUNT(DISTINCT userkey) uv > FROM source > GROUP BY DATE_FORMAT(rowtm, '-MM-dd') > """) > tabEnv.createTemporaryView("rt_totaluv_view",rt_totaluv_view) > > val totaluvTmp = tabEnv.toRetractStream[(String,Long)](rt_totaluv_view) > .filter( line => line._1 == true ).map( line => line._2 ) > > val totaluvTabTmp = tabEnv.fromDataStream( totaluvTmp ) > > tabEnv.sqlUpdate( > s""" > INSERT INTO mysql_totaluv > SELECT _1,MAX(_2) > FROM $totaluvTabTmp > GROUP BY _1 > """) > -- -- > ??: "Benchao Li"https://issues.apache.org/jira/browse/FLINK-17942 > > x <35907...@qq.com> ??2020??7??3?? 4:34?? > > > checkpoint?? > > > > > tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),??key > > > > > > > > > > -- -- > > ??: "Jark Wu"https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html > > > &gt; [2] > > > &gt; > > > &gt; > > > > > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html > > > &gt; > > > &gt; x <35907...@qq.com&amp;gt; > ??2020??6??17?? 11:14?? > > > &gt; > > > &gt; &amp;gt; > > ??0??UV??UV?? 
> > > > > CREATE VIEW uv_per_10min AS > > > > > SELECT > > > > > > > MAX(DATE_FORMAT(proctime , > > > 'yyyy-MM-dd > > > > HH:mm:00')) OVER w > > > > > AS > time_str, > > > > > > COUNT(DISTINCT user_id) OVER > > w AS uv > > > > > FROM user_behavior > > > > > WINDOW w AS (ORDER BY proctime > ROWS BETWEEN > > UNBOUNDED > > > PRECEDING AND > > > > > CURRENT ROW); > > > > > > > > > > > > > > > ?? > > > > > PARTITION BY > DATE_FORMAT(rowtm, 'yyyy-MM-dd') > > > ?? > > > > > PS??1.10??DDL??CREATE > VIEW?? > > > > > > > > > -- > > Best, > Benchao Li -- Best, Benchao Li