关于CatalogPartitionSpec类的一些想法

2020-11-19 文章 Jun Zhang
大家好:
我现在在使用flink中Catalog的查询分区的方法:Catalog#listPartitions(ObjectPath,
CatalogPartitionSpec)的时候遇到一个问题。
   我发现CatalogPartitionSpec中存放分区信息的字partitionSpec类型是Map类型是否会更加合理和通用呢?

 谢谢


关于CatalogPartitionSpec类的一些想法

2020-11-19 文章 Jun Zhang
大家好:
  
我现在在使用flink中Catalog的查询分区的方法:Catalog#listPartitions(ObjectPath, 
CatalogPartitionSpec)的时候遇到一个问题。
  我发现CatalogPartitionSpec中存放分区信息的字partitionSpec类型是Map

Re:Re: Re: Re: flink sql cdc sum 结果出现NULL

2020-11-19 文章 kandy.wang
1.没有初始的全量数据可能是会有问题的
这个怎么理解,默认情况,就是从kafka group-sets 消费的,怎么才能保证全量数据? 
我们这个binlog同步都是增量同步。不会做一次初始化的全量同步。
2.先发的before 后发的after


3. 数据在kafka里,是按照mysql的id主键hash的。是有序的,group key 的所有数据不能保证 都在同 一个 partition 
中。由于是按照主键id hash的








