[hotfix] [clustermgnt] Set pending registration properly in TaskExecutorToResourceManagerConnection
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/39a36994 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/39a36994 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/39a36994 Branch: refs/heads/flip-6 Commit: 39a36994df5f0f57ea1f9258eaaa1d42487b4b10 Parents: 1a8a993 Author: Till Rohrmann <trohrm...@apache.org> Authored: Mon Aug 29 17:40:57 2016 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Sun Oct 2 23:44:42 2016 +0200 ---------------------------------------------------------------------- .../TaskExecutorToResourceManagerConnection.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/39a36994/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java index f398b7d..7ccc879 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java @@ -55,7 +55,7 @@ public class TaskExecutorToResourceManagerConnection { private final String resourceManagerAddress; - private ResourceManagerRegistration pendingRegistration; + private TaskExecutorToResourceManagerConnection.ResourceManagerRegistration pendingRegistration; private ResourceManagerGateway registeredResourceManager; @@ -86,13 +86,13 @@ public class TaskExecutorToResourceManagerConnection { checkState(!closed, "The connection is already closed"); checkState(!isRegistered() && pendingRegistration == null, "The connection is already started"); - ResourceManagerRegistration registration = new ResourceManagerRegistration( + pendingRegistration = new TaskExecutorToResourceManagerConnection.ResourceManagerRegistration( log, taskExecutor.getRpcService(), resourceManagerAddress, resourceManagerLeaderId, taskExecutor.getAddress(), taskExecutor.getResourceID()); - registration.startRegistration(); + pendingRegistration.startRegistration(); - Future<Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess>> future = registration.getFuture(); + Future<Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess>> future = pendingRegistration.getFuture(); future.onSuccess(new OnSuccess<Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess>>() { @Override @@ -167,14 +167,14 @@ public class TaskExecutorToResourceManagerConnection { // Utilities // ------------------------------------------------------------------------ - static class ResourceManagerRegistration + private static class ResourceManagerRegistration extends RetryingRegistration<ResourceManagerGateway, TaskExecutorRegistrationSuccess> { private final String taskExecutorAddress; private final ResourceID resourceID; - public ResourceManagerRegistration( + ResourceManagerRegistration( Logger log, RpcService rpcService, String targetAddress,