Re: Re: 请问一下,flink 1.11 的cdc历史数据问题

2020-08-28 Thread dixingxin...@163.com
@Jark,感谢回复,很好的解答了我的疑惑


Best,
Xingxing Di
 
Sender: Jark Wu
Send Time: 2020-08-27 20:13
Receiver: user-zh
Subject: Re: Re: 请问一下,flink 1.11 的cdc历史数据问题
Hi,
 
debezium 是支持全量加载的。debezium 的一个亮点就是能够加载完存量数据以后能够无缝切换到 binlog 模式。
全量加载可以看下 SnapshotReader。
 
另外,全量数据导入到 kafka ,然后从 kafka 加载全量再切换到 mysql
binlog,这里面主要一个问题是很难做到无缝切换,因为不知道确切的 mysql binlog 位点。
 
Best,
Jark
 
On Tue, 25 Aug 2020 at 12:47, dixingxin...@163.com 
wrote:
 
> Hi:
> Leonard Xu 感谢回复
> > 2.debezium是通过jdbc查询源表的数据来加载全量的历史数据吗? 这块会不会对数据库造成瞬时压力?
> > 不会,不是直接查询源表,所以不会锁表,加载全量历史数据时只是读取binlog的一个offset
>
> 这里恰好是我的疑问,之前看debezium代码,没有找到使用jdbc加载全量历史数据的代码,debezium的snapshot看起来只是保存了表的schema变更记录,这样重新消费binlog时,可以为每条binlog数据找到正确schema,达到正确解析历史数据的目的。
>
> 我的疑问是,如果加载全量历史数据,只是指定binlog的offset,从头读取binlog,那么是不是有可能无法加载到全量的数据,因为通常binlog是有过期时间的,不会保存全量的binlog。如果我理解的没问题,那么目前flink1.11
> 的cdc是无法加载全量历史数据的。
>
> 我理解加载全量数据,无非两种方式:
> 1.jdbc从源表全量拉取数据
> 2.将原表数据初始化到一个kafka
> topic中(topic设置为compact模式),再消费binlog,往这个topic里写入增量数据,确保这个topic的数据和原表一致,然后flink作业启动时,从这个topic的earliest
> offset消费,得到全量的历史数据。
>
> 不知道我的理解是否正确,希望能帮忙解答
>
>
>
> Best,
> Xingxing Di
>
> 发件人: Leonard Xu
> 发送时间: 2020-08-25 10:03
> 收件人: user-zh
> 主题: Re: 请问一下,flink 1.11 的cdc历史数据问题
> Hello
>
> > Flink1.11 的cdc是支持加载历史数据的,有两个问题想求证一下:
> > 1.底层是使用了debezium来加载历史数据的吗?
> Flink支持两种CDC格式,debezium json和 canal json, debezium 和
> canal都是CDC系统,简单说他们可以把数据库表的binlog以对应的json写入到消息队列如Kafka,
> 作为下游系统的Flink 支持去消费对应的cdc数据,两个CDC工作都支持加载历史数据的。
> 另外Jark 在Veverica 开源了一个Flink CDC connector
> [1],支持利用debezuim直接读取数据库的cdc数据,不需要搭建CDC系统。
>
> > 2.debezium是通过jdbc查询源表的数据来加载全量的历史数据吗? 这块会不会对数据库造成瞬时压力?
> 不会,不是直接查询源表,所以不会锁表,加载全量历史数据时只是读取binlog的一个offset
>
>
> Best
> Leonard
> [1] https://github.com/ververica/flink-cdc-connectors <
> https://github.com/ververica/flink-cdc-connectors>
>


回复: Re: 请问一下,flink 1.11 的cdc历史数据问题

2020-08-24 Thread dixingxin...@163.com
Hi:
Leonard Xu 感谢回复
> 2.debezium是通过jdbc查询源表的数据来加载全量的历史数据吗? 这块会不会对数据库造成瞬时压力?
> 不会,不是直接查询源表,所以不会锁表,加载全量历史数据时只是读取binlog的一个offset
这里恰好是我的疑问,之前看debezium代码,没有找到使用jdbc加载全量历史数据的代码,debezium的snapshot看起来只是保存了表的schema变更记录,这样重新消费binlog时,可以为每条binlog数据找到正确schema,达到正确解析历史数据的目的。

