Hi!

你需要在 sink 节点之前添加一个按 uuid 的 hash shuffle 将相同的 uuid 送到相同的并发。如果 processData
是一个 data stream 的话,通过 keyBy 方法 key by uuid,再写入 sink 即可。

另:我记得我已经回复了两封相同的邮件,之前的回复是丢失了吗?

扯 <tzling2...@foxmail.com> 于2021年11月5日周五 上午10:50写道:

>
> 您好!感谢你在万忙之中,抽出时间来看我发的邮件。最近我在研究使用flink写入数据到clickHouse,如何能满足公司业务需求。但是在用flink1.12.1版本读取kafka的数据,实现upsert的形式写入数据到clickhouse出现了一些问题。问题详细情况描述如下:
>
> clickhouse建表语句如下:
> CREATE TABLE test_local.tzling_tb3(
>     uuid String,
>     product String,
>     platform String,
> batchId String,
> id String,
> account String,
> customerId String,
> reportName String,
> dt String,
> campaign String,
> adGroup String,
> generalField String,
> currency String,
> impressions String,
> cost String,
> clicks String,
> conversions String,
> createDateTime String,
> createTime BIGINT,
> key String,
> pdate String
> )engine = MergeTree PARTITION BY pdate order by createTime;
> 将uuid作为主键,主键存在就更新数据 update,不存在的话,就直接append。
>
> processData.addSink(new MSKUpsertClickHouseSink());
> 附件文件MSKUpsertClickHouseSink.java是我写入clickhouse的sink类,设计逻辑为:
> 先查询表中是否存在要添加数据的uuid,如果存在就先做条件删除操作,再做append操作;如果要添加的数据uuid不存在,就直接append操作。当时这样写出现了并发问题,如果并行度大于1,那么clickhouse中会出现uuid不唯一的情况出现。
>
> 请问一下,基于上述所说的情况,您有什么好的实践方案可以推荐一下的呢?
>

回复