在 2020-11-20 13:25:53,"Jark Wu"  写道:
>1. 没有初始的全量数据可能是会有问题的。
>
>3. 你的 format 再解析 update 时,时先发的 before 还是 after?
>4. 你的数据在 kafka 中时有序的么?也就是同一 key 的所有数据都在一个 partition 中不?
>
>On Fri, 20 Nov 2020 at 12:46, kandy.wang  wrote:
>
>>
>>
>>
>>
>>
>>
>> 1.是的。  这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。
>>
>> 2. 没有开启
>>
>>
>>
>>
>> 在 2020-11-20 11:49:44,"Jark Wu"  写道:
>> >实现上应该没什么问题。
>> >
>> >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的?
>> >2. 是否开启 mini-batch了?
>> >
>> >Best,
>> >Jark
>> >
>> >On Fri, 20 Nov 2020 at 11:44, kandy.wang  wrote:
>> >
>> >> hi Jark:
>> >>
>> >>
>> >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price
>> >> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况
>> >>
>> >> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条
>> update_before
>> >> update_after,format逻辑是应该这么写的吧。
>> >>
>> >>
>> >>
>> >>
>> >> 在 2020-11-19 23:13:19,"Jark Wu"  写道:
>> >> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null
>> >> >值的,以验证你的自定义 format 没有问题。
>> >> >
>> >> >Best,
>> >> >Jark
>> >> >
>> >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang  wrote:
>> >> >
>> >> >> --mysql表
>> >> >> CREATE TABLE IF NOT EXISTS
>> `mysql_realtime_leaving_price_spu_index_agg`(
>> >> >>`id` INT UNSIGNED AUTO_INCREMENT,
>> >> >>`spu_id` BIGINT NOT NULL,
>> >> >>`leaving_price`  DECIMAL(10, 5)
>> >> >>PRIMARY KEY ( `id` ),
>> >> >>unique key idx_spu_id (spu_id)
>> >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8
>> >> >>
>> >> >>
>> >> >> --flink表
>> >> >> CREATE TABLE
>> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>> >> (
>> >> >>`spu_id` BIGINT ,
>> >> >>`leaving_price`  DECIMAL(10, 5),
>> >> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED
>> >> >> ) WITH (
>> >> >>   'connector' = 'jdbc',
>> >> >>'url' = 'jdbc:mysql://...',
>> >> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>> >> >>'username' = '...',
>> >> >>'password' = '..'
>> >> >> );
>> >> >>
>> >> >>
>> >> >> --binlog 2mysql
>> >> >>
>> >> >> insert into
>> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>> >> >>
>> >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
>> >> >>
>> >> >> FROM hive.database.table
>> >> >>
>> >> >> group by v_spu_id;
>> >> >>
>> >> >>
>> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
>> >> >>
>> >> >>
>> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price
>> >> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
>> >> >> 有什么好的排查思路么?
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >>
>>


Re:Re: Re: Re: flink sql cdc sum 结果出现NULL

2020-11-19 文章 kandy.wang
1.没有初始的全量数据可能是会有问题的
这个怎么理解,默认情况,就是从kafka group-sets 消费的,怎么才能保证全量数据? 
我们这个binlog同步都是增量同步。不会做一次初始化的全量同步。
2.先发的before 后发的after


3. 数据在kafka里,是按照mysql的id主键hash的。是有序的,group key 的所有数据不能保证 都在同 一个 partition 
中。由于是按照主键id hash的。





在 2020-11-20 13:25:53,"Jark Wu"  写道:
>1. 没有初始的全量数据可能是会有问题的。
>
>3. 你的 format 再解析 update 时,时先发的 before 还是 after?
>4. 你的数据在 kafka 中时有序的么?也就是同一 key 的所有数据都在一个 partition 中不?
>
>On Fri, 20 Nov 2020 at 12:46, kandy.wang  wrote:
>
>>
>>
>>
>>
>>
>>
>> 1.是的。  这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。
>>
>> 2. 没有开启
>>
>>
>>
>>
>> 在 2020-11-20 11:49:44,"Jark Wu"  写道:
>> >实现上应该没什么问题。
>> >
>> >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的?
>> >2. 是否开启 mini-batch了?
>> >
>> >Best,
>> >Jark
>> >
>> >On Fri, 20 Nov 2020 at 11:44, kandy.wang  wrote:
>> >
>> >> hi Jark:
>> >>
>> >>
>> >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price
>> >> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况
>> >>
>> >> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条
>> update_before
>> >> update_after,format逻辑是应该这么写的吧。
>> >>
>> >>
>> >>
>> >>
>> >> 在 2020-11-19 23:13:19,"Jark Wu"  写道:
>> >> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null
>> >> >值的,以验证你的自定义 format 没有问题。
>> >> >
>> >> >Best,
>> >> >Jark
>> >> >
>> >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang  wrote:
>> >> >
>> >> >> --mysql表
>> >> >> CREATE TABLE IF NOT EXISTS
>> `mysql_realtime_leaving_price_spu_index_agg`(
>> >> >>`id` INT UNSIGNED AUTO_INCREMENT,
>> >> >>`spu_id` BIGINT NOT NULL,
>> >> >>`leaving_price`  DECIMAL(10, 5)
>> >> >>PRIMARY KEY ( `id` ),
>> >> >>unique key idx_spu_id (spu_id)
>> >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8
>> >> >>
>> >> >>
>> >> >> --flink表
>> >> >> CREATE TABLE
>> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>> >> (
>> >> >>`spu_id` BIGINT ,
>> >> >>`leaving_price`  DECIMAL(10, 5),
>> >> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED
>> >> >> ) WITH (
>> >> >>   'connector' = 'jdbc',
>> >> >>'url' = 'jdbc:mysql://...',
>> >> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>> >> >>'username' = '...',
>> >> >>'password' = '..'
>> >> >> );
>> >> >>
>> >> >>
>> >> >> --binlog 2mysql
>> >> >>
>> >> >> insert into
>> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>> >> >>
>> >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
>> >> >>
>> >> >> FROM hive.database.table
>> >> >>
>> >> >> group by v_spu_id;
>> >> >>
>> >> >>
>> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
>> >> >>
>> >> >>
>> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price
>> >> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
>> >> >> 有什么好的排查思路么?
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >>
>>


Re: Re: Re: flink sql cdc sum 结果出现NULL

2020-11-19 文章 Jark Wu
1. 没有初始的全量数据可能是会有问题的。

3. 你的 format 再解析 update 时,时先发的 before 还是 after?
4. 你的数据在 kafka 中时有序的么?也就是同一 key 的所有数据都在一个 partition 中不?

On Fri, 20 Nov 2020 at 12:46, kandy.wang  wrote:

>
>
>
>
>
>
> 1.是的。  这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。
>
> 2. 没有开启
>
>
>
>
> 在 2020-11-20 11:49:44,"Jark Wu"  写道:
> >实现上应该没什么问题。
> >
> >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的?
> >2. 是否开启 mini-batch了?
> >
> >Best,
> >Jark
> >
> >On Fri, 20 Nov 2020 at 11:44, kandy.wang  wrote:
> >
> >> hi Jark:
> >>
> >>
> >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price
> >> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况
> >>
> >> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条
> update_before
> >> update_after,format逻辑是应该这么写的吧。
> >>
> >>
> >>
> >>
> >> 在 2020-11-19 23:13:19,"Jark Wu"  写道:
> >> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null
> >> >值的,以验证你的自定义 format 没有问题。
> >> >
> >> >Best,
> >> >Jark
> >> >
> >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang  wrote:
> >> >
> >> >> --mysql表
> >> >> CREATE TABLE IF NOT EXISTS
> `mysql_realtime_leaving_price_spu_index_agg`(
> >> >>`id` INT UNSIGNED AUTO_INCREMENT,
> >> >>`spu_id` BIGINT NOT NULL,
> >> >>`leaving_price`  DECIMAL(10, 5)
> >> >>PRIMARY KEY ( `id` ),
> >> >>unique key idx_spu_id (spu_id)
> >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8
> >> >>
> >> >>
> >> >> --flink表
> >> >> CREATE TABLE
> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
> >> (
> >> >>`spu_id` BIGINT ,
> >> >>`leaving_price`  DECIMAL(10, 5),
> >> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED
> >> >> ) WITH (
> >> >>   'connector' = 'jdbc',
> >> >>'url' = 'jdbc:mysql://...',
> >> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
> >> >>'username' = '...',
> >> >>'password' = '..'
> >> >> );
> >> >>
> >> >>
> >> >> --binlog 2mysql
> >> >>
> >> >> insert into
> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
> >> >>
> >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
> >> >>
> >> >> FROM hive.database.table
> >> >>
> >> >> group by v_spu_id;
> >> >>
> >> >>
> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
> >> >>
> >> >>
> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price
> >> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
> >> >> 有什么好的排查思路么?
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >>
>


Re:关于global window

2020-11-19 文章 hailongwang
Hi sparklelj,

Global window 的是所有相同的 key 的元素会在一个 window里,它没有 window end,所以需要自己实现 custom 
trigger 来触发 window 的计算[1]。
它属于 keyed window,并不是只能有一个 window 实例( windowAll 只有一个 window 实例)。
所以看下是不是用法有错误呢,你的 ‘ StreamToBatchWindow’ 类是继承了哪个接口的?


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/windows.html#global-windows


Best,
Hailong Wang
在 2020-11-20 01:19:09,"j l"  写道:
>您好,我在看global window的时候有一些疑问,首先是global
>window是全局唯一的,也就是所以的元素都会流到这一个window实例上,这样的话前面加keyBy还有什么意义呢?反正都要流到一个window里面,我测试的结果好像也确实是这样,我自定义了一个global
>window,然后设置了process的并行度,但是window确实是只有一个
>示例如下:
>
>dataUnion.keyBy(0).window(new  StreamToBatchWindow()).process(new
>StreamToBatchProcess()).setParallelism(20).print();
>
>如果是这样的话,这个window岂不是成了瓶颈?不知道我理解的对不对,我是希望能多一些窗口对不同的key stream做global
>window的处理。
>另外一个就是global window会为每个key维护一个状态,这样如果key不断增加岂不是要爆了?怎样才能清除永远不会再出现的key的状态呢?
>
>谢谢!


Re:Re: Re: flink sql cdc sum 结果出现NULL

2020-11-19 文章 kandy.wang






1.是的。  这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。

2. 没有开启




在 2020-11-20 11:49:44,"Jark Wu"  写道:
>实现上应该没什么问题。
>
>1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的?
>2. 是否开启 mini-batch了?
>
>Best,
>Jark
>
>On Fri, 20 Nov 2020 at 11:44, kandy.wang  wrote:
>
>> hi Jark:
>>
>>
>> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price
>> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况
>>
>> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before
>> update_after,format逻辑是应该这么写的吧。
>>
>>
>>
>>
>> 在 2020-11-19 23:13:19,"Jark Wu"  写道:
>> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null
>> >值的,以验证你的自定义 format 没有问题。
>> >
>> >Best,
>> >Jark
>> >
>> >On Thu, 19 Nov 2020 at 22:41, kandy.wang  wrote:
>> >
>> >> --mysql表
>> >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
>> >>`id` INT UNSIGNED AUTO_INCREMENT,
>> >>`spu_id` BIGINT NOT NULL,
>> >>`leaving_price`  DECIMAL(10, 5)
>> >>PRIMARY KEY ( `id` ),
>> >>unique key idx_spu_id (spu_id)
>> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8
>> >>
>> >>
>> >> --flink表
>> >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>> (
>> >>`spu_id` BIGINT ,
>> >>`leaving_price`  DECIMAL(10, 5),
>> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED
>> >> ) WITH (
>> >>   'connector' = 'jdbc',
>> >>'url' = 'jdbc:mysql://...',
>> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>> >>'username' = '...',
>> >>'password' = '..'
>> >> );
>> >>
>> >>
>> >> --binlog 2mysql
>> >>
>> >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>> >>
>> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
>> >>
>> >> FROM hive.database.table
>> >>
>> >> group by v_spu_id;
>> >>
>> >>
>> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
>> >>
>> >>
>> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price
>> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
>> >> 有什么好的排查思路么?
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>>


Re:Re: Re: flink sql cdc sum 结果出现NULL

2020-11-19 文章 kandy.wang
1.是的。  这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。

2. 没有开启


在 2020-11-20 11:49:44,"Jark Wu"  写道:
>实现上应该没什么问题。
>
>1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的?
>2. 是否开启 mini-batch了?
>
>Best,
>Jark
>
>On Fri, 20 Nov 2020 at 11:44, kandy.wang  wrote:
>
>> hi Jark:
>>
>>
>> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price
>> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况
>>
>> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before
>> update_after,format逻辑是应该这么写的吧。
>>
>>
>>
>>
>> 在 2020-11-19 23:13:19,"Jark Wu"  写道:
>> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null
>> >值的,以验证你的自定义 format 没有问题。
>> >
>> >Best,
>> >Jark
>> >
>> >On Thu, 19 Nov 2020 at 22:41, kandy.wang  wrote:
>> >
>> >> --mysql表
>> >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
>> >>`id` INT UNSIGNED AUTO_INCREMENT,
>> >>`spu_id` BIGINT NOT NULL,
>> >>`leaving_price`  DECIMAL(10, 5)
>> >>PRIMARY KEY ( `id` ),
>> >>unique key idx_spu_id (spu_id)
>> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8
>> >>
>> >>
>> >> --flink表
>> >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>> (
>> >>`spu_id` BIGINT ,
>> >>`leaving_price`  DECIMAL(10, 5),
>> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED
>> >> ) WITH (
>> >>   'connector' = 'jdbc',
>> >>'url' = 'jdbc:mysql://...',
>> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>> >>'username' = '...',
>> >>'password' = '..'
>> >> );
>> >>
>> >>
>> >> --binlog 2mysql
>> >>
>> >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>> >>
>> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
>> >>
>> >> FROM hive.database.table
>> >>
>> >> group by v_spu_id;
>> >>
>> >>
>> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
>> >>
>> >>
>> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price
>> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
>> >> 有什么好的排查思路么?
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>>


Re:Re:flink1.11.1任务重启 偶现org.apache.kafka.common.KafkaException: Failed to construct kafka consumer异常

2020-11-19 文章 hailongwang
可以 grep 看下哪些 jar 包包含这 2 个类的?




在 2020-11-20 08:51:59,"m13162790856"  写道:
>HI:
>   偶现的 ,但是最近几次出现的概率还是比较大的, 之前怀疑classload , 我的jar包做了分离, 不会把任何包打进去, 
> 所以包能确保每次启动都是一样,很奇怪这种情况
>
>
>在 2020年11月19日 17:14,hailongwang<18868816...@163.com> 写道:
>
>
>Hi, 这个如果是偶尔出现的,应该是跟 ClassLoad 加载有关。 如果 
>`org.apache.kafka.common.serialization.ByteArrayDeserializer` 被 child 
>classload 加载了, 而 `org.apache.kafka.common.serialization.Deserializer` 被 parent 
>classload 加载了,那么会有问题。 你的 jar 包里面打了 kakfa-connector 的包了吗,如果集群上有,可以provided 看下。 
>希望对你有帮助。 Best, Hailong Wang 在 2020-11-19 14:33:25,"m13162790856" 
> 写道: >具体日主信息如下: > > > 
>org.apache.kafka.common.KafkaException: Failed to construct kafka consumer >at 
>org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:789) 
>at 
>org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:643) 
>at 
>org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:623) 
>at 
>org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
> at 
>org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
> at 
>org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:550)
> at 
>org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at 
>org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at 
>org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
> at 
>org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
> at 
>org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> at 
>org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
> at 
>org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at 
>org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at 
>java.lang.Thread.run(Thread.java:745) Caused by: 
>org.apache.kafka.common.KafkaException: 
>org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance 
>of org.apache.kafka.common.serialization.Deserializer at 
>org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:263)
> at 
>org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:688) 
>... 15 more 2020-11-19 15:17:32,0 > > >有哪位同学遇见过