我的疑问是,如果加载全量历史数据,只是指定binlog的offset,从头读取binlog,那么是不是有可能无法加载到全量的数据,因为通常binlog是有过期时间的,不会保存全量的binlog。如果我理解的没问题,那么目前flink1.11
 的cdc是无法加载全量历史数据的。

我理解加载全量数据,无非两种方式:
1.jdbc从源表全量拉取数据
2.将原表数据初始化到一个kafka 
topic中(topic设置为compact模式),再消费binlog,往这个topic里写入增量数据,确保这个topic的数据和原表一致,然后flink作业启动时,从这个topic的earliest
 offset消费,得到全量的历史数据。

不知道我的理解是否正确,希望能帮忙解答



Best,
Xingxing Di
 
发件人: Leonard Xu
发送时间: 2020-08-25 10:03
收件人: user-zh
主题: Re: 请问一下,flink 1.11 的cdc历史数据问题
Hello
 
> Flink1.11 的cdc是支持加载历史数据的,有两个问题想求证一下:
> 1.底层是使用了debezium来加载历史数据的吗?
Flink支持两种CDC格式,debezium json和 canal json, debezium 和 
canal都是CDC系统,简单说他们可以把数据库表的binlog以对应的json写入到消息队列如Kafka,
作为下游系统的Flink 支持去消费对应的cdc数据,两个CDC工作都支持加载历史数据的。
另外Jark 在Veverica 开源了一个Flink CDC connector 
[1],支持利用debezuim直接读取数据库的cdc数据,不需要搭建CDC系统。
 
> 2.debezium是通过jdbc查询源表的数据来加载全量的历史数据吗? 这块会不会对数据库造成瞬时压力?
不会,不是直接查询源表,所以不会锁表,加载全量历史数据时只是读取binlog的一个offset
 
 
Best
Leonard
[1] https://github.com/ververica/flink-cdc-connectors 



请问一下,flink 1.11 的cdc历史数据问题

2020-08-24 Thread dixingxin...@163.com
Hi all:
Flink1.11 的cdc是支持加载历史数据的,有两个问题想求证一下:
1.底层是使用了debezium来加载历史数据的吗?
2.debezium是通过jdbc查询源表的数据来加载全量的历史数据吗? 这块会不会对数据库造成瞬时压力?

希望能帮忙解答一下,谢谢。


Best,
Xingxing Di


Re: Re: Flink catalog的几个疑问

2020-07-21 Thread dixingxin...@163.com
@Godfrey @Jingsong 感谢回复,很好的解答了我的疑惑!
背景是这样的,目前我们正打算实现一套支持持久化的catalog,同时基于这个catalog实现一个metaserver,对外暴露REST接口,用来支持日常管理操作,比如:
1.基于原生DDL管理source,sink,支持多种connector,并将这些元数据持久化到mysql中。
2.做统一的权限控制

我们面临两种选择:
1.基于hive catalog建设自己的catalog(或者说直接使用hive catalog):
优势:鉴于hive catalog已经相对比较完善,直接使用可以减少开发量。
劣势:不太明确社区对hive catalog的定位;大小写不敏感带来的麻烦。(大致是之前提到的3个问题)

2.完全自建catalog:
优势:灵活可控;依然可以利用已有的catalog
劣势:设计开发成本高,引入大量代码可能需要持续维护(比如后续catalog 
api发生变动);同时如果社区后续提供官方的catalog默认实现,我们会再次面临是否切换的问题。

目前我们是倾向于自建catalog的。

@Jark 默认的catalog应该算是个通用的需求,感觉在批流一体的大势下,是挺重要的一步(目前hive 
catalog可能还不够)。另外很多公司都在基于开源Flink做计算平台,如果Flink有默认catalog并提供metaserver,那么无疑是十分友好的。
我们优先实现内部版本,实现既定目标。有机会的话,我们也希望能回馈社区。

@All 目前我们想的还不够多,考虑可能不全面,还希望大家给些建议。



Best,
Xingxing Di
 
Sender: Jark Wu
Send Time: 2020-07-22 11:22
Receiver: user-zh
Subject: Re: Flink catalog的几个疑问
非常欢迎贡献开源一个轻量的 catalog 实现 :)
 
On Wed, 22 Jul 2020 at 10:53, Jingsong Li  wrote:
 
