??????flink????????????????????????

------------------ ???????? ------------------
??????:                                                                         
                                               "user-zh"                        
                                                            
<samuel....@ubtrobot.com&gt;;
????????:&nbsp;2020??9??2??(??????) ????3:20
??????:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

????:&nbsp;??????????????????????????????????????



????????????flink SQL,????????????tumble 
window????????????????????????????????????????????????????????????????????????
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 //????eventtime??????????watermark
DataStream<Tuple4<String,String,String,Long&gt;&gt; 
withTimestampsAndWatermarksDS = singleDS.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple4<String,String,String,Long&gt;&gt;forBoundedOutOfOrderness(Duration.ofSeconds(5))
//.<Tuple4<String,String,String,Long&gt;&gt;forMonotonousTimestamps()
.withIdleness(Duration.ofSeconds(10))&nbsp;&nbsp; 
//????????????????????watermark
.withTimestampAssigner((event, timestamp)-&gt;event.f3));

StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
tenv.registerDataStream(
"log",
withTimestampsAndWatermarksDS,
"appid,bugid,eventid,rowtime.rowtime,proctime.proctime");

String sql = "select appid,eventid,cnt," +
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "(starttime 
+ interval '8' hour ) as stime," +
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "(endtime + 
interval '8' hour ) as etime&nbsp; " +&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "from 
(select appid,eventid,count(*) as cnt," +
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
"TUMBLE_START(rowtime,INTERVAL '1' HOUR)&nbsp; as starttime," +
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
"TUMBLE_END(rowtime,INTERVAL '1' HOUR)&nbsp; as endtime&nbsp; " +
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; "from 
log&nbsp; group by appid,eventid,TUMBLE(rowtime,INTERVAL '1' HOUR),TIME 
'00:00:00')";&nbsp;&nbsp;&nbsp; //????????????????????????????

Table table = tenv.sqlQuery(sql);
DataStream<Result&gt; dataStream = tenv.toAppendStream(table, Result.class);

??????????????
(400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
(400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
(400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
(400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12 
????????2020-09-01 18:00:00.0??????????????????????????????????
(400030024,123123123123,others,1599030999000) //2020/9/2 15:16:39 
????????????????????????
ResultHour{appid=400030024,eventid=others,cnt=4, stime=2020-09-01 17:00:00.0, 
etime=2020-09-01 18:00:00.0, SystemTime=1599031415481 //2020/9/2 15:23:35}
????????????????????????????????

回复