Hi!

你需要在 sink 节点之前添加一个按 uuid 的 hash shuffle 将相同的 uuid 送到相同的并发。如果 processData
是一个 data stream 的话,通过 keyBy 方法 key by uuid,再写入 sink 即可。

涂子令 <tuzil...@kingsoft.com> 于2021年10月27日周三 下午5:49写道:

>  您好!感谢你在万忙之中,抽出时间来看我发的邮件。最近我在研究使用flink写入数据到clickHouse,如何能满足公司业务需求。但是在用
> flink1.12.1版本读取kafka的数据,实现upsert的形式写入数据到clickhouse出现了一些问题。问题详细情况描述如下:
>
>
>
> clickhouse建表语句如下:
>
> CREATE TABLE test_local.tzling_tb3(
>
>     uuid String,
>
>     product String,
>
>     platform String,
>
>       batchId String,
>
>       id String,
>
>       account String,
>
>       customerId String,
>
>       reportName String,
>
>       dt String,
>
>       campaign String,
>
>       adGroup String,
>
>       generalField String,
>
>       currency String,
>
>       impressions String,
>
>       cost String,
>
>       clicks String,
>
>       conversions String,
>
>       createDateTime String,
>
>       createTime BIGINT,
>
>       key String,
>
>       pdate String
>
> )engine = MergeTree PARTITION BY pdate order by createTime;
>
> 将uuid作为主键,主键存在就更新数据 update,不存在的话,就直接append。
>
>
>
> processData.addSink(new MSKUpsertClickHouseSink());
>
> 附件文件MSKUpsertClickHouseSink.java是我写入clickhouse的sink类,设计逻辑为:
> 先查询表中是否存在要添加数据的uuid,如果存在就先做条件删除操作,再做append操作;如果要添加的数据uuid不存在,就直接append
> 操作。当时这样写出现了并发问题,如果并行度大于1,那么clickhouse中会出现uuid不唯一的情况出现。
>
>
>
> 请问一下,基于上述所说的情况,您有什么好的实践方案可以推荐一下的呢?
>
>
>
>
>
> 从 Windows 版邮件 <https://go.microsoft.com/fwlink/?LinkId=550986>发送
>
>
>

回复