Hi
   按理说,数据和 barrier 没有依赖关系的,但从你的描述看,没有数据的时候,无法接受到
barrier,或许你可以分享一下你的代码(可以把业务逻辑进行简化),或许大家可以帮你看看
Best,
Congxian


Robert.Zhang <173603...@qq.com> 于2020年8月26日周三 上午11:43写道:

> Hi Congxian,
>
> 开了更多日志观察了下,是由于iteration source的barrier无法接收到导致的。
> 该barrier无法接收到,导致下游也无法拿到由该operator传递的barrier。因此checkpoint一直无法成功。
> 比较奇怪的一点是,这是在我停止数据源发送的情况下出现的。我发送一条经过这个operator的数据,
> 那么从日志中看,立刻获得了barrier,checkpoint可以迅速完成。
>
>
>
> 为何会出现这个情况呢?什么情况下流里面没有数据会导致无法checkpoint成功呢?
>
>
>
>
>
> ---原始邮件---
> *发件人:* "Congxian Qiu"<qcx978132...@gmail.com>
> *发送时间:* 2020年8月25日(周二) 下午5:33
> *收件人:* "user-zh"<user-zh@flink.apache.org>;
> *主题:* Re: 流处理任务中checkpoint失败
>
> Hi
>    对于 checkpoint 超时失败的情况,需要看一下具体的原因,对于 source 没有完成的话,或许看一下相应并发(没完成 snapshot
> 的 source)的 CPU 占用情况,以及相应逻辑是否卡在哪里或许能看到一些线索。source 是收到 JM 的 rpc 后触发的
> snapshot,所以这里相比其他的算子,不需要考虑 barrier 对齐的事情。
> Best,
> Congxian
>
>
> Robert.Zhang <173603...@qq.com> 于2020年8月25日周二 上午12:58写道:
>
> > 看了日志,是由于部分checkpoint 超时未完成,web界面上 iteration source的checkpoint始终无法完成。
> > 官方文档对于在iterative
> > stream的checkpoint没有更详细的说明。对于loop中的数据丢失可以理解。但是checkpoint无法成功不是特别能理解。
> > 按照我对于chandylamport算法的理解,上游operator的barrier应该是直接给到了下游
> > ,不应该存在无法拿到barrier的情况才对。不知道这是什么原因导致的
> >
> > ---原始邮件---
> > 发件人: "Congxian Qiu"<qcx978132...@gmail.com&gt;
> > 发送时间: 2020年8月24日(周一) 晚上8:21
> > 收件人: "user-zh"<user-zh@flink.apache.org&gt;;
> > 主题: Re: 流处理任务中checkpoint失败
> >
> >
> > Hi
> > &nbsp;&nbsp; 从报错 ”Exceeded checkpoint tolerable failure threshold“ 看,你的
> > checkpoint
> > 有持续报错,导致了作业失败,你需要找一下为什么 checkpoint 失败,或许这篇文章[1] 可以有一些帮助
> > &nbsp;&nbsp; 另外从配置看,你开启了 unalign checkpoint,这个是上述文章中暂时没有设计的地方。
> >
> > [1] https://zhuanlan.zhihu.com/p/87131964
> > Best,
> > Congxian
> >
> >
> > Robert.Zhang <173603...@qq.com&gt; 于2020年8月21日周五 下午6:31写道:
> >
> > &gt; Hello all,
> > &gt; 目前遇到一个问题,在iterative stream job
> > &gt; 使用checkpoint,按照文档进行了相应的配置,测试过程中checkpoint几乎无法成功
> > &gt; 测试state
> 很小,只有几k,依然无法成功。会出现org.apache.flink.util.FlinkRuntimeException:
> > &gt; Exceeded checkpoint tolerable failure threshold.的报错
> > &gt;
> > &gt;
> > &gt; 配置如下:
> > &gt; env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE,
> true);
> > &gt; CheckpointConfig checkpointConfig = env.getCheckpointConfig();
> > &gt; checkpointConfig.setCheckpointTimeout(600000);
> > &gt; checkpointConfig.setMinPauseBetweenCheckpoints(60000);
> > &gt; checkpointConfig.setMaxConcurrentCheckpoints(4);
> > &gt;
> > &gt;
> >
> checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> > &gt; checkpointConfig.setPreferCheckpointForRecovery(true);
> > &gt; checkpointConfig.setTolerableCheckpointFailureNumber(2);
> > &gt; checkpointConfig.enableUnalignedCheckpoints();
> > &gt;
> > &gt;
> > &gt; 任务只处理几条数据,未存在反压的情况。有遇到类似问题的老哥吗?
>

回复