Re: Re: flink sql cdc sum 结果出现NULL

2020-11-19 文章 Jark Wu
实现上应该没什么问题。

1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的?
2. 是否开启 mini-batch了?

Best,
Jark

On Fri, 20 Nov 2020 at 11:44, kandy.wang  wrote:

> hi Jark:
>
>
> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price
> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况
>
> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before
> update_after,format逻辑是应该这么写的吧。
>
>
>
>
> 在 2020-11-19 23:13:19,"Jark Wu"  写道:
> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null
> >值的,以验证你的自定义 format 没有问题。
> >
> >Best,
> >Jark
> >
> >On Thu, 19 Nov 2020 at 22:41, kandy.wang  wrote:
> >
> >> --mysql表
> >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
> >>`id` INT UNSIGNED AUTO_INCREMENT,
> >>`spu_id` BIGINT NOT NULL,
> >>`leaving_price`  DECIMAL(10, 5)
> >>PRIMARY KEY ( `id` ),
> >>unique key idx_spu_id (spu_id)
> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8
> >>
> >>
> >> --flink表
> >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
> (
> >>`spu_id` BIGINT ,
> >>`leaving_price`  DECIMAL(10, 5),
> >> PRIMARY KEY ( `spu_id`) NOT ENFORCED
> >> ) WITH (
> >>   'connector' = 'jdbc',
> >>'url' = 'jdbc:mysql://...',
> >>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
> >>'username' = '...',
> >>'password' = '..'
> >> );
> >>
> >>
> >> --binlog 2mysql
> >>
> >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
> >>
> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
> >>
> >> FROM hive.database.table
> >>
> >> group by v_spu_id;
> >>
> >>
> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
> >>
> >>
> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price
> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
> >> 有什么好的排查思路么?
> >>
> >>
> >>
> >>
> >>
> >>
>


