Repository: flink
Updated Branches:
  refs/heads/master c11144470 -> 1230bcaa0


[runtime] Improve error handling when submitting a job to the JobManager


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9ddb5653
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9ddb5653
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9ddb5653

Branch: refs/heads/master
Commit: 9ddb565314a4efb744167eccbe6de0d3299ac4d0
Parents: 9528a52
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Feb 23 18:36:07 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Feb 24 00:08:27 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/program/Client.java |  15 +-
 .../org/apache/flink/util/ExceptionUtils.java   |  26 ++-
 .../apache/flink/runtime/blob/BlobCache.java    |   6 +-
 .../runtime/client/JobExecutionException.java   |   5 +-
 .../librarycache/BlobLibraryCacheManager.java   |  62 ++++--
 .../flink/runtime/executiongraph/Execution.java |   2 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java |  16 ++
 .../apache/flink/runtime/client/JobClient.scala |   4 +-
 .../flink/runtime/jobmanager/JobInfo.scala      |   1 -
 .../flink/runtime/jobmanager/JobManager.scala   | 207 ++++++++++---------
 .../runtime/messages/JobManagerMessages.scala   |  27 +--
 .../jobmanager/JobManagerStartupTest.java       |   1 -
 .../flink/runtime/jobmanager/JobSubmitTest.java | 157 ++++++++++++++
 .../runtime/jobmanager/JobManagerITCase.scala   |  18 +-
 14 files changed, 392 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb5653/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java 
