GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4462#discussion_r130929937
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -755,48 +751,42 @@ public void failSlot(final ResourceID taskManagerId,
                if (registeredTaskManagers.containsKey(taskManagerId)) {
                        final RegistrationResponse response = new JMTMRegistrationSuccess(
                                resourceId, libraryCacheManager.getBlobServerPort());
    -                   return FlinkCompletableFuture.completed(response);
    +                   return CompletableFuture.completedFuture(response);
                } else {
    -                   return getRpcService().execute(new Callable<TaskExecutorGateway>() {
    -                           @Override
    -                           public TaskExecutorGateway call() throws Exception {
    -                                   return getRpcService().connect(taskManagerRpcAddress, TaskExecutorGateway.class)
    -                                                   .get(rpcTimeout.getSize(), rpcTimeout.getUnit());
    -                           }
    -                   }).handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() {
    -                           @Override
    -                           public RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway, Throwable throwable) {
    -                                   if (throwable != null) {
    -                                           return new RegistrationResponse.Decline(throwable.getMessage());
    -                                   }
    -
    -                                   if (!JobMaster.this.leaderSessionID.equals(leaderId)) {
    -                                           log.warn("Discard registration from TaskExecutor {} at ({}) because the expected " +
    -                                                                           "leader session ID {} did not equal the received leader session ID {}.",
    -                                                           taskManagerId, taskManagerRpcAddress,
    -                                                           JobMaster.this.leaderSessionID, leaderId);
    -                                           return new RegistrationResponse.Decline("Invalid leader session id");
    -                                   }
    -
    -                                   slotPoolGateway.registerTaskManager(taskManagerId);
    -                                   registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation, taskExecutorGateway));
    -
    -                                   // monitor the task manager as heartbeat target
    -                                   taskManagerHeartbeatManager.monitorTarget(taskManagerId, new HeartbeatTarget<Void>() {
    -                                           @Override
    -                                           public void receiveHeartbeat(ResourceID resourceID, Void payload) {
    -                                                   // the task manager will not request heartbeat, so this method will never be called currently
    +                   return getRpcService()
    +                           .connect(taskManagerRpcAddress, TaskExecutorGateway.class)
    --- End diff --
    
    Because we were needlessly blocking a thread from the `RpcService`'s
    `Executor` by calling `get` on the future returned by `RpcService#connect`.
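
The difference between the two variants can be sketched outside Flink with plain `java.util.concurrent` types. This is only an illustration under assumptions, not the `JobMaster` code: `connect`, `toResponse`, `registerBlocking`, and `registerNonBlocking` are hypothetical stand-ins, and strings take the place of `TaskExecutorGateway` and `RegistrationResponse`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class NonBlockingConnectSketch {

    // Hypothetical stand-in for RpcService#connect: returns a future that
    // completes with a "gateway", or fails for an empty address.
    static CompletableFuture<String> connect(String address) {
        if (address.isEmpty()) {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalArgumentException("empty address"));
            return failed;
        }
        return CompletableFuture.supplyAsync(() -> "gateway@" + address);
    }

    // Hypothetical stand-in for building a RegistrationResponse.
    static String toResponse(String gateway, Throwable throwable) {
        return throwable != null
                ? "declined: " + throwable.getMessage()
                : "registered " + gateway;
    }

    // Old shape: a task on the executor calls get(), so one executor thread
    // is parked until the connection future completes.
    static CompletableFuture<String> registerBlocking(String address, Executor executor) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return connect(address).get(10, TimeUnit.SECONDS);
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        }, executor).handleAsync(NonBlockingConnectSketch::toResponse, executor);
    }

    // New shape: the handler is chained onto the connection future, so the
    // executor is only used once the result (or failure) is available.
    static CompletableFuture<String> registerNonBlocking(String address, Executor executor) {
        return connect(address).handleAsync(NonBlockingConnectSketch::toResponse, executor);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            System.out.println(registerBlocking("tm-1", executor).get(5, TimeUnit.SECONDS));
            System.out.println(registerNonBlocking("tm-1", executor).get(5, TimeUnit.SECONDS));
            System.out.println(registerNonBlocking("", executor).get(5, TimeUnit.SECONDS));
        } finally {
            executor.shutdown();
        }
    }
}
```

Both variants produce the same response for a successful connection. The blocking one just holds an executor thread idle while it waits, which matters most when the executor has few threads.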