Re:Re: flink sql cdc sum 结果出现NULL

2020-11-19 文章 kandy.wang
hi Jark:


打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price 
都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况

自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before 
update_after,format逻辑是应该这么写的吧。




在 2020-11-19 23:13:19,"Jark Wu"  写道:
>你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null
>值的,以验证你的自定义 format 没有问题。
>
>Best,
>Jark
>
>On Thu, 19 Nov 2020 at 22:41, kandy.wang  wrote:
>
>> --mysql表
>> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
>>`id` INT UNSIGNED AUTO_INCREMENT,
>>`spu_id` BIGINT NOT NULL,
>>`leaving_price`  DECIMAL(10, 5)
>>PRIMARY KEY ( `id` ),
>>unique key idx_spu_id (spu_id)
>> )ENGINE=InnoDB DEFAULT CHARSET=utf8
>>
>>
>> --flink表
>> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg (
>>`spu_id` BIGINT ,
>>`leaving_price`  DECIMAL(10, 5),
>> PRIMARY KEY ( `spu_id`) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'jdbc',
>>'url' = 'jdbc:mysql://...',
>>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>>'username' = '...',
>>'password' = '..'
>> );
>>
>>
>> --binlog 2mysql
>>
>> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>>
>> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
>>
>> FROM hive.database.table
>>
>> group by v_spu_id;
>>
>>
>> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
>>
>>
>> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price
>> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
>> 有什么好的排查思路么?
>>
>>
>>
>>
>>
>>


