hi Jark: 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before update_after,format逻辑是应该这么写的吧。
在 2020-11-19 23:13:19,"Jark Wu" <imj...@gmail.com> 写道: >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null >值的,以验证你的自定义 format 没有问题。 > >Best, >Jark > >On Thu, 19 Nov 2020 at 22:41, kandy.wang <kandy1...@163.com> wrote: > >> --mysql表 >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( >> `id` INT UNSIGNED AUTO_INCREMENT, >> `spu_id` BIGINT NOT NULL, >> `leaving_price` DECIMAL(10, 5) >> PRIMARY KEY ( `id` ), >> unique key idx_spu_id (spu_id) >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 >> >> >> --flink表 >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg ( >> `spu_id` BIGINT , >> `leaving_price` DECIMAL(10, 5), >> PRIMARY KEY ( `spu_id`) NOT ENFORCED >> ) WITH ( >> 'connector' = 'jdbc', >> 'url' = 'jdbc:mysql://...', >> 'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', >> 'username' = '...', >> 'password' = '..' >> ); >> >> >> --binlog 2mysql >> >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price >> >> FROM hive.database.table >> >> group by v_spu_id; >> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 >> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 >> 有什么好的排查思路么? >> >> >> >> >> >>