[FLINK-8485] [client] Unblock JobSubmissionClientActor#tryToSubmitJob The JobSubmissionClientActor blocked an ActorSystem dispatcher thread while requesting the BlobServer port from the cluster. This fails when using the FlinkMiniCluster on a single-core machine, because the dispatcher is then configured with only one thread: once that thread is blocked, no thread is left to process the response it is waiting for.
This commit unblocks the JobSubmissionClientActor#tryToSubmitJob method and sets the lower limit of dispatcher threads to 2 when using the FlinkMiniCluster. This closes #5360. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09edf6a6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09edf6a6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09edf6a6 Branch: refs/heads/release-1.4 Commit: 09edf6a62822941e8a2f0e6179b85c6fce8fd8c9 Parents: fb088bc Author: Till Rohrmann <[email protected]> Authored: Thu Jan 25 11:40:33 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Thu Jan 25 15:26:10 2018 +0100 ---------------------------------------------------------------------- .../client/JobSubmissionClientActor.java | 131 ++++++++----------- .../apache/flink/runtime/akka/AkkaUtils.scala | 2 +- 2 files changed, 58 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/09edf6a6/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java index 4ca6e8b..e9824ae 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java @@ -18,11 +18,6 @@ package org.apache.flink.runtime.client; -import akka.actor.ActorRef; -import akka.actor.Props; -import akka.actor.Status; -import akka.dispatch.Futures; - import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; @@ -35,14 +30,19 @@ 
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.JobClientMessages; import org.apache.flink.runtime.messages.JobClientMessages.SubmitJobAndWait; import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.SerializedThrowable; -import scala.concurrent.duration.FiniteDuration; + +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.actor.Status; import java.io.IOException; import java.net.InetSocketAddress; -import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.CompletionException; + +import scala.concurrent.duration.FiniteDuration; /** @@ -50,11 +50,11 @@ import java.util.concurrent.TimeUnit; */ public class JobSubmissionClientActor extends JobClientActor { - /** JobGraph which shall be submitted to the JobManager */ + /** JobGraph which shall be submitted to the JobManager. */ private JobGraph jobGraph; - /** true if a SubmitJobSuccess message has been received */ + /** true if a SubmitJobSuccess message has been received. */ private boolean jobSuccessfullySubmitted = false; - /** The cluster configuration */ + /** The cluster configuration. 
*/ private final Configuration clientConfig; public JobSubmissionClientActor( @@ -66,7 +66,6 @@ public class JobSubmissionClientActor extends JobClientActor { this.clientConfig = clientConfig; } - @Override public void connectedToJobManager() { if (jobGraph != null && !jobSuccessfullySubmitted) { @@ -143,77 +142,61 @@ public class JobSubmissionClientActor extends JobClientActor { LOG.info("Sending message to JobManager {} to submit job {} ({}) and wait for progress", jobManager.path().toString(), jobGraph.getName(), jobGraph.getJobID()); - Futures.future(new Callable<Object>() { - @Override - public Object call() throws Exception { - final ActorGateway jobManagerGateway = new AkkaActorGateway(jobManager, leaderSessionID); - final AkkaJobManagerGateway akkaJobManagerGateway = new AkkaJobManagerGateway(jobManagerGateway); + final ActorGateway jobManagerGateway = new AkkaActorGateway(jobManager, leaderSessionID); + final AkkaJobManagerGateway akkaJobManagerGateway = new AkkaJobManagerGateway(jobManagerGateway); - LOG.info("Upload jar files to job manager {}.", jobManager.path()); + LOG.info("Upload jar files to job manager {}.", jobManager.path()); - final CompletableFuture<InetSocketAddress> blobServerAddressFuture = JobClient.retrieveBlobServerAddress( - akkaJobManagerGateway, - Time.milliseconds(timeout.toMillis())); - final InetSocketAddress blobServerAddress; - - try { - blobServerAddress = blobServerAddressFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS); - } catch (Exception e) { - getSelf().tell( - decorateMessage(new JobManagerMessages.JobResultFailure( - new SerializedThrowable( - new JobSubmissionException( - jobGraph.getJobID(), - "Could not retrieve BlobServer address.", - e) - ) - )), - ActorRef.noSender()); - - return null; - } + final CompletableFuture<InetSocketAddress> blobServerAddressFuture = JobClient.retrieveBlobServerAddress( + akkaJobManagerGateway, + Time.milliseconds(timeout.toMillis())); + final CompletableFuture<Void> jarUploadFuture = 
blobServerAddressFuture.thenAcceptAsync( + (InetSocketAddress blobServerAddress) -> { try { jobGraph.uploadUserJars(blobServerAddress, clientConfig); - } catch (IOException exception) { - getSelf().tell( - decorateMessage(new JobManagerMessages.JobResultFailure( - new SerializedThrowable( - new JobSubmissionException( - jobGraph.getJobID(), - "Could not upload the jar files to the job manager.", - exception) - ) - )), - ActorRef.noSender()); - - return null; + } catch (IOException e) { + throw new CompletionException( + new JobSubmissionException( + jobGraph.getJobID(), + "Could not upload the jar files to the job manager.", + e)); } + }, + getContext().dispatcher()); + + jarUploadFuture + .thenAccept( + (Void ignored) -> { + LOG.info("Submit job to the job manager {}.", jobManager.path()); + + jobManager.tell( + decorateMessage( + new JobManagerMessages.SubmitJob( + jobGraph, + ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES)), + getSelf()); - LOG.info("Submit job to the job manager {}.", jobManager.path()); - - jobManager.tell( - decorateMessage( - new JobManagerMessages.SubmitJob( - jobGraph, - ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES)), - getSelf()); - - // issue a SubmissionTimeout message to check that we submit the job within - // the given timeout - getContext().system().scheduler().scheduleOnce( - timeout, - getSelf(), - decorateMessage(JobClientMessages.getSubmissionTimeout()), - getContext().dispatcher(), - ActorRef.noSender()); - - return null; - } - }, getContext().dispatcher()); + // issue a SubmissionTimeout message to check that we submit the job within + // the given timeout + getContext().system().scheduler().scheduleOnce( + timeout, + getSelf(), + decorateMessage(JobClientMessages.getSubmissionTimeout()), + getContext().dispatcher(), + ActorRef.noSender()); + }) + .whenComplete( + (Void ignored, Throwable throwable) -> { + if (throwable != null) { + getSelf().tell( + decorateMessage(new JobManagerMessages.JobResultFailure( + 
new SerializedThrowable(ExceptionUtils.stripCompletionException(throwable)))), + ActorRef.noSender()); + } + }); } - public static Props createActorProps( LeaderRetrievalService leaderRetrievalService, FiniteDuration timeout, http://git-wip-us.apache.org/repos/asf/flink/blob/09edf6a6/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index b80a070..ed14660 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -249,7 +249,7 @@ object AkkaUtils { | default-dispatcher { | fork-join-executor { | parallelism-factor = 1.0 - | parallelism-min = 1 + | parallelism-min = 2 | parallelism-max = 4 | } | }