Re:Re: flink sql cdc sum 结果出现NULL

2020-11-19 文章 kandy.wang
hi Jark:

打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price 
都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况
自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before 
update_after,format逻辑是应该这么写的吧。




在 2020-11-19 23:13:19,"Jark Wu"  写道:
>你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null
>值的,以验证你的自定义 format 没有问题。
>
>Best,
>Jark
>
>On Thu, 19 Nov 2020 at 22:41, kandy.wang  wrote:
>
>> --mysql表
>> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
>>`id` INT UNSIGNED AUTO_INCREMENT,
>>`spu_id` BIGINT NOT NULL,
>>`leaving_price`  DECIMAL(10, 5)
>>PRIMARY KEY ( `id` ),
>>unique key idx_spu_id (spu_id)
>> )ENGINE=InnoDB DEFAULT CHARSET=utf8
>>
>>
>> --flink表
>> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg (
>>`spu_id` BIGINT ,
>>`leaving_price`  DECIMAL(10, 5),
>> PRIMARY KEY ( `spu_id`) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'jdbc',
>>'url' = 'jdbc:mysql://...',
>>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>>'username' = '...',
>>'password' = '..'
>> );
>>
>>
>> --binlog 2mysql
>>
>> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>>
>> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
>>
>> FROM hive.database.table
>>
>> group by v_spu_id;
>>
>>
>> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
>>
>>
>> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price
>> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
>> 有什么好的排查思路么?
>>
>>
>>
>>
>>
>>


