Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4462#discussion_r130929937 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -755,48 +751,42 @@ public void failSlot(final ResourceID taskManagerId, if (registeredTaskManagers.containsKey(taskManagerId)) { final RegistrationResponse response = new JMTMRegistrationSuccess( resourceId, libraryCacheManager.getBlobServerPort()); - return FlinkCompletableFuture.completed(response); + return CompletableFuture.completedFuture(response); } else { - return getRpcService().execute(new Callable<TaskExecutorGateway>() { - @Override - public TaskExecutorGateway call() throws Exception { - return getRpcService().connect(taskManagerRpcAddress, TaskExecutorGateway.class) - .get(rpcTimeout.getSize(), rpcTimeout.getUnit()); - } - }).handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() { - @Override - public RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway, Throwable throwable) { - if (throwable != null) { - return new RegistrationResponse.Decline(throwable.getMessage()); - } - - if (!JobMaster.this.leaderSessionID.equals(leaderId)) { - log.warn("Discard registration from TaskExecutor {} at ({}) because the expected " + - "leader session ID {} did not equal the received leader session ID {}.", - taskManagerId, taskManagerRpcAddress, - JobMaster.this.leaderSessionID, leaderId); - return new RegistrationResponse.Decline("Invalid leader session id"); - } - - slotPoolGateway.registerTaskManager(taskManagerId); - registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation, taskExecutorGateway)); - - // monitor the task manager as heartbeat target - taskManagerHeartbeatManager.monitorTarget(taskManagerId, new HeartbeatTarget<Void>() { - @Override - public void receiveHeartbeat(ResourceID resourceID, Void payload) { - // the task manager will not request heartbeat, so this method 
will never be called currently + return getRpcService() + .connect(taskManagerRpcAddress, TaskExecutorGateway.class) --- End diff -- Because we were needlessly blocking a thread from the `RpcService`'s `Executor` by calling `get` on the future returned by `RpcService#connect`.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---