????????????flink SQL,????????????tumble window???????????????????????????????????????????????????????????????????????? //????eventtime??????????watermark DataStream<Tuple4<String,String,String,Long>> withTimestampsAndWatermarksDS = singleDS.assignTimestampsAndWatermarks( WatermarkStrategy .<Tuple4<String,String,String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5)) //.<Tuple4<String,String,String,Long>>forMonotonousTimestamps() .withIdleness(Duration.ofSeconds(10)) //????????????????????watermark .withTimestampAssigner((event, timestamp)->event.f3));
StreamTableEnvironment tenv = StreamTableEnvironment.create(env); tenv.registerDataStream( "log", withTimestampsAndWatermarksDS, "appid,bugid,eventid,rowtime.rowtime,proctime.proctime"); String sql = "select appid,eventid,cnt," + "(starttime + interval '8' hour ) as stime," + "(endtime + interval '8' hour ) as etime " + "from (select appid,eventid,count(*) as cnt," + "TUMBLE_START(rowtime,INTERVAL '1' HOUR) as starttime," + "TUMBLE_END(rowtime,INTERVAL '1' HOUR) as endtime " + "from log group by appid,eventid,TUMBLE(rowtime,INTERVAL '1' HOUR),TIME '00:00:00')"; //???????????????????????????? Table table = tenv.sqlQuery(sql); DataStream<Result> dataStream = tenv.toAppendStream(table, Result.class); ?????????????? (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12 (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12 (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12 (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12 ????????2020-09-01 18:00:00.0?????????????????????????????????? (400030024,123123123123,others,1599030999000) //2020/9/2 15:16:39 ???????????????????????? ResultHour{appid=400030024,eventid=others,cnt=4, stime=2020-09-01 17:00:00.0, etime=2020-09-01 18:00:00.0, SystemTime=1599031415481 //2020/9/2 15:23:35} ????????????????????????????????