关于CatalogPartitionSpec类的一些想法
大家好: 我现在在使用flink中Catalog的查询分区的方法:Catalog#listPartitions(ObjectPath, CatalogPartitionSpec)的时候遇到一个问题。 我发现CatalogPartitionSpec中存放分区信息的字partitionSpec类型是Map类型是否会更加合理和通用呢? 谢谢
关于CatalogPartitionSpec类的一些想法
大家好: 我现在在使用flink中Catalog的查询分区的方法:Catalog#listPartitions(ObjectPath, CatalogPartitionSpec)的时候遇到一个问题。 我发现CatalogPartitionSpec中存放分区信息的字partitionSpec类型是Map
Re:Re: Re: Re: flink sql cdc sum 结果出现NULL
1.没有初始的全量数据可能是会有问题的 这个怎么理解,默认情况,就是从kafka group-sets 消费的,怎么才能保证全量数据? 我们这个binlog同步都是增量同步。不会做一次初始化的全量同步。 2.先发的before 后发的after 3. 数据在kafka里,是按照mysql的id主键hash的。是有序的,group key 的所有数据不能保证 都在同 一个 partition 中。由于是按照主键id hash的 在 2020-11-20 13:25:53,"Jark Wu" 写道: >1. 没有初始的全量数据可能是会有问题的。 > >3. 你的 format 再解析 update 时,时先发的 before 还是 after? >4. 你的数据在 kafka 中时有序的么?也就是同一 key 的所有数据都在一个 partition 中不? > >On Fri, 20 Nov 2020 at 12:46, kandy.wang wrote: > >> >> >> >> >> >> >> 1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。 >> >> 2. 没有开启 >> >> >> >> >> 在 2020-11-20 11:49:44,"Jark Wu" 写道: >> >实现上应该没什么问题。 >> > >> >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的? >> >2. 是否开启 mini-batch了? >> > >> >Best, >> >Jark >> > >> >On Fri, 20 Nov 2020 at 11:44, kandy.wang wrote: >> > >> >> hi Jark: >> >> >> >> >> >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price >> >> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 >> >> >> >> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 >> update_before >> >> update_after,format逻辑是应该这么写的吧。 >> >> >> >> >> >> >> >> >> >> 在 2020-11-19 23:13:19,"Jark Wu" 写道: >> >> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null >> >> >值的,以验证你的自定义 format 没有问题。 >> >> > >> >> >Best, >> >> >Jark >> >> > >> >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang wrote: >> >> > >> >> >> --mysql表 >> >> >> CREATE TABLE IF NOT EXISTS >> `mysql_realtime_leaving_price_spu_index_agg`( >> >> >>`id` INT UNSIGNED AUTO_INCREMENT, >> >> >>`spu_id` BIGINT NOT NULL, >> >> >>`leaving_price` DECIMAL(10, 5) >> >> >>PRIMARY KEY ( `id` ), >> >> >>unique key idx_spu_id (spu_id) >> >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 >> >> >> >> >> >> >> >> >> --flink表 >> >> >> CREATE TABLE >> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> ( >> >> >>`spu_id` BIGINT , >> >> >>`leaving_price` DECIMAL(10, 5), >> >> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED >> >> >> ) WITH ( >> >> >> 'connector' = 'jdbc', >> >> >>'url' = 'jdbc:mysql://...', >> >> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', >> >> >>'username' = '...', >> >> >>'password' = '..' >> >> >> ); >> >> >> >> >> >> >> >> >> --binlog 2mysql >> >> >> >> >> >> insert into >> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> >> >> >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price >> >> >> >> >> >> FROM hive.database.table >> >> >> >> >> >> group by v_spu_id; >> >> >> >> >> >> >> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 >> >> >> >> >> >> >> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price >> >> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 >> >> >> 有什么好的排查思路么? >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >>
Re:Re: Re: Re: flink sql cdc sum 结果出现NULL
1.没有初始的全量数据可能是会有问题的 这个怎么理解,默认情况,就是从kafka group-sets 消费的,怎么才能保证全量数据? 我们这个binlog同步都是增量同步。不会做一次初始化的全量同步。 2.先发的before 后发的after 3. 数据在kafka里,是按照mysql的id主键hash的。是有序的,group key 的所有数据不能保证 都在同 一个 partition 中。由于是按照主键id hash的。 在 2020-11-20 13:25:53,"Jark Wu" 写道: >1. 没有初始的全量数据可能是会有问题的。 > >3. 你的 format 再解析 update 时,时先发的 before 还是 after? >4. 你的数据在 kafka 中时有序的么?也就是同一 key 的所有数据都在一个 partition 中不? > >On Fri, 20 Nov 2020 at 12:46, kandy.wang wrote: > >> >> >> >> >> >> >> 1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。 >> >> 2. 没有开启 >> >> >> >> >> 在 2020-11-20 11:49:44,"Jark Wu" 写道: >> >实现上应该没什么问题。 >> > >> >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的? >> >2. 是否开启 mini-batch了? >> > >> >Best, >> >Jark >> > >> >On Fri, 20 Nov 2020 at 11:44, kandy.wang wrote: >> > >> >> hi Jark: >> >> >> >> >> >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price >> >> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 >> >> >> >> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 >> update_before >> >> update_after,format逻辑是应该这么写的吧。 >> >> >> >> >> >> >> >> >> >> 在 2020-11-19 23:13:19,"Jark Wu" 写道: >> >> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null >> >> >值的,以验证你的自定义 format 没有问题。 >> >> > >> >> >Best, >> >> >Jark >> >> > >> >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang wrote: >> >> > >> >> >> --mysql表 >> >> >> CREATE TABLE IF NOT EXISTS >> `mysql_realtime_leaving_price_spu_index_agg`( >> >> >>`id` INT UNSIGNED AUTO_INCREMENT, >> >> >>`spu_id` BIGINT NOT NULL, >> >> >>`leaving_price` DECIMAL(10, 5) >> >> >>PRIMARY KEY ( `id` ), >> >> >>unique key idx_spu_id (spu_id) >> >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 >> >> >> >> >> >> >> >> >> --flink表 >> >> >> CREATE TABLE >> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> ( >> >> >>`spu_id` BIGINT , >> >> >>`leaving_price` DECIMAL(10, 5), >> >> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED >> >> >> ) WITH ( >> >> >> 'connector' = 'jdbc', >> >> >>'url' = 'jdbc:mysql://...', >> >> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', >> >> >>'username' = '...', >> >> >>'password' = '..' >> >> >> ); >> >> >> >> >> >> >> >> >> --binlog 2mysql >> >> >> >> >> >> insert into >> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> >> >> >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price >> >> >> >> >> >> FROM hive.database.table >> >> >> >> >> >> group by v_spu_id; >> >> >> >> >> >> >> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 >> >> >> >> >> >> >> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price >> >> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 >> >> >> 有什么好的排查思路么? >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >>
Re: Re: Re: flink sql cdc sum 结果出现NULL
1. 没有初始的全量数据可能是会有问题的。 3. 你的 format 再解析 update 时,时先发的 before 还是 after? 4. 你的数据在 kafka 中时有序的么?也就是同一 key 的所有数据都在一个 partition 中不? On Fri, 20 Nov 2020 at 12:46, kandy.wang wrote: > > > > > > > 1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。 > > 2. 没有开启 > > > > > 在 2020-11-20 11:49:44,"Jark Wu" 写道: > >实现上应该没什么问题。 > > > >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的? > >2. 是否开启 mini-batch了? > > > >Best, > >Jark > > > >On Fri, 20 Nov 2020 at 11:44, kandy.wang wrote: > > > >> hi Jark: > >> > >> > >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price > >> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 > >> > >> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 > update_before > >> update_after,format逻辑是应该这么写的吧。 > >> > >> > >> > >> > >> 在 2020-11-19 23:13:19,"Jark Wu" 写道: > >> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null > >> >值的,以验证你的自定义 format 没有问题。 > >> > > >> >Best, > >> >Jark > >> > > >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang wrote: > >> > > >> >> --mysql表 > >> >> CREATE TABLE IF NOT EXISTS > `mysql_realtime_leaving_price_spu_index_agg`( > >> >>`id` INT UNSIGNED AUTO_INCREMENT, > >> >>`spu_id` BIGINT NOT NULL, > >> >>`leaving_price` DECIMAL(10, 5) > >> >>PRIMARY KEY ( `id` ), > >> >>unique key idx_spu_id (spu_id) > >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 > >> >> > >> >> > >> >> --flink表 > >> >> CREATE TABLE > hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg > >> ( > >> >>`spu_id` BIGINT , > >> >>`leaving_price` DECIMAL(10, 5), > >> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED > >> >> ) WITH ( > >> >> 'connector' = 'jdbc', > >> >>'url' = 'jdbc:mysql://...', > >> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', > >> >>'username' = '...', > >> >>'password' = '..' > >> >> ); > >> >> > >> >> > >> >> --binlog 2mysql > >> >> > >> >> insert into > hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg > >> >> > >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price > >> >> > >> >> FROM hive.database.table > >> >> > >> >> group by v_spu_id; > >> >> > >> >> > >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 > >> >> > >> >> > >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price > >> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 > >> >> 有什么好的排查思路么? > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >
Re:关于global window
Hi sparklelj, Global window 的是所有相同的 key 的元素会在一个 window里,它没有 window end,所以需要自己实现 custom trigger 来触发 window 的计算[1]。 它属于 keyed window,并不是只能有一个 window 实例( windowAll 只有一个 window 实例)。 所以看下是不是用法有错误呢,你的 ‘ StreamToBatchWindow’ 类是继承了哪个接口的? [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/windows.html#global-windows Best, Hailong Wang 在 2020-11-20 01:19:09,"j l" 写道: >您好,我在看global window的时候有一些疑问,首先是global >window是全局唯一的,也就是所以的元素都会流到这一个window实例上,这样的话前面加keyBy还有什么意义呢?反正都要流到一个window里面,我测试的结果好像也确实是这样,我自定义了一个global >window,然后设置了process的并行度,但是window确实是只有一个 >示例如下: > >dataUnion.keyBy(0).window(new StreamToBatchWindow()).process(new >StreamToBatchProcess()).setParallelism(20).print(); > >如果是这样的话,这个window岂不是成了瓶颈?不知道我理解的对不对,我是希望能多一些窗口对不同的key stream做global >window的处理。 >另外一个就是global window会为每个key维护一个状态,这样如果key不断增加岂不是要爆了?怎样才能清除永远不会再出现的key的状态呢? > >谢谢!
Re:Re: Re: flink sql cdc sum 结果出现NULL
1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。 2. 没有开启 在 2020-11-20 11:49:44,"Jark Wu" 写道: >实现上应该没什么问题。 > >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的? >2. 是否开启 mini-batch了? > >Best, >Jark > >On Fri, 20 Nov 2020 at 11:44, kandy.wang wrote: > >> hi Jark: >> >> >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price >> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 >> >> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before >> update_after,format逻辑是应该这么写的吧。 >> >> >> >> >> 在 2020-11-19 23:13:19,"Jark Wu" 写道: >> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null >> >值的,以验证你的自定义 format 没有问题。 >> > >> >Best, >> >Jark >> > >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang wrote: >> > >> >> --mysql表 >> >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( >> >>`id` INT UNSIGNED AUTO_INCREMENT, >> >>`spu_id` BIGINT NOT NULL, >> >>`leaving_price` DECIMAL(10, 5) >> >>PRIMARY KEY ( `id` ), >> >>unique key idx_spu_id (spu_id) >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 >> >> >> >> >> >> --flink表 >> >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> ( >> >>`spu_id` BIGINT , >> >>`leaving_price` DECIMAL(10, 5), >> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED >> >> ) WITH ( >> >> 'connector' = 'jdbc', >> >>'url' = 'jdbc:mysql://...', >> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', >> >>'username' = '...', >> >>'password' = '..' >> >> ); >> >> >> >> >> >> --binlog 2mysql >> >> >> >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price >> >> >> >> FROM hive.database.table >> >> >> >> group by v_spu_id; >> >> >> >> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 >> >> >> >> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price >> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 >> >> 有什么好的排查思路么? >> >> >> >> >> >> >> >> >> >> >> >> >>
Re:Re: Re: flink sql cdc sum 结果出现NULL
1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。 2. 没有开启 在 2020-11-20 11:49:44,"Jark Wu" 写道: >实现上应该没什么问题。 > >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的? >2. 是否开启 mini-batch了? > >Best, >Jark > >On Fri, 20 Nov 2020 at 11:44, kandy.wang wrote: > >> hi Jark: >> >> >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price >> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 >> >> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before >> update_after,format逻辑是应该这么写的吧。 >> >> >> >> >> 在 2020-11-19 23:13:19,"Jark Wu" 写道: >> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null >> >值的,以验证你的自定义 format 没有问题。 >> > >> >Best, >> >Jark >> > >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang wrote: >> > >> >> --mysql表 >> >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( >> >>`id` INT UNSIGNED AUTO_INCREMENT, >> >>`spu_id` BIGINT NOT NULL, >> >>`leaving_price` DECIMAL(10, 5) >> >>PRIMARY KEY ( `id` ), >> >>unique key idx_spu_id (spu_id) >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 >> >> >> >> >> >> --flink表 >> >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> ( >> >>`spu_id` BIGINT , >> >>`leaving_price` DECIMAL(10, 5), >> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED >> >> ) WITH ( >> >> 'connector' = 'jdbc', >> >>'url' = 'jdbc:mysql://...', >> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', >> >>'username' = '...', >> >>'password' = '..' >> >> ); >> >> >> >> >> >> --binlog 2mysql >> >> >> >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price >> >> >> >> FROM hive.database.table >> >> >> >> group by v_spu_id; >> >> >> >> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 >> >> >> >> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price >> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 >> >> 有什么好的排查思路么? >> >> >> >> >> >> >> >> >> >> >> >> >>
Re:Re:flink1.11.1任务重启 偶现org.apache.kafka.common.KafkaException: Failed to construct kafka consumer异常
可以 grep 看下哪些 jar 包包含这 2 个类的? 在 2020-11-20 08:51:59,"m13162790856" 写道: >HI: > 偶现的 ,但是最近几次出现的概率还是比较大的, 之前怀疑classload , 我的jar包做了分离, 不会把任何包打进去, > 所以包能确保每次启动都是一样,很奇怪这种情况 > > >在 2020年11月19日 17:14,hailongwang<18868816...@163.com> 写道: > > >Hi, 这个如果是偶尔出现的,应该是跟 ClassLoad 加载有关。 如果 >`org.apache.kafka.common.serialization.ByteArrayDeserializer` 被 child >classload 加载了, 而 `org.apache.kafka.common.serialization.Deserializer` 被 parent >classload 加载了,那么会有问题。 你的 jar 包里面打了 kakfa-connector 的包了吗,如果集群上有,可以provided 看下。 >希望对你有帮助。 Best, Hailong Wang 在 2020-11-19 14:33:25,"m13162790856" > 写道: >具体日主信息如下: > > > >org.apache.kafka.common.KafkaException: Failed to construct kafka consumer >at >org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:789) >at >org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:643) >at >org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:623) >at >org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58) > at >org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94) > at >org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:550) > at >org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at >org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) > at >org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291) > at >org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479) > at >org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) > at >org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475) > at >org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at >org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at >java.lang.Thread.run(Thread.java:745) Caused by: >org.apache.kafka.common.KafkaException: >org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance >of org.apache.kafka.common.serialization.Deserializer at >org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:263) > at >org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:688) >... 15 more 2020-11-19 15:17:32,0 > > >有哪位同学遇见过
Re: Re: flink sql cdc sum 结果出现NULL
实现上应该没什么问题。 1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的? 2. 是否开启 mini-batch了? Best, Jark On Fri, 20 Nov 2020 at 11:44, kandy.wang wrote: > hi Jark: > > > 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price > 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 > > 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before > update_after,format逻辑是应该这么写的吧。 > > > > > 在 2020-11-19 23:13:19,"Jark Wu" 写道: > >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null > >值的,以验证你的自定义 format 没有问题。 > > > >Best, > >Jark > > > >On Thu, 19 Nov 2020 at 22:41, kandy.wang wrote: > > > >> --mysql表 > >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( > >>`id` INT UNSIGNED AUTO_INCREMENT, > >>`spu_id` BIGINT NOT NULL, > >>`leaving_price` DECIMAL(10, 5) > >>PRIMARY KEY ( `id` ), > >>unique key idx_spu_id (spu_id) > >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 > >> > >> > >> --flink表 > >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg > ( > >>`spu_id` BIGINT , > >>`leaving_price` DECIMAL(10, 5), > >> PRIMARY KEY ( `spu_id`) NOT ENFORCED > >> ) WITH ( > >> 'connector' = 'jdbc', > >>'url' = 'jdbc:mysql://...', > >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', > >>'username' = '...', > >>'password' = '..' > >> ); > >> > >> > >> --binlog 2mysql > >> > >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg > >> > >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price > >> > >> FROM hive.database.table > >> > >> group by v_spu_id; > >> > >> > >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 > >> > >> > >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price > >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 > >> 有什么好的排查思路么? > >> > >> > >> > >> > >> > >> >
Re:Re: flink sql cdc sum 结果出现NULL
hi Jark: 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before update_after,format逻辑是应该这么写的吧。 在 2020-11-19 23:13:19,"Jark Wu" 写道: >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null >值的,以验证你的自定义 format 没有问题。 > >Best, >Jark > >On Thu, 19 Nov 2020 at 22:41, kandy.wang wrote: > >> --mysql表 >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( >>`id` INT UNSIGNED AUTO_INCREMENT, >>`spu_id` BIGINT NOT NULL, >>`leaving_price` DECIMAL(10, 5) >>PRIMARY KEY ( `id` ), >>unique key idx_spu_id (spu_id) >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 >> >> >> --flink表 >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg ( >>`spu_id` BIGINT , >>`leaving_price` DECIMAL(10, 5), >> PRIMARY KEY ( `spu_id`) NOT ENFORCED >> ) WITH ( >> 'connector' = 'jdbc', >>'url' = 'jdbc:mysql://...', >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', >>'username' = '...', >>'password' = '..' >> ); >> >> >> --binlog 2mysql >> >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price >> >> FROM hive.database.table >> >> group by v_spu_id; >> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 >> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 >> 有什么好的排查思路么? >> >> >> >> >> >>
Re:Re: flink sql cdc sum 结果出现NULL
hi Jark: 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before update_after,format逻辑是应该这么写的吧。 在 2020-11-19 23:13:19,"Jark Wu" 写道: >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null >值的,以验证你的自定义 format 没有问题。 > >Best, >Jark > >On Thu, 19 Nov 2020 at 22:41, kandy.wang wrote: > >> --mysql表 >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( >>`id` INT UNSIGNED AUTO_INCREMENT, >>`spu_id` BIGINT NOT NULL, >>`leaving_price` DECIMAL(10, 5) >>PRIMARY KEY ( `id` ), >>unique key idx_spu_id (spu_id) >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 >> >> >> --flink表 >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg ( >>`spu_id` BIGINT , >>`leaving_price` DECIMAL(10, 5), >> PRIMARY KEY ( `spu_id`) NOT ENFORCED >> ) WITH ( >> 'connector' = 'jdbc', >>'url' = 'jdbc:mysql://...', >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', >>'username' = '...', >>'password' = '..' >> ); >> >> >> --binlog 2mysql >> >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price >> >> FROM hive.database.table >> >> group by v_spu_id; >> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 >> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 >> 有什么好的排查思路么? >> >> >> >> >> >>
答复: Flink 1.10 和 Flink 1.11 中 keyBy 算子聚合多个字段的问题
我看了现在的 flink 1.11 的 keyBy 的代码,是使用的KeySelector key,但每次只能返回一个字段,不支持返回多个字段,也就说明了一次只能按一个字段去分组(PS: test.keyBy(t -> t.f0)),如果我想按多个字段进行分组的话该怎么操作呢? -邮件原件- 发件人: guanxianchun 发送时间: 2020年11月19日 20:53 收件人: user-zh@flink.apache.org 主题: Re: Flink 1.10 和 Flink 1.11 中 keyBy 算子聚合多个字段的问题 flink-1.11使用KeySelector -- Sent from: http://apache-flink.147419.n8.nabble.com/
关于global window
您好,我在看global window的时候有一些疑问,首先是global window是全局唯一的,也就是所以的元素都会流到这一个window实例上,这样的话前面加keyBy还有什么意义呢?反正都要流到一个window里面,我测试的结果好像也确实是这样,我自定义了一个global window,然后设置了process的并行度,但是window确实是只有一个 示例如下: dataUnion.keyBy(0).window(new StreamToBatchWindow()).process(new StreamToBatchProcess()).setParallelism(20).print(); 如果是这样的话,这个window岂不是成了瓶颈?不知道我理解的对不对,我是希望能多一些窗口对不同的key stream做global window的处理。 另外一个就是global window会为每个key维护一个状态,这样如果key不断增加岂不是要爆了?怎样才能清除永远不会再出现的key的状态呢? 谢谢!
Re: Flink 1.10 和 Flink 1.11 中 keyBy 算子聚合多个字段的问题
flink-1.11使用KeySelector -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re:flink1.11.1任务重启 偶现org.apache.kafka.common.KafkaException: Failed to construct kafka consumer异常
HI: 偶现的 ,但是最近几次出现的概率还是比较大的, 之前怀疑classload , 我的jar包做了分离, 不会把任何包打进去, 所以包能确保每次启动都是一样,很奇怪这种情况 在 2020年11月19日 17:14,hailongwang<18868816...@163.com> 写道: Hi, 这个如果是偶尔出现的,应该是跟 ClassLoad 加载有关。 如果 `org.apache.kafka.common.serialization.ByteArrayDeserializer` 被 child classload 加载了, 而 `org.apache.kafka.common.serialization.Deserializer` 被 parent classload 加载了,那么会有问题。 你的 jar 包里面打了 kakfa-connector 的包了吗,如果集群上有,可以provided 看下。 希望对你有帮助。 Best, Hailong Wang 在 2020-11-19 14:33:25,"m13162790856" 写道: >具体日主信息如下: > > > org.apache.kafka.common.KafkaException: Failed to construct kafka consumer >at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:789) at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:643) at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:623) at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58) at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:550) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:263) at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:688) ... 15 more 2020-11-19 15:17:32,0 > > >有哪位同学遇见过
Re: flink sql cdc sum 结果出现NULL
你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null 值的,以验证你的自定义 format 没有问题。 Best, Jark On Thu, 19 Nov 2020 at 22:41, kandy.wang wrote: > --mysql表 > CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( >`id` INT UNSIGNED AUTO_INCREMENT, >`spu_id` BIGINT NOT NULL, >`leaving_price` DECIMAL(10, 5) >PRIMARY KEY ( `id` ), >unique key idx_spu_id (spu_id) > )ENGINE=InnoDB DEFAULT CHARSET=utf8 > > > --flink表 > CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg ( >`spu_id` BIGINT , >`leaving_price` DECIMAL(10, 5), > PRIMARY KEY ( `spu_id`) NOT ENFORCED > ) WITH ( > 'connector' = 'jdbc', >'url' = 'jdbc:mysql://...', >'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', >'username' = '...', >'password' = '..' > ); > > > --binlog 2mysql > > insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg > > SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price > > FROM hive.database.table > > group by v_spu_id; > > > hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 > > > 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price > 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 > 有什么好的排查思路么? > > > > > >
flink sql cdc sum 结果出现NULL
--mysql表 CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( `id` INT UNSIGNED AUTO_INCREMENT, `spu_id` BIGINT NOT NULL, `leaving_price` DECIMAL(10, 5) PRIMARY KEY ( `id` ), unique key idx_spu_id (spu_id) )ENGINE=InnoDB DEFAULT CHARSET=utf8 --flink表 CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg ( `spu_id` BIGINT , `leaving_price` DECIMAL(10, 5), PRIMARY KEY ( `spu_id`) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://...', 'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', 'username' = '...', 'password' = '..' ); --binlog 2mysql insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price FROM hive.database.table group by v_spu_id; hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 有什么好的排查思路么?
Flink 1.10 和 Flink 1.11 中 keyBy 算子聚合多个字段的问题
大佬们: 在 Flink 1.10.x 中的 keyBy 算子可以同时按多个字段分组,比如 map.keyBy(0,1),但在 1.11.x 版本中这种方式被弃用了,看了下源码好像不支持按多字段分组了?还是有别的其他形式? 如果我想按多个字段分组的话需要怎么操作? 请大佬指点!
Re:flink1.11.1任务重启 偶现org.apache.kafka.common.KafkaException: Failed to construct kafka consumer异常
Hi, 这个如果是偶尔出现的,应该是跟 ClassLoad 加载有关。 如果 `org.apache.kafka.common.serialization.ByteArrayDeserializer` 被 child classload 加载了, 而 `org.apache.kafka.common.serialization.Deserializer` 被 parent classload 加载了,那么会有问题。 你的 jar 包里面打了 kakfa-connector 的包了吗,如果集群上有,可以provided 看下。 希望对你有帮助。 Best, Hailong Wang 在 2020-11-19 14:33:25,"m13162790856" 写道: >具体日主信息如下: > > > org.apache.kafka.common.KafkaException: Failed to construct kafka consumer >at >org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:789) >at >org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:643) >at >org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:623) >at >org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58) > at >org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94) > at >org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:550) > at >org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at >org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) > at >org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291) > at >org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479) > at >org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) > at >org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475) > at >org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at >org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at >java.lang.Thread.run(Thread.java:745) Caused by: >org.apache.kafka.common.KafkaException: >org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance >of org.apache.kafka.common.serialization.Deserializer at >org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:263) > at >org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:688) >... 15 more 2020-11-19 15:17:32,0 > > >有哪位同学遇见过