你好: SELECT window_start,window_end,SUM(price),item FROM TABLE( CUMULATE(TABLE Bid,DESCRIPTOR(bidtime),INTERVAL '1' MINUTES,INTERVAL'10' HOUR))
GROUP BY window_start,window_end,item 语句没有问题,正常每1分钟输出一次,过期时间代码已注释, public ChangelogMode getChanglogMode(ChangelogMode arg0){ return ChangelogMode.upsert(); } 实现RedisMapper 方法 落地redis 有输出语句,每1分钟都会落地一次,我确定数据每次都一样 这upsert 不合理啊 在 2021-08-20 11:15:17,"Caizhi Weng" <tsreape...@gmail.com> 写道: >Hi! > >之前没注意到是 cumulate 窗口,具体的 group by 语句是什么样的呢?描述里写着“小窗口 1 分钟计算一次”,那 sink >确实应该每分钟收到一条消息。 > >sink 在 streaming 场景可以接收 upsert 消息,只要 getChangelogMode 返回的是 upsert 即可。 > >李航飞 <tex...@163.com> 于2021年8月20日周五 上午10:03写道: > >> 你好: >> 我查到 Sink在Streaming场景 只能接收insert-only ,是这样吗? >> >> >> >> 在 2021-08-20 09:10:44,"李航飞" <tex...@163.com> 写道: >> >你好: >> >我确实设置了过期时间,我将过期时间的代码注释了,但还是每分钟落地一次。 >> > >> > >> >我在RichMapFunction接口里面实现open方法 >> >设置过StateTtlConfig; >> >之后在RedisConmmand.SETEX设置过期时间 >> >都注释了,但upsert()方法还是没效 >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> >在 2021-08-19 17:44:02,"Caizhi Weng" <tsreape...@gmail.com> 写道: >> >>Hi! >> >> >> >>可以在接受 sink 数据的方法里 System.out.println 看看是否真的每分钟收到一次数据。 >> >> >> >>如果是的话,应该是设置了 state ttl 的缘故。为了防止数据过期,group agg 就算每次收到同样的 key >> >>value,仍然会往下游发数据(详见 GroupAggFunction#processElement),所以这是一个预期行为。 >> >> >> >>李航飞 <tex...@163.com> 于2021年8月19日周四 下午5:03写道: >> >> >> >>> 版本 flink1.13.2 >> >>> 具体场景 >> >>> flink-connector-redis自定义连接器,在实现DynamicTableSink接口时, >> >>> 通过重写getChangelogMode方法,设置ChangelogMode.upsert()模式,以更新的方式进行数据写入, >> >>> 使用的窗口时cumulate函数 开1天的窗口,小窗口1分钟计算一次 >> >>> >> >>> >> >>> 问题: >> >>> 测试发现,每1分钟都会输出一次,落地的数据一样, >> >>> 根据upsert()模式,正常来说一样的数据应该不会触发redis存储,这个是什么情况? >> >>> >> >>> >>