Hi! 可以在接受 sink 数据的方法里 System.out.println 看看是否真的每分钟收到一次数据。
如果是的话,应该是设置了 state ttl 的缘故。为了防止数据过期,group agg 就算每次收到同样的 key value,仍然会往下游发数据(详见 GroupAggFunction#processElement),所以这是一个预期行为。 李航飞 <tex...@163.com> 于2021年8月19日周四 下午5:03写道: > 版本 flink1.13.2 > 具体场景 > flink-connector-redis自定义连接器,在实现DynamicTableSink接口时, > 通过重写getChangelogMode方法,设置ChangelogMode.upsert()模式,以更新的方式进行数据写入, > 使用的窗口时cumulate函数 开1天的窗口,小窗口1分钟计算一次 > > > 问题: > 测试发现,每1分钟都会输出一次,落地的数据一样, > 根据upsert()模式,正常来说一样的数据应该不会触发redis存储,这个是什么情况? > >