答复: Flink 1.10 和 Flink 1.11 中 keyBy 算子聚合多个字段的问题

2020-11-19 文章 sherlock zw
我看了现在的 flink 1.11 的 keyBy 的代码,是使用的KeySelector 
key,但每次只能返回一个字段,不支持返回多个字段,也就说明了一次只能按一个字段去分组(PS: test.keyBy(t -> 
t.f0)),如果我想按多个字段进行分组的话该怎么操作呢?

-邮件原件-
发件人: guanxianchun  
发送时间: 2020年11月19日 20:53
收件人: user-zh@flink.apache.org
主题: Re: Flink 1.10 和 Flink 1.11 中 keyBy 算子聚合多个字段的问题

flink-1.11使用KeySelector



--
Sent from: http://apache-flink.147419.n8.nabble.com/


关于global window

2020-11-19 文章 j l
您好,我在看global window的时候有一些疑问,首先是global
window是全局唯一的,也就是所以的元素都会流到这一个window实例上,这样的话前面加keyBy还有什么意义呢?反正都要流到一个window里面,我测试的结果好像也确实是这样,我自定义了一个global
window,然后设置了process的并行度,但是window确实是只有一个
示例如下:

dataUnion.keyBy(0).window(new  StreamToBatchWindow()).process(new
StreamToBatchProcess()).setParallelism(20).print();

如果是这样的话,这个window岂不是成了瓶颈?不知道我理解的对不对,我是希望能多一些窗口对不同的key stream做global
window的处理。
另外一个就是global window会为每个key维护一个状态,这样如果key不断增加岂不是要爆了?怎样才能清除永远不会再出现的key的状态呢?

谢谢!


Re: Flink 1.10 和 Flink 1.11 中 keyBy 算子聚合多个字段的问题

2020-11-19 文章 guanxianchun
flink-1.11使用KeySelector



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re:flink1.11.1任务重启 偶现org.apache.kafka.common.KafkaException: Failed to construct kafka consumer异常

2020-11-19 文章 m13162790856
HI:
   偶现的 ,但是最近几次出现的概率还是比较大的, 之前怀疑classload , 我的jar包做了分离, 不会把任何包打进去, 
所以包能确保每次启动都是一样,很奇怪这种情况


在 2020年11月19日 17:14,hailongwang<18868816...@163.com> 写道:


Hi, 这个如果是偶尔出现的,应该是跟 ClassLoad 加载有关。 如果 
`org.apache.kafka.common.serialization.ByteArrayDeserializer` 被 child classload 
加载了, 而 `org.apache.kafka.common.serialization.Deserializer` 被 parent classload 
加载了,那么会有问题。 你的 jar 包里面打了 kakfa-connector 的包了吗,如果集群上有,可以provided 看下。 希望对你有帮助。 
Best, Hailong Wang 在 2020-11-19 14:33:25,"m13162790856"  
写道: >具体日主信息如下: > > > org.apache.kafka.common.KafkaException: Failed to 
construct kafka consumer >at 
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:789) 
at 
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:643) 
at 
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:623) 
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
 at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
 at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:550)
 at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
 at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528) 
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at 
java.lang.Thread.run(Thread.java:745) Caused by: 
org.apache.kafka.common.KafkaException: 
org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance 
of org.apache.kafka.common.serialization.Deserializer at 
org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:263)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:688) 
... 15 more 2020-11-19 15:17:32,0 > > >有哪位同学遇见过

