[FLINK-8485] [client] Unblock JobSubmissionClientActor#tryToSubmitJob

The JobSubmissionClientActor blocked an ActorSystem's dispatcher thread when
requesting the BlobServer port from the cluster. This fails when using the
FlinkMiniCluster on a single-core machine because we set the number of
threads to 1.

This commit unblocks the JobSubmissionClientActor#tryToSubmitJob method and 
sets the
lower limit of dispatcher threads to 2 when using the FlinkMiniCluster.

This closes #5360.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09edf6a6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09edf6a6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09edf6a6

Branch: refs/heads/release-1.4
Commit: 09edf6a62822941e8a2f0e6179b85c6fce8fd8c9
Parents: fb088bc
Author: Till Rohrmann <[email protected]>
Authored: Thu Jan 25 11:40:33 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Thu Jan 25 15:26:10 2018 +0100

----------------------------------------------------------------------
 .../client/JobSubmissionClientActor.java        | 131 ++++++++-----------
 .../apache/flink/runtime/akka/AkkaUtils.scala   |   2 +-
 2 files changed, 58 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/09edf6a6/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
index 4ca6e8b..e9824ae 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
@@ -18,11 +18,6 @@
 
 package org.apache.flink.runtime.client;
 
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import akka.actor.Status;
-import akka.dispatch.Futures;
-
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
@@ -35,14 +30,19 @@ import 
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobClientMessages;
 import org.apache.flink.runtime.messages.JobClientMessages.SubmitJobAndWait;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedThrowable;
