关于CatalogPartitionSpec类的一些想法

2020-11-19 文章 Jun Zhang
大家好: 我现在在使用flink中Catalog的查询分区的方法:Catalog#listPartitions(ObjectPath, CatalogPartitionSpec)的时候遇到一个问题。 我发现CatalogPartitionSpec中存放分区信息的字partitionSpec类型是Map类型是否会更加合理和通用呢? 谢谢

关于CatalogPartitionSpec类的一些想法

2020-11-19 文章 Jun Zhang
大家好: 我现在在使用flink中Catalog的查询分区的方法:Catalog#listPartitions(ObjectPath, CatalogPartitionSpec)的时候遇到一个问题。 我发现CatalogPartitionSpec中存放分区信息的字partitionSpec类型是Map

Re:Re: Re: Re: flink sql cdc sum 结果出现NULL

2020-11-19 文章 kandy.wang
1.没有初始的全量数据可能是会有问题的 这个怎么理解,默认情况,就是从kafka group-sets 消费的,怎么才能保证全量数据? 我们这个binlog同步都是增量同步。不会做一次初始化的全量同步。 2.先发的before 后发的after 3. 数据在kafka里,是按照mysql的id主键hash的。是有序的,group key 的所有数据不能保证 都在同 一个 partition 中。由于是按照主键id hash的 在 2020-11-20 13:25:53,"Jark Wu" 写道: >1. 没有初始的全量数据可能是会有问题的。 > >3. 你的

Re:Re: Re: Re: flink sql cdc sum 结果出现NULL

2020-11-19 文章 kandy.wang
1.没有初始的全量数据可能是会有问题的 这个怎么理解,默认情况,就是从kafka group-sets 消费的,怎么才能保证全量数据? 我们这个binlog同步都是增量同步。不会做一次初始化的全量同步。 2.先发的before 后发的after 3. 数据在kafka里,是按照mysql的id主键hash的。是有序的,group key 的所有数据不能保证 都在同 一个 partition 中。由于是按照主键id hash的。 在 2020-11-20 13:25:53,"Jark Wu" 写道: >1. 没有初始的全量数据可能是会有问题的。 > >3. 你的

Re: Re: Re: flink sql cdc sum 结果出现NULL

2020-11-19 文章 Jark Wu
1. 没有初始的全量数据可能是会有问题的。 3. 你的 format 再解析 update 时,时先发的 before 还是 after? 4. 你的数据在 kafka 中时有序的么?也就是同一 key 的所有数据都在一个 partition 中不? On Fri, 20 Nov 2020 at 12:46, kandy.wang wrote: > > > > > > > 1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。 > > 2. 没有开启 > > > > > 在 2020-11-20 11:49:44,"Jark Wu" 写道: >

Re:关于global window

2020-11-19 文章 hailongwang
Hi sparklelj, Global window 的是所有相同的 key 的元素会在一个 window里,它没有 window end,所以需要自己实现 custom trigger 来触发 window 的计算[1]。 它属于 keyed window,并不是只能有一个 window 实例( windowAll 只有一个 window 实例)。 所以看下是不是用法有错误呢,你的 ‘ StreamToBatchWindow’ 类是继承了哪个接口的? [1]

Re:Re: Re: flink sql cdc sum 结果出现NULL

2020-11-19 文章 kandy.wang
1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。 2. 没有开启 在 2020-11-20 11:49:44,"Jark Wu" 写道: >实现上应该没什么问题。 > >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的? >2. 是否开启 mini-batch了? > >Best, >Jark > >On Fri, 20 Nov 2020 at 11:44, kandy.wang wrote: > >> hi Jark: >> >> >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id

Re:Re: Re: flink sql cdc sum 结果出现NULL

2020-11-19 文章 kandy.wang
1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。 2. 没有开启 在 2020-11-20 11:49:44,"Jark Wu" 写道: >实现上应该没什么问题。 > >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的? >2. 是否开启 mini-batch了? > >Best, >Jark > >On Fri, 20 Nov 2020 at 11:44, kandy.wang wrote: > >> hi Jark: >> >> >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num

Re:Re:flink1.11.1任务重启 偶现org.apache.kafka.common.KafkaException: Failed to construct kafka consumer异常

2020-11-19 文章 hailongwang
可以 grep 看下哪些 jar 包包含这 2 个类的? 在 2020-11-20 08:51:59,"m13162790856" 写道: >HI: > 偶现的 ,但是最近几次出现的概率还是比较大的, 之前怀疑classload , 我的jar包做了分离, 不会把任何包打进去, > 所以包能确保每次启动都是一样,很奇怪这种情况 > > >在 2020年11月19日 17:14,hailongwang<18868816...@163.com> 写道: > > >Hi, 这个如果是偶尔出现的,应该是跟 ClassLoad 加载有关。 如果

Re: Re: flink sql cdc sum 结果出现NULL

