基础场景:从 KafkaSource 读取数据,输出到 sinktable,期间通过 LEFT JOIN 关联维表 DimTable。Flink 版本:1.12.2。

场景1:当把 sinktable 设置为 'connector' = 'print',且不设置任何主键时,可以正常关联并输出。
场景2:当把 sinktable 设置为 'connector' = 'mysql'(即下文 SQL 中的 'connector' = 'jdbc' 配合 MySQL 驱动)时,则会要求加上 primary key。
场景3:在 sinktable 加上 PRIMARY KEY (uid, platform,root_game_id) NOT ENFORCED 后则报错,主要报错信息:

java.sql.BatchUpdateException: [30000, 2021060816420117201616500303151172567] cannot update pk column UID to expr

注:此处使用的 MySQL 是阿里的 ADB,建表 SQL 如下:

Create Table `v2_dwd_root_game_uid_reg_log` (
  `uid` bigint NOT NULL DEFAULT '0' COMMENT '注册uid',
  `user_name` varchar NOT NULL DEFAULT '',
  // 此处省略其他字段
  primary key (`uid`,`platform`,`root_game_id`)
) DISTRIBUTE BY HASH(`uid`) INDEX_ALL='Y' STORAGE_POLICY='HOT' COMMENT='按根游戏账号注册日志';

下面是场景3的 SQL 语句:

// Kafka Source
CREATE TABLE KafkaTable (
  message STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'xxxxxxxxxxx',
  'properties.bootstrap.servers' = 'xxxxxxxxxxx',
  'properties.group.id' = 'xxxxxxxxxxxxx',
  'scan.startup.mode' = 'group-offsets',
  'format' = 'json'
);

// 维表
CREATE TABLE DimTable (
  game_id BIGINT,
  root_game_id BIGINT,
  main_game_id BIGINT,
  platform VARCHAR
) WITH (
  'connector' = 'jdbc',
  'url' = 'xxxxxxxxxxxx',
  'table-name' = 'v2_dim_game_id',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'xxxxxxxxx',
  'password' = 'xxxxxxxx',
  'lookup.cache.max-rows'='5000',
  'lookup.cache.ttl' = '60s',
  'lookup.max-retries'='3'
);

// MySQL输出
CREATE TABLE sinktable (
  uid BIGINT,
  root_game_id BIGINT,
  game_id BIGINT,
  platform VARCHAR,
  //....省略其它字段
  PRIMARY KEY (uid, platform,root_game_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'xxxxxxxxxxxxxx',
  'table-name' = 'v2_dwd_root_game_uid_reg_log',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'xxxxxxxxxxx',
  'password' = 'xxxxxxxxxxxx',
  'sink.buffer-flush.interval'='5s',
  'sink.buffer-flush.max-rows' = '10'
);

// 插入(关联维表)
INSERT INTO sinktable
select
  IF(IsInvalidValue(k.uid), 0 , CAST(k.uid AS BIGINT)) as uid,
  IF((k.game_id IS NULL), 0 , k.game_id) as game_id,
  d.platform as platform,
  d.root_game_id as root_game_id,
  // 省略其它字段
from KafkaTable,LATERAL TABLE(RequestBodyColumnToRow(message, 'uid,game_id(BIGINT),platform' )) as k
LEFT JOIN DimTable as d ON k.game_id = d.game_id and k.platform = d.platform;

-- Sent from: http://apache-flink.147419.n8.nabble.com/