tillrohrmann commented on a change in pull request #13217: URL: https://github.com/apache/flink/pull/13217#discussion_r478371328
########## File path: flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java ########## @@ -111,4 +122,48 @@ public static void executeProgram( Thread.currentThread().setContextClassLoader(contextClassLoader); } } + + /** + * This method blocks until the job status is not INITIALIZING anymore. + * If the job is FAILED, it throws an CompletionException with the failure cause. + * @param jobStatusSupplier supplier returning the job status. + */ + public static void waitUntilJobInitializationFinished( + JobID jobID, + SupplierWithException<JobStatus, Exception> jobStatusSupplier, + SupplierWithException<JobResult, Exception> jobResultSupplier, + ClassLoader userCodeClassloader) + throws JobInitializationException { + LOG.debug("Wait until job initialization is finished"); + WaitStrategy waitStrategy = new ExponentialWaitStrategy(50, 2000); + try { + JobStatus status = jobStatusSupplier.get(); + long attempt = 0; + while (status == JobStatus.INITIALIZING) { + Thread.sleep(waitStrategy.sleepTime(attempt++)); + status = jobStatusSupplier.get(); + } + if (status == JobStatus.FAILED) { + JobResult result = jobResultSupplier.get(); + Optional<SerializedThrowable> throwable = result.getSerializedThrowable(); + if (throwable.isPresent()) { + Throwable t = throwable.get().deserializeError(userCodeClassloader); + t = ExceptionUtils.stripCompletionException(t); Review comment: Where does the `CompletionException` come from? Ideally we strip it before we create the failure result of the job execution/submission because it is an implementation detail which should not leak out. 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ########## @@ -332,7 +329,8 @@ private boolean isPartialResourceConfigured(JobGraph jobGraph) { private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) { log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName()); - final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJobManager(jobGraph.getJobID(), jobGraph, this::persistAndRunJob) + final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJobManager(jobGraph.getJobID(), jobGraph, Review comment: ```suggestion final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJob(jobGraph.getJobID(), jobGraph, ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java ########## @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobmaster.JobManagerRunner; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.util.AutoCloseableAsync; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +import java.util.concurrent.CompletableFuture; + +/** + * Abstraction used by the {@link Dispatcher} to manage jobs. + */ +public final class DispatcherJob implements AutoCloseableAsync { + + private final Logger log = LoggerFactory.getLogger(DispatcherJob.class); + + private final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture; + private final CompletableFuture<ArchivedExecutionGraph> jobResultFuture; + private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>(); + + private final long initializationTimestamp; + private final JobID jobId; + private final String jobName; + + private final Object lock = new Object(); + + // internal field to track job status during initialization. Is not updated anymore after + // job is initialized, cancelled or failed. 
+ @GuardedBy("lock") + private DispatcherJobStatus jobStatus = DispatcherJobStatus.INITIALIZING; + + private enum DispatcherJobStatus { + // We are waiting for the JobManagerRunner to be initialized + INITIALIZING(JobStatus.INITIALIZING), + // JobManagerRunner is initialized + JOB_MANAGER_RUNNER_INITIALIZED(null), + // waiting for cancellation. We stay in this status until the job result future completed, + // then we consider the JobManager to be initialized. + CANCELLING(JobStatus.CANCELLING); + + @Nullable + private final JobStatus jobStatus; + + DispatcherJobStatus(JobStatus jobStatus) { + this.jobStatus = jobStatus; + } + + public JobStatus asJobStatus() { + if (jobStatus == null) { + throw new IllegalStateException("This state is not defined as a 'JobStatus'"); + } + return jobStatus; + } + } + + static DispatcherJob createFor( + CompletableFuture<JobManagerRunner> jobManagerRunnerFuture, + JobID jobId, + String jobName, + Dispatcher.ExecutionType executionType) { + return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName, executionType); + } + + private DispatcherJob( + CompletableFuture<JobManagerRunner> jobManagerRunnerFuture, + JobID jobId, + String jobName, + Dispatcher.ExecutionType executionType) { + this.jobManagerRunnerFuture = jobManagerRunnerFuture; + this.jobId = jobId; + this.jobName = jobName; + this.initializationTimestamp = System.currentTimeMillis(); + this.jobResultFuture = new CompletableFuture<>(); + + FutureUtils.assertNoException(this.jobManagerRunnerFuture.handle((jobManagerRunner, throwable) -> { + // JM has been initialized, or the initialization failed + synchronized (lock) { + if (jobStatus != DispatcherJobStatus.CANCELLING) { + jobStatus = DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED; + } + if (throwable == null) { + // Forward result future + FutureUtils.forward(jobManagerRunner.getResultFuture(), jobResultFuture); + } else { // failure during initialization + if (executionType == Dispatcher.ExecutionType.RECOVERY) { 
+ jobResultFuture.completeExceptionally(throwable); + } else { + jobResultFuture.complete(ArchivedExecutionGraph.createFromInitializingJob( + jobId, + jobName, + JobStatus.FAILED, + throwable, + initializationTimestamp)); + } + } + } + return null; + })); + } + + public CompletableFuture<ArchivedExecutionGraph> getResultFuture() { + return jobResultFuture; + } + + public CompletableFuture<JobDetails> requestJobDetails(Time timeout) { + return requestJobStatus(timeout).thenApply(status -> { + int[] tasksPerState = new int[ExecutionState.values().length]; + synchronized (lock) { + return new JobDetails( + jobId, + jobName, + initializationTimestamp, + 0, + 0, + status, + 0, + tasksPerState, + 0); + } + }); + } + + /** + * Cancel job. + * A cancellation will be scheduled if the initialization is not completed. + * The returned future will complete exceptionally if the JobManagerRunner initialization failed. + */ + public CompletableFuture<Acknowledge> cancel(Time timeout) { + synchronized (lock) { + if (isInitialized()) { + return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.cancel(timeout)); + } else { + log.info("Cancellation during initialization requested for job {}. 
Job will be cancelled once JobManager has been initialized.", jobId); + + // cancel job + jobManagerRunnerFuture + .thenCompose(JobManagerRunner::getJobMasterGateway) + .thenCompose(jobMasterGateway -> jobMasterGateway.cancel(RpcUtils.INF_TIMEOUT)) + .whenComplete((ignored, cancelThrowable) -> { + if (cancelThrowable != null) { + log.warn("Cancellation of job {} failed", jobId, cancelThrowable); + } + }); + jobStatus = DispatcherJobStatus.CANCELLING; + jobResultFuture.whenComplete(((archivedExecutionGraph, throwable) -> { + if (archivedExecutionGraph != null) { + jobStatus = DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED; Review comment: `lock` is missing ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/UnavailableDispatcherOperationException.java ########## @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.util.FlinkException; + +/** + * Exception indicating that a Dispatcher operation is temporarily unavailable. + */ +public class UnavailableDispatcherOperationException extends FlinkException { Review comment: Maybe it could extend `DispatcherException`. 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ########## @@ -465,17 +441,24 @@ private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner @Override public CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout) { - final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId); + Optional<DispatcherJob> maybeJob = getDispatcherJob(jobId); + if (maybeJob.isPresent()) { + return maybeJob.get().cancel(timeout); + } else { + log.debug("Dispatcher is unable to cancel job {}: not found", jobId); + return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)); + } + } - return jobMasterGatewayFuture.thenCompose((JobMasterGateway jobMasterGateway) -> jobMasterGateway.cancel(timeout)); + private Optional<DispatcherJob> getDispatcherJob(JobID jobId) { + return Optional.ofNullable(runningJobs.get(jobId)); } @Override public CompletableFuture<ClusterOverview> requestClusterOverview(Time timeout) { CompletableFuture<ResourceOverview> taskManagerOverviewFuture = runResourceManagerCommand(resourceManagerGateway -> resourceManagerGateway.requestResourceOverview(timeout)); - final List<CompletableFuture<Optional<JobStatus>>> optionalJobInformation = queryJobMastersForInformation( - (JobMasterGateway jobMasterGateway) -> jobMasterGateway.requestJobStatus(timeout)); + final List<CompletableFuture<Optional<JobStatus>>> optionalJobInformation = queryJobMastersForInformation(dj -> dj.requestJobStatus(timeout)); Review comment: nit: In the absence of type information I wouldn't mind spelling variable names out. So instead of `dj` one could name it `dispatcherJob`. This will give the reader of the code a bit more information about the type of the parameter. 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java ########## @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobmaster.JobManagerRunner; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.util.AutoCloseableAsync; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +import java.util.concurrent.CompletableFuture; + +/** + * Abstraction used by the {@link Dispatcher} to manage jobs. 
+ */ +public final class DispatcherJob implements AutoCloseableAsync { + + private final Logger log = LoggerFactory.getLogger(DispatcherJob.class); + + private final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture; + private final CompletableFuture<ArchivedExecutionGraph> jobResultFuture; + private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>(); + + private final long initializationTimestamp; + private final JobID jobId; + private final String jobName; + + private final Object lock = new Object(); + + // internal field to track job status during initialization. Is not updated anymore after + // job is initialized, cancelled or failed. + @GuardedBy("lock") + private DispatcherJobStatus jobStatus = DispatcherJobStatus.INITIALIZING; + + private enum DispatcherJobStatus { + // We are waiting for the JobManagerRunner to be initialized + INITIALIZING(JobStatus.INITIALIZING), + // JobManagerRunner is initialized + JOB_MANAGER_RUNNER_INITIALIZED(null), + // waiting for cancellation. We stay in this status until the job result future completed, + // then we consider the JobManager to be initialized. 
+ CANCELLING(JobStatus.CANCELLING); + + @Nullable + private final JobStatus jobStatus; + + DispatcherJobStatus(JobStatus jobStatus) { + this.jobStatus = jobStatus; + } + + public JobStatus asJobStatus() { + if (jobStatus == null) { + throw new IllegalStateException("This state is not defined as a 'JobStatus'"); + } + return jobStatus; + } + } + + static DispatcherJob createFor( + CompletableFuture<JobManagerRunner> jobManagerRunnerFuture, + JobID jobId, + String jobName, + Dispatcher.ExecutionType executionType) { + return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName, executionType); + } + + private DispatcherJob( + CompletableFuture<JobManagerRunner> jobManagerRunnerFuture, + JobID jobId, + String jobName, + Dispatcher.ExecutionType executionType) { Review comment: Looking at the `ExecutionType`, I am wondering whether this isn't more of a concept of the user of a `DispatcherJob`. Put differently, why don't we let the `Dispatcher` decide how to handle an initialization error? If we wanted to do it like this, then we would have to return a bit more information via the `jobResultFuture` (e.g. whether it failed during the initialization). One could create a `DispatcherJobResult` which contains the `ArchivedExecutionGraph` and whether it failed during initialization or not. ########## File path: flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java ########## @@ -88,6 +89,13 @@ private PerJobMiniClusterFactory( return miniCluster .submitJob(jobGraph) + .thenApplyAsync(FunctionUtils.uncheckedFunction(submissionResult -> { + org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(submissionResult.getJobID(), + () -> miniCluster.getJobStatus(submissionResult.getJobID()).get(), + () -> miniCluster.requestJobResult(submissionResult.getJobID()).get(), + Thread.currentThread().getContextClassLoader()); Review comment: Are we sure that the context class loader is the right class loader here? 
########## File path: flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java ########## @@ -111,4 +122,48 @@ public static void executeProgram( Thread.currentThread().setContextClassLoader(contextClassLoader); } } + + /** + * This method blocks until the job status is not INITIALIZING anymore. + * If the job is FAILED, it throws an CompletionException with the failure cause. + * @param jobStatusSupplier supplier returning the job status. + */ + public static void waitUntilJobInitializationFinished( + JobID jobID, + SupplierWithException<JobStatus, Exception> jobStatusSupplier, + SupplierWithException<JobResult, Exception> jobResultSupplier, + ClassLoader userCodeClassloader) + throws JobInitializationException { + LOG.debug("Wait until job initialization is finished"); + WaitStrategy waitStrategy = new ExponentialWaitStrategy(50, 2000); + try { + JobStatus status = jobStatusSupplier.get(); + long attempt = 0; + while (status == JobStatus.INITIALIZING) { + Thread.sleep(waitStrategy.sleepTime(attempt++)); + status = jobStatusSupplier.get(); + } + if (status == JobStatus.FAILED) { + JobResult result = jobResultSupplier.get(); + Optional<SerializedThrowable> throwable = result.getSerializedThrowable(); + if (throwable.isPresent()) { + Throwable t = throwable.get().deserializeError(userCodeClassloader); + t = ExceptionUtils.stripCompletionException(t); + if (t instanceof JobInitializationException) { + throw t; + } + } + } + } catch (JobInitializationException initializationException) { + throw initializationException; + } catch (Throwable throwable) { + ExceptionUtils.checkInterrupted(throwable); + throw new JobInitializationException(jobID, "Error while waiting for job to be initialized", throwable); + } + } + + public static <T> void waitUntilJobInitializationFinished(JobID id, ClusterClient<T> client, ClassLoader userCodeClassloader) throws + JobInitializationException { + waitUntilJobInitializationFinished(id, () -> client.getJobStatus(id).get(), () 
-> client.requestJobResult(id).get(), userCodeClassloader); Review comment: `requestJobResult` can lead to a shutdown of an attached per-job cluster because the attached per-job cluster waits until the job result has been served in the case of an attached deployment. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java ########## @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobmaster.JobManagerRunner; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.util.AutoCloseableAsync; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +import java.util.concurrent.CompletableFuture; + +/** + * Abstraction used by the {@link Dispatcher} to manage jobs. + */ +public final class DispatcherJob implements AutoCloseableAsync { + + private final Logger log = LoggerFactory.getLogger(DispatcherJob.class); + + private final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture; + private final CompletableFuture<ArchivedExecutionGraph> jobResultFuture; + private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>(); + + private final long initializationTimestamp; + private final JobID jobId; + private final String jobName; + + private final Object lock = new Object(); + + // internal field to track job status during initialization. Is not updated anymore after + // job is initialized, cancelled or failed. 
+ @GuardedBy("lock") + private DispatcherJobStatus jobStatus = DispatcherJobStatus.INITIALIZING; + + private enum DispatcherJobStatus { + // We are waiting for the JobManagerRunner to be initialized + INITIALIZING(JobStatus.INITIALIZING), + // JobManagerRunner is initialized + JOB_MANAGER_RUNNER_INITIALIZED(null), + // waiting for cancellation. We stay in this status until the job result future completed, + // then we consider the JobManager to be initialized. + CANCELLING(JobStatus.CANCELLING); + + @Nullable + private final JobStatus jobStatus; + + DispatcherJobStatus(JobStatus jobStatus) { + this.jobStatus = jobStatus; + } + + public JobStatus asJobStatus() { + if (jobStatus == null) { + throw new IllegalStateException("This state is not defined as a 'JobStatus'"); + } + return jobStatus; + } + } + + static DispatcherJob createFor( + CompletableFuture<JobManagerRunner> jobManagerRunnerFuture, + JobID jobId, + String jobName, + Dispatcher.ExecutionType executionType) { + return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName, executionType); + } + + private DispatcherJob( + CompletableFuture<JobManagerRunner> jobManagerRunnerFuture, + JobID jobId, + String jobName, + Dispatcher.ExecutionType executionType) { + this.jobManagerRunnerFuture = jobManagerRunnerFuture; + this.jobId = jobId; + this.jobName = jobName; + this.initializationTimestamp = System.currentTimeMillis(); + this.jobResultFuture = new CompletableFuture<>(); + + FutureUtils.assertNoException(this.jobManagerRunnerFuture.handle((jobManagerRunner, throwable) -> { + // JM has been initialized, or the initialization failed + synchronized (lock) { + if (jobStatus != DispatcherJobStatus.CANCELLING) { + jobStatus = DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED; + } + if (throwable == null) { + // Forward result future + FutureUtils.forward(jobManagerRunner.getResultFuture(), jobResultFuture); + } else { // failure during initialization + if (executionType == Dispatcher.ExecutionType.RECOVERY) { 
+ jobResultFuture.completeExceptionally(throwable); + } else { + jobResultFuture.complete(ArchivedExecutionGraph.createFromInitializingJob( + jobId, + jobName, + JobStatus.FAILED, + throwable, + initializationTimestamp)); + } + } + } + return null; + })); + } + + public CompletableFuture<ArchivedExecutionGraph> getResultFuture() { + return jobResultFuture; + } + + public CompletableFuture<JobDetails> requestJobDetails(Time timeout) { + return requestJobStatus(timeout).thenApply(status -> { + int[] tasksPerState = new int[ExecutionState.values().length]; + synchronized (lock) { + return new JobDetails( + jobId, + jobName, + initializationTimestamp, + 0, + 0, + status, + 0, + tasksPerState, + 0); + } + }); + } + + /** + * Cancel job. + * A cancellation will be scheduled if the initialization is not completed. + * The returned future will complete exceptionally if the JobManagerRunner initialization failed. + */ + public CompletableFuture<Acknowledge> cancel(Time timeout) { + synchronized (lock) { + if (isInitialized()) { + return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.cancel(timeout)); + } else { + log.info("Cancellation during initialization requested for job {}. 
Job will be cancelled once JobManager has been initialized.", jobId); + + // cancel job + jobManagerRunnerFuture + .thenCompose(JobManagerRunner::getJobMasterGateway) + .thenCompose(jobMasterGateway -> jobMasterGateway.cancel(RpcUtils.INF_TIMEOUT)) + .whenComplete((ignored, cancelThrowable) -> { + if (cancelThrowable != null) { + log.warn("Cancellation of job {} failed", jobId, cancelThrowable); + } + }); + jobStatus = DispatcherJobStatus.CANCELLING; + jobResultFuture.whenComplete(((archivedExecutionGraph, throwable) -> { + if (archivedExecutionGraph != null) { + jobStatus = DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED; + } + })); + return jobResultFuture.thenApply(ignored -> Acknowledge.get()); + } + } + } + + public CompletableFuture<JobStatus> requestJobStatus(Time timeout) { + return requestJob(timeout).thenApply(ArchivedExecutionGraph::getState); + } + + /** + * Returns a future completing to the ArchivedExecutionGraph of the job. + */ + public CompletableFuture<ArchivedExecutionGraph> requestJob(Time timeout) { + synchronized (lock) { + if (isInitialized()) { + if (jobResultFuture.isDone()) { // job is not running anymore + return jobResultFuture; + } + return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.requestJob( + timeout)); + } else { + Preconditions.checkState(this.jobStatus == DispatcherJobStatus.INITIALIZING || jobStatus == DispatcherJobStatus.CANCELLING); + return CompletableFuture.completedFuture( + ArchivedExecutionGraph.createFromInitializingJob( + jobId, + jobName, + jobStatus.asJobStatus(), + null, + initializationTimestamp)); + } + } + } + + /** + * The job is initialized once the JobManager runner has been initialized. + * It is also initialized if the runner initialization failed, or of it has been + * canceled (and the cancellation is complete). 
+ */ + public boolean isInitialized() { + synchronized (lock) { + return jobStatus == DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED; + } + } + + /** + * Returns the {@link JobMasterGateway} from the JobManagerRunner. + * This method will fail with an {@link IllegalStateException} if the job is initialized. + * The returned future will complete exceptionally if the JobManagerRunner initialization failed. Review comment: This sentence should go into the `@return ....` line. ########## File path: flink-clients/src/test/java/org/apache/flink/client/ClientUtilsTest.java ########## @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.VoidBlobStore; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; +import org.apache.flink.runtime.client.JobInitializationException; +import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore; +import org.apache.flink.runtime.dispatcher.DefaultDispatcherBootstrap; +import org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory; +import org.apache.flink.runtime.dispatcher.Dispatcher; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.dispatcher.DispatcherId; +import org.apache.flink.runtime.dispatcher.DispatcherServices; +import org.apache.flink.runtime.dispatcher.DispatcherTest; +import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore; +import org.apache.flink.runtime.dispatcher.NoOpJobGraphWriter; +import org.apache.flink.runtime.dispatcher.TestingDispatcher; +import org.apache.flink.runtime.dispatcher.VoidHistoryServerArchivist; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmanager.JobGraphWriter; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; 
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource; +import org.apache.flink.util.TestLogger; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.time.Duration; +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +/** + * Test for the ClientUtils. + */ +public class ClientUtilsTest extends TestLogger { + + @ClassRule + public static TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static final JobID TEST_JOB_ID = new JobID(); + + private static final Time TIMEOUT = Time.seconds(10L); + + private static TestingRpcService rpcService; + + @Rule + public final TestingFatalErrorHandlerResource testingFatalErrorHandlerResource = new TestingFatalErrorHandlerResource(); + + final HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 10000L); + + final TestingLeaderElectionService jobMasterLeaderElectionService = new TestingLeaderElectionService(); + + final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); + + private BlobServer blobServer; + private DefaultDispatcherBootstrap dispatcherBootstrap; + private Configuration configuration; + private ResourceManagerGateway resourceManagerGateway; + private ArchivedExecutionGraphStore archivedExecutionGraphStore; + private JobGraphWriter jobGraphWriter; + private 
DefaultJobManagerRunnerFactory jobManagerRunnerFactory; + + @Before + public void setUp() throws Exception { + haServices.setJobMasterLeaderElectionService(TEST_JOB_ID, jobMasterLeaderElectionService); + haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory()); + haServices.setResourceManagerLeaderRetriever(new SettableLeaderRetrievalService()); + + configuration = new Configuration(); + + configuration.setString( + BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); + + blobServer = new BlobServer(configuration, new VoidBlobStore()); + resourceManagerGateway = new TestingResourceManagerGateway(); + archivedExecutionGraphStore = new MemoryArchivedExecutionGraphStore(); + dispatcherBootstrap = new DefaultDispatcherBootstrap(Collections.emptyList()); + + jobGraphWriter = NoOpJobGraphWriter.INSTANCE; + jobManagerRunnerFactory = DefaultJobManagerRunnerFactory.INSTANCE; + } + + @BeforeClass + public static void setupClass() { + rpcService = new TestingRpcService(); + } + + @AfterClass + public static void teardownClass() throws Exception { + if (rpcService != null) { + RpcUtils.terminateRpcService(rpcService, TIMEOUT); + + rpcService = null; + } + } + + private TestingDispatcher createAndStartDispatcher() throws Exception { + final TestingDispatcher dispatcher = + new TestingDispatcher( + rpcService, + DispatcherId.generate(), + dispatcherBootstrap, + new DispatcherServices( + configuration, + haServices, + () -> CompletableFuture.completedFuture(resourceManagerGateway), + blobServer, + heartbeatServices, + archivedExecutionGraphStore, + testingFatalErrorHandlerResource.getFatalErrorHandler(), + VoidHistoryServerArchivist.INSTANCE, + null, + UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(), + jobGraphWriter, + jobManagerRunnerFactory)); + dispatcher.start(); + jobMasterLeaderElectionService.isLeader(UUID.randomUUID()); + return dispatcher; + } + + @Test + public void 
testWaitUntilJobInitializationFinished_throwsInitializationException() throws + Exception { + final JobVertex testVertex = new DispatcherTest.FailingInitializationJobVertex("testVertex"); + testVertex.setInvokableClass(NoOpInvokable.class); + + JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "testJob", testVertex); + + Dispatcher dispatcher = createAndStartDispatcher(); + DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); + + dispatcherGateway.submitJob(jobGraph, TIMEOUT).get(); + + CommonTestUtils.assertThrows("Could not instantiate JobManager", JobInitializationException.class, () -> { + ClientUtils.waitUntilJobInitializationFinished(jobGraph.getJobID(), + () -> dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get(), + () -> dispatcherGateway.requestJobResult(jobGraph.getJobID(), TIMEOUT).get(), + ClassLoader.getSystemClassLoader()); + return null; + }); + CommonTestUtils.assertThrows("Could not instantiate JobManager", ExecutionException.class, () -> { Review comment: Why does the shut down of the Dispatcher fail if the job submission failed? A submission failure is something which should not affect Flink itself because it can happen and is normal. ########## File path: flink-clients/src/test/java/org/apache/flink/client/ClientUtilsTest.java ########## @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.VoidBlobStore; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; +import org.apache.flink.runtime.client.JobInitializationException; +import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore; +import org.apache.flink.runtime.dispatcher.DefaultDispatcherBootstrap; +import org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory; +import org.apache.flink.runtime.dispatcher.Dispatcher; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.dispatcher.DispatcherId; +import org.apache.flink.runtime.dispatcher.DispatcherServices; +import org.apache.flink.runtime.dispatcher.DispatcherTest; +import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore; +import org.apache.flink.runtime.dispatcher.NoOpJobGraphWriter; +import org.apache.flink.runtime.dispatcher.TestingDispatcher; +import org.apache.flink.runtime.dispatcher.VoidHistoryServerArchivist; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import 
org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmanager.JobGraphWriter; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource; +import org.apache.flink.util.TestLogger; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.time.Duration; +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +/** + * Test for the ClientUtils. 
+ */ +public class ClientUtilsTest extends TestLogger { + + @ClassRule + public static TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private static final JobID TEST_JOB_ID = new JobID(); + + private static final Time TIMEOUT = Time.seconds(10L); + + private static TestingRpcService rpcService; + + @Rule + public final TestingFatalErrorHandlerResource testingFatalErrorHandlerResource = new TestingFatalErrorHandlerResource(); + + final HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 10000L); + + final TestingLeaderElectionService jobMasterLeaderElectionService = new TestingLeaderElectionService(); + + final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); + + private BlobServer blobServer; + private DefaultDispatcherBootstrap dispatcherBootstrap; + private Configuration configuration; + private ResourceManagerGateway resourceManagerGateway; + private ArchivedExecutionGraphStore archivedExecutionGraphStore; + private JobGraphWriter jobGraphWriter; + private DefaultJobManagerRunnerFactory jobManagerRunnerFactory; + + @Before + public void setUp() throws Exception { + haServices.setJobMasterLeaderElectionService(TEST_JOB_ID, jobMasterLeaderElectionService); + haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory()); + haServices.setResourceManagerLeaderRetriever(new SettableLeaderRetrievalService()); + + configuration = new Configuration(); + + configuration.setString( + BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); + + blobServer = new BlobServer(configuration, new VoidBlobStore()); + resourceManagerGateway = new TestingResourceManagerGateway(); + archivedExecutionGraphStore = new MemoryArchivedExecutionGraphStore(); + dispatcherBootstrap = new DefaultDispatcherBootstrap(Collections.emptyList()); + + jobGraphWriter = NoOpJobGraphWriter.INSTANCE; + jobManagerRunnerFactory = DefaultJobManagerRunnerFactory.INSTANCE; + } + + @BeforeClass + 
public static void setupClass() { + rpcService = new TestingRpcService(); + } + + @AfterClass + public static void teardownClass() throws Exception { + if (rpcService != null) { + RpcUtils.terminateRpcService(rpcService, TIMEOUT); + + rpcService = null; + } + } + + private TestingDispatcher createAndStartDispatcher() throws Exception { + final TestingDispatcher dispatcher = + new TestingDispatcher( + rpcService, + DispatcherId.generate(), + dispatcherBootstrap, + new DispatcherServices( + configuration, + haServices, + () -> CompletableFuture.completedFuture(resourceManagerGateway), + blobServer, + heartbeatServices, + archivedExecutionGraphStore, + testingFatalErrorHandlerResource.getFatalErrorHandler(), + VoidHistoryServerArchivist.INSTANCE, + null, + UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(), + jobGraphWriter, + jobManagerRunnerFactory)); + dispatcher.start(); + jobMasterLeaderElectionService.isLeader(UUID.randomUUID()); + return dispatcher; + } + + @Test + public void testWaitUntilJobInitializationFinished_throwsInitializationException() throws + Exception { + final JobVertex testVertex = new DispatcherTest.FailingInitializationJobVertex("testVertex"); + testVertex.setInvokableClass(NoOpInvokable.class); + + JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "testJob", testVertex); + + Dispatcher dispatcher = createAndStartDispatcher(); + DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); + + dispatcherGateway.submitJob(jobGraph, TIMEOUT).get(); + + CommonTestUtils.assertThrows("Could not instantiate JobManager", JobInitializationException.class, () -> { + ClientUtils.waitUntilJobInitializationFinished(jobGraph.getJobID(), + () -> dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get(), + () -> dispatcherGateway.requestJobResult(jobGraph.getJobID(), TIMEOUT).get(), + ClassLoader.getSystemClassLoader()); + return null; + }); + CommonTestUtils.assertThrows("Could not instantiate 
JobManager", ExecutionException.class, () -> { + dispatcher.closeAsync().get(); + return null; + }); + } + + @Test + public void testWaitUntilJobInitializationFinished_doesNotThrowRuntimeException() throws Exception { + final JobVertex testVertex = new JobVertex("testVertex"); + testVertex.setInvokableClass(NoOpInvokable.class); + + JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "testJob", testVertex); + + TestingDispatcher dispatcher = createAndStartDispatcher(); + DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); + + dispatcherGateway.submitJob(jobGraph, TIMEOUT).get(); + + // we don't expect an exception here + ClientUtils.waitUntilJobInitializationFinished(jobGraph.getJobID(), + () -> dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT).get(), + () -> dispatcherGateway.requestJobResult(jobGraph.getJobID(), TIMEOUT).get(), + ClassLoader.getSystemClassLoader()); + + // now "fail" the job + dispatcher.completeJobExecution( + ArchivedExecutionGraph.createFromInitializingJob(TEST_JOB_ID, "test", JobStatus.FAILED, new RuntimeException("yolo"), 1337)); + // ensure it is failed + org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(() -> { + JobStatus status = dispatcherGateway.requestJobStatus( + jobGraph.getJobID(), + TIMEOUT).get(); + return status == JobStatus.FAILED; + }, Deadline.fromNow( Review comment: nit ```suggestion org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(() -> dispatcherGateway.requestJobStatus( jobGraph.getJobID(), TIMEOUT).get() == JobStatus.FAILED, Deadline.fromNow( ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java ########## @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobmaster.JobManagerRunner; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.util.AutoCloseableAsync; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +import java.util.concurrent.CompletableFuture; + +/** + * Abstraction used by the {@link Dispatcher} to manage jobs. 
+ */ +public final class DispatcherJob implements AutoCloseableAsync { + + private final Logger log = LoggerFactory.getLogger(DispatcherJob.class); + + private final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture; + private final CompletableFuture<ArchivedExecutionGraph> jobResultFuture; + private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>(); + + private final long initializationTimestamp; + private final JobID jobId; + private final String jobName; + + private final Object lock = new Object(); + + // internal field to track job status during initialization. Is not updated anymore after + // job is initialized, cancelled or failed. + @GuardedBy("lock") + private DispatcherJobStatus jobStatus = DispatcherJobStatus.INITIALIZING; + + private enum DispatcherJobStatus { + // We are waiting for the JobManagerRunner to be initialized + INITIALIZING(JobStatus.INITIALIZING), + // JobManagerRunner is initialized + JOB_MANAGER_RUNNER_INITIALIZED(null), + // waiting for cancellation. We stay in this status until the job result future completed, + // then we consider the JobManager to be initialized. 
+ CANCELLING(JobStatus.CANCELLING); + + @Nullable + private final JobStatus jobStatus; + + DispatcherJobStatus(JobStatus jobStatus) { + this.jobStatus = jobStatus; + } + + public JobStatus asJobStatus() { + if (jobStatus == null) { + throw new IllegalStateException("This state is not defined as a 'JobStatus'"); + } + return jobStatus; + } + } + + static DispatcherJob createFor( + CompletableFuture<JobManagerRunner> jobManagerRunnerFuture, + JobID jobId, + String jobName, + Dispatcher.ExecutionType executionType) { + return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName, executionType); + } + + private DispatcherJob( + CompletableFuture<JobManagerRunner> jobManagerRunnerFuture, + JobID jobId, + String jobName, + Dispatcher.ExecutionType executionType) { + this.jobManagerRunnerFuture = jobManagerRunnerFuture; + this.jobId = jobId; + this.jobName = jobName; + this.initializationTimestamp = System.currentTimeMillis(); + this.jobResultFuture = new CompletableFuture<>(); + + FutureUtils.assertNoException(this.jobManagerRunnerFuture.handle((jobManagerRunner, throwable) -> { + // JM has been initialized, or the initialization failed + synchronized (lock) { + if (jobStatus != DispatcherJobStatus.CANCELLING) { + jobStatus = DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED; + } + if (throwable == null) { + // Forward result future + FutureUtils.forward(jobManagerRunner.getResultFuture(), jobResultFuture); + } else { // failure during initialization + if (executionType == Dispatcher.ExecutionType.RECOVERY) { + jobResultFuture.completeExceptionally(throwable); + } else { + jobResultFuture.complete(ArchivedExecutionGraph.createFromInitializingJob( + jobId, + jobName, + JobStatus.FAILED, + throwable, + initializationTimestamp)); + } + } + } + return null; + })); + } + + public CompletableFuture<ArchivedExecutionGraph> getResultFuture() { + return jobResultFuture; + } + + public CompletableFuture<JobDetails> requestJobDetails(Time timeout) { + return 
requestJobStatus(timeout).thenApply(status -> { + int[] tasksPerState = new int[ExecutionState.values().length]; + synchronized (lock) { + return new JobDetails( + jobId, + jobName, + initializationTimestamp, + 0, + 0, + status, + 0, + tasksPerState, + 0); + } + }); + } + + /** + * Cancel job. + * A cancellation will be scheduled if the initialization is not completed. + * The returned future will complete exceptionally if the JobManagerRunner initialization failed. + */ + public CompletableFuture<Acknowledge> cancel(Time timeout) { + synchronized (lock) { + if (isInitialized()) { + return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.cancel(timeout)); + } else { + log.info("Cancellation during initialization requested for job {}. Job will be cancelled once JobManager has been initialized.", jobId); + + // cancel job + jobManagerRunnerFuture + .thenCompose(JobManagerRunner::getJobMasterGateway) + .thenCompose(jobMasterGateway -> jobMasterGateway.cancel(RpcUtils.INF_TIMEOUT)) + .whenComplete((ignored, cancelThrowable) -> { + if (cancelThrowable != null) { + log.warn("Cancellation of job {} failed", jobId, cancelThrowable); + } + }); + jobStatus = DispatcherJobStatus.CANCELLING; + jobResultFuture.whenComplete(((archivedExecutionGraph, throwable) -> { + if (archivedExecutionGraph != null) { + jobStatus = DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED; + } + })); + return jobResultFuture.thenApply(ignored -> Acknowledge.get()); Review comment: The returned future behaves differently depending on `isInitialized`. In this case here we wait for the job to have completely finished. In the if branch, we only wait for the cancel call to be acknowledged. I'd suggest to do the same here. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java ########## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobmaster.JobManagerRunner; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.util.AutoCloseableAsync; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.GuardedBy; + +import java.util.concurrent.CompletableFuture; + +/** + * Abstraction used by the {@link Dispatcher} to manage JobManagers, in + * particular during initialization. + * While a job is initializing, the JobMasterGateway is not available. A small subset + * of the methods of the JobMasterGateway necessary during initialization are provided + * by this class (job details, cancel). 
+ */ +public final class DispatcherJob implements AutoCloseableAsync { + + private final Logger log = LoggerFactory.getLogger(DispatcherJob.class); + + private final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture; + private final CompletableFuture<ArchivedExecutionGraph> jobResultFuture; + + private final long initializationTimestamp; + private final JobID jobId; + private final String jobName; + + private final Object lock = new Object(); + + // internal field to track job status during initialization. Is not updated anymore after + // job is initialized, cancelled or failed. + @GuardedBy("lock") + private JobStatus jobStatus = JobStatus.INITIALIZING; + + private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>(); + + private enum SubmissionType { + INITIAL, RECOVERY + } + + static DispatcherJob createForSubmission( + CompletableFuture<JobManagerRunner> jobManagerRunnerFuture, + JobID jobId, + String jobName, + long initializationTimestamp) { + return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName, initializationTimestamp, SubmissionType.INITIAL); + } + + static DispatcherJob createForRecovery( + CompletableFuture<JobManagerRunner> jobManagerRunnerFuture, + JobID jobId, + String jobName, + long initializationTimestamp) { + return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName, initializationTimestamp, SubmissionType.RECOVERY); + } + + private DispatcherJob( + CompletableFuture<JobManagerRunner> jobManagerRunnerFuture, + JobID jobId, + String jobName, + long initializationTimestamp, + SubmissionType submissionType) { + this.jobManagerRunnerFuture = jobManagerRunnerFuture; + this.jobId = jobId; + this.jobName = jobName; + this.initializationTimestamp = initializationTimestamp; + this.jobResultFuture = new CompletableFuture<>(); + + FutureUtils.assertNoException(this.jobManagerRunnerFuture.handle((jobManagerRunner, throwable) -> { + // JM has been initialized, or the initialization failed + synchronized (lock) 
{ + if (throwable == null) { + // Forward result future + FutureUtils.forward(jobManagerRunner.getResultFuture(), jobResultFuture); + + if (jobStatus == JobStatus.CANCELLING) { + log.info("Cancellation during initialization has been requested for job {}. Initialization completed, cancelling job.", jobId); + + // cancel job + jobManagerRunner + .getJobMasterGateway() + .thenCompose(gw -> gw.cancel(RpcUtils.INF_TIMEOUT)) + .whenComplete((ignored, cancelThrowable) -> { + if (cancelThrowable != null) { + log.warn("Cancellation of job {} failed", jobId, cancelThrowable); + } + }); + + // cancellation will eventually complete the jobResultFuture + jobResultFuture.whenComplete((archivedExecutionGraph, resultThrowable) -> { + synchronized (lock) { + if (resultThrowable == null) { + jobStatus = archivedExecutionGraph.getState(); + } else { + jobStatus = JobStatus.FAILED; + } + } + }); + } else { + jobStatus = JobStatus.RUNNING; // this status should never be exposed from the DispatcherJob. Only used internally for tracking running state + } + } else { // failure during initialization + if (submissionType == SubmissionType.RECOVERY) { + jobResultFuture.completeExceptionally(throwable); + } else { + jobResultFuture.complete(ArchivedExecutionGraph.createFromInitializingJob( + jobId, + jobName, + throwable, + JobStatus.FAILED, + initializationTimestamp)); + } + } + } + return null; + })); + } + + public CompletableFuture<ArchivedExecutionGraph> getResultFuture() { + return jobResultFuture; + } + + public CompletableFuture<JobDetails> requestJobDetails(Time timeout) { + synchronized (lock) { + if (isRunning()) { + return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.requestJobDetails( + timeout)); + } else { + int[] tasksPerState = new int[ExecutionState.values().length]; + return CompletableFuture.completedFuture(new JobDetails( + jobId, + jobName, + initializationTimestamp, + 0, + 0, + jobStatus, + 0, + tasksPerState, + 0)); + } + } + } + + public 
CompletableFuture<Acknowledge> cancel(Time timeout) { + synchronized (lock) { + if (isRunning()) { + return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.cancel(timeout)); + } else { + log.info("Cancellation during initialization requested for job {}. Job will be cancelled once JobManager has been initialized.", jobId); + jobStatus = JobStatus.CANCELLING; + return jobResultFuture.thenApply(ignored -> Acknowledge.get()); + } + } + } + + public CompletableFuture<JobStatus> requestJobStatus(Time timeout) { + synchronized (lock) { + if (isRunning()) { + return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.requestJobStatus(timeout)); + } else { + return CompletableFuture.completedFuture(jobStatus); + } + } + } + + public CompletableFuture<ArchivedExecutionGraph> requestJob(Time timeout) { + synchronized (lock) { + if (isRunning()) { + return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.requestJob(timeout)); + } else { + return CompletableFuture.supplyAsync(() -> ArchivedExecutionGraph.createFromInitializingJob(jobId, jobName, null, jobStatus, initializationTimestamp)); + } + } + } + + public boolean isRunning() { + synchronized (lock) { + return jobStatus == JobStatus.RUNNING; + } + } + + public CompletableFuture<JobMasterGateway> getJobMasterGateway() { Review comment: In JavaDocs one usually uses `@throws IllegalStateException if the job is not initialized` after the `@return` tag. ########## File path: flink-clients/src/test/java/org/apache/flink/client/ClientUtilsTest.java ########## @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.VoidBlobStore; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; +import org.apache.flink.runtime.client.JobInitializationException; +import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore; +import org.apache.flink.runtime.dispatcher.DefaultDispatcherBootstrap; +import org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory; +import org.apache.flink.runtime.dispatcher.Dispatcher; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.dispatcher.DispatcherId; +import org.apache.flink.runtime.dispatcher.DispatcherServices; +import org.apache.flink.runtime.dispatcher.DispatcherTest; +import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore; +import org.apache.flink.runtime.dispatcher.NoOpJobGraphWriter; +import org.apache.flink.runtime.dispatcher.TestingDispatcher; +import org.apache.flink.runtime.dispatcher.VoidHistoryServerArchivist; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import 
org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmanager.JobGraphWriter; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource; +import org.apache.flink.util.TestLogger; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.time.Duration; +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +/** + * Test for the ClientUtils. + */ +public class ClientUtilsTest extends TestLogger { + + @ClassRule Review comment: Yes, the setup is quite manual. There is `MiniClusterResource` which spawns a `MiniCluster` as an external resource. I actually believe that we don't really need to start a `MiniCluster` and submit a job. `ClientUtils.waitUntilJobInitializationFinished` is abstracted so well that it can be tested in isolation. Just provide the respective suppliers, which either produce a failure or not, to the utility and then test its behaviour. 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java ########## @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.jobmaster.JobManagerRunner; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.util.AutoCloseableAsync; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +import java.util.concurrent.CompletableFuture; + +/** + * Abstraction used by the {@link Dispatcher} to manage jobs. 
+ */ +public final class DispatcherJob implements AutoCloseableAsync { + + private final Logger log = LoggerFactory.getLogger(DispatcherJob.class); + + private final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture; + private final CompletableFuture<ArchivedExecutionGraph> jobResultFuture; + private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>(); + + private final long initializationTimestamp; + private final JobID jobId; + private final String jobName; + + private final Object lock = new Object(); + + // internal field to track job status during initialization. Is not updated anymore after + // job is initialized, cancelled or failed. + @GuardedBy("lock") + private DispatcherJobStatus jobStatus = DispatcherJobStatus.INITIALIZING; + + private enum DispatcherJobStatus { + // We are waiting for the JobManagerRunner to be initialized + INITIALIZING(JobStatus.INITIALIZING), + // JobManagerRunner is initialized + JOB_MANAGER_RUNNER_INITIALIZED(null), + // waiting for cancellation. We stay in this status until the job result future completed, + // then we consider the JobManager to be initialized. 
+ CANCELLING(JobStatus.CANCELLING); + + @Nullable + private final JobStatus jobStatus; + + DispatcherJobStatus(JobStatus jobStatus) { + this.jobStatus = jobStatus; + } + + public JobStatus asJobStatus() { + if (jobStatus == null) { + throw new IllegalStateException("This state is not defined as a 'JobStatus'"); + } + return jobStatus; + } + } + + static DispatcherJob createFor( + CompletableFuture<JobManagerRunner> jobManagerRunnerFuture, + JobID jobId, + String jobName, + Dispatcher.ExecutionType executionType) { + return new DispatcherJob(jobManagerRunnerFuture, jobId, jobName, executionType); + } + + private DispatcherJob( + CompletableFuture<JobManagerRunner> jobManagerRunnerFuture, + JobID jobId, + String jobName, + Dispatcher.ExecutionType executionType) { + this.jobManagerRunnerFuture = jobManagerRunnerFuture; + this.jobId = jobId; + this.jobName = jobName; + this.initializationTimestamp = System.currentTimeMillis(); + this.jobResultFuture = new CompletableFuture<>(); + + FutureUtils.assertNoException(this.jobManagerRunnerFuture.handle((jobManagerRunner, throwable) -> { + // JM has been initialized, or the initialization failed + synchronized (lock) { + if (jobStatus != DispatcherJobStatus.CANCELLING) { + jobStatus = DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED; + } + if (throwable == null) { + // Forward result future + FutureUtils.forward(jobManagerRunner.getResultFuture(), jobResultFuture); + } else { // failure during initialization + if (executionType == Dispatcher.ExecutionType.RECOVERY) { + jobResultFuture.completeExceptionally(throwable); + } else { + jobResultFuture.complete(ArchivedExecutionGraph.createFromInitializingJob( + jobId, + jobName, + JobStatus.FAILED, + throwable, + initializationTimestamp)); + } + } + } + return null; + })); + } + + public CompletableFuture<ArchivedExecutionGraph> getResultFuture() { + return jobResultFuture; + } + + public CompletableFuture<JobDetails> requestJobDetails(Time timeout) { + return 
requestJobStatus(timeout).thenApply(status -> { + int[] tasksPerState = new int[ExecutionState.values().length]; + synchronized (lock) { + return new JobDetails( + jobId, + jobName, + initializationTimestamp, + 0, + 0, + status, + 0, + tasksPerState, + 0); + } + }); + } + + /** + * Cancel job. + * A cancellation will be scheduled if the initialization is not completed. + * The returned future will complete exceptionally if the JobManagerRunner initialization failed. + */ + public CompletableFuture<Acknowledge> cancel(Time timeout) { + synchronized (lock) { + if (isInitialized()) { + return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.cancel(timeout)); + } else { + log.info("Cancellation during initialization requested for job {}. Job will be cancelled once JobManager has been initialized.", jobId); + + // cancel job + jobManagerRunnerFuture + .thenCompose(JobManagerRunner::getJobMasterGateway) + .thenCompose(jobMasterGateway -> jobMasterGateway.cancel(RpcUtils.INF_TIMEOUT)) + .whenComplete((ignored, cancelThrowable) -> { + if (cancelThrowable != null) { + log.warn("Cancellation of job {} failed", jobId, cancelThrowable); + } + }); + jobStatus = DispatcherJobStatus.CANCELLING; + jobResultFuture.whenComplete(((archivedExecutionGraph, throwable) -> { + if (archivedExecutionGraph != null) { + jobStatus = DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED; + } + })); + return jobResultFuture.thenApply(ignored -> Acknowledge.get()); + } + } + } + + public CompletableFuture<JobStatus> requestJobStatus(Time timeout) { + return requestJob(timeout).thenApply(ArchivedExecutionGraph::getState); + } + + /** + * Returns a future completing to the ArchivedExecutionGraph of the job. 
+ */ + public CompletableFuture<ArchivedExecutionGraph> requestJob(Time timeout) { + synchronized (lock) { + if (isInitialized()) { + if (jobResultFuture.isDone()) { // job is not running anymore + return jobResultFuture; + } + return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.requestJob( + timeout)); + } else { + Preconditions.checkState(this.jobStatus == DispatcherJobStatus.INITIALIZING || jobStatus == DispatcherJobStatus.CANCELLING); + return CompletableFuture.completedFuture( + ArchivedExecutionGraph.createFromInitializingJob( + jobId, + jobName, + jobStatus.asJobStatus(), + null, + initializationTimestamp)); + } + } + } + + /** + * The job is initialized once the JobManager runner has been initialized. + * It is also initialized if the runner initialization failed, or of it has been + * canceled (and the cancellation is complete). + */ + public boolean isInitialized() { + synchronized (lock) { + return jobStatus == DispatcherJobStatus.JOB_MANAGER_RUNNER_INITIALIZED; + } + } + + /** + * Returns the {@link JobMasterGateway} from the JobManagerRunner. + * This method will fail with an {@link IllegalStateException} if the job is initialized. Review comment: ```suggestion * This method will fail with an {@link IllegalStateException} if the job is not initialized. ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org