你好: 我查到 Sink在Streaming场景 只能接收insert-only ,是这样吗?
在 2021-08-20 09:10:44,"李航飞" <tex...@163.com> 写道: >你好: >我确实设置了过期时间,我将过期时间的代码注释了,但还是每分钟落地一次。 > > >我在RichMapFunction接口里面实现open方法 >设置过StateTtlConfig; >之后在RedisConmmand.SETEX设置过期时间 >都注释了,但upsert()方法还是没效 > > > > > > > > > > > > > > >在 2021-08-19 17:44:02,"Caizhi Weng" <tsreape...@gmail.com> 写道: >>Hi! >> >>可以在接受 sink 数据的方法里 System.out.println 看看是否真的每分钟收到一次数据。 >> >>如果是的话,应该是设置了 state ttl 的缘故。为了防止数据过期,group agg 就算每次收到同样的 key >>value,仍然会往下游发数据(详见 GroupAggFunction#processElement),所以这是一个预期行为。 >> >>李航飞 <tex...@163.com> 于2021年8月19日周四 下午5:03写道: >> >>> 版本 flink1.13.2 >>> 具体场景 >>> flink-connector-redis自定义连接器,在实现DynamicTableSink接口时, >>> 通过重写getChangelogMode方法,设置ChangelogMode.upsert()模式,以更新的方式进行数据写入, >>> 使用的窗口时cumulate函数 开1天的窗口,小窗口1分钟计算一次 >>> >>> >>> 问题: >>> 测试发现,每1分钟都会输出一次,落地的数据一样, >>> 根据upsert()模式,正常来说一样的数据应该不会触发redis存储,这个是什么情况? >>> >>>