Re:Re: flink sql cdc sum 结果出现NULL
@Jianzhi Zhang 嗯,是这个原因,感谢 回复。 就是decimal的精度问题 在 2020-12-01 13:24:23,"Jianzhi Zhang" 写道: >是不是你的decimal字段长度太短了,计算结果超出了精度范围导致null的出现 > >> 2020年11月19日 下午10:41,kandy.wang 写道: >> >> --mysql表 >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( >> `id` INT UNSIGNED AUTO_INCREMENT, >> `spu_id` BIGINT NOT NULL, >> `leaving_price` DECIMAL(10, 5) >> PRIMARY KEY ( `id` ), >> unique key idx_spu_id (spu_id) >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 >> >> >> --flink表 >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg ( >> `spu_id` BIGINT , >> `leaving_price` DECIMAL(10, 5), >>PRIMARY KEY ( `spu_id`) NOT ENFORCED >> ) WITH ( >> 'connector' = 'jdbc', >> 'url' = 'jdbc:mysql://...', >> 'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', >> 'username' = '...', >> 'password' = '..' >> ); >> >> >> --binlog 2mysql >> >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price >> >> FROM hive.database.table >> >> group by v_spu_id; >> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 >> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 >> 有什么好的排查思路么? >> >> >> >> >>
Re: flink sql cdc sum 结果出现NULL
是不是你的decimal字段长度太短了,计算结果超出了精度范围导致null的出现 > 2020年11月19日 下午10:41,kandy.wang 写道: > > --mysql表 > CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( > `id` INT UNSIGNED AUTO_INCREMENT, > `spu_id` BIGINT NOT NULL, > `leaving_price` DECIMAL(10, 5) > PRIMARY KEY ( `id` ), > unique key idx_spu_id (spu_id) > )ENGINE=InnoDB DEFAULT CHARSET=utf8 > > > --flink表 > CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg ( > `spu_id` BIGINT , > `leaving_price` DECIMAL(10, 5), >PRIMARY KEY ( `spu_id`) NOT ENFORCED > ) WITH ( > 'connector' = 'jdbc', > 'url' = 'jdbc:mysql://...', > 'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', > 'username' = '...', > 'password' = '..' > ); > > > --binlog 2mysql > > insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg > > SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price > > FROM hive.database.table > > group by v_spu_id; > > > hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 > > > 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price > 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 > 有什么好的排查思路么? > > > > >
Re:Re: Re: Re: flink sql cdc sum 结果出现NULL
1.没有初始的全量数据可能是会有问题的 这个怎么理解,默认情况,就是从kafka group-sets 消费的,怎么才能保证全量数据? 我们这个binlog同步都是增量同步。不会做一次初始化的全量同步。 2.先发的before 后发的after 3. 数据在kafka里,是按照mysql的id主键hash的。是有序的,group key 的所有数据不能保证 都在同 一个 partition 中。由于是按照主键id hash的 在 2020-11-20 13:25:53,"Jark Wu" 写道: >1. 没有初始的全量数据可能是会有问题的。 > >3. 你的 format 再解析 update 时,时先发的 before 还是 after? >4. 你的数据在 kafka 中时有序的么?也就是同一 key 的所有数据都在一个 partition 中不? > >On Fri, 20 Nov 2020 at 12:46, kandy.wang wrote: > >> >> >> >> >> >> >> 1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。 >> >> 2. 没有开启 >> >> >> >> >> 在 2020-11-20 11:49:44,"Jark Wu" 写道: >> >实现上应该没什么问题。 >> > >> >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的? >> >2. 是否开启 mini-batch了? >> > >> >Best, >> >Jark >> > >> >On Fri, 20 Nov 2020 at 11:44, kandy.wang wrote: >> > >> >> hi Jark: >> >> >> >> >> >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price >> >> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 >> >> >> >> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 >> update_before >> >> update_after,format逻辑是应该这么写的吧。 >> >> >> >> >> >> >> >> >> >> 在 2020-11-19 23:13:19,"Jark Wu" 写道: >> >> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null >> >> >值的,以验证你的自定义 format 没有问题。 >> >> > >> >> >Best, >> >> >Jark >> >> > >> >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang wrote: >> >> > >> >> >> --mysql表 >> >> >> CREATE TABLE IF NOT EXISTS >> `mysql_realtime_leaving_price_spu_index_agg`( >> >> >>`id` INT UNSIGNED AUTO_INCREMENT, >> >> >>`spu_id` BIGINT NOT NULL, >> >> >>`leaving_price` DECIMAL(10, 5) >> >> >>PRIMARY KEY ( `id` ), >> >> >>unique key idx_spu_id (spu_id) >> >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 >> >> >> >> >> >> >> >> >> --flink表 >> >> >> CREATE TABLE >> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> ( >> >> >>`spu_id` BIGINT , >> >> >>`leaving_price` DECIMAL(10, 5), >> >> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED >> >> >> ) WITH ( >> >> >> 'connector' = 'jdbc', >> >> >>'url' = 'jdbc:mysql://...', >> >> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', >> >> >>'username' = '...', >> >> >>'password' = '..' >> >> >> ); >> >> >> >> >> >> >> >> >> --binlog 2mysql >> >> >> >> >> >> insert into >> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> >> >> >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price >> >> >> >> >> >> FROM hive.database.table >> >> >> >> >> >> group by v_spu_id; >> >> >> >> >> >> >> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 >> >> >> >> >> >> >> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price >> >> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 >> >> >> 有什么好的排查思路么? >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >>
Re:Re: Re: Re: flink sql cdc sum 结果出现NULL
1.没有初始的全量数据可能是会有问题的 这个怎么理解,默认情况,就是从kafka group-sets 消费的,怎么才能保证全量数据? 我们这个binlog同步都是增量同步。不会做一次初始化的全量同步。 2.先发的before 后发的after 3. 数据在kafka里,是按照mysql的id主键hash的。是有序的,group key 的所有数据不能保证 都在同 一个 partition 中。由于是按照主键id hash的。 在 2020-11-20 13:25:53,"Jark Wu" 写道: >1. 没有初始的全量数据可能是会有问题的。 > >3. 你的 format 再解析 update 时,时先发的 before 还是 after? >4. 你的数据在 kafka 中时有序的么?也就是同一 key 的所有数据都在一个 partition 中不? > >On Fri, 20 Nov 2020 at 12:46, kandy.wang wrote: > >> >> >> >> >> >> >> 1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。 >> >> 2. 没有开启 >> >> >> >> >> 在 2020-11-20 11:49:44,"Jark Wu" 写道: >> >实现上应该没什么问题。 >> > >> >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的? >> >2. 是否开启 mini-batch了? >> > >> >Best, >> >Jark >> > >> >On Fri, 20 Nov 2020 at 11:44, kandy.wang wrote: >> > >> >> hi Jark: >> >> >> >> >> >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price >> >> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 >> >> >> >> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 >> update_before >> >> update_after,format逻辑是应该这么写的吧。 >> >> >> >> >> >> >> >> >> >> 在 2020-11-19 23:13:19,"Jark Wu" 写道: >> >> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null >> >> >值的,以验证你的自定义 format 没有问题。 >> >> > >> >> >Best, >> >> >Jark >> >> > >> >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang wrote: >> >> > >> >> >> --mysql表 >> >> >> CREATE TABLE IF NOT EXISTS >> `mysql_realtime_leaving_price_spu_index_agg`( >> >> >>`id` INT UNSIGNED AUTO_INCREMENT, >> >> >>`spu_id` BIGINT NOT NULL, >> >> >>`leaving_price` DECIMAL(10, 5) >> >> >>PRIMARY KEY ( `id` ), >> >> >>unique key idx_spu_id (spu_id) >> >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 >> >> >> >> >> >> >> >> >> --flink表 >> >> >> CREATE TABLE >> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> ( >> >> >>`spu_id` BIGINT , >> >> >>`leaving_price` DECIMAL(10, 5), >> >> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED >> >> >> ) WITH ( >> >> >> 'connector' = 'jdbc', >> >> >>'url' = 'jdbc:mysql://...', >> >> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', >> >> >>'username' = '...', >> >> >>'password' = '..' >> >> >> ); >> >> >> >> >> >> >> >> >> --binlog 2mysql >> >> >> >> >> >> insert into >> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> >> >> >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price >> >> >> >> >> >> FROM hive.database.table >> >> >> >> >> >> group by v_spu_id; >> >> >> >> >> >> >> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 >> >> >> >> >> >> >> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price >> >> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 >> >> >> 有什么好的排查思路么? >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >>
Re: Re: Re: flink sql cdc sum 结果出现NULL
1. 没有初始的全量数据可能是会有问题的。 3. 你的 format 再解析 update 时,时先发的 before 还是 after? 4. 你的数据在 kafka 中时有序的么?也就是同一 key 的所有数据都在一个 partition 中不? On Fri, 20 Nov 2020 at 12:46, kandy.wang wrote: > > > > > > > 1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。 > > 2. 没有开启 > > > > > 在 2020-11-20 11:49:44,"Jark Wu" 写道: > >实现上应该没什么问题。 > > > >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的? > >2. 是否开启 mini-batch了? > > > >Best, > >Jark > > > >On Fri, 20 Nov 2020 at 11:44, kandy.wang wrote: > > > >> hi Jark: > >> > >> > >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price > >> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 > >> > >> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 > update_before > >> update_after,format逻辑是应该这么写的吧。 > >> > >> > >> > >> > >> 在 2020-11-19 23:13:19,"Jark Wu" 写道: > >> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null > >> >值的,以验证你的自定义 format 没有问题。 > >> > > >> >Best, > >> >Jark > >> > > >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang wrote: > >> > > >> >> --mysql表 > >> >> CREATE TABLE IF NOT EXISTS > `mysql_realtime_leaving_price_spu_index_agg`( > >> >>`id` INT UNSIGNED AUTO_INCREMENT, > >> >>`spu_id` BIGINT NOT NULL, > >> >>`leaving_price` DECIMAL(10, 5) > >> >>PRIMARY KEY ( `id` ), > >> >>unique key idx_spu_id (spu_id) > >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 > >> >> > >> >> > >> >> --flink表 > >> >> CREATE TABLE > hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg > >> ( > >> >>`spu_id` BIGINT , > >> >>`leaving_price` DECIMAL(10, 5), > >> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED > >> >> ) WITH ( > >> >> 'connector' = 'jdbc', > >> >>'url' = 'jdbc:mysql://...', > >> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', > >> >>'username' = '...', > >> >>'password' = '..' > >> >> ); > >> >> > >> >> > >> >> --binlog 2mysql > >> >> > >> >> insert into > hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg > >> >> > >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price > >> >> > >> >> FROM hive.database.table > >> >> > >> >> group by v_spu_id; > >> >> > >> >> > >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 > >> >> > >> >> > >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price > >> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 > >> >> 有什么好的排查思路么? > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >
Re:Re: Re: flink sql cdc sum 结果出现NULL
1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。 2. 没有开启 在 2020-11-20 11:49:44,"Jark Wu" 写道: >实现上应该没什么问题。 > >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的? >2. 是否开启 mini-batch了? > >Best, >Jark > >On Fri, 20 Nov 2020 at 11:44, kandy.wang wrote: > >> hi Jark: >> >> >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price >> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 >> >> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before >> update_after,format逻辑是应该这么写的吧。 >> >> >> >> >> 在 2020-11-19 23:13:19,"Jark Wu" 写道: >> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null >> >值的,以验证你的自定义 format 没有问题。 >> > >> >Best, >> >Jark >> > >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang wrote: >> > >> >> --mysql表 >> >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( >> >>`id` INT UNSIGNED AUTO_INCREMENT, >> >>`spu_id` BIGINT NOT NULL, >> >>`leaving_price` DECIMAL(10, 5) >> >>PRIMARY KEY ( `id` ), >> >>unique key idx_spu_id (spu_id) >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 >> >> >> >> >> >> --flink表 >> >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> ( >> >>`spu_id` BIGINT , >> >>`leaving_price` DECIMAL(10, 5), >> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED >> >> ) WITH ( >> >> 'connector' = 'jdbc', >> >>'url' = 'jdbc:mysql://...', >> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', >> >>'username' = '...', >> >>'password' = '..' >> >> ); >> >> >> >> >> >> --binlog 2mysql >> >> >> >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price >> >> >> >> FROM hive.database.table >> >> >> >> group by v_spu_id; >> >> >> >> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 >> >> >> >> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price >> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 >> >> 有什么好的排查思路么? >> >> >> >> >> >> >> >> >> >> >> >> >>
Re:Re: Re: flink sql cdc sum 结果出现NULL
1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。 2. 没有开启 在 2020-11-20 11:49:44,"Jark Wu" 写道: >实现上应该没什么问题。 > >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的? >2. 是否开启 mini-batch了? > >Best, >Jark > >On Fri, 20 Nov 2020 at 11:44, kandy.wang wrote: > >> hi Jark: >> >> >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price >> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 >> >> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before >> update_after,format逻辑是应该这么写的吧。 >> >> >> >> >> 在 2020-11-19 23:13:19,"Jark Wu" 写道: >> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null >> >值的,以验证你的自定义 format 没有问题。 >> > >> >Best, >> >Jark >> > >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang wrote: >> > >> >> --mysql表 >> >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( >> >>`id` INT UNSIGNED AUTO_INCREMENT, >> >>`spu_id` BIGINT NOT NULL, >> >>`leaving_price` DECIMAL(10, 5) >> >>PRIMARY KEY ( `id` ), >> >>unique key idx_spu_id (spu_id) >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 >> >> >> >> >> >> --flink表 >> >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> ( >> >>`spu_id` BIGINT , >> >>`leaving_price` DECIMAL(10, 5), >> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED >> >> ) WITH ( >> >> 'connector' = 'jdbc', >> >>'url' = 'jdbc:mysql://...', >> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', >> >>'username' = '...', >> >>'password' = '..' >> >> ); >> >> >> >> >> >> --binlog 2mysql >> >> >> >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price >> >> >> >> FROM hive.database.table >> >> >> >> group by v_spu_id; >> >> >> >> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 >> >> >> >> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price >> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 >> >> 有什么好的排查思路么? >> >> >> >> >> >> >> >> >> >> >> >> >>
Re: Re: flink sql cdc sum 结果出现NULL
实现上应该没什么问题。 1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的? 2. 是否开启 mini-batch了? Best, Jark On Fri, 20 Nov 2020 at 11:44, kandy.wang wrote: > hi Jark: > > > 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price > 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 > > 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before > update_after,format逻辑是应该这么写的吧。 > > > > > 在 2020-11-19 23:13:19,"Jark Wu" 写道: > >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null > >值的,以验证你的自定义 format 没有问题。 > > > >Best, > >Jark > > > >On Thu, 19 Nov 2020 at 22:41, kandy.wang wrote: > > > >> --mysql表 > >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( > >>`id` INT UNSIGNED AUTO_INCREMENT, > >>`spu_id` BIGINT NOT NULL, > >>`leaving_price` DECIMAL(10, 5) > >>PRIMARY KEY ( `id` ), > >>unique key idx_spu_id (spu_id) > >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 > >> > >> > >> --flink表 > >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg > ( > >>`spu_id` BIGINT , > >>`leaving_price` DECIMAL(10, 5), > >> PRIMARY KEY ( `spu_id`) NOT ENFORCED > >> ) WITH ( > >> 'connector' = 'jdbc', > >>'url' = 'jdbc:mysql://...', > >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', > >>'username' = '...', > >>'password' = '..' > >> ); > >> > >> > >> --binlog 2mysql > >> > >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg > >> > >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price > >> > >> FROM hive.database.table > >> > >> group by v_spu_id; > >> > >> > >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 > >> > >> > >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price > >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 > >> 有什么好的排查思路么? > >> > >> > >> > >> > >> > >> >
Re:Re: flink sql cdc sum 结果出现NULL
hi Jark: 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before update_after,format逻辑是应该这么写的吧。 在 2020-11-19 23:13:19,"Jark Wu" 写道: >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null >值的,以验证你的自定义 format 没有问题。 > >Best, >Jark > >On Thu, 19 Nov 2020 at 22:41, kandy.wang wrote: > >> --mysql表 >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( >>`id` INT UNSIGNED AUTO_INCREMENT, >>`spu_id` BIGINT NOT NULL, >>`leaving_price` DECIMAL(10, 5) >>PRIMARY KEY ( `id` ), >>unique key idx_spu_id (spu_id) >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 >> >> >> --flink表 >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg ( >>`spu_id` BIGINT , >>`leaving_price` DECIMAL(10, 5), >> PRIMARY KEY ( `spu_id`) NOT ENFORCED >> ) WITH ( >> 'connector' = 'jdbc', >>'url' = 'jdbc:mysql://...', >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', >>'username' = '...', >>'password' = '..' >> ); >> >> >> --binlog 2mysql >> >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price >> >> FROM hive.database.table >> >> group by v_spu_id; >> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 >> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 >> 有什么好的排查思路么? >> >> >> >> >> >>
Re:Re: flink sql cdc sum 结果出现NULL
hi Jark: 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before update_after,format逻辑是应该这么写的吧。 在 2020-11-19 23:13:19,"Jark Wu" 写道: >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null >值的,以验证你的自定义 format 没有问题。 > >Best, >Jark > >On Thu, 19 Nov 2020 at 22:41, kandy.wang wrote: > >> --mysql表 >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( >>`id` INT UNSIGNED AUTO_INCREMENT, >>`spu_id` BIGINT NOT NULL, >>`leaving_price` DECIMAL(10, 5) >>PRIMARY KEY ( `id` ), >>unique key idx_spu_id (spu_id) >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 >> >> >> --flink表 >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg ( >>`spu_id` BIGINT , >>`leaving_price` DECIMAL(10, 5), >> PRIMARY KEY ( `spu_id`) NOT ENFORCED >> ) WITH ( >> 'connector' = 'jdbc', >>'url' = 'jdbc:mysql://...', >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', >>'username' = '...', >>'password' = '..' >> ); >> >> >> --binlog 2mysql >> >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price >> >> FROM hive.database.table >> >> group by v_spu_id; >> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 >> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 >> 有什么好的排查思路么? >> >> >> >> >> >>
Re: flink sql cdc sum 结果出现NULL
你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null 值的,以验证你的自定义 format 没有问题。 Best, Jark On Thu, 19 Nov 2020 at 22:41, kandy.wang wrote: > --mysql表 > CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( >`id` INT UNSIGNED AUTO_INCREMENT, >`spu_id` BIGINT NOT NULL, >`leaving_price` DECIMAL(10, 5) >PRIMARY KEY ( `id` ), >unique key idx_spu_id (spu_id) > )ENGINE=InnoDB DEFAULT CHARSET=utf8 > > > --flink表 > CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg ( >`spu_id` BIGINT , >`leaving_price` DECIMAL(10, 5), > PRIMARY KEY ( `spu_id`) NOT ENFORCED > ) WITH ( > 'connector' = 'jdbc', >'url' = 'jdbc:mysql://...', >'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', >'username' = '...', >'password' = '..' > ); > > > --binlog 2mysql > > insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg > > SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price > > FROM hive.database.table > > group by v_spu_id; > > > hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 > > > 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price > 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 > 有什么好的排查思路么? > > > > > >
flink sql cdc sum 结果出现NULL
--mysql表 CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( `id` INT UNSIGNED AUTO_INCREMENT, `spu_id` BIGINT NOT NULL, `leaving_price` DECIMAL(10, 5) PRIMARY KEY ( `id` ), unique key idx_spu_id (spu_id) )ENGINE=InnoDB DEFAULT CHARSET=utf8 --flink表 CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg ( `spu_id` BIGINT , `leaving_price` DECIMAL(10, 5), PRIMARY KEY ( `spu_id`) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://...', 'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', 'username' = '...', 'password' = '..' ); --binlog 2mysql insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price FROM hive.database.table group by v_spu_id; hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 有什么好的排查思路么?