Hi
???????????????????????????????? 1.??????30s??1s???????????????????????????????????????????? 2.????????????????????????????????????????????????????Congxian?????????????? 3.????????????????????????MaxOutOfOrderness = 100ms??????????????????????????????????????????????Flink???????????????????????????????????????????????? ------------------ ???????? ------------------ ??????: "Congxian Qiu"<qcx978132...@gmail.com>; ????????: 2020??5??30??(??????) ????11:47 ??????: "user-zh"<user-zh@flink.apache.org>; ????: Re: Session Window????event time?????????? Hi ???? processing time window ?????????????? event time window ?????????????????????? event-time ???? watermark ?????????????????????????? watermark ???????? window ???????????????????????????? Best, Congxian ?????? <lijiachen...@gmail.com> ??2020??5??22?????? ????3:27?????? > ???????? > > ?????????????????????????????????????????? > > ????????keyby userid ????????session window ??????????userid???? ???????????? topN?????? > > ???????????? > // Topn???? > DataStream itemList = resultDataStream > .assignTimestampsAndWatermarks( > new > > BoundedOutOfOrdernessTimestampExtractor<PredictResult>(Time.milliseconds(100)) > { > @Override > public long extractTimestamp(PredictResult > predictResult) { > return predictResult.getDate_timestamp(); > } > } > ) > .keyBy("userId") > > .window(EventTimeSessionWindows.withGap(Time.milliseconds(100))) > .process(new TopNService(11)); > itemList.print("IRS_RESULT: "); > > > ????????????????????????30?????????????????? ?????????????????? topN???????????????????? > ProcessTimeSessionWindow???????????????????????? > ????processtime ????????gap?????????????????????????????????? ?????????????????????????????????????????????? > ???????????????????????????????????????????????????????????????? > > > ???? > > // top n???? > > public static class TopNService extends > ProcessWindowFunction<PredictResult, Object, Tuple, TimeWindow> { > > private final int topSize; > > public TopNService(int topSize) { > this.topSize = topSize; > } > @Override > public void process(Tuple tuple, Context context, > Iterable<PredictResult> iterable, Collector<Object> collector) throws > Exception { > List<PredictResult> allItems = new ArrayList<>(); > for (PredictResult predictResult:iterable){ > allItems.add(predictResult); > } > allItems.sort(new Comparator<PredictResult>() { > @Override > public int compare(PredictResult o1, PredictResult o2) { > return o2.probability.compareTo(o1.probability); > } > }); > int userId = allItems.get(0).userId ; > String logonType=allItems.get(0).getLogonType(); > StringBuilder result = new StringBuilder(); > for (int i=0;i<topSize;i++) { > PredictResult currentItem = allItems.get(i); > result.append(currentItem.serviceId).append(","); > } > LocalDate localDate = LocalDate.now(); > LocalTime localTime = LocalTime.now(); > //NXZW_ZNTJ_TOPIC_IRS_RESULT ?????????? start > JSONObject resultJson = new JSONObject(); > resultJson.put("user_id", userId); > resultJson.put("logon_type", logonType); > resultJson.put("date", localDate + " " + localTime); > JSONArray jsonArray = new JSONArray(); > jsonArray.add(resultJson); > resultJson.put("service_id", result.toString()); > //NXZW_ZNTJ_TOPIC_IRS_RESULT ?????????? end > collector.collect(jsonArray.toString()); > } > } >