你好:

SELECT window_start,window_end,SUM(price),item 
 FROM TABLE(
CUMULATE(TABLE Bid,DESCRIPTOR(bidtime),INTERVAL '1' MINUTES,INTERVAL'10' HOUR))

GROUP BY window_start,window_end,item
语句没有问题,正常每1分钟输出一次,过期时间代码已注释,
public ChangelogMode  getChanglogMode(ChangelogMode arg0){
       return ChangelogMode.upsert();
}
实现RedisMapper 方法  落地redis 有输出语句,每1分钟都会落地一次,我确定数据每次都一样
这upsert 不合理啊
在 2021-08-20 11:15:17,"Caizhi Weng" <tsreape...@gmail.com> 写道:
>Hi!
>
>之前没注意到是 cumulate 窗口,具体的 group by 语句是什么样的呢?描述里写着“小窗口 1 分钟计算一次”,那 sink
>确实应该每分钟收到一条消息。
>
>sink 在 streaming 场景可以接收 upsert 消息,只要 getChangelogMode 返回的是 upsert 即可。
>
>李航飞 <tex...@163.com> 于2021年8月20日周五 上午10:03写道:
>
>> 你好:
>> 我查到 Sink在Streaming场景 只能接收insert-only ,是这样吗?
>>
>>
>>
>> 在 2021-08-20 09:10:44,"李航飞" <tex...@163.com> 写道:
>> >你好:
>> >我确实设置了过期时间,我将过期时间的代码注释了,但还是每分钟落地一次。
>> >
>> >
>> >我在RichMapFunction接口里面实现open方法
>> >设置过StateTtlConfig;
>> >之后在RedisConmmand.SETEX设置过期时间
>> >都注释了,但upsert()方法还是没效
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >在 2021-08-19 17:44:02,"Caizhi Weng" <tsreape...@gmail.com> 写道:
>> >>Hi!
>> >>
>> >>可以在接受 sink 数据的方法里 System.out.println 看看是否真的每分钟收到一次数据。
>> >>
>> >>如果是的话,应该是设置了 state ttl 的缘故。为了防止数据过期,group agg 就算每次收到同样的 key
>> >>value,仍然会往下游发数据(详见 GroupAggFunction#processElement),所以这是一个预期行为。
>> >>
>> >>李航飞 <tex...@163.com> 于2021年8月19日周四 下午5:03写道:
>> >>
>> >>> 版本 flink1.13.2
>> >>> 具体场景
>> >>> flink-connector-redis自定义连接器,在实现DynamicTableSink接口时,
>> >>> 通过重写getChangelogMode方法,设置ChangelogMode.upsert()模式,以更新的方式进行数据写入,
>> >>> 使用的窗口时cumulate函数 开1天的窗口,小窗口1分钟计算一次
>> >>>
>> >>>
>> >>> 问题:
>> >>> 测试发现,每1分钟都会输出一次,落地的数据一样,
>> >>> 根据upsert()模式,正常来说一样的数据应该不会触发redis存储,这个是什么情况?
>> >>>
>> >>>
>>

回复