This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 556dd6fbc00f713e1f2f10e441e1121c50fadad1 Author: Yi Zhang <[email protected]> AuthorDate: Thu Dec 18 14:05:08 2025 +0800 [FLINK-38758][runtime] Use JobStatus instead of ApplicationStatus in JobResult --- .../ApplicationDispatcherBootstrap.java | 5 +-- .../UnsuccessfulExecutionException.java | 26 +++++++++------ .../client/program/rest/RestClusterClient.java | 3 +- .../ApplicationDispatcherBootstrapITCase.java | 8 ++--- .../ApplicationDispatcherBootstrapTest.java | 31 +++++++---------- .../application/JobStatusPollingUtilsTest.java | 5 ++- .../client/program/rest/RestClusterClientTest.java | 7 ++-- .../cleanup/CheckpointResourcesCleanupRunner.java | 2 +- .../apache/flink/runtime/jobmaster/JobResult.java | 39 +++++++++++----------- .../rest/messages/json/JobResultDeserializer.java | 14 +++++--- .../rest/messages/json/JobResultSerializer.java | 4 ++- .../flink/runtime/dispatcher/DispatcherTest.java | 14 ++++---- .../CheckpointResourcesCleanupRunnerTest.java | 17 ++++------ .../ZooKeeperDefaultDispatcherRunnerTest.java | 4 +-- ...FileSystemJobResultStoreFileOperationsTest.java | 4 +-- .../runtime/jobmanager/BlobsCleanupITCase.java | 6 ++-- .../JobMasterServiceLeadershipRunnerTest.java | 4 +-- .../LeaderChangeClusterComponentsTest.java | 3 +- .../job/JobExecutionResultResponseBodyTest.java | 10 +++--- .../runtime/testutils/TestingJobResultStore.java | 14 ++++---- .../checkpointing/ChangelogRecoveryITCaseBase.java | 4 +-- .../environment/RemoteStreamEnvironmentTest.java | 3 +- 22 files changed, 107 insertions(+), 120 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java index 97336c7b265..4290dcd4748 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java @@ -201,7 +201,8 @@ public class ApplicationDispatcherBootstrap implements DispatcherBootstrap { private Optional<ApplicationStatus> extractApplicationStatus(Throwable t) { final Optional<UnsuccessfulExecutionException> maybeException = ExceptionUtils.findThrowable(t, UnsuccessfulExecutionException.class); - return maybeException.map(UnsuccessfulExecutionException::getStatus); + return maybeException.map( + exception -> ApplicationStatus.fromJobStatus(exception.getStatus().orElse(null))); } private CompletableFuture<Void> fixJobIdAndRunApplicationAsync( @@ -377,7 +378,7 @@ public class ApplicationDispatcherBootstrap implements DispatcherBootstrap { exception -> new JobResult.Builder() .jobId(jobId) - .applicationStatus(ApplicationStatus.UNKNOWN) + .jobStatus(null) .netRuntime(Long.MAX_VALUE) .build()); } diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/UnsuccessfulExecutionException.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/UnsuccessfulExecutionException.java index bc360d5535c..b828c0e5f60 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/UnsuccessfulExecutionException.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/UnsuccessfulExecutionException.java @@ -20,30 +20,34 @@ package org.apache.flink.client.deployment.application; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.runtime.client.JobExecutionException; -import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.jobmaster.JobResult; +import javax.annotation.Nullable; + +import java.util.Optional; + import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; -/** Exception that signals the failure of an application with a given {@link ApplicationStatus}. */ +/** Exception that signals the failure of a job with a given {@link JobStatus}. */ @Internal public class UnsuccessfulExecutionException extends JobExecutionException { - private final ApplicationStatus status; + @Nullable private final JobStatus status; public UnsuccessfulExecutionException( final JobID jobID, - final ApplicationStatus status, + @Nullable final JobStatus status, final String message, final Throwable cause) { super(jobID, message, cause); - this.status = checkNotNull(status); + this.status = status; } - public ApplicationStatus getStatus() { - return status; + public Optional<JobStatus> getStatus() { + return Optional.ofNullable(status); } public static UnsuccessfulExecutionException fromJobResult( @@ -64,13 +68,13 @@ public class UnsuccessfulExecutionException extends JobExecutionException { } catch (Throwable t) { final JobID jobID = result.getJobId(); - final ApplicationStatus status = result.getApplicationStatus(); + final JobStatus status = result.getJobStatus().orElse(null); - return status == ApplicationStatus.CANCELED || status == ApplicationStatus.FAILED + return status == JobStatus.CANCELED || status == JobStatus.FAILED ? new UnsuccessfulExecutionException( - jobID, status, "Application Status: " + status.name(), t) + jobID, status, "Job Status: " + status.name(), t) : new UnsuccessfulExecutionException( - jobID, ApplicationStatus.UNKNOWN, "Job failed for unknown reason.", t); + jobID, null, "Job failed for unknown reason.", t); } } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index 496612406b5..3cfe7dd7341 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -38,7 +38,6 @@ import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.client.JobSubmissionException; -import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.highavailability.ClientHighAvailabilityServices; import org.apache.flink.runtime.highavailability.ClientHighAvailabilityServicesFactory; import org.apache.flink.runtime.highavailability.DefaultClientHighAvailabilityServicesFactory; @@ -1027,7 +1026,7 @@ public class RestClusterClient<T> implements ClusterClient<T> { }) .thenApply( jobResult -> { - if (jobResult.getApplicationStatus() == ApplicationStatus.UNKNOWN) { + if (jobResult.getJobStatus().isEmpty()) { throw new JobStateUnknownException( String.format("Result for Job %s is UNKNOWN", jobId)); } diff --git a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java index 3e0b3953c45..b5fb78bf94d 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java +++ b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java @@ -33,7 +33,6 @@ import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.PipelineOptionsInternal; import org.apache.flink.core.testutils.FlinkAssertions; import org.apache.flink.runtime.client.DuplicateJobSubmissionException; -import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory; import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory; import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory; @@ -131,9 +130,7 @@ class ApplicationDispatcherBootstrapITCase { final CompletableFuture<JobResult> firstJobResult = cluster.requestJobResult(jobId); haServices.revokeDispatcherLeadership(); // make sure the leadership is revoked to avoid race conditions - assertThat(firstJobResult.get()) - .extracting(JobResult::getApplicationStatus) - .isEqualTo(ApplicationStatus.UNKNOWN); + assertThat(firstJobResult.get().getJobStatus().isPresent()).isFalse(); haServices.grantDispatcherLeadership(); // job is suspended, wait until it's running @@ -145,8 +142,7 @@ class ApplicationDispatcherBootstrapITCase { // and wait for it to actually finish final JobResult secondJobResult = cluster.requestJobResult(jobId).get(); assertThat(secondJobResult.isSuccess()).isTrue(); - assertThat(secondJobResult.getApplicationStatus()) - .isEqualTo(ApplicationStatus.SUCCEEDED); + assertThat(secondJobResult.getJobStatus().orElse(null)).isEqualTo(JobStatus.FINISHED); // the cluster should shut down automatically once the application completes awaitClusterStopped(cluster); diff --git a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java index 6bf77bfff84..47c5e13fe7b 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java @@ -55,6 +55,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; +import javax.annotation.Nullable; + import java.util.Collections; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -260,7 +262,7 @@ class ApplicationDispatcherBootstrapTest { final CompletableFuture<Void> applicationFuture = runApplication(dispatcherBuilder, 2); final UnsuccessfulExecutionException exception = assertException(applicationFuture, UnsuccessfulExecutionException.class); - assertThat(exception.getStatus()).isEqualTo(ApplicationStatus.FAILED); + assertThat(exception.getStatus().orElse(null)).isEqualTo(JobStatus.FAILED); } @Test @@ -435,8 +437,7 @@ class ApplicationDispatcherBootstrapTest { .setRequestJobResultFunction( jobId -> CompletableFuture.completedFuture( - createJobResult( - jobId, ApplicationStatus.SUCCEEDED))) + createJobResult(jobId, JobStatus.FINISHED))) .setClusterShutdownFunction(status -> shutdownFunction.get()); // we're "listening" on this to be completed to verify that the error handler is called. @@ -820,8 +821,7 @@ class ApplicationDispatcherBootstrapTest { jobId -> submitted.thenApply( ignored -> - createJobResult( - jobId, ApplicationStatus.FAILED))) + createJobResult(jobId, JobStatus.FAILED))) .build(); final ApplicationDispatcherBootstrap bootstrap = @@ -886,10 +886,7 @@ class ApplicationDispatcherBootstrapTest { jobId -> CompletableFuture.completedFuture(jobStatus)); if (jobStatus != JobStatus.RUNNING) { builder.setRequestJobResultFunction( - jobID -> - CompletableFuture.completedFuture( - createJobResult( - jobID, ApplicationStatus.fromJobStatus(jobStatus)))); + jobID -> CompletableFuture.completedFuture(createJobResult(jobID, jobStatus))); } return builder; } @@ -979,25 +976,21 @@ class ApplicationDispatcherBootstrapTest { } private static JobResult createFailedJobResult(final JobID jobId) { - return createJobResult(jobId, ApplicationStatus.FAILED); + return createJobResult(jobId, JobStatus.FAILED); } private static JobResult createUnknownJobResult(final JobID jobId) { - return createJobResult(jobId, ApplicationStatus.UNKNOWN); + return createJobResult(jobId, null); } private static JobResult createJobResult( - final JobID jobID, final ApplicationStatus applicationStatus) { + final JobID jobID, @Nullable final JobStatus jobStatus) { JobResult.Builder builder = - new JobResult.Builder() - .jobId(jobID) - .netRuntime(2L) - .applicationStatus(applicationStatus); - if (applicationStatus == ApplicationStatus.CANCELED) { + new JobResult.Builder().jobId(jobID).netRuntime(2L).jobStatus(jobStatus); + if (jobStatus == JobStatus.CANCELED) { builder.serializedThrowable( new SerializedThrowable(new JobCancellationException(jobID, "Hello", null))); - } else if (applicationStatus == ApplicationStatus.FAILED - || applicationStatus == ApplicationStatus.UNKNOWN) { + } else if (jobStatus == JobStatus.FAILED || jobStatus == null) { builder.serializedThrowable( new SerializedThrowable(new JobExecutionException(jobID, "bla bla bla"))); } diff --git a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/JobStatusPollingUtilsTest.java b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/JobStatusPollingUtilsTest.java index 087829f7ce5..ed057bf3289 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/JobStatusPollingUtilsTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/JobStatusPollingUtilsTest.java @@ -20,7 +20,6 @@ package org.apache.flink.client.deployment.application; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; -import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.SerializedThrowable; @@ -164,7 +163,7 @@ class JobStatusPollingUtilsTest { return new JobResult.Builder() .jobId(jobId) .netRuntime(2L) - .applicationStatus(ApplicationStatus.FAILED) + .jobStatus(JobStatus.FAILED) .serializedThrowable(new SerializedThrowable(new Exception("bla bla bla"))) .build(); } @@ -173,7 +172,7 @@ class JobStatusPollingUtilsTest { return new JobResult.Builder() .jobId(jobId) .netRuntime(2L) - .applicationStatus(ApplicationStatus.SUCCEEDED) + .jobStatus(JobStatus.FINISHED) .build(); } } diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java index 27e1835b263..71e00f38d55 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java @@ -35,7 +35,6 @@ import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.RestOptions; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.client.JobSubmissionException; -import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.instance.SlotSharingGroupId; @@ -880,7 +879,7 @@ class RestClusterClientTest { // On an UNKNOWN JobResult it should be retried JobExecutionResultResponseBody.created( new JobResult.Builder() - .applicationStatus(ApplicationStatus.UNKNOWN) + .jobStatus(null) .jobId(jobId) .netRuntime(Long.MAX_VALUE) .accumulatorResults( @@ -891,7 +890,7 @@ class RestClusterClientTest { .build()), JobExecutionResultResponseBody.created( new JobResult.Builder() - .applicationStatus(ApplicationStatus.SUCCEEDED) + .jobStatus(JobStatus.FINISHED) .jobId(jobId) .netRuntime(Long.MAX_VALUE) .accumulatorResults( @@ -902,7 +901,7 @@ class RestClusterClientTest { .build()), JobExecutionResultResponseBody.created( new JobResult.Builder() - .applicationStatus(ApplicationStatus.FAILED) + .jobStatus(JobStatus.FAILED) .jobId(jobId) .netRuntime(Long.MAX_VALUE) .serializedThrowable( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java index 529fd39d740..2aaa70876cf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java @@ -216,7 +216,7 @@ public class CheckpointResourcesCleanupRunner implements JobManagerRunner { } private static JobStatus getJobStatus(JobResult jobResult) { - return jobResult.getApplicationStatus().deriveJobStatus(); + return jobResult.getJobStatus().orElseThrow(); } private static ExecutionGraphInfo generateExecutionGraphInfo( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java index cd8bd007fb1..b385b28da39 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java @@ -25,7 +25,6 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.runtime.client.JobCancellationException; import org.apache.flink.runtime.client.JobExecutionException; -import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.dispatcher.Dispatcher; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.ErrorInfo; @@ -57,7 +56,8 @@ public class JobResult implements Serializable { private final JobID jobId; - private final ApplicationStatus applicationStatus; + /** Stores the job status, null if unknown. */ + @Nullable private final JobStatus jobStatus; private final Map<String, SerializedValue<OptionalFailure<Object>>> accumulatorResults; @@ -68,15 +68,18 @@ public class JobResult implements Serializable { private JobResult( final JobID jobId, - final ApplicationStatus applicationStatus, + @Nullable final JobStatus jobStatus, final Map<String, SerializedValue<OptionalFailure<Object>>> accumulatorResults, final long netRuntime, @Nullable final SerializedThrowable serializedThrowable) { checkArgument(netRuntime >= 0, "netRuntime must be greater than or equals 0"); + checkArgument( + jobStatus == null || jobStatus.isGloballyTerminalState(), + "jobStatus must be globally terminal or unknow(null)"); this.jobId = requireNonNull(jobId); - this.applicationStatus = requireNonNull(applicationStatus); + this.jobStatus = jobStatus; this.accumulatorResults = requireNonNull(accumulatorResults); this.netRuntime = netRuntime; this.serializedThrowable = serializedThrowable; @@ -84,16 +87,16 @@ public class JobResult implements Serializable { /** Returns {@code true} if the job finished successfully. */ public boolean isSuccess() { - return applicationStatus == ApplicationStatus.SUCCEEDED - || (applicationStatus == ApplicationStatus.UNKNOWN && serializedThrowable == null); + return jobStatus == JobStatus.FINISHED + || (jobStatus == null && serializedThrowable == null); } public JobID getJobId() { return jobId; } - public ApplicationStatus getApplicationStatus() { - return applicationStatus; + public Optional<JobStatus> getJobStatus() { + return Optional.ofNullable(jobStatus); } public Map<String, SerializedValue<OptionalFailure<Object>>> getAccumulatorResults() { @@ -124,7 +127,7 @@ public class JobResult implements Serializable { */ public JobExecutionResult toJobExecutionResult(ClassLoader classLoader) throws JobExecutionException, IOException, ClassNotFoundException { - if (applicationStatus == ApplicationStatus.SUCCEEDED) { + if (jobStatus == JobStatus.FINISHED) { return new JobExecutionResult( jobId, netRuntime, @@ -140,17 +143,15 @@ public class JobResult implements Serializable { final JobExecutionException exception; - if (applicationStatus == ApplicationStatus.FAILED) { + if (jobStatus == JobStatus.FAILED) { exception = new JobExecutionException(jobId, "Job execution failed.", cause); - } else if (applicationStatus == ApplicationStatus.CANCELED) { + } else if (jobStatus == JobStatus.CANCELED) { exception = new JobCancellationException(jobId, "Job was cancelled.", cause); } else { exception = new JobExecutionException( jobId, - "Job completed with illegal application status: " - + applicationStatus - + '.', + "Job completed with illegal status: " + jobStatus + '.', cause); } @@ -164,7 +165,7 @@ public class JobResult implements Serializable { private JobID jobId; - private ApplicationStatus applicationStatus = ApplicationStatus.UNKNOWN; + private JobStatus jobStatus; private Map<String, SerializedValue<OptionalFailure<Object>>> accumulatorResults; @@ -177,8 +178,8 @@ public class JobResult implements Serializable { return this; } - public Builder applicationStatus(final ApplicationStatus applicationStatus) { - this.applicationStatus = applicationStatus; + public Builder jobStatus(final JobStatus jobStatus) { + this.jobStatus = jobStatus; return this; } @@ -201,7 +202,7 @@ public class JobResult implements Serializable { public JobResult build() { return new JobResult( jobId, - applicationStatus, + jobStatus, accumulatorResults == null ? Collections.emptyMap() : accumulatorResults, netRuntime, serializedThrowable); @@ -233,7 +234,7 @@ public class JobResult implements Serializable { final JobResult.Builder builder = new JobResult.Builder(); builder.jobId(jobId); - builder.applicationStatus(ApplicationStatus.fromJobStatus(accessExecutionGraph.getState())); + builder.jobStatus(jobStatus.isGloballyTerminalState() ? jobStatus : null); final long netRuntime = accessExecutionGraph.getStatusTimestamp(jobStatus) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java index 5eddeaf9b09..3ebd2aa6590 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.rest.messages.json; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.util.OptionalFailure; @@ -70,7 +71,7 @@ public class JobResultDeserializer extends StdDeserializer<JobResult> { public JobResult deserialize(final JsonParser p, final DeserializationContext ctxt) throws IOException { JobID jobId = null; - ApplicationStatus applicationStatus = ApplicationStatus.UNKNOWN; + JobStatus jobStatus = null; long netRuntime = -1; SerializedThrowable serializedThrowable = null; Map<String, SerializedValue<OptionalFailure<Object>>> accumulatorResults = null; @@ -90,8 +91,13 @@ public class JobResultDeserializer extends StdDeserializer<JobResult> { break; case JobResultSerializer.FIELD_NAME_APPLICATION_STATUS: assertNextToken(p, JsonToken.VALUE_STRING); - applicationStatus = - ApplicationStatus.valueOf(p.getValueAsString().toUpperCase()); + try { + jobStatus = + ApplicationStatus.valueOf(p.getValueAsString().toUpperCase()) + .deriveJobStatus(); + } catch (UnsupportedOperationException e) { + // jobStatus = null to indicate that the job status is unknown + } break; case JobResultSerializer.FIELD_NAME_NET_RUNTIME: assertNextToken(p, JsonToken.VALUE_NUMBER_INT); @@ -113,7 +119,7 @@ public class JobResultDeserializer extends StdDeserializer<JobResult> { try { return new JobResult.Builder() .jobId(jobId) - .applicationStatus(applicationStatus) + .jobStatus(jobStatus) .netRuntime(netRuntime) .accumulatorResults(accumulatorResults) .serializedThrowable(serializedThrowable) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java index 33e375c89ee..562e340c249 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.rest.messages.json; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.util.OptionalFailure; import org.apache.flink.util.SerializedThrowable; @@ -78,8 +79,9 @@ public class JobResultSerializer extends StdSerializer<JobResult> { gen.writeFieldName(FIELD_NAME_JOB_ID); jobIdSerializer.serialize(result.getJobId(), gen, provider); + // use application status to maintain backward compatibility gen.writeFieldName(FIELD_NAME_APPLICATION_STATUS); - gen.writeString(result.getApplicationStatus().name()); + gen.writeString(ApplicationStatus.fromJobStatus(result.getJobStatus().orElse(null)).name()); gen.writeFieldName(FIELD_NAME_ACCUMULATOR_RESULTS); gen.writeStartObject(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index a6325b252ff..ab25cf02dd0 100755 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -221,8 +221,7 @@ public class DispatcherTest extends AbstractDispatcherTest { @Test public void testDuplicateJobSubmissionWithGloballyTerminatedButDirtyJob() throws Exception { final JobResult jobResult = - TestingJobResultStore.createJobResult( - jobGraph.getJobID(), ApplicationStatus.SUCCEEDED); + TestingJobResultStore.createJobResult(jobGraph.getJobID(), JobStatus.FINISHED); haServices.getJobResultStore().createDirtyResultAsync(new JobResultEntry(jobResult)).get(); assertDuplicateJobSubmission(); } @@ -230,8 +229,7 @@ public class DispatcherTest extends AbstractDispatcherTest { @Test public void testDuplicateJobSubmissionWithGloballyTerminatedAndCleanedJob() throws Exception { final JobResult jobResult = - TestingJobResultStore.createJobResult( - jobGraph.getJobID(), ApplicationStatus.SUCCEEDED); + TestingJobResultStore.createJobResult(jobGraph.getJobID(), JobStatus.FINISHED); haServices.getJobResultStore().createDirtyResultAsync(new JobResultEntry(jobResult)).get(); haServices.getJobResultStore().markResultAsCleanAsync(jobGraph.getJobID()).get(); @@ -450,8 +448,8 @@ public class DispatcherTest extends AbstractDispatcherTest { assertThatFuture(dispatcherGateway.requestJobResult(jobGraph.getJobID(), TIMEOUT)) .eventuallySucceeds() - .extracting(JobResult::getApplicationStatus) - .isEqualTo(ApplicationStatus.CANCELED); + .extracting(jobResult -> jobResult.getJobStatus().orElse(null)) + .isEqualTo(JobStatus.CANCELED); } @Test @@ -910,7 +908,7 @@ public class DispatcherTest extends AbstractDispatcherTest { Collections.singleton( new JobResult.Builder() .jobId(jobIdOfRecoveredDirtyJobs) - .applicationStatus(ApplicationStatus.SUCCEEDED) + .jobStatus(JobStatus.FINISHED) .netRuntime(1) .build())) .setDispatcherBootstrapFactory( @@ -983,7 +981,7 @@ public class DispatcherTest extends AbstractDispatcherTest { dispatcher.close(); final JobResult jobResult = jobResultFuture.get(); - assertThat(jobResult.getApplicationStatus()).isSameAs(ApplicationStatus.UNKNOWN); + assertThat(jobResult.getJobStatus().isPresent()).isFalse(); } @Test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java index 5dbe48c120e..f1c3331ad10 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java @@ -29,7 +29,6 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.TestingCheckpointIDCounter; import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.TestingCompletedCheckpointStore; -import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.dispatcher.UnavailableDispatcherOperationException; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.jobmaster.JobManagerRunnerResult; @@ -386,7 +385,7 @@ class CheckpointResourcesCleanupRunnerTest { final JobID jobId = new JobID(); final CheckpointResourcesCleanupRunner testInstance = new TestInstanceBuilder() - .withJobResult(createJobResult(jobId, ApplicationStatus.CANCELED)) + .withJobResult(createJobResult(jobId, JobStatus.CANCELED)) .build(); assertThat(testInstance.getJobID()).isEqualTo(jobId); } @@ -454,7 +453,7 @@ class CheckpointResourcesCleanupRunnerTest { actualExecutionGraph -> actualExecutionGraph .getState() - .equals(jobResult.getApplicationStatus().deriveJobStatus())); + .equals(jobResult.getJobStatus().orElseThrow())); } @Test @@ -509,24 +508,20 @@ class CheckpointResourcesCleanupRunnerTest { } private static JobResult createDummySuccessJobResult() { - return createJobResult(new JobID(), ApplicationStatus.SUCCEEDED); + return createJobResult(new JobID(), JobStatus.FINISHED); } private static JobResult createJobResultWithFailure(SerializedThrowable throwable) { return new JobResult.Builder() .jobId(new JobID()) - .applicationStatus(ApplicationStatus.FAILED) + .jobStatus(JobStatus.FAILED) .serializedThrowable(throwable) .netRuntime(1) .build(); } - private static JobResult createJobResult(JobID jobId, ApplicationStatus applicationStatus) { - return new JobResult.Builder() - .jobId(jobId) - .applicationStatus(applicationStatus) - .netRuntime(1) - .build(); + private static JobResult createJobResult(JobID jobId, JobStatus jobStatus) { + return new JobResult.Builder().jobId(jobId).jobStatus(jobStatus).netRuntime(1).build(); } private static CheckpointRecoveryFactory createCheckpointRecoveryFactory() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java index 3294fed82bc..7271011a49a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java @@ -18,13 +18,13 @@ package org.apache.flink.runtime.dispatcher.runner; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.core.testutils.AllCallbackWrapper; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.blob.BlobUtils; import org.apache.flink.runtime.blob.PermanentBlobKey; -import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.dispatcher.DispatcherId; import org.apache.flink.runtime.dispatcher.DispatcherOperationCaches; @@ -227,7 +227,7 @@ class ZooKeeperDefaultDispatcherRunnerTest { // a successful cancellation should eventually remove all job information final JobResult jobResult = jobResultFuture.get(); - assertThat(jobResult.getApplicationStatus()).isEqualTo(ApplicationStatus.CANCELED); + assertThat(jobResult.getJobStatus().orElse(null)).isEqualTo(JobStatus.CANCELED); dispatcherLeaderElection.notLeader(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreFileOperationsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreFileOperationsTest.java index 2a0e1dbcb5b..4e50c1d414f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreFileOperationsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreFileOperationsTest.java @@ -255,8 +255,8 @@ public class FileSystemJobResultStoreFileOperationsTest { .extracting(JobResult::getJobId) .isEqualTo(DUMMY_JOB_RESULT_ENTRY.getJobId()); assertThat(deserializedJobResult) - .extracting(JobResult::getApplicationStatus) - .isEqualTo(DUMMY_JOB_RESULT_ENTRY.getJobResult().getApplicationStatus()); + .extracting(JobResult::getJobStatus) + .isEqualTo(DUMMY_JOB_RESULT_ENTRY.getJobResult().getJobStatus()); assertThat(deserializedJobResult) .extracting(JobResult::getNetRuntime) .isEqualTo(DUMMY_JOB_RESULT_ENTRY.getJobResult().getNetRuntime()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/BlobsCleanupITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/BlobsCleanupITCase.java index d8adebff1f3..92a0c9b7dea 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/BlobsCleanupITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/BlobsCleanupITCase.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.jobmanager; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.JobSubmissionResult; import org.apache.flink.api.common.time.Deadline; import org.apache.flink.configuration.BlobServerOptions; @@ -29,7 +30,6 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.blob.BlobClient; import org.apache.flink.runtime.blob.PermanentBlobKey; import org.apache.flink.runtime.client.JobSubmissionException; -import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphBuilder; import org.apache.flink.runtime.jobgraph.JobVertex; @@ -207,14 +207,14 @@ public class BlobsCleanupITCase extends TestLogger { // then the tasks will fail again and the restart strategy will finalise the job final JobResult jobResult = resultFuture.get(); assertThat(jobResult.isSuccess(), is(false)); - assertThat(jobResult.getApplicationStatus(), is(ApplicationStatus.FAILED)); + assertThat(jobResult.getJobStatus().orElse(null), is(JobStatus.FAILED)); } else if (testCase == TestCase.JOB_IS_CANCELLED) { miniCluster.cancelJob(jid); final JobResult jobResult = resultFuture.get(); assertThat(jobResult.isSuccess(), is(false)); - assertThat(jobResult.getApplicationStatus(), is(ApplicationStatus.CANCELED)); + assertThat(jobResult.getJobStatus().orElse(null), is(JobStatus.CANCELED)); } else { final JobResult jobResult = resultFuture.get(); Throwable cause = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java index dfe5869581b..f3c367b52ca 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.core.testutils.CheckedThread; import org.apache.flink.core.testutils.OneShotLatch; -import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; @@ -677,8 +676,7 @@ class JobMasterServiceLeadershipRunnerTest { @Test void testJobAlreadyDone() throws Exception { final JobID jobId = new JobID(); - final JobResult jobResult = - TestingJobResultStore.createJobResult(jobId, ApplicationStatus.UNKNOWN); + final JobResult jobResult = TestingJobResultStore.createJobResult(jobId, null); jobResultStore.createDirtyResultAsync(new JobResultEntry(jobResult)).get(); try (JobManagerRunner jobManagerRunner = newJobMasterServiceLeadershipRunnerBuilder() diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java index ba098f704e8..7b1de5c4a8a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.leaderelection; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobSubmissionResult; -import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -114,7 +113,7 @@ class LeaderChangeClusterComponentsTest { highAvailabilityServices.revokeDispatcherLeadership().get(); JobResult jobResult = jobResultFuture.get(); - assertThat(jobResult.getApplicationStatus()).isEqualTo(ApplicationStatus.UNKNOWN); + assertThat(jobResult.getJobStatus().isPresent()).isFalse(); highAvailabilityServices.grantDispatcherLeadership(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBodyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBodyTest.java index 041124b3906..278b3047546 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBodyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBodyTest.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.rest.messages.job; import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase; import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; @@ -64,7 +64,7 @@ public class JobExecutionResultResponseBodyTest JobExecutionResultResponseBody.created( new JobResult.Builder() .jobId(TEST_JOB_ID) - .applicationStatus(ApplicationStatus.SUCCEEDED) + .jobStatus(JobStatus.FINISHED) .netRuntime(TEST_NET_RUNTIME) .accumulatorResults(TEST_ACCUMULATORS) .serializedThrowable( @@ -76,7 +76,7 @@ public class JobExecutionResultResponseBodyTest JobExecutionResultResponseBody.created( new JobResult.Builder() .jobId(TEST_JOB_ID) - .applicationStatus(ApplicationStatus.FAILED) + .jobStatus(JobStatus.FAILED) .netRuntime(TEST_NET_RUNTIME) .accumulatorResults(TEST_ACCUMULATORS) .build()) @@ -117,8 +117,8 @@ public class JobExecutionResultResponseBodyTest assertThat(actualJobExecutionResult.getJobId()) .isEqualTo(expectedJobExecutionResult.getJobId()); - assertThat(actualJobExecutionResult.getApplicationStatus()) - .isEqualTo(expectedJobExecutionResult.getApplicationStatus()); + assertThat(actualJobExecutionResult.getJobStatus()) + .isEqualTo(expectedJobExecutionResult.getJobStatus()); assertThat(actualJobExecutionResult.getNetRuntime()) .isEqualTo(expectedJobExecutionResult.getNetRuntime()); assertThat(actualJobExecutionResult.getAccumulatorResults()) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobResultStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobResultStore.java index 6790189f31c..63606bab2bd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobResultStore.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingJobResultStore.java @@ -19,13 +19,15 @@ package org.apache.flink.runtime.testutils; import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.runtime.highavailability.JobResultEntry; import org.apache.flink.runtime.highavailability.JobResultStore; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.function.SupplierWithException; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.Collections; import java.util.Set; @@ -41,15 +43,11 @@ public class TestingJobResultStore implements JobResultStore { public static final JobResult DUMMY_JOB_RESULT = createSuccessfulJobResult(new JobID()); public static JobResult createSuccessfulJobResult(JobID jobId) { - return createJobResult(jobId, ApplicationStatus.SUCCEEDED); + return createJobResult(jobId, JobStatus.FINISHED); } - public static JobResult createJobResult(JobID jobId, ApplicationStatus applicationStatus) { - return new JobResult.Builder() - .jobId(jobId) - .applicationStatus(applicationStatus) - .netRuntime(1) - .build(); + public static JobResult createJobResult(JobID jobId, @Nullable JobStatus jobStatus) { + return new JobResult.Builder().jobId(jobId).jobStatus(jobStatus).netRuntime(1).build(); } private final Function<JobResultEntry, CompletableFuture<Void>> createDirtyResultConsumer; diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryITCaseBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryITCaseBase.java index 55dabf253c4..0c5c9da827c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryITCaseBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryITCaseBase.java @@ -18,6 +18,7 @@ package org.apache.flink.test.checkpointing; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.JobSubmissionResult; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.OpenContext; @@ -31,7 +32,6 @@ import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.StateChangelogOptions; import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata; -import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; @@ -286,7 +286,7 @@ public abstract class ChangelogRecoveryITCaseBase extends TestLogger { if (jobResult.getSerializedThrowable().isPresent()) { throw jobResult.getSerializedThrowable().get(); } - assertSame(ApplicationStatus.SUCCEEDED, jobResult.getApplicationStatus()); + assertSame(JobStatus.FINISHED, jobResult.getJobStatus().orElse(null)); } private Configuration configure() throws IOException { diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java index 45a2433da36..5b987adc0c4 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java @@ -31,7 +31,6 @@ import org.apache.flink.core.execution.PipelineExecutorFactory; import org.apache.flink.core.execution.PipelineExecutorServiceLoader; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.runtime.client.JobStatusMessage; -import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.jobmaster.JobResult.Builder; @@ -195,7 +194,7 @@ public class RemoteStreamEnvironmentTest extends TestLogger { new Builder() .jobId(this.jobId) .netRuntime(0) - .applicationStatus(ApplicationStatus.SUCCEEDED) + .jobStatus(JobStatus.FINISHED) .build(); return CompletableFuture.completedFuture(jobResult); }
