?????????? //??json??????LogBean SingleOutputStreamOperator<LogBean> data = filter.map(new Json2LogBean());
KeyedStream<Tuple3<String, String, Integer>, String> tuple3StringKeyedStream = data.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<LogBean>() { @Override public long extractAscendingTimestamp(LogBean element) { LocalDateTime parse = LocalDateTime.parse(element.getOperTime(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); long eventTime = parse.toEpochSecond(ZoneOffset.of("+8")); System.out.println(eventTime); return eventTime; } }).map(new MapFunction<LogBean, Tuple3<String, String, Integer>>() { @Override public Tuple3<String, String, Integer> map(LogBean value) throws Exception { //????????id?????? return new Tuple3<>(value.getNickname(), value.toString(), 1); } }).keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() { @Override public String getKey(Tuple3<String, String, Integer> value) throws Exception { return value.f0; } }); WindowedStream<Tuple3<String, String, Integer>, String, TimeWindow> window = tuple3StringKeyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(5))); window.sum(2).print(); ????sum????????reduce????reduce????????????????????reduce?????????????????????????????????????????? ???????????????? //??json??????LogBean SingleOutputStreamOperator<LogBean> data = filter.map(new Json2LogBean()); KeyedStream<Tuple3<String, String, Integer>, String> tuple3StringKeyedStream = data.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<LogBean>() { @Override public long extractAscendingTimestamp(LogBean element) { LocalDateTime parse = LocalDateTime.parse(element.getOperTime(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); long eventTime = parse.toEpochSecond(ZoneOffset.of("+8")); System.out.println(eventTime); return eventTime; } }).map(new MapFunction<LogBean, Tuple3<String, String, Integer>>() { @Override public Tuple3<String, String, Integer> map(LogBean value) throws Exception { //????????id?????? return new Tuple3<>(value.getNickname(), value.toString(), 1); } }).keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() { @Override public String getKey(Tuple3<String, String, Integer> value) throws Exception { return value.f0; } }); WindowedStream<Tuple3<String, String, Integer>, String, TimeWindow> window = tuple3StringKeyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(5))); window.sum(2).print(); ????sum????????reduce????reduce????????????????????reduce?????????????????????????????????????????? ??????