基础场景: 从 KafkaSource 输入数据,输出到 sinktable, 期间 Left join 关联 DimTable  维表。
Flink 版本 1.12.2

场景1:当把 sinktable 设置为 'connector' = 'print' ,不设置任何主键,可正常关联输出
场景2:当把 sinktable 设置为 'connector' = 'mysql' 则会要求加上 primary key
场景3:在 sinktable 加上 PRIMARY KEY (uid, platform,root_game_id) NOT ENFORCED
则报错,主要报错信息:
java.sql.BatchUpdateException: [30000,
2021060816420117201616500303151172567] cannot update pk column UID to expr

注:此处使用的MySQL 是阿里的 ADB,建表SQL如下
Create Table `v2_dwd_root_game_uid_reg_log` (
 `uid` bigint NOT NULL DEFAULT '0' COMMENT '注册uid',
 `user_name` varchar NOT NULL DEFAULT '',
  // 此处省略其他字段
 primary key (`uid`,`platform`,`root_game_id`)
) DISTRIBUTE BY HASH(`uid`) INDEX_ALL='Y' STORAGE_POLICY='HOT'
COMMENT='按根游戏账号注册日志';


下面是场景3的SQL语句:
// Kafka Source
CREATE TABLE KafkaTable (
    message STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'xxxxxxxxxxx',
    'properties.bootstrap.servers' = 'xxxxxxxxxxx',
    'properties.group.id' = 'xxxxxxxxxxxxx',
    'scan.startup.mode' = 'group-offsets',
    'format' = 'json'
);

// 维表
CREATE TABLE DimTable (
    game_id BIGINT,
    root_game_id BIGINT,
    main_game_id BIGINT,
    platform VARCHAR
) WITH (
    'connector' = 'jdbc',
    'url' = 'xxxxxxxxxxxx',
    'table-name' = 'v2_dim_game_id',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = 'xxxxxxxxx',
    'password' = 'xxxxxxxx',
    'lookup.cache.max-rows'='5000',
    'lookup.cache.ttl' = '60s',
    'lookup.max-retries'='3'
);

// MySQL输出
CREATE TABLE sinktable (
    uid BIGINT,
    root_game_id BIGINT,
    game_id BIGINT,
    platform VARCHAR,
    //....省略其它字段
    PRIMARY KEY (uid, platform,root_game_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'xxxxxxxxxxxxxx',
    'table-name' = 'v2_dwd_root_game_uid_reg_log',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = 'xxxxxxxxxxx',
    'password' = 'xxxxxxxxxxxx',
    'sink.buffer-flush.interval'='5s',
    'sink.buffer-flush.max-rows' = '10'
);

// 插入(关联维表)
INSERT INTO sinktable select
    IF(IsInvalidValue(k.uid), 0 , CAST(k.uid AS BIGINT)) as uid,
    IF((k.game_id IS NULL), 0 , k.game_id) as game_id,
    d.platform as platform,
    d.root_game_id as root_game_id,
    // 省略其它字段
 from KafkaTable,LATERAL TABLE(RequestBodyColumnToRow(message,
'uid,game_id(BIGINT),platform'
)) as k LEFT JOIN DimTable as d ON k.game_id = d.game_id and k.platform =
d.platform;




--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复