Re: 含有多个kafka-source的job里如果用events_time作为时间的话,如何解决快流通过水印覆盖漫流的情况。

2020-09-27 文章 Shengkai Fang
hi, 你说的是为每个partition生成一个watermark吗? 这样子快流和慢流都会有独立的watermark gererator。 datastream已经支持了该特性, table层正在支持该特性,你可以看看flink-19282的jira。 赵一旦 于2020年9月28日 周一上午11:39写道: > 我这边负责的作业,一个作业上有2-3个kafka数据源,还包括多个mysql配置流数据源。也是各种join,但是没有union的case。 > > 没有任何watermark的问题,flink现有机制都是可以完美解决的。 > > > > 赵一旦 于2020年9月28日周一

Re: 含有多个kafka-source的job里如果用events_time作为时间的话,如何解决快流通过水印覆盖漫流的情况。

2020-09-27 文章 hao kong
是DataStream API,主要是从用户不同行为属性,例如支付,访问等在不同的topic里,union后根据session的窗口聚合,获取到根据userId和sessionId两类型数据,分别是用户全部行为和用户单次访问的全部行为。 处理全实时数据没问题,但是如果处理带有历史数据就会遇到每一个topic内历史数据量不同,之前的解决方案是分为批处理和流处理两步,现在想把这两步合并起来,就会出现watermark被推高,丢掉快流的数据。 其实不光是丢数据的问题,如果按照快流的watermark来设置的话,也会出现,大量数据缓存在state里的情况,所以想做根据时间戳控制source读取速度。

Re: 含有多个kafka-source的job里如果用events_time作为时间的话,如何解决快流通过水印覆盖漫流的情况。

2020-09-27 文章 赵一旦
我这边负责的作业,一个作业上有2-3个kafka数据源,还包括多个mysql配置流数据源。也是各种join,但是没有union的case。 没有任何watermark的问题,flink现有机制都是可以完美解决的。 赵一旦 于2020年9月28日周一 上午11:37写道: > 说实话,还是不觉得有这种case。KafkaSouceA(1,1,1,2,2,2,.,100),KafkaSouceB(1,100),然后AB都接到 > operatorC 上,operatorC后续跟个窗口算子operatorD。 > (1)oepratorC负责调用assignTimestampsAnd

Re: 含有多个kafka-source的job里如果用events_time作为时间的话,如何解决快流通过水印覆盖漫流的情况。

2020-09-27 文章 赵一旦
说实话,还是不觉得有这种case。KafkaSouceA(1,1,1,2,2,2,.,100),KafkaSouceB(1,100),然后AB都接到 operatorC 上,operatorC后续跟个窗口算子operatorD。 (1)oepratorC负责调用assignTimestampsAndWatermarks 完成watermark的生成。这么搞会出问题,因为operatorC会更快收到KafkaSouceB的100,进而生成watermark=100(假设maxOutOfOrderness=0)。那么operatorD在收到KafkaSouceA的后续元素会认为迟到丢弃。

Re: 含有多个kafka-source的job里如果用events_time作为时间的话,如何解决快流通过水印覆盖漫流的情况。

2020-09-27 文章 hao kong
我目前的情况是从多个kafka topic获取数据并union到一起进行处理,例如A流的时间是1-100共1w条数据,B流只有时间是1和100的两条数据,由于目前flink source之间没有*Coordinator* ,两个流的流速在数据量相同的时候是一样的,在union后的timewindow标记watermark里会先拿到A流的1,B流的1,然后拿到A流的1.X,B流的100,这时根据watermark的配置,如果没有设置延迟等待,会丢弃掉A流剩下的9998条数据,如果是多源不union,并行处理的话,不会有这个问题。 赵一旦 于2020年9月27日周日 下午5:53写道: >

Re: 含有多个kafka-source的job里如果用events_time作为时间的话,如何解决快流通过水印覆盖漫流的情况。

2020-09-27 文章 赵一旦
如果说一共n个分区,但是你就是希望使用m 于2020年9月27日周日 下午5:51写道: > > 还是没看懂。即使你有多个kafka数据源。但是从flink角度来说,就是从若干个partition(一共n个)输入数据(不管来自哪个kafka集群,哪个topic的分区),只要你设置的数据源并行度为n,保证每个并行度只消费一个分区,这样每个并行task实例只针对某一个分区生成watermark(这个要求单分区内部乱序不能太大)。后续算子的watermark是会基于输入算子所有并发实例的watermark取小的,所以貌似也不存在你说的问题。 > > hao kong 于2020年9月27日周日