Re: flink sql cdc sum 结果出现NULL

2020-11-19 文章 Jark Wu
你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null
值的,以验证你的自定义 format 没有问题。

Best,
Jark

On Thu, 19 Nov 2020 at 22:41, kandy.wang  wrote:

> --mysql表
> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
>`id` INT UNSIGNED AUTO_INCREMENT,
>`spu_id` BIGINT NOT NULL,
>`leaving_price`  DECIMAL(10, 5)
>PRIMARY KEY ( `id` ),
>unique key idx_spu_id (spu_id)
> )ENGINE=InnoDB DEFAULT CHARSET=utf8
>
>
> --flink表
> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg (
>`spu_id` BIGINT ,
>`leaving_price`  DECIMAL(10, 5),
> PRIMARY KEY ( `spu_id`) NOT ENFORCED
> ) WITH (
>   'connector' = 'jdbc',
>'url' = 'jdbc:mysql://...',
>'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
>'username' = '...',
>'password' = '..'
> );
>
>
> --binlog 2mysql
>
> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
>
> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
>
> FROM hive.database.table
>
> group by v_spu_id;
>
>
> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
>
>
> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price
> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
> 有什么好的排查思路么?
>
>
>
>
>
>


flink sql cdc sum 结果出现NULL

2020-11-19 文章 kandy.wang
--mysql表
CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
   `id` INT UNSIGNED AUTO_INCREMENT,
   `spu_id` BIGINT NOT NULL,
   `leaving_price`  DECIMAL(10, 5)
   PRIMARY KEY ( `id` ),
   unique key idx_spu_id (spu_id)
)ENGINE=InnoDB DEFAULT CHARSET=utf8


--flink表
CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg (
   `spu_id` BIGINT ,
   `leaving_price`  DECIMAL(10, 5),
PRIMARY KEY ( `spu_id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
   'url' = 'jdbc:mysql://...',
   'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
   'username' = '...',
   'password' = '..'
);


--binlog 2mysql

insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg

SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price

FROM hive.database.table

group by v_spu_id;


hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。


问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price 
字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
有什么好的排查思路么?







Flink 1.10 和 Flink 1.11 中 keyBy 算子聚合多个字段的问题

2020-11-19 文章 sherlock zw
大佬们:
   在 Flink 1.10.x 中的 keyBy 算子可以同时按多个字段分组,比如 map.keyBy(0,1),但在 1.11.x 
版本中这种方式被弃用了,看了下源码好像不支持按多字段分组了?还是有别的其他形式?
   如果我想按多个字段分组的话需要怎么操作?
   请大佬指点!


Re:flink1.11.1任务重启 偶现org.apache.kafka.common.KafkaException: Failed to construct kafka consumer异常

2020-11-19 文章 hailongwang
Hi,
   这个如果是偶尔出现的,应该是跟 ClassLoad 加载有关。
   如果 `org.apache.kafka.common.serialization.ByteArrayDeserializer` 被 child 
classload 加载了,
而 `org.apache.kafka.common.serialization.Deserializer` 被 parent classload 
加载了,那么会有问题。
你的 jar 包里面打了 kakfa-connector 的包了吗,如果集群上有,可以provided 看下。
希望对你有帮助。


Best,
Hailong Wang

在 2020-11-19 14:33:25,"m13162790856"  写道:
>具体日主信息如下:
>
>
>   org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
>at 
>org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:789) 
>at 
>org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:643) 
>at 
>org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:623) 
>at 
>org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
> at 
>org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
> at 
>org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:550)
> at 
>org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at 
>org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at 
>org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
> at 
>org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
> at 
>org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> at 
>org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
> at 
>org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at 
>org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at 
>java.lang.Thread.run(Thread.java:745) Caused by: 
>org.apache.kafka.common.KafkaException: 
>org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance 
>of org.apache.kafka.common.serialization.Deserializer at 
>org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:263)
> at 
>org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:688) 
>... 15 more 2020-11-19 15:17:32,0
>
>
>有哪位同学遇见过