The analysis of Gen is correct. Flink currently uses its heartbeat as the
primary means to detect dead TaskManagers. This means that Flink will take
at least `heartbeat.timeout` time before the system recovers. Even if the
cancellation happens fast (e.g. by having configured a low
akka.ask.timeout), then Flink will still try to deploy tasks onto the dead
TaskManager until it is marked as dead and its slots are released (unless
the ResourceManager does not get a signal from the underlying resource
management system that a container/pod has died). One way to improve the
situation is to introduce logic which can react to a ConnectionException
and then black lists or releases a TaskManager, for example. This is
currently not implemented in Flink, though.

Concerning the cancellation operation: Flink currently does not listen to
the dead letters of Akka. This means that the `akka.ask.timeout` is the
primary means to fail the future result of a rpc which could not be sent.
This is also an improvement we should add to Flink's RpcService. I've
created a JIRA issue for this problem [1].

[1] https://issues.apache.org/jira/browse/FLINK-23202

Cheers,
Till

On Wed, Jun 30, 2021 at 6:33 PM Lu Niu <qqib...@gmail.com> wrote:

> Thanks Gen! cc flink-dev to collect more inputs.
>
> Best
> Lu
>
> On Wed, Jun 30, 2021 at 12:55 AM Gen Luo <luogen...@gmail.com> wrote:
>
>> I'm also wondering here.
>>
>> In my opinion, it's because the JM can not confirm whether the TM is lost
>> or it's a temporary network trouble and will recover soon, since I can see
>> in the log that akka has got a Connection refused but JM still sends a
>> heartbeat request to the lost TM until it reaches heartbeat timeout. But
>> I'm not sure if it's indeed designed like this.
>>
>> I would really appreciate it if anyone who knows more details could
>> answer. Thanks.
>>
>

Reply via email to