2020-11-19 文章 Jark Wu
实现上应该没什么问题。 1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的? 2. 是否开启 mini-batch了? Best, Jark On Fri, 20 Nov 2020 at 11:44, kandy.wang wrote: > hi Jark: > > > 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price > 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 > > 自定义的format逻辑和canal的类似,insert update delete

Re:Re: flink sql cdc sum 结果出现NULL

2020-11-19 文章 kandy.wang
hi Jark: 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before update_after,format逻辑是应该这么写的吧。 在 2020-11-19 23:13:19,"Jark Wu" 写道: >你可以先直接 select * from hive.database.table;

Re:Re: flink sql cdc sum 结果出现NULL

2020-11-19 文章 kandy.wang
hi Jark: 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before update_after,format逻辑是应该这么写的吧。 在 2020-11-19 23:13:19,"Jark Wu" 写道: >你可以先直接 select * from hive.database.table;

答复: Flink 1.10 和 Flink 1.11 中 keyBy 算子聚合多个字段的问题

2020-11-19 文章 sherlock zw
我看了现在的 flink 1.11 的 keyBy 的代码,是使用的KeySelector key,但每次只能返回一个字段,不支持返回多个字段,也就说明了一次只能按一个字段去分组(PS: test.keyBy(t -> t.f0)),如果我想按多个字段进行分组的话该怎么操作呢? -邮件原件- 发件人: guanxianchun 发送时间: 2020年11月19日 20:53 收件人: user-zh@flink.apache.org 主题: Re: Flink 1.10 和 Flink 1.11 中 keyBy 算子聚合多个字段的问题

关于global window

2020-11-19 文章 j l
您好,我在看global window的时候有一些疑问,首先是global window是全局唯一的,也就是所以的元素都会流到这一个window实例上,这样的话前面加keyBy还有什么意义呢?反正都要流到一个window里面,我测试的结果好像也确实是这样,我自定义了一个global window,然后设置了process的并行度,但是window确实是只有一个 示例如下: dataUnion.keyBy(0).window(new StreamToBatchWindow()).process(new

Re: Flink 1.10 和 Flink 1.11 中 keyBy 算子聚合多个字段的问题

2020-11-19 文章 guanxianchun
flink-1.11使用KeySelector -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re:flink1.11.1任务重启 偶现org.apache.kafka.common.KafkaException: Failed to construct kafka consumer异常

2020-11-19 文章 m13162790856
HI: 偶现的 ,但是最近几次出现的概率还是比较大的, 之前怀疑classload , 我的jar包做了分离, 不会把任何包打进去, 所以包能确保每次启动都是一样,很奇怪这种情况 在 2020年11月19日 17:14,hailongwang<18868816...@163.com> 写道: Hi, 这个如果是偶尔出现的,应该是跟 ClassLoad 加载有关。 如果 `org.apache.kafka.common.serialization.ByteArrayDeserializer` 被 child classload 加载了, 而

Re: flink sql cdc sum 结果出现NULL

2020-11-19 文章 Jark Wu
你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null 值的,以验证你的自定义 format 没有问题。 Best, Jark On Thu, 19 Nov 2020 at 22:41, kandy.wang wrote: > --mysql表 > CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( >`id` INT UNSIGNED AUTO_INCREMENT, >`spu_id`

flink sql cdc sum 结果出现NULL

2020-11-19 文章 kandy.wang
--mysql表 CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( `id` INT UNSIGNED AUTO_INCREMENT, `spu_id` BIGINT NOT NULL, `leaving_price` DECIMAL(10, 5) PRIMARY KEY ( `id` ), unique key idx_spu_id (spu_id) )ENGINE=InnoDB DEFAULT CHARSET=utf8 --flink表 CREATE

Flink 1.10 和 Flink 1.11 中 keyBy 算子聚合多个字段的问题

2020-11-19 文章 sherlock zw
大佬们: 在 Flink 1.10.x 中的 keyBy 算子可以同时按多个字段分组,比如 map.keyBy(0,1),但在 1.11.x 版本中这种方式被弃用了,看了下源码好像不支持按多字段分组了?还是有别的其他形式? 如果我想按多个字段分组的话需要怎么操作? 请大佬指点!

Re:flink1.11.1任务重启 偶现org.apache.kafka.common.KafkaException: Failed to construct kafka consumer异常

2020-11-19 文章 hailongwang
Hi, 这个如果是偶尔出现的,应该是跟 ClassLoad 加载有关。 如果 `org.apache.kafka.common.serialization.ByteArrayDeserializer` 被 child classload 加载了, 而 `org.apache.kafka.common.serialization.Deserializer` 被 parent classload 加载了,那么会有问题。 你的 jar 包里面打了 kakfa-connector 的包了吗,如果集群上有,可以provided 看下。 希望对你有帮助。 Best, Hailong