[FLINK-4408] [JobManager] Introduce JobMasterRunner and implement job submission & setting up the ExecutionGraph
This closes #2480 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3cda5933 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3cda5933 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3cda5933 Branch: refs/heads/flip-6 Commit: 3cda59339ed593b04be3a897bf04a01e3673db5b Parents: 629078e Author: Kurt Young <ykt...@gmail.com> Authored: Thu Sep 8 12:00:13 2016 +0800 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Sep 23 20:30:05 2016 +0200 ---------------------------------------------------------------------- .../flink/api/common/JobExecutionResult.java | 2 +- .../flink/api/common/JobSubmissionResult.java | 2 +- .../HighAvailabilityServices.java | 12 + .../runtime/highavailability/NonHaServices.java | 16 +- .../runtime/jobmanager/OnCompletionActions.java | 31 ++ .../runtime/jobmanager/scheduler/Scheduler.java | 9 + .../runtime/jobmaster/JobManagerRunner.java | 288 +++++++++++ .../runtime/jobmaster/JobManagerServices.java | 73 +++ .../flink/runtime/jobmaster/JobMaster.java | 485 ++++++++++++++----- .../runtime/jobmaster/JobMasterGateway.java | 13 + .../jobmaster/MiniClusterJobDispatcher.java | 385 +++++++++++++++ .../flink/runtime/rpc/FatalErrorHandler.java | 24 + .../runtime/taskexecutor/TaskExecutor.java | 12 + .../TestingHighAvailabilityServices.java | 39 +- .../jobmaster/JobManagerRunnerMockTest.java | 254 ++++++++++ .../flink/runtime/rpc/RpcConnectionTest.java | 17 +- 16 files changed, 1533 insertions(+), 129 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3cda5933/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java index 
cb4ecc5..7286cc5 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java @@ -32,7 +32,7 @@ import java.util.concurrent.TimeUnit; @Public public class JobExecutionResult extends JobSubmissionResult { - private long netRuntime; + private final long netRuntime; private final Map<String, Object> accumulatorResults; http://git-wip-us.apache.org/repos/asf/flink/blob/3cda5933/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java b/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java index c5dc869..b0e7e24 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java @@ -26,7 +26,7 @@ import org.apache.flink.annotation.Public; @Public public class JobSubmissionResult { - private JobID jobID; + private final JobID jobID; public JobSubmissionResult(JobID jobID) { this.jobID = jobID; http://git-wip-us.apache.org/repos/asf/flink/blob/3cda5933/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java index 7634176..d67e927 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java @@ -19,6 +19,8 @@ package org.apache.flink.runtime.highavailability; import 
org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; @@ -61,4 +63,14 @@ public interface HighAvailabilityServices { * @param jobID The identifier of the job running the election. */ LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception; + + /** + * Gets the checkpoint recovery factory for the job manager + */ + CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception; + + /** + * Gets the submitted job graph store for the job manager + */ + SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/3cda5933/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java index 33dc2d7..a2c9cc4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java @@ -19,13 +19,17 @@ package org.apache.flink.runtime.highavailability; import org.apache.flink.api.common.JobID; -import org.apache.flink.hadoop.shaded.org.jboss.netty.util.internal.ConcurrentHashMap; +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; +import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore; +import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; import 
org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -88,4 +92,14 @@ public class NonHaServices implements HighAvailabilityServices { public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception { return new StandaloneLeaderElectionService(); } + + @Override + public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception { + return new StandaloneCheckpointRecoveryFactory(); + } + + @Override + public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception { + return new StandaloneSubmittedJobGraphStore(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/3cda5933/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java new file mode 100644 index 0000000..6de4253 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.runtime.rpc.FatalErrorHandler; + +public interface OnCompletionActions extends FatalErrorHandler { + + void jobFinished(JobExecutionResult result); + + void jobFailed(Throwable cause); + + void jobFinishedByOther(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/3cda5933/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java index c9cdd00..67f2fcc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java @@ -31,6 +31,7 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import akka.dispatch.Futures; @@ -54,6 +55,7 @@ import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.ExecutionContext; +import scala.concurrent.ExecutionContext$; /** * The scheduler is responsible for distributing the ready-to-run tasks among instances and slots. 
@@ -107,6 +109,13 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl /** * Creates a new scheduler. */ + public Scheduler(ExecutorService executor) { + this(ExecutionContext$.MODULE$.fromExecutor(executor)); + } + + /** + * Creates a new scheduler. + */ public Scheduler(ExecutionContext executionContext) { this.executionContext = executionContext; } http://git-wip-us.apache.org/repos/asf/flink/blob/3cda5933/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java new file mode 100644 index 0000000..bc2bf9a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmanager.OnCompletionActions; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; +import org.apache.flink.runtime.leaderelection.LeaderContender; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.runtime.rpc.RpcService; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.UUID; +import java.util.concurrent.Executor; + +/** + * The runner for the job manager. It deals with job level leader election and make underlying job manager + * properly reacted. + */ +public class JobManagerRunner implements LeaderContender, OnCompletionActions { + + private final Logger log = LoggerFactory.getLogger(JobManagerRunner.class); + + /** Lock to ensure that this runner can deal with leader election event and job completion notifies simultaneously */ + private final Object lock = new Object(); + + /** The job graph needs to run */ + private final JobGraph jobGraph; + + private final OnCompletionActions toNotify; + + /** The execution context which is used to execute futures */ + private final Executor executionContext; + + // TODO: use this to decide whether the job is finished by other + private final CheckpointRecoveryFactory checkpointRecoveryFactory; + + /** Leader election for this job */ + private final LeaderElectionService leaderElectionService; + + private final JobMaster jobManager; + + /** Leader session id when granted leadership */ + private UUID leaderSessionID; + + /** flag marking the runner as shut down */ + private volatile boolean 
shutdown; + + public JobManagerRunner( + final JobGraph jobGraph, + final Configuration configuration, + final RpcService rpcService, + final HighAvailabilityServices haServices, + final OnCompletionActions toNotify) throws Exception + { + this(jobGraph, configuration, rpcService, haServices, + JobManagerServices.fromConfiguration(configuration), toNotify); + } + + public JobManagerRunner( + final JobGraph jobGraph, + final Configuration configuration, + final RpcService rpcService, + final HighAvailabilityServices haServices, + final JobManagerServices jobManagerServices, + final OnCompletionActions toNotify) throws Exception + { + this.jobGraph = jobGraph; + this.toNotify = toNotify; + this.executionContext = rpcService.getExecutor(); + this.checkpointRecoveryFactory = haServices.getCheckpointRecoveryFactory(); + this.leaderElectionService = haServices.getJobMasterLeaderElectionService(jobGraph.getJobID()); + this.leaderSessionID = null; + + this.jobManager = new JobMaster( + jobGraph, configuration, rpcService, haServices, + jobManagerServices.libraryCacheManager, + jobManagerServices.restartStrategyFactory, + jobManagerServices.savepointStore, + jobManagerServices.timeout, + new Scheduler(jobManagerServices.executorService), + jobManagerServices.jobManagerMetricGroup, + this); + } + + //---------------------------------------------------------------------------------------------- + // Lifecycle management + //---------------------------------------------------------------------------------------------- + + public void start() throws Exception { + jobManager.init(); + jobManager.start(); + + try { + leaderElectionService.start(this); + } + catch (Exception e) { + log.error("Could not start the JobManager because the leader election service did not start.", e); + throw new Exception("Could not start the leader election service.", e); + } + } + + public void shutdown() { + shutdown(new Exception("The JobManager runner is shutting down")); + } + + public void 
shutdown(Throwable cause) { + // TODO what is the cause used for ? + shutdownInternally(); + } + + private void shutdownInternally() { + synchronized (lock) { + shutdown = true; + + if (leaderElectionService != null) { + try { + leaderElectionService.stop(); + } catch (Exception e) { + log.error("Could not properly shutdown the leader election service."); + } + } + + jobManager.shutDown(); + } + } + + //---------------------------------------------------------------------------------------------- + // Result and error handling methods + //---------------------------------------------------------------------------------------------- + + /** + * Job completion notification triggered by JobManager + */ + @Override + public void jobFinished(JobExecutionResult result) { + try { + shutdownInternally(); + } + finally { + if (toNotify != null) { + toNotify.jobFinished(result); + } + } + } + + /** + * Job completion notification triggered by JobManager + */ + @Override + public void jobFailed(Throwable cause) { + try { + shutdownInternally(); + } + finally { + if (toNotify != null) { + toNotify.jobFailed(cause); + } + } + } + + /** + * Job completion notification triggered by self + */ + @Override + public void jobFinishedByOther() { + try { + shutdownInternally(); + } + finally { + if (toNotify != null) { + toNotify.jobFinishedByOther(); + } + } + } + + /** + * Job completion notification triggered by JobManager or self + */ + @Override + public void onFatalError(Throwable exception) { + // first and in any case, notify our handler, so it can react fast + try { + if (toNotify != null) { + toNotify.onFatalError(exception); + } + } + finally { + log.error("JobManager runner encountered a fatal error.", exception); + shutdownInternally(); + } + } + + //---------------------------------------------------------------------------------------------- + // Leadership methods + //---------------------------------------------------------------------------------------------- + + 
@Override + public void grantLeadership(final UUID leaderSessionID) { + synchronized (lock) { + if (shutdown) { + log.info("JobManagerRunner already shutdown."); + return; + } + + log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.", + jobGraph.getName(), jobGraph.getJobID(), leaderSessionID, getAddress()); + + // The operation may be blocking, but since this runner is idle before it been granted the leadership, + // it's okay that job manager wait for the operation complete + leaderElectionService.confirmLeaderSessionID(leaderSessionID); + this.leaderSessionID = leaderSessionID; + + // Double check the leadership after we confirm that, there is a small chance that multiple + // job managers schedule the same job after if they try to recover at the same time. + // This will eventually be noticed, but can not be ruled out from the beginning. + if (leaderElectionService.hasLeadership()) { + if (isJobFinishedByOthers()) { + log.info("Job {} ({}) already finished by others.", jobGraph.getName(), jobGraph.getJobID()); + jobFinishedByOther(); + } else { + jobManager.getSelf().startJob(); + } + } + } + } + + @Override + public void revokeLeadership() { + synchronized (lock) { + if (shutdown) { + log.info("JobManagerRunner already shutdown."); + return; + } + + log.info("JobManager for job {} ({}) was revoked leadership at {}.", + jobGraph.getName(), jobGraph.getJobID(), getAddress()); + + leaderSessionID = null; + jobManager.getSelf().suspendJob(new Exception("JobManager is no longer the leader.")); + } + } + + @Override + public String getAddress() { + return jobManager.getAddress(); + } + + @Override + public void handleError(Exception exception) { + log.error("Leader Election Service encountered a fatal error.", exception); + onFatalError(exception); + } + + @VisibleForTesting + boolean isJobFinishedByOthers() { + // TODO + return false; + } + + @VisibleForTesting + boolean isShutdown() { + return shutdown; + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/3cda5933/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java new file mode 100644 index 0000000..e6beba6 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore; +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; + +import java.util.concurrent.ExecutorService; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Utility class to hold all auxiliary services used by the {@link JobMaster}. + */ +public class JobManagerServices { + + public final ExecutorService executorService; + + public final BlobLibraryCacheManager libraryCacheManager; + + public final RestartStrategyFactory restartStrategyFactory; + + public final SavepointStore savepointStore; + + public final Time timeout; + + public final JobManagerMetricGroup jobManagerMetricGroup; + + public JobManagerServices( + ExecutorService executorService, + BlobLibraryCacheManager libraryCacheManager, + RestartStrategyFactory restartStrategyFactory, + SavepointStore savepointStore, + Time timeout, + JobManagerMetricGroup jobManagerMetricGroup) { + + this.executorService = checkNotNull(executorService); + this.libraryCacheManager = checkNotNull(libraryCacheManager); + this.restartStrategyFactory = checkNotNull(restartStrategyFactory); + this.savepointStore = checkNotNull(savepointStore); + this.timeout = checkNotNull(timeout); + this.jobManagerMetricGroup = checkNotNull(jobManagerMetricGroup); + } + + // ------------------------------------------------------------------------ + // Creating the components from a configuration + // ------------------------------------------------------------------------ + + public static JobManagerServices fromConfiguration(Configuration config) throws Exception { + // TODO not yet implemented + return null; + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/3cda5933/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 1537396..b52a23c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -18,21 +18,50 @@ package org.apache.flink.runtime.jobmaster; -import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore; +import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker; +import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker; +import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.client.JobSubmissionException; +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import 
org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.leaderelection.LeaderContender; -import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator; +import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; +import org.apache.flink.runtime.jobmanager.OnCompletionActions; +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.rpc.RpcMethod; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.rpc.RpcEndpoint; +import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.taskmanager.TaskExecutionState; -import org.apache.flink.util.Preconditions; +import scala.concurrent.ExecutionContext$; +import scala.concurrent.duration.FiniteDuration; -import java.util.UUID; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executor; + +import static org.apache.flink.util.Preconditions.checkNotNull; /** * JobMaster implementation. 
The job master is responsible for the execution of a single @@ -41,7 +70,7 @@ import java.util.UUID; * It offers the following methods as part of its rpc interface to interact with the JobMaster * remotely: * <ul> - * <li>{@link #updateTaskExecutionState(TaskExecutionState)} updates the task execution state for + * <li>{@link #updateTaskExecutionState(TaskExecutionState)} updates the task execution state for * given task</li> * </ul> */ @@ -52,7 +81,6 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { /** Logical representation of the job */ private final JobGraph jobGraph; - private final JobID jobID; /** Configuration of the job */ private final Configuration configuration; @@ -60,32 +88,67 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { /** Service to contend for and retrieve the leadership of JM and RM */ private final HighAvailabilityServices highAvailabilityServices; - /** Leader Management */ - private LeaderElectionService leaderElectionService = null; - private UUID leaderSessionID; + /** Blob cache manager used across jobs */ + private final BlobLibraryCacheManager libraryCacheManager; + + /** Factory to create restart strategy for this job */ + private final RestartStrategyFactory restartStrategyFactory; + + /** Store for save points */ + private final SavepointStore savepointStore; + + /** The timeout for this job */ + private final Time timeout; + + /** The scheduler to use for scheduling new tasks as they are needed */ + private final Scheduler scheduler; + + /** The metrics group used across jobs */ + private final JobManagerMetricGroup jobManagerMetricGroup; + + /** The execution context which is used to execute futures */ + private final Executor executionContext; + + private final OnCompletionActions jobCompletionActions; + + /** The execution graph of this job */ + private volatile ExecutionGraph executionGraph; + + /** The checkpoint recovery factory used by this job */ + private CheckpointRecoveryFactory 
checkpointRecoveryFactory; + + private ClassLoader userCodeLoader; + + private RestartStrategy restartStrategy; + + private MetricGroup jobMetrics; - /** - * The JM's Constructor - * - * @param jobGraph The representation of the job's execution plan - * @param configuration The job's configuration - * @param rpcService The RPC service at which the JM serves - * @param highAvailabilityService The cluster's HA service from the JM can elect and retrieve leaders. - */ public JobMaster( JobGraph jobGraph, Configuration configuration, RpcService rpcService, - HighAvailabilityServices highAvailabilityService) { - + HighAvailabilityServices highAvailabilityService, + BlobLibraryCacheManager libraryCacheManager, + RestartStrategyFactory restartStrategyFactory, + SavepointStore savepointStore, + Time timeout, + Scheduler scheduler, + JobManagerMetricGroup jobManagerMetricGroup, + OnCompletionActions jobCompletionActions) + { super(rpcService); - this.jobGraph = Preconditions.checkNotNull(jobGraph); - this.jobID = Preconditions.checkNotNull(jobGraph.getJobID()); - - this.configuration = Preconditions.checkNotNull(configuration); - - this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityService); + this.jobGraph = checkNotNull(jobGraph); + this.configuration = checkNotNull(configuration); + this.highAvailabilityServices = checkNotNull(highAvailabilityService); + this.libraryCacheManager = checkNotNull(libraryCacheManager); + this.restartStrategyFactory = checkNotNull(restartStrategyFactory); + this.savepointStore = checkNotNull(savepointStore); + this.timeout = checkNotNull(timeout); + this.scheduler = checkNotNull(scheduler); + this.jobManagerMetricGroup = checkNotNull(jobManagerMetricGroup); + this.executionContext = checkNotNull(rpcService.getExecutor()); + this.jobCompletionActions = checkNotNull(jobCompletionActions); } public ResourceManagerGateway getResourceManager() { @@ -93,93 +156,294 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> { } //---------------------------------------------------------------------------------------------- - // Initialization methods + // Lifecycle management //---------------------------------------------------------------------------------------------- + + /** + * Initializing the job execution environment, should be called before start. Any error occurred during + * initialization will be treated as job submission failure. + * + * @throws JobSubmissionException + */ + public void init() throws JobSubmissionException { + log.info("Initializing job {} ({}).", jobGraph.getName(), jobGraph.getJobID()); + + try { + // IMPORTANT: We need to make sure that the library registration is the first action, + // because this makes sure that the uploaded jar files are removed in case of + // unsuccessful + try { + libraryCacheManager.registerJob(jobGraph.getJobID(), jobGraph.getUserJarBlobKeys(), + jobGraph.getClasspaths()); + } catch (Throwable t) { + throw new JobSubmissionException(jobGraph.getJobID(), + "Cannot set up the user code libraries: " + t.getMessage(), t); + } + + userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID()); + if (userCodeLoader == null) { + throw new JobSubmissionException(jobGraph.getJobID(), + "The user code class loader could not be initialized."); + } + + if (jobGraph.getNumberOfVertices() == 0) { + throw new JobSubmissionException(jobGraph.getJobID(), "The given job is empty"); + } + + final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration = + jobGraph.getSerializedExecutionConfig() + .deserializeValue(userCodeLoader) + .getRestartStrategy(); + if (restartStrategyConfiguration != null) { + restartStrategy = RestartStrategyFactory.createRestartStrategy(restartStrategyConfiguration); + } else { + restartStrategy = restartStrategyFactory.createRestartStrategy(); + } + + log.info("Using restart strategy {} for {} ({}).", restartStrategy, jobGraph.getName(), jobGraph.getJobID()); 
+ + if (jobManagerMetricGroup != null) { + jobMetrics = jobManagerMetricGroup.addJob(jobGraph); + } + if (jobMetrics == null) { + jobMetrics = new UnregisteredMetricsGroup(); + } + + try { + checkpointRecoveryFactory = highAvailabilityServices.getCheckpointRecoveryFactory(); + } catch (Exception e) { + log.error("Could not get the checkpoint recovery factory.", e); + throw new JobSubmissionException(jobGraph.getJobID(), "Could not get the checkpoint recovery factory.", e); + } + + } catch (Throwable t) { + log.error("Failed to initializing job {} ({})", jobGraph.getName(), jobGraph.getJobID(), t); + + libraryCacheManager.unregisterJob(jobGraph.getJobID()); + + if (t instanceof JobSubmissionException) { + throw (JobSubmissionException) t; + } else { + throw new JobSubmissionException(jobGraph.getJobID(), "Failed to initialize job " + + jobGraph.getName() + " (" + jobGraph.getJobID() + ")", t); + } + } + } + + @Override public void start() { super.start(); - - // register at the election once the JM starts - registerAtElectionService(); } + @Override + public void shutDown() { + super.shutDown(); + + suspendJob(new Exception("JobManager is shutting down.")); + } //---------------------------------------------------------------------------------------------- - // JobMaster Leadership methods + // RPC methods //---------------------------------------------------------------------------------------------- /** - * Retrieves the election service and contend for the leadership. + * Start to run the job, runtime data structures like ExecutionGraph will be constructed now and checkpoint + * being recovered. After this, we will begin to schedule the job. 
*/ - private void registerAtElectionService() { - try { - leaderElectionService = highAvailabilityServices.getJobMasterLeaderElectionService(jobID); - leaderElectionService.start(new JobMasterLeaderContender()); - } catch (Exception e) { - throw new RuntimeException("Fail to register at the election of JobMaster", e); + @RpcMethod + public void startJob() { + log.info("Starting job {} ({}).", jobGraph.getName(), jobGraph.getJobID()); + + if (executionGraph != null) { + executionGraph = new ExecutionGraph( + ExecutionContext$.MODULE$.fromExecutor(executionContext), + jobGraph.getJobID(), + jobGraph.getName(), + jobGraph.getJobConfiguration(), + jobGraph.getSerializedExecutionConfig(), + new FiniteDuration(timeout.getSize(), timeout.getUnit()), + restartStrategy, + jobGraph.getUserJarBlobKeys(), + jobGraph.getClasspaths(), + userCodeLoader, + jobMetrics); + } else { + // TODO: update last active time in JobInfo } - } - /** - * Start the execution when the leadership is granted. - * - * @param newLeaderSessionID The identifier of the new leadership session - */ - public void grantJobMasterLeadership(final UUID newLeaderSessionID) { - runAsync(new Runnable() { - @Override - public void run() { - log.info("JobManager {} grants leadership with session id {}.", getAddress(), newLeaderSessionID); + try { + executionGraph.setScheduleMode(jobGraph.getScheduleMode()); + executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling()); + + try { + executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph)); + } catch (Exception e) { + log.warn("Cannot create JSON plan for job {} ({})", jobGraph.getJobID(), jobGraph.getName(), e); + executionGraph.setJsonPlan("{}"); + } - // The operation may be blocking, but since JM is idle before it grants the leadership, it's okay that - // JM waits here for the operation's completeness. 
- leaderSessionID = newLeaderSessionID; - leaderElectionService.confirmLeaderSessionID(newLeaderSessionID); + // initialize the vertices that have a master initialization hook + // file output formats create directories here, input formats create splits + if (log.isDebugEnabled()) { + log.debug("Running initialization on master for job {} ({}).", jobGraph.getJobID(), jobGraph.getName()); + } + for (JobVertex vertex : jobGraph.getVertices()) { + final String executableClass = vertex.getInvokableClassName(); + if (executableClass == null || executableClass.length() == 0) { + throw new JobExecutionException(jobGraph.getJobID(), + "The vertex " + vertex.getID() + " (" + vertex.getName() + ") has no invokable class."); + } + if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) { + vertex.setParallelism(scheduler.getTotalNumberOfSlots()); + } + + try { + vertex.initializeOnMaster(userCodeLoader); + } catch (Throwable t) { + throw new JobExecutionException(jobGraph.getJobID(), + "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(), t); + } + } - // TODO:: execute the job when the leadership is granted. + // topologically sort the job vertices and attach the graph to the existing one + final List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources(); + if (log.isDebugEnabled()) { + log.debug("Adding {} vertices from job graph {} ({}).", sortedTopology.size(), + jobGraph.getJobID(), jobGraph.getName()); } - }); - } + executionGraph.attachJobGraph(sortedTopology); - /** - * Stop the execution when the leadership is revoked. 
- */ - public void revokeJobMasterLeadership() { - runAsync(new Runnable() { - @Override - public void run() { - log.info("JobManager {} was revoked leadership.", getAddress()); + if (log.isDebugEnabled()) { + log.debug("Successfully created execution graph from job graph {} ({}).", + jobGraph.getJobID(), jobGraph.getName()); + } - // TODO:: cancel the job's execution and notify all listeners - cancelAndClearEverything(new Exception("JobManager is no longer the leader.")); + final JobSnapshottingSettings snapshotSettings = jobGraph.getSnapshotSettings(); + if (snapshotSettings != null) { + List<ExecutionJobVertex> triggerVertices = getExecutionJobVertexWithId( + executionGraph, snapshotSettings.getVerticesToTrigger()); + + List<ExecutionJobVertex> ackVertices = getExecutionJobVertexWithId( + executionGraph, snapshotSettings.getVerticesToAcknowledge()); + + List<ExecutionJobVertex> confirmVertices = getExecutionJobVertexWithId( + executionGraph, snapshotSettings.getVerticesToConfirm()); + + CompletedCheckpointStore completedCheckpoints = checkpointRecoveryFactory.createCheckpointStore( + jobGraph.getJobID(), userCodeLoader); + + CheckpointIDCounter checkpointIdCounter = checkpointRecoveryFactory.createCheckpointIDCounter( + jobGraph.getJobID()); + + // Checkpoint stats tracker + boolean isStatsDisabled = configuration.getBoolean( + ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_DISABLE, + ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE); + + final CheckpointStatsTracker checkpointStatsTracker; + if (isStatsDisabled) { + checkpointStatsTracker = new DisabledCheckpointStatsTracker(); + } else { + int historySize = configuration.getInteger( + ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE, + ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE); + checkpointStatsTracker = new SimpleCheckpointStatsTracker(historySize, ackVertices, jobMetrics); + } + + executionGraph.enableSnapshotCheckpointing( + snapshotSettings.getCheckpointInterval(), + 
snapshotSettings.getCheckpointTimeout(), + snapshotSettings.getMinPauseBetweenCheckpoints(), + snapshotSettings.getMaxConcurrentCheckpoints(), + triggerVertices, + ackVertices, + confirmVertices, + checkpointIdCounter, + completedCheckpoints, + savepointStore, + checkpointStatsTracker); + } + + // TODO: register this class to execution graph as job status change listeners + + // TODO: register client as job / execution status change listeners if they are interested + + /* + TODO: decide whether we should take the savepoint before recovery + + if (isRecovery) { + // this is a recovery of a master failure (this master takes over) + executionGraph.restoreLatestCheckpointedState(); + } else { + if (snapshotSettings != null) { + String savepointPath = snapshotSettings.getSavepointPath(); + if (savepointPath != null) { + // got a savepoint + log.info("Starting job from savepoint {}.", savepointPath); + + // load the savepoint as a checkpoint into the system + final CompletedCheckpoint savepoint = SavepointLoader.loadAndValidateSavepoint( + jobGraph.getJobID(), executionGraph.getAllVertices(), savepointStore, savepointPath); + executionGraph.getCheckpointCoordinator().getCheckpointStore().addCheckpoint(savepoint); + + // Reset the checkpoint ID counter + long nextCheckpointId = savepoint.getCheckpointID() + 1; + log.info("Reset the checkpoint ID to " + nextCheckpointId); + executionGraph.getCheckpointCoordinator().getCheckpointIdCounter().setCount(nextCheckpointId); - leaderSessionID = null; + executionGraph.restoreLatestCheckpointedState(); + } + } } - }); - } + */ - /** - * Handles error occurring in the leader election service - * - * @param exception Exception thrown in the leader election service - */ - public void onJobMasterElectionError(final Exception exception) { - runAsync(new Runnable() { - @Override - public void run() { - log.error("Received an error from the LeaderElectionService.", exception); + } catch (Throwable t) { + log.error("Failed to start job {} 
({})", jobGraph.getName(), jobGraph.getJobID(), t); - // TODO:: cancel the job's execution and shutdown the JM - cancelAndClearEverything(exception); + executionGraph.fail(t); + executionGraph = null; - leaderSessionID = null; + final Throwable rt; + if (t instanceof JobExecutionException) { + rt = (JobExecutionException) t; + } else { + rt = new JobExecutionException(jobGraph.getJobID(), + "Failed to start job " + jobGraph.getJobID() + " (" + jobGraph.getName() + ")", t); } - }); + // TODO: notify client about this failure + + jobCompletionActions.jobFailed(rt); + return; + } + + // start scheduling job in another thread + executionContext.execute(new Runnable() { + @Override + public void run() { + if (executionGraph != null) { + try { + executionGraph.scheduleForExecution(scheduler); + } catch (Throwable t) { + executionGraph.fail(t); + } + } + } + }); } - //---------------------------------------------------------------------------------------------- - // RPC methods - //---------------------------------------------------------------------------------------------- + /** + * Suspending job, all the running tasks will be cancelled, and runtime data will be cleared. + * + * @param cause The reason of why this job been suspended. + */ + @RpcMethod + public void suspendJob(final Throwable cause) { + if (executionGraph != null) { + executionGraph.suspend(cause); + executionGraph = null; + } + } /** * Updates the task execution state for a given task. @@ -208,37 +472,26 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { //---------------------------------------------------------------------------------------------- /** - * Cancel the current job and notify all listeners the job's cancellation. + * Converts JobVertexIDs to corresponding ExecutionJobVertexes * - * @param cause Cause for the cancelling. 
+ * @param executionGraph The execution graph that holds the relationship + * @param vertexIDs The vertexIDs need to be converted + * @return The corresponding ExecutionJobVertexes + * @throws JobExecutionException */ - private void cancelAndClearEverything(Throwable cause) { - // currently, nothing to do here - } - - // ------------------------------------------------------------------------ - // Utility classes - // ------------------------------------------------------------------------ - private class JobMasterLeaderContender implements LeaderContender { - - @Override - public void grantLeadership(UUID leaderSessionID) { - JobMaster.this.grantJobMasterLeadership(leaderSessionID); - } - - @Override - public void revokeLeadership() { - JobMaster.this.revokeJobMasterLeadership(); - } - - @Override - public String getAddress() { - return JobMaster.this.getAddress(); - } - - @Override - public void handleError(Exception exception) { - onJobMasterElectionError(exception); + private static List<ExecutionJobVertex> getExecutionJobVertexWithId( + final ExecutionGraph executionGraph, final List<JobVertexID> vertexIDs) + throws JobExecutionException + { + final List<ExecutionJobVertex> ret = new ArrayList<>(vertexIDs.size()); + for (JobVertexID vertexID : vertexIDs) { + final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(vertexID); + if (executionJobVertex == null) { + throw new JobExecutionException(executionGraph.getJobID(), + "The snapshot checkpointing settings refer to non-existent vertex " + vertexID); + } + ret.add(executionJobVertex); } + return ret; } } http://git-wip-us.apache.org/repos/asf/flink/blob/3cda5933/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index 86bf17c..b281ea8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -29,6 +29,19 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState; public interface JobMasterGateway extends RpcGateway { /** + * Making this job begins to run. + */ + void startJob(); + + /** + * Suspending job, all the running tasks will be cancelled, and runtime status will be cleared. Should re-submit + * the job before restarting it. + * + * @param cause The reason of why this job been suspended. + */ + void suspendJob(final Throwable cause); + + /** * Updates the task execution state for a given task. * * @param taskExecutionState New task execution state for a given task http://git-wip-us.apache.org/repos/asf/flink/blob/3cda5933/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java new file mode 100644 index 0000000..792bfd5 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmanager.OnCompletionActions; +import org.apache.flink.runtime.rpc.RpcService; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * The dispatcher that runs in the mini cluster, waits for jobs, and starts job masters + * upon receiving jobs. 
+ */ +public class MiniClusterJobDispatcher { + + private static final Logger LOG = LoggerFactory.getLogger(MiniClusterJobDispatcher.class); + + // ------------------------------------------------------------------------ + + /** lock to ensure that this dispatcher executes only one job at a time */ + private final Object lock = new Object(); + + /** the configuration with which the mini cluster was started */ + private final Configuration configuration; + + /** the RPC service to use by the job managers */ + private final RpcService rpcService; + + /** services for discovery, leader election, and recovery */ + private final HighAvailabilityServices haServices; + + /** all the services that the JobManager needs, such as BLOB service, factories, etc */ + private final JobManagerServices jobManagerServices; + + /** The number of JobManagers to launch (more than one simulates a high-availability setup) */ + private final int numJobManagers; + + /** The runner for the job and master. non-null if a job is currently running */ + private volatile JobManagerRunner[] runners; + + /** flag marking the dispatcher as shut down */ + private volatile boolean shutdown; + + + /** + * Starts a mini cluster job dispatcher. + * + * <p>The dispatcher kicks off one JobManager per job, a behavior similar to a + * non-highly-available setup. + * + * @param config The configuration of the mini cluster + * @param haServices Access to the discovery, leader election, and recovery services + * + * @throws Exception Thrown, if the services for the JobMaster could not be started. + */ + public MiniClusterJobDispatcher( + Configuration config, + RpcService rpcService, + HighAvailabilityServices haServices) throws Exception { + this(config, rpcService, haServices, 1); + } + + /** + * Starts a mini cluster job dispatcher. + * + * <p>The dispatcher may kick off more than one JobManager per job, thus simulating + * a highly-available setup. 
+ * + * @param config The configuration of the mini cluster + * @param haServices Access to the discovery, leader election, and recovery services + * @param numJobManagers The number of JobMasters to start for each job. + * + * @throws Exception Thrown, if the services for the JobMaster could not be started. + */ + public MiniClusterJobDispatcher( + Configuration config, + RpcService rpcService, + HighAvailabilityServices haServices, + int numJobManagers) throws Exception { + + checkArgument(numJobManagers >= 1); + this.configuration = checkNotNull(config); + this.rpcService = checkNotNull(rpcService); + this.haServices = checkNotNull(haServices); + this.numJobManagers = numJobManagers; + + LOG.info("Creating JobMaster services"); + this.jobManagerServices = JobManagerServices.fromConfiguration(config); + } + + // ------------------------------------------------------------------------ + // life cycle + // ------------------------------------------------------------------------ + + /** + * Shuts down the mini cluster dispatcher. If a job is currently running, that job will be + * terminally failed. + */ + public void shutdown() { + synchronized (lock) { + if (!shutdown) { + shutdown = true; + + LOG.info("Shutting down the dispatcher"); + + // in this shutdown code we copy the references to the stack first, + // to avoid concurrent modification + + JobManagerRunner[] runners = this.runners; + if (runners != null) { + this.runners = null; + + Exception shutdownException = new Exception("The MiniCluster is shutting down"); + for (JobManagerRunner runner : runners) { + runner.shutdown(shutdownException); + } + } + } + } + } + + // ------------------------------------------------------------------------ + // submitting jobs + // ------------------------------------------------------------------------ + + /** + * This method executes a job in detached mode. 
The method returns immediately after the job + * has been handed over to the started JobManager runner(s). + * + * @param job The Flink job to execute + * + * @throws JobExecutionException Thrown if anything went amiss during initial job launch, + * or if the job terminally failed. + */ + public void runDetached(JobGraph job) throws JobExecutionException { + checkNotNull(job); + + LOG.info("Received job for detached execution {} ({})", job.getName(), job.getJobID()); + + synchronized (lock) { + checkState(!shutdown, "mini cluster is shut down"); + checkState(runners == null, "mini cluster can only execute one job at a time"); + + OnCompletionActions onJobCompletion = new DetachedFinalizer(numJobManagers); + + this.runners = startJobRunners(job, onJobCompletion); + } + } + + /** + * This method runs a job in blocking mode. The method returns only after the job + * completed successfully, or after it failed terminally. + * + * @param job The Flink job to execute + * @return The result of the job execution + * + * @throws JobExecutionException Thrown if anything went amiss during initial job launch, + * or if the job terminally failed. 
+ */ + public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException { + checkNotNull(job); + + LOG.info("Received job for blocking execution {} ({})", job.getName(), job.getJobID()); + final BlockingJobSync onJobCompletion = new BlockingJobSync(job.getJobID(), numJobManagers); + + synchronized (lock) { + checkState(!shutdown, "mini cluster is shut down"); + checkState(runners == null, "mini cluster can only execute one job at a time"); + + this.runners = startJobRunners(job, onJobCompletion); + } + + try { + return onJobCompletion.getResult(); + } + finally { + // always clear the status for the next job + runners = null; + } + } + + private JobManagerRunner[] startJobRunners(JobGraph job, OnCompletionActions onCompletion) throws JobExecutionException { + LOG.info("Starting {} JobMaster(s) for job {} ({})", numJobManagers, job.getName(), job.getJobID()); + + JobManagerRunner[] runners = new JobManagerRunner[numJobManagers]; + for (int i = 0; i < numJobManagers; i++) { + try { + runners[i] = new JobManagerRunner(job, configuration, + rpcService, haServices, jobManagerServices, onCompletion); + runners[i].start(); + } + catch (Throwable t) { + // shut down all the ones so far + Exception shutdownCause = new Exception("Failed to properly start all mini cluster JobManagers", t); + + for (int k = 0; k <= i; k++) { + try { + if (runners[i] != null) { + runners[i].shutdown(shutdownCause); + } + } catch (Throwable ignored) { + // silent shutdown + } + } + + throw new JobExecutionException(job.getJobID(), "Could not start the JobManager(s) for the job", t); + } + } + + return runners; + } + + // ------------------------------------------------------------------------ + // test methods to simulate job master failures + // ------------------------------------------------------------------------ + + public void killJobMaster(int which) { + checkArgument(which >= 0 && which < numJobManagers, "no such job master"); + 
checkState(!shutdown, "mini cluster is shut down"); + + JobManagerRunner[] runners = this.runners; + checkState(runners != null, "mini cluster it not executing a job right now"); + + runners[which].shutdown(new Throwable("kill JobManager")); + } + + // ------------------------------------------------------------------------ + // utility classes + // ------------------------------------------------------------------------ + + /** + * Simple class that waits for all runners to have reported that they are done. + * In the case of a high-availability test setup, there may be multiple runners. + * After that, it marks the mini cluster as ready to receive new jobs. + */ + private class DetachedFinalizer implements OnCompletionActions { + + private final AtomicInteger numJobManagersToWaitFor; + + private DetachedFinalizer(int numJobManagersToWaitFor) { + this.numJobManagersToWaitFor = new AtomicInteger(numJobManagersToWaitFor); + } + + @Override + public void jobFinished(JobExecutionResult result) { + decrementCheckAndCleanup(); + } + + @Override + public void jobFailed(Throwable cause) { + decrementCheckAndCleanup(); + } + + @Override + public void jobFinishedByOther() { + decrementCheckAndCleanup(); + } + + @Override + public void onFatalError(Throwable exception) { + decrementCheckAndCleanup(); + } + + private void decrementCheckAndCleanup() { + if (numJobManagersToWaitFor.decrementAndGet() == 0) { + MiniClusterJobDispatcher.this.runners = null; + } + } + } + + // ------------------------------------------------------------------------ + + /** + * This class is used to sync on blocking jobs across multiple runners. + * Only after all runners reported back that they are finished, the + * result will be released. + * + * That way it is guaranteed that after the blocking job submit call returns, + * the dispatcher is immediately free to accept another job. 
+ */ + private static class BlockingJobSync implements OnCompletionActions { + + private final JobID jobId; + + private final CountDownLatch jobMastersToWaitFor; + + private volatile Throwable jobException; + + private volatile Throwable runnerException; + + private volatile JobExecutionResult result; + + BlockingJobSync(JobID jobId, int numJobMastersToWaitFor) { + this.jobId = jobId; + this.jobMastersToWaitFor = new CountDownLatch(numJobMastersToWaitFor); + } + + @Override + public void jobFinished(JobExecutionResult jobResult) { + this.result = jobResult; + jobMastersToWaitFor.countDown(); + } + + @Override + public void jobFailed(Throwable cause) { + jobException = cause; + jobMastersToWaitFor.countDown(); + } + + @Override + public void jobFinishedByOther() { + this.jobMastersToWaitFor.countDown(); + } + + @Override + public void onFatalError(Throwable exception) { + if (runnerException == null) { + runnerException = exception; + } + } + + public JobExecutionResult getResult() throws JobExecutionException, InterruptedException { + jobMastersToWaitFor.await(); + + final Throwable jobFailureCause = this.jobException; + final Throwable runnerException = this.runnerException; + final JobExecutionResult result = this.result; + + // (1) we check if the job terminated with an exception + // (2) we check whether the job completed successfully + // (3) we check if we have exceptions from the JobManagers. the job may still have + // completed successfully in that case, if multiple JobMasters were running + // and another took over. 
only if all encounter a fatal error, the job cannot finish + + if (jobFailureCause != null) { + if (jobFailureCause instanceof JobExecutionException) { + throw (JobExecutionException) jobFailureCause; + } + else { + throw new JobExecutionException(jobId, "The job execution failed", jobFailureCause); + } + } + else if (result != null) { + return result; + } + else if (runnerException != null) { + throw new JobExecutionException(jobId, + "The job execution failed because all JobManagers encountered fatal errors", runnerException); + } + else { + throw new IllegalStateException("Bug: Job finished with neither error nor result."); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3cda5933/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FatalErrorHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FatalErrorHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FatalErrorHandler.java new file mode 100644 index 0000000..7721117 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FatalErrorHandler.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc; + +public interface FatalErrorHandler { + + void onFatalError(Throwable exception); +} http://git-wip-us.apache.org/repos/asf/flink/blob/3cda5933/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index cf709c8..9e3c3b9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.taskexecutor; import akka.actor.ActorSystem; import com.typesafe.config.Config; import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.io.network.ConnectionManager; import org.apache.flink.runtime.io.network.LocalConnectionManager; @@ -28,6 +29,7 @@ import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.netty.NettyConnectionManager; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; +import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats; import org.apache.flink.runtime.query.netty.KvStateServer; @@ -340,6 +342,16 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { public LeaderElectionService 
getJobMasterLeaderElectionService(JobID jobID) throws Exception { return null; } + + @Override + public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception { + return null; + } + + @Override + public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception { + return null; + } }; // start all the TaskManager services (network stack, library cache, ...) http://git-wip-us.apache.org/repos/asf/flink/blob/3cda5933/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java index 2ac43be..1a5450d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java @@ -19,10 +19,13 @@ package org.apache.flink.runtime.highavailability; import org.apache.flink.api.common.JobID; -import org.apache.flink.hadoop.shaded.org.jboss.netty.util.internal.ConcurrentHashMap; +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import java.util.concurrent.ConcurrentHashMap; + /** * A variant of the HighAvailabilityServices for testing. Each individual service can be set * to an arbitrary implementation, such as a mock or default service. 
@@ -37,6 +40,9 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices private volatile LeaderElectionService resourceManagerLeaderElectionService; + private volatile CheckpointRecoveryFactory checkpointRecoveryFactory; + + private volatile SubmittedJobGraphStore submittedJobGraphStore; // ------------------------------------------------------------------------ // Setters for mock / testing implementations @@ -58,6 +64,14 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices this.resourceManagerLeaderElectionService = leaderElectionService; } + public void setCheckpointRecoveryFactory(CheckpointRecoveryFactory checkpointRecoveryFactory) { + this.checkpointRecoveryFactory = checkpointRecoveryFactory; + } + + public void setSubmittedJobGraphStore(SubmittedJobGraphStore submittedJobGraphStore) { + this.submittedJobGraphStore = submittedJobGraphStore; + } + // ------------------------------------------------------------------------ // HA Services Methods // ------------------------------------------------------------------------ @@ -103,4 +117,27 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices throw new IllegalStateException("ResourceManagerLeaderElectionService has not been set"); } } + + @Override + public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception { + CheckpointRecoveryFactory factory = checkpointRecoveryFactory; + + if (factory != null) { + return factory; + } else { + throw new IllegalStateException("CheckpointRecoveryFactory has not been set"); + } + } + + @Override + public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception { + SubmittedJobGraphStore store = submittedJobGraphStore; + + if (store != null) { + return store; + } else { + throw new IllegalStateException("SubmittedJobGraphStore has not been set"); + + } + } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/3cda5933/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java new file mode 100644 index 0000000..dc3b5fd --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmanager.OnCompletionActions; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.runtime.rpc.RpcService; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.UUID; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(JobManagerRunner.class) +public class JobManagerRunnerMockTest { + + private JobManagerRunner runner; + + private JobMaster jobManager; + + private JobMasterGateway jobManagerGateway; + + private LeaderElectionService leaderElectionService; + + private TestingOnCompletionActions jobCompletion; + + @Before + public void setUp() throws Exception { + jobManager = mock(JobMaster.class); + jobManagerGateway = mock(JobMasterGateway.class); + when(jobManager.getSelf()).thenReturn(jobManagerGateway); + + PowerMockito.whenNew(JobMaster.class).withAnyArguments().thenReturn(jobManager); + + jobCompletion = new TestingOnCompletionActions(); + + leaderElectionService = mock(LeaderElectionService.class); + 
when(leaderElectionService.hasLeadership()).thenReturn(true); + + HighAvailabilityServices haServices = mock(HighAvailabilityServices.class); + when(haServices.getJobMasterLeaderElectionService(any(JobID.class))).thenReturn(leaderElectionService); + + runner = PowerMockito.spy(new JobManagerRunner( + new JobGraph("test"), + mock(Configuration.class), + mock(RpcService.class), + haServices, + mock(JobManagerServices.class), + jobCompletion)); + } + + @After + public void tearDown() throws Exception { + } + + @Test + public void testStartAndShutdown() throws Exception { + runner.start(); + verify(jobManager).init(); + verify(jobManager).start(); + verify(leaderElectionService).start(runner); + + assertTrue(!jobCompletion.isJobFinished()); + assertTrue(!jobCompletion.isJobFailed()); + + runner.shutdown(); + verify(leaderElectionService).stop(); + verify(jobManager).shutDown(); + } + + @Test + public void testShutdownBeforeGrantLeadership() throws Exception { + runner.start(); + verify(jobManager).init(); + verify(jobManager).start(); + verify(leaderElectionService).start(runner); + + runner.shutdown(); + verify(leaderElectionService).stop(); + verify(jobManager).shutDown(); + + assertTrue(!jobCompletion.isJobFinished()); + assertTrue(!jobCompletion.isJobFailed()); + + runner.grantLeadership(UUID.randomUUID()); + assertTrue(!jobCompletion.isJobFinished()); + assertTrue(!jobCompletion.isJobFailed()); + + } + + @Test + public void testJobFinishedByOtherBeforeGrantLeadership() throws Exception { + runner.start(); + + when(runner.isJobFinishedByOthers()).thenReturn(true); + runner.grantLeadership(UUID.randomUUID()); + + // runner should shutdown automatic and informed the job completion + verify(leaderElectionService).stop(); + verify(jobManager).shutDown(); + + assertTrue(jobCompletion.isJobFinished()); + assertTrue(jobCompletion.isJobFinishedByOther()); + } + + @Test + public void testJobFinished() throws Exception { + runner.start(); + + 
runner.grantLeadership(UUID.randomUUID()); + verify(jobManagerGateway).startJob(); + assertTrue(!jobCompletion.isJobFinished()); + + // runner been told by JobManager that job is finished + runner.jobFinished(mock(JobExecutionResult.class)); + + assertTrue(jobCompletion.isJobFinished()); + assertFalse(jobCompletion.isJobFinishedByOther()); + verify(leaderElectionService).stop(); + verify(jobManager).shutDown(); + assertTrue(runner.isShutdown()); + } + + @Test + public void testJobFailed() throws Exception { + runner.start(); + + runner.grantLeadership(UUID.randomUUID()); + verify(jobManagerGateway).startJob(); + assertTrue(!jobCompletion.isJobFinished()); + + // runner been told by JobManager that job is failed + runner.jobFailed(new Exception("failed manually")); + + assertTrue(jobCompletion.isJobFailed()); + verify(leaderElectionService).stop(); + verify(jobManager).shutDown(); + assertTrue(runner.isShutdown()); + } + + @Test + public void testLeadershipRevoked() throws Exception { + runner.start(); + + runner.grantLeadership(UUID.randomUUID()); + verify(jobManagerGateway).startJob(); + assertTrue(!jobCompletion.isJobFinished()); + + runner.revokeLeadership(); + verify(jobManagerGateway).suspendJob(any(Throwable.class)); + assertFalse(runner.isShutdown()); + } + + @Test + public void testRegainLeadership() throws Exception { + runner.start(); + + runner.grantLeadership(UUID.randomUUID()); + verify(jobManagerGateway).startJob(); + assertTrue(!jobCompletion.isJobFinished()); + + runner.revokeLeadership(); + verify(jobManagerGateway).suspendJob(any(Throwable.class)); + assertFalse(runner.isShutdown()); + + runner.grantLeadership(UUID.randomUUID()); + verify(jobManagerGateway, times(2)).startJob(); + } + + private static class TestingOnCompletionActions implements OnCompletionActions { + + private volatile JobExecutionResult result; + + private volatile Throwable failedCause; + + private volatile boolean finishedByOther; + + @Override + public void 
jobFinished(JobExecutionResult result) { + checkArgument(!isJobFinished(), "job finished already"); + checkArgument(!isJobFailed(), "job failed already"); + + this.result = result; + } + + @Override + public void jobFailed(Throwable cause) { + checkArgument(!isJobFinished(), "job finished already"); + checkArgument(!isJobFailed(), "job failed already"); + + this.failedCause = cause; + } + + @Override + public void jobFinishedByOther() { + checkArgument(!isJobFinished(), "job finished already"); + checkArgument(!isJobFailed(), "job failed already"); + + this.finishedByOther = true; + } + + @Override + public void onFatalError(Throwable exception) { + jobFailed(exception); + } + + boolean isJobFinished() { + return result != null || finishedByOther; + } + + boolean isJobFinishedByOther() { + return finishedByOther; + } + + boolean isJobFailed() { + return failedCause != null; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3cda5933/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java index 6363662..e05c8d8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java @@ -19,23 +19,21 @@ package org.apache.flink.runtime.rpc; import akka.actor.ActorSystem; -import akka.util.Timeout; +import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; 
-import org.junit.AfterClass; import org.junit.Test; import scala.Option; import scala.Tuple2; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -57,19 +55,20 @@ public class RpcConnectionTest { // we start the RPC service with a very long timeout to ensure that the test // can only pass if the connection problem is not recognized merely via a timeout - rpcService = new AkkaRpcService(actorSystem, new Timeout(10000000, TimeUnit.SECONDS)); + rpcService = new AkkaRpcService(actorSystem, Time.of(10000000, TimeUnit.SECONDS)); Future<TaskExecutorGateway> future = rpcService.connect("foo.bar.com.test.invalid", TaskExecutorGateway.class); - Await.result(future, new FiniteDuration(10000000, TimeUnit.SECONDS)); + future.get(10000000, TimeUnit.SECONDS); fail("should never complete normally"); } catch (TimeoutException e) { fail("should not fail with a generic timeout exception"); } - catch (RpcConnectionException e) { + catch (ExecutionException e) { // that is what we want - assertTrue("wrong error message", e.getMessage().contains("foo.bar.com.test.invalid")); + assertTrue(e.getCause() instanceof RpcConnectionException); + assertTrue("wrong error message", e.getCause().getMessage().contains("foo.bar.com.test.invalid")); } catch (Throwable t) { fail("wrong exception: " + t);