Repository: flink Updated Branches: refs/heads/master c11144470 -> 1230bcaa0
[runtime] Improve error handling when submitting a job to the JobManager Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9ddb5653 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9ddb5653 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9ddb5653 Branch: refs/heads/master Commit: 9ddb565314a4efb744167eccbe6de0d3299ac4d0 Parents: 9528a52 Author: Stephan Ewen <se...@apache.org> Authored: Mon Feb 23 18:36:07 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Tue Feb 24 00:08:27 2015 +0100 ---------------------------------------------------------------------- .../org/apache/flink/client/program/Client.java | 15 +- .../org/apache/flink/util/ExceptionUtils.java | 26 ++- .../apache/flink/runtime/blob/BlobCache.java | 6 +- .../runtime/client/JobExecutionException.java | 5 +- .../librarycache/BlobLibraryCacheManager.java | 62 ++++-- .../flink/runtime/executiongraph/Execution.java | 2 +- .../apache/flink/runtime/jobgraph/JobGraph.java | 16 ++ .../apache/flink/runtime/client/JobClient.scala | 4 +- .../flink/runtime/jobmanager/JobInfo.scala | 1 - .../flink/runtime/jobmanager/JobManager.scala | 207 ++++++++++--------- .../runtime/messages/JobManagerMessages.scala | 27 +-- .../jobmanager/JobManagerStartupTest.java | 1 - .../flink/runtime/jobmanager/JobSubmitTest.java | 157 ++++++++++++++ .../runtime/jobmanager/JobManagerITCase.scala | 18 +- 14 files changed, 392 insertions(+), 155 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb5653/flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java index bd364ac..5a032a0 100644 --- 
a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java @@ -329,8 +329,9 @@ public class Client { try { JobClient.uploadJarFiles(jobGraph, hostname, client, timeout); - } catch (IOException e) { - throw new ProgramInvocationException("Could not upload the programs JAR files to the JobManager.", e); + } + catch (IOException e) { + throw new ProgramInvocationException("Could not upload the program's JAR files to the JobManager.", e); } try{ @@ -340,10 +341,12 @@ public class Client { else { JobClient.submitJobDetached(jobGraph, client, timeout); } - } catch (JobExecutionException e) { - throw new ProgramInvocationException("The program execution failed.", e); - } catch (Exception e) { - throw new ProgramInvocationException("Unexpected exception while program execution.", e); + } + catch (JobExecutionException e) { + throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e); + } + catch (Exception e) { + throw new ProgramInvocationException("Exception during program execution.", e); } finally { actorSystem.shutdown(); http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb5653/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java index 99a098e..9784844 100644 --- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java @@ -24,6 +24,7 @@ package org.apache.flink.util; +import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; @@ -57,7 +58,7 @@ public class ExceptionUtils { /** * Throws the given {@code Throwable} in scenarios where the signatures do not allow you to - * throw arbitrary Throwables. 
Errors and RuntimeExceptions are thrown directly, other exceptions + * throw an arbitrary Throwable. Errors and RuntimeExceptions are thrown directly, other exceptions * are packed into runtime exceptions * * @param t The throwable to be thrown. @@ -76,8 +77,8 @@ public class ExceptionUtils { /** * Throws the given {@code Throwable} in scenarios where the signatures do not allow you to - * throw arbitrary Throwables. Errors and RuntimeExceptions are thrown directly, other exceptions - * are packed into a parent RuntimeEception. + * throw an arbitrary Throwable. Errors and RuntimeExceptions are thrown directly, other exceptions + * are packed into a parent RuntimeException. * * @param t The throwable to be thrown. * @param parentMessage The message for the parent RuntimeException, if one is needed. @@ -93,4 +94,23 @@ public class ExceptionUtils { throw new RuntimeException(parentMessage, t); } } + + /** + * Tries to throw the given {@code Throwable} in scenarios where the signature allows only IOExceptions + * (and RuntimeException and Error). Throws this exception directly, if it is an IOException, + * a RuntimeException, or an Error. Otherwise does nothing. + * + * @param t The throwable to be thrown.
+ */ + public static void tryRethrowIOException(Throwable t) throws IOException { + if (t instanceof IOException) { + throw (IOException) t; + } + else if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } + else if (t instanceof Error) { + throw (Error) t; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb5653/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java index 0d1b29c..33bb7f0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java @@ -189,10 +189,8 @@ public final class BlobCache implements BlobService { public void delete(BlobKey key) throws IOException{ final File localFile = BlobUtils.getStorageLocation(storageDir, key); - if (localFile.exists()) { - if (!localFile.delete()) { - LOG.warn("Failed to delete locally cached BLOB " + key + " at " + localFile.getAbsolutePath()); - } + if (localFile.exists() && !localFile.delete()) { + LOG.warn("Failed to delete locally cached BLOB " + key + " at " + localFile.getAbsolutePath()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb5653/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java index 1a56a93..99c3f89 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java @@ -21,8 +21,9 @@ package 
org.apache.flink.runtime.client; import org.apache.flink.runtime.jobgraph.JobID; /** - * This exception is thrown by the {@link JobClient} if a job has been aborted as a result of an - * error which occurred during the execution. + * This exception is the base exception for all exceptions that denote any failure during + * the execution of a job. The JobExecutionException and its subclasses are thrown by + * the {@link JobClient}. */ public class JobExecutionException extends Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb5653/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java index c9f96ce..4dba50a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.blob.BlobService; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +54,7 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC private static Logger LOG = LoggerFactory.getLogger(BlobLibraryCacheManager.class); - private static ExecutionAttemptID JOB_ATTEMPT_ID = new ExecutionAttemptID(); + private static ExecutionAttemptID JOB_ATTEMPT_ID = new ExecutionAttemptID(-1, -1); // -------------------------------------------------------------------------------------------- @@ -71,7 +72,7 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
public final class BlobLibraryCacheManager extends TimerTask implements LibraryC // -------------------------------------------------------------------------------------------- - public BlobLibraryCacheManager(BlobService blobService, long cleanupInterval){ + public BlobLibraryCacheManager(BlobService blobService, long cleanupInterval) { this.blobService = blobService; // Initializing the clean up task @@ -99,11 +100,31 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC LibraryCacheEntry entry = cacheEntries.get(jobId); if (entry == null) { - URL[] urls = new URL[requiredJarFiles.size()]; + // create a new entry in the library cache + BlobKey[] keys = requiredJarFiles.toArray(new BlobKey[requiredJarFiles.size()]); + URL[] urls = new URL[keys.length]; int count = 0; - for (BlobKey blobKey : requiredJarFiles) { - urls[count++] = registerReferenceToBlobKeyAndGetURL(blobKey); + try { + for (; count < keys.length; count++) { + BlobKey blobKey = keys[count]; + urls[count] = registerReferenceToBlobKeyAndGetURL(blobKey); + } + } + catch (Throwable t) { + // undo the reference count increases + try { + for (int i = 0; i < count; i++) { + unregisterReferenceToBlobKey(keys[i]); + } + } + catch (Throwable tt) { + LOG.error("Error while updating library reference counters.", tt); + } + + // rethrow or wrap + ExceptionUtils.tryRethrowIOException(t); + throw new IOException("Library cache could not register the user code libraries.", t); } URLClassLoader classLoader = new URLClassLoader(urls); @@ -205,19 +226,24 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC } int getNumberOfCachedLibraries() { - synchronized (lockObject) { - return blobKeyReferenceCounters.size(); - } + return blobKeyReferenceCounters.size(); } private URL registerReferenceToBlobKeyAndGetURL(BlobKey key) throws IOException { - - Integer references = blobKeyReferenceCounters.get(key); - int newReferences = references == null ? 
1 : references.intValue() + 1; - - blobKeyReferenceCounters.put(key, newReferences); + // it is important that we fetch the URL before increasing the counter. + // in case the URL cannot be created (failed to fetch the BLOB), we have no stale counter + try { + URL url = blobService.getURL(key); + + Integer references = blobKeyReferenceCounters.get(key); + int newReferences = references == null ? 1 : references.intValue() + 1; + blobKeyReferenceCounters.put(key, newReferences); - return blobService.getURL(key); + return url; + } + catch (IOException e) { + throw new IOException("Cannot access jar file stored under " + key, e); + } } private void unregisterReferenceToBlobKey(BlobKey key) { @@ -226,8 +252,14 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC int newReferences = Math.max(references.intValue() - 1, 0); blobKeyReferenceCounters.put(key, newReferences); } + else { + // make sure we have an entry in any case, that the cleanup timer removes any + // present libraries + blobKeyReferenceCounters.put(key, 0); + } } - + + // -------------------------------------------------------------------------------------------- private static class LibraryCacheEntry { http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb5653/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 56f9416..aa3c3e2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -272,7 +272,7 @@ public class Execution implements Serializable { throw new NullPointerException(); } if (!slot.isAlive()) { - throw new JobException("Traget slot for deployment is 
not alive."); + throw new JobException("Target slot for deployment is not alive."); } // make sure exactly one deployment call happens from the correct state http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb5653/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java index 58f4e3a..0cf2f5e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java @@ -364,6 +364,22 @@ public class JobGraph implements Serializable { } /** + * Adds the BLOB referenced by the key to the JobGraph's dependencies. + * + * @param key + * path of the JAR file required to run the job on a task manager + */ + public void addBlob(BlobKey key) { + if (key == null) { + throw new IllegalArgumentException(); + } + + if (!userJarBlobKeys.contains(key)) { + userJarBlobKeys.add(key); + } + } + + /** * Checks whether the JobGraph has user code JAR files attached. * * @return True, if the JobGraph has user code JAR files attached, false otherwise. 
http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb5653/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala index d3ceeaf..19b3050 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala @@ -47,14 +47,14 @@ Actor with ActorLogMessages with ActorLogging { override def receiveWithLogMessages: Receive = { case SubmitJobDetached(jobGraph) => - jobManager forward SubmitJob(jobGraph, registerForEvents = false, detached = true) + jobManager forward SubmitJob(jobGraph, registerForEvents = false) case cancelJob: CancelJob => jobManager forward cancelJob case SubmitJobAndWait(jobGraph, listen) => val listener = context.actorOf(Props(classOf[JobClientListener], sender)) - jobManager.tell(SubmitJob(jobGraph, registerForEvents = listen, detached = false), listener) + jobManager.tell(SubmitJob(jobGraph, registerForEvents = listen), listener) case RequestBlobManagerPort => jobManager forward RequestBlobManagerPort http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb5653/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala index 128454a..4b7446c 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala @@ -31,7 +31,6 @@ import akka.actor.ActorRef */ class JobInfo(val client: ActorRef, val start: Long){ var end: Long = 
-1 - var detached: Boolean = false def duration: Long = { if(end != -1){ http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb5653/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index ad5c9e8..ce3bc74 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -25,8 +25,7 @@ import akka.actor.Status.{Success, Failure} import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, Configuration} import org.apache.flink.core.io.InputSplitAssigner import org.apache.flink.runtime.blob.BlobServer -import org.apache.flink.runtime.client.{JobSubmissionException, JobExecutionException, -JobCancellationException} +import org.apache.flink.runtime.client.{JobSubmissionException, JobExecutionException, JobCancellationException} import org.apache.flink.runtime.executiongraph.{ExecutionJobVertex, ExecutionGraph} import org.apache.flink.runtime.jobmanager.web.WebInfoServer import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph @@ -36,7 +35,7 @@ import org.apache.flink.runtime.security.SecurityUtils import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner import org.apache.flink.runtime.taskmanager.TaskManager import org.apache.flink.runtime.util.EnvironmentInformation -import org.apache.flink.runtime.{JobException, ActorLogMessages} +import org.apache.flink.runtime.ActorLogMessages import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.instance.InstanceManager @@ -139,6 +138,7 @@ class JobManager(val configuration: 
Configuration, } override def receiveWithLogMessages: Receive = { + case RegisterTaskManager(connectionInfo, hardwareInformation, numberOfSlots) => val taskManager = sender @@ -170,8 +170,8 @@ class JobManager(val configuration: Configuration, case RequestTotalNumberOfSlots => sender ! instanceManager.getTotalNumberOfSlots - case SubmitJob(jobGraph, listen, d) => - submitJob(jobGraph, listenToEvents = listen, detached = d) + case SubmitJob(jobGraph, listen) => + submitJob(jobGraph, listenToEvents = listen) case CancelJob(jobID) => log.info("Trying to cancel job with ID {}.", jobID) @@ -274,7 +274,7 @@ class JobManager(val configuration: Configuration, // is the client waiting for the job result? newJobStatus match { - case JobStatus.FINISHED if !jobInfo.detached => + case JobStatus.FINISHED => val accumulatorResults = accumulatorManager.getJobAccumulatorResults(jobID) jobInfo.client ! JobResultSuccess(jobID, jobInfo.duration, accumulatorResults) case JobStatus.CANCELED => @@ -397,116 +397,133 @@ class JobManager(val configuration: Configuration, * @param jobGraph representing the Flink job * @param listenToEvents true if the sender wants to listen to job status and execution state * change notifications. false if not. - * @param detached true if the job runs in detached mode, meaning that the sender does not wait - * for the result of the job. false otherwise. */ - private def submitJob(jobGraph: JobGraph, listenToEvents: Boolean, detached: Boolean): Unit = { - try { - if (jobGraph == null) { - sender ! Failure(new JobSubmissionException(null, "JobGraph must not be null.")) - } else { - log.info(s"Received job ${jobGraph.getJobID} (${jobGraph.getName}).") + private def submitJob(jobGraph: JobGraph, listenToEvents: Boolean): Unit = { + if (jobGraph == null) { + sender ! 
Failure(new JobSubmissionException(null, "JobGraph must not be null.")) + } + else { + val jobId = jobGraph.getJobID + val jobName = jobGraph.getName + var executionGraph: ExecutionGraph = null - if (jobGraph.getNumberOfVertices == 0) { - sender ! Failure(new JobSubmissionException(jobGraph.getJobID ,"Job is empty.")) - } else { - // Create the user code class loader - libraryCacheManager.registerJob(jobGraph.getJobID, jobGraph.getUserJarBlobKeys) + log.info(s"Received job ${jobId} (${jobName}).") - val userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID) + try { + // Important: We need to make sure that the library registration is the first action, + // because this makes sure that the uploaded jar files are removed in case of + // unsuccessful + try { + libraryCacheManager.registerJob(jobGraph.getJobID, jobGraph.getUserJarBlobKeys) + } + catch { + case t: Throwable => + throw new JobSubmissionException(jobId, + "Cannot set up the user code libraries: " + t.getMessage, t) + } - // see if there already exists an ExecutionGraph for the corresponding job ID - val (executionGraph, jobInfo) = currentJobs.getOrElseUpdate(jobGraph.getJobID, - (new ExecutionGraph(jobGraph.getJobID, jobGraph.getName, - jobGraph.getJobConfiguration, timeout, jobGraph.getUserJarBlobKeys, userCodeLoader), - JobInfo(sender, System.currentTimeMillis()))) + val userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID) + if (userCodeLoader == null) { + throw new JobSubmissionException(jobId, "The user code class loader could not be initialized.") + } - val jobNumberRetries = if (jobGraph.getNumberOfExecutionRetries >= 0) { - jobGraph.getNumberOfExecutionRetries - } else { - defaultExecutionRetries - } + if (jobGraph.getNumberOfVertices == 0) { + throw new JobSubmissionException(jobId, "The given job is empty") + } - executionGraph.setNumberOfRetriesLeft(jobNumberRetries) - executionGraph.setDelayBeforeRetrying(delayBetweenRetries) + // see if there already exists an 
ExecutionGraph for the corresponding job ID + executionGraph = currentJobs.getOrElseUpdate(jobGraph.getJobID, + (new ExecutionGraph(jobGraph.getJobID, jobGraph.getName, + jobGraph.getJobConfiguration, timeout, jobGraph.getUserJarBlobKeys, userCodeLoader), + JobInfo(sender, System.currentTimeMillis())))._1 - if (userCodeLoader == null) { - throw new JobException("The user code class loader could not be initialized.") - } + // configure the execution graph + val jobNumberRetries = if (jobGraph.getNumberOfExecutionRetries >= 0) { + jobGraph.getNumberOfExecutionRetries + } else { + defaultExecutionRetries + } + executionGraph.setNumberOfRetriesLeft(jobNumberRetries) + executionGraph.setDelayBeforeRetrying(delayBetweenRetries) + executionGraph.setScheduleMode(jobGraph.getScheduleMode) + executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling) + + // initialize the vertices that have a master initialization hook + // file output formats create directories here, input formats create splits + if (log.isDebugEnabled) { + log.debug(s"Running initialization on master for job ${jobId} (${jobName}).") + } - if (log.isDebugEnabled) { - log.debug(s"Running master initialization of job ${jobGraph.getJobID} " + - s"(${jobGraph.getName}).") + for (vertex <- jobGraph.getVertices.asScala) { + val executableClass = vertex.getInvokableClassName + if (executableClass == null || executableClass.length == 0) { + throw new JobSubmissionException(jobId, + s"The vertex ${vertex.getID} (${vertex.getName}) has no invokable class.") } - - for (vertex <- jobGraph.getVertices.asScala) { - val executableClass = vertex.getInvokableClassName - if (executableClass == null || executableClass.length == 0) { - throw new JobException(s"The vertex ${vertex.getID} (${vertex.getName}) has no " + - s"invokable class.") - } - + try { vertex.initializeOnMaster(userCodeLoader) } - - // topological sorting of the job vertices - val sortedTopology = 
jobGraph.getVerticesSortedTopologicallyFromSources - - if (log.isDebugEnabled) { - log.debug(s"Adding ${sortedTopology.size()} vertices from job graph " + - s"${jobGraph.getJobID} (${jobGraph.getName}).") + catch { + case t: Throwable => throw new JobExecutionException(jobId, + "Cannot initialize task '" + vertex.getName + "': " + t.getMessage, t) } + } - executionGraph.attachJobGraph(sortedTopology) + // topologically sort the job vertices and attach the graph to the existing one + val sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources() + if (log.isDebugEnabled) { + log.debug(s"Adding ${sortedTopology.size()} vertices from " + + s"job graph ${jobId} (${jobName}).") + } + executionGraph.attachJobGraph(sortedTopology) - if (log.isDebugEnabled) { - log.debug(s"Successfully created execution graph from job graph " + - s"${jobGraph.getJobID} (${jobGraph.getName}).") - } + if (log.isDebugEnabled) { + log.debug(s"Successfully created execution graph from job graph ${jobId} (${jobName}).") + } - executionGraph.setScheduleMode(jobGraph.getScheduleMode) - executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling) + // get notified about job status changes + executionGraph.registerJobStatusListener(self) - // get notified about job status changes - executionGraph.registerJobStatusListener(self) + if (listenToEvents) { + // the sender wants to be notified about state changes + executionGraph.registerExecutionListener(sender) + executionGraph.registerJobStatusListener(sender) + } - if (listenToEvents) { - // the sender wants to be notified about state changes - executionGraph.registerExecutionListener(sender) - executionGraph.registerJobStatusListener(sender) - } + // done with submitting the job + sender ! 
Success(jobGraph.getJobID) + } + catch { + case t: Throwable => + log.error(s"Failed to submit job ${jobId} (${jobName})", t) - jobInfo.detached = detached + libraryCacheManager.unregisterJob(jobId) + currentJobs.remove(jobId) - log.info(s"Scheduling job ${jobGraph.getName}.") + if (executionGraph != null) { + executionGraph.fail(t) + } - executionGraph.scheduleForExecution(scheduler) + val rt: Throwable = if (t.isInstanceOf[JobExecutionException]) { + t + } else { + new JobExecutionException(jobId, s"Failed to submit job ${jobId} (${jobName})", t) + } - sender ! Success(jobGraph.getJobID) - } + sender ! Failure(rt) + return } - } catch { - case t: Throwable => - log.error(t, "Job submission of job {} failed.", jobGraph.getJobID) - - currentJobs.get(jobGraph.getJobID) match { - case Some((executionGraph, jobInfo)) => - /* - * Register self to be notified about job status changes in case that it did not happen - * before. That way the proper cleanup of the job is triggered in the JobStatusChanged - * handler. - */ - if (!executionGraph.containsJobStatusListener(self)) { - executionGraph.registerJobStatusListener(self) - } - // let the execution graph fail, which will send a failure to the job client - executionGraph.fail(t) - case None => - libraryCacheManager.unregisterJob(jobGraph.getJobID) - currentJobs.remove(jobGraph.getJobID) - sender ! Failure(t) - } + // NOTE: Scheduling the job for execution is a separate action from the job submission. + // The success of submitting the job must be independent from the success of scheduling + // the job. + try { + log.info(s"Scheduling job ${executionGraph.getJobName}.") + executionGraph.scheduleForExecution(scheduler) + } + catch { + case t: Throwable => executionGraph.fail(t); + } } } @@ -813,8 +830,8 @@ object JobManager { * Starts the JobManager and job archiver based on the given configuration, in the * given actor system. 
* - * @param configuration - * @param actorSystem + * @param configuration The configuration for the JobManager + * @param actorSystem The actor system running the JobManager * @return A tuple of references (JobManager Ref, Archiver Ref) */ def startJobManagerActors(configuration: Configuration, http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb5653/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala index 8aef552..28c5bea 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala @@ -25,23 +25,19 @@ import org.apache.flink.runtime.jobgraph.{JobGraph, JobID, JobStatus, JobVertexI import org.apache.flink.runtime.taskmanager.TaskExecutionState /** - * The job manager specific messages + * The job manager specific actor messages */ object JobManagerMessages { + /** * Submits a job to the job manager. If [[registerForEvents]] is true, - * then the sender will be registered as listener for the state change messages. If [[detached]] - * is set to true, then the sender will detach from the job execution. Consequently, - * he will not receive the job execution result [[JobResult]]. The submission result will be sent - * back to the - * sender as a [[SubmissionResponse]] message. + * then the sender will be registered as listener for the state change messages. + * The submission result will be sent back to the sender as a success message.
* * @param jobGraph * @param registerForEvents if true, then register for state change events - * @param detached if true, then detach from the job execution */ - case class SubmitJob(jobGraph: JobGraph, registerForEvents: Boolean = false, - detached: Boolean = false) + case class SubmitJob(jobGraph: JobGraph, registerForEvents: Boolean = false) /** * Cancels a job with the given [[jobID]] at the JobManager. The result of the cancellation is @@ -171,23 +167,14 @@ object JobManagerMessages { case object RequestBlobManagerPort /** - * Requests the final job status of the job with [[jobID]]. If the job has not been terminated - * then the result is sent back upon termination of the job. The result is a - * [[JobStatusResponse]] message. - * - * @param jobID - */ - case class RequestFinalJobStatus(jobID: JobID) - - /** * Denotes a successful job execution. * * @param jobID * @param runtime * @param accumulatorResults */ - case class JobResultSuccess(jobID: JobID, runtime: Long, accumulatorResults: java.util.Map[String, - AnyRef]) {} + case class JobResultSuccess(jobID: JobID, runtime: Long, + accumulatorResults: java.util.Map[String, AnyRef]) {} sealed trait CancellationResponse{ def jobID: JobID http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb5653/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java index 595c00b..c0928b0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java @@ -90,5 +90,4 @@ public class JobManagerStartupTest { // expected } } - } 
http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb5653/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java new file mode 100644 index 0000000..5967bdd --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmanager; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.pattern.Patterns; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.blob.BlobClient; +import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.jobgraph.AbstractJobVertex; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.operators.util.TaskConfig; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import scala.Tuple2; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class JobSubmitTest { + + private static final long TIMEOUT = 50000; + + private static ActorSystem jobManagerSystem; + private static ActorRef jobManager; + + @BeforeClass + public static void setupJobManager() { + Configuration config = new Configuration(); + + scala.Option<Tuple2<String, Object>> listeningAddress = scala.Option.empty(); + jobManagerSystem = AkkaUtils.createActorSystem(config, listeningAddress); + jobManager = JobManager.startJobManagerActors(config, jobManagerSystem)._1(); + } + + @AfterClass + public static void teardownJobmanager() { + if (jobManagerSystem != null) { + jobManagerSystem.shutdown(); + } + } + + @Test + public void testFailureWhenJarBlobsMissing() { + try { + // create a simple job graph + AbstractJobVertex jobVertex = new AbstractJobVertex("Test Vertex"); + jobVertex.setInvokableClass(Tasks.NoOpInvokable.class); + JobGraph jg = new JobGraph("test job", jobVertex); + + // request the 
blob port from the job manager + Future<Object> future = Patterns.ask(jobManager, JobManagerMessages.getRequestBlobManagerPort(), TIMEOUT); + int blobPort = (Integer) Await.result(future, new FiniteDuration(TIMEOUT, TimeUnit.MILLISECONDS)); + + // upload two dummy bytes and add their keys to the job graph as dependencies + BlobKey key1, key2; + BlobClient bc = new BlobClient(new InetSocketAddress("localhost", blobPort)); + try { + key1 = bc.put(new byte[10]); + key2 = bc.put(new byte[10]); + + // delete one of the blobs to make sure that the startup failed + bc.delete(key2); + } + finally { + bc.close(); + } + + jg.addBlob(key1); + jg.addBlob(key2); + + // submit the job + Future<Object> submitFuture = Patterns.ask(jobManager, + new JobManagerMessages.SubmitJob(jg, false), TIMEOUT); + try { + Await.result(submitFuture, new FiniteDuration(TIMEOUT, TimeUnit.MILLISECONDS)); + } + catch (JobExecutionException e) { + // that is what we expect + assertTrue(e.getCause() instanceof IOException); + } + catch (Exception e) { + fail("Wrong exception type"); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testFailureWhenInitializeOnMasterFails() { + try { + // create a simple job graph + + AbstractJobVertex jobVertex = new AbstractJobVertex("Vertex that fails in initializeOnMaster") { + + @Override + public void initializeOnMaster(ClassLoader loader) throws Exception { + throw new RuntimeException("test exception"); + } + }; + + TaskConfig config = new TaskConfig(jobVertex.getConfiguration()); + jobVertex.setInvokableClass(Tasks.NoOpInvokable.class); + JobGraph jg = new JobGraph("test job", jobVertex); + + // submit the job + Future<Object> submitFuture = Patterns.ask(jobManager, + new JobManagerMessages.SubmitJob(jg, false), TIMEOUT); + try { + Await.result(submitFuture, new FiniteDuration(TIMEOUT, TimeUnit.MILLISECONDS)); + } + catch (JobExecutionException e) { + // that is what we expect + // test that the 
exception nesting is not too deep + assertTrue(e.getCause() instanceof RuntimeException); + } + catch (Exception e) { + fail("Wrong exception type"); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb5653/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala index 7e2840a..fc67b46 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala @@ -32,7 +32,7 @@ import org.apache.flink.runtime.testingUtils.TestingUtils import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} -import scheduler.{NoResourceAvailableException, SlotSharingGroup} +import org.apache.flink.runtime.jobmanager.scheduler.{NoResourceAvailableException, SlotSharingGroup} import scala.concurrent.Await import scala.concurrent.duration._ @@ -52,6 +52,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll { } "The JobManager actor" must { + "handle jobs when not enough slots" in { val vertex = new AbstractJobVertex("Test Vertex") vertex.setParallelism(2) @@ -69,12 +70,18 @@ WordSpecLike with Matchers with BeforeAndAfterAll { availableSlots should equal(1) - within(1 second) { + within(2 second) { jm ! 
SubmitJob(jobGraph) - val failure = expectMsgType[Failure] + val success = expectMsgType[Success] - failure.cause match { + jobGraph.getJobID should equal(success.status) + } + + within(2 second) { + val response = expectMsgType[Failure] + val exception = response.cause + exception match { case e: JobExecutionException => jobGraph.getJobID should equal(e.getJobID) new NoResourceAvailableException(1,1,0) should equal(e.getCause) @@ -84,7 +91,8 @@ WordSpecLike with Matchers with BeforeAndAfterAll { jm ! NotifyWhenJobRemoved(jobGraph.getJobID) expectMsg(true) - } finally { + } + finally { cluster.stop() } }