Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6035#discussion_r189017088

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -947,6 +964,36 @@ private void closeResourceManagerConnection(Exception cause) {
                resourceManagerConnection.close();
                resourceManagerConnection = null;
            }
    +
    +       startRegistrationTimeout();
    --- End diff --

    The problem is that we want this timeout to start whenever the `TaskExecutor` loses its connection to the RM, and that happens exactly when we close the RM connection. This also covers the case where we don't know the RM address (e.g. if the RM loses leadership).
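To illustrate the intent, here is a minimal, self-contained sketch of arming a registration timeout from the connection-close path. It is not the actual `TaskExecutor` code: the class `RegistrationTimeoutSketch`, its fields, `onRegistrationSuccess`, `isTimeoutPending`, and the use of a plain `ScheduledExecutorService` are assumptions made for the example. Only the names `closeResourceManagerConnection` and `startRegistrationTimeout` come from the diff.

```java
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified stand-in; not Flink's TaskExecutor. */
class RegistrationTimeoutSketch {
    // Daemon thread so a forgotten shutdown() does not keep the JVM alive.
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "registration-timeout");
            t.setDaemon(true);
            return t;
        });
    private final long timeoutMillis;
    private final Runnable onTimeout;

    // Identifies the currently armed timeout; null means none is pending.
    private UUID currentTimeoutId;

    RegistrationTimeoutSketch(long timeoutMillis, Runnable onTimeout) {
        this.timeoutMillis = timeoutMillis;
        this.onTimeout = onTimeout;
    }

    /** Called once a (re-)registration at the RM succeeded (assumed hook). */
    synchronized void onRegistrationSuccess() {
        currentTimeoutId = null; // invalidates any scheduled timeout
    }

    /** Every loss of the RM connection ends up here, so the timeout is armed here. */
    synchronized void closeResourceManagerConnection(Exception cause) {
        // ... closing and nulling the actual connection is elided ...
        startRegistrationTimeout();
    }

    private void startRegistrationTimeout() {
        final UUID id = UUID.randomUUID();
        currentTimeoutId = id; // a newer timeout supersedes an older one
        scheduler.schedule(() -> registrationTimeout(id), timeoutMillis, TimeUnit.MILLISECONDS);
    }

    private synchronized void registrationTimeout(UUID id) {
        // Fire only if this timeout was neither superseded nor cancelled.
        if (id.equals(currentTimeoutId)) {
            currentTimeoutId = null;
            onTimeout.run();
        }
    }

    synchronized boolean isTimeoutPending() {
        return currentTimeoutId != null;
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

Because the timeout is armed inside `closeResourceManagerConnection`, it does not matter why the connection went away (heartbeat timeout, explicit disconnect, or the RM losing leadership with no new address known yet). In this sketch a stale timeout is neutralised by comparing ids rather than by cancelling the scheduled task.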
---