Hi,
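   I verified this, and the problem appears to be in the reduce function, which reuses the wordCount1 object. Creating a new WordCount as the output, as below, should fix it.
My guess is that this is related to the heap-based state backend: the heap state of several windows may refer directly to the same object.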
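
Why several windows are involved: with `SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))`, every record belongs to size / slide = 2 windows. Below is a standalone sketch of that assignment in plain Java. It is modeled on my understanding of the arithmetic in Flink's `TimeWindow.getWindowStartWithOffset` (offset assumed to be 0). The class and method names are made up for illustration, and this is not Flink code.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowAssignment {

    // Start of the slide-aligned window containing `timestamp`
    // (modeled on Flink's TimeWindow.getWindowStartWithOffset).
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // All [start, end) sliding windows of length `size`, advancing by `slide`,
    // that contain `timestamp`, newest window first.
    static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = windowStart(timestamp, 0L, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // The two test records from the question: ts=1000 and ts=2000.
        for (long ts : new long[] {1000L, 2000L}) {
            for (long[] w : assignWindows(ts, 10_000L, 5_000L)) {
                System.out.println("ts=" + ts + " -> [" + w[0] + ", " + w[1] + ")");
            }
        }
    }
}
```

For both records it reports the same two windows, [0, 10000) and [-5000, 5000). Each record is therefore reduced into two window states for the key /home.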


```
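.reduce(
        (wordCount1, wordCount2) -> {
            // Build a new object instead of mutating wordCount1, which may be
            // the same instance that another window's heap state still holds.
            WordCount newWC =
                    new WordCount(
                            wordCount1.getWord(), wordCount1.getCount() + wordCount2.getCount());
            System.err.println(newWC);
            return newWC;
        })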
```
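
To illustrate the suspected mechanism, here is a simplified simulation in plain Java. It is not Flink's actual state backend. It assumes, as guessed above, that heap state stores the reduced object by reference without copying, so the first record's object lands in both window slots. `Map.merge` stands in for the per-window reducing state, and `SharedStateDemo` and its members are hypothetical names.

Note that the original reducer also adds 1 instead of `wordCount2`'s count. The version above uses `wordCount1.getCount() + wordCount2.getCount()`, which stays correct when both arguments are partial aggregates.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

public class SharedStateDemo {

    // Mutable POJO standing in for the WordCount class from the question.
    static final class WordCount {
        final String word;
        int count;

        WordCount(String word, int count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordCount(word=" + word + ", count=" + count + ")";
        }
    }

    // Simplified model (assumption): one reducing-state slot per window that
    // keeps object references as-is, with no defensive copy.
    static Map<String, WordCount> run(BinaryOperator<WordCount> reducer) {
        Map<String, WordCount> stateByWindow = new LinkedHashMap<>();
        // Both test records (ts=1000, ts=2000) fall into these two sliding windows.
        String[] windows = {"[0, 10000)", "[-5000, 5000)"};
        for (int record = 0; record < 2; record++) {
            WordCount incoming = new WordCount("/home", 1); // one object per input record
            for (String window : windows) {
                // Empty slot: store the incoming object itself.
                // Occupied slot: store reducer(stored, incoming).
                stateByWindow.merge(window, incoming, reducer);
            }
        }
        return stateByWindow;
    }

    public static void main(String[] args) {
        // Like the original reducer: mutates and returns its first argument.
        Map<String, WordCount> mutating = run((a, b) -> {
            a.count = a.count + 1;
            return a;
        });
        // Like the fix in the reply: returns a freshly allocated object.
        Map<String, WordCount> fresh = run((a, b) -> new WordCount(a.word, a.count + b.count));
        System.out.println("mutating: " + mutating);
        System.out.println("fresh:    " + fresh);
    }
}
```

With the mutating reducer, both windows end up holding the same object. It goes through count=2 and then count=3, which matches the output in the question. With the fresh-object reducer, each window holds its own result with count=2.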

--

    Best!
    Xuyang





On 2023-11-03 10:53:37, "tao zhang" <zhangtao9...@hotmail.com> wrote:
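>The state used by reduce() is not isolated between windows: when several windows aggregate, they share the same object, so a single incoming record is accumulated more than once.
>Is this expected behavior of reduce, or is the isolation between windows in reduce broken? I'd appreciate a reply.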
>
>Test input:
>1001,/home,1000
>1002,/home,2000
>
>Output:
>input> test.Event(user=1001, page=/home, ts=1000)
>input> test.Event(user=1002, page=/home, ts=2000)
>test.WordCount(word=/home, count=2)
>test.WordCount(word=/home, count=3)
>
>Code:
>
>import lombok.AllArgsConstructor;
>import lombok.Data;
>import lombok.NoArgsConstructor;
>import org.apache.flink.api.common.eventtime.WatermarkStrategy;
>import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
>import org.apache.flink.streaming.api.windowing.time.Time;
>import java.io.Serializable;
>import java.time.Duration;
>
>public class test {
>    public static void main(String[] args) {
>        // Set up the environment
>        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>        env.setParallelism(1);
>
>        // Read data from a socket
>        SingleOutputStreamOperator<Event> ds1 = env.socketTextStream("hadoop102", 55555).map(
>                value -> {
>                    String[] strings = value.split(",");
>                    return new Event(strings[0].trim(), strings[1].trim(), Long.valueOf(strings[2].trim()));
>                }
>
>        ).assignTimestampsAndWatermarks(
>                // Add a watermark strategy
>                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
>                        .withTimestampAssigner((event, l) -> event.getTs())
>        );
>        // Check the input stream
>        ds1.print("input");
>
>
>        ds1.map(event -> new WordCount(event.getPage(), 1)
>        ).keyBy(WordCount::getWord
>                // Group by key
>        ).window(
>                //TumblingEventTimeWindows.of(Time.seconds(10))
>                SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))
>                // Sliding window with size 10 s and slide 5 s
>        ).reduce(
>                // Incremental aggregation: combine multiple records into one intermediate result
>
>                (wordCount1, wordCount2) -> {
>
>                    Integer count = wordCount1.getCount();
>
>                    wordCount1.setCount(count + 1);
>
>                    System.out.println(wordCount1);
>
>                    return wordCount1;
>                }
>
>
>        );
>
>        try {
>            env.execute();
>        } catch (Exception e) {
>            throw new RuntimeException(e);
>        }
>    }
>
>    @Data
>    @AllArgsConstructor
>    @NoArgsConstructor
>    public static class Event {
>        private String user;
>        private String page;
>        private Long ts;
>
>    }
>
>    @Data
>    @AllArgsConstructor
>    @NoArgsConstructor
>
>    public static class WordCount implements Serializable {
>        private String word;
>        private Integer count;
>
>    }
>
>
>
>}
>
