Hi!

可以在接受 sink 数据的方法里 System.out.println 看看是否真的每分钟收到一次数据。

如果是的话,应该是设置了 state ttl 的缘故。为了防止数据过期,group agg 就算每次收到同样的 key
value,仍然会往下游发数据(详见 GroupAggFunction#processElement),所以这是一个预期行为。

李航飞 <tex...@163.com> 于2021年8月19日周四 下午5:03写道:

> 版本 flink1.13.2
> 具体场景
> flink-connector-redis自定义连接器,在实现DynamicTableSink接口时,
> 通过重写getChangelogMode方法,设置ChangelogMode.upsert()模式,以更新的方式进行数据写入,
> 使用的窗口时cumulate函数 开1天的窗口,小窗口1分钟计算一次
>
>
> 问题:
> 测试发现,每1分钟都会输出一次,落地的数据一样,
> 根据upsert()模式,正常来说一样的数据应该不会触发redis存储,这个是什么情况?
>
>

回复