大家好:
我现在在使用flink中Catalog的查询分区的方法:Catalog#listPartitions(ObjectPath,
CatalogPartitionSpec)的时候遇到一个问题。
我发现CatalogPartitionSpec中存放分区信息的字partitionSpec类型是Map类型是否会更加合理和通用呢?
谢谢
大家好:
我现在在使用flink中Catalog的查询分区的方法:Catalog#listPartitions(ObjectPath,
CatalogPartitionSpec)的时候遇到一个问题。
我发现CatalogPartitionSpec中存放分区信息的字partitionSpec类型是Map
1.没有初始的全量数据可能是会有问题的
这个怎么理解,默认情况,就是从kafka group-sets 消费的,怎么才能保证全量数据?
我们这个binlog同步都是增量同步。不会做一次初始化的全量同步。
2.先发的before 后发的after
3. 数据在kafka里,是按照mysql的id主键hash的。是有序的,group key 的所有数据不能保证 都在同 一个 partition
中。由于是按照主键id hash的
在 2020-11-20 13:25:53,"Jark Wu" 写道:
>1. 没有初始的全量数据可能是会有问题的。
>
>3. 你的
1.没有初始的全量数据可能是会有问题的
这个怎么理解,默认情况,就是从kafka group-sets 消费的,怎么才能保证全量数据?
我们这个binlog同步都是增量同步。不会做一次初始化的全量同步。
2.先发的before 后发的after
3. 数据在kafka里,是按照mysql的id主键hash的。是有序的,group key 的所有数据不能保证 都在同 一个 partition
中。由于是按照主键id hash的。
在 2020-11-20 13:25:53,"Jark Wu" 写道:
>1. 没有初始的全量数据可能是会有问题的。
>
>3. 你的
1. 没有初始的全量数据可能是会有问题的。
3. 你的 format 再解析 update 时,时先发的 before 还是 after?
4. 你的数据在 kafka 中时有序的么?也就是同一 key 的所有数据都在一个 partition 中不?
On Fri, 20 Nov 2020 at 12:46, kandy.wang wrote:
>
>
>
>
>
>
> 1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。
>
> 2. 没有开启
>
>
>
>
> 在 2020-11-20 11:49:44,"Jark Wu" 写道:
>
Hi sparklelj,
Global window 的是所有相同的 key 的元素会在一个 window里,它没有 window end,所以需要自己实现 custom
trigger 来触发 window 的计算[1]。
它属于 keyed window,并不是只能有一个 window 实例( windowAll 只有一个 window 实例)。
所以看下是不是用法有错误呢,你的 ‘ StreamToBatchWindow’ 类是继承了哪个接口的?
[1]
1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。
2. 没有开启
在 2020-11-20 11:49:44,"Jark Wu" 写道:
>实现上应该没什么问题。
>
>1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的?
>2. 是否开启 mini-batch了?
>
>Best,
>Jark
>
>On Fri, 20 Nov 2020 at 11:44, kandy.wang wrote:
>
>> hi Jark:
>>
>>
>> 打了一下log 看了一下聚合相关的几个字段: v_spu_id
1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。
2. 没有开启
在 2020-11-20 11:49:44,"Jark Wu" 写道:
>实现上应该没什么问题。
>
>1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的?
>2. 是否开启 mini-batch了?
>
>Best,
>Jark
>
>On Fri, 20 Nov 2020 at 11:44, kandy.wang wrote:
>
>> hi Jark:
>>
>>
>> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num
可以 grep 看下哪些 jar 包包含这 2 个类的?
在 2020-11-20 08:51:59,"m13162790856" 写道:
>HI:
> 偶现的 ,但是最近几次出现的概率还是比较大的, 之前怀疑classload , 我的jar包做了分离, 不会把任何包打进去,
> 所以包能确保每次启动都是一样,很奇怪这种情况
>
>
>在 2020年11月19日 17:14,hailongwang<18868816...@163.com> 写道:
>
>
>Hi, 这个如果是偶尔出现的,应该是跟 ClassLoad 加载有关。 如果
实现上应该没什么问题。
1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的?
2. 是否开启 mini-batch了?
Best,
Jark
On Fri, 20 Nov 2020 at 11:44, kandy.wang wrote:
> hi Jark:
>
>
> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price
> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况
>
> 自定义的format逻辑和canal的类似,insert update delete
hi Jark:
打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price
都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况
自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before
update_after,format逻辑是应该这么写的吧。
在 2020-11-19 23:13:19,"Jark Wu" 写道:
>你可以先直接 select * from hive.database.table;
hi Jark:
打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price
都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况
自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before
update_after,format逻辑是应该这么写的吧。
在 2020-11-19 23:13:19,"Jark Wu" 写道:
>你可以先直接 select * from hive.database.table;
我看了现在的 flink 1.11 的 keyBy 的代码,是使用的KeySelector
key,但每次只能返回一个字段,不支持返回多个字段,也就说明了一次只能按一个字段去分组(PS: test.keyBy(t ->
t.f0)),如果我想按多个字段进行分组的话该怎么操作呢?
-邮件原件-
发件人: guanxianchun
发送时间: 2020年11月19日 20:53
收件人: user-zh@flink.apache.org
主题: Re: Flink 1.10 和 Flink 1.11 中 keyBy 算子聚合多个字段的问题
您好,我在看global window的时候有一些疑问,首先是global
window是全局唯一的,也就是所以的元素都会流到这一个window实例上,这样的话前面加keyBy还有什么意义呢?反正都要流到一个window里面,我测试的结果好像也确实是这样,我自定义了一个global
window,然后设置了process的并行度,但是window确实是只有一个
示例如下:
dataUnion.keyBy(0).window(new StreamToBatchWindow()).process(new
flink-1.11使用KeySelector
--
Sent from: http://apache-flink.147419.n8.nabble.com/
HI:
偶现的 ,但是最近几次出现的概率还是比较大的, 之前怀疑classload , 我的jar包做了分离, 不会把任何包打进去,
所以包能确保每次启动都是一样,很奇怪这种情况
在 2020年11月19日 17:14,hailongwang<18868816...@163.com> 写道:
Hi, 这个如果是偶尔出现的,应该是跟 ClassLoad 加载有关。 如果
`org.apache.kafka.common.serialization.ByteArrayDeserializer` 被 child classload
加载了, 而
你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null
值的,以验证你的自定义 format 没有问题。
Best,
Jark
On Thu, 19 Nov 2020 at 22:41, kandy.wang wrote:
> --mysql表
> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
>`id` INT UNSIGNED AUTO_INCREMENT,
>`spu_id`
--mysql表
CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
`id` INT UNSIGNED AUTO_INCREMENT,
`spu_id` BIGINT NOT NULL,
`leaving_price` DECIMAL(10, 5)
PRIMARY KEY ( `id` ),
unique key idx_spu_id (spu_id)
)ENGINE=InnoDB DEFAULT CHARSET=utf8
--flink表
CREATE
大佬们:
在 Flink 1.10.x 中的 keyBy 算子可以同时按多个字段分组,比如 map.keyBy(0,1),但在 1.11.x
版本中这种方式被弃用了,看了下源码好像不支持按多字段分组了?还是有别的其他形式?
如果我想按多个字段分组的话需要怎么操作?
请大佬指点!
Hi,
这个如果是偶尔出现的,应该是跟 ClassLoad 加载有关。
如果 `org.apache.kafka.common.serialization.ByteArrayDeserializer` 被 child
classload 加载了,
而 `org.apache.kafka.common.serialization.Deserializer` 被 parent classload
加载了,那么会有问题。
你的 jar 包里面打了 kakfa-connector 的包了吗,如果集群上有,可以provided 看下。
希望对你有帮助。
Best,
Hailong
20 matches
Mail list logo