Re:Re: Re: flink sql cdc sum 结果出现NULL
1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。 2. 没有开启 在 2020-11-20 11:49:44,"Jark Wu" 写道: >实现上应该没什么问题。 > >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的? >2. 是否开启 mini-batch了? > >Best, >Jark > >On Fri, 20 Nov 2020 at 11:44, kandy.wang wrote: > >> hi Jark: >> >> >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price >> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 >> >> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before >> update_after,format逻辑是应该这么写的吧。 >> >> >> >> >> 在 2020-11-19 23:13:19,"Jark Wu" 写道: >> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null >> >值的,以验证你的自定义 format 没有问题。 >> > >> >Best, >> >Jark >> > >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang wrote: >> > >> >> --mysql表 >> >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( >> >>`id` INT UNSIGNED AUTO_INCREMENT, >> >>`spu_id` BIGINT NOT NULL, >> >>`leaving_price` DECIMAL(10, 5) >> >>PRIMARY KEY ( `id` ), >> >>unique key idx_spu_id (spu_id) >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 >> >> >> >> >> >> --flink表 >> >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> ( >> >>`spu_id` BIGINT , >> >>`leaving_price` DECIMAL(10, 5), >> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED >> >> ) WITH ( >> >> 'connector' = 'jdbc', >> >>'url' = 'jdbc:mysql://...', >> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', >> >>'username' = '...', >> >>'password' = '..' >> >> ); >> >> >> >> >> >> --binlog 2mysql >> >> >> >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price >> >> >> >> FROM hive.database.table >> >> >> >> group by v_spu_id; >> >> >> >> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 >> >> >> >> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price >> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 >> >> 有什么好的排查思路么? >> >> >> >> >> >> >> >> >> >> >> >> >>
Re:Re: Re: flink sql cdc sum 结果出现NULL
1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。 2. 没有开启 在 2020-11-20 11:49:44,"Jark Wu" 写道: >实现上应该没什么问题。 > >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的? >2. 是否开启 mini-batch了? > >Best, >Jark > >On Fri, 20 Nov 2020 at 11:44, kandy.wang wrote: > >> hi Jark: >> >> >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price >> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 >> >> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before >> update_after,format逻辑是应该这么写的吧。 >> >> >> >> >> 在 2020-11-19 23:13:19,"Jark Wu" 写道: >> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null >> >值的,以验证你的自定义 format 没有问题。 >> > >> >Best, >> >Jark >> > >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang wrote: >> > >> >> --mysql表 >> >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( >> >>`id` INT UNSIGNED AUTO_INCREMENT, >> >>`spu_id` BIGINT NOT NULL, >> >>`leaving_price` DECIMAL(10, 5) >> >>PRIMARY KEY ( `id` ), >> >>unique key idx_spu_id (spu_id) >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 >> >> >> >> >> >> --flink表 >> >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> ( >> >>`spu_id` BIGINT , >> >>`leaving_price` DECIMAL(10, 5), >> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED >> >> ) WITH ( >> >> 'connector' = 'jdbc', >> >>'url' = 'jdbc:mysql://...', >> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', >> >>'username' = '...', >> >>'password' = '..' >> >> ); >> >> >> >> >> >> --binlog 2mysql >> >> >> >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price >> >> >> >> FROM hive.database.table >> >> >> >> group by v_spu_id; >> >> >> >> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 >> >> >> >> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price >> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 >> >> 有什么好的排查思路么? >> >> >> >> >> >> >> >> >> >> >> >> >>