您好!感谢你在万忙之中,抽出时间来看我发的邮件。最近我在研究使用flink写入数据到clickHouse,如何能满足公司业务需求。但是在用flink1.12.1版本读取kafka的数据,实现upsert的形式写入数据到clickhouse出现了一些问题。问题详细情况描述如下:

clickhouse建表语句如下:
CREATE TABLE test_local.tzling_tb3(
    uuid String,
    product String,
    platform String,
        batchId String,
        id String,
        account String,
        customerId String,
        reportName String,
        dt String,
        campaign String,
        adGroup String,
        generalField String,
        currency String,
        impressions String,
        cost String,
        clicks String,
        conversions String,
        createDateTime String,
        createTime BIGINT,
        key String,
        pdate String
)engine = MergeTree PARTITION BY pdate order by createTime;
将uuid作为主键,主键存在就更新数据 update,不存在的话,就直接append。

processData.addSink(new MSKUpsertClickHouseSink());
附件文件MSKUpsertClickHouseSink.java是我写入clickhouse的sink类,设计逻辑为: 
先查询表中是否存在要添加数据的uuid,如果存在就先做条件删除操作,再做append操作;如果要添加的数据uuid不存在,就直接append操作。当时这样写出现了并发问题,如果并行度大于1,那么clickhouse中会出现uuid不唯一的情况出现。

请问一下,基于上述所说的情况,您有什么好的实践方案可以推荐一下的呢?


从 Windows 版邮件发送

回复