你好:
我查到 Sink在Streaming场景 只能接收insert-only ,是这样吗?



在 2021-08-20 09:10:44,"李航飞" <tex...@163.com> 写道:
>你好:
>我确实设置了过期时间,我将过期时间的代码注释了,但还是每分钟落地一次。
>
>
>我在RichMapFunction接口里面实现open方法
>设置过StateTtlConfig;
>之后在RedisConmmand.SETEX设置过期时间
>都注释了,但upsert()方法还是没效
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2021-08-19 17:44:02,"Caizhi Weng" <tsreape...@gmail.com> 写道:
>>Hi!
>>
>>可以在接受 sink 数据的方法里 System.out.println 看看是否真的每分钟收到一次数据。
>>
>>如果是的话,应该是设置了 state ttl 的缘故。为了防止数据过期,group agg 就算每次收到同样的 key
>>value,仍然会往下游发数据(详见 GroupAggFunction#processElement),所以这是一个预期行为。
>>
>>李航飞 <tex...@163.com> 于2021年8月19日周四 下午5:03写道:
>>
>>> 版本 flink1.13.2
>>> 具体场景
>>> flink-connector-redis自定义连接器,在实现DynamicTableSink接口时,
>>> 通过重写getChangelogMode方法,设置ChangelogMode.upsert()模式,以更新的方式进行数据写入,
>>> 使用的窗口时cumulate函数 开1天的窗口,小窗口1分钟计算一次
>>>
>>>
>>> 问题:
>>> 测试发现,每1分钟都会输出一次,落地的数据一样,
>>> 根据upsert()模式,正常来说一样的数据应该不会触发redis存储,这个是什么情况?
>>>
>>>

回复