??????1.10??????????????????,??????????????????????????????? val resTmpTab: Table = tabEnv.sqlQuery( """ SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')) time_str,COUNT(DISTINCT userkey) uv FROM user_behavior GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd') """)
val resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab) .filter(line=>line._1==true).map(line=>line._2) val res= tabEnv.fromDataStream(resTmpStream) tabEnv.sqlUpdate( s""" INSERT INTO rt_totaluv SELECT _1,MAX(_2) FROM $res GROUP BY _1 """) ------------------ ???????? ------------------ ??????: "Jark Wu"<imj...@gmail.com>; ????????: 2020??6??17??(??????) ????1:55 ??????: "user-zh"<user-zh@flink.apache.org>; ????: Re: ??????FLINKSQL1.10????????????UV ?? Flink 1.11 ???????????????????? CREATE TABLE mysql ( time_str STRING, uv BIGINT, PRIMARY KEY (time_str) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/mydatabase', 'table-name' = 'myuv' ); INSERT INTO mysql SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')), COUNT(DISTINCT user_id) FROM user_behavior; On Wed, 17 Jun 2020 at 13:49, x <35907...@qq.com> wrote: > ??????????????????????"??????"????????????????????????????????????????????UV?? > sink?????????? > tm uv > 2020/06/17 13:46:00 10000 > 2020/06/17 13:47:00 20000 > 2020/06/17 13:48:00 30000 > > > group by ?????????????????????? > > > ------------------ ???????? ------------------ > ??????: "Benchao Li"<libenc...@apache.org>; > ????????: 2020??6??17??(??????) ????11:46 > ??????: "user-zh"<user-zh@flink.apache.org>; > > ????: Re: ??????FLINKSQL1.10????????????UV > > > > Hi?? > ?????????????????????????????? > 1. ??????????group by + mini batch > 2. window???? + fast emit > > ????#1??group by????????????????????????????????????????????????DATE_FORMAT(rowtm, 'yyyy-MM-dd')?? > ??????????????????????????????state retention??????????????????????[1] ????????mini batch???????????? > ??????[2] ???????? > > ????#2????????????????????????tumble???????????????????????????????????????????????????? > fast emit????????????????????experimental??feature???????????????????????????????????????????????????????????? 
> table.exec.emit.early-fire.enabled = true > table.exec.emit.early-fire.delay = 60 s > > [1] > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html > [2] > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html > > x <35907...@qq.com> ??2020??6??17?????? ????11:14?????? > > > ??????????????????????0??????????????UV??????????????????????????????????????UV?????????????????????? > > CREATE VIEW uv_per_10min AS > > SELECT  > >   MAX(DATE_FORMAT(proctime , 'yyyy-MM-dd > HH:mm:00')) OVER w > > AS time_str,  > >   COUNT(DISTINCT user_id) OVER w AS uv > > FROM user_behavior > > WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND > > CURRENT ROW); > > > > > > ?????????????????????????? > > PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') ?????????????????????????????????? > > PS??1.10??????????DDL??????????CREATE VIEW?? > > ????