自定义 sink 有这么几个疑问
1. 自带的 sink 得都改成 upsert 比如 jdbc/es
2. 这样 append/upsert 代码有大量重复
3. 和 flink 对 append/upsert 流的定义有冲突,有额外 hack 的解释成本
4. 得有地方另外指定 update key

这么做感觉会挖坑

有其他办法吗

lec ssmi <shicheng31...@gmail.com>于2020年5月7日 周四20:42写道:

> 使用自定义的Table Sink就可以了啊.
>
> Luan Cooper <gc.su...@gmail.com> 于2020年5月7日周四 下午8:39写道:
>
> > Hi
> >
> > 有这么个场景,需要将 MySQL 的对一张表的更改 同步到 ElasticSearch 中,两个地方表的 PRIMARY KEY 都是
> ID,SQL
> > 如下
> >
> > INSERT INTO sink_es // 将更改同步 upsert 到 ES
> > SELECT *
> > FROM binlog // mysql 表的 binlog
> >
> > 假设对于 MySQL 中 id = 1 的变更有 10 条,需要在 ES 上都更新 id = 1 的一条记录
> > 但是上面的 SQL 是做不到的,只会一直 Insert
> >
> > 如果想 Upsert ES 的话,就得让 Source 表变成 Upsert Mode,但是现在好像不支持?
> >
> > 社区的 FLIP-87
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP+87%3A+Primary+key+constraints+in+Table+API
> >  可以解决这个问题吗?
> >
> > 感谢
> >
>

回复