-import scala.concurrent.duration.FiniteDuration;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.Status;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CompletionException;
+
+import scala.concurrent.duration.FiniteDuration;
 
 
 /**
@@ -50,11 +50,11 @@ import java.util.concurrent.TimeUnit;
  */
 public class JobSubmissionClientActor extends JobClientActor {
 
-       /** JobGraph which shall be submitted to the JobManager */
+       /** JobGraph which shall be submitted to the JobManager. */
        private JobGraph jobGraph;
-       /** true if a SubmitJobSuccess message has been received */
+       /** true if a SubmitJobSuccess message has been received. */
        private boolean jobSuccessfullySubmitted = false;
-       /** The cluster configuration */
+       /** The cluster configuration. */
        private final Configuration clientConfig;
 
        public JobSubmissionClientActor(
@@ -66,7 +66,6 @@ public class JobSubmissionClientActor extends JobClientActor {
                this.clientConfig = clientConfig;
        }
 
-
        @Override
        public void connectedToJobManager() {
                if (jobGraph != null && !jobSuccessfullySubmitted) {
@@ -143,77 +142,61 @@ public class JobSubmissionClientActor extends 
JobClientActor {
                LOG.info("Sending message to JobManager {} to submit job {} 
({}) and wait for progress",
                        jobManager.path().toString(), jobGraph.getName(), 
jobGraph.getJobID());
 
-               Futures.future(new Callable<Object>() {
-                       @Override
-                       public Object call() throws Exception {
-                               final ActorGateway jobManagerGateway = new 
AkkaActorGateway(jobManager, leaderSessionID);
-                               final AkkaJobManagerGateway 
akkaJobManagerGateway = new AkkaJobManagerGateway(jobManagerGateway);
+               final ActorGateway jobManagerGateway = new 
AkkaActorGateway(jobManager, leaderSessionID);
+               final AkkaJobManagerGateway akkaJobManagerGateway = new 
AkkaJobManagerGateway(jobManagerGateway);
 
-                               LOG.info("Upload jar files to job manager {}.", 
jobManager.path());
+               LOG.info("Upload jar files to job manager {}.", 
jobManager.path());
 
-                               final CompletableFuture<InetSocketAddress> 
blobServerAddressFuture = JobClient.retrieveBlobServerAddress(
-                                       akkaJobManagerGateway,
-                                       Time.milliseconds(timeout.toMillis()));
-                               final InetSocketAddress blobServerAddress;
-
-                               try {
-                                       blobServerAddress = 
blobServerAddressFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
-                               } catch (Exception e) {
-                                       getSelf().tell(
-                                               decorateMessage(new 
JobManagerMessages.JobResultFailure(
-                                                       new SerializedThrowable(
-                                                               new 
JobSubmissionException(
-                                                                       
jobGraph.getJobID(),
-                                                                       "Could 
not retrieve BlobServer address.",
-                                                                       e)
-                                                       )
-                                               )),
-                                               ActorRef.noSender());
-
-                                       return null;
-                               }
+               final CompletableFuture<InetSocketAddress> 
blobServerAddressFuture = JobClient.retrieveBlobServerAddress(
+                       akkaJobManagerGateway,
+                       Time.milliseconds(timeout.toMillis()));
 
+               final CompletableFuture<Void> jarUploadFuture = 
blobServerAddressFuture.thenAcceptAsync(
+                       (InetSocketAddress blobServerAddress) -> {
                                try {
                                        
jobGraph.uploadUserJars(blobServerAddress, clientConfig);
-                               } catch (IOException exception) {
-                                       getSelf().tell(
-                                               decorateMessage(new 
JobManagerMessages.JobResultFailure(
-                                                       new SerializedThrowable(
-                                                               new 
JobSubmissionException(
-                                                                       
jobGraph.getJobID(),
-                                                                       "Could 
not upload the jar files to the job manager.",
-                                                                       
exception)
-                                                       )
-                                               )),
-                                               ActorRef.noSender());
-
-                                       return null;
+                               } catch (IOException e) {
+                                       throw new CompletionException(
+                                               new JobSubmissionException(
+                                                       jobGraph.getJobID(),
+                                                       "Could not upload the 
jar files to the job manager.",
+                                                       e));
                                }
+                       },
+                       getContext().dispatcher());
+
+               jarUploadFuture
+                       .thenAccept(
+                               (Void ignored) -> {
+                                       LOG.info("Submit job to the job manager 
{}.", jobManager.path());
+
+                                       jobManager.tell(
+                                               decorateMessage(
+                                                       new 
JobManagerMessages.SubmitJob(
+                                                               jobGraph,
+                                                               
ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES)),
+                                               getSelf());
 
-                               LOG.info("Submit job to the job manager {}.", 
jobManager.path());
-
-                               jobManager.tell(
-                                       decorateMessage(
-                                               new 
JobManagerMessages.SubmitJob(
-                                                       jobGraph,
-                                                       
ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES)),
-                                       getSelf());
-
-                               // issue a SubmissionTimeout message to check 
that we submit the job within
-                               // the given timeout
-                               getContext().system().scheduler().scheduleOnce(
-                                       timeout,
-                                       getSelf(),
-                                       
decorateMessage(JobClientMessages.getSubmissionTimeout()),
-                                       getContext().dispatcher(),
-                                       ActorRef.noSender());
-
-                               return null;
-                       }
-               }, getContext().dispatcher());
+                                       // issue a SubmissionTimeout message to 
check that we submit the job within
+                                       // the given timeout
+                                       
getContext().system().scheduler().scheduleOnce(
+                                               timeout,
+                                               getSelf(),
+                                               
decorateMessage(JobClientMessages.getSubmissionTimeout()),
+                                               getContext().dispatcher(),
+                                               ActorRef.noSender());
+                               })
+                       .whenComplete(
+                               (Void ignored, Throwable throwable) -> {
+                                       if (throwable != null) {
+                                               getSelf().tell(
+                                                       decorateMessage(new 
JobManagerMessages.JobResultFailure(
+                                                               new 
SerializedThrowable(ExceptionUtils.stripCompletionException(throwable)))),
+                                                       ActorRef.noSender());
+                                       }
+                               });
        }
 
-
        public static Props createActorProps(
                        LeaderRetrievalService leaderRetrievalService,
                        FiniteDuration timeout,

http://git-wip-us.apache.org/repos/asf/flink/blob/09edf6a6/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index b80a070..ed14660 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -249,7 +249,7 @@ object AkkaUtils {
          |    default-dispatcher {
          |      fork-join-executor {
          |        parallelism-factor = 1.0
-         |        parallelism-min = 1
+         |        parallelism-min = 2
          |        parallelism-max = 4
          |      }
          |    }

Reply via email to