> Hi,
>
> HiveCatalog就是官方唯一的可以保存所有表的持久化Catalog,包括kafka,jdbc,hbase等等connectors。
>
> > 后续有可能转正为flink 默认的catalog实现吗?
>
> 目前不太可能,你看,Flink连Hadoop的依赖都没有打进来。Hive的依赖更不会默认打进来。 依赖都没有,也不会成为默认的。
>
> > hive catalog是不支持大小写敏感的
>
> 是的,就像Godfrey说的,特别是JDBC对接的某些大小写敏感的db,这可能导致字段名对应不了。
>
> Best,
> Jingsong
>
> On Wed, Jul 22, 2020 at 10:39 AM godfrey he  wrote:
>
> > hi Xingxing,
> >
> > 1. Flink 提供了一套catalog的接口,提提供了几个内置的实现:in-memory catalog, hive catalog,
> > postgres catalog,
> > 可以根据自己的需求选择。也可以实现自定义的catalog。参考 [1]
> > 2. hive catalog 主要是对接 hive,方便读取现有的hive catalog的meta信息。当然也可以往hive
> > catalog写新的meta。
> > 是否会转为默认catalog,据我所知,目前没有。
> > 3. 一般没什么问题。在和其他区分大小写的db对接的时候,可能有问题。
> >
> > Best,
> > Godfrey
> >
> > dixingxin...@163.com  于2020年7月21日周二 下午11:30写道:
> >
> > > Hi Flink社区:
> > > 有几个疑问希望社区小伙伴们帮忙解答一下:
> > >
> > >
> >
> 1.个人感觉Flink很有必要提供一个官方的catalog,用来支持各种connector,比如:kafka,jdbc,hbase等等connector。不知道社区有没有这个打算,目前没有看到对应的flip
> > > 2.社区对hive catalog的定位是什么,后续有可能转正为flink 默认的catalog实现吗?
> > > 3.hive catalog是不支持大小写敏感的(字段名都是小写),这个后续会带来哪些问题?想征集下大家的意见避免我们以后踩大坑。
> > >
> > >
> > >
> > >
> > > Best,
> > > Xingxing Di
> > >
> >
>
>
> --
> Best, Jingsong Lee
>


Flink catalog的几个疑问

2020-07-21 Thread dixingxin...@163.com
Hi Flink社区:
有几个疑问希望社区小伙伴们帮忙解答一下:
1.个人感觉Flink很有必要提供一个官方的catalog,用来支持各种connector,比如:kafka,jdbc,hbase等等connector。不知道社区有没有这个打算,目前没有看到对应的flip
2.社区对hive catalog的定位是什么,后续有可能转正为flink 默认的catalog实现吗? 
3.hive catalog是不支持大小写敏感的(字段名都是小写),这个后续会带来哪些问题?想征集下大家的意见避免我们以后踩大坑。




Best,
Xingxing Di


Re: Re: 如何做checkpoint的灾备

2020-06-16 Thread dixingxin...@163.com
@Congxian  感谢你的回复,我们会参考你的思路。



Best,
Xingxing Di
 
Sender: Congxian Qiu
Send Time: 2020-06-15 09:55
Receiver: user-zh
cc: zhangyingchen; pengxingbo
Subject: Re: Re: 如何做checkpoint的灾备
正常的流程来说,能找到 checkpoint meta 文件,checkpoint 就是完整的。但是也可能会出现其他的一些异常(主要可能会有
FileNotFound 等异常),那些异常如果需要提前知道的话,可以再 JM 端通过遍历 checkpoint meta 文件来进行判断。
 
对于希望从 checkpoint 恢复的场景来说,可以考虑下能否在你们的场景中把 checkpoint meta
统一存储到某个地方,这样后续直接从这个地方读取即可。
 
Best,
Congxian
 
 
dixingxin...@163.com  于2020年6月15日周一 上午12:06写道:
 
