The analysis of Gen is correct. Flink currently uses its heartbeat as the primary means to detect dead TaskManagers. This means that Flink will take at least `heartbeat.timeout` time before the system recovers. Even if the cancellation happens fast (e.g. by having configured a low `akka.ask.timeout`), Flink will still try to deploy tasks onto the dead TaskManager until it is marked as dead and its slots are released (unless the ResourceManager gets a signal from the underlying resource management system that a container/pod has died). One way to improve the situation would be to introduce logic which reacts to a ConnectionException and then, for example, blacklists or releases the affected TaskManager. This is currently not implemented in Flink, though.
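For reference, these are the relevant knobs in `flink-conf.yaml`. The values below are the documented Flink defaults at the time of writing, shown only as a sketch of where the recovery latency comes from, not as a tuning recommendation:

```yaml
# flink-conf.yaml (illustrative; values are the Flink defaults)
heartbeat.interval: 10000   # ms between heartbeat requests from the JM
heartbeat.timeout: 50000    # ms without a heartbeat before a TM is marked dead
akka.ask.timeout: 10 s      # timeout after which an RPC future fails
```

With these defaults, recovery after a hard TaskManager loss is bounded below by `heartbeat.timeout`, i.e. up to 50 seconds, regardless of how quickly the cancellation itself times out.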
Concerning the cancellation operation: Flink currently does not listen to the dead letters of Akka. This means that the `akka.ask.timeout` is the primary means to fail the future result of an RPC which could not be sent. This is also an improvement we should add to Flink's RpcService. I've created a JIRA issue for this problem [1].

[1] https://issues.apache.org/jira/browse/FLINK-23202

Cheers,
Till

On Wed, Jun 30, 2021 at 6:33 PM Lu Niu <qqib...@gmail.com> wrote:

> Thanks Gen! cc flink-dev to collect more inputs.
>
> Best
> Lu
>
> On Wed, Jun 30, 2021 at 12:55 AM Gen Luo <luogen...@gmail.com> wrote:
>
>> I'm also wondering here.
>>
>> In my opinion, it's because the JM cannot confirm whether the TM is lost
>> or it's a temporary network issue that will recover soon, since I can see
>> in the log that Akka has got a "Connection refused" but the JM still sends
>> heartbeat requests to the lost TM until it reaches the heartbeat timeout.
>> But I'm not sure if it's indeed designed like this.
>>
>> I would really appreciate it if anyone who knows more details could
>> answer. Thanks.