??????1.10??????????????????,???????????????????????????????
// Step 1: per-day aggregate over user_behavior. time_str is the latest
// minute-truncated timestamp seen for the day; uv is the distinct userkey count.
val resTmpTab: Table = tabEnv.sqlQuery(
  """
    SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')) time_str,COUNT(DISTINCT 
userkey) uv
    FROM user_behavior    GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd')    """)

// Step 2: convert to a retract stream of (flag, row) pairs, keep only the
// records whose flag is true, then drop the flag so only (time_str, uv) remains.
val resTmpStream = tabEnv
  .toRetractStream[(String, Long)](resTmpTab)
  .filter(_._1)
  .map(_._2)

// Step 3: turn the filtered stream back into a Table. The tuple fields are
// referenced as _1 / _2 in the INSERT below.
val res = tabEnv.fromDataStream(resTmpStream)

// Step 4: write one row per time_str carrying the largest uv observed for it.
// $res interpolates the Table's string form into the statement text.
tabEnv.sqlUpdate(
  s"""
    INSERT INTO rt_totaluv
    SELECT _1,MAX(_2)
    FROM $res
    GROUP BY _1
    """)


------------------ ???????? ------------------
??????: "Jark Wu" <imj...@gmail.com>;
????????: 2020??6??17??(??????) ????1:55
??????: "user-zh" <user-zh@flink.apache.org>;

????: Re: ??????FLINKSQL1.10????????????UV



?? Flink 1.11 ????????????????????

-- JDBC sink table backed by the MySQL table `myuv`.
-- The primary key must name a declared column: the original referenced `ts`,
-- which this table does not define. Keying on time_str gives the sink one
-- row per minute string.
CREATE TABLE mysql (
  time_str STRING,
  uv BIGINT,
  PRIMARY KEY (time_str) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydatabase',
  'table-name' = 'myuv'
);

-- Continuously write the latest minute-truncated timestamp and the running
-- distinct-user count into the `mysql` sink table.
INSERT INTO mysql
SELECT MAX(DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00')), COUNT(DISTINCT user_id)
FROM user_behavior;

On Wed, 17 Jun 2020 at 13:49, x <35907...@qq.com> wrote:

&gt; 
??????????????????????"??????"????????????????????????????????????????????UV??
&gt; sink??????????
> tm uv
> 2020/06/17 13:46:00 10000
> 2020/06/17 13:47:00 20000
> 2020/06/17 13:48:00 30000
&gt;
&gt;
&gt; group by ??????????????????????
&gt;
&gt;
> ------------------ ???????? ------------------
> ??????: "Benchao Li" <libenc...@apache.org>;
> ????????: 2020??6??17??(??????) ????11:46
> ??????: "user-zh" <user-zh@flink.apache.org>;
>
> ????: Re: ??????FLINKSQL1.10????????????UV
&gt;
&gt;
&gt;
&gt; Hi??
&gt; ??????????????????????????????
&gt; 1. ??????????group by + mini batch
&gt; 2. window???? + fast emit
&gt;
&gt; ????#1??group 
by????????????????????????????????????????????????DATE_FORMAT(rowtm, 
'yyyy-MM-dd')??
&gt; ??????????????????????????????state retention??????????????????????[1] 
????????mini batch????????????
&gt; ??????[2] ????????
&gt;
&gt; 
????#2????????????????????????tumble????????????????????????????????????????????????????
&gt; fast 
emit????????????????????experimental??feature????????????????????????????????????????????????????????????
> table.exec.emit.early-fire.enabled = true
> table.exec.emit.early-fire.delay = 60 s
&gt;
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
&gt;
&gt; x <35907...@qq.com&amp;gt; ??2020??6??17?????? ????11:14??????
&gt;
&gt; &amp;gt; 
??????????????????????0??????????????UV??????????????????????????????????????UV??????????????????????
> > CREATE VIEW uv_per_10min AS
> > SELECT
> >   MAX(DATE_FORMAT(proctime, 'yyyy-MM-dd HH:mm:00')) OVER w AS time_str,
> >   COUNT(DISTINCT user_id) OVER w AS uv
> > FROM user_behavior
> > WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt; ??????????????????????????
&gt; &amp;gt; PARTITION BY DATE_FORMAT(rowtm, 'yyyy-MM-dd') 
??????????????????????????????????
&gt; &amp;gt; PS??1.10??????????DDL??????????CREATE VIEW??
&gt; &amp;gt; ????

Reply via email to