[hotfix] Treat taskManager's rpc address and location separately
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6fd8a295 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6fd8a295 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6fd8a295 Branch: refs/heads/flip-6 Commit: 6fd8a295ceae8256a40bb0d3a47530964ff48c61 Parents: fb7bfd4 Author: Kurt Young <[email protected]> Authored: Mon Oct 17 09:38:46 2016 +0800 Committer: Till Rohrmann <[email protected]> Committed: Thu Oct 20 19:50:35 2016 +0200 ---------------------------------------------------------------------- .../flink/runtime/jobmaster/JobMaster.java | 7 ++-- .../runtime/jobmaster/JobMasterGateway.java | 8 +++-- .../runtime/taskexecutor/JobLeaderService.java | 37 +++++++++++++------- .../runtime/taskexecutor/TaskExecutor.java | 2 +- .../runtime/taskexecutor/TaskExecutorTest.java | 14 ++++---- 5 files changed, 43 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6fd8a295/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 8cb9946..306a28a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -680,13 +680,14 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { @RpcMethod public Future<RegistrationResponse> registerTaskManager( + final String taskManagerRpcAddress, final TaskManagerLocation taskManagerLocation, final UUID leaderId) throws Exception { if (!JobMaster.this.leaderSessionID.equals(leaderId)) { log.warn("Discard registration from TaskExecutor {} at ({}) because the expected " + "leader session ID {} did not equal the received leader session ID {}.", - taskManagerLocation.getResourceID(), taskManagerLocation.addressString(), + taskManagerLocation.getResourceID(), taskManagerRpcAddress, JobMaster.this.leaderSessionID, leaderId); throw new Exception("Leader id not match, expected: " + JobMaster.this.leaderSessionID + ", actual: " + leaderId); @@ -702,7 +703,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { return getRpcService().execute(new Callable<TaskExecutorGateway>() { @Override public TaskExecutorGateway call() throws Exception { - return getRpcService().connect(taskManagerLocation.addressString(), TaskExecutorGateway.class) + return getRpcService().connect(taskManagerRpcAddress, TaskExecutorGateway.class) .get(rpcTimeout.getSize(), rpcTimeout.getUnit()); } }).handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() { @@ -715,7 +716,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { if (!JobMaster.this.leaderSessionID.equals(leaderId)) { log.warn("Discard registration from TaskExecutor {} at ({}) because the expected " + "leader session ID {} did not equal the received leader session ID {}.", - taskManagerLocation.getResourceID(), taskManagerLocation.addressString(), + taskManagerId, taskManagerRpcAddress, JobMaster.this.leaderSessionID, leaderId); return new RegistrationResponse.Decline("Invalid leader session id"); } http://git-wip-us.apache.org/repos/asf/flink/blob/6fd8a295/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index 4c85839..4ee9f92 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -196,12 +196,14 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway { /** * Register the task manager at the job manager. * - * @param taskManagerLocation location of the task manager - * @param leaderId identifying the job leader - * @param timeout for the rpc call + * @param taskManagerRpcAddress the rpc address of the task manager + * @param taskManagerLocation location of the task manager + * @param leaderId identifying the job leader + * @param timeout for the rpc call * @return Future registration response indicating whether the registration was successful or not */ Future<RegistrationResponse> registerTaskManager( + final String taskManagerRpcAddress, final TaskManagerLocation taskManagerLocation, final UUID leaderId, @RpcTimeout final Time timeout); http://git-wip-us.apache.org/repos/asf/flink/blob/6fd8a295/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java index 14d36ab..93c7bb7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java @@ -62,6 +62,9 @@ public class JobLeaderService { /** Internal state of the service */ private volatile JobLeaderService.State state; + /** Address of the owner of this service. This address is used for the job manager connection */ + private String ownerAddress; + /** Rpc service to use for establishing connections */ private RpcService rpcService; @@ -78,6 +81,7 @@ public class JobLeaderService { state = JobLeaderService.State.CREATED; + ownerAddress = null; rpcService = null; highAvailabilityServices = null; jobLeaderListener = null; @@ -90,20 +94,23 @@ public class JobLeaderService { /** * Start the job leader service with the given services. * + * @param initialOwnerAddress to be used for establishing connections (source address) * @param initialRpcService to be used to create rpc connections * @param initialHighAvailabilityServices to create leader retrieval services for the different jobs * @param initialJobLeaderListener listening for job leader changes */ public void start( - final RpcService initialRpcService, - final HighAvailabilityServices initialHighAvailabilityServices, - final JobLeaderListener initialJobLeaderListener) { + final String initialOwnerAddress, + final RpcService initialRpcService, + final HighAvailabilityServices initialHighAvailabilityServices, + final JobLeaderListener initialJobLeaderListener) { if (JobLeaderService.State.CREATED != state) { throw new IllegalStateException("The service has already been started."); } else { LOG.info("Start job leader service."); + this.ownerAddress = Preconditions.checkNotNull(initialOwnerAddress); this.rpcService = Preconditions.checkNotNull(initialRpcService); this.highAvailabilityServices = Preconditions.checkNotNull(initialHighAvailabilityServices); this.jobLeaderListener = Preconditions.checkNotNull(initialJobLeaderListener); @@ -310,6 +317,7 @@ public class JobLeaderService { JobMasterGateway.class, getTargetAddress(), getTargetLeaderId(), + ownerAddress, ownLocation); } @@ -345,19 +353,23 @@ public class JobLeaderService { extends RetryingRegistration<JobMasterGateway, JMTMRegistrationSuccess> { + private final String taskManagerRpcAddress; + private final TaskManagerLocation taskManagerLocation; JobManagerRetryingRegistration( - Logger log, - RpcService rpcService, - String targetName, - Class<JobMasterGateway> targetType, - String targetAddress, - UUID leaderId, - TaskManagerLocation taskManagerLocation) { - + Logger log, + RpcService rpcService, + String targetName, + Class<JobMasterGateway> targetType, + String targetAddress, + UUID leaderId, + String taskManagerRpcAddress, + TaskManagerLocation taskManagerLocation) + { super(log, rpcService, targetName, targetType, targetAddress, leaderId); + this.taskManagerRpcAddress = taskManagerRpcAddress; this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation); } @@ -365,7 +377,8 @@ public class JobLeaderService { protected Future<RegistrationResponse> invokeRegistration( JobMasterGateway gateway, UUID leaderId, long timeoutMillis) throws Exception { - return gateway.registerTaskManager(taskManagerLocation, leaderId, Time.milliseconds(timeoutMillis)); + return gateway.registerTaskManager(taskManagerRpcAddress, taskManagerLocation, + leaderId, Time.milliseconds(timeoutMillis)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/6fd8a295/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 1201281..3e3a544 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -206,7 +206,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { taskSlotTable.start(new SlotActionsImpl()); // start the job leader service - jobLeaderService.start(getRpcService(), haServices, new JobLeaderListenerImpl()); + jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl()); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/6fd8a295/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index 2220f12..2b5d2dd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -379,9 +379,10 @@ public class TaskExecutorTest extends TestLogger { final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class); when(jobMasterGateway.registerTaskManager( - eq(taskManagerLocation), - eq(jobManagerLeaderId), - any(Time.class) + any(String.class), + eq(taskManagerLocation), + eq(jobManagerLeaderId), + any(Time.class) )).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort))); when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress); @@ -483,9 +484,10 @@ public class TaskExecutorTest extends TestLogger { final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class); when(jobMasterGateway.registerTaskManager( - eq(taskManagerLocation), - eq(jobManagerLeaderId), - any(Time.class) + any(String.class), + eq(taskManagerLocation), + eq(jobManagerLeaderId), + any(Time.class) )).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort))); when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress);