b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index bd364ac..5a032a0 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -329,8 +329,9 @@ public class Client {
 
                try {
                        JobClient.uploadJarFiles(jobGraph, hostname, client, 
timeout);
-               } catch (IOException e) {
-                       throw new ProgramInvocationException("Could not upload 
the programs JAR files to the JobManager.", e);
+               }
+               catch (IOException e) {
+                       throw new ProgramInvocationException("Could not upload 
the program's JAR files to the JobManager.", e);
                }
 
                try{
@@ -340,10 +341,12 @@ public class Client {
                        else {
                                JobClient.submitJobDetached(jobGraph, client, 
timeout);
                        }
-               } catch (JobExecutionException e) {
-                       throw new ProgramInvocationException("The program 
execution failed.", e);
-               } catch (Exception e) {
-                       throw new ProgramInvocationException("Unexpected 
exception while program execution.", e);
+               }
+               catch (JobExecutionException e) {
+                       throw new ProgramInvocationException("The program 
execution failed: " + e.getMessage(), e);
+               }
+               catch (Exception e) {
+                       throw new ProgramInvocationException("Exception during 
program execution.", e);
                }
                finally {
                        actorSystem.shutdown();

http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb5653/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 99a098e..9784844 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -24,6 +24,7 @@
 
 package org.apache.flink.util;
 
+import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 
@@ -57,7 +58,7 @@ public class ExceptionUtils {
        
        /**
         * Throws the given {@code Throwable} in scenarios where the signatures 
do not allow you to
-        * throw arbitrary Throwables. Errors and RuntimeExceptions are thrown 
directly, other exceptions
+        * throw an arbitrary Throwable. Errors and RuntimeExceptions are 
thrown directly, other exceptions
         * are packed into runtime exceptions
         * 
         * @param t The throwable to be thrown.
@@ -76,8 +77,8 @@ public class ExceptionUtils {
        
        /**
         * Throws the given {@code Throwable} in scenarios where the signatures 
do not allow you to
-        * throw arbitrary Throwables. Errors and RuntimeExceptions are thrown 
directly, other exceptions
-        * are packed into a parent RuntimeEception.
+        * throw an arbitrary Throwable. Errors and RuntimeExceptions are 
thrown directly, other exceptions
+        * are packed into a parent RuntimeException.
         * 
         * @param t The throwable to be thrown.
         * @param parentMessage The message for the parent RuntimeException, if 
one is needed.
@@ -93,4 +94,23 @@ public class ExceptionUtils {
                        throw new RuntimeException(parentMessage, t);
                }
        }
+
+       /**
+        * Tries to throw the given {@code Throwable} in scenarios where the 
signatures allow only IOExceptions
+        * (and RuntimeException and Error). Throws this exception directly, if 
it is an IOException,
+        * a RuntimeException, or an Error. Otherwise does nothing.
+        *
+        * @param t The throwable to be thrown.
+        */
+       public static void tryRethrowIOException(Throwable t) throws 
IOException {
+               if (t instanceof IOException) {
+                       throw (IOException) t;
+               }
+               else if (t instanceof RuntimeException) {
+                       throw (RuntimeException) t;
+               }
+               else if (t instanceof Error) {
+                       throw (Error) t;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb5653/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
index 0d1b29c..33bb7f0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
@@ -189,10 +189,8 @@ public final class BlobCache implements BlobService {
        public void delete(BlobKey key) throws IOException{
                final File localFile = BlobUtils.getStorageLocation(storageDir, 
key);
 
-               if (localFile.exists()) {
-                       if (!localFile.delete()) {
-                               LOG.warn("Failed to delete locally cached BLOB 
" + key + " at " + localFile.getAbsolutePath());
-                       }
+               if (localFile.exists() && !localFile.delete()) {
+                       LOG.warn("Failed to delete locally cached BLOB " + key 
+ " at " + localFile.getAbsolutePath());
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb5653/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java
index 1a56a93..99c3f89 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java
@@ -21,8 +21,9 @@ package org.apache.flink.runtime.client;
 import org.apache.flink.runtime.jobgraph.JobID;
 
 /**
- * This exception is thrown by the {@link JobClient} if a job has been aborted 
as a result of an
- * error which occurred during the execution.
+ * This exception is the base exception for all exceptions that denote any 
failure during
+ * the execution of a job. The JobExecutionException and its subclasses are 
thrown by
+ * the {@link JobClient}.
  */
 public class JobExecutionException extends Exception {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb5653/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index c9f96ce..4dba50a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,7 +54,7 @@ public final class BlobLibraryCacheManager extends TimerTask 
implements LibraryC
 
        private static Logger LOG = 
LoggerFactory.getLogger(BlobLibraryCacheManager.class);
        
-       private static ExecutionAttemptID JOB_ATTEMPT_ID = new 
ExecutionAttemptID();
+       private static ExecutionAttemptID JOB_ATTEMPT_ID = new 
ExecutionAttemptID(-1, -1);
        
        // 
--------------------------------------------------------------------------------------------
        
@@ -71,7 +72,7 @@ public final class BlobLibraryCacheManager extends TimerTask 
implements LibraryC
        
        // 
--------------------------------------------------------------------------------------------
 
-       public BlobLibraryCacheManager(BlobService blobService, long 
cleanupInterval){
+       public BlobLibraryCacheManager(BlobService blobService, long 
cleanupInterval) {
                this.blobService = blobService;
 
                // Initializing the clean up task
@@ -99,11 +100,31 @@ public final class BlobLibraryCacheManager extends 
TimerTask implements LibraryC
                        LibraryCacheEntry entry = cacheEntries.get(jobId);
                        
                        if (entry == null) {
-                               URL[] urls = new URL[requiredJarFiles.size()];
+                               // create a new entry in the library cache
+                               BlobKey[] keys = requiredJarFiles.toArray(new 
BlobKey[requiredJarFiles.size()]);
+                               URL[] urls = new URL[keys.length];
 
                                int count = 0;
-                               for (BlobKey blobKey : requiredJarFiles) {
-                                       urls[count++] = 
registerReferenceToBlobKeyAndGetURL(blobKey);
+                               try {
+                                       for (; count < keys.length; count++) {
+                                               BlobKey blobKey = keys[count];
+                                               urls[count] = 
registerReferenceToBlobKeyAndGetURL(blobKey);
+                                       }
+                               }
+                               catch (Throwable t) {
+                                       // undo the reference count increases
+                                       try {
+                                               for (int i = 0; i < count; i++) 
{
+                                                       
unregisterReferenceToBlobKey(keys[i]);
+                                               }
+                                       }
+                                       catch (Throwable tt) {
+                                               LOG.error("Error while updating 
library reference counters.", tt);
+                                       }
+
+                                       // rethrow or wrap
+                                       ExceptionUtils.tryRethrowIOException(t);
+                                       throw new IOException("Library cache 
could not register the user code libraries.", t);
                                }
                                
                                URLClassLoader classLoader = new 
URLClassLoader(urls);
@@ -205,19 +226,24 @@ public final class BlobLibraryCacheManager extends 
TimerTask implements LibraryC
        }
        
        int getNumberOfCachedLibraries() {
-               synchronized (lockObject) {
-                       return blobKeyReferenceCounters.size();
-               }
+               return blobKeyReferenceCounters.size();
        }
        
        private URL registerReferenceToBlobKeyAndGetURL(BlobKey key) throws 
IOException {
-               
-               Integer references = blobKeyReferenceCounters.get(key);
-               int newReferences = references == null ? 1 : 
references.intValue() + 1;
-               
-               blobKeyReferenceCounters.put(key, newReferences);
+               // It is important that we fetch the URL before increasing the 
counter.
+               // In case the URL cannot be created (failed to fetch the 
BLOB), we have no stale counter.
+               try {
+                       URL url = blobService.getURL(key);
+
+                       Integer references = blobKeyReferenceCounters.get(key);
+                       int newReferences = references == null ? 1 : 
references.intValue() + 1;
+                       blobKeyReferenceCounters.put(key, newReferences);
 
-               return blobService.getURL(key);
+                       return url;
+               }
+               catch (IOException e) {
+                       throw new IOException("Cannot access jar file stored 
under " + key, e);
+               }
        }
        
        private void unregisterReferenceToBlobKey(BlobKey key) {
@@ -226,8 +252,14 @@ public final class BlobLibraryCacheManager extends 
TimerTask implements LibraryC
                        int newReferences = Math.max(references.intValue() - 1, 
0);
                        blobKeyReferenceCounters.put(key, newReferences);
                }
+               else {
+                       // make sure we have an entry in any case, so that the 
cleanup timer removes any
+                       // present libraries
+                       blobKeyReferenceCounters.put(key, 0);
+               }
        }
-       
+
+
        // 
--------------------------------------------------------------------------------------------
        
        private static class LibraryCacheEntry {

http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb5653/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 56f9416..aa3c3e2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -272,7 +272,7 @@ public class Execution implements Serializable {
                        throw new NullPointerException();
                }
                if (!slot.isAlive()) {
-                       throw new JobException("Traget slot for deployment is 
not alive.");
+                       throw new JobException("Target slot for deployment is 
not alive.");
                }
 
                // make sure exactly one deployment call happens from the 
correct state

http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb5653/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 58f4e3a..0cf2f5e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -364,6 +364,22 @@ public class JobGraph implements Serializable {
        }
 
        /**
+        * Adds the BLOB referenced by the key to the JobGraph's dependencies.
+        *
+        * @param key
+        *        key of the BLOB (e.g. a JAR file) required to run the job on a task manager
+        */
+       public void addBlob(BlobKey key) {
+               if (key == null) {
+                       throw new IllegalArgumentException();
+               }
+
+               if (!userJarBlobKeys.contains(key)) {
+                       userJarBlobKeys.add(key);
+               }
+       }
+
+       /**
         * Checks whether the JobGraph has user code JAR files attached.
         *
         * @return True, if the JobGraph has user code JAR files attached, 
false otherwise.

http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb5653/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
index d3ceeaf..19b3050 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
@@ -47,14 +47,14 @@ Actor with ActorLogMessages with ActorLogging {
 
   override def receiveWithLogMessages: Receive = {
     case SubmitJobDetached(jobGraph) =>
-      jobManager forward SubmitJob(jobGraph, registerForEvents = false, 
detached = true)
+      jobManager forward SubmitJob(jobGraph, registerForEvents = false)
 
     case cancelJob: CancelJob =>
       jobManager forward cancelJob
 
     case SubmitJobAndWait(jobGraph, listen) =>
       val listener = context.actorOf(Props(classOf[JobClientListener], sender))
-      jobManager.tell(SubmitJob(jobGraph, registerForEvents = listen, detached 
= false), listener)
+      jobManager.tell(SubmitJob(jobGraph, registerForEvents = listen), 
listener)
 
     case RequestBlobManagerPort =>
       jobManager forward RequestBlobManagerPort

http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb5653/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
index 128454a..4b7446c 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
@@ -31,7 +31,6 @@ import akka.actor.ActorRef
  */
 class JobInfo(val client: ActorRef, val start: Long){
   var end: Long = -1
-  var detached: Boolean = false
 
   def duration: Long = {
     if(end != -1){

http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb5653/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index ad5c9e8..ce3bc74 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -25,8 +25,7 @@ import akka.actor.Status.{Success, Failure}
 import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, 
Configuration}
 import org.apache.flink.core.io.InputSplitAssigner
 import org.apache.flink.runtime.blob.BlobServer
-import org.apache.flink.runtime.client.{JobSubmissionException, 
JobExecutionException,
-JobCancellationException}
+import org.apache.flink.runtime.client.{JobSubmissionException, 
JobExecutionException, JobCancellationException}
 import org.apache.flink.runtime.executiongraph.{ExecutionJobVertex, 
ExecutionGraph}
 import org.apache.flink.runtime.jobmanager.web.WebInfoServer
 import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
@@ -36,7 +35,7 @@ import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util.EnvironmentInformation
-import org.apache.flink.runtime.{JobException, ActorLogMessages}
+import org.apache.flink.runtime.ActorLogMessages
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.instance.InstanceManager
@@ -139,6 +138,7 @@ class JobManager(val configuration: Configuration,
   }
 
   override def receiveWithLogMessages: Receive = {
+
     case RegisterTaskManager(connectionInfo, hardwareInformation, 
numberOfSlots) =>
       val taskManager = sender
 
@@ -170,8 +170,8 @@ class JobManager(val configuration: Configuration,
     case RequestTotalNumberOfSlots =>
       sender ! instanceManager.getTotalNumberOfSlots
 
-    case SubmitJob(jobGraph, listen, d) =>
-      submitJob(jobGraph, listenToEvents = listen, detached = d)
+    case SubmitJob(jobGraph, listen) =>
+      submitJob(jobGraph, listenToEvents = listen)
 
     case CancelJob(jobID) =>
       log.info("Trying to cancel job with ID {}.", jobID)
@@ -274,7 +274,7 @@ class JobManager(val configuration: Configuration,
 
           // is the client waiting for the job result?
             newJobStatus match {
-              case JobStatus.FINISHED if !jobInfo.detached =>
+              case JobStatus.FINISHED =>
                 val accumulatorResults = 
accumulatorManager.getJobAccumulatorResults(jobID)
                 jobInfo.client ! JobResultSuccess(jobID, jobInfo.duration, 
accumulatorResults)
               case JobStatus.CANCELED =>
@@ -397,116 +397,133 @@ class JobManager(val configuration: Configuration,
    * @param jobGraph representing the Flink job
    * @param listenToEvents true if the sender wants to listen to job status 
and execution state
    *                       change notifications. false if not.
-   * @param detached true if the job runs in detached mode, meaning that the 
sender does not wait
-   *                 for the result of the job. false otherwise.
    */
-  private def submitJob(jobGraph: JobGraph, listenToEvents: Boolean, detached: 
Boolean): Unit = {
-    try {
-      if (jobGraph == null) {
-        sender ! Failure(new JobSubmissionException(null, "JobGraph must not 
be null."))
-      } else {
-        log.info(s"Received job ${jobGraph.getJobID} (${jobGraph.getName}).")
+  private def submitJob(jobGraph: JobGraph, listenToEvents: Boolean): Unit = {
+    if (jobGraph == null) {
+      sender ! Failure(new JobSubmissionException(null, "JobGraph must not be 
null."))
+    }
+    else {
+      val jobId = jobGraph.getJobID
+      val jobName = jobGraph.getName
+      var executionGraph: ExecutionGraph = null
 
-        if (jobGraph.getNumberOfVertices == 0) {
-          sender ! Failure(new JobSubmissionException(jobGraph.getJobID ,"Job 
is empty."))
-        } else {
-          // Create the user code class loader
-          libraryCacheManager.registerJob(jobGraph.getJobID, 
jobGraph.getUserJarBlobKeys)
+      log.info(s"Received job ${jobId} (${jobName}).")
 
-          val userCodeLoader = 
libraryCacheManager.getClassLoader(jobGraph.getJobID)
+      try {
+        // Important: We need to make sure that the library registration is 
the first action,
+        // because this makes sure that the uploaded jar files are removed in 
case of
+        // unsuccessful submission.
+        try {
+          libraryCacheManager.registerJob(jobGraph.getJobID, 
jobGraph.getUserJarBlobKeys)
+        }
+        catch {
+          case t: Throwable =>
+            throw new JobSubmissionException(jobId,
+            "Cannot set up the user code libraries: " + t.getMessage, t)
+        }
 
-          // see if there already exists an ExecutionGraph for the 
corresponding job ID
-          val (executionGraph, jobInfo) = 
currentJobs.getOrElseUpdate(jobGraph.getJobID,
-            (new ExecutionGraph(jobGraph.getJobID, jobGraph.getName,
-              jobGraph.getJobConfiguration, timeout, 
jobGraph.getUserJarBlobKeys, userCodeLoader),
-              JobInfo(sender, System.currentTimeMillis())))
+        val userCodeLoader = 
libraryCacheManager.getClassLoader(jobGraph.getJobID)
+        if (userCodeLoader == null) {
+          throw new JobSubmissionException(jobId, "The user code class loader 
could not be initialized.")
+        }
 
-          val jobNumberRetries = if (jobGraph.getNumberOfExecutionRetries >= 
0) {
-            jobGraph.getNumberOfExecutionRetries
-          } else {
-            defaultExecutionRetries
-          }
+        if (jobGraph.getNumberOfVertices == 0) {
+          throw new JobSubmissionException(jobId, "The given job is empty")
+        }
 
-          executionGraph.setNumberOfRetriesLeft(jobNumberRetries)
-          executionGraph.setDelayBeforeRetrying(delayBetweenRetries)
+        // see if there already exists an ExecutionGraph for the corresponding 
job ID
+        executionGraph = currentJobs.getOrElseUpdate(jobGraph.getJobID,
+          (new ExecutionGraph(jobGraph.getJobID, jobGraph.getName,
+            jobGraph.getJobConfiguration, timeout, 
jobGraph.getUserJarBlobKeys, userCodeLoader),
+            JobInfo(sender, System.currentTimeMillis())))._1
 
-          if (userCodeLoader == null) {
-            throw new JobException("The user code class loader could not be 
initialized.")
-          }
+        // configure the execution graph
+        val jobNumberRetries = if (jobGraph.getNumberOfExecutionRetries >= 0) {
+          jobGraph.getNumberOfExecutionRetries
+        } else {
+          defaultExecutionRetries
+        }
+        executionGraph.setNumberOfRetriesLeft(jobNumberRetries)
+        executionGraph.setDelayBeforeRetrying(delayBetweenRetries)
+        executionGraph.setScheduleMode(jobGraph.getScheduleMode)
+        
executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling)
+
+        // initialize the vertices that have a master initialization hook
+        // file output formats create directories here, input formats create 
splits
+        if (log.isDebugEnabled) {
+          log.debug(s"Running initialization on master for job ${jobId} 
(${jobName}).")
+        }
 
-          if (log.isDebugEnabled) {
-            log.debug(s"Running master initialization of job 
${jobGraph.getJobID} " +
-              s"(${jobGraph.getName}).")
+        for (vertex <- jobGraph.getVertices.asScala) {
+          val executableClass = vertex.getInvokableClassName
+          if (executableClass == null || executableClass.length == 0) {
+            throw new JobSubmissionException(jobId,
+              s"The vertex ${vertex.getID} (${vertex.getName}) has no 
invokable class.")
           }
-
-          for (vertex <- jobGraph.getVertices.asScala) {
-            val executableClass = vertex.getInvokableClassName
-            if (executableClass == null || executableClass.length == 0) {
-              throw new JobException(s"The vertex ${vertex.getID} 
(${vertex.getName}) has no " +
-                s"invokable class.")
-            }
-
+          try {
             vertex.initializeOnMaster(userCodeLoader)
           }
-
-          // topological sorting of the job vertices
-          val sortedTopology = 
jobGraph.getVerticesSortedTopologicallyFromSources
-
-          if (log.isDebugEnabled) {
-            log.debug(s"Adding ${sortedTopology.size()} vertices from job 
graph " +
-              s"${jobGraph.getJobID} (${jobGraph.getName}).")
+          catch {
+            case t: Throwable => throw new JobExecutionException(jobId,
+              "Cannot initialize task '" + vertex.getName + "': " + 
t.getMessage, t)
           }
+        }
 
-          executionGraph.attachJobGraph(sortedTopology)
+        // topologically sort the job vertices and attach the graph to the 
existing one
+        val sortedTopology = 
jobGraph.getVerticesSortedTopologicallyFromSources()
+        if (log.isDebugEnabled) {
+          log.debug(s"Adding ${sortedTopology.size()} vertices from " +
+            s"job graph ${jobId} (${jobName}).")
+        }
+        executionGraph.attachJobGraph(sortedTopology)
 
-          if (log.isDebugEnabled) {
-            log.debug(s"Successfully created execution graph from job graph " +
-              s"${jobGraph.getJobID} (${jobGraph.getName}).")
-          }
+        if (log.isDebugEnabled) {
+          log.debug(s"Successfully created execution graph from job graph 
${jobId} (${jobName}).")
+        }
 
-          executionGraph.setScheduleMode(jobGraph.getScheduleMode)
-          
executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling)
+        // get notified about job status changes
+        executionGraph.registerJobStatusListener(self)
 
-          // get notified about job status changes
-          executionGraph.registerJobStatusListener(self)
+        if (listenToEvents) {
+          // the sender wants to be notified about state changes
+          executionGraph.registerExecutionListener(sender)
+          executionGraph.registerJobStatusListener(sender)
+        }
 
-          if (listenToEvents) {
-            // the sender wants to be notified about state changes
-            executionGraph.registerExecutionListener(sender)
-            executionGraph.registerJobStatusListener(sender)
-          }
+        // done with submitting the job
+        sender ! Success(jobGraph.getJobID)
+      }
+      catch {
+        case t: Throwable =>
+          log.error(s"Failed to submit job ${jobId} (${jobName})", t)
 
-          jobInfo.detached = detached
+          libraryCacheManager.unregisterJob(jobId)
+          currentJobs.remove(jobId)
 
-          log.info(s"Scheduling job ${jobGraph.getName}.")
+          if (executionGraph != null) {
+            executionGraph.fail(t)
+          }
 
-          executionGraph.scheduleForExecution(scheduler)
+          val rt: Throwable = if (t.isInstanceOf[JobExecutionException]) {
+            t
+          } else {
+            new JobExecutionException(jobId, s"Failed to submit job ${jobId} 
(${jobName})", t)
+          }
 
-          sender ! Success(jobGraph.getJobID)
-        }
+          sender ! Failure(rt)
+          return
       }
-    } catch {
-      case t: Throwable =>
-        log.error(t, "Job submission of job {} failed.", jobGraph.getJobID)
-
-        currentJobs.get(jobGraph.getJobID) match {
-          case Some((executionGraph, jobInfo)) =>
-            /*
-           * Register self to be notified about job status changes in case 
that it did not happen
-           * before. That way the proper cleanup of the job is triggered in 
the JobStatusChanged
-           * handler.
-           */
-            if (!executionGraph.containsJobStatusListener(self)) {
-              executionGraph.registerJobStatusListener(self)
-            }
 
-            // let the execution graph fail, which will send a failure to the 
job client
-            executionGraph.fail(t)
-          case None =>
-            libraryCacheManager.unregisterJob(jobGraph.getJobID)
-            currentJobs.remove(jobGraph.getJobID)
-            sender ! Failure(t)
-        }
+      // NOTE: Scheduling the job for execution is a separate action from the 
job submission.
+      // The success of submitting the job must be independent from the 
success of scheduling
+      // the job.
+      try {
+        log.info(s"Scheduling job ${executionGraph.getJobName}.")
+        executionGraph.scheduleForExecution(scheduler)
+      }
+      catch {
+        case t: Throwable => executionGraph.fail(t);
+      }
     }
   }
 
@@ -813,8 +830,8 @@ object JobManager {
    * Starts the JobManager and job archiver based on the given configuration, 
in the
    * given actor system.
    *
-   * @param configuration
-   * @param actorSystem
+   * @param configuration The configuration for the JobManager
+   * @param actorSystem The actor system running the JobManager
    * @return A tuple of references (JobManager Ref, Archiver Ref)
    */
   def startJobManagerActors(configuration: Configuration,

http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb5653/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 8aef552..28c5bea 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -25,23 +25,19 @@ import org.apache.flink.runtime.jobgraph.{JobGraph, JobID, 
JobStatus, JobVertexI
 import org.apache.flink.runtime.taskmanager.TaskExecutionState
 
 /**
- * The job manager specific messages
+ * The job manager specific actor messages
  */
 object JobManagerMessages {
+
   /**
    * Submits a job to the job manager. If [[registerForEvents]] is true,
-   * then the sender will be registered as listener for the state change 
messages. If [[detached]]
-   * is set to true, then the sender will detach from the job execution. 
Consequently,
-   * he will not receive the job execution result [[JobResult]]. The 
submission result will be sent
-   * back to the
-   * sender as a [[SubmissionResponse]] message.
+   * then the sender will be registered as listener for the state change 
messages.
+   * The submission result will be sent back to the sender as a success 
message.
    *
    * @param jobGraph
    * @param registerForEvents if true, then register for state change events
-   * @param detached if true, then detach from the job execution
    */
-  case class SubmitJob(jobGraph: JobGraph, registerForEvents: Boolean = false,
-                       detached: Boolean = false)
+  case class SubmitJob(jobGraph: JobGraph, registerForEvents: Boolean = false)
 
   /**
    * Cancels a job with the given [[jobID]] at the JobManager. The result of 
the cancellation is
@@ -171,23 +167,14 @@ object JobManagerMessages {
   case object RequestBlobManagerPort
 
   /**
-   * Requests the final job status of the job with [[jobID]]. If the job has 
not been terminated
-   * then the result is sent back upon termination of the job. The result is a
-   * [[JobStatusResponse]] message.
-   *
-   * @param jobID
-   */
-  case class RequestFinalJobStatus(jobID: JobID)
-
-  /**
    * Denotes a successful job execution.
    *
    * @param jobID
    * @param runtime
    * @param accumulatorResults
    */
-  case class JobResultSuccess(jobID: JobID, runtime: Long, accumulatorResults: 
java.util.Map[String,
-    AnyRef]) {}
+  case class JobResultSuccess(jobID: JobID, runtime: Long,
+                              accumulatorResults: java.util.Map[String, 
AnyRef]) {}
 
   sealed trait CancellationResponse{
     def jobID: JobID

http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb5653/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
index 595c00b..c0928b0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
@@ -90,5 +90,4 @@ public class JobManagerStartupTest {
                        // expected
                }
        }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb5653/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
new file mode 100644
index 0000000..5967bdd
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.Tuple2;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests that failures during job submission (missing JAR blobs, exceptions thrown in
+ * {@code initializeOnMaster}) are reported back to the submitting client as a
+ * {@link JobExecutionException} wrapping the original cause.
+ */
+public class JobSubmitTest {
+
+       private static final long TIMEOUT = 50000;
+
+       private static ActorSystem jobManagerSystem;
+       private static ActorRef jobManager;
+
+       @BeforeClass
+       public static void setupJobManager() {
+               Configuration config = new Configuration();
+
+               scala.Option<Tuple2<String, Object>> listeningAddress = scala.Option.empty();
+               jobManagerSystem = AkkaUtils.createActorSystem(config, listeningAddress);
+               jobManager = JobManager.startJobManagerActors(config, jobManagerSystem)._1();
+       }
+
+       @AfterClass
+       public static void teardownJobmanager() {
+               if (jobManagerSystem != null) {
+                       jobManagerSystem.shutdown();
+               }
+       }
+
+       /**
+        * Submits a job that declares a JAR blob which was deleted from the blob server beforehand.
+        * The submission is expected to fail with a {@link JobExecutionException} whose cause is an
+        * {@link IOException}.
+        */
+       @Test
+       public void testFailureWhenJarBlobsMissing() {
+               try {
+                       // create a simple job graph
+                       AbstractJobVertex jobVertex = new AbstractJobVertex("Test Vertex");
+                       jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+                       JobGraph jg = new JobGraph("test job", jobVertex);
+
+                       // request the blob port from the job manager
+                       Future<Object> future = Patterns.ask(jobManager, JobManagerMessages.getRequestBlobManagerPort(), TIMEOUT);
+                       int blobPort = (Integer) Await.result(future, new FiniteDuration(TIMEOUT, TimeUnit.MILLISECONDS));
+
+                       // upload two dummy bytes and add their keys to the job graph as dependencies
+                       BlobKey key1, key2;
+                       BlobClient bc = new BlobClient(new InetSocketAddress("localhost", blobPort));
+                       try {
+                               key1 = bc.put(new byte[10]);
+                               key2 = bc.put(new byte[10]);
+
+                               // delete one of the blobs to make sure that the startup fails
+                               bc.delete(key2);
+                       }
+                       finally {
+                               bc.close();
+                       }
+
+                       jg.addBlob(key1);
+                       jg.addBlob(key2);
+
+                       // submit the job
+                       Future<Object> submitFuture = Patterns.ask(jobManager,
+                                       new JobManagerMessages.SubmitJob(jg, false), TIMEOUT);
+                       try {
+                               Await.result(submitFuture, new FiniteDuration(TIMEOUT, TimeUnit.MILLISECONDS));
+                               // reaching this line means the submission was (wrongly) accepted;
+                               // without this the test would pass vacuously
+                               fail("Job submission should have failed because a JAR blob is missing.");
+                       }
+                       catch (JobExecutionException e) {
+                               // that is what we expect
+                               assertTrue(e.getCause() instanceof IOException);
+                       }
+                       catch (Exception e) {
+                               e.printStackTrace();
+                               fail("Wrong exception type: " + e);
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       /**
+        * Submits a job whose only vertex throws from {@code initializeOnMaster}. The submission is
+        * expected to fail with a {@link JobExecutionException} that directly wraps the thrown
+        * {@link RuntimeException} (i.e. the exception nesting is not too deep).
+        */
+       @Test
+       public void testFailureWhenInitializeOnMasterFails() {
+               try {
+                       // create a simple job graph with a vertex that fails during master-side initialization
+                       AbstractJobVertex jobVertex = new AbstractJobVertex("Vertex that fails in initializeOnMaster") {
+
+                               @Override
+                               public void initializeOnMaster(ClassLoader loader) throws Exception {
+                                       throw new RuntimeException("test exception");
+                               }
+                       };
+
+                       TaskConfig config = new TaskConfig(jobVertex.getConfiguration());
+                       jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+                       JobGraph jg = new JobGraph("test job", jobVertex);
+
+                       // submit the job
+                       Future<Object> submitFuture = Patterns.ask(jobManager,
+                                       new JobManagerMessages.SubmitJob(jg, false), TIMEOUT);
+                       try {
+                               Await.result(submitFuture, new FiniteDuration(TIMEOUT, TimeUnit.MILLISECONDS));
+                               // reaching this line means the submission was (wrongly) accepted;
+                               // without this the test would pass vacuously
+                               fail("Job submission should have failed because initializeOnMaster throws.");
+                       }
+                       catch (JobExecutionException e) {
+                               // that is what we expect
+                               // test that the exception nesting is not too deep
+                               assertTrue(e.getCause() instanceof RuntimeException);
+                       }
+                       catch (Exception e) {
+                               e.printStackTrace();
+                               fail("Wrong exception type: " + e);
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9ddb5653/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 7e2840a..fc67b46 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.testingUtils.TestingUtils
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
-import scheduler.{NoResourceAvailableException, SlotSharingGroup}
+import 
org.apache.flink.runtime.jobmanager.scheduler.{NoResourceAvailableException, 
SlotSharingGroup}
 
 import scala.concurrent.Await
 import scala.concurrent.duration._
@@ -52,6 +52,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
   }
 
   "The JobManager actor" must {
+
     "handle jobs when not enough slots" in {
       val vertex = new AbstractJobVertex("Test Vertex")
       vertex.setParallelism(2)
@@ -69,12 +70,18 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
 
         availableSlots should equal(1)
 
-        within(1 second) {
+        within(2 second) {
           jm ! SubmitJob(jobGraph)
 
-          val failure = expectMsgType[Failure]
+          val success = expectMsgType[Success]
 
-          failure.cause match {
+          jobGraph.getJobID should equal(success.status)
+        }
+
+        within(2 second) {
+          val response = expectMsgType[Failure]
+          val exception = response.cause
+          exception match {
             case e: JobExecutionException =>
               jobGraph.getJobID should equal(e.getJobID)
               new NoResourceAvailableException(1,1,0) should equal(e.getCause)
@@ -84,7 +91,8 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
 
         jm ! NotifyWhenJobRemoved(jobGraph.getJobID)
         expectMsg(true)
-      } finally {
+      }
+      finally {
         cluster.stop()
       }
     }

Reply via email to