Re:Re: flink sql cdc sum 结果出现NULL

2020-11-30 文章 kandy.wang









@Jianzhi Zhang

嗯,是这个原因,感谢 回复。 就是decimal的精度问题




在 2020-12-01 13:24:23,"Jianzhi Zhang"  写道:
>是不是你的decimal字段长度太短了,计算结果超出了精度范围导致null的出现
>
>> 2020年11月19日 下午10:41,kandy.wang  写道:
>> 
>> --mysql表
>> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
>>   `id` INT UNSIGNED AUTO_INCREMENT,
>>   `spu_id` BIGINT NOT NULL,
>>   `leaving_price`  DECIMAL(10, 5)
>>   PRIMARY KEY ( `id` ),
>>   unique key idx_spu_id (spu_id)
>> )ENGINE=InnoDB DEFAULT CHARSET=utf8
>> 
>> 
>> --flink表
>> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg (
>>   `spu_id` BIGINT ,
>>   `leaving_price`  DECIMAL(10, 5),
>>PRIMARY KEY ( `spu_id`) NOT ENFORCED
>> ) WITH (
>>  'connector' = 'jdbc',
>>   'url' = 'jdbc:mysql://...',
>>   'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>>   'username' = '...',
>>   'password' = '..'
>> );
>> 
>> 
>> --binlog 2mysql
>> 
>> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>> 
>> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
>> 
>> FROM hive.database.table
>> 
>> group by v_spu_id;
>> 
>> 
>> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
>> 
>> 
>> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price 
>> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
>> 有什么好的排查思路么?
>> 
>> 
>> 
>> 
>> 


Re:Re: flink sql cdc sum 结果出现NULL

2020-11-19 文章 kandy.wang
hi Jark:


打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price 
都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况

自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before 
update_after,format逻辑是应该这么写的吧。




在 2020-11-19 23:13:19,"Jark Wu"  写道:
>你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null
>值的,以验证你的自定义 format 没有问题。
>
>Best,
>Jark
>
>On Thu, 19 Nov 2020 at 22:41, kandy.wang  wrote:
>
>> --mysql表
>> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
>>`id` INT UNSIGNED AUTO_INCREMENT,
>>`spu_id` BIGINT NOT NULL,
>>`leaving_price`  DECIMAL(10, 5)
>>PRIMARY KEY ( `id` ),
>>unique key idx_spu_id (spu_id)
>> )ENGINE=InnoDB DEFAULT CHARSET=utf8
>>
>>
>> --flink表
>> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg (
>>`spu_id` BIGINT ,
>>`leaving_price`  DECIMAL(10, 5),
>> PRIMARY KEY ( `spu_id`) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'jdbc',
>>'url' = 'jdbc:mysql://...',
>>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>>'username' = '...',
>>'password' = '..'
>> );
>>
>>
>> --binlog 2mysql
>>
>> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>>
>> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
>>
>> FROM hive.database.table
>>
>> group by v_spu_id;
>>
>>
>> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
>>
>>
>> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price
>> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
>> 有什么好的排查思路么?
>>
>>
>>
>>
>>
>>


Re:Re: flink sql cdc sum 结果出现NULL

2020-11-19 文章 kandy.wang
hi Jark:

打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price 
都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况
自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before 
update_after,format逻辑是应该这么写的吧。




在 2020-11-19 23:13:19,"Jark Wu"  写道:
>你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null
>值的,以验证你的自定义 format 没有问题。
>
>Best,
>Jark
>
>On Thu, 19 Nov 2020 at 22:41, kandy.wang  wrote:
>
>> --mysql表
>> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
>>`id` INT UNSIGNED AUTO_INCREMENT,
>>`spu_id` BIGINT NOT NULL,
>>`leaving_price`  DECIMAL(10, 5)
>>PRIMARY KEY ( `id` ),
>>unique key idx_spu_id (spu_id)
>> )ENGINE=InnoDB DEFAULT CHARSET=utf8
>>
>>
>> --flink表
>> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg (
>>`spu_id` BIGINT ,
>>`leaving_price`  DECIMAL(10, 5),
>> PRIMARY KEY ( `spu_id`) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'jdbc',
>>'url' = 'jdbc:mysql://...',
>>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>>'username' = '...',
>>'password' = '..'
>> );
>>
>>
>> --binlog 2mysql
>>
>> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>>
>> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
>>
>> FROM hive.database.table
>>
>> group by v_spu_id;
>>
>>
>> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
>>
>>
>> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price
>> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
>> 有什么好的排查思路么?
>>
>>
>>
>>
>>
>>