Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6067#discussion_r190557380 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java --- @@ -732,19 +744,10 @@ public void heartbeatFromResourceManager(ResourceID resourceID) { allocationId, jobId, resourceManagerId); try { - if (resourceManagerConnection == null) { - final String message = "TaskManager is not connected to a resource manager."; + if (!isConnectedToResourceManager(resourceManagerId)) { + final String message = String.format("TaskManager is not connected to the resource manager %s.", resourceManagerId); log.debug(message); - throw new SlotAllocationException(message); - } - - if (!Objects.equals(resourceManagerConnection.getTargetLeaderId(), resourceManagerId)) { - final String message = "The leader id " + resourceManagerId + - " does not match with the leader id of the connected resource manager " + - resourceManagerConnection.getTargetLeaderId() + '.'; - - log.debug(message); - throw new SlotAllocationException(message); + throw new TaskManagerException(message); --- End diff -- Why not return an exceptional future here?
---