Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6067#discussion_r190583266 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java --- @@ -933,25 +956,35 @@ public void requestHeartbeat(ResourceID resourceID, SlotReport slotReport) { blobCacheService.setBlobServerAddress(blobServerAddress); + establishedResourceManagerConnection = new EstablishedResourceManagerConnection( + resourceManagerGateway, + resourceManagerResourceId, + taskExecutorRegistrationId); + stopRegistrationTimeout(); } private void closeResourceManagerConnection(Exception cause) { - if (resourceManagerConnection != null) { - - if (resourceManagerConnection.isConnected()) { - if (log.isDebugEnabled()) { - log.debug("Close ResourceManager connection {}.", - resourceManagerConnection.getResourceManagerId(), cause); - } else { - log.info("Close ResourceManager connection {}.", - resourceManagerConnection.getResourceManagerId()); - } - resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerId()); + if (establishedResourceManagerConnection != null) { + final ResourceID resourceManagerResourceId = establishedResourceManagerConnection.getResourceManagerResourceId(); - ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway(); - resourceManagerGateway.disconnectTaskManager(getResourceID(), cause); + if (log.isDebugEnabled()) { + log.debug("Close ResourceManager connection {}.", --- End diff -- I was wondering whether `cause` can get logged twice: 1. ``` log.debug("Close ResourceManager connection {}.", resourceManagerResourceId, cause); ``` 2. ``` log.debug("Terminating registration attempts towards ResourceManager {}.", resourceManagerConnection.getTargetAddress(), cause); ```
---