> @Congxian Qiu 感谢你的回复!
> 1.先回答你的疑问,我们目前checkpoint跨机房容灾的需求比较强烈,是需要上生产的;关于flink 1.11
> 的savepoint,我们后面可以尝试一下,但是最近几个月还没有升级1.11版本的计划。
> 2.你说的定期把文件copy到本地集群,然后再复制到远程集群感觉是个可行的方案,我们让hadoop运维同学评估下。
> 3.checkpoint双写:
> 感谢分享checkpoint双写的思路,这块我们最近几天会先出个MVP版本验证一下,如果遇到问题肯定还需要请教一下。
>
> 异常处理确实需要仔细设计,至少需要保证写备用checkpoint时,不会影响原checkpoint。同时需要引入metric,记录备用checkpoint存储失败的情况
> 关于选用哪个集群的checkpoint,目前我们是打算手动控制的,会在flink上层提供一个可以批量切换HDFS(Flink集群)的接口
> @唐云  感谢你的恢复!
> 我们试一下你说的方式,感谢。
>
> 题外话,我还有个疑问,就是如何判断checkpoint是否可用?
>
> 我们基本没有使用savepoint,如果作业挂了需要重新启动时,我们会指定从checkpoint恢复。如果从最新的checkpoint恢复,那很有可能因为checkpoint不完整,导致作业无法启动。
>
> 目前我们是简单处理的,优先使用倒数第2个checkpoint,但如果作业checkpoint少于2个,可能需要查找checkpoint路径,并手动指定。
> PS:我们用的是flink 1.9.2
>
>
>
> Best,
> Xingxing Di
>
> 发件人: Yun Tang
> 发送时间: 2020-06-14 00:48
> 收件人: user-zh
> 主题: Re: 如何做checkpoint的灾备
> Hi Xingxing
>
> 由于作业仍在运行,所以checkpoint目录下的文件是不断新增以及删除的,其实在使用distcp的时候加上 “-i” [1]
> 来忽略失败的拷贝(例如FileNotFoundException)
> 文件即可。因为作业的原始checkpoint目录最终一定可以做到正常restore,所以即使部分文件因为在拷贝时被原作业不需要而删除时,只要最终目录结构一致,是可以做到在另外一个HDFS上实现容灾备份的。
>
> [1]
> https://hadoop.apache.org/docs/current/hadoop-distcp/DistCp.html#Command_Line_Options
>
> 祝好
> 唐云
>
> 
> From: Congxian Qiu 
> Sent: Saturday, June 13, 2020 16:54
> To: user-zh 
> Subject: Re: 如何做checkpoint的灾备
>
> Hi
>
> 你好,这个需求看上去是一个通用的需求(可以是任何的文件进行灾备,刚好这里是 checkpoint 文件),对 hdfs distcp
> 的限制不太了解,不过如果你要是想做这件事情,能否尝试定时的把文件先 copy 到本集群,然后 copy 后的文件通过 distcp 拷贝到其他集群。
>
> 另外改造 flink 代码支持 checkpoint 异步双写这从理论上来说是可以的,不过做的事情可能也不简单,粗略想了下需要:1)tm
> 端能够双写,可以参考现在开启 localrecovery 的双写(本地和远程)的实现[1] -- 另外需要考虑异常情况;2)jm 端需要考虑
> checkpoint meta 的格式,以及存储,meta 也需要存两份(如果只存一份的话,那么 hdfs
> 异常了也无法恢复),这里面异常的情况也需要仔细考虑;3)恢复的时候从哪个集群进行恢复,这一步可能和第 2
> 步中的实现有一定的关系。整个过程中比较麻烦的是需要考虑各种异常情况如何解决。
>
> 另外多问一句,你们 checkpoint 跨机房容灾需求大吗?还是说仅仅是调研一下呢?
> savepoint 能否支持你们的需求呢?在 1.11 中 savepoint 能够很好的支持跨集群迁移[2],就是 savepoint 比
> checkpoint 慢很多
>
> [1]
>
> https://github.com/apache/flink/blob/481c509f2e034c912e5e5d278e0b3f3d29a21f2b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java#L91
> [2] https://issues.apache.org/jira/browse/FLINK-5763
>
> Best,
> Congxian
>
>
> dixingxin...@163.com  于2020年6月11日周四 下午8:21写道:
>
> > Hi Flink社区,
> > 目前我们在调研checkpoint
> > 跨机房容灾的方案,目标是把checkpoint备份到hdfs2上,当hdfs1故障时,可以快速从hdfs2恢复flink作业。
> >
> >
> 本来我们打算使用hadoop的distcp命令定期把hdfs1上的checkpoint备份到hdfs2上,但是运维同学反馈distcp命令会报错,因为checkpoint路径下文件变动太频繁。
> > 1.想问下大家在这种文件频繁创建的场景下,是如何做hdfs间的增量文件拷贝的,有没有现成的工具或成熟的方案?
> > 2.改造flink代码,支持checkpoint异步双写是否是一个可行的方案?
> >
> >
> >
> > Best,
> > Xingxing Di
> >
>


