A quick addition, I think with FLINK-23202 it should now also be possible
to improve the heartbeat mechanism in the general case. We can leverage the
unreachability exception thrown if a remote target is no longer reachable
to mark an heartbeat target as no longer reachable [1]. This can then be
considered as if the heartbeat timeout has been triggered. That way we
should detect lost TaskExecutors as fast as our heartbeat interval is.

[1] https://issues.apache.org/jira/browse/FLINK-23209

Cheers,
Till

On Thu, Jul 1, 2021 at 1:46 PM Yang Wang <danrtsey...@gmail.com> wrote:

> Since you are deploying Flink workloads on Yarn, the Flink ResourceManager
> should get the container
> completion event after the heartbeat of Yarn NM->Yarn RM->Flink RM, which
> is 8 seconds by default.
> And Flink ResourceManager will release the dead TaskManager container once
> received the completion event.
> As a result, Flink will not deploy tasks onto the dead TaskManagers.
>
>
> I think most of the time cost in Phase 1 might be cancelling the tasks on
> the dead TaskManagers.
>
>
> Best,
> Yang
>
>
> Till Rohrmann <trohrm...@apache.org> 于2021年7月1日周四 下午4:49写道:
>
>> The analysis of Gen is correct. Flink currently uses its heartbeat as the
>> primary means to detect dead TaskManagers. This means that Flink will take
>> at least `heartbeat.timeout` time before the system recovers. Even if the
>> cancellation happens fast (e.g. by having configured a low
>> akka.ask.timeout), then Flink will still try to deploy tasks onto the dead
>> TaskManager until it is marked as dead and its slots are released (unless
>> the ResourceManager does not get a signal from the underlying resource
>> management system that a container/pod has died). One way to improve the
>> situation is to introduce logic which can react to a ConnectionException
>> and then black lists or releases a TaskManager, for example. This is
>> currently not implemented in Flink, though.
>>
>> Concerning the cancellation operation: Flink currently does not listen to
>> the dead letters of Akka. This means that the `akka.ask.timeout` is the
>> primary means to fail the future result of a rpc which could not be sent.
>> This is also an improvement we should add to Flink's RpcService. I've
>> created a JIRA issue for this problem [1].
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-23202
>>
>> Cheers,
>> Till
>>
>> On Wed, Jun 30, 2021 at 6:33 PM Lu Niu <qqib...@gmail.com> wrote:
>>
>>> Thanks Gen! cc flink-dev to collect more inputs.
>>>
>>> Best
>>> Lu
>>>
>>> On Wed, Jun 30, 2021 at 12:55 AM Gen Luo <luogen...@gmail.com> wrote:
>>>
>>>> I'm also wondering here.
>>>>
>>>> In my opinion, it's because the JM can not confirm whether the TM is
>>>> lost or it's a temporary network trouble and will recover soon, since I can
>>>> see in the log that akka has got a Connection refused but JM still sends a
>>>> heartbeat request to the lost TM until it reaches heartbeat timeout. But
>>>> I'm not sure if it's indeed designed like this.
>>>>
>>>> I would really appreciate it if anyone who knows more details could
>>>> answer. Thanks.
>>>>
>>>

Reply via email to