[FLINK-4406] [cluster management] Implement job master registration at resource 
manager

[FLINK-4406] [cluster management] Skip creating a new connection if the new 
resource manager's address and leader id are both unchanged

[FLINK-4406] [cluster management] Verify registration response with leader id

This closes #2565.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8d3cc866
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8d3cc866
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8d3cc866

Branch: refs/heads/flip-6
Commit: 8d3cc866185c48d5ac5d06c4255ea7bcfd923753
Parents: 8c21de4
Author: Kurt Young <ykt...@gmail.com>
Authored: Thu Sep 29 08:56:27 2016 +0800
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Thu Oct 6 13:38:46 2016 +0200

----------------------------------------------------------------------
 .../runtime/jobmaster/JobManagerRunner.java     |   8 +-
 .../flink/runtime/jobmaster/JobMaster.java      | 222 +++++++++++++++++--
 .../runtime/jobmaster/JobMasterGateway.java     |  17 +-
 .../jobmaster/JobMasterRegistrationSuccess.java |  18 +-
 .../JobMasterToResourceManagerConnection.java   | 117 ----------
 .../resourcemanager/ResourceManager.java        |   2 +-
 .../jobmaster/JobManagerRunnerMockTest.java     |  25 ++-
 7 files changed, 239 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8d3cc866/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index bc2bf9a..6944d85 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -63,9 +63,6 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions {
 
        private final JobMaster jobManager;
 
-       /** Leader session id when granted leadership */
-       private UUID leaderSessionID;
-
        /** flag marking the runner as shut down */
        private volatile boolean shutdown;
 
@@ -93,7 +90,6 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions {
                this.executionContext = rpcService.getExecutor();
                this.checkpointRecoveryFactory = 
haServices.getCheckpointRecoveryFactory();
                this.leaderElectionService = 
haServices.getJobMasterLeaderElectionService(jobGraph.getJobID());
-               this.leaderSessionID = null;
 
                this.jobManager = new JobMaster(
                        jobGraph, configuration, rpcService, haServices,
@@ -232,7 +228,6 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions {
                        // The operation may be blocking, but since this runner 
is idle before it been granted the leadership,
                        // it's okay that job manager wait for the operation 
complete
                        
leaderElectionService.confirmLeaderSessionID(leaderSessionID);
-                       this.leaderSessionID = leaderSessionID;
 
                        // Double check the leadership after we confirm that, 
there is a small chance that multiple
                        // job managers schedule the same job after if they try 
to recover at the same time.
@@ -242,7 +237,7 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions {
                                        log.info("Job {} ({}) already finished 
by others.", jobGraph.getName(), jobGraph.getJobID());
                                        jobFinishedByOther();
                                } else {
-                                       jobManager.getSelf().startJob();
+                                       
jobManager.getSelf().startJob(leaderSessionID);
                                }
                        }
                }
@@ -259,7 +254,6 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions {
                        log.info("JobManager for job {} ({}) was revoked 
leadership at {}.",
                                jobGraph.getName(), jobGraph.getJobID(), 
getAddress());
 
-                       leaderSessionID = null;
                        jobManager.getSelf().suspendJob(new 
Exception("JobManager is no longer the leader."));
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/8d3cc866/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index b52a23c..1e01c55 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
@@ -34,6 +35,7 @@ import 
org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -47,18 +49,26 @@ import 
org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.registration.RegisteredRpcConnection;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.registration.RetryingRegistration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.slf4j.Logger;
+
 import scala.concurrent.ExecutionContext$;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -76,9 +86,6 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
-       /** Gateway to connected resource manager, null iff not connected */
-       private ResourceManagerGateway resourceManager = null;
-
        /** Logical representation of the job */
        private final JobGraph jobGraph;
 
@@ -123,6 +130,18 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
 
        private MetricGroup jobMetrics;
 
+       private volatile UUID leaderSessionID;
+
+       // --------- resource manager --------
+
+       /** Leader retriever service used to locate ResourceManager's address */
+       private LeaderRetrievalService resourceManagerLeaderRetriever;
+
+       /** Connection with ResourceManager, null if not located address yet or 
we close it initiative */
+       private volatile ResourceManagerConnection resourceManagerConnection;
+
+       // 
------------------------------------------------------------------------
+
        public JobMaster(
                JobGraph jobGraph,
                Configuration configuration,
@@ -151,10 +170,6 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                this.jobCompletionActions = checkNotNull(jobCompletionActions);
        }
 
-       public ResourceManagerGateway getResourceManager() {
-               return resourceManager;
-       }
-
        
//----------------------------------------------------------------------------------------------
        // Lifecycle management
        
//----------------------------------------------------------------------------------------------
@@ -196,7 +211,8 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                                        .getRestartStrategy();
                        if (restartStrategyConfiguration != null) {
                                restartStrategy = 
RestartStrategyFactory.createRestartStrategy(restartStrategyConfiguration);
-                       } else {
+                       }
+                       else {
                                restartStrategy = 
restartStrategyFactory.createRestartStrategy();
                        }
 
@@ -216,6 +232,13 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                                throw new 
JobSubmissionException(jobGraph.getJobID(), "Could not get the checkpoint 
recovery factory.", e);
                        }
 
+                       try {
+                               resourceManagerLeaderRetriever = 
highAvailabilityServices.getResourceManagerLeaderRetriever();
+                       } catch (Exception e) {
+                               log.error("Could not get the resource manager 
leader retriever.", e);
+                               throw new 
JobSubmissionException(jobGraph.getJobID(),
+                                       "Could not get the resource manager 
leader retriever.", e);
+                       }
                } catch (Throwable t) {
                        log.error("Failed to initializing job {} ({})", 
jobGraph.getName(), jobGraph.getJobID(), t);
 
@@ -223,7 +246,8 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
 
                        if (t instanceof JobSubmissionException) {
                                throw (JobSubmissionException) t;
-                       } else {
+                       }
+                       else {
                                throw new 
JobSubmissionException(jobGraph.getJobID(), "Failed to initialize job " +
                                        jobGraph.getName() + " (" + 
jobGraph.getJobID() + ")", t);
                        }
@@ -240,8 +264,12 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                super.shutDown();
 
                suspendJob(new Exception("JobManager is shutting down."));
+
+               disposeCommunicationWithResourceManager();
        }
 
+
+
        
//----------------------------------------------------------------------------------------------
        // RPC methods
        
//----------------------------------------------------------------------------------------------
@@ -251,8 +279,10 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
         * being recovered. After this, we will begin to schedule the job.
         */
        @RpcMethod
-       public void startJob() {
-               log.info("Starting job {} ({}).", jobGraph.getName(), 
jobGraph.getJobID());
+       public void startJob(final UUID leaderSessionID) {
+               log.info("Starting job {} ({}) with leaderId {}.", 
jobGraph.getName(), jobGraph.getJobID(), leaderSessionID);
+
+               this.leaderSessionID = leaderSessionID;
 
                if (executionGraph != null) {
                        executionGraph = new ExecutionGraph(
@@ -267,7 +297,8 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                                jobGraph.getClasspaths(),
                                userCodeLoader,
                                jobMetrics);
-               } else {
+               }
+               else {
                        // TODO: update last active time in JobInfo
                }
 
@@ -343,7 +374,8 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                                final CheckpointStatsTracker 
checkpointStatsTracker;
                                if (isStatsDisabled) {
                                        checkpointStatsTracker = new 
DisabledCheckpointStatsTracker();
-                               } else {
+                               }
+                               else {
                                        int historySize = 
configuration.getInteger(
                                                
ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
                                                
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE);
@@ -397,6 +429,8 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                        }
                        */
 
+                       // job is good to go, try to locate resource manager's 
address
+                       resourceManagerLeaderRetriever.start(new 
ResourceManagerLeaderListener());
                } catch (Throwable t) {
                        log.error("Failed to start job {} ({})", 
jobGraph.getName(), jobGraph.getJobID(), t);
 
@@ -406,7 +440,8 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                        final Throwable rt;
                        if (t instanceof JobExecutionException) {
                                rt = (JobExecutionException) t;
-                       } else {
+                       }
+                       else {
                                rt = new 
JobExecutionException(jobGraph.getJobID(),
                                        "Failed to start job " + 
jobGraph.getJobID() + " (" + jobGraph.getName() + ")", t);
                        }
@@ -439,10 +474,14 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
         */
        @RpcMethod
        public void suspendJob(final Throwable cause) {
+               leaderSessionID = null;
+
                if (executionGraph != null) {
                        executionGraph.suspend(cause);
                        executionGraph = null;
                }
+
+               disposeCommunicationWithResourceManager();
        }
 
        /**
@@ -457,14 +496,90 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                return Acknowledge.get();
        }
 
-       /**
-        * Triggers the registration of the job master at the resource manager.
-        *
-        * @param address Address of the resource manager
-        */
-       @RpcMethod
-       public void registerAtResourceManager(final String address) {
-               //TODO:: register at the RM
+       
//----------------------------------------------------------------------------------------------

+       // Internal methods

+       // 
----------------------------------------------------------------------------------------------


+
+       private void handleFatalError(final Throwable cause) {
+               runAsync(new Runnable() {
+                       @Override
+                       public void run() {
+                               log.error("Fatal error occurred on JobManager, 
cause: {}", cause.getMessage(), cause);
+                               shutDown();
+                               jobCompletionActions.onFatalError(cause);
+                       }
+               });
+       }
+
+       private void notifyOfNewResourceManagerLeader(
+               final String resourceManagerAddress, final UUID 
resourceManagerLeaderId)
+       {
+               // IMPORTANT: executed by main thread to avoid concurrence
+               runAsync(new Runnable() {
+                       @Override
+                       public void run() {
+                               if (resourceManagerConnection != null) {
+                                       if (resourceManagerAddress != null) {
+                                               if 
(resourceManagerAddress.equals(resourceManagerConnection.getTargetAddress())
+                                                       && 
resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId()))
+                                               {
+                                                       // both address and 
leader id are not changed, we can keep the old connection
+                                                       return;
+                                               }
+                                               log.info("ResourceManager 
leader changed from {} to {}. Registering at new leader.",
+                                                       
resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
+                                       }
+                                       else {
+                                               log.info("Current 
ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
+                                                       
resourceManagerConnection.getTargetAddress());
+                                       }
+                               }
+
+                               closeResourceManagerConnection();
+
+                               if (resourceManagerAddress != null) {
+                                       log.info("Attempting to register at 
ResourceManager {}", resourceManagerAddress);
+                                       resourceManagerConnection = new 
ResourceManagerConnection(
+                                               log, jobGraph.getJobID(), 
leaderSessionID,
+                                               resourceManagerAddress, 
resourceManagerLeaderId, executionContext);
+                                       resourceManagerConnection.start();
+                               }
+                       }
+               });
+       }
+
+       private void onResourceManagerRegistrationSuccess(final 
JobMasterRegistrationSuccess success) {
+               getRpcService().execute(new Runnable() {
+                       @Override
+                       public void run() {
+                               // TODO - add tests for comment in 
https://github.com/apache/flink/pull/2565
+                               // verify the response with current connection
+                               if (resourceManagerConnection != null
+                                       && 
resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId()))
 {
+                                       log.info("JobManager successfully 
registered at ResourceManager, leader id: {}.",
+                                               
success.getResourceManagerLeaderId());
+                               }
+                       }
+               });
+       }
+
+       private void disposeCommunicationWithResourceManager() {
+               // 1. stop the leader retriever so we will not receiving 
updates anymore
+               try {
+                       resourceManagerLeaderRetriever.stop();
+               } catch (Exception e) {
+                       log.warn("Failed to stop resource manager leader 
retriever.");
+               }
+
+               // 2. close current connection with ResourceManager if exists
+               closeResourceManagerConnection();
+       }
+
+       private void closeResourceManagerConnection() {
+               if (resourceManagerConnection != null) {
+                       resourceManagerConnection.close();
+                       resourceManagerConnection = null;
+               }
        }
 
        
//----------------------------------------------------------------------------------------------
@@ -494,4 +609,67 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                }
                return ret;
        }
+
+       
//----------------------------------------------------------------------------------------------
+       // Utility classes
+       
//----------------------------------------------------------------------------------------------
+
+       private class ResourceManagerLeaderListener implements 
LeaderRetrievalListener {
+               @Override
+               public void notifyLeaderAddress(final String leaderAddress, 
final UUID leaderSessionID) {
+                       notifyOfNewResourceManagerLeader(leaderAddress, 
leaderSessionID);
+               }
+
+               @Override
+               public void handleError(final Exception exception) {
+                       handleFatalError(exception);
+               }
+       }
+
+       private class ResourceManagerConnection
+               extends RegisteredRpcConnection<ResourceManagerGateway, 
JobMasterRegistrationSuccess>
+       {
+               private final JobID jobID;
+
+               private final UUID jobManagerLeaderID;
+
+               ResourceManagerConnection(
+                       final Logger log,
+                       final JobID jobID,
+                       final UUID jobManagerLeaderID,
+                       final String resourceManagerAddress,
+                       final UUID resourceManagerLeaderID,
+                       final Executor executor)
+               {
+                       super(log, resourceManagerAddress, 
resourceManagerLeaderID, executor);
+                       this.jobID = checkNotNull(jobID);
+                       this.jobManagerLeaderID = 
checkNotNull(jobManagerLeaderID);
+               }
+
+               @Override
+               protected RetryingRegistration<ResourceManagerGateway, 
JobMasterRegistrationSuccess> generateRegistration() {
+                       return new RetryingRegistration<ResourceManagerGateway, 
JobMasterRegistrationSuccess>(
+                               log, getRpcService(), "ResourceManager", 
ResourceManagerGateway.class,
+                               getTargetAddress(), getTargetLeaderId())
+                       {
+                               @Override
+                               protected Future<RegistrationResponse> 
invokeRegistration(ResourceManagerGateway gateway, UUID leaderId,
+                                       long timeoutMillis) throws Exception
+                               {
+                                       Time timeout = 
Time.milliseconds(timeoutMillis);
+                                       return 
gateway.registerJobMaster(leaderId, jobManagerLeaderID, getAddress(), jobID, 
timeout);
+                               }
+                       };
+               }
+
+               @Override
+               protected void onRegistrationSuccess(final 
JobMasterRegistrationSuccess success) {
+                       onResourceManagerRegistrationSuccess(success);
+               }
+
+               @Override
+               protected void onRegistrationFailure(final Throwable failure) {
+                       handleFatalError(failure);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8d3cc866/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index b281ea8..6587ccb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -23,19 +23,21 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 
+import java.util.UUID;
+
 /**
  * {@link JobMaster} rpc gateway interface
  */
 public interface JobMasterGateway extends RpcGateway {
 
        /**
-        * Making this job begins to run.
+        * Starting the job under the given leader session ID.
         */
-       void startJob();
+       void startJob(final UUID leaderSessionID);
 
        /**
-        * Suspending job, all the running tasks will be cancelled, and runtime 
status will be cleared. Should re-submit
-        * the job before restarting it.
+        * Suspending job, all the running tasks will be cancelled, and runtime 
status will be cleared.
+        * Should re-submit the job before restarting it.
         *
         * @param cause The reason of why this job been suspended.
         */
@@ -48,11 +50,4 @@ public interface JobMasterGateway extends RpcGateway {
         * @return Future acknowledge of the task execution state update
         */
        Future<Acknowledge> updateTaskExecutionState(TaskExecutionState 
taskExecutionState);
-
-       /**
-        * Triggers the registration of the job master at the resource manager.
-        *
-        * @param address Address of the resource manager
-        */
-       void registerAtResourceManager(final String address);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8d3cc866/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
index 031c38e..4058452 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
@@ -20,6 +20,10 @@ package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.runtime.registration.RegistrationResponse;
 
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Base class for responses from the ResourceManager to a registration attempt 
by a JobMaster.
  */
@@ -29,8 +33,11 @@ public class JobMasterRegistrationSuccess extends 
RegistrationResponse.Success {
 
        private final long heartbeatInterval;
 
-       public JobMasterRegistrationSuccess(long heartbeatInterval) {
+       private final UUID resourceManagerLeaderId;
+
+       public JobMasterRegistrationSuccess(final long heartbeatInterval, final 
UUID resourceManagerLeaderId) {
                this.heartbeatInterval = heartbeatInterval;
+               this.resourceManagerLeaderId = 
checkNotNull(resourceManagerLeaderId);
        }
 
        /**
@@ -42,8 +49,15 @@ public class JobMasterRegistrationSuccess extends 
RegistrationResponse.Success {
                return heartbeatInterval;
        }
 
+       public UUID getResourceManagerLeaderId() {
+               return resourceManagerLeaderId;
+       }
+
        @Override
        public String toString() {
-               return "JobMasterRegistrationSuccess(" + heartbeatInterval + 
')';
+               return "JobMasterRegistrationSuccess{" +
+                       "heartbeatInterval=" + heartbeatInterval +
+                       ", resourceManagerLeaderId=" + resourceManagerLeaderId +
+                       '}';
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8d3cc866/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterToResourceManagerConnection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterToResourceManagerConnection.java
deleted file mode 100644
index 71fce8c..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterToResourceManagerConnection.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmaster;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.registration.RegisteredRpcConnection;
-import org.apache.flink.runtime.registration.RegistrationResponse;
-import org.apache.flink.runtime.registration.RetryingRegistration;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.concurrent.Future;
-
-import org.slf4j.Logger;
-
-import java.util.UUID;
-import java.util.concurrent.Executor;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * The connection between a JobMaster and the ResourceManager.
- */
-public class JobMasterToResourceManagerConnection 
-               extends RegisteredRpcConnection<ResourceManagerGateway, 
JobMasterRegistrationSuccess> {
-
-       /** the JobMaster whose connection to the ResourceManager this 
represents */
-       private final JobMaster jobMaster;
-
-       private final JobID jobID;
-
-       private final UUID jobMasterLeaderId;
-
-       public JobMasterToResourceManagerConnection(
-                       Logger log,
-                       JobID jobID,
-                       JobMaster jobMaster,
-                       UUID jobMasterLeaderId,
-                       String resourceManagerAddress,
-                       UUID resourceManagerLeaderId,
-                       Executor executor) {
-
-               super(log, resourceManagerAddress, resourceManagerLeaderId, 
executor);
-               this.jobMaster = checkNotNull(jobMaster);
-               this.jobID = checkNotNull(jobID);
-               this.jobMasterLeaderId = checkNotNull(jobMasterLeaderId);
-       }
-
-       @Override
-       protected RetryingRegistration<ResourceManagerGateway, 
JobMasterRegistrationSuccess> generateRegistration() {
-               return new 
JobMasterToResourceManagerConnection.ResourceManagerRegistration(
-                       log, jobMaster.getRpcService(),
-                       getTargetAddress(), getTargetLeaderId(),
-                       jobMaster.getAddress(),jobID, jobMasterLeaderId);
-       }
-
-       @Override
-       protected void onRegistrationSuccess(JobMasterRegistrationSuccess 
success) {
-       }
-
-       @Override
-       protected void onRegistrationFailure(Throwable failure) {
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Utilities
-       // 
------------------------------------------------------------------------
-
-       private static class ResourceManagerRegistration
-               extends RetryingRegistration<ResourceManagerGateway, 
JobMasterRegistrationSuccess> {
-
-               private final String jobMasterAddress;
-
-               private final JobID jobID;
-
-               private final UUID jobMasterLeaderId;
-
-               ResourceManagerRegistration(
-                       Logger log,
-                       RpcService rpcService,
-                       String targetAddress,
-                       UUID leaderId,
-                       String jobMasterAddress,
-                       JobID jobID,
-                       UUID jobMasterLeaderId) {
-
-                       super(log, rpcService, "ResourceManager", 
ResourceManagerGateway.class, targetAddress, leaderId);
-                       this.jobMasterAddress = checkNotNull(jobMasterAddress);
-                       this.jobID = checkNotNull(jobID);
-                       this.jobMasterLeaderId = 
checkNotNull(jobMasterLeaderId);
-               }
-
-               @Override
-               protected Future<RegistrationResponse> invokeRegistration(
-                       ResourceManagerGateway gateway, UUID leaderId, long 
timeoutMillis) throws Exception {
-
-                       Time timeout = Time.milliseconds(timeoutMillis);
-                       return gateway.registerJobMaster(leaderId, 
jobMasterLeaderId,jobMasterAddress, jobID, timeout);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8d3cc866/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 190a4de..f695de4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -215,7 +215,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
                                                if (existingGateway != null) {
                                                        log.info("Replacing gateway for registered JobID {}.", jobID);
                                                }
-                                               return new JobMasterRegistrationSuccess(5000);
+                                               return new JobMasterRegistrationSuccess(5000, resourceManagerLeaderId);
                                        }
                                }
                        }, getMainThreadExecutor());

http://git-wip-us.apache.org/repos/asf/flink/blob/8d3cc866/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
index dc3b5fd..bfe5f55 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
@@ -142,8 +142,9 @@ public class JobManagerRunnerMockTest {
        public void testJobFinished() throws Exception {
                runner.start();
 
-               runner.grantLeadership(UUID.randomUUID());
-               verify(jobManagerGateway).startJob();
+               UUID leaderSessionID = UUID.randomUUID();
+               runner.grantLeadership(leaderSessionID);
+               verify(jobManagerGateway).startJob(leaderSessionID);
                assertTrue(!jobCompletion.isJobFinished());
 
                // runner been told by JobManager that job is finished
@@ -160,8 +161,9 @@ public class JobManagerRunnerMockTest {
        public void testJobFailed() throws Exception {
                runner.start();
 
-               runner.grantLeadership(UUID.randomUUID());
-               verify(jobManagerGateway).startJob();
+               UUID leaderSessionID = UUID.randomUUID();
+               runner.grantLeadership(leaderSessionID);
+               verify(jobManagerGateway).startJob(leaderSessionID);
                assertTrue(!jobCompletion.isJobFinished());
 
                // runner been told by JobManager that job is failed
@@ -177,8 +179,9 @@ public class JobManagerRunnerMockTest {
        public void testLeadershipRevoked() throws Exception {
                runner.start();
 
-               runner.grantLeadership(UUID.randomUUID());
-               verify(jobManagerGateway).startJob();
+               UUID leaderSessionID = UUID.randomUUID();
+               runner.grantLeadership(leaderSessionID);
+               verify(jobManagerGateway).startJob(leaderSessionID);
                assertTrue(!jobCompletion.isJobFinished());
 
                runner.revokeLeadership();
@@ -190,16 +193,18 @@ public class JobManagerRunnerMockTest {
        public void testRegainLeadership() throws Exception {
                runner.start();
 
-               runner.grantLeadership(UUID.randomUUID());
-               verify(jobManagerGateway).startJob();
+               UUID leaderSessionID = UUID.randomUUID();
+               runner.grantLeadership(leaderSessionID);
+               verify(jobManagerGateway).startJob(leaderSessionID);
                assertTrue(!jobCompletion.isJobFinished());
 
                runner.revokeLeadership();
                verify(jobManagerGateway).suspendJob(any(Throwable.class));
                assertFalse(runner.isShutdown());
 
-               runner.grantLeadership(UUID.randomUUID());
-               verify(jobManagerGateway, times(2)).startJob();
+               UUID leaderSessionID2 = UUID.randomUUID();
+               runner.grantLeadership(leaderSessionID2);
+               verify(jobManagerGateway, times(2)).startJob(leaderSessionID2);
        }
 
        private static class TestingOnCompletionActions implements OnCompletionActions {

Reply via email to