回复: Re: 如何做checkpoint的灾备

2020-06-14 Thread dixingxin...@163.com
@Congxian Qiu 感谢你的回复!
1.先回答你的疑问,我们目前checkpoint跨机房容灾的需求比较强烈,是需要上生产的;关于flink 1.11 
的savepoint,我们后面可以尝试一下,但是最近几个月还没有升级1.11版本的计划。
2.你说的定期把文件copy到本地集群,然后再复制到远程集群感觉是个可行的方案,我们让hadoop运维同学评估下。
3.checkpoint双写:
感谢分享checkpoint双写的思路,这块我们最近几天会先出个MVP版本验证一下,如果遇到问题肯定还需要请教一下。
异常处理确实需要仔细设计,至少需要保证写备用checkpoint时,不会影响原checkpoint。同时需要引入metric,记录备用checkpoint存储失败的情况
关于选用哪个集群的checkpoint,目前我们是打算手动控制的,会在flink上层提供一个可以批量切换HDFS(Flink集群)的接口
@唐云  感谢你的恢复!
我们试一下你说的方式,感谢。

题外话,我还有个疑问,就是如何判断checkpoint是否可用?
我们基本没有使用savepoint,如果作业挂了需要重新启动时,我们会指定从checkpoint恢复。如果从最新的checkpoint恢复,那很有可能因为checkpoint不完整,导致作业无法启动。
目前我们是简单处理的,优先使用倒数第2个checkpoint,但如果作业checkpoint少于2个,可能需要查找checkpoint路径,并手动指定。
PS:我们用的是flink 1.9.2 



Best,
Xingxing Di
 
发件人: Yun Tang
发送时间: 2020-06-14 00:48
收件人: user-zh
主题: Re: 如何做checkpoint的灾备
Hi Xingxing
 
由于作业仍在运行,所以checkpoint目录下的文件是不断新增以及删除的,其实在使用distcp的时候加上 “-i” [1] 
来忽略失败的拷贝(例如FileNotFoundException) 
文件即可。因为作业的原始checkpoint目录最终一定可以做到正常restore,所以即使部分文件因为在拷贝时被原作业不需要而删除时,只要最终目录结构一致,是可以做到在另外一个HDFS上实现容灾备份的。
 
[1] 
https://hadoop.apache.org/docs/current/hadoop-distcp/DistCp.html#Command_Line_Options
 
祝好
唐云
 

From: Congxian Qiu 
Sent: Saturday, June 13, 2020 16:54
To: user-zh 
Subject: Re: 如何做checkpoint的灾备
 
Hi
 
你好,这个需求看上去是一个通用的需求(可以是任何的文件进行灾备,刚好这里是 checkpoint 文件),对 hdfs distcp
的限制不太了解,不过如果你要是想做这件事情,能否尝试定时的把文件先 copy 到本集群,然后 copy 后的文件通过 distcp 拷贝到其他集群。
 
另外改造 flink 代码支持 checkpoint 异步双写这从理论上来说是可以的,不过做的事情可能也不简单,粗略想了下需要:1)tm
端能够双写,可以参考现在开启 localrecovery 的双写(本地和远程)的实现[1] -- 另外需要考虑异常情况;2)jm 端需要考虑
checkpoint meta 的格式,以及存储,meta 也需要存两份(如果只存一份的话,那么 hdfs
异常了也无法恢复),这里面异常的情况也需要仔细考虑;3)恢复的时候从哪个集群进行恢复,这一步可能和第 2
步中的实现有一定的关系。整个过程中比较麻烦的是需要考虑各种异常情况如何解决。
 
另外多问一句,你们 checkpoint 跨机房容灾需求大吗?还是说仅仅是调研一下呢?
savepoint 能否支持你们的需求呢?在 1.11 中 savepoint 能够很好的支持跨集群迁移[2],就是 savepoint 比
checkpoint 慢很多
 
[1]
https://github.com/apache/flink/blob/481c509f2e034c912e5e5d278e0b3f3d29a21f2b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java#L91
[2] https://issues.apache.org/jira/browse/FLINK-5763
 
Best,
Congxian
 
 
dixingxin...@163.com  于2020年6月11日周四 下午8:21写道:
 
