tillrohrmann commented on a change in pull request #13217:
URL: https://github.com/apache/flink/pull/13217#discussion_r478371328



##########
File path: flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
##########
@@ -111,4 +122,48 @@ public static void executeProgram(
                        Thread.currentThread().setContextClassLoader(contextClassLoader);
                }
        }
+
+       /**
+        * This method blocks until the job status is not INITIALIZING anymore.
+        * If the job is FAILED, it throws an CompletionException with the failure cause.
+        * @param jobStatusSupplier supplier returning the job status.
+        */
+       public static void waitUntilJobInitializationFinished(
+                               JobID jobID,
+                               SupplierWithException<JobStatus, Exception> jobStatusSupplier,
+                               SupplierWithException<JobResult, Exception> jobResultSupplier,
+                               ClassLoader userCodeClassloader)
+                       throws JobInitializationException {
+               LOG.debug("Wait until job initialization is finished");
+               WaitStrategy waitStrategy = new ExponentialWaitStrategy(50, 2000);
+               try {
+                       JobStatus status = jobStatusSupplier.get();
+                       long attempt = 0;
+                       while (status == JobStatus.INITIALIZING) {
+                               Thread.sleep(waitStrategy.sleepTime(attempt++));
+                               status = jobStatusSupplier.get();
+                       }
+                       if (status == JobStatus.FAILED) {
+                               JobResult result = jobResultSupplier.get();
+                               Optional<SerializedThrowable> throwable = result.getSerializedThrowable();
+                               if (throwable.isPresent()) {
+                                       Throwable t = throwable.get().deserializeError(userCodeClassloader);
+                                       t = ExceptionUtils.stripCompletionException(t);

Review comment:
       Where does the `CompletionException` come from? Ideally we strip it 
before we create the failure result of the job execution/submission because it 
is an implementation detail which should not leak out.
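One plausible source of the wrapper, shown with plain `java.util.concurrent` and no Flink dependencies: a failure observed through a dependent stage such as `thenApply` arrives wrapped in a `CompletionException`, whereas a callback attached directly to the failed future would see the original exception. The class name and `observeFailure` helper are hypothetical, and `stripCompletionException` here is a local stand-in assumed to behave like the `ExceptionUtils.stripCompletionException` call in the diff. Stripping at the point where the failure result is built (for example in the `handle` callback of the `DispatcherJob` constructor) would keep the wrapper from reaching the client.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class StripCompletionExceptionSketch {

    /** Unwraps (possibly nested) CompletionExceptions down to the first other cause. */
    static Throwable stripCompletionException(Throwable throwable) {
        Throwable current = throwable;
        while (current instanceof CompletionException && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /** Returns {failure seen by a dependent stage, same failure after stripping}. */
    static Throwable[] observeFailure() {
        CompletableFuture<String> initialization = new CompletableFuture<>();
        initialization.completeExceptionally(new IllegalStateException("initialization failed"));

        // The dependent thenApply stage completes with a CompletionException wrapping the cause.
        Throwable seen = initialization
                .thenApply(String::length)
                .handle((ignored, throwable) -> throwable)
                .join();
        return new Throwable[] {seen, stripCompletionException(seen)};
    }

    public static void main(String[] args) {
        Throwable[] failures = observeFailure();
        System.out.println(failures[0].getClass().getSimpleName()); // prints "CompletionException"
        System.out.println(failures[1].getMessage()); // prints "initialization failed"
    }
}
```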

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -332,7 +329,8 @@ private boolean isPartialResourceConfigured(JobGraph jobGraph) {
        private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
                log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
 
-               final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJobManager(jobGraph.getJobID(), jobGraph, this::persistAndRunJob)
+               final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJobManager(jobGraph.getJobID(), jobGraph,

Review comment:
       ```suggestion
                final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJob(jobGraph.getJobID(), jobGraph,
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstraction used by the {@link Dispatcher} to manage jobs.
+ */
+public final class DispatcherJob implements AutoCloseableAsync {
+
+       private final Logger log = LoggerFactory.getLogger(DispatcherJob.class);
+
+       private final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture;
+       private final CompletableFuture<ArchivedExecutionGraph> jobResultFuture;
+       private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+
+       private final long initializationTimestamp;
+       private final JobID jobId;
+       private final String jobName;
+
+       private final Object lock = new Object();
+
+       // internal field to track job status during initialization. Is not updated anymore after
+       // job is initialized, cancelled or failed.
+       @GuardedBy("lock")
+       private DispatcherJobStatus jobStatus = DispatcherJobStatus.INITIALIZING;
+
+       private enum DispatcherJobStatus {
+               // We are waiting for the JobManagerRunner to be initialized
+               INITIALIZING(JobStatus.INITIALIZING),
+               // JobManagerRunner is initialized
+               JOB_MANAGER_RUNNER_INITIALIZED(null),
+               // waiting for cancellation. We stay in this status until the job result future completed,
+               // then we consider the JobManager to be initialized.
+               CANCELLING(JobStatus.CANCELLING);
+
+               @Nullable
+               private final JobStatus jobStatus;
+
+               DispatcherJobStatus(JobStatus jobStatus) {
+                       this.jobStatus = jobStatus;
+               }
+
+               public JobStatus asJobStatus() {
+                       if (jobStatus == null) {
+                               throw new IllegalStateException("This state is not defined as a 'JobStatus'");
+                       }
+                       return jobStatus;
+               }
+       }
+
+       static DispatcherJob createFor(
+               CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+               JobID jobId,
+               String jobName,
+               Dispatcher.ExecutionType executionType) {
+               return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName, executionType);
+       }
+
+       private DispatcherJob(
+               CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+               JobID jobId,
+               String jobName,
+               Dispatcher.ExecutionType executionType) {
+               this.jobManagerRunnerFuture = jobManagerRunnerFuture;
+               this.jobId = jobId;
+               this.jobName = jobName;
+               this.initializationTimestamp = System.currentTimeMillis();
+               this.jobResultFuture = new CompletableFuture<>();
+
+               FutureUtils.assertNoException(this.jobManagerRunnerFuture.handle((jobManagerRunner, throwable) -> {
+                       // JM has been initialized, or the initialization failed
+                       synchronized (lock) {
+                               if (jobStatus != DispatcherJobStatus.CANCELLING) {
+                                       jobStatus = DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED;
+                               }
+                               if (throwable == null) {
+                                       // Forward result future
+                                       FutureUtils.forward(jobManagerRunner.getResultFuture(), jobResultFuture);
+                               } else { // failure during initialization
+                                       if (executionType == Dispatcher.ExecutionType.RECOVERY) {
+                                               jobResultFuture.completeExceptionally(throwable);
+                                       } else {
+                                               jobResultFuture.complete(ArchivedExecutionGraph.createFromInitializingJob(
+                                                       jobId,
+                                                       jobName,
+                                                       JobStatus.FAILED,
+                                                       throwable,
+                                                       initializationTimestamp));
+                                       }
+                               }
+                       }
+                       return null;
+               }));
+       }
+
+       public CompletableFuture<ArchivedExecutionGraph> getResultFuture() {
+               return jobResultFuture;
+       }
+
+       public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
+               return requestJobStatus(timeout).thenApply(status -> {
+                       int[] tasksPerState = new int[ExecutionState.values().length];
+                       synchronized (lock) {
+                               return new JobDetails(
+                                       jobId,
+                                       jobName,
+                                       initializationTimestamp,
+                                       0,
+                                       0,
+                                       status,
+                                       0,
+                                       tasksPerState,
+                                       0);
+                       }
+               });
+       }
+
+       /**
+        * Cancel job.
+        * A cancellation will be scheduled if the initialization is not completed.
+        * The returned future will complete exceptionally if the JobManagerRunner initialization failed.
+        */
+       public CompletableFuture<Acknowledge> cancel(Time timeout) {
+               synchronized (lock) {
+                       if (isInitialized()) {
+                               return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.cancel(timeout));
+                       } else {
+                               log.info("Cancellation during initialization requested for job {}. Job will be cancelled once JobManager has been initialized.", jobId);
+
+                               // cancel job
+                               jobManagerRunnerFuture
+                                       .thenCompose(JobManagerRunner::getJobMasterGateway)
+                                       .thenCompose(jobMasterGateway -> jobMasterGateway.cancel(RpcUtils.INF_TIMEOUT))
+                                       .whenComplete((ignored, cancelThrowable) -> {
+                                               if (cancelThrowable != null) {
+                                                       log.warn("Cancellation of job {} failed", jobId, cancelThrowable);
+                                               }
+                                       });
+                               jobStatus = DispatcherJobStatus.CANCELLING;
+                               jobResultFuture.whenComplete(((archivedExecutionGraph, throwable) -> {
+                                       if (archivedExecutionGraph != null) {
+                                               jobStatus = DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED;

Review comment:
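       `lock` is missing: this callback writes `jobStatus`, which is
`@GuardedBy("lock")`, but it may run after the `synchronized (lock)` block
has been left, on whichever thread completes `jobResultFuture`.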
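A minimal sketch of what the fix could look like, in plain Java with hypothetical names (`GuardedCallbackSketch`, a three-value `Status` enum standing in for `DispatcherJobStatus`, and a `String` standing in for the `ArchivedExecutionGraph`): the `whenComplete` callback re-acquires `lock` before writing the guarded field. Java monitors are reentrant, so this is also safe when the future is already complete and the callback runs immediately inside the outer `synchronized` block.

```java
import java.util.concurrent.CompletableFuture;

public class GuardedCallbackSketch {

    // Simplified, hypothetical stand-in for DispatcherJobStatus.
    enum Status { INITIALIZING, CANCELLING, JOB_MANAGER_RUNNER_INITIALIZED }

    private final Object lock = new Object();

    // Guarded by "lock": every access, including from callbacks, must hold it.
    private Status status = Status.INITIALIZING;

    private final CompletableFuture<String> resultFuture = new CompletableFuture<>();

    void cancelDuringInitialization() {
        synchronized (lock) {
            status = Status.CANCELLING;
            // The callback may run later, on the thread that completes resultFuture,
            // after this synchronized block has exited, so it takes the lock itself.
            // If resultFuture is already complete, the callback runs right here and
            // re-enters the monitor this thread already holds.
            resultFuture.whenComplete((result, throwable) -> {
                if (result != null) {
                    synchronized (lock) {
                        status = Status.JOB_MANAGER_RUNNER_INITIALIZED;
                    }
                }
            });
        }
    }

    void completeResult(String result) {
        resultFuture.complete(result);
    }

    Status getStatus() {
        synchronized (lock) {
            return status;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        GuardedCallbackSketch job = new GuardedCallbackSketch();
        job.cancelDuringInitialization();
        // Complete the result on another thread, as a JobManagerRunner would.
        Thread completer = new Thread(() -> job.completeResult("archived graph"));
        completer.start();
        completer.join();
        System.out.println(job.getStatus()); // prints "JOB_MANAGER_RUNNER_INITIALIZED"
    }
}
```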

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/UnavailableDispatcherOperationException.java
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Exception indicating that a Dispatcher operation is temporarily unavailable.
+ */
+public class UnavailableDispatcherOperationException extends FlinkException {

Review comment:
       Maybe it could extend `DispatcherException`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -465,17 +441,24 @@ private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner
 
        @Override
        public CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout) {
-               final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
+               Optional<DispatcherJob> maybeJob = getDispatcherJob(jobId);
+               if (maybeJob.isPresent()) {
+                       return maybeJob.get().cancel(timeout);
+               } else {
+                       log.debug("Dispatcher is unable to cancel job {}: not found", jobId);
+                       return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
+               }
+       }
 
-               return jobMasterGatewayFuture.thenCompose((JobMasterGateway jobMasterGateway) -> jobMasterGateway.cancel(timeout));
+       private Optional<DispatcherJob> getDispatcherJob(JobID jobId) {
+               return Optional.ofNullable(runningJobs.get(jobId));
        }
 
        @Override
        public CompletableFuture<ClusterOverview> requestClusterOverview(Time timeout) {
                CompletableFuture<ResourceOverview> taskManagerOverviewFuture = runResourceManagerCommand(resourceManagerGateway -> resourceManagerGateway.requestResourceOverview(timeout));
 
-               final List<CompletableFuture<Optional<JobStatus>>> optionalJobInformation = queryJobMastersForInformation(
-                       (JobMasterGateway jobMasterGateway) -> jobMasterGateway.requestJobStatus(timeout));
+               final List<CompletableFuture<Optional<JobStatus>>> optionalJobInformation = queryJobMastersForInformation(dj -> dj.requestJobStatus(timeout));

Review comment:
       nit: In the absence of type information I wouldn't mind spelling 
variable names out. So instead of `dj` one could name it `dispatcherJob`. This 
will give the reader of the code a bit more information about the type of the 
parameter.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -0,0 +1,256 @@
+       private DispatcherJob(
+               CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+               JobID jobId,
+               String jobName,
+               Dispatcher.ExecutionType executionType) {

Review comment:
       Looking at the `ExecutionType`, I am wondering whether this isn't more a
concept of the user of a `DispatcherJob`. In other words, why don't we let
the `Dispatcher` decide how to handle an initialization error? To do this,
we would have to return a bit more information via the `jobResultFuture`
(e.g. whether the job failed during initialization). One could create a
`DispatcherJobResult` which contains the `ArchivedExecutionGraph` and a flag
indicating whether the job failed during initialization.
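A sketch of the suggested `DispatcherJobResult`, assuming it only needs to carry the archived graph and an initialization-failure flag. The archived graph is a type parameter here (a `String` in the usage) instead of `ArchivedExecutionGraph`, and `handleJobResult`, the local `ExecutionType` enum and the exception message are hypothetical. The RECOVERY/SUBMISSION branches mirror what the `DispatcherJob` constructor in the diff currently does, but the decision now lives on the `Dispatcher` side. In this flag-only sketch the original failure cause is not propagated; a real version would presumably take it from the archived graph's failure information.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class DispatcherJobResultSketch {

    // Hypothetical local copy of Dispatcher.ExecutionType.
    enum ExecutionType { SUBMISSION, RECOVERY }

    /** Hypothetical result type: the archived graph plus whether initialization failed. */
    static final class DispatcherJobResult<G> {
        private final G archivedExecutionGraph;
        private final boolean initializationFailure;

        private DispatcherJobResult(G archivedExecutionGraph, boolean initializationFailure) {
            this.archivedExecutionGraph = archivedExecutionGraph;
            this.initializationFailure = initializationFailure;
        }

        static <T> DispatcherJobResult<T> forSuccess(T archivedExecutionGraph) {
            return new DispatcherJobResult<>(archivedExecutionGraph, false);
        }

        static <T> DispatcherJobResult<T> forInitializationFailure(T archivedExecutionGraph) {
            return new DispatcherJobResult<>(archivedExecutionGraph, true);
        }

        G getArchivedExecutionGraph() {
            return archivedExecutionGraph;
        }

        boolean isInitializationFailure() {
            return initializationFailure;
        }
    }

    /** Dispatcher-side policy: the DispatcherJob no longer needs to know the ExecutionType. */
    static <G> CompletableFuture<G> handleJobResult(
            CompletableFuture<DispatcherJobResult<G>> resultFuture, ExecutionType executionType) {
        return resultFuture.thenApply(result -> {
            if (result.isInitializationFailure() && executionType == ExecutionType.RECOVERY) {
                // Like the diff: a recovered job that fails to initialize fails the future.
                throw new CompletionException(
                        new IllegalStateException("Could not initialize recovered job"));
            }
            // Submission (or success): hand out the archived graph, possibly in FAILED state.
            return result.getArchivedExecutionGraph();
        });
    }

    public static void main(String[] args) {
        CompletableFuture<DispatcherJobResult<String>> failedInit =
                CompletableFuture.completedFuture(DispatcherJobResult.forInitializationFailure("FAILED graph"));
        System.out.println(handleJobResult(failedInit, ExecutionType.SUBMISSION).join()); // prints "FAILED graph"
        System.out.println(handleJobResult(failedInit, ExecutionType.RECOVERY).isCompletedExceptionally()); // prints "true"
    }
}
```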

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java
##########
@@ -88,6 +89,13 @@ private PerJobMiniClusterFactory(
 
                return miniCluster
                        .submitJob(jobGraph)
+                       .thenApplyAsync(FunctionUtils.uncheckedFunction(submissionResult -> {
+                               org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(submissionResult.getJobID(),
+                                       () -> miniCluster.getJobStatus(submissionResult.getJobID()).get(),
+                                       () -> miniCluster.requestJobResult(submissionResult.getJobID()).get(),
+                                       Thread.currentThread().getContextClassLoader());

Review comment:
       Are we sure that the context class loader is the right class loader here?
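One possible concern, illustrated in plain Java: the lambda passed to `thenApplyAsync` reads the context class loader of whichever thread runs the stage, not of the thread that called `submitJob`. The sketch uses a dedicated single-thread executor and an empty `ClassLoader` subclass as a hypothetical stand-in for a user-code class loader; the worker thread is started before the caller switches its context class loader, so the async stage observes the old loader, while a value captured on the caller before the hop is the intended one. It does not assert which loader the default `thenApplyAsync` executor threads carry, nor whether the context class loader is the right user-code class loader in the first place.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ContextClassLoaderSketch {

    /** Returns {async stage saw the user loader, loader captured on the caller is the user loader}. */
    static boolean[] demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Start the worker thread first: it inherits the caller's current context class loader.
            CompletableFuture.runAsync(() -> { }, executor).join();

            // Hypothetical stand-in for a user-code class loader.
            ClassLoader userLoader = new ClassLoader(ContextClassLoaderSketch.class.getClassLoader()) { };
            Thread caller = Thread.currentThread();
            ClassLoader previous = caller.getContextClassLoader();
            caller.setContextClassLoader(userLoader);
            try {
                // Captured on the calling thread, before the async hop.
                ClassLoader captured = caller.getContextClassLoader();
                // Read inside the async stage, as the diff does.
                ClassLoader seenInside = CompletableFuture.completedFuture(null)
                        .thenApplyAsync(ignored -> Thread.currentThread().getContextClassLoader(), executor)
                        .join();
                return new boolean[] {seenInside == userLoader, captured == userLoader};
            } finally {
                caller.setContextClassLoader(previous);
            }
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean[] observed = demo();
        System.out.println("async stage sees user loader: " + observed[0]); // prints "... false"
        System.out.println("captured loader is user loader: " + observed[1]); // prints "... true"
    }
}
```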

##########
File path: flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
##########
@@ -111,4 +122,48 @@ public static void executeProgram(
                        Thread.currentThread().setContextClassLoader(contextClassLoader);
                }
        }
+
+       /**
+        * This method blocks until the job status is not INITIALIZING anymore.
+        * If the job is FAILED, it throws an CompletionException with the failure cause.
+        * @param jobStatusSupplier supplier returning the job status.
+        */
+       public static void waitUntilJobInitializationFinished(
+                               JobID jobID,
+                               SupplierWithException<JobStatus, Exception> jobStatusSupplier,
+                               SupplierWithException<JobResult, Exception> jobResultSupplier,
+                               ClassLoader userCodeClassloader)
+                       throws JobInitializationException {
+               LOG.debug("Wait until job initialization is finished");
+               WaitStrategy waitStrategy = new ExponentialWaitStrategy(50, 2000);
+               try {
+                       JobStatus status = jobStatusSupplier.get();
+                       long attempt = 0;
+                       while (status == JobStatus.INITIALIZING) {
+                               Thread.sleep(waitStrategy.sleepTime(attempt++));
+                               status = jobStatusSupplier.get();
+                       }
+                       if (status == JobStatus.FAILED) {
+                               JobResult result = jobResultSupplier.get();
+                               Optional<SerializedThrowable> throwable = result.getSerializedThrowable();
+                               if (throwable.isPresent()) {
+                                       Throwable t = throwable.get().deserializeError(userCodeClassloader);
+                                       t = ExceptionUtils.stripCompletionException(t);
+                                       if (t instanceof JobInitializationException) {
+                                               throw t;
+                                       }
+                               }
+                       }
+               } catch (JobInitializationException initializationException) {
+                       throw initializationException;
+               } catch (Throwable throwable) {
+                       ExceptionUtils.checkInterrupted(throwable);
+                       throw new JobInitializationException(jobID, "Error while waiting for job to be initialized", throwable);
+               }
+       }
+
+       public static <T> void waitUntilJobInitializationFinished(JobID id, ClusterClient<T> client, ClassLoader userCodeClassloader) throws
+               JobInitializationException {
+               waitUntilJobInitializationFinished(id, () -> client.getJobStatus(id).get(), () -> client.requestJobResult(id).get(), userCodeClassloader);

Review comment:
       `requestJobResult` can lead to a shutdown of an attached per-job
cluster, because in an attached deployment the per-job cluster waits until
the job result has been served and then shuts down.
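The polling loop in `waitUntilJobInitializationFinished` can be sketched in plain Java. This version only reads the status and returns the first non-initializing one, leaving the decision whether to call something like `requestJobResult` to the caller, which matters given the shutdown concern above. The `sleepTime` helper is a local stand-in for `ExponentialWaitStrategy(50, 2000)` and assumes that strategy doubles the wait per attempt up to the cap; statuses are plain strings instead of `JobStatus` to keep the sketch self-contained. Class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

public class InitializationPollingSketch {

    /** Capped exponential backoff: initialWaitMillis * 2^attempt, at most maxWaitMillis. */
    static long sleepTime(int attempt, long initialWaitMillis, long maxWaitMillis) {
        long wait = initialWaitMillis;
        // Stop doubling once the cap is reached, so large attempts cannot overflow.
        for (int i = 0; i < attempt && wait < maxWaitMillis; i++) {
            wait *= 2;
        }
        return Math.min(wait, maxWaitMillis);
    }

    /** Polls the status until it differs from initializingStatus and returns that status. */
    static <S> S waitWhileInitializing(
            Supplier<S> statusSupplier, S initializingStatus, long initialWaitMillis, long maxWaitMillis) {
        S status = statusSupplier.get();
        int attempt = 0;
        while (initializingStatus.equals(status)) {
            try {
                Thread.sleep(sleepTime(attempt++, initialWaitMillis, maxWaitMillis));
            } catch (InterruptedException e) {
                // Restore the interrupt flag before giving up.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for initialization", e);
            }
            status = statusSupplier.get();
        }
        return status;
    }

    public static void main(String[] args) {
        Iterator<String> statuses = Arrays.asList("INITIALIZING", "INITIALIZING", "RUNNING").iterator();
        System.out.println(waitWhileInitializing(statuses::next, "INITIALIZING", 50, 2000)); // prints "RUNNING"
    }
}
```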

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstraction used by the {@link Dispatcher} to manage jobs.
+ */
+public final class DispatcherJob implements AutoCloseableAsync {
+
+       private final Logger log = LoggerFactory.getLogger(DispatcherJob.class);
+
+       private final CompletableFuture<JobManagerRunner> 
jobManagerRunnerFuture;
+       private final CompletableFuture<ArchivedExecutionGraph> jobResultFuture;
+       private final CompletableFuture<Void> terminationFuture = new 
CompletableFuture<>();
+
+       private final long initializationTimestamp;
+       private final JobID jobId;
+       private final String jobName;
+
+       private final Object lock = new Object();
+
+       // internal field to track job status during initialization. Is not 
updated anymore after
+       // job is initialized, cancelled or failed.
+       @GuardedBy("lock")
+       private DispatcherJobStatus jobStatus = 
DispatcherJobStatus.INITIALIZING;
+
+       private enum DispatcherJobStatus {
+               // We are waiting for the JobManagerRunner to be initialized
+               INITIALIZING(JobStatus.INITIALIZING),
+               // JobManagerRunner is initialized
+               JOB_MANAGER_RUNNER_INITIALIZED(null),
+               // waiting for cancellation. We stay in this status until the 
job result future completed,
+               // then we consider the JobManager to be initialized.
+               CANCELLING(JobStatus.CANCELLING);
+
+               @Nullable
+               private final JobStatus jobStatus;
+
+               DispatcherJobStatus(JobStatus jobStatus) {
+                       this.jobStatus = jobStatus;
+               }
+
+               public JobStatus asJobStatus() {
+                       if (jobStatus == null) {
+                               throw new IllegalStateException("This state is 
not defined as a 'JobStatus'");
+                       }
+                       return jobStatus;
+               }
+       }
+
+       static DispatcherJob createFor(
+               CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+               JobID jobId,
+               String jobName,
+               Dispatcher.ExecutionType executionType) {
+               return new DispatcherJob(jobManagerRunnerFuture, jobId, 
jobName, executionType);
+       }
+
+       private DispatcherJob(
+               CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+               JobID jobId,
+               String jobName,
+               Dispatcher.ExecutionType executionType) {
+               this.jobManagerRunnerFuture = jobManagerRunnerFuture;
+               this.jobId = jobId;
+               this.jobName = jobName;
+               this.initializationTimestamp = System.currentTimeMillis();
+               this.jobResultFuture = new CompletableFuture<>();
+
+               
FutureUtils.assertNoException(this.jobManagerRunnerFuture.handle((jobManagerRunner,
 throwable) -> {
+                       // JM has been initialized, or the initialization failed
+                       synchronized (lock) {
+                               if (jobStatus != 
DispatcherJobStatus.CANCELLING) {
+                                       jobStatus = 
DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED;
+                               }
+                               if (throwable == null) {
+                                       // Forward result future
+                                       
FutureUtils.forward(jobManagerRunner.getResultFuture(), jobResultFuture);
+                               } else { // failure during initialization
+                                       if (executionType == 
Dispatcher.ExecutionType.RECOVERY) {
+                                               
jobResultFuture.completeExceptionally(throwable);
+                                       } else {
+                                               
jobResultFuture.complete(ArchivedExecutionGraph.createFromInitializingJob(
+                                                       jobId,
+                                                       jobName,
+                                                       JobStatus.FAILED,
+                                                       throwable,
+                                                       
initializationTimestamp));
+                                       }
+                               }
+                       }
+                       return null;
+               }));
+       }
+
+       public CompletableFuture<ArchivedExecutionGraph> getResultFuture() {
+               return jobResultFuture;
+       }
+
+       public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
+               return requestJobStatus(timeout).thenApply(status -> {
+                       int[] tasksPerState = new 
int[ExecutionState.values().length];
+                       synchronized (lock) {
+                               return new JobDetails(
+                                       jobId,
+                                       jobName,
+                                       initializationTimestamp,
+                                       0,
+                                       0,
+                                       status,
+                                       0,
+                                       tasksPerState,
+                                       0);
+                       }
+               });
+       }
+
+       /**
+        * Cancel job.
+        * A cancellation will be scheduled if the initialization is not completed.
+        * The returned future will complete exceptionally if the JobManagerRunner initialization failed.
+        */
+       public CompletableFuture<Acknowledge> cancel(Time timeout) {
+               synchronized (lock) {
+                       if (isInitialized()) {
+                               return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.cancel(timeout));
+                       } else {
+                               log.info("Cancellation during initialization requested for job {}. Job will be cancelled once JobManager has been initialized.", jobId);
+
+                               // cancel job
+                               jobManagerRunnerFuture
+                                       .thenCompose(JobManagerRunner::getJobMasterGateway)
+                                       .thenCompose(jobMasterGateway -> jobMasterGateway.cancel(RpcUtils.INF_TIMEOUT))
+                                       .whenComplete((ignored, cancelThrowable) -> {
+                                               if (cancelThrowable != null) {
+                                                       log.warn("Cancellation of job {} failed", jobId, cancelThrowable);
+                                               }
+                                       });
+                               jobStatus = DispatcherJobStatus.CANCELLING;
+                               jobResultFuture.whenComplete(((archivedExecutionGraph, throwable) -> {
+                                       if (archivedExecutionGraph != null) {
+                                               jobStatus = DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED;
+                                       }
+                               }));
+                               return jobResultFuture.thenApply(ignored -> Acknowledge.get());
+                       }
+               }
+       }
+
+       public CompletableFuture<JobStatus> requestJobStatus(Time timeout) {
+               return requestJob(timeout).thenApply(ArchivedExecutionGraph::getState);
+       }
+
+       /**
+        * Returns a future completing to the ArchivedExecutionGraph of the job.
+        */
+       public CompletableFuture<ArchivedExecutionGraph> requestJob(Time timeout) {
+               synchronized (lock) {
+                       if (isInitialized()) {
+                               if (jobResultFuture.isDone()) { // job is not running anymore
+                                       return jobResultFuture;
+                               }
+                               return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.requestJob(
+                                       timeout));
+                       } else {
+                               Preconditions.checkState(this.jobStatus == DispatcherJobStatus.INITIALIZING || jobStatus == DispatcherJobStatus.CANCELLING);
+                               return CompletableFuture.completedFuture(
+                                       ArchivedExecutionGraph.createFromInitializingJob(
+                                               jobId,
+                                               jobName,
+                                               jobStatus.asJobStatus(),
+                                               null,
+                                               initializationTimestamp));
+                       }
+               }
+       }
+
+       /**
+        * The job is initialized once the JobManager runner has been initialized.
+        * It is also initialized if the runner initialization failed, or if it has been
+        * canceled (and the cancellation is complete).
+        */
+       public boolean isInitialized() {
+               synchronized (lock) {
+                       return jobStatus == DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED;
+               }
+       }
+
+       /**
+        * Returns the {@link JobMasterGateway} from the JobManagerRunner.
+        * This method will fail with an {@link IllegalStateException} if the job is not initialized.
+        * The returned future will complete exceptionally if the JobManagerRunner initialization failed.

Review comment:
       This sentence should go into the `@return ....` line.
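A minimal sketch of the suggested layout, assuming a simplified stand-in for `DispatcherJob`: `JobHandleSketch` and `Gateway` are hypothetical names (not Flink API), and the `CANCELLING` handling of the reviewed class is left out. The sentence about exceptional completion moves into `@return`, and the precondition becomes an `@throws` clause:

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for the JobMaster RPC gateway (not Flink API). */
interface Gateway {
    String jobName();
}

/** Hypothetical, simplified model of DispatcherJob's initialization tracking. */
final class JobHandleSketch {

    // The reviewed class additionally has a CANCELLING state; omitted in this sketch.
    private enum Status { INITIALIZING, INITIALIZED }

    private final Object lock = new Object();
    private final CompletableFuture<Gateway> gatewayFuture;
    private Status status = Status.INITIALIZING; // guarded by lock

    JobHandleSketch(CompletableFuture<Gateway> runnerInitialization) {
        this.gatewayFuture = runnerInitialization;
        // Success or failure: once the runner future completes, the job counts as initialized.
        runnerInitialization.whenComplete((gateway, throwable) -> {
            synchronized (lock) {
                status = Status.INITIALIZED;
            }
        });
    }

    boolean isInitialized() {
        synchronized (lock) {
            return status == Status.INITIALIZED;
        }
    }

    /**
     * Returns the {@link Gateway} of the job's JobMaster.
     *
     * @return the {@link Gateway}. The future will complete exceptionally if the
     *     runner initialization failed.
     * @throws IllegalStateException if the job is not initialized
     */
    CompletableFuture<Gateway> getJobMasterGateway() {
        if (!isInitialized()) {
            throw new IllegalStateException("Gateway is not available during initialization");
        }
        return gatewayFuture;
    }
}
```

Keeping the failure note in `@return` attaches it to the value the caller actually inspects, while `@throws` documents the synchronous precondition separately.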

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/ClientUtilsTest.java
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.client.JobInitializationException;
+import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.DefaultDispatcherBootstrap;
+import org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.dispatcher.DispatcherServices;
+import org.apache.flink.runtime.dispatcher.DispatcherTest;
+import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.NoOpJobGraphWriter;
+import org.apache.flink.runtime.dispatcher.TestingDispatcher;
+import org.apache.flink.runtime.dispatcher.VoidHistoryServerArchivist;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.JobGraphWriter;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Test for the ClientUtils.
+ */
+public class ClientUtilsTest extends TestLogger {
+
+       @ClassRule
+       public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+       private static final JobID TEST_JOB_ID = new JobID();
+
+       private static final Time TIMEOUT = Time.seconds(10L);
+
+       private static TestingRpcService rpcService;
+
+       @Rule
+       public final TestingFatalErrorHandlerResource testingFatalErrorHandlerResource = new TestingFatalErrorHandlerResource();
+
+       final HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 10000L);
+
+       final TestingLeaderElectionService jobMasterLeaderElectionService = new TestingLeaderElectionService();
+
+       final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+
+       private BlobServer blobServer;
+       private DefaultDispatcherBootstrap dispatcherBootstrap;
+       private Configuration configuration;
+       private ResourceManagerGateway resourceManagerGateway;
+       private ArchivedExecutionGraphStore archivedExecutionGraphStore;
+       private JobGraphWriter jobGraphWriter;
+       private DefaultJobManagerRunnerFactory jobManagerRunnerFactory;
+
+       @Before
+       public void setUp() throws Exception {
+               haServices.setJobMasterLeaderElectionService(TEST_JOB_ID, jobMasterLeaderElectionService);
+               haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory());
+               haServices.setResourceManagerLeaderRetriever(new SettableLeaderRetrievalService());
+
+               configuration = new Configuration();
+
+               configuration.setString(
+                       BlobServerOptions.STORAGE_DIRECTORY,
+                       temporaryFolder.newFolder().getAbsolutePath());
+
+               blobServer = new BlobServer(configuration, new VoidBlobStore());
+               resourceManagerGateway = new TestingResourceManagerGateway();
+               archivedExecutionGraphStore = new MemoryArchivedExecutionGraphStore();
+               dispatcherBootstrap = new DefaultDispatcherBootstrap(Collections.emptyList());
+
+               jobGraphWriter = NoOpJobGraphWriter.INSTANCE;
+               jobManagerRunnerFactory = DefaultJobManagerRunnerFactory.INSTANCE;
+       }
+
+       @BeforeClass
+       public static void setupClass() {
+               rpcService = new TestingRpcService();
+       }
+
+       @AfterClass
+       public static void teardownClass() throws Exception {
+               if (rpcService != null) {
+                       RpcUtils.terminateRpcService(rpcService, TIMEOUT);
+
+                       rpcService = null;
+               }
+       }
+
+       private TestingDispatcher createAndStartDispatcher() throws Exception {
+               final TestingDispatcher dispatcher =
+                       new TestingDispatcher(
+                               rpcService,
+                               DispatcherId.generate(),
+                               dispatcherBootstrap,
+                               new DispatcherServices(
+                                       configuration,
+                                       haServices,
+                                       () -> CompletableFuture.completedFuture(resourceManagerGateway),
+                                       blobServer,
+                                       heartbeatServices,
+                                       archivedExecutionGraphStore,
+                                       testingFatalErrorHandlerResource.getFatalErrorHandler(),
+                                       VoidHistoryServerArchivist.INSTANCE,
+                                       null,
+                                       UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
+                                       jobGraphWriter,
+                                       jobManagerRunnerFactory));
+               dispatcher.start();
+               jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+               return dispatcher;
+       }
+
+       @Test
+       public void testWaitUntilJobInitializationFinished_throwsInitializationException() throws
+               Exception {
+               final JobVertex testVertex = new DispatcherTest.FailingInitializationJobVertex("testVertex");
+               testVertex.setInvokableClass(NoOpInvokable.class);
+
+               JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "testJob", testVertex);
+
+               Dispatcher dispatcher = createAndStartDispatcher();
+               DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+
+               dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+
+               CommonTestUtils.assertThrows("Could not instantiate JobManager", JobInitializationException.class, () -> {
+                       ClientUtils.waitUntilJobInitializationFinished(jobGraph.getJobID(),
+                               () -> dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get(),
+                               () -> dispatcherGateway.requestJobResult(jobGraph.getJobID(), TIMEOUT).get(),
+                               ClassLoader.getSystemClassLoader());
+                       return null;
+               });
+               CommonTestUtils.assertThrows("Could not instantiate JobManager", ExecutionException.class, () -> {

Review comment:
       Why does the shutdown of the Dispatcher fail if the job submission failed? A submission failure is something which should not affect Flink itself, because it can happen and is normal.
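A minimal JDK-only sketch of the behavior this comment asks for, assuming hypothetical `DispatcherSketch` and `JobOutcome` classes (not Flink API). Like the non-recovery branch of the reviewed `DispatcherJob` constructor, it turns a failed runner initialization into a normal `FAILED` outcome, so no job result future completes exceptionally and closing the dispatcher cannot fail because of a rejected submission:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical terminal result of a job, standing in for ArchivedExecutionGraph. */
final class JobOutcome {
    final String status;          // "FINISHED" or "FAILED" in this sketch
    final Throwable failureCause; // null unless status is "FAILED"

    JobOutcome(String status, Throwable failureCause) {
        this.status = status;
        this.failureCause = failureCause;
    }
}

/** Hypothetical dispatcher whose shutdown is independent of job submission failures. */
final class DispatcherSketch {
    private final List<CompletableFuture<JobOutcome>> jobResults = new CopyOnWriteArrayList<>();

    /** Registers a job whose runner is being initialized asynchronously. */
    CompletableFuture<JobOutcome> submit(CompletableFuture<String> runnerInitialization) {
        CompletableFuture<JobOutcome> result = runnerInitialization.handle((runner, throwable) -> {
            if (throwable == null) {
                // Simplification: a successfully initialized job finishes immediately.
                return new JobOutcome("FINISHED", null);
            }
            // Initialization failure becomes a normal FAILED outcome, not an exception.
            Throwable cause = throwable instanceof CompletionException && throwable.getCause() != null
                    ? throwable.getCause()
                    : throwable;
            return new JobOutcome("FAILED", cause);
        });
        jobResults.add(result);
        return result;
    }

    /** Waits for all jobs; fails only if a job result future itself completed exceptionally. */
    CompletableFuture<Void> closeAsync() {
        return CompletableFuture.allOf(jobResults.toArray(new CompletableFuture<?>[0]));
    }
}
```

The `RECOVERY` branch of the reviewed constructor, which completes the result future exceptionally, is intentionally not modelled here.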

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/ClientUtilsTest.java
##########
@@ -0,0 +1,235 @@
+               CommonTestUtils.assertThrows("Could not instantiate JobManager", ExecutionException.class, () -> {
+                       dispatcher.closeAsync().get();
+                       return null;
+               });
+       }
+
+       @Test
+       public void testWaitUntilJobInitializationFinished_doesNotThrowRuntimeException() throws Exception {
+               final JobVertex testVertex = new JobVertex("testVertex");
+               testVertex.setInvokableClass(NoOpInvokable.class);
+
+               JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "testJob", testVertex);
+
+               TestingDispatcher dispatcher = createAndStartDispatcher();
+               DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+
+               dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+
+               // we don't expect an exception here
+               ClientUtils.waitUntilJobInitializationFinished(jobGraph.getJobID(),
+                       () -> dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get(),
+                       () -> dispatcherGateway.requestJobResult(jobGraph.getJobID(), TIMEOUT).get(),
+                       ClassLoader.getSystemClassLoader());
+
+               // now "fail" the job
+               dispatcher.completeJobExecution(
+                       ArchivedExecutionGraph.createFromInitializingJob(TEST_JOB_ID, "test", JobStatus.FAILED, new RuntimeException("yolo"), 1337));
+               // ensure it is failed
+               org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(() -> {
+                       JobStatus status = dispatcherGateway.requestJobStatus(
+                               jobGraph.getJobID(),
+                               TIMEOUT).get();
+                       return status == JobStatus.FAILED;
+               }, Deadline.fromNow(

Review comment:
       nit
   
   ```suggestion
                org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(() ->
                          dispatcherGateway.requestJobStatus(
                                jobGraph.getJobID(),
                                TIMEOUT).get() == JobStatus.FAILED, 
                       Deadline.fromNow(
   ```
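With the suggested expression lambda, the whole condition is a single boolean expression. The same polling pattern as a self-contained sketch, assuming a hypothetical JDK-only `PollingSketch.waitUntilCondition` helper that mirrors, but is not, the `CommonTestUtils.waitUntilCondition(..., Deadline)` call used in the test:

```java
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Hypothetical JDK-only polling helper; not part of Flink. */
final class PollingSketch {

    /** Hypothetical job status values for this sketch only. */
    enum Status { RUNNING, FAILED }

    private PollingSketch() {}

    /** Re-evaluates {@code condition} until it returns true or {@code timeout} elapses. */
    static void waitUntilCondition(Callable<Boolean> condition, Duration timeout, Duration pollInterval)
            throws Exception {
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (!condition.call()) {
            if (System.nanoTime() - deadlineNanos >= 0) {
                throw new TimeoutException("Condition not met within " + timeout);
            }
            Thread.sleep(pollInterval.toMillis());
        }
    }

    /** Polls a fresh status request each time; the condition is an expression lambda. */
    static void awaitFailed(Supplier<CompletableFuture<Status>> statusRequest) throws Exception {
        waitUntilCondition(
                () -> statusRequest.get().get() == Status.FAILED,
                Duration.ofSeconds(5),
                Duration.ofMillis(10));
    }
}
```

Passing the status request as a `Supplier` issues a fresh request on every poll, just as the test calls `requestJobStatus` inside the lambda each time.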

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstraction used by the {@link Dispatcher} to manage jobs.
+ */
+public final class DispatcherJob implements AutoCloseableAsync {
+
+       private final Logger log = LoggerFactory.getLogger(DispatcherJob.class);
+
+       private final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture;
+       private final CompletableFuture<ArchivedExecutionGraph> jobResultFuture;
+       private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+
+       private final long initializationTimestamp;
+       private final JobID jobId;
+       private final String jobName;
+
+       private final Object lock = new Object();
+
+       // Internal field to track the job status during initialization. It is not updated anymore after
+       // the job is initialized, cancelled or failed.
+       @GuardedBy("lock")
+       private DispatcherJobStatus jobStatus = DispatcherJobStatus.INITIALIZING;
+
+       private enum DispatcherJobStatus {
+               // We are waiting for the JobManagerRunner to be initialized
+               INITIALIZING(JobStatus.INITIALIZING),
+               // JobManagerRunner is initialized
+               JOB_MANAGER_RUNNER_INITIALIZED(null),
+               // Waiting for cancellation. We stay in this status until the job result future has completed,
+               // then we consider the JobManager to be initialized.
+               CANCELLING(JobStatus.CANCELLING);
+
+               @Nullable
+               private final JobStatus jobStatus;
+
+               DispatcherJobStatus(JobStatus jobStatus) {
+                       this.jobStatus = jobStatus;
+               }
+
+               public JobStatus asJobStatus() {
+                       if (jobStatus == null) {
+                               throw new IllegalStateException("This state is not defined as a 'JobStatus'");
+                       }
+                       return jobStatus;
+               }
+       }
+
+       static DispatcherJob createFor(
+               CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+               JobID jobId,
+               String jobName,
+               Dispatcher.ExecutionType executionType) {
+               return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName, executionType);
+       }
+
+       private DispatcherJob(
+               CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+               JobID jobId,
+               String jobName,
+               Dispatcher.ExecutionType executionType) {
+               this.jobManagerRunnerFuture = jobManagerRunnerFuture;
+               this.jobId = jobId;
+               this.jobName = jobName;
+               this.initializationTimestamp = System.currentTimeMillis();
+               this.jobResultFuture = new CompletableFuture<>();
+
+               FutureUtils.assertNoException(this.jobManagerRunnerFuture.handle((jobManagerRunner, throwable) -> {
+                       // JM has been initialized, or the initialization failed
+                       synchronized (lock) {
+                               if (jobStatus != DispatcherJobStatus.CANCELLING) {
+                                       jobStatus = DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED;
+                               }
+                               if (throwable == null) {
+                                       // Forward result future
+                                       FutureUtils.forward(jobManagerRunner.getResultFuture(), jobResultFuture);
+                               } else { // failure during initialization
+                                       if (executionType == Dispatcher.ExecutionType.RECOVERY) {
+                                               jobResultFuture.completeExceptionally(throwable);
+                                       } else {
+                                               jobResultFuture.complete(ArchivedExecutionGraph.createFromInitializingJob(
+                                                       jobId,
+                                                       jobName,
+                                                       JobStatus.FAILED,
+                                                       throwable,
+                                                       initializationTimestamp));
+                                       }
+                               }
+                       }
+                       return null;
+               }));
+       }
+
+       public CompletableFuture<ArchivedExecutionGraph> getResultFuture() {
+               return jobResultFuture;
+       }
+
+       public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
+               return requestJobStatus(timeout).thenApply(status -> {
+                       int[] tasksPerState = new int[ExecutionState.values().length];
+                       synchronized (lock) {
+                               return new JobDetails(
+                                       jobId,
+                                       jobName,
+                                       initializationTimestamp,
+                                       0,
+                                       0,
+                                       status,
+                                       0,
+                                       tasksPerState,
+                                       0);
+                       }
+               });
+       }
+
+       /**
+        * Cancel job.
+        * A cancellation will be scheduled if the initialization is not completed.
+        * The returned future will complete exceptionally if the JobManagerRunner initialization failed.
+        */
+       public CompletableFuture<Acknowledge> cancel(Time timeout) {
+               synchronized (lock) {
+                       if (isInitialized()) {
+                               return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.cancel(timeout));
+                       } else {
+                               log.info("Cancellation during initialization requested for job {}. Job will be cancelled once JobManager has been initialized.", jobId);
+
+                               // cancel job
+                               jobManagerRunnerFuture
+                                       .thenCompose(JobManagerRunner::getJobMasterGateway)
+                                       .thenCompose(jobMasterGateway -> jobMasterGateway.cancel(RpcUtils.INF_TIMEOUT))
+                                       .whenComplete((ignored, cancelThrowable) -> {
+                                               if (cancelThrowable != null) {
+                                                       log.warn("Cancellation of job {} failed", jobId, cancelThrowable);
+                                               }
+                                       });
+                               jobStatus = DispatcherJobStatus.CANCELLING;
+                               jobResultFuture.whenComplete(((archivedExecutionGraph, throwable) -> {
+                                       if (archivedExecutionGraph != null) {
+                                               jobStatus = DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED;
+                                       }
+                               }));
+                               return jobResultFuture.thenApply(ignored -> Acknowledge.get());

Review comment:
       The returned future behaves differently depending on `isInitialized`. In this (else) branch we wait for the job to have completely finished, whereas in the if branch we only wait for the cancel call to be acknowledged. I'd suggest doing the same here.
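
   To illustrate the suggestion, here is a minimal sketch in which both branches return the acknowledgement of the cancel call rather than the job result. `CancelGateway` and `SketchDispatcherJob` are hypothetical, simplified stand-ins for `JobMasterGateway` and `DispatcherJob`, and a `String` stands in for `Acknowledge`:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the gateway's cancel RPC; not Flink's JobMasterGateway.
interface CancelGateway {
    CompletableFuture<String> cancel();
}

// Simplified sketch of the suggested DispatcherJob.cancel() contract.
final class SketchDispatcherJob {
    private final Object lock = new Object();
    private final CompletableFuture<CancelGateway> gatewayFuture;
    // Completes when the job terminates; cancel() deliberately does not wait on it.
    final CompletableFuture<String> resultFuture = new CompletableFuture<>();
    private boolean initialized; // guarded by lock
    private boolean cancelling;  // guarded by lock

    SketchDispatcherJob(CompletableFuture<CancelGateway> gatewayFuture) {
        this.gatewayFuture = gatewayFuture;
        gatewayFuture.thenRun(() -> {
            synchronized (lock) {
                initialized = true;
            }
        });
    }

    boolean isCancelling() {
        synchronized (lock) {
            return cancelling;
        }
    }

    CompletableFuture<String> cancel() {
        synchronized (lock) {
            if (!initialized) {
                // Remember the request so status queries can report CANCELLING.
                cancelling = true;
            }
        }
        // Both branches return the acknowledgement of the cancel call itself: if the
        // gateway is already available the call is issued immediately, otherwise it
        // is issued as soon as initialization completes. A failed initialization
        // completes the returned future exceptionally.
        return gatewayFuture.thenCompose(CancelGateway::cancel);
    }
}
```

   With this shape, callers of `cancel` observe the same semantics regardless of whether initialization has finished; waiting for termination remains the job of the result future.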

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstraction used by the {@link Dispatcher} to manage JobManagers, in
+ * particular during initialization.
+ * While a job is initializing, the JobMasterGateway is not available. A small subset
+ * of the methods of the JobMasterGateway necessary during initialization are provided
+ * by this class (job details, cancel).
+ */
+public final class DispatcherJob implements AutoCloseableAsync {
+
+       private final Logger log = LoggerFactory.getLogger(DispatcherJob.class);
+
+       private final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture;
+       private final CompletableFuture<ArchivedExecutionGraph> jobResultFuture;
+
+       private final long initializationTimestamp;
+       private final JobID jobId;
+       private final String jobName;
+
+       private final Object lock = new Object();
+
+       // internal field to track job status during initialization. Is not updated anymore after
+       // job is initialized, cancelled or failed.
+       @GuardedBy("lock")
+       private JobStatus jobStatus = JobStatus.INITIALIZING;
+
+       private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+
+       private enum SubmissionType {
+               INITIAL, RECOVERY
+       }
+
+       static DispatcherJob createForSubmission(
+               CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+               JobID jobId,
+               String jobName,
+               long initializationTimestamp) {
+               return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName, initializationTimestamp, SubmissionType.INITIAL);
+       }
+
+       static DispatcherJob createForRecovery(
+               CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+               JobID jobId,
+               String jobName,
+               long initializationTimestamp) {
+               return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName, initializationTimestamp, SubmissionType.RECOVERY);
+       }
+
+       private DispatcherJob(
+               CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+               JobID jobId,
+               String jobName,
+               long initializationTimestamp,
+               SubmissionType submissionType) {
+               this.jobManagerRunnerFuture = jobManagerRunnerFuture;
+               this.jobId = jobId;
+               this.jobName = jobName;
+               this.initializationTimestamp = initializationTimestamp;
+               this.jobResultFuture = new CompletableFuture<>();
+
+               FutureUtils.assertNoException(this.jobManagerRunnerFuture.handle((jobManagerRunner, throwable) -> {
+                       // JM has been initialized, or the initialization failed
+                       synchronized (lock) {
+                               if (throwable == null) {
+                                       // Forward result future
+                                       FutureUtils.forward(jobManagerRunner.getResultFuture(), jobResultFuture);
+
+                                       if (jobStatus == JobStatus.CANCELLING) {
+                                               log.info("Cancellation during initialization has been requested for job {}. Initialization completed, cancelling job.", jobId);
+
+                                               // cancel job
+                                               jobManagerRunner
+                                                       .getJobMasterGateway()
+                                                       .thenCompose(gw -> gw.cancel(RpcUtils.INF_TIMEOUT))
+                                                       .whenComplete((ignored, cancelThrowable) -> {
+                                                       if (cancelThrowable != null) {
+                                                               log.warn("Cancellation of job {} failed", jobId, cancelThrowable);
+                                                       }
+                                               });
+
+                                               // cancellation will eventually complete the jobResultFuture
+                                               jobResultFuture.whenComplete((archivedExecutionGraph, resultThrowable) -> {
+                                                       synchronized (lock) {
+                                                               if (resultThrowable == null) {
+                                                                       jobStatus = archivedExecutionGraph.getState();
+                                                               } else {
+                                                                       jobStatus = JobStatus.FAILED;
+                                                               }
+                                                       }
+                                               });
+                                       } else {
+                                               jobStatus = JobStatus.RUNNING; // this status should never be exposed from the DispatcherJob. Only used internally for tracking running state
+                                       }
+                               } else { // failure during initialization
+                                       if (submissionType == SubmissionType.RECOVERY) {
+                                               jobResultFuture.completeExceptionally(throwable);
+                                       } else {
+                                               jobResultFuture.complete(ArchivedExecutionGraph.createFromInitializingJob(
+                                                       jobId,
+                                                       jobName,
+                                                       throwable,
+                                                       JobStatus.FAILED,
+                                                       initializationTimestamp));
+                                       }
+                               }
+                       }
+                       return null;
+               }));
+       }
+
+       public CompletableFuture<ArchivedExecutionGraph> getResultFuture() {
+               return jobResultFuture;
+       }
+
+       public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
+               synchronized (lock) {
+                       if (isRunning()) {
+                               return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.requestJobDetails(
+                                       timeout));
+                       } else {
+                               int[] tasksPerState = new int[ExecutionState.values().length];
+                               return CompletableFuture.completedFuture(new JobDetails(
+                                       jobId,
+                                       jobName,
+                                       initializationTimestamp,
+                                       0,
+                                       0,
+                                       jobStatus,
+                                       0,
+                                       tasksPerState,
+                                       0));
+                       }
+               }
+       }
+
+       public CompletableFuture<Acknowledge> cancel(Time timeout) {
+               synchronized (lock) {
+                       if (isRunning()) {
+                               return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.cancel(timeout));
+                       } else {
+                               log.info("Cancellation during initialization requested for job {}. Job will be cancelled once JobManager has been initialized.", jobId);
+                               jobStatus = JobStatus.CANCELLING;
+                               return jobResultFuture.thenApply(ignored -> Acknowledge.get());
+                       }
+               }
+       }
+
+       public CompletableFuture<JobStatus> requestJobStatus(Time timeout) {
+               synchronized (lock) {
+                       if (isRunning()) {
+                               return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.requestJobStatus(timeout));
+                       } else {
+                               return CompletableFuture.completedFuture(jobStatus);
+                       }
+               }
+       }
+
+       public CompletableFuture<ArchivedExecutionGraph> requestJob(Time timeout) {
+               synchronized (lock) {
+                       if (isRunning()) {
+                               return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.requestJob(timeout));
+                       } else {
+                               return CompletableFuture.supplyAsync(() -> ArchivedExecutionGraph.createFromInitializingJob(jobId, jobName, null, jobStatus, initializationTimestamp));
+                       }
+               }
+       }
+
+       public boolean isRunning() {
+               synchronized (lock) {
+                       return jobStatus == JobStatus.RUNNING;
+               }
+       }
+
+       public CompletableFuture<JobMasterGateway> getJobMasterGateway() {

Review comment:
       In JavaDocs one usually adds `@throws IllegalStateException if the job is not initialized` after the `@return` tag.
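
   For illustration, a minimal sketch of that tag ordering on a hypothetical, simplified `GatewayHolder` (not Flink code), where a `String` stands in for the `JobMasterGateway`:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified holder illustrating the Javadoc layout; not Flink code.
final class GatewayHolder {
    private final CompletableFuture<String> gatewayFuture = new CompletableFuture<>();
    private volatile boolean initialized;

    // Marks the holder as initialized and publishes the gateway.
    void markInitialized(String gateway) {
        gatewayFuture.complete(gateway);
        initialized = true;
    }

    /**
     * Returns the gateway of the job.
     *
     * @return future completing with the gateway
     * @throws IllegalStateException if the job is not initialized
     */
    CompletableFuture<String> getGateway() {
        if (!initialized) {
            throw new IllegalStateException("Job is not initialized yet.");
        }
        return gatewayFuture;
    }
}
```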

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/ClientUtilsTest.java
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.client.JobInitializationException;
+import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.DefaultDispatcherBootstrap;
+import org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.dispatcher.DispatcherServices;
+import org.apache.flink.runtime.dispatcher.DispatcherTest;
+import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.NoOpJobGraphWriter;
+import org.apache.flink.runtime.dispatcher.TestingDispatcher;
+import org.apache.flink.runtime.dispatcher.VoidHistoryServerArchivist;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.JobGraphWriter;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Test for the ClientUtils.
+ */
+public class ClientUtilsTest extends TestLogger {
+
+       @ClassRule

Review comment:
       Yes, the setup is quite manual. There is `MiniClusterResource`, which spawns a `MiniCluster` as an external resource.
   
   I actually believe that we don't really need to start a `MiniCluster` and submit a job. `ClientUtils.waitUntilJobInitializationFinished` is abstracted so well that it can be tested in isolation: just pass the respective suppliers, which either produce a failure or not, to the utility and then test its behaviour.
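
   A minimal sketch of such an isolated test, assuming a simplified, hypothetical re-implementation of the polling loop: `Status`, `InitWaiter`, `InitializationFailedException` and `replay` are illustrative stand-ins for `JobStatus`, `ClientUtils`, `JobInitializationException` and the stub suppliers, plain `Supplier`s replace `SupplierWithException`, and a fixed sleep replaces the `ExponentialWaitStrategy`:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

// Hypothetical stand-in for org.apache.flink.api.common.JobStatus.
enum Status { INITIALIZING, RUNNING, FAILED }

// Hypothetical stand-in for JobInitializationException (unchecked here for brevity).
final class InitializationFailedException extends RuntimeException {
    InitializationFailedException(Throwable cause) {
        super("Job initialization failed", cause);
    }
}

final class InitWaiter {
    // Polls the status until it is no longer INITIALIZING; if the job FAILED,
    // the throwable returned by failureSupplier becomes the exception's cause.
    static void waitUntilInitialized(
            Supplier<Status> statusSupplier,
            Supplier<Throwable> failureSupplier,
            long sleepMillis) {
        Status status = statusSupplier.get();
        while (status == Status.INITIALIZING) {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
            status = statusSupplier.get();
        }
        if (status == Status.FAILED) {
            throw new InitializationFailedException(failureSupplier.get());
        }
    }

    // Stub supplier replaying a fixed sequence of statuses, one per poll.
    static Supplier<Status> replay(Status... statuses) {
        Iterator<Status> it = Arrays.asList(statuses).iterator();
        return it::next;
    }
}
```

   A test can then assert that `replay(INITIALIZING, RUNNING)` returns normally and that `replay(INITIALIZING, FAILED)` surfaces the supplied cause, without starting any cluster.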

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstraction used by the {@link Dispatcher} to manage jobs.
+ */
+public final class DispatcherJob implements AutoCloseableAsync {
+
+       private final Logger log = LoggerFactory.getLogger(DispatcherJob.class);
+
+       private final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture;
+       private final CompletableFuture<ArchivedExecutionGraph> jobResultFuture;
+       private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+
+       private final long initializationTimestamp;
+       private final JobID jobId;
+       private final String jobName;
+
+       private final Object lock = new Object();
+
+       // internal field to track job status during initialization. Is not updated anymore after
+       // job is initialized, cancelled or failed.
+       @GuardedBy("lock")
+       private DispatcherJobStatus jobStatus = DispatcherJobStatus.INITIALIZING;
+
+       private enum DispatcherJobStatus {
+               // We are waiting for the JobManagerRunner to be initialized
+               INITIALIZING(JobStatus.INITIALIZING),
+               // JobManagerRunner is initialized
+               JOB_MANAGER_RUNNER_INITIALIZED(null),
+               // waiting for cancellation. We stay in this status until the job result future completed,
+               // then we consider the JobManager to be initialized.
+               CANCELLING(JobStatus.CANCELLING);
+
+               @Nullable
+               private final JobStatus jobStatus;
+
+               DispatcherJobStatus(JobStatus jobStatus) {
+                       this.jobStatus = jobStatus;
+               }
+
+               public JobStatus asJobStatus() {
+                       if (jobStatus == null) {
+                               throw new IllegalStateException("This state is not defined as a 'JobStatus'");
+                       }
+                       return jobStatus;
+               }
+       }
+
+       static DispatcherJob createFor(
+               CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+               JobID jobId,
+               String jobName,
+               Dispatcher.ExecutionType executionType) {
+               return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName, executionType);
+       }
+
+       private DispatcherJob(
+               CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+               JobID jobId,
+               String jobName,
+               Dispatcher.ExecutionType executionType) {
+               this.jobManagerRunnerFuture = jobManagerRunnerFuture;
+               this.jobId = jobId;
+               this.jobName = jobName;
+               this.initializationTimestamp = System.currentTimeMillis();
+               this.jobResultFuture = new CompletableFuture<>();
+
+               FutureUtils.assertNoException(this.jobManagerRunnerFuture.handle((jobManagerRunner, throwable) -> {
+                       // JM has been initialized, or the initialization failed
+                       synchronized (lock) {
+                               if (jobStatus != DispatcherJobStatus.CANCELLING) {
+                                       jobStatus = DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED;
+                               }
+                               if (throwable == null) {
+                                       // Forward result future
+                                       FutureUtils.forward(jobManagerRunner.getResultFuture(), jobResultFuture);
+                               } else { // failure during initialization
+                                       if (executionType == Dispatcher.ExecutionType.RECOVERY) {
+                                               jobResultFuture.completeExceptionally(throwable);
+                                       } else {
+                                               jobResultFuture.complete(ArchivedExecutionGraph.createFromInitializingJob(
+                                                       jobId,
+                                                       jobName,
+                                                       JobStatus.FAILED,
+                                                       throwable,
+                                                       initializationTimestamp));
+                                       }
+                               }
+                       }
+                       return null;
+               }));
+       }
+
+       public CompletableFuture<ArchivedExecutionGraph> getResultFuture() {
+               return jobResultFuture;
+       }
+
+       public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
+               return requestJobStatus(timeout).thenApply(status -> {
+                       int[] tasksPerState = new int[ExecutionState.values().length];
+                       synchronized (lock) {
+                               return new JobDetails(
+                                       jobId,
+                                       jobName,
+                                       initializationTimestamp,
+                                       0,
+                                       0,
+                                       status,
+                                       0,
+                                       tasksPerState,
+                                       0);
+                       }
+               });
+       }
+
+       /**
+        * Cancel job.
+        * A cancellation will be scheduled if the initialization is not completed.
+        * The returned future will complete exceptionally if the JobManagerRunner initialization failed.
+        */
+       public CompletableFuture<Acknowledge> cancel(Time timeout) {
+               synchronized (lock) {
+                       if (isInitialized()) {
+                               return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.cancel(timeout));
+                       } else {
+                               log.info("Cancellation during initialization requested for job {}. Job will be cancelled once JobManager has been initialized.", jobId);
+
+                               // cancel job
+                               jobManagerRunnerFuture
+                                       .thenCompose(JobManagerRunner::getJobMasterGateway)
+                                       .thenCompose(jobMasterGateway -> jobMasterGateway.cancel(RpcUtils.INF_TIMEOUT))
+                                       .whenComplete((ignored, cancelThrowable) -> {
+                                               if (cancelThrowable != null) {
+                                                       log.warn("Cancellation of job {} failed", jobId, cancelThrowable);
+                                               }
+                                       });
+                               jobStatus = DispatcherJobStatus.CANCELLING;
+                               jobResultFuture.whenComplete(((archivedExecutionGraph, throwable) -> {
+                                       if (archivedExecutionGraph != null) {
+                                               jobStatus = DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED;
+                                       }
+                               }));
+                               return jobResultFuture.thenApply(ignored -> Acknowledge.get());
+                       }
+               }
+       }
+
+       public CompletableFuture<JobStatus> requestJobStatus(Time timeout) {
+               return requestJob(timeout).thenApply(ArchivedExecutionGraph::getState);
+       }
+
+       /**
+        * Returns a future completing to the ArchivedExecutionGraph of the job.
+        */
+       public CompletableFuture<ArchivedExecutionGraph> requestJob(Time timeout) {
+               synchronized (lock) {
+                       if (isInitialized()) {
+                               if (jobResultFuture.isDone()) { // job is not running anymore
+                                       return jobResultFuture;
+                               }
+                               return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.requestJob(
+                                       timeout));
+                       } else {
+                               Preconditions.checkState(this.jobStatus == DispatcherJobStatus.INITIALIZING || jobStatus == DispatcherJobStatus.CANCELLING);
+                               return CompletableFuture.completedFuture(
+                                       ArchivedExecutionGraph.createFromInitializingJob(
+                                               jobId,
+                                               jobName,
+                                               jobStatus.asJobStatus(),
+                                               null,
+                                               initializationTimestamp));
+                       }
+               }
+       }
+
+       /**
+        * The job is initialized once the JobManager runner has been initialized.
+        * It is also initialized if the runner initialization failed, or of it has been
+        * canceled (and the cancellation is complete).
+        */
+       public boolean isInitialized() {
+               synchronized (lock) {
+                       return jobStatus == DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED;
+               }
+       }
+
+       /**
+        * Returns the {@link JobMasterGateway} from the JobManagerRunner.
+        * This method will fail with an {@link IllegalStateException} if the job is initialized.

Review comment:
       ```suggestion
         * This method will fail with an {@link IllegalStateException} if the job is not initialized.
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