Re: 含有多个kafka-source的job里如果用events_time作为时间的话,如何解决快流通过水印覆盖漫流的情况。

2020-09-27 文章 赵一旦
还是没看懂。即使你有多个kafka数据源。但是从flink角度来说,就是从若干个partition(一共n个)输入数据(不管来自哪个kafka集群,哪个topic的分区),只要你设置的数据源并行度为n,保证每个并行度只消费一个分区,这样每个并行task实例只针对某一个分区生成watermark(这个要求单分区内部乱序不能太大)。后续算子的watermark是会基于输入算子所有并发实例的watermark取小的,所以貌似也不存在你说的问题。 hao kong 于2020年9月27日周日 下午5:16写道: > hi > 感谢各位,pnowoj...@apache.org为我提供了

Re: 含有多个kafka-source的job里如果用events_time作为时间的话,如何解决快流通过水印覆盖漫流的情况。

2020-09-27 文章 hao kong
hi 感谢各位,pnowoj...@apache.org为我提供了一个FLIP, https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface#FLIP27:RefactorSourceInterface-EventTimeAlignment ,我的问题其实就是这个问题,快流会推高水印导致漫流里的数据被丢弃,我已经在开始尝试依赖zookeeper来写*SourceCoordinator,* 从而协调多个源的数据,主要是考虑这样不用考虑高可用,单独的coordinator需要

Re: 含有多个kafka-source的job里如果用events_time作为时间的话,如何解决快流通过水印覆盖漫流的情况。

2020-09-21 文章 赵一旦
的确问题没说明白,貌似flink不会存在类似问题。 hao kong 于2020年9月16日周三 下午6:45写道: > 十分感谢,我尝试实现一下,看看能不能通过processfuncton和反压机制配合实现一下。 > > Congxian Qiu 于2020年9月16日周三 下午1:55写道: > > > Hi > > 没有太明白你这里为什么数据较少的 source 会覆盖数据更多的 source。能否详细描述下呢? > > 如果是因为某些 source 的数据把 watermark 给推高了,从而导致其他的 source > > 数据变成迟到的数据,那么我理解这个

Re: 含有多个kafka-source的job里如果用events_time作为时间的话,如何解决快流通过水印覆盖漫流的情况。

2020-09-16 文章 hao kong
十分感谢,我尝试实现一下,看看能不能通过processfuncton和反压机制配合实现一下。 Congxian Qiu 于2020年9月16日周三 下午1:55写道: > Hi > 没有太明白你这里为什么数据较少的 source 会覆盖数据更多的 source。能否详细描述下呢? > 如果是因为某些 source 的数据把 watermark 给推高了,从而导致其他的 source > 数据变成迟到的数据,那么我理解这个暂时没法直接解决的,因为 watermark 是 task 级别的。如果你想要在一个 Flink > 作业中实现更细粒度的 watermark,或许你可

Re: 含有多个kafka-source的job里如果用events_time作为时间的话,如何解决快流通过水印覆盖漫流的情况。

2020-09-15 文章 Congxian Qiu
Hi 没有太明白你这里为什么数据较少的 source 会覆盖数据更多的 source。能否详细描述下呢? 如果是因为某些 source 的数据把 watermark 给推高了,从而导致其他的 source 数据变成迟到的数据,那么我理解这个暂时没法直接解决的,因为 watermark 是 task 级别的。如果你想要在一个 Flink 作业中实现更细粒度的 watermark,或许你可以尝试下 processfuncton[1] 自己来定制一套类似的机制 另外你也可以看下这个文档[2] 看是否在你的场景中有帮助 [1] https://ci.apache.org/p

含有多个kafka-source的job里如果用events_time作为时间的话,如何解决快流通过水印覆盖漫流的情况。

2020-09-15 文章 hao kong
hello,我有一份来自多个Kafka-source的工作。它们都包含某些历史数据。如果使用events-time window,它将导致数据较少的source通过water-mark覆盖数据更多的source。 目前我能想到的方案是用一个在source上的调度器,通过redis或者zookeeper缓存每一个source下两条数据,统一比较,时间小的先进入接下来的处理流程,实现起来比较复杂,各位大佬有什么好的办法吗?十分感谢。