> Hi Flink社区,
> 目前我们在调研checkpoint
> 跨机房容灾的方案,目标是把checkpoint备份到hdfs2上,当hdfs1故障时,可以快速从hdfs2恢复flink作业。
>
> 本来我们打算使用hadoop的distcp命令定期把hdfs1上的checkpoint备份到hdfs2上,但是运维同学反馈distcp命令会报错,因为checkpoint路径下文件变动太频繁。
> 1.想问下大家在这种文件频繁创建的场景下,是如何做hdfs间的增量文件拷贝的,有没有现成的工具或成熟的方案?
> 2.改造flink代码,支持checkpoint异步双写是否是一个可行的方案?
>
>
>
> Best,
> Xingxing Di
>


如何做checkpoint的灾备

2020-06-11 Thread dixingxin...@163.com
Hi Flink社区,
目前我们在调研checkpoint 
跨机房容灾的方案,目标是把checkpoint备份到hdfs2上,当hdfs1故障时,可以快速从hdfs2恢复flink作业。
本来我们打算使用hadoop的distcp命令定期把hdfs1上的checkpoint备份到hdfs2上,但是运维同学反馈distcp命令会报错,因为checkpoint路径下文件变动太频繁。
1.想问下大家在这种文件频繁创建的场景下,是如何做hdfs间的增量文件拷贝的,有没有现成的工具或成熟的方案?
2.改造flink代码,支持checkpoint异步双写是否是一个可行的方案?



Best,
Xingxing Di


????: ?????? ??flink-connector-kafka??????????Subscribe????

2020-04-22 Thread dixingxin...@163.com
 ??kafka balance??PK??
kafka 
partition??offset??kafka 
balance





dixingxin...@163.com
 
 i'mpossible
?? 2020-04-22 17:14
 user-zh; zhisheng2018
?? ?? ??flink-connector-kafka??Subscribe
Hi??
  
??kafka??Flink??kafka??rebalance??
 
 
  
??flink-connector-kafkatopic??kafkakafka
 checkpoint??
 
 
  

 
 
  
 
 
 
 
----
??:"zhisheng"

Re: Re: Flink streaming sql是否支持两层group by聚合

2020-04-18 Thread dixingxin...@163.com
@Benchao  @Jark  
thank you very much. We have use flink 1.9 for a while , and we will try 1.9 + 
minibatch. 



dixingxin...@163.com
 
Sender: Jark Wu
Send Time: 2020-04-18 21:38
Receiver: Benchao Li
cc: dixingxing85; user; user-zh
Subject: Re: Flink streaming sql是否支持两层group by聚合
Hi, 

I will use English because we are also sending to user@ ML. 

This behavior is as expected, not a bug. Benchao gave a good explanation about 
the reason. I will give some further explanation.
In Flink SQL, we will split an update operation (such as uv from 100 -> 101) 
into two separate messages, one is -[key, 100], the other is +[key, 101]. 
Once these two messages arrive the downstream aggregation, it will also send 
two result messages (assuming the previous SUM(uv) is 500),
one is [key, 400], the other is [key, 501].

But this problem is almost addressed since 1.9, if you enabled the mini-batch 
optimization [1]. Because mini-batch optimization will try best to the 
accumulate the separate + and - message in a single mini-batch processing. You 
can upgrade and have a try. 

Best,
Jark

[1]: 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation



On Sat, 18 Apr 2020 at 12:26, Benchao Li  wrote:
这个按照目前的设计,应该不能算是bug,应该是by desigh的。
主要问题还是因为有两层agg,第一层的agg的retract会导致第二层的agg重新计算和下发结果。

dixingxing85 于2020年4月18日 周六上午11:38写道:
多谢benchao,
我这个作业的结果预期结果是每天只有一个结果,这个结果应该是越来越大的,比如:
20200417,86
20200417,90
20200417,130
20200417,131

而不应该是忽大忽小的,数字由大变小,这样的结果需求方肯定不能接受的:
20200417,90
20200417,86
20200417,130
20200417,86
20200417,131

我的疑问是内层的group by产生的retract流,会影响sink吗,我是在sink端打的日志。
如果flink支持这种两层group by的话,那这种结果变小的情况应该算是bug吧?

Sent from my iPhone

On Apr 18, 2020, at 10:08, Benchao Li  wrote:


Hi,

