??????????
        //??json??????LogBean
&nbsp; &nbsp; &nbsp; SingleOutputStreamOperator<LogBean&gt; data = 
filter.map(new Json2LogBean());

&nbsp; &nbsp; &nbsp; KeyedStream<Tuple3<String, String, Integer&gt;, String&gt; 
tuple3StringKeyedStream = data.assignTimestampsAndWatermarks(new 
AscendingTimestampExtractor<LogBean&gt;() {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public long 
extractAscendingTimestamp(LogBean element) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; LocalDateTime parse = 
LocalDateTime.parse(element.getOperTime(), 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; long eventTime = 
parse.toEpochSecond(ZoneOffset.of("+8"));
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
System.out.println(eventTime);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return eventTime;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; }).map(new MapFunction<LogBean, Tuple3<String, 
String, Integer&gt;&gt;() {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public Tuple3<String, String, 
Integer&gt; map(LogBean value) throws Exception {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //????????id??????
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return new 
Tuple3<&gt;(value.getNickname(), value.toString(), 1);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; }).keyBy(new KeySelector<Tuple3<String, String, 
Integer&gt;, String&gt;() {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public String getKey(Tuple3<String, 
String, Integer&gt; value) throws Exception {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return value.f0;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; });


&nbsp; &nbsp; &nbsp; &nbsp; WindowedStream<Tuple3<String, String, Integer&gt;, 
String, TimeWindow&gt; window = 
tuple3StringKeyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(5)));


&nbsp; &nbsp; &nbsp; &nbsp; window.sum(2).print();

????sum????????reduce????reduce????????????????????reduce??????????????????????????????????????????


????????????????
&nbsp; &nbsp; &nbsp; &nbsp; //??json??????LogBean
&nbsp; &nbsp; &nbsp; SingleOutputStreamOperator<LogBean&gt; data = 
filter.map(new Json2LogBean());

&nbsp; &nbsp; &nbsp; KeyedStream<Tuple3<String, String, Integer&gt;, String&gt; 
tuple3StringKeyedStream = data.assignTimestampsAndWatermarks(new 
AscendingTimestampExtractor<LogBean&gt;() {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public long 
extractAscendingTimestamp(LogBean element) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; LocalDateTime parse = 
LocalDateTime.parse(element.getOperTime(), 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; long eventTime = 
parse.toEpochSecond(ZoneOffset.of("+8"));
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
System.out.println(eventTime);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return eventTime;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; }).map(new MapFunction<LogBean, Tuple3<String, 
String, Integer&gt;&gt;() {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public Tuple3<String, String, 
Integer&gt; map(LogBean value) throws Exception {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //????????id??????
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return new 
Tuple3<&gt;(value.getNickname(), value.toString(), 1);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; }).keyBy(new KeySelector<Tuple3<String, String, 
Integer&gt;, String&gt;() {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public String getKey(Tuple3<String, 
String, Integer&gt; value) throws Exception {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return value.f0;
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; });


&nbsp; &nbsp; &nbsp; &nbsp; WindowedStream<Tuple3<String, String, Integer&gt;, 
String, TimeWindow&gt; window = 
tuple3StringKeyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(5)));


&nbsp; &nbsp; &nbsp; &nbsp; window.sum(2).print();

????sum????????reduce????reduce????????????????????reduce??????????????????????????????????????????


??????

回复