如果说一共n个分区,但是你就是希望使用m<n个并发的kafka数据源算子,也是ok的。因为kafka算子有个参数用于设置从kakfa算子就生成watermark,即不自己使用assignXXX那个api。 kafkaSouce本身会将当前并发实例消费的多个分区的数据分别生成watermark并取小后再发射出去。
赵一旦 <hinobl...@gmail.com> 于2020年9月27日周日 下午5:51写道: > > 还是没看懂。即使你有多个kafka数据源。但是从flink角度来说,就是从若干个partition(一共n个)输入数据(不管来自哪个kafka集群,哪个topic的分区),只要你设置的数据源并行度为n,保证每个并行度只消费一个分区,这样每个并行task实例只针对某一个分区生成watermark(这个要求单分区内部乱序不能太大)。后续算子的watermark是会基于输入算子所有并发实例的watermark取小的,所以貌似也不存在你说的问题。 > > hao kong <h...@lemonbox.me> 于2020年9月27日周日 下午5:16写道: > >> hi >> 感谢各位,pnowoj...@apache.org为我提供了一个FLIP, >> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface#FLIP27:RefactorSourceInterface-EventTimeAlignment >> ,我的问题其实就是这个问题,快流会推高水印导致漫流里的数据被丢弃,我已经在开始尝试依赖zookeeper来写*SourceCoordinator,* >> 从而协调多个源的数据,主要是考虑这样不用考虑高可用,单独的coordinator需要自己实现高可用。 >> >> 赵一旦 <hinobl...@gmail.com> 于2020年9月21日周一 下午5:50写道: >> >> > 的确问题没说明白,貌似flink不会存在类似问题。 >> > >> > hao kong <h...@lemonbox.me> 于2020年9月16日周三 下午6:45写道: >> > >> > > 十分感谢,我尝试实现一下,看看能不能通过processfuncton和反压机制配合实现一下。 >> > > >> > > Congxian Qiu <qcx978132...@gmail.com> 于2020年9月16日周三 下午1:55写道: >> > > >> > > > Hi >> > > > 没有太明白你这里为什么数据较少的 source 会覆盖数据更多的 source。能否详细描述下呢? >> > > > 如果是因为某些 source 的数据把 watermark 给推高了,从而导致其他的 source >> > > > 数据变成迟到的数据,那么我理解这个暂时没法直接解决的,因为 watermark 是 task 级别的。如果你想要在一个 Flink >> > > > 作业中实现更细粒度的 watermark,或许你可以尝试下 processfuncton[1] 自己来定制一套类似的机制 >> > > > 另外你也可以看下这个文档[2] 看是否在你的场景中有帮助 >> > > > >> > > > [1] >> > > > >> > > > >> > > >> > >> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html#the-keyedprocessfunction >> > > > [2] >> > > > >> > > > >> > > >> > >> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector >> > > > Best, >> > > > Congxian >> > > > >> > > > >> > > > hao kong <h...@lemonbox.me> 于2020年9月16日周三 上午10:24写道: >> > > > >> > > > > hello,我有一份来自多个Kafka-source的工作。它们都包含某些历史数据。如果使用events-time >> > > > > window,它将导致数据较少的source通过water-mark覆盖数据更多的source。 >> > > > > >> > > > > >> > > > >> > > >> > >> 目前我能想到的方案是用一个在source上的调度器,通过redis或者zookeeper缓存每一个source下两条数据,统一比较,时间小的先进入接下来的处理流程,实现起来比较复杂,各位大佬有什么好的办法吗?十分感谢。 >> > > > > >> > > > >> > > >> > >> >