????????????????????TTL??keyby????????key????????????????1????????????????????????????????state??




------------------ ???????? ------------------
??????: "????"<zk357794...@gmail.com&gt;; 
????????: 2021??1??22??(??????) ????5:04
??????: "user-zh"<user-zh@flink.apache.org&gt;; 
????: Re: ??????????????????????flink state



@??????
??????????????????????????????????????????????????????window??????reduce??new 
myReduceFunc(),new
AssignWindowProcessFunc()??????????????????????????????????????????????????????????????????????

?????? <hinobl...@gmail.com&gt; ??2021??1??22?????? ????10:10??????

&gt; ????????????????mysql??????????
&gt; ??????ID??????ID??????????????????????durationn=?????????? - 
????????????....
&gt;
&gt; 
????user1????????1????????????10??????????10????????????????????????duration??
&gt;
&gt;
&gt; ??????????????????????????????
&gt;
&gt; xuhaiLong <xiagu...@163.com&gt; ??2021??1??22?????? ????10:03??????
&gt;
&gt; &gt; ??????????????mysql ?? ?????????????????? startTime ??
&gt; &gt; 
userId????????????????????????????????????????????????????????????????????????????????????????
 startTime ????????????????sum
&gt; ??????
&gt; &gt;
&gt; &gt;
&gt; &gt; ??2021??1??21?? 18:24??????<zk357794...@gmail.com&gt; ??????
&gt; &gt; 
????????????????????????????????????????????????????min/max????????procss??????????context.window.getStart??
&gt; &gt;
&gt; &gt;
&gt; 
context.window.getEnd????????????????????????????????????????????????????????????????????????????4????????????????????????????????????????????????????????????????????????
&gt; &gt; ????????????????????????
&gt; &gt;
&gt; &gt; val ds = dataStream
&gt; &gt; .filter(_.liveType == 1)
&gt; &gt; .keyBy(1, 2)
&gt; &gt; .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
&gt; &gt; .process(new myProcessWindow()).uid("process-id")
&gt; &gt;
&gt; &gt; class myProcessWindow() extends
&gt; &gt; ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple,
&gt; &gt; TimeWindow] {
&gt; &gt;
&gt; &gt; override def process(key: Tuple, context: Context, elements:
&gt; &gt; Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit
&gt; &gt; = {
&gt; &gt; var startTime = context.window.getStart 
//????????????????????????????????
&gt; &gt; var endTime = context.window.getEnd //??????????????????????????????
&gt; &gt;
&gt; &gt; val currentDate = DateUtil.currentDate
&gt; &gt; val created_time = currentDate
&gt; &gt; val modified_time = currentDate
&gt; &gt; ??????
&gt; &gt;
&gt; &gt; val join_time: String =
&gt; &gt; DateUtil.convertTimeStamp2DateStr(startTime,
&gt; &gt; DateUtil.SECOND_DATE_FORMAT)
&gt; &gt; val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime,
&gt; &gt; DateUtil.SECOND_DATE_FORMAT)
&gt; &gt; val duration = (endTime - startTime) / 1000&nbsp; //??????????
&gt; &gt; val duration_time = DateUtil.secondsToFormat(duration)&nbsp; 
//??????????
&gt; &gt; out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
&gt; &gt; courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
&gt; &gt; join_time, leave_time, created_time, modified_time
&gt; &gt; , liveType, plat_form, duration, duration_time,
&gt; &gt; network_operator, role, useragent, uid, eventTime))
&gt; &gt;
&gt; &gt; CloudliveWatcher(id, partnerId, courseId, customerId,
&gt; &gt; courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
&gt; &gt; join_time, leave_time, created_time, modified_time
&gt; &gt; , liveType, plat_form, duration, duration_time,
&gt; &gt; network_operator, role, useragent, uid, eventTime)
&gt; &gt;
&gt; &gt; }
&gt; &gt;
&gt; &gt;
&gt; &gt; ????????????????????????min/max??????????????????????????
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt; ?????? <hinobl...@gmail.com&gt; ??2020??12??28?????? ????7:12??????
&gt; &gt;
&gt; &gt; ????????ID??????ID??????????session 
window??????1min????gap??????key+window????count????????sum(1)??
&gt; &gt;
&gt; &gt; 
??????????????????????????????1min??2min??????????????????????key+window????min/max??????????????????????????
&gt; &gt;
&gt; &gt; session 
window??????????????????2????????gap??????????2????????????????window????
&gt; &gt;
&gt; &gt;
&gt; &gt; ???? <zk357794...@gmail.com&gt; ??2020??12??28?????? ????5:35??????
&gt; &gt;
&gt; &gt; ????????????session window????????
&gt; &gt;
&gt; &gt; Akisaya <akikevins...@gmail.com&gt; ??2020??12??28?????? 
????5:00??????
&gt; &gt;
&gt; &gt; ?????????? session window ??
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
&gt; &gt;
&gt; &gt; news_...@163.com <news_...@163.com&gt; ??2020??12??28?????? 
????2:15??????
&gt; &gt;
&gt; &gt; 
????????????????????????????????KAFKA??????????????????????????????????????????????????????????????kafka??????
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt; news_...@163.com
&gt; &gt;
&gt; &gt; ???????? ????
&gt; &gt; ?????????? 2020-12-28 13:35
&gt; &gt; ???????? user-zh
&gt; &gt; ?????? ??????????????????????flink state
&gt; &gt; ????????????????????????????????????
&gt; &gt;
&gt; &gt; ??????????
&gt; &gt;
&gt; &gt; 
????????????????????????????????????????????????????????????kafka????????????????????????????????????eventTime????????????????????A
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; 
??1??2??3??????????????????????????????????????????3????????5??6??????????????????????????????2??????????????????????????????????????????????????????????????????
&gt; &gt;
&gt; &gt; ??????????
&gt; &gt; 
??????????????????????ID??????ID??????????process????????state????????????????????????????event
 Time??????????
&gt; &gt; 
????????????????????????????????????1??????????????????????????????????1????????????????????????????????????????????key????????
&gt; &gt;
&gt; &gt; ????????????????????????????????????????????????????????????????
&gt; &gt;
&gt; &gt; flink ????1.10.1
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt; &gt;
&gt;

回复