hello,我最近遇到一个问题:
我通过flink消费kafka数据,job 图大概是这样的:Source -> map -> filter -> flatMap -> Map -> Sink
在一瞬间kafka的producer端会产生大量的数据,导致flink无法消费完,我的checkpoint设置的是10分钟;
最后会产生Checkpoint expired before
completing.的错误,导致job重启,从而导致从上一个checkpoint恢复,然后重复消费数据,又导致checkpoint超时,死循环了。
不知道有什么好办法解决该问题。
多谢~
hi,我遇到flink修改sink并行度后,无法从checkpoint restore问题
flink 版本: 1.13.1
flink on yarn
DataStream api方式写的java job
试验1:不修改任何代码,cancel job后,能从指定的checkpoint恢复
dataStream.addSink(new Sink(config)).name("").uid("");
试验2:只修改sink端的并行度,job无法启动,一直是Initiating状态
dataStream.addSink(new
Sink(confi
hello,我一直在关注FLIP-27实现的全新source,更新1.12后发现了已经有新的kafkasource实现,目前在使用kafkasource与coordinator通信的时候遇到了困难。
我创建了一个kafkasource和对应的operatorCoordinator,并在source上create了一个reader,在operatorCoordinator的handleEventFromOperator上打了一个断点,在启动的时候能够成功收到注册reader的消息,但是无法收到其他通过sourceReaderContext的sendSourceEventToCoordinato
是DataStream
API,主要是从用户不同行为属性,例如支付,访问等在不同的topic里,union后根据session的窗口聚合,获取到根据userId和sessionId两类型数据,分别是用户全部行为和用户单次访问的全部行为。
处理全实时数据没问题,但是如果处理带有历史数据就会遇到每一个topic内历史数据量不同,之前的解决方案是分为批处理和流处理两步,现在想把这两步合并起来,就会出现watermark被推高,丢掉快流的数据。
其实不光是丢数据的问题,如果按照快流的watermark来设置的话,也会出现,大量数据缓存在state里的情况,所以想做根据时间戳控制source读取速度。
termark(这个要求单分区内部乱序不能太大)。后续算子的watermark是会基于输入算子所有并发实例的watermark取小的,所以貌似也不存在你说的问题。
> >
> > hao kong 于2020年9月27日周日 下午5:16写道:
> >
> >> hi
> >> 感谢各位,pnowoj...@apache.org为我提供了一个FLIP,
> >>
> >>
> https://cwiki.apache.org/confluence/display/FLINK
要自己实现高可用。
赵一旦 于2020年9月21日周一 下午5:50写道:
> 的确问题没说明白,貌似flink不会存在类似问题。
>
> hao kong 于2020年9月16日周三 下午6:45写道:
>
> > 十分感谢,我尝试实现一下,看看能不能通过processfuncton和反压机制配合实现一下。
> >
> > Congxian Qiu 于2020年9月16日周三 下午1:55写道:
> >
> > > Hi
> > > 没有太明白你这里为什么数据
jects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector
> Best,
> Congxian
>
>
> hao kong 于2020年9月16日周三 上午10:24写道:
>
> > hello,我有一份来自多个Kafka-source的工作。它们都包含某些历史数据。如果使用events-time
> > window,它将导致数据较少的source通过water-mark覆盖数
hello,我有一份来自多个Kafka-source的工作。它们都包含某些历史数据。如果使用events-time
window,它将导致数据较少的source通过water-mark覆盖数据更多的source。
目前我能想到的方案是用一个在source上的调度器,通过redis或者zookeeper缓存每一个source下两条数据,统一比较,时间小的先进入接下来的处理流程,实现起来比较复杂,各位大佬有什么好的办法吗?十分感谢。