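Hi, I looked into it. The problem seems to be in the reduce function, which reuses the wordCount1 object. I tried creating a new WordCount as the output instead, and that appears to fix it. My guess is that this is related to the heap-based state backend: the heap state of several windows may point directly at the same object.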
```
.reduce(
    (wordCount1, wordCount2) -> {
        // Build a new WordCount instead of mutating wordCount1,
        // which may be the same object held by another window's state.
        WordCount newWC = new WordCount(
            wordCount1.getWord(), wordCount1.getCount() + wordCount2.getCount());
        System.err.println(newWC);
        return newWC;
    })
```

--
Best!
Xuyang

On 2023-11-03 10:53:37, "tao zhang" <zhangtao9...@hotmail.com> wrote:
>The state of reduce() is not isolated between windows: when several windows aggregate, they use the same object, so a single incoming record gets accumulated more than once.
>Is this expected behavior of reduce, or is there a problem with isolation between windows in reduce? I'd appreciate a reply.
>
>Test input:
>1001,/home,1000
>1002,/home,2000
>
>Output:
>input> test.Event(user=1001, page=/home, ts=1000)
>input> test.Event(user=1002, page=/home, ts=2000)
>test.WordCount(word=/home, count=2)
>test.WordCount(word=/home, count=3)
>
>Code:
>
>import lombok.AllArgsConstructor;
>import lombok.Data;
>import lombok.NoArgsConstructor;
>import org.apache.flink.api.common.eventtime.WatermarkStrategy;
>import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
>import org.apache.flink.streaming.api.windowing.time.Time;
>import java.io.Serializable;
>import java.time.Duration;
>
>public class test {
>    public static void main(String[] args) {
>        // Set up the environment
>        StreamExecutionEnvironment env =
>                StreamExecutionEnvironment.getExecutionEnvironment();
>        env.setParallelism(1);
>
>        // Read data from the socket port
>        SingleOutputStreamOperator<Event> ds1 =
>                env.socketTextStream("hadoop102", 55555).map(
>                        value -> {
>                            String[] strings = value.split(",");
>                            return new Event(strings[0].trim(), strings[1].trim(), Long.valueOf(strings[2].trim()));
>                        }
>                ).assignTimestampsAndWatermarks(
>                        // Add a watermark strategy
>                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
>                                .withTimestampAssigner((Event, l) -> Event.getTs())
>                );
>        // Inspect the input stream
>        ds1.print("input");
>
>        ds1.map(event -> new WordCount(event.getPage(), 1)
>        ).keyBy(WordCount::getWord
>                // Group by key
>        ).window(
>                // TumblingEventTimeWindows.of(Time.seconds(10))
>                SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))
>                // Sliding window with size 10 s and slide 5 s
>        ).reduce(
>                // Incremental aggregation first: fold multiple records into one intermediate result
>                (wordCount1, wordCount2) -> {
>                    Integer count = wordCount1.getCount();
>                    wordCount1.setCount(count + 1);
>                    System.out.println(wordCount1);
>                    return wordCount1;
>                }
>        );
>
>        try {
>            env.execute();
>        } catch (Exception e) {
>            throw new RuntimeException(e);
>        }
>    }
>
>    @Data
>    @AllArgsConstructor
>    @NoArgsConstructor
>    public static class Event {
>        private String user;
>        private String page;
>        private Long ts;
>    }
>
>    @Data
>    @AllArgsConstructor
>    @NoArgsConstructor
>    public static class WordCount implements Serializable {
>        private String word;
>        private Integer count;
>    }
>}
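The suspected mechanism can be reproduced without Flink. The sketch below is a simplified, hypothetical model, not Flink's actual state-backend code: one reduced value is kept per window in a plain map, and the first record assigned to a window is stored by reference, without a copy. The two window labels are hard-coded from the size-10 s/slide-5 s arithmetic, under which ts=1000 and ts=2000 both fall into [-5000, 5000) and [0, 10000). `SharedStateDemo`, `run`, `mutating` and `fresh` are made-up names for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class SharedStateDemo {
    // Simplified stand-in for the WordCount POJO from the thread.
    public static final class WordCount {
        final String word;
        int count;

        WordCount(String word, int count) {
            this.word = word;
            this.count = count;
        }

        public int getCount() {
            return count;
        }

        @Override
        public String toString() {
            return "WordCount(word=" + word + ", count=" + count + ")";
        }
    }

    // Hypothetical model of heap-based reducing state: one value per window,
    // kept by reference (no copy or serialization on write).
    public static Map<String, WordCount> run(BinaryOperator<WordCount> reducer) {
        Map<String, WordCount> state = new LinkedHashMap<>();
        // Assumption: both records belong to these two sliding windows.
        List<String> windows = List.of("[0,10000)", "[-5000,5000)");
        for (int i = 0; i < 2; i++) {
            // One object per input record, like map(event -> new WordCount(page, 1)).
            WordCount element = new WordCount("/home", 1);
            for (String w : windows) {
                WordCount current = state.get(w);
                // First record of a window: store the reference as-is; afterwards: reduce.
                state.put(w, current == null ? element : reducer.apply(current, element));
            }
        }
        return state;
    }

    // The reducer from the original question: mutates and returns its first argument.
    public static WordCount mutating(WordCount a, WordCount b) {
        a.count = a.count + 1;
        return a;
    }

    // The fix from the reply: returns a new object and leaves both inputs untouched.
    public static WordCount fresh(WordCount a, WordCount b) {
        return new WordCount(a.word, a.count + b.count);
    }

    public static void main(String[] args) {
        // Both windows share one object and end up with count=3.
        System.out.println(run(SharedStateDemo::mutating));
        // Each window ends up with its own object and count=2.
        System.out.println(run(SharedStateDemo::fresh));
    }
}
```

With the mutating reducer, the first record is stored in both windows as the same object, so the second record increments it twice and both windows report 3 instead of 2. Returning a new object keeps the stored values independent, which matches the fix above.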