这个是支持的哈。
你看到的现象是因为group by会产生retract结果,也就是会先发送-[old],再发送+[new].
如果是两层的话,就成了:
第一层-[old], 第二层-[cur], +[old]
第一层+[new], 第二层[-old], +[new]

dixingxin...@163.com  于2020年4月18日周六 上午2:11写道:

Hi all:

我们有个streaming sql得到的结果不正确,现象是sink得到的数据一会大一会小,我们想确认下,这是否是个bug, 或者flink还不支持这种sql。
具体场景是:先group by A, B两个维度计算UV,然后再group by A 把维度B的UV sum起来,对应的SQL如下:(A -> dt,  B 
-> pvareaid)
SELECT dt, SUM(a.uv) AS uv
FROM (
   SELECT dt, pvareaid, COUNT(DISTINCT cuid) AS uv
   FROM streaming_log_event
   WHERE action IN ('action1')
  AND pvareaid NOT IN ('pv1', 'pv2')
  AND pvareaid IS NOT NULL
   GROUP BY dt, pvareaid
) a
GROUP BY dt;
sink接收到的数据对应日志为:
2020-04-17 22:28:38,727INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(false,0,86,20200417)
2020-04-17 22:28:38,727INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(true,0,130,20200417)
2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(false,0,130,20200417)
2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(true,0,86,20200417)
2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(false,0,86,20200417)
2020-04-17 22:28:39,328INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(true,0,131,20200417)

我们使用的是1.7.2, 测试作业的并行度为1。
这是对应的 issue: https://issues.apache.org/jira/browse/FLINK-17228




dixingxin...@163.com


-- 
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn
-- 
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Flink streaming sql是否支持两层group by聚合

2020-04-17 Thread dixingxin...@163.com

Hi all:

我们有个streaming sql得到的结果不正确,现象是sink得到的数据一会大一会小,我们想确认下,这是否是个bug, 或者flink还不支持这种sql。
具体场景是:先group by A, B两个维度计算UV,然后再group by A 把维度B的UV sum起来,对应的SQL如下:(A -> dt,  B 
-> pvareaid)
SELECT dt, SUM(a.uv) AS uv
FROM (
   SELECT dt, pvareaid, COUNT(DISTINCT cuid) AS uv
   FROM streaming_log_event
   WHERE action IN ('action1')
  AND pvareaid NOT IN ('pv1', 'pv2')
  AND pvareaid IS NOT NULL
   GROUP BY dt, pvareaid
) a
GROUP BY dt;
sink接收到的数据对应日志为:
2020-04-17 22:28:38,727INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(false,0,86,20200417)
2020-04-17 22:28:38,727INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(true,0,130,20200417)
2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(false,0,130,20200417)
2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(true,0,86,20200417)
2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(false,0,86,20200417)
2020-04-17 22:28:39,328INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(true,0,131,20200417)

我们使用的是1.7.2, 测试作业的并行度为1。
这是对应的 issue: https://issues.apache.org/jira/browse/FLINK-17228




dixingxin...@163.com


Flink streaming sql是否支持两层group by聚合

2020-04-17 Thread dixingxin...@163.com

Hi all:

我们有个streaming sql得到的结果不正确,现象是sink得到的数据一会大一会小,我们想确认下,这是否是个bug, 或者flink还不支持这种sql。
具体场景是:先group by A, B两个维度计算UV,然后再group by A 把维度B的UV sum起来,对应的SQL如下:(A -> dt,  B 
-> pvareaid)
SELECT dt, SUM(a.uv) AS uv
FROM (
   SELECT dt, pvareaid, COUNT(DISTINCT cuid) AS uv
   FROM streaming_log_event
   WHERE action IN ('action1')
  AND pvareaid NOT IN ('pv1', 'pv2')
  AND pvareaid IS NOT NULL
   GROUP BY dt, pvareaid
) a
GROUP BY dt;
sink接收到的数据对应日志为:
2020-04-17 22:28:38,727INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(false,0,86,20200417)
2020-04-17 22:28:38,727INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(true,0,130,20200417)
2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(false,0,130,20200417)
2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(true,0,86,20200417)
2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(false,0,86,20200417)
2020-04-17 22:28:39,328INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(true,0,131,20200417)

我们使用的是1.7.2, 测试作业的并行度为1。
这是对应的 issue: https://issues.apache.org/jira/browse/FLINK-17228




dixingxin...@163.com