????????????????????TTL??keyby????????key????????????????1????????????????????????????????state??
------------------ ???????? ------------------ ??????: "????"<zk357794...@gmail.com>; ????????: 2021??1??22??(??????) ????5:04 ??????: "user-zh"<user-zh@flink.apache.org>; ????: Re: ??????????????????????flink state @?????? ??????????????????????????????????????????????????????window??????reduce??new myReduceFunc(),new AssignWindowProcessFunc()?????????????????????????????????????????????????????????????????????? ?????? <hinobl...@gmail.com> ??2021??1??22?????? ????10:10?????? > ????????????????mysql?????????? > ??????ID??????ID??????????????????????durationn=?????????? - ????????????.... > > ????user1????????1????????????10??????????10????????????????????????duration?? > > > ?????????????????????????????? > > xuhaiLong <xiagu...@163.com> ??2021??1??22?????? ????10:03?????? > > > ??????????????mysql ?? ?????????????????? startTime ?? > > userId???????????????????????????????????????????????????????????????????????????????????????? startTime ????????????????sum > ?????? > > > > > > ??2021??1??21?? 18:24??????<zk357794...@gmail.com> ?????? > > ????????????????????????????????????????????????????min/max????????procss??????????context.window.getStart?? > > > > > context.window.getEnd????????????????????????????????????????????????????????????????????????????4???????????????????????????????????????????????????????????????????????? > > ???????????????????????? > > > > val ds = dataStream > > .filter(_.liveType == 1) > > .keyBy(1, 2) > > .window(EventTimeSessionWindows.withGap(Time.minutes(1))) > > .process(new myProcessWindow()).uid("process-id") > > > > class myProcessWindow() extends > > ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple, > > TimeWindow] { > > > > override def process(key: Tuple, context: Context, elements: > > Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit > > = { > > var startTime = context.window.getStart //???????????????????????????????? > > var endTime = context.window.getEnd //?????????????????????????????? > > > > val currentDate = DateUtil.currentDate > > val created_time = currentDate > > val modified_time = currentDate > > ?????? > > > > val join_time: String = > > DateUtil.convertTimeStamp2DateStr(startTime, > > DateUtil.SECOND_DATE_FORMAT) > > val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime, > > DateUtil.SECOND_DATE_FORMAT) > > val duration = (endTime - startTime) / 1000 //?????????? > > val duration_time = DateUtil.secondsToFormat(duration) //?????????? > > out.collect(CloudliveWatcher(id, partnerId, courseId, customerId, > > courseNumber, nickName, ip, device_type, net_opretor, net_type, area, > > join_time, leave_time, created_time, modified_time > > , liveType, plat_form, duration, duration_time, > > network_operator, role, useragent, uid, eventTime)) > > > > CloudliveWatcher(id, partnerId, courseId, customerId, > > courseNumber, nickName, ip, device_type, net_opretor, net_type, area, > > join_time, leave_time, created_time, modified_time > > , liveType, plat_form, duration, duration_time, > > network_operator, role, useragent, uid, eventTime) > > > > } > > > > > > ????????????????????????min/max?????????????????????????? > > > > > > > > > > ?????? <hinobl...@gmail.com> ??2020??12??28?????? ????7:12?????? > > > > ????????ID??????ID??????????session window??????1min????gap??????key+window????count????????sum(1)?? > > > > ??????????????????????????????1min??2min??????????????????????key+window????min/max?????????????????????????? > > > > session window??????????????????2????????gap??????????2????????????????window???? > > > > > > ???? <zk357794...@gmail.com> ??2020??12??28?????? ????5:35?????? > > > > ????????????session window???????? > > > > Akisaya <akikevins...@gmail.com> ??2020??12??28?????? ????5:00?????? > > > > ?????????? session window ?? > > > > > > > > > > > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows > > > > news_...@163.com <news_...@163.com> ??2020??12??28?????? ????2:15?????? > > > > ????????????????????????????????KAFKA??????????????????????????????????????????????????????????????kafka?????? > > > > > > > > news_...@163.com > > > > ???????? ???? > > ?????????? 2020-12-28 13:35 > > ???????? user-zh > > ?????? ??????????????????????flink state > > ???????????????????????????????????? > > > > ?????????? > > > > ????????????????????????????????????????????????????????????kafka????????????????????????????????????eventTime????????????????????A > > > > > > > > > > > > > ??1??2??3??????????????????????????????????????????3????????5??6??????????????????????????????2?????????????????????????????????????????????????????????????????? > > > > ?????????? > > ??????????????????????ID??????ID??????????process????????state????????????????????????????event Time?????????? > > ????????????????????????????????????1??????????????????????????????????1????????????????????????????????????????????key???????? > > > > ???????????????????????????????????????????????????????????????? > > > > flink ????1.10.1 > > > > > > > > > > >