tillrohrmann commented on a change in pull request #13217:
URL: https://github.com/apache/flink/pull/13217#discussion_r478371328
##########
File path: flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
##########
@@ -111,4 +122,48 @@ public static void executeProgram(
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
+
+ /**
+ * This method blocks until the job status is not INITIALIZING anymore.
+ * If the job is FAILED, it throws an CompletionException with the
failure cause.
+ * @param jobStatusSupplier supplier returning the job status.
+ */
+ public static void waitUntilJobInitializationFinished(
+ JobID jobID,
+ SupplierWithException<JobStatus, Exception>
jobStatusSupplier,
+ SupplierWithException<JobResult, Exception>
jobResultSupplier,
+ ClassLoader userCodeClassloader)
+ throws JobInitializationException {
+ LOG.debug("Wait until job initialization is finished");
+ WaitStrategy waitStrategy = new ExponentialWaitStrategy(50,
2000);
+ try {
+ JobStatus status = jobStatusSupplier.get();
+ long attempt = 0;
+ while (status == JobStatus.INITIALIZING) {
+ Thread.sleep(waitStrategy.sleepTime(attempt++));
+ status = jobStatusSupplier.get();
+ }
+ if (status == JobStatus.FAILED) {
+ JobResult result = jobResultSupplier.get();
+ Optional<SerializedThrowable> throwable =
result.getSerializedThrowable();
+ if (throwable.isPresent()) {
+ Throwable t =
throwable.get().deserializeError(userCodeClassloader);
+ t =
ExceptionUtils.stripCompletionException(t);
Review comment:
Where does the `CompletionException` come from? Ideally we strip it
before we create the failure result of the job execution/submission because it
is an implementation detail which should not leak out.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -332,7 +329,8 @@ private boolean isPartialResourceConfigured(JobGraph
jobGraph) {
private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph
jobGraph) {
log.info("Submitting job {} ({}).", jobGraph.getJobID(),
jobGraph.getName());
- final CompletableFuture<Acknowledge> persistAndRunFuture =
waitForTerminatingJobManager(jobGraph.getJobID(), jobGraph,
this::persistAndRunJob)
+ final CompletableFuture<Acknowledge> persistAndRunFuture =
waitForTerminatingJobManager(jobGraph.getJobID(), jobGraph,
Review comment:
```suggestion
final CompletableFuture<Acknowledge> persistAndRunFuture =
waitForTerminatingJob(jobGraph.getJobID(), jobGraph,
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstraction used by the {@link Dispatcher} to manage jobs.
+ */
+public final class DispatcherJob implements AutoCloseableAsync {
+
+ private final Logger log = LoggerFactory.getLogger(DispatcherJob.class);
+
+ private final CompletableFuture<JobManagerRunner>
jobManagerRunnerFuture;
+ private final CompletableFuture<ArchivedExecutionGraph> jobResultFuture;
+ private final CompletableFuture<Void> terminationFuture = new
CompletableFuture<>();
+
+ private final long initializationTimestamp;
+ private final JobID jobId;
+ private final String jobName;
+
+ private final Object lock = new Object();
+
+ // internal field to track job status during initialization. Is not
updated anymore after
+ // job is initialized, cancelled or failed.
+ @GuardedBy("lock")
+ private DispatcherJobStatus jobStatus =
DispatcherJobStatus.INITIALIZING;
+
+ private enum DispatcherJobStatus {
+ // We are waiting for the JobManagerRunner to be initialized
+ INITIALIZING(JobStatus.INITIALIZING),
+ // JobManagerRunner is initialized
+ JOB_MANAGER_RUNNER_INITIALIZED(null),
+ // waiting for cancellation. We stay in this status until the
job result future completed,
+ // then we consider the JobManager to be initialized.
+ CANCELLING(JobStatus.CANCELLING);
+
+ @Nullable
+ private final JobStatus jobStatus;
+
+ DispatcherJobStatus(JobStatus jobStatus) {
+ this.jobStatus = jobStatus;
+ }
+
+ public JobStatus asJobStatus() {
+ if (jobStatus == null) {
+ throw new IllegalStateException("This state is
not defined as a 'JobStatus'");
+ }
+ return jobStatus;
+ }
+ }
+
+ static DispatcherJob createFor(
+ CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+ JobID jobId,
+ String jobName,
+ Dispatcher.ExecutionType executionType) {
+ return new DispatcherJob(jobManagerRunnerFuture, jobId,
jobName, executionType);
+ }
+
+ private DispatcherJob(
+ CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+ JobID jobId,
+ String jobName,
+ Dispatcher.ExecutionType executionType) {
+ this.jobManagerRunnerFuture = jobManagerRunnerFuture;
+ this.jobId = jobId;
+ this.jobName = jobName;
+ this.initializationTimestamp = System.currentTimeMillis();
+ this.jobResultFuture = new CompletableFuture<>();
+
+
FutureUtils.assertNoException(this.jobManagerRunnerFuture.handle((jobManagerRunner,
throwable) -> {
+ // JM has been initialized, or the initialization failed
+ synchronized (lock) {
+ if (jobStatus !=
DispatcherJobStatus.CANCELLING) {
+ jobStatus =
DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED;
+ }
+ if (throwable == null) {
+ // Forward result future
+
FutureUtils.forward(jobManagerRunner.getResultFuture(), jobResultFuture);
+ } else { // failure during initialization
+ if (executionType ==
Dispatcher.ExecutionType.RECOVERY) {
+
jobResultFuture.completeExceptionally(throwable);
+ } else {
+
jobResultFuture.complete(ArchivedExecutionGraph.createFromInitializingJob(
+ jobId,
+ jobName,
+ JobStatus.FAILED,
+ throwable,
+
initializationTimestamp));
+ }
+ }
+ }
+ return null;
+ }));
+ }
+
+ public CompletableFuture<ArchivedExecutionGraph> getResultFuture() {
+ return jobResultFuture;
+ }
+
+ public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
+ return requestJobStatus(timeout).thenApply(status -> {
+ int[] tasksPerState = new
int[ExecutionState.values().length];
+ synchronized (lock) {
+ return new JobDetails(
+ jobId,
+ jobName,
+ initializationTimestamp,
+ 0,
+ 0,
+ status,
+ 0,
+ tasksPerState,
+ 0);
+ }
+ });
+ }
+
+ /**
+ * Cancel job.
+ * A cancellation will be scheduled if the initialization is not
completed.
+ * The returned future will complete exceptionally if the
JobManagerRunner initialization failed.
+ */
+ public CompletableFuture<Acknowledge> cancel(Time timeout) {
+ synchronized (lock) {
+ if (isInitialized()) {
+ return
getJobMasterGateway().thenCompose(jobMasterGateway ->
jobMasterGateway.cancel(timeout));
+ } else {
+ log.info("Cancellation during initialization
requested for job {}. Job will be cancelled once JobManager has been
initialized.", jobId);
+
+ // cancel job
+ jobManagerRunnerFuture
+
.thenCompose(JobManagerRunner::getJobMasterGateway)
+ .thenCompose(jobMasterGateway ->
jobMasterGateway.cancel(RpcUtils.INF_TIMEOUT))
+ .whenComplete((ignored,
cancelThrowable) -> {
+ if (cancelThrowable != null) {
+ log.warn("Cancellation
of job {} failed", jobId, cancelThrowable);
+ }
+ });
+ jobStatus = DispatcherJobStatus.CANCELLING;
+
jobResultFuture.whenComplete(((archivedExecutionGraph, throwable) -> {
+ if (archivedExecutionGraph != null) {
+ jobStatus =
DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED;
Review comment:
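`lock` is missing: this `whenComplete` callback writes `jobStatus`, which is `@GuardedBy("lock")`, but it may run after `cancel` has already left the `synchronized (lock)` block (e.g. on the thread that completes `jobResultFuture`), so the write needs to be wrapped in `synchronized (lock)` as well.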
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/UnavailableDispatcherOperationException.java
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Exception indicating that a Dispatcher operation is temporarily unavailable.
+ */
+public class UnavailableDispatcherOperationException extends FlinkException {
Review comment:
Maybe it could extend `DispatcherException`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -465,17 +441,24 @@ private JobManagerRunner
startJobManagerRunner(JobManagerRunner jobManagerRunner
@Override
public CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time
timeout) {
- final CompletableFuture<JobMasterGateway>
jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);
+ Optional<DispatcherJob> maybeJob = getDispatcherJob(jobId);
+ if (maybeJob.isPresent()) {
+ return maybeJob.get().cancel(timeout);
+ } else {
+ log.debug("Dispatcher is unable to cancel job {}: not
found", jobId);
+ return FutureUtils.completedExceptionally(new
FlinkJobNotFoundException(jobId));
+ }
+ }
- return jobMasterGatewayFuture.thenCompose((JobMasterGateway
jobMasterGateway) -> jobMasterGateway.cancel(timeout));
+ private Optional<DispatcherJob> getDispatcherJob(JobID jobId) {
+ return Optional.ofNullable(runningJobs.get(jobId));
}
@Override
public CompletableFuture<ClusterOverview> requestClusterOverview(Time
timeout) {
CompletableFuture<ResourceOverview> taskManagerOverviewFuture =
runResourceManagerCommand(resourceManagerGateway ->
resourceManagerGateway.requestResourceOverview(timeout));
- final List<CompletableFuture<Optional<JobStatus>>>
optionalJobInformation = queryJobMastersForInformation(
- (JobMasterGateway jobMasterGateway) ->
jobMasterGateway.requestJobStatus(timeout));
+ final List<CompletableFuture<Optional<JobStatus>>>
optionalJobInformation = queryJobMastersForInformation(dj ->
dj.requestJobStatus(timeout));
Review comment:
nit: Since the lambda parameter carries no type information, I'd prefer
spelled-out variable names. So instead of `dj`, one could name it `dispatcherJob`.
This gives the reader of the code a bit more information about the type of the
parameter.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstraction used by the {@link Dispatcher} to manage jobs.
+ */
+public final class DispatcherJob implements AutoCloseableAsync {
+
+ private final Logger log = LoggerFactory.getLogger(DispatcherJob.class);
+
+ private final CompletableFuture<JobManagerRunner>
jobManagerRunnerFuture;
+ private final CompletableFuture<ArchivedExecutionGraph> jobResultFuture;
+ private final CompletableFuture<Void> terminationFuture = new
CompletableFuture<>();
+
+ private final long initializationTimestamp;
+ private final JobID jobId;
+ private final String jobName;
+
+ private final Object lock = new Object();
+
+ // internal field to track job status during initialization. Is not
updated anymore after
+ // job is initialized, cancelled or failed.
+ @GuardedBy("lock")
+ private DispatcherJobStatus jobStatus =
DispatcherJobStatus.INITIALIZING;
+
+ private enum DispatcherJobStatus {
+ // We are waiting for the JobManagerRunner to be initialized
+ INITIALIZING(JobStatus.INITIALIZING),
+ // JobManagerRunner is initialized
+ JOB_MANAGER_RUNNER_INITIALIZED(null),
+ // waiting for cancellation. We stay in this status until the
job result future completed,
+ // then we consider the JobManager to be initialized.
+ CANCELLING(JobStatus.CANCELLING);
+
+ @Nullable
+ private final JobStatus jobStatus;
+
+ DispatcherJobStatus(JobStatus jobStatus) {
+ this.jobStatus = jobStatus;
+ }
+
+ public JobStatus asJobStatus() {
+ if (jobStatus == null) {
+ throw new IllegalStateException("This state is
not defined as a 'JobStatus'");
+ }
+ return jobStatus;
+ }
+ }
+
+ static DispatcherJob createFor(
+ CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+ JobID jobId,
+ String jobName,
+ Dispatcher.ExecutionType executionType) {
+ return new DispatcherJob(jobManagerRunnerFuture, jobId,
jobName, executionType);
+ }
+
+ private DispatcherJob(
+ CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+ JobID jobId,
+ String jobName,
+ Dispatcher.ExecutionType executionType) {
Review comment:
Looking at the `ExecutionType`, I am wondering whether this isn't rather a
concept of the user of a `DispatcherJob`. In other words, why don't we let
the `Dispatcher` decide how to handle an initialization error? If we wanted to
do it like this, then we would have to return a bit more information via the
`jobResultFuture` (e.g. whether it failed during the initialization). One could
create a `DispatcherJobResult` which contains the `ArchivedExecutionGraph` and
whether the job failed during initialization or not.
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java
##########
@@ -88,6 +89,13 @@ private PerJobMiniClusterFactory(
return miniCluster
.submitJob(jobGraph)
+
.thenApplyAsync(FunctionUtils.uncheckedFunction(submissionResult -> {
+
org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(submissionResult.getJobID(),
+ () ->
miniCluster.getJobStatus(submissionResult.getJobID()).get(),
+ () ->
miniCluster.requestJobResult(submissionResult.getJobID()).get(),
+
Thread.currentThread().getContextClassLoader());
Review comment:
Are we sure that the context class loader is the right class loader here?
##########
File path: flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
##########
@@ -111,4 +122,48 @@ public static void executeProgram(
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
+
+ /**
+ * This method blocks until the job status is not INITIALIZING anymore.
+ * If the job is FAILED, it throws an CompletionException with the
failure cause.
+ * @param jobStatusSupplier supplier returning the job status.
+ */
+ public static void waitUntilJobInitializationFinished(
+ JobID jobID,
+ SupplierWithException<JobStatus, Exception>
jobStatusSupplier,
+ SupplierWithException<JobResult, Exception>
jobResultSupplier,
+ ClassLoader userCodeClassloader)
+ throws JobInitializationException {
+ LOG.debug("Wait until job initialization is finished");
+ WaitStrategy waitStrategy = new ExponentialWaitStrategy(50,
2000);
+ try {
+ JobStatus status = jobStatusSupplier.get();
+ long attempt = 0;
+ while (status == JobStatus.INITIALIZING) {
+ Thread.sleep(waitStrategy.sleepTime(attempt++));
+ status = jobStatusSupplier.get();
+ }
+ if (status == JobStatus.FAILED) {
+ JobResult result = jobResultSupplier.get();
+ Optional<SerializedThrowable> throwable =
result.getSerializedThrowable();
+ if (throwable.isPresent()) {
+ Throwable t =
throwable.get().deserializeError(userCodeClassloader);
+ t =
ExceptionUtils.stripCompletionException(t);
+ if (t instanceof
JobInitializationException) {
+ throw t;
+ }
+ }
+ }
+ } catch (JobInitializationException initializationException) {
+ throw initializationException;
+ } catch (Throwable throwable) {
+ ExceptionUtils.checkInterrupted(throwable);
+ throw new JobInitializationException(jobID, "Error
while waiting for job to be initialized", throwable);
+ }
+ }
+
+ public static <T> void waitUntilJobInitializationFinished(JobID id,
ClusterClient<T> client, ClassLoader userCodeClassloader) throws
+ JobInitializationException {
+ waitUntilJobInitializationFinished(id, () ->
client.getJobStatus(id).get(), () -> client.requestJobResult(id).get(),
userCodeClassloader);
Review comment:
`requestJobResult` can lead to a shutdown of an attached per-job
cluster, because in case of an attached deployment the per-job cluster waits
until the job result has been served and then shuts down.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstraction used by the {@link Dispatcher} to manage jobs.
+ */
+public final class DispatcherJob implements AutoCloseableAsync {
+
+ private final Logger log = LoggerFactory.getLogger(DispatcherJob.class);
+
+ private final CompletableFuture<JobManagerRunner>
jobManagerRunnerFuture;
+ private final CompletableFuture<ArchivedExecutionGraph> jobResultFuture;
+ private final CompletableFuture<Void> terminationFuture = new
CompletableFuture<>();
+
+ private final long initializationTimestamp;
+ private final JobID jobId;
+ private final String jobName;
+
+ private final Object lock = new Object();
+
+ // internal field to track job status during initialization. Is not
updated anymore after
+ // job is initialized, cancelled or failed.
+ @GuardedBy("lock")
+ private DispatcherJobStatus jobStatus =
DispatcherJobStatus.INITIALIZING;
+
+ private enum DispatcherJobStatus {
+ // We are waiting for the JobManagerRunner to be initialized
+ INITIALIZING(JobStatus.INITIALIZING),
+ // JobManagerRunner is initialized
+ JOB_MANAGER_RUNNER_INITIALIZED(null),
+ // waiting for cancellation. We stay in this status until the
job result future completed,
+ // then we consider the JobManager to be initialized.
+ CANCELLING(JobStatus.CANCELLING);
+
+ @Nullable
+ private final JobStatus jobStatus;
+
+ DispatcherJobStatus(JobStatus jobStatus) {
+ this.jobStatus = jobStatus;
+ }
+
+ public JobStatus asJobStatus() {
+ if (jobStatus == null) {
+ throw new IllegalStateException("This state is
not defined as a 'JobStatus'");
+ }
+ return jobStatus;
+ }
+ }
+
+ static DispatcherJob createFor(
+ CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+ JobID jobId,
+ String jobName,
+ Dispatcher.ExecutionType executionType) {
+ return new DispatcherJob(jobManagerRunnerFuture, jobId,
jobName, executionType);
+ }
+
+ private DispatcherJob(
+ CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+ JobID jobId,
+ String jobName,
+ Dispatcher.ExecutionType executionType) {
+ this.jobManagerRunnerFuture = jobManagerRunnerFuture;
+ this.jobId = jobId;
+ this.jobName = jobName;
+ this.initializationTimestamp = System.currentTimeMillis();
+ this.jobResultFuture = new CompletableFuture<>();
+
+
FutureUtils.assertNoException(this.jobManagerRunnerFuture.handle((jobManagerRunner,
throwable) -> {
+ // JM has been initialized, or the initialization failed
+ synchronized (lock) {
+ if (jobStatus !=
DispatcherJobStatus.CANCELLING) {
+ jobStatus =
DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED;
+ }
+ if (throwable == null) {
+ // Forward result future
+
FutureUtils.forward(jobManagerRunner.getResultFuture(), jobResultFuture);
+ } else { // failure during initialization
+ if (executionType ==
Dispatcher.ExecutionType.RECOVERY) {
+
jobResultFuture.completeExceptionally(throwable);
+ } else {
+
jobResultFuture.complete(ArchivedExecutionGraph.createFromInitializingJob(
+ jobId,
+ jobName,
+ JobStatus.FAILED,
+ throwable,
+
initializationTimestamp));
+ }
+ }
+ }
+ return null;
+ }));
+ }
+
+ public CompletableFuture<ArchivedExecutionGraph> getResultFuture() {
+ return jobResultFuture;
+ }
+
+ public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
+ return requestJobStatus(timeout).thenApply(status -> {
+ int[] tasksPerState = new
int[ExecutionState.values().length];
+ synchronized (lock) {
+ return new JobDetails(
+ jobId,
+ jobName,
+ initializationTimestamp,
+ 0,
+ 0,
+ status,
+ 0,
+ tasksPerState,
+ 0);
+ }
+ });
+ }
+
+ /**
+ * Cancel job.
+ * A cancellation will be scheduled if the initialization is not
completed.
+ * The returned future will complete exceptionally if the
JobManagerRunner initialization failed.
+ */
+ public CompletableFuture<Acknowledge> cancel(Time timeout) {
+ synchronized (lock) {
+ if (isInitialized()) {
+ return
getJobMasterGateway().thenCompose(jobMasterGateway ->
jobMasterGateway.cancel(timeout));
+ } else {
+ log.info("Cancellation during initialization
requested for job {}. Job will be cancelled once JobManager has been
initialized.", jobId);
+
+ // cancel job
+ jobManagerRunnerFuture
+
.thenCompose(JobManagerRunner::getJobMasterGateway)
+ .thenCompose(jobMasterGateway ->
jobMasterGateway.cancel(RpcUtils.INF_TIMEOUT))
+ .whenComplete((ignored,
cancelThrowable) -> {
+ if (cancelThrowable != null) {
+ log.warn("Cancellation
of job {} failed", jobId, cancelThrowable);
+ }
+ });
+ jobStatus = DispatcherJobStatus.CANCELLING;
+
jobResultFuture.whenComplete(((archivedExecutionGraph, throwable) -> {
+ if (archivedExecutionGraph != null) {
+ jobStatus =
DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED;
+ }
+ }));
+ return jobResultFuture.thenApply(ignored ->
Acknowledge.get());
+ }
+ }
+ }
+
+ public CompletableFuture<JobStatus> requestJobStatus(Time timeout) {
+ return
requestJob(timeout).thenApply(ArchivedExecutionGraph::getState);
+ }
+
+ /**
+ * Returns a future completing to the ArchivedExecutionGraph of the job.
+ */
+ public CompletableFuture<ArchivedExecutionGraph> requestJob(Time
timeout) {
+ synchronized (lock) {
+ if (isInitialized()) {
+ if (jobResultFuture.isDone()) { // job is not
running anymore
+ return jobResultFuture;
+ }
+ return
getJobMasterGateway().thenCompose(jobMasterGateway ->
jobMasterGateway.requestJob(
+ timeout));
+ } else {
+ Preconditions.checkState(this.jobStatus ==
DispatcherJobStatus.INITIALIZING || jobStatus ==
DispatcherJobStatus.CANCELLING);
+ return CompletableFuture.completedFuture(
+
ArchivedExecutionGraph.createFromInitializingJob(
+ jobId,
+ jobName,
+ jobStatus.asJobStatus(),
+ null,
+ initializationTimestamp));
+ }
+ }
+ }
+
+ /**
+ * The job is initialized once the JobManager runner has been
initialized.
+ * It is also initialized if the runner initialization failed, or of it
has been
+ * canceled (and the cancellation is complete).
+ */
+ public boolean isInitialized() {
+ synchronized (lock) {
+ return jobStatus ==
DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED;
+ }
+ }
+
+ /**
+ * Returns the {@link JobMasterGateway} from the JobManagerRunner.
+ * This method will fail with an {@link IllegalStateException} if the
job is initialized.
+ * The returned future will complete exceptionally if the
JobManagerRunner initialization failed.
Review comment:
This sentence should go into the `@return ....` line.
##########
File path:
flink-clients/src/test/java/org/apache/flink/client/ClientUtilsTest.java
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.client.JobInitializationException;
+import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.DefaultDispatcherBootstrap;
+import org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.dispatcher.DispatcherServices;
+import org.apache.flink.runtime.dispatcher.DispatcherTest;
+import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.NoOpJobGraphWriter;
+import org.apache.flink.runtime.dispatcher.TestingDispatcher;
+import org.apache.flink.runtime.dispatcher.VoidHistoryServerArchivist;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.JobGraphWriter;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Test for the ClientUtils.
+ */
+public class ClientUtilsTest extends TestLogger {
+
+ @ClassRule
+ public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ private static final JobID TEST_JOB_ID = new JobID();
+
+ private static final Time TIMEOUT = Time.seconds(10L);
+
+ private static TestingRpcService rpcService;
+
+ @Rule
+ public final TestingFatalErrorHandlerResource
testingFatalErrorHandlerResource = new TestingFatalErrorHandlerResource();
+
+ final HeartbeatServices heartbeatServices = new
HeartbeatServices(1000L, 10000L);
+
+ final TestingLeaderElectionService jobMasterLeaderElectionService = new
TestingLeaderElectionService();
+
+ final TestingHighAvailabilityServices haServices = new
TestingHighAvailabilityServices();
+
+ private BlobServer blobServer;
+ private DefaultDispatcherBootstrap dispatcherBootstrap;
+ private Configuration configuration;
+ private ResourceManagerGateway resourceManagerGateway;
+ private ArchivedExecutionGraphStore archivedExecutionGraphStore;
+ private JobGraphWriter jobGraphWriter;
+ private DefaultJobManagerRunnerFactory jobManagerRunnerFactory;
+
+ @Before
+ public void setUp() throws Exception {
+ haServices.setJobMasterLeaderElectionService(TEST_JOB_ID,
jobMasterLeaderElectionService);
+ haServices.setCheckpointRecoveryFactory(new
StandaloneCheckpointRecoveryFactory());
+ haServices.setResourceManagerLeaderRetriever(new
SettableLeaderRetrievalService());
+
+ configuration = new Configuration();
+
+ configuration.setString(
+ BlobServerOptions.STORAGE_DIRECTORY,
+ temporaryFolder.newFolder().getAbsolutePath());
+
+ blobServer = new BlobServer(configuration, new VoidBlobStore());
+ resourceManagerGateway = new TestingResourceManagerGateway();
+ archivedExecutionGraphStore = new
MemoryArchivedExecutionGraphStore();
+ dispatcherBootstrap = new
DefaultDispatcherBootstrap(Collections.emptyList());
+
+ jobGraphWriter = NoOpJobGraphWriter.INSTANCE;
+ jobManagerRunnerFactory =
DefaultJobManagerRunnerFactory.INSTANCE;
+ }
+
+ @BeforeClass
+ public static void setupClass() {
+ rpcService = new TestingRpcService();
+ }
+
+ @AfterClass
+ public static void teardownClass() throws Exception {
+ if (rpcService != null) {
+ RpcUtils.terminateRpcService(rpcService, TIMEOUT);
+
+ rpcService = null;
+ }
+ }
+
+ private TestingDispatcher createAndStartDispatcher() throws Exception {
+ final TestingDispatcher dispatcher =
+ new TestingDispatcher(
+ rpcService,
+ DispatcherId.generate(),
+ dispatcherBootstrap,
+ new DispatcherServices(
+ configuration,
+ haServices,
+ () ->
CompletableFuture.completedFuture(resourceManagerGateway),
+ blobServer,
+ heartbeatServices,
+ archivedExecutionGraphStore,
+
testingFatalErrorHandlerResource.getFatalErrorHandler(),
+ VoidHistoryServerArchivist.INSTANCE,
+ null,
+
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
+ jobGraphWriter,
+ jobManagerRunnerFactory));
+ dispatcher.start();
+ jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+ return dispatcher;
+ }
+
+ @Test
+ public void
testWaitUntilJobInitializationFinished_throwsInitializationException() throws
+ Exception {
+ final JobVertex testVertex = new
DispatcherTest.FailingInitializationJobVertex("testVertex");
+ testVertex.setInvokableClass(NoOpInvokable.class);
+
+ JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "testJob",
testVertex);
+
+ Dispatcher dispatcher = createAndStartDispatcher();
+ DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+
+ CommonTestUtils.assertThrows("Could not instantiate
JobManager", JobInitializationException.class, () -> {
+
ClientUtils.waitUntilJobInitializationFinished(jobGraph.getJobID(),
+ () ->
dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get(),
+ () ->
dispatcherGateway.requestJobResult(jobGraph.getJobID(), TIMEOUT).get(),
+ ClassLoader.getSystemClassLoader());
+ return null;
+ });
+ CommonTestUtils.assertThrows("Could not instantiate
JobManager", ExecutionException.class, () -> {
Review comment:
Why does the shutdown of the Dispatcher fail if the job submission
failed? A submission failure should not affect Flink itself, because it can
happen and is a normal occurrence.
##########
File path:
flink-clients/src/test/java/org/apache/flink/client/ClientUtilsTest.java
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.client.JobInitializationException;
+import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.DefaultDispatcherBootstrap;
+import org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.dispatcher.DispatcherServices;
+import org.apache.flink.runtime.dispatcher.DispatcherTest;
+import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.NoOpJobGraphWriter;
+import org.apache.flink.runtime.dispatcher.TestingDispatcher;
+import org.apache.flink.runtime.dispatcher.VoidHistoryServerArchivist;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.JobGraphWriter;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Test for the ClientUtils.
+ */
+public class ClientUtilsTest extends TestLogger {
+
+ @ClassRule
+ public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ private static final JobID TEST_JOB_ID = new JobID();
+
+ private static final Time TIMEOUT = Time.seconds(10L);
+
+ private static TestingRpcService rpcService;
+
+ @Rule
+ public final TestingFatalErrorHandlerResource
testingFatalErrorHandlerResource = new TestingFatalErrorHandlerResource();
+
+ final HeartbeatServices heartbeatServices = new
HeartbeatServices(1000L, 10000L);
+
+ final TestingLeaderElectionService jobMasterLeaderElectionService = new
TestingLeaderElectionService();
+
+ final TestingHighAvailabilityServices haServices = new
TestingHighAvailabilityServices();
+
+ private BlobServer blobServer;
+ private DefaultDispatcherBootstrap dispatcherBootstrap;
+ private Configuration configuration;
+ private ResourceManagerGateway resourceManagerGateway;
+ private ArchivedExecutionGraphStore archivedExecutionGraphStore;
+ private JobGraphWriter jobGraphWriter;
+ private DefaultJobManagerRunnerFactory jobManagerRunnerFactory;
+
+ @Before
+ public void setUp() throws Exception {
+ haServices.setJobMasterLeaderElectionService(TEST_JOB_ID,
jobMasterLeaderElectionService);
+ haServices.setCheckpointRecoveryFactory(new
StandaloneCheckpointRecoveryFactory());
+ haServices.setResourceManagerLeaderRetriever(new
SettableLeaderRetrievalService());
+
+ configuration = new Configuration();
+
+ configuration.setString(
+ BlobServerOptions.STORAGE_DIRECTORY,
+ temporaryFolder.newFolder().getAbsolutePath());
+
+ blobServer = new BlobServer(configuration, new VoidBlobStore());
+ resourceManagerGateway = new TestingResourceManagerGateway();
+ archivedExecutionGraphStore = new
MemoryArchivedExecutionGraphStore();
+ dispatcherBootstrap = new
DefaultDispatcherBootstrap(Collections.emptyList());
+
+ jobGraphWriter = NoOpJobGraphWriter.INSTANCE;
+ jobManagerRunnerFactory =
DefaultJobManagerRunnerFactory.INSTANCE;
+ }
+
+ @BeforeClass
+ public static void setupClass() {
+ rpcService = new TestingRpcService();
+ }
+
+ @AfterClass
+ public static void teardownClass() throws Exception {
+ if (rpcService != null) {
+ RpcUtils.terminateRpcService(rpcService, TIMEOUT);
+
+ rpcService = null;
+ }
+ }
+
+ private TestingDispatcher createAndStartDispatcher() throws Exception {
+ final TestingDispatcher dispatcher =
+ new TestingDispatcher(
+ rpcService,
+ DispatcherId.generate(),
+ dispatcherBootstrap,
+ new DispatcherServices(
+ configuration,
+ haServices,
+ () ->
CompletableFuture.completedFuture(resourceManagerGateway),
+ blobServer,
+ heartbeatServices,
+ archivedExecutionGraphStore,
+
testingFatalErrorHandlerResource.getFatalErrorHandler(),
+ VoidHistoryServerArchivist.INSTANCE,
+ null,
+
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
+ jobGraphWriter,
+ jobManagerRunnerFactory));
+ dispatcher.start();
+ jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+ return dispatcher;
+ }
+
+ @Test
+ public void
testWaitUntilJobInitializationFinished_throwsInitializationException() throws
+ Exception {
+ final JobVertex testVertex = new
DispatcherTest.FailingInitializationJobVertex("testVertex");
+ testVertex.setInvokableClass(NoOpInvokable.class);
+
+ JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "testJob",
testVertex);
+
+ Dispatcher dispatcher = createAndStartDispatcher();
+ DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+
+ CommonTestUtils.assertThrows("Could not instantiate
JobManager", JobInitializationException.class, () -> {
+
ClientUtils.waitUntilJobInitializationFinished(jobGraph.getJobID(),
+ () ->
dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get(),
+ () ->
dispatcherGateway.requestJobResult(jobGraph.getJobID(), TIMEOUT).get(),
+ ClassLoader.getSystemClassLoader());
+ return null;
+ });
+ CommonTestUtils.assertThrows("Could not instantiate
JobManager", ExecutionException.class, () -> {
+ dispatcher.closeAsync().get();
+ return null;
+ });
+ }
+
+ @Test
+ public void
testWaitUntilJobInitializationFinished_doesNotThrowRuntimeException() throws
Exception {
+ final JobVertex testVertex = new JobVertex("testVertex");
+ testVertex.setInvokableClass(NoOpInvokable.class);
+
+ JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "testJob",
testVertex);
+
+ TestingDispatcher dispatcher = createAndStartDispatcher();
+ DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
+
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+
+ // we don't expect an exception here
+
ClientUtils.waitUntilJobInitializationFinished(jobGraph.getJobID(),
+ () ->
dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get(),
+ () ->
dispatcherGateway.requestJobResult(jobGraph.getJobID(), TIMEOUT).get(),
+ ClassLoader.getSystemClassLoader());
+
+ // now "fail" the job
+ dispatcher.completeJobExecution(
+
ArchivedExecutionGraph.createFromInitializingJob(TEST_JOB_ID, "test",
JobStatus.FAILED, new RuntimeException("yolo"), 1337));
+ // ensure it is failed
+
org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(() -> {
+ JobStatus status = dispatcherGateway.requestJobStatus(
+ jobGraph.getJobID(),
+ TIMEOUT).get();
+ return status == JobStatus.FAILED;
+ }, Deadline.fromNow(
Review comment:
nit
```suggestion
org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(() ->
dispatcherGateway.requestJobStatus(
jobGraph.getJobID(),
TIMEOUT).get() == JobStatus.FAILED,
Deadline.fromNow(
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstraction used by the {@link Dispatcher} to manage jobs.
+ */
+public final class DispatcherJob implements AutoCloseableAsync {
+
+ private final Logger log = LoggerFactory.getLogger(DispatcherJob.class);
+
+ private final CompletableFuture<JobManagerRunner>
jobManagerRunnerFuture;
+ private final CompletableFuture<ArchivedExecutionGraph> jobResultFuture;
+ private final CompletableFuture<Void> terminationFuture = new
CompletableFuture<>();
+
+ private final long initializationTimestamp;
+ private final JobID jobId;
+ private final String jobName;
+
+ private final Object lock = new Object();
+
+ // internal field to track job status during initialization. Is not
updated anymore after
+ // job is initialized, cancelled or failed.
+ @GuardedBy("lock")
+ private DispatcherJobStatus jobStatus =
DispatcherJobStatus.INITIALIZING;
+
+ private enum DispatcherJobStatus {
+ // We are waiting for the JobManagerRunner to be initialized
+ INITIALIZING(JobStatus.INITIALIZING),
+ // JobManagerRunner is initialized
+ JOB_MANAGER_RUNNER_INITIALIZED(null),
+ // waiting for cancellation. We stay in this status until the
job result future completed,
+ // then we consider the JobManager to be initialized.
+ CANCELLING(JobStatus.CANCELLING);
+
+ @Nullable
+ private final JobStatus jobStatus;
+
+ DispatcherJobStatus(JobStatus jobStatus) {
+ this.jobStatus = jobStatus;
+ }
+
+ public JobStatus asJobStatus() {
+ if (jobStatus == null) {
+ throw new IllegalStateException("This state is
not defined as a 'JobStatus'");
+ }
+ return jobStatus;
+ }
+ }
+
+ static DispatcherJob createFor(
+ CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+ JobID jobId,
+ String jobName,
+ Dispatcher.ExecutionType executionType) {
+ return new DispatcherJob(jobManagerRunnerFuture, jobId,
jobName, executionType);
+ }
+
+ private DispatcherJob(
+ CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+ JobID jobId,
+ String jobName,
+ Dispatcher.ExecutionType executionType) {
+ this.jobManagerRunnerFuture = jobManagerRunnerFuture;
+ this.jobId = jobId;
+ this.jobName = jobName;
+ this.initializationTimestamp = System.currentTimeMillis();
+ this.jobResultFuture = new CompletableFuture<>();
+
+
FutureUtils.assertNoException(this.jobManagerRunnerFuture.handle((jobManagerRunner,
throwable) -> {
+ // JM has been initialized, or the initialization failed
+ synchronized (lock) {
+ if (jobStatus !=
DispatcherJobStatus.CANCELLING) {
+ jobStatus =
DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED;
+ }
+ if (throwable == null) {
+ // Forward result future
+
FutureUtils.forward(jobManagerRunner.getResultFuture(), jobResultFuture);
+ } else { // failure during initialization
+ if (executionType ==
Dispatcher.ExecutionType.RECOVERY) {
+
jobResultFuture.completeExceptionally(throwable);
+ } else {
+
jobResultFuture.complete(ArchivedExecutionGraph.createFromInitializingJob(
+ jobId,
+ jobName,
+ JobStatus.FAILED,
+ throwable,
+
initializationTimestamp));
+ }
+ }
+ }
+ return null;
+ }));
+ }
+
+ public CompletableFuture<ArchivedExecutionGraph> getResultFuture() {
+ return jobResultFuture;
+ }
+
+ public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
+ return requestJobStatus(timeout).thenApply(status -> {
+ int[] tasksPerState = new
int[ExecutionState.values().length];
+ synchronized (lock) {
+ return new JobDetails(
+ jobId,
+ jobName,
+ initializationTimestamp,
+ 0,
+ 0,
+ status,
+ 0,
+ tasksPerState,
+ 0);
+ }
+ });
+ }
+
+ /**
+ * Cancel job.
+ * A cancellation will be scheduled if the initialization is not
completed.
+ * The returned future will complete exceptionally if the
JobManagerRunner initialization failed.
+ */
+ public CompletableFuture<Acknowledge> cancel(Time timeout) {
+ synchronized (lock) {
+ if (isInitialized()) {
+ return
getJobMasterGateway().thenCompose(jobMasterGateway ->
jobMasterGateway.cancel(timeout));
+ } else {
+ log.info("Cancellation during initialization
requested for job {}. Job will be cancelled once JobManager has been
initialized.", jobId);
+
+ // cancel job
+ jobManagerRunnerFuture
+
.thenCompose(JobManagerRunner::getJobMasterGateway)
+ .thenCompose(jobMasterGateway ->
jobMasterGateway.cancel(RpcUtils.INF_TIMEOUT))
+ .whenComplete((ignored,
cancelThrowable) -> {
+ if (cancelThrowable != null) {
+ log.warn("Cancellation
of job {} failed", jobId, cancelThrowable);
+ }
+ });
+ jobStatus = DispatcherJobStatus.CANCELLING;
+
jobResultFuture.whenComplete(((archivedExecutionGraph, throwable) -> {
+ if (archivedExecutionGraph != null) {
+ jobStatus =
DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED;
+ }
+ }));
+ return jobResultFuture.thenApply(ignored ->
Acknowledge.get());
Review comment:
The returned future behaves differently depending on `isInitialized`. In
this case we wait for the job to have completely finished. In the `if`
branch, we only wait for the cancel call to be acknowledged. I'd suggest doing
the same here.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstraction used by the {@link Dispatcher} to manage JobManagers, in
+ * particular during initialization.
+ * While a job is initializing, the JobMasterGateway is not available. A small
subset
+ * of the methods of the JobMasterGateway necessary during initialization are
provided
+ * by this class (job details, cancel).
+ */
+public final class DispatcherJob implements AutoCloseableAsync {
+
+ private final Logger log = LoggerFactory.getLogger(DispatcherJob.class);
+
+ private final CompletableFuture<JobManagerRunner>
jobManagerRunnerFuture;
+ private final CompletableFuture<ArchivedExecutionGraph> jobResultFuture;
+
+ private final long initializationTimestamp;
+ private final JobID jobId;
+ private final String jobName;
+
+ private final Object lock = new Object();
+
+ // internal field to track job status during initialization. Is not
updated anymore after
+ // job is initialized, cancelled or failed.
+ @GuardedBy("lock")
+ private JobStatus jobStatus = JobStatus.INITIALIZING;
+
+ private final CompletableFuture<Void> terminationFuture = new
CompletableFuture<>();
+
+ private enum SubmissionType {
+ INITIAL, RECOVERY
+ }
+
+ static DispatcherJob createForSubmission(
+ CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+ JobID jobId,
+ String jobName,
+ long initializationTimestamp) {
+ return new DispatcherJob(jobManagerRunnerFuture, jobId,
jobName, initializationTimestamp, SubmissionType.INITIAL);
+ }
+
+ static DispatcherJob createForRecovery(
+ CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+ JobID jobId,
+ String jobName,
+ long initializationTimestamp) {
+ return new DispatcherJob(jobManagerRunnerFuture, jobId,
jobName, initializationTimestamp, SubmissionType.RECOVERY);
+ }
+
+ private DispatcherJob(
+ CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+ JobID jobId,
+ String jobName,
+ long initializationTimestamp,
+ SubmissionType submissionType) {
+ this.jobManagerRunnerFuture = jobManagerRunnerFuture;
+ this.jobId = jobId;
+ this.jobName = jobName;
+ this.initializationTimestamp = initializationTimestamp;
+ this.jobResultFuture = new CompletableFuture<>();
+
+
FutureUtils.assertNoException(this.jobManagerRunnerFuture.handle((jobManagerRunner,
throwable) -> {
+ // JM has been initialized, or the initialization failed
+ synchronized (lock) {
+ if (throwable == null) {
+ // Forward result future
+
FutureUtils.forward(jobManagerRunner.getResultFuture(), jobResultFuture);
+
+ if (jobStatus == JobStatus.CANCELLING) {
+ log.info("Cancellation during
initialization has been requested for job {}. Initialization completed,
cancelling job.", jobId);
+
+ // cancel job
+ jobManagerRunner
+ .getJobMasterGateway()
+ .thenCompose(gw ->
gw.cancel(RpcUtils.INF_TIMEOUT))
+ .whenComplete((ignored,
cancelThrowable) -> {
+ if (cancelThrowable !=
null) {
+
log.warn("Cancellation of job {} failed", jobId, cancelThrowable);
+ }
+ });
+
+ // cancellation will eventually
complete the jobResultFuture
+
jobResultFuture.whenComplete((archivedExecutionGraph, resultThrowable) -> {
+ synchronized (lock) {
+ if
(resultThrowable == null) {
+
jobStatus = archivedExecutionGraph.getState();
+ } else {
+
jobStatus = JobStatus.FAILED;
+ }
+ }
+ });
+ } else {
+ jobStatus = JobStatus.RUNNING;
// this status should never be exposed from the DispatcherJob. Only used
internally for tracking running state
+ }
+ } else { // failure during initialization
+ if (submissionType ==
SubmissionType.RECOVERY) {
+
jobResultFuture.completeExceptionally(throwable);
+ } else {
+
jobResultFuture.complete(ArchivedExecutionGraph.createFromInitializingJob(
+ jobId,
+ jobName,
+ throwable,
+ JobStatus.FAILED,
+
initializationTimestamp));
+ }
+ }
+ }
+ return null;
+ }));
+ }
+
+ public CompletableFuture<ArchivedExecutionGraph> getResultFuture() {
+ return jobResultFuture;
+ }
+
+ public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
+ synchronized (lock) {
+ if (isRunning()) {
+ return
getJobMasterGateway().thenCompose(jobMasterGateway ->
jobMasterGateway.requestJobDetails(
+ timeout));
+ } else {
+ int[] tasksPerState = new
int[ExecutionState.values().length];
+ return CompletableFuture.completedFuture(new
JobDetails(
+ jobId,
+ jobName,
+ initializationTimestamp,
+ 0,
+ 0,
+ jobStatus,
+ 0,
+ tasksPerState,
+ 0));
+ }
+ }
+ }
+
+ public CompletableFuture<Acknowledge> cancel(Time timeout) {
+ synchronized (lock) {
+ if (isRunning()) {
+ return
getJobMasterGateway().thenCompose(jobMasterGateway ->
jobMasterGateway.cancel(timeout));
+ } else {
+ log.info("Cancellation during initialization
requested for job {}. Job will be cancelled once JobManager has been
initialized.", jobId);
+ jobStatus = JobStatus.CANCELLING;
+ return jobResultFuture.thenApply(ignored ->
Acknowledge.get());
+ }
+ }
+ }
+
+ public CompletableFuture<JobStatus> requestJobStatus(Time timeout) {
+ synchronized (lock) {
+ if (isRunning()) {
+ return
getJobMasterGateway().thenCompose(jobMasterGateway ->
jobMasterGateway.requestJobStatus(timeout));
+ } else {
+ return
CompletableFuture.completedFuture(jobStatus);
+ }
+ }
+ }
+
+ public CompletableFuture<ArchivedExecutionGraph> requestJob(Time
timeout) {
+ synchronized (lock) {
+ if (isRunning()) {
+ return
getJobMasterGateway().thenCompose(jobMasterGateway ->
jobMasterGateway.requestJob(timeout));
+ } else {
+ return CompletableFuture.supplyAsync(() ->
ArchivedExecutionGraph.createFromInitializingJob(jobId, jobName, null,
jobStatus, initializationTimestamp));
+ }
+ }
+ }
+
+ public boolean isRunning() {
+ synchronized (lock) {
+ return jobStatus == JobStatus.RUNNING;
+ }
+ }
+
+ public CompletableFuture<JobMasterGateway> getJobMasterGateway() {
Review comment:
In JavaDocs one usually uses `@throws IllegalStateException if
the job is not initialized` after the `@return` tag.
##########
File path:
flink-clients/src/test/java/org/apache/flink/client/ClientUtilsTest.java
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.client.JobInitializationException;
+import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.DefaultDispatcherBootstrap;
+import org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.dispatcher.DispatcherServices;
+import org.apache.flink.runtime.dispatcher.DispatcherTest;
+import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.NoOpJobGraphWriter;
+import org.apache.flink.runtime.dispatcher.TestingDispatcher;
+import org.apache.flink.runtime.dispatcher.VoidHistoryServerArchivist;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmanager.JobGraphWriter;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Test for the ClientUtils.
+ */
+public class ClientUtilsTest extends TestLogger {
+
+ @ClassRule
Review comment:
Yes, the setup is quite manual. There is `MiniClusterResource`, which
spawns a `MiniCluster` as an external resource.
I actually believe that we don't really need to start a `MiniCluster` and
submit a job. `ClientUtils.waitUntilJobInitializationFinished` is abstracted
so well that it can be tested in isolation. Just provide the respective
suppliers, which produce a failure or not, to the utility and then test its
behaviour.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstraction used by the {@link Dispatcher} to manage jobs.
+ */
+public final class DispatcherJob implements AutoCloseableAsync {
+
+ private final Logger log = LoggerFactory.getLogger(DispatcherJob.class);
+
+ private final CompletableFuture<JobManagerRunner>
jobManagerRunnerFuture;
+ private final CompletableFuture<ArchivedExecutionGraph> jobResultFuture;
+ private final CompletableFuture<Void> terminationFuture = new
CompletableFuture<>();
+
+ private final long initializationTimestamp;
+ private final JobID jobId;
+ private final String jobName;
+
+ private final Object lock = new Object();
+
+ // internal field to track job status during initialization. Is not
updated anymore after
+ // job is initialized, cancelled or failed.
+ @GuardedBy("lock")
+ private DispatcherJobStatus jobStatus =
DispatcherJobStatus.INITIALIZING;
+
+ private enum DispatcherJobStatus {
+ // We are waiting for the JobManagerRunner to be initialized
+ INITIALIZING(JobStatus.INITIALIZING),
+ // JobManagerRunner is initialized
+ JOB_MANAGER_RUNNER_INITIALIZED(null),
+ // waiting for cancellation. We stay in this status until the
job result future completed,
+ // then we consider the JobManager to be initialized.
+ CANCELLING(JobStatus.CANCELLING);
+
+ @Nullable
+ private final JobStatus jobStatus;
+
+ DispatcherJobStatus(JobStatus jobStatus) {
+ this.jobStatus = jobStatus;
+ }
+
+ public JobStatus asJobStatus() {
+ if (jobStatus == null) {
+ throw new IllegalStateException("This state is
not defined as a 'JobStatus'");
+ }
+ return jobStatus;
+ }
+ }
+
+ static DispatcherJob createFor(
+ CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+ JobID jobId,
+ String jobName,
+ Dispatcher.ExecutionType executionType) {
+ return new DispatcherJob(jobManagerRunnerFuture, jobId,
jobName, executionType);
+ }
+
+ private DispatcherJob(
+ CompletableFuture<JobManagerRunner> jobManagerRunnerFuture,
+ JobID jobId,
+ String jobName,
+ Dispatcher.ExecutionType executionType) {
+ this.jobManagerRunnerFuture = jobManagerRunnerFuture;
+ this.jobId = jobId;
+ this.jobName = jobName;
+ this.initializationTimestamp = System.currentTimeMillis();
+ this.jobResultFuture = new CompletableFuture<>();
+
+
FutureUtils.assertNoException(this.jobManagerRunnerFuture.handle((jobManagerRunner,
throwable) -> {
+ // JM has been initialized, or the initialization failed
+ synchronized (lock) {
+ if (jobStatus !=
DispatcherJobStatus.CANCELLING) {
+ jobStatus =
DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED;
+ }
+ if (throwable == null) {
+ // Forward result future
+
FutureUtils.forward(jobManagerRunner.getResultFuture(), jobResultFuture);
+ } else { // failure during initialization
+ if (executionType ==
Dispatcher.ExecutionType.RECOVERY) {
+
jobResultFuture.completeExceptionally(throwable);
+ } else {
+
jobResultFuture.complete(ArchivedExecutionGraph.createFromInitializingJob(
+ jobId,
+ jobName,
+ JobStatus.FAILED,
+ throwable,
+
initializationTimestamp));
+ }
+ }
+ }
+ return null;
+ }));
+ }
+
+ public CompletableFuture<ArchivedExecutionGraph> getResultFuture() {
+ return jobResultFuture;
+ }
+
+ public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
+ return requestJobStatus(timeout).thenApply(status -> {
+ int[] tasksPerState = new
int[ExecutionState.values().length];
+ synchronized (lock) {
+ return new JobDetails(
+ jobId,
+ jobName,
+ initializationTimestamp,
+ 0,
+ 0,
+ status,
+ 0,
+ tasksPerState,
+ 0);
+ }
+ });
+ }
+
+ /**
+ * Cancel job.
+ * A cancellation will be scheduled if the initialization is not
completed.
+ * The returned future will complete exceptionally if the
JobManagerRunner initialization failed.
+ */
+ public CompletableFuture<Acknowledge> cancel(Time timeout) {
+ synchronized (lock) {
+ if (isInitialized()) {
+ return
getJobMasterGateway().thenCompose(jobMasterGateway ->
jobMasterGateway.cancel(timeout));
+ } else {
+ log.info("Cancellation during initialization
requested for job {}. Job will be cancelled once JobManager has been
initialized.", jobId);
+
+ // cancel job
+ jobManagerRunnerFuture
+
.thenCompose(JobManagerRunner::getJobMasterGateway)
+ .thenCompose(jobMasterGateway ->
jobMasterGateway.cancel(RpcUtils.INF_TIMEOUT))
+ .whenComplete((ignored,
cancelThrowable) -> {
+ if (cancelThrowable != null) {
+ log.warn("Cancellation
of job {} failed", jobId, cancelThrowable);
+ }
+ });
+ jobStatus = DispatcherJobStatus.CANCELLING;
+
jobResultFuture.whenComplete(((archivedExecutionGraph, throwable) -> {
+ if (archivedExecutionGraph != null) {
+ jobStatus =
DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED;
+ }
+ }));
+ return jobResultFuture.thenApply(ignored ->
Acknowledge.get());
+ }
+ }
+ }
+
+ public CompletableFuture<JobStatus> requestJobStatus(Time timeout) {
+ return
requestJob(timeout).thenApply(ArchivedExecutionGraph::getState);
+ }
+
+ /**
+ * Returns a future completing to the ArchivedExecutionGraph of the job.
+ */
+ public CompletableFuture<ArchivedExecutionGraph> requestJob(Time
timeout) {
+ synchronized (lock) {
+ if (isInitialized()) {
+ if (jobResultFuture.isDone()) { // job is not
running anymore
+ return jobResultFuture;
+ }
+ return
getJobMasterGateway().thenCompose(jobMasterGateway ->
jobMasterGateway.requestJob(
+ timeout));
+ } else {
+ Preconditions.checkState(this.jobStatus ==
DispatcherJobStatus.INITIALIZING || jobStatus ==
DispatcherJobStatus.CANCELLING);
+ return CompletableFuture.completedFuture(
+
ArchivedExecutionGraph.createFromInitializingJob(
+ jobId,
+ jobName,
+ jobStatus.asJobStatus(),
+ null,
+ initializationTimestamp));
+ }
+ }
+ }
+
+ /**
+ * The job is initialized once the JobManager runner has been
initialized.
+ * It is also initialized if the runner initialization failed, or of it
has been
+ * canceled (and the cancellation is complete).
+ */
+ public boolean isInitialized() {
+ synchronized (lock) {
+ return jobStatus ==
DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED;
+ }
+ }
+
+ /**
+ * Returns the {@link JobMasterGateway} from the JobManagerRunner.
+ * This method will fail with an {@link IllegalStateException} if the
job is initialized.
Review comment:
```suggestion
* This method will fail with an {@link IllegalStateException} if the
job is not initialized.
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]