Hi,

问题一:删除数据可不单单只是retract stream的功能。upsert
stream是当下游具有按key覆盖的功能时的特殊优化,除了按key覆盖外,它也需要在上游retract时删除数据,意思是upsert
stream也有retract的input数据的。JDBC实现的是upsert stream的消费。

问题二:正确数据应该是:
1  {"order_id":1,"tms_company":"zhongtong"}   数据库1条记录: zhongtong 1
2  {"order_id":1,"tms_company":"yuantong"}      数据库1条记录: yuantong 1  ( 删除
zhongtong 1)
3  {"order_id":2,"tms_company":"yuantong"}     数据库1条记录: yuantong 2  (
删除yuantong 1)
4  {"order_id":2,"tms_company":"zhongtong"}   数据库2条记录: yuantong 1,
zhongtong 1    ( 删除yuantong 2)

你用了什么dialect?是不是mysql?
Flink JDBC的Mysql用了DUPLICATE KEY UPDATE的语法来更新数据。
看起来这个语法在RDS没有建主键或者唯一键时可能不会去覆盖老数据?尝试创建下主键或唯一建?

Best,
Jingsong Lee

On Wed, May 6, 2020 at 10:36 AM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

>
> 更新键是 tms_company, 但这是通过双层的 group 实现了回退功能,总共就 两个 order_id, order_id 对应的
> tms_company 是有变化的。
> 我不是很明白这种回退的具体原理,为什么有的会删除,有的就没有删除。
>
>
>
>
> wangl...@geekplus.com.cn
>
> 发件人: Michael Ran
> 发送时间: 2020-04-30 17:23
> 收件人: user-zh
> 主题: Re:FlinkSQL Retraction 问题原理咨询
>
>
>
> 指定的更新键是tms_company?
>
>
> 结果是:
> yuantong:2
> zhongtong:2
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-04-30 17:08:22,"wangl...@geekplus.com.cn" <wangl...@geekplus.com.cn>
> 写道:
> >
> >自己实现了一下 https://yq.aliyun.com/articles/457392/ 菜鸟物流订单统计的例子,读 kafka 写到
> RDS, RDS 表没有主键,也没有唯一键。
> >
> >INSERT INTO table_out select  tms_company,  count(distinct order_id) as
> order_cnt from
> >    (select order_id, LAST_VALUE(tms_company) AS tms_company from
> dwd_table group by order_id)
> > group by tms_company;
> >
> >
> >总共发送了 4 条消息,顺序如下:
> >
> >1  {"order_id":1,"tms_company":"zhongtong"}   数据库1条记录: zhongtong 1
> >
> >2  {"order_id":1,"tms_company":"yuantong"}      数据库1条记录: yuantong 1
> (上一条记录被删除了)
> >
> >3  {"order_id":2,"tms_company":"yuantong"}     数据库2条记录: yuantong 1,
> yuantong 2  (增加了条记录,没有删除)
> >
> >4  {"order_id":2,"tms_company":"zhongtong"}   数据库4条记录: yuantong 1,
> yuantong 2, yuantong 1, zhongtong 1    (增加了两条记录,没有删除)
> >
> >
> >问题一:
> >    第 2 条消息发送后,数据库的上一条记录被删除了。我的理解这应该是 RetractStream 的功能。当我看源码
> https://github.com/apache/flink/tree/master/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc
> 这里也没有 RetractionStream 的实现,哪里的代码把他删除了呢?
> >
> >问题二:
> >   第 3 条记录来了后,直接在数据库增加了 一条 yuantong 2, 为什么没把 yuantong 1, 删除呢?
> >   第 4 条记录来了后,又在数据库里加了两条记录,为什么也没有删除之前的呢?
> >
> >谢谢,
> >王磊
> >
> >
> >
> >wangl...@geekplus.com.cn
>


-- 
Best, Jingsong Lee

Reply via email to