Hi! 你需要在 sink 节点之前添加一个按 uuid 的 hash shuffle 将相同的 uuid 送到相同的并发。如果 processData 是一个 data stream 的话,通过 keyBy 方法 key by uuid,再写入 sink 即可。
涂子令 <tuzil...@kingsoft.com> 于2021年10月27日周三 下午5:49写道: > 您好!感谢你在万忙之中,抽出时间来看我发的邮件。最近我在研究使用flink写入数据到clickHouse,如何能满足公司业务需求。但是在用 > flink1.12.1版本读取kafka的数据,实现upsert的形式写入数据到clickhouse出现了一些问题。问题详细情况描述如下: > > > > clickhouse建表语句如下: > > CREATE TABLE test_local.tzling_tb3( > > uuid String, > > product String, > > platform String, > > batchId String, > > id String, > > account String, > > customerId String, > > reportName String, > > dt String, > > campaign String, > > adGroup String, > > generalField String, > > currency String, > > impressions String, > > cost String, > > clicks String, > > conversions String, > > createDateTime String, > > createTime BIGINT, > > key String, > > pdate String > > )engine = MergeTree PARTITION BY pdate order by createTime; > > 将uuid作为主键,主键存在就更新数据 update,不存在的话,就直接append。 > > > > processData.addSink(new MSKUpsertClickHouseSink()); > > 附件文件MSKUpsertClickHouseSink.java是我写入clickhouse的sink类,设计逻辑为: > 先查询表中是否存在要添加数据的uuid,如果存在就先做条件删除操作,再做append操作;如果要添加的数据uuid不存在,就直接append > 操作。当时这样写出现了并发问题,如果并行度大于1,那么clickhouse中会出现uuid不唯一的情况出现。 > > > > 请问一下,基于上述所说的情况,您有什么好的实践方案可以推荐一下的呢? > > > > > > 从 Windows 版邮件 <https://go.microsoft.com/fwlink/?LinkId=550986>发送 > > >