Re:Re: Re:Re: flink-connector-redis连接器upsert()模式插入问题
你好: SELECT window_start,window_end,SUM(price),item FROM TABLE( CUMULATE(TABLE Bid,DESCRIPTOR(bidtime),INTERVAL '1' MINUTES,INTERVAL'10' HOUR)) GROUP BY window_start,window_end,item 语句没有问题,正常每1分钟输出一次,过期时间代码已注释, public ChangelogMode getChanglogMode(ChangelogMode arg0){ return ChangelogMode.upsert(); } 实现RedisMapper 方法 落地redis 有输出语句,每1分钟都会落地一次,我确定数据每次都一样 这upsert 不合理啊 在 2021-08-20 11:15:17,"Caizhi Weng" 写道: >Hi! > >之前没注意到是 cumulate 窗口,具体的 group by 语句是什么样的呢?描述里写着“小窗口 1 分钟计算一次”,那 sink >确实应该每分钟收到一条消息。 > >sink 在 streaming 场景可以接收 upsert 消息,只要 getChangelogMode 返回的是 upsert 即可。 > >李航飞 于2021年8月20日周五 上午10:03写道: > >> 你好: >> 我查到 Sink在Streaming场景 只能接收insert-only ,是这样吗? >> >> >> >> 在 2021-08-20 09:10:44,"李航飞" 写道: >> >你好: >> >我确实设置了过期时间,我将过期时间的代码注释了,但还是每分钟落地一次。 >> > >> > >> >我在RichMapFunction接口里面实现open方法 >> >设置过StateTtlConfig; >> >之后在RedisConmmand.SETEX设置过期时间 >> >都注释了,但upsert()方法还是没效 >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> >在 2021-08-19 17:44:02,"Caizhi Weng" 写道: >> >>Hi! >> >> >> >>可以在接受 sink 数据的方法里 System.out.println 看看是否真的每分钟收到一次数据。 >> >> >> >>如果是的话,应该是设置了 state ttl 的缘故。为了防止数据过期,group agg 就算每次收到同样的 key >> >>value,仍然会往下游发数据(详见 GroupAggFunction#processElement),所以这是一个预期行为。 >> >> >> >>李航飞 于2021年8月19日周四 下午5:03写道: >> >> >> >>> 版本 flink1.13.2 >> >>> 具体场景 >> >>> flink-connector-redis自定义连接器,在实现DynamicTableSink接口时, >> >>> 通过重写getChangelogMode方法,设置ChangelogMode.upsert()模式,以更新的方式进行数据写入, >> >>> 使用的窗口时cumulate函数 开1天的窗口,小窗口1分钟计算一次 >> >>> >> >>> >> >>> 问题: >> >>> 测试发现,每1分钟都会输出一次,落地的数据一样, >> >>> 根据upsert()模式,正常来说一样的数据应该不会触发redis存储,这个是什么情况? >> >>> >> >>> >>
Re: Re:Re: flink-connector-redis连接器upsert()模式插入问题
Hi! 之前没注意到是 cumulate 窗口,具体的 group by 语句是什么样的呢?描述里写着“小窗口 1 分钟计算一次”,那 sink 确实应该每分钟收到一条消息。 sink 在 streaming 场景可以接收 upsert 消息,只要 getChangelogMode 返回的是 upsert 即可。 李航飞 于2021年8月20日周五 上午10:03写道: > 你好: > 我查到 Sink在Streaming场景 只能接收insert-only ,是这样吗? > > > > 在 2021-08-20 09:10:44,"李航飞" 写道: > >你好: > >我确实设置了过期时间,我将过期时间的代码注释了,但还是每分钟落地一次。 > > > > > >我在RichMapFunction接口里面实现open方法 > >设置过StateTtlConfig; > >之后在RedisConmmand.SETEX设置过期时间 > >都注释了,但upsert()方法还是没效 > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >在 2021-08-19 17:44:02,"Caizhi Weng" 写道: > >>Hi! > >> > >>可以在接受 sink 数据的方法里 System.out.println 看看是否真的每分钟收到一次数据。 > >> > >>如果是的话,应该是设置了 state ttl 的缘故。为了防止数据过期,group agg 就算每次收到同样的 key > >>value,仍然会往下游发数据(详见 GroupAggFunction#processElement),所以这是一个预期行为。 > >> > >>李航飞 于2021年8月19日周四 下午5:03写道: > >> > >>> 版本 flink1.13.2 > >>> 具体场景 > >>> flink-connector-redis自定义连接器,在实现DynamicTableSink接口时, > >>> 通过重写getChangelogMode方法,设置ChangelogMode.upsert()模式,以更新的方式进行数据写入, > >>> 使用的窗口时cumulate函数 开1天的窗口,小窗口1分钟计算一次 > >>> > >>> > >>> 问题: > >>> 测试发现,每1分钟都会输出一次,落地的数据一样, > >>> 根据upsert()模式,正常来说一样的数据应该不会触发redis存储,这个是什么情况? > >>> > >>> >
Re:Re:Re: flink-connector-redis连接器upsert()模式插入问题
你好: 我查到 Sink在Streaming场景 只能接收insert-only ,是这样吗? 在 2021-08-20 09:10:44,"李航飞" 写道: >你好: >我确实设置了过期时间,我将过期时间的代码注释了,但还是每分钟落地一次。 > > >我在RichMapFunction接口里面实现open方法 >设置过StateTtlConfig; >之后在RedisConmmand.SETEX设置过期时间 >都注释了,但upsert()方法还是没效 > > > > > > > > > > > > > > >在 2021-08-19 17:44:02,"Caizhi Weng" 写道: >>Hi! >> >>可以在接受 sink 数据的方法里 System.out.println 看看是否真的每分钟收到一次数据。 >> >>如果是的话,应该是设置了 state ttl 的缘故。为了防止数据过期,group agg 就算每次收到同样的 key >>value,仍然会往下游发数据(详见 GroupAggFunction#processElement),所以这是一个预期行为。 >> >>李航飞 于2021年8月19日周四 下午5:03写道: >> >>> 版本 flink1.13.2 >>> 具体场景 >>> flink-connector-redis自定义连接器,在实现DynamicTableSink接口时, >>> 通过重写getChangelogMode方法,设置ChangelogMode.upsert()模式,以更新的方式进行数据写入, >>> 使用的窗口时cumulate函数 开1天的窗口,小窗口1分钟计算一次 >>> >>> >>> 问题: >>> 测试发现,每1分钟都会输出一次,落地的数据一样, >>> 根据upsert()模式,正常来说一样的数据应该不会触发redis存储,这个是什么情况? >>> >>>
Re:Re: flink-connector-redis连接器upsert()模式插入问题
你好: 我确实设置了过期时间,我将过期时间的代码注释了,但还是每分钟落地一次。 我在RichMapFunction接口里面实现open方法 设置过StateTtlConfig; 之后在RedisConmmand.SETEX设置过期时间 都注释了,但upsert()方法还是没效 在 2021-08-19 17:44:02,"Caizhi Weng" 写道: >Hi! > >可以在接受 sink 数据的方法里 System.out.println 看看是否真的每分钟收到一次数据。 > >如果是的话,应该是设置了 state ttl 的缘故。为了防止数据过期,group agg 就算每次收到同样的 key >value,仍然会往下游发数据(详见 GroupAggFunction#processElement),所以这是一个预期行为。 > >李航飞 于2021年8月19日周四 下午5:03写道: > >> 版本 flink1.13.2 >> 具体场景 >> flink-connector-redis自定义连接器,在实现DynamicTableSink接口时, >> 通过重写getChangelogMode方法,设置ChangelogMode.upsert()模式,以更新的方式进行数据写入, >> 使用的窗口时cumulate函数 开1天的窗口,小窗口1分钟计算一次 >> >> >> 问题: >> 测试发现,每1分钟都会输出一次,落地的数据一样, >> 根据upsert()模式,正常来说一样的数据应该不会触发redis存储,这个是什么情况? >> >>
Re: flink-connector-redis连接器upsert()模式插入问题
Hi! 可以在接受 sink 数据的方法里 System.out.println 看看是否真的每分钟收到一次数据。 如果是的话,应该是设置了 state ttl 的缘故。为了防止数据过期,group agg 就算每次收到同样的 key value,仍然会往下游发数据(详见 GroupAggFunction#processElement),所以这是一个预期行为。 李航飞 于2021年8月19日周四 下午5:03写道: > 版本 flink1.13.2 > 具体场景 > flink-connector-redis自定义连接器,在实现DynamicTableSink接口时, > 通过重写getChangelogMode方法,设置ChangelogMode.upsert()模式,以更新的方式进行数据写入, > 使用的窗口时cumulate函数 开1天的窗口,小窗口1分钟计算一次 > > > 问题: > 测试发现,每1分钟都会输出一次,落地的数据一样, > 根据upsert()模式,正常来说一样的数据应该不会触发redis存储,这个是什么情况? > >
flink-connector-redis连接器upsert()模式插入问题
版本 flink1.13.2 具体场景 flink-connector-redis自定义连接器,在实现DynamicTableSink接口时, 通过重写getChangelogMode方法,设置ChangelogMode.upsert()模式,以更新的方式进行数据写入, 使用的窗口时cumulate函数 开1天的窗口,小窗口1分钟计算一次 问题: 测试发现,每1分钟都会输出一次,落地的数据一样, 根据upsert()模式,正常来说一样的数据应该不会触发redis存储,这个是什么情况?