Re:Re: Re: Re: flink sql cdc sum 结果出现NULL
1.没有初始的全量数据可能是会有问题的 这个怎么理解,默认情况,就是从kafka group-sets 消费的,怎么才能保证全量数据? 我们这个binlog同步都是增量同步。不会做一次初始化的全量同步。 2.先发的before 后发的after 3. 数据在kafka里,是按照mysql的id主键hash的。是有序的,group key 的所有数据不能保证 都在同 一个 partition 中。由于是按照主键id hash的 在 2020-11-20 13:25:53,"Jark Wu" 写道: >1. 没有初始的全量数据可能是会有问题的。 > >3. 你的 format 再解析 update 时,时先发的 before 还是 after? >4. 你的数据在 kafka 中时有序的么?也就是同一 key 的所有数据都在一个 partition 中不? > >On Fri, 20 Nov 2020 at 12:46, kandy.wang wrote: > >> >> >> >> >> >> >> 1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。 >> >> 2. 没有开启 >> >> >> >> >> 在 2020-11-20 11:49:44,"Jark Wu" 写道: >> >实现上应该没什么问题。 >> > >> >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的? >> >2. 是否开启 mini-batch了? >> > >> >Best, >> >Jark >> > >> >On Fri, 20 Nov 2020 at 11:44, kandy.wang wrote: >> > >> >> hi Jark: >> >> >> >> >> >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price >> >> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 >> >> >> >> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 >> update_before >> >> update_after,format逻辑是应该这么写的吧。 >> >> >> >> >> >> >> >> >> >> 在 2020-11-19 23:13:19,"Jark Wu" 写道: >> >> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null >> >> >值的,以验证你的自定义 format 没有问题。 >> >> > >> >> >Best, >> >> >Jark >> >> > >> >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang wrote: >> >> > >> >> >> --mysql表 >> >> >> CREATE TABLE IF NOT EXISTS >> `mysql_realtime_leaving_price_spu_index_agg`( >> >> >>`id` INT UNSIGNED AUTO_INCREMENT, >> >> >>`spu_id` BIGINT NOT NULL, >> >> >>`leaving_price` DECIMAL(10, 5) >> >> >>PRIMARY KEY ( `id` ), >> >> >>unique key idx_spu_id (spu_id) >> >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 >> >> >> >> >> >> >> >> >> --flink表 >> >> >> CREATE TABLE >> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> ( >> >> >>`spu_id` BIGINT , >> >> >>`leaving_price` DECIMAL(10, 5), >> >> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED >> >> >> ) WITH ( >> >> >> 'connector' = 'jdbc', >> >> >>'url' = 'jdbc:mysql://...', >> >> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', >> >> >>'username' = '...', >> >> >>'password' = '..' >> >> >> ); >> >> >> >> >> >> >> >> >> --binlog 2mysql >> >> >> >> >> >> insert into >> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> >> >> >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price >> >> >> >> >> >> FROM hive.database.table >> >> >> >> >> >> group by v_spu_id; >> >> >> >> >> >> >> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 >> >> >> >> >> >> >> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price >> >> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 >> >> >> 有什么好的排查思路么? >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >>
Re:Re: Re: Re: flink sql cdc sum 结果出现NULL
1.没有初始的全量数据可能是会有问题的 这个怎么理解,默认情况,就是从kafka group-sets 消费的,怎么才能保证全量数据? 我们这个binlog同步都是增量同步。不会做一次初始化的全量同步。 2.先发的before 后发的after 3. 数据在kafka里,是按照mysql的id主键hash的。是有序的,group key 的所有数据不能保证 都在同 一个 partition 中。由于是按照主键id hash的。 在 2020-11-20 13:25:53,"Jark Wu" 写道: >1. 没有初始的全量数据可能是会有问题的。 > >3. 你的 format 再解析 update 时,时先发的 before 还是 after? >4. 你的数据在 kafka 中时有序的么?也就是同一 key 的所有数据都在一个 partition 中不? > >On Fri, 20 Nov 2020 at 12:46, kandy.wang wrote: > >> >> >> >> >> >> >> 1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。 >> >> 2. 没有开启 >> >> >> >> >> 在 2020-11-20 11:49:44,"Jark Wu" 写道: >> >实现上应该没什么问题。 >> > >> >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的? >> >2. 是否开启 mini-batch了? >> > >> >Best, >> >Jark >> > >> >On Fri, 20 Nov 2020 at 11:44, kandy.wang wrote: >> > >> >> hi Jark: >> >> >> >> >> >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price >> >> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 >> >> >> >> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 >> update_before >> >> update_after,format逻辑是应该这么写的吧。 >> >> >> >> >> >> >> >> >> >> 在 2020-11-19 23:13:19,"Jark Wu" 写道: >> >> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null >> >> >值的,以验证你的自定义 format 没有问题。 >> >> > >> >> >Best, >> >> >Jark >> >> > >> >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang wrote: >> >> > >> >> >> --mysql表 >> >> >> CREATE TABLE IF NOT EXISTS >> `mysql_realtime_leaving_price_spu_index_agg`( >> >> >>`id` INT UNSIGNED AUTO_INCREMENT, >> >> >>`spu_id` BIGINT NOT NULL, >> >> >>`leaving_price` DECIMAL(10, 5) >> >> >>PRIMARY KEY ( `id` ), >> >> >>unique key idx_spu_id (spu_id) >> >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 >> >> >> >> >> >> >> >> >> --flink表 >> >> >> CREATE TABLE >> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> ( >> >> >>`spu_id` BIGINT , >> >> >>`leaving_price` DECIMAL(10, 5), >> >> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED >> >> >> ) WITH ( >> >> >> 'connector' = 'jdbc', >> >> >>'url' = 'jdbc:mysql://...', >> >> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', >> >> >>'username' = '...', >> >> >>'password' = '..' >> >> >> ); >> >> >> >> >> >> >> >> >> --binlog 2mysql >> >> >> >> >> >> insert into >> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> >> >> >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price >> >> >> >> >> >> FROM hive.database.table >> >> >> >> >> >> group by v_spu_id; >> >> >> >> >> >> >> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 >> >> >> >> >> >> >> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price >> >> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 >> >> >> 有什么好的排查思路么? >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >>
Re: Re: Re: flink sql cdc sum 结果出现NULL
1. 没有初始的全量数据可能是会有问题的。 3. 你的 format 再解析 update 时,时先发的 before 还是 after? 4. 你的数据在 kafka 中时有序的么?也就是同一 key 的所有数据都在一个 partition 中不? On Fri, 20 Nov 2020 at 12:46, kandy.wang wrote: > > > > > > > 1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。 > > 2. 没有开启 > > > > > 在 2020-11-20 11:49:44,"Jark Wu" 写道: > >实现上应该没什么问题。 > > > >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的? > >2. 是否开启 mini-batch了? > > > >Best, > >Jark > > > >On Fri, 20 Nov 2020 at 11:44, kandy.wang wrote: > > > >> hi Jark: > >> > >> > >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price > >> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 > >> > >> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 > update_before > >> update_after,format逻辑是应该这么写的吧。 > >> > >> > >> > >> > >> 在 2020-11-19 23:13:19,"Jark Wu" 写道: > >> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null > >> >值的,以验证你的自定义 format 没有问题。 > >> > > >> >Best, > >> >Jark > >> > > >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang wrote: > >> > > >> >> --mysql表 > >> >> CREATE TABLE IF NOT EXISTS > `mysql_realtime_leaving_price_spu_index_agg`( > >> >>`id` INT UNSIGNED AUTO_INCREMENT, > >> >>`spu_id` BIGINT NOT NULL, > >> >>`leaving_price` DECIMAL(10, 5) > >> >>PRIMARY KEY ( `id` ), > >> >>unique key idx_spu_id (spu_id) > >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 > >> >> > >> >> > >> >> --flink表 > >> >> CREATE TABLE > hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg > >> ( > >> >>`spu_id` BIGINT , > >> >>`leaving_price` DECIMAL(10, 5), > >> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED > >> >> ) WITH ( > >> >> 'connector' = 'jdbc', > >> >>'url' = 'jdbc:mysql://...', > >> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', > >> >>'username' = '...', > >> >>'password' = '..' > >> >> ); > >> >> > >> >> > >> >> --binlog 2mysql > >> >> > >> >> insert into > hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg > >> >> > >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price > >> >> > >> >> FROM hive.database.table > >> >> > >> >> group by v_spu_id; > >> >> > >> >> > >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 > >> >> > >> >> > >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price > >> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 > >> >> 有什么好的排查思路么? > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >
Re: Re: flink sql cdc sum 结果出现NULL
实现上应该没什么问题。 1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的? 2. 是否开启 mini-batch了? Best, Jark On Fri, 20 Nov 2020 at 11:44, kandy.wang wrote: > hi Jark: > > > 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price > 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 > > 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before > update_after,format逻辑是应该这么写的吧。 > > > > > 在 2020-11-19 23:13:19,"Jark Wu" 写道: > >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null > >值的,以验证你的自定义 format 没有问题。 > > > >Best, > >Jark > > > >On Thu, 19 Nov 2020 at 22:41, kandy.wang wrote: > > > >> --mysql表 > >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( > >>`id` INT UNSIGNED AUTO_INCREMENT, > >>`spu_id` BIGINT NOT NULL, > >>`leaving_price` DECIMAL(10, 5) > >>PRIMARY KEY ( `id` ), > >>unique key idx_spu_id (spu_id) > >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 > >> > >> > >> --flink表 > >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg > ( > >>`spu_id` BIGINT , > >>`leaving_price` DECIMAL(10, 5), > >> PRIMARY KEY ( `spu_id`) NOT ENFORCED > >> ) WITH ( > >> 'connector' = 'jdbc', > >>'url' = 'jdbc:mysql://...', > >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', > >>'username' = '...', > >>'password' = '..' > >> ); > >> > >> > >> --binlog 2mysql > >> > >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg > >> > >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price > >> > >> FROM hive.database.table > >> > >> group by v_spu_id; > >> > >> > >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 > >> > >> > >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price > >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 > >> 有什么好的排查思路么? > >> > >> > >> > >> > >> > >> >